|
1 /* This Source Code Form is subject to the terms of the Mozilla Public |
|
2 * License, v. 2.0. If a copy of the MPL was not distributed with this |
|
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ |
|
4 |
|
5 this.EXPORTED_SYMBOLS = [ |
|
6 "EngineManager", |
|
7 "Engine", |
|
8 "SyncEngine", |
|
9 "Tracker", |
|
10 "Store" |
|
11 ]; |
|
12 |
|
13 const {classes: Cc, interfaces: Ci, results: Cr, utils: Cu} = Components; |
|
14 |
|
15 Cu.import("resource://services-common/async.js"); |
|
16 Cu.import("resource://gre/modules/Log.jsm"); |
|
17 Cu.import("resource://services-common/observers.js"); |
|
18 Cu.import("resource://services-common/utils.js"); |
|
19 Cu.import("resource://services-sync/constants.js"); |
|
20 Cu.import("resource://services-sync/identity.js"); |
|
21 Cu.import("resource://services-sync/record.js"); |
|
22 Cu.import("resource://services-sync/resource.js"); |
|
23 Cu.import("resource://services-sync/util.js"); |
|
24 |
|
/*
 * Trackers are associated with a single engine and deal with
 * listening for changes to their particular data type.
 *
 * There are two things they keep track of:
 * 1) A score, indicating how urgently the engine wants to sync
 * 2) A list of IDs for all the changed items that need to be synced
 *
 * @param name
 *        Display name; falls back to "Unnamed". Its lower-cased form is used
 *        both as the tracker name and as the changed-IDs file name.
 * @param engine
 *        The owning engine. Required; an Error is thrown when it is missing.
 */
this.Tracker = function Tracker(name, engine) {
  if (!engine) {
    throw new Error("Tracker must be associated with an Engine instance.");
  }

  let displayName = name || "Unnamed";
  let lowered = displayName.toLowerCase();
  this.file = lowered;
  this.name = lowered;
  this.engine = engine;

  // Log level comes from the per-engine pref, defaulting to "Debug".
  let logger = Log.repository.getLogger("Sync.Tracker." + displayName);
  logger.level = Log.Level[Svc.Prefs.get("log.logger.engine." + this.name, "Debug")];
  this._log = logger;

  this._score = 0;
  this._ignored = [];
  this.ignoreAll = false;
  // Start empty; loadChangedIDs() replaces this if a saved object is found.
  this.changedIDs = {};
  this.loadChangedIDs();

  // The tracker itself is the observer; see observe() on the prototype.
  ["weave:engine:start-tracking", "weave:engine:stop-tracking"].forEach(function (topic) {
    Svc.Obs.add(topic, this);
  }, this);
};
|
58 |
|
Tracker.prototype = {
  /*
   * Score can be called as often as desired to decide which engines to sync
   *
   * Valid values for score:
   * -1: Do not sync unless the user specifically requests it (almost disabled)
   * 0: Nothing has changed
   * 100: Please sync me ASAP!
   *
   * Setting it to other values should (but doesn't currently) throw an exception
   */
  get score() {
    return this._score;
  },

  // Stores the new score and notifies "weave:engine:score:updated" with this
  // tracker's name.
  set score(value) {
    this._score = value;
    Observers.notify("weave:engine:score:updated", this.name);
  },

  // Should be called by service every time a sync has been done for an engine.
  // Writes _score directly, so no score-updated notification is sent.
  resetScore: function () {
    this._score = 0;
  },

  // When false, saveChangedIDs() only logs and never writes.
  persistChangedIDs: true,

  /**
   * Persist changedIDs to disk at a later date.
   * Optionally pass a callback to be invoked when the write has occurred.
   *
   * The write is scheduled through Utils.namedTimer under the name
   * "_lazySave" with a delay argument of 1000.
   */
  saveChangedIDs: function (cb) {
    if (!this.persistChangedIDs) {
      this._log.debug("Not saving changedIDs.");
      return;
    }
    Utils.namedTimer(function () {
      this._log.debug("Saving changed IDs to " + this.file);
      Utils.jsonSave("changes/" + this.file, this, this.changedIDs, cb);
    }, 1000, this, "_lazySave");
  },

  /**
   * Load previously saved changed IDs from "changes/<file>".
   *
   * @param cb
   *        Optional callback, invoked with the tracker as `this` and the
   *        loaded object (or null when the loaded value was not an object).
   */
  loadChangedIDs: function (cb) {
    Utils.jsonLoad("changes/" + this.file, this, function (json) {
      if (json && (typeof(json) == "object")) {
        this.changedIDs = json;
      } else {
        this._log.warn("Changed IDs file " + this.file + " contains non-object value.");
        json = null;
      }
      if (cb) {
        cb.call(this, json);
      }
    });
  },

  // ignore/unignore specific IDs. Useful for ignoring items that are
  // being processed, or that shouldn't be synced.
  // But note: not persisted to disk

  // Removes any existing entry first so an ID appears at most once.
  ignoreID: function (id) {
    this.unignoreID(id);
    this._ignored.push(id);
  },

  unignoreID: function (id) {
    let index = this._ignored.indexOf(id);
    if (index != -1) {
      this._ignored.splice(index, 1);
    }
  },

  /**
   * Record that an item changed.
   *
   * @param id
   *        ID of the changed item. Falsy IDs are rejected with a warning.
   * @param when
   *        Optional change time; defaults to the current time in seconds.
   * @return false if the ID was rejected or is being ignored, true otherwise
   *         (even when an existing, newer timestamp was kept).
   */
  addChangedID: function (id, when) {
    if (!id) {
      this._log.warn("Attempted to add undefined ID to tracker");
      return false;
    }

    // _ignored is an array of IDs, so membership must be tested by value.
    // (The `in` operator would test array indices instead.)
    if (this.ignoreAll || this._ignored.indexOf(id) != -1) {
      return false;
    }

    // Default to the current time in seconds if no time is provided.
    if (when == null) {
      when = Math.floor(Date.now() / 1000);
    }

    // Add/update the entry if we have a newer time.
    if ((this.changedIDs[id] || -Infinity) < when) {
      this._log.trace("Adding changed ID: " + id + ", " + when);
      this.changedIDs[id] = when;
      this.saveChangedIDs(this.onSavedChangedIDs);
    }

    return true;
  },

  /**
   * Forget a changed item.
   *
   * @param id
   *        ID to remove. Falsy IDs are rejected with a warning.
   * @return false if the ID was rejected or is being ignored, true otherwise.
   */
  removeChangedID: function (id) {
    if (!id) {
      this._log.warn("Attempted to remove undefined ID to tracker");
      return false;
    }
    // Same by-value membership test as addChangedID.
    if (this.ignoreAll || this._ignored.indexOf(id) != -1) {
      return false;
    }
    if (this.changedIDs[id] != null) {
      this._log.trace("Removing changed ID " + id);
      delete this.changedIDs[id];
      this.saveChangedIDs();
    }
    return true;
  },

  // Replaces changedIDs with a fresh object (the old object is left intact
  // for anyone still holding a reference) and schedules a save.
  clearChangedIDs: function () {
    this._log.trace("Clearing changed ID list");
    this.changedIDs = {};
    this.saveChangedIDs();
  },

  _isTracking: false,

  // Override these in your subclasses.
  startTracking: function () {
  },

  stopTracking: function () {
  },

  engineIsEnabled: function () {
    if (!this.engine) {
      // Can't tell -- we must be running in a test!
      return true;
    }
    return this.engine.enabled;
  },

  /**
   * React to the owning engine being enabled or disabled.
   *
   * Starts tracking when enabled; stops tracking and clears the changed IDs
   * when disabled. Does nothing if the tracking state already matches.
   */
  onEngineEnabledChanged: function (engineEnabled) {
    // Normalise to a boolean so truthy/falsy values compare correctly with
    // _isTracking (e.g. undefined must count as "disabled").
    let enabled = !!engineEnabled;
    if (enabled == this._isTracking) {
      return;
    }

    if (enabled) {
      this.startTracking();
      this._isTracking = true;
    } else {
      this.stopTracking();
      this._isTracking = false;
      this.clearChangedIDs();
    }
  },

  // Observer entry point for the start-tracking / stop-tracking topics
  // registered in the constructor. Other topics are ignored.
  observe: function (subject, topic, data) {
    switch (topic) {
      case "weave:engine:start-tracking":
        if (!this.engineIsEnabled()) {
          return;
        }
        this._log.trace("Got start-tracking.");
        if (!this._isTracking) {
          this.startTracking();
          this._isTracking = true;
        }
        return;
      case "weave:engine:stop-tracking":
        this._log.trace("Got stop-tracking.");
        if (this._isTracking) {
          this.stopTracking();
          this._isTracking = false;
        }
        return;
    }
  }
};
|
230 |
|
231 |
|
232 |
|
233 /** |
|
234 * The Store serves as the interface between Sync and stored data. |
|
235 * |
|
236 * The name "store" is slightly a misnomer because it doesn't actually "store" |
|
237 * anything. Instead, it serves as a gateway to something that actually does |
|
238 * the "storing." |
|
239 * |
|
240 * The store is responsible for record management inside an engine. It tells |
|
241 * Sync what items are available for Sync, converts items to and from Sync's |
|
242 * record format, and applies records from Sync into changes on the underlying |
|
243 * store. |
|
244 * |
|
245 * Store implementations require a number of functions to be implemented. These |
|
246 * are all documented below. |
|
247 * |
|
248 * For stores that deal with many records or which have expensive store access |
|
249 * routines, it is highly recommended to implement a custom applyIncomingBatch |
|
250 * and/or applyIncoming function on top of the basic APIs. |
|
251 */ |
|
252 |
|
/**
 * Construct a Store bound to an engine.
 *
 * @param name
 *        Display name; falls back to "Unnamed". The lower-cased form becomes
 *        this.name.
 * @param engine
 *        The owning engine. Required; an Error is thrown when it is missing.
 */
