mobile/android/base/sync/synchronizer/SerialRecordConsumer.java

Wed, 31 Dec 2014 07:22:50 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 07:22:50 +0100
branch
TOR_BUG_3246
changeset 4
fc2d59ddac77
permissions
-rw-r--r--

Correct previous dual key logic pending first delivery installment.

     1 /* This Source Code Form is subject to the terms of the Mozilla Public
     2  * License, v. 2.0. If a copy of the MPL was not distributed with this
     3  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
     5 package org.mozilla.gecko.sync.synchronizer;
     7 import org.mozilla.gecko.background.common.log.Logger;
     8 import org.mozilla.gecko.sync.repositories.domain.Record;
    10 /**
    11  * Consume records from a queue inside a RecordsChannel, storing them serially.
    12  * @author rnewman
    13  *
    14  */
    15 class SerialRecordConsumer extends RecordConsumer {
    16   private static final String LOG_TAG = "SerialRecordConsumer";
    17   protected boolean stopEventually = false;
    18   private volatile long counter = 0;
    20   public SerialRecordConsumer(RecordsConsumerDelegate delegate) {
    21     this.delegate = delegate;
    22   }
    24   private Object monitor = new Object();
    25   @Override
    26   public void doNotify() {
    27     synchronized (monitor) {
    28       monitor.notify();
    29     }
    30   }
    32   @Override
    33   public void queueFilled() {
    34     Logger.debug(LOG_TAG, "Queue filled.");
    35     synchronized (monitor) {
    36       this.stopEventually = true;
    37       monitor.notify();
    38     }
    39   }
    41   @Override
    42   public void halt() {
    43     Logger.debug(LOG_TAG, "Halting.");
    44     synchronized (monitor) {
    45       this.stopEventually = true;
    46       this.stopImmediately = true;
    47       monitor.notify();
    48     }
    49   }
    51   private Object storeSerializer = new Object();
    52   @Override
    53   public void stored() {
    54     Logger.debug(LOG_TAG, "Record stored. Notifying.");
    55     synchronized (storeSerializer) {
    56       Logger.debug(LOG_TAG, "stored() took storeSerializer.");
    57       counter++;
    58       storeSerializer.notify();
    59       Logger.debug(LOG_TAG, "stored() dropped storeSerializer.");
    60     }
    61   }
    62   private void storeSerially(Record record) {
    63     Logger.debug(LOG_TAG, "New record to store.");
    64     synchronized (storeSerializer) {
    65       Logger.debug(LOG_TAG, "storeSerially() took storeSerializer.");
    66       Logger.debug(LOG_TAG, "Storing...");
    67       try {
    68         this.delegate.store(record);
    69       } catch (Exception e) {
    70         Logger.warn(LOG_TAG, "Got exception in store. Not waiting.", e);
    71         return;      // So we don't block for a stored() that never comes.
    72       }
    73       try {
    74         Logger.debug(LOG_TAG, "Waiting...");
    75         storeSerializer.wait();
    76       } catch (InterruptedException e) {
    77         // TODO
    78       }
    79       Logger.debug(LOG_TAG, "storeSerially() dropped storeSerializer.");
    80     }
    81   }
    83   private void consumerIsDone() {
    84     long counterNow = this.counter;
    85     Logger.info(LOG_TAG, "Consumer is done. Processed " + counterNow + ((counterNow == 1) ? " record." : " records."));
    86     delegate.consumerIsDone(stopImmediately);
    87   }
    89   @Override
    90   public void run() {
    91     while (true) {
    92       synchronized (monitor) {
    93         Logger.debug(LOG_TAG, "run() took monitor.");
    94         if (stopImmediately) {
    95           Logger.debug(LOG_TAG, "Stopping immediately. Clearing queue.");
    96           delegate.getQueue().clear();
    97           Logger.debug(LOG_TAG, "Notifying consumer.");
    98           consumerIsDone();
    99           return;
   100         }
   101         Logger.debug(LOG_TAG, "run() dropped monitor.");
   102       }
   103       // The queue is concurrent-safe.
   104       while (!delegate.getQueue().isEmpty()) {
   105         Logger.debug(LOG_TAG, "Grabbing record...");
   106         Record record = delegate.getQueue().remove();
   107         // Block here, allowing us to process records
   108         // serially.
   109         Logger.debug(LOG_TAG, "Invoking storeSerially...");
   110         this.storeSerially(record);
   111         Logger.debug(LOG_TAG, "Done with record.");
   112       }
   113       synchronized (monitor) {
   114         Logger.debug(LOG_TAG, "run() took monitor.");
   116         if (stopEventually) {
   117           Logger.debug(LOG_TAG, "Done with records and told to stop. Notifying consumer.");
   118           consumerIsDone();
   119           return;
   120         }
   121         try {
   122           Logger.debug(LOG_TAG, "Not told to stop but no records. Waiting.");
   123           monitor.wait(10000);
   124         } catch (InterruptedException e) {
   125           // TODO
   126         }
   127         Logger.debug(LOG_TAG, "run() dropped monitor.");
   128       }
   129     }
   130   }
   131 }

mercurial