1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/services/sync/modules/engines.js Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,1559 @@ 1.4 +/* This Source Code Form is subject to the terms of the Mozilla Public 1.5 + * License, v. 2.0. If a copy of the MPL was not distributed with this 1.6 + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 1.7 + 1.8 +this.EXPORTED_SYMBOLS = [ 1.9 + "EngineManager", 1.10 + "Engine", 1.11 + "SyncEngine", 1.12 + "Tracker", 1.13 + "Store" 1.14 +]; 1.15 + 1.16 +const {classes: Cc, interfaces: Ci, results: Cr, utils: Cu} = Components; 1.17 + 1.18 +Cu.import("resource://services-common/async.js"); 1.19 +Cu.import("resource://gre/modules/Log.jsm"); 1.20 +Cu.import("resource://services-common/observers.js"); 1.21 +Cu.import("resource://services-common/utils.js"); 1.22 +Cu.import("resource://services-sync/constants.js"); 1.23 +Cu.import("resource://services-sync/identity.js"); 1.24 +Cu.import("resource://services-sync/record.js"); 1.25 +Cu.import("resource://services-sync/resource.js"); 1.26 +Cu.import("resource://services-sync/util.js"); 1.27 + 1.28 +/* 1.29 + * Trackers are associated with a single engine and deal with 1.30 + * listening for changes to their particular data type. 1.31 + * 1.32 + * There are two things they keep track of: 1.33 + * 1) A score, indicating how urgently the engine wants to sync 1.34 + * 2) A list of IDs for all the changed items that need to be synced 1.35 + * and updating their 'score', indicating how urgently they 1.36 + * want to sync. 1.37 + * 1.38 + */ 1.39 +this.Tracker = function Tracker(name, engine) { 1.40 + if (!engine) { 1.41 + throw new Error("Tracker must be associated with an Engine instance."); 1.42 + } 1.43 + 1.44 + name = name || "Unnamed"; 1.45 + this.name = this.file = name.toLowerCase(); 1.46 + this.engine = engine; 1.47 + 1.48 + this._log = Log.repository.getLogger("Sync.Tracker." + name); 1.49 + let level = Svc.Prefs.get("log.logger.engine." + this.name, "Debug"); 1.50 + this._log.level = Log.Level[level]; 1.51 + 1.52 + this._score = 0; 1.53 + this._ignored = []; 1.54 + this.ignoreAll = false; 1.55 + this.changedIDs = {}; 1.56 + this.loadChangedIDs(); 1.57 + 1.58 + Svc.Obs.add("weave:engine:start-tracking", this); 1.59 + Svc.Obs.add("weave:engine:stop-tracking", this); 1.60 +}; 1.61 + 1.62 +Tracker.prototype = { 1.63 + /* 1.64 + * Score can be called as often as desired to decide which engines to sync 1.65 + * 1.66 + * Valid values for score: 1.67 + * -1: Do not sync unless the user specifically requests it (almost disabled) 1.68 + * 0: Nothing has changed 1.69 + * 100: Please sync me ASAP! 1.70 + * 1.71 + * Setting it to other values should (but doesn't currently) throw an exception 1.72 + */ 1.73 + get score() { 1.74 + return this._score; 1.75 + }, 1.76 + 1.77 + set score(value) { 1.78 + this._score = value; 1.79 + Observers.notify("weave:engine:score:updated", this.name); 1.80 + }, 1.81 + 1.82 + // Should be called by service everytime a sync has been done for an engine 1.83 + resetScore: function () { 1.84 + this._score = 0; 1.85 + }, 1.86 + 1.87 + persistChangedIDs: true, 1.88 + 1.89 + /** 1.90 + * Persist changedIDs to disk at a later date. 1.91 + * Optionally pass a callback to be invoked when the write has occurred. 1.92 + */ 1.93 + saveChangedIDs: function (cb) { 1.94 + if (!this.persistChangedIDs) { 1.95 + this._log.debug("Not saving changedIDs."); 1.96 + return; 1.97 + } 1.98 + Utils.namedTimer(function () { 1.99 + this._log.debug("Saving changed IDs to " + this.file); 1.100 + Utils.jsonSave("changes/" + this.file, this, this.changedIDs, cb); 1.101 + }, 1000, this, "_lazySave"); 1.102 + }, 1.103 + 1.104 + loadChangedIDs: function (cb) { 1.105 + Utils.jsonLoad("changes/" + this.file, this, function(json) { 1.106 + if (json && (typeof(json) == "object")) { 1.107 + this.changedIDs = json; 1.108 + } else { 1.109 + this._log.warn("Changed IDs file " + this.file + " contains non-object value."); 1.110 + json = null; 1.111 + } 1.112 + if (cb) { 1.113 + cb.call(this, json); 1.114 + } 1.115 + }); 1.116 + }, 1.117 + 1.118 + // ignore/unignore specific IDs. Useful for ignoring items that are 1.119 + // being processed, or that shouldn't be synced. 1.120 + // But note: not persisted to disk 1.121 + 1.122 + ignoreID: function (id) { 1.123 + this.unignoreID(id); 1.124 + this._ignored.push(id); 1.125 + }, 1.126 + 1.127 + unignoreID: function (id) { 1.128 + let index = this._ignored.indexOf(id); 1.129 + if (index != -1) 1.130 + this._ignored.splice(index, 1); 1.131 + }, 1.132 + 1.133 + addChangedID: function (id, when) { 1.134 + if (!id) { 1.135 + this._log.warn("Attempted to add undefined ID to tracker"); 1.136 + return false; 1.137 + } 1.138 + 1.139 + if (this.ignoreAll || (id in this._ignored)) { 1.140 + return false; 1.141 + } 1.142 + 1.143 + // Default to the current time in seconds if no time is provided. 1.144 + if (when == null) { 1.145 + when = Math.floor(Date.now() / 1000); 1.146 + } 1.147 + 1.148 + // Add/update the entry if we have a newer time. 1.149 + if ((this.changedIDs[id] || -Infinity) < when) { 1.150 + this._log.trace("Adding changed ID: " + id + ", " + when); 1.151 + this.changedIDs[id] = when; 1.152 + this.saveChangedIDs(this.onSavedChangedIDs); 1.153 + } 1.154 + 1.155 + return true; 1.156 + }, 1.157 + 1.158 + removeChangedID: function (id) { 1.159 + if (!id) { 1.160 + this._log.warn("Attempted to remove undefined ID to tracker"); 1.161 + return false; 1.162 + } 1.163 + if (this.ignoreAll || (id in this._ignored)) 1.164 + return false; 1.165 + if (this.changedIDs[id] != null) { 1.166 + this._log.trace("Removing changed ID " + id); 1.167 + delete this.changedIDs[id]; 1.168 + this.saveChangedIDs(); 1.169 + } 1.170 + return true; 1.171 + }, 1.172 + 1.173 + clearChangedIDs: function () { 1.174 + this._log.trace("Clearing changed ID list"); 1.175 + this.changedIDs = {}; 1.176 + this.saveChangedIDs(); 1.177 + }, 1.178 + 1.179 + _isTracking: false, 1.180 + 1.181 + // Override these in your subclasses. 1.182 + startTracking: function () { 1.183 + }, 1.184 + 1.185 + stopTracking: function () { 1.186 + }, 1.187 + 1.188 + engineIsEnabled: function () { 1.189 + if (!this.engine) { 1.190 + // Can't tell -- we must be running in a test! 1.191 + return true; 1.192 + } 1.193 + return this.engine.enabled; 1.194 + }, 1.195 + 1.196 + onEngineEnabledChanged: function (engineEnabled) { 1.197 + if (engineEnabled == this._isTracking) { 1.198 + return; 1.199 + } 1.200 + 1.201 + if (engineEnabled) { 1.202 + this.startTracking(); 1.203 + this._isTracking = true; 1.204 + } else { 1.205 + this.stopTracking(); 1.206 + this._isTracking = false; 1.207 + this.clearChangedIDs(); 1.208 + } 1.209 + }, 1.210 + 1.211 + observe: function (subject, topic, data) { 1.212 + switch (topic) { 1.213 + case "weave:engine:start-tracking": 1.214 + if (!this.engineIsEnabled()) { 1.215 + return; 1.216 + } 1.217 + this._log.trace("Got start-tracking."); 1.218 + if (!this._isTracking) { 1.219 + this.startTracking(); 1.220 + this._isTracking = true; 1.221 + } 1.222 + return; 1.223 + case "weave:engine:stop-tracking": 1.224 + this._log.trace("Got stop-tracking."); 1.225 + if (this._isTracking) { 1.226 + this.stopTracking(); 1.227 + this._isTracking = false; 1.228 + } 1.229 + return; 1.230 + } 1.231 + } 1.232 +}; 1.233 + 1.234 + 1.235 + 1.236 +/** 1.237 + * The Store serves as the interface between Sync and stored data. 1.238 + * 1.239 + * The name "store" is slightly a misnomer because it doesn't actually "store" 1.240 + * anything. Instead, it serves as a gateway to something that actually does 1.241 + * the "storing." 1.242 + * 1.243 + * The store is responsible for record management inside an engine. It tells 1.244 + * Sync what items are available for Sync, converts items to and from Sync's 1.245 + * record format, and applies records from Sync into changes on the underlying 1.246 + * store. 1.247 + * 1.248 + * Store implementations require a number of functions to be implemented. These 1.249 + * are all documented below. 1.250 + * 1.251 + * For stores that deal with many records or which have expensive store access 1.252 + * routines, it is highly recommended to implement a custom applyIncomingBatch 1.253 + * and/or applyIncoming function on top of the basic APIs. 1.254 + */ 1.255 + 1.256 +this.Store = function Store(name, engine) { 1.257 + if (!engine) { 1.258 + throw new Error("Store must be associated with an Engine instance."); 1.259 + } 1.260 + 1.261 + name = name || "Unnamed"; 1.262 + this.name = name.toLowerCase(); 1.263 + this.engine = engine; 1.264 + 1.265 + this._log = Log.repository.getLogger("Sync.Store." + name); 1.266 + let level = Svc.Prefs.get("log.logger.engine." + this.name, "Debug"); 1.267 + this._log.level = Log.Level[level]; 1.268 + 1.269 + XPCOMUtils.defineLazyGetter(this, "_timer", function() { 1.270 + return Cc["@mozilla.org/timer;1"].createInstance(Ci.nsITimer); 1.271 + }); 1.272 +} 1.273 +Store.prototype = { 1.274 + 1.275 + _sleep: function _sleep(delay) { 1.276 + let cb = Async.makeSyncCallback(); 1.277 + this._timer.initWithCallback(cb, delay, Ci.nsITimer.TYPE_ONE_SHOT); 1.278 + Async.waitForSyncCallback(cb); 1.279 + }, 1.280 + 1.281 + /** 1.282 + * Apply multiple incoming records against the store. 1.283 + * 1.284 + * This is called with a set of incoming records to process. The function 1.285 + * should look at each record, reconcile with the current local state, and 1.286 + * make the local changes required to bring its state in alignment with the 1.287 + * record. 1.288 + * 1.289 + * The default implementation simply iterates over all records and calls 1.290 + * applyIncoming(). Store implementations may overwrite this function 1.291 + * if desired. 1.292 + * 1.293 + * @param records Array of records to apply 1.294 + * @return Array of record IDs which did not apply cleanly 1.295 + */ 1.296 + applyIncomingBatch: function (records) { 1.297 + let failed = []; 1.298 + for each (let record in records) { 1.299 + try { 1.300 + this.applyIncoming(record); 1.301 + } catch (ex if (ex.code == Engine.prototype.eEngineAbortApplyIncoming)) { 1.302 + // This kind of exception should have a 'cause' attribute, which is an 1.303 + // originating exception. 1.304 + // ex.cause will carry its stack with it when rethrown. 1.305 + throw ex.cause; 1.306 + } catch (ex) { 1.307 + this._log.warn("Failed to apply incoming record " + record.id); 1.308 + this._log.warn("Encountered exception: " + Utils.exceptionStr(ex)); 1.309 + failed.push(record.id); 1.310 + } 1.311 + }; 1.312 + return failed; 1.313 + }, 1.314 + 1.315 + /** 1.316 + * Apply a single record against the store. 1.317 + * 1.318 + * This takes a single record and makes the local changes required so the 1.319 + * local state matches what's in the record. 1.320 + * 1.321 + * The default implementation calls one of remove(), create(), or update() 1.322 + * depending on the state obtained from the store itself. Store 1.323 + * implementations may overwrite this function if desired. 1.324 + * 1.325 + * @param record 1.326 + * Record to apply 1.327 + */ 1.328 + applyIncoming: function (record) { 1.329 + if (record.deleted) 1.330 + this.remove(record); 1.331 + else if (!this.itemExists(record.id)) 1.332 + this.create(record); 1.333 + else 1.334 + this.update(record); 1.335 + }, 1.336 + 1.337 + // override these in derived objects 1.338 + 1.339 + /** 1.340 + * Create an item in the store from a record. 1.341 + * 1.342 + * This is called by the default implementation of applyIncoming(). If using 1.343 + * applyIncomingBatch(), this won't be called unless your store calls it. 1.344 + * 1.345 + * @param record 1.346 + * The store record to create an item from 1.347 + */ 1.348 + create: function (record) { 1.349 + throw "override create in a subclass"; 1.350 + }, 1.351 + 1.352 + /** 1.353 + * Remove an item in the store from a record. 1.354 + * 1.355 + * This is called by the default implementation of applyIncoming(). If using 1.356 + * applyIncomingBatch(), this won't be called unless your store calls it. 1.357 + * 1.358 + * @param record 1.359 + * The store record to delete an item from 1.360 + */ 1.361 + remove: function (record) { 1.362 + throw "override remove in a subclass"; 1.363 + }, 1.364 + 1.365 + /** 1.366 + * Update an item from a record. 1.367 + * 1.368 + * This is called by the default implementation of applyIncoming(). If using 1.369 + * applyIncomingBatch(), this won't be called unless your store calls it. 1.370 + * 1.371 + * @param record 1.372 + * The record to use to update an item from 1.373 + */ 1.374 + update: function (record) { 1.375 + throw "override update in a subclass"; 1.376 + }, 1.377 + 1.378 + /** 1.379 + * Determine whether a record with the specified ID exists. 1.380 + * 1.381 + * Takes a string record ID and returns a booleans saying whether the record 1.382 + * exists. 1.383 + * 1.384 + * @param id 1.385 + * string record ID 1.386 + * @return boolean indicating whether record exists locally 1.387 + */ 1.388 + itemExists: function (id) { 1.389 + throw "override itemExists in a subclass"; 1.390 + }, 1.391 + 1.392 + /** 1.393 + * Create a record from the specified ID. 1.394 + * 1.395 + * If the ID is known, the record should be populated with metadata from 1.396 + * the store. If the ID is not known, the record should be created with the 1.397 + * delete field set to true. 1.398 + * 1.399 + * @param id 1.400 + * string record ID 1.401 + * @param collection 1.402 + * Collection to add record to. This is typically passed into the 1.403 + * constructor for the newly-created record. 1.404 + * @return record type for this engine 1.405 + */ 1.406 + createRecord: function (id, collection) { 1.407 + throw "override createRecord in a subclass"; 1.408 + }, 1.409 + 1.410 + /** 1.411 + * Change the ID of a record. 1.412 + * 1.413 + * @param oldID 1.414 + * string old/current record ID 1.415 + * @param newID 1.416 + * string new record ID 1.417 + */ 1.418 + changeItemID: function (oldID, newID) { 1.419 + throw "override changeItemID in a subclass"; 1.420 + }, 1.421 + 1.422 + /** 1.423 + * Obtain the set of all known record IDs. 1.424 + * 1.425 + * @return Object with ID strings as keys and values of true. The values 1.426 + * are ignored. 1.427 + */ 1.428 + getAllIDs: function () { 1.429 + throw "override getAllIDs in a subclass"; 1.430 + }, 1.431 + 1.432 + /** 1.433 + * Wipe all data in the store. 1.434 + * 1.435 + * This function is called during remote wipes or when replacing local data 1.436 + * with remote data. 1.437 + * 1.438 + * This function should delete all local data that the store is managing. It 1.439 + * can be thought of as clearing out all state and restoring the "new 1.440 + * browser" state. 1.441 + */ 1.442 + wipe: function () { 1.443 + throw "override wipe in a subclass"; 1.444 + } 1.445 +}; 1.446 + 1.447 +this.EngineManager = function EngineManager(service) { 1.448 + this.service = service; 1.449 + 1.450 + this._engines = {}; 1.451 + 1.452 + // This will be populated by Service on startup. 1.453 + this._declined = new Set(); 1.454 + this._log = Log.repository.getLogger("Sync.EngineManager"); 1.455 + this._log.level = Log.Level[Svc.Prefs.get("log.logger.service.engines", "Debug")]; 1.456 +} 1.457 +EngineManager.prototype = { 1.458 + get: function (name) { 1.459 + // Return an array of engines if we have an array of names 1.460 + if (Array.isArray(name)) { 1.461 + let engines = []; 1.462 + name.forEach(function(name) { 1.463 + let engine = this.get(name); 1.464 + if (engine) { 1.465 + engines.push(engine); 1.466 + } 1.467 + }, this); 1.468 + return engines; 1.469 + } 1.470 + 1.471 + let engine = this._engines[name]; 1.472 + if (!engine) { 1.473 + this._log.debug("Could not get engine: " + name); 1.474 + if (Object.keys) { 1.475 + this._log.debug("Engines are: " + JSON.stringify(Object.keys(this._engines))); 1.476 + } 1.477 + } 1.478 + return engine; 1.479 + }, 1.480 + 1.481 + getAll: function () { 1.482 + return [engine for ([name, engine] in Iterator(this._engines))]; 1.483 + }, 1.484 + 1.485 + /** 1.486 + * N.B., does not pay attention to the declined list. 1.487 + */ 1.488 + getEnabled: function () { 1.489 + return this.getAll().filter(function(engine) engine.enabled); 1.490 + }, 1.491 + 1.492 + get enabledEngineNames() { 1.493 + return [e.name for each (e in this.getEnabled())]; 1.494 + }, 1.495 + 1.496 + persistDeclined: function () { 1.497 + Svc.Prefs.set("declinedEngines", [...this._declined].join(",")); 1.498 + }, 1.499 + 1.500 + /** 1.501 + * Returns an array. 1.502 + */ 1.503 + getDeclined: function () { 1.504 + return [...this._declined]; 1.505 + }, 1.506 + 1.507 + setDeclined: function (engines) { 1.508 + this._declined = new Set(engines); 1.509 + this.persistDeclined(); 1.510 + }, 1.511 + 1.512 + isDeclined: function (engineName) { 1.513 + return this._declined.has(engineName); 1.514 + }, 1.515 + 1.516 + /** 1.517 + * Accepts a Set or an array. 1.518 + */ 1.519 + decline: function (engines) { 1.520 + for (let e of engines) { 1.521 + this._declined.add(e); 1.522 + } 1.523 + this.persistDeclined(); 1.524 + }, 1.525 + 1.526 + undecline: function (engines) { 1.527 + for (let e of engines) { 1.528 + this._declined.delete(e); 1.529 + } 1.530 + this.persistDeclined(); 1.531 + }, 1.532 + 1.533 + /** 1.534 + * Mark any non-enabled engines as declined. 1.535 + * 1.536 + * This is useful after initial customization during setup. 1.537 + */ 1.538 + declineDisabled: function () { 1.539 + for (let e of this.getAll()) { 1.540 + if (!e.enabled) { 1.541 + this._log.debug("Declining disabled engine " + e.name); 1.542 + this._declined.add(e.name); 1.543 + } 1.544 + } 1.545 + this.persistDeclined(); 1.546 + }, 1.547 + 1.548 + /** 1.549 + * Register an Engine to the service. Alternatively, give an array of engine 1.550 + * objects to register. 1.551 + * 1.552 + * @param engineObject 1.553 + * Engine object used to get an instance of the engine 1.554 + * @return The engine object if anything failed 1.555 + */ 1.556 + register: function (engineObject) { 1.557 + if (Array.isArray(engineObject)) { 1.558 + return engineObject.map(this.register, this); 1.559 + } 1.560 + 1.561 + try { 1.562 + let engine = new engineObject(this.service); 1.563 + let name = engine.name; 1.564 + if (name in this._engines) { 1.565 + this._log.error("Engine '" + name + "' is already registered!"); 1.566 + } else { 1.567 + this._engines[name] = engine; 1.568 + } 1.569 + } catch (ex) { 1.570 + this._log.error(CommonUtils.exceptionStr(ex)); 1.571 + 1.572 + let mesg = ex.message ? ex.message : ex; 1.573 + let name = engineObject || ""; 1.574 + name = name.prototype || ""; 1.575 + name = name.name || ""; 1.576 + 1.577 + let out = "Could not initialize engine '" + name + "': " + mesg; 1.578 + this._log.error(out); 1.579 + 1.580 + return engineObject; 1.581 + } 1.582 + }, 1.583 + 1.584 + unregister: function (val) { 1.585 + let name = val; 1.586 + if (val instanceof Engine) { 1.587 + name = val.name; 1.588 + } 1.589 + delete this._engines[name]; 1.590 + }, 1.591 + 1.592 + clear: function () { 1.593 + for (let name in this._engines) { 1.594 + delete this._engines[name]; 1.595 + } 1.596 + }, 1.597 +}; 1.598 + 1.599 +this.Engine = function Engine(name, service) { 1.600 + if (!service) { 1.601 + throw new Error("Engine must be associated with a Service instance."); 1.602 + } 1.603 + 1.604 + this.Name = name || "Unnamed"; 1.605 + this.name = name.toLowerCase(); 1.606 + this.service = service; 1.607 + 1.608 + this._notify = Utils.notify("weave:engine:"); 1.609 + this._log = Log.repository.getLogger("Sync.Engine." + this.Name); 1.610 + let level = Svc.Prefs.get("log.logger.engine." + this.name, "Debug"); 1.611 + this._log.level = Log.Level[level]; 1.612 + 1.613 + this._tracker; // initialize tracker to load previously changed IDs 1.614 + this._log.debug("Engine initialized"); 1.615 +} 1.616 +Engine.prototype = { 1.617 + // _storeObj, and _trackerObj should to be overridden in subclasses 1.618 + _storeObj: Store, 1.619 + _trackerObj: Tracker, 1.620 + 1.621 + // Local 'constant'. 1.622 + // Signal to the engine that processing further records is pointless. 1.623 + eEngineAbortApplyIncoming: "error.engine.abort.applyincoming", 1.624 + 1.625 + get prefName() this.name, 1.626 + get enabled() { 1.627 + return Svc.Prefs.get("engine." + this.prefName, false); 1.628 + }, 1.629 + 1.630 + set enabled(val) { 1.631 + Svc.Prefs.set("engine." + this.prefName, !!val); 1.632 + this._tracker.onEngineEnabledChanged(val); 1.633 + }, 1.634 + 1.635 + get score() this._tracker.score, 1.636 + 1.637 + get _store() { 1.638 + let store = new this._storeObj(this.Name, this); 1.639 + this.__defineGetter__("_store", function() store); 1.640 + return store; 1.641 + }, 1.642 + 1.643 + get _tracker() { 1.644 + let tracker = new this._trackerObj(this.Name, this); 1.645 + this.__defineGetter__("_tracker", function() tracker); 1.646 + return tracker; 1.647 + }, 1.648 + 1.649 + sync: function () { 1.650 + if (!this.enabled) { 1.651 + return; 1.652 + } 1.653 + 1.654 + if (!this._sync) { 1.655 + throw "engine does not implement _sync method"; 1.656 + } 1.657 + 1.658 + this._notify("sync", this.name, this._sync)(); 1.659 + }, 1.660 + 1.661 + /** 1.662 + * Get rid of any local meta-data. 1.663 + */ 1.664 + resetClient: function () { 1.665 + if (!this._resetClient) { 1.666 + throw "engine does not implement _resetClient method"; 1.667 + } 1.668 + 1.669 + this._notify("reset-client", this.name, this._resetClient)(); 1.670 + }, 1.671 + 1.672 + _wipeClient: function () { 1.673 + this.resetClient(); 1.674 + this._log.debug("Deleting all local data"); 1.675 + this._tracker.ignoreAll = true; 1.676 + this._store.wipe(); 1.677 + this._tracker.ignoreAll = false; 1.678 + this._tracker.clearChangedIDs(); 1.679 + }, 1.680 + 1.681 + wipeClient: function () { 1.682 + this._notify("wipe-client", this.name, this._wipeClient)(); 1.683 + } 1.684 +}; 1.685 + 1.686 +this.SyncEngine = function SyncEngine(name, service) { 1.687 + Engine.call(this, name || "SyncEngine", service); 1.688 + 1.689 + this.loadToFetch(); 1.690 + this.loadPreviousFailed(); 1.691 +} 1.692 + 1.693 +// Enumeration to define approaches to handling bad records. 1.694 +// Attached to the constructor to allow use as a kind of static enumeration. 1.695 +SyncEngine.kRecoveryStrategy = { 1.696 + ignore: "ignore", 1.697 + retry: "retry", 1.698 + error: "error" 1.699 +}; 1.700 + 1.701 +SyncEngine.prototype = { 1.702 + __proto__: Engine.prototype, 1.703 + _recordObj: CryptoWrapper, 1.704 + version: 1, 1.705 + 1.706 + // How many records to pull in a single sync. This is primarily to avoid very 1.707 + // long first syncs against profiles with many history records. 1.708 + downloadLimit: null, 1.709 + 1.710 + // How many records to pull at one time when specifying IDs. This is to avoid 1.711 + // URI length limitations. 1.712 + guidFetchBatchSize: DEFAULT_GUID_FETCH_BATCH_SIZE, 1.713 + mobileGUIDFetchBatchSize: DEFAULT_MOBILE_GUID_FETCH_BATCH_SIZE, 1.714 + 1.715 + // How many records to process in a single batch. 1.716 + applyIncomingBatchSize: DEFAULT_STORE_BATCH_SIZE, 1.717 + 1.718 + get storageURL() this.service.storageURL, 1.719 + 1.720 + get engineURL() this.storageURL + this.name, 1.721 + 1.722 + get cryptoKeysURL() this.storageURL + "crypto/keys", 1.723 + 1.724 + get metaURL() this.storageURL + "meta/global", 1.725 + 1.726 + get syncID() { 1.727 + // Generate a random syncID if we don't have one 1.728 + let syncID = Svc.Prefs.get(this.name + ".syncID", ""); 1.729 + return syncID == "" ? this.syncID = Utils.makeGUID() : syncID; 1.730 + }, 1.731 + set syncID(value) { 1.732 + Svc.Prefs.set(this.name + ".syncID", value); 1.733 + }, 1.734 + 1.735 + /* 1.736 + * lastSync is a timestamp in server time. 1.737 + */ 1.738 + get lastSync() { 1.739 + return parseFloat(Svc.Prefs.get(this.name + ".lastSync", "0")); 1.740 + }, 1.741 + set lastSync(value) { 1.742 + // Reset the pref in-case it's a number instead of a string 1.743 + Svc.Prefs.reset(this.name + ".lastSync"); 1.744 + // Store the value as a string to keep floating point precision 1.745 + Svc.Prefs.set(this.name + ".lastSync", value.toString()); 1.746 + }, 1.747 + resetLastSync: function () { 1.748 + this._log.debug("Resetting " + this.name + " last sync time"); 1.749 + Svc.Prefs.reset(this.name + ".lastSync"); 1.750 + Svc.Prefs.set(this.name + ".lastSync", "0"); 1.751 + this.lastSyncLocal = 0; 1.752 + }, 1.753 + 1.754 + get toFetch() this._toFetch, 1.755 + set toFetch(val) { 1.756 + let cb = (error) => this._log.error(Utils.exceptionStr(error)); 1.757 + // Coerce the array to a string for more efficient comparison. 1.758 + if (val + "" == this._toFetch) { 1.759 + return; 1.760 + } 1.761 + this._toFetch = val; 1.762 + Utils.namedTimer(function () { 1.763 + Utils.jsonSave("toFetch/" + this.name, this, val, cb); 1.764 + }, 0, this, "_toFetchDelay"); 1.765 + }, 1.766 + 1.767 + loadToFetch: function () { 1.768 + // Initialize to empty if there's no file. 1.769 + this._toFetch = []; 1.770 + Utils.jsonLoad("toFetch/" + this.name, this, function(toFetch) { 1.771 + if (toFetch) { 1.772 + this._toFetch = toFetch; 1.773 + } 1.774 + }); 1.775 + }, 1.776 + 1.777 + get previousFailed() this._previousFailed, 1.778 + set previousFailed(val) { 1.779 + let cb = (error) => this._log.error(Utils.exceptionStr(error)); 1.780 + // Coerce the array to a string for more efficient comparison. 1.781 + if (val + "" == this._previousFailed) { 1.782 + return; 1.783 + } 1.784 + this._previousFailed = val; 1.785 + Utils.namedTimer(function () { 1.786 + Utils.jsonSave("failed/" + this.name, this, val, cb); 1.787 + }, 0, this, "_previousFailedDelay"); 1.788 + }, 1.789 + 1.790 + loadPreviousFailed: function () { 1.791 + // Initialize to empty if there's no file 1.792 + this._previousFailed = []; 1.793 + Utils.jsonLoad("failed/" + this.name, this, function(previousFailed) { 1.794 + if (previousFailed) { 1.795 + this._previousFailed = previousFailed; 1.796 + } 1.797 + }); 1.798 + }, 1.799 + 1.800 + /* 1.801 + * lastSyncLocal is a timestamp in local time. 1.802 + */ 1.803 + get lastSyncLocal() { 1.804 + return parseInt(Svc.Prefs.get(this.name + ".lastSyncLocal", "0"), 10); 1.805 + }, 1.806 + set lastSyncLocal(value) { 1.807 + // Store as a string because pref can only store C longs as numbers. 1.808 + Svc.Prefs.set(this.name + ".lastSyncLocal", value.toString()); 1.809 + }, 1.810 + 1.811 + /* 1.812 + * Returns a mapping of IDs -> changed timestamp. Engine implementations 1.813 + * can override this method to bypass the tracker for certain or all 1.814 + * changed items. 1.815 + */ 1.816 + getChangedIDs: function () { 1.817 + return this._tracker.changedIDs; 1.818 + }, 1.819 + 1.820 + // Create a new record using the store and add in crypto fields. 1.821 + _createRecord: function (id) { 1.822 + let record = this._store.createRecord(id, this.name); 1.823 + record.id = id; 1.824 + record.collection = this.name; 1.825 + return record; 1.826 + }, 1.827 + 1.828 + // Any setup that needs to happen at the beginning of each sync. 1.829 + _syncStartup: function () { 1.830 + 1.831 + // Determine if we need to wipe on outdated versions 1.832 + let metaGlobal = this.service.recordManager.get(this.metaURL); 1.833 + let engines = metaGlobal.payload.engines || {}; 1.834 + let engineData = engines[this.name] || {}; 1.835 + 1.836 + let needsWipe = false; 1.837 + 1.838 + // Assume missing versions are 0 and wipe the server 1.839 + if ((engineData.version || 0) < this.version) { 1.840 + this._log.debug("Old engine data: " + [engineData.version, this.version]); 1.841 + 1.842 + // Prepare to clear the server and upload everything 1.843 + needsWipe = true; 1.844 + this.syncID = ""; 1.845 + 1.846 + // Set the newer version and newly generated syncID 1.847 + engineData.version = this.version; 1.848 + engineData.syncID = this.syncID; 1.849 + 1.850 + // Put the new data back into meta/global and mark for upload 1.851 + engines[this.name] = engineData; 1.852 + metaGlobal.payload.engines = engines; 1.853 + metaGlobal.changed = true; 1.854 + } 1.855 + // Don't sync this engine if the server has newer data 1.856 + else if (engineData.version > this.version) { 1.857 + let error = new String("New data: " + [engineData.version, this.version]); 1.858 + error.failureCode = VERSION_OUT_OF_DATE; 1.859 + throw error; 1.860 + } 1.861 + // Changes to syncID mean we'll need to upload everything 1.862 + else if (engineData.syncID != this.syncID) { 1.863 + this._log.debug("Engine syncIDs: " + [engineData.syncID, this.syncID]); 1.864 + this.syncID = engineData.syncID; 1.865 + this._resetClient(); 1.866 + }; 1.867 + 1.868 + // Delete any existing data and reupload on bad version or missing meta. 1.869 + // No crypto component here...? We could regenerate per-collection keys... 1.870 + if (needsWipe) { 1.871 + this.wipeServer(); 1.872 + } 1.873 + 1.874 + // Save objects that need to be uploaded in this._modified. We also save 1.875 + // the timestamp of this fetch in this.lastSyncLocal. As we successfully 1.876 + // upload objects we remove them from this._modified. If an error occurs 1.877 + // or any objects fail to upload, they will remain in this._modified. At 1.878 + // the end of a sync, or after an error, we add all objects remaining in 1.879 + // this._modified to the tracker. 1.880 + this.lastSyncLocal = Date.now(); 1.881 + if (this.lastSync) { 1.882 + this._modified = this.getChangedIDs(); 1.883 + } else { 1.884 + // Mark all items to be uploaded, but treat them as changed from long ago 1.885 + this._log.debug("First sync, uploading all items"); 1.886 + this._modified = {}; 1.887 + for (let id in this._store.getAllIDs()) { 1.888 + this._modified[id] = 0; 1.889 + } 1.890 + } 1.891 + // Clear the tracker now. If the sync fails we'll add the ones we failed 1.892 + // to upload back. 1.893 + this._tracker.clearChangedIDs(); 1.894 + 1.895 + this._log.info(Object.keys(this._modified).length + 1.896 + " outgoing items pre-reconciliation"); 1.897 + 1.898 + // Keep track of what to delete at the end of sync 1.899 + this._delete = {}; 1.900 + }, 1.901 + 1.902 + /** 1.903 + * A tiny abstraction to make it easier to test incoming record 1.904 + * application. 1.905 + */ 1.906 + _itemSource: function () { 1.907 + return new Collection(this.engineURL, this._recordObj, this.service); 1.908 + }, 1.909 + 1.910 + /** 1.911 + * Process incoming records. 1.912 + * In the most awful and untestable way possible. 1.913 + * This now accepts something that makes testing vaguely less impossible. 1.914 + */ 1.915 + _processIncoming: function (newitems) { 1.916 + this._log.trace("Downloading & applying server changes"); 1.917 + 1.918 + // Figure out how many total items to fetch this sync; do less on mobile. 1.919 + let batchSize = this.downloadLimit || Infinity; 1.920 + let isMobile = (Svc.Prefs.get("client.type") == "mobile"); 1.921 + 1.922 + if (!newitems) { 1.923 + newitems = this._itemSource(); 1.924 + } 1.925 + 1.926 + if (isMobile) { 1.927 + batchSize = MOBILE_BATCH_SIZE; 1.928 + } 1.929 + newitems.newer = this.lastSync; 1.930 + newitems.full = true; 1.931 + newitems.limit = batchSize; 1.932 + 1.933 + // applied => number of items that should be applied. 1.934 + // failed => number of items that failed in this sync. 1.935 + // newFailed => number of items that failed for the first time in this sync. 1.936 + // reconciled => number of items that were reconciled. 1.937 + let count = {applied: 0, failed: 0, newFailed: 0, reconciled: 0}; 1.938 + let handled = []; 1.939 + let applyBatch = []; 1.940 + let failed = []; 1.941 + let failedInPreviousSync = this.previousFailed; 1.942 + let fetchBatch = Utils.arrayUnion(this.toFetch, failedInPreviousSync); 1.943 + // Reset previousFailed for each sync since previously failed items may not fail again. 1.944 + this.previousFailed = []; 1.945 + 1.946 + // Used (via exceptions) to allow the record handler/reconciliation/etc. 1.947 + // methods to signal that they would like processing of incoming records to 1.948 + // cease. 1.949 + let aborting = undefined; 1.950 + 1.951 + function doApplyBatch() { 1.952 + this._tracker.ignoreAll = true; 1.953 + try { 1.954 + failed = failed.concat(this._store.applyIncomingBatch(applyBatch)); 1.955 + } catch (ex) { 1.956 + // Catch any error that escapes from applyIncomingBatch. At present 1.957 + // those will all be abort events. 1.958 + this._log.warn("Got exception " + Utils.exceptionStr(ex) + 1.959 + ", aborting processIncoming."); 1.960 + aborting = ex; 1.961 + } 1.962 + this._tracker.ignoreAll = false; 1.963 + applyBatch = []; 1.964 + } 1.965 + 1.966 + function doApplyBatchAndPersistFailed() { 1.967 + // Apply remaining batch. 1.968 + if (applyBatch.length) { 1.969 + doApplyBatch.call(this); 1.970 + } 1.971 + // Persist failed items so we refetch them. 1.972 + if (failed.length) { 1.973 + this.previousFailed = Utils.arrayUnion(failed, this.previousFailed); 1.974 + count.failed += failed.length; 1.975 + this._log.debug("Records that failed to apply: " + failed); 1.976 + failed = []; 1.977 + } 1.978 + } 1.979 + 1.980 + let key = this.service.collectionKeys.keyForCollection(this.name); 1.981 + 1.982 + // Not binding this method to 'this' for performance reasons. It gets 1.983 + // called for every incoming record. 1.984 + let self = this; 1.985 + 1.986 + newitems.recordHandler = function(item) { 1.987 + if (aborting) { 1.988 + return; 1.989 + } 1.990 + 1.991 + // Grab a later last modified if possible 1.992 + if (self.lastModified == null || item.modified > self.lastModified) 1.993 + self.lastModified = item.modified; 1.994 + 1.995 + // Track the collection for the WBO. 1.996 + item.collection = self.name; 1.997 + 1.998 + // Remember which records were processed 1.999 + handled.push(item.id); 1.1000 + 1.1001 + try { 1.1002 + try { 1.1003 + item.decrypt(key); 1.1004 + } catch (ex if Utils.isHMACMismatch(ex)) { 1.1005 + let strategy = self.handleHMACMismatch(item, true); 1.1006 + if (strategy == SyncEngine.kRecoveryStrategy.retry) { 1.1007 + // You only get one retry. 1.1008 + try { 1.1009 + // Try decrypting again, typically because we've got new keys. 1.1010 + self._log.info("Trying decrypt again..."); 1.1011 + key = self.service.collectionKeys.keyForCollection(self.name); 1.1012 + item.decrypt(key); 1.1013 + strategy = null; 1.1014 + } catch (ex if Utils.isHMACMismatch(ex)) { 1.1015 + strategy = self.handleHMACMismatch(item, false); 1.1016 + } 1.1017 + } 1.1018 + 1.1019 + switch (strategy) { 1.1020 + case null: 1.1021 + // Retry succeeded! No further handling. 1.1022 + break; 1.1023 + case SyncEngine.kRecoveryStrategy.retry: 1.1024 + self._log.debug("Ignoring second retry suggestion."); 1.1025 + // Fall through to error case. 1.1026 + case SyncEngine.kRecoveryStrategy.error: 1.1027 + self._log.warn("Error decrypting record: " + Utils.exceptionStr(ex)); 1.1028 + failed.push(item.id); 1.1029 + return; 1.1030 + case SyncEngine.kRecoveryStrategy.ignore: 1.1031 + self._log.debug("Ignoring record " + item.id + 1.1032 + " with bad HMAC: already handled."); 1.1033 + return; 1.1034 + } 1.1035 + } 1.1036 + } catch (ex) { 1.1037 + self._log.warn("Error decrypting record: " + Utils.exceptionStr(ex)); 1.1038 + failed.push(item.id); 1.1039 + return; 1.1040 + } 1.1041 + 1.1042 + let shouldApply; 1.1043 + try { 1.1044 + shouldApply = self._reconcile(item); 1.1045 + } catch (ex if (ex.code == Engine.prototype.eEngineAbortApplyIncoming)) { 1.1046 + self._log.warn("Reconciliation failed: aborting incoming processing."); 1.1047 + failed.push(item.id); 1.1048 + aborting = ex.cause; 1.1049 + } catch (ex) { 1.1050 + self._log.warn("Failed to reconcile incoming record " + item.id); 1.1051 + self._log.warn("Encountered exception: " + Utils.exceptionStr(ex)); 1.1052 + failed.push(item.id); 1.1053 + return; 1.1054 + } 1.1055 + 1.1056 + if (shouldApply) { 1.1057 + count.applied++; 1.1058 + applyBatch.push(item); 1.1059 + } else { 1.1060 + count.reconciled++; 1.1061 + self._log.trace("Skipping reconciled incoming item " + item.id); 1.1062 + } 1.1063 + 1.1064 + if (applyBatch.length == self.applyIncomingBatchSize) { 1.1065 + doApplyBatch.call(self); 1.1066 + } 1.1067 + self._store._sleep(0); 1.1068 + }; 1.1069 + 1.1070 + // Only bother getting data from the server if there's new things 1.1071 + if (this.lastModified == null || this.lastModified > this.lastSync) { 1.1072 + let resp = newitems.get(); 1.1073 + doApplyBatchAndPersistFailed.call(this); 1.1074 + if (!resp.success) { 1.1075 + resp.failureCode = ENGINE_DOWNLOAD_FAIL; 1.1076 + throw resp; 1.1077 + } 1.1078 + 1.1079 + if (aborting) { 1.1080 + throw aborting; 1.1081 + } 1.1082 + } 1.1083 + 1.1084 + // Mobile: check if we got the maximum that we requested; get the rest if so. 1.1085 + if (handled.length == newitems.limit) { 1.1086 + let guidColl = new Collection(this.engineURL, null, this.service); 1.1087 + 1.1088 + // Sort and limit so that on mobile we only get the last X records. 1.1089 + guidColl.limit = this.downloadLimit; 1.1090 + guidColl.newer = this.lastSync; 1.1091 + 1.1092 + // index: Orders by the sortindex descending (highest weight first). 1.1093 + guidColl.sort = "index"; 1.1094 + 1.1095 + let guids = guidColl.get(); 1.1096 + if (!guids.success) 1.1097 + throw guids; 1.1098 + 1.1099 + // Figure out which guids weren't just fetched then remove any guids that 1.1100 + // were already waiting and prepend the new ones 1.1101 + let extra = Utils.arraySub(guids.obj, handled); 1.1102 + if (extra.length > 0) { 1.1103 + fetchBatch = Utils.arrayUnion(extra, fetchBatch); 1.1104 + this.toFetch = Utils.arrayUnion(extra, this.toFetch); 1.1105 + } 1.1106 + } 1.1107 + 1.1108 + // Fast-foward the lastSync timestamp since we have stored the 1.1109 + // remaining items in toFetch. 1.1110 + if (this.lastSync < this.lastModified) { 1.1111 + this.lastSync = this.lastModified; 1.1112 + } 1.1113 + 1.1114 + // Process any backlog of GUIDs. 1.1115 + // At this point we impose an upper limit on the number of items to fetch 1.1116 + // in a single request, even for desktop, to avoid hitting URI limits. 1.1117 + batchSize = isMobile ? this.mobileGUIDFetchBatchSize : 1.1118 + this.guidFetchBatchSize; 1.1119 + 1.1120 + while (fetchBatch.length && !aborting) { 1.1121 + // Reuse the original query, but get rid of the restricting params 1.1122 + // and batch remaining records. 1.1123 + newitems.limit = 0; 1.1124 + newitems.newer = 0; 1.1125 + newitems.ids = fetchBatch.slice(0, batchSize); 1.1126 + 1.1127 + // Reuse the existing record handler set earlier 1.1128 + let resp = newitems.get(); 1.1129 + if (!resp.success) { 1.1130 + resp.failureCode = ENGINE_DOWNLOAD_FAIL; 1.1131 + throw resp; 1.1132 + } 1.1133 + 1.1134 + // This batch was successfully applied. Not using 1.1135 + // doApplyBatchAndPersistFailed() here to avoid writing toFetch twice. 1.1136 + fetchBatch = fetchBatch.slice(batchSize); 1.1137 + this.toFetch = Utils.arraySub(this.toFetch, newitems.ids); 1.1138 + this.previousFailed = Utils.arrayUnion(this.previousFailed, failed); 1.1139 + if (failed.length) { 1.1140 + count.failed += failed.length; 1.1141 + this._log.debug("Records that failed to apply: " + failed); 1.1142 + } 1.1143 + failed = []; 1.1144 + 1.1145 + if (aborting) { 1.1146 + throw aborting; 1.1147 + } 1.1148 + 1.1149 + if (this.lastSync < this.lastModified) { 1.1150 + this.lastSync = this.lastModified; 1.1151 + } 1.1152 + } 1.1153 + 1.1154 + // Apply remaining items. 1.1155 + doApplyBatchAndPersistFailed.call(this); 1.1156 + 1.1157 + count.newFailed = Utils.arraySub(this.previousFailed, failedInPreviousSync).length; 1.1158 + count.succeeded = Math.max(0, count.applied - count.failed); 1.1159 + this._log.info(["Records:", 1.1160 + count.applied, "applied,", 1.1161 + count.succeeded, "successfully,", 1.1162 + count.failed, "failed to apply,", 1.1163 + count.newFailed, "newly failed to apply,", 1.1164 + count.reconciled, "reconciled."].join(" ")); 1.1165 + Observers.notify("weave:engine:sync:applied", count, this.name); 1.1166 + }, 1.1167 + 1.1168 + /** 1.1169 + * Find a GUID of an item that is a duplicate of the incoming item but happens 1.1170 + * to have a different GUID 1.1171 + * 1.1172 + * @return GUID of the similar item; falsy otherwise 1.1173 + */ 1.1174 + _findDupe: function (item) { 1.1175 + // By default, assume there's no dupe items for the engine 1.1176 + }, 1.1177 + 1.1178 + _deleteId: function (id) { 1.1179 + this._tracker.removeChangedID(id); 1.1180 + 1.1181 + // Remember this id to delete at the end of sync 1.1182 + if (this._delete.ids == null) 1.1183 + this._delete.ids = [id]; 1.1184 + else 1.1185 + this._delete.ids.push(id); 1.1186 + }, 1.1187 + 1.1188 + /** 1.1189 + * Reconcile incoming record with local state. 1.1190 + * 1.1191 + * This function essentially determines whether to apply an incoming record. 1.1192 + * 1.1193 + * @param item 1.1194 + * Record from server to be tested for application. 1.1195 + * @return boolean 1.1196 + * Truthy if incoming record should be applied. False if not. 1.1197 + */ 1.1198 + _reconcile: function (item) { 1.1199 + if (this._log.level <= Log.Level.Trace) { 1.1200 + this._log.trace("Incoming: " + item); 1.1201 + } 1.1202 + 1.1203 + // We start reconciling by collecting a bunch of state. We do this here 1.1204 + // because some state may change during the course of this function and we 1.1205 + // need to operate on the original values. 1.1206 + let existsLocally = this._store.itemExists(item.id); 1.1207 + let locallyModified = item.id in this._modified; 1.1208 + 1.1209 + // TODO Handle clock drift better. Tracked in bug 721181. 1.1210 + let remoteAge = AsyncResource.serverTime - item.modified; 1.1211 + let localAge = locallyModified ? 1.1212 + (Date.now() / 1000 - this._modified[item.id]) : null; 1.1213 + let remoteIsNewer = remoteAge < localAge; 1.1214 + 1.1215 + this._log.trace("Reconciling " + item.id + ". exists=" + 1.1216 + existsLocally + "; modified=" + locallyModified + 1.1217 + "; local age=" + localAge + "; incoming age=" + 1.1218 + remoteAge); 1.1219 + 1.1220 + // We handle deletions first so subsequent logic doesn't have to check 1.1221 + // deleted flags. 1.1222 + if (item.deleted) { 1.1223 + // If the item doesn't exist locally, there is nothing for us to do. We 1.1224 + // can't check for duplicates because the incoming record has no data 1.1225 + // which can be used for duplicate detection. 1.1226 + if (!existsLocally) { 1.1227 + this._log.trace("Ignoring incoming item because it was deleted and " + 1.1228 + "the item does not exist locally."); 1.1229 + return false; 1.1230 + } 1.1231 + 1.1232 + // We decide whether to process the deletion by comparing the record 1.1233 + // ages. If the item is not modified locally, the remote side wins and 1.1234 + // the deletion is processed. If it is modified locally, we take the 1.1235 + // newer record. 1.1236 + if (!locallyModified) { 1.1237 + this._log.trace("Applying incoming delete because the local item " + 1.1238 + "exists and isn't modified."); 1.1239 + return true; 1.1240 + } 1.1241 + 1.1242 + // TODO As part of bug 720592, determine whether we should do more here. 1.1243 + // In the case where the local changes are newer, it is quite possible 1.1244 + // that the local client will restore data a remote client had tried to 1.1245 + // delete. There might be a good reason for that delete and it might be 1.1246 + // enexpected for this client to restore that data. 1.1247 + this._log.trace("Incoming record is deleted but we had local changes. " + 1.1248 + "Applying the youngest record."); 1.1249 + return remoteIsNewer; 1.1250 + } 1.1251 + 1.1252 + // At this point the incoming record is not for a deletion and must have 1.1253 + // data. If the incoming record does not exist locally, we check for a local 1.1254 + // duplicate existing under a different ID. The default implementation of 1.1255 + // _findDupe() is empty, so engines have to opt in to this functionality. 1.1256 + // 1.1257 + // If we find a duplicate, we change the local ID to the incoming ID and we 1.1258 + // refresh the metadata collected above. See bug 710448 for the history 1.1259 + // of this logic. 1.1260 + if (!existsLocally) { 1.1261 + let dupeID = this._findDupe(item); 1.1262 + if (dupeID) { 1.1263 + this._log.trace("Local item " + dupeID + " is a duplicate for " + 1.1264 + "incoming item " + item.id); 1.1265 + 1.1266 + // The local, duplicate ID is always deleted on the server. 1.1267 + this._deleteId(dupeID); 1.1268 + 1.1269 + // The current API contract does not mandate that the ID returned by 1.1270 + // _findDupe() actually exists. Therefore, we have to perform this 1.1271 + // check. 1.1272 + existsLocally = this._store.itemExists(dupeID); 1.1273 + 1.1274 + // We unconditionally change the item's ID in case the engine knows of 1.1275 + // an item but doesn't expose it through itemExists. If the API 1.1276 + // contract were stronger, this could be changed. 1.1277 + this._log.debug("Switching local ID to incoming: " + dupeID + " -> " + 1.1278 + item.id); 1.1279 + this._store.changeItemID(dupeID, item.id); 1.1280 + 1.1281 + // If the local item was modified, we carry its metadata forward so 1.1282 + // appropriate reconciling can be performed. 1.1283 + if (dupeID in this._modified) { 1.1284 + locallyModified = true; 1.1285 + localAge = Date.now() / 1000 - this._modified[dupeID]; 1.1286 + remoteIsNewer = remoteAge < localAge; 1.1287 + 1.1288 + this._modified[item.id] = this._modified[dupeID]; 1.1289 + delete this._modified[dupeID]; 1.1290 + } else { 1.1291 + locallyModified = false; 1.1292 + localAge = null; 1.1293 + } 1.1294 + 1.1295 + this._log.debug("Local item after duplication: age=" + localAge + 1.1296 + "; modified=" + locallyModified + "; exists=" + 1.1297 + existsLocally); 1.1298 + } else { 1.1299 + this._log.trace("No duplicate found for incoming item: " + item.id); 1.1300 + } 1.1301 + } 1.1302 + 1.1303 + // At this point we've performed duplicate detection. But, nothing here 1.1304 + // should depend on duplicate detection as the above should have updated 1.1305 + // state seamlessly. 1.1306 + 1.1307 + if (!existsLocally) { 1.1308 + // If the item doesn't exist locally and we have no local modifications 1.1309 + // to the item (implying that it was not deleted), always apply the remote 1.1310 + // item. 1.1311 + if (!locallyModified) { 1.1312 + this._log.trace("Applying incoming because local item does not exist " + 1.1313 + "and was not deleted."); 1.1314 + return true; 1.1315 + } 1.1316 + 1.1317 + // If the item was modified locally but isn't present, it must have 1.1318 + // been deleted. If the incoming record is younger, we restore from 1.1319 + // that record. 1.1320 + if (remoteIsNewer) { 1.1321 + this._log.trace("Applying incoming because local item was deleted " + 1.1322 + "before the incoming item was changed."); 1.1323 + delete this._modified[item.id]; 1.1324 + return true; 1.1325 + } 1.1326 + 1.1327 + this._log.trace("Ignoring incoming item because the local item's " + 1.1328 + "deletion is newer."); 1.1329 + return false; 1.1330 + } 1.1331 + 1.1332 + // If the remote and local records are the same, there is nothing to be 1.1333 + // done, so we don't do anything. In the ideal world, this logic wouldn't 1.1334 + // be here and the engine would take a record and apply it. The reason we 1.1335 + // want to defer this logic is because it would avoid a redundant and 1.1336 + // possibly expensive dip into the storage layer to query item state. 1.1337 + // This should get addressed in the async rewrite, so we ignore it for now. 1.1338 + let localRecord = this._createRecord(item.id); 1.1339 + let recordsEqual = Utils.deepEquals(item.cleartext, 1.1340 + localRecord.cleartext); 1.1341 + 1.1342 + // If the records are the same, we don't need to do anything. This does 1.1343 + // potentially throw away a local modification time. But, if the records 1.1344 + // are the same, does it matter? 1.1345 + if (recordsEqual) { 1.1346 + this._log.trace("Ignoring incoming item because the local item is " + 1.1347 + "identical."); 1.1348 + 1.1349 + delete this._modified[item.id]; 1.1350 + return false; 1.1351 + } 1.1352 + 1.1353 + // At this point the records are different. 1.1354 + 1.1355 + // If we have no local modifications, always take the server record. 1.1356 + if (!locallyModified) { 1.1357 + this._log.trace("Applying incoming record because no local conflicts."); 1.1358 + return true; 1.1359 + } 1.1360 + 1.1361 + // At this point, records are different and the local record is modified. 1.1362 + // We resolve conflicts by record age, where the newest one wins. This does 1.1363 + // result in data loss and should be handled by giving the engine an 1.1364 + // opportunity to merge the records. Bug 720592 tracks this feature. 1.1365 + this._log.warn("DATA LOSS: Both local and remote changes to record: " + 1.1366 + item.id); 1.1367 + return remoteIsNewer; 1.1368 + }, 1.1369 + 1.1370 + // Upload outgoing records. 1.1371 + _uploadOutgoing: function () { 1.1372 + this._log.trace("Uploading local changes to server."); 1.1373 + 1.1374 + let modifiedIDs = Object.keys(this._modified); 1.1375 + if (modifiedIDs.length) { 1.1376 + this._log.trace("Preparing " + modifiedIDs.length + 1.1377 + " outgoing records"); 1.1378 + 1.1379 + // collection we'll upload 1.1380 + let up = new Collection(this.engineURL, null, this.service); 1.1381 + let count = 0; 1.1382 + 1.1383 + // Upload what we've got so far in the collection 1.1384 + let doUpload = Utils.bind2(this, function(desc) { 1.1385 + this._log.info("Uploading " + desc + " of " + modifiedIDs.length + 1.1386 + " records"); 1.1387 + let resp = up.post(); 1.1388 + if (!resp.success) { 1.1389 + this._log.debug("Uploading records failed: " + resp); 1.1390 + resp.failureCode = ENGINE_UPLOAD_FAIL; 1.1391 + throw resp; 1.1392 + } 1.1393 + 1.1394 + // Update server timestamp from the upload. 1.1395 + let modified = resp.headers["x-weave-timestamp"]; 1.1396 + if (modified > this.lastSync) 1.1397 + this.lastSync = modified; 1.1398 + 1.1399 + let failed_ids = Object.keys(resp.obj.failed); 1.1400 + if (failed_ids.length) 1.1401 + this._log.debug("Records that will be uploaded again because " 1.1402 + + "the server couldn't store them: " 1.1403 + + failed_ids.join(", ")); 1.1404 + 1.1405 + // Clear successfully uploaded objects. 1.1406 + for each (let id in resp.obj.success) { 1.1407 + delete this._modified[id]; 1.1408 + } 1.1409 + 1.1410 + up.clearRecords(); 1.1411 + }); 1.1412 + 1.1413 + for each (let id in modifiedIDs) { 1.1414 + try { 1.1415 + let out = this._createRecord(id); 1.1416 + if (this._log.level <= Log.Level.Trace) 1.1417 + this._log.trace("Outgoing: " + out); 1.1418 + 1.1419 + out.encrypt(this.service.collectionKeys.keyForCollection(this.name)); 1.1420 + up.pushData(out); 1.1421 + } 1.1422 + catch(ex) { 1.1423 + this._log.warn("Error creating record: " + Utils.exceptionStr(ex)); 1.1424 + } 1.1425 + 1.1426 + // Partial upload 1.1427 + if ((++count % MAX_UPLOAD_RECORDS) == 0) 1.1428 + doUpload((count - MAX_UPLOAD_RECORDS) + " - " + count + " out"); 1.1429 + 1.1430 + this._store._sleep(0); 1.1431 + } 1.1432 + 1.1433 + // Final upload 1.1434 + if (count % MAX_UPLOAD_RECORDS > 0) 1.1435 + doUpload(count >= MAX_UPLOAD_RECORDS ? "last batch" : "all"); 1.1436 + } 1.1437 + }, 1.1438 + 1.1439 + // Any cleanup necessary. 1.1440 + // Save the current snapshot so as to calculate changes at next sync 1.1441 + _syncFinish: function () { 1.1442 + this._log.trace("Finishing up sync"); 1.1443 + this._tracker.resetScore(); 1.1444 + 1.1445 + let doDelete = Utils.bind2(this, function(key, val) { 1.1446 + let coll = new Collection(this.engineURL, this._recordObj, this.service); 1.1447 + coll[key] = val; 1.1448 + coll.delete(); 1.1449 + }); 1.1450 + 1.1451 + for (let [key, val] in Iterator(this._delete)) { 1.1452 + // Remove the key for future uses 1.1453 + delete this._delete[key]; 1.1454 + 1.1455 + // Send a simple delete for the property 1.1456 + if (key != "ids" || val.length <= 100) 1.1457 + doDelete(key, val); 1.1458 + else { 1.1459 + // For many ids, split into chunks of at most 100 1.1460 + while (val.length > 0) { 1.1461 + doDelete(key, val.slice(0, 100)); 1.1462 + val = val.slice(100); 1.1463 + } 1.1464 + } 1.1465 + } 1.1466 + }, 1.1467 + 1.1468 + _syncCleanup: function () { 1.1469 + if (!this._modified) { 1.1470 + return; 1.1471 + } 1.1472 + 1.1473 + // Mark failed WBOs as changed again so they are reuploaded next time. 1.1474 + for (let [id, when] in Iterator(this._modified)) { 1.1475 + this._tracker.addChangedID(id, when); 1.1476 + } 1.1477 + this._modified = {}; 1.1478 + }, 1.1479 + 1.1480 + _sync: function () { 1.1481 + try { 1.1482 + this._syncStartup(); 1.1483 + Observers.notify("weave:engine:sync:status", "process-incoming"); 1.1484 + this._processIncoming(); 1.1485 + Observers.notify("weave:engine:sync:status", "upload-outgoing"); 1.1486 + this._uploadOutgoing(); 1.1487 + this._syncFinish(); 1.1488 + } finally { 1.1489 + this._syncCleanup(); 1.1490 + } 1.1491 + }, 1.1492 + 1.1493 + canDecrypt: function () { 1.1494 + // Report failure even if there's nothing to decrypt 1.1495 + let canDecrypt = false; 1.1496 + 1.1497 + // Fetch the most recently uploaded record and try to decrypt it 1.1498 + let test = new Collection(this.engineURL, this._recordObj, this.service); 1.1499 + test.limit = 1; 1.1500 + test.sort = "newest"; 1.1501 + test.full = true; 1.1502 + 1.1503 + let key = this.service.collectionKeys.keyForCollection(this.name); 1.1504 + test.recordHandler = function recordHandler(record) { 1.1505 + record.decrypt(key); 1.1506 + canDecrypt = true; 1.1507 + }.bind(this); 1.1508 + 1.1509 + // Any failure fetching/decrypting will just result in false 1.1510 + try { 1.1511 + this._log.trace("Trying to decrypt a record from the server.."); 1.1512 + test.get(); 1.1513 + } 1.1514 + catch(ex) { 1.1515 + this._log.debug("Failed test decrypt: " + Utils.exceptionStr(ex)); 1.1516 + } 1.1517 + 1.1518 + return canDecrypt; 1.1519 + }, 1.1520 + 1.1521 + _resetClient: function () { 1.1522 + this.resetLastSync(); 1.1523 + this.previousFailed = []; 1.1524 + this.toFetch = []; 1.1525 + }, 1.1526 + 1.1527 + wipeServer: function () { 1.1528 + let response = this.service.resource(this.engineURL).delete(); 1.1529 + if (response.status != 200 && response.status != 404) { 1.1530 + throw response; 1.1531 + } 1.1532 + this._resetClient(); 1.1533 + }, 1.1534 + 1.1535 + removeClientData: function () { 1.1536 + // Implement this method in engines that store client specific data 1.1537 + // on the server. 1.1538 + }, 1.1539 + 1.1540 + /* 1.1541 + * Decide on (and partially effect) an error-handling strategy. 1.1542 + * 1.1543 + * Asks the Service to respond to an HMAC error, which might result in keys 1.1544 + * being downloaded. That call returns true if an action which might allow a 1.1545 + * retry to occur. 1.1546 + * 1.1547 + * If `mayRetry` is truthy, and the Service suggests a retry, 1.1548 + * handleHMACMismatch returns kRecoveryStrategy.retry. Otherwise, it returns 1.1549 + * kRecoveryStrategy.error. 1.1550 + * 1.1551 + * Subclasses of SyncEngine can override this method to allow for different 1.1552 + * behavior -- e.g., to delete and ignore erroneous entries. 1.1553 + * 1.1554 + * All return values will be part of the kRecoveryStrategy enumeration. 1.1555 + */ 1.1556 + handleHMACMismatch: function (item, mayRetry) { 1.1557 + // By default we either try again, or bail out noisily. 1.1558 + return (this.service.handleHMACEvent() && mayRetry) ? 1.1559 + SyncEngine.kRecoveryStrategy.retry : 1.1560 + SyncEngine.kRecoveryStrategy.error; 1.1561 + } 1.1562 +};