1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/mobile/android/base/sync/synchronizer/SerialRecordConsumer.java Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,131 @@ 1.4 +/* This Source Code Form is subject to the terms of the Mozilla Public 1.5 + * License, v. 2.0. If a copy of the MPL was not distributed with this 1.6 + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 1.7 + 1.8 +package org.mozilla.gecko.sync.synchronizer; 1.9 + 1.10 +import org.mozilla.gecko.background.common.log.Logger; 1.11 +import org.mozilla.gecko.sync.repositories.domain.Record; 1.12 + 1.13 +/** 1.14 + * Consume records from a queue inside a RecordsChannel, storing them serially. 1.15 + * @author rnewman 1.16 + * 1.17 + */ 1.18 +class SerialRecordConsumer extends RecordConsumer { 1.19 + private static final String LOG_TAG = "SerialRecordConsumer"; 1.20 + protected boolean stopEventually = false; 1.21 + private volatile long counter = 0; 1.22 + 1.23 + public SerialRecordConsumer(RecordsConsumerDelegate delegate) { 1.24 + this.delegate = delegate; 1.25 + } 1.26 + 1.27 + private Object monitor = new Object(); 1.28 + @Override 1.29 + public void doNotify() { 1.30 + synchronized (monitor) { 1.31 + monitor.notify(); 1.32 + } 1.33 + } 1.34 + 1.35 + @Override 1.36 + public void queueFilled() { 1.37 + Logger.debug(LOG_TAG, "Queue filled."); 1.38 + synchronized (monitor) { 1.39 + this.stopEventually = true; 1.40 + monitor.notify(); 1.41 + } 1.42 + } 1.43 + 1.44 + @Override 1.45 + public void halt() { 1.46 + Logger.debug(LOG_TAG, "Halting."); 1.47 + synchronized (monitor) { 1.48 + this.stopEventually = true; 1.49 + this.stopImmediately = true; 1.50 + monitor.notify(); 1.51 + } 1.52 + } 1.53 + 1.54 + private Object storeSerializer = new Object(); 1.55 + @Override 1.56 + public void stored() { 1.57 + Logger.debug(LOG_TAG, "Record stored. Notifying."); 1.58 + synchronized (storeSerializer) { 1.59 + Logger.debug(LOG_TAG, "stored() took storeSerializer."); 1.60 + counter++; 1.61 + storeSerializer.notify(); 1.62 + Logger.debug(LOG_TAG, "stored() dropped storeSerializer."); 1.63 + } 1.64 + } 1.65 + private void storeSerially(Record record) { 1.66 + Logger.debug(LOG_TAG, "New record to store."); 1.67 + synchronized (storeSerializer) { 1.68 + Logger.debug(LOG_TAG, "storeSerially() took storeSerializer."); 1.69 + Logger.debug(LOG_TAG, "Storing..."); 1.70 + try { 1.71 + this.delegate.store(record); 1.72 + } catch (Exception e) { 1.73 + Logger.warn(LOG_TAG, "Got exception in store. Not waiting.", e); 1.74 + return; // So we don't block for a stored() that never comes. 1.75 + } 1.76 + try { 1.77 + Logger.debug(LOG_TAG, "Waiting..."); 1.78 + storeSerializer.wait(); 1.79 + } catch (InterruptedException e) { 1.80 + // TODO 1.81 + } 1.82 + Logger.debug(LOG_TAG, "storeSerially() dropped storeSerializer."); 1.83 + } 1.84 + } 1.85 + 1.86 + private void consumerIsDone() { 1.87 + long counterNow = this.counter; 1.88 + Logger.info(LOG_TAG, "Consumer is done. Processed " + counterNow + ((counterNow == 1) ? " record." : " records.")); 1.89 + delegate.consumerIsDone(stopImmediately); 1.90 + } 1.91 + 1.92 + @Override 1.93 + public void run() { 1.94 + while (true) { 1.95 + synchronized (monitor) { 1.96 + Logger.debug(LOG_TAG, "run() took monitor."); 1.97 + if (stopImmediately) { 1.98 + Logger.debug(LOG_TAG, "Stopping immediately. Clearing queue."); 1.99 + delegate.getQueue().clear(); 1.100 + Logger.debug(LOG_TAG, "Notifying consumer."); 1.101 + consumerIsDone(); 1.102 + return; 1.103 + } 1.104 + Logger.debug(LOG_TAG, "run() dropped monitor."); 1.105 + } 1.106 + // The queue is concurrent-safe. 1.107 + while (!delegate.getQueue().isEmpty()) { 1.108 + Logger.debug(LOG_TAG, "Grabbing record..."); 1.109 + Record record = delegate.getQueue().remove(); 1.110 + // Block here, allowing us to process records 1.111 + // serially. 1.112 + Logger.debug(LOG_TAG, "Invoking storeSerially..."); 1.113 + this.storeSerially(record); 1.114 + Logger.debug(LOG_TAG, "Done with record."); 1.115 + } 1.116 + synchronized (monitor) { 1.117 + Logger.debug(LOG_TAG, "run() took monitor."); 1.118 + 1.119 + if (stopEventually) { 1.120 + Logger.debug(LOG_TAG, "Done with records and told to stop. Notifying consumer."); 1.121 + consumerIsDone(); 1.122 + return; 1.123 + } 1.124 + try { 1.125 + Logger.debug(LOG_TAG, "Not told to stop but no records. Waiting."); 1.126 + monitor.wait(10000); 1.127 + } catch (InterruptedException e) { 1.128 + // TODO 1.129 + } 1.130 + Logger.debug(LOG_TAG, "run() dropped monitor."); 1.131 + } 1.132 + } 1.133 + } 1.134 +}