|
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
4 |
|
5 package org.mozilla.gecko.sync.net; |
|
6 |
|
7 import java.io.BufferedReader; |
|
8 import java.io.IOException; |
|
9 import java.io.InputStream; |
|
10 import java.io.InputStreamReader; |
|
11 import java.net.URI; |
|
12 |
|
13 import org.mozilla.gecko.background.common.log.Logger; |
|
14 |
|
15 import ch.boye.httpclientandroidlib.Header; |
|
16 import ch.boye.httpclientandroidlib.HttpEntity; |
|
17 import ch.boye.httpclientandroidlib.HttpResponse; |
|
18 import ch.boye.httpclientandroidlib.client.methods.HttpRequestBase; |
|
19 import ch.boye.httpclientandroidlib.impl.client.DefaultHttpClient; |
|
20 |
|
21 /** |
|
22 * A request class that handles line-by-line responses. Eventually this will |
|
23 * handle real stream processing; for now, just parse the returned body |
|
24 * line-by-line. |
|
25 * |
|
26 * @author rnewman |
|
27 * |
|
28 */ |
|
29 public class SyncStorageCollectionRequest extends SyncStorageRequest { |
|
30 private static final String LOG_TAG = "CollectionRequest"; |
|
31 |
|
32 public SyncStorageCollectionRequest(URI uri) { |
|
33 super(uri); |
|
34 } |
|
35 |
|
36 protected volatile boolean aborting = false; |
|
37 |
|
38 /** |
|
39 * Instruct the request that it should process no more records, |
|
40 * and decline to notify any more delegate callbacks. |
|
41 */ |
|
42 public void abort() { |
|
43 aborting = true; |
|
44 try { |
|
45 this.resource.request.abort(); |
|
46 } catch (Exception e) { |
|
47 // Just in case. |
|
48 Logger.warn(LOG_TAG, "Got exception in abort: " + e); |
|
49 } |
|
50 } |
|
51 |
|
52 @Override |
|
53 protected BaseResourceDelegate makeResourceDelegate(SyncStorageRequest request) { |
|
54 return new SyncCollectionResourceDelegate((SyncStorageCollectionRequest) request); |
|
55 } |
|
56 |
|
57 // TODO: this is awful. |
|
58 public class SyncCollectionResourceDelegate extends |
|
59 SyncStorageResourceDelegate { |
|
60 |
|
61 private static final String CONTENT_TYPE_INCREMENTAL = "application/newlines"; |
|
62 private static final int FETCH_BUFFER_SIZE = 16 * 1024; // 16K chars. |
|
63 |
|
64 SyncCollectionResourceDelegate(SyncStorageCollectionRequest request) { |
|
65 super(request); |
|
66 } |
|
67 |
|
68 @Override |
|
69 public void addHeaders(HttpRequestBase request, DefaultHttpClient client) { |
|
70 super.addHeaders(request, client); |
|
71 request.setHeader("Accept", CONTENT_TYPE_INCREMENTAL); |
|
72 // Caller is responsible for setting full=1. |
|
73 } |
|
74 |
|
75 @Override |
|
76 public void handleHttpResponse(HttpResponse response) { |
|
77 if (aborting) { |
|
78 return; |
|
79 } |
|
80 |
|
81 if (response.getStatusLine().getStatusCode() != 200) { |
|
82 super.handleHttpResponse(response); |
|
83 return; |
|
84 } |
|
85 |
|
86 HttpEntity entity = response.getEntity(); |
|
87 Header contentType = entity.getContentType(); |
|
88 if (!contentType.getValue().startsWith(CONTENT_TYPE_INCREMENTAL)) { |
|
89 // Not incremental! |
|
90 super.handleHttpResponse(response); |
|
91 return; |
|
92 } |
|
93 |
|
94 // TODO: at this point we can access X-Weave-Timestamp, compare |
|
95 // that to our local timestamp, and compute an estimate of clock |
|
96 // skew. We can provide this to the incremental delegate, which |
|
97 // will allow it to seamlessly correct timestamps on the records |
|
98 // it processes. Bug 721887. |
|
99 |
|
100 // Line-by-line processing, then invoke success. |
|
101 SyncStorageCollectionRequestDelegate delegate = (SyncStorageCollectionRequestDelegate) this.request.delegate; |
|
102 InputStream content = null; |
|
103 BufferedReader br = null; |
|
104 try { |
|
105 content = entity.getContent(); |
|
106 br = new BufferedReader(new InputStreamReader(content), FETCH_BUFFER_SIZE); |
|
107 String line; |
|
108 |
|
109 // This relies on connection timeouts at the HTTP layer. |
|
110 while (!aborting && |
|
111 null != (line = br.readLine())) { |
|
112 try { |
|
113 delegate.handleRequestProgress(line); |
|
114 } catch (Exception ex) { |
|
115 delegate.handleRequestError(new HandleProgressException(ex)); |
|
116 BaseResource.consumeEntity(entity); |
|
117 return; |
|
118 } |
|
119 } |
|
120 if (aborting) { |
|
121 // So we don't hit the success case below. |
|
122 return; |
|
123 } |
|
124 } catch (IOException ex) { |
|
125 if (!aborting) { |
|
126 delegate.handleRequestError(ex); |
|
127 } |
|
128 BaseResource.consumeEntity(entity); |
|
129 return; |
|
130 } finally { |
|
131 // Attempt to close the stream and reader. |
|
132 if (br != null) { |
|
133 try { |
|
134 br.close(); |
|
135 } catch (IOException e) { |
|
136 // We don't care if this fails. |
|
137 } |
|
138 } |
|
139 } |
|
140 // We're done processing the entity. Don't let fetching the body succeed! |
|
141 BaseResource.consumeEntity(entity); |
|
142 delegate.handleRequestSuccess(new SyncStorageResponse(response)); |
|
143 } |
|
144 } |
|
145 } |