|
1 /* This Source Code Form is subject to the terms of the Mozilla Public |
|
2 * License, v. 2.0. If a copy of the MPL was not distributed with this |
|
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ |
|
4 |
|
5 package org.mozilla.gecko.sync.synchronizer; |
|
6 |
|
7 |
|
8 import java.util.concurrent.ExecutorService; |
|
9 import java.util.concurrent.atomic.AtomicInteger; |
|
10 |
|
11 import org.mozilla.gecko.background.common.log.Logger; |
|
12 import org.mozilla.gecko.sync.repositories.InactiveSessionException; |
|
13 import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException; |
|
14 import org.mozilla.gecko.sync.repositories.RepositorySession; |
|
15 import org.mozilla.gecko.sync.repositories.RepositorySessionBundle; |
|
16 import org.mozilla.gecko.sync.repositories.delegates.DeferrableRepositorySessionCreationDelegate; |
|
17 import org.mozilla.gecko.sync.repositories.delegates.DeferredRepositorySessionFinishDelegate; |
|
18 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFinishDelegate; |
|
19 |
|
20 import android.content.Context; |
|
21 |
|
22 /** |
|
23 * I coordinate the moving parts of a sync started by |
|
24 * {@link Synchronizer#synchronize}. |
|
25 * |
|
26 * I flow records twice: first from A to B, and then from B to A. I provide |
|
27 * fine-grained feedback by calling my delegate's callback methods. |
|
28 * |
|
29 * Initialize me by creating me with a Synchronizer and a |
|
30 * SynchronizerSessionDelegate. Kick things off by calling `init` with two |
|
31 * RepositorySessionBundles, and then call `synchronize` in your `onInitialized` |
|
32 * callback. |
|
33 * |
|
34 * I always call exactly one of my delegate's `onInitialized` or |
|
35 * `onSessionError` callback methods from `init`. |
|
36 * |
|
37 * I call my delegate's `onSynchronizeSkipped` callback method if there is no |
|
38 * data to be synchronized in `synchronize`. |
|
39 * |
|
40 * In addition, I call `onFetchError`, `onStoreError`, and `onSessionError` when |
|
41 * I encounter a fetch, store, or session error while synchronizing. |
|
42 * |
|
43 * Typically my delegate will call `abort` in its error callbacks, which will |
|
44 * call my delegate's `onSynchronizeAborted` method and halt the sync. |
|
45 * |
|
46 * I always call exactly one of my delegate's `onSynchronized` or |
|
47 * `onSynchronizeFailed` callback methods if I have not seen an error. |
|
48 */ |
|
49 public class SynchronizerSession |
|
50 extends DeferrableRepositorySessionCreationDelegate |
|
51 implements RecordsChannelDelegate, |
|
52 RepositorySessionFinishDelegate { |
|
53 |
|
54 protected static final String LOG_TAG = "SynchronizerSession"; |
|
55 protected Synchronizer synchronizer; |
|
56 protected SynchronizerSessionDelegate delegate; |
|
57 protected Context context; |
|
58 |
|
59 /* |
|
60 * Computed during init. |
|
61 */ |
|
62 private RepositorySession sessionA; |
|
63 private RepositorySession sessionB; |
|
64 private RepositorySessionBundle bundleA; |
|
65 private RepositorySessionBundle bundleB; |
|
66 |
|
67 // Bug 726054: just like desktop, we track our last interaction with the server, |
|
68 // not the last record timestamp that we fetched. This ensures that we don't re- |
|
69 // download the records we just uploaded, at the cost of skipping any records |
|
70 // that a concurrently syncing client has uploaded. |
|
71 private long pendingATimestamp = -1; |
|
72 private long pendingBTimestamp = -1; |
|
73 private long storeEndATimestamp = -1; |
|
74 private long storeEndBTimestamp = -1; |
|
75 private boolean flowAToBCompleted = false; |
|
76 private boolean flowBToACompleted = false; |
|
77 |
|
78 protected final AtomicInteger numInboundRecords = new AtomicInteger(-1); |
|
79 protected final AtomicInteger numOutboundRecords = new AtomicInteger(-1); |
|
80 |
|
81 /* |
|
82 * Public API: constructor, init, synchronize. |
|
83 */ |
|
84 public SynchronizerSession(Synchronizer synchronizer, SynchronizerSessionDelegate delegate) { |
|
85 this.setSynchronizer(synchronizer); |
|
86 this.delegate = delegate; |
|
87 } |
|
88 |
|
89 public Synchronizer getSynchronizer() { |
|
90 return synchronizer; |
|
91 } |
|
92 |
|
93 public void setSynchronizer(Synchronizer synchronizer) { |
|
94 this.synchronizer = synchronizer; |
|
95 } |
|
96 |
|
97 public void init(Context context, RepositorySessionBundle bundleA, RepositorySessionBundle bundleB) { |
|
98 this.context = context; |
|
99 this.bundleA = bundleA; |
|
100 this.bundleB = bundleB; |
|
101 // Begin sessionA and sessionB, call onInitialized in callbacks. |
|
102 this.getSynchronizer().repositoryA.createSession(this, context); |
|
103 } |
|
104 |
|
105 /** |
|
106 * Get the number of records fetched from the first repository (usually the |
|
107 * server, hence inbound). |
|
108 * <p> |
|
109 * Valid only after first flow has completed. |
|
110 * |
|
111 * @return number of records, or -1 if not valid. |
|
112 */ |
|
113 public int getInboundCount() { |
|
114 return numInboundRecords.get(); |
|
115 } |
|
116 |
|
117 /** |
|
118 * Get the number of records fetched from the second repository (usually the |
|
119 * local store, hence outbound). |
|
120 * <p> |
|
121 * Valid only after second flow has completed. |
|
122 * |
|
123 * @return number of records, or -1 if not valid. |
|
124 */ |
|
125 public int getOutboundCount() { |
|
126 return numOutboundRecords.get(); |
|
127 } |
|
128 |
|
129 // These are accessed by `abort` and `synchronize`, both of which are synchronized. |
|
130 // Guarded by `this`. |
|
131 protected RecordsChannel channelAToB; |
|
132 protected RecordsChannel channelBToA; |
|
133 |
|
134 /** |
|
135 * Please don't call this until you've been notified with onInitialized. |
|
136 */ |
|
137 public synchronized void synchronize() { |
|
138 numInboundRecords.set(-1); |
|
139 numOutboundRecords.set(-1); |
|
140 |
|
141 // First thing: decide whether we should. |
|
142 if (sessionA.shouldSkip() || |
|
143 sessionB.shouldSkip()) { |
|
144 Logger.info(LOG_TAG, "Session requested skip. Short-circuiting sync."); |
|
145 sessionA.abort(); |
|
146 sessionB.abort(); |
|
147 this.delegate.onSynchronizeSkipped(this); |
|
148 return; |
|
149 } |
|
150 |
|
151 final SynchronizerSession session = this; |
|
152 |
|
153 // TODO: failed record handling. |
|
154 |
|
155 // This is the *second* record channel to flow. |
|
156 // I, SynchronizerSession, am the delegate for the *second* flow. |
|
157 channelBToA = new RecordsChannel(this.sessionB, this.sessionA, this); |
|
158 |
|
159 // This is the delegate for the *first* flow. |
|
160 RecordsChannelDelegate channelAToBDelegate = new RecordsChannelDelegate() { |
|
161 public void onFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) { |
|
162 session.onFirstFlowCompleted(recordsChannel, fetchEnd, storeEnd); |
|
163 } |
|
164 |
|
165 @Override |
|
166 public void onFlowBeginFailed(RecordsChannel recordsChannel, Exception ex) { |
|
167 Logger.warn(LOG_TAG, "First RecordsChannel onFlowBeginFailed. Logging session error.", ex); |
|
168 session.delegate.onSynchronizeFailed(session, ex, "Failed to begin first flow."); |
|
169 } |
|
170 |
|
171 @Override |
|
172 public void onFlowFetchFailed(RecordsChannel recordsChannel, Exception ex) { |
|
173 Logger.warn(LOG_TAG, "First RecordsChannel onFlowFetchFailed. Logging remote fetch error.", ex); |
|
174 } |
|
175 |
|
176 @Override |
|
177 public void onFlowStoreFailed(RecordsChannel recordsChannel, Exception ex, String recordGuid) { |
|
178 Logger.warn(LOG_TAG, "First RecordsChannel onFlowStoreFailed. Logging local store error.", ex); |
|
179 } |
|
180 |
|
181 @Override |
|
182 public void onFlowFinishFailed(RecordsChannel recordsChannel, Exception ex) { |
|
183 Logger.warn(LOG_TAG, "First RecordsChannel onFlowFinishedFailed. Logging session error.", ex); |
|
184 session.delegate.onSynchronizeFailed(session, ex, "Failed to finish first flow."); |
|
185 } |
|
186 }; |
|
187 |
|
188 // This is the *first* channel to flow. |
|
189 channelAToB = new RecordsChannel(this.sessionA, this.sessionB, channelAToBDelegate); |
|
190 |
|
191 Logger.trace(LOG_TAG, "Starting A to B flow. Channel is " + channelAToB); |
|
192 try { |
|
193 channelAToB.beginAndFlow(); |
|
194 } catch (InvalidSessionTransitionException e) { |
|
195 onFlowBeginFailed(channelAToB, e); |
|
196 } |
|
197 } |
|
198 |
|
199 /** |
|
200 * Called after the first flow completes. |
|
201 * <p> |
|
202 * By default, any fetch and store failures are ignored. |
|
203 * @param recordsChannel the <code>RecordsChannel</code> (for error testing). |
|
204 * @param fetchEnd timestamp when fetches completed. |
|
205 * @param storeEnd timestamp when stores completed. |
|
206 */ |
|
207 public void onFirstFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) { |
|
208 Logger.trace(LOG_TAG, "First RecordsChannel onFlowCompleted."); |
|
209 Logger.debug(LOG_TAG, "Fetch end is " + fetchEnd + ". Store end is " + storeEnd + ". Starting next."); |
|
210 pendingATimestamp = fetchEnd; |
|
211 storeEndBTimestamp = storeEnd; |
|
212 numInboundRecords.set(recordsChannel.getFetchCount()); |
|
213 flowAToBCompleted = true; |
|
214 channelBToA.flow(); |
|
215 } |
|
216 |
|
217 /** |
|
218 * Called after the second flow completes. |
|
219 * <p> |
|
220 * By default, any fetch and store failures are ignored. |
|
221 * @param recordsChannel the <code>RecordsChannel</code> (for error testing). |
|
222 * @param fetchEnd timestamp when fetches completed. |
|
223 * @param storeEnd timestamp when stores completed. |
|
224 */ |
|
225 public void onSecondFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) { |
|
226 Logger.trace(LOG_TAG, "Second RecordsChannel onFlowCompleted."); |
|
227 Logger.debug(LOG_TAG, "Fetch end is " + fetchEnd + ". Store end is " + storeEnd + ". Finishing."); |
|
228 |
|
229 pendingBTimestamp = fetchEnd; |
|
230 storeEndATimestamp = storeEnd; |
|
231 numOutboundRecords.set(recordsChannel.getFetchCount()); |
|
232 flowBToACompleted = true; |
|
233 |
|
234 // Finish the two sessions. |
|
235 try { |
|
236 this.sessionA.finish(this); |
|
237 } catch (InactiveSessionException e) { |
|
238 this.onFinishFailed(e); |
|
239 return; |
|
240 } |
|
241 } |
|
242 |
|
243 @Override |
|
244 public void onFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) { |
|
245 onSecondFlowCompleted(recordsChannel, fetchEnd, storeEnd); |
|
246 } |
|
247 |
|
248 @Override |
|
249 public void onFlowBeginFailed(RecordsChannel recordsChannel, Exception ex) { |
|
250 Logger.warn(LOG_TAG, "Second RecordsChannel onFlowBeginFailed. Logging session error.", ex); |
|
251 this.delegate.onSynchronizeFailed(this, ex, "Failed to begin second flow."); |
|
252 } |
|
253 |
|
254 @Override |
|
255 public void onFlowFetchFailed(RecordsChannel recordsChannel, Exception ex) { |
|
256 Logger.warn(LOG_TAG, "Second RecordsChannel onFlowFetchFailed. Logging local fetch error.", ex); |
|
257 } |
|
258 |
|
259 @Override |
|
260 public void onFlowStoreFailed(RecordsChannel recordsChannel, Exception ex, String recordGuid) { |
|
261 Logger.warn(LOG_TAG, "Second RecordsChannel onFlowStoreFailed. Logging remote store error.", ex); |
|
262 } |
|
263 |
|
264 @Override |
|
265 public void onFlowFinishFailed(RecordsChannel recordsChannel, Exception ex) { |
|
266 Logger.warn(LOG_TAG, "Second RecordsChannel onFlowFinishedFailed. Logging session error.", ex); |
|
267 this.delegate.onSynchronizeFailed(this, ex, "Failed to finish second flow."); |
|
268 } |
|
269 |
|
270 /* |
|
271 * RepositorySessionCreationDelegate methods. |
|
272 */ |
|
273 |
|
274 /** |
|
275 * I could be called twice: once for sessionA and once for sessionB. |
|
276 * |
|
277 * I try to clean up sessionA if it is not null, since the creation of |
|
278 * sessionB must have failed. |
|
279 */ |
|
280 @Override |
|
281 public void onSessionCreateFailed(Exception ex) { |
|
282 // Attempt to finish the first session, if the second is the one that failed. |
|
283 if (this.sessionA != null) { |
|
284 try { |
|
285 // We no longer need a reference to our context. |
|
286 this.context = null; |
|
287 this.sessionA.finish(this); |
|
288 } catch (Exception e) { |
|
289 // Never mind; best-effort finish. |
|
290 } |
|
291 } |
|
292 // We no longer need a reference to our context. |
|
293 this.context = null; |
|
294 this.delegate.onSynchronizeFailed(this, ex, "Failed to create session"); |
|
295 } |
|
296 |
|
297 /** |
|
298 * I should be called twice: first for sessionA and second for sessionB. |
|
299 * |
|
300 * If I am called for sessionB, I call my delegate's `onInitialized` callback |
|
301 * method because my repository sessions are correctly initialized. |
|
302 */ |
|
303 // TODO: some of this "finish and clean up" code can be refactored out. |
|
304 @Override |
|
305 public void onSessionCreated(RepositorySession session) { |
|
306 if (session == null || |
|
307 this.sessionA == session) { |
|
308 // TODO: clean up sessionA. |
|
309 this.delegate.onSynchronizeFailed(this, new UnexpectedSessionException(session), "Failed to create session."); |
|
310 return; |
|
311 } |
|
312 if (this.sessionA == null) { |
|
313 this.sessionA = session; |
|
314 |
|
315 // Unbundle. |
|
316 try { |
|
317 this.sessionA.unbundle(this.bundleA); |
|
318 } catch (Exception e) { |
|
319 this.delegate.onSynchronizeFailed(this, new UnbundleError(e, sessionA), "Failed to unbundle first session."); |
|
320 // TODO: abort |
|
321 return; |
|
322 } |
|
323 this.getSynchronizer().repositoryB.createSession(this, this.context); |
|
324 return; |
|
325 } |
|
326 if (this.sessionB == null) { |
|
327 this.sessionB = session; |
|
328 // We no longer need a reference to our context. |
|
329 this.context = null; |
|
330 |
|
331 // Unbundle. We unbundled sessionA when that session was created. |
|
332 try { |
|
333 this.sessionB.unbundle(this.bundleB); |
|
334 } catch (Exception e) { |
|
335 this.delegate.onSynchronizeFailed(this, new UnbundleError(e, sessionA), "Failed to unbundle second session."); |
|
336 return; |
|
337 } |
|
338 |
|
339 this.delegate.onInitialized(this); |
|
340 return; |
|
341 } |
|
342 // TODO: need a way to make sure we don't call any more delegate methods. |
|
343 this.delegate.onSynchronizeFailed(this, new UnexpectedSessionException(session), "Failed to create session."); |
|
344 } |
|
345 |
|
346 /* |
|
347 * RepositorySessionFinishDelegate methods. |
|
348 */ |
|
349 |
|
350 /** |
|
351 * I could be called twice: once for sessionA and once for sessionB. |
|
352 * |
|
353 * If sessionB couldn't be created, I don't fail again. |
|
354 */ |
|
355 @Override |
|
356 public void onFinishFailed(Exception ex) { |
|
357 if (this.sessionB == null) { |
|
358 // Ah, it was a problem cleaning up. Never mind. |
|
359 Logger.warn(LOG_TAG, "Got exception cleaning up first after second session creation failed.", ex); |
|
360 return; |
|
361 } |
|
362 String session = (this.sessionA == null) ? "B" : "A"; |
|
363 this.delegate.onSynchronizeFailed(this, ex, "Finish of session " + session + " failed."); |
|
364 } |
|
365 |
|
366 /** |
|
367 * I should be called twice: first for sessionA and second for sessionB. |
|
368 * |
|
369 * If I am called for sessionA, I try to finish sessionB. |
|
370 * |
|
371 * If I am called for sessionB, I call my delegate's `onSynchronized` callback |
|
372 * method because my flows should have completed. |
|
373 */ |
|
374 @Override |
|
375 public void onFinishSucceeded(RepositorySession session, |
|
376 RepositorySessionBundle bundle) { |
|
377 Logger.debug(LOG_TAG, "onFinishSucceeded. Flows? " + flowAToBCompleted + ", " + flowBToACompleted); |
|
378 |
|
379 if (session == sessionA) { |
|
380 if (flowAToBCompleted) { |
|
381 Logger.debug(LOG_TAG, "onFinishSucceeded: bumping session A's timestamp to " + pendingATimestamp + " or " + storeEndATimestamp); |
|
382 bundle.bumpTimestamp(Math.max(pendingATimestamp, storeEndATimestamp)); |
|
383 this.synchronizer.bundleA = bundle; |
|
384 } else { |
|
385 // Should not happen! |
|
386 this.delegate.onSynchronizeFailed(this, new UnexpectedSessionException(sessionA), "Failed to finish first session."); |
|
387 return; |
|
388 } |
|
389 if (this.sessionB != null) { |
|
390 Logger.trace(LOG_TAG, "Finishing session B."); |
|
391 // On to the next. |
|
392 try { |
|
393 this.sessionB.finish(this); |
|
394 } catch (InactiveSessionException e) { |
|
395 this.onFinishFailed(e); |
|
396 return; |
|
397 } |
|
398 } |
|
399 } else if (session == sessionB) { |
|
400 if (flowBToACompleted) { |
|
401 Logger.debug(LOG_TAG, "onFinishSucceeded: bumping session B's timestamp to " + pendingBTimestamp + " or " + storeEndBTimestamp); |
|
402 bundle.bumpTimestamp(Math.max(pendingBTimestamp, storeEndBTimestamp)); |
|
403 this.synchronizer.bundleB = bundle; |
|
404 Logger.trace(LOG_TAG, "Notifying delegate.onSynchronized."); |
|
405 this.delegate.onSynchronized(this); |
|
406 } else { |
|
407 // Should not happen! |
|
408 this.delegate.onSynchronizeFailed(this, new UnexpectedSessionException(sessionB), "Failed to finish second session."); |
|
409 return; |
|
410 } |
|
411 } else { |
|
412 // TODO: hurrrrrr... |
|
413 } |
|
414 |
|
415 if (this.sessionB == null) { |
|
416 this.sessionA = null; // We're done. |
|
417 } |
|
418 } |
|
419 |
|
420 @Override |
|
421 public RepositorySessionFinishDelegate deferredFinishDelegate(final ExecutorService executor) { |
|
422 return new DeferredRepositorySessionFinishDelegate(this, executor); |
|
423 } |
|
424 } |