michael@0: /* This Source Code Form is subject to the terms of the Mozilla Public michael@0: * License, v. 2.0. If a copy of the MPL was not distributed with this michael@0: * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ michael@0: michael@0: package org.mozilla.gecko.sync.synchronizer; michael@0: michael@0: import org.mozilla.gecko.background.common.log.Logger; michael@0: import org.mozilla.gecko.sync.repositories.domain.Record; michael@0: michael@0: /** michael@0: * Consume records from a queue inside a RecordsChannel, storing them serially. michael@0: * @author rnewman michael@0: * michael@0: */ michael@0: class SerialRecordConsumer extends RecordConsumer { michael@0: private static final String LOG_TAG = "SerialRecordConsumer"; michael@0: protected boolean stopEventually = false; michael@0: private volatile long counter = 0; michael@0: michael@0: public SerialRecordConsumer(RecordsConsumerDelegate delegate) { michael@0: this.delegate = delegate; michael@0: } michael@0: michael@0: private Object monitor = new Object(); michael@0: @Override michael@0: public void doNotify() { michael@0: synchronized (monitor) { michael@0: monitor.notify(); michael@0: } michael@0: } michael@0: michael@0: @Override michael@0: public void queueFilled() { michael@0: Logger.debug(LOG_TAG, "Queue filled."); michael@0: synchronized (monitor) { michael@0: this.stopEventually = true; michael@0: monitor.notify(); michael@0: } michael@0: } michael@0: michael@0: @Override michael@0: public void halt() { michael@0: Logger.debug(LOG_TAG, "Halting."); michael@0: synchronized (monitor) { michael@0: this.stopEventually = true; michael@0: this.stopImmediately = true; michael@0: monitor.notify(); michael@0: } michael@0: } michael@0: michael@0: private Object storeSerializer = new Object(); michael@0: @Override michael@0: public void stored() { michael@0: Logger.debug(LOG_TAG, "Record stored. Notifying."); michael@0: synchronized (storeSerializer) { michael@0: Logger.debug(LOG_TAG, "stored() took storeSerializer."); michael@0: counter++; michael@0: storeSerializer.notify(); michael@0: Logger.debug(LOG_TAG, "stored() dropped storeSerializer."); michael@0: } michael@0: } michael@0: private void storeSerially(Record record) { michael@0: Logger.debug(LOG_TAG, "New record to store."); michael@0: synchronized (storeSerializer) { michael@0: Logger.debug(LOG_TAG, "storeSerially() took storeSerializer."); michael@0: Logger.debug(LOG_TAG, "Storing..."); michael@0: try { michael@0: this.delegate.store(record); michael@0: } catch (Exception e) { michael@0: Logger.warn(LOG_TAG, "Got exception in store. Not waiting.", e); michael@0: return; // So we don't block for a stored() that never comes. michael@0: } michael@0: try { michael@0: Logger.debug(LOG_TAG, "Waiting..."); michael@0: storeSerializer.wait(); michael@0: } catch (InterruptedException e) { michael@0: // TODO michael@0: } michael@0: Logger.debug(LOG_TAG, "storeSerially() dropped storeSerializer."); michael@0: } michael@0: } michael@0: michael@0: private void consumerIsDone() { michael@0: long counterNow = this.counter; michael@0: Logger.info(LOG_TAG, "Consumer is done. Processed " + counterNow + ((counterNow == 1) ? " record." : " records.")); michael@0: delegate.consumerIsDone(stopImmediately); michael@0: } michael@0: michael@0: @Override michael@0: public void run() { michael@0: while (true) { michael@0: synchronized (monitor) { michael@0: Logger.debug(LOG_TAG, "run() took monitor."); michael@0: if (stopImmediately) { michael@0: Logger.debug(LOG_TAG, "Stopping immediately. Clearing queue."); michael@0: delegate.getQueue().clear(); michael@0: Logger.debug(LOG_TAG, "Notifying consumer."); michael@0: consumerIsDone(); michael@0: return; michael@0: } michael@0: Logger.debug(LOG_TAG, "run() dropped monitor."); michael@0: } michael@0: // The queue is concurrent-safe. michael@0: while (!delegate.getQueue().isEmpty()) { michael@0: Logger.debug(LOG_TAG, "Grabbing record..."); michael@0: Record record = delegate.getQueue().remove(); michael@0: // Block here, allowing us to process records michael@0: // serially. michael@0: Logger.debug(LOG_TAG, "Invoking storeSerially..."); michael@0: this.storeSerially(record); michael@0: Logger.debug(LOG_TAG, "Done with record."); michael@0: } michael@0: synchronized (monitor) { michael@0: Logger.debug(LOG_TAG, "run() took monitor."); michael@0: michael@0: if (stopEventually) { michael@0: Logger.debug(LOG_TAG, "Done with records and told to stop. Notifying consumer."); michael@0: consumerIsDone(); michael@0: return; michael@0: } michael@0: try { michael@0: Logger.debug(LOG_TAG, "Not told to stop but no records. Waiting."); michael@0: monitor.wait(10000); michael@0: } catch (InterruptedException e) { michael@0: // TODO michael@0: } michael@0: Logger.debug(LOG_TAG, "run() dropped monitor."); michael@0: } michael@0: } michael@0: } michael@0: }