michael@0: /*
michael@0: * ====================================================================
michael@0: *
michael@0: * Licensed to the Apache Software Foundation (ASF) under one or more
michael@0: * contributor license agreements. See the NOTICE file distributed with
michael@0: * this work for additional information regarding copyright ownership.
michael@0: * The ASF licenses this file to You under the Apache License, Version 2.0
michael@0: * (the "License"); you may not use this file except in compliance with
michael@0: * the License. You may obtain a copy of the License at
michael@0: *
michael@0: * http://www.apache.org/licenses/LICENSE-2.0
michael@0: *
michael@0: * Unless required by applicable law or agreed to in writing, software
michael@0: * distributed under the License is distributed on an "AS IS" BASIS,
michael@0: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
michael@0: * See the License for the specific language governing permissions and
michael@0: * limitations under the License.
michael@0: * ====================================================================
michael@0: *
michael@0: * This software consists of voluntary contributions made by many
michael@0: * individuals on behalf of the Apache Software Foundation. For more
michael@0: * information on the Apache Software Foundation, please see
michael@0: *
michael@0: * This class is based on AutoCloseInputStream
in HttpClient 3.1,
michael@0: * but has notable differences. It does not allow mark/reset, distinguishes
michael@0: * different kinds of event, and does not always close the underlying stream
michael@0: * on EOF. That decision is left to the {@link EofSensorWatcher watcher}.
michael@0: *
michael@0: * @see EofSensorWatcher
michael@0: *
michael@0: * @since 4.0
michael@0: */
michael@0: // don't use FilterInputStream as the base class, we'd have to
michael@0: // override markSupported(), mark(), and reset() to disable them
michael@0: @NotThreadSafe
michael@0: public class EofSensorInputStream extends InputStream implements ConnectionReleaseTrigger {
michael@0:
michael@0: /**
michael@0: * The wrapped input stream, while accessible.
michael@0: * The value changes to null
when the wrapped stream
michael@0: * becomes inaccessible.
michael@0: */
michael@0: protected InputStream wrappedStream;
michael@0:
michael@0: /**
michael@0: * Indicates whether this stream itself is closed.
michael@0: * If it isn't, but {@link #wrappedStream wrappedStream}
michael@0: * is null
, we're running in EOF mode.
michael@0: * All read operations will indicate EOF without accessing
michael@0: * the underlying stream. After closing this stream, read
michael@0: * operations will trigger an {@link IOException IOException}.
michael@0: *
michael@0: * @see #isReadAllowed isReadAllowed
michael@0: */
michael@0: private boolean selfClosed;
michael@0:
michael@0: /** The watcher to be notified, if any. */
michael@0: private final EofSensorWatcher eofWatcher;
michael@0:
michael@0: /**
michael@0: * Creates a new EOF sensor.
michael@0: * If no watcher is passed, the underlying stream will simply be
michael@0: * closed when EOF is detected or {@link #close close} is called.
michael@0: * Otherwise, the watcher decides whether the underlying stream
michael@0: * should be closed before detaching from it.
michael@0: *
michael@0: * @param in the wrapped stream
michael@0: * @param watcher the watcher for events, or null
for
michael@0: * auto-close behavior without notification
michael@0: */
michael@0: public EofSensorInputStream(final InputStream in,
michael@0: final EofSensorWatcher watcher) {
michael@0: if (in == null) {
michael@0: throw new IllegalArgumentException
michael@0: ("Wrapped stream may not be null.");
michael@0: }
michael@0:
michael@0: wrappedStream = in;
michael@0: selfClosed = false;
michael@0: eofWatcher = watcher;
michael@0: }
michael@0:
michael@0: /**
michael@0: * Checks whether the underlying stream can be read from.
michael@0: *
michael@0: * @return true
if the underlying stream is accessible,
michael@0: * false
if this stream is in EOF mode and
michael@0: * detached from the underlying stream
michael@0: *
michael@0: * @throws IOException if this stream is already closed
michael@0: */
michael@0: protected boolean isReadAllowed() throws IOException {
michael@0: if (selfClosed) {
michael@0: throw new IOException("Attempted read on closed stream.");
michael@0: }
michael@0: return (wrappedStream != null);
michael@0: }
michael@0:
michael@0: @Override
michael@0: public int read() throws IOException {
michael@0: int l = -1;
michael@0:
michael@0: if (isReadAllowed()) {
michael@0: try {
michael@0: l = wrappedStream.read();
michael@0: checkEOF(l);
michael@0: } catch (IOException ex) {
michael@0: checkAbort();
michael@0: throw ex;
michael@0: }
michael@0: }
michael@0:
michael@0: return l;
michael@0: }
michael@0:
michael@0: @Override
michael@0: public int read(byte[] b, int off, int len) throws IOException {
michael@0: int l = -1;
michael@0:
michael@0: if (isReadAllowed()) {
michael@0: try {
michael@0: l = wrappedStream.read(b, off, len);
michael@0: checkEOF(l);
michael@0: } catch (IOException ex) {
michael@0: checkAbort();
michael@0: throw ex;
michael@0: }
michael@0: }
michael@0:
michael@0: return l;
michael@0: }
michael@0:
michael@0: @Override
michael@0: public int read(byte[] b) throws IOException {
michael@0: int l = -1;
michael@0:
michael@0: if (isReadAllowed()) {
michael@0: try {
michael@0: l = wrappedStream.read(b);
michael@0: checkEOF(l);
michael@0: } catch (IOException ex) {
michael@0: checkAbort();
michael@0: throw ex;
michael@0: }
michael@0: }
michael@0: return l;
michael@0: }
michael@0:
michael@0: @Override
michael@0: public int available() throws IOException {
michael@0: int a = 0; // not -1
michael@0:
michael@0: if (isReadAllowed()) {
michael@0: try {
michael@0: a = wrappedStream.available();
michael@0: // no checkEOF() here, available() can't trigger EOF
michael@0: } catch (IOException ex) {
michael@0: checkAbort();
michael@0: throw ex;
michael@0: }
michael@0: }
michael@0:
michael@0: return a;
michael@0: }
michael@0:
michael@0: @Override
michael@0: public void close() throws IOException {
michael@0: // tolerate multiple calls to close()
michael@0: selfClosed = true;
michael@0: checkClose();
michael@0: }
michael@0:
michael@0: /**
michael@0: * Detects EOF and notifies the watcher.
michael@0: * This method should only be called while the underlying stream is
michael@0: * still accessible. Use {@link #isReadAllowed isReadAllowed} to
michael@0: * check that condition.
michael@0: *
michael@0: * If EOF is detected, the watcher will be notified and this stream
michael@0: * is detached from the underlying stream. This prevents multiple
michael@0: * notifications from this stream.
michael@0: *
michael@0: * @param eof the result of the calling read operation.
michael@0: * A negative value indicates that EOF is reached.
michael@0: *
michael@0: * @throws IOException
michael@0: * in case of an IO problem on closing the underlying stream
michael@0: */
michael@0: protected void checkEOF(int eof) throws IOException {
michael@0:
michael@0: if ((wrappedStream != null) && (eof < 0)) {
michael@0: try {
michael@0: boolean scws = true; // should close wrapped stream?
michael@0: if (eofWatcher != null)
michael@0: scws = eofWatcher.eofDetected(wrappedStream);
michael@0: if (scws)
michael@0: wrappedStream.close();
michael@0: } finally {
michael@0: wrappedStream = null;
michael@0: }
michael@0: }
michael@0: }
michael@0:
michael@0: /**
michael@0: * Detects stream close and notifies the watcher.
michael@0: * There's not much to detect since this is called by {@link #close close}.
michael@0: * The watcher will only be notified if this stream is closed
michael@0: * for the first time and before EOF has been detected.
michael@0: * This stream will be detached from the underlying stream to prevent
michael@0: * multiple notifications to the watcher.
michael@0: *
michael@0: * @throws IOException
michael@0: * in case of an IO problem on closing the underlying stream
michael@0: */
michael@0: protected void checkClose() throws IOException {
michael@0:
michael@0: if (wrappedStream != null) {
michael@0: try {
michael@0: boolean scws = true; // should close wrapped stream?
michael@0: if (eofWatcher != null)
michael@0: scws = eofWatcher.streamClosed(wrappedStream);
michael@0: if (scws)
michael@0: wrappedStream.close();
michael@0: } finally {
michael@0: wrappedStream = null;
michael@0: }
michael@0: }
michael@0: }
michael@0:
michael@0: /**
michael@0: * Detects stream abort and notifies the watcher.
michael@0: * There's not much to detect since this is called by
michael@0: * {@link #abortConnection abortConnection}.
michael@0: * The watcher will only be notified if this stream is aborted
michael@0: * for the first time and before EOF has been detected or the
michael@0: * stream has been {@link #close closed} gracefully.
michael@0: * This stream will be detached from the underlying stream to prevent
michael@0: * multiple notifications to the watcher.
michael@0: *
michael@0: * @throws IOException
michael@0: * in case of an IO problem on closing the underlying stream
michael@0: */
michael@0: protected void checkAbort() throws IOException {
michael@0:
michael@0: if (wrappedStream != null) {
michael@0: try {
michael@0: boolean scws = true; // should close wrapped stream?
michael@0: if (eofWatcher != null)
michael@0: scws = eofWatcher.streamAbort(wrappedStream);
michael@0: if (scws)
michael@0: wrappedStream.close();
michael@0: } finally {
michael@0: wrappedStream = null;
michael@0: }
michael@0: }
michael@0: }
michael@0:
michael@0: /**
michael@0: * Same as {@link #close close()}.
michael@0: */
michael@0: public void releaseConnection() throws IOException {
michael@0: close();
michael@0: }
michael@0:
michael@0: /**
michael@0: * Aborts this stream.
michael@0: * This is a special version of {@link #close close()} which prevents
michael@0: * re-use of the underlying connection, if any. Calling this method
michael@0: * indicates that there should be no attempt to read until the end of
michael@0: * the stream.
michael@0: */
michael@0: public void abortConnection() throws IOException {
michael@0: // tolerate multiple calls
michael@0: selfClosed = true;
michael@0: checkAbort();
michael@0: }
michael@0:
michael@0: }
michael@0: