1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/mobile/android/base/sync/net/SyncStorageCollectionRequest.java Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,145 @@ 1.4 +/* This Source Code Form is subject to the terms of the Mozilla Public 1.5 + * License, v. 2.0. If a copy of the MPL was not distributed with this 1.6 + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 1.7 + 1.8 +package org.mozilla.gecko.sync.net; 1.9 + 1.10 +import java.io.BufferedReader; 1.11 +import java.io.IOException; 1.12 +import java.io.InputStream; 1.13 +import java.io.InputStreamReader; 1.14 +import java.net.URI; 1.15 + 1.16 +import org.mozilla.gecko.background.common.log.Logger; 1.17 + 1.18 +import ch.boye.httpclientandroidlib.Header; 1.19 +import ch.boye.httpclientandroidlib.HttpEntity; 1.20 +import ch.boye.httpclientandroidlib.HttpResponse; 1.21 +import ch.boye.httpclientandroidlib.client.methods.HttpRequestBase; 1.22 +import ch.boye.httpclientandroidlib.impl.client.DefaultHttpClient; 1.23 + 1.24 +/** 1.25 + * A request class that handles line-by-line responses. Eventually this will 1.26 + * handle real stream processing; for now, just parse the returned body 1.27 + * line-by-line. 1.28 + * 1.29 + * @author rnewman 1.30 + * 1.31 + */ 1.32 +public class SyncStorageCollectionRequest extends SyncStorageRequest { 1.33 + private static final String LOG_TAG = "CollectionRequest"; 1.34 + 1.35 + public SyncStorageCollectionRequest(URI uri) { 1.36 + super(uri); 1.37 + } 1.38 + 1.39 + protected volatile boolean aborting = false; 1.40 + 1.41 + /** 1.42 + * Instruct the request that it should process no more records, 1.43 + * and decline to notify any more delegate callbacks. 1.44 + */ 1.45 + public void abort() { 1.46 + aborting = true; 1.47 + try { 1.48 + this.resource.request.abort(); 1.49 + } catch (Exception e) { 1.50 + // Just in case. 1.51 + Logger.warn(LOG_TAG, "Got exception in abort: " + e); 1.52 + } 1.53 + } 1.54 + 1.55 + @Override 1.56 + protected BaseResourceDelegate makeResourceDelegate(SyncStorageRequest request) { 1.57 + return new SyncCollectionResourceDelegate((SyncStorageCollectionRequest) request); 1.58 + } 1.59 + 1.60 + // TODO: this is awful. 1.61 + public class SyncCollectionResourceDelegate extends 1.62 + SyncStorageResourceDelegate { 1.63 + 1.64 + private static final String CONTENT_TYPE_INCREMENTAL = "application/newlines"; 1.65 + private static final int FETCH_BUFFER_SIZE = 16 * 1024; // 16K chars. 1.66 + 1.67 + SyncCollectionResourceDelegate(SyncStorageCollectionRequest request) { 1.68 + super(request); 1.69 + } 1.70 + 1.71 + @Override 1.72 + public void addHeaders(HttpRequestBase request, DefaultHttpClient client) { 1.73 + super.addHeaders(request, client); 1.74 + request.setHeader("Accept", CONTENT_TYPE_INCREMENTAL); 1.75 + // Caller is responsible for setting full=1. 1.76 + } 1.77 + 1.78 + @Override 1.79 + public void handleHttpResponse(HttpResponse response) { 1.80 + if (aborting) { 1.81 + return; 1.82 + } 1.83 + 1.84 + if (response.getStatusLine().getStatusCode() != 200) { 1.85 + super.handleHttpResponse(response); 1.86 + return; 1.87 + } 1.88 + 1.89 + HttpEntity entity = response.getEntity(); 1.90 + Header contentType = entity.getContentType(); 1.91 + if (!contentType.getValue().startsWith(CONTENT_TYPE_INCREMENTAL)) { 1.92 + // Not incremental! 1.93 + super.handleHttpResponse(response); 1.94 + return; 1.95 + } 1.96 + 1.97 + // TODO: at this point we can access X-Weave-Timestamp, compare 1.98 + // that to our local timestamp, and compute an estimate of clock 1.99 + // skew. We can provide this to the incremental delegate, which 1.100 + // will allow it to seamlessly correct timestamps on the records 1.101 + // it processes. Bug 721887. 1.102 + 1.103 + // Line-by-line processing, then invoke success. 1.104 + SyncStorageCollectionRequestDelegate delegate = (SyncStorageCollectionRequestDelegate) this.request.delegate; 1.105 + InputStream content = null; 1.106 + BufferedReader br = null; 1.107 + try { 1.108 + content = entity.getContent(); 1.109 + br = new BufferedReader(new InputStreamReader(content), FETCH_BUFFER_SIZE); 1.110 + String line; 1.111 + 1.112 + // This relies on connection timeouts at the HTTP layer. 1.113 + while (!aborting && 1.114 + null != (line = br.readLine())) { 1.115 + try { 1.116 + delegate.handleRequestProgress(line); 1.117 + } catch (Exception ex) { 1.118 + delegate.handleRequestError(new HandleProgressException(ex)); 1.119 + BaseResource.consumeEntity(entity); 1.120 + return; 1.121 + } 1.122 + } 1.123 + if (aborting) { 1.124 + // So we don't hit the success case below. 1.125 + return; 1.126 + } 1.127 + } catch (IOException ex) { 1.128 + if (!aborting) { 1.129 + delegate.handleRequestError(ex); 1.130 + } 1.131 + BaseResource.consumeEntity(entity); 1.132 + return; 1.133 + } finally { 1.134 + // Attempt to close the stream and reader. 1.135 + if (br != null) { 1.136 + try { 1.137 + br.close(); 1.138 + } catch (IOException e) { 1.139 + // We don't care if this fails. 1.140 + } 1.141 + } 1.142 + } 1.143 + // We're done processing the entity. Don't let fetching the body succeed! 1.144 + BaseResource.consumeEntity(entity); 1.145 + delegate.handleRequestSuccess(new SyncStorageResponse(response)); 1.146 + } 1.147 + } 1.148 +}