Thu, 22 Jan 2015 13:21:57 +0100
Incorporate requested changes from Mozilla in review:
https://bugzilla.mozilla.org/show_bug.cgi?id=1123480#c6
1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 this.EXPORTED_SYMBOLS = [
6 "EngineManager",
7 "Engine",
8 "SyncEngine",
9 "Tracker",
10 "Store"
11 ];
13 const {classes: Cc, interfaces: Ci, results: Cr, utils: Cu} = Components;
15 Cu.import("resource://services-common/async.js");
16 Cu.import("resource://gre/modules/Log.jsm");
17 Cu.import("resource://services-common/observers.js");
18 Cu.import("resource://services-common/utils.js");
19 Cu.import("resource://services-sync/constants.js");
20 Cu.import("resource://services-sync/identity.js");
21 Cu.import("resource://services-sync/record.js");
22 Cu.import("resource://services-sync/resource.js");
23 Cu.import("resource://services-sync/util.js");
25 /*
26 * Trackers are associated with a single engine and deal with
27 * listening for changes to their particular data type.
28 *
29 * There are two things they keep track of:
30 * 1) A score, indicating how urgently the engine wants to sync
31 * 2) A list of IDs for all the changed items that need to be synced.
34 *
35 */
36 this.Tracker = function Tracker(name, engine) {
37 if (!engine) {
38 throw new Error("Tracker must be associated with an Engine instance.");
39 }
41 name = name || "Unnamed";
42 this.name = this.file = name.toLowerCase();
43 this.engine = engine;
45 this._log = Log.repository.getLogger("Sync.Tracker." + name);
46 let level = Svc.Prefs.get("log.logger.engine." + this.name, "Debug");
47 this._log.level = Log.Level[level];
49 this._score = 0;
50 this._ignored = [];
51 this.ignoreAll = false;
52 this.changedIDs = {};
53 this.loadChangedIDs();
55 Svc.Obs.add("weave:engine:start-tracking", this);
56 Svc.Obs.add("weave:engine:stop-tracking", this);
57 };
59 Tracker.prototype = {
60 /*
61 * Score can be called as often as desired to decide which engines to sync
62 *
63 * Valid values for score:
64 * -1: Do not sync unless the user specifically requests it (almost disabled)
65 * 0: Nothing has changed
66 * 100: Please sync me ASAP!
67 *
68 * Setting it to other values should (but doesn't currently) throw an exception
69 */
70 get score() {
71 return this._score;
72 },
74 set score(value) {
75 this._score = value;
76 Observers.notify("weave:engine:score:updated", this.name);
77 },
79 // Should be called by the service every time a sync has been done for an engine
80 resetScore: function () {
81 this._score = 0;
82 },
84 persistChangedIDs: true,
86 /**
87 * Persist changedIDs to disk at a later date.
88 * Optionally pass a callback to be invoked when the write has occurred.
89 */
90 saveChangedIDs: function (cb) {
91 if (!this.persistChangedIDs) {
92 this._log.debug("Not saving changedIDs.");
93 return;
94 }
95 Utils.namedTimer(function () {
96 this._log.debug("Saving changed IDs to " + this.file);
97 Utils.jsonSave("changes/" + this.file, this, this.changedIDs, cb);
98 }, 1000, this, "_lazySave");
99 },
101 loadChangedIDs: function (cb) {
102 Utils.jsonLoad("changes/" + this.file, this, function(json) {
103 if (json && (typeof(json) == "object")) {
104 this.changedIDs = json;
105 } else {
106 this._log.warn("Changed IDs file " + this.file + " contains non-object value.");
107 json = null;
108 }
109 if (cb) {
110 cb.call(this, json);
111 }
112 });
113 },
115 // ignore/unignore specific IDs. Useful for ignoring items that are
116 // being processed, or that shouldn't be synced.
117 // But note: not persisted to disk
119 ignoreID: function (id) {
120 this.unignoreID(id);
121 this._ignored.push(id);
122 },
124 unignoreID: function (id) {
125 let index = this._ignored.indexOf(id);
126 if (index != -1)
127 this._ignored.splice(index, 1);
128 },
130 addChangedID: function (id, when) {
131 if (!id) {
132 this._log.warn("Attempted to add undefined ID to tracker");
133 return false;
134 }
136 if (this.ignoreAll || (id in this._ignored)) {
137 return false;
138 }
140 // Default to the current time in seconds if no time is provided.
141 if (when == null) {
142 when = Math.floor(Date.now() / 1000);
143 }
145 // Add/update the entry if we have a newer time.
146 if ((this.changedIDs[id] || -Infinity) < when) {
147 this._log.trace("Adding changed ID: " + id + ", " + when);
148 this.changedIDs[id] = when;
149 this.saveChangedIDs(this.onSavedChangedIDs);
150 }
152 return true;
153 },
155 removeChangedID: function (id) {
156 if (!id) {
157 this._log.warn("Attempted to remove undefined ID from tracker");
158 return false;
159 }
160 if (this.ignoreAll || (id in this._ignored))
161 return false;
162 if (this.changedIDs[id] != null) {
163 this._log.trace("Removing changed ID " + id);
164 delete this.changedIDs[id];
165 this.saveChangedIDs();
166 }
167 return true;
168 },
170 clearChangedIDs: function () {
171 this._log.trace("Clearing changed ID list");
172 this.changedIDs = {};
173 this.saveChangedIDs();
174 },
176 _isTracking: false,
178 // Override these in your subclasses.
179 startTracking: function () {
180 },
182 stopTracking: function () {
183 },
185 engineIsEnabled: function () {
186 if (!this.engine) {
187 // Can't tell -- we must be running in a test!
188 return true;
189 }
190 return this.engine.enabled;
191 },
193 onEngineEnabledChanged: function (engineEnabled) {
194 if (engineEnabled == this._isTracking) {
195 return;
196 }
198 if (engineEnabled) {
199 this.startTracking();
200 this._isTracking = true;
201 } else {
202 this.stopTracking();
203 this._isTracking = false;
204 this.clearChangedIDs();
205 }
206 },
208 observe: function (subject, topic, data) {
209 switch (topic) {
210 case "weave:engine:start-tracking":
211 if (!this.engineIsEnabled()) {
212 return;
213 }
214 this._log.trace("Got start-tracking.");
215 if (!this._isTracking) {
216 this.startTracking();
217 this._isTracking = true;
218 }
219 return;
220 case "weave:engine:stop-tracking":
221 this._log.trace("Got stop-tracking.");
222 if (this._isTracking) {
223 this.stopTracking();
224 this._isTracking = false;
225 }
226 return;
227 }
228 }
229 };
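A concrete tracker typically subclasses Tracker, hooks its data source up in startTracking()/stopTracking(), and feeds changed GUIDs into addChangedID() while bumping the score. A minimal sketch, assuming a hypothetical "example-data-changed" notification whose data payload is the changed GUID, and the SCORE_INCREMENT_MEDIUM constant from constants.js:

function ExampleTracker(name, engine) {
  Tracker.call(this, name, engine);
}
ExampleTracker.prototype = {
  __proto__: Tracker.prototype,

  startTracking: function () {
    Svc.Obs.add("example-data-changed", this);
  },

  stopTracking: function () {
    Svc.Obs.remove("example-data-changed", this);
  },

  observe: function (subject, topic, data) {
    // Let the base class react to start/stop-tracking first.
    Tracker.prototype.observe.call(this, subject, topic, data);
    if (topic == "example-data-changed" && !this.ignoreAll) {
      // 'data' is assumed to be the GUID of the item that changed.
      this.addChangedID(data);
      this.score += SCORE_INCREMENT_MEDIUM;
    }
  }
};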
233 /**
234 * The Store serves as the interface between Sync and stored data.
235 *
236 * The name "store" is something of a misnomer because it doesn't actually "store"
237 * anything. Instead, it serves as a gateway to something that actually does
238 * the "storing."
239 *
240 * The store is responsible for record management inside an engine. It tells
241 * Sync what items are available for Sync, converts items to and from Sync's
242 * record format, and applies records from Sync into changes on the underlying
243 * store.
244 *
245 * Store implementations must provide a number of functions. These
246 * are all documented below.
247 *
248 * For stores that deal with many records or which have expensive store access
249 * routines, it is highly recommended to implement a custom applyIncomingBatch
250 * and/or applyIncoming function on top of the basic APIs.
251 */
253 this.Store = function Store(name, engine) {
254 if (!engine) {
255 throw new Error("Store must be associated with an Engine instance.");
256 }
258 name = name || "Unnamed";
259 this.name = name.toLowerCase();
260 this.engine = engine;
262 this._log = Log.repository.getLogger("Sync.Store." + name);
263 let level = Svc.Prefs.get("log.logger.engine." + this.name, "Debug");
264 this._log.level = Log.Level[level];
266 XPCOMUtils.defineLazyGetter(this, "_timer", function() {
267 return Cc["@mozilla.org/timer;1"].createInstance(Ci.nsITimer);
268 });
269 }
270 Store.prototype = {
272 _sleep: function _sleep(delay) {
273 let cb = Async.makeSyncCallback();
274 this._timer.initWithCallback(cb, delay, Ci.nsITimer.TYPE_ONE_SHOT);
275 Async.waitForSyncCallback(cb);
276 },
278 /**
279 * Apply multiple incoming records against the store.
280 *
281 * This is called with a set of incoming records to process. The function
282 * should look at each record, reconcile with the current local state, and
283 * make the local changes required to bring its state in alignment with the
284 * record.
285 *
286 * The default implementation simply iterates over all records and calls
287 * applyIncoming(). Store implementations may override this function
288 * if desired.
289 *
290 * @param records Array of records to apply
291 * @return Array of record IDs which did not apply cleanly
292 */
293 applyIncomingBatch: function (records) {
294 let failed = [];
295 for each (let record in records) {
296 try {
297 this.applyIncoming(record);
298 } catch (ex if (ex.code == Engine.prototype.eEngineAbortApplyIncoming)) {
299 // This kind of exception should have a 'cause' attribute, which is an
300 // originating exception.
301 // ex.cause will carry its stack with it when rethrown.
302 throw ex.cause;
303 } catch (ex) {
304 this._log.warn("Failed to apply incoming record " + record.id);
305 this._log.warn("Encountered exception: " + Utils.exceptionStr(ex));
306 failed.push(record.id);
307 }
308 }
309 return failed;
310 },
312 /**
313 * Apply a single record against the store.
314 *
315 * This takes a single record and makes the local changes required so the
316 * local state matches what's in the record.
317 *
318 * The default implementation calls one of remove(), create(), or update()
319 * depending on the state obtained from the store itself. Store
320 * implementations may override this function if desired.
321 *
322 * @param record
323 * Record to apply
324 */
325 applyIncoming: function (record) {
326 if (record.deleted)
327 this.remove(record);
328 else if (!this.itemExists(record.id))
329 this.create(record);
330 else
331 this.update(record);
332 },
334 // override these in derived objects
336 /**
337 * Create an item in the store from a record.
338 *
339 * This is called by the default implementation of applyIncoming(). If using
340 * applyIncomingBatch(), this won't be called unless your store calls it.
341 *
342 * @param record
343 * The store record to create an item from
344 */
345 create: function (record) {
346 throw "override create in a subclass";
347 },
349 /**
350 * Remove an item from the store based on a record.
351 *
352 * This is called by the default implementation of applyIncoming(). If using
353 * applyIncomingBatch(), this won't be called unless your store calls it.
354 *
355 * @param record
356 * The store record identifying the item to delete
357 */
358 remove: function (record) {
359 throw "override remove in a subclass";
360 },
362 /**
363 * Update an item from a record.
364 *
365 * This is called by the default implementation of applyIncoming(). If using
366 * applyIncomingBatch(), this won't be called unless your store calls it.
367 *
368 * @param record
369 * The record used to update the local item
370 */
371 update: function (record) {
372 throw "override update in a subclass";
373 },
375 /**
376 * Determine whether a record with the specified ID exists.
377 *
378 * Takes a string record ID and returns a boolean saying whether the record
379 * exists.
380 *
381 * @param id
382 * string record ID
383 * @return boolean indicating whether record exists locally
384 */
385 itemExists: function (id) {
386 throw "override itemExists in a subclass";
387 },
389 /**
390 * Create a record from the specified ID.
391 *
392 * If the ID is known, the record should be populated with metadata from
393 * the store. If the ID is not known, the record should be created with the
394 * deleted field set to true.
395 *
396 * @param id
397 * string record ID
398 * @param collection
399 * Collection to add record to. This is typically passed into the
400 * constructor for the newly-created record.
401 * @return record type for this engine
402 */
403 createRecord: function (id, collection) {
404 throw "override createRecord in a subclass";
405 },
407 /**
408 * Change the ID of a record.
409 *
410 * @param oldID
411 * string old/current record ID
412 * @param newID
413 * string new record ID
414 */
415 changeItemID: function (oldID, newID) {
416 throw "override changeItemID in a subclass";
417 },
419 /**
420 * Obtain the set of all known record IDs.
421 *
422 * @return Object with ID strings as keys and values of true. The values
423 * are ignored.
424 */
425 getAllIDs: function () {
426 throw "override getAllIDs in a subclass";
427 },
429 /**
430 * Wipe all data in the store.
431 *
432 * This function is called during remote wipes or when replacing local data
433 * with remote data.
434 *
435 * This function should delete all local data that the store is managing. It
436 * can be thought of as clearing out all state and restoring the "new
437 * browser" state.
438 */
439 wipe: function () {
440 throw "override wipe in a subclass";
441 }
442 };
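As a rough illustration of the contract above, here is a minimal in-memory store; the this._items object is a stand-in for whatever storage a real engine wraps, and CryptoWrapper(collection, id) from record.js is assumed as the record type. A store with expensive access would additionally override applyIncomingBatch() as recommended above.

function ExampleStore(name, engine) {
  Store.call(this, name, engine);
  this._items = {};  // stand-in for the engine's real backing storage
}
ExampleStore.prototype = {
  __proto__: Store.prototype,

  itemExists: function (id) {
    return id in this._items;
  },

  createRecord: function (id, collection) {
    let record = new CryptoWrapper(collection, id);
    if (id in this._items) {
      record.cleartext = this._items[id];
    } else {
      record.deleted = true;  // unknown IDs become tombstones
    }
    return record;
  },

  changeItemID: function (oldID, newID) {
    this._items[newID] = this._items[oldID];
    delete this._items[oldID];
  },

  getAllIDs: function () {
    let ids = {};
    for (let id in this._items) {
      ids[id] = true;
    }
    return ids;
  },

  create: function (record) {
    this._items[record.id] = record.cleartext;
  },

  update: function (record) {
    this._items[record.id] = record.cleartext;
  },

  remove: function (record) {
    delete this._items[record.id];
  },

  wipe: function () {
    this._items = {};
  }
};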
444 this.EngineManager = function EngineManager(service) {
445 this.service = service;
447 this._engines = {};
449 // This will be populated by Service on startup.
450 this._declined = new Set();
451 this._log = Log.repository.getLogger("Sync.EngineManager");
452 this._log.level = Log.Level[Svc.Prefs.get("log.logger.service.engines", "Debug")];
453 }
454 EngineManager.prototype = {
455 get: function (name) {
456 // Return an array of engines if we have an array of names
457 if (Array.isArray(name)) {
458 let engines = [];
459 name.forEach(function(name) {
460 let engine = this.get(name);
461 if (engine) {
462 engines.push(engine);
463 }
464 }, this);
465 return engines;
466 }
468 let engine = this._engines[name];
469 if (!engine) {
470 this._log.debug("Could not get engine: " + name);
471 if (Object.keys) {
472 this._log.debug("Engines are: " + JSON.stringify(Object.keys(this._engines)));
473 }
474 }
475 return engine;
476 },
478 getAll: function () {
479 return [engine for ([name, engine] in Iterator(this._engines))];
480 },
482 /**
483 * N.B., does not pay attention to the declined list.
484 */
485 getEnabled: function () {
486 return this.getAll().filter(function(engine) engine.enabled);
487 },
489 get enabledEngineNames() {
490 return [e.name for each (e in this.getEnabled())];
491 },
493 persistDeclined: function () {
494 Svc.Prefs.set("declinedEngines", [...this._declined].join(","));
495 },
497 /**
498 * Returns an array.
499 */
500 getDeclined: function () {
501 return [...this._declined];
502 },
504 setDeclined: function (engines) {
505 this._declined = new Set(engines);
506 this.persistDeclined();
507 },
509 isDeclined: function (engineName) {
510 return this._declined.has(engineName);
511 },
513 /**
514 * Accepts a Set or an array.
515 */
516 decline: function (engines) {
517 for (let e of engines) {
518 this._declined.add(e);
519 }
520 this.persistDeclined();
521 },
523 undecline: function (engines) {
524 for (let e of engines) {
525 this._declined.delete(e);
526 }
527 this.persistDeclined();
528 },
530 /**
531 * Mark any non-enabled engines as declined.
532 *
533 * This is useful after initial customization during setup.
534 */
535 declineDisabled: function () {
536 for (let e of this.getAll()) {
537 if (!e.enabled) {
538 this._log.debug("Declining disabled engine " + e.name);
539 this._declined.add(e.name);
540 }
541 }
542 this.persistDeclined();
543 },
545 /**
546 * Register an Engine with the service. Alternatively, give an array of engine
547 * objects to register.
548 *
549 * @param engineObject
550 * Engine object used to get an instance of the engine
551 * @return The engine object if anything failed
552 */
553 register: function (engineObject) {
554 if (Array.isArray(engineObject)) {
555 return engineObject.map(this.register, this);
556 }
558 try {
559 let engine = new engineObject(this.service);
560 let name = engine.name;
561 if (name in this._engines) {
562 this._log.error("Engine '" + name + "' is already registered!");
563 } else {
564 this._engines[name] = engine;
565 }
566 } catch (ex) {
567 this._log.error(CommonUtils.exceptionStr(ex));
569 let mesg = ex.message ? ex.message : ex;
570 let name = engineObject || "";
571 name = name.prototype || "";
572 name = name.name || "";
574 let out = "Could not initialize engine '" + name + "': " + mesg;
575 this._log.error(out);
577 return engineObject;
578 }
579 },
581 unregister: function (val) {
582 let name = val;
583 if (val instanceof Engine) {
584 name = val.name;
585 }
586 delete this._engines[name];
587 },
589 clear: function () {
590 for (let name in this._engines) {
591 delete this._engines[name];
592 }
593 },
594 };
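In practice the Service owns a single EngineManager and registers engine constructors with it at startup. A rough usage sketch, assuming a Service instance named service and the hypothetical engine constructors defined in the sketches further below:

// Illustrative startup flow; the real engine list is driven by prefs.
let manager = new EngineManager(service);
manager.register([MinimalEngine, ExampleEngine]);

// Look an engine up by name, or collect everything enabled and not declined.
let engine = manager.get("example");
let toSync = manager.getEnabled().filter(e => !manager.isDeclined(e.name));

// Record choices made in the setup UI and persist them to the declined pref.
manager.setDeclined(["minimal"]);
manager.declineDisabled();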
596 this.Engine = function Engine(name, service) {
597 if (!service) {
598 throw new Error("Engine must be associated with a Service instance.");
599 }
601 this.Name = name || "Unnamed";
602 this.name = name.toLowerCase();
603 this.service = service;
605 this._notify = Utils.notify("weave:engine:");
606 this._log = Log.repository.getLogger("Sync.Engine." + this.Name);
607 let level = Svc.Prefs.get("log.logger.engine." + this.name, "Debug");
608 this._log.level = Log.Level[level];
610 this._tracker; // initialize tracker to load previously changed IDs
611 this._log.debug("Engine initialized");
612 }
613 Engine.prototype = {
614 // _storeObj and _trackerObj should be overridden in subclasses
615 _storeObj: Store,
616 _trackerObj: Tracker,
618 // Local 'constant'.
619 // Signal to the engine that processing further records is pointless.
620 eEngineAbortApplyIncoming: "error.engine.abort.applyincoming",
622 get prefName() this.name,
623 get enabled() {
624 return Svc.Prefs.get("engine." + this.prefName, false);
625 },
627 set enabled(val) {
628 Svc.Prefs.set("engine." + this.prefName, !!val);
629 this._tracker.onEngineEnabledChanged(val);
630 },
632 get score() this._tracker.score,
634 get _store() {
635 let store = new this._storeObj(this.Name, this);
636 this.__defineGetter__("_store", function() store);
637 return store;
638 },
640 get _tracker() {
641 let tracker = new this._trackerObj(this.Name, this);
642 this.__defineGetter__("_tracker", function() tracker);
643 return tracker;
644 },
646 sync: function () {
647 if (!this.enabled) {
648 return;
649 }
651 if (!this._sync) {
652 throw "engine does not implement _sync method";
653 }
655 this._notify("sync", this.name, this._sync)();
656 },
658 /**
659 * Get rid of any local meta-data.
660 */
661 resetClient: function () {
662 if (!this._resetClient) {
663 throw "engine does not implement _resetClient method";
664 }
666 this._notify("reset-client", this.name, this._resetClient)();
667 },
669 _wipeClient: function () {
670 this.resetClient();
671 this._log.debug("Deleting all local data");
672 this._tracker.ignoreAll = true;
673 this._store.wipe();
674 this._tracker.ignoreAll = false;
675 this._tracker.clearChangedIDs();
676 },
678 wipeClient: function () {
679 this._notify("wipe-client", this.name, this._wipeClient)();
680 }
681 };
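Few engines derive from Engine directly (most use SyncEngine, below), but the base contract is small: point _storeObj and _trackerObj at concrete classes and provide _sync() and _resetClient(). A minimal sketch reusing the hypothetical ExampleStore and ExampleTracker from the earlier sketches:

function MinimalEngine(service) {
  Engine.call(this, "Minimal", service);
}
MinimalEngine.prototype = {
  __proto__: Engine.prototype,
  _storeObj: ExampleStore,
  _trackerObj: ExampleTracker,

  // Engine.sync() wraps this in weave:engine: observer notifications.
  _sync: function () {
    this._log.debug("Nothing to do; SyncEngine provides a real implementation.");
  },

  _resetClient: function () {
    this._log.debug("No local metadata to reset.");
  }
};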
683 this.SyncEngine = function SyncEngine(name, service) {
684 Engine.call(this, name || "SyncEngine", service);
686 this.loadToFetch();
687 this.loadPreviousFailed();
688 }
690 // Enumeration to define approaches to handling bad records.
691 // Attached to the constructor to allow use as a kind of static enumeration.
692 SyncEngine.kRecoveryStrategy = {
693 ignore: "ignore",
694 retry: "retry",
695 error: "error"
696 };
698 SyncEngine.prototype = {
699 __proto__: Engine.prototype,
700 _recordObj: CryptoWrapper,
701 version: 1,
703 // How many records to pull in a single sync. This is primarily to avoid very
704 // long first syncs against profiles with many history records.
705 downloadLimit: null,
707 // How many records to pull at one time when specifying IDs. This is to avoid
708 // URI length limitations.
709 guidFetchBatchSize: DEFAULT_GUID_FETCH_BATCH_SIZE,
710 mobileGUIDFetchBatchSize: DEFAULT_MOBILE_GUID_FETCH_BATCH_SIZE,
712 // How many records to process in a single batch.
713 applyIncomingBatchSize: DEFAULT_STORE_BATCH_SIZE,
715 get storageURL() this.service.storageURL,
717 get engineURL() this.storageURL + this.name,
719 get cryptoKeysURL() this.storageURL + "crypto/keys",
721 get metaURL() this.storageURL + "meta/global",
723 get syncID() {
724 // Generate a random syncID if we don't have one
725 let syncID = Svc.Prefs.get(this.name + ".syncID", "");
726 return syncID == "" ? this.syncID = Utils.makeGUID() : syncID;
727 },
728 set syncID(value) {
729 Svc.Prefs.set(this.name + ".syncID", value);
730 },
732 /*
733 * lastSync is a timestamp in server time.
734 */
735 get lastSync() {
736 return parseFloat(Svc.Prefs.get(this.name + ".lastSync", "0"));
737 },
738 set lastSync(value) {
739 // Reset the pref in case it's a number instead of a string
740 Svc.Prefs.reset(this.name + ".lastSync");
741 // Store the value as a string to keep floating point precision
742 Svc.Prefs.set(this.name + ".lastSync", value.toString());
743 },
744 resetLastSync: function () {
745 this._log.debug("Resetting " + this.name + " last sync time");
746 Svc.Prefs.reset(this.name + ".lastSync");
747 Svc.Prefs.set(this.name + ".lastSync", "0");
748 this.lastSyncLocal = 0;
749 },
751 get toFetch() this._toFetch,
752 set toFetch(val) {
753 let cb = (error) => this._log.error(Utils.exceptionStr(error));
754 // Coerce the array to a string for more efficient comparison.
755 if (val + "" == this._toFetch) {
756 return;
757 }
758 this._toFetch = val;
759 Utils.namedTimer(function () {
760 Utils.jsonSave("toFetch/" + this.name, this, val, cb);
761 }, 0, this, "_toFetchDelay");
762 },
764 loadToFetch: function () {
765 // Initialize to empty if there's no file.
766 this._toFetch = [];
767 Utils.jsonLoad("toFetch/" + this.name, this, function(toFetch) {
768 if (toFetch) {
769 this._toFetch = toFetch;
770 }
771 });
772 },
774 get previousFailed() this._previousFailed,
775 set previousFailed(val) {
776 let cb = (error) => this._log.error(Utils.exceptionStr(error));
777 // Coerce the array to a string for more efficient comparison.
778 if (val + "" == this._previousFailed) {
779 return;
780 }
781 this._previousFailed = val;
782 Utils.namedTimer(function () {
783 Utils.jsonSave("failed/" + this.name, this, val, cb);
784 }, 0, this, "_previousFailedDelay");
785 },
787 loadPreviousFailed: function () {
788 // Initialize to empty if there's no file
789 this._previousFailed = [];
790 Utils.jsonLoad("failed/" + this.name, this, function(previousFailed) {
791 if (previousFailed) {
792 this._previousFailed = previousFailed;
793 }
794 });
795 },
797 /*
798 * lastSyncLocal is a timestamp in local time.
799 */
800 get lastSyncLocal() {
801 return parseInt(Svc.Prefs.get(this.name + ".lastSyncLocal", "0"), 10);
802 },
803 set lastSyncLocal(value) {
804 // Store as a string because pref can only store C longs as numbers.
805 Svc.Prefs.set(this.name + ".lastSyncLocal", value.toString());
806 },
808 /*
809 * Returns a mapping of IDs -> changed timestamp. Engine implementations
810 * can override this method to bypass the tracker for certain or all
811 * changed items.
812 */
813 getChangedIDs: function () {
814 return this._tracker.changedIDs;
815 },
817 // Create a new record using the store and add in crypto fields.
818 _createRecord: function (id) {
819 let record = this._store.createRecord(id, this.name);
820 record.id = id;
821 record.collection = this.name;
822 return record;
823 },
825 // Any setup that needs to happen at the beginning of each sync.
826 _syncStartup: function () {
828 // Determine if we need to wipe on outdated versions
829 let metaGlobal = this.service.recordManager.get(this.metaURL);
830 let engines = metaGlobal.payload.engines || {};
831 let engineData = engines[this.name] || {};
833 let needsWipe = false;
835 // Assume missing versions are 0 and wipe the server
836 if ((engineData.version || 0) < this.version) {
837 this._log.debug("Old engine data: " + [engineData.version, this.version]);
839 // Prepare to clear the server and upload everything
840 needsWipe = true;
841 this.syncID = "";
843 // Set the newer version and newly generated syncID
844 engineData.version = this.version;
845 engineData.syncID = this.syncID;
847 // Put the new data back into meta/global and mark for upload
848 engines[this.name] = engineData;
849 metaGlobal.payload.engines = engines;
850 metaGlobal.changed = true;
851 }
852 // Don't sync this engine if the server has newer data
853 else if (engineData.version > this.version) {
854 let error = new String("New data: " + [engineData.version, this.version]);
855 error.failureCode = VERSION_OUT_OF_DATE;
856 throw error;
857 }
858 // Changes to syncID mean we'll need to upload everything
859 else if (engineData.syncID != this.syncID) {
860 this._log.debug("Engine syncIDs: " + [engineData.syncID, this.syncID]);
861 this.syncID = engineData.syncID;
862 this._resetClient();
863 }
865 // Delete any existing data and reupload on bad version or missing meta.
866 // No crypto component here...? We could regenerate per-collection keys...
867 if (needsWipe) {
868 this.wipeServer();
869 }
871 // Save objects that need to be uploaded in this._modified. We also save
872 // the timestamp of this fetch in this.lastSyncLocal. As we successfully
873 // upload objects we remove them from this._modified. If an error occurs
874 // or any objects fail to upload, they will remain in this._modified. At
875 // the end of a sync, or after an error, we add all objects remaining in
876 // this._modified to the tracker.
877 this.lastSyncLocal = Date.now();
878 if (this.lastSync) {
879 this._modified = this.getChangedIDs();
880 } else {
881 // Mark all items to be uploaded, but treat them as changed from long ago
882 this._log.debug("First sync, uploading all items");
883 this._modified = {};
884 for (let id in this._store.getAllIDs()) {
885 this._modified[id] = 0;
886 }
887 }
888 // Clear the tracker now. If the sync fails we'll add the ones we failed
889 // to upload back.
890 this._tracker.clearChangedIDs();
892 this._log.info(Object.keys(this._modified).length +
893 " outgoing items pre-reconciliation");
895 // Keep track of what to delete at the end of sync
896 this._delete = {};
897 },
899 /**
900 * A tiny abstraction to make it easier to test incoming record
901 * application.
902 */
903 _itemSource: function () {
904 return new Collection(this.engineURL, this._recordObj, this.service);
905 },
907 /**
908 * Process incoming records.
909 * In the most awful and untestable way possible.
910 * This now accepts something that makes testing vaguely less impossible.
911 */
912 _processIncoming: function (newitems) {
913 this._log.trace("Downloading & applying server changes");
915 // Figure out how many total items to fetch this sync; do less on mobile.
916 let batchSize = this.downloadLimit || Infinity;
917 let isMobile = (Svc.Prefs.get("client.type") == "mobile");
919 if (!newitems) {
920 newitems = this._itemSource();
921 }
923 if (isMobile) {
924 batchSize = MOBILE_BATCH_SIZE;
925 }
926 newitems.newer = this.lastSync;
927 newitems.full = true;
928 newitems.limit = batchSize;
930 // applied => number of items that should be applied.
931 // failed => number of items that failed in this sync.
932 // newFailed => number of items that failed for the first time in this sync.
933 // reconciled => number of items that were reconciled.
934 let count = {applied: 0, failed: 0, newFailed: 0, reconciled: 0};
935 let handled = [];
936 let applyBatch = [];
937 let failed = [];
938 let failedInPreviousSync = this.previousFailed;
939 let fetchBatch = Utils.arrayUnion(this.toFetch, failedInPreviousSync);
940 // Reset previousFailed for each sync since previously failed items may not fail again.
941 this.previousFailed = [];
943 // Used (via exceptions) to allow the record handler/reconciliation/etc.
944 // methods to signal that they would like processing of incoming records to
945 // cease.
946 let aborting = undefined;
948 function doApplyBatch() {
949 this._tracker.ignoreAll = true;
950 try {
951 failed = failed.concat(this._store.applyIncomingBatch(applyBatch));
952 } catch (ex) {
953 // Catch any error that escapes from applyIncomingBatch. At present
954 // those will all be abort events.
955 this._log.warn("Got exception " + Utils.exceptionStr(ex) +
956 ", aborting processIncoming.");
957 aborting = ex;
958 }
959 this._tracker.ignoreAll = false;
960 applyBatch = [];
961 }
963 function doApplyBatchAndPersistFailed() {
964 // Apply remaining batch.
965 if (applyBatch.length) {
966 doApplyBatch.call(this);
967 }
968 // Persist failed items so we refetch them.
969 if (failed.length) {
970 this.previousFailed = Utils.arrayUnion(failed, this.previousFailed);
971 count.failed += failed.length;
972 this._log.debug("Records that failed to apply: " + failed);
973 failed = [];
974 }
975 }
977 let key = this.service.collectionKeys.keyForCollection(this.name);
979 // Not binding this method to 'this' for performance reasons. It gets
980 // called for every incoming record.
981 let self = this;
983 newitems.recordHandler = function(item) {
984 if (aborting) {
985 return;
986 }
988 // Grab a later last modified if possible
989 if (self.lastModified == null || item.modified > self.lastModified)
990 self.lastModified = item.modified;
992 // Track the collection for the WBO.
993 item.collection = self.name;
995 // Remember which records were processed
996 handled.push(item.id);
998 try {
999 try {
1000 item.decrypt(key);
1001 } catch (ex if Utils.isHMACMismatch(ex)) {
1002 let strategy = self.handleHMACMismatch(item, true);
1003 if (strategy == SyncEngine.kRecoveryStrategy.retry) {
1004 // You only get one retry.
1005 try {
1006 // Try decrypting again, typically because we've got new keys.
1007 self._log.info("Trying decrypt again...");
1008 key = self.service.collectionKeys.keyForCollection(self.name);
1009 item.decrypt(key);
1010 strategy = null;
1011 } catch (ex if Utils.isHMACMismatch(ex)) {
1012 strategy = self.handleHMACMismatch(item, false);
1013 }
1014 }
1016 switch (strategy) {
1017 case null:
1018 // Retry succeeded! No further handling.
1019 break;
1020 case SyncEngine.kRecoveryStrategy.retry:
1021 self._log.debug("Ignoring second retry suggestion.");
1022 // Fall through to error case.
1023 case SyncEngine.kRecoveryStrategy.error:
1024 self._log.warn("Error decrypting record: " + Utils.exceptionStr(ex));
1025 failed.push(item.id);
1026 return;
1027 case SyncEngine.kRecoveryStrategy.ignore:
1028 self._log.debug("Ignoring record " + item.id +
1029 " with bad HMAC: already handled.");
1030 return;
1031 }
1032 }
1033 } catch (ex) {
1034 self._log.warn("Error decrypting record: " + Utils.exceptionStr(ex));
1035 failed.push(item.id);
1036 return;
1037 }
1039 let shouldApply;
1040 try {
1041 shouldApply = self._reconcile(item);
1042 } catch (ex if (ex.code == Engine.prototype.eEngineAbortApplyIncoming)) {
1043 self._log.warn("Reconciliation failed: aborting incoming processing.");
1044 failed.push(item.id);
1045 aborting = ex.cause;
1046 } catch (ex) {
1047 self._log.warn("Failed to reconcile incoming record " + item.id);
1048 self._log.warn("Encountered exception: " + Utils.exceptionStr(ex));
1049 failed.push(item.id);
1050 return;
1051 }
1053 if (shouldApply) {
1054 count.applied++;
1055 applyBatch.push(item);
1056 } else {
1057 count.reconciled++;
1058 self._log.trace("Skipping reconciled incoming item " + item.id);
1059 }
1061 if (applyBatch.length == self.applyIncomingBatchSize) {
1062 doApplyBatch.call(self);
1063 }
1064 self._store._sleep(0);
1065 };
1067 // Only bother getting data from the server if there are new things
1068 if (this.lastModified == null || this.lastModified > this.lastSync) {
1069 let resp = newitems.get();
1070 doApplyBatchAndPersistFailed.call(this);
1071 if (!resp.success) {
1072 resp.failureCode = ENGINE_DOWNLOAD_FAIL;
1073 throw resp;
1074 }
1076 if (aborting) {
1077 throw aborting;
1078 }
1079 }
1081 // Mobile: check if we got the maximum that we requested; get the rest if so.
1082 if (handled.length == newitems.limit) {
1083 let guidColl = new Collection(this.engineURL, null, this.service);
1085 // Sort and limit so that on mobile we only get the last X records.
1086 guidColl.limit = this.downloadLimit;
1087 guidColl.newer = this.lastSync;
1089 // index: Orders by the sortindex descending (highest weight first).
1090 guidColl.sort = "index";
1092 let guids = guidColl.get();
1093 if (!guids.success)
1094 throw guids;
1096 // Figure out which guids weren't just fetched, then remove any guids that
1097 // were already waiting and prepend the new ones.
1098 let extra = Utils.arraySub(guids.obj, handled);
1099 if (extra.length > 0) {
1100 fetchBatch = Utils.arrayUnion(extra, fetchBatch);
1101 this.toFetch = Utils.arrayUnion(extra, this.toFetch);
1102 }
1103 }
1105 // Fast-forward the lastSync timestamp since we have stored the
1106 // remaining items in toFetch.
1107 if (this.lastSync < this.lastModified) {
1108 this.lastSync = this.lastModified;
1109 }
1111 // Process any backlog of GUIDs.
1112 // At this point we impose an upper limit on the number of items to fetch
1113 // in a single request, even for desktop, to avoid hitting URI limits.
1114 batchSize = isMobile ? this.mobileGUIDFetchBatchSize :
1115 this.guidFetchBatchSize;
1117 while (fetchBatch.length && !aborting) {
1118 // Reuse the original query, but get rid of the restricting params
1119 // and batch remaining records.
1120 newitems.limit = 0;
1121 newitems.newer = 0;
1122 newitems.ids = fetchBatch.slice(0, batchSize);
1124 // Reuse the existing record handler set earlier
1125 let resp = newitems.get();
1126 if (!resp.success) {
1127 resp.failureCode = ENGINE_DOWNLOAD_FAIL;
1128 throw resp;
1129 }
1131 // This batch was successfully applied. Not using
1132 // doApplyBatchAndPersistFailed() here to avoid writing toFetch twice.
1133 fetchBatch = fetchBatch.slice(batchSize);
1134 this.toFetch = Utils.arraySub(this.toFetch, newitems.ids);
1135 this.previousFailed = Utils.arrayUnion(this.previousFailed, failed);
1136 if (failed.length) {
1137 count.failed += failed.length;
1138 this._log.debug("Records that failed to apply: " + failed);
1139 }
1140 failed = [];
1142 if (aborting) {
1143 throw aborting;
1144 }
1146 if (this.lastSync < this.lastModified) {
1147 this.lastSync = this.lastModified;
1148 }
1149 }
1151 // Apply remaining items.
1152 doApplyBatchAndPersistFailed.call(this);
1154 count.newFailed = Utils.arraySub(this.previousFailed, failedInPreviousSync).length;
1155 count.succeeded = Math.max(0, count.applied - count.failed);
1156 this._log.info(["Records:",
1157 count.applied, "applied,",
1158 count.succeeded, "successfully,",
1159 count.failed, "failed to apply,",
1160 count.newFailed, "newly failed to apply,",
1161 count.reconciled, "reconciled."].join(" "));
1162 Observers.notify("weave:engine:sync:applied", count, this.name);
1163 },
1165 /**
1166 * Find a GUID of an item that is a duplicate of the incoming item but happens
1167 * to have a different GUID
1168 *
1169 * @return GUID of the similar item; falsy otherwise
1170 */
1171 _findDupe: function (item) {
1172 // By default, assume there are no dupe items for the engine
1173 },
1175 _deleteId: function (id) {
1176 this._tracker.removeChangedID(id);
1178 // Remember this id to delete at the end of sync
1179 if (this._delete.ids == null)
1180 this._delete.ids = [id];
1181 else
1182 this._delete.ids.push(id);
1183 },
1185 /**
1186 * Reconcile incoming record with local state.
1187 *
1188 * This function essentially determines whether to apply an incoming record.
1189 *
1190 * @param item
1191 * Record from server to be tested for application.
1192 * @return boolean
1193 * Truthy if incoming record should be applied. False if not.
1194 */
1195 _reconcile: function (item) {
1196 if (this._log.level <= Log.Level.Trace) {
1197 this._log.trace("Incoming: " + item);
1198 }
1200 // We start reconciling by collecting a bunch of state. We do this here
1201 // because some state may change during the course of this function and we
1202 // need to operate on the original values.
1203 let existsLocally = this._store.itemExists(item.id);
1204 let locallyModified = item.id in this._modified;
1206 // TODO Handle clock drift better. Tracked in bug 721181.
1207 let remoteAge = AsyncResource.serverTime - item.modified;
1208 let localAge = locallyModified ?
1209 (Date.now() / 1000 - this._modified[item.id]) : null;
1210 let remoteIsNewer = remoteAge < localAge;
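// For example: with a server clock of 1,000,000s, an incoming item modified
// at 999,940s has a remoteAge of 60s. If the matching local change was
// recorded 30s ago, localAge is 30s, remoteAge < localAge is false, and the
// local change wins.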
1212 this._log.trace("Reconciling " + item.id + ". exists=" +
1213 existsLocally + "; modified=" + locallyModified +
1214 "; local age=" + localAge + "; incoming age=" +
1215 remoteAge);
1217 // We handle deletions first so subsequent logic doesn't have to check
1218 // deleted flags.
1219 if (item.deleted) {
1220 // If the item doesn't exist locally, there is nothing for us to do. We
1221 // can't check for duplicates because the incoming record has no data
1222 // which can be used for duplicate detection.
1223 if (!existsLocally) {
1224 this._log.trace("Ignoring incoming item because it was deleted and " +
1225 "the item does not exist locally.");
1226 return false;
1227 }
1229 // We decide whether to process the deletion by comparing the record
1230 // ages. If the item is not modified locally, the remote side wins and
1231 // the deletion is processed. If it is modified locally, we take the
1232 // newer record.
1233 if (!locallyModified) {
1234 this._log.trace("Applying incoming delete because the local item " +
1235 "exists and isn't modified.");
1236 return true;
1237 }
1239 // TODO As part of bug 720592, determine whether we should do more here.
1240 // In the case where the local changes are newer, it is quite possible
1241 // that the local client will restore data a remote client had tried to
1242 // delete. There might be a good reason for that delete and it might be
1243 // unexpected for this client to restore that data.
1244 this._log.trace("Incoming record is deleted but we had local changes. " +
1245 "Applying the youngest record.");
1246 return remoteIsNewer;
1247 }
1249 // At this point the incoming record is not for a deletion and must have
1250 // data. If the incoming record does not exist locally, we check for a local
1251 // duplicate existing under a different ID. The default implementation of
1252 // _findDupe() is empty, so engines have to opt in to this functionality.
1253 //
1254 // If we find a duplicate, we change the local ID to the incoming ID and we
1255 // refresh the metadata collected above. See bug 710448 for the history
1256 // of this logic.
1257 if (!existsLocally) {
1258 let dupeID = this._findDupe(item);
1259 if (dupeID) {
1260 this._log.trace("Local item " + dupeID + " is a duplicate for " +
1261 "incoming item " + item.id);
1263 // The local, duplicate ID is always deleted on the server.
1264 this._deleteId(dupeID);
1266 // The current API contract does not mandate that the ID returned by
1267 // _findDupe() actually exists. Therefore, we have to perform this
1268 // check.
1269 existsLocally = this._store.itemExists(dupeID);
1271 // We unconditionally change the item's ID in case the engine knows of
1272 // an item but doesn't expose it through itemExists. If the API
1273 // contract were stronger, this could be changed.
1274 this._log.debug("Switching local ID to incoming: " + dupeID + " -> " +
1275 item.id);
1276 this._store.changeItemID(dupeID, item.id);
1278 // If the local item was modified, we carry its metadata forward so
1279 // appropriate reconciling can be performed.
1280 if (dupeID in this._modified) {
1281 locallyModified = true;
1282 localAge = Date.now() / 1000 - this._modified[dupeID];
1283 remoteIsNewer = remoteAge < localAge;
1285 this._modified[item.id] = this._modified[dupeID];
1286 delete this._modified[dupeID];
1287 } else {
1288 locallyModified = false;
1289 localAge = null;
1290 }
1292 this._log.debug("Local item after duplication: age=" + localAge +
1293 "; modified=" + locallyModified + "; exists=" +
1294 existsLocally);
1295 } else {
1296 this._log.trace("No duplicate found for incoming item: " + item.id);
1297 }
1298 }
1300 // At this point we've performed duplicate detection. But, nothing here
1301 // should depend on duplicate detection as the above should have updated
1302 // state seamlessly.
1304 if (!existsLocally) {
1305 // If the item doesn't exist locally and we have no local modifications
1306 // to the item (implying that it was not deleted), always apply the remote
1307 // item.
1308 if (!locallyModified) {
1309 this._log.trace("Applying incoming because local item does not exist " +
1310 "and was not deleted.");
1311 return true;
1312 }
1314 // If the item was modified locally but isn't present, it must have
1315 // been deleted. If the incoming record is younger, we restore from
1316 // that record.
1317 if (remoteIsNewer) {
1318 this._log.trace("Applying incoming because local item was deleted " +
1319 "before the incoming item was changed.");
1320 delete this._modified[item.id];
1321 return true;
1322 }
1324 this._log.trace("Ignoring incoming item because the local item's " +
1325 "deletion is newer.");
1326 return false;
1327 }
1329 // If the remote and local records are the same, there is nothing to be
1330 // done, so we don't do anything. In the ideal world, this logic wouldn't
1331 // be here and the engine would take a record and apply it. The reason we
1332 // want to defer this logic is because it would avoid a redundant and
1333 // possibly expensive dip into the storage layer to query item state.
1334 // This should get addressed in the async rewrite, so we ignore it for now.
1335 let localRecord = this._createRecord(item.id);
1336 let recordsEqual = Utils.deepEquals(item.cleartext,
1337 localRecord.cleartext);
1339 // If the records are the same, we don't need to do anything. This does
1340 // potentially throw away a local modification time. But, if the records
1341 // are the same, does it matter?
1342 if (recordsEqual) {
1343 this._log.trace("Ignoring incoming item because the local item is " +
1344 "identical.");
1346 delete this._modified[item.id];
1347 return false;
1348 }
1350 // At this point the records are different.
1352 // If we have no local modifications, always take the server record.
1353 if (!locallyModified) {
1354 this._log.trace("Applying incoming record because no local conflicts.");
1355 return true;
1356 }
1358 // At this point, records are different and the local record is modified.
1359 // We resolve conflicts by record age, where the newest one wins. This does
1360 // result in data loss and should be handled by giving the engine an
1361 // opportunity to merge the records. Bug 720592 tracks this feature.
1362 this._log.warn("DATA LOSS: Both local and remote changes to record: " +
1363 item.id);
1364 return remoteIsNewer;
1365 },
1367 // Upload outgoing records.
1368 _uploadOutgoing: function () {
1369 this._log.trace("Uploading local changes to server.");
1371 let modifiedIDs = Object.keys(this._modified);
1372 if (modifiedIDs.length) {
1373 this._log.trace("Preparing " + modifiedIDs.length +
1374 " outgoing records");
1376 // collection we'll upload
1377 let up = new Collection(this.engineURL, null, this.service);
1378 let count = 0;
1380 // Upload what we've got so far in the collection
1381 let doUpload = Utils.bind2(this, function(desc) {
1382 this._log.info("Uploading " + desc + " of " + modifiedIDs.length +
1383 " records");
1384 let resp = up.post();
1385 if (!resp.success) {
1386 this._log.debug("Uploading records failed: " + resp);
1387 resp.failureCode = ENGINE_UPLOAD_FAIL;
1388 throw resp;
1389 }
1391 // Update server timestamp from the upload.
1392 let modified = resp.headers["x-weave-timestamp"];
1393 if (modified > this.lastSync)
1394 this.lastSync = modified;
1396 let failed_ids = Object.keys(resp.obj.failed);
1397 if (failed_ids.length)
1398 this._log.debug("Records that will be uploaded again because "
1399 + "the server couldn't store them: "
1400 + failed_ids.join(", "));
1402 // Clear successfully uploaded objects.
1403 for each (let id in resp.obj.success) {
1404 delete this._modified[id];
1405 }
1407 up.clearRecords();
1408 });
1410 for each (let id in modifiedIDs) {
1411 try {
1412 let out = this._createRecord(id);
1413 if (this._log.level <= Log.Level.Trace)
1414 this._log.trace("Outgoing: " + out);
1416 out.encrypt(this.service.collectionKeys.keyForCollection(this.name));
1417 up.pushData(out);
1418 }
1419 catch(ex) {
1420 this._log.warn("Error creating record: " + Utils.exceptionStr(ex));
1421 }
1423 // Partial upload
1424 if ((++count % MAX_UPLOAD_RECORDS) == 0)
1425 doUpload((count - MAX_UPLOAD_RECORDS) + " - " + count + " out");
1427 this._store._sleep(0);
1428 }
1430 // Final upload
1431 if (count % MAX_UPLOAD_RECORDS > 0)
1432 doUpload(count >= MAX_UPLOAD_RECORDS ? "last batch" : "all");
1433 }
1434 },
1436 // Any cleanup necessary.
1437 // Save the current snapshot so as to calculate changes at next sync
1438 _syncFinish: function () {
1439 this._log.trace("Finishing up sync");
1440 this._tracker.resetScore();
1442 let doDelete = Utils.bind2(this, function(key, val) {
1443 let coll = new Collection(this.engineURL, this._recordObj, this.service);
1444 coll[key] = val;
1445 coll.delete();
1446 });
1448 for (let [key, val] in Iterator(this._delete)) {
1449 // Remove the key for future uses
1450 delete this._delete[key];
1452 // Send a simple delete for the property
1453 if (key != "ids" || val.length <= 100)
1454 doDelete(key, val);
1455 else {
1456 // For many ids, split into chunks of at most 100
1457 while (val.length > 0) {
1458 doDelete(key, val.slice(0, 100));
1459 val = val.slice(100);
1460 }
1461 }
1462 }
1463 },
1465 _syncCleanup: function () {
1466 if (!this._modified) {
1467 return;
1468 }
1470 // Mark failed WBOs as changed again so they are reuploaded next time.
1471 for (let [id, when] in Iterator(this._modified)) {
1472 this._tracker.addChangedID(id, when);
1473 }
1474 this._modified = {};
1475 },
1477 _sync: function () {
1478 try {
1479 this._syncStartup();
1480 Observers.notify("weave:engine:sync:status", "process-incoming");
1481 this._processIncoming();
1482 Observers.notify("weave:engine:sync:status", "upload-outgoing");
1483 this._uploadOutgoing();
1484 this._syncFinish();
1485 } finally {
1486 this._syncCleanup();
1487 }
1488 },
1490 canDecrypt: function () {
1491 // Report failure even if there's nothing to decrypt
1492 let canDecrypt = false;
1494 // Fetch the most recently uploaded record and try to decrypt it
1495 let test = new Collection(this.engineURL, this._recordObj, this.service);
1496 test.limit = 1;
1497 test.sort = "newest";
1498 test.full = true;
1500 let key = this.service.collectionKeys.keyForCollection(this.name);
1501 test.recordHandler = function recordHandler(record) {
1502 record.decrypt(key);
1503 canDecrypt = true;
1504 }.bind(this);
1506 // Any failure fetching/decrypting will just result in false
1507 try {
1508 this._log.trace("Trying to decrypt a record from the server..");
1509 test.get();
1510 }
1511 catch(ex) {
1512 this._log.debug("Failed test decrypt: " + Utils.exceptionStr(ex));
1513 }
1515 return canDecrypt;
1516 },
1518 _resetClient: function () {
1519 this.resetLastSync();
1520 this.previousFailed = [];
1521 this.toFetch = [];
1522 },
1524 wipeServer: function () {
1525 let response = this.service.resource(this.engineURL).delete();
1526 if (response.status != 200 && response.status != 404) {
1527 throw response;
1528 }
1529 this._resetClient();
1530 },
1532 removeClientData: function () {
1533 // Implement this method in engines that store client specific data
1534 // on the server.
1535 },
1537 /*
1538 * Decide on (and partially effect) an error-handling strategy.
1539 *
1540 * Asks the Service to respond to an HMAC error, which might result in keys
1541 * being downloaded. That call returns true if it took an action that
1542 * might allow a retry to occur.
1543 *
1544 * If `mayRetry` is truthy, and the Service suggests a retry,
1545 * handleHMACMismatch returns kRecoveryStrategy.retry. Otherwise, it returns
1546 * kRecoveryStrategy.error.
1547 *
1548 * Subclasses of SyncEngine can override this method to allow for different
1549 * behavior -- e.g., to delete and ignore erroneous entries.
1550 *
1551 * All return values will be part of the kRecoveryStrategy enumeration.
1552 */
1553 handleHMACMismatch: function (item, mayRetry) {
1554 // By default we either try again, or bail out noisily.
1555 return (this.service.handleHMACEvent() && mayRetry) ?
1556 SyncEngine.kRecoveryStrategy.retry :
1557 SyncEngine.kRecoveryStrategy.error;
1558 }
1559 };
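Putting the pieces together, a concrete engine built on SyncEngine usually just names its record, store, and tracker classes and optionally overrides hooks such as _findDupe() or handleHMACMismatch(). A rough sketch reusing the hypothetical classes from the earlier examples; the ExampleRecord class, the Utils.deferGetSet() call, and the duplicate-detection heuristic are illustrative assumptions:

function ExampleRecord(collection, id) {
  CryptoWrapper.call(this, collection, id);
}
ExampleRecord.prototype = {
  __proto__: CryptoWrapper.prototype,
  _logName: "Sync.Record.Example"
};
Utils.deferGetSet(ExampleRecord, "cleartext", ["value"]);

function ExampleEngine(service) {
  SyncEngine.call(this, "Example", service);
}
ExampleEngine.prototype = {
  __proto__: SyncEngine.prototype,
  _recordObj: ExampleRecord,
  _storeObj: ExampleStore,
  _trackerObj: ExampleTracker,
  version: 1,

  // Opt in to duplicate detection: report a local item that has the same
  // value as the incoming one but a different GUID (illustrative logic only).
  _findDupe: function (item) {
    for (let id in this._store._items) {
      if (this._store._items[id].value == item.value) {
        return id;
      }
    }
  },

  // A softer recovery policy than the default: if a retry is not possible,
  // skip the undecryptable record instead of counting it as failed.
  handleHMACMismatch: function (item, mayRetry) {
    if (mayRetry) {
      return SyncEngine.prototype.handleHMACMismatch.call(this, item, mayRetry);
    }
    return SyncEngine.kRecoveryStrategy.ignore;
  }
};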