michael@0: /* This Source Code Form is subject to the terms of the Mozilla Public michael@0: * License, v. 2.0. If a copy of the MPL was not distributed with this michael@0: * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ michael@0: michael@0: "use strict"; michael@0: michael@0: #ifndef MERGED_COMPARTMENT michael@0: michael@0: this.EXPORTED_SYMBOLS = [ michael@0: "DailyValues", michael@0: "MetricsStorageBackend", michael@0: "dateToDays", michael@0: "daysToDate", michael@0: ]; michael@0: michael@0: const {utils: Cu} = Components; michael@0: michael@0: const MILLISECONDS_PER_DAY = 24 * 60 * 60 * 1000; michael@0: michael@0: #endif michael@0: michael@0: Cu.import("resource://gre/modules/Promise.jsm"); michael@0: Cu.import("resource://gre/modules/Sqlite.jsm"); michael@0: Cu.import("resource://gre/modules/Task.jsm"); michael@0: Cu.import("resource://gre/modules/Log.jsm"); michael@0: Cu.import("resource://services-common/utils.js"); michael@0: michael@0: michael@0: // These do not account for leap seconds. Meh. michael@0: function dateToDays(date) { michael@0: return Math.floor(date.getTime() / MILLISECONDS_PER_DAY); michael@0: } michael@0: michael@0: function daysToDate(days) { michael@0: return new Date(days * MILLISECONDS_PER_DAY); michael@0: } michael@0: michael@0: /** michael@0: * Represents a collection of per-day values. michael@0: * michael@0: * This is a proxy around a Map which can transparently round Date instances to michael@0: * their appropriate key. michael@0: * michael@0: * This emulates Map by providing .size and iterator support. Note that keys michael@0: * from the iterator are Date instances corresponding to midnight of the start michael@0: * of the day. get(), has(), and set() are modeled as getDay(), hasDay(), and michael@0: * setDay(), respectively. michael@0: * michael@0: * All days are defined in terms of UTC (as opposed to local time). 
michael@0: */ michael@0: this.DailyValues = function () { michael@0: this._days = new Map(); michael@0: }; michael@0: michael@0: DailyValues.prototype = Object.freeze({ michael@0: __iterator__: function () { michael@0: for (let [k, v] of this._days) { michael@0: yield [daysToDate(k), v]; michael@0: } michael@0: }, michael@0: michael@0: get size() { michael@0: return this._days.size; michael@0: }, michael@0: michael@0: hasDay: function (date) { michael@0: return this._days.has(dateToDays(date)); michael@0: }, michael@0: michael@0: getDay: function (date) { michael@0: return this._days.get(dateToDays(date)); michael@0: }, michael@0: michael@0: setDay: function (date, value) { michael@0: this._days.set(dateToDays(date), value); michael@0: }, michael@0: michael@0: appendValue: function (date, value) { michael@0: let key = dateToDays(date); michael@0: michael@0: if (this._days.has(key)) { michael@0: return this._days.get(key).push(value); michael@0: } michael@0: michael@0: this._days.set(key, [value]); michael@0: }, michael@0: }); michael@0: michael@0: michael@0: /** michael@0: * DATABASE INFO michael@0: * ============= michael@0: * michael@0: * We use a SQLite database as the backend for persistent storage of metrics michael@0: * data. michael@0: * michael@0: * Every piece of recorded data is associated with a measurement. A measurement michael@0: * is an entity with a name and version. Each measurement is associated with a michael@0: * named provider. michael@0: * michael@0: * When the metrics system is initialized, we ask providers (the entities that michael@0: * emit data) to configure the database for storage of their data. They tell michael@0: * storage what their requirements are. For example, they'll register michael@0: * named daily counters associated with specific measurements. michael@0: * michael@0: * Recorded data is stored differently depending on the requirements for michael@0: * storing it. 
 * We have facilities for storing the following classes of data:
 *
 *  1) Counts of event/field occurrences aggregated by day.
 *  2) Discrete values of fields aggregated by day.
 *  3) Discrete values of fields aggregated by day, max 1 per day (last write
 *     wins).
 *  4) Discrete values of fields, max 1 (last write wins).
 *
 * Most data is aggregated per day mainly for privacy reasons. This does throw
 * away potentially useful data. But, it's not currently used, so there is no
 * need to keep the granular information.
 *
 * Database Schema
 * ---------------
 *
 * This database contains the following tables:
 *
 *   providers -- Maps provider string name to an internal ID.
 *   provider_state -- Holds opaque persisted state for providers.
 *   measurements -- Holds the set of known measurements (name, version,
 *     provider tuples).
 *   types -- The data types that can be stored in measurements/fields.
 *   fields -- Describes entities that occur within measurements.
 *   daily_counters -- Holds daily-aggregated counts of events. Each row is
 *     associated with a field and a day.
 *   daily_discrete_numeric -- Holds numeric values for fields grouped by day.
 *     Each row contains a discrete value associated with a field that occurred
 *     on a specific day. There can be multiple rows per field per day.
 *   daily_discrete_text -- Holds text values for fields grouped by day. Each
 *     row contains a discrete value associated with a field that occurred on a
 *     specific day.
 *   daily_last_numeric -- Holds numeric values where the last encountered
 *     value for a given day is retained.
 *   daily_last_text -- Like daily_last_numeric except for text values.
michael@0: * last_numeric -- Holds the most recent value for a numeric field. michael@0: * last_text -- Like last_numeric except for text fields. michael@0: * michael@0: * Notes michael@0: * ----- michael@0: * michael@0: * It is tempting to use SQLite's julianday() function to store days that michael@0: * things happened. However, a Julian Day begins at *noon* in 4714 B.C. This michael@0: * results in weird half day offsets from UNIX time. So, we instead store michael@0: * number of days since UNIX epoch, not Julian. michael@0: */ michael@0: michael@0: /** michael@0: * All of our SQL statements are stored in a central mapping so they can easily michael@0: * be audited for security, perf, etc. michael@0: */ michael@0: const SQL = { michael@0: // Create the providers table. michael@0: createProvidersTable: "\ michael@0: CREATE TABLE providers (\ michael@0: id INTEGER PRIMARY KEY AUTOINCREMENT, \ michael@0: name TEXT, \ michael@0: UNIQUE (name) \ michael@0: )", michael@0: michael@0: createProviderStateTable: "\ michael@0: CREATE TABLE provider_state (\ michael@0: id INTEGER PRIMARY KEY AUTOINCREMENT, \ michael@0: provider_id INTEGER, \ michael@0: name TEXT, \ michael@0: VALUE TEXT, \ michael@0: UNIQUE (provider_id, name), \ michael@0: FOREIGN KEY (provider_id) REFERENCES providers(id) ON DELETE CASCADE\ michael@0: )", michael@0: michael@0: createProviderStateProviderIndex: "\ michael@0: CREATE INDEX i_provider_state_provider_id ON provider_state (provider_id)", michael@0: michael@0: createMeasurementsTable: "\ michael@0: CREATE TABLE measurements (\ michael@0: id INTEGER PRIMARY KEY AUTOINCREMENT, \ michael@0: provider_id INTEGER, \ michael@0: name TEXT, \ michael@0: version INTEGER, \ michael@0: UNIQUE (provider_id, name, version), \ michael@0: FOREIGN KEY (provider_id) REFERENCES providers(id) ON DELETE CASCADE\ michael@0: )", michael@0: michael@0: createMeasurementsProviderIndex: "\ michael@0: CREATE INDEX i_measurements_provider_id ON measurements (provider_id)", 
michael@0: michael@0: createMeasurementsView: "\ michael@0: CREATE VIEW v_measurements AS \ michael@0: SELECT \ michael@0: providers.id AS provider_id, \ michael@0: providers.name AS provider_name, \ michael@0: measurements.id AS measurement_id, \ michael@0: measurements.name AS measurement_name, \ michael@0: measurements.version AS measurement_version \ michael@0: FROM providers, measurements \ michael@0: WHERE \ michael@0: measurements.provider_id = providers.id", michael@0: michael@0: createTypesTable: "\ michael@0: CREATE TABLE types (\ michael@0: id INTEGER PRIMARY KEY AUTOINCREMENT, \ michael@0: name TEXT, \ michael@0: UNIQUE (name)\ michael@0: )", michael@0: michael@0: createFieldsTable: "\ michael@0: CREATE TABLE fields (\ michael@0: id INTEGER PRIMARY KEY AUTOINCREMENT, \ michael@0: measurement_id INTEGER, \ michael@0: name TEXT, \ michael@0: value_type INTEGER , \ michael@0: UNIQUE (measurement_id, name), \ michael@0: FOREIGN KEY (measurement_id) REFERENCES measurements(id) ON DELETE CASCADE \ michael@0: FOREIGN KEY (value_type) REFERENCES types(id) ON DELETE CASCADE \ michael@0: )", michael@0: michael@0: createFieldsMeasurementIndex: "\ michael@0: CREATE INDEX i_fields_measurement_id ON fields (measurement_id)", michael@0: michael@0: createFieldsView: "\ michael@0: CREATE VIEW v_fields AS \ michael@0: SELECT \ michael@0: providers.id AS provider_id, \ michael@0: providers.name AS provider_name, \ michael@0: measurements.id AS measurement_id, \ michael@0: measurements.name AS measurement_name, \ michael@0: measurements.version AS measurement_version, \ michael@0: fields.id AS field_id, \ michael@0: fields.name AS field_name, \ michael@0: types.id AS type_id, \ michael@0: types.name AS type_name \ michael@0: FROM providers, measurements, fields, types \ michael@0: WHERE \ michael@0: fields.measurement_id = measurements.id \ michael@0: AND measurements.provider_id = providers.id \ michael@0: AND fields.value_type = types.id", michael@0: michael@0: 
createDailyCountersTable: "\ michael@0: CREATE TABLE daily_counters (\ michael@0: field_id INTEGER, \ michael@0: day INTEGER, \ michael@0: value INTEGER, \ michael@0: UNIQUE(field_id, day), \ michael@0: FOREIGN KEY (field_id) REFERENCES fields(id) ON DELETE CASCADE\ michael@0: )", michael@0: michael@0: createDailyCountersFieldIndex: "\ michael@0: CREATE INDEX i_daily_counters_field_id ON daily_counters (field_id)", michael@0: michael@0: createDailyCountersDayIndex: "\ michael@0: CREATE INDEX i_daily_counters_day ON daily_counters (day)", michael@0: michael@0: createDailyCountersView: "\ michael@0: CREATE VIEW v_daily_counters AS SELECT \ michael@0: providers.id AS provider_id, \ michael@0: providers.name AS provider_name, \ michael@0: measurements.id AS measurement_id, \ michael@0: measurements.name AS measurement_name, \ michael@0: measurements.version AS measurement_version, \ michael@0: fields.id AS field_id, \ michael@0: fields.name AS field_name, \ michael@0: daily_counters.day AS day, \ michael@0: daily_counters.value AS value \ michael@0: FROM providers, measurements, fields, daily_counters \ michael@0: WHERE \ michael@0: daily_counters.field_id = fields.id \ michael@0: AND fields.measurement_id = measurements.id \ michael@0: AND measurements.provider_id = providers.id", michael@0: michael@0: createDailyDiscreteNumericsTable: "\ michael@0: CREATE TABLE daily_discrete_numeric (\ michael@0: id INTEGER PRIMARY KEY AUTOINCREMENT, \ michael@0: field_id INTEGER, \ michael@0: day INTEGER, \ michael@0: value INTEGER, \ michael@0: FOREIGN KEY (field_id) REFERENCES fields(id) ON DELETE CASCADE\ michael@0: )", michael@0: michael@0: createDailyDiscreteNumericsFieldIndex: "\ michael@0: CREATE INDEX i_daily_discrete_numeric_field_id \ michael@0: ON daily_discrete_numeric (field_id)", michael@0: michael@0: createDailyDiscreteNumericsDayIndex: "\ michael@0: CREATE INDEX i_daily_discrete_numeric_day \ michael@0: ON daily_discrete_numeric (day)", michael@0: michael@0: 
createDailyDiscreteTextTable: "\ michael@0: CREATE TABLE daily_discrete_text (\ michael@0: id INTEGER PRIMARY KEY AUTOINCREMENT, \ michael@0: field_id INTEGER, \ michael@0: day INTEGER, \ michael@0: value TEXT, \ michael@0: FOREIGN KEY (field_id) REFERENCES fields(id) ON DELETE CASCADE\ michael@0: )", michael@0: michael@0: createDailyDiscreteTextFieldIndex: "\ michael@0: CREATE INDEX i_daily_discrete_text_field_id \ michael@0: ON daily_discrete_text (field_id)", michael@0: michael@0: createDailyDiscreteTextDayIndex: "\ michael@0: CREATE INDEX i_daily_discrete_text_day \ michael@0: ON daily_discrete_text (day)", michael@0: michael@0: createDailyDiscreteView: "\ michael@0: CREATE VIEW v_daily_discrete AS \ michael@0: SELECT \ michael@0: providers.id AS provider_id, \ michael@0: providers.name AS provider_name, \ michael@0: measurements.id AS measurement_id, \ michael@0: measurements.name AS measurement_name, \ michael@0: measurements.version AS measurement_version, \ michael@0: fields.id AS field_id, \ michael@0: fields.name AS field_name, \ michael@0: daily_discrete_numeric.id AS value_id, \ michael@0: daily_discrete_numeric.day AS day, \ michael@0: daily_discrete_numeric.value AS value, \ michael@0: \"numeric\" AS value_type \ michael@0: FROM providers, measurements, fields, daily_discrete_numeric \ michael@0: WHERE \ michael@0: daily_discrete_numeric.field_id = fields.id \ michael@0: AND fields.measurement_id = measurements.id \ michael@0: AND measurements.provider_id = providers.id \ michael@0: UNION ALL \ michael@0: SELECT \ michael@0: providers.id AS provider_id, \ michael@0: providers.name AS provider_name, \ michael@0: measurements.id AS measurement_id, \ michael@0: measurements.name AS measurement_name, \ michael@0: measurements.version AS measurement_version, \ michael@0: fields.id AS field_id, \ michael@0: fields.name AS field_name, \ michael@0: daily_discrete_text.id AS value_id, \ michael@0: daily_discrete_text.day AS day, \ michael@0: 
daily_discrete_text.value AS value, \ michael@0: \"text\" AS value_type \ michael@0: FROM providers, measurements, fields, daily_discrete_text \ michael@0: WHERE \ michael@0: daily_discrete_text.field_id = fields.id \ michael@0: AND fields.measurement_id = measurements.id \ michael@0: AND measurements.provider_id = providers.id \ michael@0: ORDER BY day ASC, value_id ASC", michael@0: michael@0: createLastNumericTable: "\ michael@0: CREATE TABLE last_numeric (\ michael@0: field_id INTEGER PRIMARY KEY, \ michael@0: day INTEGER, \ michael@0: value NUMERIC, \ michael@0: FOREIGN KEY (field_id) REFERENCES fields(id) ON DELETE CASCADE\ michael@0: )", michael@0: michael@0: createLastTextTable: "\ michael@0: CREATE TABLE last_text (\ michael@0: field_id INTEGER PRIMARY KEY, \ michael@0: day INTEGER, \ michael@0: value TEXT, \ michael@0: FOREIGN KEY (field_id) REFERENCES fields(id) ON DELETE CASCADE\ michael@0: )", michael@0: michael@0: createLastView: "\ michael@0: CREATE VIEW v_last AS \ michael@0: SELECT \ michael@0: providers.id AS provider_id, \ michael@0: providers.name AS provider_name, \ michael@0: measurements.id AS measurement_id, \ michael@0: measurements.name AS measurement_name, \ michael@0: measurements.version AS measurement_version, \ michael@0: fields.id AS field_id, \ michael@0: fields.name AS field_name, \ michael@0: last_numeric.day AS day, \ michael@0: last_numeric.value AS value, \ michael@0: \"numeric\" AS value_type \ michael@0: FROM providers, measurements, fields, last_numeric \ michael@0: WHERE \ michael@0: last_numeric.field_id = fields.id \ michael@0: AND fields.measurement_id = measurements.id \ michael@0: AND measurements.provider_id = providers.id \ michael@0: UNION ALL \ michael@0: SELECT \ michael@0: providers.id AS provider_id, \ michael@0: providers.name AS provider_name, \ michael@0: measurements.id AS measurement_id, \ michael@0: measurements.name AS measurement_name, \ michael@0: measurements.version AS measurement_version, \ michael@0: 
fields.id AS field_id, \ michael@0: fields.name AS field_name, \ michael@0: last_text.day AS day, \ michael@0: last_text.value AS value, \ michael@0: \"text\" AS value_type \ michael@0: FROM providers, measurements, fields, last_text \ michael@0: WHERE \ michael@0: last_text.field_id = fields.id \ michael@0: AND fields.measurement_id = measurements.id \ michael@0: AND measurements.provider_id = providers.id", michael@0: michael@0: createDailyLastNumericTable: "\ michael@0: CREATE TABLE daily_last_numeric (\ michael@0: field_id INTEGER, \ michael@0: day INTEGER, \ michael@0: value NUMERIC, \ michael@0: UNIQUE (field_id, day) \ michael@0: FOREIGN KEY (field_id) REFERENCES fields(id) ON DELETE CASCADE\ michael@0: )", michael@0: michael@0: createDailyLastNumericFieldIndex: "\ michael@0: CREATE INDEX i_daily_last_numeric_field_id ON daily_last_numeric (field_id)", michael@0: michael@0: createDailyLastNumericDayIndex: "\ michael@0: CREATE INDEX i_daily_last_numeric_day ON daily_last_numeric (day)", michael@0: michael@0: createDailyLastTextTable: "\ michael@0: CREATE TABLE daily_last_text (\ michael@0: field_id INTEGER, \ michael@0: day INTEGER, \ michael@0: value TEXT, \ michael@0: UNIQUE (field_id, day) \ michael@0: FOREIGN KEY (field_id) REFERENCES fields(id) ON DELETE CASCADE\ michael@0: )", michael@0: michael@0: createDailyLastTextFieldIndex: "\ michael@0: CREATE INDEX i_daily_last_text_field_id ON daily_last_text (field_id)", michael@0: michael@0: createDailyLastTextDayIndex: "\ michael@0: CREATE INDEX i_daily_last_text_day ON daily_last_text (day)", michael@0: michael@0: createDailyLastView: "\ michael@0: CREATE VIEW v_daily_last AS \ michael@0: SELECT \ michael@0: providers.id AS provider_id, \ michael@0: providers.name AS provider_name, \ michael@0: measurements.id AS measurement_id, \ michael@0: measurements.name AS measurement_name, \ michael@0: measurements.version AS measurement_version, \ michael@0: fields.id AS field_id, \ michael@0: fields.name AS 
field_name, \ michael@0: daily_last_numeric.day AS day, \ michael@0: daily_last_numeric.value AS value, \ michael@0: \"numeric\" as value_type \ michael@0: FROM providers, measurements, fields, daily_last_numeric \ michael@0: WHERE \ michael@0: daily_last_numeric.field_id = fields.id \ michael@0: AND fields.measurement_id = measurements.id \ michael@0: AND measurements.provider_id = providers.id \ michael@0: UNION ALL \ michael@0: SELECT \ michael@0: providers.id AS provider_id, \ michael@0: providers.name AS provider_name, \ michael@0: measurements.id AS measurement_id, \ michael@0: measurements.name AS measurement_name, \ michael@0: measurements.version AS measurement_version, \ michael@0: fields.id AS field_id, \ michael@0: fields.name AS field_name, \ michael@0: daily_last_text.day AS day, \ michael@0: daily_last_text.value AS value, \ michael@0: \"text\" as value_type \ michael@0: FROM providers, measurements, fields, daily_last_text \ michael@0: WHERE \ michael@0: daily_last_text.field_id = fields.id \ michael@0: AND fields.measurement_id = measurements.id \ michael@0: AND measurements.provider_id = providers.id", michael@0: michael@0: // Mutation. 
michael@0: michael@0: addProvider: "INSERT INTO providers (name) VALUES (:provider)", michael@0: michael@0: setProviderState: "\ michael@0: INSERT OR REPLACE INTO provider_state \ michael@0: (provider_id, name, value) \ michael@0: VALUES (:provider_id, :name, :value)", michael@0: michael@0: addMeasurement: "\ michael@0: INSERT INTO measurements (provider_id, name, version) \ michael@0: VALUES (:provider_id, :measurement, :version)", michael@0: michael@0: addType: "INSERT INTO types (name) VALUES (:name)", michael@0: michael@0: addField: "\ michael@0: INSERT INTO fields (measurement_id, name, value_type) \ michael@0: VALUES (:measurement_id, :field, :value_type)", michael@0: michael@0: incrementDailyCounterFromFieldID: "\ michael@0: INSERT OR REPLACE INTO daily_counters VALUES (\ michael@0: :field_id, \ michael@0: :days, \ michael@0: COALESCE(\ michael@0: (SELECT value FROM daily_counters WHERE \ michael@0: field_id = :field_id AND day = :days \ michael@0: ), \ michael@0: 0\ michael@0: ) + :by)", michael@0: michael@0: deleteLastNumericFromFieldID: "\ michael@0: DELETE FROM last_numeric WHERE field_id = :field_id", michael@0: michael@0: deleteLastTextFromFieldID: "\ michael@0: DELETE FROM last_text WHERE field_id = :field_id", michael@0: michael@0: setLastNumeric: "\ michael@0: INSERT OR REPLACE INTO last_numeric VALUES (:field_id, :days, :value)", michael@0: michael@0: setLastText: "\ michael@0: INSERT OR REPLACE INTO last_text VALUES (:field_id, :days, :value)", michael@0: michael@0: setDailyLastNumeric: "\ michael@0: INSERT OR REPLACE INTO daily_last_numeric VALUES (:field_id, :days, :value)", michael@0: michael@0: setDailyLastText: "\ michael@0: INSERT OR REPLACE INTO daily_last_text VALUES (:field_id, :days, :value)", michael@0: michael@0: addDailyDiscreteNumeric: "\ michael@0: INSERT INTO daily_discrete_numeric \ michael@0: (field_id, day, value) VALUES (:field_id, :days, :value)", michael@0: michael@0: addDailyDiscreteText: "\ michael@0: INSERT INTO 
daily_discrete_text \ michael@0: (field_id, day, value) VALUES (:field_id, :days, :value)", michael@0: michael@0: pruneOldDailyCounters: "DELETE FROM daily_counters WHERE day < :days", michael@0: pruneOldDailyDiscreteNumeric: "DELETE FROM daily_discrete_numeric WHERE day < :days", michael@0: pruneOldDailyDiscreteText: "DELETE FROM daily_discrete_text WHERE day < :days", michael@0: pruneOldDailyLastNumeric: "DELETE FROM daily_last_numeric WHERE day < :days", michael@0: pruneOldDailyLastText: "DELETE FROM daily_last_text WHERE day < :days", michael@0: pruneOldLastNumeric: "DELETE FROM last_numeric WHERE day < :days", michael@0: pruneOldLastText: "DELETE FROM last_text WHERE day < :days", michael@0: michael@0: // Retrieval. michael@0: michael@0: getProviderID: "SELECT id FROM providers WHERE name = :provider", michael@0: michael@0: getProviders: "SELECT id, name FROM providers", michael@0: michael@0: getProviderStateWithName: "\ michael@0: SELECT value FROM provider_state \ michael@0: WHERE provider_id = :provider_id \ michael@0: AND name = :name", michael@0: michael@0: getMeasurements: "SELECT * FROM v_measurements", michael@0: michael@0: getMeasurementID: "\ michael@0: SELECT id FROM measurements \ michael@0: WHERE provider_id = :provider_id \ michael@0: AND name = :measurement \ michael@0: AND version = :version", michael@0: michael@0: getFieldID: "\ michael@0: SELECT id FROM fields \ michael@0: WHERE measurement_id = :measurement_id \ michael@0: AND name = :field \ michael@0: AND value_type = :value_type \ michael@0: ", michael@0: michael@0: getTypes: "SELECT * FROM types", michael@0: michael@0: getTypeID: "SELECT id FROM types WHERE name = :name", michael@0: michael@0: getDailyCounterCountsFromFieldID: "\ michael@0: SELECT day, value FROM daily_counters \ michael@0: WHERE field_id = :field_id \ michael@0: ORDER BY day ASC", michael@0: michael@0: getDailyCounterCountFromFieldID: "\ michael@0: SELECT value FROM daily_counters \ michael@0: WHERE field_id = :field_id 
\ michael@0: AND day = :days", michael@0: michael@0: getMeasurementDailyCounters: "\ michael@0: SELECT field_name, day, value FROM v_daily_counters \ michael@0: WHERE measurement_id = :measurement_id", michael@0: michael@0: getFieldInfo: "SELECT * FROM v_fields", michael@0: michael@0: getLastNumericFromFieldID: "\ michael@0: SELECT day, value FROM last_numeric WHERE field_id = :field_id", michael@0: michael@0: getLastTextFromFieldID: "\ michael@0: SELECT day, value FROM last_text WHERE field_id = :field_id", michael@0: michael@0: getMeasurementLastValues: "\ michael@0: SELECT field_name, day, value FROM v_last \ michael@0: WHERE measurement_id = :measurement_id", michael@0: michael@0: getDailyDiscreteNumericFromFieldID: "\ michael@0: SELECT day, value FROM daily_discrete_numeric \ michael@0: WHERE field_id = :field_id \ michael@0: ORDER BY day ASC, id ASC", michael@0: michael@0: getDailyDiscreteNumericFromFieldIDAndDay: "\ michael@0: SELECT day, value FROM daily_discrete_numeric \ michael@0: WHERE field_id = :field_id AND day = :days \ michael@0: ORDER BY id ASC", michael@0: michael@0: getDailyDiscreteTextFromFieldID: "\ michael@0: SELECT day, value FROM daily_discrete_text \ michael@0: WHERE field_id = :field_id \ michael@0: ORDER BY day ASC, id ASC", michael@0: michael@0: getDailyDiscreteTextFromFieldIDAndDay: "\ michael@0: SELECT day, value FROM daily_discrete_text \ michael@0: WHERE field_id = :field_id AND day = :days \ michael@0: ORDER BY id ASC", michael@0: michael@0: getMeasurementDailyDiscreteValues: "\ michael@0: SELECT field_name, day, value_id, value FROM v_daily_discrete \ michael@0: WHERE measurement_id = :measurement_id \ michael@0: ORDER BY day ASC, value_id ASC", michael@0: michael@0: getDailyLastNumericFromFieldID: "\ michael@0: SELECT day, value FROM daily_last_numeric \ michael@0: WHERE field_id = :field_id \ michael@0: ORDER BY day ASC", michael@0: michael@0: getDailyLastNumericFromFieldIDAndDay: "\ michael@0: SELECT day, value FROM 
daily_last_numeric \ michael@0: WHERE field_id = :field_id AND day = :days", michael@0: michael@0: getDailyLastTextFromFieldID: "\ michael@0: SELECT day, value FROM daily_last_text \ michael@0: WHERE field_id = :field_id \ michael@0: ORDER BY day ASC", michael@0: michael@0: getDailyLastTextFromFieldIDAndDay: "\ michael@0: SELECT day, value FROM daily_last_text \ michael@0: WHERE field_id = :field_id AND day = :days", michael@0: michael@0: getMeasurementDailyLastValues: "\ michael@0: SELECT field_name, day, value FROM v_daily_last \ michael@0: WHERE measurement_id = :measurement_id", michael@0: }; michael@0: michael@0: michael@0: function dailyKeyFromDate(date) { michael@0: let year = String(date.getUTCFullYear()); michael@0: let month = String(date.getUTCMonth() + 1); michael@0: let day = String(date.getUTCDate()); michael@0: michael@0: if (month.length < 2) { michael@0: month = "0" + month; michael@0: } michael@0: michael@0: if (day.length < 2) { michael@0: day = "0" + day; michael@0: } michael@0: michael@0: return year + "-" + month + "-" + day; michael@0: } michael@0: michael@0: michael@0: /** michael@0: * Create a new backend instance bound to a SQLite database at the given path. michael@0: * michael@0: * This returns a promise that will resolve to a `MetricsStorageSqliteBackend` michael@0: * instance. The resolved instance will be initialized and ready for use. michael@0: * michael@0: * Very few consumers have a need to call this. Instead, a higher-level entity michael@0: * likely calls this and sets up the database connection for a service or michael@0: * singleton. michael@0: */ michael@0: this.MetricsStorageBackend = function (path) { michael@0: return Task.spawn(function initTask() { michael@0: let connection = yield Sqlite.openConnection({ michael@0: path: path, michael@0: michael@0: // There should only be one connection per database, so we disable this michael@0: // for perf reasons. 
michael@0: sharedMemoryCache: false, michael@0: }); michael@0: michael@0: // If we fail initializing the storage object, we need to close the michael@0: // database connection or else Storage will assert on shutdown. michael@0: let storage; michael@0: try { michael@0: storage = new MetricsStorageSqliteBackend(connection); michael@0: yield storage._init(); michael@0: } catch (ex) { michael@0: yield connection.close(); michael@0: throw ex; michael@0: } michael@0: michael@0: throw new Task.Result(storage); michael@0: }); michael@0: }; michael@0: michael@0: michael@0: /** michael@0: * Manages storage of metrics data in a SQLite database. michael@0: * michael@0: * This is the main type used for interfacing with the database. michael@0: * michael@0: * Instances of this should be obtained by calling MetricsStorageConnection(). michael@0: * michael@0: * The current implementation will not work if the database is mutated by michael@0: * multiple connections because of the way we cache primary keys. michael@0: * michael@0: * FUTURE enforce 1 read/write connection per database limit. michael@0: */ michael@0: function MetricsStorageSqliteBackend(connection) { michael@0: this._log = Log.repository.getLogger("Services.Metrics.MetricsStorage"); michael@0: michael@0: this._connection = connection; michael@0: this._enabledWALCheckpointPages = null; michael@0: michael@0: // Integer IDs to string name. michael@0: this._typesByID = new Map(); michael@0: michael@0: // String name to integer IDs. michael@0: this._typesByName = new Map(); michael@0: michael@0: // Maps provider names to integer IDs. michael@0: this._providerIDs = new Map(); michael@0: michael@0: // Maps :-delimited strings of [provider name, name, version] to integer IDs. michael@0: this._measurementsByInfo = new Map(); michael@0: michael@0: // Integer IDs to Arrays of [provider name, name, version]. 
michael@0: this._measurementsByID = new Map(); michael@0: michael@0: // Integer IDs to Arrays of [measurement id, field name, value name] michael@0: this._fieldsByID = new Map(); michael@0: michael@0: // Maps :-delimited strings of [measurement id, field name] to integer ID. michael@0: this._fieldsByInfo = new Map(); michael@0: michael@0: // Maps measurement ID to sets of field IDs. michael@0: this._fieldsByMeasurement = new Map(); michael@0: michael@0: this._queuedOperations = []; michael@0: this._queuedInProgress = false; michael@0: } michael@0: michael@0: MetricsStorageSqliteBackend.prototype = Object.freeze({ michael@0: // Max size (in kibibytes) the WAL log is allowed to grow to before it is michael@0: // checkpointed. michael@0: // michael@0: // This was first deployed in bug 848136. We want a value large enough michael@0: // that we aren't checkpointing all the time. However, we want it michael@0: // small enough so we don't have to read so much when we open the michael@0: // database. michael@0: MAX_WAL_SIZE_KB: 512, michael@0: michael@0: FIELD_DAILY_COUNTER: "daily-counter", michael@0: FIELD_DAILY_DISCRETE_NUMERIC: "daily-discrete-numeric", michael@0: FIELD_DAILY_DISCRETE_TEXT: "daily-discrete-text", michael@0: FIELD_DAILY_LAST_NUMERIC: "daily-last-numeric", michael@0: FIELD_DAILY_LAST_TEXT: "daily-last-text", michael@0: FIELD_LAST_NUMERIC: "last-numeric", michael@0: FIELD_LAST_TEXT: "last-text", michael@0: michael@0: _BUILTIN_TYPES: [ michael@0: "FIELD_DAILY_COUNTER", michael@0: "FIELD_DAILY_DISCRETE_NUMERIC", michael@0: "FIELD_DAILY_DISCRETE_TEXT", michael@0: "FIELD_DAILY_LAST_NUMERIC", michael@0: "FIELD_DAILY_LAST_TEXT", michael@0: "FIELD_LAST_NUMERIC", michael@0: "FIELD_LAST_TEXT", michael@0: ], michael@0: michael@0: // Statements that are used to create the initial DB schema. 
michael@0: _SCHEMA_STATEMENTS: [ michael@0: "createProvidersTable", michael@0: "createProviderStateTable", michael@0: "createProviderStateProviderIndex", michael@0: "createMeasurementsTable", michael@0: "createMeasurementsProviderIndex", michael@0: "createMeasurementsView", michael@0: "createTypesTable", michael@0: "createFieldsTable", michael@0: "createFieldsMeasurementIndex", michael@0: "createFieldsView", michael@0: "createDailyCountersTable", michael@0: "createDailyCountersFieldIndex", michael@0: "createDailyCountersDayIndex", michael@0: "createDailyCountersView", michael@0: "createDailyDiscreteNumericsTable", michael@0: "createDailyDiscreteNumericsFieldIndex", michael@0: "createDailyDiscreteNumericsDayIndex", michael@0: "createDailyDiscreteTextTable", michael@0: "createDailyDiscreteTextFieldIndex", michael@0: "createDailyDiscreteTextDayIndex", michael@0: "createDailyDiscreteView", michael@0: "createDailyLastNumericTable", michael@0: "createDailyLastNumericFieldIndex", michael@0: "createDailyLastNumericDayIndex", michael@0: "createDailyLastTextTable", michael@0: "createDailyLastTextFieldIndex", michael@0: "createDailyLastTextDayIndex", michael@0: "createDailyLastView", michael@0: "createLastNumericTable", michael@0: "createLastTextTable", michael@0: "createLastView", michael@0: ], michael@0: michael@0: // Statements that are used to prune old data. michael@0: _PRUNE_STATEMENTS: [ michael@0: "pruneOldDailyCounters", michael@0: "pruneOldDailyDiscreteNumeric", michael@0: "pruneOldDailyDiscreteText", michael@0: "pruneOldDailyLastNumeric", michael@0: "pruneOldDailyLastText", michael@0: "pruneOldLastNumeric", michael@0: "pruneOldLastText", michael@0: ], michael@0: michael@0: /** michael@0: * Close the database connection. michael@0: * michael@0: * This should be called on all instances or the SQLite layer may complain michael@0: * loudly. After this has been called, the connection cannot be used. 
michael@0: * michael@0: * @return Promise<> michael@0: */ michael@0: close: function () { michael@0: return Task.spawn(function doClose() { michael@0: // There is some light magic involved here. First, we enqueue an michael@0: // operation to ensure that all pending operations have the opportunity michael@0: // to execute. We additionally execute a SQL operation. Due to the FIFO michael@0: // execution order of issued statements, this will cause us to wait on michael@0: // any outstanding statements before closing. michael@0: try { michael@0: yield this.enqueueOperation(function dummyOperation() { michael@0: return this._connection.execute("SELECT 1"); michael@0: }.bind(this)); michael@0: } catch (ex) {} michael@0: michael@0: try { michael@0: yield this._connection.close(); michael@0: } finally { michael@0: this._connection = null; michael@0: } michael@0: }.bind(this)); michael@0: }, michael@0: michael@0: /** michael@0: * Whether a provider is known to exist. michael@0: * michael@0: * @param provider michael@0: * (string) Name of the provider. michael@0: */ michael@0: hasProvider: function (provider) { michael@0: return this._providerIDs.has(provider); michael@0: }, michael@0: michael@0: /** michael@0: * Whether a measurement is known to exist. michael@0: * michael@0: * @param provider michael@0: * (string) Name of the provider. michael@0: * @param name michael@0: * (string) Name of the measurement. michael@0: * @param version michael@0: * (Number) Integer measurement version. michael@0: */ michael@0: hasMeasurement: function (provider, name, version) { michael@0: return this._measurementsByInfo.has([provider, name, version].join(":")); michael@0: }, michael@0: michael@0: /** michael@0: * Whether a named field exists in a measurement. michael@0: * michael@0: * @param measurementID michael@0: * (Number) The integer primary key of the measurement. michael@0: * @param field michael@0: * (string) The name of the field to look for. 
michael@0: */ michael@0: hasFieldFromMeasurement: function (measurementID, field) { michael@0: return this._fieldsByInfo.has([measurementID, field].join(":")); michael@0: }, michael@0: michael@0: /** michael@0: * Whether a field is known. michael@0: * michael@0: * @param provider michael@0: * (string) Name of the provider having the field. michael@0: * @param measurement michael@0: * (string) Name of the measurement in the provider having the field. michael@0: * @param field michael@0: * (string) Name of the field in the measurement. michael@0: */ michael@0: hasField: function (provider, measurement, version, field) { michael@0: let key = [provider, measurement, version].join(":"); michael@0: let measurementID = this._measurementsByInfo.get(key); michael@0: if (!measurementID) { michael@0: return false; michael@0: } michael@0: michael@0: return this.hasFieldFromMeasurement(measurementID, field); michael@0: }, michael@0: michael@0: /** michael@0: * Look up the integer primary key of a provider. michael@0: * michael@0: * @param provider michael@0: * (string) Name of the provider. michael@0: */ michael@0: providerID: function (provider) { michael@0: return this._providerIDs.get(provider); michael@0: }, michael@0: michael@0: /** michael@0: * Look up the integer primary key of a measurement. michael@0: * michael@0: * @param provider michael@0: * (string) Name of the provider. michael@0: * @param measurement michael@0: * (string) Name of the measurement. michael@0: * @param version michael@0: * (Number) Integer version of the measurement. 
michael@0: */ michael@0: measurementID: function (provider, measurement, version) { michael@0: return this._measurementsByInfo.get([provider, measurement, version].join(":")); michael@0: }, michael@0: michael@0: fieldIDFromMeasurement: function (measurementID, field) { michael@0: return this._fieldsByInfo.get([measurementID, field].join(":")); michael@0: }, michael@0: michael@0: fieldID: function (provider, measurement, version, field) { michael@0: let measurementID = this.measurementID(provider, measurement, version); michael@0: if (!measurementID) { michael@0: return null; michael@0: } michael@0: michael@0: return this.fieldIDFromMeasurement(measurementID, field); michael@0: }, michael@0: michael@0: measurementHasAnyDailyCounterFields: function (measurementID) { michael@0: return this.measurementHasAnyFieldsOfTypes(measurementID, michael@0: [this.FIELD_DAILY_COUNTER]); michael@0: }, michael@0: michael@0: measurementHasAnyLastFields: function (measurementID) { michael@0: return this.measurementHasAnyFieldsOfTypes(measurementID, michael@0: [this.FIELD_LAST_NUMERIC, michael@0: this.FIELD_LAST_TEXT]); michael@0: }, michael@0: michael@0: measurementHasAnyDailyLastFields: function (measurementID) { michael@0: return this.measurementHasAnyFieldsOfTypes(measurementID, michael@0: [this.FIELD_DAILY_LAST_NUMERIC, michael@0: this.FIELD_DAILY_LAST_TEXT]); michael@0: }, michael@0: michael@0: measurementHasAnyDailyDiscreteFields: function (measurementID) { michael@0: return this.measurementHasAnyFieldsOfTypes(measurementID, michael@0: [this.FIELD_DAILY_DISCRETE_NUMERIC, michael@0: this.FIELD_DAILY_DISCRETE_TEXT]); michael@0: }, michael@0: michael@0: measurementHasAnyFieldsOfTypes: function (measurementID, types) { michael@0: if (!this._fieldsByMeasurement.has(measurementID)) { michael@0: return false; michael@0: } michael@0: michael@0: let fieldIDs = this._fieldsByMeasurement.get(measurementID); michael@0: for (let fieldID of fieldIDs) { michael@0: let fieldType = 
this._fieldsByID.get(fieldID)[2]; michael@0: if (types.indexOf(fieldType) != -1) { michael@0: return true; michael@0: } michael@0: } michael@0: michael@0: return false; michael@0: }, michael@0: michael@0: /** michael@0: * Register a measurement with the backend. michael@0: * michael@0: * Measurements must be registered before storage can be allocated to them. michael@0: * michael@0: * A measurement consists of a string name and integer version attached michael@0: * to a named provider. michael@0: * michael@0: * This returns a promise that resolves to the storage ID for this michael@0: * measurement. michael@0: * michael@0: * If the measurement is not known to exist, it is registered with storage. michael@0: * If the measurement has already been registered, this is effectively a michael@0: * no-op (that still returns a promise resolving to the storage ID). michael@0: * michael@0: * @param provider michael@0: * (string) Name of the provider this measurement belongs to. michael@0: * @param name michael@0: * (string) Name of this measurement. michael@0: * @param version michael@0: * (Number) Integer version of this measurement. michael@0: */ michael@0: registerMeasurement: function (provider, name, version) { michael@0: if (this.hasMeasurement(provider, name, version)) { michael@0: return CommonUtils.laterTickResolvingPromise( michael@0: this.measurementID(provider, name, version)); michael@0: } michael@0: michael@0: // Registrations might not be safe to perform in parallel with provider michael@0: // operations. So, we queue them. 
michael@0: let self = this; michael@0: return this.enqueueOperation(function createMeasurementOperation() { michael@0: return Task.spawn(function createMeasurement() { michael@0: let providerID = self._providerIDs.get(provider); michael@0: michael@0: if (!providerID) { michael@0: yield self._connection.executeCached(SQL.addProvider, {provider: provider}); michael@0: let rows = yield self._connection.executeCached(SQL.getProviderID, michael@0: {provider: provider}); michael@0: michael@0: providerID = rows[0].getResultByIndex(0); michael@0: michael@0: self._providerIDs.set(provider, providerID); michael@0: } michael@0: michael@0: let params = { michael@0: provider_id: providerID, michael@0: measurement: name, michael@0: version: version, michael@0: }; michael@0: michael@0: yield self._connection.executeCached(SQL.addMeasurement, params); michael@0: let rows = yield self._connection.executeCached(SQL.getMeasurementID, params); michael@0: michael@0: let measurementID = rows[0].getResultByIndex(0); michael@0: michael@0: self._measurementsByInfo.set([provider, name, version].join(":"), measurementID); michael@0: self._measurementsByID.set(measurementID, [provider, name, version]); michael@0: self._fieldsByMeasurement.set(measurementID, new Set()); michael@0: michael@0: throw new Task.Result(measurementID); michael@0: }); michael@0: }); michael@0: }, michael@0: michael@0: /** michael@0: * Register a field with the backend. michael@0: * michael@0: * Fields are what recorded pieces of data are primarily associated with. michael@0: * michael@0: * Fields are associated with measurements. Measurements must be registered michael@0: * via `registerMeasurement` before fields can be registered. This is michael@0: * enforced by this function requiring the database primary key of the michael@0: * measurement as an argument. michael@0: * michael@0: * @param measurementID michael@0: * (Number) Integer primary key of measurement this field belongs to. 
michael@0: * @param field michael@0: * (string) Name of this field. michael@0: * @param valueType michael@0: * (string) Type name of this field. Must be a registered type. Is michael@0: * likely one of the FIELD_ constants on this type. michael@0: * michael@0: * @return Promise michael@0: */ michael@0: registerField: function (measurementID, field, valueType) { michael@0: if (!valueType) { michael@0: throw new Error("Value type must be defined."); michael@0: } michael@0: michael@0: if (!this._measurementsByID.has(measurementID)) { michael@0: throw new Error("Measurement not known: " + measurementID); michael@0: } michael@0: michael@0: if (!this._typesByName.has(valueType)) { michael@0: throw new Error("Unknown value type: " + valueType); michael@0: } michael@0: michael@0: let typeID = this._typesByName.get(valueType); michael@0: michael@0: if (!typeID) { michael@0: throw new Error("Undefined type: " + valueType); michael@0: } michael@0: michael@0: if (this.hasFieldFromMeasurement(measurementID, field)) { michael@0: let id = this.fieldIDFromMeasurement(measurementID, field); michael@0: let existingType = this._fieldsByID.get(id)[2]; michael@0: michael@0: if (valueType != existingType) { michael@0: throw new Error("Field already defined with different type: " + existingType); michael@0: } michael@0: michael@0: return CommonUtils.laterTickResolvingPromise( michael@0: this.fieldIDFromMeasurement(measurementID, field)); michael@0: } michael@0: michael@0: let self = this; michael@0: return Task.spawn(function createField() { michael@0: let params = { michael@0: measurement_id: measurementID, michael@0: field: field, michael@0: value_type: typeID, michael@0: }; michael@0: michael@0: yield self._connection.executeCached(SQL.addField, params); michael@0: michael@0: let rows = yield self._connection.executeCached(SQL.getFieldID, params); michael@0: michael@0: let fieldID = rows[0].getResultByIndex(0); michael@0: michael@0: self._fieldsByID.set(fieldID, [measurementID, field, 
valueType]); michael@0: self._fieldsByInfo.set([measurementID, field].join(":"), fieldID); michael@0: self._fieldsByMeasurement.get(measurementID).add(fieldID); michael@0: michael@0: throw new Task.Result(fieldID); michael@0: }); michael@0: }, michael@0: michael@0: /** michael@0: * Initializes this instance with the database. michael@0: * michael@0: * This performs 2 major roles: michael@0: * michael@0: * 1) Set up database schema (creates tables). michael@0: * 2) Synchronize database with local instance. michael@0: */ michael@0: _init: function() { michael@0: let self = this; michael@0: return Task.spawn(function initTask() { michael@0: // 0. Database file and connection configuration. michael@0: michael@0: // This should never fail. But, we assume the default of 1024 in case it michael@0: // does. michael@0: let rows = yield self._connection.execute("PRAGMA page_size"); michael@0: let pageSize = 1024; michael@0: if (rows.length) { michael@0: pageSize = rows[0].getResultByIndex(0); michael@0: } michael@0: michael@0: self._log.debug("Page size is " + pageSize); michael@0: michael@0: // Ensure temp tables are stored in memory, not on disk. michael@0: yield self._connection.execute("PRAGMA temp_store=MEMORY"); michael@0: michael@0: let journalMode; michael@0: rows = yield self._connection.execute("PRAGMA journal_mode=WAL"); michael@0: if (rows.length) { michael@0: journalMode = rows[0].getResultByIndex(0); michael@0: } michael@0: michael@0: self._log.info("Journal mode is " + journalMode); michael@0: michael@0: if (journalMode == "wal") { michael@0: self._enabledWALCheckpointPages = michael@0: Math.ceil(self.MAX_WAL_SIZE_KB * 1024 / pageSize); michael@0: michael@0: self._log.info("WAL auto checkpoint pages: " + michael@0: self._enabledWALCheckpointPages); michael@0: michael@0: // We disable auto checkpoint during initialization to make it michael@0: // quicker. 
michael@0: yield self.setAutoCheckpoint(0); michael@0: } else { michael@0: if (journalMode != "truncate") { michael@0: // Fall back to truncate (which is faster than delete). michael@0: yield self._connection.execute("PRAGMA journal_mode=TRUNCATE"); michael@0: } michael@0: michael@0: // And always use full synchronous mode to reduce possibility for data michael@0: // loss. michael@0: yield self._connection.execute("PRAGMA synchronous=FULL"); michael@0: } michael@0: michael@0: let doCheckpoint = false; michael@0: michael@0: // 1. Create the schema. michael@0: yield self._connection.executeTransaction(function ensureSchema(conn) { michael@0: let schema = yield conn.getSchemaVersion(); michael@0: michael@0: if (schema == 0) { michael@0: self._log.info("Creating database schema."); michael@0: michael@0: for (let k of self._SCHEMA_STATEMENTS) { michael@0: yield self._connection.execute(SQL[k]); michael@0: } michael@0: michael@0: yield self._connection.setSchemaVersion(1); michael@0: doCheckpoint = true; michael@0: } else if (schema != 1) { michael@0: throw new Error("Unknown database schema: " + schema); michael@0: } else { michael@0: self._log.debug("Database schema up to date."); michael@0: } michael@0: }); michael@0: michael@0: // 2. Retrieve existing types. michael@0: yield self._connection.execute(SQL.getTypes, null, function onRow(row) { michael@0: let id = row.getResultByName("id"); michael@0: let name = row.getResultByName("name"); michael@0: michael@0: self._typesByID.set(id, name); michael@0: self._typesByName.set(name, id); michael@0: }); michael@0: michael@0: // 3. Populate built-in types with database. michael@0: let missingTypes = []; michael@0: for (let type of self._BUILTIN_TYPES) { michael@0: type = self[type]; michael@0: if (self._typesByName.has(type)) { michael@0: continue; michael@0: } michael@0: michael@0: missingTypes.push(type); michael@0: } michael@0: michael@0: // Don't perform DB transaction unless there is work to do. 
michael@0: if (missingTypes.length) { michael@0: yield self._connection.executeTransaction(function populateBuiltinTypes() { michael@0: for (let type of missingTypes) { michael@0: let params = {name: type}; michael@0: yield self._connection.executeCached(SQL.addType, params); michael@0: let rows = yield self._connection.executeCached(SQL.getTypeID, params); michael@0: let id = rows[0].getResultByIndex(0); michael@0: michael@0: self._typesByID.set(id, type); michael@0: self._typesByName.set(type, id); michael@0: } michael@0: }); michael@0: michael@0: doCheckpoint = true; michael@0: } michael@0: michael@0: // 4. Obtain measurement info. michael@0: yield self._connection.execute(SQL.getMeasurements, null, function onRow(row) { michael@0: let providerID = row.getResultByName("provider_id"); michael@0: let providerName = row.getResultByName("provider_name"); michael@0: let measurementID = row.getResultByName("measurement_id"); michael@0: let measurementName = row.getResultByName("measurement_name"); michael@0: let measurementVersion = row.getResultByName("measurement_version"); michael@0: michael@0: self._providerIDs.set(providerName, providerID); michael@0: michael@0: let info = [providerName, measurementName, measurementVersion].join(":"); michael@0: michael@0: self._measurementsByInfo.set(info, measurementID); michael@0: self._measurementsByID.set(measurementID, info); michael@0: self._fieldsByMeasurement.set(measurementID, new Set()); michael@0: }); michael@0: michael@0: // 5. Obtain field info. 
michael@0: yield self._connection.execute(SQL.getFieldInfo, null, function onRow(row) { michael@0: let measurementID = row.getResultByName("measurement_id"); michael@0: let fieldID = row.getResultByName("field_id"); michael@0: let fieldName = row.getResultByName("field_name"); michael@0: let typeName = row.getResultByName("type_name"); michael@0: michael@0: self._fieldsByID.set(fieldID, [measurementID, fieldName, typeName]); michael@0: self._fieldsByInfo.set([measurementID, fieldName].join(":"), fieldID); michael@0: self._fieldsByMeasurement.get(measurementID).add(fieldID); michael@0: }); michael@0: michael@0: // Perform a checkpoint after initialization (if needed) and michael@0: // enable auto checkpoint during regular operation. michael@0: if (doCheckpoint) { michael@0: yield self.checkpoint(); michael@0: } michael@0: michael@0: yield self.setAutoCheckpoint(1); michael@0: }); michael@0: }, michael@0: michael@0: /** michael@0: * Prune all data from earlier than the specified date. michael@0: * michael@0: * Data stored on days before the specified Date will be permanently michael@0: * deleted. michael@0: * michael@0: * This returns a promise that will be resolved when data has been deleted. michael@0: * michael@0: * @param date michael@0: * (Date) Old data threshold. michael@0: * @return Promise<> michael@0: */ michael@0: pruneDataBefore: function (date) { michael@0: let statements = this._PRUNE_STATEMENTS; michael@0: michael@0: let self = this; michael@0: return this.enqueueOperation(function doPrune() { michael@0: return self._connection.executeTransaction(function prune(conn) { michael@0: let days = dateToDays(date); michael@0: michael@0: let params = {days: days}; michael@0: for (let name of statements) { michael@0: yield conn.execute(SQL[name], params); michael@0: } michael@0: }); michael@0: }); michael@0: }, michael@0: michael@0: /** michael@0: * Reduce memory usage as much as possible. 
michael@0: * michael@0: * This returns a promise that will be resolved on completion. michael@0: * michael@0: * @return Promise<> michael@0: */ michael@0: compact: function () { michael@0: let self = this; michael@0: return this.enqueueOperation(function doCompact() { michael@0: self._connection.discardCachedStatements(); michael@0: return self._connection.shrinkMemory(); michael@0: }); michael@0: }, michael@0: michael@0: /** michael@0: * Checkpoint writes requiring flush to disk. michael@0: * michael@0: * This is called to persist queued and non-flushed writes to disk. michael@0: * It will force an fsync, so it is expensive and should be used michael@0: * sparingly. michael@0: */ michael@0: checkpoint: function () { michael@0: if (!this._enabledWALCheckpointPages) { michael@0: return CommonUtils.laterTickResolvingPromise(); michael@0: } michael@0: michael@0: return this.enqueueOperation(function checkpoint() { michael@0: this._log.info("Performing manual WAL checkpoint."); michael@0: return this._connection.execute("PRAGMA wal_checkpoint"); michael@0: }.bind(this)); michael@0: }, michael@0: michael@0: setAutoCheckpoint: function (on) { michael@0: // If we aren't in WAL mode, wal_autocheckpoint won't do anything so michael@0: // we no-op. michael@0: if (!this._enabledWALCheckpointPages) { michael@0: return CommonUtils.laterTickResolvingPromise(); michael@0: } michael@0: michael@0: let val = on ? this._enabledWALCheckpointPages : 0; michael@0: michael@0: return this.enqueueOperation(function setWALCheckpoint() { michael@0: this._log.info("Setting WAL auto checkpoint to " + val); michael@0: return this._connection.execute("PRAGMA wal_autocheckpoint=" + val); michael@0: }.bind(this)); michael@0: }, michael@0: michael@0: /** michael@0: * Ensure a field ID matches a specified type. michael@0: * michael@0: * This is called internally as part of adding values to ensure that michael@0: * the type of a field matches the operation being performed. 
michael@0: */ michael@0: _ensureFieldType: function (id, type) { michael@0: let info = this._fieldsByID.get(id); michael@0: michael@0: if (!info || !Array.isArray(info)) { michael@0: throw new Error("Unknown field ID: " + id); michael@0: } michael@0: michael@0: if (type != info[2]) { michael@0: throw new Error("Field type does not match the expected for this " + michael@0: "operation. Actual: " + info[2] + "; Expected: " + michael@0: type); michael@0: } michael@0: }, michael@0: michael@0: /** michael@0: * Enqueue a storage operation to be performed when the database is ready. michael@0: * michael@0: * The primary use case of this function is to prevent potentially michael@0: * conflicting storage operations from being performed in parallel. By michael@0: * calling this function, passed storage operations will be serially michael@0: * executed, avoiding potential order of operation issues. michael@0: * michael@0: * The passed argument is a function that will perform storage operations. michael@0: * The function should return a promise that will be resolved when all michael@0: * storage operations have been completed. michael@0: * michael@0: * The passed function may be executed immediately. If there are already michael@0: * queued operations, it will be appended to the queue and executed after all michael@0: * before it have finished. michael@0: * michael@0: * This function returns a promise that will be resolved or rejected with michael@0: * the same value that the function's promise was resolved or rejected with. michael@0: * michael@0: * @param func michael@0: * (function) Function performing storage interactions. michael@0: * @return Promise<> michael@0: */ michael@0: enqueueOperation: function (func) { michael@0: if (typeof(func) != "function") { michael@0: throw new Error("enqueueOperation expects a function. 
Got: " + typeof(func)); michael@0: } michael@0: michael@0: this._log.trace("Enqueueing operation."); michael@0: let deferred = Promise.defer(); michael@0: michael@0: this._queuedOperations.push([func, deferred]); michael@0: michael@0: if (this._queuedOperations.length == 1) { michael@0: this._popAndPerformQueuedOperation(); michael@0: } michael@0: michael@0: return deferred.promise; michael@0: }, michael@0: michael@0: /** michael@0: * Enqueue a function to be performed as a transaction. michael@0: * michael@0: * The passed function should be a generator suitable for calling with michael@0: * `executeTransaction` from the SQLite connection. michael@0: */ michael@0: enqueueTransaction: function (func, type) { michael@0: return this.enqueueOperation( michael@0: this._connection.executeTransaction.bind(this._connection, func, type) michael@0: ); michael@0: }, michael@0: michael@0: _popAndPerformQueuedOperation: function () { michael@0: if (!this._queuedOperations.length || this._queuedInProgress) { michael@0: return; michael@0: } michael@0: michael@0: this._log.trace("Performing queued operation."); michael@0: let [func, deferred] = this._queuedOperations.shift(); michael@0: let promise; michael@0: michael@0: try { michael@0: this._queuedInProgress = true; michael@0: promise = func(); michael@0: } catch (ex) { michael@0: this._log.warn("Queued operation threw during execution: " + michael@0: CommonUtils.exceptionStr(ex)); michael@0: this._queuedInProgress = false; michael@0: deferred.reject(ex); michael@0: this._popAndPerformQueuedOperation(); michael@0: return; michael@0: } michael@0: michael@0: if (!promise || typeof(promise.then) != "function") { michael@0: let msg = "Queued operation did not return a promise: " + func; michael@0: this._log.warn(msg); michael@0: michael@0: this._queuedInProgress = false; michael@0: deferred.reject(new Error(msg)); michael@0: this._popAndPerformQueuedOperation(); michael@0: return; michael@0: } michael@0: michael@0: promise.then( 
michael@0: function onSuccess(result) { michael@0: this._log.trace("Queued operation completed."); michael@0: this._queuedInProgress = false; michael@0: deferred.resolve(result); michael@0: this._popAndPerformQueuedOperation(); michael@0: }.bind(this), michael@0: function onError(error) { michael@0: this._log.warn("Failure when performing queued operation: " + michael@0: CommonUtils.exceptionStr(error)); michael@0: this._queuedInProgress = false; michael@0: deferred.reject(error); michael@0: this._popAndPerformQueuedOperation(); michael@0: }.bind(this) michael@0: ); michael@0: }, michael@0: michael@0: /** michael@0: * Obtain all values associated with a measurement. michael@0: * michael@0: * This returns a promise that resolves to an object. The keys of the object michael@0: * are: michael@0: * michael@0: * days -- DailyValues where the values are Maps of field name to data michael@0: * structures. The data structures could be simple (string or number) or michael@0: * Arrays if the field type allows multiple values per day. michael@0: * michael@0: * singular -- Map of field names to values. This holds all fields that michael@0: * don't have a temporal component. michael@0: * michael@0: * @param id michael@0: * (Number) Primary key of measurement whose values to retrieve. 
michael@0: */ michael@0: getMeasurementValues: function (id) { michael@0: let deferred = Promise.defer(); michael@0: let days = new DailyValues(); michael@0: let singular = new Map(); michael@0: michael@0: let self = this; michael@0: this.enqueueOperation(function enqueuedGetMeasurementValues() { michael@0: return Task.spawn(function fetchMeasurementValues() { michael@0: function handleResult(data) { michael@0: for (let [field, values] of data) { michael@0: for (let [day, value] of Iterator(values)) { michael@0: if (!days.hasDay(day)) { michael@0: days.setDay(day, new Map()); michael@0: } michael@0: michael@0: days.getDay(day).set(field, value); michael@0: } michael@0: } michael@0: } michael@0: michael@0: if (self.measurementHasAnyDailyCounterFields(id)) { michael@0: let counters = yield self.getMeasurementDailyCountersFromMeasurementID(id); michael@0: handleResult(counters); michael@0: } michael@0: michael@0: if (self.measurementHasAnyDailyLastFields(id)) { michael@0: let dailyLast = yield self.getMeasurementDailyLastValuesFromMeasurementID(id); michael@0: handleResult(dailyLast); michael@0: } michael@0: michael@0: if (self.measurementHasAnyDailyDiscreteFields(id)) { michael@0: let dailyDiscrete = yield self.getMeasurementDailyDiscreteValuesFromMeasurementID(id); michael@0: handleResult(dailyDiscrete); michael@0: } michael@0: michael@0: if (self.measurementHasAnyLastFields(id)) { michael@0: let last = yield self.getMeasurementLastValuesFromMeasurementID(id); michael@0: michael@0: for (let [field, value] of last) { michael@0: singular.set(field, value); michael@0: } michael@0: } michael@0: michael@0: }); michael@0: }).then(function onSuccess() { michael@0: deferred.resolve({singular: singular, days: days}); michael@0: }, function onError(error) { michael@0: deferred.reject(error); michael@0: }); michael@0: michael@0: return deferred.promise; michael@0: }, michael@0: michael@0: //--------------------------------------------------------------------------- michael@0: 
// Low-level storage operations michael@0: // michael@0: // These will be performed immediately (or at least as soon as the underlying michael@0: // connection allows them to be.) It is recommended to call these from within michael@0: // a function added via `enqueueOperation()` or they may inadvertently be michael@0: // performed during another enqueued operation, which may be a transaction michael@0: // that is rolled back. michael@0: // --------------------------------------------------------------------------- michael@0: michael@0: /** michael@0: * Set state for a provider. michael@0: * michael@0: * Providers have the ability to register persistent state with the backend. michael@0: * Persistent state doesn't expire. The format of the data is completely up michael@0: * to the provider beyond the requirement that values be UTF-8 strings. michael@0: * michael@0: * This returns a promise that will be resolved when the underlying database michael@0: * operation has completed. michael@0: * michael@0: * @param provider michael@0: * (string) Name of the provider. michael@0: * @param key michael@0: * (string) Key under which to store this state. michael@0: * @param value michael@0: * (string) Value for this state. michael@0: * @return Promise<> michael@0: */ michael@0: setProviderState: function (provider, key, value) { michael@0: if (typeof(key) != "string") { michael@0: throw new Error("State key must be a string. Got: " + key); michael@0: } michael@0: michael@0: if (typeof(value) != "string") { michael@0: throw new Error("State value must be a string. 
Got: " + value); michael@0: } michael@0: michael@0: let id = this.providerID(provider); michael@0: if (!id) { michael@0: throw new Error("Unknown provider: " + provider); michael@0: } michael@0: michael@0: return this._connection.executeCached(SQL.setProviderState, { michael@0: provider_id: id, michael@0: name: key, michael@0: value: value, michael@0: }); michael@0: }, michael@0: michael@0: /** michael@0: * Obtain named state for a provider. michael@0: * michael@0: * michael@0: * The returned promise will resolve to the state from the database or null michael@0: * if the key is not stored. michael@0: * michael@0: * @param provider michael@0: * (string) The name of the provider whose state to obtain. michael@0: * @param key michael@0: * (string) The state's key to retrieve. michael@0: * michael@0: * @return Promise michael@0: */ michael@0: getProviderState: function (provider, key) { michael@0: let id = this.providerID(provider); michael@0: if (!id) { michael@0: throw new Error("Unknown provider: " + provider); michael@0: } michael@0: michael@0: let conn = this._connection; michael@0: return Task.spawn(function queryDB() { michael@0: let rows = yield conn.executeCached(SQL.getProviderStateWithName, { michael@0: provider_id: id, michael@0: name: key, michael@0: }); michael@0: michael@0: if (!rows.length) { michael@0: throw new Task.Result(null); michael@0: } michael@0: michael@0: throw new Task.Result(rows[0].getResultByIndex(0)); michael@0: }); michael@0: }, michael@0: michael@0: /** michael@0: * Increment a daily counter from a numeric field id. michael@0: * michael@0: * @param id michael@0: * (integer) Primary key of field to increment. michael@0: * @param date michael@0: * (Date) When the increment occurred. This is typically "now" but can michael@0: * be explicitly defined for events that occurred in the past. michael@0: * @param by michael@0: * (integer) How much to increment the value by. Defaults to 1. 
michael@0: */ michael@0: incrementDailyCounterFromFieldID: function (id, date=new Date(), by=1) { michael@0: this._ensureFieldType(id, this.FIELD_DAILY_COUNTER); michael@0: michael@0: let params = { michael@0: field_id: id, michael@0: days: dateToDays(date), michael@0: by: by, michael@0: }; michael@0: michael@0: return this._connection.executeCached(SQL.incrementDailyCounterFromFieldID, michael@0: params); michael@0: }, michael@0: michael@0: /** michael@0: * Obtain all counts for a specific daily counter. michael@0: * michael@0: * @param id michael@0: * (integer) The ID of the field being retrieved. michael@0: */ michael@0: getDailyCounterCountsFromFieldID: function (id) { michael@0: this._ensureFieldType(id, this.FIELD_DAILY_COUNTER); michael@0: michael@0: let self = this; michael@0: return Task.spawn(function fetchCounterDays() { michael@0: let rows = yield self._connection.executeCached(SQL.getDailyCounterCountsFromFieldID, michael@0: {field_id: id}); michael@0: michael@0: let result = new DailyValues(); michael@0: for (let row of rows) { michael@0: let days = row.getResultByIndex(0); michael@0: let counter = row.getResultByIndex(1); michael@0: michael@0: let date = daysToDate(days); michael@0: result.setDay(date, counter); michael@0: } michael@0: michael@0: throw new Task.Result(result); michael@0: }); michael@0: }, michael@0: michael@0: /** michael@0: * Get the value of a daily counter for a given day. michael@0: * michael@0: * @param field michael@0: * (integer) Field ID to retrieve. michael@0: * @param date michael@0: * (Date) Date for day from which to obtain data. 
michael@0: */ michael@0: getDailyCounterCountFromFieldID: function (field, date) { michael@0: this._ensureFieldType(field, this.FIELD_DAILY_COUNTER); michael@0: michael@0: let params = { michael@0: field_id: field, michael@0: days: dateToDays(date), michael@0: }; michael@0: michael@0: let self = this; michael@0: return Task.spawn(function fetchCounter() { michael@0: let rows = yield self._connection.executeCached(SQL.getDailyCounterCountFromFieldID, michael@0: params); michael@0: if (!rows.length) { michael@0: throw new Task.Result(null); michael@0: } michael@0: michael@0: throw new Task.Result(rows[0].getResultByIndex(0)); michael@0: }); michael@0: }, michael@0: michael@0: /** michael@0: * Define the value for a "last numeric" field. michael@0: * michael@0: * The previous value (if any) will be replaced by the value passed, even if michael@0: * the date of the incoming value is older than what's recorded in the michael@0: * database. michael@0: * michael@0: * @param fieldID michael@0: * (Number) Integer primary key of field to update. michael@0: * @param value michael@0: * (Number) Value to record. michael@0: * @param date michael@0: * (Date) When this value was produced. michael@0: */ michael@0: setLastNumericFromFieldID: function (fieldID, value, date=new Date()) { michael@0: this._ensureFieldType(fieldID, this.FIELD_LAST_NUMERIC); michael@0: michael@0: if (typeof(value) != "number") { michael@0: throw new Error("Value is not a number: " + value); michael@0: } michael@0: michael@0: let params = { michael@0: field_id: fieldID, michael@0: days: dateToDays(date), michael@0: value: value, michael@0: }; michael@0: michael@0: return this._connection.executeCached(SQL.setLastNumeric, params); michael@0: }, michael@0: michael@0: /** michael@0: * Define the value of a "last text" field. michael@0: * michael@0: * See `setLastNumericFromFieldID` for behavior. 
michael@0: */ michael@0: setLastTextFromFieldID: function (fieldID, value, date=new Date()) { michael@0: this._ensureFieldType(fieldID, this.FIELD_LAST_TEXT); michael@0: michael@0: if (typeof(value) != "string") { michael@0: throw new Error("Value is not a string: " + value); michael@0: } michael@0: michael@0: let params = { michael@0: field_id: fieldID, michael@0: days: dateToDays(date), michael@0: value: value, michael@0: }; michael@0: michael@0: return this._connection.executeCached(SQL.setLastText, params); michael@0: }, michael@0: michael@0: /** michael@0: * Obtain the value of a "last numeric" field. michael@0: * michael@0: * This returns a promise that will be resolved with an Array of [date, value] michael@0: * if a value is known or null if no last value is present. michael@0: * michael@0: * @param fieldID michael@0: * (Number) Integer primary key of field to retrieve. michael@0: */ michael@0: getLastNumericFromFieldID: function (fieldID) { michael@0: this._ensureFieldType(fieldID, this.FIELD_LAST_NUMERIC); michael@0: michael@0: let self = this; michael@0: return Task.spawn(function fetchLastField() { michael@0: let rows = yield self._connection.executeCached(SQL.getLastNumericFromFieldID, michael@0: {field_id: fieldID}); michael@0: michael@0: if (!rows.length) { michael@0: throw new Task.Result(null); michael@0: } michael@0: michael@0: let row = rows[0]; michael@0: let days = row.getResultByIndex(0); michael@0: let value = row.getResultByIndex(1); michael@0: michael@0: throw new Task.Result([daysToDate(days), value]); michael@0: }); michael@0: }, michael@0: michael@0: /** michael@0: * Obtain the value of a "last text" field. michael@0: * michael@0: * See `getLastNumericFromFieldID` for behavior. 
michael@0: */ michael@0: getLastTextFromFieldID: function (fieldID) { michael@0: this._ensureFieldType(fieldID, this.FIELD_LAST_TEXT); michael@0: michael@0: let self = this; michael@0: return Task.spawn(function fetchLastField() { michael@0: let rows = yield self._connection.executeCached(SQL.getLastTextFromFieldID, michael@0: {field_id: fieldID}); michael@0: michael@0: if (!rows.length) { michael@0: throw new Task.Result(null); michael@0: } michael@0: michael@0: let row = rows[0]; michael@0: let days = row.getResultByIndex(0); michael@0: let value = row.getResultByIndex(1); michael@0: michael@0: throw new Task.Result([daysToDate(days), value]); michael@0: }); michael@0: }, michael@0: michael@0: /** michael@0: * Delete the value (if any) in a "last numeric" field. michael@0: */ michael@0: deleteLastNumericFromFieldID: function (fieldID) { michael@0: this._ensureFieldType(fieldID, this.FIELD_LAST_NUMERIC); michael@0: michael@0: return this._connection.executeCached(SQL.deleteLastNumericFromFieldID, michael@0: {field_id: fieldID}); michael@0: }, michael@0: michael@0: /** michael@0: * Delete the value (if any) in a "last text" field. michael@0: */ michael@0: deleteLastTextFromFieldID: function (fieldID) { michael@0: this._ensureFieldType(fieldID, this.FIELD_LAST_TEXT); michael@0: michael@0: return this._connection.executeCached(SQL.deleteLastTextFromFieldID, michael@0: {field_id: fieldID}); michael@0: }, michael@0: michael@0: /** michael@0: * Record a value for a "daily last numeric" field. michael@0: * michael@0: * The field can hold 1 value per calendar day. If the field already has a michael@0: * value for the day specified (defaults to now), that value will be michael@0: * replaced, even if the date specified is older (within the day) than the michael@0: * previously recorded value. michael@0: * michael@0: * @param fieldID michael@0: * (Number) Integer primary key of field. michael@0: * @param value michael@0: * (Number) Value to record. 
michael@0: * @param date michael@0: * (Date) When the value was produced. Defaults to now. michael@0: */ michael@0: setDailyLastNumericFromFieldID: function (fieldID, value, date=new Date()) { michael@0: this._ensureFieldType(fieldID, this.FIELD_DAILY_LAST_NUMERIC); michael@0: michael@0: let params = { michael@0: field_id: fieldID, michael@0: days: dateToDays(date), michael@0: value: value, michael@0: }; michael@0: michael@0: return this._connection.executeCached(SQL.setDailyLastNumeric, params); michael@0: }, michael@0: michael@0: /** michael@0: * Record a value for a "daily last text" field. michael@0: * michael@0: * See `setDailyLastNumericFromFieldID` for behavior. michael@0: */ michael@0: setDailyLastTextFromFieldID: function (fieldID, value, date=new Date()) { michael@0: this._ensureFieldType(fieldID, this.FIELD_DAILY_LAST_TEXT); michael@0: michael@0: let params = { michael@0: field_id: fieldID, michael@0: days: dateToDays(date), michael@0: value: value, michael@0: }; michael@0: michael@0: return this._connection.executeCached(SQL.setDailyLastText, params); michael@0: }, michael@0: michael@0: /** michael@0: * Obtain value(s) from a "daily last numeric" field. michael@0: * michael@0: * This returns a promise that resolves to a DailyValues instance. If `date` michael@0: * is specified, that instance will have at most 1 entry. If there is no michael@0: * `date` constraint, then all stored values will be retrieved. michael@0: * michael@0: * @param fieldID michael@0: * (Number) Integer primary key of field to retrieve. michael@0: * @param date optional michael@0: * (Date) If specified, only return data for this day. 
michael@0: * michael@0: * @return Promise michael@0: */ michael@0: getDailyLastNumericFromFieldID: function (fieldID, date=null) { michael@0: this._ensureFieldType(fieldID, this.FIELD_DAILY_LAST_NUMERIC); michael@0: michael@0: let params = {field_id: fieldID}; michael@0: let name = "getDailyLastNumericFromFieldID"; michael@0: michael@0: if (date) { michael@0: params.days = dateToDays(date); michael@0: name = "getDailyLastNumericFromFieldIDAndDay"; michael@0: } michael@0: michael@0: return this._getDailyLastFromFieldID(name, params); michael@0: }, michael@0: michael@0: /** michael@0: * Obtain value(s) from a "daily last text" field. michael@0: * michael@0: * See `getDailyLastNumericFromFieldID` for behavior. michael@0: */ michael@0: getDailyLastTextFromFieldID: function (fieldID, date=null) { michael@0: this._ensureFieldType(fieldID, this.FIELD_DAILY_LAST_TEXT); michael@0: michael@0: let params = {field_id: fieldID}; michael@0: let name = "getDailyLastTextFromFieldID"; michael@0: michael@0: if (date) { michael@0: params.days = dateToDays(date); michael@0: name = "getDailyLastTextFromFieldIDAndDay"; michael@0: } michael@0: michael@0: return this._getDailyLastFromFieldID(name, params); michael@0: }, michael@0: michael@0: _getDailyLastFromFieldID: function (name, params) { michael@0: let self = this; michael@0: return Task.spawn(function fetchDailyLastForField() { michael@0: let rows = yield self._connection.executeCached(SQL[name], params); michael@0: michael@0: let result = new DailyValues(); michael@0: for (let row of rows) { michael@0: let d = daysToDate(row.getResultByIndex(0)); michael@0: let value = row.getResultByIndex(1); michael@0: michael@0: result.setDay(d, value); michael@0: } michael@0: michael@0: throw new Task.Result(result); michael@0: }); michael@0: }, michael@0: michael@0: /** michael@0: * Add a new value for a "daily discrete numeric" field. michael@0: * michael@0: * This appends a new value to the list of values for a specific field. 
All michael@0: * values are retained. Duplicate values are allowed. michael@0: * michael@0: * @param fieldID michael@0: * (Number) Integer primary key of field. michael@0: * @param value michael@0: * (Number) Value to record. michael@0: * @param date optional michael@0: * (Date) When this value occurred. Values are bucketed by day. michael@0: */ michael@0: addDailyDiscreteNumericFromFieldID: function (fieldID, value, date=new Date()) { michael@0: this._ensureFieldType(fieldID, this.FIELD_DAILY_DISCRETE_NUMERIC); michael@0: michael@0: if (typeof(value) != "number") { michael@0: throw new Error("Number expected. Got: " + value); michael@0: } michael@0: michael@0: let params = { michael@0: field_id: fieldID, michael@0: days: dateToDays(date), michael@0: value: value, michael@0: }; michael@0: michael@0: return this._connection.executeCached(SQL.addDailyDiscreteNumeric, params); michael@0: }, michael@0: michael@0: /** michael@0: * Add a new value for a "daily discrete text" field. michael@0: * michael@0: * See `addDailyDiscreteNumericFromFieldID` for behavior. michael@0: */ michael@0: addDailyDiscreteTextFromFieldID: function (fieldID, value, date=new Date()) { michael@0: this._ensureFieldType(fieldID, this.FIELD_DAILY_DISCRETE_TEXT); michael@0: michael@0: if (typeof(value) != "string") { michael@0: throw new Error("String expected. Got: " + value); michael@0: } michael@0: michael@0: let params = { michael@0: field_id: fieldID, michael@0: days: dateToDays(date), michael@0: value: value, michael@0: }; michael@0: michael@0: return this._connection.executeCached(SQL.addDailyDiscreteText, params); michael@0: }, michael@0: michael@0: /** michael@0: * Obtain values for a "daily discrete numeric" field. michael@0: * michael@0: * This returns a promise that resolves to a `DailyValues` instance. If michael@0: * `date` is specified, there will be at most 1 key in that instance. If michael@0: * not, all data from the database will be retrieved. 
michael@0: * michael@0: * Values in that instance will be arrays of the raw values. michael@0: * michael@0: * @param fieldID michael@0: * (Number) Integer primary key of field to retrieve. michael@0: * @param date optional michael@0: * (Date) Day to obtain data for. Date can be any time in the day. michael@0: */ michael@0: getDailyDiscreteNumericFromFieldID: function (fieldID, date=null) { michael@0: this._ensureFieldType(fieldID, this.FIELD_DAILY_DISCRETE_NUMERIC); michael@0: michael@0: let params = {field_id: fieldID}; michael@0: michael@0: let name = "getDailyDiscreteNumericFromFieldID"; michael@0: michael@0: if (date) { michael@0: params.days = dateToDays(date); michael@0: name = "getDailyDiscreteNumericFromFieldIDAndDay"; michael@0: } michael@0: michael@0: return this._getDailyDiscreteFromFieldID(name, params); michael@0: }, michael@0: michael@0: /** michael@0: * Obtain values for a "daily discrete text" field. michael@0: * michael@0: * See `getDailyDiscreteNumericFromFieldID` for behavior. 
michael@0: */ michael@0: getDailyDiscreteTextFromFieldID: function (fieldID, date=null) { michael@0: this._ensureFieldType(fieldID, this.FIELD_DAILY_DISCRETE_TEXT); michael@0: michael@0: let params = {field_id: fieldID}; michael@0: michael@0: let name = "getDailyDiscreteTextFromFieldID"; michael@0: michael@0: if (date) { michael@0: params.days = dateToDays(date); michael@0: name = "getDailyDiscreteTextFromFieldIDAndDay"; michael@0: } michael@0: michael@0: return this._getDailyDiscreteFromFieldID(name, params); michael@0: }, michael@0: michael@0: _getDailyDiscreteFromFieldID: function (name, params) { michael@0: let self = this; michael@0: return Task.spawn(function fetchDailyDiscreteValuesForField() { michael@0: let rows = yield self._connection.executeCached(SQL[name], params); michael@0: michael@0: let result = new DailyValues(); michael@0: for (let row of rows) { michael@0: let d = daysToDate(row.getResultByIndex(0)); michael@0: let value = row.getResultByIndex(1); michael@0: michael@0: result.appendValue(d, value); michael@0: } michael@0: michael@0: throw new Task.Result(result); michael@0: }); michael@0: }, michael@0: michael@0: /** michael@0: * Obtain the counts of daily counters in a measurement. michael@0: * michael@0: * This returns a promise that resolves to a Map of field name strings to michael@0: * DailyValues that hold per-day counts. michael@0: * michael@0: * @param id michael@0: * (Number) Integer primary key of measurement. 
michael@0: * michael@0: * @return Promise michael@0: */ michael@0: getMeasurementDailyCountersFromMeasurementID: function (id) { michael@0: let self = this; michael@0: return Task.spawn(function fetchDailyCounters() { michael@0: let rows = yield self._connection.execute(SQL.getMeasurementDailyCounters, michael@0: {measurement_id: id}); michael@0: michael@0: let result = new Map(); michael@0: for (let row of rows) { michael@0: let field = row.getResultByName("field_name"); michael@0: let date = daysToDate(row.getResultByName("day")); michael@0: let value = row.getResultByName("value"); michael@0: michael@0: if (!result.has(field)) { michael@0: result.set(field, new DailyValues()); michael@0: } michael@0: michael@0: result.get(field).setDay(date, value); michael@0: } michael@0: michael@0: throw new Task.Result(result); michael@0: }); michael@0: }, michael@0: michael@0: /** michael@0: * Obtain the values of "last" fields from a measurement. michael@0: * michael@0: * This returns a promise that resolves to a Map of field name to an array michael@0: * of [date, value]. michael@0: * michael@0: * @param id michael@0: * (Number) Integer primary key of measurement whose data to retrieve. michael@0: * michael@0: * @return Promise michael@0: */ michael@0: getMeasurementLastValuesFromMeasurementID: function (id) { michael@0: let self = this; michael@0: return Task.spawn(function fetchMeasurementLastValues() { michael@0: let rows = yield self._connection.execute(SQL.getMeasurementLastValues, michael@0: {measurement_id: id}); michael@0: michael@0: let result = new Map(); michael@0: for (let row of rows) { michael@0: let date = daysToDate(row.getResultByIndex(1)); michael@0: let value = row.getResultByIndex(2); michael@0: result.set(row.getResultByIndex(0), [date, value]); michael@0: } michael@0: michael@0: throw new Task.Result(result); michael@0: }); michael@0: }, michael@0: michael@0: /** michael@0: * Obtain the values of "last daily" fields from a measurement. 
michael@0: * michael@0: * This returns a promise that resolves to a Map of field name to DailyValues michael@0: * instances. Each DailyValues instance has days for which a daily last value michael@0: * is defined. The values in each DailyValues are the raw last value for that michael@0: * day. michael@0: * michael@0: * @param id michael@0: * (Number) Integer primary key of measurement whose data to retrieve. michael@0: * michael@0: * @return Promise michael@0: */ michael@0: getMeasurementDailyLastValuesFromMeasurementID: function (id) { michael@0: let self = this; michael@0: return Task.spawn(function fetchMeasurementDailyLastValues() { michael@0: let rows = yield self._connection.execute(SQL.getMeasurementDailyLastValues, michael@0: {measurement_id: id}); michael@0: michael@0: let result = new Map(); michael@0: for (let row of rows) { michael@0: let field = row.getResultByName("field_name"); michael@0: let date = daysToDate(row.getResultByName("day")); michael@0: let value = row.getResultByName("value"); michael@0: michael@0: if (!result.has(field)) { michael@0: result.set(field, new DailyValues()); michael@0: } michael@0: michael@0: result.get(field).setDay(date, value); michael@0: } michael@0: michael@0: throw new Task.Result(result); michael@0: }); michael@0: }, michael@0: michael@0: /** michael@0: * Obtain the values of "daily discrete" fields from a measurement. michael@0: * michael@0: * This obtains all discrete values for all "daily discrete" fields in a michael@0: * measurement. michael@0: * michael@0: * This returns a promise that resolves to a Map. The Map's keys are field michael@0: * string names. Values are `DailyValues` instances. The values inside michael@0: * the `DailyValues` are arrays of the raw discrete values. michael@0: * michael@0: * @param id michael@0: * (Number) Integer primary key of measurement. 
michael@0: * michael@0: * @return Promise michael@0: */ michael@0: getMeasurementDailyDiscreteValuesFromMeasurementID: function (id) { michael@0: let deferred = Promise.defer(); michael@0: let result = new Map(); michael@0: michael@0: this._connection.execute(SQL.getMeasurementDailyDiscreteValues, michael@0: {measurement_id: id}, function onRow(row) { michael@0: let field = row.getResultByName("field_name"); michael@0: let date = daysToDate(row.getResultByName("day")); michael@0: let value = row.getResultByName("value"); michael@0: michael@0: if (!result.has(field)) { michael@0: result.set(field, new DailyValues()); michael@0: } michael@0: michael@0: result.get(field).appendValue(date, value); michael@0: }).then(function onComplete() { michael@0: deferred.resolve(result); michael@0: }, function onError(error) { michael@0: deferred.reject(error); michael@0: }); michael@0: michael@0: return deferred.promise; michael@0: }, michael@0: }); michael@0: michael@0: // Alias built-in field types to public API. michael@0: for (let property of MetricsStorageSqliteBackend.prototype._BUILTIN_TYPES) { michael@0: this.MetricsStorageBackend[property] = MetricsStorageSqliteBackend.prototype[property]; michael@0: } michael@0: