/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

5 package org.mozilla.gecko.sync.repositories; |
|
6 |
|
7 import java.util.ArrayList; |
|
8 import java.util.Collection; |
|
9 import java.util.Iterator; |
|
10 import java.util.concurrent.ExecutorService; |
|
11 import java.util.concurrent.Executors; |
|
12 |
|
13 import org.mozilla.gecko.background.common.log.Logger; |
|
14 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate; |
|
15 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate; |
|
16 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFinishDelegate; |
|
17 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionGuidsSinceDelegate; |
|
18 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate; |
|
19 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionWipeDelegate; |
|
20 import org.mozilla.gecko.sync.repositories.domain.Record; |
|
21 |
|
22 /** |
|
23 * A <code>RepositorySession</code> is created and used thusly: |
|
24 * |
|
25 *<ul> |
|
26 * <li>Construct, with a reference to its parent {@link Repository}, by calling |
|
27 * {@link Repository#createSession(RepositorySessionCreationDelegate, android.content.Context)}.</li> |
|
28 * <li>Populate with saved information by calling {@link #unbundle(RepositorySessionBundle)}.</li> |
|
29 * <li>Begin a sync by calling {@link #begin(RepositorySessionBeginDelegate)}. <code>begin()</code> |
|
30 * is an appropriate place to initialize expensive resources.</li> |
|
31 * <li>Perform operations such as {@link #fetchSince(long, RepositorySessionFetchRecordsDelegate)} and |
|
32 * {@link #store(Record)}.</li> |
|
33 * <li>Finish by calling {@link #finish(RepositorySessionFinishDelegate)}, retrieving and storing |
|
34 * the current bundle.</li> |
|
35 *</ul> |
|
36 * |
|
37 * If <code>finish()</code> is not called, {@link #abort()} must be called. These calls must |
|
38 * <em>always</em> be paired with <code>begin()</code>. |
|
39 * |
|
40 */ |
|
41 public abstract class RepositorySession { |
|
42 |
|
43 public enum SessionStatus { |
|
44 UNSTARTED, |
|
45 ACTIVE, |
|
46 ABORTED, |
|
47 DONE |
|
48 } |
|
49 |
|
50 private static final String LOG_TAG = "RepositorySession"; |
|
51 |
|
52 protected static void trace(String message) { |
|
53 Logger.trace(LOG_TAG, message); |
|
54 } |
|
55 |
|
56 private SessionStatus status = SessionStatus.UNSTARTED; |
|
57 protected Repository repository; |
|
58 protected RepositorySessionStoreDelegate delegate; |
|
59 |
|
60 /** |
|
61 * A queue of Runnables which call out into delegates. |
|
62 */ |
|
63 protected ExecutorService delegateQueue = Executors.newSingleThreadExecutor(); |
|
64 |
|
65 /** |
|
66 * A queue of Runnables which effect storing. |
|
67 * This includes actual store work, and also the consequences of storeDone. |
|
68 * This provides strict ordering. |
|
69 */ |
|
70 protected ExecutorService storeWorkQueue = Executors.newSingleThreadExecutor(); |
|
71 |
|
72 // The time that the last sync on this collection completed, in milliseconds since epoch. |
|
73 private long lastSyncTimestamp = 0; |
|
74 |
|
75 public long getLastSyncTimestamp() { |
|
76 return lastSyncTimestamp; |
|
77 } |
|
78 |
|
79 public static long now() { |
|
80 return System.currentTimeMillis(); |
|
81 } |
|
82 |
|
83 public RepositorySession(Repository repository) { |
|
84 this.repository = repository; |
|
85 } |
|
86 |
|
87 public abstract void guidsSince(long timestamp, RepositorySessionGuidsSinceDelegate delegate); |
|
88 public abstract void fetchSince(long timestamp, RepositorySessionFetchRecordsDelegate delegate); |
|
89 public abstract void fetch(String[] guids, RepositorySessionFetchRecordsDelegate delegate) throws InactiveSessionException; |
|
90 public abstract void fetchAll(RepositorySessionFetchRecordsDelegate delegate); |
|
91 |
|
92 /** |
|
93 * Override this if you wish to short-circuit a sync when you know -- |
|
94 * e.g., by inspecting the database or info/collections -- that no new |
|
95 * data are available. |
|
96 * |
|
97 * @return true if a sync should proceed. |
|
98 */ |
|
99 public boolean dataAvailable() { |
|
100 return true; |
|
101 } |
|
102 |
|
103 /** |
|
104 * @return true if we cannot safely sync from this <code>RepositorySession</code>. |
|
105 */ |
|
106 public boolean shouldSkip() { |
|
107 return false; |
|
108 } |
|
109 |
|
110 /* |
|
111 * Store operations proceed thusly: |
|
112 * |
|
113 * * Set a delegate |
|
114 * * Store an arbitrary number of records. At any time the delegate can be |
|
115 * notified of an error. |
|
116 * * Call storeDone to notify the session that no more items are forthcoming. |
|
117 * * The store delegate will be notified of error or completion. |
|
118 * |
|
119 * This arrangement of calls allows for batching at the session level. |
|
120 * |
|
121 * Store success calls are not guaranteed. |
|
122 */ |
|
123 public void setStoreDelegate(RepositorySessionStoreDelegate delegate) { |
|
124 Logger.debug(LOG_TAG, "Setting store delegate to " + delegate); |
|
125 this.delegate = delegate; |
|
126 } |
|
127 public abstract void store(Record record) throws NoStoreDelegateException; |
|
128 |
|
129 public void storeDone() { |
|
130 // Our default behavior will be to assume that the Runnable is |
|
131 // executed as soon as all the stores synchronously finish, so |
|
132 // our end timestamp can just be… now. |
|
133 storeDone(now()); |
|
134 } |
|
135 |
|
136 public void storeDone(final long end) { |
|
137 Logger.debug(LOG_TAG, "Scheduling onStoreCompleted for after storing is done: " + end); |
|
138 Runnable command = new Runnable() { |
|
139 @Override |
|
140 public void run() { |
|
141 delegate.onStoreCompleted(end); |
|
142 } |
|
143 }; |
|
144 storeWorkQueue.execute(command); |
|
145 } |
|
146 |
|
147 public abstract void wipe(RepositorySessionWipeDelegate delegate); |
|
148 |
|
149 /** |
|
150 * Synchronously perform the shared work of beginning. Throws on failure. |
|
151 * @throws InvalidSessionTransitionException |
|
152 * |
|
153 */ |
|
154 protected void sharedBegin() throws InvalidSessionTransitionException { |
|
155 Logger.debug(LOG_TAG, "Shared begin."); |
|
156 if (delegateQueue.isShutdown()) { |
|
157 throw new InvalidSessionTransitionException(null); |
|
158 } |
|
159 if (storeWorkQueue.isShutdown()) { |
|
160 throw new InvalidSessionTransitionException(null); |
|
161 } |
|
162 this.transitionFrom(SessionStatus.UNSTARTED, SessionStatus.ACTIVE); |
|
163 } |
|
164 |
|
165 /** |
|
166 * Start the session. This is an appropriate place to initialize |
|
167 * data access components such as database handles. |
|
168 * |
|
169 * @param delegate |
|
170 * @throws InvalidSessionTransitionException |
|
171 */ |
|
172 public void begin(RepositorySessionBeginDelegate delegate) throws InvalidSessionTransitionException { |
|
173 sharedBegin(); |
|
174 delegate.deferredBeginDelegate(delegateQueue).onBeginSucceeded(this); |
|
175 } |
|
176 |
|
177 public void unbundle(RepositorySessionBundle bundle) { |
|
178 this.lastSyncTimestamp = bundle == null ? 0 : bundle.getTimestamp(); |
|
179 } |
|
180 |
|
181 /** |
|
182 * Override this in your subclasses to return values to save between sessions. |
|
183 * Note that RepositorySession automatically bumps the timestamp to the time |
|
184 * the last sync began. If unbundled but not begun, this will be the same as the |
|
185 * value in the input bundle. |
|
186 * |
|
187 * The Synchronizer most likely wants to bump the bundle timestamp to be a value |
|
188 * return from a fetch call. |
|
189 */ |
|
190 protected RepositorySessionBundle getBundle() { |
|
191 // Why don't we just persist the old bundle? |
|
192 long timestamp = getLastSyncTimestamp(); |
|
193 RepositorySessionBundle bundle = new RepositorySessionBundle(timestamp); |
|
194 Logger.debug(LOG_TAG, "Setting bundle timestamp to " + timestamp + "."); |
|
195 |
|
196 return bundle; |
|
197 } |
|
198 |
|
199 /** |
|
200 * Just like finish(), but doesn't do any work that should only be performed |
|
201 * at the end of a successful sync, and can be called any time. |
|
202 */ |
|
203 public void abort(RepositorySessionFinishDelegate delegate) { |
|
204 this.abort(); |
|
205 delegate.deferredFinishDelegate(delegateQueue).onFinishSucceeded(this, this.getBundle()); |
|
206 } |
|
207 |
|
208 /** |
|
209 * Abnormally terminate the repository session, freeing or closing |
|
210 * any resources that were opened during the lifetime of the session. |
|
211 */ |
|
212 public void abort() { |
|
213 // TODO: do something here. |
|
214 this.setStatus(SessionStatus.ABORTED); |
|
215 try { |
|
216 storeWorkQueue.shutdownNow(); |
|
217 } catch (Exception e) { |
|
218 Logger.error(LOG_TAG, "Caught exception shutting down store work queue.", e); |
|
219 } |
|
220 try { |
|
221 delegateQueue.shutdown(); |
|
222 } catch (Exception e) { |
|
223 Logger.error(LOG_TAG, "Caught exception shutting down delegate queue.", e); |
|
224 } |
|
225 } |
|
226 |
|
227 /** |
|
228 * End the repository session, freeing or closing any resources |
|
229 * that were opened during the lifetime of the session. |
|
230 * |
|
231 * @param delegate notified of success or failure. |
|
232 * @throws InactiveSessionException |
|
233 */ |
|
234 public void finish(final RepositorySessionFinishDelegate delegate) throws InactiveSessionException { |
|
235 try { |
|
236 this.transitionFrom(SessionStatus.ACTIVE, SessionStatus.DONE); |
|
237 delegate.deferredFinishDelegate(delegateQueue).onFinishSucceeded(this, this.getBundle()); |
|
238 } catch (InvalidSessionTransitionException e) { |
|
239 Logger.error(LOG_TAG, "Tried to finish() an unstarted or already finished session"); |
|
240 throw new InactiveSessionException(e); |
|
241 } |
|
242 |
|
243 Logger.trace(LOG_TAG, "Shutting down work queues."); |
|
244 storeWorkQueue.shutdown(); |
|
245 delegateQueue.shutdown(); |
|
246 } |
|
247 |
|
248 /** |
|
249 * Run the provided command if we're active and our delegate queue |
|
250 * is not shut down. |
|
251 */ |
|
252 protected synchronized void executeDelegateCommand(Runnable command) |
|
253 throws InactiveSessionException { |
|
254 if (!isActive() || delegateQueue.isShutdown()) { |
|
255 throw new InactiveSessionException(null); |
|
256 } |
|
257 delegateQueue.execute(command); |
|
258 } |
|
259 |
|
260 public synchronized void ensureActive() throws InactiveSessionException { |
|
261 if (!isActive()) { |
|
262 throw new InactiveSessionException(null); |
|
263 } |
|
264 } |
|
265 |
|
266 public synchronized boolean isActive() { |
|
267 return status == SessionStatus.ACTIVE; |
|
268 } |
|
269 |
|
270 public synchronized SessionStatus getStatus() { |
|
271 return status; |
|
272 } |
|
273 |
|
274 public synchronized void setStatus(SessionStatus status) { |
|
275 this.status = status; |
|
276 } |
|
277 |
|
278 public synchronized void transitionFrom(SessionStatus from, SessionStatus to) throws InvalidSessionTransitionException { |
|
279 if (from == null || this.status == from) { |
|
280 Logger.trace(LOG_TAG, "Successfully transitioning from " + this.status + " to " + to); |
|
281 |
|
282 this.status = to; |
|
283 return; |
|
284 } |
|
285 Logger.warn(LOG_TAG, "Wanted to transition from " + from + " but in state " + this.status); |
|
286 throw new InvalidSessionTransitionException(null); |
|
287 } |
|
288 |
|
289 /** |
|
290 * Produce a record that is some combination of the remote and local records |
|
291 * provided. |
|
292 * |
|
293 * The returned record must be produced without mutating either remoteRecord |
|
294 * or localRecord. It is acceptable to return either remoteRecord or localRecord |
|
295 * if no modifications are to be propagated. |
|
296 * |
|
297 * The returned record *should* have the local androidID and the remote GUID, |
|
298 * and some optional merge of data from the two records. |
|
299 * |
|
300 * This method can be called with records that are identical, or differ in |
|
301 * any regard. |
|
302 * |
|
303 * This method will not be called if: |
|
304 * |
|
305 * * either record is marked as deleted, or |
|
306 * * there is no local mapping for a new remote record. |
|
307 * |
|
308 * Otherwise, it will be called precisely once. |
|
309 * |
|
310 * Side-effects (e.g., for transactional storage) can be hooked in here. |
|
311 * |
|
312 * @param remoteRecord |
|
313 * The record retrieved from upstream, already adjusted for clock skew. |
|
314 * @param localRecord |
|
315 * The record retrieved from local storage. |
|
316 * @param lastRemoteRetrieval |
|
317 * The timestamp of the last retrieved set of remote records, adjusted for |
|
318 * clock skew. |
|
319 * @param lastLocalRetrieval |
|
320 * The timestamp of the last retrieved set of local records. |
|
321 * @return |
|
322 * A Record instance to apply, or null to apply nothing. |
|
323 */ |
|
324 protected Record reconcileRecords(final Record remoteRecord, |
|
325 final Record localRecord, |
|
326 final long lastRemoteRetrieval, |
|
327 final long lastLocalRetrieval) { |
|
328 Logger.debug(LOG_TAG, "Reconciling remote " + remoteRecord.guid + " against local " + localRecord.guid); |
|
329 |
|
330 if (localRecord.equalPayloads(remoteRecord)) { |
|
331 if (remoteRecord.lastModified > localRecord.lastModified) { |
|
332 Logger.debug(LOG_TAG, "Records are equal. No record application needed."); |
|
333 return null; |
|
334 } |
|
335 |
|
336 // Local wins. |
|
337 return null; |
|
338 } |
|
339 |
|
340 // TODO: Decide what to do based on: |
|
341 // * Which of the two records is modified; |
|
342 // * Whether they are equal or congruent; |
|
343 // * The modified times of each record (interpreted through the lens of clock skew); |
|
344 // * ... |
|
345 boolean localIsMoreRecent = localRecord.lastModified > remoteRecord.lastModified; |
|
346 Logger.debug(LOG_TAG, "Local record is more recent? " + localIsMoreRecent); |
|
347 Record donor = localIsMoreRecent ? localRecord : remoteRecord; |
|
348 |
|
349 // Modify the local record to match the remote record's GUID and values. |
|
350 // Preserve the local Android ID, and merge data where possible. |
|
351 // It sure would be nice if copyWithIDs didn't give a shit about androidID, mm? |
|
352 Record out = donor.copyWithIDs(remoteRecord.guid, localRecord.androidID); |
|
353 |
|
354 // We don't want to upload the record if the remote record was |
|
355 // applied without changes. |
|
356 // This logic will become more complicated as reconciling becomes smarter. |
|
357 if (!localIsMoreRecent) { |
|
358 trackGUID(out.guid); |
|
359 } |
|
360 return out; |
|
361 } |
|
362 |
|
363 /** |
|
364 * Depending on the RepositorySession implementation, track |
|
365 * that a record — most likely a brand-new record that has been |
|
366 * applied unmodified — should be tracked so as to not be uploaded |
|
367 * redundantly. |
|
368 * |
|
369 * The default implementations do nothing. |
|
370 */ |
|
371 protected void trackGUID(String guid) { |
|
372 } |
|
373 |
|
374 protected synchronized void untrackGUIDs(Collection<String> guids) { |
|
375 } |
|
376 |
|
377 protected void untrackGUID(String guid) { |
|
378 } |
|
379 |
|
380 // Ah, Java. You wretched creature. |
|
381 public Iterator<String> getTrackedRecordIDs() { |
|
382 return new ArrayList<String>().iterator(); |
|
383 } |
|
384 } |