Wed, 31 Dec 2014 07:22:50 +0100
Correct previous dual key logic pending first delivery installment.
1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 package org.mozilla.gecko.sync.synchronizer;
7 import java.util.concurrent.ConcurrentLinkedQueue;
8 import java.util.concurrent.ExecutorService;
9 import java.util.concurrent.atomic.AtomicInteger;
11 import org.mozilla.gecko.background.common.log.Logger;
12 import org.mozilla.gecko.sync.ThreadPool;
13 import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException;
14 import org.mozilla.gecko.sync.repositories.NoStoreDelegateException;
15 import org.mozilla.gecko.sync.repositories.RepositorySession;
16 import org.mozilla.gecko.sync.repositories.delegates.DeferredRepositorySessionBeginDelegate;
17 import org.mozilla.gecko.sync.repositories.delegates.DeferredRepositorySessionStoreDelegate;
18 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate;
19 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
20 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
21 import org.mozilla.gecko.sync.repositories.domain.Record;
23 /**
24 * Pulls records from `source`, applying them to `sink`.
25 * Notifies its delegate of errors and completion.
26 *
27 * All stores (initiated by a fetch) must have been completed before storeDone
28 * is invoked on the sink. This is to avoid the existing stored items being
29 * considered as the total set, with onStoreCompleted being called when they're
30 * done:
31 *
32 * store(A) store(B)
33 * store(C) storeDone()
34 * store(A) finishes. Store job begins.
35 * store(C) finishes. Store job begins.
36 * storeDone() finishes.
37 * Storing of A complete.
38 * Storing of C complete.
39 * We're done! Call onStoreCompleted.
40 * store(B) finishes... uh oh.
41 *
42 * In other words, storeDone must be gated on the synchronous invocation of every store.
43 *
44 * Similarly, we require that every store callback have returned before onStoreCompleted is invoked.
45 *
46 * This whole set of guarantees should be achievable thusly:
47 *
48 * * The fetch process must run in a single thread, and invoke store()
49 * synchronously. After processing every incoming record, storeDone is called,
50 * setting a flag.
51 * If the fetch cannot be implicitly queued, it must be explicitly queued.
52 * In this implementation, we assume that fetch callbacks are strictly ordered in this way.
53 *
54 * * The store process must be (implicitly or explicitly) queued. When the
55 * queue empties, the consumer checks the storeDone flag. If it's set, and the
56 * queue is exhausted, invoke onStoreCompleted.
57 *
58 * RecordsChannel exists to enforce this ordering of operations.
59 *
60 * @author rnewman
61 *
62 */
63 public class RecordsChannel implements
64 RepositorySessionFetchRecordsDelegate,
65 RepositorySessionStoreDelegate,
66 RecordsConsumerDelegate,
67 RepositorySessionBeginDelegate {
69 private static final String LOG_TAG = "RecordsChannel";
70 public RepositorySession source;
71 public RepositorySession sink;
72 private RecordsChannelDelegate delegate;
73 private long fetchEnd = -1;
75 protected final AtomicInteger numFetched = new AtomicInteger();
76 protected final AtomicInteger numFetchFailed = new AtomicInteger();
77 protected final AtomicInteger numStored = new AtomicInteger();
78 protected final AtomicInteger numStoreFailed = new AtomicInteger();
80 public RecordsChannel(RepositorySession source, RepositorySession sink, RecordsChannelDelegate delegate) {
81 this.source = source;
82 this.sink = sink;
83 this.delegate = delegate;
84 }
86 /*
87 * We push fetched records into a queue.
88 * A separate thread is waiting for us to notify it of work to do.
89 * When we tell it to stop, it'll stop. We do that when the fetch
90 * is completed.
91 * When it stops, we tell the sink that there are no more records,
92 * and wait for the sink to tell us that storing is done.
93 * Then we notify our delegate of completion.
94 */
95 private RecordConsumer consumer;
96 private boolean waitingForQueueDone = false;
97 private ConcurrentLinkedQueue<Record> toProcess = new ConcurrentLinkedQueue<Record>();
99 @Override
100 public ConcurrentLinkedQueue<Record> getQueue() {
101 return toProcess;
102 }
104 protected boolean isReady() {
105 return source.isActive() && sink.isActive();
106 }
108 /**
109 * Get the number of records fetched so far.
110 *
111 * @return number of fetches.
112 */
113 public int getFetchCount() {
114 return numFetched.get();
115 }
117 /**
118 * Get the number of fetch failures recorded so far.
119 *
120 * @return number of fetch failures.
121 */
122 public int getFetchFailureCount() {
123 return numFetchFailed.get();
124 }
126 /**
127 * Get the number of store attempts (successful or not) so far.
128 *
129 * @return number of stores attempted.
130 */
131 public int getStoreCount() {
132 return numStored.get();
133 }
135 /**
136 * Get the number of store failures recorded so far.
137 *
138 * @return number of store failures.
139 */
140 public int getStoreFailureCount() {
141 return numStoreFailed.get();
142 }
144 /**
145 * Start records flowing through the channel.
146 */
147 public void flow() {
148 if (!isReady()) {
149 RepositorySession failed = source;
150 if (source.isActive()) {
151 failed = sink;
152 }
153 this.delegate.onFlowBeginFailed(this, new SessionNotBegunException(failed));
154 return;
155 }
157 if (!source.dataAvailable()) {
158 Logger.info(LOG_TAG, "No data available: short-circuiting flow from source " + source);
159 long now = System.currentTimeMillis();
160 this.delegate.onFlowCompleted(this, now, now);
161 return;
162 }
164 sink.setStoreDelegate(this);
165 numFetched.set(0);
166 numFetchFailed.set(0);
167 numStored.set(0);
168 numStoreFailed.set(0);
169 // Start a consumer thread.
170 this.consumer = new ConcurrentRecordConsumer(this);
171 ThreadPool.run(this.consumer);
172 waitingForQueueDone = true;
173 source.fetchSince(source.getLastSyncTimestamp(), this);
174 }
176 /**
177 * Begin both sessions, invoking flow() when done.
178 * @throws InvalidSessionTransitionException
179 */
180 public void beginAndFlow() throws InvalidSessionTransitionException {
181 Logger.trace(LOG_TAG, "Beginning source.");
182 source.begin(this);
183 }
185 @Override
186 public void store(Record record) {
187 numStored.incrementAndGet();
188 try {
189 sink.store(record);
190 } catch (NoStoreDelegateException e) {
191 Logger.error(LOG_TAG, "Got NoStoreDelegateException in RecordsChannel.store(). This should not occur. Aborting.", e);
192 delegate.onFlowStoreFailed(this, e, record.guid);
193 }
194 }
196 @Override
197 public void onFetchFailed(Exception ex, Record record) {
198 Logger.warn(LOG_TAG, "onFetchFailed. Calling for immediate stop.", ex);
199 numFetchFailed.incrementAndGet();
200 this.consumer.halt();
201 delegate.onFlowFetchFailed(this, ex);
202 }
204 @Override
205 public void onFetchedRecord(Record record) {
206 numFetched.incrementAndGet();
207 this.toProcess.add(record);
208 this.consumer.doNotify();
209 }
211 @Override
212 public void onFetchCompleted(final long fetchEnd) {
213 Logger.trace(LOG_TAG, "onFetchCompleted. Stopping consumer once stores are done.");
214 Logger.trace(LOG_TAG, "Fetch timestamp is " + fetchEnd);
215 this.fetchEnd = fetchEnd;
216 this.consumer.queueFilled();
217 }
219 @Override
220 public void onRecordStoreFailed(Exception ex, String recordGuid) {
221 Logger.trace(LOG_TAG, "Failed to store record with guid " + recordGuid);
222 numStoreFailed.incrementAndGet();
223 this.consumer.stored();
224 delegate.onFlowStoreFailed(this, ex, recordGuid);
225 // TODO: abort?
226 }
228 @Override
229 public void onRecordStoreSucceeded(String guid) {
230 Logger.trace(LOG_TAG, "Stored record with guid " + guid);
231 this.consumer.stored();
232 }
235 @Override
236 public void consumerIsDone(boolean allRecordsQueued) {
237 Logger.trace(LOG_TAG, "Consumer is done. Are we waiting for it? " + waitingForQueueDone);
238 if (waitingForQueueDone) {
239 waitingForQueueDone = false;
240 this.sink.storeDone(); // Now we'll be waiting for onStoreCompleted.
241 }
242 }
244 @Override
245 public void onStoreCompleted(long storeEnd) {
246 Logger.trace(LOG_TAG, "onStoreCompleted. Notifying delegate of onFlowCompleted. " +
247 "Fetch end is " + fetchEnd + ", store end is " + storeEnd);
248 // TODO: synchronize on consumer callback?
249 delegate.onFlowCompleted(this, fetchEnd, storeEnd);
250 }
252 @Override
253 public void onBeginFailed(Exception ex) {
254 delegate.onFlowBeginFailed(this, ex);
255 }
257 @Override
258 public void onBeginSucceeded(RepositorySession session) {
259 if (session == source) {
260 Logger.trace(LOG_TAG, "Source session began. Beginning sink session.");
261 try {
262 sink.begin(this);
263 } catch (InvalidSessionTransitionException e) {
264 onBeginFailed(e);
265 return;
266 }
267 }
268 if (session == sink) {
269 Logger.trace(LOG_TAG, "Sink session began. Beginning flow.");
270 this.flow();
271 return;
272 }
274 // TODO: error!
275 }
277 @Override
278 public RepositorySessionStoreDelegate deferredStoreDelegate(final ExecutorService executor) {
279 return new DeferredRepositorySessionStoreDelegate(this, executor);
280 }
282 @Override
283 public RepositorySessionBeginDelegate deferredBeginDelegate(final ExecutorService executor) {
284 return new DeferredRepositorySessionBeginDelegate(this, executor);
285 }
287 @Override
288 public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor) {
289 // Lie outright. We know that all of our fetch methods are safe.
290 return this;
291 }
292 }