mobile/android/base/sync/synchronizer/SerialRecordConsumer.java

Wed, 31 Dec 2014 07:22:50 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 07:22:50 +0100
branch
TOR_BUG_3246
changeset 4
fc2d59ddac77
permissions
-rw-r--r--

Correct previous dual key logic pending first delivery installment.

michael@0 1 /* This Source Code Form is subject to the terms of the Mozilla Public
michael@0 2 * License, v. 2.0. If a copy of the MPL was not distributed with this
michael@0 3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
michael@0 4
michael@0 5 package org.mozilla.gecko.sync.synchronizer;
michael@0 6
michael@0 7 import org.mozilla.gecko.background.common.log.Logger;
michael@0 8 import org.mozilla.gecko.sync.repositories.domain.Record;
michael@0 9
michael@0 10 /**
michael@0 11 * Consume records from a queue inside a RecordsChannel, storing them serially.
michael@0 12 * @author rnewman
michael@0 13 *
michael@0 14 */
michael@0 15 class SerialRecordConsumer extends RecordConsumer {
michael@0 16 private static final String LOG_TAG = "SerialRecordConsumer";
michael@0 17 protected boolean stopEventually = false;
michael@0 18 private volatile long counter = 0;
michael@0 19
michael@0 20 public SerialRecordConsumer(RecordsConsumerDelegate delegate) {
michael@0 21 this.delegate = delegate;
michael@0 22 }
michael@0 23
michael@0 24 private Object monitor = new Object();
michael@0 25 @Override
michael@0 26 public void doNotify() {
michael@0 27 synchronized (monitor) {
michael@0 28 monitor.notify();
michael@0 29 }
michael@0 30 }
michael@0 31
michael@0 32 @Override
michael@0 33 public void queueFilled() {
michael@0 34 Logger.debug(LOG_TAG, "Queue filled.");
michael@0 35 synchronized (monitor) {
michael@0 36 this.stopEventually = true;
michael@0 37 monitor.notify();
michael@0 38 }
michael@0 39 }
michael@0 40
michael@0 41 @Override
michael@0 42 public void halt() {
michael@0 43 Logger.debug(LOG_TAG, "Halting.");
michael@0 44 synchronized (monitor) {
michael@0 45 this.stopEventually = true;
michael@0 46 this.stopImmediately = true;
michael@0 47 monitor.notify();
michael@0 48 }
michael@0 49 }
michael@0 50
michael@0 51 private Object storeSerializer = new Object();
michael@0 52 @Override
michael@0 53 public void stored() {
michael@0 54 Logger.debug(LOG_TAG, "Record stored. Notifying.");
michael@0 55 synchronized (storeSerializer) {
michael@0 56 Logger.debug(LOG_TAG, "stored() took storeSerializer.");
michael@0 57 counter++;
michael@0 58 storeSerializer.notify();
michael@0 59 Logger.debug(LOG_TAG, "stored() dropped storeSerializer.");
michael@0 60 }
michael@0 61 }
michael@0 62 private void storeSerially(Record record) {
michael@0 63 Logger.debug(LOG_TAG, "New record to store.");
michael@0 64 synchronized (storeSerializer) {
michael@0 65 Logger.debug(LOG_TAG, "storeSerially() took storeSerializer.");
michael@0 66 Logger.debug(LOG_TAG, "Storing...");
michael@0 67 try {
michael@0 68 this.delegate.store(record);
michael@0 69 } catch (Exception e) {
michael@0 70 Logger.warn(LOG_TAG, "Got exception in store. Not waiting.", e);
michael@0 71 return; // So we don't block for a stored() that never comes.
michael@0 72 }
michael@0 73 try {
michael@0 74 Logger.debug(LOG_TAG, "Waiting...");
michael@0 75 storeSerializer.wait();
michael@0 76 } catch (InterruptedException e) {
michael@0 77 // TODO
michael@0 78 }
michael@0 79 Logger.debug(LOG_TAG, "storeSerially() dropped storeSerializer.");
michael@0 80 }
michael@0 81 }
michael@0 82
michael@0 83 private void consumerIsDone() {
michael@0 84 long counterNow = this.counter;
michael@0 85 Logger.info(LOG_TAG, "Consumer is done. Processed " + counterNow + ((counterNow == 1) ? " record." : " records."));
michael@0 86 delegate.consumerIsDone(stopImmediately);
michael@0 87 }
michael@0 88
michael@0 89 @Override
michael@0 90 public void run() {
michael@0 91 while (true) {
michael@0 92 synchronized (monitor) {
michael@0 93 Logger.debug(LOG_TAG, "run() took monitor.");
michael@0 94 if (stopImmediately) {
michael@0 95 Logger.debug(LOG_TAG, "Stopping immediately. Clearing queue.");
michael@0 96 delegate.getQueue().clear();
michael@0 97 Logger.debug(LOG_TAG, "Notifying consumer.");
michael@0 98 consumerIsDone();
michael@0 99 return;
michael@0 100 }
michael@0 101 Logger.debug(LOG_TAG, "run() dropped monitor.");
michael@0 102 }
michael@0 103 // The queue is concurrent-safe.
michael@0 104 while (!delegate.getQueue().isEmpty()) {
michael@0 105 Logger.debug(LOG_TAG, "Grabbing record...");
michael@0 106 Record record = delegate.getQueue().remove();
michael@0 107 // Block here, allowing us to process records
michael@0 108 // serially.
michael@0 109 Logger.debug(LOG_TAG, "Invoking storeSerially...");
michael@0 110 this.storeSerially(record);
michael@0 111 Logger.debug(LOG_TAG, "Done with record.");
michael@0 112 }
michael@0 113 synchronized (monitor) {
michael@0 114 Logger.debug(LOG_TAG, "run() took monitor.");
michael@0 115
michael@0 116 if (stopEventually) {
michael@0 117 Logger.debug(LOG_TAG, "Done with records and told to stop. Notifying consumer.");
michael@0 118 consumerIsDone();
michael@0 119 return;
michael@0 120 }
michael@0 121 try {
michael@0 122 Logger.debug(LOG_TAG, "Not told to stop but no records. Waiting.");
michael@0 123 monitor.wait(10000);
michael@0 124 } catch (InterruptedException e) {
michael@0 125 // TODO
michael@0 126 }
michael@0 127 Logger.debug(LOG_TAG, "run() dropped monitor.");
michael@0 128 }
michael@0 129 }
michael@0 130 }
michael@0 131 }

mercurial