services/sync/modules/engines.js

Thu, 22 Jan 2015 13:21:57 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Thu, 22 Jan 2015 13:21:57 +0100
branch
TOR_BUG_9701
changeset 15
b8a032363ba2
permissions
-rw-r--r--

Incorporate requested changes from Mozilla in review:
https://bugzilla.mozilla.org/show_bug.cgi?id=1123480#c6

michael@0 1 /* This Source Code Form is subject to the terms of the Mozilla Public
michael@0 2 * License, v. 2.0. If a copy of the MPL was not distributed with this
michael@0 3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
michael@0 4
michael@0 5 this.EXPORTED_SYMBOLS = [
michael@0 6 "EngineManager",
michael@0 7 "Engine",
michael@0 8 "SyncEngine",
michael@0 9 "Tracker",
michael@0 10 "Store"
michael@0 11 ];
michael@0 12
michael@0 13 const {classes: Cc, interfaces: Ci, results: Cr, utils: Cu} = Components;
michael@0 14
michael@0 15 Cu.import("resource://services-common/async.js");
michael@0 16 Cu.import("resource://gre/modules/Log.jsm");
michael@0 17 Cu.import("resource://services-common/observers.js");
michael@0 18 Cu.import("resource://services-common/utils.js");
michael@0 19 Cu.import("resource://services-sync/constants.js");
michael@0 20 Cu.import("resource://services-sync/identity.js");
michael@0 21 Cu.import("resource://services-sync/record.js");
michael@0 22 Cu.import("resource://services-sync/resource.js");
michael@0 23 Cu.import("resource://services-sync/util.js");
michael@0 24
michael@0 25 /*
michael@0 26 * Trackers are associated with a single engine and deal with
michael@0 27 * listening for changes to their particular data type.
michael@0 28 *
michael@0 29 * There are two things they keep track of:
michael@0 30 * 1) A score, indicating how urgently the engine wants to sync
michael@0 31 * 2) A list of IDs for all the changed items that need to be synced
michael@0 32 * and updating their 'score', indicating how urgently they
michael@0 33 * want to sync.
michael@0 34 *
michael@0 35 */
michael@0 36 this.Tracker = function Tracker(name, engine) {
michael@0 37 if (!engine) {
michael@0 38 throw new Error("Tracker must be associated with an Engine instance.");
michael@0 39 }
michael@0 40
michael@0 41 name = name || "Unnamed";
michael@0 42 this.name = this.file = name.toLowerCase();
michael@0 43 this.engine = engine;
michael@0 44
michael@0 45 this._log = Log.repository.getLogger("Sync.Tracker." + name);
michael@0 46 let level = Svc.Prefs.get("log.logger.engine." + this.name, "Debug");
michael@0 47 this._log.level = Log.Level[level];
michael@0 48
michael@0 49 this._score = 0;
michael@0 50 this._ignored = [];
michael@0 51 this.ignoreAll = false;
michael@0 52 this.changedIDs = {};
michael@0 53 this.loadChangedIDs();
michael@0 54
michael@0 55 Svc.Obs.add("weave:engine:start-tracking", this);
michael@0 56 Svc.Obs.add("weave:engine:stop-tracking", this);
michael@0 57 };
michael@0 58
michael@0 59 Tracker.prototype = {
michael@0 60 /*
michael@0 61 * Score can be called as often as desired to decide which engines to sync
michael@0 62 *
michael@0 63 * Valid values for score:
michael@0 64 * -1: Do not sync unless the user specifically requests it (almost disabled)
michael@0 65 * 0: Nothing has changed
michael@0 66 * 100: Please sync me ASAP!
michael@0 67 *
michael@0 68 * Setting it to other values should (but doesn't currently) throw an exception
michael@0 69 */
michael@0 70 get score() {
michael@0 71 return this._score;
michael@0 72 },
michael@0 73
michael@0 74 set score(value) {
michael@0 75 this._score = value;
michael@0 76 Observers.notify("weave:engine:score:updated", this.name);
michael@0 77 },
michael@0 78
michael@0 79 // Should be called by service everytime a sync has been done for an engine
michael@0 80 resetScore: function () {
michael@0 81 this._score = 0;
michael@0 82 },
michael@0 83
michael@0 84 persistChangedIDs: true,
michael@0 85
michael@0 86 /**
michael@0 87 * Persist changedIDs to disk at a later date.
michael@0 88 * Optionally pass a callback to be invoked when the write has occurred.
michael@0 89 */
michael@0 90 saveChangedIDs: function (cb) {
michael@0 91 if (!this.persistChangedIDs) {
michael@0 92 this._log.debug("Not saving changedIDs.");
michael@0 93 return;
michael@0 94 }
michael@0 95 Utils.namedTimer(function () {
michael@0 96 this._log.debug("Saving changed IDs to " + this.file);
michael@0 97 Utils.jsonSave("changes/" + this.file, this, this.changedIDs, cb);
michael@0 98 }, 1000, this, "_lazySave");
michael@0 99 },
michael@0 100
michael@0 101 loadChangedIDs: function (cb) {
michael@0 102 Utils.jsonLoad("changes/" + this.file, this, function(json) {
michael@0 103 if (json && (typeof(json) == "object")) {
michael@0 104 this.changedIDs = json;
michael@0 105 } else {
michael@0 106 this._log.warn("Changed IDs file " + this.file + " contains non-object value.");
michael@0 107 json = null;
michael@0 108 }
michael@0 109 if (cb) {
michael@0 110 cb.call(this, json);
michael@0 111 }
michael@0 112 });
michael@0 113 },
michael@0 114
michael@0 115 // ignore/unignore specific IDs. Useful for ignoring items that are
michael@0 116 // being processed, or that shouldn't be synced.
michael@0 117 // But note: not persisted to disk
michael@0 118
michael@0 119 ignoreID: function (id) {
michael@0 120 this.unignoreID(id);
michael@0 121 this._ignored.push(id);
michael@0 122 },
michael@0 123
michael@0 124 unignoreID: function (id) {
michael@0 125 let index = this._ignored.indexOf(id);
michael@0 126 if (index != -1)
michael@0 127 this._ignored.splice(index, 1);
michael@0 128 },
michael@0 129
michael@0 130 addChangedID: function (id, when) {
michael@0 131 if (!id) {
michael@0 132 this._log.warn("Attempted to add undefined ID to tracker");
michael@0 133 return false;
michael@0 134 }
michael@0 135
michael@0 136 if (this.ignoreAll || (id in this._ignored)) {
michael@0 137 return false;
michael@0 138 }
michael@0 139
michael@0 140 // Default to the current time in seconds if no time is provided.
michael@0 141 if (when == null) {
michael@0 142 when = Math.floor(Date.now() / 1000);
michael@0 143 }
michael@0 144
michael@0 145 // Add/update the entry if we have a newer time.
michael@0 146 if ((this.changedIDs[id] || -Infinity) < when) {
michael@0 147 this._log.trace("Adding changed ID: " + id + ", " + when);
michael@0 148 this.changedIDs[id] = when;
michael@0 149 this.saveChangedIDs(this.onSavedChangedIDs);
michael@0 150 }
michael@0 151
michael@0 152 return true;
michael@0 153 },
michael@0 154
michael@0 155 removeChangedID: function (id) {
michael@0 156 if (!id) {
michael@0 157 this._log.warn("Attempted to remove undefined ID to tracker");
michael@0 158 return false;
michael@0 159 }
michael@0 160 if (this.ignoreAll || (id in this._ignored))
michael@0 161 return false;
michael@0 162 if (this.changedIDs[id] != null) {
michael@0 163 this._log.trace("Removing changed ID " + id);
michael@0 164 delete this.changedIDs[id];
michael@0 165 this.saveChangedIDs();
michael@0 166 }
michael@0 167 return true;
michael@0 168 },
michael@0 169
michael@0 170 clearChangedIDs: function () {
michael@0 171 this._log.trace("Clearing changed ID list");
michael@0 172 this.changedIDs = {};
michael@0 173 this.saveChangedIDs();
michael@0 174 },
michael@0 175
michael@0 176 _isTracking: false,
michael@0 177
michael@0 178 // Override these in your subclasses.
michael@0 179 startTracking: function () {
michael@0 180 },
michael@0 181
michael@0 182 stopTracking: function () {
michael@0 183 },
michael@0 184
michael@0 185 engineIsEnabled: function () {
michael@0 186 if (!this.engine) {
michael@0 187 // Can't tell -- we must be running in a test!
michael@0 188 return true;
michael@0 189 }
michael@0 190 return this.engine.enabled;
michael@0 191 },
michael@0 192
michael@0 193 onEngineEnabledChanged: function (engineEnabled) {
michael@0 194 if (engineEnabled == this._isTracking) {
michael@0 195 return;
michael@0 196 }
michael@0 197
michael@0 198 if (engineEnabled) {
michael@0 199 this.startTracking();
michael@0 200 this._isTracking = true;
michael@0 201 } else {
michael@0 202 this.stopTracking();
michael@0 203 this._isTracking = false;
michael@0 204 this.clearChangedIDs();
michael@0 205 }
michael@0 206 },
michael@0 207
michael@0 208 observe: function (subject, topic, data) {
michael@0 209 switch (topic) {
michael@0 210 case "weave:engine:start-tracking":
michael@0 211 if (!this.engineIsEnabled()) {
michael@0 212 return;
michael@0 213 }
michael@0 214 this._log.trace("Got start-tracking.");
michael@0 215 if (!this._isTracking) {
michael@0 216 this.startTracking();
michael@0 217 this._isTracking = true;
michael@0 218 }
michael@0 219 return;
michael@0 220 case "weave:engine:stop-tracking":
michael@0 221 this._log.trace("Got stop-tracking.");
michael@0 222 if (this._isTracking) {
michael@0 223 this.stopTracking();
michael@0 224 this._isTracking = false;
michael@0 225 }
michael@0 226 return;
michael@0 227 }
michael@0 228 }
michael@0 229 };
michael@0 230
michael@0 231
michael@0 232
michael@0 233 /**
michael@0 234 * The Store serves as the interface between Sync and stored data.
michael@0 235 *
michael@0 236 * The name "store" is slightly a misnomer because it doesn't actually "store"
michael@0 237 * anything. Instead, it serves as a gateway to something that actually does
michael@0 238 * the "storing."
michael@0 239 *
michael@0 240 * The store is responsible for record management inside an engine. It tells
michael@0 241 * Sync what items are available for Sync, converts items to and from Sync's
michael@0 242 * record format, and applies records from Sync into changes on the underlying
michael@0 243 * store.
michael@0 244 *
michael@0 245 * Store implementations require a number of functions to be implemented. These
michael@0 246 * are all documented below.
michael@0 247 *
michael@0 248 * For stores that deal with many records or which have expensive store access
michael@0 249 * routines, it is highly recommended to implement a custom applyIncomingBatch
michael@0 250 * and/or applyIncoming function on top of the basic APIs.
michael@0 251 */
michael@0 252
michael@0 253 this.Store = function Store(name, engine) {
michael@0 254 if (!engine) {
michael@0 255 throw new Error("Store must be associated with an Engine instance.");
michael@0 256 }
michael@0 257
michael@0 258 name = name || "Unnamed";
michael@0 259 this.name = name.toLowerCase();
michael@0 260 this.engine = engine;
michael@0 261
michael@0 262 this._log = Log.repository.getLogger("Sync.Store." + name);
michael@0 263 let level = Svc.Prefs.get("log.logger.engine." + this.name, "Debug");
michael@0 264 this._log.level = Log.Level[level];
michael@0 265
michael@0 266 XPCOMUtils.defineLazyGetter(this, "_timer", function() {
michael@0 267 return Cc["@mozilla.org/timer;1"].createInstance(Ci.nsITimer);
michael@0 268 });
michael@0 269 }
michael@0 270 Store.prototype = {
michael@0 271
michael@0 272 _sleep: function _sleep(delay) {
michael@0 273 let cb = Async.makeSyncCallback();
michael@0 274 this._timer.initWithCallback(cb, delay, Ci.nsITimer.TYPE_ONE_SHOT);
michael@0 275 Async.waitForSyncCallback(cb);
michael@0 276 },
michael@0 277
michael@0 278 /**
michael@0 279 * Apply multiple incoming records against the store.
michael@0 280 *
michael@0 281 * This is called with a set of incoming records to process. The function
michael@0 282 * should look at each record, reconcile with the current local state, and
michael@0 283 * make the local changes required to bring its state in alignment with the
michael@0 284 * record.
michael@0 285 *
michael@0 286 * The default implementation simply iterates over all records and calls
michael@0 287 * applyIncoming(). Store implementations may overwrite this function
michael@0 288 * if desired.
michael@0 289 *
michael@0 290 * @param records Array of records to apply
michael@0 291 * @return Array of record IDs which did not apply cleanly
michael@0 292 */
michael@0 293 applyIncomingBatch: function (records) {
michael@0 294 let failed = [];
michael@0 295 for each (let record in records) {
michael@0 296 try {
michael@0 297 this.applyIncoming(record);
michael@0 298 } catch (ex if (ex.code == Engine.prototype.eEngineAbortApplyIncoming)) {
michael@0 299 // This kind of exception should have a 'cause' attribute, which is an
michael@0 300 // originating exception.
michael@0 301 // ex.cause will carry its stack with it when rethrown.
michael@0 302 throw ex.cause;
michael@0 303 } catch (ex) {
michael@0 304 this._log.warn("Failed to apply incoming record " + record.id);
michael@0 305 this._log.warn("Encountered exception: " + Utils.exceptionStr(ex));
michael@0 306 failed.push(record.id);
michael@0 307 }
michael@0 308 };
michael@0 309 return failed;
michael@0 310 },
michael@0 311
michael@0 312 /**
michael@0 313 * Apply a single record against the store.
michael@0 314 *
michael@0 315 * This takes a single record and makes the local changes required so the
michael@0 316 * local state matches what's in the record.
michael@0 317 *
michael@0 318 * The default implementation calls one of remove(), create(), or update()
michael@0 319 * depending on the state obtained from the store itself. Store
michael@0 320 * implementations may overwrite this function if desired.
michael@0 321 *
michael@0 322 * @param record
michael@0 323 * Record to apply
michael@0 324 */
michael@0 325 applyIncoming: function (record) {
michael@0 326 if (record.deleted)
michael@0 327 this.remove(record);
michael@0 328 else if (!this.itemExists(record.id))
michael@0 329 this.create(record);
michael@0 330 else
michael@0 331 this.update(record);
michael@0 332 },
michael@0 333
michael@0 334 // override these in derived objects
michael@0 335
michael@0 336 /**
michael@0 337 * Create an item in the store from a record.
michael@0 338 *
michael@0 339 * This is called by the default implementation of applyIncoming(). If using
michael@0 340 * applyIncomingBatch(), this won't be called unless your store calls it.
michael@0 341 *
michael@0 342 * @param record
michael@0 343 * The store record to create an item from
michael@0 344 */
michael@0 345 create: function (record) {
michael@0 346 throw "override create in a subclass";
michael@0 347 },
michael@0 348
michael@0 349 /**
michael@0 350 * Remove an item in the store from a record.
michael@0 351 *
michael@0 352 * This is called by the default implementation of applyIncoming(). If using
michael@0 353 * applyIncomingBatch(), this won't be called unless your store calls it.
michael@0 354 *
michael@0 355 * @param record
michael@0 356 * The store record to delete an item from
michael@0 357 */
michael@0 358 remove: function (record) {
michael@0 359 throw "override remove in a subclass";
michael@0 360 },
michael@0 361
michael@0 362 /**
michael@0 363 * Update an item from a record.
michael@0 364 *
michael@0 365 * This is called by the default implementation of applyIncoming(). If using
michael@0 366 * applyIncomingBatch(), this won't be called unless your store calls it.
michael@0 367 *
michael@0 368 * @param record
michael@0 369 * The record to use to update an item from
michael@0 370 */
michael@0 371 update: function (record) {
michael@0 372 throw "override update in a subclass";
michael@0 373 },
michael@0 374
michael@0 375 /**
michael@0 376 * Determine whether a record with the specified ID exists.
michael@0 377 *
michael@0 378 * Takes a string record ID and returns a booleans saying whether the record
michael@0 379 * exists.
michael@0 380 *
michael@0 381 * @param id
michael@0 382 * string record ID
michael@0 383 * @return boolean indicating whether record exists locally
michael@0 384 */
michael@0 385 itemExists: function (id) {
michael@0 386 throw "override itemExists in a subclass";
michael@0 387 },
michael@0 388
michael@0 389 /**
michael@0 390 * Create a record from the specified ID.
michael@0 391 *
michael@0 392 * If the ID is known, the record should be populated with metadata from
michael@0 393 * the store. If the ID is not known, the record should be created with the
michael@0 394 * delete field set to true.
michael@0 395 *
michael@0 396 * @param id
michael@0 397 * string record ID
michael@0 398 * @param collection
michael@0 399 * Collection to add record to. This is typically passed into the
michael@0 400 * constructor for the newly-created record.
michael@0 401 * @return record type for this engine
michael@0 402 */
michael@0 403 createRecord: function (id, collection) {
michael@0 404 throw "override createRecord in a subclass";
michael@0 405 },
michael@0 406
michael@0 407 /**
michael@0 408 * Change the ID of a record.
michael@0 409 *
michael@0 410 * @param oldID
michael@0 411 * string old/current record ID
michael@0 412 * @param newID
michael@0 413 * string new record ID
michael@0 414 */
michael@0 415 changeItemID: function (oldID, newID) {
michael@0 416 throw "override changeItemID in a subclass";
michael@0 417 },
michael@0 418
michael@0 419 /**
michael@0 420 * Obtain the set of all known record IDs.
michael@0 421 *
michael@0 422 * @return Object with ID strings as keys and values of true. The values
michael@0 423 * are ignored.
michael@0 424 */
michael@0 425 getAllIDs: function () {
michael@0 426 throw "override getAllIDs in a subclass";
michael@0 427 },
michael@0 428
michael@0 429 /**
michael@0 430 * Wipe all data in the store.
michael@0 431 *
michael@0 432 * This function is called during remote wipes or when replacing local data
michael@0 433 * with remote data.
michael@0 434 *
michael@0 435 * This function should delete all local data that the store is managing. It
michael@0 436 * can be thought of as clearing out all state and restoring the "new
michael@0 437 * browser" state.
michael@0 438 */
michael@0 439 wipe: function () {
michael@0 440 throw "override wipe in a subclass";
michael@0 441 }
michael@0 442 };
michael@0 443
michael@0 444 this.EngineManager = function EngineManager(service) {
michael@0 445 this.service = service;
michael@0 446
michael@0 447 this._engines = {};
michael@0 448
michael@0 449 // This will be populated by Service on startup.
michael@0 450 this._declined = new Set();
michael@0 451 this._log = Log.repository.getLogger("Sync.EngineManager");
michael@0 452 this._log.level = Log.Level[Svc.Prefs.get("log.logger.service.engines", "Debug")];
michael@0 453 }
michael@0 454 EngineManager.prototype = {
michael@0 455 get: function (name) {
michael@0 456 // Return an array of engines if we have an array of names
michael@0 457 if (Array.isArray(name)) {
michael@0 458 let engines = [];
michael@0 459 name.forEach(function(name) {
michael@0 460 let engine = this.get(name);
michael@0 461 if (engine) {
michael@0 462 engines.push(engine);
michael@0 463 }
michael@0 464 }, this);
michael@0 465 return engines;
michael@0 466 }
michael@0 467
michael@0 468 let engine = this._engines[name];
michael@0 469 if (!engine) {
michael@0 470 this._log.debug("Could not get engine: " + name);
michael@0 471 if (Object.keys) {
michael@0 472 this._log.debug("Engines are: " + JSON.stringify(Object.keys(this._engines)));
michael@0 473 }
michael@0 474 }
michael@0 475 return engine;
michael@0 476 },
michael@0 477
michael@0 478 getAll: function () {
michael@0 479 return [engine for ([name, engine] in Iterator(this._engines))];
michael@0 480 },
michael@0 481
michael@0 482 /**
michael@0 483 * N.B., does not pay attention to the declined list.
michael@0 484 */
michael@0 485 getEnabled: function () {
michael@0 486 return this.getAll().filter(function(engine) engine.enabled);
michael@0 487 },
michael@0 488
michael@0 489 get enabledEngineNames() {
michael@0 490 return [e.name for each (e in this.getEnabled())];
michael@0 491 },
michael@0 492
michael@0 493 persistDeclined: function () {
michael@0 494 Svc.Prefs.set("declinedEngines", [...this._declined].join(","));
michael@0 495 },
michael@0 496
michael@0 497 /**
michael@0 498 * Returns an array.
michael@0 499 */
michael@0 500 getDeclined: function () {
michael@0 501 return [...this._declined];
michael@0 502 },
michael@0 503
michael@0 504 setDeclined: function (engines) {
michael@0 505 this._declined = new Set(engines);
michael@0 506 this.persistDeclined();
michael@0 507 },
michael@0 508
michael@0 509 isDeclined: function (engineName) {
michael@0 510 return this._declined.has(engineName);
michael@0 511 },
michael@0 512
michael@0 513 /**
michael@0 514 * Accepts a Set or an array.
michael@0 515 */
michael@0 516 decline: function (engines) {
michael@0 517 for (let e of engines) {
michael@0 518 this._declined.add(e);
michael@0 519 }
michael@0 520 this.persistDeclined();
michael@0 521 },
michael@0 522
michael@0 523 undecline: function (engines) {
michael@0 524 for (let e of engines) {
michael@0 525 this._declined.delete(e);
michael@0 526 }
michael@0 527 this.persistDeclined();
michael@0 528 },
michael@0 529
michael@0 530 /**
michael@0 531 * Mark any non-enabled engines as declined.
michael@0 532 *
michael@0 533 * This is useful after initial customization during setup.
michael@0 534 */
michael@0 535 declineDisabled: function () {
michael@0 536 for (let e of this.getAll()) {
michael@0 537 if (!e.enabled) {
michael@0 538 this._log.debug("Declining disabled engine " + e.name);
michael@0 539 this._declined.add(e.name);
michael@0 540 }
michael@0 541 }
michael@0 542 this.persistDeclined();
michael@0 543 },
michael@0 544
michael@0 545 /**
michael@0 546 * Register an Engine to the service. Alternatively, give an array of engine
michael@0 547 * objects to register.
michael@0 548 *
michael@0 549 * @param engineObject
michael@0 550 * Engine object used to get an instance of the engine
michael@0 551 * @return The engine object if anything failed
michael@0 552 */
michael@0 553 register: function (engineObject) {
michael@0 554 if (Array.isArray(engineObject)) {
michael@0 555 return engineObject.map(this.register, this);
michael@0 556 }
michael@0 557
michael@0 558 try {
michael@0 559 let engine = new engineObject(this.service);
michael@0 560 let name = engine.name;
michael@0 561 if (name in this._engines) {
michael@0 562 this._log.error("Engine '" + name + "' is already registered!");
michael@0 563 } else {
michael@0 564 this._engines[name] = engine;
michael@0 565 }
michael@0 566 } catch (ex) {
michael@0 567 this._log.error(CommonUtils.exceptionStr(ex));
michael@0 568
michael@0 569 let mesg = ex.message ? ex.message : ex;
michael@0 570 let name = engineObject || "";
michael@0 571 name = name.prototype || "";
michael@0 572 name = name.name || "";
michael@0 573
michael@0 574 let out = "Could not initialize engine '" + name + "': " + mesg;
michael@0 575 this._log.error(out);
michael@0 576
michael@0 577 return engineObject;
michael@0 578 }
michael@0 579 },
michael@0 580
michael@0 581 unregister: function (val) {
michael@0 582 let name = val;
michael@0 583 if (val instanceof Engine) {
michael@0 584 name = val.name;
michael@0 585 }
michael@0 586 delete this._engines[name];
michael@0 587 },
michael@0 588
michael@0 589 clear: function () {
michael@0 590 for (let name in this._engines) {
michael@0 591 delete this._engines[name];
michael@0 592 }
michael@0 593 },
michael@0 594 };
michael@0 595
michael@0 596 this.Engine = function Engine(name, service) {
michael@0 597 if (!service) {
michael@0 598 throw new Error("Engine must be associated with a Service instance.");
michael@0 599 }
michael@0 600
michael@0 601 this.Name = name || "Unnamed";
michael@0 602 this.name = name.toLowerCase();
michael@0 603 this.service = service;
michael@0 604
michael@0 605 this._notify = Utils.notify("weave:engine:");
michael@0 606 this._log = Log.repository.getLogger("Sync.Engine." + this.Name);
michael@0 607 let level = Svc.Prefs.get("log.logger.engine." + this.name, "Debug");
michael@0 608 this._log.level = Log.Level[level];
michael@0 609
michael@0 610 this._tracker; // initialize tracker to load previously changed IDs
michael@0 611 this._log.debug("Engine initialized");
michael@0 612 }
michael@0 613 Engine.prototype = {
michael@0 614 // _storeObj, and _trackerObj should to be overridden in subclasses
michael@0 615 _storeObj: Store,
michael@0 616 _trackerObj: Tracker,
michael@0 617
michael@0 618 // Local 'constant'.
michael@0 619 // Signal to the engine that processing further records is pointless.
michael@0 620 eEngineAbortApplyIncoming: "error.engine.abort.applyincoming",
michael@0 621
michael@0 622 get prefName() this.name,
michael@0 623 get enabled() {
michael@0 624 return Svc.Prefs.get("engine." + this.prefName, false);
michael@0 625 },
michael@0 626
michael@0 627 set enabled(val) {
michael@0 628 Svc.Prefs.set("engine." + this.prefName, !!val);
michael@0 629 this._tracker.onEngineEnabledChanged(val);
michael@0 630 },
michael@0 631
michael@0 632 get score() this._tracker.score,
michael@0 633
michael@0 634 get _store() {
michael@0 635 let store = new this._storeObj(this.Name, this);
michael@0 636 this.__defineGetter__("_store", function() store);
michael@0 637 return store;
michael@0 638 },
michael@0 639
michael@0 640 get _tracker() {
michael@0 641 let tracker = new this._trackerObj(this.Name, this);
michael@0 642 this.__defineGetter__("_tracker", function() tracker);
michael@0 643 return tracker;
michael@0 644 },
michael@0 645
michael@0 646 sync: function () {
michael@0 647 if (!this.enabled) {
michael@0 648 return;
michael@0 649 }
michael@0 650
michael@0 651 if (!this._sync) {
michael@0 652 throw "engine does not implement _sync method";
michael@0 653 }
michael@0 654
michael@0 655 this._notify("sync", this.name, this._sync)();
michael@0 656 },
michael@0 657
michael@0 658 /**
michael@0 659 * Get rid of any local meta-data.
michael@0 660 */
michael@0 661 resetClient: function () {
michael@0 662 if (!this._resetClient) {
michael@0 663 throw "engine does not implement _resetClient method";
michael@0 664 }
michael@0 665
michael@0 666 this._notify("reset-client", this.name, this._resetClient)();
michael@0 667 },
michael@0 668
michael@0 669 _wipeClient: function () {
michael@0 670 this.resetClient();
michael@0 671 this._log.debug("Deleting all local data");
michael@0 672 this._tracker.ignoreAll = true;
michael@0 673 this._store.wipe();
michael@0 674 this._tracker.ignoreAll = false;
michael@0 675 this._tracker.clearChangedIDs();
michael@0 676 },
michael@0 677
michael@0 678 wipeClient: function () {
michael@0 679 this._notify("wipe-client", this.name, this._wipeClient)();
michael@0 680 }
michael@0 681 };
michael@0 682
michael@0 683 this.SyncEngine = function SyncEngine(name, service) {
michael@0 684 Engine.call(this, name || "SyncEngine", service);
michael@0 685
michael@0 686 this.loadToFetch();
michael@0 687 this.loadPreviousFailed();
michael@0 688 }
michael@0 689
michael@0 690 // Enumeration to define approaches to handling bad records.
michael@0 691 // Attached to the constructor to allow use as a kind of static enumeration.
michael@0 692 SyncEngine.kRecoveryStrategy = {
michael@0 693 ignore: "ignore",
michael@0 694 retry: "retry",
michael@0 695 error: "error"
michael@0 696 };
michael@0 697
michael@0 698 SyncEngine.prototype = {
michael@0 699 __proto__: Engine.prototype,
michael@0 700 _recordObj: CryptoWrapper,
michael@0 701 version: 1,
michael@0 702
michael@0 703 // How many records to pull in a single sync. This is primarily to avoid very
michael@0 704 // long first syncs against profiles with many history records.
michael@0 705 downloadLimit: null,
michael@0 706
michael@0 707 // How many records to pull at one time when specifying IDs. This is to avoid
michael@0 708 // URI length limitations.
michael@0 709 guidFetchBatchSize: DEFAULT_GUID_FETCH_BATCH_SIZE,
michael@0 710 mobileGUIDFetchBatchSize: DEFAULT_MOBILE_GUID_FETCH_BATCH_SIZE,
michael@0 711
michael@0 712 // How many records to process in a single batch.
michael@0 713 applyIncomingBatchSize: DEFAULT_STORE_BATCH_SIZE,
michael@0 714
michael@0 715 get storageURL() this.service.storageURL,
michael@0 716
michael@0 717 get engineURL() this.storageURL + this.name,
michael@0 718
michael@0 719 get cryptoKeysURL() this.storageURL + "crypto/keys",
michael@0 720
michael@0 721 get metaURL() this.storageURL + "meta/global",
michael@0 722
michael@0 723 get syncID() {
michael@0 724 // Generate a random syncID if we don't have one
michael@0 725 let syncID = Svc.Prefs.get(this.name + ".syncID", "");
michael@0 726 return syncID == "" ? this.syncID = Utils.makeGUID() : syncID;
michael@0 727 },
michael@0 728 set syncID(value) {
michael@0 729 Svc.Prefs.set(this.name + ".syncID", value);
michael@0 730 },
michael@0 731
michael@0 732 /*
michael@0 733 * lastSync is a timestamp in server time.
michael@0 734 */
michael@0 735 get lastSync() {
michael@0 736 return parseFloat(Svc.Prefs.get(this.name + ".lastSync", "0"));
michael@0 737 },
michael@0 738 set lastSync(value) {
michael@0 739 // Reset the pref in-case it's a number instead of a string
michael@0 740 Svc.Prefs.reset(this.name + ".lastSync");
michael@0 741 // Store the value as a string to keep floating point precision
michael@0 742 Svc.Prefs.set(this.name + ".lastSync", value.toString());
michael@0 743 },
michael@0 744 resetLastSync: function () {
michael@0 745 this._log.debug("Resetting " + this.name + " last sync time");
michael@0 746 Svc.Prefs.reset(this.name + ".lastSync");
michael@0 747 Svc.Prefs.set(this.name + ".lastSync", "0");
michael@0 748 this.lastSyncLocal = 0;
michael@0 749 },
michael@0 750
michael@0 751 get toFetch() this._toFetch,
michael@0 752 set toFetch(val) {
michael@0 753 let cb = (error) => this._log.error(Utils.exceptionStr(error));
michael@0 754 // Coerce the array to a string for more efficient comparison.
michael@0 755 if (val + "" == this._toFetch) {
michael@0 756 return;
michael@0 757 }
michael@0 758 this._toFetch = val;
michael@0 759 Utils.namedTimer(function () {
michael@0 760 Utils.jsonSave("toFetch/" + this.name, this, val, cb);
michael@0 761 }, 0, this, "_toFetchDelay");
michael@0 762 },
michael@0 763
michael@0 764 loadToFetch: function () {
michael@0 765 // Initialize to empty if there's no file.
michael@0 766 this._toFetch = [];
michael@0 767 Utils.jsonLoad("toFetch/" + this.name, this, function(toFetch) {
michael@0 768 if (toFetch) {
michael@0 769 this._toFetch = toFetch;
michael@0 770 }
michael@0 771 });
michael@0 772 },
michael@0 773
michael@0 774 get previousFailed() this._previousFailed,
michael@0 775 set previousFailed(val) {
michael@0 776 let cb = (error) => this._log.error(Utils.exceptionStr(error));
michael@0 777 // Coerce the array to a string for more efficient comparison.
michael@0 778 if (val + "" == this._previousFailed) {
michael@0 779 return;
michael@0 780 }
michael@0 781 this._previousFailed = val;
michael@0 782 Utils.namedTimer(function () {
michael@0 783 Utils.jsonSave("failed/" + this.name, this, val, cb);
michael@0 784 }, 0, this, "_previousFailedDelay");
michael@0 785 },
michael@0 786
michael@0 787 loadPreviousFailed: function () {
michael@0 788 // Initialize to empty if there's no file
michael@0 789 this._previousFailed = [];
michael@0 790 Utils.jsonLoad("failed/" + this.name, this, function(previousFailed) {
michael@0 791 if (previousFailed) {
michael@0 792 this._previousFailed = previousFailed;
michael@0 793 }
michael@0 794 });
michael@0 795 },
michael@0 796
michael@0 797 /*
michael@0 798 * lastSyncLocal is a timestamp in local time.
michael@0 799 */
michael@0 800 get lastSyncLocal() {
michael@0 801 return parseInt(Svc.Prefs.get(this.name + ".lastSyncLocal", "0"), 10);
michael@0 802 },
michael@0 803 set lastSyncLocal(value) {
michael@0 804 // Store as a string because pref can only store C longs as numbers.
michael@0 805 Svc.Prefs.set(this.name + ".lastSyncLocal", value.toString());
michael@0 806 },
michael@0 807
michael@0 808 /*
michael@0 809 * Returns a mapping of IDs -> changed timestamp. Engine implementations
michael@0 810 * can override this method to bypass the tracker for certain or all
michael@0 811 * changed items.
michael@0 812 */
michael@0 813 getChangedIDs: function () {
michael@0 814 return this._tracker.changedIDs;
michael@0 815 },
michael@0 816
michael@0 817 // Create a new record using the store and add in crypto fields.
michael@0 818 _createRecord: function (id) {
michael@0 819 let record = this._store.createRecord(id, this.name);
michael@0 820 record.id = id;
michael@0 821 record.collection = this.name;
michael@0 822 return record;
michael@0 823 },
michael@0 824
michael@0 825 // Any setup that needs to happen at the beginning of each sync.
michael@0 826 _syncStartup: function () {
michael@0 827
michael@0 828 // Determine if we need to wipe on outdated versions
michael@0 829 let metaGlobal = this.service.recordManager.get(this.metaURL);
michael@0 830 let engines = metaGlobal.payload.engines || {};
michael@0 831 let engineData = engines[this.name] || {};
michael@0 832
michael@0 833 let needsWipe = false;
michael@0 834
michael@0 835 // Assume missing versions are 0 and wipe the server
michael@0 836 if ((engineData.version || 0) < this.version) {
michael@0 837 this._log.debug("Old engine data: " + [engineData.version, this.version]);
michael@0 838
michael@0 839 // Prepare to clear the server and upload everything
michael@0 840 needsWipe = true;
michael@0 841 this.syncID = "";
michael@0 842
michael@0 843 // Set the newer version and newly generated syncID
michael@0 844 engineData.version = this.version;
michael@0 845 engineData.syncID = this.syncID;
michael@0 846
michael@0 847 // Put the new data back into meta/global and mark for upload
michael@0 848 engines[this.name] = engineData;
michael@0 849 metaGlobal.payload.engines = engines;
michael@0 850 metaGlobal.changed = true;
michael@0 851 }
michael@0 852 // Don't sync this engine if the server has newer data
michael@0 853 else if (engineData.version > this.version) {
michael@0 854 let error = new String("New data: " + [engineData.version, this.version]);
michael@0 855 error.failureCode = VERSION_OUT_OF_DATE;
michael@0 856 throw error;
michael@0 857 }
michael@0 858 // Changes to syncID mean we'll need to upload everything
michael@0 859 else if (engineData.syncID != this.syncID) {
michael@0 860 this._log.debug("Engine syncIDs: " + [engineData.syncID, this.syncID]);
michael@0 861 this.syncID = engineData.syncID;
michael@0 862 this._resetClient();
michael@0 863 };
michael@0 864
michael@0 865 // Delete any existing data and reupload on bad version or missing meta.
michael@0 866 // No crypto component here...? We could regenerate per-collection keys...
michael@0 867 if (needsWipe) {
michael@0 868 this.wipeServer();
michael@0 869 }
michael@0 870
michael@0 871 // Save objects that need to be uploaded in this._modified. We also save
michael@0 872 // the timestamp of this fetch in this.lastSyncLocal. As we successfully
michael@0 873 // upload objects we remove them from this._modified. If an error occurs
michael@0 874 // or any objects fail to upload, they will remain in this._modified. At
michael@0 875 // the end of a sync, or after an error, we add all objects remaining in
michael@0 876 // this._modified to the tracker.
michael@0 877 this.lastSyncLocal = Date.now();
michael@0 878 if (this.lastSync) {
michael@0 879 this._modified = this.getChangedIDs();
michael@0 880 } else {
michael@0 881 // Mark all items to be uploaded, but treat them as changed from long ago
michael@0 882 this._log.debug("First sync, uploading all items");
michael@0 883 this._modified = {};
michael@0 884 for (let id in this._store.getAllIDs()) {
michael@0 885 this._modified[id] = 0;
michael@0 886 }
michael@0 887 }
michael@0 888 // Clear the tracker now. If the sync fails we'll add the ones we failed
michael@0 889 // to upload back.
michael@0 890 this._tracker.clearChangedIDs();
michael@0 891
michael@0 892 this._log.info(Object.keys(this._modified).length +
michael@0 893 " outgoing items pre-reconciliation");
michael@0 894
michael@0 895 // Keep track of what to delete at the end of sync
michael@0 896 this._delete = {};
michael@0 897 },
michael@0 898
michael@0 899 /**
michael@0 900 * A tiny abstraction to make it easier to test incoming record
michael@0 901 * application.
michael@0 902 */
michael@0 903 _itemSource: function () {
michael@0 904 return new Collection(this.engineURL, this._recordObj, this.service);
michael@0 905 },
michael@0 906
michael@0 907 /**
michael@0 908 * Process incoming records.
michael@0 909 * In the most awful and untestable way possible.
michael@0 910 * This now accepts something that makes testing vaguely less impossible.
michael@0 911 */
michael@0 912 _processIncoming: function (newitems) {
michael@0 913 this._log.trace("Downloading & applying server changes");
michael@0 914
michael@0 915 // Figure out how many total items to fetch this sync; do less on mobile.
michael@0 916 let batchSize = this.downloadLimit || Infinity;
michael@0 917 let isMobile = (Svc.Prefs.get("client.type") == "mobile");
michael@0 918
michael@0 919 if (!newitems) {
michael@0 920 newitems = this._itemSource();
michael@0 921 }
michael@0 922
michael@0 923 if (isMobile) {
michael@0 924 batchSize = MOBILE_BATCH_SIZE;
michael@0 925 }
michael@0 926 newitems.newer = this.lastSync;
michael@0 927 newitems.full = true;
michael@0 928 newitems.limit = batchSize;
michael@0 929
michael@0 930 // applied => number of items that should be applied.
michael@0 931 // failed => number of items that failed in this sync.
michael@0 932 // newFailed => number of items that failed for the first time in this sync.
michael@0 933 // reconciled => number of items that were reconciled.
michael@0 934 let count = {applied: 0, failed: 0, newFailed: 0, reconciled: 0};
michael@0 935 let handled = [];
michael@0 936 let applyBatch = [];
michael@0 937 let failed = [];
michael@0 938 let failedInPreviousSync = this.previousFailed;
michael@0 939 let fetchBatch = Utils.arrayUnion(this.toFetch, failedInPreviousSync);
michael@0 940 // Reset previousFailed for each sync since previously failed items may not fail again.
michael@0 941 this.previousFailed = [];
michael@0 942
michael@0 943 // Used (via exceptions) to allow the record handler/reconciliation/etc.
michael@0 944 // methods to signal that they would like processing of incoming records to
michael@0 945 // cease.
michael@0 946 let aborting = undefined;
michael@0 947
michael@0 948 function doApplyBatch() {
michael@0 949 this._tracker.ignoreAll = true;
michael@0 950 try {
michael@0 951 failed = failed.concat(this._store.applyIncomingBatch(applyBatch));
michael@0 952 } catch (ex) {
michael@0 953 // Catch any error that escapes from applyIncomingBatch. At present
michael@0 954 // those will all be abort events.
michael@0 955 this._log.warn("Got exception " + Utils.exceptionStr(ex) +
michael@0 956 ", aborting processIncoming.");
michael@0 957 aborting = ex;
michael@0 958 }
michael@0 959 this._tracker.ignoreAll = false;
michael@0 960 applyBatch = [];
michael@0 961 }
michael@0 962
michael@0 963 function doApplyBatchAndPersistFailed() {
michael@0 964 // Apply remaining batch.
michael@0 965 if (applyBatch.length) {
michael@0 966 doApplyBatch.call(this);
michael@0 967 }
michael@0 968 // Persist failed items so we refetch them.
michael@0 969 if (failed.length) {
michael@0 970 this.previousFailed = Utils.arrayUnion(failed, this.previousFailed);
michael@0 971 count.failed += failed.length;
michael@0 972 this._log.debug("Records that failed to apply: " + failed);
michael@0 973 failed = [];
michael@0 974 }
michael@0 975 }
michael@0 976
michael@0 977 let key = this.service.collectionKeys.keyForCollection(this.name);
michael@0 978
michael@0 979 // Not binding this method to 'this' for performance reasons. It gets
michael@0 980 // called for every incoming record.
michael@0 981 let self = this;
michael@0 982
michael@0 983 newitems.recordHandler = function(item) {
michael@0 984 if (aborting) {
michael@0 985 return;
michael@0 986 }
michael@0 987
michael@0 988 // Grab a later last modified if possible
michael@0 989 if (self.lastModified == null || item.modified > self.lastModified)
michael@0 990 self.lastModified = item.modified;
michael@0 991
michael@0 992 // Track the collection for the WBO.
michael@0 993 item.collection = self.name;
michael@0 994
michael@0 995 // Remember which records were processed
michael@0 996 handled.push(item.id);
michael@0 997
michael@0 998 try {
michael@0 999 try {
michael@0 1000 item.decrypt(key);
michael@0 1001 } catch (ex if Utils.isHMACMismatch(ex)) {
michael@0 1002 let strategy = self.handleHMACMismatch(item, true);
michael@0 1003 if (strategy == SyncEngine.kRecoveryStrategy.retry) {
michael@0 1004 // You only get one retry.
michael@0 1005 try {
michael@0 1006 // Try decrypting again, typically because we've got new keys.
michael@0 1007 self._log.info("Trying decrypt again...");
michael@0 1008 key = self.service.collectionKeys.keyForCollection(self.name);
michael@0 1009 item.decrypt(key);
michael@0 1010 strategy = null;
michael@0 1011 } catch (ex if Utils.isHMACMismatch(ex)) {
michael@0 1012 strategy = self.handleHMACMismatch(item, false);
michael@0 1013 }
michael@0 1014 }
michael@0 1015
michael@0 1016 switch (strategy) {
michael@0 1017 case null:
michael@0 1018 // Retry succeeded! No further handling.
michael@0 1019 break;
michael@0 1020 case SyncEngine.kRecoveryStrategy.retry:
michael@0 1021 self._log.debug("Ignoring second retry suggestion.");
michael@0 1022 // Fall through to error case.
michael@0 1023 case SyncEngine.kRecoveryStrategy.error:
michael@0 1024 self._log.warn("Error decrypting record: " + Utils.exceptionStr(ex));
michael@0 1025 failed.push(item.id);
michael@0 1026 return;
michael@0 1027 case SyncEngine.kRecoveryStrategy.ignore:
michael@0 1028 self._log.debug("Ignoring record " + item.id +
michael@0 1029 " with bad HMAC: already handled.");
michael@0 1030 return;
michael@0 1031 }
michael@0 1032 }
michael@0 1033 } catch (ex) {
michael@0 1034 self._log.warn("Error decrypting record: " + Utils.exceptionStr(ex));
michael@0 1035 failed.push(item.id);
michael@0 1036 return;
michael@0 1037 }
michael@0 1038
michael@0 1039 let shouldApply;
michael@0 1040 try {
michael@0 1041 shouldApply = self._reconcile(item);
michael@0 1042 } catch (ex if (ex.code == Engine.prototype.eEngineAbortApplyIncoming)) {
michael@0 1043 self._log.warn("Reconciliation failed: aborting incoming processing.");
michael@0 1044 failed.push(item.id);
michael@0 1045 aborting = ex.cause;
michael@0 1046 } catch (ex) {
michael@0 1047 self._log.warn("Failed to reconcile incoming record " + item.id);
michael@0 1048 self._log.warn("Encountered exception: " + Utils.exceptionStr(ex));
michael@0 1049 failed.push(item.id);
michael@0 1050 return;
michael@0 1051 }
michael@0 1052
michael@0 1053 if (shouldApply) {
michael@0 1054 count.applied++;
michael@0 1055 applyBatch.push(item);
michael@0 1056 } else {
michael@0 1057 count.reconciled++;
michael@0 1058 self._log.trace("Skipping reconciled incoming item " + item.id);
michael@0 1059 }
michael@0 1060
michael@0 1061 if (applyBatch.length == self.applyIncomingBatchSize) {
michael@0 1062 doApplyBatch.call(self);
michael@0 1063 }
michael@0 1064 self._store._sleep(0);
michael@0 1065 };
michael@0 1066
michael@0 1067 // Only bother getting data from the server if there's new things
michael@0 1068 if (this.lastModified == null || this.lastModified > this.lastSync) {
michael@0 1069 let resp = newitems.get();
michael@0 1070 doApplyBatchAndPersistFailed.call(this);
michael@0 1071 if (!resp.success) {
michael@0 1072 resp.failureCode = ENGINE_DOWNLOAD_FAIL;
michael@0 1073 throw resp;
michael@0 1074 }
michael@0 1075
michael@0 1076 if (aborting) {
michael@0 1077 throw aborting;
michael@0 1078 }
michael@0 1079 }
michael@0 1080
michael@0 1081 // Mobile: check if we got the maximum that we requested; get the rest if so.
michael@0 1082 if (handled.length == newitems.limit) {
michael@0 1083 let guidColl = new Collection(this.engineURL, null, this.service);
michael@0 1084
michael@0 1085 // Sort and limit so that on mobile we only get the last X records.
michael@0 1086 guidColl.limit = this.downloadLimit;
michael@0 1087 guidColl.newer = this.lastSync;
michael@0 1088
michael@0 1089 // index: Orders by the sortindex descending (highest weight first).
michael@0 1090 guidColl.sort = "index";
michael@0 1091
michael@0 1092 let guids = guidColl.get();
michael@0 1093 if (!guids.success)
michael@0 1094 throw guids;
michael@0 1095
michael@0 1096 // Figure out which guids weren't just fetched then remove any guids that
michael@0 1097 // were already waiting and prepend the new ones
michael@0 1098 let extra = Utils.arraySub(guids.obj, handled);
michael@0 1099 if (extra.length > 0) {
michael@0 1100 fetchBatch = Utils.arrayUnion(extra, fetchBatch);
michael@0 1101 this.toFetch = Utils.arrayUnion(extra, this.toFetch);
michael@0 1102 }
michael@0 1103 }
michael@0 1104
michael@0 1105 // Fast-foward the lastSync timestamp since we have stored the
michael@0 1106 // remaining items in toFetch.
michael@0 1107 if (this.lastSync < this.lastModified) {
michael@0 1108 this.lastSync = this.lastModified;
michael@0 1109 }
michael@0 1110
michael@0 1111 // Process any backlog of GUIDs.
michael@0 1112 // At this point we impose an upper limit on the number of items to fetch
michael@0 1113 // in a single request, even for desktop, to avoid hitting URI limits.
michael@0 1114 batchSize = isMobile ? this.mobileGUIDFetchBatchSize :
michael@0 1115 this.guidFetchBatchSize;
michael@0 1116
michael@0 1117 while (fetchBatch.length && !aborting) {
michael@0 1118 // Reuse the original query, but get rid of the restricting params
michael@0 1119 // and batch remaining records.
michael@0 1120 newitems.limit = 0;
michael@0 1121 newitems.newer = 0;
michael@0 1122 newitems.ids = fetchBatch.slice(0, batchSize);
michael@0 1123
michael@0 1124 // Reuse the existing record handler set earlier
michael@0 1125 let resp = newitems.get();
michael@0 1126 if (!resp.success) {
michael@0 1127 resp.failureCode = ENGINE_DOWNLOAD_FAIL;
michael@0 1128 throw resp;
michael@0 1129 }
michael@0 1130
michael@0 1131 // This batch was successfully applied. Not using
michael@0 1132 // doApplyBatchAndPersistFailed() here to avoid writing toFetch twice.
michael@0 1133 fetchBatch = fetchBatch.slice(batchSize);
michael@0 1134 this.toFetch = Utils.arraySub(this.toFetch, newitems.ids);
michael@0 1135 this.previousFailed = Utils.arrayUnion(this.previousFailed, failed);
michael@0 1136 if (failed.length) {
michael@0 1137 count.failed += failed.length;
michael@0 1138 this._log.debug("Records that failed to apply: " + failed);
michael@0 1139 }
michael@0 1140 failed = [];
michael@0 1141
michael@0 1142 if (aborting) {
michael@0 1143 throw aborting;
michael@0 1144 }
michael@0 1145
michael@0 1146 if (this.lastSync < this.lastModified) {
michael@0 1147 this.lastSync = this.lastModified;
michael@0 1148 }
michael@0 1149 }
michael@0 1150
michael@0 1151 // Apply remaining items.
michael@0 1152 doApplyBatchAndPersistFailed.call(this);
michael@0 1153
michael@0 1154 count.newFailed = Utils.arraySub(this.previousFailed, failedInPreviousSync).length;
michael@0 1155 count.succeeded = Math.max(0, count.applied - count.failed);
michael@0 1156 this._log.info(["Records:",
michael@0 1157 count.applied, "applied,",
michael@0 1158 count.succeeded, "successfully,",
michael@0 1159 count.failed, "failed to apply,",
michael@0 1160 count.newFailed, "newly failed to apply,",
michael@0 1161 count.reconciled, "reconciled."].join(" "));
michael@0 1162 Observers.notify("weave:engine:sync:applied", count, this.name);
michael@0 1163 },
michael@0 1164
michael@0 1165 /**
michael@0 1166 * Find a GUID of an item that is a duplicate of the incoming item but happens
michael@0 1167 * to have a different GUID
michael@0 1168 *
michael@0 1169 * @return GUID of the similar item; falsy otherwise
michael@0 1170 */
michael@0 1171 _findDupe: function (item) {
michael@0 1172 // By default, assume there's no dupe items for the engine
michael@0 1173 },
michael@0 1174
michael@0 1175 _deleteId: function (id) {
michael@0 1176 this._tracker.removeChangedID(id);
michael@0 1177
michael@0 1178 // Remember this id to delete at the end of sync
michael@0 1179 if (this._delete.ids == null)
michael@0 1180 this._delete.ids = [id];
michael@0 1181 else
michael@0 1182 this._delete.ids.push(id);
michael@0 1183 },
michael@0 1184
michael@0 1185 /**
michael@0 1186 * Reconcile incoming record with local state.
michael@0 1187 *
michael@0 1188 * This function essentially determines whether to apply an incoming record.
michael@0 1189 *
michael@0 1190 * @param item
michael@0 1191 * Record from server to be tested for application.
michael@0 1192 * @return boolean
michael@0 1193 * Truthy if incoming record should be applied. False if not.
michael@0 1194 */
michael@0 1195 _reconcile: function (item) {
michael@0 1196 if (this._log.level <= Log.Level.Trace) {
michael@0 1197 this._log.trace("Incoming: " + item);
michael@0 1198 }
michael@0 1199
michael@0 1200 // We start reconciling by collecting a bunch of state. We do this here
michael@0 1201 // because some state may change during the course of this function and we
michael@0 1202 // need to operate on the original values.
michael@0 1203 let existsLocally = this._store.itemExists(item.id);
michael@0 1204 let locallyModified = item.id in this._modified;
michael@0 1205
michael@0 1206 // TODO Handle clock drift better. Tracked in bug 721181.
michael@0 1207 let remoteAge = AsyncResource.serverTime - item.modified;
michael@0 1208 let localAge = locallyModified ?
michael@0 1209 (Date.now() / 1000 - this._modified[item.id]) : null;
michael@0 1210 let remoteIsNewer = remoteAge < localAge;
michael@0 1211
michael@0 1212 this._log.trace("Reconciling " + item.id + ". exists=" +
michael@0 1213 existsLocally + "; modified=" + locallyModified +
michael@0 1214 "; local age=" + localAge + "; incoming age=" +
michael@0 1215 remoteAge);
michael@0 1216
michael@0 1217 // We handle deletions first so subsequent logic doesn't have to check
michael@0 1218 // deleted flags.
michael@0 1219 if (item.deleted) {
michael@0 1220 // If the item doesn't exist locally, there is nothing for us to do. We
michael@0 1221 // can't check for duplicates because the incoming record has no data
michael@0 1222 // which can be used for duplicate detection.
michael@0 1223 if (!existsLocally) {
michael@0 1224 this._log.trace("Ignoring incoming item because it was deleted and " +
michael@0 1225 "the item does not exist locally.");
michael@0 1226 return false;
michael@0 1227 }
michael@0 1228
michael@0 1229 // We decide whether to process the deletion by comparing the record
michael@0 1230 // ages. If the item is not modified locally, the remote side wins and
michael@0 1231 // the deletion is processed. If it is modified locally, we take the
michael@0 1232 // newer record.
michael@0 1233 if (!locallyModified) {
michael@0 1234 this._log.trace("Applying incoming delete because the local item " +
michael@0 1235 "exists and isn't modified.");
michael@0 1236 return true;
michael@0 1237 }
michael@0 1238
michael@0 1239 // TODO As part of bug 720592, determine whether we should do more here.
michael@0 1240 // In the case where the local changes are newer, it is quite possible
michael@0 1241 // that the local client will restore data a remote client had tried to
michael@0 1242 // delete. There might be a good reason for that delete and it might be
michael@0 1243 // enexpected for this client to restore that data.
michael@0 1244 this._log.trace("Incoming record is deleted but we had local changes. " +
michael@0 1245 "Applying the youngest record.");
michael@0 1246 return remoteIsNewer;
michael@0 1247 }
michael@0 1248
michael@0 1249 // At this point the incoming record is not for a deletion and must have
michael@0 1250 // data. If the incoming record does not exist locally, we check for a local
michael@0 1251 // duplicate existing under a different ID. The default implementation of
michael@0 1252 // _findDupe() is empty, so engines have to opt in to this functionality.
michael@0 1253 //
michael@0 1254 // If we find a duplicate, we change the local ID to the incoming ID and we
michael@0 1255 // refresh the metadata collected above. See bug 710448 for the history
michael@0 1256 // of this logic.
michael@0 1257 if (!existsLocally) {
michael@0 1258 let dupeID = this._findDupe(item);
michael@0 1259 if (dupeID) {
michael@0 1260 this._log.trace("Local item " + dupeID + " is a duplicate for " +
michael@0 1261 "incoming item " + item.id);
michael@0 1262
michael@0 1263 // The local, duplicate ID is always deleted on the server.
michael@0 1264 this._deleteId(dupeID);
michael@0 1265
michael@0 1266 // The current API contract does not mandate that the ID returned by
michael@0 1267 // _findDupe() actually exists. Therefore, we have to perform this
michael@0 1268 // check.
michael@0 1269 existsLocally = this._store.itemExists(dupeID);
michael@0 1270
michael@0 1271 // We unconditionally change the item's ID in case the engine knows of
michael@0 1272 // an item but doesn't expose it through itemExists. If the API
michael@0 1273 // contract were stronger, this could be changed.
michael@0 1274 this._log.debug("Switching local ID to incoming: " + dupeID + " -> " +
michael@0 1275 item.id);
michael@0 1276 this._store.changeItemID(dupeID, item.id);
michael@0 1277
michael@0 1278 // If the local item was modified, we carry its metadata forward so
michael@0 1279 // appropriate reconciling can be performed.
michael@0 1280 if (dupeID in this._modified) {
michael@0 1281 locallyModified = true;
michael@0 1282 localAge = Date.now() / 1000 - this._modified[dupeID];
michael@0 1283 remoteIsNewer = remoteAge < localAge;
michael@0 1284
michael@0 1285 this._modified[item.id] = this._modified[dupeID];
michael@0 1286 delete this._modified[dupeID];
michael@0 1287 } else {
michael@0 1288 locallyModified = false;
michael@0 1289 localAge = null;
michael@0 1290 }
michael@0 1291
michael@0 1292 this._log.debug("Local item after duplication: age=" + localAge +
michael@0 1293 "; modified=" + locallyModified + "; exists=" +
michael@0 1294 existsLocally);
michael@0 1295 } else {
michael@0 1296 this._log.trace("No duplicate found for incoming item: " + item.id);
michael@0 1297 }
michael@0 1298 }
michael@0 1299
michael@0 1300 // At this point we've performed duplicate detection. But, nothing here
michael@0 1301 // should depend on duplicate detection as the above should have updated
michael@0 1302 // state seamlessly.
michael@0 1303
michael@0 1304 if (!existsLocally) {
michael@0 1305 // If the item doesn't exist locally and we have no local modifications
michael@0 1306 // to the item (implying that it was not deleted), always apply the remote
michael@0 1307 // item.
michael@0 1308 if (!locallyModified) {
michael@0 1309 this._log.trace("Applying incoming because local item does not exist " +
michael@0 1310 "and was not deleted.");
michael@0 1311 return true;
michael@0 1312 }
michael@0 1313
michael@0 1314 // If the item was modified locally but isn't present, it must have
michael@0 1315 // been deleted. If the incoming record is younger, we restore from
michael@0 1316 // that record.
michael@0 1317 if (remoteIsNewer) {
michael@0 1318 this._log.trace("Applying incoming because local item was deleted " +
michael@0 1319 "before the incoming item was changed.");
michael@0 1320 delete this._modified[item.id];
michael@0 1321 return true;
michael@0 1322 }
michael@0 1323
michael@0 1324 this._log.trace("Ignoring incoming item because the local item's " +
michael@0 1325 "deletion is newer.");
michael@0 1326 return false;
michael@0 1327 }
michael@0 1328
michael@0 1329 // If the remote and local records are the same, there is nothing to be
michael@0 1330 // done, so we don't do anything. In the ideal world, this logic wouldn't
michael@0 1331 // be here and the engine would take a record and apply it. The reason we
michael@0 1332 // want to defer this logic is because it would avoid a redundant and
michael@0 1333 // possibly expensive dip into the storage layer to query item state.
michael@0 1334 // This should get addressed in the async rewrite, so we ignore it for now.
michael@0 1335 let localRecord = this._createRecord(item.id);
michael@0 1336 let recordsEqual = Utils.deepEquals(item.cleartext,
michael@0 1337 localRecord.cleartext);
michael@0 1338
michael@0 1339 // If the records are the same, we don't need to do anything. This does
michael@0 1340 // potentially throw away a local modification time. But, if the records
michael@0 1341 // are the same, does it matter?
michael@0 1342 if (recordsEqual) {
michael@0 1343 this._log.trace("Ignoring incoming item because the local item is " +
michael@0 1344 "identical.");
michael@0 1345
michael@0 1346 delete this._modified[item.id];
michael@0 1347 return false;
michael@0 1348 }
michael@0 1349
michael@0 1350 // At this point the records are different.
michael@0 1351
michael@0 1352 // If we have no local modifications, always take the server record.
michael@0 1353 if (!locallyModified) {
michael@0 1354 this._log.trace("Applying incoming record because no local conflicts.");
michael@0 1355 return true;
michael@0 1356 }
michael@0 1357
michael@0 1358 // At this point, records are different and the local record is modified.
michael@0 1359 // We resolve conflicts by record age, where the newest one wins. This does
michael@0 1360 // result in data loss and should be handled by giving the engine an
michael@0 1361 // opportunity to merge the records. Bug 720592 tracks this feature.
michael@0 1362 this._log.warn("DATA LOSS: Both local and remote changes to record: " +
michael@0 1363 item.id);
michael@0 1364 return remoteIsNewer;
michael@0 1365 },
michael@0 1366
michael@0 1367 // Upload outgoing records.
michael@0 1368 _uploadOutgoing: function () {
michael@0 1369 this._log.trace("Uploading local changes to server.");
michael@0 1370
michael@0 1371 let modifiedIDs = Object.keys(this._modified);
michael@0 1372 if (modifiedIDs.length) {
michael@0 1373 this._log.trace("Preparing " + modifiedIDs.length +
michael@0 1374 " outgoing records");
michael@0 1375
michael@0 1376 // collection we'll upload
michael@0 1377 let up = new Collection(this.engineURL, null, this.service);
michael@0 1378 let count = 0;
michael@0 1379
michael@0 1380 // Upload what we've got so far in the collection
michael@0 1381 let doUpload = Utils.bind2(this, function(desc) {
michael@0 1382 this._log.info("Uploading " + desc + " of " + modifiedIDs.length +
michael@0 1383 " records");
michael@0 1384 let resp = up.post();
michael@0 1385 if (!resp.success) {
michael@0 1386 this._log.debug("Uploading records failed: " + resp);
michael@0 1387 resp.failureCode = ENGINE_UPLOAD_FAIL;
michael@0 1388 throw resp;
michael@0 1389 }
michael@0 1390
michael@0 1391 // Update server timestamp from the upload.
michael@0 1392 let modified = resp.headers["x-weave-timestamp"];
michael@0 1393 if (modified > this.lastSync)
michael@0 1394 this.lastSync = modified;
michael@0 1395
michael@0 1396 let failed_ids = Object.keys(resp.obj.failed);
michael@0 1397 if (failed_ids.length)
michael@0 1398 this._log.debug("Records that will be uploaded again because "
michael@0 1399 + "the server couldn't store them: "
michael@0 1400 + failed_ids.join(", "));
michael@0 1401
michael@0 1402 // Clear successfully uploaded objects.
michael@0 1403 for each (let id in resp.obj.success) {
michael@0 1404 delete this._modified[id];
michael@0 1405 }
michael@0 1406
michael@0 1407 up.clearRecords();
michael@0 1408 });
michael@0 1409
michael@0 1410 for each (let id in modifiedIDs) {
michael@0 1411 try {
michael@0 1412 let out = this._createRecord(id);
michael@0 1413 if (this._log.level <= Log.Level.Trace)
michael@0 1414 this._log.trace("Outgoing: " + out);
michael@0 1415
michael@0 1416 out.encrypt(this.service.collectionKeys.keyForCollection(this.name));
michael@0 1417 up.pushData(out);
michael@0 1418 }
michael@0 1419 catch(ex) {
michael@0 1420 this._log.warn("Error creating record: " + Utils.exceptionStr(ex));
michael@0 1421 }
michael@0 1422
michael@0 1423 // Partial upload
michael@0 1424 if ((++count % MAX_UPLOAD_RECORDS) == 0)
michael@0 1425 doUpload((count - MAX_UPLOAD_RECORDS) + " - " + count + " out");
michael@0 1426
michael@0 1427 this._store._sleep(0);
michael@0 1428 }
michael@0 1429
michael@0 1430 // Final upload
michael@0 1431 if (count % MAX_UPLOAD_RECORDS > 0)
michael@0 1432 doUpload(count >= MAX_UPLOAD_RECORDS ? "last batch" : "all");
michael@0 1433 }
michael@0 1434 },
michael@0 1435
michael@0 1436 // Any cleanup necessary.
michael@0 1437 // Save the current snapshot so as to calculate changes at next sync
michael@0 1438 _syncFinish: function () {
michael@0 1439 this._log.trace("Finishing up sync");
michael@0 1440 this._tracker.resetScore();
michael@0 1441
michael@0 1442 let doDelete = Utils.bind2(this, function(key, val) {
michael@0 1443 let coll = new Collection(this.engineURL, this._recordObj, this.service);
michael@0 1444 coll[key] = val;
michael@0 1445 coll.delete();
michael@0 1446 });
michael@0 1447
michael@0 1448 for (let [key, val] in Iterator(this._delete)) {
michael@0 1449 // Remove the key for future uses
michael@0 1450 delete this._delete[key];
michael@0 1451
michael@0 1452 // Send a simple delete for the property
michael@0 1453 if (key != "ids" || val.length <= 100)
michael@0 1454 doDelete(key, val);
michael@0 1455 else {
michael@0 1456 // For many ids, split into chunks of at most 100
michael@0 1457 while (val.length > 0) {
michael@0 1458 doDelete(key, val.slice(0, 100));
michael@0 1459 val = val.slice(100);
michael@0 1460 }
michael@0 1461 }
michael@0 1462 }
michael@0 1463 },
michael@0 1464
michael@0 1465 _syncCleanup: function () {
michael@0 1466 if (!this._modified) {
michael@0 1467 return;
michael@0 1468 }
michael@0 1469
michael@0 1470 // Mark failed WBOs as changed again so they are reuploaded next time.
michael@0 1471 for (let [id, when] in Iterator(this._modified)) {
michael@0 1472 this._tracker.addChangedID(id, when);
michael@0 1473 }
michael@0 1474 this._modified = {};
michael@0 1475 },
michael@0 1476
michael@0 1477 _sync: function () {
michael@0 1478 try {
michael@0 1479 this._syncStartup();
michael@0 1480 Observers.notify("weave:engine:sync:status", "process-incoming");
michael@0 1481 this._processIncoming();
michael@0 1482 Observers.notify("weave:engine:sync:status", "upload-outgoing");
michael@0 1483 this._uploadOutgoing();
michael@0 1484 this._syncFinish();
michael@0 1485 } finally {
michael@0 1486 this._syncCleanup();
michael@0 1487 }
michael@0 1488 },
michael@0 1489
michael@0 1490 canDecrypt: function () {
michael@0 1491 // Report failure even if there's nothing to decrypt
michael@0 1492 let canDecrypt = false;
michael@0 1493
michael@0 1494 // Fetch the most recently uploaded record and try to decrypt it
michael@0 1495 let test = new Collection(this.engineURL, this._recordObj, this.service);
michael@0 1496 test.limit = 1;
michael@0 1497 test.sort = "newest";
michael@0 1498 test.full = true;
michael@0 1499
michael@0 1500 let key = this.service.collectionKeys.keyForCollection(this.name);
michael@0 1501 test.recordHandler = function recordHandler(record) {
michael@0 1502 record.decrypt(key);
michael@0 1503 canDecrypt = true;
michael@0 1504 }.bind(this);
michael@0 1505
michael@0 1506 // Any failure fetching/decrypting will just result in false
michael@0 1507 try {
michael@0 1508 this._log.trace("Trying to decrypt a record from the server..");
michael@0 1509 test.get();
michael@0 1510 }
michael@0 1511 catch(ex) {
michael@0 1512 this._log.debug("Failed test decrypt: " + Utils.exceptionStr(ex));
michael@0 1513 }
michael@0 1514
michael@0 1515 return canDecrypt;
michael@0 1516 },
michael@0 1517
michael@0 1518 _resetClient: function () {
michael@0 1519 this.resetLastSync();
michael@0 1520 this.previousFailed = [];
michael@0 1521 this.toFetch = [];
michael@0 1522 },
michael@0 1523
michael@0 1524 wipeServer: function () {
michael@0 1525 let response = this.service.resource(this.engineURL).delete();
michael@0 1526 if (response.status != 200 && response.status != 404) {
michael@0 1527 throw response;
michael@0 1528 }
michael@0 1529 this._resetClient();
michael@0 1530 },
michael@0 1531
michael@0 1532 removeClientData: function () {
michael@0 1533 // Implement this method in engines that store client specific data
michael@0 1534 // on the server.
michael@0 1535 },
michael@0 1536
michael@0 1537 /*
michael@0 1538 * Decide on (and partially effect) an error-handling strategy.
michael@0 1539 *
michael@0 1540 * Asks the Service to respond to an HMAC error, which might result in keys
michael@0 1541 * being downloaded. That call returns true if an action which might allow a
michael@0 1542 * retry to occur.
michael@0 1543 *
michael@0 1544 * If `mayRetry` is truthy, and the Service suggests a retry,
michael@0 1545 * handleHMACMismatch returns kRecoveryStrategy.retry. Otherwise, it returns
michael@0 1546 * kRecoveryStrategy.error.
michael@0 1547 *
michael@0 1548 * Subclasses of SyncEngine can override this method to allow for different
michael@0 1549 * behavior -- e.g., to delete and ignore erroneous entries.
michael@0 1550 *
michael@0 1551 * All return values will be part of the kRecoveryStrategy enumeration.
michael@0 1552 */
michael@0 1553 handleHMACMismatch: function (item, mayRetry) {
michael@0 1554 // By default we either try again, or bail out noisily.
michael@0 1555 return (this.service.handleHMACEvent() && mayRetry) ?
michael@0 1556 SyncEngine.kRecoveryStrategy.retry :
michael@0 1557 SyncEngine.kRecoveryStrategy.error;
michael@0 1558 }
michael@0 1559 };

mercurial