mobile/android/base/sync/repositories/RepositorySession.java

changeset 0
6474c204b198
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/mobile/android/base/sync/repositories/RepositorySession.java	Wed Dec 31 06:09:35 2014 +0100
     1.3 @@ -0,0 +1,384 @@
     1.4 +/* This Source Code Form is subject to the terms of the Mozilla Public
     1.5 + * License, v. 2.0. If a copy of the MPL was not distributed with this
     1.6 + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
     1.7 +
     1.8 +package org.mozilla.gecko.sync.repositories;
     1.9 +
    1.10 +import java.util.ArrayList;
    1.11 +import java.util.Collection;
    1.12 +import java.util.Iterator;
    1.13 +import java.util.concurrent.ExecutorService;
    1.14 +import java.util.concurrent.Executors;
    1.15 +
    1.16 +import org.mozilla.gecko.background.common.log.Logger;
    1.17 +import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate;
    1.18 +import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
    1.19 +import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFinishDelegate;
    1.20 +import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionGuidsSinceDelegate;
    1.21 +import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
    1.22 +import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionWipeDelegate;
    1.23 +import org.mozilla.gecko.sync.repositories.domain.Record;
    1.24 +
    1.25 +/**
    1.26 + * A <code>RepositorySession</code> is created and used thusly:
    1.27 + *
    1.28 + *<ul>
    1.29 + * <li>Construct, with a reference to its parent {@link Repository}, by calling
    1.30 + *   {@link Repository#createSession(RepositorySessionCreationDelegate, android.content.Context)}.</li>
    1.31 + * <li>Populate with saved information by calling {@link #unbundle(RepositorySessionBundle)}.</li>
    1.32 + * <li>Begin a sync by calling {@link #begin(RepositorySessionBeginDelegate)}. <code>begin()</code>
    1.33 + *   is an appropriate place to initialize expensive resources.</li>
    1.34 + * <li>Perform operations such as {@link #fetchSince(long, RepositorySessionFetchRecordsDelegate)} and
    1.35 + *   {@link #store(Record)}.</li>
    1.36 + * <li>Finish by calling {@link #finish(RepositorySessionFinishDelegate)}, retrieving and storing
    1.37 + *   the current bundle.</li>
    1.38 + *</ul>
    1.39 + *
    1.40 + * If <code>finish()</code> is not called, {@link #abort()} must be called. These calls must
    1.41 + * <em>always</em> be paired with <code>begin()</code>.
    1.42 + *
    1.43 + */
    1.44 +public abstract class RepositorySession {
    1.45 +
    1.46 +  public enum SessionStatus {
    1.47 +    UNSTARTED,
    1.48 +    ACTIVE,
    1.49 +    ABORTED,
    1.50 +    DONE
    1.51 +  }
    1.52 +
    1.53 +  private static final String LOG_TAG = "RepositorySession";
    1.54 +
    1.55 +  protected static void trace(String message) {
    1.56 +    Logger.trace(LOG_TAG, message);
    1.57 +  }
    1.58 +
    1.59 +  private SessionStatus status = SessionStatus.UNSTARTED;
    1.60 +  protected Repository repository;
    1.61 +  protected RepositorySessionStoreDelegate delegate;
    1.62 +
    1.63 +  /**
    1.64 +   * A queue of Runnables which call out into delegates.
    1.65 +   */
    1.66 +  protected ExecutorService delegateQueue  = Executors.newSingleThreadExecutor();
    1.67 +
    1.68 +  /**
    1.69 +   * A queue of Runnables which effect storing.
    1.70 +   * This includes actual store work, and also the consequences of storeDone.
    1.71 +   * This provides strict ordering.
    1.72 +   */
    1.73 +  protected ExecutorService storeWorkQueue = Executors.newSingleThreadExecutor();
    1.74 +
    1.75 +  // The time that the last sync on this collection completed, in milliseconds since epoch.
    1.76 +  private long lastSyncTimestamp = 0;
    1.77 +
    1.78 +  public long getLastSyncTimestamp() {
    1.79 +    return lastSyncTimestamp;
    1.80 +  }
    1.81 +
    1.82 +  public static long now() {
    1.83 +    return System.currentTimeMillis();
    1.84 +  }
    1.85 +
    1.86 +  public RepositorySession(Repository repository) {
    1.87 +    this.repository = repository;
    1.88 +  }
    1.89 +
    1.90 +  public abstract void guidsSince(long timestamp, RepositorySessionGuidsSinceDelegate delegate);
    1.91 +  public abstract void fetchSince(long timestamp, RepositorySessionFetchRecordsDelegate delegate);
    1.92 +  public abstract void fetch(String[] guids, RepositorySessionFetchRecordsDelegate delegate) throws InactiveSessionException;
    1.93 +  public abstract void fetchAll(RepositorySessionFetchRecordsDelegate delegate);
    1.94 +
    1.95 +  /**
    1.96 +   * Override this if you wish to short-circuit a sync when you know --
    1.97 +   * e.g., by inspecting the database or info/collections -- that no new
    1.98 +   * data are available.
    1.99 +   *
   1.100 +   * @return true if a sync should proceed.
   1.101 +   */
   1.102 +  public boolean dataAvailable() {
   1.103 +    return true;
   1.104 +  }
   1.105 +
   1.106 +  /**
   1.107 +   * @return true if we cannot safely sync from this <code>RepositorySession</code>.
   1.108 +   */
   1.109 +  public boolean shouldSkip() {
   1.110 +    return false;
   1.111 +  }
   1.112 +
   1.113 +  /*
   1.114 +   * Store operations proceed thusly:
   1.115 +   *
   1.116 +   * * Set a delegate
   1.117 +   * * Store an arbitrary number of records. At any time the delegate can be
   1.118 +   *   notified of an error.
   1.119 +   * * Call storeDone to notify the session that no more items are forthcoming.
   1.120 +   * * The store delegate will be notified of error or completion.
   1.121 +   *
   1.122 +   * This arrangement of calls allows for batching at the session level.
   1.123 +   *
   1.124 +   * Store success calls are not guaranteed.
   1.125 +   */
   1.126 +  public void setStoreDelegate(RepositorySessionStoreDelegate delegate) {
   1.127 +    Logger.debug(LOG_TAG, "Setting store delegate to " + delegate);
   1.128 +    this.delegate = delegate;
   1.129 +  }
   1.130 +  public abstract void store(Record record) throws NoStoreDelegateException;
   1.131 +
   1.132 +  public void storeDone() {
   1.133 +    // Our default behavior will be to assume that the Runnable is
   1.134 +    // executed as soon as all the stores synchronously finish, so
   1.135 +    // our end timestamp can just be… now.
   1.136 +    storeDone(now());
   1.137 +  }
   1.138 +
   1.139 +  public void storeDone(final long end) {
   1.140 +    Logger.debug(LOG_TAG, "Scheduling onStoreCompleted for after storing is done: " + end);
   1.141 +    Runnable command = new Runnable() {
   1.142 +      @Override
   1.143 +      public void run() {
   1.144 +        delegate.onStoreCompleted(end);
   1.145 +      }
   1.146 +    };
   1.147 +    storeWorkQueue.execute(command);
   1.148 +  }
   1.149 +
   1.150 +  public abstract void wipe(RepositorySessionWipeDelegate delegate);
   1.151 +
   1.152 +  /**
   1.153 +   * Synchronously perform the shared work of beginning. Throws on failure.
   1.154 +   * @throws InvalidSessionTransitionException
   1.155 +   *
   1.156 +   */
   1.157 +  protected void sharedBegin() throws InvalidSessionTransitionException {
   1.158 +    Logger.debug(LOG_TAG, "Shared begin.");
   1.159 +    if (delegateQueue.isShutdown()) {
   1.160 +      throw new InvalidSessionTransitionException(null);
   1.161 +    }
   1.162 +    if (storeWorkQueue.isShutdown()) {
   1.163 +      throw new InvalidSessionTransitionException(null);
   1.164 +    }
   1.165 +    this.transitionFrom(SessionStatus.UNSTARTED, SessionStatus.ACTIVE);
   1.166 +  }
   1.167 +
   1.168 +  /**
   1.169 +   * Start the session. This is an appropriate place to initialize
   1.170 +   * data access components such as database handles.
   1.171 +   *
   1.172 +   * @param delegate
   1.173 +   * @throws InvalidSessionTransitionException
   1.174 +   */
   1.175 +  public void begin(RepositorySessionBeginDelegate delegate) throws InvalidSessionTransitionException {
   1.176 +    sharedBegin();
   1.177 +    delegate.deferredBeginDelegate(delegateQueue).onBeginSucceeded(this);
   1.178 +  }
   1.179 +
   1.180 +  public void unbundle(RepositorySessionBundle bundle) {
   1.181 +    this.lastSyncTimestamp = bundle == null ? 0 : bundle.getTimestamp();
   1.182 +  }
   1.183 +
   1.184 +  /**
   1.185 +   * Override this in your subclasses to return values to save between sessions.
   1.186 +   * Note that RepositorySession automatically bumps the timestamp to the time
   1.187 +   * the last sync began. If unbundled but not begun, this will be the same as the
   1.188 +   * value in the input bundle.
   1.189 +   *
   1.190 +   * The Synchronizer most likely wants to bump the bundle timestamp to be a value
   1.191 +   * return from a fetch call.
   1.192 +   */
   1.193 +  protected RepositorySessionBundle getBundle() {
   1.194 +    // Why don't we just persist the old bundle?
   1.195 +    long timestamp = getLastSyncTimestamp();
   1.196 +    RepositorySessionBundle bundle = new RepositorySessionBundle(timestamp);
   1.197 +    Logger.debug(LOG_TAG, "Setting bundle timestamp to " + timestamp + ".");
   1.198 +
   1.199 +    return bundle;
   1.200 +  }
   1.201 +
   1.202 +  /**
   1.203 +   * Just like finish(), but doesn't do any work that should only be performed
   1.204 +   * at the end of a successful sync, and can be called any time.
   1.205 +   */
   1.206 +  public void abort(RepositorySessionFinishDelegate delegate) {
   1.207 +    this.abort();
   1.208 +    delegate.deferredFinishDelegate(delegateQueue).onFinishSucceeded(this, this.getBundle());
   1.209 +  }
   1.210 +
   1.211 +  /**
   1.212 +   * Abnormally terminate the repository session, freeing or closing
   1.213 +   * any resources that were opened during the lifetime of the session.
   1.214 +   */
   1.215 +  public void abort() {
   1.216 +    // TODO: do something here.
   1.217 +    this.setStatus(SessionStatus.ABORTED);
   1.218 +    try {
   1.219 +      storeWorkQueue.shutdownNow();
   1.220 +    } catch (Exception e) {
   1.221 +      Logger.error(LOG_TAG, "Caught exception shutting down store work queue.", e);
   1.222 +    }
   1.223 +    try {
   1.224 +      delegateQueue.shutdown();
   1.225 +    } catch (Exception e) {
   1.226 +      Logger.error(LOG_TAG, "Caught exception shutting down delegate queue.", e);
   1.227 +    }
   1.228 +  }
   1.229 +
   1.230 +  /**
   1.231 +   * End the repository session, freeing or closing any resources
   1.232 +   * that were opened during the lifetime of the session.
   1.233 +   *
   1.234 +   * @param delegate notified of success or failure.
   1.235 +   * @throws InactiveSessionException
   1.236 +   */
   1.237 +  public void finish(final RepositorySessionFinishDelegate delegate) throws InactiveSessionException {
   1.238 +    try {
   1.239 +      this.transitionFrom(SessionStatus.ACTIVE, SessionStatus.DONE);
   1.240 +      delegate.deferredFinishDelegate(delegateQueue).onFinishSucceeded(this, this.getBundle());
   1.241 +    } catch (InvalidSessionTransitionException e) {
   1.242 +      Logger.error(LOG_TAG, "Tried to finish() an unstarted or already finished session");
   1.243 +      throw new InactiveSessionException(e);
   1.244 +    }
   1.245 +
   1.246 +    Logger.trace(LOG_TAG, "Shutting down work queues.");
   1.247 +    storeWorkQueue.shutdown();
   1.248 +    delegateQueue.shutdown();
   1.249 +  }
   1.250 +
   1.251 +  /**
   1.252 +   * Run the provided command if we're active and our delegate queue
   1.253 +   * is not shut down.
   1.254 +   */
   1.255 +  protected synchronized void executeDelegateCommand(Runnable command)
   1.256 +      throws InactiveSessionException {
   1.257 +    if (!isActive() || delegateQueue.isShutdown()) {
   1.258 +      throw new InactiveSessionException(null);
   1.259 +    }
   1.260 +    delegateQueue.execute(command);
   1.261 +  }
   1.262 +
   1.263 +  public synchronized void ensureActive() throws InactiveSessionException {
   1.264 +    if (!isActive()) {
   1.265 +      throw new InactiveSessionException(null);
   1.266 +    }
   1.267 +  }
   1.268 +
   1.269 +  public synchronized boolean isActive() {
   1.270 +    return status == SessionStatus.ACTIVE;
   1.271 +  }
   1.272 +
   1.273 +  public synchronized SessionStatus getStatus() {
   1.274 +    return status;
   1.275 +  }
   1.276 +
   1.277 +  public synchronized void setStatus(SessionStatus status) {
   1.278 +    this.status = status;
   1.279 +  }
   1.280 +
   1.281 +  public synchronized void transitionFrom(SessionStatus from, SessionStatus to) throws InvalidSessionTransitionException {
   1.282 +    if (from == null || this.status == from) {
   1.283 +      Logger.trace(LOG_TAG, "Successfully transitioning from " + this.status + " to " + to);
   1.284 +
   1.285 +      this.status = to;
   1.286 +      return;
   1.287 +    }
   1.288 +    Logger.warn(LOG_TAG, "Wanted to transition from " + from + " but in state " + this.status);
   1.289 +    throw new InvalidSessionTransitionException(null);
   1.290 +  }
   1.291 +
   1.292 +  /**
   1.293 +   * Produce a record that is some combination of the remote and local records
   1.294 +   * provided.
   1.295 +   *
   1.296 +   * The returned record must be produced without mutating either remoteRecord
   1.297 +   * or localRecord. It is acceptable to return either remoteRecord or localRecord
   1.298 +   * if no modifications are to be propagated.
   1.299 +   *
   1.300 +   * The returned record *should* have the local androidID and the remote GUID,
   1.301 +   * and some optional merge of data from the two records.
   1.302 +   *
   1.303 +   * This method can be called with records that are identical, or differ in
   1.304 +   * any regard.
   1.305 +   *
   1.306 +   * This method will not be called if:
   1.307 +   *
   1.308 +   * * either record is marked as deleted, or
   1.309 +   * * there is no local mapping for a new remote record.
   1.310 +   *
   1.311 +   * Otherwise, it will be called precisely once.
   1.312 +   *
   1.313 +   * Side-effects (e.g., for transactional storage) can be hooked in here.
   1.314 +   *
   1.315 +   * @param remoteRecord
   1.316 +   *        The record retrieved from upstream, already adjusted for clock skew.
   1.317 +   * @param localRecord
   1.318 +   *        The record retrieved from local storage.
   1.319 +   * @param lastRemoteRetrieval
   1.320 +   *        The timestamp of the last retrieved set of remote records, adjusted for
   1.321 +   *        clock skew.
   1.322 +   * @param lastLocalRetrieval
   1.323 +   *        The timestamp of the last retrieved set of local records.
   1.324 +   * @return
   1.325 +   *        A Record instance to apply, or null to apply nothing.
   1.326 +   */
   1.327 +  protected Record reconcileRecords(final Record remoteRecord,
   1.328 +                                    final Record localRecord,
   1.329 +                                    final long lastRemoteRetrieval,
   1.330 +                                    final long lastLocalRetrieval) {
   1.331 +    Logger.debug(LOG_TAG, "Reconciling remote " + remoteRecord.guid + " against local " + localRecord.guid);
   1.332 +
   1.333 +    if (localRecord.equalPayloads(remoteRecord)) {
   1.334 +      if (remoteRecord.lastModified > localRecord.lastModified) {
   1.335 +        Logger.debug(LOG_TAG, "Records are equal. No record application needed.");
   1.336 +        return null;
   1.337 +      }
   1.338 +
   1.339 +      // Local wins.
   1.340 +      return null;
   1.341 +    }
   1.342 +
   1.343 +    // TODO: Decide what to do based on:
   1.344 +    // * Which of the two records is modified;
   1.345 +    // * Whether they are equal or congruent;
   1.346 +    // * The modified times of each record (interpreted through the lens of clock skew);
   1.347 +    // * ...
   1.348 +    boolean localIsMoreRecent = localRecord.lastModified > remoteRecord.lastModified;
   1.349 +    Logger.debug(LOG_TAG, "Local record is more recent? " + localIsMoreRecent);
   1.350 +    Record donor = localIsMoreRecent ? localRecord : remoteRecord;
   1.351 +
   1.352 +    // Modify the local record to match the remote record's GUID and values.
   1.353 +    // Preserve the local Android ID, and merge data where possible.
   1.354 +    // It sure would be nice if copyWithIDs didn't give a shit about androidID, mm?
   1.355 +    Record out = donor.copyWithIDs(remoteRecord.guid, localRecord.androidID);
   1.356 +
   1.357 +    // We don't want to upload the record if the remote record was
   1.358 +    // applied without changes.
   1.359 +    // This logic will become more complicated as reconciling becomes smarter.
   1.360 +    if (!localIsMoreRecent) {
   1.361 +      trackGUID(out.guid);
   1.362 +    }
   1.363 +    return out;
   1.364 +  }
   1.365 +
   1.366 +  /**
   1.367 +   * Depending on the RepositorySession implementation, track
   1.368 +   * that a record — most likely a brand-new record that has been
   1.369 +   * applied unmodified — should be tracked so as to not be uploaded
   1.370 +   * redundantly.
   1.371 +   *
   1.372 +   * The default implementations do nothing.
   1.373 +   */
   1.374 +  protected void trackGUID(String guid) {
   1.375 +  }
   1.376 +
   1.377 +  protected synchronized void untrackGUIDs(Collection<String> guids) {
   1.378 +  }
   1.379 +
   1.380 +  protected void untrackGUID(String guid) {
   1.381 +  }
   1.382 +
   1.383 +  // Ah, Java. You wretched creature.
   1.384 +  public Iterator<String> getTrackedRecordIDs() {
   1.385 +    return new ArrayList<String>().iterator();
   1.386 +  }
   1.387 +}

mercurial