mobile/android/base/sync/repositories/Server11RepositorySession.java

Wed, 31 Dec 2014 07:22:50 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 07:22:50 +0100
branch
TOR_BUG_3246
changeset 4
fc2d59ddac77
permissions
-rw-r--r--

Correct previous dual key logic pending first delivery installment.

michael@0 1 /* This Source Code Form is subject to the terms of the Mozilla Public
michael@0 2 * License, v. 2.0. If a copy of the MPL was not distributed with this
michael@0 3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
michael@0 4
michael@0 5 package org.mozilla.gecko.sync.repositories;
michael@0 6
michael@0 7 import java.io.IOException;
michael@0 8 import java.io.OutputStream;
michael@0 9 import java.io.UnsupportedEncodingException;
michael@0 10 import java.net.URI;
michael@0 11 import java.net.URISyntaxException;
michael@0 12 import java.util.ArrayList;
michael@0 13 import java.util.Collections;
michael@0 14 import java.util.HashSet;
michael@0 15 import java.util.Set;
michael@0 16 import java.util.concurrent.atomic.AtomicLong;
michael@0 17
michael@0 18 import org.json.simple.JSONArray;
michael@0 19 import org.mozilla.gecko.background.common.log.Logger;
michael@0 20 import org.mozilla.gecko.sync.CryptoRecord;
michael@0 21 import org.mozilla.gecko.sync.DelayedWorkTracker;
michael@0 22 import org.mozilla.gecko.sync.ExtendedJSONObject;
michael@0 23 import org.mozilla.gecko.sync.HTTPFailureException;
michael@0 24 import org.mozilla.gecko.sync.Server11PreviousPostFailedException;
michael@0 25 import org.mozilla.gecko.sync.Server11RecordPostFailedException;
michael@0 26 import org.mozilla.gecko.sync.UnexpectedJSONException;
michael@0 27 import org.mozilla.gecko.sync.crypto.KeyBundle;
michael@0 28 import org.mozilla.gecko.sync.net.AuthHeaderProvider;
michael@0 29 import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest;
michael@0 30 import org.mozilla.gecko.sync.net.SyncStorageRequest;
michael@0 31 import org.mozilla.gecko.sync.net.SyncStorageRequestDelegate;
michael@0 32 import org.mozilla.gecko.sync.net.SyncStorageResponse;
michael@0 33 import org.mozilla.gecko.sync.net.WBOCollectionRequestDelegate;
michael@0 34 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate;
michael@0 35 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
michael@0 36 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionGuidsSinceDelegate;
michael@0 37 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
michael@0 38 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionWipeDelegate;
michael@0 39 import org.mozilla.gecko.sync.repositories.domain.Record;
michael@0 40
michael@0 41 import ch.boye.httpclientandroidlib.entity.ContentProducer;
michael@0 42 import ch.boye.httpclientandroidlib.entity.EntityTemplate;
michael@0 43
michael@0 44 public class Server11RepositorySession extends RepositorySession {
michael@0 45 private static byte[] recordsStart;
michael@0 46 private static byte[] recordSeparator;
michael@0 47 private static byte[] recordsEnd;
michael@0 48
michael@0 49 static {
michael@0 50 try {
michael@0 51 recordsStart = "[\n".getBytes("UTF-8");
michael@0 52 recordSeparator = ",\n".getBytes("UTF-8");
michael@0 53 recordsEnd = "\n]\n".getBytes("UTF-8");
michael@0 54 } catch (UnsupportedEncodingException e) {
michael@0 55 // These won't fail.
michael@0 56 }
michael@0 57 }
michael@0 58
michael@0 59 public static final String LOG_TAG = "Server11Session";
michael@0 60
michael@0 61 private static final int UPLOAD_BYTE_THRESHOLD = 1024 * 1024; // 1MB.
michael@0 62 private static final int UPLOAD_ITEM_THRESHOLD = 50;
michael@0 63 private static final int PER_RECORD_OVERHEAD = 2; // Comma, newline.
michael@0 64 // {}, newlines, but we get to skip one record overhead.
michael@0 65 private static final int PER_BATCH_OVERHEAD = 5 - PER_RECORD_OVERHEAD;
michael@0 66
michael@0 67 /**
michael@0 68 * Return the X-Weave-Timestamp header from <code>response</code>, or the
michael@0 69 * current time if it is missing.
michael@0 70 * <p>
michael@0 71 * <b>Warning:</b> this can cause the timestamp of <code>response</code> to
michael@0 72 * cross domains (from server clock to local clock), which could cause records
michael@0 73 * to be skipped on account of clock drift. This should never happen, because
michael@0 74 * <i>every</i> server response should have a well-formed X-Weave-Timestamp
michael@0 75 * header.
michael@0 76 *
michael@0 77 * @param response
michael@0 78 * The <code>SyncStorageResponse</code> to interrogate.
michael@0 79 * @return Normalized timestamp in milliseconds.
michael@0 80 */
michael@0 81 public static long getNormalizedTimestamp(SyncStorageResponse response) {
michael@0 82 long normalizedTimestamp = -1;
michael@0 83 try {
michael@0 84 normalizedTimestamp = response.normalizedWeaveTimestamp();
michael@0 85 } catch (NumberFormatException e) {
michael@0 86 Logger.warn(LOG_TAG, "Malformed X-Weave-Timestamp header received.", e);
michael@0 87 }
michael@0 88 if (-1 == normalizedTimestamp) {
michael@0 89 Logger.warn(LOG_TAG, "Computing stand-in timestamp from local clock. Clock drift could cause records to be skipped.");
michael@0 90 normalizedTimestamp = System.currentTimeMillis();
michael@0 91 }
michael@0 92 return normalizedTimestamp;
michael@0 93 }
michael@0 94
michael@0 95 /**
michael@0 96 * Used to track outstanding requests, so that we can abort them as needed.
michael@0 97 */
michael@0 98 private Set<SyncStorageCollectionRequest> pending = Collections.synchronizedSet(new HashSet<SyncStorageCollectionRequest>());
michael@0 99
michael@0 100 @Override
michael@0 101 public void abort() {
michael@0 102 super.abort();
michael@0 103 for (SyncStorageCollectionRequest request : pending) {
michael@0 104 request.abort();
michael@0 105 }
michael@0 106 pending.clear();
michael@0 107 }
michael@0 108
michael@0 109 /**
michael@0 110 * Convert HTTP request delegate callbacks into fetch callbacks within the
michael@0 111 * context of this RepositorySession.
michael@0 112 *
michael@0 113 * @author rnewman
michael@0 114 *
michael@0 115 */
michael@0 116 public class RequestFetchDelegateAdapter extends WBOCollectionRequestDelegate {
michael@0 117 RepositorySessionFetchRecordsDelegate delegate;
michael@0 118 private DelayedWorkTracker workTracker = new DelayedWorkTracker();
michael@0 119
michael@0 120 // So that we can clean up.
michael@0 121 private SyncStorageCollectionRequest request;
michael@0 122
michael@0 123 public void setRequest(SyncStorageCollectionRequest request) {
michael@0 124 this.request = request;
michael@0 125 }
michael@0 126 private void removeRequestFromPending() {
michael@0 127 if (this.request == null) {
michael@0 128 return;
michael@0 129 }
michael@0 130 pending.remove(this.request);
michael@0 131 this.request = null;
michael@0 132 }
michael@0 133
michael@0 134 public RequestFetchDelegateAdapter(RepositorySessionFetchRecordsDelegate delegate) {
michael@0 135 this.delegate = delegate;
michael@0 136 }
michael@0 137
michael@0 138 @Override
michael@0 139 public AuthHeaderProvider getAuthHeaderProvider() {
michael@0 140 return serverRepository.getAuthHeaderProvider();
michael@0 141 }
michael@0 142
michael@0 143 @Override
michael@0 144 public String ifUnmodifiedSince() {
michael@0 145 return null;
michael@0 146 }
michael@0 147
michael@0 148 @Override
michael@0 149 public void handleRequestSuccess(SyncStorageResponse response) {
michael@0 150 Logger.debug(LOG_TAG, "Fetch done.");
michael@0 151 removeRequestFromPending();
michael@0 152
michael@0 153 final long normalizedTimestamp = getNormalizedTimestamp(response);
michael@0 154 Logger.debug(LOG_TAG, "Fetch completed. Timestamp is " + normalizedTimestamp);
michael@0 155
michael@0 156 // When we're done processing other events, finish.
michael@0 157 workTracker.delayWorkItem(new Runnable() {
michael@0 158 @Override
michael@0 159 public void run() {
michael@0 160 Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
michael@0 161 // TODO: verify number of returned records.
michael@0 162 delegate.onFetchCompleted(normalizedTimestamp);
michael@0 163 }
michael@0 164 });
michael@0 165 }
michael@0 166
michael@0 167 @Override
michael@0 168 public void handleRequestFailure(SyncStorageResponse response) {
michael@0 169 // TODO: ensure that delegate methods don't get called more than once.
michael@0 170 this.handleRequestError(new HTTPFailureException(response));
michael@0 171 }
michael@0 172
michael@0 173 @Override
michael@0 174 public void handleRequestError(final Exception ex) {
michael@0 175 removeRequestFromPending();
michael@0 176 Logger.warn(LOG_TAG, "Got request error.", ex);
michael@0 177 // When we're done processing other events, finish.
michael@0 178 workTracker.delayWorkItem(new Runnable() {
michael@0 179 @Override
michael@0 180 public void run() {
michael@0 181 Logger.debug(LOG_TAG, "Running onFetchFailed.");
michael@0 182 delegate.onFetchFailed(ex, null);
michael@0 183 }
michael@0 184 });
michael@0 185 }
michael@0 186
michael@0 187 @Override
michael@0 188 public void handleWBO(CryptoRecord record) {
michael@0 189 workTracker.incrementOutstanding();
michael@0 190 try {
michael@0 191 delegate.onFetchedRecord(record);
michael@0 192 } catch (Exception ex) {
michael@0 193 Logger.warn(LOG_TAG, "Got exception calling onFetchedRecord with WBO.", ex);
michael@0 194 // TODO: handle this better.
michael@0 195 throw new RuntimeException(ex);
michael@0 196 } finally {
michael@0 197 workTracker.decrementOutstanding();
michael@0 198 }
michael@0 199 }
michael@0 200
michael@0 201 // TODO: this implies that we've screwed up our inheritance chain somehow.
michael@0 202 @Override
michael@0 203 public KeyBundle keyBundle() {
michael@0 204 return null;
michael@0 205 }
michael@0 206 }
michael@0 207
michael@0 208
michael@0 209 Server11Repository serverRepository;
michael@0 210 AtomicLong uploadTimestamp = new AtomicLong(0);
michael@0 211
michael@0 212 private void bumpUploadTimestamp(long ts) {
michael@0 213 while (true) {
michael@0 214 long existing = uploadTimestamp.get();
michael@0 215 if (existing > ts) {
michael@0 216 return;
michael@0 217 }
michael@0 218 if (uploadTimestamp.compareAndSet(existing, ts)) {
michael@0 219 return;
michael@0 220 }
michael@0 221 }
michael@0 222 }
michael@0 223
/**
 * Create a session over <code>repository</code>.
 *
 * @param repository
 *        Must be a <code>Server11Repository</code>; it is cast unconditionally.
 */
