/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

package org.mozilla.gecko.sync.synchronizer;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

import org.mozilla.gecko.background.common.log.Logger;
import org.mozilla.gecko.sync.ThreadPool;
import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException;
import org.mozilla.gecko.sync.repositories.NoStoreDelegateException;
import org.mozilla.gecko.sync.repositories.RepositorySession;
import org.mozilla.gecko.sync.repositories.delegates.DeferredRepositorySessionBeginDelegate;
import org.mozilla.gecko.sync.repositories.delegates.DeferredRepositorySessionStoreDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
import org.mozilla.gecko.sync.repositories.domain.Record;

/**
 * Pulls records from `source`, applying them to `sink`.
 * Notifies its delegate of errors and completion.
 *
 * All stores (initiated by a fetch) must have been completed before storeDone
 * is invoked on the sink. This is to avoid the existing stored items being
 * considered as the total set, with onStoreCompleted being called when they're
 * done:
 *
 *   store(A) store(B)
 *   store(C) storeDone()
 *   store(A) finishes. Store job begins.
 *   store(C) finishes. Store job begins.
 *   storeDone() finishes.
 *   Storing of A complete.
 *   Storing of C complete.
 *   We're done! Call onStoreCompleted.
 *   store(B) finishes... uh oh.
 *
 * In other words, storeDone must be gated on the synchronous invocation of every store.
 *
 * Similarly, we require that every store callback have returned before onStoreCompleted is invoked.
 *
 * This whole set of guarantees should be achievable thusly:
 *
 * * The fetch process must run in a single thread, and invoke store()
 *   synchronously. After processing every incoming record, storeDone is called,
 *   setting a flag.
 *   If the fetch cannot be implicitly queued, it must be explicitly queued.
 *   In this implementation, we assume that fetch callbacks are strictly ordered in this way.
 *
 * * The store process must be (implicitly or explicitly) queued. When the
 *   queue empties, the consumer checks the storeDone flag. If it's set, and the
 *   queue is exhausted, invoke onStoreCompleted.
 *
 * RecordsChannel exists to enforce this ordering of operations.
 *
 * Threading note: the consumer is handed to {@link ThreadPool}, so the fields
 * it shares with the fetch/store callbacks are declared volatile.
 *
 * @author rnewman
 *
 */
