services/sync/modules/engines.js

changeset 0
6474c204b198
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/services/sync/modules/engines.js	Wed Dec 31 06:09:35 2014 +0100
     1.3 @@ -0,0 +1,1559 @@
     1.4 +/* This Source Code Form is subject to the terms of the Mozilla Public
     1.5 + * License, v. 2.0. If a copy of the MPL was not distributed with this
     1.6 + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
     1.7 +
     1.8 +this.EXPORTED_SYMBOLS = [
     1.9 +  "EngineManager",
    1.10 +  "Engine",
    1.11 +  "SyncEngine",
    1.12 +  "Tracker",
    1.13 +  "Store"
    1.14 +];
    1.15 +
    1.16 +const {classes: Cc, interfaces: Ci, results: Cr, utils: Cu} = Components;
    1.17 +
    1.18 +Cu.import("resource://services-common/async.js");
    1.19 +Cu.import("resource://gre/modules/Log.jsm");
    1.20 +Cu.import("resource://services-common/observers.js");
    1.21 +Cu.import("resource://services-common/utils.js");
    1.22 +Cu.import("resource://services-sync/constants.js");
    1.23 +Cu.import("resource://services-sync/identity.js");
    1.24 +Cu.import("resource://services-sync/record.js");
    1.25 +Cu.import("resource://services-sync/resource.js");
    1.26 +Cu.import("resource://services-sync/util.js");
    1.27 +
    1.28 +/*
    1.29 + * Trackers are associated with a single engine and deal with
    1.30 + * listening for changes to their particular data type.
    1.31 + *
    1.32 + * There are two things they keep track of:
    1.33 + * 1) A score, indicating how urgently the engine wants to sync
    1.34 + * 2) A list of IDs for all the changed items that need to be synced
    1.35 + * and updating their 'score', indicating how urgently they
    1.36 + * want to sync.
    1.37 + *
    1.38 + */
    1.39 +this.Tracker = function Tracker(name, engine) {
    1.40 +  if (!engine) {
    1.41 +    throw new Error("Tracker must be associated with an Engine instance.");
    1.42 +  }
    1.43 +
    1.44 +  name = name || "Unnamed";
    1.45 +  this.name = this.file = name.toLowerCase();
    1.46 +  this.engine = engine;
    1.47 +
    1.48 +  this._log = Log.repository.getLogger("Sync.Tracker." + name);
    1.49 +  let level = Svc.Prefs.get("log.logger.engine." + this.name, "Debug");
    1.50 +  this._log.level = Log.Level[level];
    1.51 +
    1.52 +  this._score = 0;
    1.53 +  this._ignored = [];
    1.54 +  this.ignoreAll = false;
    1.55 +  this.changedIDs = {};
    1.56 +  this.loadChangedIDs();
    1.57 +
    1.58 +  Svc.Obs.add("weave:engine:start-tracking", this);
    1.59 +  Svc.Obs.add("weave:engine:stop-tracking", this);
    1.60 +};
    1.61 +
    1.62 +Tracker.prototype = {
    1.63 +  /*
    1.64 +   * Score can be called as often as desired to decide which engines to sync
    1.65 +   *
    1.66 +   * Valid values for score:
    1.67 +   * -1: Do not sync unless the user specifically requests it (almost disabled)
    1.68 +   * 0: Nothing has changed
    1.69 +   * 100: Please sync me ASAP!
    1.70 +   *
    1.71 +   * Setting it to other values should (but doesn't currently) throw an exception
    1.72 +   */
    1.73 +  get score() {
    1.74 +    return this._score;
    1.75 +  },
    1.76 +
    1.77 +  set score(value) {
    1.78 +    this._score = value;
    1.79 +    Observers.notify("weave:engine:score:updated", this.name);
    1.80 +  },
    1.81 +
    1.82 +  // Should be called by service everytime a sync has been done for an engine
    1.83 +  resetScore: function () {
    1.84 +    this._score = 0;
    1.85 +  },
    1.86 +
    1.87 +  persistChangedIDs: true,
    1.88 +
    1.89 +  /**
    1.90 +   * Persist changedIDs to disk at a later date.
    1.91 +   * Optionally pass a callback to be invoked when the write has occurred.
    1.92 +   */
    1.93 +  saveChangedIDs: function (cb) {
    1.94 +    if (!this.persistChangedIDs) {
    1.95 +      this._log.debug("Not saving changedIDs.");
    1.96 +      return;
    1.97 +    }
    1.98 +    Utils.namedTimer(function () {
    1.99 +      this._log.debug("Saving changed IDs to " + this.file);
   1.100 +      Utils.jsonSave("changes/" + this.file, this, this.changedIDs, cb);
   1.101 +    }, 1000, this, "_lazySave");
   1.102 +  },
   1.103 +
   1.104 +  loadChangedIDs: function (cb) {
   1.105 +    Utils.jsonLoad("changes/" + this.file, this, function(json) {
   1.106 +      if (json && (typeof(json) == "object")) {
   1.107 +        this.changedIDs = json;
   1.108 +      } else {
   1.109 +        this._log.warn("Changed IDs file " + this.file + " contains non-object value.");
   1.110 +        json = null;
   1.111 +      }
   1.112 +      if (cb) {
   1.113 +        cb.call(this, json);
   1.114 +      }
   1.115 +    });
   1.116 +  },
   1.117 +
   1.118 +  // ignore/unignore specific IDs.  Useful for ignoring items that are
   1.119 +  // being processed, or that shouldn't be synced.
   1.120 +  // But note: not persisted to disk
   1.121 +
   1.122 +  ignoreID: function (id) {
   1.123 +    this.unignoreID(id);
   1.124 +    this._ignored.push(id);
   1.125 +  },
   1.126 +
   1.127 +  unignoreID: function (id) {
   1.128 +    let index = this._ignored.indexOf(id);
   1.129 +    if (index != -1)
   1.130 +      this._ignored.splice(index, 1);
   1.131 +  },
   1.132 +
   1.133 +  addChangedID: function (id, when) {
   1.134 +    if (!id) {
   1.135 +      this._log.warn("Attempted to add undefined ID to tracker");
   1.136 +      return false;
   1.137 +    }
   1.138 +
   1.139 +    if (this.ignoreAll || (id in this._ignored)) {
   1.140 +      return false;
   1.141 +    }
   1.142 +
   1.143 +    // Default to the current time in seconds if no time is provided.
   1.144 +    if (when == null) {
   1.145 +      when = Math.floor(Date.now() / 1000);
   1.146 +    }
   1.147 +
   1.148 +    // Add/update the entry if we have a newer time.
   1.149 +    if ((this.changedIDs[id] || -Infinity) < when) {
   1.150 +      this._log.trace("Adding changed ID: " + id + ", " + when);
   1.151 +      this.changedIDs[id] = when;
   1.152 +      this.saveChangedIDs(this.onSavedChangedIDs);
   1.153 +    }
   1.154 +
   1.155 +    return true;
   1.156 +  },
   1.157 +
   1.158 +  removeChangedID: function (id) {
   1.159 +    if (!id) {
   1.160 +      this._log.warn("Attempted to remove undefined ID to tracker");
   1.161 +      return false;
   1.162 +    }
   1.163 +    if (this.ignoreAll || (id in this._ignored))
   1.164 +      return false;
   1.165 +    if (this.changedIDs[id] != null) {
   1.166 +      this._log.trace("Removing changed ID " + id);
   1.167 +      delete this.changedIDs[id];
   1.168 +      this.saveChangedIDs();
   1.169 +    }
   1.170 +    return true;
   1.171 +  },
   1.172 +
   1.173 +  clearChangedIDs: function () {
   1.174 +    this._log.trace("Clearing changed ID list");
   1.175 +    this.changedIDs = {};
   1.176 +    this.saveChangedIDs();
   1.177 +  },
   1.178 +
   1.179 +  _isTracking: false,
   1.180 +
   1.181 +  // Override these in your subclasses.
   1.182 +  startTracking: function () {
   1.183 +  },
   1.184 +
   1.185 +  stopTracking: function () {
   1.186 +  },
   1.187 +
   1.188 +  engineIsEnabled: function () {
   1.189 +    if (!this.engine) {
   1.190 +      // Can't tell -- we must be running in a test!
   1.191 +      return true;
   1.192 +    }
   1.193 +    return this.engine.enabled;
   1.194 +  },
   1.195 +
   1.196 +  onEngineEnabledChanged: function (engineEnabled) {
   1.197 +    if (engineEnabled == this._isTracking) {
   1.198 +      return;
   1.199 +    }
   1.200 +
   1.201 +    if (engineEnabled) {
   1.202 +      this.startTracking();
   1.203 +      this._isTracking = true;
   1.204 +    } else {
   1.205 +      this.stopTracking();
   1.206 +      this._isTracking = false;
   1.207 +      this.clearChangedIDs();
   1.208 +    }
   1.209 +  },
   1.210 +
   1.211 +  observe: function (subject, topic, data) {
   1.212 +    switch (topic) {
   1.213 +      case "weave:engine:start-tracking":
   1.214 +        if (!this.engineIsEnabled()) {
   1.215 +          return;
   1.216 +        }
   1.217 +        this._log.trace("Got start-tracking.");
   1.218 +        if (!this._isTracking) {
   1.219 +          this.startTracking();
   1.220 +          this._isTracking = true;
   1.221 +        }
   1.222 +        return;
   1.223 +      case "weave:engine:stop-tracking":
   1.224 +        this._log.trace("Got stop-tracking.");
   1.225 +        if (this._isTracking) {
   1.226 +          this.stopTracking();
   1.227 +          this._isTracking = false;
   1.228 +        }
   1.229 +        return;
   1.230 +    }
   1.231 +  }
   1.232 +};
   1.233 +
   1.234 +
   1.235 +
   1.236 +/**
   1.237 + * The Store serves as the interface between Sync and stored data.
   1.238 + *
   1.239 + * The name "store" is slightly a misnomer because it doesn't actually "store"
   1.240 + * anything. Instead, it serves as a gateway to something that actually does
   1.241 + * the "storing."
   1.242 + *
   1.243 + * The store is responsible for record management inside an engine. It tells
   1.244 + * Sync what items are available for Sync, converts items to and from Sync's
   1.245 + * record format, and applies records from Sync into changes on the underlying
   1.246 + * store.
   1.247 + *
   1.248 + * Store implementations require a number of functions to be implemented. These
   1.249 + * are all documented below.
   1.250 + *
   1.251 + * For stores that deal with many records or which have expensive store access
   1.252 + * routines, it is highly recommended to implement a custom applyIncomingBatch
   1.253 + * and/or applyIncoming function on top of the basic APIs.
   1.254 + */
   1.255 +
   1.256 +this.Store = function Store(name, engine) {
   1.257 +  if (!engine) {
   1.258 +    throw new Error("Store must be associated with an Engine instance.");
   1.259 +  }
   1.260 +
   1.261 +  name = name || "Unnamed";
   1.262 +  this.name = name.toLowerCase();
   1.263 +  this.engine = engine;
   1.264 +
   1.265 +  this._log = Log.repository.getLogger("Sync.Store." + name);
   1.266 +  let level = Svc.Prefs.get("log.logger.engine." + this.name, "Debug");
   1.267 +  this._log.level = Log.Level[level];
   1.268 +
   1.269 +  XPCOMUtils.defineLazyGetter(this, "_timer", function() {
   1.270 +    return Cc["@mozilla.org/timer;1"].createInstance(Ci.nsITimer);
   1.271 +  });
   1.272 +}
   1.273 +Store.prototype = {
   1.274 +
   1.275 +  _sleep: function _sleep(delay) {
   1.276 +    let cb = Async.makeSyncCallback();
   1.277 +    this._timer.initWithCallback(cb, delay, Ci.nsITimer.TYPE_ONE_SHOT);
   1.278 +    Async.waitForSyncCallback(cb);
   1.279 +  },
   1.280 +
   1.281 +  /**
   1.282 +   * Apply multiple incoming records against the store.
   1.283 +   *
   1.284 +   * This is called with a set of incoming records to process. The function
   1.285 +   * should look at each record, reconcile with the current local state, and
   1.286 +   * make the local changes required to bring its state in alignment with the
   1.287 +   * record.
   1.288 +   *
   1.289 +   * The default implementation simply iterates over all records and calls
   1.290 +   * applyIncoming(). Store implementations may overwrite this function
   1.291 +   * if desired.
   1.292 +   *
   1.293 +   * @param  records Array of records to apply
   1.294 +   * @return Array of record IDs which did not apply cleanly
   1.295 +   */
   1.296 +  applyIncomingBatch: function (records) {
   1.297 +    let failed = [];
   1.298 +    for each (let record in records) {
   1.299 +      try {
   1.300 +        this.applyIncoming(record);
   1.301 +      } catch (ex if (ex.code == Engine.prototype.eEngineAbortApplyIncoming)) {
   1.302 +        // This kind of exception should have a 'cause' attribute, which is an
   1.303 +        // originating exception.
   1.304 +        // ex.cause will carry its stack with it when rethrown.
   1.305 +        throw ex.cause;
   1.306 +      } catch (ex) {
   1.307 +        this._log.warn("Failed to apply incoming record " + record.id);
   1.308 +        this._log.warn("Encountered exception: " + Utils.exceptionStr(ex));
   1.309 +        failed.push(record.id);
   1.310 +      }
   1.311 +    };
   1.312 +    return failed;
   1.313 +  },
   1.314 +
   1.315 +  /**
   1.316 +   * Apply a single record against the store.
   1.317 +   *
   1.318 +   * This takes a single record and makes the local changes required so the
   1.319 +   * local state matches what's in the record.
   1.320 +   *
   1.321 +   * The default implementation calls one of remove(), create(), or update()
   1.322 +   * depending on the state obtained from the store itself. Store
   1.323 +   * implementations may overwrite this function if desired.
   1.324 +   *
   1.325 +   * @param record
   1.326 +   *        Record to apply
   1.327 +   */
   1.328 +  applyIncoming: function (record) {
   1.329 +    if (record.deleted)
   1.330 +      this.remove(record);
   1.331 +    else if (!this.itemExists(record.id))
   1.332 +      this.create(record);
   1.333 +    else
   1.334 +      this.update(record);
   1.335 +  },
   1.336 +
   1.337 +  // override these in derived objects
   1.338 +
   1.339 +  /**
   1.340 +   * Create an item in the store from a record.
   1.341 +   *
   1.342 +   * This is called by the default implementation of applyIncoming(). If using
   1.343 +   * applyIncomingBatch(), this won't be called unless your store calls it.
   1.344 +   *
   1.345 +   * @param record
   1.346 +   *        The store record to create an item from
   1.347 +   */
   1.348 +  create: function (record) {
   1.349 +    throw "override create in a subclass";
   1.350 +  },
   1.351 +
   1.352 +  /**
   1.353 +   * Remove an item in the store from a record.
   1.354 +   *
   1.355 +   * This is called by the default implementation of applyIncoming(). If using
   1.356 +   * applyIncomingBatch(), this won't be called unless your store calls it.
   1.357 +   *
   1.358 +   * @param record
   1.359 +   *        The store record to delete an item from
   1.360 +   */
   1.361 +  remove: function (record) {
   1.362 +    throw "override remove in a subclass";
   1.363 +  },
   1.364 +
   1.365 +  /**
   1.366 +   * Update an item from a record.
   1.367 +   *
   1.368 +   * This is called by the default implementation of applyIncoming(). If using
   1.369 +   * applyIncomingBatch(), this won't be called unless your store calls it.
   1.370 +   *
   1.371 +   * @param record
   1.372 +   *        The record to use to update an item from
   1.373 +   */
   1.374 +  update: function (record) {
   1.375 +    throw "override update in a subclass";
   1.376 +  },
   1.377 +
   1.378 +  /**
   1.379 +   * Determine whether a record with the specified ID exists.
   1.380 +   *
   1.381 +   * Takes a string record ID and returns a booleans saying whether the record
   1.382 +   * exists.
   1.383 +   *
   1.384 +   * @param  id
   1.385 +   *         string record ID
   1.386 +   * @return boolean indicating whether record exists locally
   1.387 +   */
   1.388 +  itemExists: function (id) {
   1.389 +    throw "override itemExists in a subclass";
   1.390 +  },
   1.391 +
   1.392 +  /**
   1.393 +   * Create a record from the specified ID.
   1.394 +   *
   1.395 +   * If the ID is known, the record should be populated with metadata from
   1.396 +   * the store. If the ID is not known, the record should be created with the
   1.397 +   * delete field set to true.
   1.398 +   *
   1.399 +   * @param  id
   1.400 +   *         string record ID
   1.401 +   * @param  collection
   1.402 +   *         Collection to add record to. This is typically passed into the
   1.403 +   *         constructor for the newly-created record.
   1.404 +   * @return record type for this engine
   1.405 +   */
   1.406 +  createRecord: function (id, collection) {
   1.407 +    throw "override createRecord in a subclass";
   1.408 +  },
   1.409 +
   1.410 +  /**
   1.411 +   * Change the ID of a record.
   1.412 +   *
   1.413 +   * @param  oldID
   1.414 +   *         string old/current record ID
   1.415 +   * @param  newID
   1.416 +   *         string new record ID
   1.417 +   */
   1.418 +  changeItemID: function (oldID, newID) {
   1.419 +    throw "override changeItemID in a subclass";
   1.420 +  },
   1.421 +
   1.422 +  /**
   1.423 +   * Obtain the set of all known record IDs.
   1.424 +   *
   1.425 +   * @return Object with ID strings as keys and values of true. The values
   1.426 +   *         are ignored.
   1.427 +   */
   1.428 +  getAllIDs: function () {
   1.429 +    throw "override getAllIDs in a subclass";
   1.430 +  },
   1.431 +
   1.432 +  /**
   1.433 +   * Wipe all data in the store.
   1.434 +   *
   1.435 +   * This function is called during remote wipes or when replacing local data
   1.436 +   * with remote data.
   1.437 +   *
   1.438 +   * This function should delete all local data that the store is managing. It
   1.439 +   * can be thought of as clearing out all state and restoring the "new
   1.440 +   * browser" state.
   1.441 +   */
   1.442 +  wipe: function () {
   1.443 +    throw "override wipe in a subclass";
   1.444 +  }
   1.445 +};
   1.446 +
   1.447 +this.EngineManager = function EngineManager(service) {
   1.448 +  this.service = service;
   1.449 +
   1.450 +  this._engines = {};
   1.451 +
   1.452 +  // This will be populated by Service on startup.
   1.453 +  this._declined = new Set();
   1.454 +  this._log = Log.repository.getLogger("Sync.EngineManager");
   1.455 +  this._log.level = Log.Level[Svc.Prefs.get("log.logger.service.engines", "Debug")];
   1.456 +}
   1.457 +EngineManager.prototype = {
   1.458 +  get: function (name) {
   1.459 +    // Return an array of engines if we have an array of names
   1.460 +    if (Array.isArray(name)) {
   1.461 +      let engines = [];
   1.462 +      name.forEach(function(name) {
   1.463 +        let engine = this.get(name);
   1.464 +        if (engine) {
   1.465 +          engines.push(engine);
   1.466 +        }
   1.467 +      }, this);
   1.468 +      return engines;
   1.469 +    }
   1.470 +
   1.471 +    let engine = this._engines[name];
   1.472 +    if (!engine) {
   1.473 +      this._log.debug("Could not get engine: " + name);
   1.474 +      if (Object.keys) {
   1.475 +        this._log.debug("Engines are: " + JSON.stringify(Object.keys(this._engines)));
   1.476 +      }
   1.477 +    }
   1.478 +    return engine;
   1.479 +  },
   1.480 +
   1.481 +  getAll: function () {
   1.482 +    return [engine for ([name, engine] in Iterator(this._engines))];
   1.483 +  },
   1.484 +
   1.485 +  /**
   1.486 +   * N.B., does not pay attention to the declined list.
   1.487 +   */
   1.488 +  getEnabled: function () {
   1.489 +    return this.getAll().filter(function(engine) engine.enabled);
   1.490 +  },
   1.491 +
   1.492 +  get enabledEngineNames() {
   1.493 +    return [e.name for each (e in this.getEnabled())];
   1.494 +  },
   1.495 +
   1.496 +  persistDeclined: function () {
   1.497 +    Svc.Prefs.set("declinedEngines", [...this._declined].join(","));
   1.498 +  },
   1.499 +
   1.500 +  /**
   1.501 +   * Returns an array.
   1.502 +   */
   1.503 +  getDeclined: function () {
   1.504 +    return [...this._declined];
   1.505 +  },
   1.506 +
   1.507 +  setDeclined: function (engines) {
   1.508 +    this._declined = new Set(engines);
   1.509 +    this.persistDeclined();
   1.510 +  },
   1.511 +
   1.512 +  isDeclined: function (engineName) {
   1.513 +    return this._declined.has(engineName);
   1.514 +  },
   1.515 +
   1.516 +  /**
   1.517 +   * Accepts a Set or an array.
   1.518 +   */
   1.519 +  decline: function (engines) {
   1.520 +    for (let e of engines) {
   1.521 +      this._declined.add(e);
   1.522 +    }
   1.523 +    this.persistDeclined();
   1.524 +  },
   1.525 +
   1.526 +  undecline: function (engines) {
   1.527 +    for (let e of engines) {
   1.528 +      this._declined.delete(e);
   1.529 +    }
   1.530 +    this.persistDeclined();
   1.531 +  },
   1.532 +
   1.533 +  /**
   1.534 +   * Mark any non-enabled engines as declined.
   1.535 +   *
   1.536 +   * This is useful after initial customization during setup.
   1.537 +   */
   1.538 +  declineDisabled: function () {
   1.539 +    for (let e of this.getAll()) {
   1.540 +      if (!e.enabled) {
   1.541 +        this._log.debug("Declining disabled engine " + e.name);
   1.542 +        this._declined.add(e.name);
   1.543 +      }
   1.544 +    }
   1.545 +    this.persistDeclined();
   1.546 +  },
   1.547 +
   1.548 +  /**
   1.549 +   * Register an Engine to the service. Alternatively, give an array of engine
   1.550 +   * objects to register.
   1.551 +   *
   1.552 +   * @param engineObject
   1.553 +   *        Engine object used to get an instance of the engine
   1.554 +   * @return The engine object if anything failed
   1.555 +   */
   1.556 +  register: function (engineObject) {
   1.557 +    if (Array.isArray(engineObject)) {
   1.558 +      return engineObject.map(this.register, this);
   1.559 +    }
   1.560 +
   1.561 +    try {
   1.562 +      let engine = new engineObject(this.service);
   1.563 +      let name = engine.name;
   1.564 +      if (name in this._engines) {
   1.565 +        this._log.error("Engine '" + name + "' is already registered!");
   1.566 +      } else {
   1.567 +        this._engines[name] = engine;
   1.568 +      }
   1.569 +    } catch (ex) {
   1.570 +      this._log.error(CommonUtils.exceptionStr(ex));
   1.571 +
   1.572 +      let mesg = ex.message ? ex.message : ex;
   1.573 +      let name = engineObject || "";
   1.574 +      name = name.prototype || "";
   1.575 +      name = name.name || "";
   1.576 +
   1.577 +      let out = "Could not initialize engine '" + name + "': " + mesg;
   1.578 +      this._log.error(out);
   1.579 +
   1.580 +      return engineObject;
   1.581 +    }
   1.582 +  },
   1.583 +
   1.584 +  unregister: function (val) {
   1.585 +    let name = val;
   1.586 +    if (val instanceof Engine) {
   1.587 +      name = val.name;
   1.588 +    }
   1.589 +    delete this._engines[name];
   1.590 +  },
   1.591 +
   1.592 +  clear: function () {
   1.593 +    for (let name in this._engines) {
   1.594 +      delete this._engines[name];
   1.595 +    }
   1.596 +  },
   1.597 +};
   1.598 +
   1.599 +this.Engine = function Engine(name, service) {
   1.600 +  if (!service) {
   1.601 +    throw new Error("Engine must be associated with a Service instance.");
   1.602 +  }
   1.603 +
   1.604 +  this.Name = name || "Unnamed";
   1.605 +  this.name = name.toLowerCase();
   1.606 +  this.service = service;
   1.607 +
   1.608 +  this._notify = Utils.notify("weave:engine:");
   1.609 +  this._log = Log.repository.getLogger("Sync.Engine." + this.Name);
   1.610 +  let level = Svc.Prefs.get("log.logger.engine." + this.name, "Debug");
   1.611 +  this._log.level = Log.Level[level];
   1.612 +
   1.613 +  this._tracker; // initialize tracker to load previously changed IDs
   1.614 +  this._log.debug("Engine initialized");
   1.615 +}
   1.616 +Engine.prototype = {
   1.617 +  // _storeObj, and _trackerObj should to be overridden in subclasses
   1.618 +  _storeObj: Store,
   1.619 +  _trackerObj: Tracker,
   1.620 +
   1.621 +  // Local 'constant'.
   1.622 +  // Signal to the engine that processing further records is pointless.
   1.623 +  eEngineAbortApplyIncoming: "error.engine.abort.applyincoming",
   1.624 +
   1.625 +  get prefName() this.name,
   1.626 +  get enabled() {
   1.627 +    return Svc.Prefs.get("engine." + this.prefName, false);
   1.628 +  },
   1.629 +
   1.630 +  set enabled(val) {
   1.631 +    Svc.Prefs.set("engine." + this.prefName, !!val);
   1.632 +    this._tracker.onEngineEnabledChanged(val);
   1.633 +  },
   1.634 +
   1.635 +  get score() this._tracker.score,
   1.636 +
   1.637 +  get _store() {
   1.638 +    let store = new this._storeObj(this.Name, this);
   1.639 +    this.__defineGetter__("_store", function() store);
   1.640 +    return store;
   1.641 +  },
   1.642 +
   1.643 +  get _tracker() {
   1.644 +    let tracker = new this._trackerObj(this.Name, this);
   1.645 +    this.__defineGetter__("_tracker", function() tracker);
   1.646 +    return tracker;
   1.647 +  },
   1.648 +
   1.649 +  sync: function () {
   1.650 +    if (!this.enabled) {
   1.651 +      return;
   1.652 +    }
   1.653 +
   1.654 +    if (!this._sync) {
   1.655 +      throw "engine does not implement _sync method";
   1.656 +    }
   1.657 +
   1.658 +    this._notify("sync", this.name, this._sync)();
   1.659 +  },
   1.660 +
   1.661 +  /**
   1.662 +   * Get rid of any local meta-data.
   1.663 +   */
   1.664 +  resetClient: function () {
   1.665 +    if (!this._resetClient) {
   1.666 +      throw "engine does not implement _resetClient method";
   1.667 +    }
   1.668 +
   1.669 +    this._notify("reset-client", this.name, this._resetClient)();
   1.670 +  },
   1.671 +
   1.672 +  _wipeClient: function () {
   1.673 +    this.resetClient();
   1.674 +    this._log.debug("Deleting all local data");
   1.675 +    this._tracker.ignoreAll = true;
   1.676 +    this._store.wipe();
   1.677 +    this._tracker.ignoreAll = false;
   1.678 +    this._tracker.clearChangedIDs();
   1.679 +  },
   1.680 +
   1.681 +  wipeClient: function () {
   1.682 +    this._notify("wipe-client", this.name, this._wipeClient)();
   1.683 +  }
   1.684 +};
   1.685 +
   1.686 +this.SyncEngine = function SyncEngine(name, service) {
   1.687 +  Engine.call(this, name || "SyncEngine", service);
   1.688 +
   1.689 +  this.loadToFetch();
   1.690 +  this.loadPreviousFailed();
   1.691 +}
   1.692 +
   1.693 +// Enumeration to define approaches to handling bad records.
   1.694 +// Attached to the constructor to allow use as a kind of static enumeration.
   1.695 +SyncEngine.kRecoveryStrategy = {
   1.696 +  ignore: "ignore",
   1.697 +  retry:  "retry",
   1.698 +  error:  "error"
   1.699 +};
   1.700 +
   1.701 +SyncEngine.prototype = {
   1.702 +  __proto__: Engine.prototype,
   1.703 +  _recordObj: CryptoWrapper,
   1.704 +  version: 1,
   1.705 +
   1.706 +  // How many records to pull in a single sync. This is primarily to avoid very
   1.707 +  // long first syncs against profiles with many history records.
   1.708 +  downloadLimit: null,
   1.709 +
   1.710 +  // How many records to pull at one time when specifying IDs. This is to avoid
   1.711 +  // URI length limitations.
   1.712 +  guidFetchBatchSize: DEFAULT_GUID_FETCH_BATCH_SIZE,
   1.713 +  mobileGUIDFetchBatchSize: DEFAULT_MOBILE_GUID_FETCH_BATCH_SIZE,
   1.714 +
   1.715 +  // How many records to process in a single batch.
   1.716 +  applyIncomingBatchSize: DEFAULT_STORE_BATCH_SIZE,
   1.717 +
   1.718 +  get storageURL() this.service.storageURL,
   1.719 +
   1.720 +  get engineURL() this.storageURL + this.name,
   1.721 +
   1.722 +  get cryptoKeysURL() this.storageURL + "crypto/keys",
   1.723 +
   1.724 +  get metaURL() this.storageURL + "meta/global",
   1.725 +
   1.726 +  get syncID() {
   1.727 +    // Generate a random syncID if we don't have one
   1.728 +    let syncID = Svc.Prefs.get(this.name + ".syncID", "");
   1.729 +    return syncID == "" ? this.syncID = Utils.makeGUID() : syncID;
   1.730 +  },
   1.731 +  set syncID(value) {
   1.732 +    Svc.Prefs.set(this.name + ".syncID", value);
   1.733 +  },
   1.734 +
   1.735 +  /*
   1.736 +   * lastSync is a timestamp in server time.
   1.737 +   */
   1.738 +  get lastSync() {
   1.739 +    return parseFloat(Svc.Prefs.get(this.name + ".lastSync", "0"));
   1.740 +  },
   1.741 +  set lastSync(value) {
   1.742 +    // Reset the pref in-case it's a number instead of a string
   1.743 +    Svc.Prefs.reset(this.name + ".lastSync");
   1.744 +    // Store the value as a string to keep floating point precision
   1.745 +    Svc.Prefs.set(this.name + ".lastSync", value.toString());
   1.746 +  },
   1.747 +  resetLastSync: function () {
   1.748 +    this._log.debug("Resetting " + this.name + " last sync time");
   1.749 +    Svc.Prefs.reset(this.name + ".lastSync");
   1.750 +    Svc.Prefs.set(this.name + ".lastSync", "0");
   1.751 +    this.lastSyncLocal = 0;
   1.752 +  },
   1.753 +
   1.754 +  get toFetch() this._toFetch,
   1.755 +  set toFetch(val) {
   1.756 +    let cb = (error) => this._log.error(Utils.exceptionStr(error));
   1.757 +    // Coerce the array to a string for more efficient comparison.
   1.758 +    if (val + "" == this._toFetch) {
   1.759 +      return;
   1.760 +    }
   1.761 +    this._toFetch = val;
   1.762 +    Utils.namedTimer(function () {
   1.763 +      Utils.jsonSave("toFetch/" + this.name, this, val, cb);
   1.764 +    }, 0, this, "_toFetchDelay");
   1.765 +  },
   1.766 +
   1.767 +  loadToFetch: function () {
   1.768 +    // Initialize to empty if there's no file.
   1.769 +    this._toFetch = [];
   1.770 +    Utils.jsonLoad("toFetch/" + this.name, this, function(toFetch) {
   1.771 +      if (toFetch) {
   1.772 +        this._toFetch = toFetch;
   1.773 +      }
   1.774 +    });
   1.775 +  },
   1.776 +
   1.777 +  get previousFailed() this._previousFailed,
   1.778 +  set previousFailed(val) {
   1.779 +    let cb = (error) => this._log.error(Utils.exceptionStr(error));
   1.780 +    // Coerce the array to a string for more efficient comparison.
   1.781 +    if (val + "" == this._previousFailed) {
   1.782 +      return;
   1.783 +    }
   1.784 +    this._previousFailed = val;
   1.785 +    Utils.namedTimer(function () {
   1.786 +      Utils.jsonSave("failed/" + this.name, this, val, cb);
   1.787 +    }, 0, this, "_previousFailedDelay");
   1.788 +  },
   1.789 +
   1.790 +  loadPreviousFailed: function () {
   1.791 +    // Initialize to empty if there's no file
   1.792 +    this._previousFailed = [];
   1.793 +    Utils.jsonLoad("failed/" + this.name, this, function(previousFailed) {
   1.794 +      if (previousFailed) {
   1.795 +        this._previousFailed = previousFailed;
   1.796 +      }
   1.797 +    });
   1.798 +  },
   1.799 +
   1.800 +  /*
   1.801 +   * lastSyncLocal is a timestamp in local time.
   1.802 +   */
   1.803 +  get lastSyncLocal() {
   1.804 +    return parseInt(Svc.Prefs.get(this.name + ".lastSyncLocal", "0"), 10);
   1.805 +  },
   1.806 +  set lastSyncLocal(value) {
   1.807 +    // Store as a string because pref can only store C longs as numbers.
   1.808 +    Svc.Prefs.set(this.name + ".lastSyncLocal", value.toString());
   1.809 +  },
   1.810 +
   1.811 +  /*
   1.812 +   * Returns a mapping of IDs -> changed timestamp. Engine implementations
   1.813 +   * can override this method to bypass the tracker for certain or all
   1.814 +   * changed items.
   1.815 +   */
   1.816 +  getChangedIDs: function () {
   1.817 +    return this._tracker.changedIDs;
   1.818 +  },
   1.819 +
   1.820 +  // Create a new record using the store and add in crypto fields.
   1.821 +  _createRecord: function (id) {
   1.822 +    let record = this._store.createRecord(id, this.name);
   1.823 +    record.id = id;
   1.824 +    record.collection = this.name;
   1.825 +    return record;
   1.826 +  },
   1.827 +
   1.828 +  // Any setup that needs to happen at the beginning of each sync.
   1.829 +  _syncStartup: function () {
   1.830 +
   1.831 +    // Determine if we need to wipe on outdated versions
   1.832 +    let metaGlobal = this.service.recordManager.get(this.metaURL);
   1.833 +    let engines = metaGlobal.payload.engines || {};
   1.834 +    let engineData = engines[this.name] || {};
   1.835 +
   1.836 +    let needsWipe = false;
   1.837 +
   1.838 +    // Assume missing versions are 0 and wipe the server
   1.839 +    if ((engineData.version || 0) < this.version) {
   1.840 +      this._log.debug("Old engine data: " + [engineData.version, this.version]);
   1.841 +
   1.842 +      // Prepare to clear the server and upload everything
   1.843 +      needsWipe = true;
   1.844 +      this.syncID = "";
   1.845 +
   1.846 +      // Set the newer version and newly generated syncID
   1.847 +      engineData.version = this.version;
   1.848 +      engineData.syncID = this.syncID;
   1.849 +
   1.850 +      // Put the new data back into meta/global and mark for upload
   1.851 +      engines[this.name] = engineData;
   1.852 +      metaGlobal.payload.engines = engines;
   1.853 +      metaGlobal.changed = true;
   1.854 +    }
   1.855 +    // Don't sync this engine if the server has newer data
   1.856 +    else if (engineData.version > this.version) {
   1.857 +      let error = new String("New data: " + [engineData.version, this.version]);
   1.858 +      error.failureCode = VERSION_OUT_OF_DATE;
   1.859 +      throw error;
   1.860 +    }
   1.861 +    // Changes to syncID mean we'll need to upload everything
   1.862 +    else if (engineData.syncID != this.syncID) {
   1.863 +      this._log.debug("Engine syncIDs: " + [engineData.syncID, this.syncID]);
   1.864 +      this.syncID = engineData.syncID;
   1.865 +      this._resetClient();
   1.866 +    };
   1.867 +
   1.868 +    // Delete any existing data and reupload on bad version or missing meta.
   1.869 +    // No crypto component here...? We could regenerate per-collection keys...
   1.870 +    if (needsWipe) {
   1.871 +      this.wipeServer();
   1.872 +    }
   1.873 +
   1.874 +    // Save objects that need to be uploaded in this._modified. We also save
   1.875 +    // the timestamp of this fetch in this.lastSyncLocal. As we successfully
   1.876 +    // upload objects we remove them from this._modified. If an error occurs
   1.877 +    // or any objects fail to upload, they will remain in this._modified. At
   1.878 +    // the end of a sync, or after an error, we add all objects remaining in
   1.879 +    // this._modified to the tracker.
   1.880 +    this.lastSyncLocal = Date.now();
   1.881 +    if (this.lastSync) {
   1.882 +      this._modified = this.getChangedIDs();
   1.883 +    } else {
   1.884 +      // Mark all items to be uploaded, but treat them as changed from long ago
   1.885 +      this._log.debug("First sync, uploading all items");
   1.886 +      this._modified = {};
   1.887 +      for (let id in this._store.getAllIDs()) {
   1.888 +        this._modified[id] = 0;
   1.889 +      }
   1.890 +    }
   1.891 +    // Clear the tracker now. If the sync fails we'll add the ones we failed
   1.892 +    // to upload back.
   1.893 +    this._tracker.clearChangedIDs();
   1.894 +
   1.895 +    this._log.info(Object.keys(this._modified).length +
   1.896 +                   " outgoing items pre-reconciliation");
   1.897 +
   1.898 +    // Keep track of what to delete at the end of sync
   1.899 +    this._delete = {};
   1.900 +  },
   1.901 +
   1.902 +  /**
   1.903 +   * A tiny abstraction to make it easier to test incoming record
   1.904 +   * application.
   1.905 +   */
   1.906 +  _itemSource: function () {
   1.907 +    return new Collection(this.engineURL, this._recordObj, this.service);
   1.908 +  },
   1.909 +
   1.910 +  /**
   1.911 +   * Process incoming records.
   1.912 +   * In the most awful and untestable way possible.
   1.913 +   * This now accepts something that makes testing vaguely less impossible.
   1.914 +   */
   1.915 +  _processIncoming: function (newitems) {
   1.916 +    this._log.trace("Downloading & applying server changes");
   1.917 +
   1.918 +    // Figure out how many total items to fetch this sync; do less on mobile.
   1.919 +    let batchSize = this.downloadLimit || Infinity;
   1.920 +    let isMobile = (Svc.Prefs.get("client.type") == "mobile");
   1.921 +
   1.922 +    if (!newitems) {
   1.923 +      newitems = this._itemSource();
   1.924 +    }
   1.925 +
   1.926 +    if (isMobile) {
   1.927 +      batchSize = MOBILE_BATCH_SIZE;
   1.928 +    }
   1.929 +    newitems.newer = this.lastSync;
   1.930 +    newitems.full  = true;
   1.931 +    newitems.limit = batchSize;
   1.932 +
   1.933 +    // applied    => number of items that should be applied.
   1.934 +    // failed     => number of items that failed in this sync.
   1.935 +    // newFailed  => number of items that failed for the first time in this sync.
   1.936 +    // reconciled => number of items that were reconciled.
   1.937 +    let count = {applied: 0, failed: 0, newFailed: 0, reconciled: 0};
   1.938 +    let handled = [];
   1.939 +    let applyBatch = [];
   1.940 +    let failed = [];
   1.941 +    let failedInPreviousSync = this.previousFailed;
   1.942 +    let fetchBatch = Utils.arrayUnion(this.toFetch, failedInPreviousSync);
   1.943 +    // Reset previousFailed for each sync since previously failed items may not fail again.
   1.944 +    this.previousFailed = [];
   1.945 +
   1.946 +    // Used (via exceptions) to allow the record handler/reconciliation/etc.
   1.947 +    // methods to signal that they would like processing of incoming records to
   1.948 +    // cease.
   1.949 +    let aborting = undefined;
   1.950 +
   1.951 +    function doApplyBatch() {
   1.952 +      this._tracker.ignoreAll = true;
   1.953 +      try {
   1.954 +        failed = failed.concat(this._store.applyIncomingBatch(applyBatch));
   1.955 +      } catch (ex) {
   1.956 +        // Catch any error that escapes from applyIncomingBatch. At present
   1.957 +        // those will all be abort events.
   1.958 +        this._log.warn("Got exception " + Utils.exceptionStr(ex) +
   1.959 +                       ", aborting processIncoming.");
   1.960 +        aborting = ex;
   1.961 +      }
   1.962 +      this._tracker.ignoreAll = false;
   1.963 +      applyBatch = [];
   1.964 +    }
   1.965 +
   1.966 +    function doApplyBatchAndPersistFailed() {
   1.967 +      // Apply remaining batch.
   1.968 +      if (applyBatch.length) {
   1.969 +        doApplyBatch.call(this);
   1.970 +      }
   1.971 +      // Persist failed items so we refetch them.
   1.972 +      if (failed.length) {
   1.973 +        this.previousFailed = Utils.arrayUnion(failed, this.previousFailed);
   1.974 +        count.failed += failed.length;
   1.975 +        this._log.debug("Records that failed to apply: " + failed);
   1.976 +        failed = [];
   1.977 +      }
   1.978 +    }
   1.979 +
   1.980 +    let key = this.service.collectionKeys.keyForCollection(this.name);
   1.981 +
   1.982 +    // Not binding this method to 'this' for performance reasons. It gets
   1.983 +    // called for every incoming record.
   1.984 +    let self = this;
   1.985 +
   1.986 +    newitems.recordHandler = function(item) {
   1.987 +      if (aborting) {
   1.988 +        return;
   1.989 +      }
   1.990 +
   1.991 +      // Grab a later last modified if possible
   1.992 +      if (self.lastModified == null || item.modified > self.lastModified)
   1.993 +        self.lastModified = item.modified;
   1.994 +
   1.995 +      // Track the collection for the WBO.
   1.996 +      item.collection = self.name;
   1.997 +
   1.998 +      // Remember which records were processed
   1.999 +      handled.push(item.id);
  1.1000 +
  1.1001 +      try {
  1.1002 +        try {
  1.1003 +          item.decrypt(key);
  1.1004 +        } catch (ex if Utils.isHMACMismatch(ex)) {
  1.1005 +          let strategy = self.handleHMACMismatch(item, true);
  1.1006 +          if (strategy == SyncEngine.kRecoveryStrategy.retry) {
  1.1007 +            // You only get one retry.
  1.1008 +            try {
  1.1009 +              // Try decrypting again, typically because we've got new keys.
  1.1010 +              self._log.info("Trying decrypt again...");
  1.1011 +              key = self.service.collectionKeys.keyForCollection(self.name);
  1.1012 +              item.decrypt(key);
  1.1013 +              strategy = null;
  1.1014 +            } catch (ex if Utils.isHMACMismatch(ex)) {
  1.1015 +              strategy = self.handleHMACMismatch(item, false);
  1.1016 +            }
  1.1017 +          }
  1.1018 +
  1.1019 +          switch (strategy) {
  1.1020 +            case null:
  1.1021 +              // Retry succeeded! No further handling.
  1.1022 +              break;
  1.1023 +            case SyncEngine.kRecoveryStrategy.retry:
  1.1024 +              self._log.debug("Ignoring second retry suggestion.");
  1.1025 +              // Fall through to error case.
  1.1026 +            case SyncEngine.kRecoveryStrategy.error:
  1.1027 +              self._log.warn("Error decrypting record: " + Utils.exceptionStr(ex));
  1.1028 +              failed.push(item.id);
  1.1029 +              return;
  1.1030 +            case SyncEngine.kRecoveryStrategy.ignore:
  1.1031 +              self._log.debug("Ignoring record " + item.id +
  1.1032 +                              " with bad HMAC: already handled.");
  1.1033 +              return;
  1.1034 +          }
  1.1035 +        }
  1.1036 +      } catch (ex) {
  1.1037 +        self._log.warn("Error decrypting record: " + Utils.exceptionStr(ex));
  1.1038 +        failed.push(item.id);
  1.1039 +        return;
  1.1040 +      }
  1.1041 +
  1.1042 +      let shouldApply;
  1.1043 +      try {
  1.1044 +        shouldApply = self._reconcile(item);
  1.1045 +      } catch (ex if (ex.code == Engine.prototype.eEngineAbortApplyIncoming)) {
  1.1046 +        self._log.warn("Reconciliation failed: aborting incoming processing.");
  1.1047 +        failed.push(item.id);
  1.1048 +        aborting = ex.cause;
  1.1049 +      } catch (ex) {
  1.1050 +        self._log.warn("Failed to reconcile incoming record " + item.id);
  1.1051 +        self._log.warn("Encountered exception: " + Utils.exceptionStr(ex));
  1.1052 +        failed.push(item.id);
  1.1053 +        return;
  1.1054 +      }
  1.1055 +
  1.1056 +      if (shouldApply) {
  1.1057 +        count.applied++;
  1.1058 +        applyBatch.push(item);
  1.1059 +      } else {
  1.1060 +        count.reconciled++;
  1.1061 +        self._log.trace("Skipping reconciled incoming item " + item.id);
  1.1062 +      }
  1.1063 +
  1.1064 +      if (applyBatch.length == self.applyIncomingBatchSize) {
  1.1065 +        doApplyBatch.call(self);
  1.1066 +      }
  1.1067 +      self._store._sleep(0);
  1.1068 +    };
  1.1069 +
  1.1070 +    // Only bother getting data from the server if there's new things
  1.1071 +    if (this.lastModified == null || this.lastModified > this.lastSync) {
  1.1072 +      let resp = newitems.get();
  1.1073 +      doApplyBatchAndPersistFailed.call(this);
  1.1074 +      if (!resp.success) {
  1.1075 +        resp.failureCode = ENGINE_DOWNLOAD_FAIL;
  1.1076 +        throw resp;
  1.1077 +      }
  1.1078 +
  1.1079 +      if (aborting) {
  1.1080 +        throw aborting;
  1.1081 +      }
  1.1082 +    }
  1.1083 +
  1.1084 +    // Mobile: check if we got the maximum that we requested; get the rest if so.
  1.1085 +    if (handled.length == newitems.limit) {
  1.1086 +      let guidColl = new Collection(this.engineURL, null, this.service);
  1.1087 +
  1.1088 +      // Sort and limit so that on mobile we only get the last X records.
  1.1089 +      guidColl.limit = this.downloadLimit;
  1.1090 +      guidColl.newer = this.lastSync;
  1.1091 +
  1.1092 +      // index: Orders by the sortindex descending (highest weight first).
  1.1093 +      guidColl.sort  = "index";
  1.1094 +
  1.1095 +      let guids = guidColl.get();
  1.1096 +      if (!guids.success)
  1.1097 +        throw guids;
  1.1098 +
  1.1099 +      // Figure out which guids weren't just fetched then remove any guids that
  1.1100 +      // were already waiting and prepend the new ones
  1.1101 +      let extra = Utils.arraySub(guids.obj, handled);
  1.1102 +      if (extra.length > 0) {
  1.1103 +        fetchBatch = Utils.arrayUnion(extra, fetchBatch);
  1.1104 +        this.toFetch = Utils.arrayUnion(extra, this.toFetch);
  1.1105 +      }
  1.1106 +    }
  1.1107 +
  1.1108 +    // Fast-foward the lastSync timestamp since we have stored the
  1.1109 +    // remaining items in toFetch.
  1.1110 +    if (this.lastSync < this.lastModified) {
  1.1111 +      this.lastSync = this.lastModified;
  1.1112 +    }
  1.1113 +
  1.1114 +    // Process any backlog of GUIDs.
  1.1115 +    // At this point we impose an upper limit on the number of items to fetch
  1.1116 +    // in a single request, even for desktop, to avoid hitting URI limits.
  1.1117 +    batchSize = isMobile ? this.mobileGUIDFetchBatchSize :
  1.1118 +                           this.guidFetchBatchSize;
  1.1119 +
  1.1120 +    while (fetchBatch.length && !aborting) {
  1.1121 +      // Reuse the original query, but get rid of the restricting params
  1.1122 +      // and batch remaining records.
  1.1123 +      newitems.limit = 0;
  1.1124 +      newitems.newer = 0;
  1.1125 +      newitems.ids = fetchBatch.slice(0, batchSize);
  1.1126 +
  1.1127 +      // Reuse the existing record handler set earlier
  1.1128 +      let resp = newitems.get();
  1.1129 +      if (!resp.success) {
  1.1130 +        resp.failureCode = ENGINE_DOWNLOAD_FAIL;
  1.1131 +        throw resp;
  1.1132 +      }
  1.1133 +
  1.1134 +      // This batch was successfully applied. Not using
  1.1135 +      // doApplyBatchAndPersistFailed() here to avoid writing toFetch twice.
  1.1136 +      fetchBatch = fetchBatch.slice(batchSize);
  1.1137 +      this.toFetch = Utils.arraySub(this.toFetch, newitems.ids);
  1.1138 +      this.previousFailed = Utils.arrayUnion(this.previousFailed, failed);
  1.1139 +      if (failed.length) {
  1.1140 +        count.failed += failed.length;
  1.1141 +        this._log.debug("Records that failed to apply: " + failed);
  1.1142 +      }
  1.1143 +      failed = [];
  1.1144 +
  1.1145 +      if (aborting) {
  1.1146 +        throw aborting;
  1.1147 +      }
  1.1148 +
  1.1149 +      if (this.lastSync < this.lastModified) {
  1.1150 +        this.lastSync = this.lastModified;
  1.1151 +      }
  1.1152 +    }
  1.1153 +
  1.1154 +    // Apply remaining items.
  1.1155 +    doApplyBatchAndPersistFailed.call(this);
  1.1156 +
  1.1157 +    count.newFailed = Utils.arraySub(this.previousFailed, failedInPreviousSync).length;
  1.1158 +    count.succeeded = Math.max(0, count.applied - count.failed);
  1.1159 +    this._log.info(["Records:",
  1.1160 +                    count.applied, "applied,",
  1.1161 +                    count.succeeded, "successfully,",
  1.1162 +                    count.failed, "failed to apply,",
  1.1163 +                    count.newFailed, "newly failed to apply,",
  1.1164 +                    count.reconciled, "reconciled."].join(" "));
  1.1165 +    Observers.notify("weave:engine:sync:applied", count, this.name);
  1.1166 +  },
  1.1167 +
  1.1168 +  /**
  1.1169 +   * Find a GUID of an item that is a duplicate of the incoming item but happens
  1.1170 +   * to have a different GUID
  1.1171 +   *
  1.1172 +   * @return GUID of the similar item; falsy otherwise
  1.1173 +   */
  1.1174 +  _findDupe: function (item) {
  1.1175 +    // By default, assume there's no dupe items for the engine
  1.1176 +  },
  1.1177 +
  1.1178 +  _deleteId: function (id) {
  1.1179 +    this._tracker.removeChangedID(id);
  1.1180 +
  1.1181 +    // Remember this id to delete at the end of sync
  1.1182 +    if (this._delete.ids == null)
  1.1183 +      this._delete.ids = [id];
  1.1184 +    else
  1.1185 +      this._delete.ids.push(id);
  1.1186 +  },
  1.1187 +
  1.1188 +  /**
  1.1189 +   * Reconcile incoming record with local state.
  1.1190 +   *
  1.1191 +   * This function essentially determines whether to apply an incoming record.
  1.1192 +   *
  1.1193 +   * @param  item
  1.1194 +   *         Record from server to be tested for application.
  1.1195 +   * @return boolean
  1.1196 +   *         Truthy if incoming record should be applied. False if not.
  1.1197 +   */
  1.1198 +  _reconcile: function (item) {
  1.1199 +    if (this._log.level <= Log.Level.Trace) {
  1.1200 +      this._log.trace("Incoming: " + item);
  1.1201 +    }
  1.1202 +
  1.1203 +    // We start reconciling by collecting a bunch of state. We do this here
  1.1204 +    // because some state may change during the course of this function and we
  1.1205 +    // need to operate on the original values.
  1.1206 +    let existsLocally   = this._store.itemExists(item.id);
  1.1207 +    let locallyModified = item.id in this._modified;
  1.1208 +
  1.1209 +    // TODO Handle clock drift better. Tracked in bug 721181.
  1.1210 +    let remoteAge = AsyncResource.serverTime - item.modified;
  1.1211 +    let localAge  = locallyModified ?
  1.1212 +      (Date.now() / 1000 - this._modified[item.id]) : null;
  1.1213 +    let remoteIsNewer = remoteAge < localAge;
  1.1214 +
  1.1215 +    this._log.trace("Reconciling " + item.id + ". exists=" +
  1.1216 +                    existsLocally + "; modified=" + locallyModified +
  1.1217 +                    "; local age=" + localAge + "; incoming age=" +
  1.1218 +                    remoteAge);
  1.1219 +
  1.1220 +    // We handle deletions first so subsequent logic doesn't have to check
  1.1221 +    // deleted flags.
  1.1222 +    if (item.deleted) {
  1.1223 +      // If the item doesn't exist locally, there is nothing for us to do. We
  1.1224 +      // can't check for duplicates because the incoming record has no data
  1.1225 +      // which can be used for duplicate detection.
  1.1226 +      if (!existsLocally) {
  1.1227 +        this._log.trace("Ignoring incoming item because it was deleted and " +
  1.1228 +                        "the item does not exist locally.");
  1.1229 +        return false;
  1.1230 +      }
  1.1231 +
  1.1232 +      // We decide whether to process the deletion by comparing the record
  1.1233 +      // ages. If the item is not modified locally, the remote side wins and
  1.1234 +      // the deletion is processed. If it is modified locally, we take the
  1.1235 +      // newer record.
  1.1236 +      if (!locallyModified) {
  1.1237 +        this._log.trace("Applying incoming delete because the local item " +
  1.1238 +                        "exists and isn't modified.");
  1.1239 +        return true;
  1.1240 +      }
  1.1241 +
  1.1242 +      // TODO As part of bug 720592, determine whether we should do more here.
  1.1243 +      // In the case where the local changes are newer, it is quite possible
  1.1244 +      // that the local client will restore data a remote client had tried to
  1.1245 +      // delete. There might be a good reason for that delete and it might be
  1.1246 +      // enexpected for this client to restore that data.
  1.1247 +      this._log.trace("Incoming record is deleted but we had local changes. " +
  1.1248 +                      "Applying the youngest record.");
  1.1249 +      return remoteIsNewer;
  1.1250 +    }
  1.1251 +
  1.1252 +    // At this point the incoming record is not for a deletion and must have
  1.1253 +    // data. If the incoming record does not exist locally, we check for a local
  1.1254 +    // duplicate existing under a different ID. The default implementation of
  1.1255 +    // _findDupe() is empty, so engines have to opt in to this functionality.
  1.1256 +    //
  1.1257 +    // If we find a duplicate, we change the local ID to the incoming ID and we
  1.1258 +    // refresh the metadata collected above. See bug 710448 for the history
  1.1259 +    // of this logic.
  1.1260 +    if (!existsLocally) {
  1.1261 +      let dupeID = this._findDupe(item);
  1.1262 +      if (dupeID) {
  1.1263 +        this._log.trace("Local item " + dupeID + " is a duplicate for " +
  1.1264 +                        "incoming item " + item.id);
  1.1265 +
  1.1266 +        // The local, duplicate ID is always deleted on the server.
  1.1267 +        this._deleteId(dupeID);
  1.1268 +
  1.1269 +        // The current API contract does not mandate that the ID returned by
  1.1270 +        // _findDupe() actually exists. Therefore, we have to perform this
  1.1271 +        // check.
  1.1272 +        existsLocally = this._store.itemExists(dupeID);
  1.1273 +
  1.1274 +        // We unconditionally change the item's ID in case the engine knows of
  1.1275 +        // an item but doesn't expose it through itemExists. If the API
  1.1276 +        // contract were stronger, this could be changed.
  1.1277 +        this._log.debug("Switching local ID to incoming: " + dupeID + " -> " +
  1.1278 +                        item.id);
  1.1279 +        this._store.changeItemID(dupeID, item.id);
  1.1280 +
  1.1281 +        // If the local item was modified, we carry its metadata forward so
  1.1282 +        // appropriate reconciling can be performed.
  1.1283 +        if (dupeID in this._modified) {
  1.1284 +          locallyModified = true;
  1.1285 +          localAge = Date.now() / 1000 - this._modified[dupeID];
  1.1286 +          remoteIsNewer = remoteAge < localAge;
  1.1287 +
  1.1288 +          this._modified[item.id] = this._modified[dupeID];
  1.1289 +          delete this._modified[dupeID];
  1.1290 +        } else {
  1.1291 +          locallyModified = false;
  1.1292 +          localAge = null;
  1.1293 +        }
  1.1294 +
  1.1295 +        this._log.debug("Local item after duplication: age=" + localAge +
  1.1296 +                        "; modified=" + locallyModified + "; exists=" +
  1.1297 +                        existsLocally);
  1.1298 +      } else {
  1.1299 +        this._log.trace("No duplicate found for incoming item: " + item.id);
  1.1300 +      }
  1.1301 +    }
  1.1302 +
  1.1303 +    // At this point we've performed duplicate detection. But, nothing here
  1.1304 +    // should depend on duplicate detection as the above should have updated
  1.1305 +    // state seamlessly.
  1.1306 +
  1.1307 +    if (!existsLocally) {
  1.1308 +      // If the item doesn't exist locally and we have no local modifications
  1.1309 +      // to the item (implying that it was not deleted), always apply the remote
  1.1310 +      // item.
  1.1311 +      if (!locallyModified) {
  1.1312 +        this._log.trace("Applying incoming because local item does not exist " +
  1.1313 +                        "and was not deleted.");
  1.1314 +        return true;
  1.1315 +      }
  1.1316 +
  1.1317 +      // If the item was modified locally but isn't present, it must have
  1.1318 +      // been deleted. If the incoming record is younger, we restore from
  1.1319 +      // that record.
  1.1320 +      if (remoteIsNewer) {
  1.1321 +        this._log.trace("Applying incoming because local item was deleted " +
  1.1322 +                        "before the incoming item was changed.");
  1.1323 +        delete this._modified[item.id];
  1.1324 +        return true;
  1.1325 +      }
  1.1326 +
  1.1327 +      this._log.trace("Ignoring incoming item because the local item's " +
  1.1328 +                      "deletion is newer.");
  1.1329 +      return false;
  1.1330 +    }
  1.1331 +
  1.1332 +    // If the remote and local records are the same, there is nothing to be
  1.1333 +    // done, so we don't do anything. In the ideal world, this logic wouldn't
  1.1334 +    // be here and the engine would take a record and apply it. The reason we
  1.1335 +    // want to defer this logic is because it would avoid a redundant and
  1.1336 +    // possibly expensive dip into the storage layer to query item state.
  1.1337 +    // This should get addressed in the async rewrite, so we ignore it for now.
  1.1338 +    let localRecord = this._createRecord(item.id);
  1.1339 +    let recordsEqual = Utils.deepEquals(item.cleartext,
  1.1340 +                                        localRecord.cleartext);
  1.1341 +
  1.1342 +    // If the records are the same, we don't need to do anything. This does
  1.1343 +    // potentially throw away a local modification time. But, if the records
  1.1344 +    // are the same, does it matter?
  1.1345 +    if (recordsEqual) {
  1.1346 +      this._log.trace("Ignoring incoming item because the local item is " +
  1.1347 +                      "identical.");
  1.1348 +
  1.1349 +      delete this._modified[item.id];
  1.1350 +      return false;
  1.1351 +    }
  1.1352 +
  1.1353 +    // At this point the records are different.
  1.1354 +
  1.1355 +    // If we have no local modifications, always take the server record.
  1.1356 +    if (!locallyModified) {
  1.1357 +      this._log.trace("Applying incoming record because no local conflicts.");
  1.1358 +      return true;
  1.1359 +    }
  1.1360 +
  1.1361 +    // At this point, records are different and the local record is modified.
  1.1362 +    // We resolve conflicts by record age, where the newest one wins. This does
  1.1363 +    // result in data loss and should be handled by giving the engine an
  1.1364 +    // opportunity to merge the records. Bug 720592 tracks this feature.
  1.1365 +    this._log.warn("DATA LOSS: Both local and remote changes to record: " +
  1.1366 +                   item.id);
  1.1367 +    return remoteIsNewer;
  1.1368 +  },
  1.1369 +
  1.1370 +  // Upload outgoing records.
  1.1371 +  _uploadOutgoing: function () {
  1.1372 +    this._log.trace("Uploading local changes to server.");
  1.1373 +
  1.1374 +    let modifiedIDs = Object.keys(this._modified);
  1.1375 +    if (modifiedIDs.length) {
  1.1376 +      this._log.trace("Preparing " + modifiedIDs.length +
  1.1377 +                      " outgoing records");
  1.1378 +
  1.1379 +      // collection we'll upload
  1.1380 +      let up = new Collection(this.engineURL, null, this.service);
  1.1381 +      let count = 0;
  1.1382 +
  1.1383 +      // Upload what we've got so far in the collection
  1.1384 +      let doUpload = Utils.bind2(this, function(desc) {
  1.1385 +        this._log.info("Uploading " + desc + " of " + modifiedIDs.length +
  1.1386 +                       " records");
  1.1387 +        let resp = up.post();
  1.1388 +        if (!resp.success) {
  1.1389 +          this._log.debug("Uploading records failed: " + resp);
  1.1390 +          resp.failureCode = ENGINE_UPLOAD_FAIL;
  1.1391 +          throw resp;
  1.1392 +        }
  1.1393 +
  1.1394 +        // Update server timestamp from the upload.
  1.1395 +        let modified = resp.headers["x-weave-timestamp"];
  1.1396 +        if (modified > this.lastSync)
  1.1397 +          this.lastSync = modified;
  1.1398 +
  1.1399 +        let failed_ids = Object.keys(resp.obj.failed);
  1.1400 +        if (failed_ids.length)
  1.1401 +          this._log.debug("Records that will be uploaded again because "
  1.1402 +                          + "the server couldn't store them: "
  1.1403 +                          + failed_ids.join(", "));
  1.1404 +
  1.1405 +        // Clear successfully uploaded objects.
  1.1406 +        for each (let id in resp.obj.success) {
  1.1407 +          delete this._modified[id];
  1.1408 +        }
  1.1409 +
  1.1410 +        up.clearRecords();
  1.1411 +      });
  1.1412 +
  1.1413 +      for each (let id in modifiedIDs) {
  1.1414 +        try {
  1.1415 +          let out = this._createRecord(id);
  1.1416 +          if (this._log.level <= Log.Level.Trace)
  1.1417 +            this._log.trace("Outgoing: " + out);
  1.1418 +
  1.1419 +          out.encrypt(this.service.collectionKeys.keyForCollection(this.name));
  1.1420 +          up.pushData(out);
  1.1421 +        }
  1.1422 +        catch(ex) {
  1.1423 +          this._log.warn("Error creating record: " + Utils.exceptionStr(ex));
  1.1424 +        }
  1.1425 +
  1.1426 +        // Partial upload
  1.1427 +        if ((++count % MAX_UPLOAD_RECORDS) == 0)
  1.1428 +          doUpload((count - MAX_UPLOAD_RECORDS) + " - " + count + " out");
  1.1429 +
  1.1430 +        this._store._sleep(0);
  1.1431 +      }
  1.1432 +
  1.1433 +      // Final upload
  1.1434 +      if (count % MAX_UPLOAD_RECORDS > 0)
  1.1435 +        doUpload(count >= MAX_UPLOAD_RECORDS ? "last batch" : "all");
  1.1436 +    }
  1.1437 +  },
  1.1438 +
  1.1439 +  // Any cleanup necessary.
  1.1440 +  // Save the current snapshot so as to calculate changes at next sync
  1.1441 +  _syncFinish: function () {
  1.1442 +    this._log.trace("Finishing up sync");
  1.1443 +    this._tracker.resetScore();
  1.1444 +
  1.1445 +    let doDelete = Utils.bind2(this, function(key, val) {
  1.1446 +      let coll = new Collection(this.engineURL, this._recordObj, this.service);
  1.1447 +      coll[key] = val;
  1.1448 +      coll.delete();
  1.1449 +    });
  1.1450 +
  1.1451 +    for (let [key, val] in Iterator(this._delete)) {
  1.1452 +      // Remove the key for future uses
  1.1453 +      delete this._delete[key];
  1.1454 +
  1.1455 +      // Send a simple delete for the property
  1.1456 +      if (key != "ids" || val.length <= 100)
  1.1457 +        doDelete(key, val);
  1.1458 +      else {
  1.1459 +        // For many ids, split into chunks of at most 100
  1.1460 +        while (val.length > 0) {
  1.1461 +          doDelete(key, val.slice(0, 100));
  1.1462 +          val = val.slice(100);
  1.1463 +        }
  1.1464 +      }
  1.1465 +    }
  1.1466 +  },
  1.1467 +
  1.1468 +  _syncCleanup: function () {
  1.1469 +    if (!this._modified) {
  1.1470 +      return;
  1.1471 +    }
  1.1472 +
  1.1473 +    // Mark failed WBOs as changed again so they are reuploaded next time.
  1.1474 +    for (let [id, when] in Iterator(this._modified)) {
  1.1475 +      this._tracker.addChangedID(id, when);
  1.1476 +    }
  1.1477 +    this._modified = {};
  1.1478 +  },
  1.1479 +
  1.1480 +  _sync: function () {
  1.1481 +    try {
  1.1482 +      this._syncStartup();
  1.1483 +      Observers.notify("weave:engine:sync:status", "process-incoming");
  1.1484 +      this._processIncoming();
  1.1485 +      Observers.notify("weave:engine:sync:status", "upload-outgoing");
  1.1486 +      this._uploadOutgoing();
  1.1487 +      this._syncFinish();
  1.1488 +    } finally {
  1.1489 +      this._syncCleanup();
  1.1490 +    }
  1.1491 +  },
  1.1492 +
  1.1493 +  canDecrypt: function () {
  1.1494 +    // Report failure even if there's nothing to decrypt
  1.1495 +    let canDecrypt = false;
  1.1496 +
  1.1497 +    // Fetch the most recently uploaded record and try to decrypt it
  1.1498 +    let test = new Collection(this.engineURL, this._recordObj, this.service);
  1.1499 +    test.limit = 1;
  1.1500 +    test.sort = "newest";
  1.1501 +    test.full = true;
  1.1502 +
  1.1503 +    let key = this.service.collectionKeys.keyForCollection(this.name);
  1.1504 +    test.recordHandler = function recordHandler(record) {
  1.1505 +      record.decrypt(key);
  1.1506 +      canDecrypt = true;
  1.1507 +    }.bind(this);
  1.1508 +
  1.1509 +    // Any failure fetching/decrypting will just result in false
  1.1510 +    try {
  1.1511 +      this._log.trace("Trying to decrypt a record from the server..");
  1.1512 +      test.get();
  1.1513 +    }
  1.1514 +    catch(ex) {
  1.1515 +      this._log.debug("Failed test decrypt: " + Utils.exceptionStr(ex));
  1.1516 +    }
  1.1517 +
  1.1518 +    return canDecrypt;
  1.1519 +  },
  1.1520 +
  1.1521 +  _resetClient: function () {
  1.1522 +    this.resetLastSync();
  1.1523 +    this.previousFailed = [];
  1.1524 +    this.toFetch = [];
  1.1525 +  },
  1.1526 +
  1.1527 +  wipeServer: function () {
  1.1528 +    let response = this.service.resource(this.engineURL).delete();
  1.1529 +    if (response.status != 200 && response.status != 404) {
  1.1530 +      throw response;
  1.1531 +    }
  1.1532 +    this._resetClient();
  1.1533 +  },
  1.1534 +
  1.1535 +  removeClientData: function () {
  1.1536 +    // Implement this method in engines that store client specific data
  1.1537 +    // on the server.
  1.1538 +  },
  1.1539 +
  1.1540 +  /*
  1.1541 +   * Decide on (and partially effect) an error-handling strategy.
  1.1542 +   *
  1.1543 +   * Asks the Service to respond to an HMAC error, which might result in keys
  1.1544 +   * being downloaded. That call returns true if an action which might allow a
  1.1545 +   * retry to occur.
  1.1546 +   *
  1.1547 +   * If `mayRetry` is truthy, and the Service suggests a retry,
  1.1548 +   * handleHMACMismatch returns kRecoveryStrategy.retry. Otherwise, it returns
  1.1549 +   * kRecoveryStrategy.error.
  1.1550 +   *
  1.1551 +   * Subclasses of SyncEngine can override this method to allow for different
  1.1552 +   * behavior -- e.g., to delete and ignore erroneous entries.
  1.1553 +   *
  1.1554 +   * All return values will be part of the kRecoveryStrategy enumeration.
  1.1555 +   */
  1.1556 +  handleHMACMismatch: function (item, mayRetry) {
  1.1557 +    // By default we either try again, or bail out noisily.
  1.1558 +    return (this.service.handleHMACEvent() && mayRetry) ?
  1.1559 +           SyncEngine.kRecoveryStrategy.retry :
  1.1560 +           SyncEngine.kRecoveryStrategy.error;
  1.1561 +  }
  1.1562 +};

mercurial