Wed, 31 Dec 2014 07:22:50 +0100
Correct previous dual key logic pending first delivery installment.
1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 package org.mozilla.gecko.sync.repositories;
7 import java.io.IOException;
8 import java.io.OutputStream;
9 import java.io.UnsupportedEncodingException;
10 import java.net.URI;
11 import java.net.URISyntaxException;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.util.HashSet;
15 import java.util.Set;
16 import java.util.concurrent.atomic.AtomicLong;
18 import org.json.simple.JSONArray;
19 import org.mozilla.gecko.background.common.log.Logger;
20 import org.mozilla.gecko.sync.CryptoRecord;
21 import org.mozilla.gecko.sync.DelayedWorkTracker;
22 import org.mozilla.gecko.sync.ExtendedJSONObject;
23 import org.mozilla.gecko.sync.HTTPFailureException;
24 import org.mozilla.gecko.sync.Server11PreviousPostFailedException;
25 import org.mozilla.gecko.sync.Server11RecordPostFailedException;
26 import org.mozilla.gecko.sync.UnexpectedJSONException;
27 import org.mozilla.gecko.sync.crypto.KeyBundle;
28 import org.mozilla.gecko.sync.net.AuthHeaderProvider;
29 import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest;
30 import org.mozilla.gecko.sync.net.SyncStorageRequest;
31 import org.mozilla.gecko.sync.net.SyncStorageRequestDelegate;
32 import org.mozilla.gecko.sync.net.SyncStorageResponse;
33 import org.mozilla.gecko.sync.net.WBOCollectionRequestDelegate;
34 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate;
35 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
36 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionGuidsSinceDelegate;
37 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
38 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionWipeDelegate;
39 import org.mozilla.gecko.sync.repositories.domain.Record;
41 import ch.boye.httpclientandroidlib.entity.ContentProducer;
42 import ch.boye.httpclientandroidlib.entity.EntityTemplate;
44 public class Server11RepositorySession extends RepositorySession {
45 private static byte[] recordsStart;
46 private static byte[] recordSeparator;
47 private static byte[] recordsEnd;
49 static {
50 try {
51 recordsStart = "[\n".getBytes("UTF-8");
52 recordSeparator = ",\n".getBytes("UTF-8");
53 recordsEnd = "\n]\n".getBytes("UTF-8");
54 } catch (UnsupportedEncodingException e) {
55 // These won't fail.
56 }
57 }
59 public static final String LOG_TAG = "Server11Session";
61 private static final int UPLOAD_BYTE_THRESHOLD = 1024 * 1024; // 1MB.
62 private static final int UPLOAD_ITEM_THRESHOLD = 50;
63 private static final int PER_RECORD_OVERHEAD = 2; // Comma, newline.
64 // {}, newlines, but we get to skip one record overhead.
65 private static final int PER_BATCH_OVERHEAD = 5 - PER_RECORD_OVERHEAD;
67 /**
68 * Return the X-Weave-Timestamp header from <code>response</code>, or the
69 * current time if it is missing.
70 * <p>
71 * <b>Warning:</b> this can cause the timestamp of <code>response</code> to
72 * cross domains (from server clock to local clock), which could cause records
73 * to be skipped on account of clock drift. This should never happen, because
74 * <i>every</i> server response should have a well-formed X-Weave-Timestamp
75 * header.
76 *
77 * @param response
78 * The <code>SyncStorageResponse</code> to interrogate.
79 * @return Normalized timestamp in milliseconds.
80 */
81 public static long getNormalizedTimestamp(SyncStorageResponse response) {
82 long normalizedTimestamp = -1;
83 try {
84 normalizedTimestamp = response.normalizedWeaveTimestamp();
85 } catch (NumberFormatException e) {
86 Logger.warn(LOG_TAG, "Malformed X-Weave-Timestamp header received.", e);
87 }
88 if (-1 == normalizedTimestamp) {
89 Logger.warn(LOG_TAG, "Computing stand-in timestamp from local clock. Clock drift could cause records to be skipped.");
90 normalizedTimestamp = System.currentTimeMillis();
91 }
92 return normalizedTimestamp;
93 }
95 /**
96 * Used to track outstanding requests, so that we can abort them as needed.
97 */
98 private Set<SyncStorageCollectionRequest> pending = Collections.synchronizedSet(new HashSet<SyncStorageCollectionRequest>());
100 @Override
101 public void abort() {
102 super.abort();
103 for (SyncStorageCollectionRequest request : pending) {
104 request.abort();
105 }
106 pending.clear();
107 }
109 /**
110 * Convert HTTP request delegate callbacks into fetch callbacks within the
111 * context of this RepositorySession.
112 *
113 * @author rnewman
114 *
115 */
116 public class RequestFetchDelegateAdapter extends WBOCollectionRequestDelegate {
117 RepositorySessionFetchRecordsDelegate delegate;
118 private DelayedWorkTracker workTracker = new DelayedWorkTracker();
120 // So that we can clean up.
121 private SyncStorageCollectionRequest request;
123 public void setRequest(SyncStorageCollectionRequest request) {
124 this.request = request;
125 }
126 private void removeRequestFromPending() {
127 if (this.request == null) {
128 return;
129 }
130 pending.remove(this.request);
131 this.request = null;
132 }
134 public RequestFetchDelegateAdapter(RepositorySessionFetchRecordsDelegate delegate) {
135 this.delegate = delegate;
136 }
138 @Override
139 public AuthHeaderProvider getAuthHeaderProvider() {
140 return serverRepository.getAuthHeaderProvider();
141 }
143 @Override
144 public String ifUnmodifiedSince() {
145 return null;
146 }
148 @Override
149 public void handleRequestSuccess(SyncStorageResponse response) {
150 Logger.debug(LOG_TAG, "Fetch done.");
151 removeRequestFromPending();
153 final long normalizedTimestamp = getNormalizedTimestamp(response);
154 Logger.debug(LOG_TAG, "Fetch completed. Timestamp is " + normalizedTimestamp);
156 // When we're done processing other events, finish.
157 workTracker.delayWorkItem(new Runnable() {
158 @Override
159 public void run() {
160 Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
161 // TODO: verify number of returned records.
162 delegate.onFetchCompleted(normalizedTimestamp);
163 }
164 });
165 }
167 @Override
168 public void handleRequestFailure(SyncStorageResponse response) {
169 // TODO: ensure that delegate methods don't get called more than once.
170 this.handleRequestError(new HTTPFailureException(response));
171 }
173 @Override
174 public void handleRequestError(final Exception ex) {
175 removeRequestFromPending();
176 Logger.warn(LOG_TAG, "Got request error.", ex);
177 // When we're done processing other events, finish.
178 workTracker.delayWorkItem(new Runnable() {
179 @Override
180 public void run() {
181 Logger.debug(LOG_TAG, "Running onFetchFailed.");
182 delegate.onFetchFailed(ex, null);
183 }
184 });
185 }
187 @Override
188 public void handleWBO(CryptoRecord record) {
189 workTracker.incrementOutstanding();
190 try {
191 delegate.onFetchedRecord(record);
192 } catch (Exception ex) {
193 Logger.warn(LOG_TAG, "Got exception calling onFetchedRecord with WBO.", ex);
194 // TODO: handle this better.
195 throw new RuntimeException(ex);
196 } finally {
197 workTracker.decrementOutstanding();
198 }
199 }
201 // TODO: this implies that we've screwed up our inheritance chain somehow.
202 @Override
203 public KeyBundle keyBundle() {
204 return null;
205 }
206 }
209 Server11Repository serverRepository;
210 AtomicLong uploadTimestamp = new AtomicLong(0);
212 private void bumpUploadTimestamp(long ts) {
213 while (true) {
214 long existing = uploadTimestamp.get();
215 if (existing > ts) {
216 return;
217 }
218 if (uploadTimestamp.compareAndSet(existing, ts)) {
219 return;
220 }
221 }
222 }
224 public Server11RepositorySession(Repository repository) {
225 super(repository);
226 serverRepository = (Server11Repository) repository;
227 }
229 private String flattenIDs(String[] guids) {
230 // Consider using Utils.toDelimitedString if and when the signature changes
231 // to Collection<String> guids.
232 if (guids.length == 0) {
233 return "";
234 }
235 if (guids.length == 1) {
236 return guids[0];
237 }
238 StringBuilder b = new StringBuilder();
239 for (String guid : guids) {
240 b.append(guid);
241 b.append(",");
242 }
243 return b.substring(0, b.length() - 1);
244 }
246 @Override
247 public void guidsSince(long timestamp,
248 RepositorySessionGuidsSinceDelegate delegate) {
249 // TODO Auto-generated method stub
251 }
253 protected void fetchWithParameters(long newer,
254 long limit,
255 boolean full,
256 String sort,
257 String ids,
258 RequestFetchDelegateAdapter delegate)
259 throws URISyntaxException {
261 URI collectionURI = serverRepository.collectionURI(full, newer, limit, sort, ids);
262 SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(collectionURI);
263 request.delegate = delegate;
265 // So it can clean up.
266 delegate.setRequest(request);
267 pending.add(request);
268 request.get();
269 }
271 public void fetchSince(long timestamp, long limit, String sort, RepositorySessionFetchRecordsDelegate delegate) {
272 try {
273 this.fetchWithParameters(timestamp, limit, true, sort, null, new RequestFetchDelegateAdapter(delegate));
274 } catch (URISyntaxException e) {
275 delegate.onFetchFailed(e, null);
276 }
277 }
279 @Override
280 public void fetchSince(long timestamp,
281 RepositorySessionFetchRecordsDelegate delegate) {
282 try {
283 long limit = serverRepository.getDefaultFetchLimit();
284 String sort = serverRepository.getDefaultSort();
285 this.fetchWithParameters(timestamp, limit, true, sort, null, new RequestFetchDelegateAdapter(delegate));
286 } catch (URISyntaxException e) {
287 delegate.onFetchFailed(e, null);
288 }
289 }
291 @Override
292 public void fetchAll(RepositorySessionFetchRecordsDelegate delegate) {
293 this.fetchSince(-1, delegate);
294 }
296 @Override
297 public void fetch(String[] guids,
298 RepositorySessionFetchRecordsDelegate delegate) {
299 // TODO: watch out for URL length limits!
300 try {
301 String ids = flattenIDs(guids);
302 this.fetchWithParameters(-1, -1, true, "index", ids, new RequestFetchDelegateAdapter(delegate));
303 } catch (URISyntaxException e) {
304 delegate.onFetchFailed(e, null);
305 }
306 }
308 @Override
309 public void wipe(RepositorySessionWipeDelegate delegate) {
310 if (!isActive()) {
311 delegate.onWipeFailed(new InactiveSessionException(null));
312 return;
313 }
314 // TODO: implement wipe.
315 }
317 protected Object recordsBufferMonitor = new Object();
319 /**
320 * Data of outbound records.
321 * <p>
322 * We buffer the data (rather than the <code>Record</code>) so that we can
323 * flush the buffer based on outgoing transmission size.
324 * <p>
325 * Access should be synchronized on <code>recordsBufferMonitor</code>.
326 */
327 protected ArrayList<byte[]> recordsBuffer = new ArrayList<byte[]>();
329 /**
330 * GUIDs of outbound records.
331 * <p>
332 * Used to fail entire outgoing uploads.
333 * <p>
334 * Access should be synchronized on <code>recordsBufferMonitor</code>.
335 */
336 protected ArrayList<String> recordGuidsBuffer = new ArrayList<String>();
337 protected int byteCount = PER_BATCH_OVERHEAD;
339 @Override
340 public void store(Record record) throws NoStoreDelegateException {
341 if (delegate == null) {
342 throw new NoStoreDelegateException();
343 }
344 this.enqueue(record);
345 }
347 /**
348 * Batch incoming records until some reasonable threshold (e.g., 50),
349 * some size limit is hit (probably way less than 3MB!), or storeDone
350 * is received.
351 * @param record
352 */
353 protected void enqueue(Record record) {
354 // JSONify and store the bytes, rather than the record.
355 byte[] json = record.toJSONBytes();
356 int delta = json.length;
357 synchronized (recordsBufferMonitor) {
358 if ((delta + byteCount > UPLOAD_BYTE_THRESHOLD) ||
359 (recordsBuffer.size() >= UPLOAD_ITEM_THRESHOLD)) {
361 // POST the existing contents, then enqueue.
362 flush();
363 }
364 recordsBuffer.add(json);
365 recordGuidsBuffer.add(record.guid);
366 byteCount += PER_RECORD_OVERHEAD + delta;
367 }
368 }
370 // Asynchronously upload records.
371 // Must be locked!
372 protected void flush() {
373 if (recordsBuffer.size() > 0) {
374 final ArrayList<byte[]> outgoing = recordsBuffer;
375 final ArrayList<String> outgoingGuids = recordGuidsBuffer;
376 RepositorySessionStoreDelegate uploadDelegate = this.delegate;
377 storeWorkQueue.execute(new RecordUploadRunnable(uploadDelegate, outgoing, outgoingGuids, byteCount));
379 recordsBuffer = new ArrayList<byte[]>();
380 recordGuidsBuffer = new ArrayList<String>();
381 byteCount = PER_BATCH_OVERHEAD;
382 }
383 }
385 @Override
386 public void storeDone() {
387 Logger.debug(LOG_TAG, "storeDone().");
388 synchronized (recordsBufferMonitor) {
389 flush();
390 // Do this in a Runnable so that the timestamp is grabbed after any upload.
391 final Runnable r = new Runnable() {
392 @Override
393 public void run() {
394 synchronized (recordsBufferMonitor) {
395 final long end = uploadTimestamp.get();
396 Logger.debug(LOG_TAG, "Calling storeDone with " + end);
397 storeDone(end);
398 }
399 }
400 };
401 storeWorkQueue.execute(r);
402 }
403 }
405 /**
406 * <code>true</code> if a record upload has failed this session.
407 * <p>
408 * This is only set in begin and possibly by <code>RecordUploadRunnable</code>.
409 * Since those are executed serially, we can use an unsynchronized
410 * volatile boolean here.
411 */
412 protected volatile boolean recordUploadFailed;
414 @Override
415 public void begin(RepositorySessionBeginDelegate delegate) throws InvalidSessionTransitionException {
416 recordUploadFailed = false;
417 super.begin(delegate);
418 }
420 /**
421 * Make an HTTP request, and convert HTTP request delegate callbacks into
422 * store callbacks within the context of this RepositorySession.
423 *
424 * @author rnewman
425 *
426 */
427 protected class RecordUploadRunnable implements Runnable, SyncStorageRequestDelegate {
429 public final String LOG_TAG = "RecordUploadRunnable";
430 private ArrayList<byte[]> outgoing;
431 private ArrayList<String> outgoingGuids;
432 private long byteCount;
434 public RecordUploadRunnable(RepositorySessionStoreDelegate storeDelegate,
435 ArrayList<byte[]> outgoing,
436 ArrayList<String> outgoingGuids,
437 long byteCount) {
438 Logger.debug(LOG_TAG, "Preparing record upload for " +
439 outgoing.size() + " records (" +
440 byteCount + " bytes).");
441 this.outgoing = outgoing;
442 this.outgoingGuids = outgoingGuids;
443 this.byteCount = byteCount;
444 }
446 @Override
447 public AuthHeaderProvider getAuthHeaderProvider() {
448 return serverRepository.getAuthHeaderProvider();
449 }
451 @Override
452 public String ifUnmodifiedSince() {
453 return null;
454 }
456 @Override
457 public void handleRequestSuccess(SyncStorageResponse response) {
458 Logger.trace(LOG_TAG, "POST of " + outgoing.size() + " records done.");
460 ExtendedJSONObject body;
461 try {
462 body = response.jsonObjectBody(); // jsonObjectBody() throws or returns non-null.
463 } catch (Exception e) {
464 Logger.error(LOG_TAG, "Got exception parsing POST success body.", e);
465 this.handleRequestError(e);
466 return;
467 }
469 // Be defensive when logging timestamp.
470 if (body.containsKey("modified")) {
471 Long modified = body.getTimestamp("modified");
472 if (modified != null) {
473 Logger.trace(LOG_TAG, "POST request success. Modified timestamp: " + modified.longValue());
474 } else {
475 Logger.warn(LOG_TAG, "POST success body contains malformed 'modified': " + body.toJSONString());
476 }
477 } else {
478 Logger.warn(LOG_TAG, "POST success body does not contain key 'modified': " + body.toJSONString());
479 }
481 try {
482 JSONArray success = body.getArray("success");
483 if ((success != null) &&
484 (success.size() > 0)) {
485 Logger.trace(LOG_TAG, "Successful records: " + success.toString());
486 for (Object o : success) {
487 try {
488 delegate.onRecordStoreSucceeded((String) o);
489 } catch (ClassCastException e) {
490 Logger.error(LOG_TAG, "Got exception parsing POST success guid.", e);
491 // Not much to be done.
492 }
493 }
495 long normalizedTimestamp = getNormalizedTimestamp(response);
496 Logger.trace(LOG_TAG, "Passing back upload X-Weave-Timestamp: " + normalizedTimestamp);
497 bumpUploadTimestamp(normalizedTimestamp);
498 }
499 success = null; // Want to GC this ASAP.
501 ExtendedJSONObject failed = body.getObject("failed");
502 if ((failed != null) &&
503 (failed.object.size() > 0)) {
504 Logger.debug(LOG_TAG, "Failed records: " + failed.object.toString());
505 Exception ex = new Server11RecordPostFailedException();
506 for (String guid : failed.keySet()) {
507 delegate.onRecordStoreFailed(ex, guid);
508 }
509 }
510 failed = null; // Want to GC this ASAP.
511 } catch (UnexpectedJSONException e) {
512 Logger.error(LOG_TAG, "Got exception processing success/failed in POST success body.", e);
513 // TODO
514 return;
515 }
516 Logger.debug(LOG_TAG, "POST of " + outgoing.size() + " records handled.");
517 }
519 @Override
520 public void handleRequestFailure(SyncStorageResponse response) {
521 // TODO: call session.interpretHTTPFailure.
522 this.handleRequestError(new HTTPFailureException(response));
523 }
525 @Override
526 public void handleRequestError(final Exception ex) {
527 Logger.warn(LOG_TAG, "Got request error.", ex);
529 recordUploadFailed = true;
530 ArrayList<String> failedOutgoingGuids = outgoingGuids;
531 outgoingGuids = null; // Want to GC this ASAP.
532 for (String guid : failedOutgoingGuids) {
533 delegate.onRecordStoreFailed(ex, guid);
534 }
535 return;
536 }
538 public class ByteArraysContentProducer implements ContentProducer {
540 ArrayList<byte[]> outgoing;
541 public ByteArraysContentProducer(ArrayList<byte[]> arrays) {
542 outgoing = arrays;
543 }
545 @Override
546 public void writeTo(OutputStream outstream) throws IOException {
547 int count = outgoing.size();
548 outstream.write(recordsStart);
549 outstream.write(outgoing.get(0));
550 for (int i = 1; i < count; ++i) {
551 outstream.write(recordSeparator);
552 outstream.write(outgoing.get(i));
553 }
554 outstream.write(recordsEnd);
555 }
556 }
558 public class ByteArraysEntity extends EntityTemplate {
559 private long count;
560 public ByteArraysEntity(ArrayList<byte[]> arrays, long totalBytes) {
561 super(new ByteArraysContentProducer(arrays));
562 this.count = totalBytes;
563 this.setContentType("application/json");
564 // charset is set in BaseResource.
565 }
567 @Override
568 public long getContentLength() {
569 return count;
570 }
572 @Override
573 public boolean isRepeatable() {
574 return true;
575 }
576 }
578 public ByteArraysEntity getBodyEntity() {
579 ByteArraysEntity body = new ByteArraysEntity(outgoing, byteCount);
580 return body;
581 }
583 @Override
584 public void run() {
585 if (recordUploadFailed) {
586 Logger.info(LOG_TAG, "Previous record upload failed. Failing all records and not retrying.");
587 Exception ex = new Server11PreviousPostFailedException();
588 for (String guid : outgoingGuids) {
589 delegate.onRecordStoreFailed(ex, guid);
590 }
591 return;
592 }
594 if (outgoing == null ||
595 outgoing.size() == 0) {
596 Logger.debug(LOG_TAG, "No items: RecordUploadRunnable returning immediately.");
597 return;
598 }
600 URI u = serverRepository.collectionURI();
601 SyncStorageRequest request = new SyncStorageRequest(u);
603 request.delegate = this;
605 // We don't want the task queue to proceed until this request completes.
606 // Fortunately, BaseResource is currently synchronous.
607 // If that ever changes, you'll need to block here.
608 ByteArraysEntity body = getBodyEntity();
609 request.post(body);
610 }
611 }
613 @Override
614 public boolean dataAvailable() {
615 return serverRepository.updateNeeded(getLastSyncTimestamp());
616 }
617 }