1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/mobile/android/base/sync/synchronizer/SynchronizerSession.java Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,424 @@ 1.4 +/* This Source Code Form is subject to the terms of the Mozilla Public 1.5 + * License, v. 2.0. If a copy of the MPL was not distributed with this 1.6 + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 1.7 + 1.8 +package org.mozilla.gecko.sync.synchronizer; 1.9 + 1.10 + 1.11 +import java.util.concurrent.ExecutorService; 1.12 +import java.util.concurrent.atomic.AtomicInteger; 1.13 + 1.14 +import org.mozilla.gecko.background.common.log.Logger; 1.15 +import org.mozilla.gecko.sync.repositories.InactiveSessionException; 1.16 +import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException; 1.17 +import org.mozilla.gecko.sync.repositories.RepositorySession; 1.18 +import org.mozilla.gecko.sync.repositories.RepositorySessionBundle; 1.19 +import org.mozilla.gecko.sync.repositories.delegates.DeferrableRepositorySessionCreationDelegate; 1.20 +import org.mozilla.gecko.sync.repositories.delegates.DeferredRepositorySessionFinishDelegate; 1.21 +import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFinishDelegate; 1.22 + 1.23 +import android.content.Context; 1.24 + 1.25 +/** 1.26 + * I coordinate the moving parts of a sync started by 1.27 + * {@link Synchronizer#synchronize}. 1.28 + * 1.29 + * I flow records twice: first from A to B, and then from B to A. I provide 1.30 + * fine-grained feedback by calling my delegate's callback methods. 1.31 + * 1.32 + * Initialize me by creating me with a Synchronizer and a 1.33 + * SynchronizerSessionDelegate. Kick things off by calling `init` with two 1.34 + * RepositorySessionBundles, and then call `synchronize` in your `onInitialized` 1.35 + * callback. 1.36 + * 1.37 + * I always call exactly one of my delegate's `onInitialized` or 1.38 + * `onSessionError` callback methods from `init`. 1.39 + * 1.40 + * I call my delegate's `onSynchronizeSkipped` callback method if there is no 1.41 + * data to be synchronized in `synchronize`. 1.42 + * 1.43 + * In addition, I call `onFetchError`, `onStoreError`, and `onSessionError` when 1.44 + * I encounter a fetch, store, or session error while synchronizing. 1.45 + * 1.46 + * Typically my delegate will call `abort` in its error callbacks, which will 1.47 + * call my delegate's `onSynchronizeAborted` method and halt the sync. 1.48 + * 1.49 + * I always call exactly one of my delegate's `onSynchronized` or 1.50 + * `onSynchronizeFailed` callback methods if I have not seen an error. 1.51 + */ 1.52 +public class SynchronizerSession 1.53 +extends DeferrableRepositorySessionCreationDelegate 1.54 +implements RecordsChannelDelegate, 1.55 + RepositorySessionFinishDelegate { 1.56 + 1.57 + protected static final String LOG_TAG = "SynchronizerSession"; 1.58 + protected Synchronizer synchronizer; 1.59 + protected SynchronizerSessionDelegate delegate; 1.60 + protected Context context; 1.61 + 1.62 + /* 1.63 + * Computed during init. 1.64 + */ 1.65 + private RepositorySession sessionA; 1.66 + private RepositorySession sessionB; 1.67 + private RepositorySessionBundle bundleA; 1.68 + private RepositorySessionBundle bundleB; 1.69 + 1.70 + // Bug 726054: just like desktop, we track our last interaction with the server, 1.71 + // not the last record timestamp that we fetched. This ensures that we don't re- 1.72 + // download the records we just uploaded, at the cost of skipping any records 1.73 + // that a concurrently syncing client has uploaded. 1.74 + private long pendingATimestamp = -1; 1.75 + private long pendingBTimestamp = -1; 1.76 + private long storeEndATimestamp = -1; 1.77 + private long storeEndBTimestamp = -1; 1.78 + private boolean flowAToBCompleted = false; 1.79 + private boolean flowBToACompleted = false; 1.80 + 1.81 + protected final AtomicInteger numInboundRecords = new AtomicInteger(-1); 1.82 + protected final AtomicInteger numOutboundRecords = new AtomicInteger(-1); 1.83 + 1.84 + /* 1.85 + * Public API: constructor, init, synchronize. 1.86 + */ 1.87 + public SynchronizerSession(Synchronizer synchronizer, SynchronizerSessionDelegate delegate) { 1.88 + this.setSynchronizer(synchronizer); 1.89 + this.delegate = delegate; 1.90 + } 1.91 + 1.92 + public Synchronizer getSynchronizer() { 1.93 + return synchronizer; 1.94 + } 1.95 + 1.96 + public void setSynchronizer(Synchronizer synchronizer) { 1.97 + this.synchronizer = synchronizer; 1.98 + } 1.99 + 1.100 + public void init(Context context, RepositorySessionBundle bundleA, RepositorySessionBundle bundleB) { 1.101 + this.context = context; 1.102 + this.bundleA = bundleA; 1.103 + this.bundleB = bundleB; 1.104 + // Begin sessionA and sessionB, call onInitialized in callbacks. 1.105 + this.getSynchronizer().repositoryA.createSession(this, context); 1.106 + } 1.107 + 1.108 + /** 1.109 + * Get the number of records fetched from the first repository (usually the 1.110 + * server, hence inbound). 1.111 + * <p> 1.112 + * Valid only after first flow has completed. 1.113 + * 1.114 + * @return number of records, or -1 if not valid. 1.115 + */ 1.116 + public int getInboundCount() { 1.117 + return numInboundRecords.get(); 1.118 + } 1.119 + 1.120 + /** 1.121 + * Get the number of records fetched from the second repository (usually the 1.122 + * local store, hence outbound). 1.123 + * <p> 1.124 + * Valid only after second flow has completed. 1.125 + * 1.126 + * @return number of records, or -1 if not valid. 1.127 + */ 1.128 + public int getOutboundCount() { 1.129 + return numOutboundRecords.get(); 1.130 + } 1.131 + 1.132 + // These are accessed by `abort` and `synchronize`, both of which are synchronized. 1.133 + // Guarded by `this`. 1.134 + protected RecordsChannel channelAToB; 1.135 + protected RecordsChannel channelBToA; 1.136 + 1.137 + /** 1.138 + * Please don't call this until you've been notified with onInitialized. 1.139 + */ 1.140 + public synchronized void synchronize() { 1.141 + numInboundRecords.set(-1); 1.142 + numOutboundRecords.set(-1); 1.143 + 1.144 + // First thing: decide whether we should. 1.145 + if (sessionA.shouldSkip() || 1.146 + sessionB.shouldSkip()) { 1.147 + Logger.info(LOG_TAG, "Session requested skip. Short-circuiting sync."); 1.148 + sessionA.abort(); 1.149 + sessionB.abort(); 1.150 + this.delegate.onSynchronizeSkipped(this); 1.151 + return; 1.152 + } 1.153 + 1.154 + final SynchronizerSession session = this; 1.155 + 1.156 + // TODO: failed record handling. 1.157 + 1.158 + // This is the *second* record channel to flow. 1.159 + // I, SynchronizerSession, am the delegate for the *second* flow. 1.160 + channelBToA = new RecordsChannel(this.sessionB, this.sessionA, this); 1.161 + 1.162 + // This is the delegate for the *first* flow. 1.163 + RecordsChannelDelegate channelAToBDelegate = new RecordsChannelDelegate() { 1.164 + public void onFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) { 1.165 + session.onFirstFlowCompleted(recordsChannel, fetchEnd, storeEnd); 1.166 + } 1.167 + 1.168 + @Override 1.169 + public void onFlowBeginFailed(RecordsChannel recordsChannel, Exception ex) { 1.170 + Logger.warn(LOG_TAG, "First RecordsChannel onFlowBeginFailed. Logging session error.", ex); 1.171 + session.delegate.onSynchronizeFailed(session, ex, "Failed to begin first flow."); 1.172 + } 1.173 + 1.174 + @Override 1.175 + public void onFlowFetchFailed(RecordsChannel recordsChannel, Exception ex) { 1.176 + Logger.warn(LOG_TAG, "First RecordsChannel onFlowFetchFailed. Logging remote fetch error.", ex); 1.177 + } 1.178 + 1.179 + @Override 1.180 + public void onFlowStoreFailed(RecordsChannel recordsChannel, Exception ex, String recordGuid) { 1.181 + Logger.warn(LOG_TAG, "First RecordsChannel onFlowStoreFailed. Logging local store error.", ex); 1.182 + } 1.183 + 1.184 + @Override 1.185 + public void onFlowFinishFailed(RecordsChannel recordsChannel, Exception ex) { 1.186 + Logger.warn(LOG_TAG, "First RecordsChannel onFlowFinishedFailed. Logging session error.", ex); 1.187 + session.delegate.onSynchronizeFailed(session, ex, "Failed to finish first flow."); 1.188 + } 1.189 + }; 1.190 + 1.191 + // This is the *first* channel to flow. 1.192 + channelAToB = new RecordsChannel(this.sessionA, this.sessionB, channelAToBDelegate); 1.193 + 1.194 + Logger.trace(LOG_TAG, "Starting A to B flow. Channel is " + channelAToB); 1.195 + try { 1.196 + channelAToB.beginAndFlow(); 1.197 + } catch (InvalidSessionTransitionException e) { 1.198 + onFlowBeginFailed(channelAToB, e); 1.199 + } 1.200 + } 1.201 + 1.202 + /** 1.203 + * Called after the first flow completes. 1.204 + * <p> 1.205 + * By default, any fetch and store failures are ignored. 1.206 + * @param recordsChannel the <code>RecordsChannel</code> (for error testing). 1.207 + * @param fetchEnd timestamp when fetches completed. 1.208 + * @param storeEnd timestamp when stores completed. 1.209 + */ 1.210 + public void onFirstFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) { 1.211 + Logger.trace(LOG_TAG, "First RecordsChannel onFlowCompleted."); 1.212 + Logger.debug(LOG_TAG, "Fetch end is " + fetchEnd + ". Store end is " + storeEnd + ". Starting next."); 1.213 + pendingATimestamp = fetchEnd; 1.214 + storeEndBTimestamp = storeEnd; 1.215 + numInboundRecords.set(recordsChannel.getFetchCount()); 1.216 + flowAToBCompleted = true; 1.217 + channelBToA.flow(); 1.218 + } 1.219 + 1.220 + /** 1.221 + * Called after the second flow completes. 1.222 + * <p> 1.223 + * By default, any fetch and store failures are ignored. 1.224 + * @param recordsChannel the <code>RecordsChannel</code> (for error testing). 1.225 + * @param fetchEnd timestamp when fetches completed. 1.226 + * @param storeEnd timestamp when stores completed. 1.227 + */ 1.228 + public void onSecondFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) { 1.229 + Logger.trace(LOG_TAG, "Second RecordsChannel onFlowCompleted."); 1.230 + Logger.debug(LOG_TAG, "Fetch end is " + fetchEnd + ". Store end is " + storeEnd + ". Finishing."); 1.231 + 1.232 + pendingBTimestamp = fetchEnd; 1.233 + storeEndATimestamp = storeEnd; 1.234 + numOutboundRecords.set(recordsChannel.getFetchCount()); 1.235 + flowBToACompleted = true; 1.236 + 1.237 + // Finish the two sessions. 1.238 + try { 1.239 + this.sessionA.finish(this); 1.240 + } catch (InactiveSessionException e) { 1.241 + this.onFinishFailed(e); 1.242 + return; 1.243 + } 1.244 + } 1.245 + 1.246 + @Override 1.247 + public void onFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) { 1.248 + onSecondFlowCompleted(recordsChannel, fetchEnd, storeEnd); 1.249 + } 1.250 + 1.251 + @Override 1.252 + public void onFlowBeginFailed(RecordsChannel recordsChannel, Exception ex) { 1.253 + Logger.warn(LOG_TAG, "Second RecordsChannel onFlowBeginFailed. Logging session error.", ex); 1.254 + this.delegate.onSynchronizeFailed(this, ex, "Failed to begin second flow."); 1.255 + } 1.256 + 1.257 + @Override 1.258 + public void onFlowFetchFailed(RecordsChannel recordsChannel, Exception ex) { 1.259 + Logger.warn(LOG_TAG, "Second RecordsChannel onFlowFetchFailed. Logging local fetch error.", ex); 1.260 + } 1.261 + 1.262 + @Override 1.263 + public void onFlowStoreFailed(RecordsChannel recordsChannel, Exception ex, String recordGuid) { 1.264 + Logger.warn(LOG_TAG, "Second RecordsChannel onFlowStoreFailed. Logging remote store error.", ex); 1.265 + } 1.266 + 1.267 + @Override 1.268 + public void onFlowFinishFailed(RecordsChannel recordsChannel, Exception ex) { 1.269 + Logger.warn(LOG_TAG, "Second RecordsChannel onFlowFinishedFailed. Logging session error.", ex); 1.270 + this.delegate.onSynchronizeFailed(this, ex, "Failed to finish second flow."); 1.271 + } 1.272 + 1.273 + /* 1.274 + * RepositorySessionCreationDelegate methods. 1.275 + */ 1.276 + 1.277 + /** 1.278 + * I could be called twice: once for sessionA and once for sessionB. 1.279 + * 1.280 + * I try to clean up sessionA if it is not null, since the creation of 1.281 + * sessionB must have failed. 1.282 + */ 1.283 + @Override 1.284 + public void onSessionCreateFailed(Exception ex) { 1.285 + // Attempt to finish the first session, if the second is the one that failed. 1.286 + if (this.sessionA != null) { 1.287 + try { 1.288 + // We no longer need a reference to our context. 1.289 + this.context = null; 1.290 + this.sessionA.finish(this); 1.291 + } catch (Exception e) { 1.292 + // Never mind; best-effort finish. 1.293 + } 1.294 + } 1.295 + // We no longer need a reference to our context. 1.296 + this.context = null; 1.297 + this.delegate.onSynchronizeFailed(this, ex, "Failed to create session"); 1.298 + } 1.299 + 1.300 + /** 1.301 + * I should be called twice: first for sessionA and second for sessionB. 1.302 + * 1.303 + * If I am called for sessionB, I call my delegate's `onInitialized` callback 1.304 + * method because my repository sessions are correctly initialized. 1.305 + */ 1.306 + // TODO: some of this "finish and clean up" code can be refactored out. 1.307 + @Override 1.308 + public void onSessionCreated(RepositorySession session) { 1.309 + if (session == null || 1.310 + this.sessionA == session) { 1.311 + // TODO: clean up sessionA. 1.312 + this.delegate.onSynchronizeFailed(this, new UnexpectedSessionException(session), "Failed to create session."); 1.313 + return; 1.314 + } 1.315 + if (this.sessionA == null) { 1.316 + this.sessionA = session; 1.317 + 1.318 + // Unbundle. 1.319 + try { 1.320 + this.sessionA.unbundle(this.bundleA); 1.321 + } catch (Exception e) { 1.322 + this.delegate.onSynchronizeFailed(this, new UnbundleError(e, sessionA), "Failed to unbundle first session."); 1.323 + // TODO: abort 1.324 + return; 1.325 + } 1.326 + this.getSynchronizer().repositoryB.createSession(this, this.context); 1.327 + return; 1.328 + } 1.329 + if (this.sessionB == null) { 1.330 + this.sessionB = session; 1.331 + // We no longer need a reference to our context. 1.332 + this.context = null; 1.333 + 1.334 + // Unbundle. We unbundled sessionA when that session was created. 1.335 + try { 1.336 + this.sessionB.unbundle(this.bundleB); 1.337 + } catch (Exception e) { 1.338 + this.delegate.onSynchronizeFailed(this, new UnbundleError(e, sessionA), "Failed to unbundle second session."); 1.339 + return; 1.340 + } 1.341 + 1.342 + this.delegate.onInitialized(this); 1.343 + return; 1.344 + } 1.345 + // TODO: need a way to make sure we don't call any more delegate methods. 1.346 + this.delegate.onSynchronizeFailed(this, new UnexpectedSessionException(session), "Failed to create session."); 1.347 + } 1.348 + 1.349 + /* 1.350 + * RepositorySessionFinishDelegate methods. 1.351 + */ 1.352 + 1.353 + /** 1.354 + * I could be called twice: once for sessionA and once for sessionB. 1.355 + * 1.356 + * If sessionB couldn't be created, I don't fail again. 1.357 + */ 1.358 + @Override 1.359 + public void onFinishFailed(Exception ex) { 1.360 + if (this.sessionB == null) { 1.361 + // Ah, it was a problem cleaning up. Never mind. 1.362 + Logger.warn(LOG_TAG, "Got exception cleaning up first after second session creation failed.", ex); 1.363 + return; 1.364 + } 1.365 + String session = (this.sessionA == null) ? "B" : "A"; 1.366 + this.delegate.onSynchronizeFailed(this, ex, "Finish of session " + session + " failed."); 1.367 + } 1.368 + 1.369 + /** 1.370 + * I should be called twice: first for sessionA and second for sessionB. 1.371 + * 1.372 + * If I am called for sessionA, I try to finish sessionB. 1.373 + * 1.374 + * If I am called for sessionB, I call my delegate's `onSynchronized` callback 1.375 + * method because my flows should have completed. 1.376 + */ 1.377 + @Override 1.378 + public void onFinishSucceeded(RepositorySession session, 1.379 + RepositorySessionBundle bundle) { 1.380 + Logger.debug(LOG_TAG, "onFinishSucceeded. Flows? " + flowAToBCompleted + ", " + flowBToACompleted); 1.381 + 1.382 + if (session == sessionA) { 1.383 + if (flowAToBCompleted) { 1.384 + Logger.debug(LOG_TAG, "onFinishSucceeded: bumping session A's timestamp to " + pendingATimestamp + " or " + storeEndATimestamp); 1.385 + bundle.bumpTimestamp(Math.max(pendingATimestamp, storeEndATimestamp)); 1.386 + this.synchronizer.bundleA = bundle; 1.387 + } else { 1.388 + // Should not happen! 1.389 + this.delegate.onSynchronizeFailed(this, new UnexpectedSessionException(sessionA), "Failed to finish first session."); 1.390 + return; 1.391 + } 1.392 + if (this.sessionB != null) { 1.393 + Logger.trace(LOG_TAG, "Finishing session B."); 1.394 + // On to the next. 1.395 + try { 1.396 + this.sessionB.finish(this); 1.397 + } catch (InactiveSessionException e) { 1.398 + this.onFinishFailed(e); 1.399 + return; 1.400 + } 1.401 + } 1.402 + } else if (session == sessionB) { 1.403 + if (flowBToACompleted) { 1.404 + Logger.debug(LOG_TAG, "onFinishSucceeded: bumping session B's timestamp to " + pendingBTimestamp + " or " + storeEndBTimestamp); 1.405 + bundle.bumpTimestamp(Math.max(pendingBTimestamp, storeEndBTimestamp)); 1.406 + this.synchronizer.bundleB = bundle; 1.407 + Logger.trace(LOG_TAG, "Notifying delegate.onSynchronized."); 1.408 + this.delegate.onSynchronized(this); 1.409 + } else { 1.410 + // Should not happen! 1.411 + this.delegate.onSynchronizeFailed(this, new UnexpectedSessionException(sessionB), "Failed to finish second session."); 1.412 + return; 1.413 + } 1.414 + } else { 1.415 + // TODO: hurrrrrr... 1.416 + } 1.417 + 1.418 + if (this.sessionB == null) { 1.419 + this.sessionA = null; // We're done. 1.420 + } 1.421 + } 1.422 + 1.423 + @Override 1.424 + public RepositorySessionFinishDelegate deferredFinishDelegate(final ExecutorService executor) { 1.425 + return new DeferredRepositorySessionFinishDelegate(this, executor); 1.426 + } 1.427 +}