michael@0: /* This Source Code Form is subject to the terms of the Mozilla Public michael@0: * License, v. 2.0. If a copy of the MPL was not distributed with this michael@0: * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ michael@0: michael@0: package org.mozilla.gecko.sync.synchronizer; michael@0: michael@0: michael@0: import java.util.concurrent.ExecutorService; michael@0: import java.util.concurrent.atomic.AtomicInteger; michael@0: michael@0: import org.mozilla.gecko.background.common.log.Logger; michael@0: import org.mozilla.gecko.sync.repositories.InactiveSessionException; michael@0: import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException; michael@0: import org.mozilla.gecko.sync.repositories.RepositorySession; michael@0: import org.mozilla.gecko.sync.repositories.RepositorySessionBundle; michael@0: import org.mozilla.gecko.sync.repositories.delegates.DeferrableRepositorySessionCreationDelegate; michael@0: import org.mozilla.gecko.sync.repositories.delegates.DeferredRepositorySessionFinishDelegate; michael@0: import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFinishDelegate; michael@0: michael@0: import android.content.Context; michael@0: michael@0: /** michael@0: * I coordinate the moving parts of a sync started by michael@0: * {@link Synchronizer#synchronize}. michael@0: * michael@0: * I flow records twice: first from A to B, and then from B to A. I provide michael@0: * fine-grained feedback by calling my delegate's callback methods. michael@0: * michael@0: * Initialize me by creating me with a Synchronizer and a michael@0: * SynchronizerSessionDelegate. Kick things off by calling `init` with two michael@0: * RepositorySessionBundles, and then call `synchronize` in your `onInitialized` michael@0: * callback. michael@0: * michael@0: * I always call exactly one of my delegate's `onInitialized` or michael@0: * `onSessionError` callback methods from `init`. michael@0: * michael@0: * I call my delegate's `onSynchronizeSkipped` callback method if there is no michael@0: * data to be synchronized in `synchronize`. michael@0: * michael@0: * In addition, I call `onFetchError`, `onStoreError`, and `onSessionError` when michael@0: * I encounter a fetch, store, or session error while synchronizing. michael@0: * michael@0: * Typically my delegate will call `abort` in its error callbacks, which will michael@0: * call my delegate's `onSynchronizeAborted` method and halt the sync. michael@0: * michael@0: * I always call exactly one of my delegate's `onSynchronized` or michael@0: * `onSynchronizeFailed` callback methods if I have not seen an error. michael@0: */ michael@0: public class SynchronizerSession michael@0: extends DeferrableRepositorySessionCreationDelegate michael@0: implements RecordsChannelDelegate, michael@0: RepositorySessionFinishDelegate { michael@0: michael@0: protected static final String LOG_TAG = "SynchronizerSession"; michael@0: protected Synchronizer synchronizer; michael@0: protected SynchronizerSessionDelegate delegate; michael@0: protected Context context; michael@0: michael@0: /* michael@0: * Computed during init. michael@0: */ michael@0: private RepositorySession sessionA; michael@0: private RepositorySession sessionB; michael@0: private RepositorySessionBundle bundleA; michael@0: private RepositorySessionBundle bundleB; michael@0: michael@0: // Bug 726054: just like desktop, we track our last interaction with the server, michael@0: // not the last record timestamp that we fetched. This ensures that we don't re- michael@0: // download the records we just uploaded, at the cost of skipping any records michael@0: // that a concurrently syncing client has uploaded. michael@0: private long pendingATimestamp = -1; michael@0: private long pendingBTimestamp = -1; michael@0: private long storeEndATimestamp = -1; michael@0: private long storeEndBTimestamp = -1; michael@0: private boolean flowAToBCompleted = false; michael@0: private boolean flowBToACompleted = false; michael@0: michael@0: protected final AtomicInteger numInboundRecords = new AtomicInteger(-1); michael@0: protected final AtomicInteger numOutboundRecords = new AtomicInteger(-1); michael@0: michael@0: /* michael@0: * Public API: constructor, init, synchronize. michael@0: */ michael@0: public SynchronizerSession(Synchronizer synchronizer, SynchronizerSessionDelegate delegate) { michael@0: this.setSynchronizer(synchronizer); michael@0: this.delegate = delegate; michael@0: } michael@0: michael@0: public Synchronizer getSynchronizer() { michael@0: return synchronizer; michael@0: } michael@0: michael@0: public void setSynchronizer(Synchronizer synchronizer) { michael@0: this.synchronizer = synchronizer; michael@0: } michael@0: michael@0: public void init(Context context, RepositorySessionBundle bundleA, RepositorySessionBundle bundleB) { michael@0: this.context = context; michael@0: this.bundleA = bundleA; michael@0: this.bundleB = bundleB; michael@0: // Begin sessionA and sessionB, call onInitialized in callbacks. michael@0: this.getSynchronizer().repositoryA.createSession(this, context); michael@0: } michael@0: michael@0: /** michael@0: * Get the number of records fetched from the first repository (usually the michael@0: * server, hence inbound). michael@0: *

michael@0: * Valid only after first flow has completed. michael@0: * michael@0: * @return number of records, or -1 if not valid. michael@0: */ michael@0: public int getInboundCount() { michael@0: return numInboundRecords.get(); michael@0: } michael@0: michael@0: /** michael@0: * Get the number of records fetched from the second repository (usually the michael@0: * local store, hence outbound). michael@0: *

michael@0: * Valid only after second flow has completed. michael@0: * michael@0: * @return number of records, or -1 if not valid. michael@0: */ michael@0: public int getOutboundCount() { michael@0: return numOutboundRecords.get(); michael@0: } michael@0: michael@0: // These are accessed by `abort` and `synchronize`, both of which are synchronized. michael@0: // Guarded by `this`. michael@0: protected RecordsChannel channelAToB; michael@0: protected RecordsChannel channelBToA; michael@0: michael@0: /** michael@0: * Please don't call this until you've been notified with onInitialized. michael@0: */ michael@0: public synchronized void synchronize() { michael@0: numInboundRecords.set(-1); michael@0: numOutboundRecords.set(-1); michael@0: michael@0: // First thing: decide whether we should. michael@0: if (sessionA.shouldSkip() || michael@0: sessionB.shouldSkip()) { michael@0: Logger.info(LOG_TAG, "Session requested skip. Short-circuiting sync."); michael@0: sessionA.abort(); michael@0: sessionB.abort(); michael@0: this.delegate.onSynchronizeSkipped(this); michael@0: return; michael@0: } michael@0: michael@0: final SynchronizerSession session = this; michael@0: michael@0: // TODO: failed record handling. michael@0: michael@0: // This is the *second* record channel to flow. michael@0: // I, SynchronizerSession, am the delegate for the *second* flow. michael@0: channelBToA = new RecordsChannel(this.sessionB, this.sessionA, this); michael@0: michael@0: // This is the delegate for the *first* flow. michael@0: RecordsChannelDelegate channelAToBDelegate = new RecordsChannelDelegate() { michael@0: public void onFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) { michael@0: session.onFirstFlowCompleted(recordsChannel, fetchEnd, storeEnd); michael@0: } michael@0: michael@0: @Override michael@0: public void onFlowBeginFailed(RecordsChannel recordsChannel, Exception ex) { michael@0: Logger.warn(LOG_TAG, "First RecordsChannel onFlowBeginFailed. Logging session error.", ex); michael@0: session.delegate.onSynchronizeFailed(session, ex, "Failed to begin first flow."); michael@0: } michael@0: michael@0: @Override michael@0: public void onFlowFetchFailed(RecordsChannel recordsChannel, Exception ex) { michael@0: Logger.warn(LOG_TAG, "First RecordsChannel onFlowFetchFailed. Logging remote fetch error.", ex); michael@0: } michael@0: michael@0: @Override michael@0: public void onFlowStoreFailed(RecordsChannel recordsChannel, Exception ex, String recordGuid) { michael@0: Logger.warn(LOG_TAG, "First RecordsChannel onFlowStoreFailed. Logging local store error.", ex); michael@0: } michael@0: michael@0: @Override michael@0: public void onFlowFinishFailed(RecordsChannel recordsChannel, Exception ex) { michael@0: Logger.warn(LOG_TAG, "First RecordsChannel onFlowFinishedFailed. Logging session error.", ex); michael@0: session.delegate.onSynchronizeFailed(session, ex, "Failed to finish first flow."); michael@0: } michael@0: }; michael@0: michael@0: // This is the *first* channel to flow. michael@0: channelAToB = new RecordsChannel(this.sessionA, this.sessionB, channelAToBDelegate); michael@0: michael@0: Logger.trace(LOG_TAG, "Starting A to B flow. Channel is " + channelAToB); michael@0: try { michael@0: channelAToB.beginAndFlow(); michael@0: } catch (InvalidSessionTransitionException e) { michael@0: onFlowBeginFailed(channelAToB, e); michael@0: } michael@0: } michael@0: michael@0: /** michael@0: * Called after the first flow completes. michael@0: *

michael@0: * By default, any fetch and store failures are ignored. michael@0: * @param recordsChannel the RecordsChannel (for error testing). michael@0: * @param fetchEnd timestamp when fetches completed. michael@0: * @param storeEnd timestamp when stores completed. michael@0: */ michael@0: public void onFirstFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) { michael@0: Logger.trace(LOG_TAG, "First RecordsChannel onFlowCompleted."); michael@0: Logger.debug(LOG_TAG, "Fetch end is " + fetchEnd + ". Store end is " + storeEnd + ". Starting next."); michael@0: pendingATimestamp = fetchEnd; michael@0: storeEndBTimestamp = storeEnd; michael@0: numInboundRecords.set(recordsChannel.getFetchCount()); michael@0: flowAToBCompleted = true; michael@0: channelBToA.flow(); michael@0: } michael@0: michael@0: /** michael@0: * Called after the second flow completes. michael@0: *

michael@0: * By default, any fetch and store failures are ignored. michael@0: * @param recordsChannel the RecordsChannel (for error testing). michael@0: * @param fetchEnd timestamp when fetches completed. michael@0: * @param storeEnd timestamp when stores completed. michael@0: */ michael@0: public void onSecondFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) { michael@0: Logger.trace(LOG_TAG, "Second RecordsChannel onFlowCompleted."); michael@0: Logger.debug(LOG_TAG, "Fetch end is " + fetchEnd + ". Store end is " + storeEnd + ". Finishing."); michael@0: michael@0: pendingBTimestamp = fetchEnd; michael@0: storeEndATimestamp = storeEnd; michael@0: numOutboundRecords.set(recordsChannel.getFetchCount()); michael@0: flowBToACompleted = true; michael@0: michael@0: // Finish the two sessions. michael@0: try { michael@0: this.sessionA.finish(this); michael@0: } catch (InactiveSessionException e) { michael@0: this.onFinishFailed(e); michael@0: return; michael@0: } michael@0: } michael@0: michael@0: @Override michael@0: public void onFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) { michael@0: onSecondFlowCompleted(recordsChannel, fetchEnd, storeEnd); michael@0: } michael@0: michael@0: @Override michael@0: public void onFlowBeginFailed(RecordsChannel recordsChannel, Exception ex) { michael@0: Logger.warn(LOG_TAG, "Second RecordsChannel onFlowBeginFailed. Logging session error.", ex); michael@0: this.delegate.onSynchronizeFailed(this, ex, "Failed to begin second flow."); michael@0: } michael@0: michael@0: @Override michael@0: public void onFlowFetchFailed(RecordsChannel recordsChannel, Exception ex) { michael@0: Logger.warn(LOG_TAG, "Second RecordsChannel onFlowFetchFailed. Logging local fetch error.", ex); michael@0: } michael@0: michael@0: @Override michael@0: public void onFlowStoreFailed(RecordsChannel recordsChannel, Exception ex, String recordGuid) { michael@0: Logger.warn(LOG_TAG, "Second RecordsChannel onFlowStoreFailed. Logging remote store error.", ex); michael@0: } michael@0: michael@0: @Override michael@0: public void onFlowFinishFailed(RecordsChannel recordsChannel, Exception ex) { michael@0: Logger.warn(LOG_TAG, "Second RecordsChannel onFlowFinishedFailed. Logging session error.", ex); michael@0: this.delegate.onSynchronizeFailed(this, ex, "Failed to finish second flow."); michael@0: } michael@0: michael@0: /* michael@0: * RepositorySessionCreationDelegate methods. michael@0: */ michael@0: michael@0: /** michael@0: * I could be called twice: once for sessionA and once for sessionB. michael@0: * michael@0: * I try to clean up sessionA if it is not null, since the creation of michael@0: * sessionB must have failed. michael@0: */ michael@0: @Override michael@0: public void onSessionCreateFailed(Exception ex) { michael@0: // Attempt to finish the first session, if the second is the one that failed. michael@0: if (this.sessionA != null) { michael@0: try { michael@0: // We no longer need a reference to our context. michael@0: this.context = null; michael@0: this.sessionA.finish(this); michael@0: } catch (Exception e) { michael@0: // Never mind; best-effort finish. michael@0: } michael@0: } michael@0: // We no longer need a reference to our context. michael@0: this.context = null; michael@0: this.delegate.onSynchronizeFailed(this, ex, "Failed to create session"); michael@0: } michael@0: michael@0: /** michael@0: * I should be called twice: first for sessionA and second for sessionB. michael@0: * michael@0: * If I am called for sessionB, I call my delegate's `onInitialized` callback michael@0: * method because my repository sessions are correctly initialized. michael@0: */ michael@0: // TODO: some of this "finish and clean up" code can be refactored out. michael@0: @Override michael@0: public void onSessionCreated(RepositorySession session) { michael@0: if (session == null || michael@0: this.sessionA == session) { michael@0: // TODO: clean up sessionA. michael@0: this.delegate.onSynchronizeFailed(this, new UnexpectedSessionException(session), "Failed to create session."); michael@0: return; michael@0: } michael@0: if (this.sessionA == null) { michael@0: this.sessionA = session; michael@0: michael@0: // Unbundle. michael@0: try { michael@0: this.sessionA.unbundle(this.bundleA); michael@0: } catch (Exception e) { michael@0: this.delegate.onSynchronizeFailed(this, new UnbundleError(e, sessionA), "Failed to unbundle first session."); michael@0: // TODO: abort michael@0: return; michael@0: } michael@0: this.getSynchronizer().repositoryB.createSession(this, this.context); michael@0: return; michael@0: } michael@0: if (this.sessionB == null) { michael@0: this.sessionB = session; michael@0: // We no longer need a reference to our context. michael@0: this.context = null; michael@0: michael@0: // Unbundle. We unbundled sessionA when that session was created. michael@0: try { michael@0: this.sessionB.unbundle(this.bundleB); michael@0: } catch (Exception e) { michael@0: this.delegate.onSynchronizeFailed(this, new UnbundleError(e, sessionA), "Failed to unbundle second session."); michael@0: return; michael@0: } michael@0: michael@0: this.delegate.onInitialized(this); michael@0: return; michael@0: } michael@0: // TODO: need a way to make sure we don't call any more delegate methods. michael@0: this.delegate.onSynchronizeFailed(this, new UnexpectedSessionException(session), "Failed to create session."); michael@0: } michael@0: michael@0: /* michael@0: * RepositorySessionFinishDelegate methods. michael@0: */ michael@0: michael@0: /** michael@0: * I could be called twice: once for sessionA and once for sessionB. michael@0: * michael@0: * If sessionB couldn't be created, I don't fail again. michael@0: */ michael@0: @Override michael@0: public void onFinishFailed(Exception ex) { michael@0: if (this.sessionB == null) { michael@0: // Ah, it was a problem cleaning up. Never mind. michael@0: Logger.warn(LOG_TAG, "Got exception cleaning up first after second session creation failed.", ex); michael@0: return; michael@0: } michael@0: String session = (this.sessionA == null) ? "B" : "A"; michael@0: this.delegate.onSynchronizeFailed(this, ex, "Finish of session " + session + " failed."); michael@0: } michael@0: michael@0: /** michael@0: * I should be called twice: first for sessionA and second for sessionB. michael@0: * michael@0: * If I am called for sessionA, I try to finish sessionB. michael@0: * michael@0: * If I am called for sessionB, I call my delegate's `onSynchronized` callback michael@0: * method because my flows should have completed. michael@0: */ michael@0: @Override michael@0: public void onFinishSucceeded(RepositorySession session, michael@0: RepositorySessionBundle bundle) { michael@0: Logger.debug(LOG_TAG, "onFinishSucceeded. Flows? " + flowAToBCompleted + ", " + flowBToACompleted); michael@0: michael@0: if (session == sessionA) { michael@0: if (flowAToBCompleted) { michael@0: Logger.debug(LOG_TAG, "onFinishSucceeded: bumping session A's timestamp to " + pendingATimestamp + " or " + storeEndATimestamp); michael@0: bundle.bumpTimestamp(Math.max(pendingATimestamp, storeEndATimestamp)); michael@0: this.synchronizer.bundleA = bundle; michael@0: } else { michael@0: // Should not happen! michael@0: this.delegate.onSynchronizeFailed(this, new UnexpectedSessionException(sessionA), "Failed to finish first session."); michael@0: return; michael@0: } michael@0: if (this.sessionB != null) { michael@0: Logger.trace(LOG_TAG, "Finishing session B."); michael@0: // On to the next. michael@0: try { michael@0: this.sessionB.finish(this); michael@0: } catch (InactiveSessionException e) { michael@0: this.onFinishFailed(e); michael@0: return; michael@0: } michael@0: } michael@0: } else if (session == sessionB) { michael@0: if (flowBToACompleted) { michael@0: Logger.debug(LOG_TAG, "onFinishSucceeded: bumping session B's timestamp to " + pendingBTimestamp + " or " + storeEndBTimestamp); michael@0: bundle.bumpTimestamp(Math.max(pendingBTimestamp, storeEndBTimestamp)); michael@0: this.synchronizer.bundleB = bundle; michael@0: Logger.trace(LOG_TAG, "Notifying delegate.onSynchronized."); michael@0: this.delegate.onSynchronized(this); michael@0: } else { michael@0: // Should not happen! michael@0: this.delegate.onSynchronizeFailed(this, new UnexpectedSessionException(sessionB), "Failed to finish second session."); michael@0: return; michael@0: } michael@0: } else { michael@0: // TODO: hurrrrrr... michael@0: } michael@0: michael@0: if (this.sessionB == null) { michael@0: this.sessionA = null; // We're done. michael@0: } michael@0: } michael@0: michael@0: @Override michael@0: public RepositorySessionFinishDelegate deferredFinishDelegate(final ExecutorService executor) { michael@0: return new DeferredRepositorySessionFinishDelegate(this, executor); michael@0: } michael@0: }