001: /*
002: Copyright (C) 2007 MySQL AB
003:
004: This program is free software; you can redistribute it and/or modify
005: it under the terms of version 2 of the GNU General Public License as
006: published by the Free Software Foundation.
007:
008: There are special exceptions to the terms and conditions of the GPL
009: as it is applied to this software. View the full text of the
010: exception in file EXCEPTIONS-CONNECTOR-J in the directory of this
011: software distribution.
012:
013: This program is distributed in the hope that it will be useful,
014: but WITHOUT ANY WARRANTY; without even the implied warranty of
015: MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
016: GNU General Public License for more details.
017:
018: You should have received a copy of the GNU General Public License
019: along with this program; if not, write to the Free Software
020: Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
021:
022: */
023:
024: package com.mysql.jdbc;
025:
026: import java.lang.reflect.InvocationHandler;
027: import java.lang.reflect.InvocationTargetException;
028: import java.lang.reflect.Method;
029: import java.lang.reflect.Proxy;
030: import java.sql.SQLException;
031: import java.util.HashMap;
032: import java.util.Iterator;
033: import java.util.List;
034: import java.util.Map;
035: import java.util.Properties;
036:
037: /**
038: * An implementation of java.sql.Connection that load balances requests across a
039: * series of MySQL JDBC connections, where the balancing takes place at
040: * transaction commit.
041: *
042: * Therefore, for this to work (at all), you must use transactions, even if only
043: * reading data.
044: *
045: * This implementation will invalidate connections that it detects have had
046: * communication errors when processing a request. A new connection to the
047: * problematic host will be attempted the next time it is selected by the load
048: * balancing algorithm.
049: *
050: * This implementation is thread-safe, but it's questionable whether sharing a
051: * connection instance amongst threads is a good idea, given that transactions
052: * are scoped to connections in JDBC.
053: *
054: * @version $Id: $
055: *
056: */
057: public class LoadBalancingConnectionProxy implements InvocationHandler,
058: PingTarget {
059:
060: private static Method getLocalTimeMethod;
061:
062: static {
063: try {
064: getLocalTimeMethod = System.class.getMethod("nanoTime",
065: new Class[0]);
066: } catch (SecurityException e) {
067: // ignore
068: } catch (NoSuchMethodException e) {
069: // ignore
070: }
071: }
072:
073: interface BalanceStrategy {
074: abstract Connection pickConnection() throws SQLException;
075: }
076:
077: class BestResponseTimeBalanceStrategy implements BalanceStrategy {
078:
079: public Connection pickConnection() throws SQLException {
080: long minResponseTime = Long.MAX_VALUE;
081:
082: int bestHostIndex = 0;
083:
084: for (int i = 0; i < responseTimes.length; i++) {
085: long candidateResponseTime = responseTimes[i];
086:
087: if (candidateResponseTime < minResponseTime) {
088: if (candidateResponseTime == 0) {
089: bestHostIndex = i;
090:
091: break;
092: }
093:
094: bestHostIndex = i;
095: minResponseTime = candidateResponseTime;
096: }
097: }
098:
099: String bestHost = (String) hostList.get(bestHostIndex);
100:
101: Connection conn = (Connection) liveConnections
102: .get(bestHost);
103:
104: if (conn == null) {
105: conn = createConnectionForHost(bestHost);
106: }
107:
108: return conn;
109: }
110: }
111:
112: // Lifted from C/J 5.1's JDBC-2.0 connection pool classes, let's merge this
113: // if/when this gets into 5.1
114: protected class ConnectionErrorFiringInvocationHandler implements
115: InvocationHandler {
116: Object invokeOn = null;
117:
118: public ConnectionErrorFiringInvocationHandler(Object toInvokeOn) {
119: invokeOn = toInvokeOn;
120: }
121:
122: public Object invoke(Object proxy, Method method, Object[] args)
123: throws Throwable {
124: Object result = null;
125:
126: try {
127: result = method.invoke(invokeOn, args);
128:
129: if (result != null) {
130: result = proxyIfInterfaceIsJdbc(result, result
131: .getClass());
132: }
133: } catch (InvocationTargetException e) {
134: dealWithInvocationException(e);
135: }
136:
137: return result;
138: }
139: }
140:
141: class RandomBalanceStrategy implements BalanceStrategy {
142:
143: public Connection pickConnection() throws SQLException {
144: int random = (int) (Math.random() * hostList.size());
145:
146: if (random == hostList.size()) {
147: random--;
148: }
149:
150: String hostPortSpec = (String) hostList.get(random);
151:
152: Connection conn = (Connection) liveConnections
153: .get(hostPortSpec);
154:
155: if (conn == null) {
156: conn = createConnectionForHost(hostPortSpec);
157: }
158:
159: return conn;
160: }
161: }
162:
163: private Connection currentConn;
164:
165: private List hostList;
166:
167: private Map liveConnections;
168:
169: private Map connectionsToHostsMap;
170:
171: private long[] responseTimes;
172:
173: private Map hostsToListIndexMap;
174:
175: boolean inTransaction = false;
176:
177: long transactionStartTime = 0;
178:
179: Properties localProps;
180:
181: boolean isClosed = false;
182:
183: BalanceStrategy balancer;
184:
185: /**
186: * Creates a proxy for java.sql.Connection that routes requests between the
187: * given list of host:port and uses the given properties when creating
188: * connections.
189: *
190: * @param hosts
191: * @param props
192: * @throws SQLException
193: */
194: LoadBalancingConnectionProxy(List hosts, Properties props)
195: throws SQLException {
196: this .hostList = hosts;
197:
198: int numHosts = this .hostList.size();
199:
200: this .liveConnections = new HashMap(numHosts);
201: this .connectionsToHostsMap = new HashMap(numHosts);
202: this .responseTimes = new long[numHosts];
203: this .hostsToListIndexMap = new HashMap(numHosts);
204:
205: for (int i = 0; i < numHosts; i++) {
206: this .hostsToListIndexMap.put(this .hostList.get(i),
207: new Integer(i));
208: }
209:
210: this .localProps = (Properties) props.clone();
211: this .localProps.remove(NonRegisteringDriver.HOST_PROPERTY_KEY);
212: this .localProps.remove(NonRegisteringDriver.PORT_PROPERTY_KEY);
213: this .localProps.setProperty("useLocalSessionState", "true");
214:
215: String strategy = this .localProps.getProperty(
216: "loadBalanceStrategy", "random");
217:
218: if ("random".equals(strategy)) {
219: this .balancer = new RandomBalanceStrategy();
220: } else if ("bestResponseTime".equals(strategy)) {
221: this .balancer = new BestResponseTimeBalanceStrategy();
222: } else {
223: throw SQLError.createSQLException(Messages.getString(
224: "InvalidLoadBalanceStrategy",
225: new Object[] { strategy }),
226: SQLError.SQL_STATE_ILLEGAL_ARGUMENT);
227: }
228:
229: pickNewConnection();
230: }
231:
232: /**
233: * Creates a new physical connection for the given host:port and updates
234: * required internal mappings and statistics for that connection.
235: *
236: * @param hostPortSpec
237: * @return
238: * @throws SQLException
239: */
240: private synchronized Connection createConnectionForHost(
241: String hostPortSpec) throws SQLException {
242: Properties connProps = (Properties) this .localProps.clone();
243:
244: String[] hostPortPair = NonRegisteringDriver
245: .parseHostPortPair(hostPortSpec);
246:
247: if (hostPortPair[1] == null) {
248: hostPortPair[1] = "3306";
249: }
250:
251: connProps.setProperty(NonRegisteringDriver.HOST_PROPERTY_KEY,
252: hostPortSpec);
253: connProps.setProperty(NonRegisteringDriver.PORT_PROPERTY_KEY,
254: hostPortPair[1]);
255:
256: Connection conn = new ConnectionImpl(hostPortSpec, Integer
257: .parseInt(hostPortPair[1]), connProps, connProps
258: .getProperty(NonRegisteringDriver.DBNAME_PROPERTY_KEY),
259: "jdbc:mysql://" + hostPortPair[0] + ":"
260: + hostPortPair[1] + "/");
261:
262: this .liveConnections.put(hostPortSpec, conn);
263: this .connectionsToHostsMap.put(conn, hostPortSpec);
264:
265: return conn;
266: }
267:
268: /**
269: * @param e
270: * @throws SQLException
271: * @throws Throwable
272: * @throws InvocationTargetException
273: */
274: void dealWithInvocationException(InvocationTargetException e)
275: throws SQLException, Throwable, InvocationTargetException {
276: Throwable t = e.getTargetException();
277:
278: if (t != null) {
279: if (t instanceof SQLException) {
280: String sqlState = ((SQLException) t).getSQLState();
281:
282: if (sqlState != null) {
283: if (sqlState.startsWith("08")) {
284: // connection error, close up shop on current
285: // connection
286: invalidateCurrentConnection();
287: }
288: }
289: }
290:
291: throw t;
292: }
293:
294: throw e;
295: }
296:
297: /**
298: * Closes current connection and removes it from required mappings.
299: *
300: * @throws SQLException
301: */
302: synchronized void invalidateCurrentConnection() throws SQLException {
303: try {
304: if (!this .currentConn.isClosed()) {
305: this .currentConn.close();
306: }
307:
308: } finally {
309: this .liveConnections.remove(this .connectionsToHostsMap
310: .get(this .currentConn));
311: this .connectionsToHostsMap.remove(this .currentConn);
312: }
313: }
314:
315: /**
316: * Proxies method invocation on the java.sql.Connection interface, trapping
317: * "close", "isClosed" and "commit/rollback" (to switch connections for load
318: * balancing).
319: */
320: public Object invoke(Object proxy, Method method, Object[] args)
321: throws Throwable {
322:
323: String methodName = method.getName();
324:
325: if ("close".equals(methodName)) {
326: synchronized (this .liveConnections) {
327: // close all underlying connections
328: Iterator allConnections = this .liveConnections.values()
329: .iterator();
330:
331: while (allConnections.hasNext()) {
332: ((Connection) allConnections.next()).close();
333: }
334:
335: this .liveConnections.clear();
336: this .connectionsToHostsMap.clear();
337: }
338:
339: return null;
340: }
341:
342: if ("isClosed".equals(methodName)) {
343: return Boolean.valueOf(this .isClosed);
344: }
345:
346: if (this .isClosed) {
347: throw SQLError.createSQLException(
348: "No operations allowed after connection closed.",
349: SQLError.SQL_STATE_CONNECTION_NOT_OPEN);
350: }
351:
352: if (!inTransaction) {
353: this .inTransaction = true;
354: this .transactionStartTime = getLocalTimeBestResolution();
355: }
356:
357: Object result = null;
358:
359: try {
360: result = method.invoke(this .currentConn, args);
361:
362: if (result != null) {
363: if (result instanceof com.mysql.jdbc.Statement) {
364: ((com.mysql.jdbc.Statement) result)
365: .setPingTarget(this );
366: }
367:
368: result = proxyIfInterfaceIsJdbc(result, result
369: .getClass());
370: }
371: } catch (InvocationTargetException e) {
372: dealWithInvocationException(e);
373: } finally {
374: if ("commit".equals(methodName)
375: || "rollback".equals(methodName)) {
376: this .inTransaction = false;
377:
378: // Update stats
379: int hostIndex = ((Integer) this .hostsToListIndexMap
380: .get(this .connectionsToHostsMap
381: .get(this .currentConn))).intValue();
382:
383: this .responseTimes[hostIndex] = getLocalTimeBestResolution()
384: - this .transactionStartTime;
385:
386: pickNewConnection();
387: }
388: }
389:
390: return result;
391: }
392:
393: /**
394: * Picks the "best" connection to use for the next transaction based on the
395: * BalanceStrategy in use.
396: *
397: * @throws SQLException
398: */
399: private synchronized void pickNewConnection() throws SQLException {
400: if (this .currentConn == null) {
401: this .currentConn = this .balancer.pickConnection();
402:
403: return;
404: }
405:
406: Connection newConn = this .balancer.pickConnection();
407:
408: newConn.setTransactionIsolation(this .currentConn
409: .getTransactionIsolation());
410: newConn.setAutoCommit(this .currentConn.getAutoCommit());
411: this .currentConn = newConn;
412: }
413:
414: /**
415: * Recursively checks for interfaces on the given object to determine if it
416: * implements a java.sql interface, and if so, proxies the instance so that
417: * we can catch and fire SQL errors.
418: *
419: * @param toProxy
420: * @param clazz
421: * @return
422: */
423: Object proxyIfInterfaceIsJdbc(Object toProxy, Class clazz) {
424: Class[] interfaces = clazz.getInterfaces();
425:
426: for (int i = 0; i < interfaces.length; i++) {
427: String packageName = interfaces[i].getPackage().getName();
428:
429: if ("java.sql".equals(packageName)
430: || "javax.sql".equals(packageName)) {
431: return Proxy.newProxyInstance(toProxy.getClass()
432: .getClassLoader(), interfaces,
433: new ConnectionErrorFiringInvocationHandler(
434: toProxy));
435: }
436:
437: return proxyIfInterfaceIsJdbc(toProxy, interfaces[i]);
438: }
439:
440: return toProxy;
441: }
442:
443: /**
444: * Returns best-resolution representation of local time, using nanoTime() if
445: * availble, otherwise defaulting to currentTimeMillis().
446: */
447: private static long getLocalTimeBestResolution() {
448: if (getLocalTimeMethod != null) {
449: try {
450: return ((Long) getLocalTimeMethod.invoke(null, null))
451: .longValue();
452: } catch (IllegalArgumentException e) {
453: // ignore - we fall through to currentTimeMillis()
454: } catch (IllegalAccessException e) {
455: // ignore - we fall through to currentTimeMillis()
456: } catch (InvocationTargetException e) {
457: // ignore - we fall through to currentTimeMillis()
458: }
459: }
460:
461: return System.currentTimeMillis();
462: }
463:
464: public synchronized void doPing() throws SQLException {
465: Iterator allConns = this .liveConnections.values().iterator();
466:
467: while (allConns.hasNext()) {
468: ((Connection) allConns.next()).ping();
469: }
470: }
471: }
|