michael@0: /* This Source Code Form is subject to the terms of the Mozilla Public
michael@0: * License, v. 2.0. If a copy of the MPL was not distributed with this
michael@0: * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
michael@0:
michael@0: package org.mozilla.gecko.sync.repositories;
michael@0:
michael@0: import java.util.ArrayList;
michael@0: import java.util.Collection;
michael@0: import java.util.Iterator;
michael@0: import java.util.concurrent.ExecutorService;
michael@0: import java.util.concurrent.Executors;
michael@0:
michael@0: import org.mozilla.gecko.background.common.log.Logger;
michael@0: import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate;
michael@0: import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
michael@0: import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFinishDelegate;
michael@0: import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionGuidsSinceDelegate;
michael@0: import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
michael@0: import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionWipeDelegate;
michael@0: import org.mozilla.gecko.sync.repositories.domain.Record;
michael@0:
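/**
 * A <code>RepositorySession</code> is created and used thusly:
 *
 * <ul>
 * <li>Construct, with a reference to its parent {@link Repository}.</li>
 * <li>Populate with saved information by calling {@link #unbundle(RepositorySessionBundle)}.</li>
 * <li>Begin a sync by calling {@link #begin(RepositorySessionBeginDelegate)}. <code>begin()</code>
 *   is an appropriate place to initialize expensive resources.</li>
 * <li>Perform operations such as {@link #fetchSince(long, RepositorySessionFetchRecordsDelegate)}
 *   and {@link #store(Record)}.</li>
 * <li>Finish by calling {@link #finish(RepositorySessionFinishDelegate)}, which hands the
 *   current bundle to the delegate.</li>
 * </ul>
 *
 * If <code>finish()</code> is not called, {@link #abort()} must be called. These calls must
 * always be paired with <code>begin()</code>.
 *
 */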
michael@0: public abstract class RepositorySession {
michael@0:
/**
 * Lifecycle states of a session. The session starts out {@link #UNSTARTED};
 * the other states are entered through the begin, abort and finish calls.
 */
public enum SessionStatus {
  /** Initial state, before the session has been begun. */
  UNSTARTED,

  /** The session has been begun and has been neither finished nor aborted. */
  ACTIVE,

  /** The session was terminated through an abort call. */
  ABORTED,

  /** The session was finished normally. */
  DONE
}
michael@0:
michael@0: private static final String LOG_TAG = "RepositorySession";
michael@0:
michael@0: protected static void trace(String message) {
michael@0: Logger.trace(LOG_TAG, message);
michael@0: }
michael@0:
michael@0: private SessionStatus status = SessionStatus.UNSTARTED;
michael@0: protected Repository repository;
michael@0: protected RepositorySessionStoreDelegate delegate;
michael@0:
michael@0: /**
michael@0: * A queue of Runnables which call out into delegates.
michael@0: */
michael@0: protected ExecutorService delegateQueue = Executors.newSingleThreadExecutor();
michael@0:
michael@0: /**
michael@0: * A queue of Runnables which effect storing.
michael@0: * This includes actual store work, and also the consequences of storeDone.
michael@0: * This provides strict ordering.
michael@0: */
michael@0: protected ExecutorService storeWorkQueue = Executors.newSingleThreadExecutor();
michael@0:
michael@0: // The time that the last sync on this collection completed, in milliseconds since epoch.
michael@0: private long lastSyncTimestamp = 0;
michael@0:
michael@0: public long getLastSyncTimestamp() {
michael@0: return lastSyncTimestamp;
michael@0: }
michael@0:
michael@0: public static long now() {
michael@0: return System.currentTimeMillis();
michael@0: }
michael@0:
/**
 * Creates an unstarted session bound to {@code repository}.
 *
 * @param repository the repository this session belongs to; stored as-is.
 */
public RepositorySession(final Repository repository) {
  this.repository = repository;
}
michael@0:
michael@0: public abstract void guidsSince(long timestamp, RepositorySessionGuidsSinceDelegate delegate);
michael@0: public abstract void fetchSince(long timestamp, RepositorySessionFetchRecordsDelegate delegate);
michael@0: public abstract void fetch(String[] guids, RepositorySessionFetchRecordsDelegate delegate) throws InactiveSessionException;
michael@0: public abstract void fetchAll(RepositorySessionFetchRecordsDelegate delegate);
michael@0:
michael@0: /**
michael@0: * Override this if you wish to short-circuit a sync when you know --
michael@0: * e.g., by inspecting the database or info/collections -- that no new
michael@0: * data are available.
michael@0: *
michael@0: * @return true if a sync should proceed.
michael@0: */
michael@0: public boolean dataAvailable() {
michael@0: return true;
michael@0: }
michael@0:
michael@0: /**
michael@0: * @return true if we cannot safely sync from this RepositorySession
.
michael@0: */
michael@0: public boolean shouldSkip() {
michael@0: return false;
michael@0: }
michael@0:
michael@0: /*
michael@0: * Store operations proceed thusly:
michael@0: *
michael@0: * * Set a delegate
michael@0: * * Store an arbitrary number of records. At any time the delegate can be
michael@0: * notified of an error.
michael@0: * * Call storeDone to notify the session that no more items are forthcoming.
michael@0: * * The store delegate will be notified of error or completion.
michael@0: *
michael@0: * This arrangement of calls allows for batching at the session level.
michael@0: *
michael@0: * Store success calls are not guaranteed.
michael@0: */
michael@0: public void setStoreDelegate(RepositorySessionStoreDelegate delegate) {
michael@0: Logger.debug(LOG_TAG, "Setting store delegate to " + delegate);
michael@0: this.delegate = delegate;
michael@0: }
michael@0: public abstract void store(Record record) throws NoStoreDelegateException;
michael@0:
michael@0: public void storeDone() {
michael@0: // Our default behavior will be to assume that the Runnable is
michael@0: // executed as soon as all the stores synchronously finish, so
michael@0: // our end timestamp can just be… now.
michael@0: storeDone(now());
michael@0: }
michael@0:
michael@0: public void storeDone(final long end) {
michael@0: Logger.debug(LOG_TAG, "Scheduling onStoreCompleted for after storing is done: " + end);
michael@0: Runnable command = new Runnable() {
michael@0: @Override
michael@0: public void run() {
michael@0: delegate.onStoreCompleted(end);
michael@0: }
michael@0: };
michael@0: storeWorkQueue.execute(command);
michael@0: }
michael@0:
michael@0: public abstract void wipe(RepositorySessionWipeDelegate delegate);
michael@0:
michael@0: /**
michael@0: * Synchronously perform the shared work of beginning. Throws on failure.
michael@0: * @throws InvalidSessionTransitionException
michael@0: *
michael@0: */
michael@0: protected void sharedBegin() throws InvalidSessionTransitionException {
michael@0: Logger.debug(LOG_TAG, "Shared begin.");
michael@0: if (delegateQueue.isShutdown()) {
michael@0: throw new InvalidSessionTransitionException(null);
michael@0: }
michael@0: if (storeWorkQueue.isShutdown()) {
michael@0: throw new InvalidSessionTransitionException(null);
michael@0: }
michael@0: this.transitionFrom(SessionStatus.UNSTARTED, SessionStatus.ACTIVE);
michael@0: }
michael@0:
michael@0: /**
michael@0: * Start the session. This is an appropriate place to initialize
michael@0: * data access components such as database handles.
michael@0: *
michael@0: * @param delegate
michael@0: * @throws InvalidSessionTransitionException
michael@0: */
michael@0: public void begin(RepositorySessionBeginDelegate delegate) throws InvalidSessionTransitionException {
michael@0: sharedBegin();
michael@0: delegate.deferredBeginDelegate(delegateQueue).onBeginSucceeded(this);
michael@0: }
michael@0:
michael@0: public void unbundle(RepositorySessionBundle bundle) {
michael@0: this.lastSyncTimestamp = bundle == null ? 0 : bundle.getTimestamp();
michael@0: }
michael@0:
michael@0: /**
michael@0: * Override this in your subclasses to return values to save between sessions.
michael@0: * Note that RepositorySession automatically bumps the timestamp to the time
michael@0: * the last sync began. If unbundled but not begun, this will be the same as the
michael@0: * value in the input bundle.
michael@0: *
michael@0: * The Synchronizer most likely wants to bump the bundle timestamp to be a value
michael@0: * return from a fetch call.
michael@0: */
michael@0: protected RepositorySessionBundle getBundle() {
michael@0: // Why don't we just persist the old bundle?
michael@0: long timestamp = getLastSyncTimestamp();
michael@0: RepositorySessionBundle bundle = new RepositorySessionBundle(timestamp);
michael@0: Logger.debug(LOG_TAG, "Setting bundle timestamp to " + timestamp + ".");
michael@0:
michael@0: return bundle;
michael@0: }
michael@0:
michael@0: /**
michael@0: * Just like finish(), but doesn't do any work that should only be performed
michael@0: * at the end of a successful sync, and can be called any time.
michael@0: */
michael@0: public void abort(RepositorySessionFinishDelegate delegate) {
michael@0: this.abort();
michael@0: delegate.deferredFinishDelegate(delegateQueue).onFinishSucceeded(this, this.getBundle());
michael@0: }
michael@0:
michael@0: /**
michael@0: * Abnormally terminate the repository session, freeing or closing
michael@0: * any resources that were opened during the lifetime of the session.
michael@0: */
michael@0: public void abort() {
michael@0: // TODO: do something here.
michael@0: this.setStatus(SessionStatus.ABORTED);
michael@0: try {
michael@0: storeWorkQueue.shutdownNow();
michael@0: } catch (Exception e) {
michael@0: Logger.error(LOG_TAG, "Caught exception shutting down store work queue.", e);
michael@0: }
michael@0: try {
michael@0: delegateQueue.shutdown();
michael@0: } catch (Exception e) {
michael@0: Logger.error(LOG_TAG, "Caught exception shutting down delegate queue.", e);
michael@0: }
michael@0: }
michael@0:
michael@0: /**
michael@0: * End the repository session, freeing or closing any resources
michael@0: * that were opened during the lifetime of the session.
michael@0: *
michael@0: * @param delegate notified of success or failure.
michael@0: * @throws InactiveSessionException
michael@0: */
michael@0: public void finish(final RepositorySessionFinishDelegate delegate) throws InactiveSessionException {
michael@0: try {
michael@0: this.transitionFrom(SessionStatus.ACTIVE, SessionStatus.DONE);
michael@0: delegate.deferredFinishDelegate(delegateQueue).onFinishSucceeded(this, this.getBundle());
michael@0: } catch (InvalidSessionTransitionException e) {
michael@0: Logger.error(LOG_TAG, "Tried to finish() an unstarted or already finished session");
michael@0: throw new InactiveSessionException(e);
michael@0: }
michael@0:
michael@0: Logger.trace(LOG_TAG, "Shutting down work queues.");
michael@0: storeWorkQueue.shutdown();
michael@0: delegateQueue.shutdown();
michael@0: }
michael@0:
michael@0: /**
michael@0: * Run the provided command if we're active and our delegate queue
michael@0: * is not shut down.
michael@0: */
michael@0: protected synchronized void executeDelegateCommand(Runnable command)
michael@0: throws InactiveSessionException {
michael@0: if (!isActive() || delegateQueue.isShutdown()) {
michael@0: throw new InactiveSessionException(null);
michael@0: }
michael@0: delegateQueue.execute(command);
michael@0: }
michael@0:
michael@0: public synchronized void ensureActive() throws InactiveSessionException {
michael@0: if (!isActive()) {
michael@0: throw new InactiveSessionException(null);
michael@0: }
michael@0: }
michael@0:
michael@0: public synchronized boolean isActive() {
michael@0: return status == SessionStatus.ACTIVE;
michael@0: }
michael@0:
michael@0: public synchronized SessionStatus getStatus() {
michael@0: return status;
michael@0: }
michael@0:
michael@0: public synchronized void setStatus(SessionStatus status) {
michael@0: this.status = status;
michael@0: }
michael@0:
michael@0: public synchronized void transitionFrom(SessionStatus from, SessionStatus to) throws InvalidSessionTransitionException {
michael@0: if (from == null || this.status == from) {
michael@0: Logger.trace(LOG_TAG, "Successfully transitioning from " + this.status + " to " + to);
michael@0:
michael@0: this.status = to;
michael@0: return;
michael@0: }
michael@0: Logger.warn(LOG_TAG, "Wanted to transition from " + from + " but in state " + this.status);
michael@0: throw new InvalidSessionTransitionException(null);
michael@0: }
michael@0:
michael@0: /**
michael@0: * Produce a record that is some combination of the remote and local records
michael@0: * provided.
michael@0: *
michael@0: * The returned record must be produced without mutating either remoteRecord
michael@0: * or localRecord. It is acceptable to return either remoteRecord or localRecord
michael@0: * if no modifications are to be propagated.
michael@0: *
michael@0: * The returned record *should* have the local androidID and the remote GUID,
michael@0: * and some optional merge of data from the two records.
michael@0: *
michael@0: * This method can be called with records that are identical, or differ in
michael@0: * any regard.
michael@0: *
michael@0: * This method will not be called if:
michael@0: *
michael@0: * * either record is marked as deleted, or
michael@0: * * there is no local mapping for a new remote record.
michael@0: *
michael@0: * Otherwise, it will be called precisely once.
michael@0: *
michael@0: * Side-effects (e.g., for transactional storage) can be hooked in here.
michael@0: *
michael@0: * @param remoteRecord
michael@0: * The record retrieved from upstream, already adjusted for clock skew.
michael@0: * @param localRecord
michael@0: * The record retrieved from local storage.
michael@0: * @param lastRemoteRetrieval
michael@0: * The timestamp of the last retrieved set of remote records, adjusted for
michael@0: * clock skew.
michael@0: * @param lastLocalRetrieval
michael@0: * The timestamp of the last retrieved set of local records.
michael@0: * @return
michael@0: * A Record instance to apply, or null to apply nothing.
michael@0: */
michael@0: protected Record reconcileRecords(final Record remoteRecord,
michael@0: final Record localRecord,
michael@0: final long lastRemoteRetrieval,
michael@0: final long lastLocalRetrieval) {
michael@0: Logger.debug(LOG_TAG, "Reconciling remote " + remoteRecord.guid + " against local " + localRecord.guid);
michael@0:
michael@0: if (localRecord.equalPayloads(remoteRecord)) {
michael@0: if (remoteRecord.lastModified > localRecord.lastModified) {
michael@0: Logger.debug(LOG_TAG, "Records are equal. No record application needed.");
michael@0: return null;
michael@0: }
michael@0:
michael@0: // Local wins.
michael@0: return null;
michael@0: }
michael@0:
michael@0: // TODO: Decide what to do based on:
michael@0: // * Which of the two records is modified;
michael@0: // * Whether they are equal or congruent;
michael@0: // * The modified times of each record (interpreted through the lens of clock skew);
michael@0: // * ...
michael@0: boolean localIsMoreRecent = localRecord.lastModified > remoteRecord.lastModified;
michael@0: Logger.debug(LOG_TAG, "Local record is more recent? " + localIsMoreRecent);
michael@0: Record donor = localIsMoreRecent ? localRecord : remoteRecord;
michael@0:
michael@0: // Modify the local record to match the remote record's GUID and values.
michael@0: // Preserve the local Android ID, and merge data where possible.
michael@0: // It sure would be nice if copyWithIDs didn't give a shit about androidID, mm?
michael@0: Record out = donor.copyWithIDs(remoteRecord.guid, localRecord.androidID);
michael@0:
michael@0: // We don't want to upload the record if the remote record was
michael@0: // applied without changes.
michael@0: // This logic will become more complicated as reconciling becomes smarter.
michael@0: if (!localIsMoreRecent) {
michael@0: trackGUID(out.guid);
michael@0: }
michael@0: return out;
michael@0: }
michael@0:
michael@0: /**
michael@0: * Depending on the RepositorySession implementation, track
michael@0: * that a record — most likely a brand-new record that has been
michael@0: * applied unmodified — should be tracked so as to not be uploaded
michael@0: * redundantly.
michael@0: *
michael@0: * The default implementations do nothing.
michael@0: */
michael@0: protected void trackGUID(String guid) {
michael@0: }
michael@0:
michael@0: protected synchronized void untrackGUIDs(Collection