michael@0: /*
michael@0: * ====================================================================
michael@0: * Licensed to the Apache Software Foundation (ASF) under one
michael@0: * or more contributor license agreements. See the NOTICE file
michael@0: * distributed with this work for additional information
michael@0: * regarding copyright ownership. The ASF licenses this file
michael@0: * to you under the Apache License, Version 2.0 (the
michael@0: * "License"); you may not use this file except in compliance
michael@0: * with the License. You may obtain a copy of the License at
michael@0: *
michael@0: * http://www.apache.org/licenses/LICENSE-2.0
michael@0: *
michael@0: * Unless required by applicable law or agreed to in writing,
michael@0: * software distributed under the License is distributed on an
michael@0: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
michael@0: * KIND, either express or implied. See the License for the
michael@0: * specific language governing permissions and limitations
michael@0: * under the License.
michael@0: * ====================================================================
michael@0: *
michael@0: * This software consists of voluntary contributions made by many
michael@0: * individuals on behalf of the Apache Software Foundation. For more
michael@0: * information on the Apache Software Foundation, please see
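michael@0: * <http://www.apache.org/>.
michael@0: *
michael@0: * Input stream that cuts off after a defined number of bytes. This class
michael@0: * is used to receive content of HTTP messages where the end of the content
michael@0: * entity is determined by the value of the Content-Length header.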
michael@0: * Entities transferred using this stream can be maximum {@link Long#MAX_VALUE}
michael@0: * long.
michael@0: *
michael@0: * Note that this class NEVER closes the underlying stream, even when close michael@0: * gets called. Instead, it will read until the "end" of its limit on michael@0: * close, which allows for the seamless execution of subsequent HTTP 1.1 michael@0: * requests, while not requiring the client to remember to read the entire michael@0: * contents of the response. michael@0: * michael@0: * michael@0: * @since 4.0 michael@0: */ michael@0: public class ContentLengthInputStream extends InputStream { michael@0: michael@0: private static final int BUFFER_SIZE = 2048; michael@0: /** michael@0: * The maximum number of bytes that can be read from the stream. Subsequent michael@0: * read operations will return -1. michael@0: */ michael@0: private long contentLength; michael@0: michael@0: /** The current position */ michael@0: private long pos = 0; michael@0: michael@0: /** True if the stream is closed. */ michael@0: private boolean closed = false; michael@0: michael@0: /** michael@0: * Wrapped input stream that all calls are delegated to. michael@0: */ michael@0: private SessionInputBuffer in = null; michael@0: michael@0: /** michael@0: * Wraps a session input buffer and cuts off output after a defined number michael@0: * of bytes. michael@0: * michael@0: * @param in The session input buffer michael@0: * @param contentLength The maximum number of bytes that can be read from michael@0: * the stream. Subsequent read operations will return -1. michael@0: */ michael@0: public ContentLengthInputStream(final SessionInputBuffer in, long contentLength) { michael@0: super(); michael@0: if (in == null) { michael@0: throw new IllegalArgumentException("Input stream may not be null"); michael@0: } michael@0: if (contentLength < 0) { michael@0: throw new IllegalArgumentException("Content length may not be negative"); michael@0: } michael@0: this.in = in; michael@0: this.contentLength = contentLength; michael@0: } michael@0: michael@0: /** michael@0: *
Reads until the end of the known length of content.
michael@0: * michael@0: *Does not close the underlying socket input, but instead leaves it michael@0: * primed to parse the next response.
michael@0: * @throws IOException If an IO problem occurs. michael@0: */ michael@0: public void close() throws IOException { michael@0: if (!closed) { michael@0: try { michael@0: if (pos < contentLength) { michael@0: byte buffer[] = new byte[BUFFER_SIZE]; michael@0: while (read(buffer) >= 0) { michael@0: } michael@0: } michael@0: } finally { michael@0: // close after above so that we don't throw an exception trying michael@0: // to read after closed! michael@0: closed = true; michael@0: } michael@0: } michael@0: } michael@0: michael@0: public int available() throws IOException { michael@0: if (this.in instanceof BufferInfo) { michael@0: int len = ((BufferInfo) this.in).length(); michael@0: return Math.min(len, (int) (this.contentLength - this.pos)); michael@0: } else { michael@0: return 0; michael@0: } michael@0: } michael@0: michael@0: /** michael@0: * Read the next byte from the stream michael@0: * @return The next byte or -1 if the end of stream has been reached. michael@0: * @throws IOException If an IO problem occurs michael@0: * @see java.io.InputStream#read() michael@0: */ michael@0: public int read() throws IOException { michael@0: if (closed) { michael@0: throw new IOException("Attempted read from closed stream."); michael@0: } michael@0: michael@0: if (pos >= contentLength) { michael@0: return -1; michael@0: } michael@0: int b = this.in.read(); michael@0: if (b == -1) { michael@0: if (pos < contentLength) { michael@0: throw new ConnectionClosedException( michael@0: "Premature end of Content-Length delimited message body (expected: " michael@0: + contentLength + "; received: " + pos); michael@0: } michael@0: } else { michael@0: pos++; michael@0: } michael@0: return b; michael@0: } michael@0: michael@0: /** michael@0: * Does standard {@link InputStream#read(byte[], int, int)} behavior, but michael@0: * also notifies the watcher when the contents have been consumed. michael@0: * michael@0: * @param b The byte array to fill. 
michael@0: * @param off Start filling at this position. michael@0: * @param len The number of bytes to attempt to read. michael@0: * @return The number of bytes read, or -1 if the end of content has been michael@0: * reached. michael@0: * michael@0: * @throws java.io.IOException Should an error occur on the wrapped stream. michael@0: */ michael@0: public int read (byte[] b, int off, int len) throws java.io.IOException { michael@0: if (closed) { michael@0: throw new IOException("Attempted read from closed stream."); michael@0: } michael@0: michael@0: if (pos >= contentLength) { michael@0: return -1; michael@0: } michael@0: michael@0: if (pos + len > contentLength) { michael@0: len = (int) (contentLength - pos); michael@0: } michael@0: int count = this.in.read(b, off, len); michael@0: if (count == -1 && pos < contentLength) { michael@0: throw new ConnectionClosedException( michael@0: "Premature end of Content-Length delimited message body (expected: " michael@0: + contentLength + "; received: " + pos); michael@0: } michael@0: if (count > 0) { michael@0: pos += count; michael@0: } michael@0: return count; michael@0: } michael@0: michael@0: michael@0: /** michael@0: * Read more bytes from the stream. michael@0: * @param b The byte array to put the new data in. michael@0: * @return The number of bytes read into the buffer. michael@0: * @throws IOException If an IO problem occurs michael@0: * @see java.io.InputStream#read(byte[]) michael@0: */ michael@0: public int read(byte[] b) throws IOException { michael@0: return read(b, 0, b.length); michael@0: } michael@0: michael@0: /** michael@0: * Skips and discards a number of bytes from the input stream. michael@0: * @param n The number of bytes to skip. michael@0: * @return The actual number of bytes skipped. <= 0 if no bytes michael@0: * are skipped. michael@0: * @throws IOException If an error occurs while skipping bytes. 
michael@0: * @see InputStream#skip(long) michael@0: */ michael@0: public long skip(long n) throws IOException { michael@0: if (n <= 0) { michael@0: return 0; michael@0: } michael@0: byte[] buffer = new byte[BUFFER_SIZE]; michael@0: // make sure we don't skip more bytes than are michael@0: // still available michael@0: long remaining = Math.min(n, this.contentLength - this.pos); michael@0: // skip and keep track of the bytes actually skipped michael@0: long count = 0; michael@0: while (remaining > 0) { michael@0: int l = read(buffer, 0, (int)Math.min(BUFFER_SIZE, remaining)); michael@0: if (l == -1) { michael@0: break; michael@0: } michael@0: count += l; michael@0: remaining -= l; michael@0: } michael@0: return count; michael@0: } michael@0: }