001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.resource.connectionmanager;
023:
024: import java.util.ArrayList;
025: import java.util.Collections;
026: import java.util.HashSet;
027: import java.util.Iterator;
028: import java.util.Set;
029:
030: import javax.resource.ResourceException;
031: import javax.resource.spi.ConnectionRequestInfo;
032: import javax.resource.spi.ManagedConnection;
033: import javax.resource.spi.ManagedConnectionFactory;
034: import javax.resource.spi.ValidatingManagedConnectionFactory;
035: import javax.security.auth.Subject;
036:
037: import org.jboss.logging.Logger;
038: import org.jboss.resource.JBossResourceException;
039: import org.jboss.util.UnreachableStatementException;
040:
041: import EDU.oswego.cs.dl.util.concurrent.FIFOSemaphore;
042: import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
043:
044: /**
045: * The internal pool implementation
046: *
047: * @author <a href="mailto:d_jencks@users.sourceforge.net">David Jencks</a>
048: * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
049: * @author <a href="weston.price@jboss.com">Weston Price</a>
050: *
051: * @version $Revision: 61055 $
052: */
053: public class InternalManagedConnectionPool {
054: /** The managed connection factory */
055: private final ManagedConnectionFactory mcf;
056:
057: /** The connection listener factory */
058: private final ConnectionListenerFactory clf;
059:
060: /** The default subject */
061: private final Subject defaultSubject;
062:
063: /** The default connection request information */
064: private final ConnectionRequestInfo defaultCri;
065:
066: /** The pooling parameters */
067: private final PoolParams poolParams;
068:
069: /** Copy of the maximum size from the pooling parameters.
070: * Dynamic changes to this value are not compatible with
071: * the semaphore which cannot change be dynamically changed.
072: */
073: private int maxSize;
074:
075: /** The available connection event listeners */
076: private ArrayList cls;
077:
078: /** The permits used to control who can checkout a connection */
079: private final FIFOSemaphore permits;
080:
081: /** The log */
082: private final Logger log;
083:
084: /** Whether trace is enabled */
085: private final boolean trace;
086:
087: /** Stats */
088: private final Counter connectionCounter = new Counter();
089:
090: /** The checked out connections */
091: private final HashSet checkedOut = new HashSet();
092:
093: /** Whether the pool has been started */
094: private boolean started = false;
095:
096: /** Whether the pool has been shutdown */
097: private SynchronizedBoolean shutdown = new SynchronizedBoolean(
098: false);
099:
100: /** the max connections ever checked out **/
101: private volatile int maxUsedConnections = 0;
102:
103: /**
104: * Create a new internal pool
105: *
106: * @param mcf the managed connection factory
107: * @param subject the subject
108: * @param cri the connection request information
109: * @param poolParams the pooling parameters
110: * @param log the log
111: */
112: protected InternalManagedConnectionPool(
113: ManagedConnectionFactory mcf,
114: ConnectionListenerFactory clf, Subject subject,
115: ConnectionRequestInfo cri, PoolParams poolParams, Logger log) {
116: this .mcf = mcf;
117: this .clf = clf;
118: defaultSubject = subject;
119: defaultCri = cri;
120: this .poolParams = poolParams;
121: this .maxSize = this .poolParams.maxSize;
122:
123: this .log = log;
124: this .trace = log.isTraceEnabled();
125: cls = new ArrayList(this .maxSize);
126: permits = new FIFOSemaphore(this .maxSize);
127:
128: if (poolParams.prefill) {
129: PoolFiller.fillPool(this );
130:
131: }
132:
133: }
134:
135: /**
136: * Initialize the pool
137: */
138: protected void initialize() {
139: if (poolParams.idleTimeout != 0)
140: IdleRemover.registerPool(this , poolParams.idleTimeout);
141:
142: if (poolParams.backgroundValidation) {
143:
144: log
145: .debug("Registering for background validation at interval "
146: + poolParams.backgroundInterval);
147: ConnectionValidator.registerPool(this ,
148: poolParams.backgroundInterval);
149:
150: }
151:
152: }
153:
154: public long getAvailableConnections() {
155: return permits.permits();
156: }
157:
158: public int getMaxConnectionsInUseCount() {
159: return maxUsedConnections;
160: }
161:
162: public int getConnectionInUseCount() {
163: return checkedOut.size();
164: }
165:
166: /**
167: * todo distinguish between connection dying while match called
168: * and bad match strategy. In latter case we should put it back in
169: * the pool.
170: */
171: public ConnectionListener getConnection(Subject subject,
172: ConnectionRequestInfo cri) throws ResourceException {
173:
174: subject = (subject == null) ? defaultSubject : subject;
175: cri = (cri == null) ? defaultCri : cri;
176: long startWait = System.currentTimeMillis();
177: try {
178: if (permits.attempt(poolParams.blockingTimeout)) {
179: //We have a permit to get a connection. Is there one in the pool already?
180: ConnectionListener cl = null;
181: do {
182: synchronized (cls) {
183: if (shutdown.get()) {
184: permits.release();
185: throw new ResourceException(
186: "The pool has been shutdown");
187: }
188:
189: if (cls.size() > 0) {
190: cl = (ConnectionListener) cls.remove(cls
191: .size() - 1);
192: checkedOut.add(cl);
193: int size = (int) (maxSize - permits
194: .permits());
195:
196: //Update the maxUsedConnections
197: if (size > maxUsedConnections)
198: maxUsedConnections = size;
199: }
200: }
201: if (cl != null) {
202: //Yes, we retrieved a ManagedConnection from the pool. Does it match?
203: try {
204: Object matchedMC = mcf
205: .matchManagedConnections(
206: Collections
207: .singleton(cl
208: .getManagedConnection()),
209: subject, cri);
210: if (matchedMC != null) {
211: if (trace)
212: log
213: .trace("supplying ManagedConnection from pool: "
214: + cl);
215: cl.grantPermit(true);
216: return cl;
217: }
218:
219: //Match did not succeed but no exception was thrown.
220: //Either we have the matching strategy wrong or the
221: //connection died while being checked. We need to
222: //distinguish these cases, but for now we always
223: //destroy the connection.
224: log
225: .warn("Destroying connection that could not be successfully matched: "
226: + cl);
227: synchronized (cls) {
228: checkedOut.remove(cl);
229: }
230: doDestroy(cl);
231: cl = null;
232:
233: } catch (Throwable t) {
234: log.warn(
235: "Throwable while trying to match ManagedConnection, destroying connection: "
236: + cl, t);
237: synchronized (cls) {
238: checkedOut.remove(cl);
239: }
240: doDestroy(cl);
241: cl = null;
242:
243: }
244: //We made it here, something went wrong and we should validate if we should continue attempting to acquire a connection
245: if (poolParams.useFastFail) {
246: log
247: .trace("Fast failing for connection attempt. No more attempts will be made to acquire connection from pool and a new connection will be created immeadiately");
248: break;
249: }
250:
251: }
252: } while (cls.size() > 0);//end of do loop
253:
254: //OK, we couldnt find a working connection from the pool. Make a new one.
255: try {
256: //No, the pool was empty, so we have to make a new one.
257: cl = createConnectionEventListener(subject, cri);
258: synchronized (cls) {
259: checkedOut.add(cl);
260: int size = (int) (maxSize - permits.permits());
261: if (size > maxUsedConnections)
262: maxUsedConnections = size;
263: }
264:
265: //lack of synch on "started" probably ok, if 2 reads occur we will just
266: //run fillPool twice, no harm done.
267: if (started == false) {
268: started = true;
269: if (poolParams.minSize > 0)
270: PoolFiller.fillPool(this );
271: }
272: if (trace)
273: log.trace("supplying new ManagedConnection: "
274: + cl);
275: cl.grantPermit(true);
276: return cl;
277: } catch (Throwable t) {
278: log.warn(
279: "Throwable while attempting to get a new connection: "
280: + cl, t);
281: //return permit and rethrow
282: synchronized (cls) {
283: checkedOut.remove(cl);
284: }
285: permits.release();
286: JBossResourceException.rethrowAsResourceException(
287: "Unexpected throwable while trying to create a connection: "
288: + cl, t);
289: throw new UnreachableStatementException();
290: }
291: } else {
292: // we timed out
293: throw new ResourceException(
294: "No ManagedConnections available within configured blocking timeout ( "
295: + poolParams.blockingTimeout
296: + " [ms] )");
297: }
298:
299: } catch (InterruptedException ie) {
300: long end = System.currentTimeMillis() - startWait;
301: throw new ResourceException(
302: "Interrupted while requesting permit! Waited "
303: + end + " ms");
304: }
305: }
306:
307: public void returnConnection(ConnectionListener cl, boolean kill) {
308: if (cl.getState() == ConnectionListener.DESTROYED) {
309: log
310: .trace("ManagedConnection is being returned after it was destroyed"
311: + cl);
312: if (cl.hasPermit()) {
313: // release semaphore
314: cl.grantPermit(false);
315: permits.release();
316: }
317:
318: return;
319: }
320:
321: if (trace)
322: log.trace("putting ManagedConnection back into pool kill="
323: + kill + " cl=" + cl);
324: try {
325: cl.getManagedConnection().cleanup();
326: } catch (ResourceException re) {
327: log.warn(
328: "ResourceException cleaning up ManagedConnection: "
329: + cl, re);
330: kill = true;
331: }
332:
333: // We need to destroy this one
334: if (cl.getState() == ConnectionListener.DESTROY)
335: kill = true;
336:
337: synchronized (cls) {
338: checkedOut.remove(cl);
339:
340: // This is really an error
341: if (kill == false && cls.size() >= poolParams.maxSize) {
342: log
343: .warn("Destroying returned connection, maximum pool size exceeded "
344: + cl);
345: kill = true;
346: }
347:
348: // If we are destroying, check the connection is not in the pool
349: if (kill) {
350: // Adrian Brock: A resource adapter can asynchronously notify us that
351: // a connection error occurred.
352: // This could happen while the connection is not checked out.
353: // e.g. JMS can do this via an ExceptionListener on the connection.
354: // I have twice had to reinstate this line of code, PLEASE DO NOT REMOTE IT!
355: cls.remove(cl);
356: }
357: // return to the pool
358: else {
359: cl.used();
360: cls.add(cl);
361: }
362:
363: if (cl.hasPermit()) {
364: // release semaphore
365: cl.grantPermit(false);
366: permits.release();
367: }
368: }
369:
370: if (kill) {
371: if (trace)
372: log.trace("Destroying returned connection " + cl);
373: doDestroy(cl);
374: }
375:
376: }
377:
378: public void flush() {
379: ArrayList destroy = null;
380: synchronized (cls) {
381: if (trace)
382: log.trace("Flushing pool checkedOut=" + checkedOut
383: + " inPool=" + cls);
384:
385: // Mark checked out connections as requiring destruction
386: for (Iterator i = checkedOut.iterator(); i.hasNext();) {
387: ConnectionListener cl = (ConnectionListener) i.next();
388: if (trace)
389: log
390: .trace("Flush marking checked out connection for destruction "
391: + cl);
392: cl.setState(ConnectionListener.DESTROY);
393: }
394: // Destroy connections in the pool
395: while (cls.size() > 0) {
396: ConnectionListener cl = (ConnectionListener) cls
397: .remove(0);
398: if (destroy == null)
399: destroy = new ArrayList();
400: destroy.add(cl);
401: }
402: }
403:
404: // We need to destroy some connections
405: if (destroy != null) {
406: for (int i = 0; i < destroy.size(); ++i) {
407: ConnectionListener cl = (ConnectionListener) destroy
408: .get(i);
409: if (trace)
410: log.trace("Destroying flushed connection " + cl);
411: doDestroy(cl);
412: }
413:
414: // We destroyed something, check the minimum.
415: if (shutdown.get() == false && poolParams.minSize > 0)
416: PoolFiller.fillPool(this );
417: }
418:
419: }
420:
421: public void removeTimedOut() {
422: ArrayList destroy = null;
423: long timeout = System.currentTimeMillis()
424: - poolParams.idleTimeout;
425: while (true) {
426: synchronized (cls) {
427: // Nothing left to destroy
428: if (cls.size() == 0)
429: break;
430:
431: // Check the first in the list
432: ConnectionListener cl = (ConnectionListener) cls.get(0);
433: if (cl.isTimedOut(timeout)) {
434: // We need to destroy this one
435: cls.remove(0);
436: if (destroy == null)
437: destroy = new ArrayList();
438: destroy.add(cl);
439: } else {
440: //They were inserted chronologically, so if this one isn't timed out, following ones won't be either.
441: break;
442: }
443: }
444: }
445:
446: // We found some connections to destroy
447: if (destroy != null) {
448: for (int i = 0; i < destroy.size(); ++i) {
449: ConnectionListener cl = (ConnectionListener) destroy
450: .get(i);
451: if (trace)
452: log.trace("Destroying timedout connection " + cl);
453: doDestroy(cl);
454: }
455:
456: // We destroyed something, check the minimum.
457: if (shutdown.get() == false && poolParams.minSize > 0)
458: PoolFiller.fillPool(this );
459: }
460: }
461:
462: /**
463: * For testing
464: */
465: public void shutdownWithoutClear() {
466: IdleRemover.unregisterPool(this );
467: IdleRemover.waitForBackgroundThread();
468: ConnectionValidator.unRegisterPool(this );
469: ConnectionValidator.waitForBackgroundThread();
470:
471: fillToMin();
472: shutdown.set(true);
473: }
474:
475: public void shutdown() {
476: shutdown.set(true);
477: IdleRemover.unregisterPool(this );
478: ConnectionValidator.unRegisterPool(this );
479: flush();
480: }
481:
482: public void fillToMin() {
483: while (true) {
484: // Get a permit - avoids a race when the pool is nearly full
485: // Also avoids unnessary fill checking when all connections are checked out
486: try {
487: if (permits.attempt(poolParams.blockingTimeout)) {
488: try {
489: if (shutdown.get())
490: return;
491:
492: // We already have enough connections
493: if (getMinSize()
494: - connectionCounter
495: .getGuaranteedCount() <= 0)
496: return;
497:
498: // Create a connection to fill the pool
499: try {
500: ConnectionListener cl = createConnectionEventListener(
501: defaultSubject, defaultCri);
502: synchronized (cls) {
503: if (trace)
504: log.trace("Filling pool cl=" + cl);
505: cls.add(cl);
506: }
507: } catch (ResourceException re) {
508: log.warn("Unable to fill pool ", re);
509: return;
510: }
511: } finally {
512: permits.release();
513: }
514: }
515: } catch (InterruptedException ignored) {
516: log
517: .trace("Interrupted while requesting permit in fillToMin");
518: }
519: }
520: }
521:
522: public int getConnectionCount() {
523: return connectionCounter.getCount();
524: }
525:
526: public int getConnectionCreatedCount() {
527: return connectionCounter.getCreatedCount();
528: }
529:
530: public int getConnectionDestroyedCount() {
531: return connectionCounter.getDestroyedCount();
532: }
533:
534: /**
535: * Create a connection event listener
536: *
537: * @param subject the subject
538: * @param cri the connection request information
539: * @return the new listener
540: * @throws ResourceException for any error
541: */
542: private ConnectionListener createConnectionEventListener(
543: Subject subject, ConnectionRequestInfo cri)
544: throws ResourceException {
545: ManagedConnection mc = mcf
546: .createManagedConnection(subject, cri);
547: connectionCounter.inc();
548: try {
549: return clf.createConnectionListener(mc, this );
550: } catch (ResourceException re) {
551: connectionCounter.dec();
552: mc.destroy();
553: throw re;
554: }
555: }
556:
557: /**
558: * Destroy a connection
559: *
560: * @param cl the connection to destroy
561: */
562: private void doDestroy(ConnectionListener cl) {
563: if (cl.getState() == ConnectionListener.DESTROYED) {
564: log.trace("ManagedConnection is already destroyed " + cl);
565: return;
566: }
567:
568: connectionCounter.dec();
569: cl.setState(ConnectionListener.DESTROYED);
570: try {
571: cl.getManagedConnection().destroy();
572: } catch (Throwable t) {
573: log
574: .debug("Exception destroying ManagedConnection "
575: + cl, t);
576: }
577: }
578:
579: public void validateConnections() throws Exception {
580:
581: if (trace)
582: log.trace("Attempting to validate connections for pool "
583: + this );
584:
585: if (permits.attempt(poolParams.blockingTimeout)) {
586:
587: boolean destroyed = false;
588:
589: try {
590:
591: while (true) {
592:
593: ConnectionListener cl = null;
594:
595: synchronized (cls) {
596: if (cls.size() == 0) {
597: break;
598: }
599:
600: cl = removeForFrequencyCheck();
601:
602: }
603:
604: if (cl == null) {
605: break;
606: }
607:
608: try {
609:
610: Set candidateSet = Collections.singleton(cl
611: .getManagedConnection());
612:
613: if (mcf instanceof ValidatingManagedConnectionFactory) {
614: ValidatingManagedConnectionFactory vcf = (ValidatingManagedConnectionFactory) mcf;
615: candidateSet = vcf
616: .getInvalidConnections(candidateSet);
617:
618: if (candidateSet != null
619: && candidateSet.size() > 0) {
620:
621: if (cl.getState() != ConnectionListener.DESTROY) {
622: doDestroy(cl);
623: destroyed = true;
624: }
625: }
626:
627: } else {
628: log
629: .warn("warning: background validation was specified with a non compliant ManagedConnectionFactory interface.");
630: }
631:
632: } finally {
633: if (!destroyed) {
634: synchronized (cls) {
635: returnForFrequencyCheck(cl);
636: }
637: }
638:
639: }
640:
641: }
642:
643: } finally {
644: permits.release();
645:
646: if (destroyed && shutdown.get() == false
647: && poolParams.minSize > 0) {
648: PoolFiller.fillPool(this );
649: }
650:
651: }
652:
653: }
654:
655: }
656:
657: private ConnectionListener removeForFrequencyCheck() {
658:
659: log.debug("Checking for connection within frequency");
660:
661: ConnectionListener cl = null;
662:
663: for (Iterator iter = cls.iterator(); iter.hasNext();) {
664:
665: cl = (ConnectionListener) iter.next();
666: long lastCheck = cl.getLastValidatedTime();
667:
668: if ((System.currentTimeMillis() - lastCheck) >= poolParams.backgroundInterval) {
669: cls.remove(cl);
670: break;
671:
672: } else {
673: cl = null;
674: }
675:
676: }
677:
678: return cl;
679: }
680:
681: private void returnForFrequencyCheck(ConnectionListener cl) {
682:
683: log.debug("Returning for connection within frequency");
684:
685: cl.setLastValidatedTime(System.currentTimeMillis());
686: cls.add(cl);
687:
688: }
689:
690: /**
691: * Guard against configurations or
692: * dynamic changes that may increase the minimum
693: * beyond the maximum
694: */
695: private int getMinSize() {
696: if (poolParams.minSize > maxSize)
697: return maxSize;
698: return poolParams.minSize;
699: }
700:
701: public static class PoolParams {
702: public int minSize = 0;
703:
704: public int maxSize = 10;
705:
706: public int blockingTimeout = 30000; //milliseconds
707:
708: public long idleTimeout = 1000 * 60 * 30; //milliseconds, 30 minutes.
709:
710: public boolean backgroundValidation; //set to false by default
711:
712: public long backgroundInterval = 1000 * 60 * 10; //milliseconds, 10 minutes;
713:
714: public boolean prefill;
715:
716: //Do we want to immeadiately break when a connection cannot be matched and not evaluate the rest of the pool?
717: public boolean useFastFail;
718: }
719:
720: /**
721: * Stats
722: */
723: private static class Counter {
724: private int created = 0;
725:
726: private int destroyed = 0;
727:
728: synchronized int getGuaranteedCount() {
729: return created - destroyed;
730: }
731:
732: int getCount() {
733: return created - destroyed;
734: }
735:
736: int getCreatedCount() {
737: return created;
738: }
739:
740: int getDestroyedCount() {
741: return destroyed;
742: }
743:
744: synchronized void inc() {
745: ++created;
746: }
747:
748: synchronized void dec() {
749: ++destroyed;
750: }
751: }
752: }
|