mobile/android/base/sync/synchronizer/ConcurrentRecordConsumer.java

Wed, 31 Dec 2014 07:22:50 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 07:22:50 +0100
branch
TOR_BUG_3246
changeset 4
fc2d59ddac77
permissions
-rw-r--r--

Correct previous dual key logic pending first delivery installment.

michael@0 1 /* This Source Code Form is subject to the terms of the Mozilla Public
michael@0 2 * License, v. 2.0. If a copy of the MPL was not distributed with this
michael@0 3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
michael@0 4
michael@0 5 package org.mozilla.gecko.sync.synchronizer;
michael@0 6
michael@0 7 import org.mozilla.gecko.background.common.log.Logger;
michael@0 8 import org.mozilla.gecko.sync.repositories.domain.Record;
michael@0 9
michael@0 10 /**
michael@0 11 * Consume records from a queue inside a RecordsChannel, as fast as we can.
michael@0 12 * TODO: rewrite this in terms of an ExecutorService and a CompletionService.
michael@0 13 * See Bug 713483.
michael@0 14 *
michael@0 15 * @author rnewman
michael@0 16 *
michael@0 17 */
michael@0 18 class ConcurrentRecordConsumer extends RecordConsumer {
michael@0 19 private static final String LOG_TAG = "CRecordConsumer";
michael@0 20
michael@0 21 /**
michael@0 22 * When this is true and all records have been processed, the consumer
michael@0 23 * will notify its delegate.
michael@0 24 */
michael@0 25 protected boolean allRecordsQueued = false;
michael@0 26 private long counter = 0;
michael@0 27
michael@0 28 public ConcurrentRecordConsumer(RecordsConsumerDelegate delegate) {
michael@0 29 this.delegate = delegate;
michael@0 30 }
michael@0 31
michael@0 32 private Object monitor = new Object();
michael@0 33 @Override
michael@0 34 public void doNotify() {
michael@0 35 synchronized (monitor) {
michael@0 36 monitor.notify();
michael@0 37 }
michael@0 38 }
michael@0 39
michael@0 40 @Override
michael@0 41 public void queueFilled() {
michael@0 42 Logger.debug(LOG_TAG, "Queue filled.");
michael@0 43 synchronized (monitor) {
michael@0 44 this.allRecordsQueued = true;
michael@0 45 monitor.notify();
michael@0 46 }
michael@0 47 }
michael@0 48
michael@0 49 @Override
michael@0 50 public void halt() {
michael@0 51 synchronized (monitor) {
michael@0 52 this.stopImmediately = true;
michael@0 53 monitor.notify();
michael@0 54 }
michael@0 55 }
michael@0 56
michael@0 57 private Object countMonitor = new Object();
michael@0 58 @Override
michael@0 59 public void stored() {
michael@0 60 Logger.trace(LOG_TAG, "Record stored. Notifying.");
michael@0 61 synchronized (countMonitor) {
michael@0 62 counter++;
michael@0 63 }
michael@0 64 }
michael@0 65
michael@0 66 private void consumerIsDone() {
michael@0 67 Logger.debug(LOG_TAG, "Consumer is done. Processed " + counter + ((counter == 1) ? " record." : " records."));
michael@0 68 delegate.consumerIsDone(!allRecordsQueued);
michael@0 69 }
michael@0 70
michael@0 71 @Override
michael@0 72 public void run() {
michael@0 73 Record record;
michael@0 74
michael@0 75 while (true) {
michael@0 76 // The queue is concurrent-safe.
michael@0 77 while ((record = delegate.getQueue().poll()) != null) {
michael@0 78 synchronized (monitor) {
michael@0 79 Logger.trace(LOG_TAG, "run() took monitor.");
michael@0 80 if (stopImmediately) {
michael@0 81 Logger.debug(LOG_TAG, "Stopping immediately. Clearing queue.");
michael@0 82 delegate.getQueue().clear();
michael@0 83 Logger.debug(LOG_TAG, "Notifying consumer.");
michael@0 84 consumerIsDone();
michael@0 85 return;
michael@0 86 }
michael@0 87 Logger.debug(LOG_TAG, "run() dropped monitor.");
michael@0 88 }
michael@0 89
michael@0 90 Logger.trace(LOG_TAG, "Storing record with guid " + record.guid + ".");
michael@0 91 try {
michael@0 92 delegate.store(record);
michael@0 93 } catch (Exception e) {
michael@0 94 // TODO: Bug 709371: track records that failed to apply.
michael@0 95 Logger.error(LOG_TAG, "Caught error in store.", e);
michael@0 96 }
michael@0 97 Logger.trace(LOG_TAG, "Done with record.");
michael@0 98 }
michael@0 99 synchronized (monitor) {
michael@0 100 Logger.trace(LOG_TAG, "run() took monitor.");
michael@0 101
michael@0 102 if (allRecordsQueued) {
michael@0 103 Logger.debug(LOG_TAG, "Done with records and no more to come. Notifying consumerIsDone.");
michael@0 104 consumerIsDone();
michael@0 105 return;
michael@0 106 }
michael@0 107 if (stopImmediately) {
michael@0 108 Logger.debug(LOG_TAG, "Done with records and told to stop immediately. Notifying consumerIsDone.");
michael@0 109 consumerIsDone();
michael@0 110 return;
michael@0 111 }
michael@0 112 try {
michael@0 113 Logger.debug(LOG_TAG, "Not told to stop but no records. Waiting.");
michael@0 114 monitor.wait(10000);
michael@0 115 } catch (InterruptedException e) {
michael@0 116 // TODO
michael@0 117 }
michael@0 118 Logger.trace(LOG_TAG, "run() dropped monitor.");
michael@0 119 }
michael@0 120 }
michael@0 121 }
michael@0 122 }

mercurial