michael@0: /* This Source Code Form is subject to the terms of the Mozilla Public michael@0: * License, v. 2.0. If a copy of the MPL was not distributed with this michael@0: * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ michael@0: michael@0: package org.mozilla.gecko.sync.repositories; michael@0: michael@0: import java.io.IOException; michael@0: import java.io.OutputStream; michael@0: import java.io.UnsupportedEncodingException; michael@0: import java.net.URI; michael@0: import java.net.URISyntaxException; michael@0: import java.util.ArrayList; michael@0: import java.util.Collections; michael@0: import java.util.HashSet; michael@0: import java.util.Set; michael@0: import java.util.concurrent.atomic.AtomicLong; michael@0: michael@0: import org.json.simple.JSONArray; michael@0: import org.mozilla.gecko.background.common.log.Logger; michael@0: import org.mozilla.gecko.sync.CryptoRecord; michael@0: import org.mozilla.gecko.sync.DelayedWorkTracker; michael@0: import org.mozilla.gecko.sync.ExtendedJSONObject; michael@0: import org.mozilla.gecko.sync.HTTPFailureException; michael@0: import org.mozilla.gecko.sync.Server11PreviousPostFailedException; michael@0: import org.mozilla.gecko.sync.Server11RecordPostFailedException; michael@0: import org.mozilla.gecko.sync.UnexpectedJSONException; michael@0: import org.mozilla.gecko.sync.crypto.KeyBundle; michael@0: import org.mozilla.gecko.sync.net.AuthHeaderProvider; michael@0: import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest; michael@0: import org.mozilla.gecko.sync.net.SyncStorageRequest; michael@0: import org.mozilla.gecko.sync.net.SyncStorageRequestDelegate; michael@0: import org.mozilla.gecko.sync.net.SyncStorageResponse; michael@0: import org.mozilla.gecko.sync.net.WBOCollectionRequestDelegate; michael@0: import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate; michael@0: import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate; michael@0: import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionGuidsSinceDelegate; michael@0: import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate; michael@0: import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionWipeDelegate; michael@0: import org.mozilla.gecko.sync.repositories.domain.Record; michael@0: michael@0: import ch.boye.httpclientandroidlib.entity.ContentProducer; michael@0: import ch.boye.httpclientandroidlib.entity.EntityTemplate; michael@0: michael@0: public class Server11RepositorySession extends RepositorySession { michael@0: private static byte[] recordsStart; michael@0: private static byte[] recordSeparator; michael@0: private static byte[] recordsEnd; michael@0: michael@0: static { michael@0: try { michael@0: recordsStart = "[\n".getBytes("UTF-8"); michael@0: recordSeparator = ",\n".getBytes("UTF-8"); michael@0: recordsEnd = "\n]\n".getBytes("UTF-8"); michael@0: } catch (UnsupportedEncodingException e) { michael@0: // These won't fail. michael@0: } michael@0: } michael@0: michael@0: public static final String LOG_TAG = "Server11Session"; michael@0: michael@0: private static final int UPLOAD_BYTE_THRESHOLD = 1024 * 1024; // 1MB. michael@0: private static final int UPLOAD_ITEM_THRESHOLD = 50; michael@0: private static final int PER_RECORD_OVERHEAD = 2; // Comma, newline. michael@0: // {}, newlines, but we get to skip one record overhead. michael@0: private static final int PER_BATCH_OVERHEAD = 5 - PER_RECORD_OVERHEAD; michael@0: michael@0: /** michael@0: * Return the X-Weave-Timestamp header from response, or the michael@0: * current time if it is missing. michael@0: *

michael@0: * Warning: this can cause the timestamp of response to michael@0: * cross domains (from server clock to local clock), which could cause records michael@0: * to be skipped on account of clock drift. This should never happen, because michael@0: * every server response should have a well-formed X-Weave-Timestamp michael@0: * header. michael@0: * michael@0: * @param response michael@0: * The SyncStorageResponse to interrogate. michael@0: * @return Normalized timestamp in milliseconds. michael@0: */ michael@0: public static long getNormalizedTimestamp(SyncStorageResponse response) { michael@0: long normalizedTimestamp = -1; michael@0: try { michael@0: normalizedTimestamp = response.normalizedWeaveTimestamp(); michael@0: } catch (NumberFormatException e) { michael@0: Logger.warn(LOG_TAG, "Malformed X-Weave-Timestamp header received.", e); michael@0: } michael@0: if (-1 == normalizedTimestamp) { michael@0: Logger.warn(LOG_TAG, "Computing stand-in timestamp from local clock. Clock drift could cause records to be skipped."); michael@0: normalizedTimestamp = System.currentTimeMillis(); michael@0: } michael@0: return normalizedTimestamp; michael@0: } michael@0: michael@0: /** michael@0: * Used to track outstanding requests, so that we can abort them as needed. michael@0: */ michael@0: private Set pending = Collections.synchronizedSet(new HashSet()); michael@0: michael@0: @Override michael@0: public void abort() { michael@0: super.abort(); michael@0: for (SyncStorageCollectionRequest request : pending) { michael@0: request.abort(); michael@0: } michael@0: pending.clear(); michael@0: } michael@0: michael@0: /** michael@0: * Convert HTTP request delegate callbacks into fetch callbacks within the michael@0: * context of this RepositorySession. michael@0: * michael@0: * @author rnewman michael@0: * michael@0: */ michael@0: public class RequestFetchDelegateAdapter extends WBOCollectionRequestDelegate { michael@0: RepositorySessionFetchRecordsDelegate delegate; michael@0: private DelayedWorkTracker workTracker = new DelayedWorkTracker(); michael@0: michael@0: // So that we can clean up. michael@0: private SyncStorageCollectionRequest request; michael@0: michael@0: public void setRequest(SyncStorageCollectionRequest request) { michael@0: this.request = request; michael@0: } michael@0: private void removeRequestFromPending() { michael@0: if (this.request == null) { michael@0: return; michael@0: } michael@0: pending.remove(this.request); michael@0: this.request = null; michael@0: } michael@0: michael@0: public RequestFetchDelegateAdapter(RepositorySessionFetchRecordsDelegate delegate) { michael@0: this.delegate = delegate; michael@0: } michael@0: michael@0: @Override michael@0: public AuthHeaderProvider getAuthHeaderProvider() { michael@0: return serverRepository.getAuthHeaderProvider(); michael@0: } michael@0: michael@0: @Override michael@0: public String ifUnmodifiedSince() { michael@0: return null; michael@0: } michael@0: michael@0: @Override michael@0: public void handleRequestSuccess(SyncStorageResponse response) { michael@0: Logger.debug(LOG_TAG, "Fetch done."); michael@0: removeRequestFromPending(); michael@0: michael@0: final long normalizedTimestamp = getNormalizedTimestamp(response); michael@0: Logger.debug(LOG_TAG, "Fetch completed. Timestamp is " + normalizedTimestamp); michael@0: michael@0: // When we're done processing other events, finish. michael@0: workTracker.delayWorkItem(new Runnable() { michael@0: @Override michael@0: public void run() { michael@0: Logger.debug(LOG_TAG, "Delayed onFetchCompleted running."); michael@0: // TODO: verify number of returned records. michael@0: delegate.onFetchCompleted(normalizedTimestamp); michael@0: } michael@0: }); michael@0: } michael@0: michael@0: @Override michael@0: public void handleRequestFailure(SyncStorageResponse response) { michael@0: // TODO: ensure that delegate methods don't get called more than once. michael@0: this.handleRequestError(new HTTPFailureException(response)); michael@0: } michael@0: michael@0: @Override michael@0: public void handleRequestError(final Exception ex) { michael@0: removeRequestFromPending(); michael@0: Logger.warn(LOG_TAG, "Got request error.", ex); michael@0: // When we're done processing other events, finish. michael@0: workTracker.delayWorkItem(new Runnable() { michael@0: @Override michael@0: public void run() { michael@0: Logger.debug(LOG_TAG, "Running onFetchFailed."); michael@0: delegate.onFetchFailed(ex, null); michael@0: } michael@0: }); michael@0: } michael@0: michael@0: @Override michael@0: public void handleWBO(CryptoRecord record) { michael@0: workTracker.incrementOutstanding(); michael@0: try { michael@0: delegate.onFetchedRecord(record); michael@0: } catch (Exception ex) { michael@0: Logger.warn(LOG_TAG, "Got exception calling onFetchedRecord with WBO.", ex); michael@0: // TODO: handle this better. michael@0: throw new RuntimeException(ex); michael@0: } finally { michael@0: workTracker.decrementOutstanding(); michael@0: } michael@0: } michael@0: michael@0: // TODO: this implies that we've screwed up our inheritance chain somehow. michael@0: @Override michael@0: public KeyBundle keyBundle() { michael@0: return null; michael@0: } michael@0: } michael@0: michael@0: michael@0: Server11Repository serverRepository; michael@0: AtomicLong uploadTimestamp = new AtomicLong(0); michael@0: michael@0: private void bumpUploadTimestamp(long ts) { michael@0: while (true) { michael@0: long existing = uploadTimestamp.get(); michael@0: if (existing > ts) { michael@0: return; michael@0: } michael@0: if (uploadTimestamp.compareAndSet(existing, ts)) { michael@0: return; michael@0: } michael@0: } michael@0: } michael@0: michael@0: public Server11RepositorySession(Repository repository) { michael@0: super(repository); michael@0: serverRepository = (Server11Repository) repository; michael@0: } michael@0: michael@0: private String flattenIDs(String[] guids) { michael@0: // Consider using Utils.toDelimitedString if and when the signature changes michael@0: // to Collection guids. michael@0: if (guids.length == 0) { michael@0: return ""; michael@0: } michael@0: if (guids.length == 1) { michael@0: return guids[0]; michael@0: } michael@0: StringBuilder b = new StringBuilder(); michael@0: for (String guid : guids) { michael@0: b.append(guid); michael@0: b.append(","); michael@0: } michael@0: return b.substring(0, b.length() - 1); michael@0: } michael@0: michael@0: @Override michael@0: public void guidsSince(long timestamp, michael@0: RepositorySessionGuidsSinceDelegate delegate) { michael@0: // TODO Auto-generated method stub michael@0: michael@0: } michael@0: michael@0: protected void fetchWithParameters(long newer, michael@0: long limit, michael@0: boolean full, michael@0: String sort, michael@0: String ids, michael@0: RequestFetchDelegateAdapter delegate) michael@0: throws URISyntaxException { michael@0: michael@0: URI collectionURI = serverRepository.collectionURI(full, newer, limit, sort, ids); michael@0: SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(collectionURI); michael@0: request.delegate = delegate; michael@0: michael@0: // So it can clean up. michael@0: delegate.setRequest(request); michael@0: pending.add(request); michael@0: request.get(); michael@0: } michael@0: michael@0: public void fetchSince(long timestamp, long limit, String sort, RepositorySessionFetchRecordsDelegate delegate) { michael@0: try { michael@0: this.fetchWithParameters(timestamp, limit, true, sort, null, new RequestFetchDelegateAdapter(delegate)); michael@0: } catch (URISyntaxException e) { michael@0: delegate.onFetchFailed(e, null); michael@0: } michael@0: } michael@0: michael@0: @Override michael@0: public void fetchSince(long timestamp, michael@0: RepositorySessionFetchRecordsDelegate delegate) { michael@0: try { michael@0: long limit = serverRepository.getDefaultFetchLimit(); michael@0: String sort = serverRepository.getDefaultSort(); michael@0: this.fetchWithParameters(timestamp, limit, true, sort, null, new RequestFetchDelegateAdapter(delegate)); michael@0: } catch (URISyntaxException e) { michael@0: delegate.onFetchFailed(e, null); michael@0: } michael@0: } michael@0: michael@0: @Override michael@0: public void fetchAll(RepositorySessionFetchRecordsDelegate delegate) { michael@0: this.fetchSince(-1, delegate); michael@0: } michael@0: michael@0: @Override michael@0: public void fetch(String[] guids, michael@0: RepositorySessionFetchRecordsDelegate delegate) { michael@0: // TODO: watch out for URL length limits! michael@0: try { michael@0: String ids = flattenIDs(guids); michael@0: this.fetchWithParameters(-1, -1, true, "index", ids, new RequestFetchDelegateAdapter(delegate)); michael@0: } catch (URISyntaxException e) { michael@0: delegate.onFetchFailed(e, null); michael@0: } michael@0: } michael@0: michael@0: @Override michael@0: public void wipe(RepositorySessionWipeDelegate delegate) { michael@0: if (!isActive()) { michael@0: delegate.onWipeFailed(new InactiveSessionException(null)); michael@0: return; michael@0: } michael@0: // TODO: implement wipe. michael@0: } michael@0: michael@0: protected Object recordsBufferMonitor = new Object(); michael@0: michael@0: /** michael@0: * Data of outbound records. michael@0: *

michael@0: * We buffer the data (rather than the Record) so that we can michael@0: * flush the buffer based on outgoing transmission size. michael@0: *

michael@0: * Access should be synchronized on recordsBufferMonitor. michael@0: */ michael@0: protected ArrayList recordsBuffer = new ArrayList(); michael@0: michael@0: /** michael@0: * GUIDs of outbound records. michael@0: *

michael@0: * Used to fail entire outgoing uploads. michael@0: *

michael@0: * Access should be synchronized on recordsBufferMonitor. michael@0: */ michael@0: protected ArrayList recordGuidsBuffer = new ArrayList(); michael@0: protected int byteCount = PER_BATCH_OVERHEAD; michael@0: michael@0: @Override michael@0: public void store(Record record) throws NoStoreDelegateException { michael@0: if (delegate == null) { michael@0: throw new NoStoreDelegateException(); michael@0: } michael@0: this.enqueue(record); michael@0: } michael@0: michael@0: /** michael@0: * Batch incoming records until some reasonable threshold (e.g., 50), michael@0: * some size limit is hit (probably way less than 3MB!), or storeDone michael@0: * is received. michael@0: * @param record michael@0: */ michael@0: protected void enqueue(Record record) { michael@0: // JSONify and store the bytes, rather than the record. michael@0: byte[] json = record.toJSONBytes(); michael@0: int delta = json.length; michael@0: synchronized (recordsBufferMonitor) { michael@0: if ((delta + byteCount > UPLOAD_BYTE_THRESHOLD) || michael@0: (recordsBuffer.size() >= UPLOAD_ITEM_THRESHOLD)) { michael@0: michael@0: // POST the existing contents, then enqueue. michael@0: flush(); michael@0: } michael@0: recordsBuffer.add(json); michael@0: recordGuidsBuffer.add(record.guid); michael@0: byteCount += PER_RECORD_OVERHEAD + delta; michael@0: } michael@0: } michael@0: michael@0: // Asynchronously upload records. michael@0: // Must be locked! michael@0: protected void flush() { michael@0: if (recordsBuffer.size() > 0) { michael@0: final ArrayList outgoing = recordsBuffer; michael@0: final ArrayList outgoingGuids = recordGuidsBuffer; michael@0: RepositorySessionStoreDelegate uploadDelegate = this.delegate; michael@0: storeWorkQueue.execute(new RecordUploadRunnable(uploadDelegate, outgoing, outgoingGuids, byteCount)); michael@0: michael@0: recordsBuffer = new ArrayList(); michael@0: recordGuidsBuffer = new ArrayList(); michael@0: byteCount = PER_BATCH_OVERHEAD; michael@0: } michael@0: } michael@0: michael@0: @Override michael@0: public void storeDone() { michael@0: Logger.debug(LOG_TAG, "storeDone()."); michael@0: synchronized (recordsBufferMonitor) { michael@0: flush(); michael@0: // Do this in a Runnable so that the timestamp is grabbed after any upload. michael@0: final Runnable r = new Runnable() { michael@0: @Override michael@0: public void run() { michael@0: synchronized (recordsBufferMonitor) { michael@0: final long end = uploadTimestamp.get(); michael@0: Logger.debug(LOG_TAG, "Calling storeDone with " + end); michael@0: storeDone(end); michael@0: } michael@0: } michael@0: }; michael@0: storeWorkQueue.execute(r); michael@0: } michael@0: } michael@0: michael@0: /** michael@0: * true if a record upload has failed this session. michael@0: *

michael@0: * This is only set in begin and possibly by RecordUploadRunnable. michael@0: * Since those are executed serially, we can use an unsynchronized michael@0: * volatile boolean here. michael@0: */ michael@0: protected volatile boolean recordUploadFailed; michael@0: michael@0: @Override michael@0: public void begin(RepositorySessionBeginDelegate delegate) throws InvalidSessionTransitionException { michael@0: recordUploadFailed = false; michael@0: super.begin(delegate); michael@0: } michael@0: michael@0: /** michael@0: * Make an HTTP request, and convert HTTP request delegate callbacks into michael@0: * store callbacks within the context of this RepositorySession. michael@0: * michael@0: * @author rnewman michael@0: * michael@0: */ michael@0: protected class RecordUploadRunnable implements Runnable, SyncStorageRequestDelegate { michael@0: michael@0: public final String LOG_TAG = "RecordUploadRunnable"; michael@0: private ArrayList outgoing; michael@0: private ArrayList outgoingGuids; michael@0: private long byteCount; michael@0: michael@0: public RecordUploadRunnable(RepositorySessionStoreDelegate storeDelegate, michael@0: ArrayList outgoing, michael@0: ArrayList outgoingGuids, michael@0: long byteCount) { michael@0: Logger.debug(LOG_TAG, "Preparing record upload for " + michael@0: outgoing.size() + " records (" + michael@0: byteCount + " bytes)."); michael@0: this.outgoing = outgoing; michael@0: this.outgoingGuids = outgoingGuids; michael@0: this.byteCount = byteCount; michael@0: } michael@0: michael@0: @Override michael@0: public AuthHeaderProvider getAuthHeaderProvider() { michael@0: return serverRepository.getAuthHeaderProvider(); michael@0: } michael@0: michael@0: @Override michael@0: public String ifUnmodifiedSince() { michael@0: return null; michael@0: } michael@0: michael@0: @Override michael@0: public void handleRequestSuccess(SyncStorageResponse response) { michael@0: Logger.trace(LOG_TAG, "POST of " + outgoing.size() + " records done."); michael@0: michael@0: ExtendedJSONObject body; michael@0: try { michael@0: body = response.jsonObjectBody(); // jsonObjectBody() throws or returns non-null. michael@0: } catch (Exception e) { michael@0: Logger.error(LOG_TAG, "Got exception parsing POST success body.", e); michael@0: this.handleRequestError(e); michael@0: return; michael@0: } michael@0: michael@0: // Be defensive when logging timestamp. michael@0: if (body.containsKey("modified")) { michael@0: Long modified = body.getTimestamp("modified"); michael@0: if (modified != null) { michael@0: Logger.trace(LOG_TAG, "POST request success. Modified timestamp: " + modified.longValue()); michael@0: } else { michael@0: Logger.warn(LOG_TAG, "POST success body contains malformed 'modified': " + body.toJSONString()); michael@0: } michael@0: } else { michael@0: Logger.warn(LOG_TAG, "POST success body does not contain key 'modified': " + body.toJSONString()); michael@0: } michael@0: michael@0: try { michael@0: JSONArray success = body.getArray("success"); michael@0: if ((success != null) && michael@0: (success.size() > 0)) { michael@0: Logger.trace(LOG_TAG, "Successful records: " + success.toString()); michael@0: for (Object o : success) { michael@0: try { michael@0: delegate.onRecordStoreSucceeded((String) o); michael@0: } catch (ClassCastException e) { michael@0: Logger.error(LOG_TAG, "Got exception parsing POST success guid.", e); michael@0: // Not much to be done. michael@0: } michael@0: } michael@0: michael@0: long normalizedTimestamp = getNormalizedTimestamp(response); michael@0: Logger.trace(LOG_TAG, "Passing back upload X-Weave-Timestamp: " + normalizedTimestamp); michael@0: bumpUploadTimestamp(normalizedTimestamp); michael@0: } michael@0: success = null; // Want to GC this ASAP. michael@0: michael@0: ExtendedJSONObject failed = body.getObject("failed"); michael@0: if ((failed != null) && michael@0: (failed.object.size() > 0)) { michael@0: Logger.debug(LOG_TAG, "Failed records: " + failed.object.toString()); michael@0: Exception ex = new Server11RecordPostFailedException(); michael@0: for (String guid : failed.keySet()) { michael@0: delegate.onRecordStoreFailed(ex, guid); michael@0: } michael@0: } michael@0: failed = null; // Want to GC this ASAP. michael@0: } catch (UnexpectedJSONException e) { michael@0: Logger.error(LOG_TAG, "Got exception processing success/failed in POST success body.", e); michael@0: // TODO michael@0: return; michael@0: } michael@0: Logger.debug(LOG_TAG, "POST of " + outgoing.size() + " records handled."); michael@0: } michael@0: michael@0: @Override michael@0: public void handleRequestFailure(SyncStorageResponse response) { michael@0: // TODO: call session.interpretHTTPFailure. michael@0: this.handleRequestError(new HTTPFailureException(response)); michael@0: } michael@0: michael@0: @Override michael@0: public void handleRequestError(final Exception ex) { michael@0: Logger.warn(LOG_TAG, "Got request error.", ex); michael@0: michael@0: recordUploadFailed = true; michael@0: ArrayList failedOutgoingGuids = outgoingGuids; michael@0: outgoingGuids = null; // Want to GC this ASAP. michael@0: for (String guid : failedOutgoingGuids) { michael@0: delegate.onRecordStoreFailed(ex, guid); michael@0: } michael@0: return; michael@0: } michael@0: michael@0: public class ByteArraysContentProducer implements ContentProducer { michael@0: michael@0: ArrayList outgoing; michael@0: public ByteArraysContentProducer(ArrayList arrays) { michael@0: outgoing = arrays; michael@0: } michael@0: michael@0: @Override michael@0: public void writeTo(OutputStream outstream) throws IOException { michael@0: int count = outgoing.size(); michael@0: outstream.write(recordsStart); michael@0: outstream.write(outgoing.get(0)); michael@0: for (int i = 1; i < count; ++i) { michael@0: outstream.write(recordSeparator); michael@0: outstream.write(outgoing.get(i)); michael@0: } michael@0: outstream.write(recordsEnd); michael@0: } michael@0: } michael@0: michael@0: public class ByteArraysEntity extends EntityTemplate { michael@0: private long count; michael@0: public ByteArraysEntity(ArrayList arrays, long totalBytes) { michael@0: super(new ByteArraysContentProducer(arrays)); michael@0: this.count = totalBytes; michael@0: this.setContentType("application/json"); michael@0: // charset is set in BaseResource. michael@0: } michael@0: michael@0: @Override michael@0: public long getContentLength() { michael@0: return count; michael@0: } michael@0: michael@0: @Override michael@0: public boolean isRepeatable() { michael@0: return true; michael@0: } michael@0: } michael@0: michael@0: public ByteArraysEntity getBodyEntity() { michael@0: ByteArraysEntity body = new ByteArraysEntity(outgoing, byteCount); michael@0: return body; michael@0: } michael@0: michael@0: @Override michael@0: public void run() { michael@0: if (recordUploadFailed) { michael@0: Logger.info(LOG_TAG, "Previous record upload failed. Failing all records and not retrying."); michael@0: Exception ex = new Server11PreviousPostFailedException(); michael@0: for (String guid : outgoingGuids) { michael@0: delegate.onRecordStoreFailed(ex, guid); michael@0: } michael@0: return; michael@0: } michael@0: michael@0: if (outgoing == null || michael@0: outgoing.size() == 0) { michael@0: Logger.debug(LOG_TAG, "No items: RecordUploadRunnable returning immediately."); michael@0: return; michael@0: } michael@0: michael@0: URI u = serverRepository.collectionURI(); michael@0: SyncStorageRequest request = new SyncStorageRequest(u); michael@0: michael@0: request.delegate = this; michael@0: michael@0: // We don't want the task queue to proceed until this request completes. michael@0: // Fortunately, BaseResource is currently synchronous. michael@0: // If that ever changes, you'll need to block here. michael@0: ByteArraysEntity body = getBodyEntity(); michael@0: request.post(body); michael@0: } michael@0: } michael@0: michael@0: @Override michael@0: public boolean dataAvailable() { michael@0: return serverRepository.updateNeeded(getLastSyncTimestamp()); michael@0: } michael@0: }