this.Store = function Store(name, engine) {
  if (!engine) {
    throw new Error("Store must be associated with an Engine instance.");
  }

  let displayName = name || "Unnamed";
  this.name = displayName.toLowerCase();
  this.engine = engine;

  // Log level comes from the per-engine pref, defaulting to "Debug".
  let logger = Log.repository.getLogger("Sync.Store." + displayName);
  logger.level = Log.Level[Svc.Prefs.get("log.logger.engine." + this.name, "Debug")];
  this._log = logger;

  // The nsITimer behind this._timer is only created on first access.
  let makeTimer = function () {
    return Cc["@mozilla.org/timer;1"].createInstance(Ci.nsITimer);
  };
  XPCOMUtils.defineLazyGetter(this, "_timer", makeTimer);
};
|
Store.prototype = {

  // Block the caller for `delay` using this store's one-shot timer and the
  // Async sync-callback helpers.
  _sleep: function _sleep(delay) {
    let cb = Async.makeSyncCallback();
    this._timer.initWithCallback(cb, delay, Ci.nsITimer.TYPE_ONE_SHOT);
    Async.waitForSyncCallback(cb);
  },

  /**
   * Apply multiple incoming records against the store.
   *
   * This is called with a set of incoming records to process. The function
   * should look at each record, reconcile with the current local state, and
   * make the local changes required to bring its state in alignment with the
   * record.
   *
   * The default implementation simply iterates over all records and calls
   * applyIncoming(). Store implementations may overwrite this function
   * if desired.
   *
   * An exception whose `code` equals Engine.prototype.eEngineAbortApplyIncoming
   * aborts the batch: its `cause` is rethrown. Any other exception is logged
   * and the record's ID is added to the returned list.
   *
   * @param records Array of records to apply
   * @return Array of record IDs which did not apply cleanly
   */
  applyIncomingBatch: function (records) {
    // Standard iteration replaces the SpiderMonkey-only `for each` loop.
    // That loop also walked the values of plain objects and silently skipped
    // a missing batch, so both inputs remain accepted here.
    let batch = Array.isArray(records)
      ? records
      : Object.keys(records || {}).map(function (key) { return records[key]; });

    let failed = [];
    for (let record of batch) {
      try {
        this.applyIncoming(record);
      } catch (ex) {
        if (ex && ex.code == Engine.prototype.eEngineAbortApplyIncoming) {
          // This kind of exception should have a 'cause' attribute, which is an
          // originating exception.
          // ex.cause will carry its stack with it when rethrown.
          throw ex.cause;
        }
        this._log.warn("Failed to apply incoming record " + record.id);
        this._log.warn("Encountered exception: " + Utils.exceptionStr(ex));
        failed.push(record.id);
      }
    }
    return failed;
  },

  /**
   * Apply a single record against the store.
   *
   * This takes a single record and makes the local changes required so the
   * local state matches what's in the record.
   *
   * The default implementation calls one of remove(), create(), or update()
   * depending on the state obtained from the store itself. Store
   * implementations may overwrite this function if desired.
   *
   * @param record
   *        Record to apply
   */
  applyIncoming: function (record) {
    if (record.deleted) {
      this.remove(record);
    } else if (!this.itemExists(record.id)) {
      this.create(record);
    } else {
      this.update(record);
    }
  },

  // override these in derived objects

  /**
   * Create an item in the store from a record.
   *
   * This is called by the default implementation of applyIncoming(). If using
   * applyIncomingBatch(), this won't be called unless your store calls it.
   *
   * @param record
   *        The store record to create an item from
   */
  create: function (record) {
    throw "override create in a subclass";
  },

  /**
   * Remove an item in the store from a record.
   *
   * This is called by the default implementation of applyIncoming(). If using
   * applyIncomingBatch(), this won't be called unless your store calls it.
   *
   * @param record
   *        The store record to delete an item from
   */
  remove: function (record) {
    throw "override remove in a subclass";
  },

  /**
   * Update an item from a record.
   *
   * This is called by the default implementation of applyIncoming(). If using
   * applyIncomingBatch(), this won't be called unless your store calls it.
   *
   * @param record
   *        The record to use to update an item from
   */
  update: function (record) {
    throw "override update in a subclass";
  },

  /**
   * Determine whether a record with the specified ID exists.
   *
   * Takes a string record ID and returns a boolean saying whether the record
   * exists.
   *
   * @param id
   *        string record ID
   * @return boolean indicating whether record exists locally
   */
  itemExists: function (id) {
    throw "override itemExists in a subclass";
  },

  /**
   * Create a record from the specified ID.
   *
   * If the ID is known, the record should be populated with metadata from
   * the store. If the ID is not known, the record should be created with the
   * delete field set to true.
   *
   * @param id
   *        string record ID
   * @param collection
   *        Collection to add record to. This is typically passed into the
   *        constructor for the newly-created record.
   * @return record type for this engine
   */
  createRecord: function (id, collection) {
    throw "override createRecord in a subclass";
  },

  /**
   * Change the ID of a record.
   *
   * @param oldID
   *        string old/current record ID
   * @param newID
   *        string new record ID
   */
  changeItemID: function (oldID, newID) {
    throw "override changeItemID in a subclass";
  },

  /**
   * Obtain the set of all known record IDs.
   *
   * @return Object with ID strings as keys and values of true. The values
   *         are ignored.
   */
  getAllIDs: function () {
    throw "override getAllIDs in a subclass";
  },

  /**
   * Wipe all data in the store.
   *
   * This function is called during remote wipes or when replacing local data
   * with remote data.
   *
   * This function should delete all local data that the store is managing. It
   * can be thought of as clearing out all state and restoring the "new
   * browser" state.
   */
  wipe: function () {
    throw "override wipe in a subclass";
  }
};
|
443 |
|
/**
 * Registry of engine instances for a service.
 *
 * @param service
 *        Stored as this.service and handed to every engine constructor
 *        passed to register().
 */
