Wed, 31 Dec 2014 07:22:50 +0100
Correct previous dual key logic pending first delivery installment.
1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 package org.mozilla.gecko.sync.repositories;
7 import java.util.ArrayList;
8 import java.util.Collection;
9 import java.util.Iterator;
10 import java.util.concurrent.ExecutorService;
11 import java.util.concurrent.Executors;
13 import org.mozilla.gecko.background.common.log.Logger;
14 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate;
15 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
16 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFinishDelegate;
17 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionGuidsSinceDelegate;
18 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
19 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionWipeDelegate;
20 import org.mozilla.gecko.sync.repositories.domain.Record;
22 /**
23 * A <code>RepositorySession</code> is created and used thusly:
24 *
25 *<ul>
26 * <li>Construct, with a reference to its parent {@link Repository}, by calling
27 * {@link Repository#createSession(RepositorySessionCreationDelegate, android.content.Context)}.</li>
28 * <li>Populate with saved information by calling {@link #unbundle(RepositorySessionBundle)}.</li>
29 * <li>Begin a sync by calling {@link #begin(RepositorySessionBeginDelegate)}. <code>begin()</code>
30 * is an appropriate place to initialize expensive resources.</li>
31 * <li>Perform operations such as {@link #fetchSince(long, RepositorySessionFetchRecordsDelegate)} and
32 * {@link #store(Record)}.</li>
33 * <li>Finish by calling {@link #finish(RepositorySessionFinishDelegate)}, retrieving and storing
34 * the current bundle.</li>
35 *</ul>
36 *
37 * If <code>finish()</code> is not called, {@link #abort()} must be called. These calls must
38 * <em>always</em> be paired with <code>begin()</code>.
39 *
40 */
41 public abstract class RepositorySession {
43 public enum SessionStatus {
44 UNSTARTED,
45 ACTIVE,
46 ABORTED,
47 DONE
48 }
50 private static final String LOG_TAG = "RepositorySession";
52 protected static void trace(String message) {
53 Logger.trace(LOG_TAG, message);
54 }
56 private SessionStatus status = SessionStatus.UNSTARTED;
57 protected Repository repository;
58 protected RepositorySessionStoreDelegate delegate;
60 /**
61 * A queue of Runnables which call out into delegates.
62 */
63 protected ExecutorService delegateQueue = Executors.newSingleThreadExecutor();
65 /**
66 * A queue of Runnables which effect storing.
67 * This includes actual store work, and also the consequences of storeDone.
68 * This provides strict ordering.
69 */
70 protected ExecutorService storeWorkQueue = Executors.newSingleThreadExecutor();
72 // The time that the last sync on this collection completed, in milliseconds since epoch.
73 private long lastSyncTimestamp = 0;
75 public long getLastSyncTimestamp() {
76 return lastSyncTimestamp;
77 }
79 public static long now() {
80 return System.currentTimeMillis();
81 }
83 public RepositorySession(Repository repository) {
84 this.repository = repository;
85 }
87 public abstract void guidsSince(long timestamp, RepositorySessionGuidsSinceDelegate delegate);
88 public abstract void fetchSince(long timestamp, RepositorySessionFetchRecordsDelegate delegate);
89 public abstract void fetch(String[] guids, RepositorySessionFetchRecordsDelegate delegate) throws InactiveSessionException;
90 public abstract void fetchAll(RepositorySessionFetchRecordsDelegate delegate);
92 /**
93 * Override this if you wish to short-circuit a sync when you know --
94 * e.g., by inspecting the database or info/collections -- that no new
95 * data are available.
96 *
97 * @return true if a sync should proceed.
98 */
99 public boolean dataAvailable() {
100 return true;
101 }
103 /**
104 * @return true if we cannot safely sync from this <code>RepositorySession</code>.
105 */
106 public boolean shouldSkip() {
107 return false;
108 }
110 /*
111 * Store operations proceed thusly:
112 *
113 * * Set a delegate
114 * * Store an arbitrary number of records. At any time the delegate can be
115 * notified of an error.
116 * * Call storeDone to notify the session that no more items are forthcoming.
117 * * The store delegate will be notified of error or completion.
118 *
119 * This arrangement of calls allows for batching at the session level.
120 *
121 * Store success calls are not guaranteed.
122 */
123 public void setStoreDelegate(RepositorySessionStoreDelegate delegate) {
124 Logger.debug(LOG_TAG, "Setting store delegate to " + delegate);
125 this.delegate = delegate;
126 }
127 public abstract void store(Record record) throws NoStoreDelegateException;
129 public void storeDone() {
130 // Our default behavior will be to assume that the Runnable is
131 // executed as soon as all the stores synchronously finish, so
132 // our end timestamp can just be… now.
133 storeDone(now());
134 }
136 public void storeDone(final long end) {
137 Logger.debug(LOG_TAG, "Scheduling onStoreCompleted for after storing is done: " + end);
138 Runnable command = new Runnable() {
139 @Override
140 public void run() {
141 delegate.onStoreCompleted(end);
142 }
143 };
144 storeWorkQueue.execute(command);
145 }
147 public abstract void wipe(RepositorySessionWipeDelegate delegate);
149 /**
150 * Synchronously perform the shared work of beginning. Throws on failure.
151 * @throws InvalidSessionTransitionException
152 *
153 */
154 protected void sharedBegin() throws InvalidSessionTransitionException {
155 Logger.debug(LOG_TAG, "Shared begin.");
156 if (delegateQueue.isShutdown()) {
157 throw new InvalidSessionTransitionException(null);
158 }
159 if (storeWorkQueue.isShutdown()) {
160 throw new InvalidSessionTransitionException(null);
161 }
162 this.transitionFrom(SessionStatus.UNSTARTED, SessionStatus.ACTIVE);
163 }
165 /**
166 * Start the session. This is an appropriate place to initialize
167 * data access components such as database handles.
168 *
169 * @param delegate
170 * @throws InvalidSessionTransitionException
171 */
172 public void begin(RepositorySessionBeginDelegate delegate) throws InvalidSessionTransitionException {
173 sharedBegin();
174 delegate.deferredBeginDelegate(delegateQueue).onBeginSucceeded(this);
175 }
177 public void unbundle(RepositorySessionBundle bundle) {
178 this.lastSyncTimestamp = bundle == null ? 0 : bundle.getTimestamp();
179 }
181 /**
182 * Override this in your subclasses to return values to save between sessions.
183 * Note that RepositorySession automatically bumps the timestamp to the time
184 * the last sync began. If unbundled but not begun, this will be the same as the
185 * value in the input bundle.
186 *
187 * The Synchronizer most likely wants to bump the bundle timestamp to be a value
188 * return from a fetch call.
189 */
190 protected RepositorySessionBundle getBundle() {
191 // Why don't we just persist the old bundle?
192 long timestamp = getLastSyncTimestamp();
193 RepositorySessionBundle bundle = new RepositorySessionBundle(timestamp);
194 Logger.debug(LOG_TAG, "Setting bundle timestamp to " + timestamp + ".");
196 return bundle;
197 }
199 /**
200 * Just like finish(), but doesn't do any work that should only be performed
201 * at the end of a successful sync, and can be called any time.
202 */
203 public void abort(RepositorySessionFinishDelegate delegate) {
204 this.abort();
205 delegate.deferredFinishDelegate(delegateQueue).onFinishSucceeded(this, this.getBundle());
206 }
208 /**
209 * Abnormally terminate the repository session, freeing or closing
210 * any resources that were opened during the lifetime of the session.
211 */
212 public void abort() {
213 // TODO: do something here.
214 this.setStatus(SessionStatus.ABORTED);
215 try {
216 storeWorkQueue.shutdownNow();
217 } catch (Exception e) {
218 Logger.error(LOG_TAG, "Caught exception shutting down store work queue.", e);
219 }
220 try {
221 delegateQueue.shutdown();
222 } catch (Exception e) {
223 Logger.error(LOG_TAG, "Caught exception shutting down delegate queue.", e);
224 }
225 }
227 /**
228 * End the repository session, freeing or closing any resources
229 * that were opened during the lifetime of the session.
230 *
231 * @param delegate notified of success or failure.
232 * @throws InactiveSessionException
233 */
234 public void finish(final RepositorySessionFinishDelegate delegate) throws InactiveSessionException {
235 try {
236 this.transitionFrom(SessionStatus.ACTIVE, SessionStatus.DONE);
237 delegate.deferredFinishDelegate(delegateQueue).onFinishSucceeded(this, this.getBundle());
238 } catch (InvalidSessionTransitionException e) {
239 Logger.error(LOG_TAG, "Tried to finish() an unstarted or already finished session");
240 throw new InactiveSessionException(e);
241 }
243 Logger.trace(LOG_TAG, "Shutting down work queues.");
244 storeWorkQueue.shutdown();
245 delegateQueue.shutdown();
246 }
248 /**
249 * Run the provided command if we're active and our delegate queue
250 * is not shut down.
251 */
252 protected synchronized void executeDelegateCommand(Runnable command)
253 throws InactiveSessionException {
254 if (!isActive() || delegateQueue.isShutdown()) {
255 throw new InactiveSessionException(null);
256 }
257 delegateQueue.execute(command);
258 }
260 public synchronized void ensureActive() throws InactiveSessionException {
261 if (!isActive()) {
262 throw new InactiveSessionException(null);
263 }
264 }
266 public synchronized boolean isActive() {
267 return status == SessionStatus.ACTIVE;
268 }
270 public synchronized SessionStatus getStatus() {
271 return status;
272 }
274 public synchronized void setStatus(SessionStatus status) {
275 this.status = status;
276 }
278 public synchronized void transitionFrom(SessionStatus from, SessionStatus to) throws InvalidSessionTransitionException {
279 if (from == null || this.status == from) {
280 Logger.trace(LOG_TAG, "Successfully transitioning from " + this.status + " to " + to);
282 this.status = to;
283 return;
284 }
285 Logger.warn(LOG_TAG, "Wanted to transition from " + from + " but in state " + this.status);
286 throw new InvalidSessionTransitionException(null);
287 }
289 /**
290 * Produce a record that is some combination of the remote and local records
291 * provided.
292 *
293 * The returned record must be produced without mutating either remoteRecord
294 * or localRecord. It is acceptable to return either remoteRecord or localRecord
295 * if no modifications are to be propagated.
296 *
297 * The returned record *should* have the local androidID and the remote GUID,
298 * and some optional merge of data from the two records.
299 *
300 * This method can be called with records that are identical, or differ in
301 * any regard.
302 *
303 * This method will not be called if:
304 *
305 * * either record is marked as deleted, or
306 * * there is no local mapping for a new remote record.
307 *
308 * Otherwise, it will be called precisely once.
309 *
310 * Side-effects (e.g., for transactional storage) can be hooked in here.
311 *
312 * @param remoteRecord
313 * The record retrieved from upstream, already adjusted for clock skew.
314 * @param localRecord
315 * The record retrieved from local storage.
316 * @param lastRemoteRetrieval
317 * The timestamp of the last retrieved set of remote records, adjusted for
318 * clock skew.
319 * @param lastLocalRetrieval
320 * The timestamp of the last retrieved set of local records.
321 * @return
322 * A Record instance to apply, or null to apply nothing.
323 */
324 protected Record reconcileRecords(final Record remoteRecord,
325 final Record localRecord,
326 final long lastRemoteRetrieval,
327 final long lastLocalRetrieval) {
328 Logger.debug(LOG_TAG, "Reconciling remote " + remoteRecord.guid + " against local " + localRecord.guid);
330 if (localRecord.equalPayloads(remoteRecord)) {
331 if (remoteRecord.lastModified > localRecord.lastModified) {
332 Logger.debug(LOG_TAG, "Records are equal. No record application needed.");
333 return null;
334 }
336 // Local wins.
337 return null;
338 }
340 // TODO: Decide what to do based on:
341 // * Which of the two records is modified;
342 // * Whether they are equal or congruent;
343 // * The modified times of each record (interpreted through the lens of clock skew);
344 // * ...
345 boolean localIsMoreRecent = localRecord.lastModified > remoteRecord.lastModified;
346 Logger.debug(LOG_TAG, "Local record is more recent? " + localIsMoreRecent);
347 Record donor = localIsMoreRecent ? localRecord : remoteRecord;
349 // Modify the local record to match the remote record's GUID and values.
350 // Preserve the local Android ID, and merge data where possible.
351 // It sure would be nice if copyWithIDs didn't give a shit about androidID, mm?
352 Record out = donor.copyWithIDs(remoteRecord.guid, localRecord.androidID);
354 // We don't want to upload the record if the remote record was
355 // applied without changes.
356 // This logic will become more complicated as reconciling becomes smarter.
357 if (!localIsMoreRecent) {
358 trackGUID(out.guid);
359 }
360 return out;
361 }
363 /**
364 * Depending on the RepositorySession implementation, track
365 * that a record — most likely a brand-new record that has been
366 * applied unmodified — should be tracked so as to not be uploaded
367 * redundantly.
368 *
369 * The default implementations do nothing.
370 */
371 protected void trackGUID(String guid) {
372 }
374 protected synchronized void untrackGUIDs(Collection<String> guids) {
375 }
377 protected void untrackGUID(String guid) {
378 }
380 // Ah, Java. You wretched creature.
381 public Iterator<String> getTrackedRecordIDs() {
382 return new ArrayList<String>().iterator();
383 }
384 }