|
1 /* |
|
2 * ==================================================================== |
|
3 * |
|
4 * Licensed to the Apache Software Foundation (ASF) under one or more |
|
5 * contributor license agreements. See the NOTICE file distributed with |
|
6 * this work for additional information regarding copyright ownership. |
|
7 * The ASF licenses this file to You under the Apache License, Version 2.0 |
|
8 * (the "License"); you may not use this file except in compliance with |
|
9 * the License. You may obtain a copy of the License at |
|
10 * |
|
11 * http://www.apache.org/licenses/LICENSE-2.0 |
|
12 * |
|
13 * Unless required by applicable law or agreed to in writing, software |
|
14 * distributed under the License is distributed on an "AS IS" BASIS, |
|
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
16 * See the License for the specific language governing permissions and |
|
17 * limitations under the License. |
|
18 * ==================================================================== |
|
19 * |
|
20 * This software consists of voluntary contributions made by many |
|
21 * individuals on behalf of the Apache Software Foundation. For more |
|
22 * information on the Apache Software Foundation, please see |
|
23 * <http://www.apache.org/>. |
|
24 * |
|
25 */ |
|
26 |
|
27 package ch.boye.httpclientandroidlib.impl.conn.tsccm; |
|
28 |
|
29 import java.io.IOException; |
|
30 import java.util.Date; |
|
31 import java.util.HashMap; |
|
32 import java.util.Iterator; |
|
33 import java.util.Queue; |
|
34 import java.util.LinkedList; |
|
35 import java.util.Map; |
|
36 import java.util.Set; |
|
37 import java.util.concurrent.locks.Condition; |
|
38 import java.util.concurrent.locks.Lock; |
|
39 import java.util.concurrent.TimeUnit; |
|
40 |
|
41 import ch.boye.httpclientandroidlib.androidextra.HttpClientAndroidLog; |
|
42 /* LogFactory removed by HttpClient for Android script. */ |
|
43 import ch.boye.httpclientandroidlib.annotation.ThreadSafe; |
|
44 import ch.boye.httpclientandroidlib.conn.routing.HttpRoute; |
|
45 import ch.boye.httpclientandroidlib.conn.ClientConnectionOperator; |
|
46 import ch.boye.httpclientandroidlib.conn.ConnectionPoolTimeoutException; |
|
47 import ch.boye.httpclientandroidlib.conn.OperatedClientConnection; |
|
48 import ch.boye.httpclientandroidlib.conn.params.ConnPerRoute; |
|
49 import ch.boye.httpclientandroidlib.conn.params.ConnManagerParams; |
|
50 import ch.boye.httpclientandroidlib.params.HttpParams; |
|
51 |
|
52 /** |
|
53 * A connection pool that maintains connections by route. |
|
54 * This class is derived from <code>MultiThreadedHttpConnectionManager</code> |
|
55 * in HttpClient 3.x, see there for original authors. It implements the same |
|
56 * algorithm for connection re-use and connection-per-host enforcement: |
|
57 * <ul> |
|
58 * <li>connections are re-used only for the exact same route</li> |
|
59 * <li>connection limits are enforced per route rather than per host</li> |
|
60 * </ul> |
|
61 * Note that access to the pool data structures is synchronized via the |
|
62 * {@link AbstractConnPool#poolLock poolLock} in the base class, |
|
63 * not via <code>synchronized</code> methods. |
|
64 * |
|
65 * @since 4.0 |
|
66 */ |
|
67 @ThreadSafe |
|
68 @SuppressWarnings("deprecation") |
|
69 public class ConnPoolByRoute extends AbstractConnPool { //TODO: remove dependency on AbstractConnPool |
|
70 |
|
71 public HttpClientAndroidLog log = new HttpClientAndroidLog(getClass()); |
|
72 |
|
73 private final Lock poolLock; |
|
74 |
|
75 /** Connection operator for this pool */ |
|
76 protected final ClientConnectionOperator operator; |
|
77 |
|
78 /** Connections per route lookup */ |
|
79 protected final ConnPerRoute connPerRoute; |
|
80 |
|
81 /** References to issued connections */ |
|
82 protected final Set<BasicPoolEntry> leasedConnections; |
|
83 |
|
84 /** The list of free connections */ |
|
85 protected final Queue<BasicPoolEntry> freeConnections; |
|
86 |
|
87 /** The list of WaitingThreads waiting for a connection */ |
|
88 protected final Queue<WaitingThread> waitingThreads; |
|
89 |
|
90 /** Map of route-specific pools */ |
|
91 protected final Map<HttpRoute, RouteSpecificPool> routeToPool; |
|
92 |
|
93 private final long connTTL; |
|
94 |
|
95 private final TimeUnit connTTLTimeUnit; |
|
96 |
|
97 protected volatile boolean shutdown; |
|
98 |
|
99 protected volatile int maxTotalConnections; |
|
100 |
|
101 protected volatile int numConnections; |
|
102 |
|
/**
 * Creates a new connection pool, managed by route.
 * Delegates to the five-argument constructor with a connection TTL of
 * <code>-1</code> milliseconds (presumably "no TTL"; confirm against
 * <code>BasicPoolEntry</code>).
 *
 * @param operator              the connection operator, must not be <code>null</code>
 * @param connPerRoute          the per-route limit lookup, must not be <code>null</code>
 * @param maxTotalConnections   the limit on the total number of connections
 *
 * @since 4.1
 */
public ConnPoolByRoute(
        final ClientConnectionOperator operator,
        final ConnPerRoute connPerRoute,
        int maxTotalConnections) {
    this(operator, connPerRoute, maxTotalConnections, -1L, TimeUnit.MILLISECONDS);
}
|
114 |
|
/**
 * Creates a new connection pool, managed by route, whose entries are
 * created with the given time-to-live.
 *
 * @param operator              the connection operator, must not be <code>null</code>
 * @param connPerRoute          the per-route limit lookup, must not be <code>null</code>
 * @param maxTotalConnections   the limit on the total number of connections
 * @param connTTL               the time-to-live passed to every new pool entry
 * @param connTTLTimeUnit       the unit of <code>connTTL</code>
 *
 * @throws IllegalArgumentException
 *          if <code>operator</code> or <code>connPerRoute</code> is <code>null</code>
 *
 * @since 4.1
 */
public ConnPoolByRoute(
        final ClientConnectionOperator operator,
        final ConnPerRoute connPerRoute,
        int maxTotalConnections,
        long connTTL,
        final TimeUnit connTTLTimeUnit) {
    super();
    if (operator == null) {
        throw new IllegalArgumentException("Connection operator may not be null");
    }
    if (connPerRoute == null) {
        throw new IllegalArgumentException("Connections per route may not be null");
    }

    // collaborators and limits
    this.operator = operator;
    this.connPerRoute = connPerRoute;
    this.maxTotalConnections = maxTotalConnections;
    this.connTTL = connTTL;
    this.connTTLTimeUnit = connTTLTimeUnit;

    // lock and leased set are shared with the base class
    this.poolLock = super.poolLock;
    this.leasedConnections = super.leasedConnections;

    // overridable factory hooks, invoked in their original order
    this.freeConnections = createFreeConnQueue();
    this.waitingThreads = createWaitingThreadQueue();
    this.routeToPool = createRouteToPoolMap();
}
|
142 |
|
143 protected Lock getLock() { |
|
144 return this.poolLock; |
|
145 } |
|
146 |
|
/**
 * Creates a new connection pool, managed by route, reading the per-route
 * and total connection limits from the given parameters via
 * {@link ConnManagerParams}.
 *
 * @param operator  the connection operator, must not be <code>null</code>
 * @param params    the parameters holding the connection limits
 *
 * @deprecated use {@link ConnPoolByRoute#ConnPoolByRoute(ClientConnectionOperator, ConnPerRoute, int)}
 */
@Deprecated
public ConnPoolByRoute(final ClientConnectionOperator operator, final HttpParams params) {
    this(
        operator,
        ConnManagerParams.getMaxConnectionsPerRoute(params),
        ConnManagerParams.getMaxTotalConnections(params)
    );
}
|
158 |
|
159 /** |
|
160 * Creates the queue for {@link #freeConnections}. |
|
161 * Called once by the constructor. |
|
162 * |
|
163 * @return a queue |
|
164 */ |
|
165 protected Queue<BasicPoolEntry> createFreeConnQueue() { |
|
166 return new LinkedList<BasicPoolEntry>(); |
|
167 } |
|
168 |
|
169 /** |
|
170 * Creates the queue for {@link #waitingThreads}. |
|
171 * Called once by the constructor. |
|
172 * |
|
173 * @return a queue |
|
174 */ |
|
175 protected Queue<WaitingThread> createWaitingThreadQueue() { |
|
176 return new LinkedList<WaitingThread>(); |
|
177 } |
|
178 |
|
179 /** |
|
180 * Creates the map for {@link #routeToPool}. |
|
181 * Called once by the constructor. |
|
182 * |
|
183 * @return a map |
|
184 */ |
|
185 protected Map<HttpRoute, RouteSpecificPool> createRouteToPoolMap() { |
|
186 return new HashMap<HttpRoute, RouteSpecificPool>(); |
|
187 } |
|
188 |
|
189 |
|
190 /** |
|
191 * Creates a new route-specific pool. |
|
192 * Called by {@link #getRoutePool} when necessary. |
|
193 * |
|
194 * @param route the route |
|
195 * |
|
196 * @return the new pool |
|
197 */ |
|
198 protected RouteSpecificPool newRouteSpecificPool(HttpRoute route) { |
|
199 return new RouteSpecificPool(route, this.connPerRoute); |
|
200 } |
|
201 |
|
202 |
|
203 /** |
|
204 * Creates a new waiting thread. |
|
205 * Called by {@link #getRoutePool} when necessary. |
|
206 * |
|
207 * @param cond the condition to wait for |
|
208 * @param rospl the route specific pool, or <code>null</code> |
|
209 * |
|
210 * @return a waiting thread representation |
|
211 */ |
|
212 protected WaitingThread newWaitingThread(Condition cond, |
|
213 RouteSpecificPool rospl) { |
|
214 return new WaitingThread(cond, rospl); |
|
215 } |
|
216 |
|
217 private void closeConnection(final BasicPoolEntry entry) { |
|
218 OperatedClientConnection conn = entry.getConnection(); |
|
219 if (conn != null) { |
|
220 try { |
|
221 conn.close(); |
|
222 } catch (IOException ex) { |
|
223 log.debug("I/O error closing connection", ex); |
|
224 } |
|
225 } |
|
226 } |
|
227 |
|
228 /** |
|
229 * Get a route-specific pool of available connections. |
|
230 * |
|
231 * @param route the route |
|
232 * @param create whether to create the pool if it doesn't exist |
|
233 * |
|
234 * @return the pool for the argument route, |
|
235 * never <code>null</code> if <code>create</code> is <code>true</code> |
|
236 */ |
|
237 protected RouteSpecificPool getRoutePool(HttpRoute route, |
|
238 boolean create) { |
|
239 RouteSpecificPool rospl = null; |
|
240 poolLock.lock(); |
|
241 try { |
|
242 |
|
243 rospl = routeToPool.get(route); |
|
244 if ((rospl == null) && create) { |
|
245 // no pool for this route yet (or anymore) |
|
246 rospl = newRouteSpecificPool(route); |
|
247 routeToPool.put(route, rospl); |
|
248 } |
|
249 |
|
250 } finally { |
|
251 poolLock.unlock(); |
|
252 } |
|
253 |
|
254 return rospl; |
|
255 } |
|
256 |
|
257 public int getConnectionsInPool(HttpRoute route) { |
|
258 poolLock.lock(); |
|
259 try { |
|
260 // don't allow a pool to be created here! |
|
261 RouteSpecificPool rospl = getRoutePool(route, false); |
|
262 return (rospl != null) ? rospl.getEntryCount() : 0; |
|
263 |
|
264 } finally { |
|
265 poolLock.unlock(); |
|
266 } |
|
267 } |
|
268 |
|
269 public int getConnectionsInPool() { |
|
270 poolLock.lock(); |
|
271 try { |
|
272 return numConnections; |
|
273 } finally { |
|
274 poolLock.unlock(); |
|
275 } |
|
276 } |
|
277 |
|
278 @Override |
|
279 public PoolEntryRequest requestPoolEntry( |
|
280 final HttpRoute route, |
|
281 final Object state) { |
|
282 |
|
283 final WaitingThreadAborter aborter = new WaitingThreadAborter(); |
|
284 |
|
285 return new PoolEntryRequest() { |
|
286 |
|
287 public void abortRequest() { |
|
288 poolLock.lock(); |
|
289 try { |
|
290 aborter.abort(); |
|
291 } finally { |
|
292 poolLock.unlock(); |
|
293 } |
|
294 } |
|
295 |
|
296 public BasicPoolEntry getPoolEntry( |
|
297 long timeout, |
|
298 TimeUnit tunit) |
|
299 throws InterruptedException, ConnectionPoolTimeoutException { |
|
300 return getEntryBlocking(route, state, timeout, tunit, aborter); |
|
301 } |
|
302 |
|
303 }; |
|
304 } |
|
305 |
|
306 /** |
|
307 * Obtains a pool entry with a connection within the given timeout. |
|
308 * If a {@link WaitingThread} is used to block, {@link WaitingThreadAborter#setWaitingThread(WaitingThread)} |
|
309 * must be called before blocking, to allow the thread to be interrupted. |
|
310 * |
|
311 * @param route the route for which to get the connection |
|
312 * @param timeout the timeout, 0 or negative for no timeout |
|
313 * @param tunit the unit for the <code>timeout</code>, |
|
314 * may be <code>null</code> only if there is no timeout |
|
315 * @param aborter an object which can abort a {@link WaitingThread}. |
|
316 * |
|
317 * @return pool entry holding a connection for the route |
|
318 * |
|
319 * @throws ConnectionPoolTimeoutException |
|
320 * if the timeout expired |
|
321 * @throws InterruptedException |
|
322 * if the calling thread was interrupted |
|
323 */ |
|
324 protected BasicPoolEntry getEntryBlocking( |
|
325 HttpRoute route, Object state, |
|
326 long timeout, TimeUnit tunit, |
|
327 WaitingThreadAborter aborter) |
|
328 throws ConnectionPoolTimeoutException, InterruptedException { |
|
329 |
|
330 Date deadline = null; |
|
331 if (timeout > 0) { |
|
332 deadline = new Date |
|
333 (System.currentTimeMillis() + tunit.toMillis(timeout)); |
|
334 } |
|
335 |
|
336 BasicPoolEntry entry = null; |
|
337 poolLock.lock(); |
|
338 try { |
|
339 |
|
340 RouteSpecificPool rospl = getRoutePool(route, true); |
|
341 WaitingThread waitingThread = null; |
|
342 |
|
343 while (entry == null) { |
|
344 |
|
345 if (shutdown) { |
|
346 throw new IllegalStateException("Connection pool shut down"); |
|
347 } |
|
348 |
|
349 if (log.isDebugEnabled()) { |
|
350 log.debug("[" + route + "] total kept alive: " + freeConnections.size() + |
|
351 ", total issued: " + leasedConnections.size() + |
|
352 ", total allocated: " + numConnections + " out of " + maxTotalConnections); |
|
353 } |
|
354 |
|
355 // the cases to check for: |
|
356 // - have a free connection for that route |
|
357 // - allowed to create a free connection for that route |
|
358 // - can delete and replace a free connection for another route |
|
359 // - need to wait for one of the things above to come true |
|
360 |
|
361 entry = getFreeEntry(rospl, state); |
|
362 if (entry != null) { |
|
363 break; |
|
364 } |
|
365 |
|
366 boolean hasCapacity = rospl.getCapacity() > 0; |
|
367 |
|
368 if (log.isDebugEnabled()) { |
|
369 log.debug("Available capacity: " + rospl.getCapacity() |
|
370 + " out of " + rospl.getMaxEntries() |
|
371 + " [" + route + "][" + state + "]"); |
|
372 } |
|
373 |
|
374 if (hasCapacity && numConnections < maxTotalConnections) { |
|
375 |
|
376 entry = createEntry(rospl, operator); |
|
377 |
|
378 } else if (hasCapacity && !freeConnections.isEmpty()) { |
|
379 |
|
380 deleteLeastUsedEntry(); |
|
381 // if least used entry's route was the same as rospl, |
|
382 // rospl is now out of date : we preemptively refresh |
|
383 rospl = getRoutePool(route, true); |
|
384 entry = createEntry(rospl, operator); |
|
385 |
|
386 } else { |
|
387 |
|
388 if (log.isDebugEnabled()) { |
|
389 log.debug("Need to wait for connection" + |
|
390 " [" + route + "][" + state + "]"); |
|
391 } |
|
392 |
|
393 if (waitingThread == null) { |
|
394 waitingThread = |
|
395 newWaitingThread(poolLock.newCondition(), rospl); |
|
396 aborter.setWaitingThread(waitingThread); |
|
397 } |
|
398 |
|
399 boolean success = false; |
|
400 try { |
|
401 rospl.queueThread(waitingThread); |
|
402 waitingThreads.add(waitingThread); |
|
403 success = waitingThread.await(deadline); |
|
404 |
|
405 } finally { |
|
406 // In case of 'success', we were woken up by the |
|
407 // connection pool and should now have a connection |
|
408 // waiting for us, or else we're shutting down. |
|
409 // Just continue in the loop, both cases are checked. |
|
410 rospl.removeThread(waitingThread); |
|
411 waitingThreads.remove(waitingThread); |
|
412 } |
|
413 |
|
414 // check for spurious wakeup vs. timeout |
|
415 if (!success && (deadline != null) && |
|
416 (deadline.getTime() <= System.currentTimeMillis())) { |
|
417 throw new ConnectionPoolTimeoutException |
|
418 ("Timeout waiting for connection"); |
|
419 } |
|
420 } |
|
421 } // while no entry |
|
422 |
|
423 } finally { |
|
424 poolLock.unlock(); |
|
425 } |
|
426 return entry; |
|
427 } |
|
428 |
|
429 @Override |
|
430 public void freeEntry(BasicPoolEntry entry, boolean reusable, long validDuration, TimeUnit timeUnit) { |
|
431 |
|
432 HttpRoute route = entry.getPlannedRoute(); |
|
433 if (log.isDebugEnabled()) { |
|
434 log.debug("Releasing connection" + |
|
435 " [" + route + "][" + entry.getState() + "]"); |
|
436 } |
|
437 |
|
438 poolLock.lock(); |
|
439 try { |
|
440 if (shutdown) { |
|
441 // the pool is shut down, release the |
|
442 // connection's resources and get out of here |
|
443 closeConnection(entry); |
|
444 return; |
|
445 } |
|
446 |
|
447 // no longer issued, we keep a hard reference now |
|
448 leasedConnections.remove(entry); |
|
449 |
|
450 RouteSpecificPool rospl = getRoutePool(route, true); |
|
451 |
|
452 if (reusable) { |
|
453 if (log.isDebugEnabled()) { |
|
454 String s; |
|
455 if (validDuration > 0) { |
|
456 s = "for " + validDuration + " " + timeUnit; |
|
457 } else { |
|
458 s = "indefinitely"; |
|
459 } |
|
460 log.debug("Pooling connection" + |
|
461 " [" + route + "][" + entry.getState() + "]; keep alive " + s); |
|
462 } |
|
463 rospl.freeEntry(entry); |
|
464 entry.updateExpiry(validDuration, timeUnit); |
|
465 freeConnections.add(entry); |
|
466 } else { |
|
467 rospl.dropEntry(); |
|
468 numConnections--; |
|
469 } |
|
470 |
|
471 notifyWaitingThread(rospl); |
|
472 |
|
473 } finally { |
|
474 poolLock.unlock(); |
|
475 } |
|
476 } |
|
477 |
|
478 /** |
|
479 * If available, get a free pool entry for a route. |
|
480 * |
|
481 * @param rospl the route-specific pool from which to get an entry |
|
482 * |
|
483 * @return an available pool entry for the given route, or |
|
484 * <code>null</code> if none is available |
|
485 */ |
|
486 protected BasicPoolEntry getFreeEntry(RouteSpecificPool rospl, Object state) { |
|
487 |
|
488 BasicPoolEntry entry = null; |
|
489 poolLock.lock(); |
|
490 try { |
|
491 boolean done = false; |
|
492 while(!done) { |
|
493 |
|
494 entry = rospl.allocEntry(state); |
|
495 |
|
496 if (entry != null) { |
|
497 if (log.isDebugEnabled()) { |
|
498 log.debug("Getting free connection" |
|
499 + " [" + rospl.getRoute() + "][" + state + "]"); |
|
500 |
|
501 } |
|
502 freeConnections.remove(entry); |
|
503 if (entry.isExpired(System.currentTimeMillis())) { |
|
504 // If the free entry isn't valid anymore, get rid of it |
|
505 // and loop to find another one that might be valid. |
|
506 if (log.isDebugEnabled()) |
|
507 log.debug("Closing expired free connection" |
|
508 + " [" + rospl.getRoute() + "][" + state + "]"); |
|
509 closeConnection(entry); |
|
510 // We use dropEntry instead of deleteEntry because the entry |
|
511 // is no longer "free" (we just allocated it), and deleteEntry |
|
512 // can only be used to delete free entries. |
|
513 rospl.dropEntry(); |
|
514 numConnections--; |
|
515 } else { |
|
516 leasedConnections.add(entry); |
|
517 done = true; |
|
518 } |
|
519 |
|
520 } else { |
|
521 done = true; |
|
522 if (log.isDebugEnabled()) { |
|
523 log.debug("No free connections" |
|
524 + " [" + rospl.getRoute() + "][" + state + "]"); |
|
525 } |
|
526 } |
|
527 } |
|
528 } finally { |
|
529 poolLock.unlock(); |
|
530 } |
|
531 return entry; |
|
532 } |
|
533 |
|
534 |
|
535 /** |
|
536 * Creates a new pool entry. |
|
537 * This method assumes that the new connection will be handed |
|
538 * out immediately. |
|
539 * |
|
540 * @param rospl the route-specific pool for which to create the entry |
|
541 * @param op the operator for creating a connection |
|
542 * |
|
543 * @return the new pool entry for a new connection |
|
544 */ |
|
545 protected BasicPoolEntry createEntry(RouteSpecificPool rospl, |
|
546 ClientConnectionOperator op) { |
|
547 |
|
548 if (log.isDebugEnabled()) { |
|
549 log.debug("Creating new connection [" + rospl.getRoute() + "]"); |
|
550 } |
|
551 |
|
552 // the entry will create the connection when needed |
|
553 BasicPoolEntry entry = new BasicPoolEntry(op, rospl.getRoute(), connTTL, connTTLTimeUnit); |
|
554 |
|
555 poolLock.lock(); |
|
556 try { |
|
557 rospl.createdEntry(entry); |
|
558 numConnections++; |
|
559 leasedConnections.add(entry); |
|
560 } finally { |
|
561 poolLock.unlock(); |
|
562 } |
|
563 |
|
564 return entry; |
|
565 } |
|
566 |
|
567 |
|
568 /** |
|
569 * Deletes a given pool entry. |
|
570 * This closes the pooled connection and removes all references, |
|
571 * so that it can be GCed. |
|
572 * |
|
573 * <p><b>Note:</b> Does not remove the entry from the freeConnections list. |
|
574 * It is assumed that the caller has already handled this step.</p> |
|
575 * <!-- @@@ is that a good idea? or rather fix it? --> |
|
576 * |
|
577 * @param entry the pool entry for the connection to delete |
|
578 */ |
|
579 protected void deleteEntry(BasicPoolEntry entry) { |
|
580 |
|
581 HttpRoute route = entry.getPlannedRoute(); |
|
582 |
|
583 if (log.isDebugEnabled()) { |
|
584 log.debug("Deleting connection" |
|
585 + " [" + route + "][" + entry.getState() + "]"); |
|
586 } |
|
587 |
|
588 poolLock.lock(); |
|
589 try { |
|
590 |
|
591 closeConnection(entry); |
|
592 |
|
593 RouteSpecificPool rospl = getRoutePool(route, true); |
|
594 rospl.deleteEntry(entry); |
|
595 numConnections--; |
|
596 if (rospl.isUnused()) { |
|
597 routeToPool.remove(route); |
|
598 } |
|
599 |
|
600 } finally { |
|
601 poolLock.unlock(); |
|
602 } |
|
603 } |
|
604 |
|
605 |
|
606 /** |
|
607 * Delete an old, free pool entry to make room for a new one. |
|
608 * Used to replace pool entries with ones for a different route. |
|
609 */ |
|
610 protected void deleteLeastUsedEntry() { |
|
611 poolLock.lock(); |
|
612 try { |
|
613 |
|
614 BasicPoolEntry entry = freeConnections.remove(); |
|
615 |
|
616 if (entry != null) { |
|
617 deleteEntry(entry); |
|
618 } else if (log.isDebugEnabled()) { |
|
619 log.debug("No free connection to delete"); |
|
620 } |
|
621 |
|
622 } finally { |
|
623 poolLock.unlock(); |
|
624 } |
|
625 } |
|
626 |
|
627 @Override |
|
628 protected void handleLostEntry(HttpRoute route) { |
|
629 |
|
630 poolLock.lock(); |
|
631 try { |
|
632 |
|
633 RouteSpecificPool rospl = getRoutePool(route, true); |
|
634 rospl.dropEntry(); |
|
635 if (rospl.isUnused()) { |
|
636 routeToPool.remove(route); |
|
637 } |
|
638 |
|
639 numConnections--; |
|
640 notifyWaitingThread(rospl); |
|
641 |
|
642 } finally { |
|
643 poolLock.unlock(); |
|
644 } |
|
645 } |
|
646 |
|
647 /** |
|
648 * Notifies a waiting thread that a connection is available. |
|
649 * This will wake a thread waiting in the specific route pool, |
|
650 * if there is one. |
|
651 * Otherwise, a thread in the connection pool will be notified. |
|
652 * |
|
653 * @param rospl the pool in which to notify, or <code>null</code> |
|
654 */ |
|
655 protected void notifyWaitingThread(RouteSpecificPool rospl) { |
|
656 |
|
657 //@@@ while this strategy provides for best connection re-use, |
|
658 //@@@ is it fair? only do this if the connection is open? |
|
659 // Find the thread we are going to notify. We want to ensure that |
|
660 // each waiting thread is only interrupted once, so we will remove |
|
661 // it from all wait queues before interrupting. |
|
662 WaitingThread waitingThread = null; |
|
663 |
|
664 poolLock.lock(); |
|
665 try { |
|
666 |
|
667 if ((rospl != null) && rospl.hasThread()) { |
|
668 if (log.isDebugEnabled()) { |
|
669 log.debug("Notifying thread waiting on pool" + |
|
670 " [" + rospl.getRoute() + "]"); |
|
671 } |
|
672 waitingThread = rospl.nextThread(); |
|
673 } else if (!waitingThreads.isEmpty()) { |
|
674 if (log.isDebugEnabled()) { |
|
675 log.debug("Notifying thread waiting on any pool"); |
|
676 } |
|
677 waitingThread = waitingThreads.remove(); |
|
678 } else if (log.isDebugEnabled()) { |
|
679 log.debug("Notifying no-one, there are no waiting threads"); |
|
680 } |
|
681 |
|
682 if (waitingThread != null) { |
|
683 waitingThread.wakeup(); |
|
684 } |
|
685 |
|
686 } finally { |
|
687 poolLock.unlock(); |
|
688 } |
|
689 } |
|
690 |
|
691 |
|
692 @Override |
|
693 public void deleteClosedConnections() { |
|
694 poolLock.lock(); |
|
695 try { |
|
696 Iterator<BasicPoolEntry> iter = freeConnections.iterator(); |
|
697 while (iter.hasNext()) { |
|
698 BasicPoolEntry entry = iter.next(); |
|
699 if (!entry.getConnection().isOpen()) { |
|
700 iter.remove(); |
|
701 deleteEntry(entry); |
|
702 } |
|
703 } |
|
704 } finally { |
|
705 poolLock.unlock(); |
|
706 } |
|
707 } |
|
708 |
|
709 /** |
|
710 * Closes idle connections. |
|
711 * |
|
712 * @param idletime the time the connections should have been idle |
|
713 * in order to be closed now |
|
714 * @param tunit the unit for the <code>idletime</code> |
|
715 */ |
|
716 @Override |
|
717 public void closeIdleConnections(long idletime, TimeUnit tunit) { |
|
718 if (tunit == null) { |
|
719 throw new IllegalArgumentException("Time unit must not be null."); |
|
720 } |
|
721 if (idletime < 0) { |
|
722 idletime = 0; |
|
723 } |
|
724 if (log.isDebugEnabled()) { |
|
725 log.debug("Closing connections idle longer than " + idletime + " " + tunit); |
|
726 } |
|
727 // the latest time for which connections will be closed |
|
728 long deadline = System.currentTimeMillis() - tunit.toMillis(idletime); |
|
729 poolLock.lock(); |
|
730 try { |
|
731 Iterator<BasicPoolEntry> iter = freeConnections.iterator(); |
|
732 while (iter.hasNext()) { |
|
733 BasicPoolEntry entry = iter.next(); |
|
734 if (entry.getUpdated() <= deadline) { |
|
735 if (log.isDebugEnabled()) { |
|
736 log.debug("Closing connection last used @ " + new Date(entry.getUpdated())); |
|
737 } |
|
738 iter.remove(); |
|
739 deleteEntry(entry); |
|
740 } |
|
741 } |
|
742 } finally { |
|
743 poolLock.unlock(); |
|
744 } |
|
745 } |
|
746 |
|
747 @Override |
|
748 public void closeExpiredConnections() { |
|
749 log.debug("Closing expired connections"); |
|
750 long now = System.currentTimeMillis(); |
|
751 |
|
752 poolLock.lock(); |
|
753 try { |
|
754 Iterator<BasicPoolEntry> iter = freeConnections.iterator(); |
|
755 while (iter.hasNext()) { |
|
756 BasicPoolEntry entry = iter.next(); |
|
757 if (entry.isExpired(now)) { |
|
758 if (log.isDebugEnabled()) { |
|
759 log.debug("Closing connection expired @ " + new Date(entry.getExpiry())); |
|
760 } |
|
761 iter.remove(); |
|
762 deleteEntry(entry); |
|
763 } |
|
764 } |
|
765 } finally { |
|
766 poolLock.unlock(); |
|
767 } |
|
768 } |
|
769 |
|
770 @Override |
|
771 public void shutdown() { |
|
772 poolLock.lock(); |
|
773 try { |
|
774 if (shutdown) { |
|
775 return; |
|
776 } |
|
777 shutdown = true; |
|
778 |
|
779 // close all connections that are issued to an application |
|
780 Iterator<BasicPoolEntry> iter1 = leasedConnections.iterator(); |
|
781 while (iter1.hasNext()) { |
|
782 BasicPoolEntry entry = iter1.next(); |
|
783 iter1.remove(); |
|
784 closeConnection(entry); |
|
785 } |
|
786 |
|
787 // close all free connections |
|
788 Iterator<BasicPoolEntry> iter2 = freeConnections.iterator(); |
|
789 while (iter2.hasNext()) { |
|
790 BasicPoolEntry entry = iter2.next(); |
|
791 iter2.remove(); |
|
792 |
|
793 if (log.isDebugEnabled()) { |
|
794 log.debug("Closing connection" |
|
795 + " [" + entry.getPlannedRoute() + "][" + entry.getState() + "]"); |
|
796 } |
|
797 closeConnection(entry); |
|
798 } |
|
799 |
|
800 // wake up all waiting threads |
|
801 Iterator<WaitingThread> iwth = waitingThreads.iterator(); |
|
802 while (iwth.hasNext()) { |
|
803 WaitingThread waiter = iwth.next(); |
|
804 iwth.remove(); |
|
805 waiter.wakeup(); |
|
806 } |
|
807 |
|
808 routeToPool.clear(); |
|
809 |
|
810 } finally { |
|
811 poolLock.unlock(); |
|
812 } |
|
813 } |
|
814 |
|
815 /** |
|
816 * since 4.1 |
|
817 */ |
|
818 public void setMaxTotalConnections(int max) { |
|
819 poolLock.lock(); |
|
820 try { |
|
821 maxTotalConnections = max; |
|
822 } finally { |
|
823 poolLock.unlock(); |
|
824 } |
|
825 } |
|
826 |
|
827 |
|
828 /** |
|
829 * since 4.1 |
|
830 */ |
|
831 public int getMaxTotalConnections() { |
|
832 return maxTotalConnections; |
|
833 } |
|
834 |
|
835 } |
|
836 |