mobile/android/base/sync/repositories/RepositorySession.java

Wed, 31 Dec 2014 07:22:50 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 07:22:50 +0100
branch
TOR_BUG_3246
changeset 4
fc2d59ddac77
permissions
-rw-r--r--

Correct previous dual key logic pending first delivery installment.

michael@0 1 /* This Source Code Form is subject to the terms of the Mozilla Public
michael@0 2 * License, v. 2.0. If a copy of the MPL was not distributed with this
michael@0 3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
michael@0 4
michael@0 5 package org.mozilla.gecko.sync.repositories;
michael@0 6
michael@0 7 import java.util.ArrayList;
michael@0 8 import java.util.Collection;
michael@0 9 import java.util.Iterator;
michael@0 10 import java.util.concurrent.ExecutorService;
michael@0 11 import java.util.concurrent.Executors;
michael@0 12
michael@0 13 import org.mozilla.gecko.background.common.log.Logger;
michael@0 14 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate;
michael@0 15 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
michael@0 16 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFinishDelegate;
michael@0 17 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionGuidsSinceDelegate;
michael@0 18 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
michael@0 19 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionWipeDelegate;
michael@0 20 import org.mozilla.gecko.sync.repositories.domain.Record;
michael@0 21
michael@0 22 /**
michael@0 23 * A <code>RepositorySession</code> is created and used thusly:
michael@0 24 *
michael@0 25 *<ul>
michael@0 26 * <li>Construct, with a reference to its parent {@link Repository}, by calling
michael@0 27 * {@link Repository#createSession(RepositorySessionCreationDelegate, android.content.Context)}.</li>
michael@0 28 * <li>Populate with saved information by calling {@link #unbundle(RepositorySessionBundle)}.</li>
michael@0 29 * <li>Begin a sync by calling {@link #begin(RepositorySessionBeginDelegate)}. <code>begin()</code>
michael@0 30 * is an appropriate place to initialize expensive resources.</li>
michael@0 31 * <li>Perform operations such as {@link #fetchSince(long, RepositorySessionFetchRecordsDelegate)} and
michael@0 32 * {@link #store(Record)}.</li>
michael@0 33 * <li>Finish by calling {@link #finish(RepositorySessionFinishDelegate)}, retrieving and storing
michael@0 34 * the current bundle.</li>
michael@0 35 *</ul>
michael@0 36 *
michael@0 37 * If <code>finish()</code> is not called, {@link #abort()} must be called. These calls must
michael@0 38 * <em>always</em> be paired with <code>begin()</code>.
michael@0 39 *
michael@0 40 */
michael@0 41 public abstract class RepositorySession {
michael@0 42
michael@0 43 public enum SessionStatus {
michael@0 44 UNSTARTED,
michael@0 45 ACTIVE,
michael@0 46 ABORTED,
michael@0 47 DONE
michael@0 48 }
michael@0 49
michael@0 50 private static final String LOG_TAG = "RepositorySession";
michael@0 51
michael@0 52 protected static void trace(String message) {
michael@0 53 Logger.trace(LOG_TAG, message);
michael@0 54 }
michael@0 55
michael@0 56 private SessionStatus status = SessionStatus.UNSTARTED;
michael@0 57 protected Repository repository;
michael@0 58 protected RepositorySessionStoreDelegate delegate;
michael@0 59
michael@0 60 /**
michael@0 61 * A queue of Runnables which call out into delegates.
michael@0 62 */
michael@0 63 protected ExecutorService delegateQueue = Executors.newSingleThreadExecutor();
michael@0 64
michael@0 65 /**
michael@0 66 * A queue of Runnables which effect storing.
michael@0 67 * This includes actual store work, and also the consequences of storeDone.
michael@0 68 * This provides strict ordering.
michael@0 69 */
michael@0 70 protected ExecutorService storeWorkQueue = Executors.newSingleThreadExecutor();
michael@0 71
michael@0 72 // The time that the last sync on this collection completed, in milliseconds since epoch.
michael@0 73 private long lastSyncTimestamp = 0;
michael@0 74
michael@0 75 public long getLastSyncTimestamp() {
michael@0 76 return lastSyncTimestamp;
michael@0 77 }
michael@0 78
michael@0 79 public static long now() {
michael@0 80 return System.currentTimeMillis();
michael@0 81 }
michael@0 82
michael@0 83 public RepositorySession(Repository repository) {
michael@0 84 this.repository = repository;
michael@0 85 }
michael@0 86
michael@0 87 public abstract void guidsSince(long timestamp, RepositorySessionGuidsSinceDelegate delegate);
michael@0 88 public abstract void fetchSince(long timestamp, RepositorySessionFetchRecordsDelegate delegate);
michael@0 89 public abstract void fetch(String[] guids, RepositorySessionFetchRecordsDelegate delegate) throws InactiveSessionException;
michael@0 90 public abstract void fetchAll(RepositorySessionFetchRecordsDelegate delegate);
michael@0 91
michael@0 92 /**
michael@0 93 * Override this if you wish to short-circuit a sync when you know --
michael@0 94 * e.g., by inspecting the database or info/collections -- that no new
michael@0 95 * data are available.
michael@0 96 *
michael@0 97 * @return true if a sync should proceed.
michael@0 98 */
michael@0 99 public boolean dataAvailable() {
michael@0 100 return true;
michael@0 101 }
michael@0 102
michael@0 103 /**
michael@0 104 * @return true if we cannot safely sync from this <code>RepositorySession</code>.
michael@0 105 */
michael@0 106 public boolean shouldSkip() {
michael@0 107 return false;
michael@0 108 }
michael@0 109
michael@0 110 /*
michael@0 111 * Store operations proceed thusly:
michael@0 112 *
michael@0 113 * * Set a delegate
michael@0 114 * * Store an arbitrary number of records. At any time the delegate can be
michael@0 115 * notified of an error.
michael@0 116 * * Call storeDone to notify the session that no more items are forthcoming.
michael@0 117 * * The store delegate will be notified of error or completion.
michael@0 118 *
michael@0 119 * This arrangement of calls allows for batching at the session level.
michael@0 120 *
michael@0 121 * Store success calls are not guaranteed.
michael@0 122 */
michael@0 123 public void setStoreDelegate(RepositorySessionStoreDelegate delegate) {
michael@0 124 Logger.debug(LOG_TAG, "Setting store delegate to " + delegate);
michael@0 125 this.delegate = delegate;
michael@0 126 }
michael@0 127 public abstract void store(Record record) throws NoStoreDelegateException;
michael@0 128
michael@0 129 public void storeDone() {
michael@0 130 // Our default behavior will be to assume that the Runnable is
michael@0 131 // executed as soon as all the stores synchronously finish, so
michael@0 132 // our end timestamp can just be… now.
michael@0 133 storeDone(now());
michael@0 134 }
michael@0 135
michael@0 136 public void storeDone(final long end) {
michael@0 137 Logger.debug(LOG_TAG, "Scheduling onStoreCompleted for after storing is done: " + end);
michael@0 138 Runnable command = new Runnable() {
michael@0 139 @Override
michael@0 140 public void run() {
michael@0 141 delegate.onStoreCompleted(end);
michael@0 142 }
michael@0 143 };
michael@0 144 storeWorkQueue.execute(command);
michael@0 145 }
michael@0 146
michael@0 147 public abstract void wipe(RepositorySessionWipeDelegate delegate);
michael@0 148
michael@0 149 /**
michael@0 150 * Synchronously perform the shared work of beginning. Throws on failure.
michael@0 151 * @throws InvalidSessionTransitionException
michael@0 152 *
michael@0 153 */
michael@0 154 protected void sharedBegin() throws InvalidSessionTransitionException {
michael@0 155 Logger.debug(LOG_TAG, "Shared begin.");
michael@0 156 if (delegateQueue.isShutdown()) {
michael@0 157 throw new InvalidSessionTransitionException(null);
michael@0 158 }
michael@0 159 if (storeWorkQueue.isShutdown()) {
michael@0 160 throw new InvalidSessionTransitionException(null);
michael@0 161 }
michael@0 162 this.transitionFrom(SessionStatus.UNSTARTED, SessionStatus.ACTIVE);
michael@0 163 }
michael@0 164
michael@0 165 /**
michael@0 166 * Start the session. This is an appropriate place to initialize
michael@0 167 * data access components such as database handles.
michael@0 168 *
michael@0 169 * @param delegate
michael@0 170 * @throws InvalidSessionTransitionException
michael@0 171 */
michael@0 172 public void begin(RepositorySessionBeginDelegate delegate) throws InvalidSessionTransitionException {
michael@0 173 sharedBegin();
michael@0 174 delegate.deferredBeginDelegate(delegateQueue).onBeginSucceeded(this);
michael@0 175 }
michael@0 176
michael@0 177 public void unbundle(RepositorySessionBundle bundle) {
michael@0 178 this.lastSyncTimestamp = bundle == null ? 0 : bundle.getTimestamp();
michael@0 179 }
michael@0 180
michael@0 181 /**
michael@0 182 * Override this in your subclasses to return values to save between sessions.
michael@0 183 * Note that RepositorySession automatically bumps the timestamp to the time
michael@0 184 * the last sync began. If unbundled but not begun, this will be the same as the
michael@0 185 * value in the input bundle.
michael@0 186 *
michael@0 187 * The Synchronizer most likely wants to bump the bundle timestamp to be a value
michael@0 188 * return from a fetch call.
michael@0 189 */
michael@0 190 protected RepositorySessionBundle getBundle() {
michael@0 191 // Why don't we just persist the old bundle?
michael@0 192 long timestamp = getLastSyncTimestamp();
michael@0 193 RepositorySessionBundle bundle = new RepositorySessionBundle(timestamp);
michael@0 194 Logger.debug(LOG_TAG, "Setting bundle timestamp to " + timestamp + ".");
michael@0 195
michael@0 196 return bundle;
michael@0 197 }
michael@0 198
michael@0 199 /**
michael@0 200 * Just like finish(), but doesn't do any work that should only be performed
michael@0 201 * at the end of a successful sync, and can be called any time.
michael@0 202 */
michael@0 203 public void abort(RepositorySessionFinishDelegate delegate) {
michael@0 204 this.abort();
michael@0 205 delegate.deferredFinishDelegate(delegateQueue).onFinishSucceeded(this, this.getBundle());
michael@0 206 }
michael@0 207
michael@0 208 /**
michael@0 209 * Abnormally terminate the repository session, freeing or closing
michael@0 210 * any resources that were opened during the lifetime of the session.
michael@0 211 */
michael@0 212 public void abort() {
michael@0 213 // TODO: do something here.
michael@0 214 this.setStatus(SessionStatus.ABORTED);
michael@0 215 try {
michael@0 216 storeWorkQueue.shutdownNow();
michael@0 217 } catch (Exception e) {
michael@0 218 Logger.error(LOG_TAG, "Caught exception shutting down store work queue.", e);
michael@0 219 }
michael@0 220 try {
michael@0 221 delegateQueue.shutdown();
michael@0 222 } catch (Exception e) {
michael@0 223 Logger.error(LOG_TAG, "Caught exception shutting down delegate queue.", e);
michael@0 224 }
michael@0 225 }
michael@0 226
michael@0 227 /**
michael@0 228 * End the repository session, freeing or closing any resources
michael@0 229 * that were opened during the lifetime of the session.
michael@0 230 *
michael@0 231 * @param delegate notified of success or failure.
michael@0 232 * @throws InactiveSessionException
michael@0 233 */
michael@0 234 public void finish(final RepositorySessionFinishDelegate delegate) throws InactiveSessionException {
michael@0 235 try {
michael@0 236 this.transitionFrom(SessionStatus.ACTIVE, SessionStatus.DONE);
michael@0 237 delegate.deferredFinishDelegate(delegateQueue).onFinishSucceeded(this, this.getBundle());
michael@0 238 } catch (InvalidSessionTransitionException e) {
michael@0 239 Logger.error(LOG_TAG, "Tried to finish() an unstarted or already finished session");
michael@0 240 throw new InactiveSessionException(e);
michael@0 241 }
michael@0 242
michael@0 243 Logger.trace(LOG_TAG, "Shutting down work queues.");
michael@0 244 storeWorkQueue.shutdown();
michael@0 245 delegateQueue.shutdown();
michael@0 246 }
michael@0 247
michael@0 248 /**
michael@0 249 * Run the provided command if we're active and our delegate queue
michael@0 250 * is not shut down.
michael@0 251 */
michael@0 252 protected synchronized void executeDelegateCommand(Runnable command)
michael@0 253 throws InactiveSessionException {
michael@0 254 if (!isActive() || delegateQueue.isShutdown()) {
michael@0 255 throw new InactiveSessionException(null);
michael@0 256 }
michael@0 257 delegateQueue.execute(command);
michael@0 258 }
michael@0 259
michael@0 260 public synchronized void ensureActive() throws InactiveSessionException {
michael@0 261 if (!isActive()) {
michael@0 262 throw new InactiveSessionException(null);
michael@0 263 }
michael@0 264 }
michael@0 265
michael@0 266 public synchronized boolean isActive() {
michael@0 267 return status == SessionStatus.ACTIVE;
michael@0 268 }
michael@0 269
michael@0 270 public synchronized SessionStatus getStatus() {
michael@0 271 return status;
michael@0 272 }
michael@0 273
michael@0 274 public synchronized void setStatus(SessionStatus status) {
michael@0 275 this.status = status;
michael@0 276 }
michael@0 277
michael@0 278 public synchronized void transitionFrom(SessionStatus from, SessionStatus to) throws InvalidSessionTransitionException {
michael@0 279 if (from == null || this.status == from) {
michael@0 280 Logger.trace(LOG_TAG, "Successfully transitioning from " + this.status + " to " + to);
michael@0 281
michael@0 282 this.status = to;
michael@0 283 return;
michael@0 284 }
michael@0 285 Logger.warn(LOG_TAG, "Wanted to transition from " + from + " but in state " + this.status);
michael@0 286 throw new InvalidSessionTransitionException(null);
michael@0 287 }
michael@0 288
michael@0 289 /**
michael@0 290 * Produce a record that is some combination of the remote and local records
michael@0 291 * provided.
michael@0 292 *
michael@0 293 * The returned record must be produced without mutating either remoteRecord
michael@0 294 * or localRecord. It is acceptable to return either remoteRecord or localRecord
michael@0 295 * if no modifications are to be propagated.
michael@0 296 *
michael@0 297 * The returned record *should* have the local androidID and the remote GUID,
michael@0 298 * and some optional merge of data from the two records.
michael@0 299 *
michael@0 300 * This method can be called with records that are identical, or differ in
michael@0 301 * any regard.
michael@0 302 *
michael@0 303 * This method will not be called if:
michael@0 304 *
michael@0 305 * * either record is marked as deleted, or
michael@0 306 * * there is no local mapping for a new remote record.
michael@0 307 *
michael@0 308 * Otherwise, it will be called precisely once.
michael@0 309 *
michael@0 310 * Side-effects (e.g., for transactional storage) can be hooked in here.
michael@0 311 *
michael@0 312 * @param remoteRecord
michael@0 313 * The record retrieved from upstream, already adjusted for clock skew.
michael@0 314 * @param localRecord
michael@0 315 * The record retrieved from local storage.
michael@0 316 * @param lastRemoteRetrieval
michael@0 317 * The timestamp of the last retrieved set of remote records, adjusted for
michael@0 318 * clock skew.
michael@0 319 * @param lastLocalRetrieval
michael@0 320 * The timestamp of the last retrieved set of local records.
michael@0 321 * @return
michael@0 322 * A Record instance to apply, or null to apply nothing.
michael@0 323 */
michael@0 324 protected Record reconcileRecords(final Record remoteRecord,
michael@0 325 final Record localRecord,
michael@0 326 final long lastRemoteRetrieval,
michael@0 327 final long lastLocalRetrieval) {
michael@0 328 Logger.debug(LOG_TAG, "Reconciling remote " + remoteRecord.guid + " against local " + localRecord.guid);
michael@0 329
michael@0 330 if (localRecord.equalPayloads(remoteRecord)) {
michael@0 331 if (remoteRecord.lastModified > localRecord.lastModified) {
michael@0 332 Logger.debug(LOG_TAG, "Records are equal. No record application needed.");
michael@0 333 return null;
michael@0 334 }
michael@0 335
michael@0 336 // Local wins.
michael@0 337 return null;
michael@0 338 }
michael@0 339
michael@0 340 // TODO: Decide what to do based on:
michael@0 341 // * Which of the two records is modified;
michael@0 342 // * Whether they are equal or congruent;
michael@0 343 // * The modified times of each record (interpreted through the lens of clock skew);
michael@0 344 // * ...
michael@0 345 boolean localIsMoreRecent = localRecord.lastModified > remoteRecord.lastModified;
michael@0 346 Logger.debug(LOG_TAG, "Local record is more recent? " + localIsMoreRecent);
michael@0 347 Record donor = localIsMoreRecent ? localRecord : remoteRecord;
michael@0 348
michael@0 349 // Modify the local record to match the remote record's GUID and values.
michael@0 350 // Preserve the local Android ID, and merge data where possible.
michael@0 351 // It sure would be nice if copyWithIDs didn't give a shit about androidID, mm?
michael@0 352 Record out = donor.copyWithIDs(remoteRecord.guid, localRecord.androidID);
michael@0 353
michael@0 354 // We don't want to upload the record if the remote record was
michael@0 355 // applied without changes.
michael@0 356 // This logic will become more complicated as reconciling becomes smarter.
michael@0 357 if (!localIsMoreRecent) {
michael@0 358 trackGUID(out.guid);
michael@0 359 }
michael@0 360 return out;
michael@0 361 }
michael@0 362
michael@0 363 /**
michael@0 364 * Depending on the RepositorySession implementation, track
michael@0 365 * that a record — most likely a brand-new record that has been
michael@0 366 * applied unmodified — should be tracked so as to not be uploaded
michael@0 367 * redundantly.
michael@0 368 *
michael@0 369 * The default implementations do nothing.
michael@0 370 */
michael@0 371 protected void trackGUID(String guid) {
michael@0 372 }
michael@0 373
michael@0 374 protected synchronized void untrackGUIDs(Collection<String> guids) {
michael@0 375 }
michael@0 376
michael@0 377 protected void untrackGUID(String guid) {
michael@0 378 }
michael@0 379
michael@0 380 // Ah, Java. You wretched creature.
michael@0 381 public Iterator<String> getTrackedRecordIDs() {
michael@0 382 return new ArrayList<String>().iterator();
michael@0 383 }
michael@0 384 }

mercurial