michael@0: /* This Source Code Form is subject to the terms of the Mozilla Public michael@0: * License, v. 2.0. If a copy of the MPL was not distributed with this michael@0: * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ michael@0: michael@0: package org.mozilla.gecko.sync.net; michael@0: michael@0: import java.io.BufferedReader; michael@0: import java.io.IOException; michael@0: import java.io.InputStream; michael@0: import java.io.InputStreamReader; michael@0: import java.net.URI; michael@0: michael@0: import org.mozilla.gecko.background.common.log.Logger; michael@0: michael@0: import ch.boye.httpclientandroidlib.Header; michael@0: import ch.boye.httpclientandroidlib.HttpEntity; michael@0: import ch.boye.httpclientandroidlib.HttpResponse; michael@0: import ch.boye.httpclientandroidlib.client.methods.HttpRequestBase; michael@0: import ch.boye.httpclientandroidlib.impl.client.DefaultHttpClient; michael@0: michael@0: /** michael@0: * A request class that handles line-by-line responses. Eventually this will michael@0: * handle real stream processing; for now, just parse the returned body michael@0: * line-by-line. michael@0: * michael@0: * @author rnewman michael@0: * michael@0: */ michael@0: public class SyncStorageCollectionRequest extends SyncStorageRequest { michael@0: private static final String LOG_TAG = "CollectionRequest"; michael@0: michael@0: public SyncStorageCollectionRequest(URI uri) { michael@0: super(uri); michael@0: } michael@0: michael@0: protected volatile boolean aborting = false; michael@0: michael@0: /** michael@0: * Instruct the request that it should process no more records, michael@0: * and decline to notify any more delegate callbacks. michael@0: */ michael@0: public void abort() { michael@0: aborting = true; michael@0: try { michael@0: this.resource.request.abort(); michael@0: } catch (Exception e) { michael@0: // Just in case. michael@0: Logger.warn(LOG_TAG, "Got exception in abort: " + e); michael@0: } michael@0: } michael@0: michael@0: @Override michael@0: protected BaseResourceDelegate makeResourceDelegate(SyncStorageRequest request) { michael@0: return new SyncCollectionResourceDelegate((SyncStorageCollectionRequest) request); michael@0: } michael@0: michael@0: // TODO: this is awful. michael@0: public class SyncCollectionResourceDelegate extends michael@0: SyncStorageResourceDelegate { michael@0: michael@0: private static final String CONTENT_TYPE_INCREMENTAL = "application/newlines"; michael@0: private static final int FETCH_BUFFER_SIZE = 16 * 1024; // 16K chars. michael@0: michael@0: SyncCollectionResourceDelegate(SyncStorageCollectionRequest request) { michael@0: super(request); michael@0: } michael@0: michael@0: @Override michael@0: public void addHeaders(HttpRequestBase request, DefaultHttpClient client) { michael@0: super.addHeaders(request, client); michael@0: request.setHeader("Accept", CONTENT_TYPE_INCREMENTAL); michael@0: // Caller is responsible for setting full=1. michael@0: } michael@0: michael@0: @Override michael@0: public void handleHttpResponse(HttpResponse response) { michael@0: if (aborting) { michael@0: return; michael@0: } michael@0: michael@0: if (response.getStatusLine().getStatusCode() != 200) { michael@0: super.handleHttpResponse(response); michael@0: return; michael@0: } michael@0: michael@0: HttpEntity entity = response.getEntity(); michael@0: Header contentType = entity.getContentType(); michael@0: if (!contentType.getValue().startsWith(CONTENT_TYPE_INCREMENTAL)) { michael@0: // Not incremental! michael@0: super.handleHttpResponse(response); michael@0: return; michael@0: } michael@0: michael@0: // TODO: at this point we can access X-Weave-Timestamp, compare michael@0: // that to our local timestamp, and compute an estimate of clock michael@0: // skew. We can provide this to the incremental delegate, which michael@0: // will allow it to seamlessly correct timestamps on the records michael@0: // it processes. Bug 721887. michael@0: michael@0: // Line-by-line processing, then invoke success. michael@0: SyncStorageCollectionRequestDelegate delegate = (SyncStorageCollectionRequestDelegate) this.request.delegate; michael@0: InputStream content = null; michael@0: BufferedReader br = null; michael@0: try { michael@0: content = entity.getContent(); michael@0: br = new BufferedReader(new InputStreamReader(content), FETCH_BUFFER_SIZE); michael@0: String line; michael@0: michael@0: // This relies on connection timeouts at the HTTP layer. michael@0: while (!aborting && michael@0: null != (line = br.readLine())) { michael@0: try { michael@0: delegate.handleRequestProgress(line); michael@0: } catch (Exception ex) { michael@0: delegate.handleRequestError(new HandleProgressException(ex)); michael@0: BaseResource.consumeEntity(entity); michael@0: return; michael@0: } michael@0: } michael@0: if (aborting) { michael@0: // So we don't hit the success case below. michael@0: return; michael@0: } michael@0: } catch (IOException ex) { michael@0: if (!aborting) { michael@0: delegate.handleRequestError(ex); michael@0: } michael@0: BaseResource.consumeEntity(entity); michael@0: return; michael@0: } finally { michael@0: // Attempt to close the stream and reader. michael@0: if (br != null) { michael@0: try { michael@0: br.close(); michael@0: } catch (IOException e) { michael@0: // We don't care if this fails. michael@0: } michael@0: } michael@0: } michael@0: // We're done processing the entity. Don't let fetching the body succeed! michael@0: BaseResource.consumeEntity(entity); michael@0: delegate.handleRequestSuccess(new SyncStorageResponse(response)); michael@0: } michael@0: } michael@0: }