michael@0: /* Any copyright is dedicated to the Public Domain. michael@0: http://creativecommons.org/publicdomain/zero/1.0/ */ michael@0: michael@0: package org.mozilla.gecko.background.testhelpers; michael@0: michael@0: import java.util.Map.Entry; michael@0: import java.util.concurrent.ConcurrentHashMap; michael@0: import java.util.concurrent.ExecutorService; michael@0: import java.util.concurrent.Executors; michael@0: michael@0: import org.mozilla.gecko.background.common.log.Logger; michael@0: import org.mozilla.gecko.sync.repositories.InactiveSessionException; michael@0: import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException; michael@0: import org.mozilla.gecko.sync.repositories.NoStoreDelegateException; michael@0: import org.mozilla.gecko.sync.repositories.RecordFilter; michael@0: import org.mozilla.gecko.sync.repositories.Repository; michael@0: import org.mozilla.gecko.sync.repositories.StoreTrackingRepositorySession; michael@0: import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate; michael@0: import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionCreationDelegate; michael@0: import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate; michael@0: import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFinishDelegate; michael@0: import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionGuidsSinceDelegate; michael@0: import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionWipeDelegate; michael@0: import org.mozilla.gecko.sync.repositories.domain.Record; michael@0: michael@0: import android.content.Context; michael@0: michael@0: public class WBORepository extends Repository { michael@0: michael@0: public class WBORepositoryStats { michael@0: public long created = -1; michael@0: public long begun = -1; michael@0: public long fetchBegan = -1; michael@0: public long fetchCompleted = -1; michael@0: public long storeBegan = -1; michael@0: public long storeCompleted = -1; michael@0: public long finished = -1; michael@0: } michael@0: michael@0: public static final String LOG_TAG = "WBORepository"; michael@0: michael@0: // Access to stats is not guarded. michael@0: public WBORepositoryStats stats; michael@0: michael@0: // Whether or not to increment the timestamp of stored records. michael@0: public final boolean bumpTimestamps; michael@0: michael@0: public class WBORepositorySession extends StoreTrackingRepositorySession { michael@0: michael@0: protected WBORepository wboRepository; michael@0: protected ExecutorService delegateExecutor = Executors.newSingleThreadExecutor(); michael@0: public ConcurrentHashMap wbos; michael@0: michael@0: public WBORepositorySession(WBORepository repository) { michael@0: super(repository); michael@0: michael@0: wboRepository = repository; michael@0: wbos = new ConcurrentHashMap(); michael@0: stats = new WBORepositoryStats(); michael@0: stats.created = now(); michael@0: } michael@0: michael@0: @Override michael@0: protected synchronized void trackGUID(String guid) { michael@0: if (wboRepository.shouldTrack()) { michael@0: super.trackGUID(guid); michael@0: } michael@0: } michael@0: michael@0: @Override michael@0: public void guidsSince(long timestamp, michael@0: RepositorySessionGuidsSinceDelegate delegate) { michael@0: throw new RuntimeException("guidsSince not implemented."); michael@0: } michael@0: michael@0: @Override michael@0: public void fetchSince(long timestamp, michael@0: RepositorySessionFetchRecordsDelegate delegate) { michael@0: long fetchBegan = now(); michael@0: stats.fetchBegan = fetchBegan; michael@0: RecordFilter filter = storeTracker.getFilter(); michael@0: michael@0: for (Entry entry : wbos.entrySet()) { michael@0: Record record = entry.getValue(); michael@0: if (record.lastModified >= timestamp) { michael@0: if (filter != null && michael@0: filter.excludeRecord(record)) { michael@0: Logger.debug(LOG_TAG, "Excluding record " + record.guid); michael@0: continue; michael@0: } michael@0: delegate.deferredFetchDelegate(delegateExecutor).onFetchedRecord(record); michael@0: } michael@0: } michael@0: long fetchCompleted = now(); michael@0: stats.fetchCompleted = fetchCompleted; michael@0: delegate.deferredFetchDelegate(delegateExecutor).onFetchCompleted(fetchCompleted); michael@0: } michael@0: michael@0: @Override michael@0: public void fetch(final String[] guids, michael@0: final RepositorySessionFetchRecordsDelegate delegate) { michael@0: long fetchBegan = now(); michael@0: stats.fetchBegan = fetchBegan; michael@0: for (String guid : guids) { michael@0: if (wbos.containsKey(guid)) { michael@0: delegate.deferredFetchDelegate(delegateExecutor).onFetchedRecord(wbos.get(guid)); michael@0: } michael@0: } michael@0: long fetchCompleted = now(); michael@0: stats.fetchCompleted = fetchCompleted; michael@0: delegate.deferredFetchDelegate(delegateExecutor).onFetchCompleted(fetchCompleted); michael@0: } michael@0: michael@0: @Override michael@0: public void fetchAll(final RepositorySessionFetchRecordsDelegate delegate) { michael@0: long fetchBegan = now(); michael@0: stats.fetchBegan = fetchBegan; michael@0: for (Entry entry : wbos.entrySet()) { michael@0: Record record = entry.getValue(); michael@0: delegate.deferredFetchDelegate(delegateExecutor).onFetchedRecord(record); michael@0: } michael@0: long fetchCompleted = now(); michael@0: stats.fetchCompleted = fetchCompleted; michael@0: delegate.deferredFetchDelegate(delegateExecutor).onFetchCompleted(fetchCompleted); michael@0: } michael@0: michael@0: @Override michael@0: public void store(final Record record) throws NoStoreDelegateException { michael@0: if (delegate == null) { michael@0: throw new NoStoreDelegateException(); michael@0: } michael@0: final long now = now(); michael@0: if (stats.storeBegan < 0) { michael@0: stats.storeBegan = now; michael@0: } michael@0: Record existing = wbos.get(record.guid); michael@0: Logger.debug(LOG_TAG, "Existing record is " + (existing == null ? "" : (existing.guid + ", " + existing))); michael@0: if (existing != null && michael@0: existing.lastModified > record.lastModified) { michael@0: Logger.debug(LOG_TAG, "Local record is newer. Not storing."); michael@0: delegate.deferredStoreDelegate(delegateExecutor).onRecordStoreSucceeded(record.guid); michael@0: return; michael@0: } michael@0: if (existing != null) { michael@0: Logger.debug(LOG_TAG, "Replacing local record."); michael@0: } michael@0: michael@0: // Store a copy of the record with an updated modified time. michael@0: Record toStore = record.copyWithIDs(record.guid, record.androidID); michael@0: if (bumpTimestamps) { michael@0: toStore.lastModified = now; michael@0: } michael@0: wbos.put(record.guid, toStore); michael@0: michael@0: trackRecord(toStore); michael@0: delegate.deferredStoreDelegate(delegateExecutor).onRecordStoreSucceeded(record.guid); michael@0: } michael@0: michael@0: @Override michael@0: public void wipe(final RepositorySessionWipeDelegate delegate) { michael@0: if (!isActive()) { michael@0: delegate.onWipeFailed(new InactiveSessionException(null)); michael@0: return; michael@0: } michael@0: michael@0: Logger.info(LOG_TAG, "Wiping WBORepositorySession."); michael@0: this.wbos = new ConcurrentHashMap(); michael@0: michael@0: // Wipe immediately for the convenience of test code. michael@0: wboRepository.wbos = new ConcurrentHashMap(); michael@0: delegate.deferredWipeDelegate(delegateExecutor).onWipeSucceeded(); michael@0: } michael@0: michael@0: @Override michael@0: public void finish(RepositorySessionFinishDelegate delegate) throws InactiveSessionException { michael@0: Logger.info(LOG_TAG, "Finishing WBORepositorySession: handing back " + this.wbos.size() + " WBOs."); michael@0: wboRepository.wbos = this.wbos; michael@0: stats.finished = now(); michael@0: super.finish(delegate); michael@0: } michael@0: michael@0: @Override michael@0: public void begin(RepositorySessionBeginDelegate delegate) throws InvalidSessionTransitionException { michael@0: this.wbos = wboRepository.cloneWBOs(); michael@0: stats.begun = now(); michael@0: super.begin(delegate); michael@0: } michael@0: michael@0: @Override michael@0: public void storeDone(long end) { michael@0: // TODO: this is not guaranteed to be called after all of the record michael@0: // store callbacks have completed! michael@0: if (stats.storeBegan < 0) { michael@0: stats.storeBegan = end; michael@0: } michael@0: stats.storeCompleted = end; michael@0: delegate.deferredStoreDelegate(delegateExecutor).onStoreCompleted(end); michael@0: } michael@0: } michael@0: michael@0: public ConcurrentHashMap wbos; michael@0: michael@0: public WBORepository(boolean bumpTimestamps) { michael@0: super(); michael@0: this.bumpTimestamps = bumpTimestamps; michael@0: this.wbos = new ConcurrentHashMap(); michael@0: } michael@0: michael@0: public WBORepository() { michael@0: this(false); michael@0: } michael@0: michael@0: public synchronized boolean shouldTrack() { michael@0: return false; michael@0: } michael@0: michael@0: @Override michael@0: public void createSession(RepositorySessionCreationDelegate delegate, michael@0: Context context) { michael@0: delegate.deferredCreationDelegate().onSessionCreated(new WBORepositorySession(this)); michael@0: } michael@0: michael@0: public ConcurrentHashMap cloneWBOs() { michael@0: ConcurrentHashMap out = new ConcurrentHashMap(); michael@0: for (Entry entry : wbos.entrySet()) { michael@0: out.put(entry.getKey(), entry.getValue()); // Assume that records are michael@0: // immutable. michael@0: } michael@0: return out; michael@0: } michael@0: }