michael@0: /* This Source Code Form is subject to the terms of the Mozilla Public michael@0: * License, v. 2.0. If a copy of the MPL was not distributed with this michael@0: * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ michael@0: michael@0: package org.mozilla.gecko.sync.repositories; michael@0: michael@0: import java.util.ArrayList; michael@0: import java.util.Collection; michael@0: import java.util.Iterator; michael@0: import java.util.concurrent.ExecutorService; michael@0: import java.util.concurrent.Executors; michael@0: michael@0: import org.mozilla.gecko.background.common.log.Logger; michael@0: import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate; michael@0: import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate; michael@0: import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFinishDelegate; michael@0: import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionGuidsSinceDelegate; michael@0: import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate; michael@0: import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionWipeDelegate; michael@0: import org.mozilla.gecko.sync.repositories.domain.Record; michael@0: michael@0: /** michael@0: * A RepositorySession is created and used thusly: michael@0: * michael@0: * michael@0: * michael@0: * If finish() is not called, {@link #abort()} must be called. These calls must michael@0: * always be paired with begin(). michael@0: * michael@0: */ michael@0: public abstract class RepositorySession { michael@0: michael@0: public enum SessionStatus { michael@0: UNSTARTED, michael@0: ACTIVE, michael@0: ABORTED, michael@0: DONE michael@0: } michael@0: michael@0: private static final String LOG_TAG = "RepositorySession"; michael@0: michael@0: protected static void trace(String message) { michael@0: Logger.trace(LOG_TAG, message); michael@0: } michael@0: michael@0: private SessionStatus status = SessionStatus.UNSTARTED; michael@0: protected Repository repository; michael@0: protected RepositorySessionStoreDelegate delegate; michael@0: michael@0: /** michael@0: * A queue of Runnables which call out into delegates. michael@0: */ michael@0: protected ExecutorService delegateQueue = Executors.newSingleThreadExecutor(); michael@0: michael@0: /** michael@0: * A queue of Runnables which effect storing. michael@0: * This includes actual store work, and also the consequences of storeDone. michael@0: * This provides strict ordering. michael@0: */ michael@0: protected ExecutorService storeWorkQueue = Executors.newSingleThreadExecutor(); michael@0: michael@0: // The time that the last sync on this collection completed, in milliseconds since epoch. michael@0: private long lastSyncTimestamp = 0; michael@0: michael@0: public long getLastSyncTimestamp() { michael@0: return lastSyncTimestamp; michael@0: } michael@0: michael@0: public static long now() { michael@0: return System.currentTimeMillis(); michael@0: } michael@0: michael@0: public RepositorySession(Repository repository) { michael@0: this.repository = repository; michael@0: } michael@0: michael@0: public abstract void guidsSince(long timestamp, RepositorySessionGuidsSinceDelegate delegate); michael@0: public abstract void fetchSince(long timestamp, RepositorySessionFetchRecordsDelegate delegate); michael@0: public abstract void fetch(String[] guids, RepositorySessionFetchRecordsDelegate delegate) throws InactiveSessionException; michael@0: public abstract void fetchAll(RepositorySessionFetchRecordsDelegate delegate); michael@0: michael@0: /** michael@0: * Override this if you wish to short-circuit a sync when you know -- michael@0: * e.g., by inspecting the database or info/collections -- that no new michael@0: * data are available. michael@0: * michael@0: * @return true if a sync should proceed. michael@0: */ michael@0: public boolean dataAvailable() { michael@0: return true; michael@0: } michael@0: michael@0: /** michael@0: * @return true if we cannot safely sync from this RepositorySession. michael@0: */ michael@0: public boolean shouldSkip() { michael@0: return false; michael@0: } michael@0: michael@0: /* michael@0: * Store operations proceed thusly: michael@0: * michael@0: * * Set a delegate michael@0: * * Store an arbitrary number of records. At any time the delegate can be michael@0: * notified of an error. michael@0: * * Call storeDone to notify the session that no more items are forthcoming. michael@0: * * The store delegate will be notified of error or completion. michael@0: * michael@0: * This arrangement of calls allows for batching at the session level. michael@0: * michael@0: * Store success calls are not guaranteed. michael@0: */ michael@0: public void setStoreDelegate(RepositorySessionStoreDelegate delegate) { michael@0: Logger.debug(LOG_TAG, "Setting store delegate to " + delegate); michael@0: this.delegate = delegate; michael@0: } michael@0: public abstract void store(Record record) throws NoStoreDelegateException; michael@0: michael@0: public void storeDone() { michael@0: // Our default behavior will be to assume that the Runnable is michael@0: // executed as soon as all the stores synchronously finish, so michael@0: // our end timestamp can just be… now. michael@0: storeDone(now()); michael@0: } michael@0: michael@0: public void storeDone(final long end) { michael@0: Logger.debug(LOG_TAG, "Scheduling onStoreCompleted for after storing is done: " + end); michael@0: Runnable command = new Runnable() { michael@0: @Override michael@0: public void run() { michael@0: delegate.onStoreCompleted(end); michael@0: } michael@0: }; michael@0: storeWorkQueue.execute(command); michael@0: } michael@0: michael@0: public abstract void wipe(RepositorySessionWipeDelegate delegate); michael@0: michael@0: /** michael@0: * Synchronously perform the shared work of beginning. Throws on failure. michael@0: * @throws InvalidSessionTransitionException michael@0: * michael@0: */ michael@0: protected void sharedBegin() throws InvalidSessionTransitionException { michael@0: Logger.debug(LOG_TAG, "Shared begin."); michael@0: if (delegateQueue.isShutdown()) { michael@0: throw new InvalidSessionTransitionException(null); michael@0: } michael@0: if (storeWorkQueue.isShutdown()) { michael@0: throw new InvalidSessionTransitionException(null); michael@0: } michael@0: this.transitionFrom(SessionStatus.UNSTARTED, SessionStatus.ACTIVE); michael@0: } michael@0: michael@0: /** michael@0: * Start the session. This is an appropriate place to initialize michael@0: * data access components such as database handles. michael@0: * michael@0: * @param delegate michael@0: * @throws InvalidSessionTransitionException michael@0: */ michael@0: public void begin(RepositorySessionBeginDelegate delegate) throws InvalidSessionTransitionException { michael@0: sharedBegin(); michael@0: delegate.deferredBeginDelegate(delegateQueue).onBeginSucceeded(this); michael@0: } michael@0: michael@0: public void unbundle(RepositorySessionBundle bundle) { michael@0: this.lastSyncTimestamp = bundle == null ? 0 : bundle.getTimestamp(); michael@0: } michael@0: michael@0: /** michael@0: * Override this in your subclasses to return values to save between sessions. michael@0: * Note that RepositorySession automatically bumps the timestamp to the time michael@0: * the last sync began. If unbundled but not begun, this will be the same as the michael@0: * value in the input bundle. michael@0: * michael@0: * The Synchronizer most likely wants to bump the bundle timestamp to be a value michael@0: * return from a fetch call. michael@0: */ michael@0: protected RepositorySessionBundle getBundle() { michael@0: // Why don't we just persist the old bundle? michael@0: long timestamp = getLastSyncTimestamp(); michael@0: RepositorySessionBundle bundle = new RepositorySessionBundle(timestamp); michael@0: Logger.debug(LOG_TAG, "Setting bundle timestamp to " + timestamp + "."); michael@0: michael@0: return bundle; michael@0: } michael@0: michael@0: /** michael@0: * Just like finish(), but doesn't do any work that should only be performed michael@0: * at the end of a successful sync, and can be called any time. michael@0: */ michael@0: public void abort(RepositorySessionFinishDelegate delegate) { michael@0: this.abort(); michael@0: delegate.deferredFinishDelegate(delegateQueue).onFinishSucceeded(this, this.getBundle()); michael@0: } michael@0: michael@0: /** michael@0: * Abnormally terminate the repository session, freeing or closing michael@0: * any resources that were opened during the lifetime of the session. michael@0: */ michael@0: public void abort() { michael@0: // TODO: do something here. michael@0: this.setStatus(SessionStatus.ABORTED); michael@0: try { michael@0: storeWorkQueue.shutdownNow(); michael@0: } catch (Exception e) { michael@0: Logger.error(LOG_TAG, "Caught exception shutting down store work queue.", e); michael@0: } michael@0: try { michael@0: delegateQueue.shutdown(); michael@0: } catch (Exception e) { michael@0: Logger.error(LOG_TAG, "Caught exception shutting down delegate queue.", e); michael@0: } michael@0: } michael@0: michael@0: /** michael@0: * End the repository session, freeing or closing any resources michael@0: * that were opened during the lifetime of the session. michael@0: * michael@0: * @param delegate notified of success or failure. michael@0: * @throws InactiveSessionException michael@0: */ michael@0: public void finish(final RepositorySessionFinishDelegate delegate) throws InactiveSessionException { michael@0: try { michael@0: this.transitionFrom(SessionStatus.ACTIVE, SessionStatus.DONE); michael@0: delegate.deferredFinishDelegate(delegateQueue).onFinishSucceeded(this, this.getBundle()); michael@0: } catch (InvalidSessionTransitionException e) { michael@0: Logger.error(LOG_TAG, "Tried to finish() an unstarted or already finished session"); michael@0: throw new InactiveSessionException(e); michael@0: } michael@0: michael@0: Logger.trace(LOG_TAG, "Shutting down work queues."); michael@0: storeWorkQueue.shutdown(); michael@0: delegateQueue.shutdown(); michael@0: } michael@0: michael@0: /** michael@0: * Run the provided command if we're active and our delegate queue michael@0: * is not shut down. michael@0: */ michael@0: protected synchronized void executeDelegateCommand(Runnable command) michael@0: throws InactiveSessionException { michael@0: if (!isActive() || delegateQueue.isShutdown()) { michael@0: throw new InactiveSessionException(null); michael@0: } michael@0: delegateQueue.execute(command); michael@0: } michael@0: michael@0: public synchronized void ensureActive() throws InactiveSessionException { michael@0: if (!isActive()) { michael@0: throw new InactiveSessionException(null); michael@0: } michael@0: } michael@0: michael@0: public synchronized boolean isActive() { michael@0: return status == SessionStatus.ACTIVE; michael@0: } michael@0: michael@0: public synchronized SessionStatus getStatus() { michael@0: return status; michael@0: } michael@0: michael@0: public synchronized void setStatus(SessionStatus status) { michael@0: this.status = status; michael@0: } michael@0: michael@0: public synchronized void transitionFrom(SessionStatus from, SessionStatus to) throws InvalidSessionTransitionException { michael@0: if (from == null || this.status == from) { michael@0: Logger.trace(LOG_TAG, "Successfully transitioning from " + this.status + " to " + to); michael@0: michael@0: this.status = to; michael@0: return; michael@0: } michael@0: Logger.warn(LOG_TAG, "Wanted to transition from " + from + " but in state " + this.status); michael@0: throw new InvalidSessionTransitionException(null); michael@0: } michael@0: michael@0: /** michael@0: * Produce a record that is some combination of the remote and local records michael@0: * provided. michael@0: * michael@0: * The returned record must be produced without mutating either remoteRecord michael@0: * or localRecord. It is acceptable to return either remoteRecord or localRecord michael@0: * if no modifications are to be propagated. michael@0: * michael@0: * The returned record *should* have the local androidID and the remote GUID, michael@0: * and some optional merge of data from the two records. michael@0: * michael@0: * This method can be called with records that are identical, or differ in michael@0: * any regard. michael@0: * michael@0: * This method will not be called if: michael@0: * michael@0: * * either record is marked as deleted, or michael@0: * * there is no local mapping for a new remote record. michael@0: * michael@0: * Otherwise, it will be called precisely once. michael@0: * michael@0: * Side-effects (e.g., for transactional storage) can be hooked in here. michael@0: * michael@0: * @param remoteRecord michael@0: * The record retrieved from upstream, already adjusted for clock skew. michael@0: * @param localRecord michael@0: * The record retrieved from local storage. michael@0: * @param lastRemoteRetrieval michael@0: * The timestamp of the last retrieved set of remote records, adjusted for michael@0: * clock skew. michael@0: * @param lastLocalRetrieval michael@0: * The timestamp of the last retrieved set of local records. michael@0: * @return michael@0: * A Record instance to apply, or null to apply nothing. michael@0: */ michael@0: protected Record reconcileRecords(final Record remoteRecord, michael@0: final Record localRecord, michael@0: final long lastRemoteRetrieval, michael@0: final long lastLocalRetrieval) { michael@0: Logger.debug(LOG_TAG, "Reconciling remote " + remoteRecord.guid + " against local " + localRecord.guid); michael@0: michael@0: if (localRecord.equalPayloads(remoteRecord)) { michael@0: if (remoteRecord.lastModified > localRecord.lastModified) { michael@0: Logger.debug(LOG_TAG, "Records are equal. No record application needed."); michael@0: return null; michael@0: } michael@0: michael@0: // Local wins. michael@0: return null; michael@0: } michael@0: michael@0: // TODO: Decide what to do based on: michael@0: // * Which of the two records is modified; michael@0: // * Whether they are equal or congruent; michael@0: // * The modified times of each record (interpreted through the lens of clock skew); michael@0: // * ... michael@0: boolean localIsMoreRecent = localRecord.lastModified > remoteRecord.lastModified; michael@0: Logger.debug(LOG_TAG, "Local record is more recent? " + localIsMoreRecent); michael@0: Record donor = localIsMoreRecent ? localRecord : remoteRecord; michael@0: michael@0: // Modify the local record to match the remote record's GUID and values. michael@0: // Preserve the local Android ID, and merge data where possible. michael@0: // It sure would be nice if copyWithIDs didn't give a shit about androidID, mm? michael@0: Record out = donor.copyWithIDs(remoteRecord.guid, localRecord.androidID); michael@0: michael@0: // We don't want to upload the record if the remote record was michael@0: // applied without changes. michael@0: // This logic will become more complicated as reconciling becomes smarter. michael@0: if (!localIsMoreRecent) { michael@0: trackGUID(out.guid); michael@0: } michael@0: return out; michael@0: } michael@0: michael@0: /** michael@0: * Depending on the RepositorySession implementation, track michael@0: * that a record — most likely a brand-new record that has been michael@0: * applied unmodified — should be tracked so as to not be uploaded michael@0: * redundantly. michael@0: * michael@0: * The default implementations do nothing. michael@0: */ michael@0: protected void trackGUID(String guid) { michael@0: } michael@0: michael@0: protected synchronized void untrackGUIDs(Collection guids) { michael@0: } michael@0: michael@0: protected void untrackGUID(String guid) { michael@0: } michael@0: michael@0: // Ah, Java. You wretched creature. michael@0: public Iterator getTrackedRecordIDs() { michael@0: return new ArrayList().iterator(); michael@0: } michael@0: }