|
/* Any copyright is dedicated to the Public Domain.
   http://creativecommons.org/publicdomain/zero/1.0/ */
|
3 |
|
4 package org.mozilla.gecko.background.testhelpers; |
|
5 |
|
6 import java.util.Map.Entry; |
|
7 import java.util.concurrent.ConcurrentHashMap; |
|
8 import java.util.concurrent.ExecutorService; |
|
9 import java.util.concurrent.Executors; |
|
10 |
|
11 import org.mozilla.gecko.background.common.log.Logger; |
|
12 import org.mozilla.gecko.sync.repositories.InactiveSessionException; |
|
13 import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException; |
|
14 import org.mozilla.gecko.sync.repositories.NoStoreDelegateException; |
|
15 import org.mozilla.gecko.sync.repositories.RecordFilter; |
|
16 import org.mozilla.gecko.sync.repositories.Repository; |
|
17 import org.mozilla.gecko.sync.repositories.StoreTrackingRepositorySession; |
|
18 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate; |
|
19 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionCreationDelegate; |
|
20 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate; |
|
21 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFinishDelegate; |
|
22 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionGuidsSinceDelegate; |
|
23 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionWipeDelegate; |
|
24 import org.mozilla.gecko.sync.repositories.domain.Record; |
|
25 |
|
26 import android.content.Context; |
|
27 |
|
28 public class WBORepository extends Repository { |
|
29 |
|
30 public class WBORepositoryStats { |
|
31 public long created = -1; |
|
32 public long begun = -1; |
|
33 public long fetchBegan = -1; |
|
34 public long fetchCompleted = -1; |
|
35 public long storeBegan = -1; |
|
36 public long storeCompleted = -1; |
|
37 public long finished = -1; |
|
38 } |
|
39 |
|
40 public static final String LOG_TAG = "WBORepository"; |
|
41 |
|
42 // Access to stats is not guarded. |
|
43 public WBORepositoryStats stats; |
|
44 |
|
45 // Whether or not to increment the timestamp of stored records. |
|
46 public final boolean bumpTimestamps; |
|
47 |
|
48 public class WBORepositorySession extends StoreTrackingRepositorySession { |
|
49 |
|
50 protected WBORepository wboRepository; |
|
51 protected ExecutorService delegateExecutor = Executors.newSingleThreadExecutor(); |
|
52 public ConcurrentHashMap<String, Record> wbos; |
|
53 |
|
54 public WBORepositorySession(WBORepository repository) { |
|
55 super(repository); |
|
56 |
|
57 wboRepository = repository; |
|
58 wbos = new ConcurrentHashMap<String, Record>(); |
|
59 stats = new WBORepositoryStats(); |
|
60 stats.created = now(); |
|
61 } |
|
62 |
|
63 @Override |
|
64 protected synchronized void trackGUID(String guid) { |
|
65 if (wboRepository.shouldTrack()) { |
|
66 super.trackGUID(guid); |
|
67 } |
|
68 } |
|
69 |
|
70 @Override |
|
71 public void guidsSince(long timestamp, |
|
72 RepositorySessionGuidsSinceDelegate delegate) { |
|
73 throw new RuntimeException("guidsSince not implemented."); |
|
74 } |
|
75 |
|
76 @Override |
|
77 public void fetchSince(long timestamp, |
|
78 RepositorySessionFetchRecordsDelegate delegate) { |
|
79 long fetchBegan = now(); |
|
80 stats.fetchBegan = fetchBegan; |
|
81 RecordFilter filter = storeTracker.getFilter(); |
|
82 |
|
83 for (Entry<String, Record> entry : wbos.entrySet()) { |
|
84 Record record = entry.getValue(); |
|
85 if (record.lastModified >= timestamp) { |
|
86 if (filter != null && |
|
87 filter.excludeRecord(record)) { |
|
88 Logger.debug(LOG_TAG, "Excluding record " + record.guid); |
|
89 continue; |
|
90 } |
|
91 delegate.deferredFetchDelegate(delegateExecutor).onFetchedRecord(record); |
|
92 } |
|
93 } |
|
94 long fetchCompleted = now(); |
|
95 stats.fetchCompleted = fetchCompleted; |
|
96 delegate.deferredFetchDelegate(delegateExecutor).onFetchCompleted(fetchCompleted); |
|
97 } |
|
98 |
|
99 @Override |
|
100 public void fetch(final String[] guids, |
|
101 final RepositorySessionFetchRecordsDelegate delegate) { |
|
102 long fetchBegan = now(); |
|
103 stats.fetchBegan = fetchBegan; |
|
104 for (String guid : guids) { |
|
105 if (wbos.containsKey(guid)) { |
|
106 delegate.deferredFetchDelegate(delegateExecutor).onFetchedRecord(wbos.get(guid)); |
|
107 } |
|
108 } |
|
109 long fetchCompleted = now(); |
|
110 stats.fetchCompleted = fetchCompleted; |
|
111 delegate.deferredFetchDelegate(delegateExecutor).onFetchCompleted(fetchCompleted); |
|
112 } |
|
113 |
|
114 @Override |
|
115 public void fetchAll(final RepositorySessionFetchRecordsDelegate delegate) { |
|
116 long fetchBegan = now(); |
|
117 stats.fetchBegan = fetchBegan; |
|
118 for (Entry<String, Record> entry : wbos.entrySet()) { |
|
119 Record record = entry.getValue(); |
|
120 delegate.deferredFetchDelegate(delegateExecutor).onFetchedRecord(record); |
|
121 } |
|
122 long fetchCompleted = now(); |
|
123 stats.fetchCompleted = fetchCompleted; |
|
124 delegate.deferredFetchDelegate(delegateExecutor).onFetchCompleted(fetchCompleted); |
|
125 } |
|
126 |
|
127 @Override |
|
128 public void store(final Record record) throws NoStoreDelegateException { |
|
129 if (delegate == null) { |
|
130 throw new NoStoreDelegateException(); |
|
131 } |
|
132 final long now = now(); |
|
133 if (stats.storeBegan < 0) { |
|
134 stats.storeBegan = now; |
|
135 } |
|
136 Record existing = wbos.get(record.guid); |
|
137 Logger.debug(LOG_TAG, "Existing record is " + (existing == null ? "<null>" : (existing.guid + ", " + existing))); |
|
138 if (existing != null && |
|
139 existing.lastModified > record.lastModified) { |
|
140 Logger.debug(LOG_TAG, "Local record is newer. Not storing."); |
|
141 delegate.deferredStoreDelegate(delegateExecutor).onRecordStoreSucceeded(record.guid); |
|
142 return; |
|
143 } |
|
144 if (existing != null) { |
|
145 Logger.debug(LOG_TAG, "Replacing local record."); |
|
146 } |
|
147 |
|
148 // Store a copy of the record with an updated modified time. |
|
149 Record toStore = record.copyWithIDs(record.guid, record.androidID); |
|
150 if (bumpTimestamps) { |
|
151 toStore.lastModified = now; |
|
152 } |
|
153 wbos.put(record.guid, toStore); |
|
154 |
|
155 trackRecord(toStore); |
|
156 delegate.deferredStoreDelegate(delegateExecutor).onRecordStoreSucceeded(record.guid); |
|
157 } |
|
158 |
|
159 @Override |
|
160 public void wipe(final RepositorySessionWipeDelegate delegate) { |
|
161 if (!isActive()) { |
|
162 delegate.onWipeFailed(new InactiveSessionException(null)); |
|
163 return; |
|
164 } |
|
165 |
|
166 Logger.info(LOG_TAG, "Wiping WBORepositorySession."); |
|
167 this.wbos = new ConcurrentHashMap<String, Record>(); |
|
168 |
|
169 // Wipe immediately for the convenience of test code. |
|
170 wboRepository.wbos = new ConcurrentHashMap<String, Record>(); |
|
171 delegate.deferredWipeDelegate(delegateExecutor).onWipeSucceeded(); |
|
172 } |
|
173 |
|
174 @Override |
|
175 public void finish(RepositorySessionFinishDelegate delegate) throws InactiveSessionException { |
|
176 Logger.info(LOG_TAG, "Finishing WBORepositorySession: handing back " + this.wbos.size() + " WBOs."); |
|
177 wboRepository.wbos = this.wbos; |
|
178 stats.finished = now(); |
|
179 super.finish(delegate); |
|
180 } |
|
181 |
|
182 @Override |
|
183 public void begin(RepositorySessionBeginDelegate delegate) throws InvalidSessionTransitionException { |
|
184 this.wbos = wboRepository.cloneWBOs(); |
|
185 stats.begun = now(); |
|
186 super.begin(delegate); |
|
187 } |
|
188 |
|
189 @Override |
|
190 public void storeDone(long end) { |
|
191 // TODO: this is not guaranteed to be called after all of the record |
|
192 // store callbacks have completed! |
|
193 if (stats.storeBegan < 0) { |
|
194 stats.storeBegan = end; |
|
195 } |
|
196 stats.storeCompleted = end; |
|
197 delegate.deferredStoreDelegate(delegateExecutor).onStoreCompleted(end); |
|
198 } |
|
199 } |
|
200 |
|
201 public ConcurrentHashMap<String, Record> wbos; |
|
202 |
|
203 public WBORepository(boolean bumpTimestamps) { |
|
204 super(); |
|
205 this.bumpTimestamps = bumpTimestamps; |
|
206 this.wbos = new ConcurrentHashMap<String, Record>(); |
|
207 } |
|
208 |
|
209 public WBORepository() { |
|
210 this(false); |
|
211 } |
|
212 |
|
213 public synchronized boolean shouldTrack() { |
|
214 return false; |
|
215 } |
|
216 |
|
217 @Override |
|
218 public void createSession(RepositorySessionCreationDelegate delegate, |
|
219 Context context) { |
|
220 delegate.deferredCreationDelegate().onSessionCreated(new WBORepositorySession(this)); |
|
221 } |
|
222 |
|
223 public ConcurrentHashMap<String, Record> cloneWBOs() { |
|
224 ConcurrentHashMap<String, Record> out = new ConcurrentHashMap<String, Record>(); |
|
225 for (Entry<String, Record> entry : wbos.entrySet()) { |
|
226 out.put(entry.getKey(), entry.getValue()); // Assume that records are |
|
227 // immutable. |
|
228 } |
|
229 return out; |
|
230 } |
|
231 } |