Wed, 31 Dec 2014 07:22:50 +0100
Correct previous dual key logic pending first delivery installment.
1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 package org.mozilla.gecko.sync.net;
7 import java.io.BufferedReader;
8 import java.io.IOException;
9 import java.io.InputStream;
10 import java.io.InputStreamReader;
11 import java.net.URI;
13 import org.mozilla.gecko.background.common.log.Logger;
15 import ch.boye.httpclientandroidlib.Header;
16 import ch.boye.httpclientandroidlib.HttpEntity;
17 import ch.boye.httpclientandroidlib.HttpResponse;
18 import ch.boye.httpclientandroidlib.client.methods.HttpRequestBase;
19 import ch.boye.httpclientandroidlib.impl.client.DefaultHttpClient;
21 /**
22 * A request class that handles line-by-line responses. Eventually this will
23 * handle real stream processing; for now, just parse the returned body
24 * line-by-line.
25 *
26 * @author rnewman
27 *
28 */
29 public class SyncStorageCollectionRequest extends SyncStorageRequest {
30 private static final String LOG_TAG = "CollectionRequest";
32 public SyncStorageCollectionRequest(URI uri) {
33 super(uri);
34 }
36 protected volatile boolean aborting = false;
38 /**
39 * Instruct the request that it should process no more records,
40 * and decline to notify any more delegate callbacks.
41 */
42 public void abort() {
43 aborting = true;
44 try {
45 this.resource.request.abort();
46 } catch (Exception e) {
47 // Just in case.
48 Logger.warn(LOG_TAG, "Got exception in abort: " + e);
49 }
50 }
52 @Override
53 protected BaseResourceDelegate makeResourceDelegate(SyncStorageRequest request) {
54 return new SyncCollectionResourceDelegate((SyncStorageCollectionRequest) request);
55 }
57 // TODO: this is awful.
58 public class SyncCollectionResourceDelegate extends
59 SyncStorageResourceDelegate {
61 private static final String CONTENT_TYPE_INCREMENTAL = "application/newlines";
62 private static final int FETCH_BUFFER_SIZE = 16 * 1024; // 16K chars.
64 SyncCollectionResourceDelegate(SyncStorageCollectionRequest request) {
65 super(request);
66 }
68 @Override
69 public void addHeaders(HttpRequestBase request, DefaultHttpClient client) {
70 super.addHeaders(request, client);
71 request.setHeader("Accept", CONTENT_TYPE_INCREMENTAL);
72 // Caller is responsible for setting full=1.
73 }
75 @Override
76 public void handleHttpResponse(HttpResponse response) {
77 if (aborting) {
78 return;
79 }
81 if (response.getStatusLine().getStatusCode() != 200) {
82 super.handleHttpResponse(response);
83 return;
84 }
86 HttpEntity entity = response.getEntity();
87 Header contentType = entity.getContentType();
88 if (!contentType.getValue().startsWith(CONTENT_TYPE_INCREMENTAL)) {
89 // Not incremental!
90 super.handleHttpResponse(response);
91 return;
92 }
94 // TODO: at this point we can access X-Weave-Timestamp, compare
95 // that to our local timestamp, and compute an estimate of clock
96 // skew. We can provide this to the incremental delegate, which
97 // will allow it to seamlessly correct timestamps on the records
98 // it processes. Bug 721887.
100 // Line-by-line processing, then invoke success.
101 SyncStorageCollectionRequestDelegate delegate = (SyncStorageCollectionRequestDelegate) this.request.delegate;
102 InputStream content = null;
103 BufferedReader br = null;
104 try {
105 content = entity.getContent();
106 br = new BufferedReader(new InputStreamReader(content), FETCH_BUFFER_SIZE);
107 String line;
109 // This relies on connection timeouts at the HTTP layer.
110 while (!aborting &&
111 null != (line = br.readLine())) {
112 try {
113 delegate.handleRequestProgress(line);
114 } catch (Exception ex) {
115 delegate.handleRequestError(new HandleProgressException(ex));
116 BaseResource.consumeEntity(entity);
117 return;
118 }
119 }
120 if (aborting) {
121 // So we don't hit the success case below.
122 return;
123 }
124 } catch (IOException ex) {
125 if (!aborting) {
126 delegate.handleRequestError(ex);
127 }
128 BaseResource.consumeEntity(entity);
129 return;
130 } finally {
131 // Attempt to close the stream and reader.
132 if (br != null) {
133 try {
134 br.close();
135 } catch (IOException e) {
136 // We don't care if this fails.
137 }
138 }
139 }
140 // We're done processing the entity. Don't let fetching the body succeed!
141 BaseResource.consumeEntity(entity);
142 delegate.handleRequestSuccess(new SyncStorageResponse(response));
143 }
144 }
145 }