michael@0: /* This Source Code Form is subject to the terms of the Mozilla Public
michael@0: * License, v. 2.0. If a copy of the MPL was not distributed with this
michael@0: * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
michael@0:
michael@0: package org.mozilla.gecko.sync.repositories;
michael@0:
michael@0: import java.io.IOException;
michael@0: import java.io.OutputStream;
michael@0: import java.io.UnsupportedEncodingException;
michael@0: import java.net.URI;
michael@0: import java.net.URISyntaxException;
michael@0: import java.util.ArrayList;
michael@0: import java.util.Collections;
michael@0: import java.util.HashSet;
michael@0: import java.util.Set;
michael@0: import java.util.concurrent.atomic.AtomicLong;
michael@0:
michael@0: import org.json.simple.JSONArray;
michael@0: import org.mozilla.gecko.background.common.log.Logger;
michael@0: import org.mozilla.gecko.sync.CryptoRecord;
michael@0: import org.mozilla.gecko.sync.DelayedWorkTracker;
michael@0: import org.mozilla.gecko.sync.ExtendedJSONObject;
michael@0: import org.mozilla.gecko.sync.HTTPFailureException;
michael@0: import org.mozilla.gecko.sync.Server11PreviousPostFailedException;
michael@0: import org.mozilla.gecko.sync.Server11RecordPostFailedException;
michael@0: import org.mozilla.gecko.sync.UnexpectedJSONException;
michael@0: import org.mozilla.gecko.sync.crypto.KeyBundle;
michael@0: import org.mozilla.gecko.sync.net.AuthHeaderProvider;
michael@0: import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest;
michael@0: import org.mozilla.gecko.sync.net.SyncStorageRequest;
michael@0: import org.mozilla.gecko.sync.net.SyncStorageRequestDelegate;
michael@0: import org.mozilla.gecko.sync.net.SyncStorageResponse;
michael@0: import org.mozilla.gecko.sync.net.WBOCollectionRequestDelegate;
michael@0: import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate;
michael@0: import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
michael@0: import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionGuidsSinceDelegate;
michael@0: import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
michael@0: import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionWipeDelegate;
michael@0: import org.mozilla.gecko.sync.repositories.domain.Record;
michael@0:
michael@0: import ch.boye.httpclientandroidlib.entity.ContentProducer;
michael@0: import ch.boye.httpclientandroidlib.entity.EntityTemplate;
michael@0:
michael@0: public class Server11RepositorySession extends RepositorySession {
michael@0: private static byte[] recordsStart;
michael@0: private static byte[] recordSeparator;
michael@0: private static byte[] recordsEnd;
michael@0:
michael@0: static {
michael@0: try {
michael@0: recordsStart = "[\n".getBytes("UTF-8");
michael@0: recordSeparator = ",\n".getBytes("UTF-8");
michael@0: recordsEnd = "\n]\n".getBytes("UTF-8");
michael@0: } catch (UnsupportedEncodingException e) {
michael@0: // These won't fail.
michael@0: }
michael@0: }
michael@0:
michael@0: public static final String LOG_TAG = "Server11Session";
michael@0:
michael@0: private static final int UPLOAD_BYTE_THRESHOLD = 1024 * 1024; // 1MB.
michael@0: private static final int UPLOAD_ITEM_THRESHOLD = 50;
michael@0: private static final int PER_RECORD_OVERHEAD = 2; // Comma, newline.
michael@0: // {}, newlines, but we get to skip one record overhead.
michael@0: private static final int PER_BATCH_OVERHEAD = 5 - PER_RECORD_OVERHEAD;
michael@0:
michael@0: /**
michael@0: * Return the X-Weave-Timestamp header from response
, or the
michael@0: * current time if it is missing.
michael@0: *
michael@0: * Warning: this can cause the timestamp of response
to
michael@0: * cross domains (from server clock to local clock), which could cause records
michael@0: * to be skipped on account of clock drift. This should never happen, because
michael@0: * every server response should have a well-formed X-Weave-Timestamp
michael@0: * header.
michael@0: *
michael@0: * @param response
michael@0: * The SyncStorageResponse
to interrogate.
michael@0: * @return Normalized timestamp in milliseconds.
michael@0: */
michael@0: public static long getNormalizedTimestamp(SyncStorageResponse response) {
michael@0: long normalizedTimestamp = -1;
michael@0: try {
michael@0: normalizedTimestamp = response.normalizedWeaveTimestamp();
michael@0: } catch (NumberFormatException e) {
michael@0: Logger.warn(LOG_TAG, "Malformed X-Weave-Timestamp header received.", e);
michael@0: }
michael@0: if (-1 == normalizedTimestamp) {
michael@0: Logger.warn(LOG_TAG, "Computing stand-in timestamp from local clock. Clock drift could cause records to be skipped.");
michael@0: normalizedTimestamp = System.currentTimeMillis();
michael@0: }
michael@0: return normalizedTimestamp;
michael@0: }
michael@0:
michael@0: /**
michael@0: * Used to track outstanding requests, so that we can abort them as needed.
michael@0: */
michael@0: private Set pending = Collections.synchronizedSet(new HashSet());
michael@0:
michael@0: @Override
michael@0: public void abort() {
michael@0: super.abort();
michael@0: for (SyncStorageCollectionRequest request : pending) {
michael@0: request.abort();
michael@0: }
michael@0: pending.clear();
michael@0: }
michael@0:
michael@0: /**
michael@0: * Convert HTTP request delegate callbacks into fetch callbacks within the
michael@0: * context of this RepositorySession.
michael@0: *
michael@0: * @author rnewman
michael@0: *
michael@0: */
michael@0: public class RequestFetchDelegateAdapter extends WBOCollectionRequestDelegate {
michael@0: RepositorySessionFetchRecordsDelegate delegate;
michael@0: private DelayedWorkTracker workTracker = new DelayedWorkTracker();
michael@0:
michael@0: // So that we can clean up.
michael@0: private SyncStorageCollectionRequest request;
michael@0:
michael@0: public void setRequest(SyncStorageCollectionRequest request) {
michael@0: this.request = request;
michael@0: }
michael@0: private void removeRequestFromPending() {
michael@0: if (this.request == null) {
michael@0: return;
michael@0: }
michael@0: pending.remove(this.request);
michael@0: this.request = null;
michael@0: }
michael@0:
michael@0: public RequestFetchDelegateAdapter(RepositorySessionFetchRecordsDelegate delegate) {
michael@0: this.delegate = delegate;
michael@0: }
michael@0:
michael@0: @Override
michael@0: public AuthHeaderProvider getAuthHeaderProvider() {
michael@0: return serverRepository.getAuthHeaderProvider();
michael@0: }
michael@0:
michael@0: @Override
michael@0: public String ifUnmodifiedSince() {
michael@0: return null;
michael@0: }
michael@0:
michael@0: @Override
michael@0: public void handleRequestSuccess(SyncStorageResponse response) {
michael@0: Logger.debug(LOG_TAG, "Fetch done.");
michael@0: removeRequestFromPending();
michael@0:
michael@0: final long normalizedTimestamp = getNormalizedTimestamp(response);
michael@0: Logger.debug(LOG_TAG, "Fetch completed. Timestamp is " + normalizedTimestamp);
michael@0:
michael@0: // When we're done processing other events, finish.
michael@0: workTracker.delayWorkItem(new Runnable() {
michael@0: @Override
michael@0: public void run() {
michael@0: Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
michael@0: // TODO: verify number of returned records.
michael@0: delegate.onFetchCompleted(normalizedTimestamp);
michael@0: }
michael@0: });
michael@0: }
michael@0:
michael@0: @Override
michael@0: public void handleRequestFailure(SyncStorageResponse response) {
michael@0: // TODO: ensure that delegate methods don't get called more than once.
michael@0: this.handleRequestError(new HTTPFailureException(response));
michael@0: }
michael@0:
michael@0: @Override
michael@0: public void handleRequestError(final Exception ex) {
michael@0: removeRequestFromPending();
michael@0: Logger.warn(LOG_TAG, "Got request error.", ex);
michael@0: // When we're done processing other events, finish.
michael@0: workTracker.delayWorkItem(new Runnable() {
michael@0: @Override
michael@0: public void run() {
michael@0: Logger.debug(LOG_TAG, "Running onFetchFailed.");
michael@0: delegate.onFetchFailed(ex, null);
michael@0: }
michael@0: });
michael@0: }
michael@0:
michael@0: @Override
michael@0: public void handleWBO(CryptoRecord record) {
michael@0: workTracker.incrementOutstanding();
michael@0: try {
michael@0: delegate.onFetchedRecord(record);
michael@0: } catch (Exception ex) {
michael@0: Logger.warn(LOG_TAG, "Got exception calling onFetchedRecord with WBO.", ex);
michael@0: // TODO: handle this better.
michael@0: throw new RuntimeException(ex);
michael@0: } finally {
michael@0: workTracker.decrementOutstanding();
michael@0: }
michael@0: }
michael@0:
michael@0: // TODO: this implies that we've screwed up our inheritance chain somehow.
michael@0: @Override
michael@0: public KeyBundle keyBundle() {
michael@0: return null;
michael@0: }
michael@0: }
michael@0:
michael@0:
michael@0: Server11Repository serverRepository;
michael@0: AtomicLong uploadTimestamp = new AtomicLong(0);
michael@0:
michael@0: private void bumpUploadTimestamp(long ts) {
michael@0: while (true) {
michael@0: long existing = uploadTimestamp.get();
michael@0: if (existing > ts) {
michael@0: return;
michael@0: }
michael@0: if (uploadTimestamp.compareAndSet(existing, ts)) {
michael@0: return;
michael@0: }
michael@0: }
michael@0: }
michael@0:
/**
 * Create a session over <code>repository</code>.
 *
 * @param repository
 *          The backing repository; it is cast to
 *          <code>Server11Repository</code>, so any other non-null type fails
 *          with a <code>ClassCastException</code>.
 */
