1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/mobile/android/base/sync/synchronizer/RecordsChannel.java Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,292 @@ 1.4 +/* This Source Code Form is subject to the terms of the Mozilla Public 1.5 + * License, v. 2.0. If a copy of the MPL was not distributed with this 1.6 + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 1.7 + 1.8 +package org.mozilla.gecko.sync.synchronizer; 1.9 + 1.10 +import java.util.concurrent.ConcurrentLinkedQueue; 1.11 +import java.util.concurrent.ExecutorService; 1.12 +import java.util.concurrent.atomic.AtomicInteger; 1.13 + 1.14 +import org.mozilla.gecko.background.common.log.Logger; 1.15 +import org.mozilla.gecko.sync.ThreadPool; 1.16 +import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException; 1.17 +import org.mozilla.gecko.sync.repositories.NoStoreDelegateException; 1.18 +import org.mozilla.gecko.sync.repositories.RepositorySession; 1.19 +import org.mozilla.gecko.sync.repositories.delegates.DeferredRepositorySessionBeginDelegate; 1.20 +import org.mozilla.gecko.sync.repositories.delegates.DeferredRepositorySessionStoreDelegate; 1.21 +import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate; 1.22 +import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate; 1.23 +import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate; 1.24 +import org.mozilla.gecko.sync.repositories.domain.Record; 1.25 + 1.26 +/** 1.27 + * Pulls records from `source`, applying them to `sink`. 1.28 + * Notifies its delegate of errors and completion. 1.29 + * 1.30 + * All stores (initiated by a fetch) must have been completed before storeDone 1.31 + * is invoked on the sink. This is to avoid the existing stored items being 1.32 + * considered as the total set, with onStoreCompleted being called when they're 1.33 + * done: 1.34 + * 1.35 + * store(A) store(B) 1.36 + * store(C) storeDone() 1.37 + * store(A) finishes. Store job begins. 1.38 + * store(C) finishes. Store job begins. 1.39 + * storeDone() finishes. 1.40 + * Storing of A complete. 1.41 + * Storing of C complete. 1.42 + * We're done! Call onStoreCompleted. 1.43 + * store(B) finishes... uh oh. 1.44 + * 1.45 + * In other words, storeDone must be gated on the synchronous invocation of every store. 1.46 + * 1.47 + * Similarly, we require that every store callback have returned before onStoreCompleted is invoked. 1.48 + * 1.49 + * This whole set of guarantees should be achievable thusly: 1.50 + * 1.51 + * * The fetch process must run in a single thread, and invoke store() 1.52 + * synchronously. After processing every incoming record, storeDone is called, 1.53 + * setting a flag. 1.54 + * If the fetch cannot be implicitly queued, it must be explicitly queued. 1.55 + * In this implementation, we assume that fetch callbacks are strictly ordered in this way. 1.56 + * 1.57 + * * The store process must be (implicitly or explicitly) queued. When the 1.58 + * queue empties, the consumer checks the storeDone flag. If it's set, and the 1.59 + * queue is exhausted, invoke onStoreCompleted. 1.60 + * 1.61 + * RecordsChannel exists to enforce this ordering of operations. 1.62 + * 1.63 + * @author rnewman 1.64 + * 1.65 + */ 1.66 +public class RecordsChannel implements 1.67 + RepositorySessionFetchRecordsDelegate, 1.68 + RepositorySessionStoreDelegate, 1.69 + RecordsConsumerDelegate, 1.70 + RepositorySessionBeginDelegate { 1.71 + 1.72 + private static final String LOG_TAG = "RecordsChannel"; 1.73 + public RepositorySession source; 1.74 + public RepositorySession sink; 1.75 + private RecordsChannelDelegate delegate; 1.76 + private long fetchEnd = -1; 1.77 + 1.78 + protected final AtomicInteger numFetched = new AtomicInteger(); 1.79 + protected final AtomicInteger numFetchFailed = new AtomicInteger(); 1.80 + protected final AtomicInteger numStored = new AtomicInteger(); 1.81 + protected final AtomicInteger numStoreFailed = new AtomicInteger(); 1.82 + 1.83 + public RecordsChannel(RepositorySession source, RepositorySession sink, RecordsChannelDelegate delegate) { 1.84 + this.source = source; 1.85 + this.sink = sink; 1.86 + this.delegate = delegate; 1.87 + } 1.88 + 1.89 + /* 1.90 + * We push fetched records into a queue. 1.91 + * A separate thread is waiting for us to notify it of work to do. 1.92 + * When we tell it to stop, it'll stop. We do that when the fetch 1.93 + * is completed. 1.94 + * When it stops, we tell the sink that there are no more records, 1.95 + * and wait for the sink to tell us that storing is done. 1.96 + * Then we notify our delegate of completion. 1.97 + */ 1.98 + private RecordConsumer consumer; 1.99 + private boolean waitingForQueueDone = false; 1.100 + private ConcurrentLinkedQueue<Record> toProcess = new ConcurrentLinkedQueue<Record>(); 1.101 + 1.102 + @Override 1.103 + public ConcurrentLinkedQueue<Record> getQueue() { 1.104 + return toProcess; 1.105 + } 1.106 + 1.107 + protected boolean isReady() { 1.108 + return source.isActive() && sink.isActive(); 1.109 + } 1.110 + 1.111 + /** 1.112 + * Get the number of records fetched so far. 1.113 + * 1.114 + * @return number of fetches. 1.115 + */ 1.116 + public int getFetchCount() { 1.117 + return numFetched.get(); 1.118 + } 1.119 + 1.120 + /** 1.121 + * Get the number of fetch failures recorded so far. 1.122 + * 1.123 + * @return number of fetch failures. 1.124 + */ 1.125 + public int getFetchFailureCount() { 1.126 + return numFetchFailed.get(); 1.127 + } 1.128 + 1.129 + /** 1.130 + * Get the number of store attempts (successful or not) so far. 1.131 + * 1.132 + * @return number of stores attempted. 1.133 + */ 1.134 + public int getStoreCount() { 1.135 + return numStored.get(); 1.136 + } 1.137 + 1.138 + /** 1.139 + * Get the number of store failures recorded so far. 1.140 + * 1.141 + * @return number of store failures. 1.142 + */ 1.143 + public int getStoreFailureCount() { 1.144 + return numStoreFailed.get(); 1.145 + } 1.146 + 1.147 + /** 1.148 + * Start records flowing through the channel. 1.149 + */ 1.150 + public void flow() { 1.151 + if (!isReady()) { 1.152 + RepositorySession failed = source; 1.153 + if (source.isActive()) { 1.154 + failed = sink; 1.155 + } 1.156 + this.delegate.onFlowBeginFailed(this, new SessionNotBegunException(failed)); 1.157 + return; 1.158 + } 1.159 + 1.160 + if (!source.dataAvailable()) { 1.161 + Logger.info(LOG_TAG, "No data available: short-circuiting flow from source " + source); 1.162 + long now = System.currentTimeMillis(); 1.163 + this.delegate.onFlowCompleted(this, now, now); 1.164 + return; 1.165 + } 1.166 + 1.167 + sink.setStoreDelegate(this); 1.168 + numFetched.set(0); 1.169 + numFetchFailed.set(0); 1.170 + numStored.set(0); 1.171 + numStoreFailed.set(0); 1.172 + // Start a consumer thread. 1.173 + this.consumer = new ConcurrentRecordConsumer(this); 1.174 + ThreadPool.run(this.consumer); 1.175 + waitingForQueueDone = true; 1.176 + source.fetchSince(source.getLastSyncTimestamp(), this); 1.177 + } 1.178 + 1.179 + /** 1.180 + * Begin both sessions, invoking flow() when done. 1.181 + * @throws InvalidSessionTransitionException 1.182 + */ 1.183 + public void beginAndFlow() throws InvalidSessionTransitionException { 1.184 + Logger.trace(LOG_TAG, "Beginning source."); 1.185 + source.begin(this); 1.186 + } 1.187 + 1.188 + @Override 1.189 + public void store(Record record) { 1.190 + numStored.incrementAndGet(); 1.191 + try { 1.192 + sink.store(record); 1.193 + } catch (NoStoreDelegateException e) { 1.194 + Logger.error(LOG_TAG, "Got NoStoreDelegateException in RecordsChannel.store(). This should not occur. Aborting.", e); 1.195 + delegate.onFlowStoreFailed(this, e, record.guid); 1.196 + } 1.197 + } 1.198 + 1.199 + @Override 1.200 + public void onFetchFailed(Exception ex, Record record) { 1.201 + Logger.warn(LOG_TAG, "onFetchFailed. Calling for immediate stop.", ex); 1.202 + numFetchFailed.incrementAndGet(); 1.203 + this.consumer.halt(); 1.204 + delegate.onFlowFetchFailed(this, ex); 1.205 + } 1.206 + 1.207 + @Override 1.208 + public void onFetchedRecord(Record record) { 1.209 + numFetched.incrementAndGet(); 1.210 + this.toProcess.add(record); 1.211 + this.consumer.doNotify(); 1.212 + } 1.213 + 1.214 + @Override 1.215 + public void onFetchCompleted(final long fetchEnd) { 1.216 + Logger.trace(LOG_TAG, "onFetchCompleted. Stopping consumer once stores are done."); 1.217 + Logger.trace(LOG_TAG, "Fetch timestamp is " + fetchEnd); 1.218 + this.fetchEnd = fetchEnd; 1.219 + this.consumer.queueFilled(); 1.220 + } 1.221 + 1.222 + @Override 1.223 + public void onRecordStoreFailed(Exception ex, String recordGuid) { 1.224 + Logger.trace(LOG_TAG, "Failed to store record with guid " + recordGuid); 1.225 + numStoreFailed.incrementAndGet(); 1.226 + this.consumer.stored(); 1.227 + delegate.onFlowStoreFailed(this, ex, recordGuid); 1.228 + // TODO: abort? 1.229 + } 1.230 + 1.231 + @Override 1.232 + public void onRecordStoreSucceeded(String guid) { 1.233 + Logger.trace(LOG_TAG, "Stored record with guid " + guid); 1.234 + this.consumer.stored(); 1.235 + } 1.236 + 1.237 + 1.238 + @Override 1.239 + public void consumerIsDone(boolean allRecordsQueued) { 1.240 + Logger.trace(LOG_TAG, "Consumer is done. Are we waiting for it? " + waitingForQueueDone); 1.241 + if (waitingForQueueDone) { 1.242 + waitingForQueueDone = false; 1.243 + this.sink.storeDone(); // Now we'll be waiting for onStoreCompleted. 1.244 + } 1.245 + } 1.246 + 1.247 + @Override 1.248 + public void onStoreCompleted(long storeEnd) { 1.249 + Logger.trace(LOG_TAG, "onStoreCompleted. Notifying delegate of onFlowCompleted. " + 1.250 + "Fetch end is " + fetchEnd + ", store end is " + storeEnd); 1.251 + // TODO: synchronize on consumer callback? 1.252 + delegate.onFlowCompleted(this, fetchEnd, storeEnd); 1.253 + } 1.254 + 1.255 + @Override 1.256 + public void onBeginFailed(Exception ex) { 1.257 + delegate.onFlowBeginFailed(this, ex); 1.258 + } 1.259 + 1.260 + @Override 1.261 + public void onBeginSucceeded(RepositorySession session) { 1.262 + if (session == source) { 1.263 + Logger.trace(LOG_TAG, "Source session began. Beginning sink session."); 1.264 + try { 1.265 + sink.begin(this); 1.266 + } catch (InvalidSessionTransitionException e) { 1.267 + onBeginFailed(e); 1.268 + return; 1.269 + } 1.270 + } 1.271 + if (session == sink) { 1.272 + Logger.trace(LOG_TAG, "Sink session began. Beginning flow."); 1.273 + this.flow(); 1.274 + return; 1.275 + } 1.276 + 1.277 + // TODO: error! 1.278 + } 1.279 + 1.280 + @Override 1.281 + public RepositorySessionStoreDelegate deferredStoreDelegate(final ExecutorService executor) { 1.282 + return new DeferredRepositorySessionStoreDelegate(this, executor); 1.283 + } 1.284 + 1.285 + @Override 1.286 + public RepositorySessionBeginDelegate deferredBeginDelegate(final ExecutorService executor) { 1.287 + return new DeferredRepositorySessionBeginDelegate(this, executor); 1.288 + } 1.289 + 1.290 + @Override 1.291 + public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor) { 1.292 + // Lie outright. We know that all of our fetch methods are safe. 1.293 + return this; 1.294 + } 1.295 +}