mobile/android/base/sync/synchronizer/ConcurrentRecordConsumer.java

Wed, 31 Dec 2014 06:09:35 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:09:35 +0100
changeset 0
6474c204b198
permissions
-rw-r--r--

Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1,
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f, for hacking purposes.

     1 /* This Source Code Form is subject to the terms of the Mozilla Public
     2  * License, v. 2.0. If a copy of the MPL was not distributed with this
     3  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
     5 package org.mozilla.gecko.sync.synchronizer;
     7 import org.mozilla.gecko.background.common.log.Logger;
     8 import org.mozilla.gecko.sync.repositories.domain.Record;
    10 /**
    11  * Consume records from a queue inside a RecordsChannel, as fast as we can.
    12  * TODO: rewrite this in terms of an ExecutorService and a CompletionService.
    13  * See Bug 713483.
    14  *
    15  * @author rnewman
    16  *
    17  */
    18 class ConcurrentRecordConsumer extends RecordConsumer {
    19   private static final String LOG_TAG = "CRecordConsumer";
    21   /**
    22    * When this is true and all records have been processed, the consumer
    23    * will notify its delegate.
    24    */
    25   protected boolean allRecordsQueued = false;
    26   private long counter = 0;
    28   public ConcurrentRecordConsumer(RecordsConsumerDelegate delegate) {
    29     this.delegate = delegate;
    30   }
    32   private Object monitor = new Object();
    33   @Override
    34   public void doNotify() {
    35     synchronized (monitor) {
    36       monitor.notify();
    37     }
    38   }
    40   @Override
    41   public void queueFilled() {
    42     Logger.debug(LOG_TAG, "Queue filled.");
    43     synchronized (monitor) {
    44       this.allRecordsQueued = true;
    45       monitor.notify();
    46     }
    47   }
    49   @Override
    50   public void halt() {
    51     synchronized (monitor) {
    52       this.stopImmediately = true;
    53       monitor.notify();
    54     }
    55   }
    57   private Object countMonitor = new Object();
    58   @Override
    59   public void stored() {
    60     Logger.trace(LOG_TAG, "Record stored. Notifying.");
    61     synchronized (countMonitor) {
    62       counter++;
    63     }
    64   }
    66   private void consumerIsDone() {
    67     Logger.debug(LOG_TAG, "Consumer is done. Processed " + counter + ((counter == 1) ? " record." : " records."));
    68     delegate.consumerIsDone(!allRecordsQueued);
    69   }
    71   @Override
    72   public void run() {
    73     Record record;
    75     while (true) {
    76       // The queue is concurrent-safe.
    77       while ((record = delegate.getQueue().poll()) != null) {
    78         synchronized (monitor) {
    79           Logger.trace(LOG_TAG, "run() took monitor.");
    80           if (stopImmediately) {
    81             Logger.debug(LOG_TAG, "Stopping immediately. Clearing queue.");
    82             delegate.getQueue().clear();
    83             Logger.debug(LOG_TAG, "Notifying consumer.");
    84             consumerIsDone();
    85             return;
    86           }
    87           Logger.debug(LOG_TAG, "run() dropped monitor.");
    88         }
    90         Logger.trace(LOG_TAG, "Storing record with guid " + record.guid + ".");
    91         try {
    92           delegate.store(record);
    93         } catch (Exception e) {
    94           // TODO: Bug 709371: track records that failed to apply.
    95           Logger.error(LOG_TAG, "Caught error in store.", e);
    96         }
    97         Logger.trace(LOG_TAG, "Done with record.");
    98       }
    99       synchronized (monitor) {
   100         Logger.trace(LOG_TAG, "run() took monitor.");
   102         if (allRecordsQueued) {
   103           Logger.debug(LOG_TAG, "Done with records and no more to come. Notifying consumerIsDone.");
   104           consumerIsDone();
   105           return;
   106         }
   107         if (stopImmediately) {
   108           Logger.debug(LOG_TAG, "Done with records and told to stop immediately. Notifying consumerIsDone.");
   109           consumerIsDone();
   110           return;
   111         }
   112         try {
   113           Logger.debug(LOG_TAG, "Not told to stop but no records. Waiting.");
   114           monitor.wait(10000);
   115         } catch (InterruptedException e) {
   116           // TODO
   117         }
   118         Logger.trace(LOG_TAG, "run() dropped monitor.");
   119       }
   120     }
   121   }
   122 }

mercurial