public class RecordsChannel implements
  RepositorySessionFetchRecordsDelegate,
  RepositorySessionStoreDelegate,
  RecordsConsumerDelegate,
  RepositorySessionBeginDelegate {

  private static final String LOG_TAG = "RecordsChannel";

  /** Session that records are fetched from. */
  public RepositorySession source;

  /** Session that fetched records are stored into. */
  public RepositorySession sink;

  private final RecordsChannelDelegate delegate;

  // Written by onFetchCompleted and read by onStoreCompleted, which may run on
  // different threads. volatile also guarantees an untorn 64-bit read.
  private volatile long fetchEnd = -1;

  protected final AtomicInteger numFetched = new AtomicInteger();
  protected final AtomicInteger numFetchFailed = new AtomicInteger();
  protected final AtomicInteger numStored = new AtomicInteger();
  protected final AtomicInteger numStoreFailed = new AtomicInteger();

  /**
   * Create a channel between two sessions.
   *
   * @param source   session to fetch records from.
   * @param sink     session to store records into.
   * @param delegate receiver of begin/fetch/store failures and of completion.
   */
  public RecordsChannel(RepositorySession source, RepositorySession sink, RecordsChannelDelegate delegate) {
    this.source = source;
    this.sink = sink;
    this.delegate = delegate;
  }

  /*
   * We push fetched records into a queue.
   * A separate thread is waiting for us to notify it of work to do.
   * When we tell it to stop, it'll stop. We do that when the fetch
   * is completed.
   * When it stops, we tell the sink that there are no more records,
   * and wait for the sink to tell us that storing is done.
   * Then we notify our delegate of completion.
   */

  // Assigned in flow(); read from the fetch/store callbacks.
  private volatile RecordConsumer consumer;

  // Set in flow(), cleared in consumerIsDone(); the latter is a consumer callback.
  private volatile boolean waitingForQueueDone = false;

  private final ConcurrentLinkedQueue<Record> toProcess = new ConcurrentLinkedQueue<Record>();

  /**
   * @return the queue of fetched records that are waiting to be stored.
   */
  @Override
  public ConcurrentLinkedQueue<Record> getQueue() {
    return toProcess;
  }

  /**
   * @return true if both the source and the sink session report being active.
   */
  protected boolean isReady() {
    return source.isActive() && sink.isActive();
  }

  /**
   * Get the number of records fetched so far.
   *
   * @return number of fetches.
   */
  public int getFetchCount() {
    return numFetched.get();
  }

  /**
   * Get the number of fetch failures recorded so far.
   *
   * @return number of fetch failures.
   */
  public int getFetchFailureCount() {
    return numFetchFailed.get();
  }

  /**
   * Get the number of store attempts (successful or not) so far.
   *
   * @return number of stores attempted.
   */
  public int getStoreCount() {
    return numStored.get();
  }

  /**
   * Get the number of store failures recorded so far.
   *
   * @return number of store failures.
   */
  public int getStoreFailureCount() {
    return numStoreFailed.get();
  }

  /**
   * Start records flowing through the channel.
   *
   * If either session is not active, the delegate's onFlowBeginFailed is
   * invoked with a SessionNotBegunException naming the inactive session (the
   * source is reported if both are inactive). If the source reports no
   * available data, the delegate's onFlowCompleted is invoked immediately with
   * the current time for both timestamps. Otherwise the counters are reset, a
   * consumer is started, and a fetch since the source's last sync timestamp is
   * requested.
   */
  public void flow() {
    if (!isReady()) {
      RepositorySession failed = source;
      if (source.isActive()) {
        failed = sink;
      }
      this.delegate.onFlowBeginFailed(this, new SessionNotBegunException(failed));
      return;
    }

    if (!source.dataAvailable()) {
      Logger.info(LOG_TAG, "No data available: short-circuiting flow from source " + source);
      long now = System.currentTimeMillis();
      this.delegate.onFlowCompleted(this, now, now);
      return;
    }

    sink.setStoreDelegate(this);
    numFetched.set(0);
    numFetchFailed.set(0);
    numStored.set(0);
    numStoreFailed.set(0);

    // Publish the flag before the consumer is handed to the pool, so that
    // consumerIsDone() can never observe the pre-flow value.
    waitingForQueueDone = true;

    // Start a consumer thread.
    this.consumer = new ConcurrentRecordConsumer(this);
    ThreadPool.run(this.consumer);
    source.fetchSince(source.getLastSyncTimestamp(), this);
  }

  /**
   * Begin both sessions, invoking flow() when done.
   *
   * Only the source is begun here; the sink is begun from
   * {@link #onBeginSucceeded(RepositorySession)} once the source has started.
   *
   * @throws InvalidSessionTransitionException propagated from source.begin().
   */
  public void beginAndFlow() throws InvalidSessionTransitionException {
    Logger.trace(LOG_TAG, "Beginning source.");
    source.begin(this);
  }

  /**
   * Store a single record into the sink, counting the attempt.
   *
   * @param record the record to store.
   */
  @Override
  public void store(Record record) {
    numStored.incrementAndGet();
    try {
      sink.store(record);
    } catch (NoStoreDelegateException e) {
      Logger.error(LOG_TAG, "Got NoStoreDelegateException in RecordsChannel.store(). This should not occur. Aborting.", e);
      // Count this like any other store failure reported to the delegate, so
      // that getStoreFailureCount() agrees with the onFlowStoreFailed calls.
      numStoreFailed.incrementAndGet();
      delegate.onFlowStoreFailed(this, e, record.guid);
    }
  }

  /**
   * Fetch callback: count the failure, halt the consumer, and tell the delegate.
   *
   * @param ex     the failure.
   * @param record the record involved, if any; not used here.
   */
  @Override
  public void onFetchFailed(Exception ex, Record record) {
    Logger.warn(LOG_TAG, "onFetchFailed. Calling for immediate stop.", ex);
    numFetchFailed.incrementAndGet();
    this.consumer.halt();
    delegate.onFlowFetchFailed(this, ex);
  }

  /**
   * Fetch callback: count the record, enqueue it, and wake the consumer.
   *
   * @param record the fetched record.
   */
  @Override
  public void onFetchedRecord(Record record) {
    numFetched.incrementAndGet();
    this.toProcess.add(record);
    this.consumer.doNotify();
  }

  /**
   * Fetch callback: remember the fetch timestamp and tell the consumer that no
   * further records will be queued.
   *
   * @param fetchEnd timestamp reported by the source for the end of the fetch.
   */
  @Override
  public void onFetchCompleted(final long fetchEnd) {
    Logger.trace(LOG_TAG, "onFetchCompleted. Stopping consumer once stores are done.");
    Logger.trace(LOG_TAG, "Fetch timestamp is " + fetchEnd);
    this.fetchEnd = fetchEnd;
    this.consumer.queueFilled();
  }

  /**
   * Store callback: count the failure, inform the consumer that a store has
   * finished, and forward the failure to the delegate.
   *
   * @param ex         the failure.
   * @param recordGuid GUID of the record that could not be stored.
   */
  @Override
  public void onRecordStoreFailed(Exception ex, String recordGuid) {
    Logger.trace(LOG_TAG, "Failed to store record with guid " + recordGuid);
    numStoreFailed.incrementAndGet();
    this.consumer.stored();
    delegate.onFlowStoreFailed(this, ex, recordGuid);
    // TODO: abort?
  }

  /**
   * Store callback: inform the consumer that a store has finished.
   *
   * @param guid GUID of the record that was stored.
   */
  @Override
  public void onRecordStoreSucceeded(String guid) {
    Logger.trace(LOG_TAG, "Stored record with guid " + guid);
    this.consumer.stored();
  }

  /**
   * Consumer callback: the consumer has stopped. The first time this happens
   * after flow(), tell the sink that no more records are coming.
   *
   * @param allRecordsQueued supplied by the consumer; not consulted here.
   */
  @Override
  public void consumerIsDone(boolean allRecordsQueued) {
    Logger.trace(LOG_TAG, "Consumer is done. Are we waiting for it? " + waitingForQueueDone);
    if (waitingForQueueDone) {
      waitingForQueueDone = false;
      this.sink.storeDone();                 // Now we'll be waiting for onStoreCompleted.
    }
  }

  /**
   * Store callback: the sink has finished storing. Report completion of the
   * whole flow to the delegate.
   *
   * @param storeEnd timestamp reported by the sink for the end of the store.
   */
  @Override
  public void onStoreCompleted(long storeEnd) {
    Logger.trace(LOG_TAG, "onStoreCompleted. Notifying delegate of onFlowCompleted. " +
                          "Fetch end is " + fetchEnd + ", store end is " + storeEnd);
    // TODO: synchronize on consumer callback?
    delegate.onFlowCompleted(this, fetchEnd, storeEnd);
  }

  /**
   * Begin callback: forward the failure to the delegate.
   *
   * @param ex the failure.
   */
  @Override
  public void onBeginFailed(Exception ex) {
    delegate.onFlowBeginFailed(this, ex);
  }

  /**
   * Begin callback. When the source has begun, begin the sink; when the sink
   * has begun, start the flow. The two checks are deliberately not exclusive,
   * so a session that is both source and sink passes through both.
   *
   * @param session the session that has just begun.
   */
  @Override
  public void onBeginSucceeded(RepositorySession session) {
    if (session == source) {
      Logger.trace(LOG_TAG, "Source session began. Beginning sink session.");
      try {
        sink.begin(this);
      } catch (InvalidSessionTransitionException e) {
        onBeginFailed(e);
        return;
      }
    }
    if (session == sink) {
      Logger.trace(LOG_TAG, "Sink session began. Beginning flow.");
      this.flow();
      return;
    }

    // TODO: error!
  }

  /**
   * @param executor executor handed to the deferring wrapper.
   * @return this channel wrapped in a DeferredRepositorySessionStoreDelegate.
   */
  @Override
  public RepositorySessionStoreDelegate deferredStoreDelegate(final ExecutorService executor) {
    return new DeferredRepositorySessionStoreDelegate(this, executor);
  }

  /**
   * @param executor executor handed to the deferring wrapper.
   * @return this channel wrapped in a DeferredRepositorySessionBeginDelegate.
   */
  @Override
  public RepositorySessionBeginDelegate deferredBeginDelegate(final ExecutorService executor) {
    return new DeferredRepositorySessionBeginDelegate(this, executor);
  }

  /**
   * @param executor ignored.
   * @return this channel itself, unwrapped.
   */
  @Override
  public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor) {
    // Lie outright. We know that all of our fetch methods are safe.
    return this;
  }
}