Wed, 31 Dec 2014 07:22:50 +0100
Correct previous dual key logic pending first delivery installment.
1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 package org.mozilla.gecko.sync.synchronizer;
8 import java.util.concurrent.ExecutorService;
9 import java.util.concurrent.atomic.AtomicInteger;
11 import org.mozilla.gecko.background.common.log.Logger;
12 import org.mozilla.gecko.sync.repositories.InactiveSessionException;
13 import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException;
14 import org.mozilla.gecko.sync.repositories.RepositorySession;
15 import org.mozilla.gecko.sync.repositories.RepositorySessionBundle;
16 import org.mozilla.gecko.sync.repositories.delegates.DeferrableRepositorySessionCreationDelegate;
17 import org.mozilla.gecko.sync.repositories.delegates.DeferredRepositorySessionFinishDelegate;
18 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFinishDelegate;
20 import android.content.Context;
22 /**
23 * I coordinate the moving parts of a sync started by
24 * {@link Synchronizer#synchronize}.
25 *
26 * I flow records twice: first from A to B, and then from B to A. I provide
27 * fine-grained feedback by calling my delegate's callback methods.
28 *
29 * Initialize me by creating me with a Synchronizer and a
30 * SynchronizerSessionDelegate. Kick things off by calling `init` with two
31 * RepositorySessionBundles, and then call `synchronize` in your `onInitialized`
32 * callback.
33 *
34 * I always call exactly one of my delegate's `onInitialized` or
35 * `onSessionError` callback methods from `init`.
36 *
37 * I call my delegate's `onSynchronizeSkipped` callback method if there is no
38 * data to be synchronized in `synchronize`.
39 *
40 * In addition, I call `onFetchError`, `onStoreError`, and `onSessionError` when
41 * I encounter a fetch, store, or session error while synchronizing.
42 *
43 * Typically my delegate will call `abort` in its error callbacks, which will
44 * call my delegate's `onSynchronizeAborted` method and halt the sync.
45 *
46 * I always call exactly one of my delegate's `onSynchronized` or
47 * `onSynchronizeFailed` callback methods if I have not seen an error.
48 */
49 public class SynchronizerSession
50 extends DeferrableRepositorySessionCreationDelegate
51 implements RecordsChannelDelegate,
52 RepositorySessionFinishDelegate {
54 protected static final String LOG_TAG = "SynchronizerSession";
55 protected Synchronizer synchronizer;
56 protected SynchronizerSessionDelegate delegate;
57 protected Context context;
59 /*
60 * Computed during init.
61 */
62 private RepositorySession sessionA;
63 private RepositorySession sessionB;
64 private RepositorySessionBundle bundleA;
65 private RepositorySessionBundle bundleB;
67 // Bug 726054: just like desktop, we track our last interaction with the server,
68 // not the last record timestamp that we fetched. This ensures that we don't re-
69 // download the records we just uploaded, at the cost of skipping any records
70 // that a concurrently syncing client has uploaded.
71 private long pendingATimestamp = -1;
72 private long pendingBTimestamp = -1;
73 private long storeEndATimestamp = -1;
74 private long storeEndBTimestamp = -1;
75 private boolean flowAToBCompleted = false;
76 private boolean flowBToACompleted = false;
78 protected final AtomicInteger numInboundRecords = new AtomicInteger(-1);
79 protected final AtomicInteger numOutboundRecords = new AtomicInteger(-1);
81 /*
82 * Public API: constructor, init, synchronize.
83 */
84 public SynchronizerSession(Synchronizer synchronizer, SynchronizerSessionDelegate delegate) {
85 this.setSynchronizer(synchronizer);
86 this.delegate = delegate;
87 }
89 public Synchronizer getSynchronizer() {
90 return synchronizer;
91 }
93 public void setSynchronizer(Synchronizer synchronizer) {
94 this.synchronizer = synchronizer;
95 }
97 public void init(Context context, RepositorySessionBundle bundleA, RepositorySessionBundle bundleB) {
98 this.context = context;
99 this.bundleA = bundleA;
100 this.bundleB = bundleB;
101 // Begin sessionA and sessionB, call onInitialized in callbacks.
102 this.getSynchronizer().repositoryA.createSession(this, context);
103 }
105 /**
106 * Get the number of records fetched from the first repository (usually the
107 * server, hence inbound).
108 * <p>
109 * Valid only after first flow has completed.
110 *
111 * @return number of records, or -1 if not valid.
112 */
113 public int getInboundCount() {
114 return numInboundRecords.get();
115 }
117 /**
118 * Get the number of records fetched from the second repository (usually the
119 * local store, hence outbound).
120 * <p>
121 * Valid only after second flow has completed.
122 *
123 * @return number of records, or -1 if not valid.
124 */
125 public int getOutboundCount() {
126 return numOutboundRecords.get();
127 }
129 // These are accessed by `abort` and `synchronize`, both of which are synchronized.
130 // Guarded by `this`.
131 protected RecordsChannel channelAToB;
132 protected RecordsChannel channelBToA;
134 /**
135 * Please don't call this until you've been notified with onInitialized.
136 */
137 public synchronized void synchronize() {
138 numInboundRecords.set(-1);
139 numOutboundRecords.set(-1);
141 // First thing: decide whether we should.
142 if (sessionA.shouldSkip() ||
143 sessionB.shouldSkip()) {
144 Logger.info(LOG_TAG, "Session requested skip. Short-circuiting sync.");
145 sessionA.abort();
146 sessionB.abort();
147 this.delegate.onSynchronizeSkipped(this);
148 return;
149 }
151 final SynchronizerSession session = this;
153 // TODO: failed record handling.
155 // This is the *second* record channel to flow.
156 // I, SynchronizerSession, am the delegate for the *second* flow.
157 channelBToA = new RecordsChannel(this.sessionB, this.sessionA, this);
159 // This is the delegate for the *first* flow.
160 RecordsChannelDelegate channelAToBDelegate = new RecordsChannelDelegate() {
161 public void onFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
162 session.onFirstFlowCompleted(recordsChannel, fetchEnd, storeEnd);
163 }
165 @Override
166 public void onFlowBeginFailed(RecordsChannel recordsChannel, Exception ex) {
167 Logger.warn(LOG_TAG, "First RecordsChannel onFlowBeginFailed. Logging session error.", ex);
168 session.delegate.onSynchronizeFailed(session, ex, "Failed to begin first flow.");
169 }
171 @Override
172 public void onFlowFetchFailed(RecordsChannel recordsChannel, Exception ex) {
173 Logger.warn(LOG_TAG, "First RecordsChannel onFlowFetchFailed. Logging remote fetch error.", ex);
174 }
176 @Override
177 public void onFlowStoreFailed(RecordsChannel recordsChannel, Exception ex, String recordGuid) {
178 Logger.warn(LOG_TAG, "First RecordsChannel onFlowStoreFailed. Logging local store error.", ex);
179 }
181 @Override
182 public void onFlowFinishFailed(RecordsChannel recordsChannel, Exception ex) {
183 Logger.warn(LOG_TAG, "First RecordsChannel onFlowFinishedFailed. Logging session error.", ex);
184 session.delegate.onSynchronizeFailed(session, ex, "Failed to finish first flow.");
185 }
186 };
188 // This is the *first* channel to flow.
189 channelAToB = new RecordsChannel(this.sessionA, this.sessionB, channelAToBDelegate);
191 Logger.trace(LOG_TAG, "Starting A to B flow. Channel is " + channelAToB);
192 try {
193 channelAToB.beginAndFlow();
194 } catch (InvalidSessionTransitionException e) {
195 onFlowBeginFailed(channelAToB, e);
196 }
197 }
199 /**
200 * Called after the first flow completes.
201 * <p>
202 * By default, any fetch and store failures are ignored.
203 * @param recordsChannel the <code>RecordsChannel</code> (for error testing).
204 * @param fetchEnd timestamp when fetches completed.
205 * @param storeEnd timestamp when stores completed.
206 */
207 public void onFirstFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
208 Logger.trace(LOG_TAG, "First RecordsChannel onFlowCompleted.");
209 Logger.debug(LOG_TAG, "Fetch end is " + fetchEnd + ". Store end is " + storeEnd + ". Starting next.");
210 pendingATimestamp = fetchEnd;
211 storeEndBTimestamp = storeEnd;
212 numInboundRecords.set(recordsChannel.getFetchCount());
213 flowAToBCompleted = true;
214 channelBToA.flow();
215 }
217 /**
218 * Called after the second flow completes.
219 * <p>
220 * By default, any fetch and store failures are ignored.
221 * @param recordsChannel the <code>RecordsChannel</code> (for error testing).
222 * @param fetchEnd timestamp when fetches completed.
223 * @param storeEnd timestamp when stores completed.
224 */
225 public void onSecondFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
226 Logger.trace(LOG_TAG, "Second RecordsChannel onFlowCompleted.");
227 Logger.debug(LOG_TAG, "Fetch end is " + fetchEnd + ". Store end is " + storeEnd + ". Finishing.");
229 pendingBTimestamp = fetchEnd;
230 storeEndATimestamp = storeEnd;
231 numOutboundRecords.set(recordsChannel.getFetchCount());
232 flowBToACompleted = true;
234 // Finish the two sessions.
235 try {
236 this.sessionA.finish(this);
237 } catch (InactiveSessionException e) {
238 this.onFinishFailed(e);
239 return;
240 }
241 }
243 @Override
244 public void onFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
245 onSecondFlowCompleted(recordsChannel, fetchEnd, storeEnd);
246 }
248 @Override
249 public void onFlowBeginFailed(RecordsChannel recordsChannel, Exception ex) {
250 Logger.warn(LOG_TAG, "Second RecordsChannel onFlowBeginFailed. Logging session error.", ex);
251 this.delegate.onSynchronizeFailed(this, ex, "Failed to begin second flow.");
252 }
254 @Override
255 public void onFlowFetchFailed(RecordsChannel recordsChannel, Exception ex) {
256 Logger.warn(LOG_TAG, "Second RecordsChannel onFlowFetchFailed. Logging local fetch error.", ex);
257 }
259 @Override
260 public void onFlowStoreFailed(RecordsChannel recordsChannel, Exception ex, String recordGuid) {
261 Logger.warn(LOG_TAG, "Second RecordsChannel onFlowStoreFailed. Logging remote store error.", ex);
262 }
264 @Override
265 public void onFlowFinishFailed(RecordsChannel recordsChannel, Exception ex) {
266 Logger.warn(LOG_TAG, "Second RecordsChannel onFlowFinishedFailed. Logging session error.", ex);
267 this.delegate.onSynchronizeFailed(this, ex, "Failed to finish second flow.");
268 }
270 /*
271 * RepositorySessionCreationDelegate methods.
272 */
274 /**
275 * I could be called twice: once for sessionA and once for sessionB.
276 *
277 * I try to clean up sessionA if it is not null, since the creation of
278 * sessionB must have failed.
279 */
280 @Override
281 public void onSessionCreateFailed(Exception ex) {
282 // Attempt to finish the first session, if the second is the one that failed.
283 if (this.sessionA != null) {
284 try {
285 // We no longer need a reference to our context.
286 this.context = null;
287 this.sessionA.finish(this);
288 } catch (Exception e) {
289 // Never mind; best-effort finish.
290 }
291 }
292 // We no longer need a reference to our context.
293 this.context = null;
294 this.delegate.onSynchronizeFailed(this, ex, "Failed to create session");
295 }
297 /**
298 * I should be called twice: first for sessionA and second for sessionB.
299 *
300 * If I am called for sessionB, I call my delegate's `onInitialized` callback
301 * method because my repository sessions are correctly initialized.
302 */
303 // TODO: some of this "finish and clean up" code can be refactored out.
304 @Override
305 public void onSessionCreated(RepositorySession session) {
306 if (session == null ||
307 this.sessionA == session) {
308 // TODO: clean up sessionA.
309 this.delegate.onSynchronizeFailed(this, new UnexpectedSessionException(session), "Failed to create session.");
310 return;
311 }
312 if (this.sessionA == null) {
313 this.sessionA = session;
315 // Unbundle.
316 try {
317 this.sessionA.unbundle(this.bundleA);
318 } catch (Exception e) {
319 this.delegate.onSynchronizeFailed(this, new UnbundleError(e, sessionA), "Failed to unbundle first session.");
320 // TODO: abort
321 return;
322 }
323 this.getSynchronizer().repositoryB.createSession(this, this.context);
324 return;
325 }
326 if (this.sessionB == null) {
327 this.sessionB = session;
328 // We no longer need a reference to our context.
329 this.context = null;
331 // Unbundle. We unbundled sessionA when that session was created.
332 try {
333 this.sessionB.unbundle(this.bundleB);
334 } catch (Exception e) {
335 this.delegate.onSynchronizeFailed(this, new UnbundleError(e, sessionA), "Failed to unbundle second session.");
336 return;
337 }
339 this.delegate.onInitialized(this);
340 return;
341 }
342 // TODO: need a way to make sure we don't call any more delegate methods.
343 this.delegate.onSynchronizeFailed(this, new UnexpectedSessionException(session), "Failed to create session.");
344 }
346 /*
347 * RepositorySessionFinishDelegate methods.
348 */
350 /**
351 * I could be called twice: once for sessionA and once for sessionB.
352 *
353 * If sessionB couldn't be created, I don't fail again.
354 */
355 @Override
356 public void onFinishFailed(Exception ex) {
357 if (this.sessionB == null) {
358 // Ah, it was a problem cleaning up. Never mind.
359 Logger.warn(LOG_TAG, "Got exception cleaning up first after second session creation failed.", ex);
360 return;
361 }
362 String session = (this.sessionA == null) ? "B" : "A";
363 this.delegate.onSynchronizeFailed(this, ex, "Finish of session " + session + " failed.");
364 }
366 /**
367 * I should be called twice: first for sessionA and second for sessionB.
368 *
369 * If I am called for sessionA, I try to finish sessionB.
370 *
371 * If I am called for sessionB, I call my delegate's `onSynchronized` callback
372 * method because my flows should have completed.
373 */
374 @Override
375 public void onFinishSucceeded(RepositorySession session,
376 RepositorySessionBundle bundle) {
377 Logger.debug(LOG_TAG, "onFinishSucceeded. Flows? " + flowAToBCompleted + ", " + flowBToACompleted);
379 if (session == sessionA) {
380 if (flowAToBCompleted) {
381 Logger.debug(LOG_TAG, "onFinishSucceeded: bumping session A's timestamp to " + pendingATimestamp + " or " + storeEndATimestamp);
382 bundle.bumpTimestamp(Math.max(pendingATimestamp, storeEndATimestamp));
383 this.synchronizer.bundleA = bundle;
384 } else {
385 // Should not happen!
386 this.delegate.onSynchronizeFailed(this, new UnexpectedSessionException(sessionA), "Failed to finish first session.");
387 return;
388 }
389 if (this.sessionB != null) {
390 Logger.trace(LOG_TAG, "Finishing session B.");
391 // On to the next.
392 try {
393 this.sessionB.finish(this);
394 } catch (InactiveSessionException e) {
395 this.onFinishFailed(e);
396 return;
397 }
398 }
399 } else if (session == sessionB) {
400 if (flowBToACompleted) {
401 Logger.debug(LOG_TAG, "onFinishSucceeded: bumping session B's timestamp to " + pendingBTimestamp + " or " + storeEndBTimestamp);
402 bundle.bumpTimestamp(Math.max(pendingBTimestamp, storeEndBTimestamp));
403 this.synchronizer.bundleB = bundle;
404 Logger.trace(LOG_TAG, "Notifying delegate.onSynchronized.");
405 this.delegate.onSynchronized(this);
406 } else {
407 // Should not happen!
408 this.delegate.onSynchronizeFailed(this, new UnexpectedSessionException(sessionB), "Failed to finish second session.");
409 return;
410 }
411 } else {
412 // TODO: hurrrrrr...
413 }
415 if (this.sessionB == null) {
416 this.sessionA = null; // We're done.
417 }
418 }
420 @Override
421 public RepositorySessionFinishDelegate deferredFinishDelegate(final ExecutorService executor) {
422 return new DeferredRepositorySessionFinishDelegate(this, executor);
423 }
424 }