001: /*
002: * HA-JDBC: High-Availability JDBC
003: * Copyright (c) 2004-2007 Paul Ferraro
004: *
005: * This library is free software; you can redistribute it and/or modify it
006: * under the terms of the GNU Lesser General Public License as published by the
007: * Free Software Foundation; either version 2.1 of the License, or (at your
008: * option) any later version.
009: *
010: * This library is distributed in the hope that it will be useful, but WITHOUT
011: * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
012: * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
013: * for more details.
014: *
015: * You should have received a copy of the GNU Lesser General Public License
016: * along with this library; if not, write to the Free Software Foundation,
017: * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
018: *
019: * Contact: ferraro@users.sourceforge.net
020: */
021: package net.sf.hajdbc.sql;
022:
023: import java.lang.reflect.Method;
024: import java.sql.Connection;
025: import java.sql.ResultSet;
026: import java.sql.SQLException;
027: import java.sql.Statement;
028: import java.util.ArrayList;
029: import java.util.Collections;
030: import java.util.LinkedList;
031: import java.util.List;
032: import java.util.Map;
033: import java.util.Set;
034: import java.util.SortedMap;
035: import java.util.TreeSet;
036: import java.util.concurrent.locks.Lock;
037:
038: import net.sf.hajdbc.Database;
039: import net.sf.hajdbc.LockManager;
040: import net.sf.hajdbc.Messages;
041: import net.sf.hajdbc.TableProperties;
042: import net.sf.hajdbc.util.SQLExceptionFactory;
043: import net.sf.hajdbc.util.reflect.Methods;
044:
045: /**
046: * @author Paul Ferraro
047: * @param <D>
048: * @param <S>
049: */
050: @SuppressWarnings("nls")
051: public abstract class AbstractStatementInvocationHandler<D, S extends Statement>
052: extends AbstractChildInvocationHandler<D, Connection, S> {
053: private static final Set<Method> driverReadMethodSet = Methods
054: .findMethods(Statement.class, "getFetchDirection",
055: "getFetchSize", "getGeneratedKeys",
056: "getMaxFieldSize", "getMaxRows", "getQueryTimeout",
057: "getResultSetConcurrency",
058: "getResultSetHoldability", "getResultSetType",
059: "getUpdateCount", "getWarnings", "isClosed",
060: "isPoolable");
061: private static final Set<Method> driverWriteMethodSet = Methods
062: .findMethods(Statement.class, "addBatch", "clearBatch",
063: "clearWarnings", "setCursorName",
064: "setEscapeProcessing", "setFetchDirection",
065: "setFetchSize", "setMaxFieldSize", "setMaxRows",
066: "setPoolable", "setQueryTimeout");
067: private static final Set<Method> executeMethodSet = Methods
068: .findMethods(Statement.class, "execute(Update)?");
069:
070: private static final Method getConnectionMethod = Methods
071: .getMethod(Statement.class, "getConnection");
072: private static final Method executeQueryMethod = Methods.getMethod(
073: Statement.class, "executeQuery", String.class);
074: private static final Method clearBatchMethod = Methods.getMethod(
075: Statement.class, "clearBatch");
076: private static final Method executeBatchMethod = Methods.getMethod(
077: Statement.class, "executeBatch");
078: private static final Method getMoreResultsMethod = Methods
079: .getMethod(Statement.class, "getMoreResults", Integer.TYPE);
080: private static final Method getResultSetMethod = Methods.getMethod(
081: Statement.class, "getResultSet");
082: private static final Method addBatchMethod = Methods.getMethod(
083: Statement.class, "addBatch", String.class);
084: private static final Method closeMethod = Methods.getMethod(
085: Statement.class, "close");
086:
087: protected TransactionContext<D> transactionContext;
088: protected FileSupport fileSupport;
089:
090: private List<Invoker<D, S, ?>> invokerList = new LinkedList<Invoker<D, S, ?>>();
091: private List<String> sqlList = new LinkedList<String>();
092:
093: /**
094: * @param connection the parent connection of this statement
095: * @param proxy the parent invocation handler
096: * @param invoker the invoker that created this statement
097: * @param statementClass
098: * @param statementMap a map of database to underlying statement
099: * @param transactionContext
100: * @param fileSupport support object for streams
101: * @throws Exception
102: */
103: protected AbstractStatementInvocationHandler(Connection connection,
104: SQLProxy<D, Connection> proxy,
105: Invoker<D, Connection, S> invoker, Class<S> statementClass,
106: Map<Database<D>, S> statementMap,
107: TransactionContext<D> transactionContext,
108: FileSupport fileSupport) throws Exception {
109: super (connection, proxy, invoker, statementClass, statementMap);
110:
111: this .transactionContext = transactionContext;
112: this .fileSupport = fileSupport;
113: }
114:
115: /**
116: * @see net.sf.hajdbc.sql.AbstractChildInvocationHandler#getInvocationStrategy(java.lang.Object, java.lang.reflect.Method, java.lang.Object[])
117: */
118: @Override
119: protected InvocationStrategy<D, S, ?> getInvocationStrategy(
120: S statement, Method method, Object[] parameters)
121: throws Exception {
122: if (driverReadMethodSet.contains(method)) {
123: return new DriverReadInvocationStrategy<D, S, Object>();
124: }
125:
126: if (driverWriteMethodSet.contains(method)) {
127: return new DriverWriteInvocationStrategy<D, S, Object>();
128: }
129:
130: if (executeMethodSet.contains(method)) {
131: List<Lock> lockList = this
132: .extractLocks((String) parameters[0]);
133:
134: return this .transactionContext
135: .start(
136: new LockingInvocationStrategy<D, S, Object>(
137: new DatabaseWriteInvocationStrategy<D, S, Object>(
138: this .cluster
139: .getTransactionalExecutor()),
140: lockList), this .getParent());
141: }
142:
143: if (method.equals(getConnectionMethod)) {
144: return new InvocationStrategy<D, S, Connection>() {
145: public Connection invoke(SQLProxy<D, S> proxy,
146: Invoker<D, S, Connection> invoker)
147: throws Exception {
148: return AbstractStatementInvocationHandler.this
149: .getParent();
150: }
151: };
152: }
153:
154: if (method.equals(executeQueryMethod)) {
155: String sql = (String) parameters[0];
156:
157: List<Lock> lockList = this .extractLocks(sql);
158:
159: int concurrency = statement.getResultSetConcurrency();
160: boolean selectForUpdate = this .isSelectForUpdate(sql);
161:
162: if (lockList.isEmpty()
163: && (concurrency == ResultSet.CONCUR_READ_ONLY)
164: && !selectForUpdate) {
165: return new LazyResultSetInvocationStrategy<D, S>(
166: statement, this .transactionContext,
167: this .fileSupport);
168: }
169:
170: InvocationStrategy<D, S, ResultSet> strategy = new LockingInvocationStrategy<D, S, ResultSet>(
171: new EagerResultSetInvocationStrategy<D, S>(
172: this .cluster, statement,
173: this .transactionContext, this .fileSupport),
174: lockList);
175:
176: return selectForUpdate ? this .transactionContext.start(
177: strategy, this .getParent()) : strategy;
178: }
179:
180: if (method.equals(executeBatchMethod)) {
181: List<Lock> lockList = this .extractLocks(this .sqlList);
182:
183: return this .transactionContext
184: .start(
185: new LockingInvocationStrategy<D, S, Object>(
186: new DatabaseWriteInvocationStrategy<D, S, Object>(
187: this .cluster
188: .getTransactionalExecutor()),
189: lockList), this .getParent());
190: }
191:
192: if (method.equals(getMoreResultsMethod)) {
193: if (parameters[0].equals(Statement.KEEP_CURRENT_RESULT)) {
194: return new DriverWriteInvocationStrategy<D, S, Object>();
195: }
196: }
197:
198: if (method.equals(getResultSetMethod)) {
199: if (statement.getResultSetConcurrency() == ResultSet.CONCUR_READ_ONLY) {
200: return new LazyResultSetInvocationStrategy<D, S>(
201: statement, this .transactionContext,
202: this .fileSupport);
203: }
204:
205: return new EagerResultSetInvocationStrategy<D, S>(
206: this .cluster, statement, this .transactionContext,
207: this .fileSupport);
208: }
209:
210: return super .getInvocationStrategy(statement, method,
211: parameters);
212: }
213:
214: /**
215: * @see net.sf.hajdbc.sql.AbstractChildInvocationHandler#isSQLMethod(java.lang.reflect.Method)
216: */
217: @Override
218: protected boolean isSQLMethod(Method method) {
219: return method.equals(addBatchMethod)
220: || method.equals(executeQueryMethod)
221: || executeMethodSet.contains(method);
222: }
223:
224: /**
225: * @see net.sf.hajdbc.sql.AbstractChildInvocationHandler#postInvoke(java.lang.Object, java.lang.reflect.Method, java.lang.Object[])
226: */
227: @Override
228: protected void postInvoke(S statement, Method method,
229: Object[] parameters) {
230: if (method.equals(addBatchMethod)) {
231: this .sqlList.add((String) parameters[0]);
232: } else if (method.equals(clearBatchMethod)
233: || method.equals(executeBatchMethod)) {
234: this .sqlList.clear();
235: } else if (method.equals(closeMethod)) {
236: this .getParentProxy().removeChild(this );
237: }
238: }
239:
240: /**
241: * @see net.sf.hajdbc.sql.SQLProxy#handlePartialFailure(java.util.SortedMap, java.util.SortedMap)
242: */
243: @Override
244: public <R> SortedMap<Database<D>, R> handlePartialFailure(
245: SortedMap<Database<D>, R> resultMap,
246: SortedMap<Database<D>, Exception> exceptionMap)
247: throws Exception {
248: if (this .getParent().getAutoCommit()) {
249: return super .handlePartialFailure(resultMap, exceptionMap);
250: }
251:
252: // If auto-commit is off, throw exception to give client the opportunity to rollback the transaction
253: Map<Boolean, List<Database<D>>> aliveMap = this .cluster
254: .getAliveMap(exceptionMap.keySet());
255:
256: List<Database<D>> aliveList = aliveMap.get(true);
257:
258: int size = aliveList.size();
259:
260: // Assume successful databases are alive
261: aliveList.addAll(resultMap.keySet());
262:
263: this .detectClusterPanic(aliveMap);
264:
265: List<Database<D>> deadList = aliveMap.get(false);
266:
267: for (Database<D> database : deadList) {
268: if (this .cluster.deactivate(database, this .cluster
269: .getStateManager())) {
270: this .logger.error(Messages.getMessage(
271: Messages.DATABASE_DEACTIVATED, database,
272: this .cluster), exceptionMap.get(database));
273: }
274: }
275:
276: // If failed databases are all dead
277: if (size == 0) {
278: return resultMap;
279: }
280:
281: // Chain exceptions from alive databases
282: SQLException exception = SQLExceptionFactory
283: .createSQLException(exceptionMap.get(aliveList.get(0)));
284:
285: for (Database<D> database : aliveList.subList(1, size)) {
286: exception.setNextException(SQLExceptionFactory
287: .createSQLException(exceptionMap.get(database)));
288: }
289:
290: throw exception;
291: }
292:
293: protected boolean isSelectForUpdate(String sql) throws SQLException {
294: return this .cluster.getDatabaseMetaDataCache()
295: .getDatabaseProperties(this .getParent())
296: .supportsSelectForUpdate() ? this .cluster.getDialect()
297: .isSelectForUpdate(sql) : false;
298: }
299:
300: protected List<Lock> extractLocks(String sql) throws SQLException {
301: return this .extractLocks(Collections.singletonList(sql));
302: }
303:
304: private List<Lock> extractLocks(List<String> sqlList)
305: throws SQLException {
306: Set<String> identifierSet = new TreeSet<String>();
307:
308: for (String sql : sqlList) {
309: if (this .cluster.isSequenceDetectionEnabled()) {
310: String sequence = this .cluster.getDialect()
311: .parseSequence(sql);
312:
313: if (sequence != null) {
314: identifierSet.add(sequence);
315: }
316: }
317:
318: if (this .cluster.isIdentityColumnDetectionEnabled()) {
319: String table = this .cluster.getDialect()
320: .parseInsertTable(sql);
321:
322: if (table != null) {
323: TableProperties tableProperties = this .cluster
324: .getDatabaseMetaDataCache()
325: .getDatabaseProperties(this .getParent())
326: .findTable(table);
327:
328: if (!tableProperties.getIdentityColumns().isEmpty()) {
329: identifierSet.add(tableProperties.getName());
330: }
331: }
332: }
333: }
334:
335: List<Lock> lockList = new ArrayList<Lock>(identifierSet.size());
336:
337: if (!identifierSet.isEmpty()) {
338: LockManager lockManager = this .cluster.getLockManager();
339:
340: for (String identifier : identifierSet) {
341: lockList.add(lockManager.writeLock(identifier));
342: }
343: }
344:
345: return lockList;
346: }
347:
348: /**
349: * @see net.sf.hajdbc.sql.AbstractChildInvocationHandler#close(java.lang.Object, java.lang.Object)
350: */
351: @Override
352: protected void close(Connection connection, S statement)
353: throws SQLException {
354: statement.close();
355: }
356:
357: /**
358: * @see net.sf.hajdbc.sql.AbstractInvocationHandler#record(java.lang.reflect.Method, net.sf.hajdbc.sql.Invoker)
359: */
360: @Override
361: protected void record(Method method, Invoker<D, S, ?> invoker) {
362: if (this .isRecordable(method)) {
363: synchronized (this .invokerList) {
364: this .invokerList.add(invoker);
365: }
366: } else if (method.equals(clearBatchMethod)
367: || method.equals(executeBatchMethod)) {
368: synchronized (this .invokerList) {
369: this .invokerList.clear();
370: }
371: } else {
372: super .record(method, invoker);
373: }
374: }
375:
376: protected boolean isRecordable(Method method) {
377: return method.equals(addBatchMethod);
378: }
379:
380: /**
381: * @see net.sf.hajdbc.sql.AbstractInvocationHandler#replay(net.sf.hajdbc.Database, java.lang.Object)
382: */
383: @Override
384: protected void replay(Database<D> database, S statement)
385: throws Exception {
386: super .replay(database, statement);
387:
388: synchronized (this .invokerList) {
389: for (Invoker<D, S, ?> invoker : this.invokerList) {
390: invoker.invoke(database, statement);
391: }
392: }
393: }
394: }
|