mobile/android/base/sync/synchronizer/RecordsChannel.java

Wed, 31 Dec 2014 06:09:35 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:09:35 +0100
changeset 0
6474c204b198
permissions
-rw-r--r--

Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purpose.

michael@0 1 /* This Source Code Form is subject to the terms of the Mozilla Public
michael@0 2 * License, v. 2.0. If a copy of the MPL was not distributed with this
michael@0 3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
michael@0 4
michael@0 5 package org.mozilla.gecko.sync.synchronizer;
michael@0 6
michael@0 7 import java.util.concurrent.ConcurrentLinkedQueue;
michael@0 8 import java.util.concurrent.ExecutorService;
michael@0 9 import java.util.concurrent.atomic.AtomicInteger;
michael@0 10
michael@0 11 import org.mozilla.gecko.background.common.log.Logger;
michael@0 12 import org.mozilla.gecko.sync.ThreadPool;
michael@0 13 import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException;
michael@0 14 import org.mozilla.gecko.sync.repositories.NoStoreDelegateException;
michael@0 15 import org.mozilla.gecko.sync.repositories.RepositorySession;
michael@0 16 import org.mozilla.gecko.sync.repositories.delegates.DeferredRepositorySessionBeginDelegate;
michael@0 17 import org.mozilla.gecko.sync.repositories.delegates.DeferredRepositorySessionStoreDelegate;
michael@0 18 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate;
michael@0 19 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
michael@0 20 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
michael@0 21 import org.mozilla.gecko.sync.repositories.domain.Record;
michael@0 22
michael@0 23 /**
michael@0 24 * Pulls records from `source`, applying them to `sink`.
michael@0 25 * Notifies its delegate of errors and completion.
michael@0 26 *
michael@0 27 * All stores (initiated by a fetch) must have been completed before storeDone
michael@0 28 * is invoked on the sink. This is to avoid the existing stored items being
michael@0 29 * considered as the total set, with onStoreCompleted being called when they're
michael@0 30 * done:
michael@0 31 *
michael@0 32 * store(A) store(B)
michael@0 33 * store(C) storeDone()
michael@0 34 * store(A) finishes. Store job begins.
michael@0 35 * store(C) finishes. Store job begins.
michael@0 36 * storeDone() finishes.
michael@0 37 * Storing of A complete.
michael@0 38 * Storing of C complete.
michael@0 39 * We're done! Call onStoreCompleted.
michael@0 40 * store(B) finishes... uh oh.
michael@0 41 *
michael@0 42 * In other words, storeDone must be gated on the synchronous invocation of every store.
michael@0 43 *
michael@0 44 * Similarly, we require that every store callback have returned before onStoreCompleted is invoked.
michael@0 45 *
michael@0 46 * This whole set of guarantees should be achievable thusly:
michael@0 47 *
michael@0 48 * * The fetch process must run in a single thread, and invoke store()
michael@0 49 * synchronously. After processing every incoming record, storeDone is called,
michael@0 50 * setting a flag.
michael@0 51 * If the fetch cannot be implicitly queued, it must be explicitly queued.
michael@0 52 * In this implementation, we assume that fetch callbacks are strictly ordered in this way.
michael@0 53 *
michael@0 54 * * The store process must be (implicitly or explicitly) queued. When the
michael@0 55 * queue empties, the consumer checks the storeDone flag. If it's set, and the
michael@0 56 * queue is exhausted, invoke onStoreCompleted.
michael@0 57 *
michael@0 58 * RecordsChannel exists to enforce this ordering of operations.
michael@0 59 *
michael@0 60 * @author rnewman
michael@0 61 *
michael@0 62 */
michael@0 63 public class RecordsChannel implements
michael@0 64 RepositorySessionFetchRecordsDelegate,
michael@0 65 RepositorySessionStoreDelegate,
michael@0 66 RecordsConsumerDelegate,
michael@0 67 RepositorySessionBeginDelegate {
michael@0 68
michael@0 69 private static final String LOG_TAG = "RecordsChannel";
michael@0 70 public RepositorySession source;
michael@0 71 public RepositorySession sink;
michael@0 72 private RecordsChannelDelegate delegate;
michael@0 73 private long fetchEnd = -1;
michael@0 74
michael@0 75 protected final AtomicInteger numFetched = new AtomicInteger();
michael@0 76 protected final AtomicInteger numFetchFailed = new AtomicInteger();
michael@0 77 protected final AtomicInteger numStored = new AtomicInteger();
michael@0 78 protected final AtomicInteger numStoreFailed = new AtomicInteger();
michael@0 79
michael@0 80 public RecordsChannel(RepositorySession source, RepositorySession sink, RecordsChannelDelegate delegate) {
michael@0 81 this.source = source;
michael@0 82 this.sink = sink;
michael@0 83 this.delegate = delegate;
michael@0 84 }
michael@0 85
michael@0 86 /*
michael@0 87 * We push fetched records into a queue.
michael@0 88 * A separate thread is waiting for us to notify it of work to do.
michael@0 89 * When we tell it to stop, it'll stop. We do that when the fetch
michael@0 90 * is completed.
michael@0 91 * When it stops, we tell the sink that there are no more records,
michael@0 92 * and wait for the sink to tell us that storing is done.
michael@0 93 * Then we notify our delegate of completion.
michael@0 94 */
michael@0 95 private RecordConsumer consumer;
michael@0 96 private boolean waitingForQueueDone = false;
michael@0 97 private ConcurrentLinkedQueue<Record> toProcess = new ConcurrentLinkedQueue<Record>();
michael@0 98
michael@0 99 @Override
michael@0 100 public ConcurrentLinkedQueue<Record> getQueue() {
michael@0 101 return toProcess;
michael@0 102 }
michael@0 103
michael@0 104 protected boolean isReady() {
michael@0 105 return source.isActive() && sink.isActive();
michael@0 106 }
michael@0 107
michael@0 108 /**
michael@0 109 * Get the number of records fetched so far.
michael@0 110 *
michael@0 111 * @return number of fetches.
michael@0 112 */
michael@0 113 public int getFetchCount() {
michael@0 114 return numFetched.get();
michael@0 115 }
michael@0 116
michael@0 117 /**
michael@0 118 * Get the number of fetch failures recorded so far.
michael@0 119 *
michael@0 120 * @return number of fetch failures.
michael@0 121 */
michael@0 122 public int getFetchFailureCount() {
michael@0 123 return numFetchFailed.get();
michael@0 124 }
michael@0 125
michael@0 126 /**
michael@0 127 * Get the number of store attempts (successful or not) so far.
michael@0 128 *
michael@0 129 * @return number of stores attempted.
michael@0 130 */
michael@0 131 public int getStoreCount() {
michael@0 132 return numStored.get();
michael@0 133 }
michael@0 134
michael@0 135 /**
michael@0 136 * Get the number of store failures recorded so far.
michael@0 137 *
michael@0 138 * @return number of store failures.
michael@0 139 */
michael@0 140 public int getStoreFailureCount() {
michael@0 141 return numStoreFailed.get();
michael@0 142 }
michael@0 143
michael@0 144 /**
michael@0 145 * Start records flowing through the channel.
michael@0 146 */
michael@0 147 public void flow() {
michael@0 148 if (!isReady()) {
michael@0 149 RepositorySession failed = source;
michael@0 150 if (source.isActive()) {
michael@0 151 failed = sink;
michael@0 152 }
michael@0 153 this.delegate.onFlowBeginFailed(this, new SessionNotBegunException(failed));
michael@0 154 return;
michael@0 155 }
michael@0 156
michael@0 157 if (!source.dataAvailable()) {
michael@0 158 Logger.info(LOG_TAG, "No data available: short-circuiting flow from source " + source);
michael@0 159 long now = System.currentTimeMillis();
michael@0 160 this.delegate.onFlowCompleted(this, now, now);
michael@0 161 return;
michael@0 162 }
michael@0 163
michael@0 164 sink.setStoreDelegate(this);
michael@0 165 numFetched.set(0);
michael@0 166 numFetchFailed.set(0);
michael@0 167 numStored.set(0);
michael@0 168 numStoreFailed.set(0);
michael@0 169 // Start a consumer thread.
michael@0 170 this.consumer = new ConcurrentRecordConsumer(this);
michael@0 171 ThreadPool.run(this.consumer);
michael@0 172 waitingForQueueDone = true;
michael@0 173 source.fetchSince(source.getLastSyncTimestamp(), this);
michael@0 174 }
michael@0 175
michael@0 176 /**
michael@0 177 * Begin both sessions, invoking flow() when done.
michael@0 178 * @throws InvalidSessionTransitionException
michael@0 179 */
michael@0 180 public void beginAndFlow() throws InvalidSessionTransitionException {
michael@0 181 Logger.trace(LOG_TAG, "Beginning source.");
michael@0 182 source.begin(this);
michael@0 183 }
michael@0 184
michael@0 185 @Override
michael@0 186 public void store(Record record) {
michael@0 187 numStored.incrementAndGet();
michael@0 188 try {
michael@0 189 sink.store(record);
michael@0 190 } catch (NoStoreDelegateException e) {
michael@0 191 Logger.error(LOG_TAG, "Got NoStoreDelegateException in RecordsChannel.store(). This should not occur. Aborting.", e);
michael@0 192 delegate.onFlowStoreFailed(this, e, record.guid);
michael@0 193 }
michael@0 194 }
michael@0 195
michael@0 196 @Override
michael@0 197 public void onFetchFailed(Exception ex, Record record) {
michael@0 198 Logger.warn(LOG_TAG, "onFetchFailed. Calling for immediate stop.", ex);
michael@0 199 numFetchFailed.incrementAndGet();
michael@0 200 this.consumer.halt();
michael@0 201 delegate.onFlowFetchFailed(this, ex);
michael@0 202 }
michael@0 203
michael@0 204 @Override
michael@0 205 public void onFetchedRecord(Record record) {
michael@0 206 numFetched.incrementAndGet();
michael@0 207 this.toProcess.add(record);
michael@0 208 this.consumer.doNotify();
michael@0 209 }
michael@0 210
michael@0 211 @Override
michael@0 212 public void onFetchCompleted(final long fetchEnd) {
michael@0 213 Logger.trace(LOG_TAG, "onFetchCompleted. Stopping consumer once stores are done.");
michael@0 214 Logger.trace(LOG_TAG, "Fetch timestamp is " + fetchEnd);
michael@0 215 this.fetchEnd = fetchEnd;
michael@0 216 this.consumer.queueFilled();
michael@0 217 }
michael@0 218
michael@0 219 @Override
michael@0 220 public void onRecordStoreFailed(Exception ex, String recordGuid) {
michael@0 221 Logger.trace(LOG_TAG, "Failed to store record with guid " + recordGuid);
michael@0 222 numStoreFailed.incrementAndGet();
michael@0 223 this.consumer.stored();
michael@0 224 delegate.onFlowStoreFailed(this, ex, recordGuid);
michael@0 225 // TODO: abort?
michael@0 226 }
michael@0 227
michael@0 228 @Override
michael@0 229 public void onRecordStoreSucceeded(String guid) {
michael@0 230 Logger.trace(LOG_TAG, "Stored record with guid " + guid);
michael@0 231 this.consumer.stored();
michael@0 232 }
michael@0 233
michael@0 234
michael@0 235 @Override
michael@0 236 public void consumerIsDone(boolean allRecordsQueued) {
michael@0 237 Logger.trace(LOG_TAG, "Consumer is done. Are we waiting for it? " + waitingForQueueDone);
michael@0 238 if (waitingForQueueDone) {
michael@0 239 waitingForQueueDone = false;
michael@0 240 this.sink.storeDone(); // Now we'll be waiting for onStoreCompleted.
michael@0 241 }
michael@0 242 }
michael@0 243
michael@0 244 @Override
michael@0 245 public void onStoreCompleted(long storeEnd) {
michael@0 246 Logger.trace(LOG_TAG, "onStoreCompleted. Notifying delegate of onFlowCompleted. " +
michael@0 247 "Fetch end is " + fetchEnd + ", store end is " + storeEnd);
michael@0 248 // TODO: synchronize on consumer callback?
michael@0 249 delegate.onFlowCompleted(this, fetchEnd, storeEnd);
michael@0 250 }
michael@0 251
michael@0 252 @Override
michael@0 253 public void onBeginFailed(Exception ex) {
michael@0 254 delegate.onFlowBeginFailed(this, ex);
michael@0 255 }
michael@0 256
michael@0 257 @Override
michael@0 258 public void onBeginSucceeded(RepositorySession session) {
michael@0 259 if (session == source) {
michael@0 260 Logger.trace(LOG_TAG, "Source session began. Beginning sink session.");
michael@0 261 try {
michael@0 262 sink.begin(this);
michael@0 263 } catch (InvalidSessionTransitionException e) {
michael@0 264 onBeginFailed(e);
michael@0 265 return;
michael@0 266 }
michael@0 267 }
michael@0 268 if (session == sink) {
michael@0 269 Logger.trace(LOG_TAG, "Sink session began. Beginning flow.");
michael@0 270 this.flow();
michael@0 271 return;
michael@0 272 }
michael@0 273
michael@0 274 // TODO: error!
michael@0 275 }
michael@0 276
michael@0 277 @Override
michael@0 278 public RepositorySessionStoreDelegate deferredStoreDelegate(final ExecutorService executor) {
michael@0 279 return new DeferredRepositorySessionStoreDelegate(this, executor);
michael@0 280 }
michael@0 281
michael@0 282 @Override
michael@0 283 public RepositorySessionBeginDelegate deferredBeginDelegate(final ExecutorService executor) {
michael@0 284 return new DeferredRepositorySessionBeginDelegate(this, executor);
michael@0 285 }
michael@0 286
michael@0 287 @Override
michael@0 288 public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor) {
michael@0 289 // Lie outright. We know that all of our fetch methods are safe.
michael@0 290 return this;
michael@0 291 }
michael@0 292 }

mercurial