mobile/android/base/sync/synchronizer/SynchronizerSession.java

Wed, 31 Dec 2014 06:09:35 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:09:35 +0100
changeset 0
6474c204b198
permissions
-rw-r--r--

Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1,
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f, for hacking purposes.

michael@0 1 /* This Source Code Form is subject to the terms of the Mozilla Public
michael@0 2 * License, v. 2.0. If a copy of the MPL was not distributed with this
michael@0 3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
michael@0 4
michael@0 5 package org.mozilla.gecko.sync.synchronizer;
michael@0 6
michael@0 7
michael@0 8 import java.util.concurrent.ExecutorService;
michael@0 9 import java.util.concurrent.atomic.AtomicInteger;
michael@0 10
michael@0 11 import org.mozilla.gecko.background.common.log.Logger;
michael@0 12 import org.mozilla.gecko.sync.repositories.InactiveSessionException;
michael@0 13 import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException;
michael@0 14 import org.mozilla.gecko.sync.repositories.RepositorySession;
michael@0 15 import org.mozilla.gecko.sync.repositories.RepositorySessionBundle;
michael@0 16 import org.mozilla.gecko.sync.repositories.delegates.DeferrableRepositorySessionCreationDelegate;
michael@0 17 import org.mozilla.gecko.sync.repositories.delegates.DeferredRepositorySessionFinishDelegate;
michael@0 18 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFinishDelegate;
michael@0 19
michael@0 20 import android.content.Context;
michael@0 21
michael@0 22 /**
michael@0 23 * I coordinate the moving parts of a sync started by
michael@0 24 * {@link Synchronizer#synchronize}.
michael@0 25 *
michael@0 26 * I flow records twice: first from A to B, and then from B to A. I provide
michael@0 27 * fine-grained feedback by calling my delegate's callback methods.
michael@0 28 *
michael@0 29 * Initialize me by creating me with a Synchronizer and a
michael@0 30 * SynchronizerSessionDelegate. Kick things off by calling `init` with two
michael@0 31 * RepositorySessionBundles, and then call `synchronize` in your `onInitialized`
michael@0 32 * callback.
michael@0 33 *
michael@0 34 * I always call exactly one of my delegate's `onInitialized` or
michael@0 35 * `onSessionError` callback methods from `init`.
michael@0 36 *
michael@0 37 * I call my delegate's `onSynchronizeSkipped` callback method if there is no
michael@0 38 * data to be synchronized in `synchronize`.
michael@0 39 *
michael@0 40 * In addition, I call `onFetchError`, `onStoreError`, and `onSessionError` when
michael@0 41 * I encounter a fetch, store, or session error while synchronizing.
michael@0 42 *
michael@0 43 * Typically my delegate will call `abort` in its error callbacks, which will
michael@0 44 * call my delegate's `onSynchronizeAborted` method and halt the sync.
michael@0 45 *
michael@0 46 * I always call exactly one of my delegate's `onSynchronized` or
michael@0 47 * `onSynchronizeFailed` callback methods if I have not seen an error.
michael@0 48 */
michael@0 49 public class SynchronizerSession
michael@0 50 extends DeferrableRepositorySessionCreationDelegate
michael@0 51 implements RecordsChannelDelegate,
michael@0 52 RepositorySessionFinishDelegate {
michael@0 53
michael@0 54 protected static final String LOG_TAG = "SynchronizerSession";
michael@0 55 protected Synchronizer synchronizer;
michael@0 56 protected SynchronizerSessionDelegate delegate;
michael@0 57 protected Context context;
michael@0 58
michael@0 59 /*
michael@0 60 * Computed during init.
michael@0 61 */
michael@0 62 private RepositorySession sessionA;
michael@0 63 private RepositorySession sessionB;
michael@0 64 private RepositorySessionBundle bundleA;
michael@0 65 private RepositorySessionBundle bundleB;
michael@0 66
michael@0 67 // Bug 726054: just like desktop, we track our last interaction with the server,
michael@0 68 // not the last record timestamp that we fetched. This ensures that we don't re-
michael@0 69 // download the records we just uploaded, at the cost of skipping any records
michael@0 70 // that a concurrently syncing client has uploaded.
michael@0 71 private long pendingATimestamp = -1;
michael@0 72 private long pendingBTimestamp = -1;
michael@0 73 private long storeEndATimestamp = -1;
michael@0 74 private long storeEndBTimestamp = -1;
michael@0 75 private boolean flowAToBCompleted = false;
michael@0 76 private boolean flowBToACompleted = false;
michael@0 77
michael@0 78 protected final AtomicInteger numInboundRecords = new AtomicInteger(-1);
michael@0 79 protected final AtomicInteger numOutboundRecords = new AtomicInteger(-1);
michael@0 80
michael@0 81 /*
michael@0 82 * Public API: constructor, init, synchronize.
michael@0 83 */
michael@0 84 public SynchronizerSession(Synchronizer synchronizer, SynchronizerSessionDelegate delegate) {
michael@0 85 this.setSynchronizer(synchronizer);
michael@0 86 this.delegate = delegate;
michael@0 87 }
michael@0 88
michael@0 89 public Synchronizer getSynchronizer() {
michael@0 90 return synchronizer;
michael@0 91 }
michael@0 92
michael@0 93 public void setSynchronizer(Synchronizer synchronizer) {
michael@0 94 this.synchronizer = synchronizer;
michael@0 95 }
michael@0 96
michael@0 97 public void init(Context context, RepositorySessionBundle bundleA, RepositorySessionBundle bundleB) {
michael@0 98 this.context = context;
michael@0 99 this.bundleA = bundleA;
michael@0 100 this.bundleB = bundleB;
michael@0 101 // Begin sessionA and sessionB, call onInitialized in callbacks.
michael@0 102 this.getSynchronizer().repositoryA.createSession(this, context);
michael@0 103 }
michael@0 104
michael@0 105 /**
michael@0 106 * Get the number of records fetched from the first repository (usually the
michael@0 107 * server, hence inbound).
michael@0 108 * <p>
michael@0 109 * Valid only after first flow has completed.
michael@0 110 *
michael@0 111 * @return number of records, or -1 if not valid.
michael@0 112 */
michael@0 113 public int getInboundCount() {
michael@0 114 return numInboundRecords.get();
michael@0 115 }
michael@0 116
michael@0 117 /**
michael@0 118 * Get the number of records fetched from the second repository (usually the
michael@0 119 * local store, hence outbound).
michael@0 120 * <p>
michael@0 121 * Valid only after second flow has completed.
michael@0 122 *
michael@0 123 * @return number of records, or -1 if not valid.
michael@0 124 */
michael@0 125 public int getOutboundCount() {
michael@0 126 return numOutboundRecords.get();
michael@0 127 }
michael@0 128
michael@0 129 // These are accessed by `abort` and `synchronize`, both of which are synchronized.
michael@0 130 // Guarded by `this`.
michael@0 131 protected RecordsChannel channelAToB;
michael@0 132 protected RecordsChannel channelBToA;
michael@0 133
michael@0 134 /**
michael@0 135 * Please don't call this until you've been notified with onInitialized.
michael@0 136 */
michael@0 137 public synchronized void synchronize() {
michael@0 138 numInboundRecords.set(-1);
michael@0 139 numOutboundRecords.set(-1);
michael@0 140
michael@0 141 // First thing: decide whether we should.
michael@0 142 if (sessionA.shouldSkip() ||
michael@0 143 sessionB.shouldSkip()) {
michael@0 144 Logger.info(LOG_TAG, "Session requested skip. Short-circuiting sync.");
michael@0 145 sessionA.abort();
michael@0 146 sessionB.abort();
michael@0 147 this.delegate.onSynchronizeSkipped(this);
michael@0 148 return;
michael@0 149 }
michael@0 150
michael@0 151 final SynchronizerSession session = this;
michael@0 152
michael@0 153 // TODO: failed record handling.
michael@0 154
michael@0 155 // This is the *second* record channel to flow.
michael@0 156 // I, SynchronizerSession, am the delegate for the *second* flow.
michael@0 157 channelBToA = new RecordsChannel(this.sessionB, this.sessionA, this);
michael@0 158
michael@0 159 // This is the delegate for the *first* flow.
michael@0 160 RecordsChannelDelegate channelAToBDelegate = new RecordsChannelDelegate() {
michael@0 161 public void onFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
michael@0 162 session.onFirstFlowCompleted(recordsChannel, fetchEnd, storeEnd);
michael@0 163 }
michael@0 164
michael@0 165 @Override
michael@0 166 public void onFlowBeginFailed(RecordsChannel recordsChannel, Exception ex) {
michael@0 167 Logger.warn(LOG_TAG, "First RecordsChannel onFlowBeginFailed. Logging session error.", ex);
michael@0 168 session.delegate.onSynchronizeFailed(session, ex, "Failed to begin first flow.");
michael@0 169 }
michael@0 170
michael@0 171 @Override
michael@0 172 public void onFlowFetchFailed(RecordsChannel recordsChannel, Exception ex) {
michael@0 173 Logger.warn(LOG_TAG, "First RecordsChannel onFlowFetchFailed. Logging remote fetch error.", ex);
michael@0 174 }
michael@0 175
michael@0 176 @Override
michael@0 177 public void onFlowStoreFailed(RecordsChannel recordsChannel, Exception ex, String recordGuid) {
michael@0 178 Logger.warn(LOG_TAG, "First RecordsChannel onFlowStoreFailed. Logging local store error.", ex);
michael@0 179 }
michael@0 180
michael@0 181 @Override
michael@0 182 public void onFlowFinishFailed(RecordsChannel recordsChannel, Exception ex) {
michael@0 183 Logger.warn(LOG_TAG, "First RecordsChannel onFlowFinishedFailed. Logging session error.", ex);
michael@0 184 session.delegate.onSynchronizeFailed(session, ex, "Failed to finish first flow.");
michael@0 185 }
michael@0 186 };
michael@0 187
michael@0 188 // This is the *first* channel to flow.
michael@0 189 channelAToB = new RecordsChannel(this.sessionA, this.sessionB, channelAToBDelegate);
michael@0 190
michael@0 191 Logger.trace(LOG_TAG, "Starting A to B flow. Channel is " + channelAToB);
michael@0 192 try {
michael@0 193 channelAToB.beginAndFlow();
michael@0 194 } catch (InvalidSessionTransitionException e) {
michael@0 195 onFlowBeginFailed(channelAToB, e);
michael@0 196 }
michael@0 197 }
michael@0 198
michael@0 199 /**
michael@0 200 * Called after the first flow completes.
michael@0 201 * <p>
michael@0 202 * By default, any fetch and store failures are ignored.
michael@0 203 * @param recordsChannel the <code>RecordsChannel</code> (for error testing).
michael@0 204 * @param fetchEnd timestamp when fetches completed.
michael@0 205 * @param storeEnd timestamp when stores completed.
michael@0 206 */
michael@0 207 public void onFirstFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
michael@0 208 Logger.trace(LOG_TAG, "First RecordsChannel onFlowCompleted.");
michael@0 209 Logger.debug(LOG_TAG, "Fetch end is " + fetchEnd + ". Store end is " + storeEnd + ". Starting next.");
michael@0 210 pendingATimestamp = fetchEnd;
michael@0 211 storeEndBTimestamp = storeEnd;
michael@0 212 numInboundRecords.set(recordsChannel.getFetchCount());
michael@0 213 flowAToBCompleted = true;
michael@0 214 channelBToA.flow();
michael@0 215 }
michael@0 216
michael@0 217 /**
michael@0 218 * Called after the second flow completes.
michael@0 219 * <p>
michael@0 220 * By default, any fetch and store failures are ignored.
michael@0 221 * @param recordsChannel the <code>RecordsChannel</code> (for error testing).
michael@0 222 * @param fetchEnd timestamp when fetches completed.
michael@0 223 * @param storeEnd timestamp when stores completed.
michael@0 224 */
michael@0 225 public void onSecondFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
michael@0 226 Logger.trace(LOG_TAG, "Second RecordsChannel onFlowCompleted.");
michael@0 227 Logger.debug(LOG_TAG, "Fetch end is " + fetchEnd + ". Store end is " + storeEnd + ". Finishing.");
michael@0 228
michael@0 229 pendingBTimestamp = fetchEnd;
michael@0 230 storeEndATimestamp = storeEnd;
michael@0 231 numOutboundRecords.set(recordsChannel.getFetchCount());
michael@0 232 flowBToACompleted = true;
michael@0 233
michael@0 234 // Finish the two sessions.
michael@0 235 try {
michael@0 236 this.sessionA.finish(this);
michael@0 237 } catch (InactiveSessionException e) {
michael@0 238 this.onFinishFailed(e);
michael@0 239 return;
michael@0 240 }
michael@0 241 }
michael@0 242
michael@0 243 @Override
michael@0 244 public void onFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
michael@0 245 onSecondFlowCompleted(recordsChannel, fetchEnd, storeEnd);
michael@0 246 }
michael@0 247
michael@0 248 @Override
michael@0 249 public void onFlowBeginFailed(RecordsChannel recordsChannel, Exception ex) {
michael@0 250 Logger.warn(LOG_TAG, "Second RecordsChannel onFlowBeginFailed. Logging session error.", ex);
michael@0 251 this.delegate.onSynchronizeFailed(this, ex, "Failed to begin second flow.");
michael@0 252 }
michael@0 253
michael@0 254 @Override
michael@0 255 public void onFlowFetchFailed(RecordsChannel recordsChannel, Exception ex) {
michael@0 256 Logger.warn(LOG_TAG, "Second RecordsChannel onFlowFetchFailed. Logging local fetch error.", ex);
michael@0 257 }
michael@0 258
michael@0 259 @Override
michael@0 260 public void onFlowStoreFailed(RecordsChannel recordsChannel, Exception ex, String recordGuid) {
michael@0 261 Logger.warn(LOG_TAG, "Second RecordsChannel onFlowStoreFailed. Logging remote store error.", ex);
michael@0 262 }
michael@0 263
michael@0 264 @Override
michael@0 265 public void onFlowFinishFailed(RecordsChannel recordsChannel, Exception ex) {
michael@0 266 Logger.warn(LOG_TAG, "Second RecordsChannel onFlowFinishedFailed. Logging session error.", ex);
michael@0 267 this.delegate.onSynchronizeFailed(this, ex, "Failed to finish second flow.");
michael@0 268 }
michael@0 269
michael@0 270 /*
michael@0 271 * RepositorySessionCreationDelegate methods.
michael@0 272 */
michael@0 273
michael@0 274 /**
michael@0 275 * I could be called twice: once for sessionA and once for sessionB.
michael@0 276 *
michael@0 277 * I try to clean up sessionA if it is not null, since the creation of
michael@0 278 * sessionB must have failed.
michael@0 279 */
michael@0 280 @Override
michael@0 281 public void onSessionCreateFailed(Exception ex) {
michael@0 282 // Attempt to finish the first session, if the second is the one that failed.
michael@0 283 if (this.sessionA != null) {
michael@0 284 try {
michael@0 285 // We no longer need a reference to our context.
michael@0 286 this.context = null;
michael@0 287 this.sessionA.finish(this);
michael@0 288 } catch (Exception e) {
michael@0 289 // Never mind; best-effort finish.
michael@0 290 }
michael@0 291 }
michael@0 292 // We no longer need a reference to our context.
michael@0 293 this.context = null;
michael@0 294 this.delegate.onSynchronizeFailed(this, ex, "Failed to create session");
michael@0 295 }
michael@0 296
michael@0 297 /**
michael@0 298 * I should be called twice: first for sessionA and second for sessionB.
michael@0 299 *
michael@0 300 * If I am called for sessionB, I call my delegate's `onInitialized` callback
michael@0 301 * method because my repository sessions are correctly initialized.
michael@0 302 */
michael@0 303 // TODO: some of this "finish and clean up" code can be refactored out.
michael@0 304 @Override
michael@0 305 public void onSessionCreated(RepositorySession session) {
michael@0 306 if (session == null ||
michael@0 307 this.sessionA == session) {
michael@0 308 // TODO: clean up sessionA.
michael@0 309 this.delegate.onSynchronizeFailed(this, new UnexpectedSessionException(session), "Failed to create session.");
michael@0 310 return;
michael@0 311 }
michael@0 312 if (this.sessionA == null) {
michael@0 313 this.sessionA = session;
michael@0 314
michael@0 315 // Unbundle.
michael@0 316 try {
michael@0 317 this.sessionA.unbundle(this.bundleA);
michael@0 318 } catch (Exception e) {
michael@0 319 this.delegate.onSynchronizeFailed(this, new UnbundleError(e, sessionA), "Failed to unbundle first session.");
michael@0 320 // TODO: abort
michael@0 321 return;
michael@0 322 }
michael@0 323 this.getSynchronizer().repositoryB.createSession(this, this.context);
michael@0 324 return;
michael@0 325 }
michael@0 326 if (this.sessionB == null) {
michael@0 327 this.sessionB = session;
michael@0 328 // We no longer need a reference to our context.
michael@0 329 this.context = null;
michael@0 330
michael@0 331 // Unbundle. We unbundled sessionA when that session was created.
michael@0 332 try {
michael@0 333 this.sessionB.unbundle(this.bundleB);
michael@0 334 } catch (Exception e) {
michael@0 335 this.delegate.onSynchronizeFailed(this, new UnbundleError(e, sessionA), "Failed to unbundle second session.");
michael@0 336 return;
michael@0 337 }
michael@0 338
michael@0 339 this.delegate.onInitialized(this);
michael@0 340 return;
michael@0 341 }
michael@0 342 // TODO: need a way to make sure we don't call any more delegate methods.
michael@0 343 this.delegate.onSynchronizeFailed(this, new UnexpectedSessionException(session), "Failed to create session.");
michael@0 344 }
michael@0 345
michael@0 346 /*
michael@0 347 * RepositorySessionFinishDelegate methods.
michael@0 348 */
michael@0 349
michael@0 350 /**
michael@0 351 * I could be called twice: once for sessionA and once for sessionB.
michael@0 352 *
michael@0 353 * If sessionB couldn't be created, I don't fail again.
michael@0 354 */
michael@0 355 @Override
michael@0 356 public void onFinishFailed(Exception ex) {
michael@0 357 if (this.sessionB == null) {
michael@0 358 // Ah, it was a problem cleaning up. Never mind.
michael@0 359 Logger.warn(LOG_TAG, "Got exception cleaning up first after second session creation failed.", ex);
michael@0 360 return;
michael@0 361 }
michael@0 362 String session = (this.sessionA == null) ? "B" : "A";
michael@0 363 this.delegate.onSynchronizeFailed(this, ex, "Finish of session " + session + " failed.");
michael@0 364 }
michael@0 365
michael@0 366 /**
michael@0 367 * I should be called twice: first for sessionA and second for sessionB.
michael@0 368 *
michael@0 369 * If I am called for sessionA, I try to finish sessionB.
michael@0 370 *
michael@0 371 * If I am called for sessionB, I call my delegate's `onSynchronized` callback
michael@0 372 * method because my flows should have completed.
michael@0 373 */
michael@0 374 @Override
michael@0 375 public void onFinishSucceeded(RepositorySession session,
michael@0 376 RepositorySessionBundle bundle) {
michael@0 377 Logger.debug(LOG_TAG, "onFinishSucceeded. Flows? " + flowAToBCompleted + ", " + flowBToACompleted);
michael@0 378
michael@0 379 if (session == sessionA) {
michael@0 380 if (flowAToBCompleted) {
michael@0 381 Logger.debug(LOG_TAG, "onFinishSucceeded: bumping session A's timestamp to " + pendingATimestamp + " or " + storeEndATimestamp);
michael@0 382 bundle.bumpTimestamp(Math.max(pendingATimestamp, storeEndATimestamp));
michael@0 383 this.synchronizer.bundleA = bundle;
michael@0 384 } else {
michael@0 385 // Should not happen!
michael@0 386 this.delegate.onSynchronizeFailed(this, new UnexpectedSessionException(sessionA), "Failed to finish first session.");
michael@0 387 return;
michael@0 388 }
michael@0 389 if (this.sessionB != null) {
michael@0 390 Logger.trace(LOG_TAG, "Finishing session B.");
michael@0 391 // On to the next.
michael@0 392 try {
michael@0 393 this.sessionB.finish(this);
michael@0 394 } catch (InactiveSessionException e) {
michael@0 395 this.onFinishFailed(e);
michael@0 396 return;
michael@0 397 }
michael@0 398 }
michael@0 399 } else if (session == sessionB) {
michael@0 400 if (flowBToACompleted) {
michael@0 401 Logger.debug(LOG_TAG, "onFinishSucceeded: bumping session B's timestamp to " + pendingBTimestamp + " or " + storeEndBTimestamp);
michael@0 402 bundle.bumpTimestamp(Math.max(pendingBTimestamp, storeEndBTimestamp));
michael@0 403 this.synchronizer.bundleB = bundle;
michael@0 404 Logger.trace(LOG_TAG, "Notifying delegate.onSynchronized.");
michael@0 405 this.delegate.onSynchronized(this);
michael@0 406 } else {
michael@0 407 // Should not happen!
michael@0 408 this.delegate.onSynchronizeFailed(this, new UnexpectedSessionException(sessionB), "Failed to finish second session.");
michael@0 409 return;
michael@0 410 }
michael@0 411 } else {
michael@0 412 // TODO: hurrrrrr...
michael@0 413 }
michael@0 414
michael@0 415 if (this.sessionB == null) {
michael@0 416 this.sessionA = null; // We're done.
michael@0 417 }
michael@0 418 }
michael@0 419
michael@0 420 @Override
michael@0 421 public RepositorySessionFinishDelegate deferredFinishDelegate(final ExecutorService executor) {
michael@0 422 return new DeferredRepositorySessionFinishDelegate(this, executor);
michael@0 423 }
michael@0 424 }

mercurial