mobile/android/base/sync/repositories/Server11RepositorySession.java

Wed, 31 Dec 2014 07:22:50 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 07:22:50 +0100
branch
TOR_BUG_3246
changeset 4
fc2d59ddac77
permissions
-rw-r--r--

Correct previous dual key logic pending first delivery installment.

     1 /* This Source Code Form is subject to the terms of the Mozilla Public
     2  * License, v. 2.0. If a copy of the MPL was not distributed with this
     3  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
     5 package org.mozilla.gecko.sync.repositories;
     7 import java.io.IOException;
     8 import java.io.OutputStream;
     9 import java.io.UnsupportedEncodingException;
    10 import java.net.URI;
    11 import java.net.URISyntaxException;
    12 import java.util.ArrayList;
    13 import java.util.Collections;
    14 import java.util.HashSet;
    15 import java.util.Set;
    16 import java.util.concurrent.atomic.AtomicLong;
    18 import org.json.simple.JSONArray;
    19 import org.mozilla.gecko.background.common.log.Logger;
    20 import org.mozilla.gecko.sync.CryptoRecord;
    21 import org.mozilla.gecko.sync.DelayedWorkTracker;
    22 import org.mozilla.gecko.sync.ExtendedJSONObject;
    23 import org.mozilla.gecko.sync.HTTPFailureException;
    24 import org.mozilla.gecko.sync.Server11PreviousPostFailedException;
    25 import org.mozilla.gecko.sync.Server11RecordPostFailedException;
    26 import org.mozilla.gecko.sync.UnexpectedJSONException;
    27 import org.mozilla.gecko.sync.crypto.KeyBundle;
    28 import org.mozilla.gecko.sync.net.AuthHeaderProvider;
    29 import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest;
    30 import org.mozilla.gecko.sync.net.SyncStorageRequest;
    31 import org.mozilla.gecko.sync.net.SyncStorageRequestDelegate;
    32 import org.mozilla.gecko.sync.net.SyncStorageResponse;
    33 import org.mozilla.gecko.sync.net.WBOCollectionRequestDelegate;
    34 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate;
    35 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
    36 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionGuidsSinceDelegate;
    37 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
    38 import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionWipeDelegate;
    39 import org.mozilla.gecko.sync.repositories.domain.Record;
    41 import ch.boye.httpclientandroidlib.entity.ContentProducer;
    42 import ch.boye.httpclientandroidlib.entity.EntityTemplate;
    44 public class Server11RepositorySession extends RepositorySession {
    45   private static byte[] recordsStart;
    46   private static byte[] recordSeparator;
    47   private static byte[] recordsEnd;
    49   static {
    50     try {
    51       recordsStart    = "[\n".getBytes("UTF-8");
    52       recordSeparator = ",\n".getBytes("UTF-8");
    53       recordsEnd      = "\n]\n".getBytes("UTF-8");
    54     } catch (UnsupportedEncodingException e) {
    55       // These won't fail.
    56     }
    57   }
    59   public static final String LOG_TAG = "Server11Session";
    61   private static final int UPLOAD_BYTE_THRESHOLD = 1024 * 1024;    // 1MB.
    62   private static final int UPLOAD_ITEM_THRESHOLD = 50;
    63   private static final int PER_RECORD_OVERHEAD   = 2;              // Comma, newline.
    64   // Array brackets ([ and ]) and their newlines, but we get to skip one record overhead.
    65   private static final int PER_BATCH_OVERHEAD    = 5 - PER_RECORD_OVERHEAD;
    67   /**
    68    * Return the X-Weave-Timestamp header from <code>response</code>, or the
    69    * current time if it is missing.
    70    * <p>
    71    * <b>Warning:</b> this can cause the timestamp of <code>response</code> to
    72    * cross domains (from server clock to local clock), which could cause records
    73    * to be skipped on account of clock drift. This should never happen, because
    74    * <i>every</i> server response should have a well-formed X-Weave-Timestamp
    75    * header.
    76    *
    77    * @param response
    78    *          The <code>SyncStorageResponse</code> to interrogate.
    79    * @return Normalized timestamp in milliseconds.
    80    */
    81   public static long getNormalizedTimestamp(SyncStorageResponse response) {
    82     long normalizedTimestamp = -1;
    83     try {
    84       normalizedTimestamp = response.normalizedWeaveTimestamp();
    85     } catch (NumberFormatException e) {
    86       Logger.warn(LOG_TAG, "Malformed X-Weave-Timestamp header received.", e);
    87     }
    88     if (-1 == normalizedTimestamp) {
    89       Logger.warn(LOG_TAG, "Computing stand-in timestamp from local clock. Clock drift could cause records to be skipped.");
    90       normalizedTimestamp = System.currentTimeMillis();
    91     }
    92     return normalizedTimestamp;
    93   }
    95   /**
    96    * Used to track outstanding requests, so that we can abort them as needed.
    97    */
    98   private Set<SyncStorageCollectionRequest> pending = Collections.synchronizedSet(new HashSet<SyncStorageCollectionRequest>());
   100   @Override
   101   public void abort() {
   102     super.abort();
   103     for (SyncStorageCollectionRequest request : pending) {
   104       request.abort();
   105     }
   106     pending.clear();
   107   }
   109   /**
   110    * Convert HTTP request delegate callbacks into fetch callbacks within the
   111    * context of this RepositorySession.
   112    *
   113    * @author rnewman
   114    *
   115    */
   116   public class RequestFetchDelegateAdapter extends WBOCollectionRequestDelegate {
   117     RepositorySessionFetchRecordsDelegate delegate;
   118     private DelayedWorkTracker workTracker = new DelayedWorkTracker();
   120     // So that we can clean up.
   121     private SyncStorageCollectionRequest request;
   123     public void setRequest(SyncStorageCollectionRequest request) {
   124       this.request = request;
   125     }
   126     private void removeRequestFromPending() {
   127       if (this.request == null) {
   128         return;
   129       }
   130       pending.remove(this.request);
   131       this.request = null;
   132     }
   134     public RequestFetchDelegateAdapter(RepositorySessionFetchRecordsDelegate delegate) {
   135       this.delegate = delegate;
   136     }
   138     @Override
   139     public AuthHeaderProvider getAuthHeaderProvider() {
   140       return serverRepository.getAuthHeaderProvider();
   141     }
   143     @Override
   144     public String ifUnmodifiedSince() {
   145       return null;
   146     }
   148     @Override
   149     public void handleRequestSuccess(SyncStorageResponse response) {
   150       Logger.debug(LOG_TAG, "Fetch done.");
   151       removeRequestFromPending();
   153       final long normalizedTimestamp = getNormalizedTimestamp(response);
   154       Logger.debug(LOG_TAG, "Fetch completed. Timestamp is " + normalizedTimestamp);
   156       // When we're done processing other events, finish.
   157       workTracker.delayWorkItem(new Runnable() {
   158         @Override
   159         public void run() {
   160           Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
   161           // TODO: verify number of returned records.
   162           delegate.onFetchCompleted(normalizedTimestamp);
   163         }
   164       });
   165     }
   167     @Override
   168     public void handleRequestFailure(SyncStorageResponse response) {
   169       // TODO: ensure that delegate methods don't get called more than once.
   170       this.handleRequestError(new HTTPFailureException(response));
   171     }
   173     @Override
   174     public void handleRequestError(final Exception ex) {
   175       removeRequestFromPending();
   176       Logger.warn(LOG_TAG, "Got request error.", ex);
   177       // When we're done processing other events, finish.
   178       workTracker.delayWorkItem(new Runnable() {
   179         @Override
   180         public void run() {
   181           Logger.debug(LOG_TAG, "Running onFetchFailed.");
   182           delegate.onFetchFailed(ex, null);
   183         }
   184       });
   185     }
   187     @Override
   188     public void handleWBO(CryptoRecord record) {
   189       workTracker.incrementOutstanding();
   190       try {
   191         delegate.onFetchedRecord(record);
   192       } catch (Exception ex) {
   193         Logger.warn(LOG_TAG, "Got exception calling onFetchedRecord with WBO.", ex);
   194         // TODO: handle this better.
   195         throw new RuntimeException(ex);
   196       } finally {
   197         workTracker.decrementOutstanding();
   198       }
   199     }
   201     // TODO: this implies that we've screwed up our inheritance chain somehow.
   202     @Override
   203     public KeyBundle keyBundle() {
   204       return null;
   205     }
   206   }
   209   Server11Repository serverRepository;
   210   AtomicLong uploadTimestamp = new AtomicLong(0);
   212   private void bumpUploadTimestamp(long ts) {
   213     while (true) {
   214       long existing = uploadTimestamp.get();
   215       if (existing > ts) {
   216         return;
   217       }
   218       if (uploadTimestamp.compareAndSet(existing, ts)) {
   219         return;
   220       }
   221     }
   222   }
   /**
    * Create a session over <code>repository</code>.
    *
    * @param repository
    *          the repository backing this session; it is cast to
    *          <code>Server11Repository</code>, so any other type fails with a
    *          <code>ClassCastException</code>.
    */
   public Server11RepositorySession(Repository repository) {
     super(repository);
     this.serverRepository = (Server11Repository) repository;
   }
   229   private String flattenIDs(String[] guids) {
   230     // Consider using Utils.toDelimitedString if and when the signature changes
   231     // to Collection<String> guids.
   232     if (guids.length == 0) {
   233       return "";
   234     }
   235     if (guids.length == 1) {
   236       return guids[0];
   237     }
   238     StringBuilder b = new StringBuilder();
   239     for (String guid : guids) {
   240       b.append(guid);
   241       b.append(",");
   242     }
   243     return b.substring(0, b.length() - 1);
   244   }
   246   @Override
   247   public void guidsSince(long timestamp,
   248                          RepositorySessionGuidsSinceDelegate delegate) {
   249     // TODO Auto-generated method stub
   251   }
   253   protected void fetchWithParameters(long newer,
   254                                      long limit,
   255                                      boolean full,
   256                                      String sort,
   257                                      String ids,
   258                                      RequestFetchDelegateAdapter delegate)
   259                                          throws URISyntaxException {
   261     URI collectionURI = serverRepository.collectionURI(full, newer, limit, sort, ids);
   262     SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(collectionURI);
   263     request.delegate = delegate;
   265     // So it can clean up.
   266     delegate.setRequest(request);
   267     pending.add(request);
   268     request.get();
   269   }
   271   public void fetchSince(long timestamp, long limit, String sort, RepositorySessionFetchRecordsDelegate delegate) {
   272     try {
   273       this.fetchWithParameters(timestamp, limit, true, sort, null, new RequestFetchDelegateAdapter(delegate));
   274     } catch (URISyntaxException e) {
   275       delegate.onFetchFailed(e, null);
   276     }
   277   }
   279   @Override
   280   public void fetchSince(long timestamp,
   281                          RepositorySessionFetchRecordsDelegate delegate) {
   282     try {
   283       long limit = serverRepository.getDefaultFetchLimit();
   284       String sort = serverRepository.getDefaultSort();
   285       this.fetchWithParameters(timestamp, limit, true, sort, null, new RequestFetchDelegateAdapter(delegate));
   286     } catch (URISyntaxException e) {
   287       delegate.onFetchFailed(e, null);
   288     }
   289   }
   291   @Override
   292   public void fetchAll(RepositorySessionFetchRecordsDelegate delegate) {
   293     this.fetchSince(-1, delegate);
   294   }
   296   @Override
   297   public void fetch(String[] guids,
   298                     RepositorySessionFetchRecordsDelegate delegate) {
   299     // TODO: watch out for URL length limits!
   300     try {
   301       String ids = flattenIDs(guids);
   302       this.fetchWithParameters(-1, -1, true, "index", ids, new RequestFetchDelegateAdapter(delegate));
   303     } catch (URISyntaxException e) {
   304       delegate.onFetchFailed(e, null);
   305     }
   306   }
   308   @Override
   309   public void wipe(RepositorySessionWipeDelegate delegate) {
   310     if (!isActive()) {
   311       delegate.onWipeFailed(new InactiveSessionException(null));
   312       return;
   313     }
   314     // TODO: implement wipe.
   315   }
   317   protected Object recordsBufferMonitor = new Object();
   319   /**
   320    * Data of outbound records.
   321    * <p>
   322    * We buffer the data (rather than the <code>Record</code>) so that we can
   323    * flush the buffer based on outgoing transmission size.
   324    * <p>
   325    * Access should be synchronized on <code>recordsBufferMonitor</code>.
   326    */
   327   protected ArrayList<byte[]> recordsBuffer = new ArrayList<byte[]>();
   329   /**
   330    * GUIDs of outbound records.
   331    * <p>
   332    * Used to fail entire outgoing uploads.
   333    * <p>
   334    * Access should be synchronized on <code>recordsBufferMonitor</code>.
   335    */
   336   protected ArrayList<String> recordGuidsBuffer = new ArrayList<String>();
   337   protected int byteCount = PER_BATCH_OVERHEAD;
   339   @Override
   340   public void store(Record record) throws NoStoreDelegateException {
   341     if (delegate == null) {
   342       throw new NoStoreDelegateException();
   343     }
   344     this.enqueue(record);
   345   }
   347   /**
   348    * Batch incoming records until some reasonable threshold (e.g., 50),
   349    * some size limit is hit (probably way less than 3MB!), or storeDone
   350    * is received.
   351    * @param record
   352    */
   353   protected void enqueue(Record record) {
   354     // JSONify and store the bytes, rather than the record.
   355     byte[] json = record.toJSONBytes();
   356     int delta   = json.length;
   357     synchronized (recordsBufferMonitor) {
   358       if ((delta + byteCount     > UPLOAD_BYTE_THRESHOLD) ||
   359           (recordsBuffer.size() >= UPLOAD_ITEM_THRESHOLD)) {
   361         // POST the existing contents, then enqueue.
   362         flush();
   363       }
   364       recordsBuffer.add(json);
   365       recordGuidsBuffer.add(record.guid);
   366       byteCount += PER_RECORD_OVERHEAD + delta;
   367     }
   368   }
   370   // Asynchronously upload records.
   371   // Must be locked!
   372   protected void flush() {
   373     if (recordsBuffer.size() > 0) {
   374       final ArrayList<byte[]> outgoing = recordsBuffer;
   375       final ArrayList<String> outgoingGuids = recordGuidsBuffer;
   376       RepositorySessionStoreDelegate uploadDelegate = this.delegate;
   377       storeWorkQueue.execute(new RecordUploadRunnable(uploadDelegate, outgoing, outgoingGuids, byteCount));
   379       recordsBuffer = new ArrayList<byte[]>();
   380       recordGuidsBuffer = new ArrayList<String>();
   381       byteCount = PER_BATCH_OVERHEAD;
   382     }
   383   }
   385   @Override
   386   public void storeDone() {
   387     Logger.debug(LOG_TAG, "storeDone().");
   388     synchronized (recordsBufferMonitor) {
   389       flush();
   390       // Do this in a Runnable so that the timestamp is grabbed after any upload.
   391       final Runnable r = new Runnable() {
   392         @Override
   393         public void run() {
   394           synchronized (recordsBufferMonitor) {
   395             final long end = uploadTimestamp.get();
   396             Logger.debug(LOG_TAG, "Calling storeDone with " + end);
   397             storeDone(end);
   398           }
   399         }
   400       };
   401       storeWorkQueue.execute(r);
   402     }
   403   }
   405   /**
   406    * <code>true</code> if a record upload has failed this session.
   407    * <p>
   408    * This is only set in begin and possibly by <code>RecordUploadRunnable</code>.
   409    * Since those are executed serially, we can use an unsynchronized
   410    * volatile boolean here.
   411    */
   412   protected volatile boolean recordUploadFailed;
   414   @Override
   415   public void begin(RepositorySessionBeginDelegate delegate) throws InvalidSessionTransitionException {
   416     recordUploadFailed = false;
   417     super.begin(delegate);
   418   }
   420   /**
   421    * Make an HTTP request, and convert HTTP request delegate callbacks into
   422    * store callbacks within the context of this RepositorySession.
   423    *
   424    * @author rnewman
   425    *
   426    */
   427   protected class RecordUploadRunnable implements Runnable, SyncStorageRequestDelegate {
   429     public final String LOG_TAG = "RecordUploadRunnable";
   430     private ArrayList<byte[]> outgoing;
   431     private ArrayList<String> outgoingGuids;
   432     private long byteCount;
   434     public RecordUploadRunnable(RepositorySessionStoreDelegate storeDelegate,
   435                                 ArrayList<byte[]> outgoing,
   436                                 ArrayList<String> outgoingGuids,
   437                                 long byteCount) {
   438       Logger.debug(LOG_TAG, "Preparing record upload for " +
   439                   outgoing.size() + " records (" +
   440                   byteCount + " bytes).");
   441       this.outgoing = outgoing;
   442       this.outgoingGuids = outgoingGuids;
   443       this.byteCount = byteCount;
   444     }
   446     @Override
   447     public AuthHeaderProvider getAuthHeaderProvider() {
   448       return serverRepository.getAuthHeaderProvider();
   449     }
   451     @Override
   452     public String ifUnmodifiedSince() {
   453       return null;
   454     }
   456     @Override
   457     public void handleRequestSuccess(SyncStorageResponse response) {
   458       Logger.trace(LOG_TAG, "POST of " + outgoing.size() + " records done.");
   460       ExtendedJSONObject body;
   461       try {
   462         body = response.jsonObjectBody(); // jsonObjectBody() throws or returns non-null.
   463       } catch (Exception e) {
   464         Logger.error(LOG_TAG, "Got exception parsing POST success body.", e);
   465         this.handleRequestError(e);
   466         return;
   467       }
   469       // Be defensive when logging timestamp.
   470       if (body.containsKey("modified")) {
   471         Long modified = body.getTimestamp("modified");
   472         if (modified != null) {
   473           Logger.trace(LOG_TAG, "POST request success. Modified timestamp: " + modified.longValue());
   474         } else {
   475           Logger.warn(LOG_TAG, "POST success body contains malformed 'modified': " + body.toJSONString());
   476         }
   477       } else {
   478         Logger.warn(LOG_TAG, "POST success body does not contain key 'modified': " + body.toJSONString());
   479       }
   481       try {
   482         JSONArray          success = body.getArray("success");
   483         if ((success != null) &&
   484             (success.size() > 0)) {
   485           Logger.trace(LOG_TAG, "Successful records: " + success.toString());
   486           for (Object o : success) {
   487             try {
   488               delegate.onRecordStoreSucceeded((String) o);
   489             } catch (ClassCastException e) {
   490               Logger.error(LOG_TAG, "Got exception parsing POST success guid.", e);
   491               // Not much to be done.
   492             }
   493           }
   495           long normalizedTimestamp = getNormalizedTimestamp(response);
   496           Logger.trace(LOG_TAG, "Passing back upload X-Weave-Timestamp: " + normalizedTimestamp);
   497           bumpUploadTimestamp(normalizedTimestamp);
   498         }
   499         success = null; // Want to GC this ASAP.
   501         ExtendedJSONObject failed  = body.getObject("failed");
   502         if ((failed != null) &&
   503             (failed.object.size() > 0)) {
   504           Logger.debug(LOG_TAG, "Failed records: " + failed.object.toString());
   505           Exception ex = new Server11RecordPostFailedException();
   506           for (String guid : failed.keySet()) {
   507             delegate.onRecordStoreFailed(ex, guid);
   508           }
   509         }
   510         failed = null; // Want to GC this ASAP.
   511       } catch (UnexpectedJSONException e) {
   512         Logger.error(LOG_TAG, "Got exception processing success/failed in POST success body.", e);
   513         // TODO
   514         return;
   515       }
   516       Logger.debug(LOG_TAG, "POST of " + outgoing.size() + " records handled.");
   517     }
   519     @Override
   520     public void handleRequestFailure(SyncStorageResponse response) {
   521       // TODO: call session.interpretHTTPFailure.
   522       this.handleRequestError(new HTTPFailureException(response));
   523     }
   525     @Override
   526     public void handleRequestError(final Exception ex) {
   527       Logger.warn(LOG_TAG, "Got request error.", ex);
   529       recordUploadFailed = true;
   530       ArrayList<String> failedOutgoingGuids = outgoingGuids;
   531       outgoingGuids = null; // Want to GC this ASAP.
   532       for (String guid : failedOutgoingGuids) {
   533         delegate.onRecordStoreFailed(ex, guid);
   534       }
   535       return;
   536     }
   538     public class ByteArraysContentProducer implements ContentProducer {
   540       ArrayList<byte[]> outgoing;
   541       public ByteArraysContentProducer(ArrayList<byte[]> arrays) {
   542         outgoing = arrays;
   543       }
   545       @Override
   546       public void writeTo(OutputStream outstream) throws IOException {
   547         int count = outgoing.size();
   548         outstream.write(recordsStart);
   549         outstream.write(outgoing.get(0));
   550         for (int i = 1; i < count; ++i) {
   551           outstream.write(recordSeparator);
   552           outstream.write(outgoing.get(i));
   553         }
   554         outstream.write(recordsEnd);
   555       }
   556     }
   558     public class ByteArraysEntity extends EntityTemplate {
   559       private long count;
   560       public ByteArraysEntity(ArrayList<byte[]> arrays, long totalBytes) {
   561         super(new ByteArraysContentProducer(arrays));
   562         this.count = totalBytes;
   563         this.setContentType("application/json");
   564         // charset is set in BaseResource.
   565       }
   567       @Override
   568       public long getContentLength() {
   569         return count;
   570       }
   572       @Override
   573       public boolean isRepeatable() {
   574         return true;
   575       }
   576     }
   578     public ByteArraysEntity getBodyEntity() {
   579       ByteArraysEntity body = new ByteArraysEntity(outgoing, byteCount);
   580       return body;
   581     }
   583     @Override
   584     public void run() {
   585       if (recordUploadFailed) {
   586         Logger.info(LOG_TAG, "Previous record upload failed.  Failing all records and not retrying.");
   587         Exception ex = new Server11PreviousPostFailedException();
   588         for (String guid : outgoingGuids) {
   589           delegate.onRecordStoreFailed(ex, guid);
   590         }
   591         return;
   592       }
   594       if (outgoing == null ||
   595           outgoing.size() == 0) {
   596         Logger.debug(LOG_TAG, "No items: RecordUploadRunnable returning immediately.");
   597         return;
   598       }
   600       URI u = serverRepository.collectionURI();
   601       SyncStorageRequest request = new SyncStorageRequest(u);
   603       request.delegate = this;
   605       // We don't want the task queue to proceed until this request completes.
   606       // Fortunately, BaseResource is currently synchronous.
   607       // If that ever changes, you'll need to block here.
   608       ByteArraysEntity body = getBodyEntity();
   609       request.post(body);
   610     }
   611   }
   613   @Override
   614   public boolean dataAvailable() {
   615     return serverRepository.updateNeeded(getLastSyncTimestamp());
   616   }
   617 }

mercurial