|
1 /* This Source Code Form is subject to the terms of the Mozilla Public |
|
2 * License, v. 2.0. If a copy of the MPL was not distributed with this |
|
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ |
|
4 |
|
5 package org.mozilla.gecko.sync.repositories; |
|
6 |
|
7 import java.io.IOException; |
|
8 import java.io.OutputStream; |
|
9 import java.io.UnsupportedEncodingException; |
|
10 import java.net.URI; |
|
11 import java.net.URISyntaxException; |
|
12 import java.util.ArrayList; |
|
13 import java.util.Collections; |
|
14 import java.util.HashSet; |
|
15 import java.util.Set; |
|
16 import java.util.concurrent.atomic.AtomicLong; |
|
17 |
|
18 import org.json.simple.JSONArray; |
|
19 import org.mozilla.gecko.background.common.log.Logger; |
|
20 import org.mozilla.gecko.sync.CryptoRecord; |
|
21 import org.mozilla.gecko.sync.DelayedWorkTracker; |
|
22 import org.mozilla.gecko.sync.ExtendedJSONObject; |
|
23 import org.mozilla.gecko.sync.HTTPFailureException; |
|
24 import org.mozilla.gecko.sync.Server11PreviousPostFailedException; |
|
25 import org.mozilla.gecko.sync.Server11RecordPostFailedException; |
|
26 import org.mozilla.gecko.sync.UnexpectedJSONException; |
|
27 import org.mozilla.gecko.sync.crypto.KeyBundle; |
|
28 import org.mozilla.gecko.sync.net.AuthHeaderProvider; |
|
29 import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest; |
|
30 import org.mozilla.gecko.sync.net.SyncStorageRequest; |
|
31 import org.mozilla.gecko.sync.net.SyncStorageRequestDelegate; |
|
32 import org.mozilla.gecko.sync.net.SyncStorageResponse; |
|
33 import org.mozilla.gecko.sync.net.WBOCollectionRequestDelegate; |
|
34 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate; |
|
35 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate; |
|
36 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionGuidsSinceDelegate; |
|
37 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate; |
|
38 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionWipeDelegate; |
|
39 import org.mozilla.gecko.sync.repositories.domain.Record; |
|
40 |
|
41 import ch.boye.httpclientandroidlib.entity.ContentProducer; |
|
42 import ch.boye.httpclientandroidlib.entity.EntityTemplate; |
|
43 |
|
44 public class Server11RepositorySession extends RepositorySession { |
|
45 private static byte[] recordsStart; |
|
46 private static byte[] recordSeparator; |
|
47 private static byte[] recordsEnd; |
|
48 |
|
49 static { |
|
50 try { |
|
51 recordsStart = "[\n".getBytes("UTF-8"); |
|
52 recordSeparator = ",\n".getBytes("UTF-8"); |
|
53 recordsEnd = "\n]\n".getBytes("UTF-8"); |
|
54 } catch (UnsupportedEncodingException e) { |
|
55 // These won't fail. |
|
56 } |
|
57 } |
|
58 |
|
59 public static final String LOG_TAG = "Server11Session"; |
|
60 |
|
61 private static final int UPLOAD_BYTE_THRESHOLD = 1024 * 1024; // 1MB. |
|
62 private static final int UPLOAD_ITEM_THRESHOLD = 50; |
|
63 private static final int PER_RECORD_OVERHEAD = 2; // Comma, newline. |
|
64 // {}, newlines, but we get to skip one record overhead. |
|
65 private static final int PER_BATCH_OVERHEAD = 5 - PER_RECORD_OVERHEAD; |
|
66 |
|
67 /** |
|
68 * Return the X-Weave-Timestamp header from <code>response</code>, or the |
|
69 * current time if it is missing. |
|
70 * <p> |
|
71 * <b>Warning:</b> this can cause the timestamp of <code>response</code> to |
|
72 * cross domains (from server clock to local clock), which could cause records |
|
73 * to be skipped on account of clock drift. This should never happen, because |
|
74 * <i>every</i> server response should have a well-formed X-Weave-Timestamp |
|
75 * header. |
|
76 * |
|
77 * @param response |
|
78 * The <code>SyncStorageResponse</code> to interrogate. |
|
79 * @return Normalized timestamp in milliseconds. |
|
80 */ |
|
81 public static long getNormalizedTimestamp(SyncStorageResponse response) { |
|
82 long normalizedTimestamp = -1; |
|
83 try { |
|
84 normalizedTimestamp = response.normalizedWeaveTimestamp(); |
|
85 } catch (NumberFormatException e) { |
|
86 Logger.warn(LOG_TAG, "Malformed X-Weave-Timestamp header received.", e); |
|
87 } |
|
88 if (-1 == normalizedTimestamp) { |
|
89 Logger.warn(LOG_TAG, "Computing stand-in timestamp from local clock. Clock drift could cause records to be skipped."); |
|
90 normalizedTimestamp = System.currentTimeMillis(); |
|
91 } |
|
92 return normalizedTimestamp; |
|
93 } |
|
94 |
|
95 /** |
|
96 * Used to track outstanding requests, so that we can abort them as needed. |
|
97 */ |
|
98 private Set<SyncStorageCollectionRequest> pending = Collections.synchronizedSet(new HashSet<SyncStorageCollectionRequest>()); |
|
99 |
|
100 @Override |
|
101 public void abort() { |
|
102 super.abort(); |
|
103 for (SyncStorageCollectionRequest request : pending) { |
|
104 request.abort(); |
|
105 } |
|
106 pending.clear(); |
|
107 } |
|
108 |
|
109 /** |
|
110 * Convert HTTP request delegate callbacks into fetch callbacks within the |
|
111 * context of this RepositorySession. |
|
112 * |
|
113 * @author rnewman |
|
114 * |
|
115 */ |
|
116 public class RequestFetchDelegateAdapter extends WBOCollectionRequestDelegate { |
|
117 RepositorySessionFetchRecordsDelegate delegate; |
|
118 private DelayedWorkTracker workTracker = new DelayedWorkTracker(); |
|
119 |
|
120 // So that we can clean up. |
|
121 private SyncStorageCollectionRequest request; |
|
122 |
|
123 public void setRequest(SyncStorageCollectionRequest request) { |
|
124 this.request = request; |
|
125 } |
|
126 private void removeRequestFromPending() { |
|
127 if (this.request == null) { |
|
128 return; |
|
129 } |
|
130 pending.remove(this.request); |
|
131 this.request = null; |
|
132 } |
|
133 |
|
134 public RequestFetchDelegateAdapter(RepositorySessionFetchRecordsDelegate delegate) { |
|
135 this.delegate = delegate; |
|
136 } |
|
137 |
|
138 @Override |
|
139 public AuthHeaderProvider getAuthHeaderProvider() { |
|
140 return serverRepository.getAuthHeaderProvider(); |
|
141 } |
|
142 |
|
143 @Override |
|
144 public String ifUnmodifiedSince() { |
|
145 return null; |
|
146 } |
|
147 |
|
148 @Override |
|
149 public void handleRequestSuccess(SyncStorageResponse response) { |
|
150 Logger.debug(LOG_TAG, "Fetch done."); |
|
151 removeRequestFromPending(); |
|
152 |
|
153 final long normalizedTimestamp = getNormalizedTimestamp(response); |
|
154 Logger.debug(LOG_TAG, "Fetch completed. Timestamp is " + normalizedTimestamp); |
|
155 |
|
156 // When we're done processing other events, finish. |
|
157 workTracker.delayWorkItem(new Runnable() { |
|
158 @Override |
|
159 public void run() { |
|
160 Logger.debug(LOG_TAG, "Delayed onFetchCompleted running."); |
|
161 // TODO: verify number of returned records. |
|
162 delegate.onFetchCompleted(normalizedTimestamp); |
|
163 } |
|
164 }); |
|
165 } |
|
166 |
|
167 @Override |
|
168 public void handleRequestFailure(SyncStorageResponse response) { |
|
169 // TODO: ensure that delegate methods don't get called more than once. |
|
170 this.handleRequestError(new HTTPFailureException(response)); |
|
171 } |
|
172 |
|
173 @Override |
|
174 public void handleRequestError(final Exception ex) { |
|
175 removeRequestFromPending(); |
|
176 Logger.warn(LOG_TAG, "Got request error.", ex); |
|
177 // When we're done processing other events, finish. |
|
178 workTracker.delayWorkItem(new Runnable() { |
|
179 @Override |
|
180 public void run() { |
|
181 Logger.debug(LOG_TAG, "Running onFetchFailed."); |
|
182 delegate.onFetchFailed(ex, null); |
|
183 } |
|
184 }); |
|
185 } |
|
186 |
|
187 @Override |
|
188 public void handleWBO(CryptoRecord record) { |
|
189 workTracker.incrementOutstanding(); |
|
190 try { |
|
191 delegate.onFetchedRecord(record); |
|
192 } catch (Exception ex) { |
|
193 Logger.warn(LOG_TAG, "Got exception calling onFetchedRecord with WBO.", ex); |
|
194 // TODO: handle this better. |
|
195 throw new RuntimeException(ex); |
|
196 } finally { |
|
197 workTracker.decrementOutstanding(); |
|
198 } |
|
199 } |
|
200 |
|
201 // TODO: this implies that we've screwed up our inheritance chain somehow. |
|
202 @Override |
|
203 public KeyBundle keyBundle() { |
|
204 return null; |
|
205 } |
|
206 } |
|
207 |
|
208 |
|
209 Server11Repository serverRepository; |
|
210 AtomicLong uploadTimestamp = new AtomicLong(0); |
|
211 |
|
212 private void bumpUploadTimestamp(long ts) { |
|
213 while (true) { |
|
214 long existing = uploadTimestamp.get(); |
|
215 if (existing > ts) { |
|
216 return; |
|
217 } |
|
218 if (uploadTimestamp.compareAndSet(existing, ts)) { |
|
219 return; |
|
220 } |
|
221 } |
|
222 } |
|
223 |
|
/**
 * Create a session over <code>repository</code>.
 *
 * @param repository
 *          the backing repository; it is cast to
 *          <code>Server11Repository</code>, so any other type fails with a
 *          <code>ClassCastException</code>.
 */
