mobile/android/base/sync/synchronizer/SynchronizerSession.java

Wed, 31 Dec 2014 07:22:50 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 07:22:50 +0100
branch
TOR_BUG_3246
changeset 4
fc2d59ddac77
permissions
-rw-r--r--

Correct previous dual key logic pending first delivery installment.

     1 /* This Source Code Form is subject to the terms of the Mozilla Public
     2  * License, v. 2.0. If a copy of the MPL was not distributed with this
     3  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
     5 package org.mozilla.gecko.sync.synchronizer;
     8 import java.util.concurrent.ExecutorService;
     9 import java.util.concurrent.atomic.AtomicInteger;
    11 import org.mozilla.gecko.background.common.log.Logger;
    12 import org.mozilla.gecko.sync.repositories.InactiveSessionException;
    13 import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException;
    14 import org.mozilla.gecko.sync.repositories.RepositorySession;
    15 import org.mozilla.gecko.sync.repositories.RepositorySessionBundle;
    16 import org.mozilla.gecko.sync.repositories.delegates.DeferrableRepositorySessionCreationDelegate;
    17 import org.mozilla.gecko.sync.repositories.delegates.DeferredRepositorySessionFinishDelegate;
    18 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFinishDelegate;
    20 import android.content.Context;
    22 /**
    23  * I coordinate the moving parts of a sync started by
    24  * {@link Synchronizer#synchronize}.
    25  *
    26  * I flow records twice: first from A to B, and then from B to A. I provide
    27  * fine-grained feedback by calling my delegate's callback methods.
    28  *
    29  * Initialize me by creating me with a Synchronizer and a
    30  * SynchronizerSessionDelegate. Kick things off by calling `init` with two
    31  * RepositorySessionBundles, and then call `synchronize` in your `onInitialized`
    32  * callback.
    33  *
    34  * I always call exactly one of my delegate's `onInitialized` or
    35  * `onSessionError` callback methods from `init`.
    36  *
    37  * I call my delegate's `onSynchronizeSkipped` callback method if there is no
    38  * data to be synchronized in `synchronize`.
    39  *
    40  * In addition, I call `onFetchError`, `onStoreError`, and `onSessionError` when
    41  * I encounter a fetch, store, or session error while synchronizing.
    42  *
    43  * Typically my delegate will call `abort` in its error callbacks, which will
    44  * call my delegate's `onSynchronizeAborted` method and halt the sync.
    45  *
    46  * I always call exactly one of my delegate's `onSynchronized` or
    47  * `onSynchronizeFailed` callback methods if I have not seen an error.
    48  */
    49 public class SynchronizerSession
    50 extends DeferrableRepositorySessionCreationDelegate
    51 implements RecordsChannelDelegate,
    52            RepositorySessionFinishDelegate {
    54   protected static final String LOG_TAG = "SynchronizerSession";
    55   protected Synchronizer synchronizer;
    56   protected SynchronizerSessionDelegate delegate;
    57   protected Context context;
    59   /*
    60    * Computed during init.
    61    */
    62   private RepositorySession sessionA;
    63   private RepositorySession sessionB;
    64   private RepositorySessionBundle bundleA;
    65   private RepositorySessionBundle bundleB;
    67   // Bug 726054: just like desktop, we track our last interaction with the server,
    68   // not the last record timestamp that we fetched. This ensures that we don't re-
    69   // download the records we just uploaded, at the cost of skipping any records
    70   // that a concurrently syncing client has uploaded.
    71   private long pendingATimestamp = -1;
    72   private long pendingBTimestamp = -1;
    73   private long storeEndATimestamp = -1;
    74   private long storeEndBTimestamp = -1;
    75   private boolean flowAToBCompleted = false;
    76   private boolean flowBToACompleted = false;
    78   protected final AtomicInteger numInboundRecords = new AtomicInteger(-1);
    79   protected final AtomicInteger numOutboundRecords = new AtomicInteger(-1);
    81   /*
    82    * Public API: constructor, init, synchronize.
    83    */
    84   public SynchronizerSession(Synchronizer synchronizer, SynchronizerSessionDelegate delegate) {
    85     this.setSynchronizer(synchronizer);
    86     this.delegate = delegate;
    87   }
    89   public Synchronizer getSynchronizer() {
    90     return synchronizer;
    91   }
    93   public void setSynchronizer(Synchronizer synchronizer) {
    94     this.synchronizer = synchronizer;
    95   }
    97   public void init(Context context, RepositorySessionBundle bundleA, RepositorySessionBundle bundleB) {
    98     this.context = context;
    99     this.bundleA = bundleA;
   100     this.bundleB = bundleB;
   101     // Begin sessionA and sessionB, call onInitialized in callbacks.
   102     this.getSynchronizer().repositoryA.createSession(this, context);
   103   }
   105   /**
   106    * Get the number of records fetched from the first repository (usually the
   107    * server, hence inbound).
   108    * <p>
   109    * Valid only after first flow has completed.
   110    *
   111    * @return number of records, or -1 if not valid.
   112    */
   113   public int getInboundCount() {
   114     return numInboundRecords.get();
   115   }
   117   /**
   118    * Get the number of records fetched from the second repository (usually the
   119    * local store, hence outbound).
   120    * <p>
   121    * Valid only after second flow has completed.
   122    *
   123    * @return number of records, or -1 if not valid.
   124    */
   125   public int getOutboundCount() {
   126     return numOutboundRecords.get();
   127   }
   129   // These are accessed by `abort` and `synchronize`, both of which are synchronized.
   130   // Guarded by `this`.
   131   protected RecordsChannel channelAToB;
   132   protected RecordsChannel channelBToA;
   134   /**
   135    * Please don't call this until you've been notified with onInitialized.
   136    */
   137   public synchronized void synchronize() {
   138     numInboundRecords.set(-1);
   139     numOutboundRecords.set(-1);
   141     // First thing: decide whether we should.
   142     if (sessionA.shouldSkip() ||
   143         sessionB.shouldSkip()) {
   144       Logger.info(LOG_TAG, "Session requested skip. Short-circuiting sync.");
   145       sessionA.abort();
   146       sessionB.abort();
   147       this.delegate.onSynchronizeSkipped(this);
   148       return;
   149     }
   151     final SynchronizerSession session = this;
   153     // TODO: failed record handling.
   155     // This is the *second* record channel to flow.
   156     // I, SynchronizerSession, am the delegate for the *second* flow.
   157     channelBToA = new RecordsChannel(this.sessionB, this.sessionA, this);
   159     // This is the delegate for the *first* flow.
   160     RecordsChannelDelegate channelAToBDelegate = new RecordsChannelDelegate() {
   161       public void onFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
   162         session.onFirstFlowCompleted(recordsChannel, fetchEnd, storeEnd);
   163       }
   165       @Override
   166       public void onFlowBeginFailed(RecordsChannel recordsChannel, Exception ex) {
   167         Logger.warn(LOG_TAG, "First RecordsChannel onFlowBeginFailed. Logging session error.", ex);
   168         session.delegate.onSynchronizeFailed(session, ex, "Failed to begin first flow.");
   169       }
   171       @Override
   172       public void onFlowFetchFailed(RecordsChannel recordsChannel, Exception ex) {
   173         Logger.warn(LOG_TAG, "First RecordsChannel onFlowFetchFailed. Logging remote fetch error.", ex);
   174       }
   176       @Override
   177       public void onFlowStoreFailed(RecordsChannel recordsChannel, Exception ex, String recordGuid) {
   178         Logger.warn(LOG_TAG, "First RecordsChannel onFlowStoreFailed. Logging local store error.", ex);
   179       }
   181       @Override
   182       public void onFlowFinishFailed(RecordsChannel recordsChannel, Exception ex) {
   183         Logger.warn(LOG_TAG, "First RecordsChannel onFlowFinishedFailed. Logging session error.", ex);
   184         session.delegate.onSynchronizeFailed(session, ex, "Failed to finish first flow.");
   185       }
   186     };
   188     // This is the *first* channel to flow.
   189     channelAToB = new RecordsChannel(this.sessionA, this.sessionB, channelAToBDelegate);
   191     Logger.trace(LOG_TAG, "Starting A to B flow. Channel is " + channelAToB);
   192     try {
   193       channelAToB.beginAndFlow();
   194     } catch (InvalidSessionTransitionException e) {
   195       onFlowBeginFailed(channelAToB, e);
   196     }
   197   }
   199   /**
   200    * Called after the first flow completes.
   201    * <p>
   202    * By default, any fetch and store failures are ignored.
   203    * @param recordsChannel the <code>RecordsChannel</code> (for error testing).
   204    * @param fetchEnd timestamp when fetches completed.
   205    * @param storeEnd timestamp when stores completed.
   206    */
   207   public void onFirstFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
   208     Logger.trace(LOG_TAG, "First RecordsChannel onFlowCompleted.");
   209     Logger.debug(LOG_TAG, "Fetch end is " + fetchEnd + ". Store end is " + storeEnd + ". Starting next.");
   210     pendingATimestamp = fetchEnd;
   211     storeEndBTimestamp = storeEnd;
   212     numInboundRecords.set(recordsChannel.getFetchCount());
   213     flowAToBCompleted = true;
   214     channelBToA.flow();
   215   }
   217   /**
   218    * Called after the second flow completes.
   219    * <p>
   220    * By default, any fetch and store failures are ignored.
   221    * @param recordsChannel the <code>RecordsChannel</code> (for error testing).
   222    * @param fetchEnd timestamp when fetches completed.
   223    * @param storeEnd timestamp when stores completed.
   224    */
   225   public void onSecondFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
   226     Logger.trace(LOG_TAG, "Second RecordsChannel onFlowCompleted.");
   227     Logger.debug(LOG_TAG, "Fetch end is " + fetchEnd + ". Store end is " + storeEnd + ". Finishing.");
   229     pendingBTimestamp = fetchEnd;
   230     storeEndATimestamp = storeEnd;
   231     numOutboundRecords.set(recordsChannel.getFetchCount());
   232     flowBToACompleted = true;
   234     // Finish the two sessions.
   235     try {
   236       this.sessionA.finish(this);
   237     } catch (InactiveSessionException e) {
   238       this.onFinishFailed(e);
   239       return;
   240     }
   241   }
   243   @Override
   244   public void onFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
   245     onSecondFlowCompleted(recordsChannel, fetchEnd, storeEnd);
   246   }
   248   @Override
   249   public void onFlowBeginFailed(RecordsChannel recordsChannel, Exception ex) {
   250     Logger.warn(LOG_TAG, "Second RecordsChannel onFlowBeginFailed. Logging session error.", ex);
   251     this.delegate.onSynchronizeFailed(this, ex, "Failed to begin second flow.");
   252   }
   254   @Override
   255   public void onFlowFetchFailed(RecordsChannel recordsChannel, Exception ex) {
   256     Logger.warn(LOG_TAG, "Second RecordsChannel onFlowFetchFailed. Logging local fetch error.", ex);
   257   }
   259   @Override
   260   public void onFlowStoreFailed(RecordsChannel recordsChannel, Exception ex, String recordGuid) {
   261     Logger.warn(LOG_TAG, "Second RecordsChannel onFlowStoreFailed. Logging remote store error.", ex);
   262   }
   264   @Override
   265   public void onFlowFinishFailed(RecordsChannel recordsChannel, Exception ex) {
   266     Logger.warn(LOG_TAG, "Second RecordsChannel onFlowFinishedFailed. Logging session error.", ex);
   267     this.delegate.onSynchronizeFailed(this, ex, "Failed to finish second flow.");
   268   }
   270   /*
   271    * RepositorySessionCreationDelegate methods.
   272    */
   274   /**
   275    * I could be called twice: once for sessionA and once for sessionB.
   276    *
   277    * I try to clean up sessionA if it is not null, since the creation of
   278    * sessionB must have failed.
   279    */
   280   @Override
   281   public void onSessionCreateFailed(Exception ex) {
   282     // Attempt to finish the first session, if the second is the one that failed.
   283     if (this.sessionA != null) {
   284       try {
   285         // We no longer need a reference to our context.
   286         this.context = null;
   287         this.sessionA.finish(this);
   288       } catch (Exception e) {
   289         // Never mind; best-effort finish.
   290       }
   291     }
   292     // We no longer need a reference to our context.
   293     this.context = null;
   294     this.delegate.onSynchronizeFailed(this, ex, "Failed to create session");
   295   }
   297   /**
   298    * I should be called twice: first for sessionA and second for sessionB.
   299    *
   300    * If I am called for sessionB, I call my delegate's `onInitialized` callback
   301    * method because my repository sessions are correctly initialized.
   302    */
   303   // TODO: some of this "finish and clean up" code can be refactored out.
   304   @Override
   305   public void onSessionCreated(RepositorySession session) {
   306     if (session == null ||
   307         this.sessionA == session) {
   308       // TODO: clean up sessionA.
   309       this.delegate.onSynchronizeFailed(this, new UnexpectedSessionException(session), "Failed to create session.");
   310       return;
   311     }
   312     if (this.sessionA == null) {
   313       this.sessionA = session;
   315       // Unbundle.
   316       try {
   317         this.sessionA.unbundle(this.bundleA);
   318       } catch (Exception e) {
   319         this.delegate.onSynchronizeFailed(this, new UnbundleError(e, sessionA), "Failed to unbundle first session.");
   320         // TODO: abort
   321         return;
   322       }
   323       this.getSynchronizer().repositoryB.createSession(this, this.context);
   324       return;
   325     }
   326     if (this.sessionB == null) {
   327       this.sessionB = session;
   328       // We no longer need a reference to our context.
   329       this.context = null;
   331       // Unbundle. We unbundled sessionA when that session was created.
   332       try {
   333         this.sessionB.unbundle(this.bundleB);
   334       } catch (Exception e) {
   335         this.delegate.onSynchronizeFailed(this, new UnbundleError(e, sessionA), "Failed to unbundle second session.");
   336         return;
   337       }
   339       this.delegate.onInitialized(this);
   340       return;
   341     }
   342     // TODO: need a way to make sure we don't call any more delegate methods.
   343     this.delegate.onSynchronizeFailed(this, new UnexpectedSessionException(session), "Failed to create session.");
   344   }
   346   /*
   347    * RepositorySessionFinishDelegate methods.
   348    */
   350   /**
   351    * I could be called twice: once for sessionA and once for sessionB.
   352    *
   353    * If sessionB couldn't be created, I don't fail again.
   354    */
   355   @Override
   356   public void onFinishFailed(Exception ex) {
   357     if (this.sessionB == null) {
   358       // Ah, it was a problem cleaning up. Never mind.
   359       Logger.warn(LOG_TAG, "Got exception cleaning up first after second session creation failed.", ex);
   360       return;
   361     }
   362     String session = (this.sessionA == null) ? "B" : "A";
   363     this.delegate.onSynchronizeFailed(this, ex, "Finish of session " + session + " failed.");
   364   }
   366   /**
   367    * I should be called twice: first for sessionA and second for sessionB.
   368    *
   369    * If I am called for sessionA, I try to finish sessionB.
   370    *
   371    * If I am called for sessionB, I call my delegate's `onSynchronized` callback
   372    * method because my flows should have completed.
   373    */
   374   @Override
   375   public void onFinishSucceeded(RepositorySession session,
   376                                 RepositorySessionBundle bundle) {
   377     Logger.debug(LOG_TAG, "onFinishSucceeded. Flows? " + flowAToBCompleted + ", " + flowBToACompleted);
   379     if (session == sessionA) {
   380       if (flowAToBCompleted) {
   381         Logger.debug(LOG_TAG, "onFinishSucceeded: bumping session A's timestamp to " + pendingATimestamp + " or " + storeEndATimestamp);
   382         bundle.bumpTimestamp(Math.max(pendingATimestamp, storeEndATimestamp));
   383         this.synchronizer.bundleA = bundle;
   384       } else {
   385         // Should not happen!
   386         this.delegate.onSynchronizeFailed(this, new UnexpectedSessionException(sessionA), "Failed to finish first session.");
   387         return;
   388       }
   389       if (this.sessionB != null) {
   390         Logger.trace(LOG_TAG, "Finishing session B.");
   391         // On to the next.
   392         try {
   393           this.sessionB.finish(this);
   394         } catch (InactiveSessionException e) {
   395           this.onFinishFailed(e);
   396           return;
   397         }
   398       }
   399     } else if (session == sessionB) {
   400       if (flowBToACompleted) {
   401         Logger.debug(LOG_TAG, "onFinishSucceeded: bumping session B's timestamp to " + pendingBTimestamp + " or " + storeEndBTimestamp);
   402         bundle.bumpTimestamp(Math.max(pendingBTimestamp, storeEndBTimestamp));
   403         this.synchronizer.bundleB = bundle;
   404         Logger.trace(LOG_TAG, "Notifying delegate.onSynchronized.");
   405         this.delegate.onSynchronized(this);
   406       } else {
   407         // Should not happen!
   408         this.delegate.onSynchronizeFailed(this, new UnexpectedSessionException(sessionB), "Failed to finish second session.");
   409         return;
   410       }
   411     } else {
   412       // TODO: hurrrrrr...
   413     }
   415     if (this.sessionB == null) {
   416       this.sessionA = null; // We're done.
   417     }
   418   }
   420   @Override
   421   public RepositorySessionFinishDelegate deferredFinishDelegate(final ExecutorService executor) {
   422     return new DeferredRepositorySessionFinishDelegate(this, executor);
   423   }
   424 }

mercurial