|
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
4 |
|
5 package org.mozilla.gecko.sync.synchronizer; |
|
6 |
|
7 import org.mozilla.gecko.background.common.log.Logger; |
|
8 import org.mozilla.gecko.sync.repositories.domain.Record; |
|
9 |
|
10 /** |
|
11 * Consume records from a queue inside a RecordsChannel, as fast as we can. |
|
12 * TODO: rewrite this in terms of an ExecutorService and a CompletionService. |
|
13 * See Bug 713483. |
|
14 * |
|
15 * @author rnewman |
|
16 * |
|
17 */ |
|
18 class ConcurrentRecordConsumer extends RecordConsumer { |
|
19 private static final String LOG_TAG = "CRecordConsumer"; |
|
20 |
|
21 /** |
|
22 * When this is true and all records have been processed, the consumer |
|
23 * will notify its delegate. |
|
24 */ |
|
25 protected boolean allRecordsQueued = false; |
|
26 private long counter = 0; |
|
27 |
|
28 public ConcurrentRecordConsumer(RecordsConsumerDelegate delegate) { |
|
29 this.delegate = delegate; |
|
30 } |
|
31 |
|
32 private Object monitor = new Object(); |
|
33 @Override |
|
34 public void doNotify() { |
|
35 synchronized (monitor) { |
|
36 monitor.notify(); |
|
37 } |
|
38 } |
|
39 |
|
40 @Override |
|
41 public void queueFilled() { |
|
42 Logger.debug(LOG_TAG, "Queue filled."); |
|
43 synchronized (monitor) { |
|
44 this.allRecordsQueued = true; |
|
45 monitor.notify(); |
|
46 } |
|
47 } |
|
48 |
|
49 @Override |
|
50 public void halt() { |
|
51 synchronized (monitor) { |
|
52 this.stopImmediately = true; |
|
53 monitor.notify(); |
|
54 } |
|
55 } |
|
56 |
|
57 private Object countMonitor = new Object(); |
|
58 @Override |
|
59 public void stored() { |
|
60 Logger.trace(LOG_TAG, "Record stored. Notifying."); |
|
61 synchronized (countMonitor) { |
|
62 counter++; |
|
63 } |
|
64 } |
|
65 |
|
66 private void consumerIsDone() { |
|
67 Logger.debug(LOG_TAG, "Consumer is done. Processed " + counter + ((counter == 1) ? " record." : " records.")); |
|
68 delegate.consumerIsDone(!allRecordsQueued); |
|
69 } |
|
70 |
|
71 @Override |
|
72 public void run() { |
|
73 Record record; |
|
74 |
|
75 while (true) { |
|
76 // The queue is concurrent-safe. |
|
77 while ((record = delegate.getQueue().poll()) != null) { |
|
78 synchronized (monitor) { |
|
79 Logger.trace(LOG_TAG, "run() took monitor."); |
|
80 if (stopImmediately) { |
|
81 Logger.debug(LOG_TAG, "Stopping immediately. Clearing queue."); |
|
82 delegate.getQueue().clear(); |
|
83 Logger.debug(LOG_TAG, "Notifying consumer."); |
|
84 consumerIsDone(); |
|
85 return; |
|
86 } |
|
87 Logger.debug(LOG_TAG, "run() dropped monitor."); |
|
88 } |
|
89 |
|
90 Logger.trace(LOG_TAG, "Storing record with guid " + record.guid + "."); |
|
91 try { |
|
92 delegate.store(record); |
|
93 } catch (Exception e) { |
|
94 // TODO: Bug 709371: track records that failed to apply. |
|
95 Logger.error(LOG_TAG, "Caught error in store.", e); |
|
96 } |
|
97 Logger.trace(LOG_TAG, "Done with record."); |
|
98 } |
|
99 synchronized (monitor) { |
|
100 Logger.trace(LOG_TAG, "run() took monitor."); |
|
101 |
|
102 if (allRecordsQueued) { |
|
103 Logger.debug(LOG_TAG, "Done with records and no more to come. Notifying consumerIsDone."); |
|
104 consumerIsDone(); |
|
105 return; |
|
106 } |
|
107 if (stopImmediately) { |
|
108 Logger.debug(LOG_TAG, "Done with records and told to stop immediately. Notifying consumerIsDone."); |
|
109 consumerIsDone(); |
|
110 return; |
|
111 } |
|
112 try { |
|
113 Logger.debug(LOG_TAG, "Not told to stop but no records. Waiting."); |
|
114 monitor.wait(10000); |
|
115 } catch (InterruptedException e) { |
|
116 // TODO |
|
117 } |
|
118 Logger.trace(LOG_TAG, "run() dropped monitor."); |
|
119 } |
|
120 } |
|
121 } |
|
122 } |