diff -r 000000000000 -r 6474c204b198 mobile/android/thirdparty/ch/boye/httpclientandroidlib/impl/conn/tsccm/ConnPoolByRoute.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/mobile/android/thirdparty/ch/boye/httpclientandroidlib/impl/conn/tsccm/ConnPoolByRoute.java Wed Dec 31 06:09:35 2014 +0100 @@ -0,0 +1,836 @@ +/* + * ==================================================================== + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package ch.boye.httpclientandroidlib.impl.conn.tsccm; + +import java.io.IOException; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Queue; +import java.util.LinkedList; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.TimeUnit; + +import ch.boye.httpclientandroidlib.androidextra.HttpClientAndroidLog; +/* LogFactory removed by HttpClient for Android script. */ +import ch.boye.httpclientandroidlib.annotation.ThreadSafe; +import ch.boye.httpclientandroidlib.conn.routing.HttpRoute; +import ch.boye.httpclientandroidlib.conn.ClientConnectionOperator; +import ch.boye.httpclientandroidlib.conn.ConnectionPoolTimeoutException; +import ch.boye.httpclientandroidlib.conn.OperatedClientConnection; +import ch.boye.httpclientandroidlib.conn.params.ConnPerRoute; +import ch.boye.httpclientandroidlib.conn.params.ConnManagerParams; +import ch.boye.httpclientandroidlib.params.HttpParams; + +/** + * A connection pool that maintains connections by route. + * This class is derived from MultiThreadedHttpConnectionManager + * in HttpClient 3.x, see there for original authors. It implements the same + * algorithm for connection re-use and connection-per-host enforcement: + * + * Note that access to the pool data structures is synchronized via the + * {@link AbstractConnPool#poolLock poolLock} in the base class, + * not via synchronized methods. + * + * @since 4.0 + */ +@ThreadSafe +@SuppressWarnings("deprecation") +public class ConnPoolByRoute extends AbstractConnPool { //TODO: remove dependency on AbstractConnPool + + public HttpClientAndroidLog log = new HttpClientAndroidLog(getClass()); + + private final Lock poolLock; + + /** Connection operator for this pool */ + protected final ClientConnectionOperator operator; + + /** Connections per route lookup */ + protected final ConnPerRoute connPerRoute; + + /** References to issued connections */ + protected final Set leasedConnections; + + /** The list of free connections */ + protected final Queue freeConnections; + + /** The list of WaitingThreads waiting for a connection */ + protected final Queue waitingThreads; + + /** Map of route-specific pools */ + protected final Map routeToPool; + + private final long connTTL; + + private final TimeUnit connTTLTimeUnit; + + protected volatile boolean shutdown; + + protected volatile int maxTotalConnections; + + protected volatile int numConnections; + + /** + * Creates a new connection pool, managed by route. + * + * @since 4.1 + */ + public ConnPoolByRoute( + final ClientConnectionOperator operator, + final ConnPerRoute connPerRoute, + int maxTotalConnections) { + this(operator, connPerRoute, maxTotalConnections, -1, TimeUnit.MILLISECONDS); + } + + /** + * @since 4.1 + */ + public ConnPoolByRoute( + final ClientConnectionOperator operator, + final ConnPerRoute connPerRoute, + int maxTotalConnections, + long connTTL, + final TimeUnit connTTLTimeUnit) { + super(); + if (operator == null) { + throw new IllegalArgumentException("Connection operator may not be null"); + } + if (connPerRoute == null) { + throw new IllegalArgumentException("Connections per route may not be null"); + } + this.poolLock = super.poolLock; + this.leasedConnections = super.leasedConnections; + this.operator = operator; + this.connPerRoute = connPerRoute; + this.maxTotalConnections = maxTotalConnections; + this.freeConnections = createFreeConnQueue(); + this.waitingThreads = createWaitingThreadQueue(); + this.routeToPool = createRouteToPoolMap(); + this.connTTL = connTTL; + this.connTTLTimeUnit = connTTLTimeUnit; + } + + protected Lock getLock() { + return this.poolLock; + } + + /** + * Creates a new connection pool, managed by route. + * + * @deprecated use {@link ConnPoolByRoute#ConnPoolByRoute(ClientConnectionOperator, ConnPerRoute, int)} + */ + @Deprecated + public ConnPoolByRoute(final ClientConnectionOperator operator, final HttpParams params) { + this(operator, + ConnManagerParams.getMaxConnectionsPerRoute(params), + ConnManagerParams.getMaxTotalConnections(params)); + } + + /** + * Creates the queue for {@link #freeConnections}. + * Called once by the constructor. + * + * @return a queue + */ + protected Queue createFreeConnQueue() { + return new LinkedList(); + } + + /** + * Creates the queue for {@link #waitingThreads}. + * Called once by the constructor. + * + * @return a queue + */ + protected Queue createWaitingThreadQueue() { + return new LinkedList(); + } + + /** + * Creates the map for {@link #routeToPool}. + * Called once by the constructor. + * + * @return a map + */ + protected Map createRouteToPoolMap() { + return new HashMap(); + } + + + /** + * Creates a new route-specific pool. + * Called by {@link #getRoutePool} when necessary. + * + * @param route the route + * + * @return the new pool + */ + protected RouteSpecificPool newRouteSpecificPool(HttpRoute route) { + return new RouteSpecificPool(route, this.connPerRoute); + } + + + /** + * Creates a new waiting thread. + * Called by {@link #getRoutePool} when necessary. + * + * @param cond the condition to wait for + * @param rospl the route specific pool, or null + * + * @return a waiting thread representation + */ + protected WaitingThread newWaitingThread(Condition cond, + RouteSpecificPool rospl) { + return new WaitingThread(cond, rospl); + } + + private void closeConnection(final BasicPoolEntry entry) { + OperatedClientConnection conn = entry.getConnection(); + if (conn != null) { + try { + conn.close(); + } catch (IOException ex) { + log.debug("I/O error closing connection", ex); + } + } + } + + /** + * Get a route-specific pool of available connections. + * + * @param route the route + * @param create whether to create the pool if it doesn't exist + * + * @return the pool for the argument route, + * never null if create is true + */ + protected RouteSpecificPool getRoutePool(HttpRoute route, + boolean create) { + RouteSpecificPool rospl = null; + poolLock.lock(); + try { + + rospl = routeToPool.get(route); + if ((rospl == null) && create) { + // no pool for this route yet (or anymore) + rospl = newRouteSpecificPool(route); + routeToPool.put(route, rospl); + } + + } finally { + poolLock.unlock(); + } + + return rospl; + } + + public int getConnectionsInPool(HttpRoute route) { + poolLock.lock(); + try { + // don't allow a pool to be created here! + RouteSpecificPool rospl = getRoutePool(route, false); + return (rospl != null) ? rospl.getEntryCount() : 0; + + } finally { + poolLock.unlock(); + } + } + + public int getConnectionsInPool() { + poolLock.lock(); + try { + return numConnections; + } finally { + poolLock.unlock(); + } + } + + @Override + public PoolEntryRequest requestPoolEntry( + final HttpRoute route, + final Object state) { + + final WaitingThreadAborter aborter = new WaitingThreadAborter(); + + return new PoolEntryRequest() { + + public void abortRequest() { + poolLock.lock(); + try { + aborter.abort(); + } finally { + poolLock.unlock(); + } + } + + public BasicPoolEntry getPoolEntry( + long timeout, + TimeUnit tunit) + throws InterruptedException, ConnectionPoolTimeoutException { + return getEntryBlocking(route, state, timeout, tunit, aborter); + } + + }; + } + + /** + * Obtains a pool entry with a connection within the given timeout. + * If a {@link WaitingThread} is used to block, {@link WaitingThreadAborter#setWaitingThread(WaitingThread)} + * must be called before blocking, to allow the thread to be interrupted. + * + * @param route the route for which to get the connection + * @param timeout the timeout, 0 or negative for no timeout + * @param tunit the unit for the timeout, + * may be null only if there is no timeout + * @param aborter an object which can abort a {@link WaitingThread}. + * + * @return pool entry holding a connection for the route + * + * @throws ConnectionPoolTimeoutException + * if the timeout expired + * @throws InterruptedException + * if the calling thread was interrupted + */ + protected BasicPoolEntry getEntryBlocking( + HttpRoute route, Object state, + long timeout, TimeUnit tunit, + WaitingThreadAborter aborter) + throws ConnectionPoolTimeoutException, InterruptedException { + + Date deadline = null; + if (timeout > 0) { + deadline = new Date + (System.currentTimeMillis() + tunit.toMillis(timeout)); + } + + BasicPoolEntry entry = null; + poolLock.lock(); + try { + + RouteSpecificPool rospl = getRoutePool(route, true); + WaitingThread waitingThread = null; + + while (entry == null) { + + if (shutdown) { + throw new IllegalStateException("Connection pool shut down"); + } + + if (log.isDebugEnabled()) { + log.debug("[" + route + "] total kept alive: " + freeConnections.size() + + ", total issued: " + leasedConnections.size() + + ", total allocated: " + numConnections + " out of " + maxTotalConnections); + } + + // the cases to check for: + // - have a free connection for that route + // - allowed to create a free connection for that route + // - can delete and replace a free connection for another route + // - need to wait for one of the things above to come true + + entry = getFreeEntry(rospl, state); + if (entry != null) { + break; + } + + boolean hasCapacity = rospl.getCapacity() > 0; + + if (log.isDebugEnabled()) { + log.debug("Available capacity: " + rospl.getCapacity() + + " out of " + rospl.getMaxEntries() + + " [" + route + "][" + state + "]"); + } + + if (hasCapacity && numConnections < maxTotalConnections) { + + entry = createEntry(rospl, operator); + + } else if (hasCapacity && !freeConnections.isEmpty()) { + + deleteLeastUsedEntry(); + // if least used entry's route was the same as rospl, + // rospl is now out of date : we preemptively refresh + rospl = getRoutePool(route, true); + entry = createEntry(rospl, operator); + + } else { + + if (log.isDebugEnabled()) { + log.debug("Need to wait for connection" + + " [" + route + "][" + state + "]"); + } + + if (waitingThread == null) { + waitingThread = + newWaitingThread(poolLock.newCondition(), rospl); + aborter.setWaitingThread(waitingThread); + } + + boolean success = false; + try { + rospl.queueThread(waitingThread); + waitingThreads.add(waitingThread); + success = waitingThread.await(deadline); + + } finally { + // In case of 'success', we were woken up by the + // connection pool and should now have a connection + // waiting for us, or else we're shutting down. + // Just continue in the loop, both cases are checked. + rospl.removeThread(waitingThread); + waitingThreads.remove(waitingThread); + } + + // check for spurious wakeup vs. timeout + if (!success && (deadline != null) && + (deadline.getTime() <= System.currentTimeMillis())) { + throw new ConnectionPoolTimeoutException + ("Timeout waiting for connection"); + } + } + } // while no entry + + } finally { + poolLock.unlock(); + } + return entry; + } + + @Override + public void freeEntry(BasicPoolEntry entry, boolean reusable, long validDuration, TimeUnit timeUnit) { + + HttpRoute route = entry.getPlannedRoute(); + if (log.isDebugEnabled()) { + log.debug("Releasing connection" + + " [" + route + "][" + entry.getState() + "]"); + } + + poolLock.lock(); + try { + if (shutdown) { + // the pool is shut down, release the + // connection's resources and get out of here + closeConnection(entry); + return; + } + + // no longer issued, we keep a hard reference now + leasedConnections.remove(entry); + + RouteSpecificPool rospl = getRoutePool(route, true); + + if (reusable) { + if (log.isDebugEnabled()) { + String s; + if (validDuration > 0) { + s = "for " + validDuration + " " + timeUnit; + } else { + s = "indefinitely"; + } + log.debug("Pooling connection" + + " [" + route + "][" + entry.getState() + "]; keep alive " + s); + } + rospl.freeEntry(entry); + entry.updateExpiry(validDuration, timeUnit); + freeConnections.add(entry); + } else { + rospl.dropEntry(); + numConnections--; + } + + notifyWaitingThread(rospl); + + } finally { + poolLock.unlock(); + } + } + + /** + * If available, get a free pool entry for a route. + * + * @param rospl the route-specific pool from which to get an entry + * + * @return an available pool entry for the given route, or + * null if none is available + */ + protected BasicPoolEntry getFreeEntry(RouteSpecificPool rospl, Object state) { + + BasicPoolEntry entry = null; + poolLock.lock(); + try { + boolean done = false; + while(!done) { + + entry = rospl.allocEntry(state); + + if (entry != null) { + if (log.isDebugEnabled()) { + log.debug("Getting free connection" + + " [" + rospl.getRoute() + "][" + state + "]"); + + } + freeConnections.remove(entry); + if (entry.isExpired(System.currentTimeMillis())) { + // If the free entry isn't valid anymore, get rid of it + // and loop to find another one that might be valid. + if (log.isDebugEnabled()) + log.debug("Closing expired free connection" + + " [" + rospl.getRoute() + "][" + state + "]"); + closeConnection(entry); + // We use dropEntry instead of deleteEntry because the entry + // is no longer "free" (we just allocated it), and deleteEntry + // can only be used to delete free entries. + rospl.dropEntry(); + numConnections--; + } else { + leasedConnections.add(entry); + done = true; + } + + } else { + done = true; + if (log.isDebugEnabled()) { + log.debug("No free connections" + + " [" + rospl.getRoute() + "][" + state + "]"); + } + } + } + } finally { + poolLock.unlock(); + } + return entry; + } + + + /** + * Creates a new pool entry. + * This method assumes that the new connection will be handed + * out immediately. + * + * @param rospl the route-specific pool for which to create the entry + * @param op the operator for creating a connection + * + * @return the new pool entry for a new connection + */ + protected BasicPoolEntry createEntry(RouteSpecificPool rospl, + ClientConnectionOperator op) { + + if (log.isDebugEnabled()) { + log.debug("Creating new connection [" + rospl.getRoute() + "]"); + } + + // the entry will create the connection when needed + BasicPoolEntry entry = new BasicPoolEntry(op, rospl.getRoute(), connTTL, connTTLTimeUnit); + + poolLock.lock(); + try { + rospl.createdEntry(entry); + numConnections++; + leasedConnections.add(entry); + } finally { + poolLock.unlock(); + } + + return entry; + } + + + /** + * Deletes a given pool entry. + * This closes the pooled connection and removes all references, + * so that it can be GCed. + * + *

Note: Does not remove the entry from the freeConnections list. + * It is assumed that the caller has already handled this step.

+ * + * + * @param entry the pool entry for the connection to delete + */ + protected void deleteEntry(BasicPoolEntry entry) { + + HttpRoute route = entry.getPlannedRoute(); + + if (log.isDebugEnabled()) { + log.debug("Deleting connection" + + " [" + route + "][" + entry.getState() + "]"); + } + + poolLock.lock(); + try { + + closeConnection(entry); + + RouteSpecificPool rospl = getRoutePool(route, true); + rospl.deleteEntry(entry); + numConnections--; + if (rospl.isUnused()) { + routeToPool.remove(route); + } + + } finally { + poolLock.unlock(); + } + } + + + /** + * Delete an old, free pool entry to make room for a new one. + * Used to replace pool entries with ones for a different route. + */ + protected void deleteLeastUsedEntry() { + poolLock.lock(); + try { + + BasicPoolEntry entry = freeConnections.remove(); + + if (entry != null) { + deleteEntry(entry); + } else if (log.isDebugEnabled()) { + log.debug("No free connection to delete"); + } + + } finally { + poolLock.unlock(); + } + } + + @Override + protected void handleLostEntry(HttpRoute route) { + + poolLock.lock(); + try { + + RouteSpecificPool rospl = getRoutePool(route, true); + rospl.dropEntry(); + if (rospl.isUnused()) { + routeToPool.remove(route); + } + + numConnections--; + notifyWaitingThread(rospl); + + } finally { + poolLock.unlock(); + } + } + + /** + * Notifies a waiting thread that a connection is available. + * This will wake a thread waiting in the specific route pool, + * if there is one. + * Otherwise, a thread in the connection pool will be notified. + * + * @param rospl the pool in which to notify, or null + */ + protected void notifyWaitingThread(RouteSpecificPool rospl) { + + //@@@ while this strategy provides for best connection re-use, + //@@@ is it fair? only do this if the connection is open? + // Find the thread we are going to notify. We want to ensure that + // each waiting thread is only interrupted once, so we will remove + // it from all wait queues before interrupting. + WaitingThread waitingThread = null; + + poolLock.lock(); + try { + + if ((rospl != null) && rospl.hasThread()) { + if (log.isDebugEnabled()) { + log.debug("Notifying thread waiting on pool" + + " [" + rospl.getRoute() + "]"); + } + waitingThread = rospl.nextThread(); + } else if (!waitingThreads.isEmpty()) { + if (log.isDebugEnabled()) { + log.debug("Notifying thread waiting on any pool"); + } + waitingThread = waitingThreads.remove(); + } else if (log.isDebugEnabled()) { + log.debug("Notifying no-one, there are no waiting threads"); + } + + if (waitingThread != null) { + waitingThread.wakeup(); + } + + } finally { + poolLock.unlock(); + } + } + + + @Override + public void deleteClosedConnections() { + poolLock.lock(); + try { + Iterator iter = freeConnections.iterator(); + while (iter.hasNext()) { + BasicPoolEntry entry = iter.next(); + if (!entry.getConnection().isOpen()) { + iter.remove(); + deleteEntry(entry); + } + } + } finally { + poolLock.unlock(); + } + } + + /** + * Closes idle connections. + * + * @param idletime the time the connections should have been idle + * in order to be closed now + * @param tunit the unit for the idletime + */ + @Override + public void closeIdleConnections(long idletime, TimeUnit tunit) { + if (tunit == null) { + throw new IllegalArgumentException("Time unit must not be null."); + } + if (idletime < 0) { + idletime = 0; + } + if (log.isDebugEnabled()) { + log.debug("Closing connections idle longer than " + idletime + " " + tunit); + } + // the latest time for which connections will be closed + long deadline = System.currentTimeMillis() - tunit.toMillis(idletime); + poolLock.lock(); + try { + Iterator iter = freeConnections.iterator(); + while (iter.hasNext()) { + BasicPoolEntry entry = iter.next(); + if (entry.getUpdated() <= deadline) { + if (log.isDebugEnabled()) { + log.debug("Closing connection last used @ " + new Date(entry.getUpdated())); + } + iter.remove(); + deleteEntry(entry); + } + } + } finally { + poolLock.unlock(); + } + } + + @Override + public void closeExpiredConnections() { + log.debug("Closing expired connections"); + long now = System.currentTimeMillis(); + + poolLock.lock(); + try { + Iterator iter = freeConnections.iterator(); + while (iter.hasNext()) { + BasicPoolEntry entry = iter.next(); + if (entry.isExpired(now)) { + if (log.isDebugEnabled()) { + log.debug("Closing connection expired @ " + new Date(entry.getExpiry())); + } + iter.remove(); + deleteEntry(entry); + } + } + } finally { + poolLock.unlock(); + } + } + + @Override + public void shutdown() { + poolLock.lock(); + try { + if (shutdown) { + return; + } + shutdown = true; + + // close all connections that are issued to an application + Iterator iter1 = leasedConnections.iterator(); + while (iter1.hasNext()) { + BasicPoolEntry entry = iter1.next(); + iter1.remove(); + closeConnection(entry); + } + + // close all free connections + Iterator iter2 = freeConnections.iterator(); + while (iter2.hasNext()) { + BasicPoolEntry entry = iter2.next(); + iter2.remove(); + + if (log.isDebugEnabled()) { + log.debug("Closing connection" + + " [" + entry.getPlannedRoute() + "][" + entry.getState() + "]"); + } + closeConnection(entry); + } + + // wake up all waiting threads + Iterator iwth = waitingThreads.iterator(); + while (iwth.hasNext()) { + WaitingThread waiter = iwth.next(); + iwth.remove(); + waiter.wakeup(); + } + + routeToPool.clear(); + + } finally { + poolLock.unlock(); + } + } + + /** + * since 4.1 + */ + public void setMaxTotalConnections(int max) { + poolLock.lock(); + try { + maxTotalConnections = max; + } finally { + poolLock.unlock(); + } + } + + + /** + * since 4.1 + */ + public int getMaxTotalConnections() { + return maxTotalConnections; + } + +} +