this.EngineManager = function EngineManager(service) {
  this.service = service;

  // Engine instances keyed by engine name.
  this._engines = {};

  // This will be populated by Service on startup.
  this._declined = new Set();

  let logger = Log.repository.getLogger("Sync.EngineManager");
  let levelName = Svc.Prefs.get("log.logger.service.engines", "Debug");
  logger.level = Log.Level[levelName];
  this._log = logger;
};
|
EngineManager.prototype = {
  /**
   * Look up one engine, or several.
   *
   * @param name
   *        An engine name, or an array of engine names.
   * @return For a single name: the engine, or undefined (after logging the
   *         known engine names) when it is not registered. For an array: an
   *         array of the engines that were found, in request order.
   */
  get: function (name) {
    // Return an array of engines if we have an array of names
    if (Array.isArray(name)) {
      let engines = [];
      for (let requested of name) {
        let found = this.get(requested);
        if (found) {
          engines.push(found);
        }
      }
      return engines;
    }

    let engine = this._engines[name];
    if (!engine) {
      this._log.debug("Could not get engine: " + name);
      this._log.debug("Engines are: " + JSON.stringify(Object.keys(this._engines)));
    }
    return engine;
  },

  // All registered engine instances, in registration (key) order.
  getAll: function () {
    return Object.keys(this._engines).map((name) => this._engines[name]);
  },

  /**
   * N.B., does not pay attention to the declined list.
   */
  getEnabled: function () {
    return this.getAll().filter((engine) => engine.enabled);
  },

  get enabledEngineNames() {
    return this.getEnabled().map((engine) => engine.name);
  },

  // Writes the declined set to the "declinedEngines" pref as a
  // comma-separated string.
  persistDeclined: function () {
    Svc.Prefs.set("declinedEngines", [...this._declined].join(","));
  },

  /**
   * Returns an array.
   */
  getDeclined: function () {
    return [...this._declined];
  },

  // Replaces the declined set wholesale and persists it.
  setDeclined: function (engines) {
    this._declined = new Set(engines);
    this.persistDeclined();
  },

  isDeclined: function (engineName) {
    return this._declined.has(engineName);
  },

  /**
   * Accepts a Set or an array.
   */
  decline: function (engines) {
    for (let e of engines) {
      this._declined.add(e);
    }
    this.persistDeclined();
  },

  // Counterpart of decline(): removes each given name, then persists.
  undecline: function (engines) {
    for (let e of engines) {
      this._declined.delete(e);
    }
    this.persistDeclined();
  },

  /**
   * Mark any non-enabled engines as declined.
   *
   * This is useful after initial customization during setup.
   */
  declineDisabled: function () {
    for (let e of this.getAll()) {
      if (!e.enabled) {
        this._log.debug("Declining disabled engine " + e.name);
        this._declined.add(e.name);
      }
    }
    this.persistDeclined();
  },

  /**
   * Register an Engine to the service. Alternatively, give an array of engine
   * objects to register.
   *
   * @param engineObject
   *        Engine constructor used to get an instance of the engine; it is
   *        called with this.service.
   * @return undefined on success (including the "already registered" case,
   *         which is only logged); the engine constructor itself if
   *         instantiation threw. For an array, an array of those per-item
   *         results.
   */
  register: function (engineObject) {
    if (Array.isArray(engineObject)) {
      return engineObject.map(this.register, this);
    }

    try {
      let engine = new engineObject(this.service);
      let name = engine.name;
      if (name in this._engines) {
        this._log.error("Engine '" + name + "' is already registered!");
      } else {
        this._engines[name] = engine;
      }
    } catch (ex) {
      this._log.error(CommonUtils.exceptionStr(ex));

      let mesg = ex.message ? ex.message : ex;
      // Best-effort label: prototype.name of the constructor, or "".
      let label = engineObject || "";
      label = label.prototype || "";
      label = label.name || "";

      let out = "Could not initialize engine '" + label + "': " + mesg;
      this._log.error(out);

      return engineObject;
    }
  },

  // Accepts either an engine name or an Engine instance.
  unregister: function (val) {
    let name = val;
    if (val instanceof Engine) {
      name = val.name;
    }
    delete this._engines[name];
  },

  // Empties the registry in place (the _engines object itself is kept).
  clear: function () {
    for (let name in this._engines) {
      delete this._engines[name];
    }
  },
};
|
595 |
|
/**
 * Base engine constructor.
 *
 * @param name
 *        Display name; falls back to "Unnamed". Stored as this.Name, with
 *        the lower-cased form stored as this.name.
 * @param service
 *        The owning service. Required; an Error is thrown when it is missing.
 */
this.Engine = function Engine(name, service) {
  if (!service) {
    throw new Error("Engine must be associated with a Service instance.");
  }

  this.Name = name || "Unnamed";
  // Derive from this.Name (not the raw argument) so a missing name yields
  // "unnamed" instead of throwing on undefined.toLowerCase().
  this.name = this.Name.toLowerCase();
  this.service = service;

  this._notify = Utils.notify("weave:engine:");
  this._log = Log.repository.getLogger("Sync.Engine." + this.Name);
  let level = Svc.Prefs.get("log.logger.engine." + this.name, "Debug");
  this._log.level = Log.Level[level];

  // Touch the lazy _tracker getter for its side effect: constructing the
  // tracker, which loads previously changed IDs.
  this._tracker;
  this._log.debug("Engine initialized");
};
|
Engine.prototype = {
  // _storeObj, and _trackerObj should be overridden in subclasses
  _storeObj: Store,
  _trackerObj: Tracker,

  // Local 'constant'.
  // Signal to the engine that processing further records is pointless.
  eEngineAbortApplyIncoming: "error.engine.abort.applyincoming",

  // Suffix of the "engine.<prefName>" pref that stores the enabled flag.
  get prefName() {
    return this.name;
  },

  get enabled() {
    return Svc.Prefs.get("engine." + this.prefName, false);
  },

  // Persists the flag as a boolean and forwards the raw value to the tracker.
  set enabled(val) {
    Svc.Prefs.set("engine." + this.prefName, !!val);
    this._tracker.onEngineEnabledChanged(val);
  },

  get score() {
    return this._tracker.score;
  },

  // Lazily constructs the store on first access, then shadows this getter
  // with an own-property getter returning the same instance.
  get _store() {
    let store = new this._storeObj(this.Name, this);
    this.__defineGetter__("_store", function () {
      return store;
    });
    return store;
  },

  // Same lazy pattern as _store, for the tracker.
  get _tracker() {
    let tracker = new this._trackerObj(this.Name, this);
    this.__defineGetter__("_tracker", function () {
      return tracker;
    });
    return tracker;
  },

  /**
   * Run this engine's _sync through the notify wrapper.
   *
   * Does nothing when the engine is disabled. Throws a string when the
   * engine has no _sync method.
   */
  sync: function () {
    if (!this.enabled) {
      return;
    }

    if (!this._sync) {
      throw "engine does not implement _sync method";
    }

    this._notify("sync", this.name, this._sync)();
  },

  /**
   * Get rid of any local meta-data.
   */
  resetClient: function () {
    if (!this._resetClient) {
      throw "engine does not implement _resetClient method";
    }

    this._notify("reset-client", this.name, this._resetClient)();
  },

  // Resets client metadata, wipes the store with tracking suppressed, then
  // clears the tracker's changed IDs.
  _wipeClient: function () {
    this.resetClient();
    this._log.debug("Deleting all local data");
    this._tracker.ignoreAll = true;
    try {
      this._store.wipe();
    } finally {
      // Always re-enable tracking; otherwise a failed wipe would leave the
      // tracker ignoring every subsequent change.
      this._tracker.ignoreAll = false;
    }
    this._tracker.clearChangedIDs();
  },

  wipeClient: function () {
    this._notify("wipe-client", this.name, this._wipeClient)();
  }
};
|
682 |
|
/**
 * Engine that keeps persisted "toFetch" and "previousFailed" lists.
 *
 * Delegates to the Engine constructor (defaulting the name to "SyncEngine"),
 * then loads both lists.
 */
