michael@0: /*
michael@0: * ====================================================================
michael@0: *
michael@0: * Licensed to the Apache Software Foundation (ASF) under one or more
michael@0: * contributor license agreements. See the NOTICE file distributed with
michael@0: * this work for additional information regarding copyright ownership.
michael@0: * The ASF licenses this file to You under the Apache License, Version 2.0
michael@0: * (the "License"); you may not use this file except in compliance with
michael@0: * the License. You may obtain a copy of the License at
michael@0: *
michael@0: * http://www.apache.org/licenses/LICENSE-2.0
michael@0: *
michael@0: * Unless required by applicable law or agreed to in writing, software
michael@0: * distributed under the License is distributed on an "AS IS" BASIS,
michael@0: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
michael@0: * See the License for the specific language governing permissions and
michael@0: * limitations under the License.
michael@0: * ====================================================================
michael@0: *
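 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * http://www.apache.org/.
 *
 */

/**
 * A connection pool that maintains connections by route.
 * This class is derived from {@code MultiThreadedHttpConnectionManager}
 * in HttpClient 3.x, see there for original authors. It implements the same
 * algorithm for connection re-use and connection-per-host enforcement:
 * connections are re-used only for the exact same route, and connection
 * limits are enforced per route rather than per host.
 * Note that access to the pool data structures is synchronized via the
 * pool lock, not via {@code synchronized} methods.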
michael@0: *
michael@0: * @since 4.0
michael@0: */
michael@0: @ThreadSafe
michael@0: @SuppressWarnings("deprecation")
michael@0: public class ConnPoolByRoute extends AbstractConnPool { //TODO: remove dependency on AbstractConnPool
michael@0:
michael@0: public HttpClientAndroidLog log = new HttpClientAndroidLog(getClass());
michael@0:
michael@0: private final Lock poolLock;
michael@0:
michael@0: /** Connection operator for this pool */
michael@0: protected final ClientConnectionOperator operator;
michael@0:
michael@0: /** Connections per route lookup */
michael@0: protected final ConnPerRoute connPerRoute;
michael@0:
michael@0: /** References to issued connections */
michael@0: protected final Setnull
michael@0: *
michael@0: * @return a waiting thread representation
michael@0: */
michael@0: protected WaitingThread newWaitingThread(Condition cond,
michael@0: RouteSpecificPool rospl) {
michael@0: return new WaitingThread(cond, rospl);
michael@0: }
michael@0:
michael@0: private void closeConnection(final BasicPoolEntry entry) {
michael@0: OperatedClientConnection conn = entry.getConnection();
michael@0: if (conn != null) {
michael@0: try {
michael@0: conn.close();
michael@0: } catch (IOException ex) {
michael@0: log.debug("I/O error closing connection", ex);
michael@0: }
michael@0: }
michael@0: }
michael@0:
michael@0: /**
michael@0: * Get a route-specific pool of available connections.
michael@0: *
michael@0: * @param route the route
michael@0: * @param create whether to create the pool if it doesn't exist
michael@0: *
michael@0: * @return the pool for the argument route,
michael@0: * never null
if create
is true
michael@0: */
michael@0: protected RouteSpecificPool getRoutePool(HttpRoute route,
michael@0: boolean create) {
michael@0: RouteSpecificPool rospl = null;
michael@0: poolLock.lock();
michael@0: try {
michael@0:
michael@0: rospl = routeToPool.get(route);
michael@0: if ((rospl == null) && create) {
michael@0: // no pool for this route yet (or anymore)
michael@0: rospl = newRouteSpecificPool(route);
michael@0: routeToPool.put(route, rospl);
michael@0: }
michael@0:
michael@0: } finally {
michael@0: poolLock.unlock();
michael@0: }
michael@0:
michael@0: return rospl;
michael@0: }
michael@0:
michael@0: public int getConnectionsInPool(HttpRoute route) {
michael@0: poolLock.lock();
michael@0: try {
michael@0: // don't allow a pool to be created here!
michael@0: RouteSpecificPool rospl = getRoutePool(route, false);
michael@0: return (rospl != null) ? rospl.getEntryCount() : 0;
michael@0:
michael@0: } finally {
michael@0: poolLock.unlock();
michael@0: }
michael@0: }
michael@0:
michael@0: public int getConnectionsInPool() {
michael@0: poolLock.lock();
michael@0: try {
michael@0: return numConnections;
michael@0: } finally {
michael@0: poolLock.unlock();
michael@0: }
michael@0: }
michael@0:
michael@0: @Override
michael@0: public PoolEntryRequest requestPoolEntry(
michael@0: final HttpRoute route,
michael@0: final Object state) {
michael@0:
michael@0: final WaitingThreadAborter aborter = new WaitingThreadAborter();
michael@0:
michael@0: return new PoolEntryRequest() {
michael@0:
michael@0: public void abortRequest() {
michael@0: poolLock.lock();
michael@0: try {
michael@0: aborter.abort();
michael@0: } finally {
michael@0: poolLock.unlock();
michael@0: }
michael@0: }
michael@0:
michael@0: public BasicPoolEntry getPoolEntry(
michael@0: long timeout,
michael@0: TimeUnit tunit)
michael@0: throws InterruptedException, ConnectionPoolTimeoutException {
michael@0: return getEntryBlocking(route, state, timeout, tunit, aborter);
michael@0: }
michael@0:
michael@0: };
michael@0: }
michael@0:
michael@0: /**
michael@0: * Obtains a pool entry with a connection within the given timeout.
michael@0: * If a {@link WaitingThread} is used to block, {@link WaitingThreadAborter#setWaitingThread(WaitingThread)}
michael@0: * must be called before blocking, to allow the thread to be interrupted.
michael@0: *
michael@0: * @param route the route for which to get the connection
michael@0: * @param timeout the timeout, 0 or negative for no timeout
michael@0: * @param tunit the unit for the timeout
,
michael@0: * may be null
only if there is no timeout
michael@0: * @param aborter an object which can abort a {@link WaitingThread}.
michael@0: *
michael@0: * @return pool entry holding a connection for the route
michael@0: *
michael@0: * @throws ConnectionPoolTimeoutException
michael@0: * if the timeout expired
michael@0: * @throws InterruptedException
michael@0: * if the calling thread was interrupted
michael@0: */
michael@0: protected BasicPoolEntry getEntryBlocking(
michael@0: HttpRoute route, Object state,
michael@0: long timeout, TimeUnit tunit,
michael@0: WaitingThreadAborter aborter)
michael@0: throws ConnectionPoolTimeoutException, InterruptedException {
michael@0:
michael@0: Date deadline = null;
michael@0: if (timeout > 0) {
michael@0: deadline = new Date
michael@0: (System.currentTimeMillis() + tunit.toMillis(timeout));
michael@0: }
michael@0:
michael@0: BasicPoolEntry entry = null;
michael@0: poolLock.lock();
michael@0: try {
michael@0:
michael@0: RouteSpecificPool rospl = getRoutePool(route, true);
michael@0: WaitingThread waitingThread = null;
michael@0:
michael@0: while (entry == null) {
michael@0:
michael@0: if (shutdown) {
michael@0: throw new IllegalStateException("Connection pool shut down");
michael@0: }
michael@0:
michael@0: if (log.isDebugEnabled()) {
michael@0: log.debug("[" + route + "] total kept alive: " + freeConnections.size() +
michael@0: ", total issued: " + leasedConnections.size() +
michael@0: ", total allocated: " + numConnections + " out of " + maxTotalConnections);
michael@0: }
michael@0:
michael@0: // the cases to check for:
michael@0: // - have a free connection for that route
michael@0: // - allowed to create a free connection for that route
michael@0: // - can delete and replace a free connection for another route
michael@0: // - need to wait for one of the things above to come true
michael@0:
michael@0: entry = getFreeEntry(rospl, state);
michael@0: if (entry != null) {
michael@0: break;
michael@0: }
michael@0:
michael@0: boolean hasCapacity = rospl.getCapacity() > 0;
michael@0:
michael@0: if (log.isDebugEnabled()) {
michael@0: log.debug("Available capacity: " + rospl.getCapacity()
michael@0: + " out of " + rospl.getMaxEntries()
michael@0: + " [" + route + "][" + state + "]");
michael@0: }
michael@0:
michael@0: if (hasCapacity && numConnections < maxTotalConnections) {
michael@0:
michael@0: entry = createEntry(rospl, operator);
michael@0:
michael@0: } else if (hasCapacity && !freeConnections.isEmpty()) {
michael@0:
michael@0: deleteLeastUsedEntry();
michael@0: // if least used entry's route was the same as rospl,
michael@0: // rospl is now out of date : we preemptively refresh
michael@0: rospl = getRoutePool(route, true);
michael@0: entry = createEntry(rospl, operator);
michael@0:
michael@0: } else {
michael@0:
michael@0: if (log.isDebugEnabled()) {
michael@0: log.debug("Need to wait for connection" +
michael@0: " [" + route + "][" + state + "]");
michael@0: }
michael@0:
michael@0: if (waitingThread == null) {
michael@0: waitingThread =
michael@0: newWaitingThread(poolLock.newCondition(), rospl);
michael@0: aborter.setWaitingThread(waitingThread);
michael@0: }
michael@0:
michael@0: boolean success = false;
michael@0: try {
michael@0: rospl.queueThread(waitingThread);
michael@0: waitingThreads.add(waitingThread);
michael@0: success = waitingThread.await(deadline);
michael@0:
michael@0: } finally {
michael@0: // In case of 'success', we were woken up by the
michael@0: // connection pool and should now have a connection
michael@0: // waiting for us, or else we're shutting down.
michael@0: // Just continue in the loop, both cases are checked.
michael@0: rospl.removeThread(waitingThread);
michael@0: waitingThreads.remove(waitingThread);
michael@0: }
michael@0:
michael@0: // check for spurious wakeup vs. timeout
michael@0: if (!success && (deadline != null) &&
michael@0: (deadline.getTime() <= System.currentTimeMillis())) {
michael@0: throw new ConnectionPoolTimeoutException
michael@0: ("Timeout waiting for connection");
michael@0: }
michael@0: }
michael@0: } // while no entry
michael@0:
michael@0: } finally {
michael@0: poolLock.unlock();
michael@0: }
michael@0: return entry;
michael@0: }
michael@0:
michael@0: @Override
michael@0: public void freeEntry(BasicPoolEntry entry, boolean reusable, long validDuration, TimeUnit timeUnit) {
michael@0:
michael@0: HttpRoute route = entry.getPlannedRoute();
michael@0: if (log.isDebugEnabled()) {
michael@0: log.debug("Releasing connection" +
michael@0: " [" + route + "][" + entry.getState() + "]");
michael@0: }
michael@0:
michael@0: poolLock.lock();
michael@0: try {
michael@0: if (shutdown) {
michael@0: // the pool is shut down, release the
michael@0: // connection's resources and get out of here
michael@0: closeConnection(entry);
michael@0: return;
michael@0: }
michael@0:
michael@0: // no longer issued, we keep a hard reference now
michael@0: leasedConnections.remove(entry);
michael@0:
michael@0: RouteSpecificPool rospl = getRoutePool(route, true);
michael@0:
michael@0: if (reusable) {
michael@0: if (log.isDebugEnabled()) {
michael@0: String s;
michael@0: if (validDuration > 0) {
michael@0: s = "for " + validDuration + " " + timeUnit;
michael@0: } else {
michael@0: s = "indefinitely";
michael@0: }
michael@0: log.debug("Pooling connection" +
michael@0: " [" + route + "][" + entry.getState() + "]; keep alive " + s);
michael@0: }
michael@0: rospl.freeEntry(entry);
michael@0: entry.updateExpiry(validDuration, timeUnit);
michael@0: freeConnections.add(entry);
michael@0: } else {
michael@0: rospl.dropEntry();
michael@0: numConnections--;
michael@0: }
michael@0:
michael@0: notifyWaitingThread(rospl);
michael@0:
michael@0: } finally {
michael@0: poolLock.unlock();
michael@0: }
michael@0: }
michael@0:
michael@0: /**
michael@0: * If available, get a free pool entry for a route.
michael@0: *
michael@0: * @param rospl the route-specific pool from which to get an entry
michael@0: *
michael@0: * @return an available pool entry for the given route, or
michael@0: * null
if none is available
michael@0: */
michael@0: protected BasicPoolEntry getFreeEntry(RouteSpecificPool rospl, Object state) {
michael@0:
michael@0: BasicPoolEntry entry = null;
michael@0: poolLock.lock();
michael@0: try {
michael@0: boolean done = false;
michael@0: while(!done) {
michael@0:
michael@0: entry = rospl.allocEntry(state);
michael@0:
michael@0: if (entry != null) {
michael@0: if (log.isDebugEnabled()) {
michael@0: log.debug("Getting free connection"
michael@0: + " [" + rospl.getRoute() + "][" + state + "]");
michael@0:
michael@0: }
michael@0: freeConnections.remove(entry);
michael@0: if (entry.isExpired(System.currentTimeMillis())) {
michael@0: // If the free entry isn't valid anymore, get rid of it
michael@0: // and loop to find another one that might be valid.
michael@0: if (log.isDebugEnabled())
michael@0: log.debug("Closing expired free connection"
michael@0: + " [" + rospl.getRoute() + "][" + state + "]");
michael@0: closeConnection(entry);
michael@0: // We use dropEntry instead of deleteEntry because the entry
michael@0: // is no longer "free" (we just allocated it), and deleteEntry
michael@0: // can only be used to delete free entries.
michael@0: rospl.dropEntry();
michael@0: numConnections--;
michael@0: } else {
michael@0: leasedConnections.add(entry);
michael@0: done = true;
michael@0: }
michael@0:
michael@0: } else {
michael@0: done = true;
michael@0: if (log.isDebugEnabled()) {
michael@0: log.debug("No free connections"
michael@0: + " [" + rospl.getRoute() + "][" + state + "]");
michael@0: }
michael@0: }
michael@0: }
michael@0: } finally {
michael@0: poolLock.unlock();
michael@0: }
michael@0: return entry;
michael@0: }
michael@0:
michael@0:
michael@0: /**
michael@0: * Creates a new pool entry.
michael@0: * This method assumes that the new connection will be handed
michael@0: * out immediately.
michael@0: *
michael@0: * @param rospl the route-specific pool for which to create the entry
michael@0: * @param op the operator for creating a connection
michael@0: *
michael@0: * @return the new pool entry for a new connection
michael@0: */
michael@0: protected BasicPoolEntry createEntry(RouteSpecificPool rospl,
michael@0: ClientConnectionOperator op) {
michael@0:
michael@0: if (log.isDebugEnabled()) {
michael@0: log.debug("Creating new connection [" + rospl.getRoute() + "]");
michael@0: }
michael@0:
michael@0: // the entry will create the connection when needed
michael@0: BasicPoolEntry entry = new BasicPoolEntry(op, rospl.getRoute(), connTTL, connTTLTimeUnit);
michael@0:
michael@0: poolLock.lock();
michael@0: try {
michael@0: rospl.createdEntry(entry);
michael@0: numConnections++;
michael@0: leasedConnections.add(entry);
michael@0: } finally {
michael@0: poolLock.unlock();
michael@0: }
michael@0:
michael@0: return entry;
michael@0: }
michael@0:
michael@0:
michael@0: /**
michael@0: * Deletes a given pool entry.
michael@0: * This closes the pooled connection and removes all references,
michael@0: * so that it can be GCed.
michael@0: *
michael@0: * Note: Does not remove the entry from the freeConnections list. michael@0: * It is assumed that the caller has already handled this step.
michael@0: * michael@0: * michael@0: * @param entry the pool entry for the connection to delete michael@0: */ michael@0: protected void deleteEntry(BasicPoolEntry entry) { michael@0: michael@0: HttpRoute route = entry.getPlannedRoute(); michael@0: michael@0: if (log.isDebugEnabled()) { michael@0: log.debug("Deleting connection" michael@0: + " [" + route + "][" + entry.getState() + "]"); michael@0: } michael@0: michael@0: poolLock.lock(); michael@0: try { michael@0: michael@0: closeConnection(entry); michael@0: michael@0: RouteSpecificPool rospl = getRoutePool(route, true); michael@0: rospl.deleteEntry(entry); michael@0: numConnections--; michael@0: if (rospl.isUnused()) { michael@0: routeToPool.remove(route); michael@0: } michael@0: michael@0: } finally { michael@0: poolLock.unlock(); michael@0: } michael@0: } michael@0: michael@0: michael@0: /** michael@0: * Delete an old, free pool entry to make room for a new one. michael@0: * Used to replace pool entries with ones for a different route. 
michael@0: */ michael@0: protected void deleteLeastUsedEntry() { michael@0: poolLock.lock(); michael@0: try { michael@0: michael@0: BasicPoolEntry entry = freeConnections.remove(); michael@0: michael@0: if (entry != null) { michael@0: deleteEntry(entry); michael@0: } else if (log.isDebugEnabled()) { michael@0: log.debug("No free connection to delete"); michael@0: } michael@0: michael@0: } finally { michael@0: poolLock.unlock(); michael@0: } michael@0: } michael@0: michael@0: @Override michael@0: protected void handleLostEntry(HttpRoute route) { michael@0: michael@0: poolLock.lock(); michael@0: try { michael@0: michael@0: RouteSpecificPool rospl = getRoutePool(route, true); michael@0: rospl.dropEntry(); michael@0: if (rospl.isUnused()) { michael@0: routeToPool.remove(route); michael@0: } michael@0: michael@0: numConnections--; michael@0: notifyWaitingThread(rospl); michael@0: michael@0: } finally { michael@0: poolLock.unlock(); michael@0: } michael@0: } michael@0: michael@0: /** michael@0: * Notifies a waiting thread that a connection is available. michael@0: * This will wake a thread waiting in the specific route pool, michael@0: * if there is one. michael@0: * Otherwise, a thread in the connection pool will be notified. michael@0: * michael@0: * @param rospl the pool in which to notify, ornull
michael@0: */
michael@0: protected void notifyWaitingThread(RouteSpecificPool rospl) {
michael@0:
michael@0: //@@@ while this strategy provides for best connection re-use,
michael@0: //@@@ is it fair? only do this if the connection is open?
michael@0: // Find the thread we are going to notify. We want to ensure that
michael@0: // each waiting thread is only interrupted once, so we will remove
michael@0: // it from all wait queues before interrupting.
michael@0: WaitingThread waitingThread = null;
michael@0:
michael@0: poolLock.lock();
michael@0: try {
michael@0:
michael@0: if ((rospl != null) && rospl.hasThread()) {
michael@0: if (log.isDebugEnabled()) {
michael@0: log.debug("Notifying thread waiting on pool" +
michael@0: " [" + rospl.getRoute() + "]");
michael@0: }
michael@0: waitingThread = rospl.nextThread();
michael@0: } else if (!waitingThreads.isEmpty()) {
michael@0: if (log.isDebugEnabled()) {
michael@0: log.debug("Notifying thread waiting on any pool");
michael@0: }
michael@0: waitingThread = waitingThreads.remove();
michael@0: } else if (log.isDebugEnabled()) {
michael@0: log.debug("Notifying no-one, there are no waiting threads");
michael@0: }
michael@0:
michael@0: if (waitingThread != null) {
michael@0: waitingThread.wakeup();
michael@0: }
michael@0:
michael@0: } finally {
michael@0: poolLock.unlock();
michael@0: }
michael@0: }
michael@0:
michael@0:
michael@0: @Override
michael@0: public void deleteClosedConnections() {
michael@0: poolLock.lock();
michael@0: try {
michael@0: Iteratoridletime
michael@0: */
michael@0: @Override
michael@0: public void closeIdleConnections(long idletime, TimeUnit tunit) {
michael@0: if (tunit == null) {
michael@0: throw new IllegalArgumentException("Time unit must not be null.");
michael@0: }
michael@0: if (idletime < 0) {
michael@0: idletime = 0;
michael@0: }
michael@0: if (log.isDebugEnabled()) {
michael@0: log.debug("Closing connections idle longer than " + idletime + " " + tunit);
michael@0: }
michael@0: // the latest time for which connections will be closed
michael@0: long deadline = System.currentTimeMillis() - tunit.toMillis(idletime);
michael@0: poolLock.lock();
michael@0: try {
michael@0: Iterator