Thu, 22 Jan 2015 13:21:57 +0100
Incorporate changes requested in Mozilla review:
https://bugzilla.mozilla.org/show_bug.cgi?id=1123480#c6
michael@0 | 1 | /* This Source Code Form is subject to the terms of the Mozilla Public |
michael@0 | 2 | * License, v. 2.0. If a copy of the MPL was not distributed with this |
michael@0 | 3 | * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ |
michael@0 | 4 | |
michael@0 | 5 | this.EXPORTED_SYMBOLS = [ |
michael@0 | 6 | "EngineManager", |
michael@0 | 7 | "Engine", |
michael@0 | 8 | "SyncEngine", |
michael@0 | 9 | "Tracker", |
michael@0 | 10 | "Store" |
michael@0 | 11 | ]; |
michael@0 | 12 | |
michael@0 | 13 | const {classes: Cc, interfaces: Ci, results: Cr, utils: Cu} = Components; |
michael@0 | 14 | |
michael@0 | 15 | Cu.import("resource://services-common/async.js"); |
michael@0 | 16 | Cu.import("resource://gre/modules/Log.jsm"); |
michael@0 | 17 | Cu.import("resource://services-common/observers.js"); |
michael@0 | 18 | Cu.import("resource://services-common/utils.js"); |
michael@0 | 19 | Cu.import("resource://services-sync/constants.js"); |
michael@0 | 20 | Cu.import("resource://services-sync/identity.js"); |
michael@0 | 21 | Cu.import("resource://services-sync/record.js"); |
michael@0 | 22 | Cu.import("resource://services-sync/resource.js"); |
michael@0 | 23 | Cu.import("resource://services-sync/util.js"); |
michael@0 | 24 | |
michael@0 | 25 | /* |
michael@0 | 26 | * Trackers are associated with a single engine and deal with |
michael@0 | 27 | * listening for changes to their particular data type. |
michael@0 | 28 | * |
michael@0 | 29 | * There are two things they keep track of: |
michael@0 | 30 | * 1) A score, indicating how urgently the engine wants to sync |
michael@0 | 31 | * 2) A list of IDs for all the changed items that need to be synced |
michael@0 | 34 | * |
michael@0 | 35 | */ |
michael@0 | 36 | this.Tracker = function Tracker(name, engine) { |
michael@0 | 37 | if (!engine) { |
michael@0 | 38 | throw new Error("Tracker must be associated with an Engine instance."); |
michael@0 | 39 | } |
michael@0 | 40 | |
michael@0 | 41 | name = name || "Unnamed"; |
michael@0 | 42 | this.name = this.file = name.toLowerCase(); |
michael@0 | 43 | this.engine = engine; |
michael@0 | 44 | |
michael@0 | 45 | this._log = Log.repository.getLogger("Sync.Tracker." + name); |
michael@0 | 46 | let level = Svc.Prefs.get("log.logger.engine." + this.name, "Debug"); |
michael@0 | 47 | this._log.level = Log.Level[level]; |
michael@0 | 48 | |
michael@0 | 49 | this._score = 0; |
michael@0 | 50 | this._ignored = []; |
michael@0 | 51 | this.ignoreAll = false; |
michael@0 | 52 | this.changedIDs = {}; |
michael@0 | 53 | this.loadChangedIDs(); |
michael@0 | 54 | |
michael@0 | 55 | Svc.Obs.add("weave:engine:start-tracking", this); |
michael@0 | 56 | Svc.Obs.add("weave:engine:stop-tracking", this); |
michael@0 | 57 | }; |
michael@0 | 58 | |
michael@0 | 59 | Tracker.prototype = { |
michael@0 | 60 | /* |
michael@0 | 61 | * Score can be called as often as desired to decide which engines to sync |
michael@0 | 62 | * |
michael@0 | 63 | * Valid values for score: |
michael@0 | 64 | * -1: Do not sync unless the user specifically requests it (almost disabled) |
michael@0 | 65 | * 0: Nothing has changed |
michael@0 | 66 | * 100: Please sync me ASAP! |
michael@0 | 67 | * |
michael@0 | 68 | * Setting it to other values should (but doesn't currently) throw an exception |
michael@0 | 69 | */ |
michael@0 | 70 | get score() { |
michael@0 | 71 | return this._score; |
michael@0 | 72 | }, |
michael@0 | 73 | |
michael@0 | 74 | set score(value) { |
michael@0 | 75 | this._score = value; |
michael@0 | 76 | Observers.notify("weave:engine:score:updated", this.name); |
michael@0 | 77 | }, |
michael@0 | 78 | |
michael@0 | 79 | // Should be called by the service every time a sync has been done for an engine |
michael@0 | 80 | resetScore: function () { |
michael@0 | 81 | this._score = 0; |
michael@0 | 82 | }, |
michael@0 | 83 | |
michael@0 | 84 | persistChangedIDs: true, |
michael@0 | 85 | |
michael@0 | 86 | /** |
michael@0 | 87 | * Persist changedIDs to disk at a later date. |
michael@0 | 88 | * Optionally pass a callback to be invoked when the write has occurred. |
michael@0 | 89 | */ |
michael@0 | 90 | saveChangedIDs: function (cb) { |
michael@0 | 91 | if (!this.persistChangedIDs) { |
michael@0 | 92 | this._log.debug("Not saving changedIDs."); |
michael@0 | 93 | return; |
michael@0 | 94 | } |
michael@0 | 95 | Utils.namedTimer(function () { |
michael@0 | 96 | this._log.debug("Saving changed IDs to " + this.file); |
michael@0 | 97 | Utils.jsonSave("changes/" + this.file, this, this.changedIDs, cb); |
michael@0 | 98 | }, 1000, this, "_lazySave"); |
michael@0 | 99 | }, |
michael@0 | 100 | |
michael@0 | 101 | loadChangedIDs: function (cb) { |
michael@0 | 102 | Utils.jsonLoad("changes/" + this.file, this, function(json) { |
michael@0 | 103 | if (json && (typeof(json) == "object")) { |
michael@0 | 104 | this.changedIDs = json; |
michael@0 | 105 | } else { |
michael@0 | 106 | this._log.warn("Changed IDs file " + this.file + " contains a non-object value."); |
michael@0 | 107 | json = null; |
michael@0 | 108 | } |
michael@0 | 109 | if (cb) { |
michael@0 | 110 | cb.call(this, json); |
michael@0 | 111 | } |
michael@0 | 112 | }); |
michael@0 | 113 | }, |
michael@0 | 114 | |
michael@0 | 115 | // ignore/unignore specific IDs. Useful for ignoring items that are |
michael@0 | 116 | // being processed, or that shouldn't be synced. |
michael@0 | 117 | // But note: not persisted to disk |
michael@0 | 118 | |
michael@0 | 119 | ignoreID: function (id) { |
michael@0 | 120 | this.unignoreID(id); |
michael@0 | 121 | this._ignored.push(id); |
michael@0 | 122 | }, |
michael@0 | 123 | |
michael@0 | 124 | unignoreID: function (id) { |
michael@0 | 125 | let index = this._ignored.indexOf(id); |
michael@0 | 126 | if (index != -1) |
michael@0 | 127 | this._ignored.splice(index, 1); |
michael@0 | 128 | }, |
michael@0 | 129 | |
michael@0 | 130 | addChangedID: function (id, when) { |
michael@0 | 131 | if (!id) { |
michael@0 | 132 | this._log.warn("Attempted to add undefined ID to tracker"); |
michael@0 | 133 | return false; |
michael@0 | 134 | } |
michael@0 | 135 | |
michael@0 | 136 | if (this.ignoreAll || (id in this._ignored)) { |
michael@0 | 137 | return false; |
michael@0 | 138 | } |
michael@0 | 139 | |
michael@0 | 140 | // Default to the current time in seconds if no time is provided. |
michael@0 | 141 | if (when == null) { |
michael@0 | 142 | when = Math.floor(Date.now() / 1000); |
michael@0 | 143 | } |
michael@0 | 144 | |
michael@0 | 145 | // Add/update the entry if we have a newer time. |
michael@0 | 146 | if ((this.changedIDs[id] || -Infinity) < when) { |
michael@0 | 147 | this._log.trace("Adding changed ID: " + id + ", " + when); |
michael@0 | 148 | this.changedIDs[id] = when; |
michael@0 | 149 | this.saveChangedIDs(this.onSavedChangedIDs); |
michael@0 | 150 | } |
michael@0 | 151 | |
michael@0 | 152 | return true; |
michael@0 | 153 | }, |
michael@0 | 154 | |
michael@0 | 155 | removeChangedID: function (id) { |
michael@0 | 156 | if (!id) { |
michael@0 | 157 | this._log.warn("Attempted to remove undefined ID from tracker"); |
michael@0 | 158 | return false; |
michael@0 | 159 | } |
michael@0 | 160 | if (this.ignoreAll || (id in this._ignored)) |
michael@0 | 161 | return false; |
michael@0 | 162 | if (this.changedIDs[id] != null) { |
michael@0 | 163 | this._log.trace("Removing changed ID " + id); |
michael@0 | 164 | delete this.changedIDs[id]; |
michael@0 | 165 | this.saveChangedIDs(); |
michael@0 | 166 | } |
michael@0 | 167 | return true; |
michael@0 | 168 | }, |
michael@0 | 169 | |
michael@0 | 170 | clearChangedIDs: function () { |
michael@0 | 171 | this._log.trace("Clearing changed ID list"); |
michael@0 | 172 | this.changedIDs = {}; |
michael@0 | 173 | this.saveChangedIDs(); |
michael@0 | 174 | }, |
michael@0 | 175 | |
michael@0 | 176 | _isTracking: false, |
michael@0 | 177 | |
michael@0 | 178 | // Override these in your subclasses. |
michael@0 | 179 | startTracking: function () { |
michael@0 | 180 | }, |
michael@0 | 181 | |
michael@0 | 182 | stopTracking: function () { |
michael@0 | 183 | }, |
michael@0 | 184 | |
michael@0 | 185 | engineIsEnabled: function () { |
michael@0 | 186 | if (!this.engine) { |
michael@0 | 187 | // Can't tell -- we must be running in a test! |
michael@0 | 188 | return true; |
michael@0 | 189 | } |
michael@0 | 190 | return this.engine.enabled; |
michael@0 | 191 | }, |
michael@0 | 192 | |
michael@0 | 193 | onEngineEnabledChanged: function (engineEnabled) { |
michael@0 | 194 | if (engineEnabled == this._isTracking) { |
michael@0 | 195 | return; |
michael@0 | 196 | } |
michael@0 | 197 | |
michael@0 | 198 | if (engineEnabled) { |
michael@0 | 199 | this.startTracking(); |
michael@0 | 200 | this._isTracking = true; |
michael@0 | 201 | } else { |
michael@0 | 202 | this.stopTracking(); |
michael@0 | 203 | this._isTracking = false; |
michael@0 | 204 | this.clearChangedIDs(); |
michael@0 | 205 | } |
michael@0 | 206 | }, |
michael@0 | 207 | |
michael@0 | 208 | observe: function (subject, topic, data) { |
michael@0 | 209 | switch (topic) { |
michael@0 | 210 | case "weave:engine:start-tracking": |
michael@0 | 211 | if (!this.engineIsEnabled()) { |
michael@0 | 212 | return; |
michael@0 | 213 | } |
michael@0 | 214 | this._log.trace("Got start-tracking."); |
michael@0 | 215 | if (!this._isTracking) { |
michael@0 | 216 | this.startTracking(); |
michael@0 | 217 | this._isTracking = true; |
michael@0 | 218 | } |
michael@0 | 219 | return; |
michael@0 | 220 | case "weave:engine:stop-tracking": |
michael@0 | 221 | this._log.trace("Got stop-tracking."); |
michael@0 | 222 | if (this._isTracking) { |
michael@0 | 223 | this.stopTracking(); |
michael@0 | 224 | this._isTracking = false; |
michael@0 | 225 | } |
michael@0 | 226 | return; |
michael@0 | 227 | } |
michael@0 | 228 | } |
michael@0 | 229 | }; |
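/*
 * Illustrative sketch: a minimal Tracker subclass. "FooTracker" and the
 * "foo-item-changed" observer topic are assumed names for illustration only,
 * and SCORE_INCREMENT_SMALL is assumed to be available from constants.js.
 *
 *   function FooTracker(name, engine) {
 *     Tracker.call(this, name, engine);
 *   }
 *   FooTracker.prototype = {
 *     __proto__: Tracker.prototype,
 *
 *     startTracking: function () {
 *       // Begin listening for changes to the underlying data.
 *       Svc.Obs.add("foo-item-changed", this);
 *     },
 *
 *     stopTracking: function () {
 *       Svc.Obs.remove("foo-item-changed", this);
 *     },
 *
 *     observe: function (subject, topic, data) {
 *       // Let the base class handle start/stop-tracking notifications.
 *       Tracker.prototype.observe.call(this, subject, topic, data);
 *       if (topic == "foo-item-changed" && this.addChangedID(data)) {
 *         // Record the change and nudge the score so a sync is scheduled.
 *         this.score += SCORE_INCREMENT_SMALL;
 *       }
 *     }
 *   };
 */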
michael@0 | 230 | |
michael@0 | 231 | |
michael@0 | 232 | |
michael@0 | 233 | /** |
michael@0 | 234 | * The Store serves as the interface between Sync and stored data. |
michael@0 | 235 | * |
michael@0 | 236 | * The name "store" is something of a misnomer because it doesn't actually "store" |
michael@0 | 237 | * anything. Instead, it serves as a gateway to something that actually does |
michael@0 | 238 | * the "storing." |
michael@0 | 239 | * |
michael@0 | 240 | * The store is responsible for record management inside an engine. It tells |
michael@0 | 241 | * Sync what items are available for Sync, converts items to and from Sync's |
michael@0 | 242 | * record format, and applies records from Sync into changes on the underlying |
michael@0 | 243 | * store. |
michael@0 | 244 | * |
michael@0 | 245 | * Store implementations require a number of functions to be implemented. These |
michael@0 | 246 | * are all documented below. |
michael@0 | 247 | * |
michael@0 | 248 | * For stores that deal with many records or which have expensive store access |
michael@0 | 249 | * routines, it is highly recommended to implement a custom applyIncomingBatch |
michael@0 | 250 | * and/or applyIncoming function on top of the basic APIs. |
michael@0 | 251 | */ |
michael@0 | 252 | |
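/*
 * Illustrative sketch: a minimal Store subclass backed by an in-memory Map.
 * "FooStore" and "FooRecord" are assumed names for illustration only; a real
 * store would talk to an actual data source (places, logins, etc.).
 *
 *   // Trivial in-memory backing data, purely for illustration.
 *   let fooItems = new Map();
 *
 *   function FooStore(name, engine) {
 *     Store.call(this, name, engine);
 *   }
 *   FooStore.prototype = {
 *     __proto__: Store.prototype,
 *
 *     itemExists: function (id) {
 *       return fooItems.has(id);
 *     },
 *
 *     createRecord: function (id, collection) {
 *       // FooRecord stands in for this engine's record type.
 *       let record = new FooRecord(collection, id);
 *       if (!fooItems.has(id)) {
 *         record.deleted = true;
 *         return record;
 *       }
 *       record.value = fooItems.get(id);
 *       return record;
 *     },
 *
 *     create: function (record) {
 *       fooItems.set(record.id, record.value);
 *     },
 *
 *     update: function (record) {
 *       fooItems.set(record.id, record.value);
 *     },
 *
 *     remove: function (record) {
 *       fooItems.delete(record.id);
 *     },
 *
 *     changeItemID: function (oldID, newID) {
 *       fooItems.set(newID, fooItems.get(oldID));
 *       fooItems.delete(oldID);
 *     },
 *
 *     getAllIDs: function () {
 *       let ids = {};
 *       for (let id of fooItems.keys()) {
 *         ids[id] = true;
 *       }
 *       return ids;
 *     },
 *
 *     wipe: function () {
 *       fooItems.clear();
 *     }
 *   };
 */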
michael@0 | 253 | this.Store = function Store(name, engine) { |
michael@0 | 254 | if (!engine) { |
michael@0 | 255 | throw new Error("Store must be associated with an Engine instance."); |
michael@0 | 256 | } |
michael@0 | 257 | |
michael@0 | 258 | name = name || "Unnamed"; |
michael@0 | 259 | this.name = name.toLowerCase(); |
michael@0 | 260 | this.engine = engine; |
michael@0 | 261 | |
michael@0 | 262 | this._log = Log.repository.getLogger("Sync.Store." + name); |
michael@0 | 263 | let level = Svc.Prefs.get("log.logger.engine." + this.name, "Debug"); |
michael@0 | 264 | this._log.level = Log.Level[level]; |
michael@0 | 265 | |
michael@0 | 266 | XPCOMUtils.defineLazyGetter(this, "_timer", function() { |
michael@0 | 267 | return Cc["@mozilla.org/timer;1"].createInstance(Ci.nsITimer); |
michael@0 | 268 | }); |
michael@0 | 269 | } |
michael@0 | 270 | Store.prototype = { |
michael@0 | 271 | |
michael@0 | 272 | _sleep: function _sleep(delay) { |
michael@0 | 273 | let cb = Async.makeSyncCallback(); |
michael@0 | 274 | this._timer.initWithCallback(cb, delay, Ci.nsITimer.TYPE_ONE_SHOT); |
michael@0 | 275 | Async.waitForSyncCallback(cb); |
michael@0 | 276 | }, |
michael@0 | 277 | |
michael@0 | 278 | /** |
michael@0 | 279 | * Apply multiple incoming records against the store. |
michael@0 | 280 | * |
michael@0 | 281 | * This is called with a set of incoming records to process. The function |
michael@0 | 282 | * should look at each record, reconcile with the current local state, and |
michael@0 | 283 | * make the local changes required to bring its state in alignment with the |
michael@0 | 284 | * record. |
michael@0 | 285 | * |
michael@0 | 286 | * The default implementation simply iterates over all records and calls |
michael@0 | 287 | * applyIncoming(). Store implementations may overwrite this function |
michael@0 | 288 | * if desired. |
michael@0 | 289 | * |
michael@0 | 290 | * @param records Array of records to apply |
michael@0 | 291 | * @return Array of record IDs which did not apply cleanly |
michael@0 | 292 | */ |
michael@0 | 293 | applyIncomingBatch: function (records) { |
michael@0 | 294 | let failed = []; |
michael@0 | 295 | for each (let record in records) { |
michael@0 | 296 | try { |
michael@0 | 297 | this.applyIncoming(record); |
michael@0 | 298 | } catch (ex if (ex.code == Engine.prototype.eEngineAbortApplyIncoming)) { |
michael@0 | 299 | // This kind of exception should have a 'cause' attribute, which is an |
michael@0 | 300 | // originating exception. |
michael@0 | 301 | // ex.cause will carry its stack with it when rethrown. |
michael@0 | 302 | throw ex.cause; |
michael@0 | 303 | } catch (ex) { |
michael@0 | 304 | this._log.warn("Failed to apply incoming record " + record.id); |
michael@0 | 305 | this._log.warn("Encountered exception: " + Utils.exceptionStr(ex)); |
michael@0 | 306 | failed.push(record.id); |
michael@0 | 307 | } |
michael@0 | 308 | }; |
michael@0 | 309 | return failed; |
michael@0 | 310 | }, |
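/*
 * Illustrative sketch of a custom batch implementation, as recommended above
 * for stores with expensive access. "FooService.applyAll" is an assumed bulk
 * API that applies all records in one pass and returns the IDs that failed.
 *
 *   applyIncomingBatch: function (records) {
 *     // One bulk call instead of one store round-trip per record.
 *     return FooService.applyAll(records);
 *   },
 */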
michael@0 | 311 | |
michael@0 | 312 | /** |
michael@0 | 313 | * Apply a single record against the store. |
michael@0 | 314 | * |
michael@0 | 315 | * This takes a single record and makes the local changes required so the |
michael@0 | 316 | * local state matches what's in the record. |
michael@0 | 317 | * |
michael@0 | 318 | * The default implementation calls one of remove(), create(), or update() |
michael@0 | 319 | * depending on the state obtained from the store itself. Store |
michael@0 | 320 | * implementations may overwrite this function if desired. |
michael@0 | 321 | * |
michael@0 | 322 | * @param record |
michael@0 | 323 | * Record to apply |
michael@0 | 324 | */ |
michael@0 | 325 | applyIncoming: function (record) { |
michael@0 | 326 | if (record.deleted) |
michael@0 | 327 | this.remove(record); |
michael@0 | 328 | else if (!this.itemExists(record.id)) |
michael@0 | 329 | this.create(record); |
michael@0 | 330 | else |
michael@0 | 331 | this.update(record); |
michael@0 | 332 | }, |
michael@0 | 333 | |
michael@0 | 334 | // override these in derived objects |
michael@0 | 335 | |
michael@0 | 336 | /** |
michael@0 | 337 | * Create an item in the store from a record. |
michael@0 | 338 | * |
michael@0 | 339 | * This is called by the default implementation of applyIncoming(). If using |
michael@0 | 340 | * applyIncomingBatch(), this won't be called unless your store calls it. |
michael@0 | 341 | * |
michael@0 | 342 | * @param record |
michael@0 | 343 | * The store record to create an item from |
michael@0 | 344 | */ |
michael@0 | 345 | create: function (record) { |
michael@0 | 346 | throw "override create in a subclass"; |
michael@0 | 347 | }, |
michael@0 | 348 | |
michael@0 | 349 | /** |
michael@0 | 350 | * Remove an item in the store from a record. |
michael@0 | 351 | * |
michael@0 | 352 | * This is called by the default implementation of applyIncoming(). If using |
michael@0 | 353 | * applyIncomingBatch(), this won't be called unless your store calls it. |
michael@0 | 354 | * |
michael@0 | 355 | * @param record |
michael@0 | 356 | * The store record to delete an item from |
michael@0 | 357 | */ |
michael@0 | 358 | remove: function (record) { |
michael@0 | 359 | throw "override remove in a subclass"; |
michael@0 | 360 | }, |
michael@0 | 361 | |
michael@0 | 362 | /** |
michael@0 | 363 | * Update an item from a record. |
michael@0 | 364 | * |
michael@0 | 365 | * This is called by the default implementation of applyIncoming(). If using |
michael@0 | 366 | * applyIncomingBatch(), this won't be called unless your store calls it. |
michael@0 | 367 | * |
michael@0 | 368 | * @param record |
michael@0 | 369 | * The record to use to update an item from |
michael@0 | 370 | */ |
michael@0 | 371 | update: function (record) { |
michael@0 | 372 | throw "override update in a subclass"; |
michael@0 | 373 | }, |
michael@0 | 374 | |
michael@0 | 375 | /** |
michael@0 | 376 | * Determine whether a record with the specified ID exists. |
michael@0 | 377 | * |
michael@0 | 378 | * Takes a string record ID and returns a boolean saying whether the record |
michael@0 | 379 | * exists. |
michael@0 | 380 | * |
michael@0 | 381 | * @param id |
michael@0 | 382 | * string record ID |
michael@0 | 383 | * @return boolean indicating whether record exists locally |
michael@0 | 384 | */ |
michael@0 | 385 | itemExists: function (id) { |
michael@0 | 386 | throw "override itemExists in a subclass"; |
michael@0 | 387 | }, |
michael@0 | 388 | |
michael@0 | 389 | /** |
michael@0 | 390 | * Create a record from the specified ID. |
michael@0 | 391 | * |
michael@0 | 392 | * If the ID is known, the record should be populated with metadata from |
michael@0 | 393 | * the store. If the ID is not known, the record should be created with the |
michael@0 | 394 | * deleted field set to true. |
michael@0 | 395 | * |
michael@0 | 396 | * @param id |
michael@0 | 397 | * string record ID |
michael@0 | 398 | * @param collection |
michael@0 | 399 | * Collection to add record to. This is typically passed into the |
michael@0 | 400 | * constructor for the newly-created record. |
michael@0 | 401 | * @return record type for this engine |
michael@0 | 402 | */ |
michael@0 | 403 | createRecord: function (id, collection) { |
michael@0 | 404 | throw "override createRecord in a subclass"; |
michael@0 | 405 | }, |
michael@0 | 406 | |
michael@0 | 407 | /** |
michael@0 | 408 | * Change the ID of a record. |
michael@0 | 409 | * |
michael@0 | 410 | * @param oldID |
michael@0 | 411 | * string old/current record ID |
michael@0 | 412 | * @param newID |
michael@0 | 413 | * string new record ID |
michael@0 | 414 | */ |
michael@0 | 415 | changeItemID: function (oldID, newID) { |
michael@0 | 416 | throw "override changeItemID in a subclass"; |
michael@0 | 417 | }, |
michael@0 | 418 | |
michael@0 | 419 | /** |
michael@0 | 420 | * Obtain the set of all known record IDs. |
michael@0 | 421 | * |
michael@0 | 422 | * @return Object with ID strings as keys and values of true. The values |
michael@0 | 423 | * are ignored. |
michael@0 | 424 | */ |
michael@0 | 425 | getAllIDs: function () { |
michael@0 | 426 | throw "override getAllIDs in a subclass"; |
michael@0 | 427 | }, |
michael@0 | 428 | |
michael@0 | 429 | /** |
michael@0 | 430 | * Wipe all data in the store. |
michael@0 | 431 | * |
michael@0 | 432 | * This function is called during remote wipes or when replacing local data |
michael@0 | 433 | * with remote data. |
michael@0 | 434 | * |
michael@0 | 435 | * This function should delete all local data that the store is managing. It |
michael@0 | 436 | * can be thought of as clearing out all state and restoring the "new |
michael@0 | 437 | * browser" state. |
michael@0 | 438 | */ |
michael@0 | 439 | wipe: function () { |
michael@0 | 440 | throw "override wipe in a subclass"; |
michael@0 | 441 | } |
michael@0 | 442 | }; |
michael@0 | 443 | |
michael@0 | 444 | this.EngineManager = function EngineManager(service) { |
michael@0 | 445 | this.service = service; |
michael@0 | 446 | |
michael@0 | 447 | this._engines = {}; |
michael@0 | 448 | |
michael@0 | 449 | // This will be populated by Service on startup. |
michael@0 | 450 | this._declined = new Set(); |
michael@0 | 451 | this._log = Log.repository.getLogger("Sync.EngineManager"); |
michael@0 | 452 | this._log.level = Log.Level[Svc.Prefs.get("log.logger.service.engines", "Debug")]; |
michael@0 | 453 | } |
michael@0 | 454 | EngineManager.prototype = { |
michael@0 | 455 | get: function (name) { |
michael@0 | 456 | // Return an array of engines if we have an array of names |
michael@0 | 457 | if (Array.isArray(name)) { |
michael@0 | 458 | let engines = []; |
michael@0 | 459 | name.forEach(function(name) { |
michael@0 | 460 | let engine = this.get(name); |
michael@0 | 461 | if (engine) { |
michael@0 | 462 | engines.push(engine); |
michael@0 | 463 | } |
michael@0 | 464 | }, this); |
michael@0 | 465 | return engines; |
michael@0 | 466 | } |
michael@0 | 467 | |
michael@0 | 468 | let engine = this._engines[name]; |
michael@0 | 469 | if (!engine) { |
michael@0 | 470 | this._log.debug("Could not get engine: " + name); |
michael@0 | 471 | if (Object.keys) { |
michael@0 | 472 | this._log.debug("Engines are: " + JSON.stringify(Object.keys(this._engines))); |
michael@0 | 473 | } |
michael@0 | 474 | } |
michael@0 | 475 | return engine; |
michael@0 | 476 | }, |
michael@0 | 477 | |
michael@0 | 478 | getAll: function () { |
michael@0 | 479 | return [engine for ([name, engine] in Iterator(this._engines))]; |
michael@0 | 480 | }, |
michael@0 | 481 | |
michael@0 | 482 | /** |
michael@0 | 483 | * N.B., does not pay attention to the declined list. |
michael@0 | 484 | */ |
michael@0 | 485 | getEnabled: function () { |
michael@0 | 486 | return this.getAll().filter(function(engine) engine.enabled); |
michael@0 | 487 | }, |
michael@0 | 488 | |
michael@0 | 489 | get enabledEngineNames() { |
michael@0 | 490 | return [e.name for each (e in this.getEnabled())]; |
michael@0 | 491 | }, |
michael@0 | 492 | |
michael@0 | 493 | persistDeclined: function () { |
michael@0 | 494 | Svc.Prefs.set("declinedEngines", [...this._declined].join(",")); |
michael@0 | 495 | }, |
michael@0 | 496 | |
michael@0 | 497 | /** |
michael@0 | 498 | * Returns an array. |
michael@0 | 499 | */ |
michael@0 | 500 | getDeclined: function () { |
michael@0 | 501 | return [...this._declined]; |
michael@0 | 502 | }, |
michael@0 | 503 | |
michael@0 | 504 | setDeclined: function (engines) { |
michael@0 | 505 | this._declined = new Set(engines); |
michael@0 | 506 | this.persistDeclined(); |
michael@0 | 507 | }, |
michael@0 | 508 | |
michael@0 | 509 | isDeclined: function (engineName) { |
michael@0 | 510 | return this._declined.has(engineName); |
michael@0 | 511 | }, |
michael@0 | 512 | |
michael@0 | 513 | /** |
michael@0 | 514 | * Accepts a Set or an array. |
michael@0 | 515 | */ |
michael@0 | 516 | decline: function (engines) { |
michael@0 | 517 | for (let e of engines) { |
michael@0 | 518 | this._declined.add(e); |
michael@0 | 519 | } |
michael@0 | 520 | this.persistDeclined(); |
michael@0 | 521 | }, |
michael@0 | 522 | |
michael@0 | 523 | undecline: function (engines) { |
michael@0 | 524 | for (let e of engines) { |
michael@0 | 525 | this._declined.delete(e); |
michael@0 | 526 | } |
michael@0 | 527 | this.persistDeclined(); |
michael@0 | 528 | }, |
michael@0 | 529 | |
michael@0 | 530 | /** |
michael@0 | 531 | * Mark any non-enabled engines as declined. |
michael@0 | 532 | * |
michael@0 | 533 | * This is useful after initial customization during setup. |
michael@0 | 534 | */ |
michael@0 | 535 | declineDisabled: function () { |
michael@0 | 536 | for (let e of this.getAll()) { |
michael@0 | 537 | if (!e.enabled) { |
michael@0 | 538 | this._log.debug("Declining disabled engine " + e.name); |
michael@0 | 539 | this._declined.add(e.name); |
michael@0 | 540 | } |
michael@0 | 541 | } |
michael@0 | 542 | this.persistDeclined(); |
michael@0 | 543 | }, |
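/*
 * Illustrative sketch of how the declined list is typically used
 * ("manager" is an assumed EngineManager instance):
 *
 *   manager.setDeclined(["foo", "bar"]);   // replace the declined set
 *   manager.decline(["baz"]);              // add to it
 *   manager.undecline(["bar"]);            // remove from it
 *   if (!manager.isDeclined("foo")) {
 *     // "foo" may be offered for syncing.
 *   }
 *   let declined = manager.getDeclined();  // -> Array of engine names
 */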
michael@0 | 544 | |
michael@0 | 545 | /** |
michael@0 | 546 | * Register an Engine to the service. Alternatively, give an array of engine |
michael@0 | 547 | * objects to register. |
michael@0 | 548 | * |
michael@0 | 549 | * @param engineObject |
michael@0 | 550 | * Engine object used to get an instance of the engine |
michael@0 | 551 | * @return The engine object if anything failed |
michael@0 | 552 | */ |
michael@0 | 553 | register: function (engineObject) { |
michael@0 | 554 | if (Array.isArray(engineObject)) { |
michael@0 | 555 | return engineObject.map(this.register, this); |
michael@0 | 556 | } |
michael@0 | 557 | |
michael@0 | 558 | try { |
michael@0 | 559 | let engine = new engineObject(this.service); |
michael@0 | 560 | let name = engine.name; |
michael@0 | 561 | if (name in this._engines) { |
michael@0 | 562 | this._log.error("Engine '" + name + "' is already registered!"); |
michael@0 | 563 | } else { |
michael@0 | 564 | this._engines[name] = engine; |
michael@0 | 565 | } |
michael@0 | 566 | } catch (ex) { |
michael@0 | 567 | this._log.error(CommonUtils.exceptionStr(ex)); |
michael@0 | 568 | |
michael@0 | 569 | let mesg = ex.message ? ex.message : ex; |
michael@0 | 570 | let name = engineObject || ""; |
michael@0 | 571 | name = name.prototype || ""; |
michael@0 | 572 | name = name.name || ""; |
michael@0 | 573 | |
michael@0 | 574 | let out = "Could not initialize engine '" + name + "': " + mesg; |
michael@0 | 575 | this._log.error(out); |
michael@0 | 576 | |
michael@0 | 577 | return engineObject; |
michael@0 | 578 | } |
michael@0 | 579 | }, |
michael@0 | 580 | |
michael@0 | 581 | unregister: function (val) { |
michael@0 | 582 | let name = val; |
michael@0 | 583 | if (val instanceof Engine) { |
michael@0 | 584 | name = val.name; |
michael@0 | 585 | } |
michael@0 | 586 | delete this._engines[name]; |
michael@0 | 587 | }, |
michael@0 | 588 | |
michael@0 | 589 | clear: function () { |
michael@0 | 590 | for (let name in this._engines) { |
michael@0 | 591 | delete this._engines[name]; |
michael@0 | 592 | } |
michael@0 | 593 | }, |
michael@0 | 594 | }; |
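/*
 * Illustrative sketch of registering and looking up engines. "FooEngine" and
 * "BarEngine" are assumed Engine subclasses and "service" an assumed Service
 * instance; register() instantiates each constructor with the service.
 *
 *   let manager = new EngineManager(service);
 *   manager.register([FooEngine, BarEngine]);
 *   let foo = manager.get("foo");          // engine names are lowercased
 *   let enabled = manager.getEnabled();    // engines with enabled == true
 */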
michael@0 | 595 | |
michael@0 | 596 | this.Engine = function Engine(name, service) { |
michael@0 | 597 | if (!service) { |
michael@0 | 598 | throw new Error("Engine must be associated with a Service instance."); |
michael@0 | 599 | } |
michael@0 | 600 | |
michael@0 | 601 | this.Name = name || "Unnamed"; |
michael@0 | 602 | this.name = name.toLowerCase(); |
michael@0 | 603 | this.service = service; |
michael@0 | 604 | |
michael@0 | 605 | this._notify = Utils.notify("weave:engine:"); |
michael@0 | 606 | this._log = Log.repository.getLogger("Sync.Engine." + this.Name); |
michael@0 | 607 | let level = Svc.Prefs.get("log.logger.engine." + this.name, "Debug"); |
michael@0 | 608 | this._log.level = Log.Level[level]; |
michael@0 | 609 | |
michael@0 | 610 | this._tracker; // initialize tracker to load previously changed IDs |
michael@0 | 611 | this._log.debug("Engine initialized"); |
michael@0 | 612 | } |
michael@0 | 613 | Engine.prototype = { |
michael@0 | 614 | // _storeObj and _trackerObj should be overridden in subclasses |
michael@0 | 615 | _storeObj: Store, |
michael@0 | 616 | _trackerObj: Tracker, |
michael@0 | 617 | |
michael@0 | 618 | // Local 'constant'. |
michael@0 | 619 | // Signal to the engine that processing further records is pointless. |
michael@0 | 620 | eEngineAbortApplyIncoming: "error.engine.abort.applyincoming", |
michael@0 | 621 | |
michael@0 | 622 | get prefName() this.name, |
michael@0 | 623 | get enabled() { |
michael@0 | 624 | return Svc.Prefs.get("engine." + this.prefName, false); |
michael@0 | 625 | }, |
michael@0 | 626 | |
michael@0 | 627 | set enabled(val) { |
michael@0 | 628 | Svc.Prefs.set("engine." + this.prefName, !!val); |
michael@0 | 629 | this._tracker.onEngineEnabledChanged(val); |
michael@0 | 630 | }, |
michael@0 | 631 | |
michael@0 | 632 | get score() this._tracker.score, |
michael@0 | 633 | |
michael@0 | 634 | get _store() { |
michael@0 | 635 | let store = new this._storeObj(this.Name, this); |
michael@0 | 636 | this.__defineGetter__("_store", function() store); |
michael@0 | 637 | return store; |
michael@0 | 638 | }, |
michael@0 | 639 | |
michael@0 | 640 | get _tracker() { |
michael@0 | 641 | let tracker = new this._trackerObj(this.Name, this); |
michael@0 | 642 | this.__defineGetter__("_tracker", function() tracker); |
michael@0 | 643 | return tracker; |
michael@0 | 644 | }, |
michael@0 | 645 | |
michael@0 | 646 | sync: function () { |
michael@0 | 647 | if (!this.enabled) { |
michael@0 | 648 | return; |
michael@0 | 649 | } |
michael@0 | 650 | |
michael@0 | 651 | if (!this._sync) { |
michael@0 | 652 | throw "engine does not implement _sync method"; |
michael@0 | 653 | } |
michael@0 | 654 | |
michael@0 | 655 | this._notify("sync", this.name, this._sync)(); |
michael@0 | 656 | }, |
michael@0 | 657 | |
michael@0 | 658 | /** |
michael@0 | 659 | * Get rid of any local meta-data. |
michael@0 | 660 | */ |
michael@0 | 661 | resetClient: function () { |
michael@0 | 662 | if (!this._resetClient) { |
michael@0 | 663 | throw "engine does not implement _resetClient method"; |
michael@0 | 664 | } |
michael@0 | 665 | |
michael@0 | 666 | this._notify("reset-client", this.name, this._resetClient)(); |
michael@0 | 667 | }, |
michael@0 | 668 | |
michael@0 | 669 | _wipeClient: function () { |
michael@0 | 670 | this.resetClient(); |
michael@0 | 671 | this._log.debug("Deleting all local data"); |
michael@0 | 672 | this._tracker.ignoreAll = true; |
michael@0 | 673 | this._store.wipe(); |
michael@0 | 674 | this._tracker.ignoreAll = false; |
michael@0 | 675 | this._tracker.clearChangedIDs(); |
michael@0 | 676 | }, |
michael@0 | 677 | |
michael@0 | 678 | wipeClient: function () { |
michael@0 | 679 | this._notify("wipe-client", this.name, this._wipeClient)(); |
michael@0 | 680 | } |
michael@0 | 681 | }; |
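/*
 * Illustrative sketch of a minimal Engine subclass wiring in a store and
 * tracker. "FooEngine", "FooStore", and "FooTracker" are assumed names for
 * illustration only (see the Store and Tracker sketches above).
 *
 *   function FooEngine(service) {
 *     Engine.call(this, "Foo", service);
 *   }
 *   FooEngine.prototype = {
 *     __proto__: Engine.prototype,
 *     _storeObj: FooStore,
 *     _trackerObj: FooTracker,
 *
 *     _sync: function () {
 *       // A real engine would reconcile the tracker's changed IDs against
 *       // its store here; SyncEngine below provides a full record-based
 *       // implementation of this.
 *     }
 *   };
 */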
michael@0 | 682 | |
michael@0 | 683 | this.SyncEngine = function SyncEngine(name, service) { |
michael@0 | 684 | Engine.call(this, name || "SyncEngine", service); |
michael@0 | 685 | |
michael@0 | 686 | this.loadToFetch(); |
michael@0 | 687 | this.loadPreviousFailed(); |
michael@0 | 688 | } |
michael@0 | 689 | |
michael@0 | 690 | // Enumeration to define approaches to handling bad records. |
michael@0 | 691 | // Attached to the constructor to allow use as a kind of static enumeration. |
michael@0 | 692 | SyncEngine.kRecoveryStrategy = { |
michael@0 | 693 | ignore: "ignore", |
michael@0 | 694 | retry: "retry", |
michael@0 | 695 | error: "error" |
michael@0 | 696 | }; |
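/*
 * Illustrative sketch of how an engine can choose a recovery strategy when a
 * record fails its HMAC check. handleHMACMismatch is the hook consulted from
 * _processIncoming below; the body shown here is an assumption for
 * illustration, not the default implementation.
 *
 *   handleHMACMismatch: function (item, mayRetry) {
 *     // Retry once (e.g. after fetching fresh keys), otherwise give up.
 *     return mayRetry ? SyncEngine.kRecoveryStrategy.retry
 *                     : SyncEngine.kRecoveryStrategy.error;
 *   },
 */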
michael@0 | 697 | |
michael@0 | 698 | SyncEngine.prototype = { |
michael@0 | 699 | __proto__: Engine.prototype, |
michael@0 | 700 | _recordObj: CryptoWrapper, |
michael@0 | 701 | version: 1, |
michael@0 | 702 | |
michael@0 | 703 | // How many records to pull in a single sync. This is primarily to avoid very |
michael@0 | 704 | // long first syncs against profiles with many history records. |
michael@0 | 705 | downloadLimit: null, |
michael@0 | 706 | |
michael@0 | 707 | // How many records to pull at one time when specifying IDs. This is to avoid |
michael@0 | 708 | // URI length limitations. |
michael@0 | 709 | guidFetchBatchSize: DEFAULT_GUID_FETCH_BATCH_SIZE, |
michael@0 | 710 | mobileGUIDFetchBatchSize: DEFAULT_MOBILE_GUID_FETCH_BATCH_SIZE, |
michael@0 | 711 | |
michael@0 | 712 | // How many records to process in a single batch. |
michael@0 | 713 | applyIncomingBatchSize: DEFAULT_STORE_BATCH_SIZE, |
michael@0 | 714 | |
michael@0 | 715 | get storageURL() this.service.storageURL, |
michael@0 | 716 | |
michael@0 | 717 | get engineURL() this.storageURL + this.name, |
michael@0 | 718 | |
michael@0 | 719 | get cryptoKeysURL() this.storageURL + "crypto/keys", |
michael@0 | 720 | |
michael@0 | 721 | get metaURL() this.storageURL + "meta/global", |
michael@0 | 722 | |
michael@0 | 723 | get syncID() { |
michael@0 | 724 | // Generate a random syncID if we don't have one |
michael@0 | 725 | let syncID = Svc.Prefs.get(this.name + ".syncID", ""); |
michael@0 | 726 | return syncID == "" ? this.syncID = Utils.makeGUID() : syncID; |
michael@0 | 727 | }, |
michael@0 | 728 | set syncID(value) { |
michael@0 | 729 | Svc.Prefs.set(this.name + ".syncID", value); |
michael@0 | 730 | }, |
michael@0 | 731 | |
michael@0 | 732 | /* |
michael@0 | 733 | * lastSync is a timestamp in server time. |
michael@0 | 734 | */ |
michael@0 | 735 | get lastSync() { |
michael@0 | 736 | return parseFloat(Svc.Prefs.get(this.name + ".lastSync", "0")); |
michael@0 | 737 | }, |
michael@0 | 738 | set lastSync(value) { |
michael@0 | 739 | // Reset the pref in case it's a number instead of a string |
michael@0 | 740 | Svc.Prefs.reset(this.name + ".lastSync"); |
michael@0 | 741 | // Store the value as a string to keep floating point precision |
michael@0 | 742 | Svc.Prefs.set(this.name + ".lastSync", value.toString()); |
michael@0 | 743 | }, |
michael@0 | 744 | resetLastSync: function () { |
michael@0 | 745 | this._log.debug("Resetting " + this.name + " last sync time"); |
michael@0 | 746 | Svc.Prefs.reset(this.name + ".lastSync"); |
michael@0 | 747 | Svc.Prefs.set(this.name + ".lastSync", "0"); |
michael@0 | 748 | this.lastSyncLocal = 0; |
michael@0 | 749 | }, |
michael@0 | 750 | |
michael@0 | 751 | get toFetch() this._toFetch, |
michael@0 | 752 | set toFetch(val) { |
michael@0 | 753 | let cb = (error) => this._log.error(Utils.exceptionStr(error)); |
michael@0 | 754 | // Coerce the array to a string for more efficient comparison. |
michael@0 | 755 | if (val + "" == this._toFetch) { |
michael@0 | 756 | return; |
michael@0 | 757 | } |
michael@0 | 758 | this._toFetch = val; |
michael@0 | 759 | Utils.namedTimer(function () { |
michael@0 | 760 | Utils.jsonSave("toFetch/" + this.name, this, val, cb); |
michael@0 | 761 | }, 0, this, "_toFetchDelay"); |
michael@0 | 762 | }, |
michael@0 | 763 | |
michael@0 | 764 | loadToFetch: function () { |
michael@0 | 765 | // Initialize to empty if there's no file. |
michael@0 | 766 | this._toFetch = []; |
michael@0 | 767 | Utils.jsonLoad("toFetch/" + this.name, this, function(toFetch) { |
michael@0 | 768 | if (toFetch) { |
michael@0 | 769 | this._toFetch = toFetch; |
michael@0 | 770 | } |
michael@0 | 771 | }); |
michael@0 | 772 | }, |
michael@0 | 773 | |
michael@0 | 774 | get previousFailed() this._previousFailed, |
michael@0 | 775 | set previousFailed(val) { |
michael@0 | 776 | let cb = (error) => this._log.error(Utils.exceptionStr(error)); |
michael@0 | 777 | // Coerce the array to a string for more efficient comparison. |
michael@0 | 778 | if (val + "" == this._previousFailed) { |
michael@0 | 779 | return; |
michael@0 | 780 | } |
michael@0 | 781 | this._previousFailed = val; |
michael@0 | 782 | Utils.namedTimer(function () { |
michael@0 | 783 | Utils.jsonSave("failed/" + this.name, this, val, cb); |
michael@0 | 784 | }, 0, this, "_previousFailedDelay"); |
michael@0 | 785 | }, |
michael@0 | 786 | |
michael@0 | 787 | loadPreviousFailed: function () { |
michael@0 | 788 | // Initialize to empty if there's no file |
michael@0 | 789 | this._previousFailed = []; |
michael@0 | 790 | Utils.jsonLoad("failed/" + this.name, this, function(previousFailed) { |
michael@0 | 791 | if (previousFailed) { |
michael@0 | 792 | this._previousFailed = previousFailed; |
michael@0 | 793 | } |
michael@0 | 794 | }); |
michael@0 | 795 | }, |
michael@0 | 796 | |
michael@0 | 797 | /* |
michael@0 | 798 | * lastSyncLocal is a timestamp in local time. |
michael@0 | 799 | */ |
michael@0 | 800 | get lastSyncLocal() { |
michael@0 | 801 | return parseInt(Svc.Prefs.get(this.name + ".lastSyncLocal", "0"), 10); |
michael@0 | 802 | }, |
michael@0 | 803 | set lastSyncLocal(value) { |
michael@0 | 804 | // Store as a string because pref can only store C longs as numbers. |
michael@0 | 805 | Svc.Prefs.set(this.name + ".lastSyncLocal", value.toString()); |
michael@0 | 806 | }, |
michael@0 | 807 | |
michael@0 | 808 | /* |
michael@0 | 809 | * Returns a mapping of IDs -> changed timestamp. Engine implementations |
michael@0 | 810 | * can override this method to bypass the tracker for certain or all |
michael@0 | 811 | * changed items. |
michael@0 | 812 | */ |
michael@0 | 813 | getChangedIDs: function () { |
michael@0 | 814 | return this._tracker.changedIDs; |
michael@0 | 815 | }, |
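/*
 * Illustrative sketch of bypassing the tracker, as the comment above
 * describes. "FooService.changesSince" is an assumed backing API returning a
 * mapping of IDs to changed timestamps.
 *
 *   getChangedIDs: function () {
 *     // Ask the backing store directly instead of trusting the tracker.
 *     return FooService.changesSince(this.lastSync);
 *   },
 */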
michael@0 | 816 | |
michael@0 | 817 | // Create a new record using the store and add in crypto fields. |
michael@0 | 818 | _createRecord: function (id) { |
michael@0 | 819 | let record = this._store.createRecord(id, this.name); |
michael@0 | 820 | record.id = id; |
michael@0 | 821 | record.collection = this.name; |
michael@0 | 822 | return record; |
michael@0 | 823 | }, |
michael@0 | 824 | |
michael@0 | 825 | // Any setup that needs to happen at the beginning of each sync. |
michael@0 | 826 | _syncStartup: function () { |
michael@0 | 827 | |
michael@0 | 828 | // Determine if we need to wipe on outdated versions |
michael@0 | 829 | let metaGlobal = this.service.recordManager.get(this.metaURL); |
michael@0 | 830 | let engines = metaGlobal.payload.engines || {}; |
michael@0 | 831 | let engineData = engines[this.name] || {}; |
michael@0 | 832 | |
michael@0 | 833 | let needsWipe = false; |
michael@0 | 834 | |
michael@0 | 835 | // Assume missing versions are 0 and wipe the server |
michael@0 | 836 | if ((engineData.version || 0) < this.version) { |
michael@0 | 837 | this._log.debug("Old engine data: " + [engineData.version, this.version]); |
michael@0 | 838 | |
michael@0 | 839 | // Prepare to clear the server and upload everything |
michael@0 | 840 | needsWipe = true; |
michael@0 | 841 | this.syncID = ""; |
michael@0 | 842 | |
michael@0 | 843 | // Set the newer version and newly generated syncID |
michael@0 | 844 | engineData.version = this.version; |
michael@0 | 845 | engineData.syncID = this.syncID; |
michael@0 | 846 | |
michael@0 | 847 | // Put the new data back into meta/global and mark for upload |
michael@0 | 848 | engines[this.name] = engineData; |
michael@0 | 849 | metaGlobal.payload.engines = engines; |
michael@0 | 850 | metaGlobal.changed = true; |
michael@0 | 851 | } |
michael@0 | 852 | // Don't sync this engine if the server has newer data |
michael@0 | 853 | else if (engineData.version > this.version) { |
michael@0 | 854 | let error = new String("New data: " + [engineData.version, this.version]); |
michael@0 | 855 | error.failureCode = VERSION_OUT_OF_DATE; |
michael@0 | 856 | throw error; |
michael@0 | 857 | } |
michael@0 | 858 | // Changes to syncID mean we'll need to upload everything |
michael@0 | 859 | else if (engineData.syncID != this.syncID) { |
michael@0 | 860 | this._log.debug("Engine syncIDs: " + [engineData.syncID, this.syncID]); |
michael@0 | 861 | this.syncID = engineData.syncID; |
michael@0 | 862 | this._resetClient(); |
michael@0 | 863 | }; |
michael@0 | 864 | |
michael@0 | 865 | // Delete any existing data and reupload on bad version or missing meta. |
michael@0 | 866 | // No crypto component here...? We could regenerate per-collection keys... |
michael@0 | 867 | if (needsWipe) { |
michael@0 | 868 | this.wipeServer(); |
michael@0 | 869 | } |
michael@0 | 870 | |
michael@0 | 871 | // Save objects that need to be uploaded in this._modified. We also save |
michael@0 | 872 | // the timestamp of this fetch in this.lastSyncLocal. As we successfully |
michael@0 | 873 | // upload objects we remove them from this._modified. If an error occurs |
michael@0 | 874 | // or any objects fail to upload, they will remain in this._modified. At |
michael@0 | 875 | // the end of a sync, or after an error, we add all objects remaining in |
michael@0 | 876 | // this._modified to the tracker. |
michael@0 | 877 | this.lastSyncLocal = Date.now(); |
michael@0 | 878 | if (this.lastSync) { |
michael@0 | 879 | this._modified = this.getChangedIDs(); |
michael@0 | 880 | } else { |
michael@0 | 881 | // Mark all items to be uploaded, but treat them as changed from long ago |
michael@0 | 882 | this._log.debug("First sync, uploading all items"); |
michael@0 | 883 | this._modified = {}; |
michael@0 | 884 | for (let id in this._store.getAllIDs()) { |
michael@0 | 885 | this._modified[id] = 0; |
michael@0 | 886 | } |
michael@0 | 887 | } |
michael@0 | 888 | // Clear the tracker now. If the sync fails we'll add the ones we failed |
michael@0 | 889 | // to upload back. |
michael@0 | 890 | this._tracker.clearChangedIDs(); |
michael@0 | 891 | |
michael@0 | 892 | this._log.info(Object.keys(this._modified).length + |
michael@0 | 893 | " outgoing items pre-reconciliation"); |
michael@0 | 894 | |
michael@0 | 895 | // Keep track of what to delete at the end of sync |
michael@0 | 896 | this._delete = {}; |
michael@0 | 897 | }, |
michael@0 | 898 | |
michael@0 | 899 | /** |
michael@0 | 900 | * A tiny abstraction to make it easier to test incoming record |
michael@0 | 901 | * application. |
michael@0 | 902 | */ |
michael@0 | 903 | _itemSource: function () { |
michael@0 | 904 | return new Collection(this.engineURL, this._recordObj, this.service); |
michael@0 | 905 | }, |
michael@0 | 906 | |
michael@0 | 907 | /** |
michael@0 | 908 | * Process incoming records. |
michael@0 | 909 | * In the most awful and untestable way possible. |
michael@0 | 910 | * This now accepts something that makes testing vaguely less impossible. |
michael@0 | 911 | */ |
michael@0 | 912 | _processIncoming: function (newitems) { |
michael@0 | 913 | this._log.trace("Downloading & applying server changes"); |
michael@0 | 914 | |
michael@0 | 915 | // Figure out how many total items to fetch this sync; do less on mobile. |
michael@0 | 916 | let batchSize = this.downloadLimit || Infinity; |
michael@0 | 917 | let isMobile = (Svc.Prefs.get("client.type") == "mobile"); |
michael@0 | 918 | |
michael@0 | 919 | if (!newitems) { |
michael@0 | 920 | newitems = this._itemSource(); |
michael@0 | 921 | } |
michael@0 | 922 | |
michael@0 | 923 | if (isMobile) { |
michael@0 | 924 | batchSize = MOBILE_BATCH_SIZE; |
michael@0 | 925 | } |
michael@0 | 926 | newitems.newer = this.lastSync; |
michael@0 | 927 | newitems.full = true; |
michael@0 | 928 | newitems.limit = batchSize; |
michael@0 | 929 | |
michael@0 | 930 | // applied => number of items that should be applied. |
michael@0 | 931 | // failed => number of items that failed in this sync. |
michael@0 | 932 | // newFailed => number of items that failed for the first time in this sync. |
michael@0 | 933 | // reconciled => number of items that were reconciled. |
michael@0 | 934 | let count = {applied: 0, failed: 0, newFailed: 0, reconciled: 0}; |
michael@0 | 935 | let handled = []; |
michael@0 | 936 | let applyBatch = []; |
michael@0 | 937 | let failed = []; |
michael@0 | 938 | let failedInPreviousSync = this.previousFailed; |
michael@0 | 939 | let fetchBatch = Utils.arrayUnion(this.toFetch, failedInPreviousSync); |
michael@0 | 940 | // Reset previousFailed for each sync since previously failed items may not fail again. |
michael@0 | 941 | this.previousFailed = []; |
michael@0 | 942 | |
michael@0 | 943 | // Used (via exceptions) to allow the record handler/reconciliation/etc. |
michael@0 | 944 | // methods to signal that they would like processing of incoming records to |
michael@0 | 945 | // cease. |
michael@0 | 946 | let aborting = undefined; |
michael@0 | 947 | |
michael@0 | 948 | function doApplyBatch() { |
michael@0 | 949 | this._tracker.ignoreAll = true; |
michael@0 | 950 | try { |
michael@0 | 951 | failed = failed.concat(this._store.applyIncomingBatch(applyBatch)); |
michael@0 | 952 | } catch (ex) { |
michael@0 | 953 | // Catch any error that escapes from applyIncomingBatch. At present |
michael@0 | 954 | // those will all be abort events. |
michael@0 | 955 | this._log.warn("Got exception " + Utils.exceptionStr(ex) + |
michael@0 | 956 | ", aborting processIncoming."); |
michael@0 | 957 | aborting = ex; |
michael@0 | 958 | } |
michael@0 | 959 | this._tracker.ignoreAll = false; |
michael@0 | 960 | applyBatch = []; |
michael@0 | 961 | } |
michael@0 | 962 | |
michael@0 | 963 | function doApplyBatchAndPersistFailed() { |
michael@0 | 964 | // Apply remaining batch. |
michael@0 | 965 | if (applyBatch.length) { |
michael@0 | 966 | doApplyBatch.call(this); |
michael@0 | 967 | } |
michael@0 | 968 | // Persist failed items so we refetch them. |
michael@0 | 969 | if (failed.length) { |
michael@0 | 970 | this.previousFailed = Utils.arrayUnion(failed, this.previousFailed); |
michael@0 | 971 | count.failed += failed.length; |
michael@0 | 972 | this._log.debug("Records that failed to apply: " + failed); |
michael@0 | 973 | failed = []; |
michael@0 | 974 | } |
michael@0 | 975 | } |
michael@0 | 976 | |
michael@0 | 977 | let key = this.service.collectionKeys.keyForCollection(this.name); |
michael@0 | 978 | |
michael@0 | 979 | // Not binding this method to 'this' for performance reasons. It gets |
michael@0 | 980 | // called for every incoming record. |
michael@0 | 981 | let self = this; |
michael@0 | 982 | |
michael@0 | 983 | newitems.recordHandler = function(item) { |
michael@0 | 984 | if (aborting) { |
michael@0 | 985 | return; |
michael@0 | 986 | } |
michael@0 | 987 | |
michael@0 | 988 | // Grab a later last modified if possible |
michael@0 | 989 | if (self.lastModified == null || item.modified > self.lastModified) |
michael@0 | 990 | self.lastModified = item.modified; |
michael@0 | 991 | |
michael@0 | 992 | // Track the collection for the WBO. |
michael@0 | 993 | item.collection = self.name; |
michael@0 | 994 | |
michael@0 | 995 | // Remember which records were processed |
michael@0 | 996 | handled.push(item.id); |
michael@0 | 997 | |
michael@0 | 998 | try { |
michael@0 | 999 | try { |
michael@0 | 1000 | item.decrypt(key); |
michael@0 | 1001 | } catch (ex if Utils.isHMACMismatch(ex)) { |
michael@0 | 1002 | let strategy = self.handleHMACMismatch(item, true); |
michael@0 | 1003 | if (strategy == SyncEngine.kRecoveryStrategy.retry) { |
michael@0 | 1004 | // You only get one retry. |
michael@0 | 1005 | try { |
michael@0 | 1006 | // Try decrypting again, typically because we've got new keys. |
michael@0 | 1007 | self._log.info("Trying decrypt again..."); |
michael@0 | 1008 | key = self.service.collectionKeys.keyForCollection(self.name); |
michael@0 | 1009 | item.decrypt(key); |
michael@0 | 1010 | strategy = null; |
michael@0 | 1011 | } catch (ex if Utils.isHMACMismatch(ex)) { |
michael@0 | 1012 | strategy = self.handleHMACMismatch(item, false); |
michael@0 | 1013 | } |
michael@0 | 1014 | } |
michael@0 | 1015 | |
michael@0 | 1016 | switch (strategy) { |
michael@0 | 1017 | case null: |
michael@0 | 1018 | // Retry succeeded! No further handling. |
michael@0 | 1019 | break; |
michael@0 | 1020 | case SyncEngine.kRecoveryStrategy.retry: |
michael@0 | 1021 | self._log.debug("Ignoring second retry suggestion."); |
michael@0 | 1022 | // Fall through to error case. |
michael@0 | 1023 | case SyncEngine.kRecoveryStrategy.error: |
michael@0 | 1024 | self._log.warn("Error decrypting record: " + Utils.exceptionStr(ex)); |
michael@0 | 1025 | failed.push(item.id); |
michael@0 | 1026 | return; |
michael@0 | 1027 | case SyncEngine.kRecoveryStrategy.ignore: |
michael@0 | 1028 | self._log.debug("Ignoring record " + item.id + |
michael@0 | 1029 | " with bad HMAC: already handled."); |
michael@0 | 1030 | return; |
michael@0 | 1031 | } |
michael@0 | 1032 | } |
michael@0 | 1033 | } catch (ex) { |
michael@0 | 1034 | self._log.warn("Error decrypting record: " + Utils.exceptionStr(ex)); |
michael@0 | 1035 | failed.push(item.id); |
michael@0 | 1036 | return; |
michael@0 | 1037 | } |
michael@0 | 1038 | |
michael@0 | 1039 | let shouldApply; |
michael@0 | 1040 | try { |
michael@0 | 1041 | shouldApply = self._reconcile(item); |
michael@0 | 1042 | } catch (ex if (ex.code == Engine.prototype.eEngineAbortApplyIncoming)) { |
michael@0 | 1043 | self._log.warn("Reconciliation failed: aborting incoming processing."); |
michael@0 | 1044 | failed.push(item.id); |
michael@0 | 1045 | aborting = ex.cause; |
michael@0 | 1046 | } catch (ex) { |
michael@0 | 1047 | self._log.warn("Failed to reconcile incoming record " + item.id); |
michael@0 | 1048 | self._log.warn("Encountered exception: " + Utils.exceptionStr(ex)); |
michael@0 | 1049 | failed.push(item.id); |
michael@0 | 1050 | return; |
michael@0 | 1051 | } |
michael@0 | 1052 | |
michael@0 | 1053 | if (shouldApply) { |
michael@0 | 1054 | count.applied++; |
michael@0 | 1055 | applyBatch.push(item); |
michael@0 | 1056 | } else { |
michael@0 | 1057 | count.reconciled++; |
michael@0 | 1058 | self._log.trace("Skipping reconciled incoming item " + item.id); |
michael@0 | 1059 | } |
michael@0 | 1060 | |
michael@0 | 1061 | if (applyBatch.length == self.applyIncomingBatchSize) { |
michael@0 | 1062 | doApplyBatch.call(self); |
michael@0 | 1063 | } |
michael@0 | 1064 | self._store._sleep(0); |
michael@0 | 1065 | }; |
michael@0 | 1066 | |
michael@0 | 1067 | // Only bother getting data from the server if there's new things |
michael@0 | 1068 | if (this.lastModified == null || this.lastModified > this.lastSync) { |
michael@0 | 1069 | let resp = newitems.get(); |
michael@0 | 1070 | doApplyBatchAndPersistFailed.call(this); |
michael@0 | 1071 | if (!resp.success) { |
michael@0 | 1072 | resp.failureCode = ENGINE_DOWNLOAD_FAIL; |
michael@0 | 1073 | throw resp; |
michael@0 | 1074 | } |
michael@0 | 1075 | |
michael@0 | 1076 | if (aborting) { |
michael@0 | 1077 | throw aborting; |
michael@0 | 1078 | } |
michael@0 | 1079 | } |
michael@0 | 1080 | |
michael@0 | 1081 | // Mobile: check if we got the maximum that we requested; get the rest if so. |
michael@0 | 1082 | if (handled.length == newitems.limit) { |
michael@0 | 1083 | let guidColl = new Collection(this.engineURL, null, this.service); |
michael@0 | 1084 | |
michael@0 | 1085 | // Sort and limit so that on mobile we only get the last X records. |
michael@0 | 1086 | guidColl.limit = this.downloadLimit; |
michael@0 | 1087 | guidColl.newer = this.lastSync; |
michael@0 | 1088 | |
michael@0 | 1089 | // index: Orders by the sortindex descending (highest weight first). |
michael@0 | 1090 | guidColl.sort = "index"; |
michael@0 | 1091 | |
michael@0 | 1092 | let guids = guidColl.get(); |
michael@0 | 1093 | if (!guids.success) |
michael@0 | 1094 | throw guids; |
michael@0 | 1095 | |
michael@0 | 1096 | // Figure out which guids weren't just fetched, then remove any guids that |
michael@0 | 1097 | // were already waiting and prepend the new ones. |
michael@0 | 1098 | let extra = Utils.arraySub(guids.obj, handled); |
michael@0 | 1099 | if (extra.length > 0) { |
michael@0 | 1100 | fetchBatch = Utils.arrayUnion(extra, fetchBatch); |
michael@0 | 1101 | this.toFetch = Utils.arrayUnion(extra, this.toFetch); |
michael@0 | 1102 | } |
michael@0 | 1103 | } |
michael@0 | 1104 | |
michael@0 | 1105 | // Fast-forward the lastSync timestamp since we have stored the |
michael@0 | 1106 | // remaining items in toFetch. |
michael@0 | 1107 | if (this.lastSync < this.lastModified) { |
michael@0 | 1108 | this.lastSync = this.lastModified; |
michael@0 | 1109 | } |
michael@0 | 1110 | |
michael@0 | 1111 | // Process any backlog of GUIDs. |
michael@0 | 1112 | // At this point we impose an upper limit on the number of items to fetch |
michael@0 | 1113 | // in a single request, even for desktop, to avoid hitting URI limits. |
michael@0 | 1114 | batchSize = isMobile ? this.mobileGUIDFetchBatchSize : |
michael@0 | 1115 | this.guidFetchBatchSize; |
michael@0 | 1116 | |
michael@0 | 1117 | while (fetchBatch.length && !aborting) { |
michael@0 | 1118 | // Reuse the original query, but get rid of the restricting params |
michael@0 | 1119 | // and batch remaining records. |
michael@0 | 1120 | newitems.limit = 0; |
michael@0 | 1121 | newitems.newer = 0; |
michael@0 | 1122 | newitems.ids = fetchBatch.slice(0, batchSize); |
michael@0 | 1123 | |
michael@0 | 1124 | // Reuse the existing record handler set earlier |
michael@0 | 1125 | let resp = newitems.get(); |
michael@0 | 1126 | if (!resp.success) { |
michael@0 | 1127 | resp.failureCode = ENGINE_DOWNLOAD_FAIL; |
michael@0 | 1128 | throw resp; |
michael@0 | 1129 | } |
michael@0 | 1130 | |
michael@0 | 1131 | // This batch was successfully applied. Not using |
michael@0 | 1132 | // doApplyBatchAndPersistFailed() here to avoid writing toFetch twice. |
michael@0 | 1133 | fetchBatch = fetchBatch.slice(batchSize); |
michael@0 | 1134 | this.toFetch = Utils.arraySub(this.toFetch, newitems.ids); |
michael@0 | 1135 | this.previousFailed = Utils.arrayUnion(this.previousFailed, failed); |
michael@0 | 1136 | if (failed.length) { |
michael@0 | 1137 | count.failed += failed.length; |
michael@0 | 1138 | this._log.debug("Records that failed to apply: " + failed); |
michael@0 | 1139 | } |
michael@0 | 1140 | failed = []; |
michael@0 | 1141 | |
michael@0 | 1142 | if (aborting) { |
michael@0 | 1143 | throw aborting; |
michael@0 | 1144 | } |
michael@0 | 1145 | |
michael@0 | 1146 | if (this.lastSync < this.lastModified) { |
michael@0 | 1147 | this.lastSync = this.lastModified; |
michael@0 | 1148 | } |
michael@0 | 1149 | } |
michael@0 | 1150 | |
michael@0 | 1151 | // Apply remaining items. |
michael@0 | 1152 | doApplyBatchAndPersistFailed.call(this); |
michael@0 | 1153 | |
michael@0 | 1154 | count.newFailed = Utils.arraySub(this.previousFailed, failedInPreviousSync).length; |
michael@0 | 1155 | count.succeeded = Math.max(0, count.applied - count.failed); |
michael@0 | 1156 | this._log.info(["Records:", |
michael@0 | 1157 | count.applied, "applied,", |
michael@0 | 1158 | count.succeeded, "successfully,", |
michael@0 | 1159 | count.failed, "failed to apply,", |
michael@0 | 1160 | count.newFailed, "newly failed to apply,", |
michael@0 | 1161 | count.reconciled, "reconciled."].join(" ")); |
michael@0 | 1162 | Observers.notify("weave:engine:sync:applied", count, this.name); |
michael@0 | 1163 | }, |
michael@0 | 1164 | |
michael@0 | 1165 | /** |
michael@0 | 1166 | * Find a GUID of an item that is a duplicate of the incoming item but happens |
michael@0 | 1167 | * to have a different GUID |
michael@0 | 1168 | * |
michael@0 | 1169 | * @return GUID of the similar item; falsy otherwise |
michael@0 | 1170 | */ |
michael@0 | 1171 | _findDupe: function (item) { |
michael@0 | 1172 | // By default, assume there are no dupe items for the engine |
michael@0 | 1173 | }, |
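/*
 * Illustrative sketch of a _findDupe override. The lookup helper is an
 * assumed backing API; the method must return a GUID or a falsy value.
 *
 *   _findDupe: function (item) {
 *     // e.g. look for a local item with the same user-visible content.
 *     return FooService.guidForValue(item.value);
 *   },
 */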
michael@0 | 1174 | |
michael@0 | 1175 | _deleteId: function (id) { |
michael@0 | 1176 | this._tracker.removeChangedID(id); |
michael@0 | 1177 | |
michael@0 | 1178 | // Remember this id to delete at the end of sync |
michael@0 | 1179 | if (this._delete.ids == null) |
michael@0 | 1180 | this._delete.ids = [id]; |
michael@0 | 1181 | else |
michael@0 | 1182 | this._delete.ids.push(id); |
michael@0 | 1183 | }, |
michael@0 | 1184 | |
michael@0 | 1185 | /** |
michael@0 | 1186 | * Reconcile incoming record with local state. |
michael@0 | 1187 | * |
michael@0 | 1188 | * This function essentially determines whether to apply an incoming record. |
michael@0 | 1189 | * |
michael@0 | 1190 | * @param item |
michael@0 | 1191 | * Record from server to be tested for application. |
michael@0 | 1192 | * @return boolean |
michael@0 | 1193 | * Truthy if incoming record should be applied. False if not. |
michael@0 | 1194 | */ |
michael@0 | 1195 | _reconcile: function (item) { |
michael@0 | 1196 | if (this._log.level <= Log.Level.Trace) { |
michael@0 | 1197 | this._log.trace("Incoming: " + item); |
michael@0 | 1198 | } |
michael@0 | 1199 | |
michael@0 | 1200 | // We start reconciling by collecting a bunch of state. We do this here |
michael@0 | 1201 | // because some state may change during the course of this function and we |
michael@0 | 1202 | // need to operate on the original values. |
michael@0 | 1203 | let existsLocally = this._store.itemExists(item.id); |
michael@0 | 1204 | let locallyModified = item.id in this._modified; |
michael@0 | 1205 | |
michael@0 | 1206 | // TODO Handle clock drift better. Tracked in bug 721181. |
michael@0 | 1207 | let remoteAge = AsyncResource.serverTime - item.modified; |
michael@0 | 1208 | let localAge = locallyModified ? |
michael@0 | 1209 | (Date.now() / 1000 - this._modified[item.id]) : null; |
michael@0 | 1210 | let remoteIsNewer = remoteAge < localAge; |
michael@0 | 1211 | |
michael@0 | 1212 | this._log.trace("Reconciling " + item.id + ". exists=" + |
michael@0 | 1213 | existsLocally + "; modified=" + locallyModified + |
michael@0 | 1214 | "; local age=" + localAge + "; incoming age=" + |
michael@0 | 1215 | remoteAge); |
michael@0 | 1216 | |
michael@0 | 1217 | // We handle deletions first so subsequent logic doesn't have to check |
michael@0 | 1218 | // deleted flags. |
michael@0 | 1219 | if (item.deleted) { |
michael@0 | 1220 | // If the item doesn't exist locally, there is nothing for us to do. We |
michael@0 | 1221 | // can't check for duplicates because the incoming record has no data |
michael@0 | 1222 | // which can be used for duplicate detection. |
michael@0 | 1223 | if (!existsLocally) { |
michael@0 | 1224 | this._log.trace("Ignoring incoming item because it was deleted and " + |
michael@0 | 1225 | "the item does not exist locally."); |
michael@0 | 1226 | return false; |
michael@0 | 1227 | } |
michael@0 | 1228 | |
michael@0 | 1229 | // We decide whether to process the deletion by comparing the record |
michael@0 | 1230 | // ages. If the item is not modified locally, the remote side wins and |
michael@0 | 1231 | // the deletion is processed. If it is modified locally, we take the |
michael@0 | 1232 | // newer record. |
michael@0 | 1233 | if (!locallyModified) { |
michael@0 | 1234 | this._log.trace("Applying incoming delete because the local item " + |
michael@0 | 1235 | "exists and isn't modified."); |
michael@0 | 1236 | return true; |
michael@0 | 1237 | } |
michael@0 | 1238 | |
michael@0 | 1239 | // TODO As part of bug 720592, determine whether we should do more here. |
michael@0 | 1240 | // In the case where the local changes are newer, it is quite possible |
michael@0 | 1241 | // that the local client will restore data a remote client had tried to |
michael@0 | 1242 | // delete. There might be a good reason for that delete and it might be |
michael@0 | 1243 | // unexpected for this client to restore that data. |
michael@0 | 1244 | this._log.trace("Incoming record is deleted but we had local changes. " + |
michael@0 | 1245 | "Applying the youngest record."); |
michael@0 | 1246 | return remoteIsNewer; |
michael@0 | 1247 | } |
michael@0 | 1248 | |
michael@0 | 1249 | // At this point the incoming record is not for a deletion and must have |
michael@0 | 1250 | // data. If the incoming record does not exist locally, we check for a local |
michael@0 | 1251 | // duplicate existing under a different ID. The default implementation of |
michael@0 | 1252 | // _findDupe() is empty, so engines have to opt in to this functionality. |
michael@0 | 1253 | // |
michael@0 | 1254 | // If we find a duplicate, we change the local ID to the incoming ID and we |
michael@0 | 1255 | // refresh the metadata collected above. See bug 710448 for the history |
michael@0 | 1256 | // of this logic. |
michael@0 | 1257 | if (!existsLocally) { |
michael@0 | 1258 | let dupeID = this._findDupe(item); |
michael@0 | 1259 | if (dupeID) { |
michael@0 | 1260 | this._log.trace("Local item " + dupeID + " is a duplicate for " + |
michael@0 | 1261 | "incoming item " + item.id); |
michael@0 | 1262 | |
michael@0 | 1263 | // The local duplicate's ID is always deleted on the server. |
michael@0 | 1264 | this._deleteId(dupeID); |
michael@0 | 1265 | |
michael@0 | 1266 | // The current API contract does not mandate that the ID returned by |
michael@0 | 1267 | // _findDupe() actually exists. Therefore, we have to perform this |
michael@0 | 1268 | // check. |
michael@0 | 1269 | existsLocally = this._store.itemExists(dupeID); |
michael@0 | 1270 | |
michael@0 | 1271 | // We unconditionally change the item's ID in case the engine knows of |
michael@0 | 1272 | // an item but doesn't expose it through itemExists. If the API |
michael@0 | 1273 | // contract were stronger, this could be changed. |
michael@0 | 1274 | this._log.debug("Switching local ID to incoming: " + dupeID + " -> " + |
michael@0 | 1275 | item.id); |
michael@0 | 1276 | this._store.changeItemID(dupeID, item.id); |
michael@0 | 1277 | |
michael@0 | 1278 | // If the local item was modified, we carry its metadata forward so |
michael@0 | 1279 | // appropriate reconciling can be performed. |
michael@0 | 1280 | if (dupeID in this._modified) { |
michael@0 | 1281 | locallyModified = true; |
michael@0 | 1282 | localAge = Date.now() / 1000 - this._modified[dupeID]; |
michael@0 | 1283 | remoteIsNewer = remoteAge < localAge; |
michael@0 | 1284 | |
michael@0 | 1285 | this._modified[item.id] = this._modified[dupeID]; |
michael@0 | 1286 | delete this._modified[dupeID]; |
michael@0 | 1287 | } else { |
michael@0 | 1288 | locallyModified = false; |
michael@0 | 1289 | localAge = null; |
michael@0 | 1290 | } |
michael@0 | 1291 | |
michael@0 | 1292 | this._log.debug("Local item after duplication: age=" + localAge + |
michael@0 | 1293 | "; modified=" + locallyModified + "; exists=" + |
michael@0 | 1294 | existsLocally); |
michael@0 | 1295 | } else { |
michael@0 | 1296 | this._log.trace("No duplicate found for incoming item: " + item.id); |
michael@0 | 1297 | } |
michael@0 | 1298 | } |
michael@0 | 1299 | |
michael@0 | 1300 | // At this point we've performed duplicate detection. But, nothing here |
michael@0 | 1301 | // should depend on duplicate detection as the above should have updated |
michael@0 | 1302 | // state seamlessly. |
michael@0 | 1303 | |
michael@0 | 1304 | if (!existsLocally) { |
michael@0 | 1305 | // If the item doesn't exist locally and we have no local modifications |
michael@0 | 1306 | // to the item (implying that it was not deleted), always apply the remote |
michael@0 | 1307 | // item. |
michael@0 | 1308 | if (!locallyModified) { |
michael@0 | 1309 | this._log.trace("Applying incoming because local item does not exist " + |
michael@0 | 1310 | "and was not deleted."); |
michael@0 | 1311 | return true; |
michael@0 | 1312 | } |
michael@0 | 1313 | |
michael@0 | 1314 | // If the item was modified locally but isn't present, it must have |
michael@0 | 1315 | // been deleted. If the incoming record is younger, we restore from |
michael@0 | 1316 | // that record. |
michael@0 | 1317 | if (remoteIsNewer) { |
michael@0 | 1318 | this._log.trace("Applying incoming because local item was deleted " + |
michael@0 | 1319 | "before the incoming item was changed."); |
michael@0 | 1320 | delete this._modified[item.id]; |
michael@0 | 1321 | return true; |
michael@0 | 1322 | } |
michael@0 | 1323 | |
michael@0 | 1324 | this._log.trace("Ignoring incoming item because the local item's " + |
michael@0 | 1325 | "deletion is newer."); |
michael@0 | 1326 | return false; |
michael@0 | 1327 | } |
michael@0 | 1328 | |
michael@0 | 1329 | // If the remote and local records are the same, there is nothing to do. |
michael@0 | 1330 | // Ideally this check wouldn't live here at all: the engine would simply |
michael@0 | 1331 | // take the record and apply it. Deferring the comparison to the engine |
michael@0 | 1332 | // would avoid a redundant and possibly expensive dip into the storage |
michael@0 | 1333 | // layer to query item state. |
michael@0 | 1334 | // This should get addressed in the async rewrite, so we ignore it for now. |
michael@0 | 1335 | let localRecord = this._createRecord(item.id); |
michael@0 | 1336 | let recordsEqual = Utils.deepEquals(item.cleartext, |
michael@0 | 1337 | localRecord.cleartext); |
michael@0 | 1338 | |
michael@0 | 1339 | // If the records are the same, we don't need to do anything. This does |
michael@0 | 1340 | // potentially throw away a local modification time. But, if the records |
michael@0 | 1341 | // are the same, does it matter? |
michael@0 | 1342 | if (recordsEqual) { |
michael@0 | 1343 | this._log.trace("Ignoring incoming item because the local item is " + |
michael@0 | 1344 | "identical."); |
michael@0 | 1345 | |
michael@0 | 1346 | delete this._modified[item.id]; |
michael@0 | 1347 | return false; |
michael@0 | 1348 | } |
michael@0 | 1349 | |
michael@0 | 1350 | // At this point the records are different. |
michael@0 | 1351 | |
michael@0 | 1352 | // If we have no local modifications, always take the server record. |
michael@0 | 1353 | if (!locallyModified) { |
michael@0 | 1354 | this._log.trace("Applying incoming record because no local conflicts."); |
michael@0 | 1355 | return true; |
michael@0 | 1356 | } |
michael@0 | 1357 | |
michael@0 | 1358 | // At this point, records are different and the local record is modified. |
michael@0 | 1359 | // We resolve conflicts by record age, where the newest one wins. This does |
michael@0 | 1360 | // result in data loss and should be handled by giving the engine an |
michael@0 | 1361 | // opportunity to merge the records. Bug 720592 tracks this feature. |
michael@0 | 1362 | this._log.warn("DATA LOSS: Both local and remote changes to record: " + |
michael@0 | 1363 | item.id); |
michael@0 | 1364 | return remoteIsNewer; |
michael@0 | 1365 | }, |
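// A worked example of the age comparison used above (numbers are
// illustrative): if AsyncResource.serverTime is 1000 and the incoming
// item.modified is 990, remoteAge is 10 seconds. A local change made 30
// seconds ago gives localAge = 30, so remoteIsNewer is true (10 < 30) and
// the incoming record wins the conflict; a local change made 5 seconds ago
// gives localAge = 5 and the local record wins instead.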
michael@0 | 1366 | |
michael@0 | 1367 | // Upload outgoing records. |
michael@0 | 1368 | _uploadOutgoing: function () { |
michael@0 | 1369 | this._log.trace("Uploading local changes to server."); |
michael@0 | 1370 | |
michael@0 | 1371 | let modifiedIDs = Object.keys(this._modified); |
michael@0 | 1372 | if (modifiedIDs.length) { |
michael@0 | 1373 | this._log.trace("Preparing " + modifiedIDs.length + |
michael@0 | 1374 | " outgoing records"); |
michael@0 | 1375 | |
michael@0 | 1376 | // collection we'll upload |
michael@0 | 1377 | let up = new Collection(this.engineURL, null, this.service); |
michael@0 | 1378 | let count = 0; |
michael@0 | 1379 | |
michael@0 | 1380 | // Upload what we've got so far in the collection |
michael@0 | 1381 | let doUpload = Utils.bind2(this, function(desc) { |
michael@0 | 1382 | this._log.info("Uploading " + desc + " of " + modifiedIDs.length + |
michael@0 | 1383 | " records"); |
michael@0 | 1384 | let resp = up.post(); |
michael@0 | 1385 | if (!resp.success) { |
michael@0 | 1386 | this._log.debug("Uploading records failed: " + resp); |
michael@0 | 1387 | resp.failureCode = ENGINE_UPLOAD_FAIL; |
michael@0 | 1388 | throw resp; |
michael@0 | 1389 | } |
michael@0 | 1390 | |
michael@0 | 1391 | // Update server timestamp from the upload. |
michael@0 | 1392 | let modified = resp.headers["x-weave-timestamp"]; |
michael@0 | 1393 | if (modified > this.lastSync) |
michael@0 | 1394 | this.lastSync = modified; |
michael@0 | 1395 | |
michael@0 | 1396 | let failed_ids = Object.keys(resp.obj.failed); |
michael@0 | 1397 | if (failed_ids.length) |
michael@0 | 1398 | this._log.debug("Records that will be uploaded again because " |
michael@0 | 1399 | + "the server couldn't store them: " |
michael@0 | 1400 | + failed_ids.join(", ")); |
michael@0 | 1401 | |
michael@0 | 1402 | // Clear successfully uploaded objects. |
michael@0 | 1403 | for each (let id in resp.obj.success) { |
michael@0 | 1404 | delete this._modified[id]; |
michael@0 | 1405 | } |
michael@0 | 1406 | |
michael@0 | 1407 | up.clearRecords(); |
michael@0 | 1408 | }); |
michael@0 | 1409 | |
michael@0 | 1410 | for each (let id in modifiedIDs) { |
michael@0 | 1411 | try { |
michael@0 | 1412 | let out = this._createRecord(id); |
michael@0 | 1413 | if (this._log.level <= Log.Level.Trace) |
michael@0 | 1414 | this._log.trace("Outgoing: " + out); |
michael@0 | 1415 | |
michael@0 | 1416 | out.encrypt(this.service.collectionKeys.keyForCollection(this.name)); |
michael@0 | 1417 | up.pushData(out); |
michael@0 | 1418 | } |
michael@0 | 1419 | catch(ex) { |
michael@0 | 1420 | this._log.warn("Error creating record: " + Utils.exceptionStr(ex)); |
michael@0 | 1421 | } |
michael@0 | 1422 | |
michael@0 | 1423 | // Partial upload |
michael@0 | 1424 | if ((++count % MAX_UPLOAD_RECORDS) == 0) |
michael@0 | 1425 | doUpload((count - MAX_UPLOAD_RECORDS) + " - " + count + " out"); |
michael@0 | 1426 | |
michael@0 | 1427 | this._store._sleep(0); |
michael@0 | 1428 | } |
michael@0 | 1429 | |
michael@0 | 1430 | // Final upload |
michael@0 | 1431 | if (count % MAX_UPLOAD_RECORDS > 0) |
michael@0 | 1432 | doUpload(count >= MAX_UPLOAD_RECORDS ? "last batch" : "all"); |
michael@0 | 1433 | } |
michael@0 | 1434 | }, |
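// A worked example of the batching above, assuming MAX_UPLOAD_RECORDS is
// 100: with 250 modified records, doUpload() fires inside the loop after
// records 100 and 200, and the remaining 50 go out through the "Final
// upload" branch (250 % 100 > 0). With exactly 200 records, both batches
// are posted inside the loop and the final branch is skipped (200 % 100 == 0).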
michael@0 | 1435 | |
michael@0 | 1436 | // Any cleanup necessary. |
michael@0 | 1437 | // Save the current snapshot so we can calculate changes at the next sync |
michael@0 | 1438 | _syncFinish: function () { |
michael@0 | 1439 | this._log.trace("Finishing up sync"); |
michael@0 | 1440 | this._tracker.resetScore(); |
michael@0 | 1441 | |
michael@0 | 1442 | let doDelete = Utils.bind2(this, function(key, val) { |
michael@0 | 1443 | let coll = new Collection(this.engineURL, this._recordObj, this.service); |
michael@0 | 1444 | coll[key] = val; |
michael@0 | 1445 | coll.delete(); |
michael@0 | 1446 | }); |
michael@0 | 1447 | |
michael@0 | 1448 | for (let [key, val] in Iterator(this._delete)) { |
michael@0 | 1449 | // Remove the key for future uses |
michael@0 | 1450 | delete this._delete[key]; |
michael@0 | 1451 | |
michael@0 | 1452 | // Send a simple delete for the property |
michael@0 | 1453 | if (key != "ids" || val.length <= 100) |
michael@0 | 1454 | doDelete(key, val); |
michael@0 | 1455 | else { |
michael@0 | 1456 | // For many ids, split into chunks of at most 100 |
michael@0 | 1457 | while (val.length > 0) { |
michael@0 | 1458 | doDelete(key, val.slice(0, 100)); |
michael@0 | 1459 | val = val.slice(100); |
michael@0 | 1460 | } |
michael@0 | 1461 | } |
michael@0 | 1462 | } |
michael@0 | 1463 | }, |
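// A worked example of the chunked delete above: if this._delete.ids ended
// up holding 250 GUIDs queued by _deleteId(), three DELETE requests go out
// with 100, 100 and 50 ids respectively; any other property of this._delete
// is sent in a single request.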
michael@0 | 1464 | |
michael@0 | 1465 | _syncCleanup: function () { |
michael@0 | 1466 | if (!this._modified) { |
michael@0 | 1467 | return; |
michael@0 | 1468 | } |
michael@0 | 1469 | |
michael@0 | 1470 | // Mark failed WBOs as changed again so they are reuploaded next time. |
michael@0 | 1471 | for (let [id, when] in Iterator(this._modified)) { |
michael@0 | 1472 | this._tracker.addChangedID(id, when); |
michael@0 | 1473 | } |
michael@0 | 1474 | this._modified = {}; |
michael@0 | 1475 | }, |
michael@0 | 1476 | |
michael@0 | 1477 | _sync: function () { |
michael@0 | 1478 | try { |
michael@0 | 1479 | this._syncStartup(); |
michael@0 | 1480 | Observers.notify("weave:engine:sync:status", "process-incoming"); |
michael@0 | 1481 | this._processIncoming(); |
michael@0 | 1482 | Observers.notify("weave:engine:sync:status", "upload-outgoing"); |
michael@0 | 1483 | this._uploadOutgoing(); |
michael@0 | 1484 | this._syncFinish(); |
michael@0 | 1485 | } finally { |
michael@0 | 1486 | this._syncCleanup(); |
michael@0 | 1487 | } |
michael@0 | 1488 | }, |
michael@0 | 1489 | |
michael@0 | 1490 | canDecrypt: function () { |
michael@0 | 1491 | // Report failure even if there's nothing to decrypt |
michael@0 | 1492 | let canDecrypt = false; |
michael@0 | 1493 | |
michael@0 | 1494 | // Fetch the most recently uploaded record and try to decrypt it |
michael@0 | 1495 | let test = new Collection(this.engineURL, this._recordObj, this.service); |
michael@0 | 1496 | test.limit = 1; |
michael@0 | 1497 | test.sort = "newest"; |
michael@0 | 1498 | test.full = true; |
michael@0 | 1499 | |
michael@0 | 1500 | let key = this.service.collectionKeys.keyForCollection(this.name); |
michael@0 | 1501 | test.recordHandler = function recordHandler(record) { |
michael@0 | 1502 | record.decrypt(key); |
michael@0 | 1503 | canDecrypt = true; |
michael@0 | 1504 | }.bind(this); |
michael@0 | 1505 | |
michael@0 | 1506 | // Any failure fetching/decrypting will just result in false |
michael@0 | 1507 | try { |
michael@0 | 1508 | this._log.trace("Trying to decrypt a record from the server..."); |
michael@0 | 1509 | test.get(); |
michael@0 | 1510 | } |
michael@0 | 1511 | catch(ex) { |
michael@0 | 1512 | this._log.debug("Failed test decrypt: " + Utils.exceptionStr(ex)); |
michael@0 | 1513 | } |
michael@0 | 1514 | |
michael@0 | 1515 | return canDecrypt; |
michael@0 | 1516 | }, |
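// Hypothetical usage sketch (the "engine" and "log" names are placeholders):
// a caller holding an engine instance could use canDecrypt() to check that
// the current collection keys actually work before trusting server data:
//
//   if (!engine.canDecrypt()) {
//     // Either fetching or decrypting failed; fresher keys (or a wipe and
//     // reupload) are probably needed. The exact policy lives elsewhere.
//     log.warn(engine.name + ": cannot decrypt server records.");
//   }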
michael@0 | 1517 | |
michael@0 | 1518 | _resetClient: function () { |
michael@0 | 1519 | this.resetLastSync(); |
michael@0 | 1520 | this.previousFailed = []; |
michael@0 | 1521 | this.toFetch = []; |
michael@0 | 1522 | }, |
michael@0 | 1523 | |
michael@0 | 1524 | wipeServer: function () { |
michael@0 | 1525 | let response = this.service.resource(this.engineURL).delete(); |
michael@0 | 1526 | if (response.status != 200 && response.status != 404) { |
michael@0 | 1527 | throw response; |
michael@0 | 1528 | } |
michael@0 | 1529 | this._resetClient(); |
michael@0 | 1530 | }, |
michael@0 | 1531 | |
michael@0 | 1532 | removeClientData: function () { |
michael@0 | 1533 | // Implement this method in engines that store client-specific data |
michael@0 | 1534 | // on the server. |
michael@0 | 1535 | }, |
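// For illustration only: an engine that uploads records keyed by the local
// client might override removeClientData() roughly like this. Using
// this.service.clientsEngine.localID as the per-client record id is an
// assumption of this sketch:
//
//   removeClientData: function () {
//     let url = this.engineURL + "/" + this.service.clientsEngine.localID;
//     let response = this.service.resource(url).delete();
//     if (response.status != 200 && response.status != 404) {
//       throw response;
//     }
//   },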
michael@0 | 1536 | |
michael@0 | 1537 | /* |
michael@0 | 1538 | * Decide on (and partially effect) an error-handling strategy. |
michael@0 | 1539 | * |
michael@0 | 1540 | * Asks the Service to respond to an HMAC error, which might result in keys |
michael@0 | 1541 | * being downloaded. That call returns true if an action which might allow a |
michael@0 | 1542 | * being downloaded. That call returns true if it took an action that might |
michael@0 | 1543 | * allow a retry to occur. |
michael@0 | 1544 | * If `mayRetry` is truthy, and the Service suggests a retry, |
michael@0 | 1545 | * handleHMACMismatch returns kRecoveryStrategy.retry. Otherwise, it returns |
michael@0 | 1546 | * kRecoveryStrategy.error. |
michael@0 | 1547 | * |
michael@0 | 1548 | * Subclasses of SyncEngine can override this method to allow for different |
michael@0 | 1549 | * behavior -- e.g., to delete and ignore erroneous entries. |
michael@0 | 1550 | * |
michael@0 | 1551 | * All return values will be part of the kRecoveryStrategy enumeration. |
michael@0 | 1552 | */ |
michael@0 | 1553 | handleHMACMismatch: function (item, mayRetry) { |
michael@0 | 1554 | // By default we either try again, or bail out noisily. |
michael@0 | 1555 | return (this.service.handleHMACEvent() && mayRetry) ? |
michael@0 | 1556 | SyncEngine.kRecoveryStrategy.retry : |
michael@0 | 1557 | SyncEngine.kRecoveryStrategy.error; |
michael@0 | 1558 | } |
michael@0 | 1559 | }; |
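// For illustration only: a SyncEngine subclass that would rather skip
// undecryptable records than abort the sync could override
// handleHMACMismatch(). "LenientEngine" is a hypothetical name, and this
// assumes the kRecoveryStrategy enumeration has an "ignore" member alongside
// "retry" and "error":
//
//   LenientEngine.prototype.handleHMACMismatch = function (item, mayRetry) {
//     // Give the Service one chance to fetch fresh keys and retry.
//     if (this.service.handleHMACEvent() && mayRetry) {
//       return SyncEngine.kRecoveryStrategy.retry;
//     }
//     // Otherwise drop just this record instead of failing the whole sync.
//     this._log.debug("Skipping record with bad HMAC: " + item.id);
//     return SyncEngine.kRecoveryStrategy.ignore;
//   };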