this.SyncEngine = function SyncEngine(name, service) {
  let engineName = name || "SyncEngine";
  Engine.call(this, engineName, service);

  this.loadToFetch();
  this.loadPreviousFailed();
};

// Enumeration to define approaches to handling bad records.
// Attached to the constructor to allow use as a kind of static enumeration.
// Each key maps to a string of the same name.
SyncEngine.kRecoveryStrategy = {
  "ignore": "ignore",
  "retry": "retry",
  "error": "error",
};
|
697 |
|
698 SyncEngine.prototype = { |
|
699 __proto__: Engine.prototype, |
|
700 _recordObj: CryptoWrapper, |
|
701 version: 1, |
|
702 |
|
703 // How many records to pull in a single sync. This is primarily to avoid very |
|
704 // long first syncs against profiles with many history records. |
|
705 downloadLimit: null, |
|
706 |
|
707 // How many records to pull at one time when specifying IDs. This is to avoid |
|
708 // URI length limitations. |
|
709 guidFetchBatchSize: DEFAULT_GUID_FETCH_BATCH_SIZE, |
|
710 mobileGUIDFetchBatchSize: DEFAULT_MOBILE_GUID_FETCH_BATCH_SIZE, |
|
711 |
|
712 // How many records to process in a single batch. |
|
713 applyIncomingBatchSize: DEFAULT_STORE_BATCH_SIZE, |
|
714 |
|
715 get storageURL() this.service.storageURL, |
|
716 |
|
717 get engineURL() this.storageURL + this.name, |
|
718 |
|
719 get cryptoKeysURL() this.storageURL + "crypto/keys", |
|
720 |
|
721 get metaURL() this.storageURL + "meta/global", |
|
722 |
|
723 get syncID() { |
|
724 // Generate a random syncID if we don't have one |
|
725 let syncID = Svc.Prefs.get(this.name + ".syncID", ""); |
|
726 return syncID == "" ? this.syncID = Utils.makeGUID() : syncID; |
|
727 }, |
|
728 set syncID(value) { |
|
729 Svc.Prefs.set(this.name + ".syncID", value); |
|
730 }, |
|
731 |
|
732 /* |
|
733 * lastSync is a timestamp in server time. |
|
734 */ |
|
735 get lastSync() { |
|
736 return parseFloat(Svc.Prefs.get(this.name + ".lastSync", "0")); |
|
737 }, |
|
738 set lastSync(value) { |
|
739 // Reset the pref in-case it's a number instead of a string |
|
740 Svc.Prefs.reset(this.name + ".lastSync"); |
|
741 // Store the value as a string to keep floating point precision |
|
742 Svc.Prefs.set(this.name + ".lastSync", value.toString()); |
|
743 }, |
|
744 resetLastSync: function () { |
|
745 this._log.debug("Resetting " + this.name + " last sync time"); |
|
746 Svc.Prefs.reset(this.name + ".lastSync"); |
|
747 Svc.Prefs.set(this.name + ".lastSync", "0"); |
|
748 this.lastSyncLocal = 0; |
|
749 }, |
|
750 |
|
751 get toFetch() this._toFetch, |
|
752 set toFetch(val) { |
|
753 let cb = (error) => this._log.error(Utils.exceptionStr(error)); |
|
754 // Coerce the array to a string for more efficient comparison. |
|
755 if (val + "" == this._toFetch) { |
|
756 return; |
|
757 } |
|
758 this._toFetch = val; |
|
759 Utils.namedTimer(function () { |
|
760 Utils.jsonSave("toFetch/" + this.name, this, val, cb); |
|
761 }, 0, this, "_toFetchDelay"); |
|
762 }, |
|
763 |
|
764 loadToFetch: function () { |
|
765 // Initialize to empty if there's no file. |
|
766 this._toFetch = []; |
|
767 Utils.jsonLoad("toFetch/" + this.name, this, function(toFetch) { |
|
768 if (toFetch) { |
|
769 this._toFetch = toFetch; |
|
770 } |
|
771 }); |
|
772 }, |
|
773 |
|
774 get previousFailed() this._previousFailed, |
|
775 set previousFailed(val) { |
|
776 let cb = (error) => this._log.error(Utils.exceptionStr(error)); |
|
777 // Coerce the array to a string for more efficient comparison. |
|
778 if (val + "" == this._previousFailed) { |
|
779 return; |
|
780 } |
|
781 this._previousFailed = val; |
|
782 Utils.namedTimer(function () { |
|
783 Utils.jsonSave("failed/" + this.name, this, val, cb); |
|
784 }, 0, this, "_previousFailedDelay"); |
|
785 }, |
|
786 |
|
787 loadPreviousFailed: function () { |
|
788 // Initialize to empty if there's no file |
|
789 this._previousFailed = []; |
|
790 Utils.jsonLoad("failed/" + this.name, this, function(previousFailed) { |
|
791 if (previousFailed) { |
|
792 this._previousFailed = previousFailed; |
|
793 } |
|
794 }); |
|
795 }, |
|
796 |
|
797 /* |
|
798 * lastSyncLocal is a timestamp in local time. |
|
799 */ |
|
800 get lastSyncLocal() { |
|
801 return parseInt(Svc.Prefs.get(this.name + ".lastSyncLocal", "0"), 10); |
|
802 }, |
|
803 set lastSyncLocal(value) { |
|
804 // Store as a string because pref can only store C longs as numbers. |
|
805 Svc.Prefs.set(this.name + ".lastSyncLocal", value.toString()); |
|
806 }, |
|
807 |
|
808 /* |
|
809 * Returns a mapping of IDs -> changed timestamp. Engine implementations |
|
810 * can override this method to bypass the tracker for certain or all |
|
811 * changed items. |
|
812 */ |
|
813 getChangedIDs: function () { |
|
814 return this._tracker.changedIDs; |
|
815 }, |
|
816 |
|
817 // Create a new record using the store and add in crypto fields. |
|
818 _createRecord: function (id) { |
|
819 let record = this._store.createRecord(id, this.name); |
|
820 record.id = id; |
|
821 record.collection = this.name; |
|
822 return record; |
|
823 }, |
|
824 |
|
825 // Any setup that needs to happen at the beginning of each sync. |
|
826 _syncStartup: function () { |
|
827 |
|
828 // Determine if we need to wipe on outdated versions |
|
829 let metaGlobal = this.service.recordManager.get(this.metaURL); |
|
830 let engines = metaGlobal.payload.engines || {}; |
|
831 let engineData = engines[this.name] || {}; |
|
832 |
|
833 let needsWipe = false; |
|
834 |
|
835 // Assume missing versions are 0 and wipe the server |
|
836 if ((engineData.version || 0) < this.version) { |
|
837 this._log.debug("Old engine data: " + [engineData.version, this.version]); |
|
838 |
|
839 // Prepare to clear the server and upload everything |
|
840 needsWipe = true; |
|
841 this.syncID = ""; |
|
842 |
|
843 // Set the newer version and newly generated syncID |
|
844 engineData.version = this.version; |
|
845 engineData.syncID = this.syncID; |
|
846 |
|
847 // Put the new data back into meta/global and mark for upload |
|
848 engines[this.name] = engineData; |
|
849 metaGlobal.payload.engines = engines; |
|
850 metaGlobal.changed = true; |
|
851 } |
|
852 // Don't sync this engine if the server has newer data |
|
853 else if (engineData.version > this.version) { |
|
854 let error = new String("New data: " + [engineData.version, this.version]); |
|
855 error.failureCode = VERSION_OUT_OF_DATE; |
|
856 throw error; |
|
857 } |
|
858 // Changes to syncID mean we'll need to upload everything |
|
859 else if (engineData.syncID != this.syncID) { |
|
860 this._log.debug("Engine syncIDs: " + [engineData.syncID, this.syncID]); |
|
861 this.syncID = engineData.syncID; |
|
862 this._resetClient(); |
|
863 }; |
|
864 |
|
865 // Delete any existing data and reupload on bad version or missing meta. |
|
866 // No crypto component here...? We could regenerate per-collection keys... |
|
867 if (needsWipe) { |
|
868 this.wipeServer(); |
|
869 } |
|
870 |
|
871 // Save objects that need to be uploaded in this._modified. We also save |
|
872 // the timestamp of this fetch in this.lastSyncLocal. As we successfully |
|
873 // upload objects we remove them from this._modified. If an error occurs |
|
874 // or any objects fail to upload, they will remain in this._modified. At |
|
875 // the end of a sync, or after an error, we add all objects remaining in |
|
876 // this._modified to the tracker. |
|
877 this.lastSyncLocal = Date.now(); |
|
878 if (this.lastSync) { |
|
879 this._modified = this.getChangedIDs(); |
|
880 } else { |
|
881 // Mark all items to be uploaded, but treat them as changed from long ago |
|
882 this._log.debug("First sync, uploading all items"); |
|
883 this._modified = {}; |
|
884 for (let id in this._store.getAllIDs()) { |
|
885 this._modified[id] = 0; |
|
886 } |
|
887 } |
|
888 // Clear the tracker now. If the sync fails we'll add the ones we failed |
|
889 // to upload back. |
|
890 this._tracker.clearChangedIDs(); |
|
891 |
|
892 this._log.info(Object.keys(this._modified).length + |
|
893 " outgoing items pre-reconciliation"); |
|
894 |
|
895 // Keep track of what to delete at the end of sync |
|
896 this._delete = {}; |
|
897 }, |
|
898 |
|
899 /** |
|
900 * A tiny abstraction to make it easier to test incoming record |
|
901 * application. |
|
902 */ |
|
903 _itemSource: function () { |
|
904 return new Collection(this.engineURL, this._recordObj, this.service); |
|
905 }, |
|
906 |
|
907 /** |
|
908 * Process incoming records. |
|
909 * In the most awful and untestable way possible. |
|
910 * This now accepts something that makes testing vaguely less impossible. |
|
911 */ |
|
912 _processIncoming: function (newitems) { |
|
913 this._log.trace("Downloading & applying server changes"); |
|
914 |
|
915 // Figure out how many total items to fetch this sync; do less on mobile. |
|
916 let batchSize = this.downloadLimit || Infinity; |
|
917 let isMobile = (Svc.Prefs.get("client.type") == "mobile"); |
|
918 |
|
919 if (!newitems) { |
|
920 newitems = this._itemSource(); |
|
921 } |
|
922 |
|
923 if (isMobile) { |
|
924 batchSize = MOBILE_BATCH_SIZE; |
|
925 } |
|
926 newitems.newer = this.lastSync; |
|
927 newitems.full = true; |
|
928 newitems.limit = batchSize; |
|
929 |
|
930 // applied => number of items that should be applied. |
|
931 // failed => number of items that failed in this sync. |
|
932 // newFailed => number of items that failed for the first time in this sync. |
|
933 // reconciled => number of items that were reconciled. |
|
934 let count = {applied: 0, failed: 0, newFailed: 0, reconciled: 0}; |
|
935 let handled = []; |
|
936 let applyBatch = []; |
|
937 let failed = []; |
|
938 let failedInPreviousSync = this.previousFailed; |
|
939 let fetchBatch = Utils.arrayUnion(this.toFetch, failedInPreviousSync); |
|
940 // Reset previousFailed for each sync since previously failed items may not fail again. |
|
941 this.previousFailed = []; |
|
942 |
|
943 // Used (via exceptions) to allow the record handler/reconciliation/etc. |
|
944 // methods to signal that they would like processing of incoming records to |
|
945 // cease. |
|
946 let aborting = undefined; |
|
947 |
|
948 function doApplyBatch() { |
|
949 this._tracker.ignoreAll = true; |
|
950 try { |
|
951 failed = failed.concat(this._store.applyIncomingBatch(applyBatch)); |
|
952 } catch (ex) { |
|
953 // Catch any error that escapes from applyIncomingBatch. At present |
|
954 // those will all be abort events. |
|
955 this._log.warn("Got exception " + Utils.exceptionStr(ex) + |
|
956 ", aborting processIncoming."); |
|
957 aborting = ex; |
|
958 } |
|
959 this._tracker.ignoreAll = false; |
|
960 applyBatch = []; |
|
961 } |
|
962 |
|
963 function doApplyBatchAndPersistFailed() { |
|
964 // Apply remaining batch. |
|
965 if (applyBatch.length) { |
|
966 doApplyBatch.call(this); |
|
967 } |
|
968 // Persist failed items so we refetch them. |
|
969 if (failed.length) { |
|
970 this.previousFailed = Utils.arrayUnion(failed, this.previousFailed); |
|
971 count.failed += failed.length; |
|
972 this._log.debug("Records that failed to apply: " + failed); |
|
973 failed = []; |
|
974 } |
|
975 } |
|
976 |
|
977 let key = this.service.collectionKeys.keyForCollection(this.name); |
|
978 |
|
979 // Not binding this method to 'this' for performance reasons. It gets |
|
980 // called for every incoming record. |
|
981 let self = this; |
|
982 |
|
983 newitems.recordHandler = function(item) { |
|
984 if (aborting) { |
|
985 return; |
|
986 } |
|
987 |
|
988 // Grab a later last modified if possible |
|
989 if (self.lastModified == null || item.modified > self.lastModified) |
|
990 self.lastModified = item.modified; |
|
991 |
|
992 // Track the collection for the WBO. |
|
993 item.collection = self.name; |
|
994 |
|
995 // Remember which records were processed |
|
996 handled.push(item.id); |
|
997 |
|
998 try { |
|
999 try { |
|
1000 item.decrypt(key); |
|
1001 } catch (ex if Utils.isHMACMismatch(ex)) { |
|
1002 let strategy = self.handleHMACMismatch(item, true); |
|
1003 if (strategy == SyncEngine.kRecoveryStrategy.retry) { |
|
1004 // You only get one retry. |
|
1005 try { |
|
1006 // Try decrypting again, typically because we've got new keys. |
|
1007 self._log.info("Trying decrypt again..."); |
|
1008 key = self.service.collectionKeys.keyForCollection(self.name); |
|
1009 item.decrypt(key); |
|
1010 strategy = null; |
|
1011 } catch (ex if Utils.isHMACMismatch(ex)) { |
|
1012 strategy = self.handleHMACMismatch(item, false); |
|
1013 } |
|
1014 } |
|
1015 |
|
1016 switch (strategy) { |
|
1017 case null: |
|
1018 // Retry succeeded! No further handling. |
|
1019 break; |
|
1020 case SyncEngine.kRecoveryStrategy.retry: |
|
1021 self._log.debug("Ignoring second retry suggestion."); |
|
1022 // Fall through to error case. |
|
1023 case SyncEngine.kRecoveryStrategy.error: |
|
1024 self._log.warn("Error decrypting record: " + Utils.exceptionStr(ex)); |
|
1025 failed.push(item.id); |
|
1026 return; |
|
1027 case SyncEngine.kRecoveryStrategy.ignore: |
|
1028 self._log.debug("Ignoring record " + item.id + |
|
1029 " with bad HMAC: already handled."); |
|
1030 return; |
|
1031 } |
|
1032 } |
|
1033 } catch (ex) { |
|
1034 self._log.warn("Error decrypting record: " + Utils.exceptionStr(ex)); |
|
1035 failed.push(item.id); |
|
1036 return; |
|
1037 } |
|
1038 |
|
1039 let shouldApply; |
|
1040 try { |
|
1041 shouldApply = self._reconcile(item); |
|
1042 } catch (ex if (ex.code == Engine.prototype.eEngineAbortApplyIncoming)) { |
|
1043 self._log.warn("Reconciliation failed: aborting incoming processing."); |
|
1044 failed.push(item.id); |
|
1045 aborting = ex.cause; |
|
1046 } catch (ex) { |
|
1047 self._log.warn("Failed to reconcile incoming record " + item.id); |
|
1048 self._log.warn("Encountered exception: " + Utils.exceptionStr(ex)); |
|
1049 failed.push(item.id); |
|
1050 return; |
|
1051 } |
|
1052 |
|
1053 if (shouldApply) { |
|
1054 count.applied++; |
|
1055 applyBatch.push(item); |
|
1056 } else { |
|
1057 count.reconciled++; |
|
1058 self._log.trace("Skipping reconciled incoming item " + item.id); |
|
1059 } |
|
1060 |
|
1061 if (applyBatch.length == self.applyIncomingBatchSize) { |
|
1062 doApplyBatch.call(self); |
|
1063 } |
|
1064 self._store._sleep(0); |
|
1065 }; |
|
1066 |
|
1067 // Only bother getting data from the server if there's new things |
|
1068 if (this.lastModified == null || this.lastModified > this.lastSync) { |
|
1069 let resp = newitems.get(); |
|
1070 doApplyBatchAndPersistFailed.call(this); |
|
1071 if (!resp.success) { |
|
1072 resp.failureCode = ENGINE_DOWNLOAD_FAIL; |
|
1073 throw resp; |
|
1074 } |
|
1075 |
|
1076 if (aborting) { |
|
1077 throw aborting; |
|
1078 } |
|
1079 } |
|
1080 |
|
1081 // Mobile: check if we got the maximum that we requested; get the rest if so. |
|
1082 if (handled.length == newitems.limit) { |
|
1083 let guidColl = new Collection(this.engineURL, null, this.service); |
|
1084 |
|
1085 // Sort and limit so that on mobile we only get the last X records. |
|
1086 guidColl.limit = this.downloadLimit; |
|
1087 guidColl.newer = this.lastSync; |
|
1088 |
|
1089 // index: Orders by the sortindex descending (highest weight first). |
|
1090 guidColl.sort = "index"; |
|
1091 |
|
1092 let guids = guidColl.get(); |
|
1093 if (!guids.success) |
|
1094 throw guids; |
|
1095 |
|
1096 // Figure out which guids weren't just fetched then remove any guids that |
|
1097 // were already waiting and prepend the new ones |
|
1098 let extra = Utils.arraySub(guids.obj, handled); |
|
1099 if (extra.length > 0) { |
|
1100 fetchBatch = Utils.arrayUnion(extra, fetchBatch); |
|
1101 this.toFetch = Utils.arrayUnion(extra, this.toFetch); |
|
1102 } |
|
1103 } |
|
1104 |
|
1105 // Fast-foward the lastSync timestamp since we have stored the |
|
1106 // remaining items in toFetch. |
|
1107 if (this.lastSync < this.lastModified) { |
|
1108 this.lastSync = this.lastModified; |
|
1109 } |
|
1110 |
|
1111 // Process any backlog of GUIDs. |
|
1112 // At this point we impose an upper limit on the number of items to fetch |
|
1113 // in a single request, even for desktop, to avoid hitting URI limits. |
|
1114 batchSize = isMobile ? this.mobileGUIDFetchBatchSize : |
|
1115 this.guidFetchBatchSize; |
|
1116 |
|
1117 while (fetchBatch.length && !aborting) { |
|
1118 // Reuse the original query, but get rid of the restricting params |
|
1119 // and batch remaining records. |
|
1120 newitems.limit = 0; |
|
1121 newitems.newer = 0; |
|
1122 newitems.ids = fetchBatch.slice(0, batchSize); |
|
1123 |
|
1124 // Reuse the existing record handler set earlier |
|
1125 let resp = newitems.get(); |
|
1126 if (!resp.success) { |
|
1127 resp.failureCode = ENGINE_DOWNLOAD_FAIL; |
|
1128 throw resp; |
|
1129 } |
|
1130 |
|
1131 // This batch was successfully applied. Not using |
|
1132 // doApplyBatchAndPersistFailed() here to avoid writing toFetch twice. |
|
1133 fetchBatch = fetchBatch.slice(batchSize); |
|
1134 this.toFetch = Utils.arraySub(this.toFetch, newitems.ids); |
|
1135 this.previousFailed = Utils.arrayUnion(this.previousFailed, failed); |
|
1136 if (failed.length) { |
|
1137 count.failed += failed.length; |
|
1138 this._log.debug("Records that failed to apply: " + failed); |
|
1139 } |
|
1140 failed = []; |
|
1141 |
|
1142 if (aborting) { |
|
1143 throw aborting; |
|
1144 } |
|
1145 |
|
1146 if (this.lastSync < this.lastModified) { |
|
1147 this.lastSync = this.lastModified; |
|
1148 } |
|
1149 } |
|
1150 |
|
1151 // Apply remaining items. |
|
1152 doApplyBatchAndPersistFailed.call(this); |
|
1153 |
|
1154 count.newFailed = Utils.arraySub(this.previousFailed, failedInPreviousSync).length; |
|
1155 count.succeeded = Math.max(0, count.applied - count.failed); |
|
1156 this._log.info(["Records:", |
|
1157 count.applied, "applied,", |
|
1158 count.succeeded, "successfully,", |
|
1159 count.failed, "failed to apply,", |
|
1160 count.newFailed, "newly failed to apply,", |
|
1161 count.reconciled, "reconciled."].join(" ")); |
|
1162 Observers.notify("weave:engine:sync:applied", count, this.name); |
|
1163 }, |
|
1164 |
|
1165 /** |
|
1166 * Find a GUID of an item that is a duplicate of the incoming item but happens |
|
1167 * to have a different GUID |
|
1168 * |
|
1169 * @return GUID of the similar item; falsy otherwise |
|
1170 */ |
|
1171 _findDupe: function (item) { |
|
1172 // By default, assume there's no dupe items for the engine |
|
1173 }, |
|
1174 |
|
1175 _deleteId: function (id) { |
|
1176 this._tracker.removeChangedID(id); |
|
1177 |
|
1178 // Remember this id to delete at the end of sync |
|
1179 if (this._delete.ids == null) |
|
1180 this._delete.ids = [id]; |
|
1181 else |
|
1182 this._delete.ids.push(id); |
|
1183 }, |
|
1184 |
|
1185 /** |
|
1186 * Reconcile incoming record with local state. |
|
1187 * |
|
1188 * This function essentially determines whether to apply an incoming record. |
|
1189 * |
|
1190 * @param item |
|
1191 * Record from server to be tested for application. |
|
1192 * @return boolean |
|
1193 * Truthy if incoming record should be applied. False if not. |
|
1194 */ |
|
1195 _reconcile: function (item) { |
|
1196 if (this._log.level <= Log.Level.Trace) { |
|
1197 this._log.trace("Incoming: " + item); |
|
1198 } |
|
1199 |
|
1200 // We start reconciling by collecting a bunch of state. We do this here |
|
1201 // because some state may change during the course of this function and we |
|
1202 // need to operate on the original values. |
|
1203 let existsLocally = this._store.itemExists(item.id); |
|
1204 let locallyModified = item.id in this._modified; |
|
1205 |
|
1206 // TODO Handle clock drift better. Tracked in bug 721181. |
|
1207 let remoteAge = AsyncResource.serverTime - item.modified; |
|
1208 let localAge = locallyModified ? |
|
1209 (Date.now() / 1000 - this._modified[item.id]) : null; |
|
1210 let remoteIsNewer = remoteAge < localAge; |
|
1211 |
|
1212 this._log.trace("Reconciling " + item.id + ". exists=" + |
|
1213 existsLocally + "; modified=" + locallyModified + |
|
1214 "; local age=" + localAge + "; incoming age=" + |
|
1215 remoteAge); |
|
1216 |
|
1217 // We handle deletions first so subsequent logic doesn't have to check |
|
1218 // deleted flags. |
|
1219 if (item.deleted) { |
|
1220 // If the item doesn't exist locally, there is nothing for us to do. We |
|
1221 // can't check for duplicates because the incoming record has no data |
|
1222 // which can be used for duplicate detection. |
|
1223 if (!existsLocally) { |
|
1224 this._log.trace("Ignoring incoming item because it was deleted and " + |
|
1225 "the item does not exist locally."); |
|
1226 return false; |
|
1227 } |
|
1228 |
|
1229 // We decide whether to process the deletion by comparing the record |
|
1230 // ages. If the item is not modified locally, the remote side wins and |
|
1231 // the deletion is processed. If it is modified locally, we take the |
|
1232 // newer record. |
|
1233 if (!locallyModified) { |
|
1234 this._log.trace("Applying incoming delete because the local item " + |
|
1235 "exists and isn't modified."); |
|
1236 return true; |
|
1237 } |
|
1238 |
|
1239 // TODO As part of bug 720592, determine whether we should do more here. |
|
1240 // In the case where the local changes are newer, it is quite possible |
|
1241 // that the local client will restore data a remote client had tried to |
|
1242 // delete. There might be a good reason for that delete and it might be |
|
1243 // enexpected for this client to restore that data. |
|
1244 this._log.trace("Incoming record is deleted but we had local changes. " + |
|
1245 "Applying the youngest record."); |
|
1246 return remoteIsNewer; |
|
1247 } |
|
1248 |
|
1249 // At this point the incoming record is not for a deletion and must have |
|
1250 // data. If the incoming record does not exist locally, we check for a local |
|
1251 // duplicate existing under a different ID. The default implementation of |
|
1252 // _findDupe() is empty, so engines have to opt in to this functionality. |
|
1253 // |
|
1254 // If we find a duplicate, we change the local ID to the incoming ID and we |
|
1255 // refresh the metadata collected above. See bug 710448 for the history |
|
1256 // of this logic. |
|
1257 if (!existsLocally) { |
|
1258 let dupeID = this._findDupe(item); |
|
1259 if (dupeID) { |
|
1260 this._log.trace("Local item " + dupeID + " is a duplicate for " + |
|
1261 "incoming item " + item.id); |
|
1262 |
|
1263 // The local, duplicate ID is always deleted on the server. |
|
1264 this._deleteId(dupeID); |
|
1265 |
|
1266 // The current API contract does not mandate that the ID returned by |
|
1267 // _findDupe() actually exists. Therefore, we have to perform this |
|
1268 // check. |
|
1269 existsLocally = this._store.itemExists(dupeID); |
|
1270 |
|
1271 // We unconditionally change the item's ID in case the engine knows of |
|
1272 // an item but doesn't expose it through itemExists. If the API |
|
1273 // contract were stronger, this could be changed. |
|
1274 this._log.debug("Switching local ID to incoming: " + dupeID + " -> " + |
|
1275 item.id); |
|
1276 this._store.changeItemID(dupeID, item.id); |
|
1277 |
|
1278 // If the local item was modified, we carry its metadata forward so |
|
1279 // appropriate reconciling can be performed. |
|
1280 if (dupeID in this._modified) { |
|
1281 locallyModified = true; |
|
1282 localAge = Date.now() / 1000 - this._modified[dupeID]; |
|
1283 remoteIsNewer = remoteAge < localAge; |
|
1284 |
|
1285 this._modified[item.id] = this._modified[dupeID]; |
|
1286 delete this._modified[dupeID]; |
|
1287 } else { |
|
1288 locallyModified = false; |
|
1289 localAge = null; |
|
1290 } |
|
1291 |
|
1292 this._log.debug("Local item after duplication: age=" + localAge + |
|
1293 "; modified=" + locallyModified + "; exists=" + |
|
1294 existsLocally); |
|
1295 } else { |
|
1296 this._log.trace("No duplicate found for incoming item: " + item.id); |
|
1297 } |
|
1298 } |
|
1299 |
|
1300 // At this point we've performed duplicate detection. But, nothing here |
|
1301 // should depend on duplicate detection as the above should have updated |
|
1302 // state seamlessly. |
|
1303 |
|
1304 if (!existsLocally) { |
|
1305 // If the item doesn't exist locally and we have no local modifications |
|
1306 // to the item (implying that it was not deleted), always apply the remote |
|
1307 // item. |
|
1308 if (!locallyModified) { |
|
1309 this._log.trace("Applying incoming because local item does not exist " + |
|
1310 "and was not deleted."); |
|
1311 return true; |
|
1312 } |
|
1313 |
|
1314 // If the item was modified locally but isn't present, it must have |
|
1315 // been deleted. If the incoming record is younger, we restore from |
|
1316 // that record. |
|
1317 if (remoteIsNewer) { |
|
1318 this._log.trace("Applying incoming because local item was deleted " + |
|
1319 "before the incoming item was changed."); |
|
1320 delete this._modified[item.id]; |
|
1321 return true; |
|
1322 } |
|
1323 |
|
1324 this._log.trace("Ignoring incoming item because the local item's " + |
|
1325 "deletion is newer."); |
|
1326 return false; |
|
1327 } |
|
1328 |
|
1329 // If the remote and local records are the same, there is nothing to be |
|
1330 // done, so we don't do anything. In the ideal world, this logic wouldn't |
|
1331 // be here and the engine would take a record and apply it. The reason we |
|
1332 // want to defer this logic is because it would avoid a redundant and |
|
1333 // possibly expensive dip into the storage layer to query item state. |
|
1334 // This should get addressed in the async rewrite, so we ignore it for now. |
|
1335 let localRecord = this._createRecord(item.id); |
|
1336 let recordsEqual = Utils.deepEquals(item.cleartext, |
|
1337 localRecord.cleartext); |
|
1338 |
|
1339 // If the records are the same, we don't need to do anything. This does |
|
1340 // potentially throw away a local modification time. But, if the records |
|
1341 // are the same, does it matter? |
|
1342 if (recordsEqual) { |
|
1343 this._log.trace("Ignoring incoming item because the local item is " + |
|
1344 "identical."); |
|
1345 |
|
1346 delete this._modified[item.id]; |
|
1347 return false; |
|
1348 } |
|
1349 |
|
1350 // At this point the records are different. |
|
1351 |
|
1352 // If we have no local modifications, always take the server record. |
|
1353 if (!locallyModified) { |
|
1354 this._log.trace("Applying incoming record because no local conflicts."); |
|
1355 return true; |
|
1356 } |
|
1357 |
|
1358 // At this point, records are different and the local record is modified. |
|
1359 // We resolve conflicts by record age, where the newest one wins. This does |
|
1360 // result in data loss and should be handled by giving the engine an |
|
1361 // opportunity to merge the records. Bug 720592 tracks this feature. |
|
1362 this._log.warn("DATA LOSS: Both local and remote changes to record: " + |
|
1363 item.id); |
|
1364 return remoteIsNewer; |
|
1365 }, |
|
1366 |
|
1367 // Upload outgoing records. |
|
1368 _uploadOutgoing: function () { |
|
1369 this._log.trace("Uploading local changes to server."); |
|
1370 |
|
1371 let modifiedIDs = Object.keys(this._modified); |
|
1372 if (modifiedIDs.length) { |
|
1373 this._log.trace("Preparing " + modifiedIDs.length + |
|
1374 " outgoing records"); |
|
1375 |
|
1376 // collection we'll upload |
|
1377 let up = new Collection(this.engineURL, null, this.service); |
|
1378 let count = 0; |
|
1379 |
|
1380 // Upload what we've got so far in the collection |
|
1381 let doUpload = Utils.bind2(this, function(desc) { |
|
1382 this._log.info("Uploading " + desc + " of " + modifiedIDs.length + |
|
1383 " records"); |
|
1384 let resp = up.post(); |
|
1385 if (!resp.success) { |
|
1386 this._log.debug("Uploading records failed: " + resp); |
|
1387 resp.failureCode = ENGINE_UPLOAD_FAIL; |
|
1388 throw resp; |
|
1389 } |
|
1390 |
|
1391 // Update server timestamp from the upload. |
|
1392 let modified = resp.headers["x-weave-timestamp"]; |
|
1393 if (modified > this.lastSync) |
|
1394 this.lastSync = modified; |
|
1395 |
|
1396 let failed_ids = Object.keys(resp.obj.failed); |
|
1397 if (failed_ids.length) |
|
1398 this._log.debug("Records that will be uploaded again because " |
|
1399 + "the server couldn't store them: " |
|
1400 + failed_ids.join(", ")); |
|
1401 |
|
1402 // Clear successfully uploaded objects. |
|
1403 for each (let id in resp.obj.success) { |
|
1404 delete this._modified[id]; |
|
1405 } |
|
1406 |
|
1407 up.clearRecords(); |
|
1408 }); |
|
1409 |
|
1410 for each (let id in modifiedIDs) { |
|
1411 try { |
|
1412 let out = this._createRecord(id); |
|
1413 if (this._log.level <= Log.Level.Trace) |
|
1414 this._log.trace("Outgoing: " + out); |
|
1415 |
|
1416 out.encrypt(this.service.collectionKeys.keyForCollection(this.name)); |
|
1417 up.pushData(out); |
|
1418 } |
|
1419 catch(ex) { |
|
1420 this._log.warn("Error creating record: " + Utils.exceptionStr(ex)); |
|
1421 } |
|
1422 |
|
1423 // Partial upload |
|
1424 if ((++count % MAX_UPLOAD_RECORDS) == 0) |
|
1425 doUpload((count - MAX_UPLOAD_RECORDS) + " - " + count + " out"); |
|
1426 |
|
1427 this._store._sleep(0); |
|
1428 } |
|
1429 |
|
1430 // Final upload |
|
1431 if (count % MAX_UPLOAD_RECORDS > 0) |
|
1432 doUpload(count >= MAX_UPLOAD_RECORDS ? "last batch" : "all"); |
|
1433 } |
|
1434 }, |
|
1435 |
|
// Any cleanup necessary at the end of a sync: reset the tracker's score and
// send the deletions queued in this._delete to the server.
|
1438 _syncFinish: function () { |
|
1439 this._log.trace("Finishing up sync"); |
|
1440 this._tracker.resetScore(); |
|
1441 |
|
1442 let doDelete = Utils.bind2(this, function(key, val) { |
|
1443 let coll = new Collection(this.engineURL, this._recordObj, this.service); |
|
1444 coll[key] = val; |
|
1445 coll.delete(); |
|
1446 }); |
|
1447 |
|
1448 for (let [key, val] in Iterator(this._delete)) { |
|
1449 // Remove the key for future uses |
|
1450 delete this._delete[key]; |
|
1451 |
|
1452 // Send a simple delete for the property |
|
1453 if (key != "ids" || val.length <= 100) |
|
1454 doDelete(key, val); |
|
1455 else { |
|
1456 // For many ids, split into chunks of at most 100 |
|
1457 while (val.length > 0) { |
|
1458 doDelete(key, val.slice(0, 100)); |
|
1459 val = val.slice(100); |
|
1460 } |
|
1461 } |
|
1462 } |
|
1463 }, |
|
1464 |
|
1465 _syncCleanup: function () { |
|
1466 if (!this._modified) { |
|
1467 return; |
|
1468 } |
|
1469 |
|
1470 // Mark failed WBOs as changed again so they are reuploaded next time. |
|
1471 for (let [id, when] in Iterator(this._modified)) { |
|
1472 this._tracker.addChangedID(id, when); |
|
1473 } |
|
1474 this._modified = {}; |
|
1475 }, |
|
1476 |
|
1477 _sync: function () { |
|
1478 try { |
|
1479 this._syncStartup(); |
|
1480 Observers.notify("weave:engine:sync:status", "process-incoming"); |
|
1481 this._processIncoming(); |
|
1482 Observers.notify("weave:engine:sync:status", "upload-outgoing"); |
|
1483 this._uploadOutgoing(); |
|
1484 this._syncFinish(); |
|
1485 } finally { |
|
1486 this._syncCleanup(); |
|
1487 } |
|
1488 }, |
|
1489 |
|
1490 canDecrypt: function () { |
|
1491 // Report failure even if there's nothing to decrypt |
|
1492 let canDecrypt = false; |
|
1493 |
|
1494 // Fetch the most recently uploaded record and try to decrypt it |
|
1495 let test = new Collection(this.engineURL, this._recordObj, this.service); |
|
1496 test.limit = 1; |
|
1497 test.sort = "newest"; |
|
1498 test.full = true; |
|
1499 |
|
1500 let key = this.service.collectionKeys.keyForCollection(this.name); |
|
1501 test.recordHandler = function recordHandler(record) { |
|
1502 record.decrypt(key); |
|
1503 canDecrypt = true; |
|
1504 }.bind(this); |
|
1505 |
|
1506 // Any failure fetching/decrypting will just result in false |
|
1507 try { |
|
1508 this._log.trace("Trying to decrypt a record from the server.."); |
|
1509 test.get(); |
|
1510 } |
|
1511 catch(ex) { |
|
1512 this._log.debug("Failed test decrypt: " + Utils.exceptionStr(ex)); |
|
1513 } |
|
1514 |
|
1515 return canDecrypt; |
|
1516 }, |
|
1517 |
|
1518 _resetClient: function () { |
|
1519 this.resetLastSync(); |
|
1520 this.previousFailed = []; |
|
1521 this.toFetch = []; |
|
1522 }, |
|
1523 |
|
1524 wipeServer: function () { |
|
1525 let response = this.service.resource(this.engineURL).delete(); |
|
1526 if (response.status != 200 && response.status != 404) { |
|
1527 throw response; |
|
1528 } |
|
1529 this._resetClient(); |
|
1530 }, |
|
1531 |
|
1532 removeClientData: function () { |
|
1533 // Implement this method in engines that store client specific data |
|
1534 // on the server. |
|
1535 }, |
|
1536 |
|
/*
 * Decide on (and partially effect) an error-handling strategy.
 *
 * Asks the Service to respond to an HMAC error, which might result in keys
 * being downloaded. That call returns true if it took an action which might
 * allow a retry to succeed.
 *
 * If `mayRetry` is truthy, and the Service suggests a retry,
 * handleHMACMismatch returns kRecoveryStrategy.retry. Otherwise, it returns
 * kRecoveryStrategy.error.
 *
 * Subclasses of SyncEngine can override this method to allow for different
 * behavior -- e.g., to delete and ignore erroneous entries.
 *
 * All return values will be part of the kRecoveryStrategy enumeration.
 */
|
1553 handleHMACMismatch: function (item, mayRetry) { |
|
1554 // By default we either try again, or bail out noisily. |
|
1555 return (this.service.handleHMACEvent() && mayRetry) ? |
|
1556 SyncEngine.kRecoveryStrategy.retry : |
|
1557 SyncEngine.kRecoveryStrategy.error; |
|
1558 } |
|
1559 }; |