mobile/android/thirdparty/ch/boye/httpclientandroidlib/impl/conn/tsccm/ConnPoolByRoute.java

Wed, 31 Dec 2014 06:09:35 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:09:35 +0100
changeset 0
6474c204b198
permissions
-rw-r--r--

Cloned from the upstream tor-browser origin at tag tor-browser-31.3.0esr-4.5-1-build1,
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f, for hacking purposes.

michael@0 1 /*
michael@0 2 * ====================================================================
michael@0 3 *
michael@0 4 * Licensed to the Apache Software Foundation (ASF) under one or more
michael@0 5 * contributor license agreements. See the NOTICE file distributed with
michael@0 6 * this work for additional information regarding copyright ownership.
michael@0 7 * The ASF licenses this file to You under the Apache License, Version 2.0
michael@0 8 * (the "License"); you may not use this file except in compliance with
michael@0 9 * the License. You may obtain a copy of the License at
michael@0 10 *
michael@0 11 * http://www.apache.org/licenses/LICENSE-2.0
michael@0 12 *
michael@0 13 * Unless required by applicable law or agreed to in writing, software
michael@0 14 * distributed under the License is distributed on an "AS IS" BASIS,
michael@0 15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
michael@0 16 * See the License for the specific language governing permissions and
michael@0 17 * limitations under the License.
michael@0 18 * ====================================================================
michael@0 19 *
michael@0 20 * This software consists of voluntary contributions made by many
michael@0 21 * individuals on behalf of the Apache Software Foundation. For more
michael@0 22 * information on the Apache Software Foundation, please see
michael@0 23 * <http://www.apache.org/>.
michael@0 24 *
michael@0 25 */
michael@0 26
michael@0 27 package ch.boye.httpclientandroidlib.impl.conn.tsccm;
michael@0 28
michael@0 29 import java.io.IOException;
michael@0 30 import java.util.Date;
michael@0 31 import java.util.HashMap;
michael@0 32 import java.util.Iterator;
michael@0 33 import java.util.Queue;
michael@0 34 import java.util.LinkedList;
michael@0 35 import java.util.Map;
michael@0 36 import java.util.Set;
michael@0 37 import java.util.concurrent.locks.Condition;
michael@0 38 import java.util.concurrent.locks.Lock;
michael@0 39 import java.util.concurrent.TimeUnit;
michael@0 40
michael@0 41 import ch.boye.httpclientandroidlib.androidextra.HttpClientAndroidLog;
michael@0 42 /* LogFactory removed by HttpClient for Android script. */
michael@0 43 import ch.boye.httpclientandroidlib.annotation.ThreadSafe;
michael@0 44 import ch.boye.httpclientandroidlib.conn.routing.HttpRoute;
michael@0 45 import ch.boye.httpclientandroidlib.conn.ClientConnectionOperator;
michael@0 46 import ch.boye.httpclientandroidlib.conn.ConnectionPoolTimeoutException;
michael@0 47 import ch.boye.httpclientandroidlib.conn.OperatedClientConnection;
michael@0 48 import ch.boye.httpclientandroidlib.conn.params.ConnPerRoute;
michael@0 49 import ch.boye.httpclientandroidlib.conn.params.ConnManagerParams;
michael@0 50 import ch.boye.httpclientandroidlib.params.HttpParams;
michael@0 51
michael@0 52 /**
michael@0 53 * A connection pool that maintains connections by route.
michael@0 54 * This class is derived from <code>MultiThreadedHttpConnectionManager</code>
michael@0 55 * in HttpClient 3.x, see there for original authors. It implements the same
michael@0 56 * algorithm for connection re-use and connection-per-host enforcement:
michael@0 57 * <ul>
michael@0 58 * <li>connections are re-used only for the exact same route</li>
michael@0 59 * <li>connection limits are enforced per route rather than per host</li>
michael@0 60 * </ul>
michael@0 61 * Note that access to the pool data structures is synchronized via the
michael@0 62 * {@link AbstractConnPool#poolLock poolLock} in the base class,
michael@0 63 * not via <code>synchronized</code> methods.
michael@0 64 *
michael@0 65 * @since 4.0
michael@0 66 */
michael@0 67 @ThreadSafe
michael@0 68 @SuppressWarnings("deprecation")
michael@0 69 public class ConnPoolByRoute extends AbstractConnPool { //TODO: remove dependency on AbstractConnPool
michael@0 70
michael@0 71 public HttpClientAndroidLog log = new HttpClientAndroidLog(getClass());
michael@0 72
michael@0 73 private final Lock poolLock;
michael@0 74
michael@0 75 /** Connection operator for this pool */
michael@0 76 protected final ClientConnectionOperator operator;
michael@0 77
michael@0 78 /** Connections per route lookup */
michael@0 79 protected final ConnPerRoute connPerRoute;
michael@0 80
michael@0 81 /** References to issued connections */
michael@0 82 protected final Set<BasicPoolEntry> leasedConnections;
michael@0 83
michael@0 84 /** The list of free connections */
michael@0 85 protected final Queue<BasicPoolEntry> freeConnections;
michael@0 86
michael@0 87 /** The list of WaitingThreads waiting for a connection */
michael@0 88 protected final Queue<WaitingThread> waitingThreads;
michael@0 89
michael@0 90 /** Map of route-specific pools */
michael@0 91 protected final Map<HttpRoute, RouteSpecificPool> routeToPool;
michael@0 92
michael@0 93 private final long connTTL;
michael@0 94
michael@0 95 private final TimeUnit connTTLTimeUnit;
michael@0 96
michael@0 97 protected volatile boolean shutdown;
michael@0 98
michael@0 99 protected volatile int maxTotalConnections;
michael@0 100
michael@0 101 protected volatile int numConnections;
michael@0 102
/**
 * Creates a route-managed connection pool whose entries are created
 * with a time-to-live of <code>-1</code> milliseconds.
 *
 * @param operator              the connection operator, must not be <code>null</code>
 * @param connPerRoute          the per-route limits, must not be <code>null</code>
 * @param maxTotalConnections   the initial limit on the total number of connections
 *
 * @since 4.1
 */
public ConnPoolByRoute(
        final ClientConnectionOperator operator,
        final ConnPerRoute connPerRoute,
        int maxTotalConnections) {
    this(operator, connPerRoute, maxTotalConnections, -1L, TimeUnit.MILLISECONDS);
}
michael@0 114
/**
 * Creates a route-managed connection pool with an explicit
 * time-to-live that is passed on to every entry the pool creates.
 *
 * @param operator              the connection operator
 * @param connPerRoute          the per-route limits
 * @param maxTotalConnections   the initial limit on the total number of connections
 * @param connTTL               the time-to-live passed to new pool entries
 * @param connTTLTimeUnit       the unit of <code>connTTL</code>
 *
 * @throws IllegalArgumentException
 *         if <code>operator</code> or <code>connPerRoute</code> is <code>null</code>
 *
 * @since 4.1
 */
public ConnPoolByRoute(
        final ClientConnectionOperator operator,
        final ConnPerRoute connPerRoute,
        int maxTotalConnections,
        long connTTL,
        final TimeUnit connTTLTimeUnit) {
    super();
    if (operator == null) {
        throw new IllegalArgumentException("Connection operator may not be null");
    }
    if (connPerRoute == null) {
        throw new IllegalArgumentException("Connections per route may not be null");
    }

    // share the lock and the leased set with the base class
    this.poolLock = super.poolLock;
    this.leasedConnections = super.leasedConnections;

    // plain configuration
    this.operator = operator;
    this.connPerRoute = connPerRoute;
    this.maxTotalConnections = maxTotalConnections;
    this.connTTL = connTTL;
    this.connTTLTimeUnit = connTTLTimeUnit;

    // data structures, created through the overridable factory methods
    this.freeConnections = createFreeConnQueue();
    this.waitingThreads = createWaitingThreadQueue();
    this.routeToPool = createRouteToPoolMap();
}
michael@0 142
michael@0 143 protected Lock getLock() {
michael@0 144 return this.poolLock;
michael@0 145 }
michael@0 146
/**
 * Creates a route-managed connection pool, reading the per-route and
 * total connection limits from the given parameters.
 *
 * @param operator  the connection operator
 * @param params    the parameters to read the limits from
 *
 * @deprecated use {@link ConnPoolByRoute#ConnPoolByRoute(ClientConnectionOperator, ConnPerRoute, int)}
 */
@Deprecated
public ConnPoolByRoute(final ClientConnectionOperator operator, final HttpParams params) {
    this(
        operator,
        ConnManagerParams.getMaxConnectionsPerRoute(params),
        ConnManagerParams.getMaxTotalConnections(params));
}
michael@0 158
michael@0 159 /**
michael@0 160 * Creates the queue for {@link #freeConnections}.
michael@0 161 * Called once by the constructor.
michael@0 162 *
michael@0 163 * @return a queue
michael@0 164 */
michael@0 165 protected Queue<BasicPoolEntry> createFreeConnQueue() {
michael@0 166 return new LinkedList<BasicPoolEntry>();
michael@0 167 }
michael@0 168
michael@0 169 /**
michael@0 170 * Creates the queue for {@link #waitingThreads}.
michael@0 171 * Called once by the constructor.
michael@0 172 *
michael@0 173 * @return a queue
michael@0 174 */
michael@0 175 protected Queue<WaitingThread> createWaitingThreadQueue() {
michael@0 176 return new LinkedList<WaitingThread>();
michael@0 177 }
michael@0 178
michael@0 179 /**
michael@0 180 * Creates the map for {@link #routeToPool}.
michael@0 181 * Called once by the constructor.
michael@0 182 *
michael@0 183 * @return a map
michael@0 184 */
michael@0 185 protected Map<HttpRoute, RouteSpecificPool> createRouteToPoolMap() {
michael@0 186 return new HashMap<HttpRoute, RouteSpecificPool>();
michael@0 187 }
michael@0 188
michael@0 189
michael@0 190 /**
michael@0 191 * Creates a new route-specific pool.
michael@0 192 * Called by {@link #getRoutePool} when necessary.
michael@0 193 *
michael@0 194 * @param route the route
michael@0 195 *
michael@0 196 * @return the new pool
michael@0 197 */
michael@0 198 protected RouteSpecificPool newRouteSpecificPool(HttpRoute route) {
michael@0 199 return new RouteSpecificPool(route, this.connPerRoute);
michael@0 200 }
michael@0 201
michael@0 202
michael@0 203 /**
michael@0 204 * Creates a new waiting thread.
michael@0 205 * Called by {@link #getRoutePool} when necessary.
michael@0 206 *
michael@0 207 * @param cond the condition to wait for
michael@0 208 * @param rospl the route specific pool, or <code>null</code>
michael@0 209 *
michael@0 210 * @return a waiting thread representation
michael@0 211 */
michael@0 212 protected WaitingThread newWaitingThread(Condition cond,
michael@0 213 RouteSpecificPool rospl) {
michael@0 214 return new WaitingThread(cond, rospl);
michael@0 215 }
michael@0 216
michael@0 217 private void closeConnection(final BasicPoolEntry entry) {
michael@0 218 OperatedClientConnection conn = entry.getConnection();
michael@0 219 if (conn != null) {
michael@0 220 try {
michael@0 221 conn.close();
michael@0 222 } catch (IOException ex) {
michael@0 223 log.debug("I/O error closing connection", ex);
michael@0 224 }
michael@0 225 }
michael@0 226 }
michael@0 227
michael@0 228 /**
michael@0 229 * Get a route-specific pool of available connections.
michael@0 230 *
michael@0 231 * @param route the route
michael@0 232 * @param create whether to create the pool if it doesn't exist
michael@0 233 *
michael@0 234 * @return the pool for the argument route,
michael@0 235 * never <code>null</code> if <code>create</code> is <code>true</code>
michael@0 236 */
michael@0 237 protected RouteSpecificPool getRoutePool(HttpRoute route,
michael@0 238 boolean create) {
michael@0 239 RouteSpecificPool rospl = null;
michael@0 240 poolLock.lock();
michael@0 241 try {
michael@0 242
michael@0 243 rospl = routeToPool.get(route);
michael@0 244 if ((rospl == null) && create) {
michael@0 245 // no pool for this route yet (or anymore)
michael@0 246 rospl = newRouteSpecificPool(route);
michael@0 247 routeToPool.put(route, rospl);
michael@0 248 }
michael@0 249
michael@0 250 } finally {
michael@0 251 poolLock.unlock();
michael@0 252 }
michael@0 253
michael@0 254 return rospl;
michael@0 255 }
michael@0 256
michael@0 257 public int getConnectionsInPool(HttpRoute route) {
michael@0 258 poolLock.lock();
michael@0 259 try {
michael@0 260 // don't allow a pool to be created here!
michael@0 261 RouteSpecificPool rospl = getRoutePool(route, false);
michael@0 262 return (rospl != null) ? rospl.getEntryCount() : 0;
michael@0 263
michael@0 264 } finally {
michael@0 265 poolLock.unlock();
michael@0 266 }
michael@0 267 }
michael@0 268
michael@0 269 public int getConnectionsInPool() {
michael@0 270 poolLock.lock();
michael@0 271 try {
michael@0 272 return numConnections;
michael@0 273 } finally {
michael@0 274 poolLock.unlock();
michael@0 275 }
michael@0 276 }
michael@0 277
michael@0 278 @Override
michael@0 279 public PoolEntryRequest requestPoolEntry(
michael@0 280 final HttpRoute route,
michael@0 281 final Object state) {
michael@0 282
michael@0 283 final WaitingThreadAborter aborter = new WaitingThreadAborter();
michael@0 284
michael@0 285 return new PoolEntryRequest() {
michael@0 286
michael@0 287 public void abortRequest() {
michael@0 288 poolLock.lock();
michael@0 289 try {
michael@0 290 aborter.abort();
michael@0 291 } finally {
michael@0 292 poolLock.unlock();
michael@0 293 }
michael@0 294 }
michael@0 295
michael@0 296 public BasicPoolEntry getPoolEntry(
michael@0 297 long timeout,
michael@0 298 TimeUnit tunit)
michael@0 299 throws InterruptedException, ConnectionPoolTimeoutException {
michael@0 300 return getEntryBlocking(route, state, timeout, tunit, aborter);
michael@0 301 }
michael@0 302
michael@0 303 };
michael@0 304 }
michael@0 305
michael@0 306 /**
michael@0 307 * Obtains a pool entry with a connection within the given timeout.
michael@0 308 * If a {@link WaitingThread} is used to block, {@link WaitingThreadAborter#setWaitingThread(WaitingThread)}
michael@0 309 * must be called before blocking, to allow the thread to be interrupted.
michael@0 310 *
michael@0 311 * @param route the route for which to get the connection
michael@0 312 * @param timeout the timeout, 0 or negative for no timeout
michael@0 313 * @param tunit the unit for the <code>timeout</code>,
michael@0 314 * may be <code>null</code> only if there is no timeout
michael@0 315 * @param aborter an object which can abort a {@link WaitingThread}.
michael@0 316 *
michael@0 317 * @return pool entry holding a connection for the route
michael@0 318 *
michael@0 319 * @throws ConnectionPoolTimeoutException
michael@0 320 * if the timeout expired
michael@0 321 * @throws InterruptedException
michael@0 322 * if the calling thread was interrupted
michael@0 323 */
michael@0 324 protected BasicPoolEntry getEntryBlocking(
michael@0 325 HttpRoute route, Object state,
michael@0 326 long timeout, TimeUnit tunit,
michael@0 327 WaitingThreadAborter aborter)
michael@0 328 throws ConnectionPoolTimeoutException, InterruptedException {
michael@0 329
michael@0 330 Date deadline = null;
michael@0 331 if (timeout > 0) {
michael@0 332 deadline = new Date
michael@0 333 (System.currentTimeMillis() + tunit.toMillis(timeout));
michael@0 334 }
michael@0 335
michael@0 336 BasicPoolEntry entry = null;
michael@0 337 poolLock.lock();
michael@0 338 try {
michael@0 339
michael@0 340 RouteSpecificPool rospl = getRoutePool(route, true);
michael@0 341 WaitingThread waitingThread = null;
michael@0 342
michael@0 343 while (entry == null) {
michael@0 344
michael@0 345 if (shutdown) {
michael@0 346 throw new IllegalStateException("Connection pool shut down");
michael@0 347 }
michael@0 348
michael@0 349 if (log.isDebugEnabled()) {
michael@0 350 log.debug("[" + route + "] total kept alive: " + freeConnections.size() +
michael@0 351 ", total issued: " + leasedConnections.size() +
michael@0 352 ", total allocated: " + numConnections + " out of " + maxTotalConnections);
michael@0 353 }
michael@0 354
michael@0 355 // the cases to check for:
michael@0 356 // - have a free connection for that route
michael@0 357 // - allowed to create a free connection for that route
michael@0 358 // - can delete and replace a free connection for another route
michael@0 359 // - need to wait for one of the things above to come true
michael@0 360
michael@0 361 entry = getFreeEntry(rospl, state);
michael@0 362 if (entry != null) {
michael@0 363 break;
michael@0 364 }
michael@0 365
michael@0 366 boolean hasCapacity = rospl.getCapacity() > 0;
michael@0 367
michael@0 368 if (log.isDebugEnabled()) {
michael@0 369 log.debug("Available capacity: " + rospl.getCapacity()
michael@0 370 + " out of " + rospl.getMaxEntries()
michael@0 371 + " [" + route + "][" + state + "]");
michael@0 372 }
michael@0 373
michael@0 374 if (hasCapacity && numConnections < maxTotalConnections) {
michael@0 375
michael@0 376 entry = createEntry(rospl, operator);
michael@0 377
michael@0 378 } else if (hasCapacity && !freeConnections.isEmpty()) {
michael@0 379
michael@0 380 deleteLeastUsedEntry();
michael@0 381 // if least used entry's route was the same as rospl,
michael@0 382 // rospl is now out of date : we preemptively refresh
michael@0 383 rospl = getRoutePool(route, true);
michael@0 384 entry = createEntry(rospl, operator);
michael@0 385
michael@0 386 } else {
michael@0 387
michael@0 388 if (log.isDebugEnabled()) {
michael@0 389 log.debug("Need to wait for connection" +
michael@0 390 " [" + route + "][" + state + "]");
michael@0 391 }
michael@0 392
michael@0 393 if (waitingThread == null) {
michael@0 394 waitingThread =
michael@0 395 newWaitingThread(poolLock.newCondition(), rospl);
michael@0 396 aborter.setWaitingThread(waitingThread);
michael@0 397 }
michael@0 398
michael@0 399 boolean success = false;
michael@0 400 try {
michael@0 401 rospl.queueThread(waitingThread);
michael@0 402 waitingThreads.add(waitingThread);
michael@0 403 success = waitingThread.await(deadline);
michael@0 404
michael@0 405 } finally {
michael@0 406 // In case of 'success', we were woken up by the
michael@0 407 // connection pool and should now have a connection
michael@0 408 // waiting for us, or else we're shutting down.
michael@0 409 // Just continue in the loop, both cases are checked.
michael@0 410 rospl.removeThread(waitingThread);
michael@0 411 waitingThreads.remove(waitingThread);
michael@0 412 }
michael@0 413
michael@0 414 // check for spurious wakeup vs. timeout
michael@0 415 if (!success && (deadline != null) &&
michael@0 416 (deadline.getTime() <= System.currentTimeMillis())) {
michael@0 417 throw new ConnectionPoolTimeoutException
michael@0 418 ("Timeout waiting for connection");
michael@0 419 }
michael@0 420 }
michael@0 421 } // while no entry
michael@0 422
michael@0 423 } finally {
michael@0 424 poolLock.unlock();
michael@0 425 }
michael@0 426 return entry;
michael@0 427 }
michael@0 428
michael@0 429 @Override
michael@0 430 public void freeEntry(BasicPoolEntry entry, boolean reusable, long validDuration, TimeUnit timeUnit) {
michael@0 431
michael@0 432 HttpRoute route = entry.getPlannedRoute();
michael@0 433 if (log.isDebugEnabled()) {
michael@0 434 log.debug("Releasing connection" +
michael@0 435 " [" + route + "][" + entry.getState() + "]");
michael@0 436 }
michael@0 437
michael@0 438 poolLock.lock();
michael@0 439 try {
michael@0 440 if (shutdown) {
michael@0 441 // the pool is shut down, release the
michael@0 442 // connection's resources and get out of here
michael@0 443 closeConnection(entry);
michael@0 444 return;
michael@0 445 }
michael@0 446
michael@0 447 // no longer issued, we keep a hard reference now
michael@0 448 leasedConnections.remove(entry);
michael@0 449
michael@0 450 RouteSpecificPool rospl = getRoutePool(route, true);
michael@0 451
michael@0 452 if (reusable) {
michael@0 453 if (log.isDebugEnabled()) {
michael@0 454 String s;
michael@0 455 if (validDuration > 0) {
michael@0 456 s = "for " + validDuration + " " + timeUnit;
michael@0 457 } else {
michael@0 458 s = "indefinitely";
michael@0 459 }
michael@0 460 log.debug("Pooling connection" +
michael@0 461 " [" + route + "][" + entry.getState() + "]; keep alive " + s);
michael@0 462 }
michael@0 463 rospl.freeEntry(entry);
michael@0 464 entry.updateExpiry(validDuration, timeUnit);
michael@0 465 freeConnections.add(entry);
michael@0 466 } else {
michael@0 467 rospl.dropEntry();
michael@0 468 numConnections--;
michael@0 469 }
michael@0 470
michael@0 471 notifyWaitingThread(rospl);
michael@0 472
michael@0 473 } finally {
michael@0 474 poolLock.unlock();
michael@0 475 }
michael@0 476 }
michael@0 477
michael@0 478 /**
michael@0 479 * If available, get a free pool entry for a route.
michael@0 480 *
michael@0 481 * @param rospl the route-specific pool from which to get an entry
michael@0 482 *
michael@0 483 * @return an available pool entry for the given route, or
michael@0 484 * <code>null</code> if none is available
michael@0 485 */
michael@0 486 protected BasicPoolEntry getFreeEntry(RouteSpecificPool rospl, Object state) {
michael@0 487
michael@0 488 BasicPoolEntry entry = null;
michael@0 489 poolLock.lock();
michael@0 490 try {
michael@0 491 boolean done = false;
michael@0 492 while(!done) {
michael@0 493
michael@0 494 entry = rospl.allocEntry(state);
michael@0 495
michael@0 496 if (entry != null) {
michael@0 497 if (log.isDebugEnabled()) {
michael@0 498 log.debug("Getting free connection"
michael@0 499 + " [" + rospl.getRoute() + "][" + state + "]");
michael@0 500
michael@0 501 }
michael@0 502 freeConnections.remove(entry);
michael@0 503 if (entry.isExpired(System.currentTimeMillis())) {
michael@0 504 // If the free entry isn't valid anymore, get rid of it
michael@0 505 // and loop to find another one that might be valid.
michael@0 506 if (log.isDebugEnabled())
michael@0 507 log.debug("Closing expired free connection"
michael@0 508 + " [" + rospl.getRoute() + "][" + state + "]");
michael@0 509 closeConnection(entry);
michael@0 510 // We use dropEntry instead of deleteEntry because the entry
michael@0 511 // is no longer "free" (we just allocated it), and deleteEntry
michael@0 512 // can only be used to delete free entries.
michael@0 513 rospl.dropEntry();
michael@0 514 numConnections--;
michael@0 515 } else {
michael@0 516 leasedConnections.add(entry);
michael@0 517 done = true;
michael@0 518 }
michael@0 519
michael@0 520 } else {
michael@0 521 done = true;
michael@0 522 if (log.isDebugEnabled()) {
michael@0 523 log.debug("No free connections"
michael@0 524 + " [" + rospl.getRoute() + "][" + state + "]");
michael@0 525 }
michael@0 526 }
michael@0 527 }
michael@0 528 } finally {
michael@0 529 poolLock.unlock();
michael@0 530 }
michael@0 531 return entry;
michael@0 532 }
michael@0 533
michael@0 534
michael@0 535 /**
michael@0 536 * Creates a new pool entry.
michael@0 537 * This method assumes that the new connection will be handed
michael@0 538 * out immediately.
michael@0 539 *
michael@0 540 * @param rospl the route-specific pool for which to create the entry
michael@0 541 * @param op the operator for creating a connection
michael@0 542 *
michael@0 543 * @return the new pool entry for a new connection
michael@0 544 */
michael@0 545 protected BasicPoolEntry createEntry(RouteSpecificPool rospl,
michael@0 546 ClientConnectionOperator op) {
michael@0 547
michael@0 548 if (log.isDebugEnabled()) {
michael@0 549 log.debug("Creating new connection [" + rospl.getRoute() + "]");
michael@0 550 }
michael@0 551
michael@0 552 // the entry will create the connection when needed
michael@0 553 BasicPoolEntry entry = new BasicPoolEntry(op, rospl.getRoute(), connTTL, connTTLTimeUnit);
michael@0 554
michael@0 555 poolLock.lock();
michael@0 556 try {
michael@0 557 rospl.createdEntry(entry);
michael@0 558 numConnections++;
michael@0 559 leasedConnections.add(entry);
michael@0 560 } finally {
michael@0 561 poolLock.unlock();
michael@0 562 }
michael@0 563
michael@0 564 return entry;
michael@0 565 }
michael@0 566
michael@0 567
michael@0 568 /**
michael@0 569 * Deletes a given pool entry.
michael@0 570 * This closes the pooled connection and removes all references,
michael@0 571 * so that it can be GCed.
michael@0 572 *
michael@0 573 * <p><b>Note:</b> Does not remove the entry from the freeConnections list.
michael@0 574 * It is assumed that the caller has already handled this step.</p>
michael@0 575 * <!-- @@@ is that a good idea? or rather fix it? -->
michael@0 576 *
michael@0 577 * @param entry the pool entry for the connection to delete
michael@0 578 */
michael@0 579 protected void deleteEntry(BasicPoolEntry entry) {
michael@0 580
michael@0 581 HttpRoute route = entry.getPlannedRoute();
michael@0 582
michael@0 583 if (log.isDebugEnabled()) {
michael@0 584 log.debug("Deleting connection"
michael@0 585 + " [" + route + "][" + entry.getState() + "]");
michael@0 586 }
michael@0 587
michael@0 588 poolLock.lock();
michael@0 589 try {
michael@0 590
michael@0 591 closeConnection(entry);
michael@0 592
michael@0 593 RouteSpecificPool rospl = getRoutePool(route, true);
michael@0 594 rospl.deleteEntry(entry);
michael@0 595 numConnections--;
michael@0 596 if (rospl.isUnused()) {
michael@0 597 routeToPool.remove(route);
michael@0 598 }
michael@0 599
michael@0 600 } finally {
michael@0 601 poolLock.unlock();
michael@0 602 }
michael@0 603 }
michael@0 604
michael@0 605
michael@0 606 /**
michael@0 607 * Delete an old, free pool entry to make room for a new one.
michael@0 608 * Used to replace pool entries with ones for a different route.
michael@0 609 */
michael@0 610 protected void deleteLeastUsedEntry() {
michael@0 611 poolLock.lock();
michael@0 612 try {
michael@0 613
michael@0 614 BasicPoolEntry entry = freeConnections.remove();
michael@0 615
michael@0 616 if (entry != null) {
michael@0 617 deleteEntry(entry);
michael@0 618 } else if (log.isDebugEnabled()) {
michael@0 619 log.debug("No free connection to delete");
michael@0 620 }
michael@0 621
michael@0 622 } finally {
michael@0 623 poolLock.unlock();
michael@0 624 }
michael@0 625 }
michael@0 626
michael@0 627 @Override
michael@0 628 protected void handleLostEntry(HttpRoute route) {
michael@0 629
michael@0 630 poolLock.lock();
michael@0 631 try {
michael@0 632
michael@0 633 RouteSpecificPool rospl = getRoutePool(route, true);
michael@0 634 rospl.dropEntry();
michael@0 635 if (rospl.isUnused()) {
michael@0 636 routeToPool.remove(route);
michael@0 637 }
michael@0 638
michael@0 639 numConnections--;
michael@0 640 notifyWaitingThread(rospl);
michael@0 641
michael@0 642 } finally {
michael@0 643 poolLock.unlock();
michael@0 644 }
michael@0 645 }
michael@0 646
michael@0 647 /**
michael@0 648 * Notifies a waiting thread that a connection is available.
michael@0 649 * This will wake a thread waiting in the specific route pool,
michael@0 650 * if there is one.
michael@0 651 * Otherwise, a thread in the connection pool will be notified.
michael@0 652 *
michael@0 653 * @param rospl the pool in which to notify, or <code>null</code>
michael@0 654 */
michael@0 655 protected void notifyWaitingThread(RouteSpecificPool rospl) {
michael@0 656
michael@0 657 //@@@ while this strategy provides for best connection re-use,
michael@0 658 //@@@ is it fair? only do this if the connection is open?
michael@0 659 // Find the thread we are going to notify. We want to ensure that
michael@0 660 // each waiting thread is only interrupted once, so we will remove
michael@0 661 // it from all wait queues before interrupting.
michael@0 662 WaitingThread waitingThread = null;
michael@0 663
michael@0 664 poolLock.lock();
michael@0 665 try {
michael@0 666
michael@0 667 if ((rospl != null) && rospl.hasThread()) {
michael@0 668 if (log.isDebugEnabled()) {
michael@0 669 log.debug("Notifying thread waiting on pool" +
michael@0 670 " [" + rospl.getRoute() + "]");
michael@0 671 }
michael@0 672 waitingThread = rospl.nextThread();
michael@0 673 } else if (!waitingThreads.isEmpty()) {
michael@0 674 if (log.isDebugEnabled()) {
michael@0 675 log.debug("Notifying thread waiting on any pool");
michael@0 676 }
michael@0 677 waitingThread = waitingThreads.remove();
michael@0 678 } else if (log.isDebugEnabled()) {
michael@0 679 log.debug("Notifying no-one, there are no waiting threads");
michael@0 680 }
michael@0 681
michael@0 682 if (waitingThread != null) {
michael@0 683 waitingThread.wakeup();
michael@0 684 }
michael@0 685
michael@0 686 } finally {
michael@0 687 poolLock.unlock();
michael@0 688 }
michael@0 689 }
michael@0 690
michael@0 691
michael@0 692 @Override
michael@0 693 public void deleteClosedConnections() {
michael@0 694 poolLock.lock();
michael@0 695 try {
michael@0 696 Iterator<BasicPoolEntry> iter = freeConnections.iterator();
michael@0 697 while (iter.hasNext()) {
michael@0 698 BasicPoolEntry entry = iter.next();
michael@0 699 if (!entry.getConnection().isOpen()) {
michael@0 700 iter.remove();
michael@0 701 deleteEntry(entry);
michael@0 702 }
michael@0 703 }
michael@0 704 } finally {
michael@0 705 poolLock.unlock();
michael@0 706 }
michael@0 707 }
michael@0 708
michael@0 709 /**
michael@0 710 * Closes idle connections.
michael@0 711 *
michael@0 712 * @param idletime the time the connections should have been idle
michael@0 713 * in order to be closed now
michael@0 714 * @param tunit the unit for the <code>idletime</code>
michael@0 715 */
michael@0 716 @Override
michael@0 717 public void closeIdleConnections(long idletime, TimeUnit tunit) {
michael@0 718 if (tunit == null) {
michael@0 719 throw new IllegalArgumentException("Time unit must not be null.");
michael@0 720 }
michael@0 721 if (idletime < 0) {
michael@0 722 idletime = 0;
michael@0 723 }
michael@0 724 if (log.isDebugEnabled()) {
michael@0 725 log.debug("Closing connections idle longer than " + idletime + " " + tunit);
michael@0 726 }
michael@0 727 // the latest time for which connections will be closed
michael@0 728 long deadline = System.currentTimeMillis() - tunit.toMillis(idletime);
michael@0 729 poolLock.lock();
michael@0 730 try {
michael@0 731 Iterator<BasicPoolEntry> iter = freeConnections.iterator();
michael@0 732 while (iter.hasNext()) {
michael@0 733 BasicPoolEntry entry = iter.next();
michael@0 734 if (entry.getUpdated() <= deadline) {
michael@0 735 if (log.isDebugEnabled()) {
michael@0 736 log.debug("Closing connection last used @ " + new Date(entry.getUpdated()));
michael@0 737 }
michael@0 738 iter.remove();
michael@0 739 deleteEntry(entry);
michael@0 740 }
michael@0 741 }
michael@0 742 } finally {
michael@0 743 poolLock.unlock();
michael@0 744 }
michael@0 745 }
michael@0 746
michael@0 747 @Override
michael@0 748 public void closeExpiredConnections() {
michael@0 749 log.debug("Closing expired connections");
michael@0 750 long now = System.currentTimeMillis();
michael@0 751
michael@0 752 poolLock.lock();
michael@0 753 try {
michael@0 754 Iterator<BasicPoolEntry> iter = freeConnections.iterator();
michael@0 755 while (iter.hasNext()) {
michael@0 756 BasicPoolEntry entry = iter.next();
michael@0 757 if (entry.isExpired(now)) {
michael@0 758 if (log.isDebugEnabled()) {
michael@0 759 log.debug("Closing connection expired @ " + new Date(entry.getExpiry()));
michael@0 760 }
michael@0 761 iter.remove();
michael@0 762 deleteEntry(entry);
michael@0 763 }
michael@0 764 }
michael@0 765 } finally {
michael@0 766 poolLock.unlock();
michael@0 767 }
michael@0 768 }
michael@0 769
michael@0 770 @Override
michael@0 771 public void shutdown() {
michael@0 772 poolLock.lock();
michael@0 773 try {
michael@0 774 if (shutdown) {
michael@0 775 return;
michael@0 776 }
michael@0 777 shutdown = true;
michael@0 778
michael@0 779 // close all connections that are issued to an application
michael@0 780 Iterator<BasicPoolEntry> iter1 = leasedConnections.iterator();
michael@0 781 while (iter1.hasNext()) {
michael@0 782 BasicPoolEntry entry = iter1.next();
michael@0 783 iter1.remove();
michael@0 784 closeConnection(entry);
michael@0 785 }
michael@0 786
michael@0 787 // close all free connections
michael@0 788 Iterator<BasicPoolEntry> iter2 = freeConnections.iterator();
michael@0 789 while (iter2.hasNext()) {
michael@0 790 BasicPoolEntry entry = iter2.next();
michael@0 791 iter2.remove();
michael@0 792
michael@0 793 if (log.isDebugEnabled()) {
michael@0 794 log.debug("Closing connection"
michael@0 795 + " [" + entry.getPlannedRoute() + "][" + entry.getState() + "]");
michael@0 796 }
michael@0 797 closeConnection(entry);
michael@0 798 }
michael@0 799
michael@0 800 // wake up all waiting threads
michael@0 801 Iterator<WaitingThread> iwth = waitingThreads.iterator();
michael@0 802 while (iwth.hasNext()) {
michael@0 803 WaitingThread waiter = iwth.next();
michael@0 804 iwth.remove();
michael@0 805 waiter.wakeup();
michael@0 806 }
michael@0 807
michael@0 808 routeToPool.clear();
michael@0 809
michael@0 810 } finally {
michael@0 811 poolLock.unlock();
michael@0 812 }
michael@0 813 }
michael@0 814
michael@0 815 /**
michael@0 816 * since 4.1
michael@0 817 */
michael@0 818 public void setMaxTotalConnections(int max) {
michael@0 819 poolLock.lock();
michael@0 820 try {
michael@0 821 maxTotalConnections = max;
michael@0 822 } finally {
michael@0 823 poolLock.unlock();
michael@0 824 }
michael@0 825 }
michael@0 826
michael@0 827
michael@0 828 /**
michael@0 829 * since 4.1
michael@0 830 */
michael@0 831 public int getMaxTotalConnections() {
michael@0 832 return maxTotalConnections;
michael@0 833 }
michael@0 834
michael@0 835 }
michael@0 836

mercurial