public Server11RepositorySession(Repository repository) {
  super(repository);
  this.serverRepository = (Server11Repository) repository;
}
|
228 |
|
229 private String flattenIDs(String[] guids) { |
|
230 // Consider using Utils.toDelimitedString if and when the signature changes |
|
231 // to Collection<String> guids. |
|
232 if (guids.length == 0) { |
|
233 return ""; |
|
234 } |
|
235 if (guids.length == 1) { |
|
236 return guids[0]; |
|
237 } |
|
238 StringBuilder b = new StringBuilder(); |
|
239 for (String guid : guids) { |
|
240 b.append(guid); |
|
241 b.append(","); |
|
242 } |
|
243 return b.substring(0, b.length() - 1); |
|
244 } |
|
245 |
|
246 @Override |
|
247 public void guidsSince(long timestamp, |
|
248 RepositorySessionGuidsSinceDelegate delegate) { |
|
249 // TODO Auto-generated method stub |
|
250 |
|
251 } |
|
252 |
|
253 protected void fetchWithParameters(long newer, |
|
254 long limit, |
|
255 boolean full, |
|
256 String sort, |
|
257 String ids, |
|
258 RequestFetchDelegateAdapter delegate) |
|
259 throws URISyntaxException { |
|
260 |
|
261 URI collectionURI = serverRepository.collectionURI(full, newer, limit, sort, ids); |
|
262 SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(collectionURI); |
|
263 request.delegate = delegate; |
|
264 |
|
265 // So it can clean up. |
|
266 delegate.setRequest(request); |
|
267 pending.add(request); |
|
268 request.get(); |
|
269 } |
|
270 |
|
271 public void fetchSince(long timestamp, long limit, String sort, RepositorySessionFetchRecordsDelegate delegate) { |
|
272 try { |
|
273 this.fetchWithParameters(timestamp, limit, true, sort, null, new RequestFetchDelegateAdapter(delegate)); |
|
274 } catch (URISyntaxException e) { |
|
275 delegate.onFetchFailed(e, null); |
|
276 } |
|
277 } |
|
278 |
|
279 @Override |
|
280 public void fetchSince(long timestamp, |
|
281 RepositorySessionFetchRecordsDelegate delegate) { |
|
282 try { |
|
283 long limit = serverRepository.getDefaultFetchLimit(); |
|
284 String sort = serverRepository.getDefaultSort(); |
|
285 this.fetchWithParameters(timestamp, limit, true, sort, null, new RequestFetchDelegateAdapter(delegate)); |
|
286 } catch (URISyntaxException e) { |
|
287 delegate.onFetchFailed(e, null); |
|
288 } |
|
289 } |
|
290 |
|
291 @Override |
|
292 public void fetchAll(RepositorySessionFetchRecordsDelegate delegate) { |
|
293 this.fetchSince(-1, delegate); |
|
294 } |
|
295 |
|
296 @Override |
|
297 public void fetch(String[] guids, |
|
298 RepositorySessionFetchRecordsDelegate delegate) { |
|
299 // TODO: watch out for URL length limits! |
|
300 try { |
|
301 String ids = flattenIDs(guids); |
|
302 this.fetchWithParameters(-1, -1, true, "index", ids, new RequestFetchDelegateAdapter(delegate)); |
|
303 } catch (URISyntaxException e) { |
|
304 delegate.onFetchFailed(e, null); |
|
305 } |
|
306 } |
|
307 |
|
308 @Override |
|
309 public void wipe(RepositorySessionWipeDelegate delegate) { |
|
310 if (!isActive()) { |
|
311 delegate.onWipeFailed(new InactiveSessionException(null)); |
|
312 return; |
|
313 } |
|
314 // TODO: implement wipe. |
|
315 } |
|
316 |
|
317 protected Object recordsBufferMonitor = new Object(); |
|
318 |
|
319 /** |
|
320 * Data of outbound records. |
|
321 * <p> |
|
322 * We buffer the data (rather than the <code>Record</code>) so that we can |
|
323 * flush the buffer based on outgoing transmission size. |
|
324 * <p> |
|
325 * Access should be synchronized on <code>recordsBufferMonitor</code>. |
|
326 */ |
|
327 protected ArrayList<byte[]> recordsBuffer = new ArrayList<byte[]>(); |
|
328 |
|
329 /** |
|
330 * GUIDs of outbound records. |
|
331 * <p> |
|
332 * Used to fail entire outgoing uploads. |
|
333 * <p> |
|
334 * Access should be synchronized on <code>recordsBufferMonitor</code>. |
|
335 */ |
|
336 protected ArrayList<String> recordGuidsBuffer = new ArrayList<String>(); |
|
337 protected int byteCount = PER_BATCH_OVERHEAD; |
|
338 |
|
339 @Override |
|
340 public void store(Record record) throws NoStoreDelegateException { |
|
341 if (delegate == null) { |
|
342 throw new NoStoreDelegateException(); |
|
343 } |
|
344 this.enqueue(record); |
|
345 } |
|
346 |
|
347 /** |
|
348 * Batch incoming records until some reasonable threshold (e.g., 50), |
|
349 * some size limit is hit (probably way less than 3MB!), or storeDone |
|
350 * is received. |
|
351 * @param record |
|
352 */ |
|
353 protected void enqueue(Record record) { |
|
354 // JSONify and store the bytes, rather than the record. |
|
355 byte[] json = record.toJSONBytes(); |
|
356 int delta = json.length; |
|
357 synchronized (recordsBufferMonitor) { |
|
358 if ((delta + byteCount > UPLOAD_BYTE_THRESHOLD) || |
|
359 (recordsBuffer.size() >= UPLOAD_ITEM_THRESHOLD)) { |
|
360 |
|
361 // POST the existing contents, then enqueue. |
|
362 flush(); |
|
363 } |
|
364 recordsBuffer.add(json); |
|
365 recordGuidsBuffer.add(record.guid); |
|
366 byteCount += PER_RECORD_OVERHEAD + delta; |
|
367 } |
|
368 } |
|
369 |
|
370 // Asynchronously upload records. |
|
371 // Must be locked! |
|
372 protected void flush() { |
|
373 if (recordsBuffer.size() > 0) { |
|
374 final ArrayList<byte[]> outgoing = recordsBuffer; |
|
375 final ArrayList<String> outgoingGuids = recordGuidsBuffer; |
|
376 RepositorySessionStoreDelegate uploadDelegate = this.delegate; |
|
377 storeWorkQueue.execute(new RecordUploadRunnable(uploadDelegate, outgoing, outgoingGuids, byteCount)); |
|
378 |
|
379 recordsBuffer = new ArrayList<byte[]>(); |
|
380 recordGuidsBuffer = new ArrayList<String>(); |
|
381 byteCount = PER_BATCH_OVERHEAD; |
|
382 } |
|
383 } |
|
384 |
|
385 @Override |
|
386 public void storeDone() { |
|
387 Logger.debug(LOG_TAG, "storeDone()."); |
|
388 synchronized (recordsBufferMonitor) { |
|
389 flush(); |
|
390 // Do this in a Runnable so that the timestamp is grabbed after any upload. |
|
391 final Runnable r = new Runnable() { |
|
392 @Override |
|
393 public void run() { |
|
394 synchronized (recordsBufferMonitor) { |
|
395 final long end = uploadTimestamp.get(); |
|
396 Logger.debug(LOG_TAG, "Calling storeDone with " + end); |
|
397 storeDone(end); |
|
398 } |
|
399 } |
|
400 }; |
|
401 storeWorkQueue.execute(r); |
|
402 } |
|
403 } |
|
404 |
|
405 /** |
|
406 * <code>true</code> if a record upload has failed this session. |
|
407 * <p> |
|
408 * This is only set in begin and possibly by <code>RecordUploadRunnable</code>. |
|
409 * Since those are executed serially, we can use an unsynchronized |
|
410 * volatile boolean here. |
|
411 */ |
|
412 protected volatile boolean recordUploadFailed; |
|
413 |
|
414 @Override |
|
415 public void begin(RepositorySessionBeginDelegate delegate) throws InvalidSessionTransitionException { |
|
416 recordUploadFailed = false; |
|
417 super.begin(delegate); |
|
418 } |
|
419 |
|
420 /** |
|
421 * Make an HTTP request, and convert HTTP request delegate callbacks into |
|
422 * store callbacks within the context of this RepositorySession. |
|
423 * |
|
424 * @author rnewman |
|
425 * |
|
426 */ |
|
427 protected class RecordUploadRunnable implements Runnable, SyncStorageRequestDelegate { |
|
428 |
|
429 public final String LOG_TAG = "RecordUploadRunnable"; |
|
430 private ArrayList<byte[]> outgoing; |
|
431 private ArrayList<String> outgoingGuids; |
|
432 private long byteCount; |
|
433 |
|
434 public RecordUploadRunnable(RepositorySessionStoreDelegate storeDelegate, |
|
435 ArrayList<byte[]> outgoing, |
|
436 ArrayList<String> outgoingGuids, |
|
437 long byteCount) { |
|
438 Logger.debug(LOG_TAG, "Preparing record upload for " + |
|
439 outgoing.size() + " records (" + |
|
440 byteCount + " bytes)."); |
|
441 this.outgoing = outgoing; |
|
442 this.outgoingGuids = outgoingGuids; |
|
443 this.byteCount = byteCount; |
|
444 } |
|
445 |
|
446 @Override |
|
447 public AuthHeaderProvider getAuthHeaderProvider() { |
|
448 return serverRepository.getAuthHeaderProvider(); |
|
449 } |
|
450 |
|
451 @Override |
|
452 public String ifUnmodifiedSince() { |
|
453 return null; |
|
454 } |
|
455 |
|
456 @Override |
|
457 public void handleRequestSuccess(SyncStorageResponse response) { |
|
458 Logger.trace(LOG_TAG, "POST of " + outgoing.size() + " records done."); |
|
459 |
|
460 ExtendedJSONObject body; |
|
461 try { |
|
462 body = response.jsonObjectBody(); // jsonObjectBody() throws or returns non-null. |
|
463 } catch (Exception e) { |
|
464 Logger.error(LOG_TAG, "Got exception parsing POST success body.", e); |
|
465 this.handleRequestError(e); |
|
466 return; |
|
467 } |
|
468 |
|
469 // Be defensive when logging timestamp. |
|
470 if (body.containsKey("modified")) { |
|
471 Long modified = body.getTimestamp("modified"); |
|
472 if (modified != null) { |
|
473 Logger.trace(LOG_TAG, "POST request success. Modified timestamp: " + modified.longValue()); |
|
474 } else { |
|
475 Logger.warn(LOG_TAG, "POST success body contains malformed 'modified': " + body.toJSONString()); |
|
476 } |
|
477 } else { |
|
478 Logger.warn(LOG_TAG, "POST success body does not contain key 'modified': " + body.toJSONString()); |
|
479 } |
|
480 |
|
481 try { |
|
482 JSONArray success = body.getArray("success"); |
|
483 if ((success != null) && |
|
484 (success.size() > 0)) { |
|
485 Logger.trace(LOG_TAG, "Successful records: " + success.toString()); |
|
486 for (Object o : success) { |
|
487 try { |
|
488 delegate.onRecordStoreSucceeded((String) o); |
|
489 } catch (ClassCastException e) { |
|
490 Logger.error(LOG_TAG, "Got exception parsing POST success guid.", e); |
|
491 // Not much to be done. |
|
492 } |
|
493 } |
|
494 |
|
495 long normalizedTimestamp = getNormalizedTimestamp(response); |
|
496 Logger.trace(LOG_TAG, "Passing back upload X-Weave-Timestamp: " + normalizedTimestamp); |
|
497 bumpUploadTimestamp(normalizedTimestamp); |
|
498 } |
|
499 success = null; // Want to GC this ASAP. |
|
500 |
|
501 ExtendedJSONObject failed = body.getObject("failed"); |
|
502 if ((failed != null) && |
|
503 (failed.object.size() > 0)) { |
|
504 Logger.debug(LOG_TAG, "Failed records: " + failed.object.toString()); |
|
505 Exception ex = new Server11RecordPostFailedException(); |
|
506 for (String guid : failed.keySet()) { |
|
507 delegate.onRecordStoreFailed(ex, guid); |
|
508 } |
|
509 } |
|
510 failed = null; // Want to GC this ASAP. |
|
511 } catch (UnexpectedJSONException e) { |
|
512 Logger.error(LOG_TAG, "Got exception processing success/failed in POST success body.", e); |
|
513 // TODO |
|
514 return; |
|
515 } |
|
516 Logger.debug(LOG_TAG, "POST of " + outgoing.size() + " records handled."); |
|
517 } |
|
518 |
|
519 @Override |
|
520 public void handleRequestFailure(SyncStorageResponse response) { |
|
521 // TODO: call session.interpretHTTPFailure. |
|
522 this.handleRequestError(new HTTPFailureException(response)); |
|
523 } |
|
524 |
|
525 @Override |
|
526 public void handleRequestError(final Exception ex) { |
|
527 Logger.warn(LOG_TAG, "Got request error.", ex); |
|
528 |
|
529 recordUploadFailed = true; |
|
530 ArrayList<String> failedOutgoingGuids = outgoingGuids; |
|
531 outgoingGuids = null; // Want to GC this ASAP. |
|
532 for (String guid : failedOutgoingGuids) { |
|
533 delegate.onRecordStoreFailed(ex, guid); |
|
534 } |
|
535 return; |
|
536 } |
|
537 |
|
538 public class ByteArraysContentProducer implements ContentProducer { |
|
539 |
|
540 ArrayList<byte[]> outgoing; |
|
541 public ByteArraysContentProducer(ArrayList<byte[]> arrays) { |
|
542 outgoing = arrays; |
|
543 } |
|
544 |
|
545 @Override |
|
546 public void writeTo(OutputStream outstream) throws IOException { |
|
547 int count = outgoing.size(); |
|
548 outstream.write(recordsStart); |
|
549 outstream.write(outgoing.get(0)); |
|
550 for (int i = 1; i < count; ++i) { |
|
551 outstream.write(recordSeparator); |
|
552 outstream.write(outgoing.get(i)); |
|
553 } |
|
554 outstream.write(recordsEnd); |
|
555 } |
|
556 } |
|
557 |
|
558 public class ByteArraysEntity extends EntityTemplate { |
|
559 private long count; |
|
560 public ByteArraysEntity(ArrayList<byte[]> arrays, long totalBytes) { |
|
561 super(new ByteArraysContentProducer(arrays)); |
|
562 this.count = totalBytes; |
|
563 this.setContentType("application/json"); |
|
564 // charset is set in BaseResource. |
|
565 } |
|
566 |
|
567 @Override |
|
568 public long getContentLength() { |
|
569 return count; |
|
570 } |
|
571 |
|
572 @Override |
|
573 public boolean isRepeatable() { |
|
574 return true; |
|
575 } |
|
576 } |
|
577 |
|
578 public ByteArraysEntity getBodyEntity() { |
|
579 ByteArraysEntity body = new ByteArraysEntity(outgoing, byteCount); |
|
580 return body; |
|
581 } |
|
582 |
|
583 @Override |
|
584 public void run() { |
|
585 if (recordUploadFailed) { |
|
586 Logger.info(LOG_TAG, "Previous record upload failed. Failing all records and not retrying."); |
|
587 Exception ex = new Server11PreviousPostFailedException(); |
|
588 for (String guid : outgoingGuids) { |
|
589 delegate.onRecordStoreFailed(ex, guid); |
|
590 } |
|
591 return; |
|
592 } |
|
593 |
|
594 if (outgoing == null || |
|
595 outgoing.size() == 0) { |
|
596 Logger.debug(LOG_TAG, "No items: RecordUploadRunnable returning immediately."); |
|
597 return; |
|
598 } |
|
599 |
|
600 URI u = serverRepository.collectionURI(); |
|
601 SyncStorageRequest request = new SyncStorageRequest(u); |
|
602 |
|
603 request.delegate = this; |
|
604 |
|
605 // We don't want the task queue to proceed until this request completes. |
|
606 // Fortunately, BaseResource is currently synchronous. |
|
607 // If that ever changes, you'll need to block here. |
|
608 ByteArraysEntity body = getBodyEntity(); |
|
609 request.post(body); |
|
610 } |
|
611 } |
|
612 |
|
613 @Override |
|
614 public boolean dataAvailable() { |
|
615 return serverRepository.updateNeeded(getLastSyncTimestamp()); |
|
616 } |
|
617 } |