public Server11RepositorySession(Repository repository) {
  super(repository);
  this.serverRepository = (Server11Repository) repository;
}
michael@0:
michael@0: private String flattenIDs(String[] guids) {
michael@0: // Consider using Utils.toDelimitedString if and when the signature changes
michael@0: // to Collection guids.
michael@0: if (guids.length == 0) {
michael@0: return "";
michael@0: }
michael@0: if (guids.length == 1) {
michael@0: return guids[0];
michael@0: }
michael@0: StringBuilder b = new StringBuilder();
michael@0: for (String guid : guids) {
michael@0: b.append(guid);
michael@0: b.append(",");
michael@0: }
michael@0: return b.substring(0, b.length() - 1);
michael@0: }
michael@0:
michael@0: @Override
michael@0: public void guidsSince(long timestamp,
michael@0: RepositorySessionGuidsSinceDelegate delegate) {
michael@0: // TODO Auto-generated method stub
michael@0:
michael@0: }
michael@0:
michael@0: protected void fetchWithParameters(long newer,
michael@0: long limit,
michael@0: boolean full,
michael@0: String sort,
michael@0: String ids,
michael@0: RequestFetchDelegateAdapter delegate)
michael@0: throws URISyntaxException {
michael@0:
michael@0: URI collectionURI = serverRepository.collectionURI(full, newer, limit, sort, ids);
michael@0: SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(collectionURI);
michael@0: request.delegate = delegate;
michael@0:
michael@0: // So it can clean up.
michael@0: delegate.setRequest(request);
michael@0: pending.add(request);
michael@0: request.get();
michael@0: }
michael@0:
michael@0: public void fetchSince(long timestamp, long limit, String sort, RepositorySessionFetchRecordsDelegate delegate) {
michael@0: try {
michael@0: this.fetchWithParameters(timestamp, limit, true, sort, null, new RequestFetchDelegateAdapter(delegate));
michael@0: } catch (URISyntaxException e) {
michael@0: delegate.onFetchFailed(e, null);
michael@0: }
michael@0: }
michael@0:
michael@0: @Override
michael@0: public void fetchSince(long timestamp,
michael@0: RepositorySessionFetchRecordsDelegate delegate) {
michael@0: try {
michael@0: long limit = serverRepository.getDefaultFetchLimit();
michael@0: String sort = serverRepository.getDefaultSort();
michael@0: this.fetchWithParameters(timestamp, limit, true, sort, null, new RequestFetchDelegateAdapter(delegate));
michael@0: } catch (URISyntaxException e) {
michael@0: delegate.onFetchFailed(e, null);
michael@0: }
michael@0: }
michael@0:
michael@0: @Override
michael@0: public void fetchAll(RepositorySessionFetchRecordsDelegate delegate) {
michael@0: this.fetchSince(-1, delegate);
michael@0: }
michael@0:
michael@0: @Override
michael@0: public void fetch(String[] guids,
michael@0: RepositorySessionFetchRecordsDelegate delegate) {
michael@0: // TODO: watch out for URL length limits!
michael@0: try {
michael@0: String ids = flattenIDs(guids);
michael@0: this.fetchWithParameters(-1, -1, true, "index", ids, new RequestFetchDelegateAdapter(delegate));
michael@0: } catch (URISyntaxException e) {
michael@0: delegate.onFetchFailed(e, null);
michael@0: }
michael@0: }
michael@0:
michael@0: @Override
michael@0: public void wipe(RepositorySessionWipeDelegate delegate) {
michael@0: if (!isActive()) {
michael@0: delegate.onWipeFailed(new InactiveSessionException(null));
michael@0: return;
michael@0: }
michael@0: // TODO: implement wipe.
michael@0: }
michael@0:
michael@0: protected Object recordsBufferMonitor = new Object();
michael@0:
michael@0: /**
michael@0: * Data of outbound records.
michael@0: *
michael@0: * We buffer the data (rather than the Record
) so that we can
michael@0: * flush the buffer based on outgoing transmission size.
michael@0: *
michael@0: * Access should be synchronized on recordsBufferMonitor
.
michael@0: */
michael@0: protected ArrayList recordsBuffer = new ArrayList();
michael@0:
michael@0: /**
michael@0: * GUIDs of outbound records.
michael@0: *
michael@0: * Used to fail entire outgoing uploads.
michael@0: *
michael@0: * Access should be synchronized on recordsBufferMonitor
.
michael@0: */
michael@0: protected ArrayList recordGuidsBuffer = new ArrayList();
michael@0: protected int byteCount = PER_BATCH_OVERHEAD;
michael@0:
michael@0: @Override
michael@0: public void store(Record record) throws NoStoreDelegateException {
michael@0: if (delegate == null) {
michael@0: throw new NoStoreDelegateException();
michael@0: }
michael@0: this.enqueue(record);
michael@0: }
michael@0:
michael@0: /**
michael@0: * Batch incoming records until some reasonable threshold (e.g., 50),
michael@0: * some size limit is hit (probably way less than 3MB!), or storeDone
michael@0: * is received.
michael@0: * @param record
michael@0: */
michael@0: protected void enqueue(Record record) {
michael@0: // JSONify and store the bytes, rather than the record.
michael@0: byte[] json = record.toJSONBytes();
michael@0: int delta = json.length;
michael@0: synchronized (recordsBufferMonitor) {
michael@0: if ((delta + byteCount > UPLOAD_BYTE_THRESHOLD) ||
michael@0: (recordsBuffer.size() >= UPLOAD_ITEM_THRESHOLD)) {
michael@0:
michael@0: // POST the existing contents, then enqueue.
michael@0: flush();
michael@0: }
michael@0: recordsBuffer.add(json);
michael@0: recordGuidsBuffer.add(record.guid);
michael@0: byteCount += PER_RECORD_OVERHEAD + delta;
michael@0: }
michael@0: }
michael@0:
michael@0: // Asynchronously upload records.
michael@0: // Must be locked!
michael@0: protected void flush() {
michael@0: if (recordsBuffer.size() > 0) {
michael@0: final ArrayList outgoing = recordsBuffer;
michael@0: final ArrayList outgoingGuids = recordGuidsBuffer;
michael@0: RepositorySessionStoreDelegate uploadDelegate = this.delegate;
michael@0: storeWorkQueue.execute(new RecordUploadRunnable(uploadDelegate, outgoing, outgoingGuids, byteCount));
michael@0:
michael@0: recordsBuffer = new ArrayList();
michael@0: recordGuidsBuffer = new ArrayList();
michael@0: byteCount = PER_BATCH_OVERHEAD;
michael@0: }
michael@0: }
michael@0:
michael@0: @Override
michael@0: public void storeDone() {
michael@0: Logger.debug(LOG_TAG, "storeDone().");
michael@0: synchronized (recordsBufferMonitor) {
michael@0: flush();
michael@0: // Do this in a Runnable so that the timestamp is grabbed after any upload.
michael@0: final Runnable r = new Runnable() {
michael@0: @Override
michael@0: public void run() {
michael@0: synchronized (recordsBufferMonitor) {
michael@0: final long end = uploadTimestamp.get();
michael@0: Logger.debug(LOG_TAG, "Calling storeDone with " + end);
michael@0: storeDone(end);
michael@0: }
michael@0: }
michael@0: };
michael@0: storeWorkQueue.execute(r);
michael@0: }
michael@0: }
michael@0:
michael@0: /**
michael@0: * true
if a record upload has failed this session.
michael@0: *
michael@0: * This is only set in begin and possibly by RecordUploadRunnable
.
michael@0: * Since those are executed serially, we can use an unsynchronized
michael@0: * volatile boolean here.
michael@0: */
michael@0: protected volatile boolean recordUploadFailed;
michael@0:
michael@0: @Override
michael@0: public void begin(RepositorySessionBeginDelegate delegate) throws InvalidSessionTransitionException {
michael@0: recordUploadFailed = false;
michael@0: super.begin(delegate);
michael@0: }
michael@0:
michael@0: /**
michael@0: * Make an HTTP request, and convert HTTP request delegate callbacks into
michael@0: * store callbacks within the context of this RepositorySession.
michael@0: *
michael@0: * @author rnewman
michael@0: *
michael@0: */
michael@0: protected class RecordUploadRunnable implements Runnable, SyncStorageRequestDelegate {
michael@0:
michael@0: public final String LOG_TAG = "RecordUploadRunnable";
michael@0: private ArrayList outgoing;
michael@0: private ArrayList outgoingGuids;
michael@0: private long byteCount;
michael@0:
michael@0: public RecordUploadRunnable(RepositorySessionStoreDelegate storeDelegate,
michael@0: ArrayList outgoing,
michael@0: ArrayList outgoingGuids,
michael@0: long byteCount) {
michael@0: Logger.debug(LOG_TAG, "Preparing record upload for " +
michael@0: outgoing.size() + " records (" +
michael@0: byteCount + " bytes).");
michael@0: this.outgoing = outgoing;
michael@0: this.outgoingGuids = outgoingGuids;
michael@0: this.byteCount = byteCount;
michael@0: }
michael@0:
michael@0: @Override
michael@0: public AuthHeaderProvider getAuthHeaderProvider() {
michael@0: return serverRepository.getAuthHeaderProvider();
michael@0: }
michael@0:
michael@0: @Override
michael@0: public String ifUnmodifiedSince() {
michael@0: return null;
michael@0: }
michael@0:
michael@0: @Override
michael@0: public void handleRequestSuccess(SyncStorageResponse response) {
michael@0: Logger.trace(LOG_TAG, "POST of " + outgoing.size() + " records done.");
michael@0:
michael@0: ExtendedJSONObject body;
michael@0: try {
michael@0: body = response.jsonObjectBody(); // jsonObjectBody() throws or returns non-null.
michael@0: } catch (Exception e) {
michael@0: Logger.error(LOG_TAG, "Got exception parsing POST success body.", e);
michael@0: this.handleRequestError(e);
michael@0: return;
michael@0: }
michael@0:
michael@0: // Be defensive when logging timestamp.
michael@0: if (body.containsKey("modified")) {
michael@0: Long modified = body.getTimestamp("modified");
michael@0: if (modified != null) {
michael@0: Logger.trace(LOG_TAG, "POST request success. Modified timestamp: " + modified.longValue());
michael@0: } else {
michael@0: Logger.warn(LOG_TAG, "POST success body contains malformed 'modified': " + body.toJSONString());
michael@0: }
michael@0: } else {
michael@0: Logger.warn(LOG_TAG, "POST success body does not contain key 'modified': " + body.toJSONString());
michael@0: }
michael@0:
michael@0: try {
michael@0: JSONArray success = body.getArray("success");
michael@0: if ((success != null) &&
michael@0: (success.size() > 0)) {
michael@0: Logger.trace(LOG_TAG, "Successful records: " + success.toString());
michael@0: for (Object o : success) {
michael@0: try {
michael@0: delegate.onRecordStoreSucceeded((String) o);
michael@0: } catch (ClassCastException e) {
michael@0: Logger.error(LOG_TAG, "Got exception parsing POST success guid.", e);
michael@0: // Not much to be done.
michael@0: }
michael@0: }
michael@0:
michael@0: long normalizedTimestamp = getNormalizedTimestamp(response);
michael@0: Logger.trace(LOG_TAG, "Passing back upload X-Weave-Timestamp: " + normalizedTimestamp);
michael@0: bumpUploadTimestamp(normalizedTimestamp);
michael@0: }
michael@0: success = null; // Want to GC this ASAP.
michael@0:
michael@0: ExtendedJSONObject failed = body.getObject("failed");
michael@0: if ((failed != null) &&
michael@0: (failed.object.size() > 0)) {
michael@0: Logger.debug(LOG_TAG, "Failed records: " + failed.object.toString());
michael@0: Exception ex = new Server11RecordPostFailedException();
michael@0: for (String guid : failed.keySet()) {
michael@0: delegate.onRecordStoreFailed(ex, guid);
michael@0: }
michael@0: }
michael@0: failed = null; // Want to GC this ASAP.
michael@0: } catch (UnexpectedJSONException e) {
michael@0: Logger.error(LOG_TAG, "Got exception processing success/failed in POST success body.", e);
michael@0: // TODO
michael@0: return;
michael@0: }
michael@0: Logger.debug(LOG_TAG, "POST of " + outgoing.size() + " records handled.");
michael@0: }
michael@0:
michael@0: @Override
michael@0: public void handleRequestFailure(SyncStorageResponse response) {
michael@0: // TODO: call session.interpretHTTPFailure.
michael@0: this.handleRequestError(new HTTPFailureException(response));
michael@0: }
michael@0:
michael@0: @Override
michael@0: public void handleRequestError(final Exception ex) {
michael@0: Logger.warn(LOG_TAG, "Got request error.", ex);
michael@0:
michael@0: recordUploadFailed = true;
michael@0: ArrayList failedOutgoingGuids = outgoingGuids;
michael@0: outgoingGuids = null; // Want to GC this ASAP.
michael@0: for (String guid : failedOutgoingGuids) {
michael@0: delegate.onRecordStoreFailed(ex, guid);
michael@0: }
michael@0: return;
michael@0: }
michael@0:
michael@0: public class ByteArraysContentProducer implements ContentProducer {
michael@0:
michael@0: ArrayList outgoing;
michael@0: public ByteArraysContentProducer(ArrayList arrays) {
michael@0: outgoing = arrays;
michael@0: }
michael@0:
michael@0: @Override
michael@0: public void writeTo(OutputStream outstream) throws IOException {
michael@0: int count = outgoing.size();
michael@0: outstream.write(recordsStart);
michael@0: outstream.write(outgoing.get(0));
michael@0: for (int i = 1; i < count; ++i) {
michael@0: outstream.write(recordSeparator);
michael@0: outstream.write(outgoing.get(i));
michael@0: }
michael@0: outstream.write(recordsEnd);
michael@0: }
michael@0: }
michael@0:
michael@0: public class ByteArraysEntity extends EntityTemplate {
michael@0: private long count;
michael@0: public ByteArraysEntity(ArrayList arrays, long totalBytes) {
michael@0: super(new ByteArraysContentProducer(arrays));
michael@0: this.count = totalBytes;
michael@0: this.setContentType("application/json");
michael@0: // charset is set in BaseResource.
michael@0: }
michael@0:
michael@0: @Override
michael@0: public long getContentLength() {
michael@0: return count;
michael@0: }
michael@0:
michael@0: @Override
michael@0: public boolean isRepeatable() {
michael@0: return true;
michael@0: }
michael@0: }
michael@0:
michael@0: public ByteArraysEntity getBodyEntity() {
michael@0: ByteArraysEntity body = new ByteArraysEntity(outgoing, byteCount);
michael@0: return body;
michael@0: }
michael@0:
michael@0: @Override
michael@0: public void run() {
michael@0: if (recordUploadFailed) {
michael@0: Logger.info(LOG_TAG, "Previous record upload failed. Failing all records and not retrying.");
michael@0: Exception ex = new Server11PreviousPostFailedException();
michael@0: for (String guid : outgoingGuids) {
michael@0: delegate.onRecordStoreFailed(ex, guid);
michael@0: }
michael@0: return;
michael@0: }
michael@0:
michael@0: if (outgoing == null ||
michael@0: outgoing.size() == 0) {
michael@0: Logger.debug(LOG_TAG, "No items: RecordUploadRunnable returning immediately.");
michael@0: return;
michael@0: }
michael@0:
michael@0: URI u = serverRepository.collectionURI();
michael@0: SyncStorageRequest request = new SyncStorageRequest(u);
michael@0:
michael@0: request.delegate = this;
michael@0:
michael@0: // We don't want the task queue to proceed until this request completes.
michael@0: // Fortunately, BaseResource is currently synchronous.
michael@0: // If that ever changes, you'll need to block here.
michael@0: ByteArraysEntity body = getBodyEntity();
michael@0: request.post(body);
michael@0: }
michael@0: }
michael@0:
michael@0: @Override
michael@0: public boolean dataAvailable() {
michael@0: return serverRepository.updateNeeded(getLastSyncTimestamp());
michael@0: }
michael@0: }