001: /*
002: * HA-JDBC: High-Availability JDBC
003: * Copyright (c) 2004-2007 Paul Ferraro
004: *
005: * This library is free software; you can redistribute it and/or modify it
006: * under the terms of the GNU Lesser General Public License as published by the
007: * Free Software Foundation; either version 2.1 of the License, or (at your
008: * option) any later version.
009: *
010: * This library is distributed in the hope that it will be useful, but WITHOUT
011: * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
012: * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
013: * for more details.
014: *
015: * You should have received a copy of the GNU Lesser General Public License
016: * along with this library; if not, write to the Free Software Foundation,
017: * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
018: *
019: * Contact: ferraro@users.sourceforge.net
020: */
021: package net.sf.hajdbc.sql;
022:
023: import java.lang.reflect.InvocationHandler;
024: import java.lang.reflect.Method;
025: import java.sql.SQLException;
026: import java.util.ArrayList;
027: import java.util.Arrays;
028: import java.util.HashMap;
029: import java.util.Iterator;
030: import java.util.LinkedList;
031: import java.util.List;
032: import java.util.Map;
033: import java.util.Set;
034: import java.util.SortedMap;
035:
036: import net.sf.hajdbc.Database;
037: import net.sf.hajdbc.DatabaseCluster;
038: import net.sf.hajdbc.Messages;
039: import net.sf.hajdbc.util.SQLExceptionFactory;
040: import net.sf.hajdbc.util.reflect.Methods;
041:
042: import org.slf4j.Logger;
043: import org.slf4j.LoggerFactory;
044:
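/**
 * Base {@link InvocationHandler} for proxies that fan a SQL object out over
 * the databases of a {@link DatabaseCluster}.
 * @author Paul Ferraro
 * @param <D> the type parameter of the {@link Database} and {@link DatabaseCluster} this handler works with
 * @param <T> the interface being proxied
 */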
050: @SuppressWarnings("nls")
051: public abstract class AbstractInvocationHandler<D, T> implements
052: InvocationHandler, SQLProxy<D, T> {
053: private static final Method equalsMethod = Methods.getMethod(
054: Object.class, "equals", Object.class);
055: private static final Method hashCodeMethod = Methods.getMethod(
056: Object.class, "hashCode");
057: private static final Method toStringMethod = Methods.getMethod(
058: Object.class, "toString");
059: private static final Method isWrapperForMethod = Methods
060: .findMethod("java.sql.Wrapper", "isWrapperFor", Class.class);
061: private static final Method unwrapMethod = Methods.findMethod(
062: "java.sql.Wrapper", "unwrap", Class.class);
063:
064: protected Logger logger = LoggerFactory.getLogger(this .getClass());
065:
066: protected DatabaseCluster<D> cluster;
067: private Class<T> proxyClass;
068: private Map<Database<D>, T> objectMap;
069: private List<SQLProxy<D, ?>> childList = new LinkedList<SQLProxy<D, ?>>();
070: private Map<Method, Invoker<D, T, ?>> invokerMap = new HashMap<Method, Invoker<D, T, ?>>();
071:
/**
 * Creates a handler bound to the given cluster, proxied interface and
 * per-database objects. The supplied map is stored as-is (not copied).
 * @param cluster the database cluster
 * @param proxyClass the interface being proxied
 * @param objectMap a map of database to sql object.
 */
protected AbstractInvocationHandler(DatabaseCluster<D> cluster, Class<T> proxyClass, Map<Database<D>, T> objectMap) {
    this.objectMap = objectMap;
    this.proxyClass = proxyClass;
    this.cluster = cluster;
}
083:
084: /**
085: * @see java.lang.reflect.InvocationHandler#invoke(java.lang.Object, java.lang.reflect.Method, java.lang.Object[])
086: */
087: @SuppressWarnings("unchecked")
088: @Override
089: public final Object invoke(Object object, Method method,
090: Object[] parameters) throws Exception {
091: if (method.getName().equals("toString"))
092: return "";
093:
094: if (!this .cluster.isActive()) {
095: throw new SQLException(Messages.getMessage(
096: Messages.CLUSTER_NOT_ACTIVE, this .cluster));
097: }
098:
099: T proxy = this .proxyClass.cast(object);
100:
101: InvocationStrategy strategy = this .getInvocationStrategy(proxy,
102: method, parameters);
103: Invoker invoker = this .getInvoker(proxy, method, parameters);
104:
105: Object result = strategy.invoke(this , invoker);
106:
107: this .record(method, invoker);
108:
109: this .postInvoke(proxy, method, parameters);
110:
111: return result;
112: }
113:
114: /**
115: * Returns the appropriate {@link InvocationStrategy} for the specified method.
116: * This implementation detects {@link java.sql.Wrapper} methods; and {@link Object#equals}, {@link Object#hashCode()}, and {@link Object#toString()}.
117: * Default invocation strategy is {@link DatabaseWriteInvocationStrategy}.
118: * @param object the proxied object
119: * @param method the method to invoke
120: * @param parameters the method invocation parameters
121: * @return an invocation strategy
122: * @throws Exception
123: */
124: protected InvocationStrategy<D, T, ?> getInvocationStrategy(
125: final T object, Method method, final Object[] parameters)
126: throws Exception {
127: // Most Java 1.6 sql classes implement java.sql.Wrapper
128: if (((isWrapperForMethod != null) && method
129: .equals(isWrapperForMethod))
130: || ((unwrapMethod != null) && method
131: .equals(unwrapMethod))) {
132: return new DriverReadInvocationStrategy<D, T, Object>();
133: }
134:
135: if (method.equals(equalsMethod)) {
136: return new InvocationStrategy<D, T, Boolean>() {
137: public Boolean invoke(SQLProxy<D, T> proxy,
138: Invoker<D, T, Boolean> invoker) {
139: return object == parameters[0];
140: }
141: };
142: }
143:
144: if (method.equals(hashCodeMethod)
145: || method.equals(toStringMethod)) {
146: return new DriverReadInvocationStrategy<D, T, Object>();
147: }
148:
149: return new DatabaseWriteInvocationStrategy<D, T, Object>(
150: this .cluster.getNonTransactionalExecutor());
151: }
152:
153: /**
154: * Return the appropriate invoker for the specified method.
155: * @param object
156: * @param method
157: * @param parameters
158: * @return an invoker
159: * @throws Exception
160: */
161: protected Invoker<D, T, ?> getInvoker(T object, Method method,
162: Object[] parameters) throws Exception {
163: if (this .isSQLMethod(method)) {
164: List<Object> parameterList = new ArrayList<Object>(Arrays
165: .asList(parameters));
166:
167: long now = System.currentTimeMillis();
168:
169: if (this .cluster.isCurrentTimestampEvaluationEnabled()) {
170: parameterList.set(0, this .cluster.getDialect()
171: .evaluateCurrentTimestamp(
172: (String) parameterList.get(0),
173: new java.sql.Timestamp(now)));
174: }
175:
176: if (this .cluster.isCurrentDateEvaluationEnabled()) {
177: parameterList.set(0, this .cluster.getDialect()
178: .evaluateCurrentDate(
179: (String) parameterList.get(0),
180: new java.sql.Date(now)));
181: }
182:
183: if (this .cluster.isCurrentTimeEvaluationEnabled()) {
184: parameterList.set(0, this .cluster.getDialect()
185: .evaluateCurrentTime(
186: (String) parameterList.get(0),
187: new java.sql.Time(now)));
188: }
189:
190: if (this .cluster.isRandEvaluationEnabled()) {
191: parameterList.set(0, this .cluster.getDialect()
192: .evaluateRand((String) parameterList.get(0)));
193: }
194:
195: return new SimpleInvoker(method, parameterList.toArray());
196: }
197:
198: return new SimpleInvoker(method, parameters);
199: }
200:
201: /**
202: * Indicates whether or not the specified method accepts a SQL string as its first parameter.
203: * @param method a method
204: * @return true, if the specified method accepts a SQL string as its first parameter, false otherwise.
205: */
206: protected boolean isSQLMethod(Method method) {
207: return false;
208: }
209:
210: /**
211: * Called after method is invoked.
212: * @param proxy the proxied object
213: * @param method the method that was just invoked
214: * @param parameters the parameters of the method that was just invoked
215: */
216: protected void postInvoke(T proxy, Method method,
217: Object[] parameters) {
218: // Do nothing
219: }
220:
221: /**
222: * @see net.sf.hajdbc.sql.SQLProxy#entry()
223: */
224: @Override
225: public Map.Entry<Database<D>, T> entry() {
226: synchronized (this .objectMap) {
227: return this .objectMap.entrySet().iterator().next();
228: }
229: }
230:
231: /**
232: * @see net.sf.hajdbc.sql.SQLProxy#entries()
233: */
234: @Override
235: public Set<Map.Entry<Database<D>, T>> entries() {
236: synchronized (this .objectMap) {
237: return this .objectMap.entrySet();
238: }
239: }
240:
241: /**
242: * @see net.sf.hajdbc.sql.SQLProxy#addChild(net.sf.hajdbc.sql.SQLProxy)
243: */
244: @Override
245: public final void addChild(SQLProxy<D, ?> child) {
246: synchronized (this .childList) {
247: this .childList.add(child);
248: }
249: }
250:
251: /**
252: * @see net.sf.hajdbc.sql.SQLProxy#removeChildren()
253: */
254: @Override
255: public final void removeChildren() {
256: synchronized (this .childList) {
257: this .childList.clear();
258: }
259: }
260:
261: /**
262: * @see net.sf.hajdbc.sql.SQLProxy#removeChild(net.sf.hajdbc.sql.SQLProxy)
263: */
264: @Override
265: public final void removeChild(SQLProxy<D, ?> child) {
266: synchronized (this .childList) {
267: child.removeChildren();
268:
269: this .childList.remove(child);
270: }
271: }
272:
273: /**
274: * Returns the underlying SQL object for the specified database.
275: * If the sql object does not exist (this might be the case if the database was newly activated), it will be created from the stored operation.
276: * Any recorded operations are also executed. If the object could not be created, or if any of the executed operations failed, then the specified database is deactivated.
277: * @param database a database descriptor.
278: * @return an underlying SQL object
279: */
280: @Override
281: public final T getObject(Database<D> database) {
282: synchronized (this .objectMap) {
283: T object = this .objectMap.get(database);
284:
285: if (object == null) {
286: try {
287: object = this .createObject(database);
288:
289: this .replay(database, object);
290:
291: this .objectMap.put(database, object);
292: } catch (Exception e) {
293: if (!this .objectMap.isEmpty()
294: && this .cluster.deactivate(database,
295: this .cluster.getStateManager())) {
296: this .logger
297: .warn(
298: Messages
299: .getMessage(
300: Messages.SQL_OBJECT_INIT_FAILED,
301: this
302: .getClass()
303: .getName(),
304: database), e);
305: }
306: }
307: }
308:
309: return object;
310: }
311: }
312:
313: protected abstract T createObject(Database<D> database)
314: throws Exception;
315:
316: protected void record(Method method, Invoker<D, T, ?> invoker) {
317: // Record only the last invocation of a given set*(...) method
318: if (this .isSetMethod(method)
319: && (method.getParameterTypes().length == 1)) {
320: synchronized (this .invokerMap) {
321: this .invokerMap.put(method, invoker);
322: }
323: }
324: }
325:
326: @SuppressWarnings("nls")
327: protected boolean isSetMethod(Method method) {
328: return method.getName().startsWith("set")
329: && (method.getParameterTypes() != null);
330: }
331:
332: protected void replay(Database<D> database, T object)
333: throws Exception {
334: synchronized (this .invokerMap) {
335: for (Invoker<D, T, ?> invoker : this .invokerMap.values()) {
336: invoker.invoke(database, object);
337: }
338: }
339: }
340:
341: /**
342: * @see net.sf.hajdbc.sql.SQLProxy#retain(java.util.Set)
343: */
344: @Override
345: public final void retain(Set<Database<D>> databaseSet) {
346: synchronized (this .childList) {
347: for (SQLProxy<D, ?> child : this .childList) {
348: child.retain(databaseSet);
349: }
350: }
351:
352: synchronized (this .objectMap) {
353: Iterator<Map.Entry<Database<D>, T>> mapEntries = this .objectMap
354: .entrySet().iterator();
355:
356: while (mapEntries.hasNext()) {
357: Map.Entry<Database<D>, T> mapEntry = mapEntries.next();
358:
359: Database<D> database = mapEntry.getKey();
360:
361: if (!databaseSet.contains(database)) {
362: T object = mapEntry.getValue();
363:
364: if (object != null) {
365: this .close(database, object);
366: }
367:
368: mapEntries.remove();
369: }
370: }
371: }
372: }
373:
374: protected abstract void close(Database<D> database, T object);
375:
376: /**
377: * @see net.sf.hajdbc.sql.SQLProxy#getDatabaseCluster()
378: */
379: @Override
380: public final DatabaseCluster<D> getDatabaseCluster() {
381: return this .cluster;
382: }
383:
384: /**
385: * @see net.sf.hajdbc.sql.SQLProxy#handleFailure(net.sf.hajdbc.Database, java.lang.Exception)
386: */
387: @Override
388: public void handleFailure(Database<D> database, Exception exception)
389: throws Exception {
390: Set<Database<D>> databaseSet = this .cluster.getBalancer().all();
391:
392: // If cluster has only one database left, don't deactivate
393: if (databaseSet.size() <= 1) {
394: throw exception;
395: }
396:
397: Map<Boolean, List<Database<D>>> aliveMap = this .cluster
398: .getAliveMap(databaseSet);
399:
400: this .detectClusterPanic(aliveMap);
401:
402: List<Database<D>> aliveList = aliveMap.get(true);
403:
404: // If all are dead, assume the worst and throw caught exception
405: // If failed database is alive, then throw caught exception
406: if (aliveList.isEmpty() || aliveList.contains(database)) {
407: throw exception;
408: }
409:
410: // Otherwise deactivate failed database
411: if (this .cluster.deactivate(database, this .cluster
412: .getStateManager())) {
413: this .logger.error(Messages.getMessage(
414: Messages.DATABASE_DEACTIVATED, database, this ),
415: exception);
416: }
417: }
418:
419: /**
420: * @see net.sf.hajdbc.sql.SQLProxy#handleFailures(java.util.SortedMap)
421: */
422: @Override
423: public void handleFailures(
424: SortedMap<Database<D>, Exception> exceptionMap)
425: throws Exception {
426: if (exceptionMap.size() == 1) {
427: throw exceptionMap.get(exceptionMap.firstKey());
428: }
429:
430: Map<Boolean, List<Database<D>>> aliveMap = this .cluster
431: .getAliveMap(exceptionMap.keySet());
432:
433: this .detectClusterPanic(aliveMap);
434:
435: List<Database<D>> aliveList = aliveMap.get(true);
436: List<Database<D>> deadList = aliveMap.get(false);
437:
438: if (!aliveList.isEmpty()) {
439: for (Database<D> database : deadList) {
440: if (this .cluster.deactivate(database, this .cluster
441: .getStateManager())) {
442: this .logger.error(Messages.getMessage(
443: Messages.DATABASE_DEACTIVATED, database,
444: this .cluster), exceptionMap.get(database));
445: }
446: }
447: }
448:
449: List<Database<D>> list = aliveList.isEmpty() ? deadList
450: : aliveList;
451:
452: SQLException exception = SQLExceptionFactory
453: .createSQLException(exceptionMap.get(list.get(0)));
454:
455: for (Database<D> database : list.subList(1, list.size())) {
456: exception.setNextException(SQLExceptionFactory
457: .createSQLException(exceptionMap.get(database)));
458: }
459:
460: throw exception;
461: }
462:
463: /**
464: * @see net.sf.hajdbc.sql.SQLProxy#handlePartialFailure(java.util.SortedMap, java.util.SortedMap)
465: */
466: @Override
467: public <R> SortedMap<Database<D>, R> handlePartialFailure(
468: SortedMap<Database<D>, R> resultMap,
469: SortedMap<Database<D>, Exception> exceptionMap)
470: throws Exception {
471: Map<Boolean, List<Database<D>>> aliveMap = this .cluster
472: .getAliveMap(exceptionMap.keySet());
473:
474: // Assume success databases are alive
475: aliveMap.get(true).addAll(resultMap.keySet());
476:
477: this .detectClusterPanic(aliveMap);
478:
479: for (Map.Entry<Database<D>, Exception> exceptionMapEntry : exceptionMap
480: .entrySet()) {
481: Database<D> database = exceptionMapEntry.getKey();
482: Exception exception = exceptionMapEntry.getValue();
483:
484: if (this .cluster.deactivate(database, this .cluster
485: .getStateManager())) {
486: this .logger.error(Messages.getMessage(
487: Messages.DATABASE_DEACTIVATED, database,
488: this .cluster), exception);
489: }
490: }
491:
492: return resultMap;
493: }
494:
495: /**
496: * Detect cluster panic if all conditions are met:
497: * <ul>
498: * <li>We're in distributable mode</li>
499: * <li>We're the only group member</li>
500: * <li>All alive databases are local</li>
501: * <li>All dead databases are remote</li>
502: * </ul>
503: * @param aliveMap
504: * @throws Exception
505: */
506: protected void detectClusterPanic(
507: Map<Boolean, List<Database<D>>> aliveMap) throws Exception {
508: if (this .cluster.getStateManager().isMembershipEmpty()) {
509: List<Database<D>> aliveList = aliveMap.get(true);
510: List<Database<D>> deadList = aliveMap.get(false);
511:
512: if (!aliveList.isEmpty() && !deadList.isEmpty()
513: && sameProximity(aliveList, true)
514: && sameProximity(deadList, false)) {
515: this .cluster.stop();
516:
517: String message = Messages.getMessage(
518: Messages.CLUSTER_PANIC_DETECTED, this .cluster);
519:
520: this .logger.error(message);
521:
522: throw new SQLException(message);
523: }
524: }
525: }
526:
527: private boolean sameProximity(List<Database<D>> databaseList,
528: boolean local) {
529: boolean same = true;
530:
531: for (Database<D> database : databaseList) {
532: same &= (database.isLocal() == local);
533: }
534:
535: return same;
536: }
537:
538: protected class SimpleInvoker implements Invoker<D, T, Object> {
539: private Method method;
540: private Object[] parameters;
541:
542: /**
543: * @param method
544: * @param parameters
545: */
546: public SimpleInvoker(Method method, Object[] parameters) {
547: this .method = method;
548: this .parameters = parameters;
549: }
550:
551: /**
552: * @see net.sf.hajdbc.sql.Invoker#invoke(net.sf.hajdbc.Database, java.lang.Object)
553: */
554: @Override
555: public Object invoke(Database<D> database, T object)
556: throws Exception {
557: return Methods.invoke(this.method, object, this.parameters);
558: }
559: }
560: }
|