1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/mobile/android/base/sync/repositories/RepositorySession.java Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,384 @@ 1.4 +/* This Source Code Form is subject to the terms of the Mozilla Public 1.5 + * License, v. 2.0. If a copy of the MPL was not distributed with this 1.6 + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 1.7 + 1.8 +package org.mozilla.gecko.sync.repositories; 1.9 + 1.10 +import java.util.ArrayList; 1.11 +import java.util.Collection; 1.12 +import java.util.Iterator; 1.13 +import java.util.concurrent.ExecutorService; 1.14 +import java.util.concurrent.Executors; 1.15 + 1.16 +import org.mozilla.gecko.background.common.log.Logger; 1.17 +import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate; 1.18 +import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate; 1.19 +import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFinishDelegate; 1.20 +import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionGuidsSinceDelegate; 1.21 +import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate; 1.22 +import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionWipeDelegate; 1.23 +import org.mozilla.gecko.sync.repositories.domain.Record; 1.24 + 1.25 +/** 1.26 + * A <code>RepositorySession</code> is created and used thusly: 1.27 + * 1.28 + *<ul> 1.29 + * <li>Construct, with a reference to its parent {@link Repository}, by calling 1.30 + * {@link Repository#createSession(RepositorySessionCreationDelegate, android.content.Context)}.</li> 1.31 + * <li>Populate with saved information by calling {@link #unbundle(RepositorySessionBundle)}.</li> 1.32 + * <li>Begin a sync by calling {@link #begin(RepositorySessionBeginDelegate)}. <code>begin()</code> 1.33 + * is an appropriate place to initialize expensive resources.</li> 1.34 + * <li>Perform operations such as {@link #fetchSince(long, RepositorySessionFetchRecordsDelegate)} and 1.35 + * {@link #store(Record)}.</li> 1.36 + * <li>Finish by calling {@link #finish(RepositorySessionFinishDelegate)}, retrieving and storing 1.37 + * the current bundle.</li> 1.38 + *</ul> 1.39 + * 1.40 + * If <code>finish()</code> is not called, {@link #abort()} must be called. These calls must 1.41 + * <em>always</em> be paired with <code>begin()</code>. 1.42 + * 1.43 + */ 1.44 +public abstract class RepositorySession { 1.45 + 1.46 + public enum SessionStatus { 1.47 + UNSTARTED, 1.48 + ACTIVE, 1.49 + ABORTED, 1.50 + DONE 1.51 + } 1.52 + 1.53 + private static final String LOG_TAG = "RepositorySession"; 1.54 + 1.55 + protected static void trace(String message) { 1.56 + Logger.trace(LOG_TAG, message); 1.57 + } 1.58 + 1.59 + private SessionStatus status = SessionStatus.UNSTARTED; 1.60 + protected Repository repository; 1.61 + protected RepositorySessionStoreDelegate delegate; 1.62 + 1.63 + /** 1.64 + * A queue of Runnables which call out into delegates. 1.65 + */ 1.66 + protected ExecutorService delegateQueue = Executors.newSingleThreadExecutor(); 1.67 + 1.68 + /** 1.69 + * A queue of Runnables which effect storing. 1.70 + * This includes actual store work, and also the consequences of storeDone. 1.71 + * This provides strict ordering. 1.72 + */ 1.73 + protected ExecutorService storeWorkQueue = Executors.newSingleThreadExecutor(); 1.74 + 1.75 + // The time that the last sync on this collection completed, in milliseconds since epoch. 1.76 + private long lastSyncTimestamp = 0; 1.77 + 1.78 + public long getLastSyncTimestamp() { 1.79 + return lastSyncTimestamp; 1.80 + } 1.81 + 1.82 + public static long now() { 1.83 + return System.currentTimeMillis(); 1.84 + } 1.85 + 1.86 + public RepositorySession(Repository repository) { 1.87 + this.repository = repository; 1.88 + } 1.89 + 1.90 + public abstract void guidsSince(long timestamp, RepositorySessionGuidsSinceDelegate delegate); 1.91 + public abstract void fetchSince(long timestamp, RepositorySessionFetchRecordsDelegate delegate); 1.92 + public abstract void fetch(String[] guids, RepositorySessionFetchRecordsDelegate delegate) throws InactiveSessionException; 1.93 + public abstract void fetchAll(RepositorySessionFetchRecordsDelegate delegate); 1.94 + 1.95 + /** 1.96 + * Override this if you wish to short-circuit a sync when you know -- 1.97 + * e.g., by inspecting the database or info/collections -- that no new 1.98 + * data are available. 1.99 + * 1.100 + * @return true if a sync should proceed. 1.101 + */ 1.102 + public boolean dataAvailable() { 1.103 + return true; 1.104 + } 1.105 + 1.106 + /** 1.107 + * @return true if we cannot safely sync from this <code>RepositorySession</code>. 1.108 + */ 1.109 + public boolean shouldSkip() { 1.110 + return false; 1.111 + } 1.112 + 1.113 + /* 1.114 + * Store operations proceed thusly: 1.115 + * 1.116 + * * Set a delegate 1.117 + * * Store an arbitrary number of records. At any time the delegate can be 1.118 + * notified of an error. 1.119 + * * Call storeDone to notify the session that no more items are forthcoming. 1.120 + * * The store delegate will be notified of error or completion. 1.121 + * 1.122 + * This arrangement of calls allows for batching at the session level. 1.123 + * 1.124 + * Store success calls are not guaranteed. 1.125 + */ 1.126 + public void setStoreDelegate(RepositorySessionStoreDelegate delegate) { 1.127 + Logger.debug(LOG_TAG, "Setting store delegate to " + delegate); 1.128 + this.delegate = delegate; 1.129 + } 1.130 + public abstract void store(Record record) throws NoStoreDelegateException; 1.131 + 1.132 + public void storeDone() { 1.133 + // Our default behavior will be to assume that the Runnable is 1.134 + // executed as soon as all the stores synchronously finish, so 1.135 + // our end timestamp can just be… now. 1.136 + storeDone(now()); 1.137 + } 1.138 + 1.139 + public void storeDone(final long end) { 1.140 + Logger.debug(LOG_TAG, "Scheduling onStoreCompleted for after storing is done: " + end); 1.141 + Runnable command = new Runnable() { 1.142 + @Override 1.143 + public void run() { 1.144 + delegate.onStoreCompleted(end); 1.145 + } 1.146 + }; 1.147 + storeWorkQueue.execute(command); 1.148 + } 1.149 + 1.150 + public abstract void wipe(RepositorySessionWipeDelegate delegate); 1.151 + 1.152 + /** 1.153 + * Synchronously perform the shared work of beginning. Throws on failure. 1.154 + * @throws InvalidSessionTransitionException 1.155 + * 1.156 + */ 1.157 + protected void sharedBegin() throws InvalidSessionTransitionException { 1.158 + Logger.debug(LOG_TAG, "Shared begin."); 1.159 + if (delegateQueue.isShutdown()) { 1.160 + throw new InvalidSessionTransitionException(null); 1.161 + } 1.162 + if (storeWorkQueue.isShutdown()) { 1.163 + throw new InvalidSessionTransitionException(null); 1.164 + } 1.165 + this.transitionFrom(SessionStatus.UNSTARTED, SessionStatus.ACTIVE); 1.166 + } 1.167 + 1.168 + /** 1.169 + * Start the session. This is an appropriate place to initialize 1.170 + * data access components such as database handles. 1.171 + * 1.172 + * @param delegate 1.173 + * @throws InvalidSessionTransitionException 1.174 + */ 1.175 + public void begin(RepositorySessionBeginDelegate delegate) throws InvalidSessionTransitionException { 1.176 + sharedBegin(); 1.177 + delegate.deferredBeginDelegate(delegateQueue).onBeginSucceeded(this); 1.178 + } 1.179 + 1.180 + public void unbundle(RepositorySessionBundle bundle) { 1.181 + this.lastSyncTimestamp = bundle == null ? 0 : bundle.getTimestamp(); 1.182 + } 1.183 + 1.184 + /** 1.185 + * Override this in your subclasses to return values to save between sessions. 1.186 + * Note that RepositorySession automatically bumps the timestamp to the time 1.187 + * the last sync began. If unbundled but not begun, this will be the same as the 1.188 + * value in the input bundle. 1.189 + * 1.190 + * The Synchronizer most likely wants to bump the bundle timestamp to be a value 1.191 + * return from a fetch call. 1.192 + */ 1.193 + protected RepositorySessionBundle getBundle() { 1.194 + // Why don't we just persist the old bundle? 1.195 + long timestamp = getLastSyncTimestamp(); 1.196 + RepositorySessionBundle bundle = new RepositorySessionBundle(timestamp); 1.197 + Logger.debug(LOG_TAG, "Setting bundle timestamp to " + timestamp + "."); 1.198 + 1.199 + return bundle; 1.200 + } 1.201 + 1.202 + /** 1.203 + * Just like finish(), but doesn't do any work that should only be performed 1.204 + * at the end of a successful sync, and can be called any time. 1.205 + */ 1.206 + public void abort(RepositorySessionFinishDelegate delegate) { 1.207 + this.abort(); 1.208 + delegate.deferredFinishDelegate(delegateQueue).onFinishSucceeded(this, this.getBundle()); 1.209 + } 1.210 + 1.211 + /** 1.212 + * Abnormally terminate the repository session, freeing or closing 1.213 + * any resources that were opened during the lifetime of the session. 1.214 + */ 1.215 + public void abort() { 1.216 + // TODO: do something here. 1.217 + this.setStatus(SessionStatus.ABORTED); 1.218 + try { 1.219 + storeWorkQueue.shutdownNow(); 1.220 + } catch (Exception e) { 1.221 + Logger.error(LOG_TAG, "Caught exception shutting down store work queue.", e); 1.222 + } 1.223 + try { 1.224 + delegateQueue.shutdown(); 1.225 + } catch (Exception e) { 1.226 + Logger.error(LOG_TAG, "Caught exception shutting down delegate queue.", e); 1.227 + } 1.228 + } 1.229 + 1.230 + /** 1.231 + * End the repository session, freeing or closing any resources 1.232 + * that were opened during the lifetime of the session. 1.233 + * 1.234 + * @param delegate notified of success or failure. 1.235 + * @throws InactiveSessionException 1.236 + */ 1.237 + public void finish(final RepositorySessionFinishDelegate delegate) throws InactiveSessionException { 1.238 + try { 1.239 + this.transitionFrom(SessionStatus.ACTIVE, SessionStatus.DONE); 1.240 + delegate.deferredFinishDelegate(delegateQueue).onFinishSucceeded(this, this.getBundle()); 1.241 + } catch (InvalidSessionTransitionException e) { 1.242 + Logger.error(LOG_TAG, "Tried to finish() an unstarted or already finished session"); 1.243 + throw new InactiveSessionException(e); 1.244 + } 1.245 + 1.246 + Logger.trace(LOG_TAG, "Shutting down work queues."); 1.247 + storeWorkQueue.shutdown(); 1.248 + delegateQueue.shutdown(); 1.249 + } 1.250 + 1.251 + /** 1.252 + * Run the provided command if we're active and our delegate queue 1.253 + * is not shut down. 1.254 + */ 1.255 + protected synchronized void executeDelegateCommand(Runnable command) 1.256 + throws InactiveSessionException { 1.257 + if (!isActive() || delegateQueue.isShutdown()) { 1.258 + throw new InactiveSessionException(null); 1.259 + } 1.260 + delegateQueue.execute(command); 1.261 + } 1.262 + 1.263 + public synchronized void ensureActive() throws InactiveSessionException { 1.264 + if (!isActive()) { 1.265 + throw new InactiveSessionException(null); 1.266 + } 1.267 + } 1.268 + 1.269 + public synchronized boolean isActive() { 1.270 + return status == SessionStatus.ACTIVE; 1.271 + } 1.272 + 1.273 + public synchronized SessionStatus getStatus() { 1.274 + return status; 1.275 + } 1.276 + 1.277 + public synchronized void setStatus(SessionStatus status) { 1.278 + this.status = status; 1.279 + } 1.280 + 1.281 + public synchronized void transitionFrom(SessionStatus from, SessionStatus to) throws InvalidSessionTransitionException { 1.282 + if (from == null || this.status == from) { 1.283 + Logger.trace(LOG_TAG, "Successfully transitioning from " + this.status + " to " + to); 1.284 + 1.285 + this.status = to; 1.286 + return; 1.287 + } 1.288 + Logger.warn(LOG_TAG, "Wanted to transition from " + from + " but in state " + this.status); 1.289 + throw new InvalidSessionTransitionException(null); 1.290 + } 1.291 + 1.292 + /** 1.293 + * Produce a record that is some combination of the remote and local records 1.294 + * provided. 1.295 + * 1.296 + * The returned record must be produced without mutating either remoteRecord 1.297 + * or localRecord. It is acceptable to return either remoteRecord or localRecord 1.298 + * if no modifications are to be propagated. 1.299 + * 1.300 + * The returned record *should* have the local androidID and the remote GUID, 1.301 + * and some optional merge of data from the two records. 1.302 + * 1.303 + * This method can be called with records that are identical, or differ in 1.304 + * any regard. 1.305 + * 1.306 + * This method will not be called if: 1.307 + * 1.308 + * * either record is marked as deleted, or 1.309 + * * there is no local mapping for a new remote record. 1.310 + * 1.311 + * Otherwise, it will be called precisely once. 1.312 + * 1.313 + * Side-effects (e.g., for transactional storage) can be hooked in here. 1.314 + * 1.315 + * @param remoteRecord 1.316 + * The record retrieved from upstream, already adjusted for clock skew. 1.317 + * @param localRecord 1.318 + * The record retrieved from local storage. 1.319 + * @param lastRemoteRetrieval 1.320 + * The timestamp of the last retrieved set of remote records, adjusted for 1.321 + * clock skew. 1.322 + * @param lastLocalRetrieval 1.323 + * The timestamp of the last retrieved set of local records. 1.324 + * @return 1.325 + * A Record instance to apply, or null to apply nothing. 1.326 + */ 1.327 + protected Record reconcileRecords(final Record remoteRecord, 1.328 + final Record localRecord, 1.329 + final long lastRemoteRetrieval, 1.330 + final long lastLocalRetrieval) { 1.331 + Logger.debug(LOG_TAG, "Reconciling remote " + remoteRecord.guid + " against local " + localRecord.guid); 1.332 + 1.333 + if (localRecord.equalPayloads(remoteRecord)) { 1.334 + if (remoteRecord.lastModified > localRecord.lastModified) { 1.335 + Logger.debug(LOG_TAG, "Records are equal. No record application needed."); 1.336 + return null; 1.337 + } 1.338 + 1.339 + // Local wins. 1.340 + return null; 1.341 + } 1.342 + 1.343 + // TODO: Decide what to do based on: 1.344 + // * Which of the two records is modified; 1.345 + // * Whether they are equal or congruent; 1.346 + // * The modified times of each record (interpreted through the lens of clock skew); 1.347 + // * ... 1.348 + boolean localIsMoreRecent = localRecord.lastModified > remoteRecord.lastModified; 1.349 + Logger.debug(LOG_TAG, "Local record is more recent? " + localIsMoreRecent); 1.350 + Record donor = localIsMoreRecent ? localRecord : remoteRecord; 1.351 + 1.352 + // Modify the local record to match the remote record's GUID and values. 1.353 + // Preserve the local Android ID, and merge data where possible. 1.354 + // It sure would be nice if copyWithIDs didn't give a shit about androidID, mm? 1.355 + Record out = donor.copyWithIDs(remoteRecord.guid, localRecord.androidID); 1.356 + 1.357 + // We don't want to upload the record if the remote record was 1.358 + // applied without changes. 1.359 + // This logic will become more complicated as reconciling becomes smarter. 1.360 + if (!localIsMoreRecent) { 1.361 + trackGUID(out.guid); 1.362 + } 1.363 + return out; 1.364 + } 1.365 + 1.366 + /** 1.367 + * Depending on the RepositorySession implementation, track 1.368 + * that a record — most likely a brand-new record that has been 1.369 + * applied unmodified — should be tracked so as to not be uploaded 1.370 + * redundantly. 1.371 + * 1.372 + * The default implementations do nothing. 1.373 + */ 1.374 + protected void trackGUID(String guid) { 1.375 + } 1.376 + 1.377 + protected synchronized void untrackGUIDs(Collection<String> guids) { 1.378 + } 1.379 + 1.380 + protected void untrackGUID(String guid) { 1.381 + } 1.382 + 1.383 + // Ah, Java. You wretched creature. 1.384 + public Iterator<String> getTrackedRecordIDs() { 1.385 + return new ArrayList<String>().iterator(); 1.386 + } 1.387 +}