|
1 /* This Source Code Form is subject to the terms of the Mozilla Public |
|
2 * License, v. 2.0. If a copy of the MPL was not distributed with this |
|
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ |
|
4 |
|
5 package org.mozilla.gecko.sync.synchronizer; |
|
6 |
|
7 import java.util.concurrent.ConcurrentLinkedQueue; |
|
8 import java.util.concurrent.ExecutorService; |
|
9 import java.util.concurrent.atomic.AtomicInteger; |
|
10 |
|
11 import org.mozilla.gecko.background.common.log.Logger; |
|
12 import org.mozilla.gecko.sync.ThreadPool; |
|
13 import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException; |
|
14 import org.mozilla.gecko.sync.repositories.NoStoreDelegateException; |
|
15 import org.mozilla.gecko.sync.repositories.RepositorySession; |
|
16 import org.mozilla.gecko.sync.repositories.delegates.DeferredRepositorySessionBeginDelegate; |
|
17 import org.mozilla.gecko.sync.repositories.delegates.DeferredRepositorySessionStoreDelegate; |
|
18 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate; |
|
19 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate; |
|
20 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate; |
|
21 import org.mozilla.gecko.sync.repositories.domain.Record; |
|
22 |
|
23 /** |
|
24 * Pulls records from `source`, applying them to `sink`. |
|
25 * Notifies its delegate of errors and completion. |
|
26 * |
|
27 * All stores (initiated by a fetch) must have been completed before storeDone |
|
28 * is invoked on the sink. This is to avoid the existing stored items being |
|
29 * considered as the total set, with onStoreCompleted being called when they're |
|
30 * done: |
|
31 * |
|
32 * store(A) store(B) |
|
33 * store(C) storeDone() |
|
34 * store(A) finishes. Store job begins. |
|
35 * store(C) finishes. Store job begins. |
|
36 * storeDone() finishes. |
|
37 * Storing of A complete. |
|
38 * Storing of C complete. |
|
39 * We're done! Call onStoreCompleted. |
|
40 * store(B) finishes... uh oh. |
|
41 * |
|
42 * In other words, storeDone must be gated on the synchronous invocation of every store. |
|
43 * |
|
44 * Similarly, we require that every store callback have returned before onStoreCompleted is invoked. |
|
45 * |
|
46 * This whole set of guarantees should be achievable thusly: |
|
47 * |
|
48 * * The fetch process must run in a single thread, and invoke store() |
|
49 * synchronously. After processing every incoming record, storeDone is called, |
|
50 * setting a flag. |
|
51 * If the fetch cannot be implicitly queued, it must be explicitly queued. |
|
52 * In this implementation, we assume that fetch callbacks are strictly ordered in this way. |
|
53 * |
|
54 * * The store process must be (implicitly or explicitly) queued. When the |
|
55 * queue empties, the consumer checks the storeDone flag. If it's set, and the |
|
56 * queue is exhausted, invoke onStoreCompleted. |
|
57 * |
|
58 * RecordsChannel exists to enforce this ordering of operations. |
|
59 * |
|
60 * @author rnewman |
|
61 * |
|
62 */ |
|
63 public class RecordsChannel implements |
|
64 RepositorySessionFetchRecordsDelegate, |
|
65 RepositorySessionStoreDelegate, |
|
66 RecordsConsumerDelegate, |
|
67 RepositorySessionBeginDelegate { |
|
68 |
|
69 private static final String LOG_TAG = "RecordsChannel"; |
|
70 public RepositorySession source; |
|
71 public RepositorySession sink; |
|
72 private RecordsChannelDelegate delegate; |
|
73 private long fetchEnd = -1; |
|
74 |
|
75 protected final AtomicInteger numFetched = new AtomicInteger(); |
|
76 protected final AtomicInteger numFetchFailed = new AtomicInteger(); |
|
77 protected final AtomicInteger numStored = new AtomicInteger(); |
|
78 protected final AtomicInteger numStoreFailed = new AtomicInteger(); |
|
79 |
|
80 public RecordsChannel(RepositorySession source, RepositorySession sink, RecordsChannelDelegate delegate) { |
|
81 this.source = source; |
|
82 this.sink = sink; |
|
83 this.delegate = delegate; |
|
84 } |
|
85 |
|
86 /* |
|
87 * We push fetched records into a queue. |
|
88 * A separate thread is waiting for us to notify it of work to do. |
|
89 * When we tell it to stop, it'll stop. We do that when the fetch |
|
90 * is completed. |
|
91 * When it stops, we tell the sink that there are no more records, |
|
92 * and wait for the sink to tell us that storing is done. |
|
93 * Then we notify our delegate of completion. |
|
94 */ |
|
95 private RecordConsumer consumer; |
|
96 private boolean waitingForQueueDone = false; |
|
97 private ConcurrentLinkedQueue<Record> toProcess = new ConcurrentLinkedQueue<Record>(); |
|
98 |
|
99 @Override |
|
100 public ConcurrentLinkedQueue<Record> getQueue() { |
|
101 return toProcess; |
|
102 } |
|
103 |
|
104 protected boolean isReady() { |
|
105 return source.isActive() && sink.isActive(); |
|
106 } |
|
107 |
|
108 /** |
|
109 * Get the number of records fetched so far. |
|
110 * |
|
111 * @return number of fetches. |
|
112 */ |
|
113 public int getFetchCount() { |
|
114 return numFetched.get(); |
|
115 } |
|
116 |
|
117 /** |
|
118 * Get the number of fetch failures recorded so far. |
|
119 * |
|
120 * @return number of fetch failures. |
|
121 */ |
|
122 public int getFetchFailureCount() { |
|
123 return numFetchFailed.get(); |
|
124 } |
|
125 |
|
126 /** |
|
127 * Get the number of store attempts (successful or not) so far. |
|
128 * |
|
129 * @return number of stores attempted. |
|
130 */ |
|
131 public int getStoreCount() { |
|
132 return numStored.get(); |
|
133 } |
|
134 |
|
135 /** |
|
136 * Get the number of store failures recorded so far. |
|
137 * |
|
138 * @return number of store failures. |
|
139 */ |
|
140 public int getStoreFailureCount() { |
|
141 return numStoreFailed.get(); |
|
142 } |
|
143 |
|
144 /** |
|
145 * Start records flowing through the channel. |
|
146 */ |
|
147 public void flow() { |
|
148 if (!isReady()) { |
|
149 RepositorySession failed = source; |
|
150 if (source.isActive()) { |
|
151 failed = sink; |
|
152 } |
|
153 this.delegate.onFlowBeginFailed(this, new SessionNotBegunException(failed)); |
|
154 return; |
|
155 } |
|
156 |
|
157 if (!source.dataAvailable()) { |
|
158 Logger.info(LOG_TAG, "No data available: short-circuiting flow from source " + source); |
|
159 long now = System.currentTimeMillis(); |
|
160 this.delegate.onFlowCompleted(this, now, now); |
|
161 return; |
|
162 } |
|
163 |
|
164 sink.setStoreDelegate(this); |
|
165 numFetched.set(0); |
|
166 numFetchFailed.set(0); |
|
167 numStored.set(0); |
|
168 numStoreFailed.set(0); |
|
169 // Start a consumer thread. |
|
170 this.consumer = new ConcurrentRecordConsumer(this); |
|
171 ThreadPool.run(this.consumer); |
|
172 waitingForQueueDone = true; |
|
173 source.fetchSince(source.getLastSyncTimestamp(), this); |
|
174 } |
|
175 |
|
176 /** |
|
177 * Begin both sessions, invoking flow() when done. |
|
178 * @throws InvalidSessionTransitionException |
|
179 */ |
|
180 public void beginAndFlow() throws InvalidSessionTransitionException { |
|
181 Logger.trace(LOG_TAG, "Beginning source."); |
|
182 source.begin(this); |
|
183 } |
|
184 |
|
185 @Override |
|
186 public void store(Record record) { |
|
187 numStored.incrementAndGet(); |
|
188 try { |
|
189 sink.store(record); |
|
190 } catch (NoStoreDelegateException e) { |
|
191 Logger.error(LOG_TAG, "Got NoStoreDelegateException in RecordsChannel.store(). This should not occur. Aborting.", e); |
|
192 delegate.onFlowStoreFailed(this, e, record.guid); |
|
193 } |
|
194 } |
|
195 |
|
196 @Override |
|
197 public void onFetchFailed(Exception ex, Record record) { |
|
198 Logger.warn(LOG_TAG, "onFetchFailed. Calling for immediate stop.", ex); |
|
199 numFetchFailed.incrementAndGet(); |
|
200 this.consumer.halt(); |
|
201 delegate.onFlowFetchFailed(this, ex); |
|
202 } |
|
203 |
|
204 @Override |
|
205 public void onFetchedRecord(Record record) { |
|
206 numFetched.incrementAndGet(); |
|
207 this.toProcess.add(record); |
|
208 this.consumer.doNotify(); |
|
209 } |
|
210 |
|
211 @Override |
|
212 public void onFetchCompleted(final long fetchEnd) { |
|
213 Logger.trace(LOG_TAG, "onFetchCompleted. Stopping consumer once stores are done."); |
|
214 Logger.trace(LOG_TAG, "Fetch timestamp is " + fetchEnd); |
|
215 this.fetchEnd = fetchEnd; |
|
216 this.consumer.queueFilled(); |
|
217 } |
|
218 |
|
219 @Override |
|
220 public void onRecordStoreFailed(Exception ex, String recordGuid) { |
|
221 Logger.trace(LOG_TAG, "Failed to store record with guid " + recordGuid); |
|
222 numStoreFailed.incrementAndGet(); |
|
223 this.consumer.stored(); |
|
224 delegate.onFlowStoreFailed(this, ex, recordGuid); |
|
225 // TODO: abort? |
|
226 } |
|
227 |
|
228 @Override |
|
229 public void onRecordStoreSucceeded(String guid) { |
|
230 Logger.trace(LOG_TAG, "Stored record with guid " + guid); |
|
231 this.consumer.stored(); |
|
232 } |
|
233 |
|
234 |
|
235 @Override |
|
236 public void consumerIsDone(boolean allRecordsQueued) { |
|
237 Logger.trace(LOG_TAG, "Consumer is done. Are we waiting for it? " + waitingForQueueDone); |
|
238 if (waitingForQueueDone) { |
|
239 waitingForQueueDone = false; |
|
240 this.sink.storeDone(); // Now we'll be waiting for onStoreCompleted. |
|
241 } |
|
242 } |
|
243 |
|
244 @Override |
|
245 public void onStoreCompleted(long storeEnd) { |
|
246 Logger.trace(LOG_TAG, "onStoreCompleted. Notifying delegate of onFlowCompleted. " + |
|
247 "Fetch end is " + fetchEnd + ", store end is " + storeEnd); |
|
248 // TODO: synchronize on consumer callback? |
|
249 delegate.onFlowCompleted(this, fetchEnd, storeEnd); |
|
250 } |
|
251 |
|
252 @Override |
|
253 public void onBeginFailed(Exception ex) { |
|
254 delegate.onFlowBeginFailed(this, ex); |
|
255 } |
|
256 |
|
257 @Override |
|
258 public void onBeginSucceeded(RepositorySession session) { |
|
259 if (session == source) { |
|
260 Logger.trace(LOG_TAG, "Source session began. Beginning sink session."); |
|
261 try { |
|
262 sink.begin(this); |
|
263 } catch (InvalidSessionTransitionException e) { |
|
264 onBeginFailed(e); |
|
265 return; |
|
266 } |
|
267 } |
|
268 if (session == sink) { |
|
269 Logger.trace(LOG_TAG, "Sink session began. Beginning flow."); |
|
270 this.flow(); |
|
271 return; |
|
272 } |
|
273 |
|
274 // TODO: error! |
|
275 } |
|
276 |
|
277 @Override |
|
278 public RepositorySessionStoreDelegate deferredStoreDelegate(final ExecutorService executor) { |
|
279 return new DeferredRepositorySessionStoreDelegate(this, executor); |
|
280 } |
|
281 |
|
282 @Override |
|
283 public RepositorySessionBeginDelegate deferredBeginDelegate(final ExecutorService executor) { |
|
284 return new DeferredRepositorySessionBeginDelegate(this, executor); |
|
285 } |
|
286 |
|
287 @Override |
|
288 public RepositorySessionFetchRecordsDelegate deferredFetchDelegate(ExecutorService executor) { |
|
289 // Lie outright. We know that all of our fetch methods are safe. |
|
290 return this; |
|
291 } |
|
292 } |