1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/mobile/android/base/sync/repositories/Server11RepositorySession.java Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,617 @@ 1.4 +/* This Source Code Form is subject to the terms of the Mozilla Public 1.5 + * License, v. 2.0. If a copy of the MPL was not distributed with this 1.6 + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 1.7 + 1.8 +package org.mozilla.gecko.sync.repositories; 1.9 + 1.10 +import java.io.IOException; 1.11 +import java.io.OutputStream; 1.12 +import java.io.UnsupportedEncodingException; 1.13 +import java.net.URI; 1.14 +import java.net.URISyntaxException; 1.15 +import java.util.ArrayList; 1.16 +import java.util.Collections; 1.17 +import java.util.HashSet; 1.18 +import java.util.Set; 1.19 +import java.util.concurrent.atomic.AtomicLong; 1.20 + 1.21 +import org.json.simple.JSONArray; 1.22 +import org.mozilla.gecko.background.common.log.Logger; 1.23 +import org.mozilla.gecko.sync.CryptoRecord; 1.24 +import org.mozilla.gecko.sync.DelayedWorkTracker; 1.25 +import org.mozilla.gecko.sync.ExtendedJSONObject; 1.26 +import org.mozilla.gecko.sync.HTTPFailureException; 1.27 +import org.mozilla.gecko.sync.Server11PreviousPostFailedException; 1.28 +import org.mozilla.gecko.sync.Server11RecordPostFailedException; 1.29 +import org.mozilla.gecko.sync.UnexpectedJSONException; 1.30 +import org.mozilla.gecko.sync.crypto.KeyBundle; 1.31 +import org.mozilla.gecko.sync.net.AuthHeaderProvider; 1.32 +import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest; 1.33 +import org.mozilla.gecko.sync.net.SyncStorageRequest; 1.34 +import org.mozilla.gecko.sync.net.SyncStorageRequestDelegate; 1.35 +import org.mozilla.gecko.sync.net.SyncStorageResponse; 1.36 +import org.mozilla.gecko.sync.net.WBOCollectionRequestDelegate; 1.37 +import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate; 1.38 +import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate; 1.39 +import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionGuidsSinceDelegate; 1.40 +import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate; 1.41 +import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionWipeDelegate; 1.42 +import org.mozilla.gecko.sync.repositories.domain.Record; 1.43 + 1.44 +import ch.boye.httpclientandroidlib.entity.ContentProducer; 1.45 +import ch.boye.httpclientandroidlib.entity.EntityTemplate; 1.46 + 1.47 +public class Server11RepositorySession extends RepositorySession { 1.48 + private static byte[] recordsStart; 1.49 + private static byte[] recordSeparator; 1.50 + private static byte[] recordsEnd; 1.51 + 1.52 + static { 1.53 + try { 1.54 + recordsStart = "[\n".getBytes("UTF-8"); 1.55 + recordSeparator = ",\n".getBytes("UTF-8"); 1.56 + recordsEnd = "\n]\n".getBytes("UTF-8"); 1.57 + } catch (UnsupportedEncodingException e) { 1.58 + // These won't fail. 1.59 + } 1.60 + } 1.61 + 1.62 + public static final String LOG_TAG = "Server11Session"; 1.63 + 1.64 + private static final int UPLOAD_BYTE_THRESHOLD = 1024 * 1024; // 1MB. 1.65 + private static final int UPLOAD_ITEM_THRESHOLD = 50; 1.66 + private static final int PER_RECORD_OVERHEAD = 2; // Comma, newline. 1.67 + // {}, newlines, but we get to skip one record overhead. 1.68 + private static final int PER_BATCH_OVERHEAD = 5 - PER_RECORD_OVERHEAD; 1.69 + 1.70 + /** 1.71 + * Return the X-Weave-Timestamp header from <code>response</code>, or the 1.72 + * current time if it is missing. 1.73 + * <p> 1.74 + * <b>Warning:</b> this can cause the timestamp of <code>response</code> to 1.75 + * cross domains (from server clock to local clock), which could cause records 1.76 + * to be skipped on account of clock drift. This should never happen, because 1.77 + * <i>every</i> server response should have a well-formed X-Weave-Timestamp 1.78 + * header. 1.79 + * 1.80 + * @param response 1.81 + * The <code>SyncStorageResponse</code> to interrogate. 1.82 + * @return Normalized timestamp in milliseconds. 1.83 + */ 1.84 + public static long getNormalizedTimestamp(SyncStorageResponse response) { 1.85 + long normalizedTimestamp = -1; 1.86 + try { 1.87 + normalizedTimestamp = response.normalizedWeaveTimestamp(); 1.88 + } catch (NumberFormatException e) { 1.89 + Logger.warn(LOG_TAG, "Malformed X-Weave-Timestamp header received.", e); 1.90 + } 1.91 + if (-1 == normalizedTimestamp) { 1.92 + Logger.warn(LOG_TAG, "Computing stand-in timestamp from local clock. Clock drift could cause records to be skipped."); 1.93 + normalizedTimestamp = System.currentTimeMillis(); 1.94 + } 1.95 + return normalizedTimestamp; 1.96 + } 1.97 + 1.98 + /** 1.99 + * Used to track outstanding requests, so that we can abort them as needed. 1.100 + */ 1.101 + private Set<SyncStorageCollectionRequest> pending = Collections.synchronizedSet(new HashSet<SyncStorageCollectionRequest>()); 1.102 + 1.103 + @Override 1.104 + public void abort() { 1.105 + super.abort(); 1.106 + for (SyncStorageCollectionRequest request : pending) { 1.107 + request.abort(); 1.108 + } 1.109 + pending.clear(); 1.110 + } 1.111 + 1.112 + /** 1.113 + * Convert HTTP request delegate callbacks into fetch callbacks within the 1.114 + * context of this RepositorySession. 1.115 + * 1.116 + * @author rnewman 1.117 + * 1.118 + */ 1.119 + public class RequestFetchDelegateAdapter extends WBOCollectionRequestDelegate { 1.120 + RepositorySessionFetchRecordsDelegate delegate; 1.121 + private DelayedWorkTracker workTracker = new DelayedWorkTracker(); 1.122 + 1.123 + // So that we can clean up. 1.124 + private SyncStorageCollectionRequest request; 1.125 + 1.126 + public void setRequest(SyncStorageCollectionRequest request) { 1.127 + this.request = request; 1.128 + } 1.129 + private void removeRequestFromPending() { 1.130 + if (this.request == null) { 1.131 + return; 1.132 + } 1.133 + pending.remove(this.request); 1.134 + this.request = null; 1.135 + } 1.136 + 1.137 + public RequestFetchDelegateAdapter(RepositorySessionFetchRecordsDelegate delegate) { 1.138 + this.delegate = delegate; 1.139 + } 1.140 + 1.141 + @Override 1.142 + public AuthHeaderProvider getAuthHeaderProvider() { 1.143 + return serverRepository.getAuthHeaderProvider(); 1.144 + } 1.145 + 1.146 + @Override 1.147 + public String ifUnmodifiedSince() { 1.148 + return null; 1.149 + } 1.150 + 1.151 + @Override 1.152 + public void handleRequestSuccess(SyncStorageResponse response) { 1.153 + Logger.debug(LOG_TAG, "Fetch done."); 1.154 + removeRequestFromPending(); 1.155 + 1.156 + final long normalizedTimestamp = getNormalizedTimestamp(response); 1.157 + Logger.debug(LOG_TAG, "Fetch completed. Timestamp is " + normalizedTimestamp); 1.158 + 1.159 + // When we're done processing other events, finish. 1.160 + workTracker.delayWorkItem(new Runnable() { 1.161 + @Override 1.162 + public void run() { 1.163 + Logger.debug(LOG_TAG, "Delayed onFetchCompleted running."); 1.164 + // TODO: verify number of returned records. 1.165 + delegate.onFetchCompleted(normalizedTimestamp); 1.166 + } 1.167 + }); 1.168 + } 1.169 + 1.170 + @Override 1.171 + public void handleRequestFailure(SyncStorageResponse response) { 1.172 + // TODO: ensure that delegate methods don't get called more than once. 1.173 + this.handleRequestError(new HTTPFailureException(response)); 1.174 + } 1.175 + 1.176 + @Override 1.177 + public void handleRequestError(final Exception ex) { 1.178 + removeRequestFromPending(); 1.179 + Logger.warn(LOG_TAG, "Got request error.", ex); 1.180 + // When we're done processing other events, finish. 1.181 + workTracker.delayWorkItem(new Runnable() { 1.182 + @Override 1.183 + public void run() { 1.184 + Logger.debug(LOG_TAG, "Running onFetchFailed."); 1.185 + delegate.onFetchFailed(ex, null); 1.186 + } 1.187 + }); 1.188 + } 1.189 + 1.190 + @Override 1.191 + public void handleWBO(CryptoRecord record) { 1.192 + workTracker.incrementOutstanding(); 1.193 + try { 1.194 + delegate.onFetchedRecord(record); 1.195 + } catch (Exception ex) { 1.196 + Logger.warn(LOG_TAG, "Got exception calling onFetchedRecord with WBO.", ex); 1.197 + // TODO: handle this better. 1.198 + throw new RuntimeException(ex); 1.199 + } finally { 1.200 + workTracker.decrementOutstanding(); 1.201 + } 1.202 + } 1.203 + 1.204 + // TODO: this implies that we've screwed up our inheritance chain somehow. 1.205 + @Override 1.206 + public KeyBundle keyBundle() { 1.207 + return null; 1.208 + } 1.209 + } 1.210 + 1.211 + 1.212 + Server11Repository serverRepository; 1.213 + AtomicLong uploadTimestamp = new AtomicLong(0); 1.214 + 1.215 + private void bumpUploadTimestamp(long ts) { 1.216 + while (true) { 1.217 + long existing = uploadTimestamp.get(); 1.218 + if (existing > ts) { 1.219 + return; 1.220 + } 1.221 + if (uploadTimestamp.compareAndSet(existing, ts)) { 1.222 + return; 1.223 + } 1.224 + } 1.225 + } 1.226 + 1.227 + public Server11RepositorySession(Repository repository) { 1.228 + super(repository); 1.229 + serverRepository = (Server11Repository) repository; 1.230 + } 1.231 + 1.232 + private String flattenIDs(String[] guids) { 1.233 + // Consider using Utils.toDelimitedString if and when the signature changes 1.234 + // to Collection<String> guids. 1.235 + if (guids.length == 0) { 1.236 + return ""; 1.237 + } 1.238 + if (guids.length == 1) { 1.239 + return guids[0]; 1.240 + } 1.241 + StringBuilder b = new StringBuilder(); 1.242 + for (String guid : guids) { 1.243 + b.append(guid); 1.244 + b.append(","); 1.245 + } 1.246 + return b.substring(0, b.length() - 1); 1.247 + } 1.248 + 1.249 + @Override 1.250 + public void guidsSince(long timestamp, 1.251 + RepositorySessionGuidsSinceDelegate delegate) { 1.252 + // TODO Auto-generated method stub 1.253 + 1.254 + } 1.255 + 1.256 + protected void fetchWithParameters(long newer, 1.257 + long limit, 1.258 + boolean full, 1.259 + String sort, 1.260 + String ids, 1.261 + RequestFetchDelegateAdapter delegate) 1.262 + throws URISyntaxException { 1.263 + 1.264 + URI collectionURI = serverRepository.collectionURI(full, newer, limit, sort, ids); 1.265 + SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(collectionURI); 1.266 + request.delegate = delegate; 1.267 + 1.268 + // So it can clean up. 1.269 + delegate.setRequest(request); 1.270 + pending.add(request); 1.271 + request.get(); 1.272 + } 1.273 + 1.274 + public void fetchSince(long timestamp, long limit, String sort, RepositorySessionFetchRecordsDelegate delegate) { 1.275 + try { 1.276 + this.fetchWithParameters(timestamp, limit, true, sort, null, new RequestFetchDelegateAdapter(delegate)); 1.277 + } catch (URISyntaxException e) { 1.278 + delegate.onFetchFailed(e, null); 1.279 + } 1.280 + } 1.281 + 1.282 + @Override 1.283 + public void fetchSince(long timestamp, 1.284 + RepositorySessionFetchRecordsDelegate delegate) { 1.285 + try { 1.286 + long limit = serverRepository.getDefaultFetchLimit(); 1.287 + String sort = serverRepository.getDefaultSort(); 1.288 + this.fetchWithParameters(timestamp, limit, true, sort, null, new RequestFetchDelegateAdapter(delegate)); 1.289 + } catch (URISyntaxException e) { 1.290 + delegate.onFetchFailed(e, null); 1.291 + } 1.292 + } 1.293 + 1.294 + @Override 1.295 + public void fetchAll(RepositorySessionFetchRecordsDelegate delegate) { 1.296 + this.fetchSince(-1, delegate); 1.297 + } 1.298 + 1.299 + @Override 1.300 + public void fetch(String[] guids, 1.301 + RepositorySessionFetchRecordsDelegate delegate) { 1.302 + // TODO: watch out for URL length limits! 1.303 + try { 1.304 + String ids = flattenIDs(guids); 1.305 + this.fetchWithParameters(-1, -1, true, "index", ids, new RequestFetchDelegateAdapter(delegate)); 1.306 + } catch (URISyntaxException e) { 1.307 + delegate.onFetchFailed(e, null); 1.308 + } 1.309 + } 1.310 + 1.311 + @Override 1.312 + public void wipe(RepositorySessionWipeDelegate delegate) { 1.313 + if (!isActive()) { 1.314 + delegate.onWipeFailed(new InactiveSessionException(null)); 1.315 + return; 1.316 + } 1.317 + // TODO: implement wipe. 1.318 + } 1.319 + 1.320 + protected Object recordsBufferMonitor = new Object(); 1.321 + 1.322 + /** 1.323 + * Data of outbound records. 1.324 + * <p> 1.325 + * We buffer the data (rather than the <code>Record</code>) so that we can 1.326 + * flush the buffer based on outgoing transmission size. 1.327 + * <p> 1.328 + * Access should be synchronized on <code>recordsBufferMonitor</code>. 1.329 + */ 1.330 + protected ArrayList<byte[]> recordsBuffer = new ArrayList<byte[]>(); 1.331 + 1.332 + /** 1.333 + * GUIDs of outbound records. 1.334 + * <p> 1.335 + * Used to fail entire outgoing uploads. 1.336 + * <p> 1.337 + * Access should be synchronized on <code>recordsBufferMonitor</code>. 1.338 + */ 1.339 + protected ArrayList<String> recordGuidsBuffer = new ArrayList<String>(); 1.340 + protected int byteCount = PER_BATCH_OVERHEAD; 1.341 + 1.342 + @Override 1.343 + public void store(Record record) throws NoStoreDelegateException { 1.344 + if (delegate == null) { 1.345 + throw new NoStoreDelegateException(); 1.346 + } 1.347 + this.enqueue(record); 1.348 + } 1.349 + 1.350 + /** 1.351 + * Batch incoming records until some reasonable threshold (e.g., 50), 1.352 + * some size limit is hit (probably way less than 3MB!), or storeDone 1.353 + * is received. 1.354 + * @param record 1.355 + */ 1.356 + protected void enqueue(Record record) { 1.357 + // JSONify and store the bytes, rather than the record. 1.358 + byte[] json = record.toJSONBytes(); 1.359 + int delta = json.length; 1.360 + synchronized (recordsBufferMonitor) { 1.361 + if ((delta + byteCount > UPLOAD_BYTE_THRESHOLD) || 1.362 + (recordsBuffer.size() >= UPLOAD_ITEM_THRESHOLD)) { 1.363 + 1.364 + // POST the existing contents, then enqueue. 1.365 + flush(); 1.366 + } 1.367 + recordsBuffer.add(json); 1.368 + recordGuidsBuffer.add(record.guid); 1.369 + byteCount += PER_RECORD_OVERHEAD + delta; 1.370 + } 1.371 + } 1.372 + 1.373 + // Asynchronously upload records. 1.374 + // Must be locked! 1.375 + protected void flush() { 1.376 + if (recordsBuffer.size() > 0) { 1.377 + final ArrayList<byte[]> outgoing = recordsBuffer; 1.378 + final ArrayList<String> outgoingGuids = recordGuidsBuffer; 1.379 + RepositorySessionStoreDelegate uploadDelegate = this.delegate; 1.380 + storeWorkQueue.execute(new RecordUploadRunnable(uploadDelegate, outgoing, outgoingGuids, byteCount)); 1.381 + 1.382 + recordsBuffer = new ArrayList<byte[]>(); 1.383 + recordGuidsBuffer = new ArrayList<String>(); 1.384 + byteCount = PER_BATCH_OVERHEAD; 1.385 + } 1.386 + } 1.387 + 1.388 + @Override 1.389 + public void storeDone() { 1.390 + Logger.debug(LOG_TAG, "storeDone()."); 1.391 + synchronized (recordsBufferMonitor) { 1.392 + flush(); 1.393 + // Do this in a Runnable so that the timestamp is grabbed after any upload. 1.394 + final Runnable r = new Runnable() { 1.395 + @Override 1.396 + public void run() { 1.397 + synchronized (recordsBufferMonitor) { 1.398 + final long end = uploadTimestamp.get(); 1.399 + Logger.debug(LOG_TAG, "Calling storeDone with " + end); 1.400 + storeDone(end); 1.401 + } 1.402 + } 1.403 + }; 1.404 + storeWorkQueue.execute(r); 1.405 + } 1.406 + } 1.407 + 1.408 + /** 1.409 + * <code>true</code> if a record upload has failed this session. 1.410 + * <p> 1.411 + * This is only set in begin and possibly by <code>RecordUploadRunnable</code>. 1.412 + * Since those are executed serially, we can use an unsynchronized 1.413 + * volatile boolean here. 1.414 + */ 1.415 + protected volatile boolean recordUploadFailed; 1.416 + 1.417 + @Override 1.418 + public void begin(RepositorySessionBeginDelegate delegate) throws InvalidSessionTransitionException { 1.419 + recordUploadFailed = false; 1.420 + super.begin(delegate); 1.421 + } 1.422 + 1.423 + /** 1.424 + * Make an HTTP request, and convert HTTP request delegate callbacks into 1.425 + * store callbacks within the context of this RepositorySession. 1.426 + * 1.427 + * @author rnewman 1.428 + * 1.429 + */ 1.430 + protected class RecordUploadRunnable implements Runnable, SyncStorageRequestDelegate { 1.431 + 1.432 + public final String LOG_TAG = "RecordUploadRunnable"; 1.433 + private ArrayList<byte[]> outgoing; 1.434 + private ArrayList<String> outgoingGuids; 1.435 + private long byteCount; 1.436 + 1.437 + public RecordUploadRunnable(RepositorySessionStoreDelegate storeDelegate, 1.438 + ArrayList<byte[]> outgoing, 1.439 + ArrayList<String> outgoingGuids, 1.440 + long byteCount) { 1.441 + Logger.debug(LOG_TAG, "Preparing record upload for " + 1.442 + outgoing.size() + " records (" + 1.443 + byteCount + " bytes)."); 1.444 + this.outgoing = outgoing; 1.445 + this.outgoingGuids = outgoingGuids; 1.446 + this.byteCount = byteCount; 1.447 + } 1.448 + 1.449 + @Override 1.450 + public AuthHeaderProvider getAuthHeaderProvider() { 1.451 + return serverRepository.getAuthHeaderProvider(); 1.452 + } 1.453 + 1.454 + @Override 1.455 + public String ifUnmodifiedSince() { 1.456 + return null; 1.457 + } 1.458 + 1.459 + @Override 1.460 + public void handleRequestSuccess(SyncStorageResponse response) { 1.461 + Logger.trace(LOG_TAG, "POST of " + outgoing.size() + " records done."); 1.462 + 1.463 + ExtendedJSONObject body; 1.464 + try { 1.465 + body = response.jsonObjectBody(); // jsonObjectBody() throws or returns non-null. 1.466 + } catch (Exception e) { 1.467 + Logger.error(LOG_TAG, "Got exception parsing POST success body.", e); 1.468 + this.handleRequestError(e); 1.469 + return; 1.470 + } 1.471 + 1.472 + // Be defensive when logging timestamp. 1.473 + if (body.containsKey("modified")) { 1.474 + Long modified = body.getTimestamp("modified"); 1.475 + if (modified != null) { 1.476 + Logger.trace(LOG_TAG, "POST request success. Modified timestamp: " + modified.longValue()); 1.477 + } else { 1.478 + Logger.warn(LOG_TAG, "POST success body contains malformed 'modified': " + body.toJSONString()); 1.479 + } 1.480 + } else { 1.481 + Logger.warn(LOG_TAG, "POST success body does not contain key 'modified': " + body.toJSONString()); 1.482 + } 1.483 + 1.484 + try { 1.485 + JSONArray success = body.getArray("success"); 1.486 + if ((success != null) && 1.487 + (success.size() > 0)) { 1.488 + Logger.trace(LOG_TAG, "Successful records: " + success.toString()); 1.489 + for (Object o : success) { 1.490 + try { 1.491 + delegate.onRecordStoreSucceeded((String) o); 1.492 + } catch (ClassCastException e) { 1.493 + Logger.error(LOG_TAG, "Got exception parsing POST success guid.", e); 1.494 + // Not much to be done. 1.495 + } 1.496 + } 1.497 + 1.498 + long normalizedTimestamp = getNormalizedTimestamp(response); 1.499 + Logger.trace(LOG_TAG, "Passing back upload X-Weave-Timestamp: " + normalizedTimestamp); 1.500 + bumpUploadTimestamp(normalizedTimestamp); 1.501 + } 1.502 + success = null; // Want to GC this ASAP. 1.503 + 1.504 + ExtendedJSONObject failed = body.getObject("failed"); 1.505 + if ((failed != null) && 1.506 + (failed.object.size() > 0)) { 1.507 + Logger.debug(LOG_TAG, "Failed records: " + failed.object.toString()); 1.508 + Exception ex = new Server11RecordPostFailedException(); 1.509 + for (String guid : failed.keySet()) { 1.510 + delegate.onRecordStoreFailed(ex, guid); 1.511 + } 1.512 + } 1.513 + failed = null; // Want to GC this ASAP. 1.514 + } catch (UnexpectedJSONException e) { 1.515 + Logger.error(LOG_TAG, "Got exception processing success/failed in POST success body.", e); 1.516 + // TODO 1.517 + return; 1.518 + } 1.519 + Logger.debug(LOG_TAG, "POST of " + outgoing.size() + " records handled."); 1.520 + } 1.521 + 1.522 + @Override 1.523 + public void handleRequestFailure(SyncStorageResponse response) { 1.524 + // TODO: call session.interpretHTTPFailure. 1.525 + this.handleRequestError(new HTTPFailureException(response)); 1.526 + } 1.527 + 1.528 + @Override 1.529 + public void handleRequestError(final Exception ex) { 1.530 + Logger.warn(LOG_TAG, "Got request error.", ex); 1.531 + 1.532 + recordUploadFailed = true; 1.533 + ArrayList<String> failedOutgoingGuids = outgoingGuids; 1.534 + outgoingGuids = null; // Want to GC this ASAP. 1.535 + for (String guid : failedOutgoingGuids) { 1.536 + delegate.onRecordStoreFailed(ex, guid); 1.537 + } 1.538 + return; 1.539 + } 1.540 + 1.541 + public class ByteArraysContentProducer implements ContentProducer { 1.542 + 1.543 + ArrayList<byte[]> outgoing; 1.544 + public ByteArraysContentProducer(ArrayList<byte[]> arrays) { 1.545 + outgoing = arrays; 1.546 + } 1.547 + 1.548 + @Override 1.549 + public void writeTo(OutputStream outstream) throws IOException { 1.550 + int count = outgoing.size(); 1.551 + outstream.write(recordsStart); 1.552 + outstream.write(outgoing.get(0)); 1.553 + for (int i = 1; i < count; ++i) { 1.554 + outstream.write(recordSeparator); 1.555 + outstream.write(outgoing.get(i)); 1.556 + } 1.557 + outstream.write(recordsEnd); 1.558 + } 1.559 + } 1.560 + 1.561 + public class ByteArraysEntity extends EntityTemplate { 1.562 + private long count; 1.563 + public ByteArraysEntity(ArrayList<byte[]> arrays, long totalBytes) { 1.564 + super(new ByteArraysContentProducer(arrays)); 1.565 + this.count = totalBytes; 1.566 + this.setContentType("application/json"); 1.567 + // charset is set in BaseResource. 1.568 + } 1.569 + 1.570 + @Override 1.571 + public long getContentLength() { 1.572 + return count; 1.573 + } 1.574 + 1.575 + @Override 1.576 + public boolean isRepeatable() { 1.577 + return true; 1.578 + } 1.579 + } 1.580 + 1.581 + public ByteArraysEntity getBodyEntity() { 1.582 + ByteArraysEntity body = new ByteArraysEntity(outgoing, byteCount); 1.583 + return body; 1.584 + } 1.585 + 1.586 + @Override 1.587 + public void run() { 1.588 + if (recordUploadFailed) { 1.589 + Logger.info(LOG_TAG, "Previous record upload failed. Failing all records and not retrying."); 1.590 + Exception ex = new Server11PreviousPostFailedException(); 1.591 + for (String guid : outgoingGuids) { 1.592 + delegate.onRecordStoreFailed(ex, guid); 1.593 + } 1.594 + return; 1.595 + } 1.596 + 1.597 + if (outgoing == null || 1.598 + outgoing.size() == 0) { 1.599 + Logger.debug(LOG_TAG, "No items: RecordUploadRunnable returning immediately."); 1.600 + return; 1.601 + } 1.602 + 1.603 + URI u = serverRepository.collectionURI(); 1.604 + SyncStorageRequest request = new SyncStorageRequest(u); 1.605 + 1.606 + request.delegate = this; 1.607 + 1.608 + // We don't want the task queue to proceed until this request completes. 1.609 + // Fortunately, BaseResource is currently synchronous. 1.610 + // If that ever changes, you'll need to block here. 1.611 + ByteArraysEntity body = getBodyEntity(); 1.612 + request.post(body); 1.613 + } 1.614 + } 1.615 + 1.616 + @Override 1.617 + public boolean dataAvailable() { 1.618 + return serverRepository.updateNeeded(getLastSyncTimestamp()); 1.619 + } 1.620 +}