|
1 /* This Source Code Form is subject to the terms of the Mozilla Public |
|
2 * License, v. 2.0. If a copy of the MPL was not distributed with this |
|
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ |
|
4 |
|
5 package org.mozilla.gecko.sync.synchronizer; |
|
6 |
|
7 import org.mozilla.gecko.background.common.log.Logger; |
|
8 import org.mozilla.gecko.sync.repositories.domain.Record; |
|
9 |
|
10 /** |
|
11 * Consume records from a queue inside a RecordsChannel, storing them serially. |
|
12 * @author rnewman |
|
13 * |
|
14 */ |
|
15 class SerialRecordConsumer extends RecordConsumer { |
|
16 private static final String LOG_TAG = "SerialRecordConsumer"; |
|
17 protected boolean stopEventually = false; |
|
18 private volatile long counter = 0; |
|
19 |
|
20 public SerialRecordConsumer(RecordsConsumerDelegate delegate) { |
|
21 this.delegate = delegate; |
|
22 } |
|
23 |
|
24 private Object monitor = new Object(); |
|
25 @Override |
|
26 public void doNotify() { |
|
27 synchronized (monitor) { |
|
28 monitor.notify(); |
|
29 } |
|
30 } |
|
31 |
|
32 @Override |
|
33 public void queueFilled() { |
|
34 Logger.debug(LOG_TAG, "Queue filled."); |
|
35 synchronized (monitor) { |
|
36 this.stopEventually = true; |
|
37 monitor.notify(); |
|
38 } |
|
39 } |
|
40 |
|
41 @Override |
|
42 public void halt() { |
|
43 Logger.debug(LOG_TAG, "Halting."); |
|
44 synchronized (monitor) { |
|
45 this.stopEventually = true; |
|
46 this.stopImmediately = true; |
|
47 monitor.notify(); |
|
48 } |
|
49 } |
|
50 |
|
51 private Object storeSerializer = new Object(); |
|
52 @Override |
|
53 public void stored() { |
|
54 Logger.debug(LOG_TAG, "Record stored. Notifying."); |
|
55 synchronized (storeSerializer) { |
|
56 Logger.debug(LOG_TAG, "stored() took storeSerializer."); |
|
57 counter++; |
|
58 storeSerializer.notify(); |
|
59 Logger.debug(LOG_TAG, "stored() dropped storeSerializer."); |
|
60 } |
|
61 } |
|
62 private void storeSerially(Record record) { |
|
63 Logger.debug(LOG_TAG, "New record to store."); |
|
64 synchronized (storeSerializer) { |
|
65 Logger.debug(LOG_TAG, "storeSerially() took storeSerializer."); |
|
66 Logger.debug(LOG_TAG, "Storing..."); |
|
67 try { |
|
68 this.delegate.store(record); |
|
69 } catch (Exception e) { |
|
70 Logger.warn(LOG_TAG, "Got exception in store. Not waiting.", e); |
|
71 return; // So we don't block for a stored() that never comes. |
|
72 } |
|
73 try { |
|
74 Logger.debug(LOG_TAG, "Waiting..."); |
|
75 storeSerializer.wait(); |
|
76 } catch (InterruptedException e) { |
|
77 // TODO |
|
78 } |
|
79 Logger.debug(LOG_TAG, "storeSerially() dropped storeSerializer."); |
|
80 } |
|
81 } |
|
82 |
|
83 private void consumerIsDone() { |
|
84 long counterNow = this.counter; |
|
85 Logger.info(LOG_TAG, "Consumer is done. Processed " + counterNow + ((counterNow == 1) ? " record." : " records.")); |
|
86 delegate.consumerIsDone(stopImmediately); |
|
87 } |
|
88 |
|
89 @Override |
|
90 public void run() { |
|
91 while (true) { |
|
92 synchronized (monitor) { |
|
93 Logger.debug(LOG_TAG, "run() took monitor."); |
|
94 if (stopImmediately) { |
|
95 Logger.debug(LOG_TAG, "Stopping immediately. Clearing queue."); |
|
96 delegate.getQueue().clear(); |
|
97 Logger.debug(LOG_TAG, "Notifying consumer."); |
|
98 consumerIsDone(); |
|
99 return; |
|
100 } |
|
101 Logger.debug(LOG_TAG, "run() dropped monitor."); |
|
102 } |
|
103 // The queue is concurrent-safe. |
|
104 while (!delegate.getQueue().isEmpty()) { |
|
105 Logger.debug(LOG_TAG, "Grabbing record..."); |
|
106 Record record = delegate.getQueue().remove(); |
|
107 // Block here, allowing us to process records |
|
108 // serially. |
|
109 Logger.debug(LOG_TAG, "Invoking storeSerially..."); |
|
110 this.storeSerially(record); |
|
111 Logger.debug(LOG_TAG, "Done with record."); |
|
112 } |
|
113 synchronized (monitor) { |
|
114 Logger.debug(LOG_TAG, "run() took monitor."); |
|
115 |
|
116 if (stopEventually) { |
|
117 Logger.debug(LOG_TAG, "Done with records and told to stop. Notifying consumer."); |
|
118 consumerIsDone(); |
|
119 return; |
|
120 } |
|
121 try { |
|
122 Logger.debug(LOG_TAG, "Not told to stop but no records. Waiting."); |
|
123 monitor.wait(10000); |
|
124 } catch (InterruptedException e) { |
|
125 // TODO |
|
126 } |
|
127 Logger.debug(LOG_TAG, "run() dropped monitor."); |
|
128 } |
|
129 } |
|
130 } |
|
131 } |