Wed, 31 Dec 2014 07:22:50 +0100
Correct previous dual key logic pending first delivery installment.
1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 package org.mozilla.gecko.sync.synchronizer;
7 import org.mozilla.gecko.background.common.log.Logger;
8 import org.mozilla.gecko.sync.repositories.domain.Record;
10 /**
11 * Consume records from a queue inside a RecordsChannel, as fast as we can.
12 * TODO: rewrite this in terms of an ExecutorService and a CompletionService.
13 * See Bug 713483.
14 *
15 * @author rnewman
16 *
17 */
18 class ConcurrentRecordConsumer extends RecordConsumer {
19 private static final String LOG_TAG = "CRecordConsumer";
21 /**
22 * When this is true and all records have been processed, the consumer
23 * will notify its delegate.
24 */
25 protected boolean allRecordsQueued = false;
26 private long counter = 0;
28 public ConcurrentRecordConsumer(RecordsConsumerDelegate delegate) {
29 this.delegate = delegate;
30 }
32 private Object monitor = new Object();
33 @Override
34 public void doNotify() {
35 synchronized (monitor) {
36 monitor.notify();
37 }
38 }
40 @Override
41 public void queueFilled() {
42 Logger.debug(LOG_TAG, "Queue filled.");
43 synchronized (monitor) {
44 this.allRecordsQueued = true;
45 monitor.notify();
46 }
47 }
49 @Override
50 public void halt() {
51 synchronized (monitor) {
52 this.stopImmediately = true;
53 monitor.notify();
54 }
55 }
57 private Object countMonitor = new Object();
58 @Override
59 public void stored() {
60 Logger.trace(LOG_TAG, "Record stored. Notifying.");
61 synchronized (countMonitor) {
62 counter++;
63 }
64 }
66 private void consumerIsDone() {
67 Logger.debug(LOG_TAG, "Consumer is done. Processed " + counter + ((counter == 1) ? " record." : " records."));
68 delegate.consumerIsDone(!allRecordsQueued);
69 }
71 @Override
72 public void run() {
73 Record record;
75 while (true) {
76 // The queue is concurrent-safe.
77 while ((record = delegate.getQueue().poll()) != null) {
78 synchronized (monitor) {
79 Logger.trace(LOG_TAG, "run() took monitor.");
80 if (stopImmediately) {
81 Logger.debug(LOG_TAG, "Stopping immediately. Clearing queue.");
82 delegate.getQueue().clear();
83 Logger.debug(LOG_TAG, "Notifying consumer.");
84 consumerIsDone();
85 return;
86 }
87 Logger.debug(LOG_TAG, "run() dropped monitor.");
88 }
90 Logger.trace(LOG_TAG, "Storing record with guid " + record.guid + ".");
91 try {
92 delegate.store(record);
93 } catch (Exception e) {
94 // TODO: Bug 709371: track records that failed to apply.
95 Logger.error(LOG_TAG, "Caught error in store.", e);
96 }
97 Logger.trace(LOG_TAG, "Done with record.");
98 }
99 synchronized (monitor) {
100 Logger.trace(LOG_TAG, "run() took monitor.");
102 if (allRecordsQueued) {
103 Logger.debug(LOG_TAG, "Done with records and no more to come. Notifying consumerIsDone.");
104 consumerIsDone();
105 return;
106 }
107 if (stopImmediately) {
108 Logger.debug(LOG_TAG, "Done with records and told to stop immediately. Notifying consumerIsDone.");
109 consumerIsDone();
110 return;
111 }
112 try {
113 Logger.debug(LOG_TAG, "Not told to stop but no records. Waiting.");
114 monitor.wait(10000);
115 } catch (InterruptedException e) {
116 // TODO
117 }
118 Logger.trace(LOG_TAG, "run() dropped monitor.");
119 }
120 }
121 }
122 }