diff -r 000000000000 -r 6474c204b198 services/metrics/storage.jsm --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/services/metrics/storage.jsm Wed Dec 31 06:09:35 2014 +0100 @@ -0,0 +1,2179 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +"use strict"; + +#ifndef MERGED_COMPARTMENT + +this.EXPORTED_SYMBOLS = [ + "DailyValues", + "MetricsStorageBackend", + "dateToDays", + "daysToDate", +]; + +const {utils: Cu} = Components; + +const MILLISECONDS_PER_DAY = 24 * 60 * 60 * 1000; + +#endif + +Cu.import("resource://gre/modules/Promise.jsm"); +Cu.import("resource://gre/modules/Sqlite.jsm"); +Cu.import("resource://gre/modules/Task.jsm"); +Cu.import("resource://gre/modules/Log.jsm"); +Cu.import("resource://services-common/utils.js"); + + +// These do not account for leap seconds. Meh. +function dateToDays(date) { + return Math.floor(date.getTime() / MILLISECONDS_PER_DAY); +} + +function daysToDate(days) { + return new Date(days * MILLISECONDS_PER_DAY); +} + +/** + * Represents a collection of per-day values. + * + * This is a proxy around a Map which can transparently round Date instances to + * their appropriate key. + * + * This emulates Map by providing .size and iterator support. Note that keys + * from the iterator are Date instances corresponding to midnight of the start + * of the day. get(), has(), and set() are modeled as getDay(), hasDay(), and + * setDay(), respectively. + * + * All days are defined in terms of UTC (as opposed to local time). + */ +this.DailyValues = function () { + this._days = new Map(); +}; + +DailyValues.prototype = Object.freeze({ + __iterator__: function () { + for (let [k, v] of this._days) { + yield [daysToDate(k), v]; + } + }, + + get size() { + return this._days.size; + }, + + hasDay: function (date) { + return this._days.has(dateToDays(date)); + }, + + getDay: function (date) { + return this._days.get(dateToDays(date)); + }, + + setDay: function (date, value) { + this._days.set(dateToDays(date), value); + }, + + appendValue: function (date, value) { + let key = dateToDays(date); + + if (this._days.has(key)) { + return this._days.get(key).push(value); + } + + this._days.set(key, [value]); + }, +}); + + +/** + * DATABASE INFO + * ============= + * + * We use a SQLite database as the backend for persistent storage of metrics + * data. + * + * Every piece of recorded data is associated with a measurement. A measurement + * is an entity with a name and version. Each measurement is associated with a + * named provider. + * + * When the metrics system is initialized, we ask providers (the entities that + * emit data) to configure the database for storage of their data. They tell + * storage what their requirements are. For example, they'll register + * named daily counters associated with specific measurements. + * + * Recorded data is stored differently depending on the requirements for + * storing it. We have facilities for storing the following classes of data: + * + * 1) Counts of event/field occurrences aggregated by day. + * 2) Discrete values of fields aggregated by day. + * 3) Discrete values of fields aggregated by day max 1 per day (last write + * wins). + * 4) Discrete values of fields max 1 (last write wins). + * + * Most data is aggregated per day mainly for privacy reasons. This does throw + * away potentially useful data. But, it's not currently used, so there is no + * need to keep the granular information. + * + * Database Schema + * --------------- + * + * This database contains the following tables: + * + * providers -- Maps provider string name to an internal ID. + * provider_state -- Holds opaque persisted state for providers. + * measurements -- Holds the set of known measurements (name, version, + * provider tuples). + * types -- The data types that can be stored in measurements/fields. + * fields -- Describes entities that occur within measurements. + * daily_counters -- Holds daily-aggregated counts of events. Each row is + * associated with a field and a day. + * daily_discrete_numeric -- Holds numeric values for fields grouped by day. + * Each row contains a discrete value associated with a field that occurred + * on a specific day. There can be multiple rows per field per day. + * daily_discrete_text -- Holds text values for fields grouped by day. Each + * row contains a discrete value associated with a field that occurred on a + * specific day. + * daily_last_numeric -- Holds numeric values where the last encountered + * value for a given day is retained. + * daily_last_text -- Like daily_last_numeric except for text values. + * last_numeric -- Holds the most recent value for a numeric field. + * last_text -- Like last_numeric except for text fields. + * + * Notes + * ----- + * + * It is tempting to use SQLite's julianday() function to store days that + * things happened. However, a Julian Day begins at *noon* in 4714 B.C. This + * results in weird half day offsets from UNIX time. So, we instead store + * number of days since UNIX epoch, not Julian. + */ + +/** + * All of our SQL statements are stored in a central mapping so they can easily + * be audited for security, perf, etc. + */ +const SQL = { + // Create the providers table. + createProvidersTable: "\ +CREATE TABLE providers (\ + id INTEGER PRIMARY KEY AUTOINCREMENT, \ + name TEXT, \ + UNIQUE (name) \ +)", + + createProviderStateTable: "\ +CREATE TABLE provider_state (\ + id INTEGER PRIMARY KEY AUTOINCREMENT, \ + provider_id INTEGER, \ + name TEXT, \ + VALUE TEXT, \ + UNIQUE (provider_id, name), \ + FOREIGN KEY (provider_id) REFERENCES providers(id) ON DELETE CASCADE\ +)", + + createProviderStateProviderIndex: "\ +CREATE INDEX i_provider_state_provider_id ON provider_state (provider_id)", + + createMeasurementsTable: "\ +CREATE TABLE measurements (\ + id INTEGER PRIMARY KEY AUTOINCREMENT, \ + provider_id INTEGER, \ + name TEXT, \ + version INTEGER, \ + UNIQUE (provider_id, name, version), \ + FOREIGN KEY (provider_id) REFERENCES providers(id) ON DELETE CASCADE\ +)", + + createMeasurementsProviderIndex: "\ +CREATE INDEX i_measurements_provider_id ON measurements (provider_id)", + + createMeasurementsView: "\ +CREATE VIEW v_measurements AS \ + SELECT \ + providers.id AS provider_id, \ + providers.name AS provider_name, \ + measurements.id AS measurement_id, \ + measurements.name AS measurement_name, \ + measurements.version AS measurement_version \ + FROM providers, measurements \ + WHERE \ + measurements.provider_id = providers.id", + + createTypesTable: "\ +CREATE TABLE types (\ + id INTEGER PRIMARY KEY AUTOINCREMENT, \ + name TEXT, \ + UNIQUE (name)\ +)", + + createFieldsTable: "\ +CREATE TABLE fields (\ + id INTEGER PRIMARY KEY AUTOINCREMENT, \ + measurement_id INTEGER, \ + name TEXT, \ + value_type INTEGER , \ + UNIQUE (measurement_id, name), \ + FOREIGN KEY (measurement_id) REFERENCES measurements(id) ON DELETE CASCADE \ + FOREIGN KEY (value_type) REFERENCES types(id) ON DELETE CASCADE \ +)", + + createFieldsMeasurementIndex: "\ +CREATE INDEX i_fields_measurement_id ON fields (measurement_id)", + + createFieldsView: "\ +CREATE VIEW v_fields AS \ + SELECT \ + providers.id AS provider_id, \ + providers.name AS provider_name, \ + measurements.id AS measurement_id, \ + measurements.name AS measurement_name, \ + measurements.version AS measurement_version, \ + fields.id AS field_id, \ + fields.name AS field_name, \ + types.id AS type_id, \ + types.name AS type_name \ + FROM providers, measurements, fields, types \ + WHERE \ + fields.measurement_id = measurements.id \ + AND measurements.provider_id = providers.id \ + AND fields.value_type = types.id", + + createDailyCountersTable: "\ +CREATE TABLE daily_counters (\ + field_id INTEGER, \ + day INTEGER, \ + value INTEGER, \ + UNIQUE(field_id, day), \ + FOREIGN KEY (field_id) REFERENCES fields(id) ON DELETE CASCADE\ +)", + + createDailyCountersFieldIndex: "\ +CREATE INDEX i_daily_counters_field_id ON daily_counters (field_id)", + + createDailyCountersDayIndex: "\ +CREATE INDEX i_daily_counters_day ON daily_counters (day)", + + createDailyCountersView: "\ +CREATE VIEW v_daily_counters AS SELECT \ + providers.id AS provider_id, \ + providers.name AS provider_name, \ + measurements.id AS measurement_id, \ + measurements.name AS measurement_name, \ + measurements.version AS measurement_version, \ + fields.id AS field_id, \ + fields.name AS field_name, \ + daily_counters.day AS day, \ + daily_counters.value AS value \ +FROM providers, measurements, fields, daily_counters \ +WHERE \ + daily_counters.field_id = fields.id \ + AND fields.measurement_id = measurements.id \ + AND measurements.provider_id = providers.id", + + createDailyDiscreteNumericsTable: "\ +CREATE TABLE daily_discrete_numeric (\ + id INTEGER PRIMARY KEY AUTOINCREMENT, \ + field_id INTEGER, \ + day INTEGER, \ + value INTEGER, \ + FOREIGN KEY (field_id) REFERENCES fields(id) ON DELETE CASCADE\ +)", + + createDailyDiscreteNumericsFieldIndex: "\ +CREATE INDEX i_daily_discrete_numeric_field_id \ +ON daily_discrete_numeric (field_id)", + + createDailyDiscreteNumericsDayIndex: "\ +CREATE INDEX i_daily_discrete_numeric_day \ +ON daily_discrete_numeric (day)", + + createDailyDiscreteTextTable: "\ +CREATE TABLE daily_discrete_text (\ + id INTEGER PRIMARY KEY AUTOINCREMENT, \ + field_id INTEGER, \ + day INTEGER, \ + value TEXT, \ + FOREIGN KEY (field_id) REFERENCES fields(id) ON DELETE CASCADE\ +)", + + createDailyDiscreteTextFieldIndex: "\ +CREATE INDEX i_daily_discrete_text_field_id \ +ON daily_discrete_text (field_id)", + + createDailyDiscreteTextDayIndex: "\ +CREATE INDEX i_daily_discrete_text_day \ +ON daily_discrete_text (day)", + + createDailyDiscreteView: "\ +CREATE VIEW v_daily_discrete AS \ + SELECT \ + providers.id AS provider_id, \ + providers.name AS provider_name, \ + measurements.id AS measurement_id, \ + measurements.name AS measurement_name, \ + measurements.version AS measurement_version, \ + fields.id AS field_id, \ + fields.name AS field_name, \ + daily_discrete_numeric.id AS value_id, \ + daily_discrete_numeric.day AS day, \ + daily_discrete_numeric.value AS value, \ + \"numeric\" AS value_type \ + FROM providers, measurements, fields, daily_discrete_numeric \ + WHERE \ + daily_discrete_numeric.field_id = fields.id \ + AND fields.measurement_id = measurements.id \ + AND measurements.provider_id = providers.id \ + UNION ALL \ + SELECT \ + providers.id AS provider_id, \ + providers.name AS provider_name, \ + measurements.id AS measurement_id, \ + measurements.name AS measurement_name, \ + measurements.version AS measurement_version, \ + fields.id AS field_id, \ + fields.name AS field_name, \ + daily_discrete_text.id AS value_id, \ + daily_discrete_text.day AS day, \ + daily_discrete_text.value AS value, \ + \"text\" AS value_type \ + FROM providers, measurements, fields, daily_discrete_text \ + WHERE \ + daily_discrete_text.field_id = fields.id \ + AND fields.measurement_id = measurements.id \ + AND measurements.provider_id = providers.id \ + ORDER BY day ASC, value_id ASC", + + createLastNumericTable: "\ +CREATE TABLE last_numeric (\ + field_id INTEGER PRIMARY KEY, \ + day INTEGER, \ + value NUMERIC, \ + FOREIGN KEY (field_id) REFERENCES fields(id) ON DELETE CASCADE\ +)", + + createLastTextTable: "\ +CREATE TABLE last_text (\ + field_id INTEGER PRIMARY KEY, \ + day INTEGER, \ + value TEXT, \ + FOREIGN KEY (field_id) REFERENCES fields(id) ON DELETE CASCADE\ +)", + + createLastView: "\ +CREATE VIEW v_last AS \ + SELECT \ + providers.id AS provider_id, \ + providers.name AS provider_name, \ + measurements.id AS measurement_id, \ + measurements.name AS measurement_name, \ + measurements.version AS measurement_version, \ + fields.id AS field_id, \ + fields.name AS field_name, \ + last_numeric.day AS day, \ + last_numeric.value AS value, \ + \"numeric\" AS value_type \ + FROM providers, measurements, fields, last_numeric \ + WHERE \ + last_numeric.field_id = fields.id \ + AND fields.measurement_id = measurements.id \ + AND measurements.provider_id = providers.id \ + UNION ALL \ + SELECT \ + providers.id AS provider_id, \ + providers.name AS provider_name, \ + measurements.id AS measurement_id, \ + measurements.name AS measurement_name, \ + measurements.version AS measurement_version, \ + fields.id AS field_id, \ + fields.name AS field_name, \ + last_text.day AS day, \ + last_text.value AS value, \ + \"text\" AS value_type \ + FROM providers, measurements, fields, last_text \ + WHERE \ + last_text.field_id = fields.id \ + AND fields.measurement_id = measurements.id \ + AND measurements.provider_id = providers.id", + + createDailyLastNumericTable: "\ +CREATE TABLE daily_last_numeric (\ + field_id INTEGER, \ + day INTEGER, \ + value NUMERIC, \ + UNIQUE (field_id, day) \ + FOREIGN KEY (field_id) REFERENCES fields(id) ON DELETE CASCADE\ +)", + + createDailyLastNumericFieldIndex: "\ +CREATE INDEX i_daily_last_numeric_field_id ON daily_last_numeric (field_id)", + + createDailyLastNumericDayIndex: "\ +CREATE INDEX i_daily_last_numeric_day ON daily_last_numeric (day)", + + createDailyLastTextTable: "\ +CREATE TABLE daily_last_text (\ + field_id INTEGER, \ + day INTEGER, \ + value TEXT, \ + UNIQUE (field_id, day) \ + FOREIGN KEY (field_id) REFERENCES fields(id) ON DELETE CASCADE\ +)", + + createDailyLastTextFieldIndex: "\ +CREATE INDEX i_daily_last_text_field_id ON daily_last_text (field_id)", + + createDailyLastTextDayIndex: "\ +CREATE INDEX i_daily_last_text_day ON daily_last_text (day)", + + createDailyLastView: "\ +CREATE VIEW v_daily_last AS \ + SELECT \ + providers.id AS provider_id, \ + providers.name AS provider_name, \ + measurements.id AS measurement_id, \ + measurements.name AS measurement_name, \ + measurements.version AS measurement_version, \ + fields.id AS field_id, \ + fields.name AS field_name, \ + daily_last_numeric.day AS day, \ + daily_last_numeric.value AS value, \ + \"numeric\" as value_type \ + FROM providers, measurements, fields, daily_last_numeric \ + WHERE \ + daily_last_numeric.field_id = fields.id \ + AND fields.measurement_id = measurements.id \ + AND measurements.provider_id = providers.id \ + UNION ALL \ + SELECT \ + providers.id AS provider_id, \ + providers.name AS provider_name, \ + measurements.id AS measurement_id, \ + measurements.name AS measurement_name, \ + measurements.version AS measurement_version, \ + fields.id AS field_id, \ + fields.name AS field_name, \ + daily_last_text.day AS day, \ + daily_last_text.value AS value, \ + \"text\" as value_type \ + FROM providers, measurements, fields, daily_last_text \ + WHERE \ + daily_last_text.field_id = fields.id \ + AND fields.measurement_id = measurements.id \ + AND measurements.provider_id = providers.id", + + // Mutation. + + addProvider: "INSERT INTO providers (name) VALUES (:provider)", + + setProviderState: "\ +INSERT OR REPLACE INTO provider_state \ + (provider_id, name, value) \ + VALUES (:provider_id, :name, :value)", + + addMeasurement: "\ +INSERT INTO measurements (provider_id, name, version) \ + VALUES (:provider_id, :measurement, :version)", + + addType: "INSERT INTO types (name) VALUES (:name)", + + addField: "\ +INSERT INTO fields (measurement_id, name, value_type) \ + VALUES (:measurement_id, :field, :value_type)", + + incrementDailyCounterFromFieldID: "\ +INSERT OR REPLACE INTO daily_counters VALUES (\ + :field_id, \ + :days, \ + COALESCE(\ + (SELECT value FROM daily_counters WHERE \ + field_id = :field_id AND day = :days \ + ), \ + 0\ + ) + :by)", + + deleteLastNumericFromFieldID: "\ +DELETE FROM last_numeric WHERE field_id = :field_id", + + deleteLastTextFromFieldID: "\ +DELETE FROM last_text WHERE field_id = :field_id", + + setLastNumeric: "\ +INSERT OR REPLACE INTO last_numeric VALUES (:field_id, :days, :value)", + + setLastText: "\ +INSERT OR REPLACE INTO last_text VALUES (:field_id, :days, :value)", + + setDailyLastNumeric: "\ +INSERT OR REPLACE INTO daily_last_numeric VALUES (:field_id, :days, :value)", + + setDailyLastText: "\ +INSERT OR REPLACE INTO daily_last_text VALUES (:field_id, :days, :value)", + + addDailyDiscreteNumeric: "\ +INSERT INTO daily_discrete_numeric \ +(field_id, day, value) VALUES (:field_id, :days, :value)", + + addDailyDiscreteText: "\ +INSERT INTO daily_discrete_text \ +(field_id, day, value) VALUES (:field_id, :days, :value)", + + pruneOldDailyCounters: "DELETE FROM daily_counters WHERE day < :days", + pruneOldDailyDiscreteNumeric: "DELETE FROM daily_discrete_numeric WHERE day < :days", + pruneOldDailyDiscreteText: "DELETE FROM daily_discrete_text WHERE day < :days", + pruneOldDailyLastNumeric: "DELETE FROM daily_last_numeric WHERE day < :days", + pruneOldDailyLastText: "DELETE FROM daily_last_text WHERE day < :days", + pruneOldLastNumeric: "DELETE FROM last_numeric WHERE day < :days", + pruneOldLastText: "DELETE FROM last_text WHERE day < :days", + + // Retrieval. + + getProviderID: "SELECT id FROM providers WHERE name = :provider", + + getProviders: "SELECT id, name FROM providers", + + getProviderStateWithName: "\ +SELECT value FROM provider_state \ + WHERE provider_id = :provider_id \ + AND name = :name", + + getMeasurements: "SELECT * FROM v_measurements", + + getMeasurementID: "\ +SELECT id FROM measurements \ + WHERE provider_id = :provider_id \ + AND name = :measurement \ + AND version = :version", + + getFieldID: "\ +SELECT id FROM fields \ + WHERE measurement_id = :measurement_id \ + AND name = :field \ + AND value_type = :value_type \ +", + + getTypes: "SELECT * FROM types", + + getTypeID: "SELECT id FROM types WHERE name = :name", + + getDailyCounterCountsFromFieldID: "\ +SELECT day, value FROM daily_counters \ + WHERE field_id = :field_id \ + ORDER BY day ASC", + + getDailyCounterCountFromFieldID: "\ +SELECT value FROM daily_counters \ + WHERE field_id = :field_id \ + AND day = :days", + + getMeasurementDailyCounters: "\ +SELECT field_name, day, value FROM v_daily_counters \ +WHERE measurement_id = :measurement_id", + + getFieldInfo: "SELECT * FROM v_fields", + + getLastNumericFromFieldID: "\ +SELECT day, value FROM last_numeric WHERE field_id = :field_id", + + getLastTextFromFieldID: "\ +SELECT day, value FROM last_text WHERE field_id = :field_id", + + getMeasurementLastValues: "\ +SELECT field_name, day, value FROM v_last \ +WHERE measurement_id = :measurement_id", + + getDailyDiscreteNumericFromFieldID: "\ +SELECT day, value FROM daily_discrete_numeric \ + WHERE field_id = :field_id \ + ORDER BY day ASC, id ASC", + + getDailyDiscreteNumericFromFieldIDAndDay: "\ +SELECT day, value FROM daily_discrete_numeric \ + WHERE field_id = :field_id AND day = :days \ + ORDER BY id ASC", + + getDailyDiscreteTextFromFieldID: "\ +SELECT day, value FROM daily_discrete_text \ + WHERE field_id = :field_id \ + ORDER BY day ASC, id ASC", + + getDailyDiscreteTextFromFieldIDAndDay: "\ +SELECT day, value FROM daily_discrete_text \ + WHERE field_id = :field_id AND day = :days \ + ORDER BY id ASC", + + getMeasurementDailyDiscreteValues: "\ +SELECT field_name, day, value_id, value FROM v_daily_discrete \ +WHERE measurement_id = :measurement_id \ +ORDER BY day ASC, value_id ASC", + + getDailyLastNumericFromFieldID: "\ +SELECT day, value FROM daily_last_numeric \ + WHERE field_id = :field_id \ + ORDER BY day ASC", + + getDailyLastNumericFromFieldIDAndDay: "\ +SELECT day, value FROM daily_last_numeric \ + WHERE field_id = :field_id AND day = :days", + + getDailyLastTextFromFieldID: "\ +SELECT day, value FROM daily_last_text \ + WHERE field_id = :field_id \ + ORDER BY day ASC", + + getDailyLastTextFromFieldIDAndDay: "\ +SELECT day, value FROM daily_last_text \ + WHERE field_id = :field_id AND day = :days", + + getMeasurementDailyLastValues: "\ +SELECT field_name, day, value FROM v_daily_last \ +WHERE measurement_id = :measurement_id", +}; + + +function dailyKeyFromDate(date) { + let year = String(date.getUTCFullYear()); + let month = String(date.getUTCMonth() + 1); + let day = String(date.getUTCDate()); + + if (month.length < 2) { + month = "0" + month; + } + + if (day.length < 2) { + day = "0" + day; + } + + return year + "-" + month + "-" + day; +} + + +/** + * Create a new backend instance bound to a SQLite database at the given path. + * + * This returns a promise that will resolve to a `MetricsStorageSqliteBackend` + * instance. The resolved instance will be initialized and ready for use. + * + * Very few consumers have a need to call this. Instead, a higher-level entity + * likely calls this and sets up the database connection for a service or + * singleton. + */ +this.MetricsStorageBackend = function (path) { + return Task.spawn(function initTask() { + let connection = yield Sqlite.openConnection({ + path: path, + + // There should only be one connection per database, so we disable this + // for perf reasons. + sharedMemoryCache: false, + }); + + // If we fail initializing the storage object, we need to close the + // database connection or else Storage will assert on shutdown. + let storage; + try { + storage = new MetricsStorageSqliteBackend(connection); + yield storage._init(); + } catch (ex) { + yield connection.close(); + throw ex; + } + + throw new Task.Result(storage); + }); +}; + + +/** + * Manages storage of metrics data in a SQLite database. + * + * This is the main type used for interfacing with the database. + * + * Instances of this should be obtained by calling MetricsStorageConnection(). + * + * The current implementation will not work if the database is mutated by + * multiple connections because of the way we cache primary keys. + * + * FUTURE enforce 1 read/write connection per database limit. + */ +function MetricsStorageSqliteBackend(connection) { + this._log = Log.repository.getLogger("Services.Metrics.MetricsStorage"); + + this._connection = connection; + this._enabledWALCheckpointPages = null; + + // Integer IDs to string name. + this._typesByID = new Map(); + + // String name to integer IDs. + this._typesByName = new Map(); + + // Maps provider names to integer IDs. + this._providerIDs = new Map(); + + // Maps :-delimited strings of [provider name, name, version] to integer IDs. + this._measurementsByInfo = new Map(); + + // Integer IDs to Arrays of [provider name, name, version]. + this._measurementsByID = new Map(); + + // Integer IDs to Arrays of [measurement id, field name, value name] + this._fieldsByID = new Map(); + + // Maps :-delimited strings of [measurement id, field name] to integer ID. + this._fieldsByInfo = new Map(); + + // Maps measurement ID to sets of field IDs. + this._fieldsByMeasurement = new Map(); + + this._queuedOperations = []; + this._queuedInProgress = false; +} + +MetricsStorageSqliteBackend.prototype = Object.freeze({ + // Max size (in kibibytes) the WAL log is allowed to grow to before it is + // checkpointed. + // + // This was first deployed in bug 848136. We want a value large enough + // that we aren't checkpointing all the time. However, we want it + // small enough so we don't have to read so much when we open the + // database. + MAX_WAL_SIZE_KB: 512, + + FIELD_DAILY_COUNTER: "daily-counter", + FIELD_DAILY_DISCRETE_NUMERIC: "daily-discrete-numeric", + FIELD_DAILY_DISCRETE_TEXT: "daily-discrete-text", + FIELD_DAILY_LAST_NUMERIC: "daily-last-numeric", + FIELD_DAILY_LAST_TEXT: "daily-last-text", + FIELD_LAST_NUMERIC: "last-numeric", + FIELD_LAST_TEXT: "last-text", + + _BUILTIN_TYPES: [ + "FIELD_DAILY_COUNTER", + "FIELD_DAILY_DISCRETE_NUMERIC", + "FIELD_DAILY_DISCRETE_TEXT", + "FIELD_DAILY_LAST_NUMERIC", + "FIELD_DAILY_LAST_TEXT", + "FIELD_LAST_NUMERIC", + "FIELD_LAST_TEXT", + ], + + // Statements that are used to create the initial DB schema. + _SCHEMA_STATEMENTS: [ + "createProvidersTable", + "createProviderStateTable", + "createProviderStateProviderIndex", + "createMeasurementsTable", + "createMeasurementsProviderIndex", + "createMeasurementsView", + "createTypesTable", + "createFieldsTable", + "createFieldsMeasurementIndex", + "createFieldsView", + "createDailyCountersTable", + "createDailyCountersFieldIndex", + "createDailyCountersDayIndex", + "createDailyCountersView", + "createDailyDiscreteNumericsTable", + "createDailyDiscreteNumericsFieldIndex", + "createDailyDiscreteNumericsDayIndex", + "createDailyDiscreteTextTable", + "createDailyDiscreteTextFieldIndex", + "createDailyDiscreteTextDayIndex", + "createDailyDiscreteView", + "createDailyLastNumericTable", + "createDailyLastNumericFieldIndex", + "createDailyLastNumericDayIndex", + "createDailyLastTextTable", + "createDailyLastTextFieldIndex", + "createDailyLastTextDayIndex", + "createDailyLastView", + "createLastNumericTable", + "createLastTextTable", + "createLastView", + ], + + // Statements that are used to prune old data. + _PRUNE_STATEMENTS: [ + "pruneOldDailyCounters", + "pruneOldDailyDiscreteNumeric", + "pruneOldDailyDiscreteText", + "pruneOldDailyLastNumeric", + "pruneOldDailyLastText", + "pruneOldLastNumeric", + "pruneOldLastText", + ], + + /** + * Close the database connection. + * + * This should be called on all instances or the SQLite layer may complain + * loudly. After this has been called, the connection cannot be used. + * + * @return Promise<> + */ + close: function () { + return Task.spawn(function doClose() { + // There is some light magic involved here. First, we enqueue an + // operation to ensure that all pending operations have the opportunity + // to execute. We additionally execute a SQL operation. Due to the FIFO + // execution order of issued statements, this will cause us to wait on + // any outstanding statements before closing. + try { + yield this.enqueueOperation(function dummyOperation() { + return this._connection.execute("SELECT 1"); + }.bind(this)); + } catch (ex) {} + + try { + yield this._connection.close(); + } finally { + this._connection = null; + } + }.bind(this)); + }, + + /** + * Whether a provider is known to exist. + * + * @param provider + * (string) Name of the provider. + */ + hasProvider: function (provider) { + return this._providerIDs.has(provider); + }, + + /** + * Whether a measurement is known to exist. + * + * @param provider + * (string) Name of the provider. + * @param name + * (string) Name of the measurement. + * @param version + * (Number) Integer measurement version. + */ + hasMeasurement: function (provider, name, version) { + return this._measurementsByInfo.has([provider, name, version].join(":")); + }, + + /** + * Whether a named field exists in a measurement. + * + * @param measurementID + * (Number) The integer primary key of the measurement. + * @param field + * (string) The name of the field to look for. + */ + hasFieldFromMeasurement: function (measurementID, field) { + return this._fieldsByInfo.has([measurementID, field].join(":")); + }, + + /** + * Whether a field is known. + * + * @param provider + * (string) Name of the provider having the field. + * @param measurement + * (string) Name of the measurement in the provider having the field. + * @param field + * (string) Name of the field in the measurement. + */ + hasField: function (provider, measurement, version, field) { + let key = [provider, measurement, version].join(":"); + let measurementID = this._measurementsByInfo.get(key); + if (!measurementID) { + return false; + } + + return this.hasFieldFromMeasurement(measurementID, field); + }, + + /** + * Look up the integer primary key of a provider. + * + * @param provider + * (string) Name of the provider. + */ + providerID: function (provider) { + return this._providerIDs.get(provider); + }, + + /** + * Look up the integer primary key of a measurement. + * + * @param provider + * (string) Name of the provider. + * @param measurement + * (string) Name of the measurement. + * @param version + * (Number) Integer version of the measurement. + */ + measurementID: function (provider, measurement, version) { + return this._measurementsByInfo.get([provider, measurement, version].join(":")); + }, + + fieldIDFromMeasurement: function (measurementID, field) { + return this._fieldsByInfo.get([measurementID, field].join(":")); + }, + + fieldID: function (provider, measurement, version, field) { + let measurementID = this.measurementID(provider, measurement, version); + if (!measurementID) { + return null; + } + + return this.fieldIDFromMeasurement(measurementID, field); + }, + + measurementHasAnyDailyCounterFields: function (measurementID) { + return this.measurementHasAnyFieldsOfTypes(measurementID, + [this.FIELD_DAILY_COUNTER]); + }, + + measurementHasAnyLastFields: function (measurementID) { + return this.measurementHasAnyFieldsOfTypes(measurementID, + [this.FIELD_LAST_NUMERIC, + this.FIELD_LAST_TEXT]); + }, + + measurementHasAnyDailyLastFields: function (measurementID) { + return this.measurementHasAnyFieldsOfTypes(measurementID, + [this.FIELD_DAILY_LAST_NUMERIC, + this.FIELD_DAILY_LAST_TEXT]); + }, + + measurementHasAnyDailyDiscreteFields: function (measurementID) { + return this.measurementHasAnyFieldsOfTypes(measurementID, + [this.FIELD_DAILY_DISCRETE_NUMERIC, + this.FIELD_DAILY_DISCRETE_TEXT]); + }, + + measurementHasAnyFieldsOfTypes: function (measurementID, types) { + if (!this._fieldsByMeasurement.has(measurementID)) { + return false; + } + + let fieldIDs = this._fieldsByMeasurement.get(measurementID); + for (let fieldID of fieldIDs) { + let fieldType = this._fieldsByID.get(fieldID)[2]; + if (types.indexOf(fieldType) != -1) { + return true; + } + } + + return false; + }, + + /** + * Register a measurement with the backend. + * + * Measurements must be registered before storage can be allocated to them. + * + * A measurement consists of a string name and integer version attached + * to a named provider. + * + * This returns a promise that resolves to the storage ID for this + * measurement. + * + * If the measurement is not known to exist, it is registered with storage. + * If the measurement has already been registered, this is effectively a + * no-op (that still returns a promise resolving to the storage ID). + * + * @param provider + * (string) Name of the provider this measurement belongs to. + * @param name + * (string) Name of this measurement. + * @param version + * (Number) Integer version of this measurement. + */ + registerMeasurement: function (provider, name, version) { + if (this.hasMeasurement(provider, name, version)) { + return CommonUtils.laterTickResolvingPromise( + this.measurementID(provider, name, version)); + } + + // Registrations might not be safe to perform in parallel with provider + // operations. So, we queue them. + let self = this; + return this.enqueueOperation(function createMeasurementOperation() { + return Task.spawn(function createMeasurement() { + let providerID = self._providerIDs.get(provider); + + if (!providerID) { + yield self._connection.executeCached(SQL.addProvider, {provider: provider}); + let rows = yield self._connection.executeCached(SQL.getProviderID, + {provider: provider}); + + providerID = rows[0].getResultByIndex(0); + + self._providerIDs.set(provider, providerID); + } + + let params = { + provider_id: providerID, + measurement: name, + version: version, + }; + + yield self._connection.executeCached(SQL.addMeasurement, params); + let rows = yield self._connection.executeCached(SQL.getMeasurementID, params); + + let measurementID = rows[0].getResultByIndex(0); + + self._measurementsByInfo.set([provider, name, version].join(":"), measurementID); + self._measurementsByID.set(measurementID, [provider, name, version]); + self._fieldsByMeasurement.set(measurementID, new Set()); + + throw new Task.Result(measurementID); + }); + }); + }, + + /** + * Register a field with the backend. + * + * Fields are what recorded pieces of data are primarily associated with. + * + * Fields are associated with measurements. Measurements must be registered + * via `registerMeasurement` before fields can be registered. This is + * enforced by this function requiring the database primary key of the + * measurement as an argument. + * + * @param measurementID + * (Number) Integer primary key of measurement this field belongs to. + * @param field + * (string) Name of this field. + * @param valueType + * (string) Type name of this field. Must be a registered type. Is + * likely one of the FIELD_ constants on this type. + * + * @return Promise + */ + registerField: function (measurementID, field, valueType) { + if (!valueType) { + throw new Error("Value type must be defined."); + } + + if (!this._measurementsByID.has(measurementID)) { + throw new Error("Measurement not known: " + measurementID); + } + + if (!this._typesByName.has(valueType)) { + throw new Error("Unknown value type: " + valueType); + } + + let typeID = this._typesByName.get(valueType); + + if (!typeID) { + throw new Error("Undefined type: " + valueType); + } + + if (this.hasFieldFromMeasurement(measurementID, field)) { + let id = this.fieldIDFromMeasurement(measurementID, field); + let existingType = this._fieldsByID.get(id)[2]; + + if (valueType != existingType) { + throw new Error("Field already defined with different type: " + existingType); + } + + return CommonUtils.laterTickResolvingPromise( + this.fieldIDFromMeasurement(measurementID, field)); + } + + let self = this; + return Task.spawn(function createField() { + let params = { + measurement_id: measurementID, + field: field, + value_type: typeID, + }; + + yield self._connection.executeCached(SQL.addField, params); + + let rows = yield self._connection.executeCached(SQL.getFieldID, params); + + let fieldID = rows[0].getResultByIndex(0); + + self._fieldsByID.set(fieldID, [measurementID, field, valueType]); + self._fieldsByInfo.set([measurementID, field].join(":"), fieldID); + self._fieldsByMeasurement.get(measurementID).add(fieldID); + + throw new Task.Result(fieldID); + }); + }, + + /** + * Initializes this instance with the database. + * + * This performs 2 major roles: + * + * 1) Set up database schema (creates tables). + * 2) Synchronize database with local instance. + */ + _init: function() { + let self = this; + return Task.spawn(function initTask() { + // 0. Database file and connection configuration. + + // This should never fail. But, we assume the default of 1024 in case it + // does. + let rows = yield self._connection.execute("PRAGMA page_size"); + let pageSize = 1024; + if (rows.length) { + pageSize = rows[0].getResultByIndex(0); + } + + self._log.debug("Page size is " + pageSize); + + // Ensure temp tables are stored in memory, not on disk. + yield self._connection.execute("PRAGMA temp_store=MEMORY"); + + let journalMode; + rows = yield self._connection.execute("PRAGMA journal_mode=WAL"); + if (rows.length) { + journalMode = rows[0].getResultByIndex(0); + } + + self._log.info("Journal mode is " + journalMode); + + if (journalMode == "wal") { + self._enabledWALCheckpointPages = + Math.ceil(self.MAX_WAL_SIZE_KB * 1024 / pageSize); + + self._log.info("WAL auto checkpoint pages: " + + self._enabledWALCheckpointPages); + + // We disable auto checkpoint during initialization to make it + // quicker. + yield self.setAutoCheckpoint(0); + } else { + if (journalMode != "truncate") { + // Fall back to truncate (which is faster than delete). + yield self._connection.execute("PRAGMA journal_mode=TRUNCATE"); + } + + // And always use full synchronous mode to reduce possibility for data + // loss. + yield self._connection.execute("PRAGMA synchronous=FULL"); + } + + let doCheckpoint = false; + + // 1. Create the schema. + yield self._connection.executeTransaction(function ensureSchema(conn) { + let schema = yield conn.getSchemaVersion(); + + if (schema == 0) { + self._log.info("Creating database schema."); + + for (let k of self._SCHEMA_STATEMENTS) { + yield self._connection.execute(SQL[k]); + } + + yield self._connection.setSchemaVersion(1); + doCheckpoint = true; + } else if (schema != 1) { + throw new Error("Unknown database schema: " + schema); + } else { + self._log.debug("Database schema up to date."); + } + }); + + // 2. Retrieve existing types. + yield self._connection.execute(SQL.getTypes, null, function onRow(row) { + let id = row.getResultByName("id"); + let name = row.getResultByName("name"); + + self._typesByID.set(id, name); + self._typesByName.set(name, id); + }); + + // 3. Populate built-in types with database. + let missingTypes = []; + for (let type of self._BUILTIN_TYPES) { + type = self[type]; + if (self._typesByName.has(type)) { + continue; + } + + missingTypes.push(type); + } + + // Don't perform DB transaction unless there is work to do. + if (missingTypes.length) { + yield self._connection.executeTransaction(function populateBuiltinTypes() { + for (let type of missingTypes) { + let params = {name: type}; + yield self._connection.executeCached(SQL.addType, params); + let rows = yield self._connection.executeCached(SQL.getTypeID, params); + let id = rows[0].getResultByIndex(0); + + self._typesByID.set(id, type); + self._typesByName.set(type, id); + } + }); + + doCheckpoint = true; + } + + // 4. Obtain measurement info. + yield self._connection.execute(SQL.getMeasurements, null, function onRow(row) { + let providerID = row.getResultByName("provider_id"); + let providerName = row.getResultByName("provider_name"); + let measurementID = row.getResultByName("measurement_id"); + let measurementName = row.getResultByName("measurement_name"); + let measurementVersion = row.getResultByName("measurement_version"); + + self._providerIDs.set(providerName, providerID); + + let info = [providerName, measurementName, measurementVersion].join(":"); + + self._measurementsByInfo.set(info, measurementID); + self._measurementsByID.set(measurementID, info); + self._fieldsByMeasurement.set(measurementID, new Set()); + }); + + // 5. Obtain field info. + yield self._connection.execute(SQL.getFieldInfo, null, function onRow(row) { + let measurementID = row.getResultByName("measurement_id"); + let fieldID = row.getResultByName("field_id"); + let fieldName = row.getResultByName("field_name"); + let typeName = row.getResultByName("type_name"); + + self._fieldsByID.set(fieldID, [measurementID, fieldName, typeName]); + self._fieldsByInfo.set([measurementID, fieldName].join(":"), fieldID); + self._fieldsByMeasurement.get(measurementID).add(fieldID); + }); + + // Perform a checkpoint after initialization (if needed) and + // enable auto checkpoint during regular operation. + if (doCheckpoint) { + yield self.checkpoint(); + } + + yield self.setAutoCheckpoint(1); + }); + }, + + /** + * Prune all data from earlier than the specified date. + * + * Data stored on days before the specified Date will be permanently + * deleted. + * + * This returns a promise that will be resolved when data has been deleted. + * + * @param date + * (Date) Old data threshold. + * @return Promise<> + */ + pruneDataBefore: function (date) { + let statements = this._PRUNE_STATEMENTS; + + let self = this; + return this.enqueueOperation(function doPrune() { + return self._connection.executeTransaction(function prune(conn) { + let days = dateToDays(date); + + let params = {days: days}; + for (let name of statements) { + yield conn.execute(SQL[name], params); + } + }); + }); + }, + + /** + * Reduce memory usage as much as possible. + * + * This returns a promise that will be resolved on completion. + * + * @return Promise<> + */ + compact: function () { + let self = this; + return this.enqueueOperation(function doCompact() { + self._connection.discardCachedStatements(); + return self._connection.shrinkMemory(); + }); + }, + + /** + * Checkpoint writes requiring flush to disk. + * + * This is called to persist queued and non-flushed writes to disk. + * It will force an fsync, so it is expensive and should be used + * sparingly. + */ + checkpoint: function () { + if (!this._enabledWALCheckpointPages) { + return CommonUtils.laterTickResolvingPromise(); + } + + return this.enqueueOperation(function checkpoint() { + this._log.info("Performing manual WAL checkpoint."); + return this._connection.execute("PRAGMA wal_checkpoint"); + }.bind(this)); + }, + + setAutoCheckpoint: function (on) { + // If we aren't in WAL mode, wal_autocheckpoint won't do anything so + // we no-op. + if (!this._enabledWALCheckpointPages) { + return CommonUtils.laterTickResolvingPromise(); + } + + let val = on ? this._enabledWALCheckpointPages : 0; + + return this.enqueueOperation(function setWALCheckpoint() { + this._log.info("Setting WAL auto checkpoint to " + val); + return this._connection.execute("PRAGMA wal_autocheckpoint=" + val); + }.bind(this)); + }, + + /** + * Ensure a field ID matches a specified type. + * + * This is called internally as part of adding values to ensure that + * the type of a field matches the operation being performed. + */ + _ensureFieldType: function (id, type) { + let info = this._fieldsByID.get(id); + + if (!info || !Array.isArray(info)) { + throw new Error("Unknown field ID: " + id); + } + + if (type != info[2]) { + throw new Error("Field type does not match the expected for this " + + "operation. Actual: " + info[2] + "; Expected: " + + type); + } + }, + + /** + * Enqueue a storage operation to be performed when the database is ready. + * + * The primary use case of this function is to prevent potentially + * conflicting storage operations from being performed in parallel. By + * calling this function, passed storage operations will be serially + * executed, avoiding potential order of operation issues. + * + * The passed argument is a function that will perform storage operations. + * The function should return a promise that will be resolved when all + * storage operations have been completed. + * + * The passed function may be executed immediately. If there are already + * queued operations, it will be appended to the queue and executed after all + * before it have finished. + * + * This function returns a promise that will be resolved or rejected with + * the same value that the function's promise was resolved or rejected with. + * + * @param func + * (function) Function performing storage interactions. + * @return Promise<> + */ + enqueueOperation: function (func) { + if (typeof(func) != "function") { + throw new Error("enqueueOperation expects a function. Got: " + typeof(func)); + } + + this._log.trace("Enqueueing operation."); + let deferred = Promise.defer(); + + this._queuedOperations.push([func, deferred]); + + if (this._queuedOperations.length == 1) { + this._popAndPerformQueuedOperation(); + } + + return deferred.promise; + }, + + /** + * Enqueue a function to be performed as a transaction. + * + * The passed function should be a generator suitable for calling with + * `executeTransaction` from the SQLite connection. + */ + enqueueTransaction: function (func, type) { + return this.enqueueOperation( + this._connection.executeTransaction.bind(this._connection, func, type) + ); + }, + + _popAndPerformQueuedOperation: function () { + if (!this._queuedOperations.length || this._queuedInProgress) { + return; + } + + this._log.trace("Performing queued operation."); + let [func, deferred] = this._queuedOperations.shift(); + let promise; + + try { + this._queuedInProgress = true; + promise = func(); + } catch (ex) { + this._log.warn("Queued operation threw during execution: " + + CommonUtils.exceptionStr(ex)); + this._queuedInProgress = false; + deferred.reject(ex); + this._popAndPerformQueuedOperation(); + return; + } + + if (!promise || typeof(promise.then) != "function") { + let msg = "Queued operation did not return a promise: " + func; + this._log.warn(msg); + + this._queuedInProgress = false; + deferred.reject(new Error(msg)); + this._popAndPerformQueuedOperation(); + return; + } + + promise.then( + function onSuccess(result) { + this._log.trace("Queued operation completed."); + this._queuedInProgress = false; + deferred.resolve(result); + this._popAndPerformQueuedOperation(); + }.bind(this), + function onError(error) { + this._log.warn("Failure when performing queued operation: " + + CommonUtils.exceptionStr(error)); + this._queuedInProgress = false; + deferred.reject(error); + this._popAndPerformQueuedOperation(); + }.bind(this) + ); + }, + + /** + * Obtain all values associated with a measurement. + * + * This returns a promise that resolves to an object. The keys of the object + * are: + * + * days -- DailyValues where the values are Maps of field name to data + * structures. The data structures could be simple (string or number) or + * Arrays if the field type allows multiple values per day. + * + * singular -- Map of field names to values. This holds all fields that + * don't have a temporal component. + * + * @param id + * (Number) Primary key of measurement whose values to retrieve. + */ + getMeasurementValues: function (id) { + let deferred = Promise.defer(); + let days = new DailyValues(); + let singular = new Map(); + + let self = this; + this.enqueueOperation(function enqueuedGetMeasurementValues() { + return Task.spawn(function fetchMeasurementValues() { + function handleResult(data) { + for (let [field, values] of data) { + for (let [day, value] of Iterator(values)) { + if (!days.hasDay(day)) { + days.setDay(day, new Map()); + } + + days.getDay(day).set(field, value); + } + } + } + + if (self.measurementHasAnyDailyCounterFields(id)) { + let counters = yield self.getMeasurementDailyCountersFromMeasurementID(id); + handleResult(counters); + } + + if (self.measurementHasAnyDailyLastFields(id)) { + let dailyLast = yield self.getMeasurementDailyLastValuesFromMeasurementID(id); + handleResult(dailyLast); + } + + if (self.measurementHasAnyDailyDiscreteFields(id)) { + let dailyDiscrete = yield self.getMeasurementDailyDiscreteValuesFromMeasurementID(id); + handleResult(dailyDiscrete); + } + + if (self.measurementHasAnyLastFields(id)) { + let last = yield self.getMeasurementLastValuesFromMeasurementID(id); + + for (let [field, value] of last) { + singular.set(field, value); + } + } + + }); + }).then(function onSuccess() { + deferred.resolve({singular: singular, days: days}); + }, function onError(error) { + deferred.reject(error); + }); + + return deferred.promise; + }, + + //--------------------------------------------------------------------------- + // Low-level storage operations + // + // These will be performed immediately (or at least as soon as the underlying + // connection allows them to be.) It is recommended to call these from within + // a function added via `enqueueOperation()` or they may inadvertently be + // performed during another enqueued operation, which may be a transaction + // that is rolled back. + // --------------------------------------------------------------------------- + + /** + * Set state for a provider. + * + * Providers have the ability to register persistent state with the backend. + * Persistent state doesn't expire. The format of the data is completely up + * to the provider beyond the requirement that values be UTF-8 strings. + * + * This returns a promise that will be resolved when the underlying database + * operation has completed. + * + * @param provider + * (string) Name of the provider. + * @param key + * (string) Key under which to store this state. + * @param value + * (string) Value for this state. + * @return Promise<> + */ + setProviderState: function (provider, key, value) { + if (typeof(key) != "string") { + throw new Error("State key must be a string. Got: " + key); + } + + if (typeof(value) != "string") { + throw new Error("State value must be a string. Got: " + value); + } + + let id = this.providerID(provider); + if (!id) { + throw new Error("Unknown provider: " + provider); + } + + return this._connection.executeCached(SQL.setProviderState, { + provider_id: id, + name: key, + value: value, + }); + }, + + /** + * Obtain named state for a provider. + * + * + * The returned promise will resolve to the state from the database or null + * if the key is not stored. + * + * @param provider + * (string) The name of the provider whose state to obtain. + * @param key + * (string) The state's key to retrieve. + * + * @return Promise + */ + getProviderState: function (provider, key) { + let id = this.providerID(provider); + if (!id) { + throw new Error("Unknown provider: " + provider); + } + + let conn = this._connection; + return Task.spawn(function queryDB() { + let rows = yield conn.executeCached(SQL.getProviderStateWithName, { + provider_id: id, + name: key, + }); + + if (!rows.length) { + throw new Task.Result(null); + } + + throw new Task.Result(rows[0].getResultByIndex(0)); + }); + }, + + /** + * Increment a daily counter from a numeric field id. + * + * @param id + * (integer) Primary key of field to increment. + * @param date + * (Date) When the increment occurred. This is typically "now" but can + * be explicitly defined for events that occurred in the past. + * @param by + * (integer) How much to increment the value by. Defaults to 1. + */ + incrementDailyCounterFromFieldID: function (id, date=new Date(), by=1) { + this._ensureFieldType(id, this.FIELD_DAILY_COUNTER); + + let params = { + field_id: id, + days: dateToDays(date), + by: by, + }; + + return this._connection.executeCached(SQL.incrementDailyCounterFromFieldID, + params); + }, + + /** + * Obtain all counts for a specific daily counter. + * + * @param id + * (integer) The ID of the field being retrieved. + */ + getDailyCounterCountsFromFieldID: function (id) { + this._ensureFieldType(id, this.FIELD_DAILY_COUNTER); + + let self = this; + return Task.spawn(function fetchCounterDays() { + let rows = yield self._connection.executeCached(SQL.getDailyCounterCountsFromFieldID, + {field_id: id}); + + let result = new DailyValues(); + for (let row of rows) { + let days = row.getResultByIndex(0); + let counter = row.getResultByIndex(1); + + let date = daysToDate(days); + result.setDay(date, counter); + } + + throw new Task.Result(result); + }); + }, + + /** + * Get the value of a daily counter for a given day. + * + * @param field + * (integer) Field ID to retrieve. + * @param date + * (Date) Date for day from which to obtain data. + */ + getDailyCounterCountFromFieldID: function (field, date) { + this._ensureFieldType(field, this.FIELD_DAILY_COUNTER); + + let params = { + field_id: field, + days: dateToDays(date), + }; + + let self = this; + return Task.spawn(function fetchCounter() { + let rows = yield self._connection.executeCached(SQL.getDailyCounterCountFromFieldID, + params); + if (!rows.length) { + throw new Task.Result(null); + } + + throw new Task.Result(rows[0].getResultByIndex(0)); + }); + }, + + /** + * Define the value for a "last numeric" field. + * + * The previous value (if any) will be replaced by the value passed, even if + * the date of the incoming value is older than what's recorded in the + * database. + * + * @param fieldID + * (Number) Integer primary key of field to update. + * @param value + * (Number) Value to record. + * @param date + * (Date) When this value was produced. + */ + setLastNumericFromFieldID: function (fieldID, value, date=new Date()) { + this._ensureFieldType(fieldID, this.FIELD_LAST_NUMERIC); + + if (typeof(value) != "number") { + throw new Error("Value is not a number: " + value); + } + + let params = { + field_id: fieldID, + days: dateToDays(date), + value: value, + }; + + return this._connection.executeCached(SQL.setLastNumeric, params); + }, + + /** + * Define the value of a "last text" field. + * + * See `setLastNumericFromFieldID` for behavior. + */ + setLastTextFromFieldID: function (fieldID, value, date=new Date()) { + this._ensureFieldType(fieldID, this.FIELD_LAST_TEXT); + + if (typeof(value) != "string") { + throw new Error("Value is not a string: " + value); + } + + let params = { + field_id: fieldID, + days: dateToDays(date), + value: value, + }; + + return this._connection.executeCached(SQL.setLastText, params); + }, + + /** + * Obtain the value of a "last numeric" field. + * + * This returns a promise that will be resolved with an Array of [date, value] + * if a value is known or null if no last value is present. + * + * @param fieldID + * (Number) Integer primary key of field to retrieve. + */ + getLastNumericFromFieldID: function (fieldID) { + this._ensureFieldType(fieldID, this.FIELD_LAST_NUMERIC); + + let self = this; + return Task.spawn(function fetchLastField() { + let rows = yield self._connection.executeCached(SQL.getLastNumericFromFieldID, + {field_id: fieldID}); + + if (!rows.length) { + throw new Task.Result(null); + } + + let row = rows[0]; + let days = row.getResultByIndex(0); + let value = row.getResultByIndex(1); + + throw new Task.Result([daysToDate(days), value]); + }); + }, + + /** + * Obtain the value of a "last text" field. + * + * See `getLastNumericFromFieldID` for behavior. + */ + getLastTextFromFieldID: function (fieldID) { + this._ensureFieldType(fieldID, this.FIELD_LAST_TEXT); + + let self = this; + return Task.spawn(function fetchLastField() { + let rows = yield self._connection.executeCached(SQL.getLastTextFromFieldID, + {field_id: fieldID}); + + if (!rows.length) { + throw new Task.Result(null); + } + + let row = rows[0]; + let days = row.getResultByIndex(0); + let value = row.getResultByIndex(1); + + throw new Task.Result([daysToDate(days), value]); + }); + }, + + /** + * Delete the value (if any) in a "last numeric" field. + */ + deleteLastNumericFromFieldID: function (fieldID) { + this._ensureFieldType(fieldID, this.FIELD_LAST_NUMERIC); + + return this._connection.executeCached(SQL.deleteLastNumericFromFieldID, + {field_id: fieldID}); + }, + + /** + * Delete the value (if any) in a "last text" field. + */ + deleteLastTextFromFieldID: function (fieldID) { + this._ensureFieldType(fieldID, this.FIELD_LAST_TEXT); + + return this._connection.executeCached(SQL.deleteLastTextFromFieldID, + {field_id: fieldID}); + }, + + /** + * Record a value for a "daily last numeric" field. + * + * The field can hold 1 value per calendar day. If the field already has a + * value for the day specified (defaults to now), that value will be + * replaced, even if the date specified is older (within the day) than the + * previously recorded value. + * + * @param fieldID + * (Number) Integer primary key of field. + * @param value + * (Number) Value to record. + * @param date + * (Date) When the value was produced. Defaults to now. + */ + setDailyLastNumericFromFieldID: function (fieldID, value, date=new Date()) { + this._ensureFieldType(fieldID, this.FIELD_DAILY_LAST_NUMERIC); + + let params = { + field_id: fieldID, + days: dateToDays(date), + value: value, + }; + + return this._connection.executeCached(SQL.setDailyLastNumeric, params); + }, + + /** + * Record a value for a "daily last text" field. + * + * See `setDailyLastNumericFromFieldID` for behavior. + */ + setDailyLastTextFromFieldID: function (fieldID, value, date=new Date()) { + this._ensureFieldType(fieldID, this.FIELD_DAILY_LAST_TEXT); + + let params = { + field_id: fieldID, + days: dateToDays(date), + value: value, + }; + + return this._connection.executeCached(SQL.setDailyLastText, params); + }, + + /** + * Obtain value(s) from a "daily last numeric" field. + * + * This returns a promise that resolves to a DailyValues instance. If `date` + * is specified, that instance will have at most 1 entry. If there is no + * `date` constraint, then all stored values will be retrieved. + * + * @param fieldID + * (Number) Integer primary key of field to retrieve. + * @param date optional + * (Date) If specified, only return data for this day. + * + * @return Promise + */ + getDailyLastNumericFromFieldID: function (fieldID, date=null) { + this._ensureFieldType(fieldID, this.FIELD_DAILY_LAST_NUMERIC); + + let params = {field_id: fieldID}; + let name = "getDailyLastNumericFromFieldID"; + + if (date) { + params.days = dateToDays(date); + name = "getDailyLastNumericFromFieldIDAndDay"; + } + + return this._getDailyLastFromFieldID(name, params); + }, + + /** + * Obtain value(s) from a "daily last text" field. + * + * See `getDailyLastNumericFromFieldID` for behavior. + */ + getDailyLastTextFromFieldID: function (fieldID, date=null) { + this._ensureFieldType(fieldID, this.FIELD_DAILY_LAST_TEXT); + + let params = {field_id: fieldID}; + let name = "getDailyLastTextFromFieldID"; + + if (date) { + params.days = dateToDays(date); + name = "getDailyLastTextFromFieldIDAndDay"; + } + + return this._getDailyLastFromFieldID(name, params); + }, + + _getDailyLastFromFieldID: function (name, params) { + let self = this; + return Task.spawn(function fetchDailyLastForField() { + let rows = yield self._connection.executeCached(SQL[name], params); + + let result = new DailyValues(); + for (let row of rows) { + let d = daysToDate(row.getResultByIndex(0)); + let value = row.getResultByIndex(1); + + result.setDay(d, value); + } + + throw new Task.Result(result); + }); + }, + + /** + * Add a new value for a "daily discrete numeric" field. + * + * This appends a new value to the list of values for a specific field. All + * values are retained. Duplicate values are allowed. + * + * @param fieldID + * (Number) Integer primary key of field. + * @param value + * (Number) Value to record. + * @param date optional + * (Date) When this value occurred. Values are bucketed by day. + */ + addDailyDiscreteNumericFromFieldID: function (fieldID, value, date=new Date()) { + this._ensureFieldType(fieldID, this.FIELD_DAILY_DISCRETE_NUMERIC); + + if (typeof(value) != "number") { + throw new Error("Number expected. Got: " + value); + } + + let params = { + field_id: fieldID, + days: dateToDays(date), + value: value, + }; + + return this._connection.executeCached(SQL.addDailyDiscreteNumeric, params); + }, + + /** + * Add a new value for a "daily discrete text" field. + * + * See `addDailyDiscreteNumericFromFieldID` for behavior. + */ + addDailyDiscreteTextFromFieldID: function (fieldID, value, date=new Date()) { + this._ensureFieldType(fieldID, this.FIELD_DAILY_DISCRETE_TEXT); + + if (typeof(value) != "string") { + throw new Error("String expected. Got: " + value); + } + + let params = { + field_id: fieldID, + days: dateToDays(date), + value: value, + }; + + return this._connection.executeCached(SQL.addDailyDiscreteText, params); + }, + + /** + * Obtain values for a "daily discrete numeric" field. + * + * This returns a promise that resolves to a `DailyValues` instance. If + * `date` is specified, there will be at most 1 key in that instance. If + * not, all data from the database will be retrieved. + * + * Values in that instance will be arrays of the raw values. + * + * @param fieldID + * (Number) Integer primary key of field to retrieve. + * @param date optional + * (Date) Day to obtain data for. Date can be any time in the day. + */ + getDailyDiscreteNumericFromFieldID: function (fieldID, date=null) { + this._ensureFieldType(fieldID, this.FIELD_DAILY_DISCRETE_NUMERIC); + + let params = {field_id: fieldID}; + + let name = "getDailyDiscreteNumericFromFieldID"; + + if (date) { + params.days = dateToDays(date); + name = "getDailyDiscreteNumericFromFieldIDAndDay"; + } + + return this._getDailyDiscreteFromFieldID(name, params); + }, + + /** + * Obtain values for a "daily discrete text" field. + * + * See `getDailyDiscreteNumericFromFieldID` for behavior. + */ + getDailyDiscreteTextFromFieldID: function (fieldID, date=null) { + this._ensureFieldType(fieldID, this.FIELD_DAILY_DISCRETE_TEXT); + + let params = {field_id: fieldID}; + + let name = "getDailyDiscreteTextFromFieldID"; + + if (date) { + params.days = dateToDays(date); + name = "getDailyDiscreteTextFromFieldIDAndDay"; + } + + return this._getDailyDiscreteFromFieldID(name, params); + }, + + _getDailyDiscreteFromFieldID: function (name, params) { + let self = this; + return Task.spawn(function fetchDailyDiscreteValuesForField() { + let rows = yield self._connection.executeCached(SQL[name], params); + + let result = new DailyValues(); + for (let row of rows) { + let d = daysToDate(row.getResultByIndex(0)); + let value = row.getResultByIndex(1); + + result.appendValue(d, value); + } + + throw new Task.Result(result); + }); + }, + + /** + * Obtain the counts of daily counters in a measurement. + * + * This returns a promise that resolves to a Map of field name strings to + * DailyValues that hold per-day counts. + * + * @param id + * (Number) Integer primary key of measurement. + * + * @return Promise + */ + getMeasurementDailyCountersFromMeasurementID: function (id) { + let self = this; + return Task.spawn(function fetchDailyCounters() { + let rows = yield self._connection.execute(SQL.getMeasurementDailyCounters, + {measurement_id: id}); + + let result = new Map(); + for (let row of rows) { + let field = row.getResultByName("field_name"); + let date = daysToDate(row.getResultByName("day")); + let value = row.getResultByName("value"); + + if (!result.has(field)) { + result.set(field, new DailyValues()); + } + + result.get(field).setDay(date, value); + } + + throw new Task.Result(result); + }); + }, + + /** + * Obtain the values of "last" fields from a measurement. + * + * This returns a promise that resolves to a Map of field name to an array + * of [date, value]. + * + * @param id + * (Number) Integer primary key of measurement whose data to retrieve. + * + * @return Promise + */ + getMeasurementLastValuesFromMeasurementID: function (id) { + let self = this; + return Task.spawn(function fetchMeasurementLastValues() { + let rows = yield self._connection.execute(SQL.getMeasurementLastValues, + {measurement_id: id}); + + let result = new Map(); + for (let row of rows) { + let date = daysToDate(row.getResultByIndex(1)); + let value = row.getResultByIndex(2); + result.set(row.getResultByIndex(0), [date, value]); + } + + throw new Task.Result(result); + }); + }, + + /** + * Obtain the values of "last daily" fields from a measurement. + * + * This returns a promise that resolves to a Map of field name to DailyValues + * instances. Each DailyValues instance has days for which a daily last value + * is defined. The values in each DailyValues are the raw last value for that + * day. + * + * @param id + * (Number) Integer primary key of measurement whose data to retrieve. + * + * @return Promise + */ + getMeasurementDailyLastValuesFromMeasurementID: function (id) { + let self = this; + return Task.spawn(function fetchMeasurementDailyLastValues() { + let rows = yield self._connection.execute(SQL.getMeasurementDailyLastValues, + {measurement_id: id}); + + let result = new Map(); + for (let row of rows) { + let field = row.getResultByName("field_name"); + let date = daysToDate(row.getResultByName("day")); + let value = row.getResultByName("value"); + + if (!result.has(field)) { + result.set(field, new DailyValues()); + } + + result.get(field).setDay(date, value); + } + + throw new Task.Result(result); + }); + }, + + /** + * Obtain the values of "daily discrete" fields from a measurement. + * + * This obtains all discrete values for all "daily discrete" fields in a + * measurement. + * + * This returns a promise that resolves to a Map. The Map's keys are field + * string names. Values are `DailyValues` instances. The values inside + * the `DailyValues` are arrays of the raw discrete values. + * + * @param id + * (Number) Integer primary key of measurement. + * + * @return Promise + */ + getMeasurementDailyDiscreteValuesFromMeasurementID: function (id) { + let deferred = Promise.defer(); + let result = new Map(); + + this._connection.execute(SQL.getMeasurementDailyDiscreteValues, + {measurement_id: id}, function onRow(row) { + let field = row.getResultByName("field_name"); + let date = daysToDate(row.getResultByName("day")); + let value = row.getResultByName("value"); + + if (!result.has(field)) { + result.set(field, new DailyValues()); + } + + result.get(field).appendValue(date, value); + }).then(function onComplete() { + deferred.resolve(result); + }, function onError(error) { + deferred.reject(error); + }); + + return deferred.promise; + }, +}); + +// Alias built-in field types to public API. +for (let property of MetricsStorageSqliteBackend.prototype._BUILTIN_TYPES) { + this.MetricsStorageBackend[property] = MetricsStorageSqliteBackend.prototype[property]; +} +