public Server11RepositorySession(Repository repository) {
  super(repository);
  this.serverRepository = (Server11Repository) repository;
}
michael@0 228
michael@0 229 private String flattenIDs(String[] guids) {
michael@0 230 // Consider using Utils.toDelimitedString if and when the signature changes
michael@0 231 // to Collection<String> guids.
michael@0 232 if (guids.length == 0) {
michael@0 233 return "";
michael@0 234 }
michael@0 235 if (guids.length == 1) {
michael@0 236 return guids[0];
michael@0 237 }
michael@0 238 StringBuilder b = new StringBuilder();
michael@0 239 for (String guid : guids) {
michael@0 240 b.append(guid);
michael@0 241 b.append(",");
michael@0 242 }
michael@0 243 return b.substring(0, b.length() - 1);
michael@0 244 }
michael@0 245
michael@0 246 @Override
michael@0 247 public void guidsSince(long timestamp,
michael@0 248 RepositorySessionGuidsSinceDelegate delegate) {
michael@0 249 // TODO Auto-generated method stub
michael@0 250
michael@0 251 }
michael@0 252
michael@0 253 protected void fetchWithParameters(long newer,
michael@0 254 long limit,
michael@0 255 boolean full,
michael@0 256 String sort,
michael@0 257 String ids,
michael@0 258 RequestFetchDelegateAdapter delegate)
michael@0 259 throws URISyntaxException {
michael@0 260
michael@0 261 URI collectionURI = serverRepository.collectionURI(full, newer, limit, sort, ids);
michael@0 262 SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(collectionURI);
michael@0 263 request.delegate = delegate;
michael@0 264
michael@0 265 // So it can clean up.
michael@0 266 delegate.setRequest(request);
michael@0 267 pending.add(request);
michael@0 268 request.get();
michael@0 269 }
michael@0 270
michael@0 271 public void fetchSince(long timestamp, long limit, String sort, RepositorySessionFetchRecordsDelegate delegate) {
michael@0 272 try {
michael@0 273 this.fetchWithParameters(timestamp, limit, true, sort, null, new RequestFetchDelegateAdapter(delegate));
michael@0 274 } catch (URISyntaxException e) {
michael@0 275 delegate.onFetchFailed(e, null);
michael@0 276 }
michael@0 277 }
michael@0 278
michael@0 279 @Override
michael@0 280 public void fetchSince(long timestamp,
michael@0 281 RepositorySessionFetchRecordsDelegate delegate) {
michael@0 282 try {
michael@0 283 long limit = serverRepository.getDefaultFetchLimit();
michael@0 284 String sort = serverRepository.getDefaultSort();
michael@0 285 this.fetchWithParameters(timestamp, limit, true, sort, null, new RequestFetchDelegateAdapter(delegate));
michael@0 286 } catch (URISyntaxException e) {
michael@0 287 delegate.onFetchFailed(e, null);
michael@0 288 }
michael@0 289 }
michael@0 290
michael@0 291 @Override
michael@0 292 public void fetchAll(RepositorySessionFetchRecordsDelegate delegate) {
michael@0 293 this.fetchSince(-1, delegate);
michael@0 294 }
michael@0 295
michael@0 296 @Override
michael@0 297 public void fetch(String[] guids,
michael@0 298 RepositorySessionFetchRecordsDelegate delegate) {
michael@0 299 // TODO: watch out for URL length limits!
michael@0 300 try {
michael@0 301 String ids = flattenIDs(guids);
michael@0 302 this.fetchWithParameters(-1, -1, true, "index", ids, new RequestFetchDelegateAdapter(delegate));
michael@0 303 } catch (URISyntaxException e) {
michael@0 304 delegate.onFetchFailed(e, null);
michael@0 305 }
michael@0 306 }
michael@0 307
michael@0 308 @Override
michael@0 309 public void wipe(RepositorySessionWipeDelegate delegate) {
michael@0 310 if (!isActive()) {
michael@0 311 delegate.onWipeFailed(new InactiveSessionException(null));
michael@0 312 return;
michael@0 313 }
michael@0 314 // TODO: implement wipe.
michael@0 315 }
michael@0 316
michael@0 317 protected Object recordsBufferMonitor = new Object();
michael@0 318
michael@0 319 /**
michael@0 320 * Data of outbound records.
michael@0 321 * <p>
michael@0 322 * We buffer the data (rather than the <code>Record</code>) so that we can
michael@0 323 * flush the buffer based on outgoing transmission size.
michael@0 324 * <p>
michael@0 325 * Access should be synchronized on <code>recordsBufferMonitor</code>.
michael@0 326 */
michael@0 327 protected ArrayList<byte[]> recordsBuffer = new ArrayList<byte[]>();
michael@0 328
michael@0 329 /**
michael@0 330 * GUIDs of outbound records.
michael@0 331 * <p>
michael@0 332 * Used to fail entire outgoing uploads.
michael@0 333 * <p>
michael@0 334 * Access should be synchronized on <code>recordsBufferMonitor</code>.
michael@0 335 */
michael@0 336 protected ArrayList<String> recordGuidsBuffer = new ArrayList<String>();
michael@0 337 protected int byteCount = PER_BATCH_OVERHEAD;
michael@0 338
michael@0 339 @Override
michael@0 340 public void store(Record record) throws NoStoreDelegateException {
michael@0 341 if (delegate == null) {
michael@0 342 throw new NoStoreDelegateException();
michael@0 343 }
michael@0 344 this.enqueue(record);
michael@0 345 }
michael@0 346
michael@0 347 /**
michael@0 348 * Batch incoming records until some reasonable threshold (e.g., 50),
michael@0 349 * some size limit is hit (probably way less than 3MB!), or storeDone
michael@0 350 * is received.
michael@0 351 * @param record
michael@0 352 */
michael@0 353 protected void enqueue(Record record) {
michael@0 354 // JSONify and store the bytes, rather than the record.
michael@0 355 byte[] json = record.toJSONBytes();
michael@0 356 int delta = json.length;
michael@0 357 synchronized (recordsBufferMonitor) {
michael@0 358 if ((delta + byteCount > UPLOAD_BYTE_THRESHOLD) ||
michael@0 359 (recordsBuffer.size() >= UPLOAD_ITEM_THRESHOLD)) {
michael@0 360
michael@0 361 // POST the existing contents, then enqueue.
michael@0 362 flush();
michael@0 363 }
michael@0 364 recordsBuffer.add(json);
michael@0 365 recordGuidsBuffer.add(record.guid);
michael@0 366 byteCount += PER_RECORD_OVERHEAD + delta;
michael@0 367 }
michael@0 368 }
michael@0 369
michael@0 370 // Asynchronously upload records.
michael@0 371 // Must be locked!
michael@0 372 protected void flush() {
michael@0 373 if (recordsBuffer.size() > 0) {
michael@0 374 final ArrayList<byte[]> outgoing = recordsBuffer;
michael@0 375 final ArrayList<String> outgoingGuids = recordGuidsBuffer;
michael@0 376 RepositorySessionStoreDelegate uploadDelegate = this.delegate;
michael@0 377 storeWorkQueue.execute(new RecordUploadRunnable(uploadDelegate, outgoing, outgoingGuids, byteCount));
michael@0 378
michael@0 379 recordsBuffer = new ArrayList<byte[]>();
michael@0 380 recordGuidsBuffer = new ArrayList<String>();
michael@0 381 byteCount = PER_BATCH_OVERHEAD;
michael@0 382 }
michael@0 383 }
michael@0 384
michael@0 385 @Override
michael@0 386 public void storeDone() {
michael@0 387 Logger.debug(LOG_TAG, "storeDone().");
michael@0 388 synchronized (recordsBufferMonitor) {
michael@0 389 flush();
michael@0 390 // Do this in a Runnable so that the timestamp is grabbed after any upload.
michael@0 391 final Runnable r = new Runnable() {
michael@0 392 @Override
michael@0 393 public void run() {
michael@0 394 synchronized (recordsBufferMonitor) {
michael@0 395 final long end = uploadTimestamp.get();
michael@0 396 Logger.debug(LOG_TAG, "Calling storeDone with " + end);
michael@0 397 storeDone(end);
michael@0 398 }
michael@0 399 }
michael@0 400 };
michael@0 401 storeWorkQueue.execute(r);
michael@0 402 }
michael@0 403 }
michael@0 404
michael@0 405 /**
michael@0 406 * <code>true</code> if a record upload has failed this session.
michael@0 407 * <p>
michael@0 408 * This is only set in begin and possibly by <code>RecordUploadRunnable</code>.
michael@0 409 * Since those are executed serially, we can use an unsynchronized
michael@0 410 * volatile boolean here.
michael@0 411 */
michael@0 412 protected volatile boolean recordUploadFailed;
michael@0 413
michael@0 414 @Override
michael@0 415 public void begin(RepositorySessionBeginDelegate delegate) throws InvalidSessionTransitionException {
michael@0 416 recordUploadFailed = false;
michael@0 417 super.begin(delegate);
michael@0 418 }
michael@0 419
michael@0 420 /**
michael@0 421 * Make an HTTP request, and convert HTTP request delegate callbacks into
michael@0 422 * store callbacks within the context of this RepositorySession.
michael@0 423 *
michael@0 424 * @author rnewman
michael@0 425 *
michael@0 426 */
michael@0 427 protected class RecordUploadRunnable implements Runnable, SyncStorageRequestDelegate {
michael@0 428
michael@0 429 public final String LOG_TAG = "RecordUploadRunnable";
michael@0 430 private ArrayList<byte[]> outgoing;
michael@0 431 private ArrayList<String> outgoingGuids;
michael@0 432 private long byteCount;
michael@0 433
michael@0 434 public RecordUploadRunnable(RepositorySessionStoreDelegate storeDelegate,
michael@0 435 ArrayList<byte[]> outgoing,
michael@0 436 ArrayList<String> outgoingGuids,
michael@0 437 long byteCount) {
michael@0 438 Logger.debug(LOG_TAG, "Preparing record upload for " +
michael@0 439 outgoing.size() + " records (" +
michael@0 440 byteCount + " bytes).");
michael@0 441 this.outgoing = outgoing;
michael@0 442 this.outgoingGuids = outgoingGuids;
michael@0 443 this.byteCount = byteCount;
michael@0 444 }
michael@0 445
michael@0 446 @Override
michael@0 447 public AuthHeaderProvider getAuthHeaderProvider() {
michael@0 448 return serverRepository.getAuthHeaderProvider();
michael@0 449 }
michael@0 450
michael@0 451 @Override
michael@0 452 public String ifUnmodifiedSince() {
michael@0 453 return null;
michael@0 454 }
michael@0 455
michael@0 456 @Override
michael@0 457 public void handleRequestSuccess(SyncStorageResponse response) {
michael@0 458 Logger.trace(LOG_TAG, "POST of " + outgoing.size() + " records done.");
michael@0 459
michael@0 460 ExtendedJSONObject body;
michael@0 461 try {
michael@0 462 body = response.jsonObjectBody(); // jsonObjectBody() throws or returns non-null.
michael@0 463 } catch (Exception e) {
michael@0 464 Logger.error(LOG_TAG, "Got exception parsing POST success body.", e);
michael@0 465 this.handleRequestError(e);
michael@0 466 return;
michael@0 467 }
michael@0 468
michael@0 469 // Be defensive when logging timestamp.
michael@0 470 if (body.containsKey("modified")) {
michael@0 471 Long modified = body.getTimestamp("modified");
michael@0 472 if (modified != null) {
michael@0 473 Logger.trace(LOG_TAG, "POST request success. Modified timestamp: " + modified.longValue());
michael@0 474 } else {
michael@0 475 Logger.warn(LOG_TAG, "POST success body contains malformed 'modified': " + body.toJSONString());
michael@0 476 }
michael@0 477 } else {
michael@0 478 Logger.warn(LOG_TAG, "POST success body does not contain key 'modified': " + body.toJSONString());
michael@0 479 }
michael@0 480
michael@0 481 try {
michael@0 482 JSONArray success = body.getArray("success");
michael@0 483 if ((success != null) &&
michael@0 484 (success.size() > 0)) {
michael@0 485 Logger.trace(LOG_TAG, "Successful records: " + success.toString());
michael@0 486 for (Object o : success) {
michael@0 487 try {
michael@0 488 delegate.onRecordStoreSucceeded((String) o);
michael@0 489 } catch (ClassCastException e) {
michael@0 490 Logger.error(LOG_TAG, "Got exception parsing POST success guid.", e);
michael@0 491 // Not much to be done.
michael@0 492 }
michael@0 493 }
michael@0 494
michael@0 495 long normalizedTimestamp = getNormalizedTimestamp(response);
michael@0 496 Logger.trace(LOG_TAG, "Passing back upload X-Weave-Timestamp: " + normalizedTimestamp);
michael@0 497 bumpUploadTimestamp(normalizedTimestamp);
michael@0 498 }
michael@0 499 success = null; // Want to GC this ASAP.
michael@0 500
michael@0 501 ExtendedJSONObject failed = body.getObject("failed");
michael@0 502 if ((failed != null) &&
michael@0 503 (failed.object.size() > 0)) {
michael@0 504 Logger.debug(LOG_TAG, "Failed records: " + failed.object.toString());
michael@0 505 Exception ex = new Server11RecordPostFailedException();
michael@0 506 for (String guid : failed.keySet()) {
michael@0 507 delegate.onRecordStoreFailed(ex, guid);
michael@0 508 }
michael@0 509 }
michael@0 510 failed = null; // Want to GC this ASAP.
michael@0 511 } catch (UnexpectedJSONException e) {
michael@0 512 Logger.error(LOG_TAG, "Got exception processing success/failed in POST success body.", e);
michael@0 513 // TODO
michael@0 514 return;
michael@0 515 }
michael@0 516 Logger.debug(LOG_TAG, "POST of " + outgoing.size() + " records handled.");
michael@0 517 }
michael@0 518
michael@0 519 @Override
michael@0 520 public void handleRequestFailure(SyncStorageResponse response) {
michael@0 521 // TODO: call session.interpretHTTPFailure.
michael@0 522 this.handleRequestError(new HTTPFailureException(response));
michael@0 523 }
michael@0 524
michael@0 525 @Override
michael@0 526 public void handleRequestError(final Exception ex) {
michael@0 527 Logger.warn(LOG_TAG, "Got request error.", ex);
michael@0 528
michael@0 529 recordUploadFailed = true;
michael@0 530 ArrayList<String> failedOutgoingGuids = outgoingGuids;
michael@0 531 outgoingGuids = null; // Want to GC this ASAP.
michael@0 532 for (String guid : failedOutgoingGuids) {
michael@0 533 delegate.onRecordStoreFailed(ex, guid);
michael@0 534 }
michael@0 535 return;
michael@0 536 }
michael@0 537
michael@0 538 public class ByteArraysContentProducer implements ContentProducer {
michael@0 539
michael@0 540 ArrayList<byte[]> outgoing;
michael@0 541 public ByteArraysContentProducer(ArrayList<byte[]> arrays) {
michael@0 542 outgoing = arrays;
michael@0 543 }
michael@0 544
michael@0 545 @Override
michael@0 546 public void writeTo(OutputStream outstream) throws IOException {
michael@0 547 int count = outgoing.size();
michael@0 548 outstream.write(recordsStart);
michael@0 549 outstream.write(outgoing.get(0));
michael@0 550 for (int i = 1; i < count; ++i) {
michael@0 551 outstream.write(recordSeparator);
michael@0 552 outstream.write(outgoing.get(i));
michael@0 553 }
michael@0 554 outstream.write(recordsEnd);
michael@0 555 }
michael@0 556 }
michael@0 557
michael@0 558 public class ByteArraysEntity extends EntityTemplate {
michael@0 559 private long count;
michael@0 560 public ByteArraysEntity(ArrayList<byte[]> arrays, long totalBytes) {
michael@0 561 super(new ByteArraysContentProducer(arrays));
michael@0 562 this.count = totalBytes;
michael@0 563 this.setContentType("application/json");
michael@0 564 // charset is set in BaseResource.
michael@0 565 }
michael@0 566
michael@0 567 @Override
michael@0 568 public long getContentLength() {
michael@0 569 return count;
michael@0 570 }
michael@0 571
michael@0 572 @Override
michael@0 573 public boolean isRepeatable() {
michael@0 574 return true;
michael@0 575 }
michael@0 576 }
michael@0 577
michael@0 578 public ByteArraysEntity getBodyEntity() {
michael@0 579 ByteArraysEntity body = new ByteArraysEntity(outgoing, byteCount);
michael@0 580 return body;
michael@0 581 }
michael@0 582
michael@0 583 @Override
michael@0 584 public void run() {
michael@0 585 if (recordUploadFailed) {
michael@0 586 Logger.info(LOG_TAG, "Previous record upload failed. Failing all records and not retrying.");
michael@0 587 Exception ex = new Server11PreviousPostFailedException();
michael@0 588 for (String guid : outgoingGuids) {
michael@0 589 delegate.onRecordStoreFailed(ex, guid);
michael@0 590 }
michael@0 591 return;
michael@0 592 }
michael@0 593
michael@0 594 if (outgoing == null ||
michael@0 595 outgoing.size() == 0) {
michael@0 596 Logger.debug(LOG_TAG, "No items: RecordUploadRunnable returning immediately.");
michael@0 597 return;
michael@0 598 }
michael@0 599
michael@0 600 URI u = serverRepository.collectionURI();
michael@0 601 SyncStorageRequest request = new SyncStorageRequest(u);
michael@0 602
michael@0 603 request.delegate = this;
michael@0 604
michael@0 605 // We don't want the task queue to proceed until this request completes.
michael@0 606 // Fortunately, BaseResource is currently synchronous.
michael@0 607 // If that ever changes, you'll need to block here.
michael@0 608 ByteArraysEntity body = getBodyEntity();
michael@0 609 request.post(body);
michael@0 610 }
michael@0 611 }
michael@0 612
michael@0 613 @Override
michael@0 614 public boolean dataAvailable() {
michael@0 615 return serverRepository.updateNeeded(getLastSyncTimestamp());
michael@0 616 }
michael@0 617 }

mercurial