001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
004: * Science And Control (INRIA).
005: * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
006: * Contact: sequoia@continuent.org
007: *
008: * Licensed under the Apache License, Version 2.0 (the "License");
009: * you may not use this file except in compliance with the License.
010: * You may obtain a copy of the License at
011: *
012: * http://www.apache.org/licenses/LICENSE-2.0
013: *
014: * Unless required by applicable law or agreed to in writing, software
015: * distributed under the License is distributed on an "AS IS" BASIS,
016: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017: * See the License for the specific language governing permissions and
018: * limitations under the License.
019: *
020: * Initial developer(s): Emmanuel Cecchet.
021: * Contributor(s): Mathieu Peltier.
022: */package org.continuent.sequoia.controller.connection;
023:
024: import java.sql.Connection;
025: import java.sql.SQLException;
026: import java.sql.Statement;
027: import java.util.Iterator;
028: import java.util.LinkedList;
029: import java.util.NoSuchElementException;
030:
031: import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
032: import org.continuent.sequoia.common.i18n.Translate;
033: import org.continuent.sequoia.common.xml.DatabasesXmlTags;
034:
035: /**
036: * This connection manager provides connection pooling with a dynamically
037: * adjustable pool size.
038: * <p>
039: * If the maximum number of active connections is not reached, the
040: * {@link #getConnection()}method creates a connection. Else, the execution is
041: * blocked until a connection is freed or the timeout expires. blocked until a
042: * connection is freed or the timeout expires.
043: * <p>
044: * Idle connections in the pool are removed after the timeout idleTimeout if the
045: * minimum pool size has not been reached.
046: *
047: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
048: * @author <a href="mailto:Mathieu.Peltier@inrialpes.fr">Mathieu Peltier </a>
049: * @author <a href="mailto:Nicolas.Modrzyk@inrialpes.fr">Nicolas Modrzyk </a>
050: * @version 1.0
051: */
052: public class VariablePoolConnectionManager extends
053: AbstractPoolConnectionManager {
054: /** Initial pool size to be initialized at startup. */
055: private int initPoolSize;
056:
057: /** Minimum pool size. */
058: private int minPoolSize;
059:
060: /** Maximum pool size. */
061: private int maxPoolSize;
062:
063: /**
064: * Time a connection can stay idle before begin released (removed from the
065: * pool) in milliseconds (0 means forever)
066: */
067: private int idleTimeout;
068:
069: /** Maximum time to wait for a connection in milliseconds. */
070: private int waitTimeout;
071:
072: /** Stores the time on which connections have been released. */
073: private LinkedList releaseTimes;
074:
075: /** Allow to remove idle connections in the pool. */
076: private RemoveIdleConnectionsThread removeIdleConnectionsThread;
077:
078: private Connection ping;
079:
080: /**
081: * Creates a new <code>VariablePoolConnectionManager</code> instance with
082: * the default minPoolSize(initial pool size to be initialized at startup).
083: *
084: * @param backendUrl URL of the <code>DatabaseBackend</code> owning this
085: * connection manager
086: * @param backendName name of the <code>DatabaseBackend</code> owning this
087: * connection manager
088: * @param rLogin backend connection login to be used by this connection
089: * manager
090: * @param rPassword backend connection password to be used by this connection
091: * manager
092: * @param driverPath path for driver
093: * @param driverClassName class name for driver
094: * @param minPoolSize minimum pool size.
095: * @param maxPoolSize maximum pool size. 0 means no limit.
096: * @param idleTimeout time a connection can stay idle before begin released
097: * (removed from the pool) in seconds. 0 means no timeout: once
098: * allocated, connections are never released.
099: * @param waitTimeout maximum time to wait for a connection in seconds. 0
100: * means no timeout: waits until one connection is freed.
101: */
102: public VariablePoolConnectionManager(String backendUrl,
103: String backendName, String rLogin, String rPassword,
104: String driverPath, String driverClassName, int minPoolSize,
105: int maxPoolSize, int idleTimeout, int waitTimeout) {
106: this (backendUrl, backendName, rLogin, rPassword, driverPath,
107: driverClassName, minPoolSize, minPoolSize, maxPoolSize,
108: idleTimeout, waitTimeout);
109: }
110:
111: /**
112: * @see java.lang.Object#clone()
113: */
114: protected Object clone() throws CloneNotSupportedException {
115: return new VariablePoolConnectionManager(backendUrl,
116: backendName, rLogin, rPassword, driverPath,
117: driverClassName, minPoolSize, maxPoolSize, idleTimeout,
118: waitTimeout);
119: }
120:
121: /**
122: * @see org.continuent.sequoia.controller.connection.AbstractConnectionManager#clone(String,
123: * String)
124: */
125: public AbstractConnectionManager clone(String rLogin,
126: String rPassword) {
127: return new VariablePoolConnectionManager(backendUrl,
128: backendName, rLogin, rPassword, driverPath,
129: driverClassName, initPoolSize, minPoolSize,
130: maxPoolSize, idleTimeout, waitTimeout);
131: }
132:
133: /**
134: * Creates a new <code>VariablePoolConnectionManager</code> instance.
135: *
136: * @param backendUrl URL of the <code>DatabaseBackend</code> owning this
137: * connection manager
138: * @param backendName name of the <code>DatabaseBackend</code> owning this
139: * connection manager
140: * @param rLogin backend connection login to be used by this connection
141: * manager
142: * @param rPassword backend connection password to be used by this connection
143: * manager
144: * @param driverPath path for driver
145: * @param driverClassName class name for driver
146: * @param initPoolSize initial pool size to be intialized at startup
147: * @param minPoolSize minimum pool size.
148: * @param maxPoolSize maximum pool size. 0 means no limit.
149: * @param idleTimeout time a connection can stay idle before begin released
150: * (removed from the pool) in seconds. 0 means no timeout: once
151: * allocated, connections are never released.
152: * @param waitTimeout maximum time to wait for a connection in seconds. 0
153: * means no timeout: waits until one connection is freed.
154: */
155: public VariablePoolConnectionManager(String backendUrl,
156: String backendName, String rLogin, String rPassword,
157: String driverPath, String driverClassName,
158: int initPoolSize, int minPoolSize, int maxPoolSize,
159: int idleTimeout, int waitTimeout) {
160: super (
161: backendUrl,
162: backendName,
163: rLogin,
164: rPassword,
165: driverPath,
166: driverClassName,
167: maxPoolSize == 0 ? (initPoolSize > minPoolSize ? initPoolSize
168: : minPoolSize)
169: : maxPoolSize);
170: this .initPoolSize = initPoolSize;
171: this .minPoolSize = minPoolSize;
172: this .maxPoolSize = maxPoolSize;
173: this .idleTimeout = idleTimeout * 1000;
174: this .waitTimeout = waitTimeout * 1000;
175: }
176:
177: /**
178: * Gets the max pool size.
179: *
180: * @return a <code>int</code> value.
181: */
182: public int getMaxPoolSize() {
183: return maxPoolSize;
184: }
185:
186: /**
187: * Gets the min pool size.
188: *
189: * @return a <code>int</code> value.
190: */
191: public int getMinPoolSize() {
192: return minPoolSize;
193: }
194:
195: /**
196: * Gets the idle timeout.
197: *
198: * @return a <code>int</code> value.
199: */
200: public int getIdleTimeout() {
201: return idleTimeout;
202: }
203:
204: /**
205: * Gets the wait timeout.
206: *
207: * @return a <code>int</code> value.
208: */
209: public int getWaitTimeout() {
210: return waitTimeout;
211: }
212:
213: /**
214: * @see org.continuent.sequoia.controller.connection.AbstractConnectionManager#doConnectionInitialization()
215: */
216: protected void doConnectionInitialization() throws SQLException {
217: poolSize = maxPoolSize == 0 ? (initPoolSize > minPoolSize ? initPoolSize
218: : minPoolSize)
219: : maxPoolSize;
220: this .ping = getConnectionFromDriver();
221: synchronized (this ) {
222: doConnectionInitialization(initPoolSize);
223:
224: if (idleTimeout != 0) {
225: // Create the thread which manages the free connections
226: removeIdleConnectionsThread = new RemoveIdleConnectionsThread(
227: this .backendName, this );
228:
229: // Intialize release time for the initial connections if an idleTimeout
230: // is set
231: releaseTimes = new LinkedList();
232: Iterator it = freeConnections.iterator();
233: Long currentTime = new Long(System.currentTimeMillis());
234: while (it.hasNext()) {
235: it.next();
236: releaseTimes.addLast(currentTime);
237: }
238: }
239: }
240:
241: // Start the thread outside synchronized(this)
242: if (removeIdleConnectionsThread != null) {
243: removeIdleConnectionsThread.start();
244:
245: synchronized (removeIdleConnectionsThread) {
246: if (releaseTimes.size() > 0) {
247: removeIdleConnectionsThread.notify();
248: }
249: }
250: }
251: if (idlePersistentConnectionPingInterval > 0) {
252: persistentConnectionPingerThread = new IdlePersistentConnectionsPingerThread(
253: backendName, this );
254: persistentConnectionPingerThread.start();
255: idlePersistentConnectionPingRunning = true;
256: }
257: }
258:
259: /**
260: * {@inheritDoc}
261: * @see org.continuent.sequoia.controller.connection.AbstractPoolConnectionManager#doConnectionInitialization(int)
262: */
263: protected synchronized void doConnectionInitialization(
264: int initPoolSize) throws SQLException {
265: if (initialized)
266: throw new SQLException("Connection pool for backend '"
267: + backendUrl + "' already initialized");
268:
269: if (initPoolSize > poolSize) {
270: logger.warn(Translate.get(
271: "connection.max.poolsize.reached", new String[] {
272: String.valueOf(initPoolSize),
273: String.valueOf(poolSize),
274: String.valueOf(poolSize) }));
275: initPoolSize = poolSize;
276: }
277:
278: poolSize = initConnections(initPoolSize);
279: initialized = true;
280:
281: if (logger.isDebugEnabled())
282: logger.debug(Translate.get("connection.pool.initialized",
283: new String[] { String.valueOf(initPoolSize),
284: backendUrl }));
285: }
286:
287: /**
288: * @see org.continuent.sequoia.controller.connection.AbstractConnectionManager#doConnectionFinalization()
289: */
290: protected void doConnectionFinalization() throws SQLException {
291: try {
292: if (ping != null)
293: ping.close();
294: ping = null;
295: } catch (SQLException e) {
296: // ignore the errors we are shutting down
297: }
298: synchronized (this ) {
299: super .doConnectionFinalization();
300: }
301:
302: if (removeIdleConnectionsThread != null) {
303: synchronized (removeIdleConnectionsThread) {
304: removeIdleConnectionsThread.isKilled = true;
305: removeIdleConnectionsThread.notify();
306: }
307: try {
308: removeIdleConnectionsThread.join();
309: } catch (InterruptedException e) {
310: }
311: }
312: }
313:
314: /**
315: * Gets a connection from the pool.
316: * <p>
317: * If the current number of active connections is lower than the maximum pool
318: * size, a new connection is created. If the creation fails, this method waits
319: * for a connection to be freed.
320: * <p>
321: * If the maximum number of active connections is reached, this methods blocks
322: * until a connection is freed or the timeout expires.
323: *
324: * @return a connection from the pool or <code>null</code> if the timeout
325: * has expired.
326: * @throws UnreachableBackendException if the backend must be disabled
327: * @see org.continuent.sequoia.controller.connection.AbstractConnectionManager#getConnection()
328: */
329: public synchronized PooledConnection getConnection()
330: throws UnreachableBackendException {
331: if (!initialized) {
332: logger
333: .error("Requesting a connection from a non-initialized connection manager");
334: return null;
335: }
336: if (isShutdown) {
337: return null;
338: }
339: long lTimeout = waitTimeout;
340: if (freeConnections.isEmpty()) {
341: if ((maxPoolSize == 0)
342: || (activeConnections.size() < maxPoolSize)) {
343: Connection c = getConnectionFromDriver();
344: if (c == null) {
345: if (activeConnections.size() == 0) { // No connection active and backend unreachable, the backend
346: // is probably dead
347: logger.error("Backend " + backendName
348: + " is no more accessible.");
349: throw new UnreachableBackendException();
350: }
351: /*
352: * Ping the server with an open connection in order to determine
353: * whether the server has failed or is has run out of connections. If
354: * the ping fails, the server is down.
355: */
356: try {
357: Statement pingStatement = ping
358: .createStatement();
359: pingStatement.execute(connectionTestStatement);
360: pingStatement.close();
361: } catch (SQLException e) {
362: isShutdown = true;
363: logger.error("Backend " + backendName
364: + " is no more accessible.");
365: throw new UnreachableBackendException();
366: }
367: /*
368: * There are currently no connections available from the server, just
369: * wait for a connection to be freed
370: */
371:
372: if (logger.isWarnEnabled())
373: logger
374: .warn("Failed to create new connection on backend '"
375: + backendName
376: + "', waiting for a connection to be freed.");
377: } else {
378: freeConnections.addLast(new PooledConnection(c));
379: if (idleTimeout != 0) {
380: releaseTimes.add(new Long(System
381: .currentTimeMillis()));
382: }
383: poolSize++;
384: }
385: }
386:
387: /*
388: * We have to do a while loop() because there is a potential race here.
389: * When freeConnections is notified in releaseConnection, a new thread can
390: * take the lock on freeConnections before we wake up/reacquire the lock
391: * on freeConnections. Therefore, we could wake up and have no connection
392: * to take! We ensure that everything is correct with a while statement
393: * and recomputing the timeout between 2 wakeup.
394: */
395: while (freeConnections.isEmpty()) {
396: if (activeConnections.size() == 0 || isShutdown) { // No connection active and backend unreachable, the backend
397: // is probably dead
398: logger.error("Backend " + backendName
399: + " is no more accessible.");
400: throw new UnreachableBackendException();
401: }
402: // Wait
403: try {
404: if (lTimeout > 0) {
405: long start = System.currentTimeMillis();
406: // Convert seconds to milliseconds for wait call
407: this .wait(waitTimeout);
408: long end = System.currentTimeMillis();
409: lTimeout -= end - start;
410: if (lTimeout <= 0) {
411: if (logger.isWarnEnabled())
412: logger
413: .warn("Timeout expired for connection on backend '"
414: + backendName
415: + "', consider increasing pool size (current size is "
416: + poolSize
417: + ") or timeout (current timeout is "
418: + (waitTimeout / 1000)
419: + " seconds)");
420: return null;
421: }
422: } else {
423: this .wait();
424: }
425: } catch (InterruptedException e) {
426: logger
427: .error("Wait on freeConnections interrupted in VariablePoolConnectionManager");
428: return null;
429: }
430: }
431: }
432:
433: // Get the connection
434: try {
435: PooledConnection c = (PooledConnection) freeConnections
436: .removeLast();
437: if (idleTimeout != 0)
438: releaseTimes.removeLast();
439: activeConnections.add(c);
440: return c;
441: } catch (NoSuchElementException e) {
442: if (logger.isErrorEnabled())
443: logger.error("Failed to get a connection on backend '"
444: + backendName
445: + "' but an idle connection was expected");
446: return null;
447: }
448: }
449:
450: /**
451: * @see org.continuent.sequoia.controller.connection.AbstractConnectionManager#releaseConnection(org.continuent.sequoia.controller.connection.PooledConnection)
452: */
453: public void releaseConnection(PooledConnection c) {
454: boolean notifyThread = false;
455: synchronized (this ) {
456: if (!initialized) {
457: closeConnection(c);
458: return; // We probably have been disabled
459: }
460:
461: if (activeConnections.remove(c)) {
462: if (idleTimeout != 0) {
463: notifyThread = freeConnections.isEmpty()
464: || (freeConnections.size() == minPoolSize);
465: releaseTimes.addLast(new Long(System
466: .currentTimeMillis()));
467: }
468: c.removeAllTemporaryTables();
469: freeConnections.addLast(c);
470: this .notify();
471: } else
472: logger.error("Failed to release connection " + c
473: + " (not found in active pool)");
474: }
475:
476: if (notifyThread)
477: synchronized (removeIdleConnectionsThread) {
478: removeIdleConnectionsThread.notify();
479: }
480: }
481:
482: /**
483: * @see org.continuent.sequoia.controller.connection.AbstractConnectionManager#deleteConnection(org.continuent.sequoia.controller.connection.PooledConnection)
484: */
485: public synchronized void deleteConnection(PooledConnection c) {
486: closeConnection(c);
487:
488: if (!initialized)
489: return; // We probably have been disabled
490:
491: if (activeConnections.remove(c)) {
492: poolSize--;
493: if (poolSize < minPoolSize) {
494: Connection newConnection = getConnectionFromDriver();
495: if (newConnection == null) {
496: if (logger.isWarnEnabled())
497: logger
498: .warn("Bad connection : "
499: + c
500: + " has been removed but cannot be replaced.");
501: } else {
502: poolSize++;
503: freeConnections.addLast(new PooledConnection(
504: newConnection));
505: if (idleTimeout != 0)
506: releaseTimes.addLast(new Long(System
507: .currentTimeMillis()));
508: this .notify();
509: if (logger.isDebugEnabled())
510: logger
511: .debug("Bad connection "
512: + c
513: + " has been replaced by a new connection.");
514: }
515: } else if (logger.isDebugEnabled())
516: logger.debug("Bad connection " + c
517: + " has been removed.");
518: } else
519: logger.error("Failed to release connection " + c
520: + " (not found in active pool)");
521: notifyAll();
522: }
523:
524: /**
525: * @see org.continuent.sequoia.controller.connection.AbstractConnectionManager#getXmlImpl()
526: */
527: public String getXmlImpl() {
528: StringBuffer info = new StringBuffer();
529: info.append("<"
530: + DatabasesXmlTags.ELT_VariablePoolConnectionManager
531: + " " + DatabasesXmlTags.ATT_initPoolSize + "=\""
532: + initPoolSize + "\" "
533: + DatabasesXmlTags.ATT_minPoolSize + "=\""
534: + minPoolSize + "\" "
535: + DatabasesXmlTags.ATT_maxPoolSize + "=\""
536: + maxPoolSize + "\" "
537: + DatabasesXmlTags.ATT_idleTimeout + "=\""
538: + idleTimeout / 1000 + "\" "
539: + DatabasesXmlTags.ATT_waitTimeout + "=\""
540: + waitTimeout / 1000 + "\"/>");
541: return info.toString();
542: }
543:
544: /**
545: * Allows to remove idle free connections after the idleTimeout timeout.
546: *
547: * @author <a href="mailto:Mathieu.Peltier@inrialpes.fr">Mathieu Peltier </a>
548: */
549: protected class RemoveIdleConnectionsThread extends Thread {
550: private boolean isKilled = false;
551: private VariablePoolConnectionManager this Pool;
552:
553: protected RemoveIdleConnectionsThread(String pBackendName,
554: VariablePoolConnectionManager this Pool) {
555: super ("RemoveIdleConnectionsThread for backend:"
556: + pBackendName);
557: this .this Pool = this Pool;
558: }
559:
560: /**
561: * @see java.lang.Runnable#run()
562: */
563: public void run() {
564: long idleTime, releaseTime;
565: boolean isMinPoolSizeReached = false;
566: synchronized (this ) {
567: try {
568: while (!isKilled) {
569: isMinPoolSizeReached = false;
570: // the thread is not launched if idleTimeout equals to 0 (the
571: // connections are never released in this case)
572: if (freeConnections.isEmpty()) {
573: wait(); // wait on the thread RemoveIdleConnectionsThread
574: } else if (freeConnections.size() <= minPoolSize) {
575: // Ping these remaining connections when their time out occurs
576: isMinPoolSizeReached = true;
577: }
578:
579: if (isKilled)
580: continue; // Just exit
581:
582: PooledConnection c = null;
583: synchronized (this Pool) {
584: if (releaseTimes.isEmpty())
585: continue; // Sanity check
586:
587: releaseTime = ((Long) releaseTimes
588: .getFirst()).longValue();
589: idleTime = System.currentTimeMillis()
590: - releaseTime;
591:
592: if (idleTime >= idleTimeout) {
593: c = (PooledConnection) freeConnections
594: .removeFirst();
595: releaseTimes.removeFirst();
596: }
597: }
598:
599: if (c == null) { // Nothing to free, wait for next deadline
600: wait(idleTimeout - idleTime);
601: } else if (isMinPoolSizeReached) {
602: try {
603: // check the connection is still valid
604: c
605: .getConnection()
606: .createStatement()
607: .execute(
608: connectionTestStatement);
609: // and put it again in the pool
610: releaseTimes.addLast(new Long(System
611: .currentTimeMillis()));
612: freeConnections.addLast(c);
613: } catch (SQLException e) {
614: // Connection lost... we release it...
615: try {
616: c.close();
617: } catch (SQLException e1) {
618: String msg = "An error occured while closing idle connection after the timeout: "
619: + e;
620: logger.error(msg);
621: } finally {
622: poolSize--;
623: }
624: }
625: } else { // Free the connection out of the synchronized block
626: try {
627: c.close();
628: } catch (SQLException e) {
629: String msg = "An error occured while closing idle connection after the timeout: "
630: + e;
631: logger.error(msg);
632: } finally {
633: poolSize--;
634: }
635: logger
636: .debug("Released idle connection (idle timeout reached)");
637: continue;
638:
639: }
640: }
641: } catch (InterruptedException e) {
642: logger
643: .error("Wait on removeIdleConnectionsThread interrupted in VariablePoolConnectionManager: "
644: + e);
645: }
646: }
647: }
648: }
649:
650: }
|