michael@0: /* This Source Code Form is subject to the terms of the Mozilla Public michael@0: * License, v. 2.0. If a copy of the MPL was not distributed with this michael@0: * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ michael@0: michael@0: package org.mozilla.gecko.sync.synchronizer; michael@0: michael@0: import org.mozilla.gecko.background.common.log.Logger; michael@0: import org.mozilla.gecko.sync.repositories.domain.Record; michael@0: michael@0: /** michael@0: * Consume records from a queue inside a RecordsChannel, as fast as we can. michael@0: * TODO: rewrite this in terms of an ExecutorService and a CompletionService. michael@0: * See Bug 713483. michael@0: * michael@0: * @author rnewman michael@0: * michael@0: */ michael@0: class ConcurrentRecordConsumer extends RecordConsumer { michael@0: private static final String LOG_TAG = "CRecordConsumer"; michael@0: michael@0: /** michael@0: * When this is true and all records have been processed, the consumer michael@0: * will notify its delegate. michael@0: */ michael@0: protected boolean allRecordsQueued = false; michael@0: private long counter = 0; michael@0: michael@0: public ConcurrentRecordConsumer(RecordsConsumerDelegate delegate) { michael@0: this.delegate = delegate; michael@0: } michael@0: michael@0: private Object monitor = new Object(); michael@0: @Override michael@0: public void doNotify() { michael@0: synchronized (monitor) { michael@0: monitor.notify(); michael@0: } michael@0: } michael@0: michael@0: @Override michael@0: public void queueFilled() { michael@0: Logger.debug(LOG_TAG, "Queue filled."); michael@0: synchronized (monitor) { michael@0: this.allRecordsQueued = true; michael@0: monitor.notify(); michael@0: } michael@0: } michael@0: michael@0: @Override michael@0: public void halt() { michael@0: synchronized (monitor) { michael@0: this.stopImmediately = true; michael@0: monitor.notify(); michael@0: } michael@0: } michael@0: michael@0: private Object countMonitor = new Object(); michael@0: @Override michael@0: public void stored() { michael@0: Logger.trace(LOG_TAG, "Record stored. Notifying."); michael@0: synchronized (countMonitor) { michael@0: counter++; michael@0: } michael@0: } michael@0: michael@0: private void consumerIsDone() { michael@0: Logger.debug(LOG_TAG, "Consumer is done. Processed " + counter + ((counter == 1) ? " record." : " records.")); michael@0: delegate.consumerIsDone(!allRecordsQueued); michael@0: } michael@0: michael@0: @Override michael@0: public void run() { michael@0: Record record; michael@0: michael@0: while (true) { michael@0: // The queue is concurrent-safe. michael@0: while ((record = delegate.getQueue().poll()) != null) { michael@0: synchronized (monitor) { michael@0: Logger.trace(LOG_TAG, "run() took monitor."); michael@0: if (stopImmediately) { michael@0: Logger.debug(LOG_TAG, "Stopping immediately. Clearing queue."); michael@0: delegate.getQueue().clear(); michael@0: Logger.debug(LOG_TAG, "Notifying consumer."); michael@0: consumerIsDone(); michael@0: return; michael@0: } michael@0: Logger.debug(LOG_TAG, "run() dropped monitor."); michael@0: } michael@0: michael@0: Logger.trace(LOG_TAG, "Storing record with guid " + record.guid + "."); michael@0: try { michael@0: delegate.store(record); michael@0: } catch (Exception e) { michael@0: // TODO: Bug 709371: track records that failed to apply. michael@0: Logger.error(LOG_TAG, "Caught error in store.", e); michael@0: } michael@0: Logger.trace(LOG_TAG, "Done with record."); michael@0: } michael@0: synchronized (monitor) { michael@0: Logger.trace(LOG_TAG, "run() took monitor."); michael@0: michael@0: if (allRecordsQueued) { michael@0: Logger.debug(LOG_TAG, "Done with records and no more to come. Notifying consumerIsDone."); michael@0: consumerIsDone(); michael@0: return; michael@0: } michael@0: if (stopImmediately) { michael@0: Logger.debug(LOG_TAG, "Done with records and told to stop immediately. Notifying consumerIsDone."); michael@0: consumerIsDone(); michael@0: return; michael@0: } michael@0: try { michael@0: Logger.debug(LOG_TAG, "Not told to stop but no records. Waiting."); michael@0: monitor.wait(10000); michael@0: } catch (InterruptedException e) { michael@0: // TODO michael@0: } michael@0: Logger.trace(LOG_TAG, "run() dropped monitor."); michael@0: } michael@0: } michael@0: } michael@0: }