001: /*
002:
003: This software is OSI Certified Open Source Software.
004: OSI Certified is a certification mark of the Open Source Initiative.
005:
006: The license (Mozilla version 1.0) can be read at the MMBase site.
007: See http://www.MMBase.org/license
008:
009: */
010: package org.mmbase.module.database;
011:
012: import java.sql.*;
013: import java.util.*;
014: import org.mmbase.util.DijkstraSemaphore;
015: import org.mmbase.module.core.MMBase;
016: import org.mmbase.util.logging.Logger;
017: import org.mmbase.util.logging.Logging;
018:
019: /**
020: * JDBC Pool, a dummy interface to multiple real connection
021: * @javadoc
022: * @author vpro
023: * @version $Id: MultiPool.java,v 1.61 2007/12/06 08:05:51 michiel Exp $
024: */
025: public class MultiPool {
026:
027: private static final Logger log = Logging
028: .getLoggerInstance(MultiPool.class);
029:
030: private final List<MultiConnection> pool = new ArrayList<MultiConnection>();
031: private final List<MultiConnection> busyPool = new ArrayList<MultiConnection>();
032: private int conMax = 4;
033: private DijkstraSemaphore semaphore = null;
034: private int totalConnections = 0;
035: private int maxQueries = 500;
036: private String name;
037: private String password;
038: private String url;
039: private DatabaseSupport databaseSupport;
040:
041: private boolean doReconnect = true;
042:
043: private long maxLifeTime = 120000;
044: private long maxZeroTime = maxLifeTime / 4;
045:
046: /**
047: * @javadoc
048: */
049: MultiPool(DatabaseSupport databaseSupport, String url, String name,
050: String password, int conMax) throws SQLException {
051: this (databaseSupport, url, name, password, conMax, 500);
052:
053: }
054:
055: /**
056: * Establish connection to the JDBC Pool(s)
057: */
058: MultiPool(DatabaseSupport databaseSupport, String url, String name,
059: String password, int conMax, int maxQueries)
060: throws SQLException {
061:
062: this .conMax = conMax;
063: this .name = name;
064: this .url = url;
065: this .password = password;
066: this .maxQueries = maxQueries;
067: if (this .maxQueries <= 0)
068: doReconnect = false;
069: this .databaseSupport = databaseSupport;
070: log.service("Creating a multipool for database "
071: + name
072: + "("
073: + url
074: + ") containing : "
075: + conMax
076: + " connections, "
077: + (doReconnect ? "which will be refreshed after "
078: + this .maxQueries + " queries"
079: : "which will not be refreshed"));
080: createPool();
081: }
082:
083: /**
084: * Set the time in ms how long a query may live before it is killed.
085: * @since MMBase-1.8
086: */
087: void setMaxLifeTime(long maxLifeTime) {
088: this .maxLifeTime = maxLifeTime;
089: maxZeroTime = maxLifeTime / 4;
090: }
091:
092: /**
093: * Gets the time in ms how long a query may live before it is killed.
094: * @since MMBase-1.8
095: */
096: long getMaxLifeTime() {
097: return maxLifeTime;
098: }
099:
100: /**
101: * Fills the connection pool
102: * @since MMBase-1.7
103: */
104: protected void createPool() {
105: MMBase mmb = MMBase.getMMBase();
106: boolean logStack = true;
107: if (semaphore != null) {
108: log.error("Was already created");
109: pool.clear();
110: }
111: try {
112: while (!fillPool(logStack)) {
113: log
114: .error("Cannot run with no connections, retrying after 10 seconds for "
115: + mmb
116: + " "
117: + (mmb.isShutdown() ? "(shutdown)" : ""));
118: Thread.sleep(10000);
119: logStack = false; // don't log that mess a second time
120: if (mmb.isShutdown()) {
121: log.info("MMBase has been shutted down.");
122: return;
123: }
124: }
125: semaphore = new DijkstraSemaphore(pool.size());
126: log.service("Connection pool has " + conMax
127: + " connections");
128: } catch (InterruptedException ie) {
129: log.info("Interrupted: " + ie.getMessage());
130: }
131:
132: }
133:
134: /**
135: * Fills the connection pool.
136: * @return true if the pool contains connections and mmbase can be started.
137: * @since MMBase-1.7
138: */
139: protected boolean fillPool(boolean logStack) {
140: int errors = 0;
141: SQLException firstError = null;
142: // put connections on the pool
143: for (int i = 0; i < conMax; i++) {
144: try {
145: pool.add(getMultiConnection());
146: } catch (SQLException se) {
147: errors++;
148: if (log.isDebugEnabled()) {
149: log.debug("i: " + "error " + errors + ": "
150: + se.getMessage());
151: }
152: if (firstError == null) {
153: firstError = se;
154: }
155: }
156: }
157: if (errors > 0) {
158: String message = firstError.getMessage();
159: if (logStack) {
160: message += " " + Logging.stackTrace(firstError);
161: } else {
162: int nl = message.indexOf('\n'); // some stupid drivers (postgresql) add stacktrace to message
163: if (nl > 0) {
164: message = message.substring(0, nl) + "..."; // take most of it away again.
165: }
166: }
167:
168: log.error("Could not get all connections (" + errors
169: + " failures, multipool size now " + pool.size()
170: + " rather then " + conMax + "). First error: "
171: + message);
172: if (pool.size() < 1) { // that is fatal.
173: return false;
174: }
175: this .conMax = pool.size();
176: }
177:
178: return true;
179:
180: }
181:
182: /**
183: * Request a new 'real' Connection and wraps it in a new 'MultiConnection' object.
184: *
185: * @since MMBase-1.7
186: */
187: protected MultiConnection getMultiConnection() throws SQLException {
188: log.debug("Getting a new connection for url '" + url + "'");
189: Connection con;
190: if (name.equals("url") && password.equals("url")) {
191: con = DriverManager.getConnection(url);
192: } else {
193: con = DriverManager.getConnection(url, name, password);
194: }
195: databaseSupport.initConnection(con);
196: return new MultiConnectionImplementation(this , con);
197: }
198:
199: /**
200: * Tries to fix this multi-connection if it is broken (e.g. if database restarted).
201: * @since MMBase-1.7.1
202: */
203: protected void replaceConnection(MultiConnection multiCon)
204: throws SQLException {
205: Connection con;
206: if (name.equals("url") && password.equals("url")) {
207: con = DriverManager.getConnection(url);
208: } else {
209: con = DriverManager.getConnection(url, name, password);
210: }
211: if (con == null) {
212: log.warn("Got no connection from driver with " + url);
213: } else {
214: multiCon.wrap(con);
215: }
216: databaseSupport.initConnection(multiCon
217: .unwrap(Connection.class));
218:
219: }
220:
221: protected void finalize() {
222: shutdown();
223: }
224:
225: /**
226: * 'realcloses' all connections.
227: * @since MMBase-1.6.2
228: */
229: public void shutdown() {
230: log.info("Shutting down multipool " + this );
231: if (semaphore == null)
232: return; // nothing to shut down
233: synchronized (semaphore) {
234: try {
235: for (MultiConnection con : busyPool) {
236: con.realclose();
237: }
238: busyPool.clear();
239: for (MultiConnection con : pool) {
240: con.realclose();
241: }
242: pool.clear();
243: } catch (Throwable e) {
244: log.error(e);
245: } finally {
246: conMax = busyPool.size() + pool.size(); // should be 0 now
247: log.info("Having " + conMax + " connections now.");
248: if (conMax != 0)
249: log.error("Still having connections!");
250: }
251: }
252: }
253:
254: /**
255: * Check the connections
256: * @bad-constant Max life-time of a query must be configurable
257: */
258: void checkTime() {
259:
260: if (log.isDebugEnabled()) {
261: log.debug("JDBC -> Starting the pool check (" + this
262: + ") : busy=" + busyPool.size() + " free="
263: + pool.size());
264: }
265: if (semaphore == null || // during start-up this could happen.
266: conMax == 0 // during shut-down this could happen
267: )
268: return;
269: synchronized (semaphore) {
270:
271: int releaseCount = 0; // number of connection that are put back to pool
272:
273: //lock semaphore, so during the checks, no connections can be acquired or put back
274:
275: //// (because the methods of semaphore are synchronized)
276: //// commented out above commentline, because this is not true.
277: //// The synchronized in java works on instnaces of a class.
278: //// The code
279: ////
280: //// void synchronized myMethod() ( statement; )
281: ////
282: //// can be rewritten as
283: ////
284: //// void myMethod(){
285: //// synchronized(this) { statement; }
286: //// }
287: ////
288: //// This way the instance is only locked when the method is executing.
289: //// The statements before and after this are not synchronized.
290: //// When an instnace is not in a synchronized block the instance is not locked
291: //// and every thread can modify the instance.
292: //// Busypool is in this method not locked and can be modified in getFree and putBack.
293: //// if the busypool is iterated and modified at the same time a
294: //// ConcurrentModificationException is thrown.
295:
296: // Michiel: but the getFree and putBack actions on the two pools are also synchronized on semaphore.
297: // so nothing can edit them without having acquired the lock on semaphore.
298:
299: long nowTime = System.currentTimeMillis();
300:
301: for (Iterator<MultiConnection> i = busyPool.iterator(); i
302: .hasNext();) {
303: MultiConnection con = i.next();
304:
305: boolean isClosed = true;
306:
307: try {
308: isClosed = con.isClosed();
309: } catch (SQLException e) {
310: log
311: .warn("Could not check isClosed on connection, assuming it closed: "
312: + e.getMessage());
313: }
314:
315: if (isClosed) {
316: MultiConnection newCon = null;
317: log.warn(
318: "WILL KILL SQL because connection was closed. ID="
319: + con.hashCode() + " SQL: "
320: + con.getLastSQL(), con
321: .getStackTrace());
322: try {
323: // get a new connection to replace this one
324: newCon = getMultiConnection();
325: } catch (SQLException e) {
326: log
327: .error("Can't add connection to pool (after close) "
328: + e.toString());
329: }
330: if (newCon != null) { // successfully created new connection
331: // we close connections in a seperate thread, for those broken JDBC drivers out there
332: new ConnectionCloser(con);
333: } else {
334: // could not create new connection somewhy, but this one is **** up as well, what to do?
335: // fail every future query:
336: newCon = con; // simply put it back in the available pool, so everything will fail
337: }
338: pool.add(newCon);
339: releaseCount++;
340: i.remove();
341:
342: continue;
343: }
344:
345: long diff = nowTime - con.getStartTimeMillis();
346:
347: if (log.isDebugEnabled()) {
348: if (diff > 5000 || diff > maxZeroTime) { // don't log too often
349: log.debug("Checking a busy connection " + con
350: + " time = " + diff + " seconds");
351: }
352: }
353:
354: if (diff < maxZeroTime) {
355: // ok, just wait
356: } else if (diff < maxLifeTime) {
357: // between 30 and 120 we putback 'zero' connections
358: if (con.getLastSQL() == null
359: || con.getLastSQL().length() == 0) {
360: log.warn("null connection putBack "
361: + Logging.stackTrace());
362: pool.add(con);
363: releaseCount++;
364: i.remove();
365: }
366: } else {
367: // above 120 we close the connection and open a new one
368: MultiConnection newCon = null;
369: log.warn("WILL KILL SQL. It took already "
370: + (diff / 1000)
371: + " seconds, which is too long. ID="
372: + con.hashCode() + " SQL: "
373: + con.getLastSQL(), con.getStackTrace());
374: try {
375: // get a new connection to replace this one
376: newCon = getMultiConnection();
377: } catch (SQLException e) {
378: log
379: .error("Can't add connection to pool (after kill) "
380: + e.toString());
381: }
382: if (newCon != null) { // successfully created new connection
383: pool.add(newCon);
384: releaseCount++;
385: i.remove();
386: // we close connections in a seperate thread, for those broken JDBC drivers out there
387: new ConnectionCloser(con);
388: } else {
389: // could not create new connection somewhy, will be retried in the next cycle
390: }
391: }
392: }
393:
394: if ((busyPool.size() + pool.size()) != conMax) {
395: // cannot happen, I hope...
396: log.error("Number of connections is not correct: "
397: + busyPool.size() + " + " + pool.size() + " = "
398: + (busyPool.size() + pool.size()) + " != "
399: + conMax);
400: // Check if there are dups in the pools
401: for (MultiConnection bcon : busyPool) {
402: int j = pool.indexOf(bcon);
403: if (j >= 0) {
404: if (log.isDebugEnabled()) {
405: log.debug("duplicate connection found at "
406: + j);
407: }
408: pool.remove(j);
409: }
410: }
411:
412: while (((busyPool.size() + pool.size()) > conMax)
413: && pool.size() > 2) {
414: // Remove too much ones.
415: MultiConnection con = pool.remove(0);
416: if (log.isDebugEnabled()) {
417: log.debug("removing connection " + con);
418: }
419: }
420:
421: }
422: semaphore.release(releaseCount);
423: } // synchronized(semaphore)
424: if (log.isDebugEnabled()) {
425: log.debug("finished checkTime()");
426: }
427: }
428:
429: /**
430: * Get a free connection from the pool
431: */
432: MultiConnection getFree() {
433:
434: MultiConnection con = null;
435: try {
436: if (semaphore == null)
437: return null; // during start-up this could happen.
438: //see comment in method checkTime()
439: synchronized (semaphore) {
440: totalConnections++;
441: if (conMax == 0) { // could happen during shut down of MMBase
442: try {
443: con = getMultiConnection(); // hm....
444: con.claim();
445: return con;
446: } catch (SQLException sqe) {
447: return null; // will probably cause NPE's but well
448: }
449: }
450:
451: semaphore.acquire(); // locks until something free
452: if (log.isDebugEnabled()) {
453: log.debug("Getting free connection from pool "
454: + pool.size());
455: }
456: con = pool.remove(0);
457: con.claim();
458: try {
459: if (con.isClosed()) {
460: con = getMultiConnection();
461: con.claim();
462: log
463: .service("Got a closed connection from the connection Pool, it was replaced by a new one.");
464: }
465: } catch (SQLException sqe) {
466: log
467: .error("Closed connection could not be replaced because: "
468: + sqe.getClass().getName()
469: + ": "
470: + sqe.getMessage());
471: }
472: busyPool.add(con);
473: if (log.isDebugEnabled()) {
474: log.debug("Pool: " + pool.size() + " busypool: "
475: + busyPool.size());
476: }
477: }
478: } catch (InterruptedException e) {
479: log.error("GetFree was Interrupted");
480: }
481: return con;
482: }
483:
484: /**
485: * putback the used connection in the pool
486: */
487: void putBack(MultiConnection con) {
488: //see comment in method checkTime()
489: synchronized (semaphore) {
490: if (log.isDebugEnabled()) {
491: log.debug("Putting back to Pool: " + pool.size()
492: + " from busypool: " + busyPool.size());
493: }
494:
495: if (!busyPool.contains(con)) {
496: // try to guess wat happend, to report that in the logs
497: MMBase mmb = MMBase.getMMBase();
498: if (!mmb.isShutdown()) {
499: try {
500: if (con.isClosed()) {
501: log
502: .debug("Connection "
503: + con
504: + " as closed an not in busypool, so it was removed from busyPool by checkTime");
505: } else {
506: log.warn("Put back connection ("
507: + con.getLastSQL()
508: + ") was not in busyPool!!");
509: }
510: } catch (SQLException sqe) {
511: log.warn("Connection " + con
512: + " not in busypool : "
513: + sqe.getMessage());
514: }
515: } else {
516: log
517: .service("Connection "
518: + con.getLastSQL()
519: + " was put back, but MMBase is shut down, so it was ignored.");
520: }
521: return;
522: }
523:
524: con.release(); //Only resets time connection is busy.
525:
526: MultiConnection oldCon = con;
527:
528: if (doReconnect && (con.getUsage() > maxQueries)) {
529: log.debug("Re-oppening connection");
530:
531: boolean gotNew = false;
532: try {
533: con = getMultiConnection();
534: gotNew = true;
535: } catch (SQLException sqe) {
536: log
537: .error("Can't add connection to pool (during reconnect) "
538: + sqe.toString());
539: }
540:
541: if (gotNew) { // a new conection has successfully created, the old one can be closed
542: new ConnectionCloser(oldCon);
543: } else { // use the old one another cycle
544: log
545: .service("Will continue use original connection");
546: con.resetUsage();
547: }
548: }
549:
550: pool.add(con);
551: busyPool.remove(oldCon);
552: if (log.isDebugEnabled()) {
553: log.debug("Pool: " + pool.size() + " busypool: "
554: + busyPool.size());
555: }
556: semaphore.release();
557: }
558: }
559:
560: /**
561: * get the pool size
562: */
563: public int getSize() {
564: return pool.size();
565: }
566:
567: /**
568: * get the number of statements performed
569: */
570: public int getTotalConnectionsCreated() {
571: return totalConnections;
572: }
573:
574: /**
575: * For reporting purposes the connections in pool can be listed.
576: * An Iterator on a copy of the Pool is returned.
577: *
578: * @see JDBC#listConnections
579: */
580: public Iterator<MultiConnection> getPool() {
581: synchronized (semaphore) {
582: return new ArrayList<MultiConnection>(pool).iterator();
583: }
584: }
585:
586: /**
587: * For reporting purposes the connections in busypool can be listed.
588: * An Iterator on a copy of the BusyPool is returned.
589: * @see JDBC#listConnections
590: */
591:
592: public Iterator<MultiConnection> getBusyPool() {
593: synchronized (semaphore) {
594: return new ArrayList<MultiConnection>(busyPool).iterator();
595: }
596: }
597:
598: /**
599: * @javadoc
600: */
601: public String toString() {
602: return "dbm=" + url + ",name=" + name + ",conmax=" + conMax;
603: }
604:
605: /**
606: * Support class to close connections in a seperate thread, as some JDBC drivers
607: * have a tendency to hang themselves on a running sql close.
608: */
609: static class ConnectionCloser implements Runnable {
610: private static final Logger log = Logging
611: .getLoggerInstance(ConnectionCloser.class);
612:
613: private MultiConnection connection;
614:
615: public ConnectionCloser(MultiConnection con) {
616: connection = con;
617: start();
618: }
619:
620: /**
621: * Starts a Thread and runs this Runnable
622: */
623: protected void start() {
624: // Start up the thread
625: Thread kicker = new Thread(this , "ConnectionCloser");
626: kicker.setDaemon(true); // For the case if indeed the close 'hangs' the thread.
627: kicker.start();
628: }
629:
630: /**
631: * Close the database connection.
632: */
633: public void run() {
634: try {
635: connection.realclose();
636: } catch (Exception re) {
637: log.error("Can't close connection with ID "
638: + connection.hashCode() + ", cause: " + re);
639: }
640: log.debug("Closed connection with ID "
641: + connection.hashCode());
642: }
643: }
644: }
|