001: /**
002: * Copyright (C) 2006 NetMind Consulting Bt.
003: *
004: * This library is free software; you can redistribute it and/or
005: * modify it under the terms of the GNU Lesser General Public
006: * License as published by the Free Software Foundation; either
007: * version 3 of the License, or (at your option) any later version.
008: *
009: * This library is distributed in the hope that it will be useful,
010: * but WITHOUT ANY WARRANTY; without even the implied warranty of
011: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012: * Lesser General Public License for more details.
013: *
014: * You should have received a copy of the GNU Lesser General Public
015: * License along with this library; if not, write to the Free Software
016: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
017: */package hu.netmind.persistence;
018:
019: import java.util.LinkedList;
020: import java.util.HashMap;
021: import java.sql.Connection;
022: import javax.sql.DataSource;
023: import java.sql.DatabaseMetaData;
024: import java.util.Iterator;
025: import java.util.ResourceBundle;
026: import java.lang.reflect.Proxy;
027: import java.lang.reflect.InvocationHandler;
028: import java.lang.reflect.Method;
029: import org.apache.log4j.Logger;
030:
031: /**
032: * This is the connection source for database implementations. It pools
033: * the connections so the database implementation does not need to
034: * bother with connection allocation details. Be sure to return the
035: * connection to this pool when ready.
036: * @author Brautigam Robert
037: * @version Revision: $Revision$
038: */
039: public class ConnectionSource {
040: private static Logger logger = Logger
041: .getLogger(ConnectionSource.class);
042: private static ProfileLogger profLogger = ProfileLogger.getLogger();
043:
044: // Timeout for a single connection before it gets dumped.
045: // Note: This could be exchanged for validating a connection, but
046: // there is no certain way to do it, except running an sql statement.
047: private static long TIMEOUT = 10 * 60 * 1000; // 10 mins
048:
049: private DataSource dataSource;
050:
051: private int maxConnections;
052: private LinkedList pool;
053: private HashMap wrappers;
054:
055: public ConnectionSource(DataSource dataSource) {
056: // Initialize variables
057: this .dataSource = dataSource;
058: pool = new LinkedList();
059: wrappers = new HashMap();
060: // Set max connection count to infinite for start
061: maxConnections = 0;
062: }
063:
064: DataSource getDataSource() {
065: return dataSource;
066: }
067:
068: /**
069: * Returns whether the datasource is pooled. Currently, all
070: * datasources allocated through JNDI are considered pooled.
071: */
072: private boolean isDataSourcePooled() {
073: return !(dataSource instanceof DriverDataSource);
074: }
075:
076: /**
077: * Release all database connections. After calling this method, the
078: * connection source is considered unusable.
079: */
080: public synchronized void release() {
081: // Close all connections
082: for (int i = 0; i < pool.size(); i++) {
083: ConnectionWrapper wrapper = (ConnectionWrapper) pool.get(i);
084: try {
085: if (wrapper.connection != null)
086: wrapper.connection.close();
087: } catch (Exception e) {
088: logger.error("could not close connection on release.",
089: e);
090: }
091: }
092: // Empty pool
093: pool = new LinkedList();
094: wrappers = new HashMap();
095: }
096:
097: /**
098: * Get a new pooled connection. This method get a connection from the
099: * pool, or allocates a new connection is no pooled ones are available.
100: * @return A new connection object.
101: */
102: public synchronized Connection getConnection() {
103: // Find an available connection, if a connection becomes unusable, it
104: // is removed
105: long currentTime = System.currentTimeMillis();
106: ConnectionWrapper wrapper = null;
107: Iterator iterator = new LinkedList(pool).iterator();
108: int usedCount = 0;
109: while (iterator.hasNext()) {
110: ConnectionWrapper current = (ConnectionWrapper) iterator
111: .next();
112: boolean usable = true;
113: // If the connection is pooled elsewhere, and it is used,
114: // then we have no business with it
115: if ((current.used) && (isDataSourcePooled()))
116: continue;
117: // Check whether connection became closed somehow
118: try {
119: usable = !current.connection.isClosed();
120: } catch (Exception e) {
121: // Ok, probably closed
122: logger
123: .debug(
124: "while checking closed status, connection threw",
125: e);
126: usable = false;
127: }
128: // Check for timeout
129: if (current.lastUsed + TIMEOUT < currentTime) {
130: usable = false;
131: if (current.used)
132: logger
133: .warn("a connection is used, but reached timeout and will be reclaimed, possible unbalanced transaction handling!");
134: }
135: // If usable, then return this wrapper, else close it
136: if (usable) {
137: if (!current.used) {
138: // This is usable but not used, so return it. Next iterations
139: // can override this still, so always the last usable
140: // connection will be used.
141: wrapper = current;
142: } else {
143: // Connection is used
144: usedCount++;
145: }
146: } else {
147: // Connection is unusable, so forget it.
148: }
149: }
150: // Allocate connection if not yet found
151: if (wrapper == null) {
152: // Allocate connection
153: wrapper = new ConnectionWrapper();
154: try {
155: logger
156: .debug("connection pool has: "
157: + pool.size()
158: + " connections, maximum connections allocated at any given time: "
159: + maxConnections
160: + ", need new connection.");
161: wrapper.connection = dataSource.getConnection();
162: wrapper.connection.setAutoCommit(false);
163: } catch (StoreException e) {
164: throw e;
165: } catch (Exception e) {
166: throw new StoreException(
167: "could not get a new connection from datasouce, current pool size: "
168: + pool.size(), e);
169: }
170: // Add to pool
171: pool.add(wrapper);
172: wrappers.put(wrapper.connection, wrapper);
173: // Successful connection
174: if (pool.size() > maxConnections)
175: maxConnections = pool.size();
176: }
177: // Profile
178: profLogger.profile("connectionpool",
179: "Connection used/pool/wrappers: " + usedCount + "/"
180: + pool.size() + "/" + wrappers.size());
181: // Return connection if one has been found
182: wrapper.used = true;
183: wrapper.lastUsed = currentTime;
184: // Create a wrapper on the connection object, so each use
185: // of it's prepareStatement() can update it's wrapper's last used time.
186: try {
187: return (Connection) Proxy.newProxyInstance(getClass()
188: .getClassLoader(),
189: new Class[] { Connection.class },
190: new WrapperHandler(wrapper));
191: } catch (Exception e) {
192: throw new StoreException(
193: "exception while creating connection proxy", e);
194: }
195: }
196:
197: /**
198: * Close a wrapper, and drop from pool.
199: */
200: private void closeWrapper(ConnectionWrapper wrapper) {
201: pool.remove(wrapper);
202: wrappers.remove(wrapper.connection);
203: // Close it too
204: try {
205: wrapper.connection.close();
206: } catch (Exception e) {
207: logger
208: .debug(
209: "could not close wrapper, this is no problem, it may already been closed",
210: e);
211: }
212: }
213:
214: /**
215: * Release a connection back to the pool.
216: * @param connection The connection to release.
217: */
218: public synchronized void releaseConnection(Connection connection) {
219: ConnectionWrapper wrapper = (ConnectionWrapper) wrappers
220: .get(connection);
221: if (wrapper != null)
222: wrapper.used = false;
223: // If the data source is pool, then drop this wrapper and
224: // do not return it to the pool
225: if (isDataSourcePooled())
226: closeWrapper(wrapper);
227: }
228:
229: private class WrapperHandler implements InvocationHandler {
230: private ConnectionWrapper wrapper;
231:
232: public WrapperHandler(ConnectionWrapper wrapper) {
233: this .wrapper = wrapper;
234: }
235:
236: public Object invoke(Object proxy, Method method, Object[] args)
237: throws Throwable {
238: // If method is prepareStatement(), then update wrapper's
239: // last used indicator.
240: if (method.getName().equals("prepareStatement"))
241: wrapper.lastUsed = System.currentTimeMillis();
242: // Call wrapped connection
243: return method.invoke(wrapper.connection, args);
244: }
245: }
246:
247: private class ConnectionWrapper {
248: public volatile boolean used = false;
249: public volatile long lastUsed = 0;
250: public Connection connection = null;
251: }
252:
253: static {
254: try {
255: ResourceBundle config = ResourceBundle
256: .getBundle("beankeeper");
257: TIMEOUT = Integer.valueOf(
258: config.getString("pool.connection_timeout"))
259: .intValue();
260: } catch (Exception e) {
261: logger
262: .error(
263: "could not read configuration file, using hardcoded defaults.",
264: e);
265: }
266: }
267: }
|