0001: /*
0002: * HA-JDBC: High-Availability JDBC
0003: * Copyright (c) 2004-2007 Paul Ferraro
0004: *
0005: * This library is free software; you can redistribute it and/or modify it
0006: * under the terms of the GNU Lesser General Public License as published by the
0007: * Free Software Foundation; either version 2.1 of the License, or (at your
0008: * option) any later version.
0009: *
0010: * This library is distributed in the hope that it will be useful, but WITHOUT
0011: * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
0012: * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
0013: * for more details.
0014: *
0015: * You should have received a copy of the GNU Lesser General Public License
0016: * along with this library; if not, write to the Free Software Foundation,
0017: * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0018: *
0019: * Contact: ferraro@users.sourceforge.net
0020: */
0021: package net.sf.hajdbc.sql;
0022:
0023: import java.io.File;
0024: import java.io.FileInputStream;
0025: import java.io.FileOutputStream;
0026: import java.io.FileWriter;
0027: import java.io.IOException;
0028: import java.io.InputStream;
0029: import java.net.URL;
0030: import java.nio.channels.Channels;
0031: import java.nio.channels.FileChannel;
0032: import java.nio.channels.WritableByteChannel;
0033: import java.sql.Connection;
0034: import java.sql.SQLException;
0035: import java.sql.Statement;
0036: import java.util.ArrayList;
0037: import java.util.Collection;
0038: import java.util.HashMap;
0039: import java.util.Iterator;
0040: import java.util.List;
0041: import java.util.Map;
0042: import java.util.NoSuchElementException;
0043: import java.util.Set;
0044: import java.util.TreeMap;
0045: import java.util.TreeSet;
0046: import java.util.concurrent.Callable;
0047: import java.util.concurrent.ExecutionException;
0048: import java.util.concurrent.ExecutorService;
0049: import java.util.concurrent.Future;
0050: import java.util.concurrent.SynchronousQueue;
0051: import java.util.concurrent.ThreadPoolExecutor;
0052: import java.util.concurrent.TimeUnit;
0053: import java.util.concurrent.locks.Lock;
0054:
0055: import javax.management.DynamicMBean;
0056: import javax.management.JMException;
0057: import javax.management.MBeanRegistration;
0058: import javax.management.MBeanServer;
0059: import javax.management.ObjectName;
0060:
0061: import net.sf.hajdbc.Balancer;
0062: import net.sf.hajdbc.Database;
0063: import net.sf.hajdbc.DatabaseCluster;
0064: import net.sf.hajdbc.DatabaseClusterDecorator;
0065: import net.sf.hajdbc.DatabaseClusterFactory;
0066: import net.sf.hajdbc.DatabaseClusterMBean;
0067: import net.sf.hajdbc.DatabaseMetaDataCache;
0068: import net.sf.hajdbc.Dialect;
0069: import net.sf.hajdbc.LockManager;
0070: import net.sf.hajdbc.Messages;
0071: import net.sf.hajdbc.StateManager;
0072: import net.sf.hajdbc.SynchronizationContext;
0073: import net.sf.hajdbc.SynchronizationStrategy;
0074: import net.sf.hajdbc.local.LocalLockManager;
0075: import net.sf.hajdbc.local.LocalStateManager;
0076: import net.sf.hajdbc.sync.SynchronizationContextImpl;
0077: import net.sf.hajdbc.sync.SynchronizationStrategyBuilder;
0078: import net.sf.hajdbc.util.concurrent.CronThreadPoolExecutor;
0079: import net.sf.hajdbc.util.concurrent.SynchronousExecutor;
0080:
0081: import org.jibx.runtime.BindingDirectory;
0082: import org.jibx.runtime.IMarshallingContext;
0083: import org.jibx.runtime.IUnmarshallingContext;
0084: import org.jibx.runtime.JiBXException;
0085: import org.quartz.CronExpression;
0086: import org.slf4j.Logger;
0087: import org.slf4j.LoggerFactory;
0088:
0089: /**
0090: * @author Paul Ferraro
0091: * @param <D> either java.sql.Driver or javax.sql.DataSource
0092: * @since 1.0
0093: */
0094: public abstract class AbstractDatabaseCluster<D> implements
0095: DatabaseCluster<D>, DatabaseClusterMBean, MBeanRegistration {
0096: static Logger logger = LoggerFactory
0097: .getLogger(AbstractDatabaseCluster.class);
0098:
0099: private String id;
0100: private Balancer<D> balancer;
0101: private Dialect dialect;
0102: private DatabaseMetaDataCache databaseMetaDataCache;
0103: private String defaultSynchronizationStrategyId;
0104: private CronExpression failureDetectionExpression;
0105: private CronExpression autoActivationExpression;
0106: private int minThreads;
0107: private int maxThreads;
0108: private int maxIdle;
0109: private TransactionMode transactionMode;
0110: private boolean identityColumnDetectionEnabled;
0111: private boolean sequenceDetectionEnabled;
0112: private boolean currentDateEvaluationEnabled;
0113: private boolean currentTimeEvaluationEnabled;
0114: private boolean currentTimestampEvaluationEnabled;
0115: private boolean randEvaluationEnabled;
0116:
0117: private MBeanServer server;
0118: private URL url;
0119: private Map<String, SynchronizationStrategy> synchronizationStrategyMap = new HashMap<String, SynchronizationStrategy>();
0120: private DatabaseClusterDecorator decorator;
0121: private Map<String, Database<D>> databaseMap = new HashMap<String, Database<D>>();
0122: private ExecutorService transactionalExecutor;
0123: private ExecutorService nonTransactionalExecutor;
0124: private CronThreadPoolExecutor cronExecutor = new CronThreadPoolExecutor(
0125: 2);
0126: private LockManager lockManager = new LocalLockManager();
0127: private StateManager stateManager = new LocalStateManager(this );
0128: private volatile boolean active = false;
0129:
0130: protected AbstractDatabaseCluster(String id, URL url) {
0131: this .id = id;
0132: this .url = url;
0133: }
0134:
0135: /**
0136: * @see net.sf.hajdbc.DatabaseCluster#getAliveMap(java.util.Collection)
0137: */
0138: @Override
0139: public Map<Boolean, List<Database<D>>> getAliveMap(
0140: Collection<Database<D>> databases) {
0141: Map<Database<D>, Future<Boolean>> futureMap = new TreeMap<Database<D>, Future<Boolean>>();
0142:
0143: for (final Database<D> database : databases) {
0144: Callable<Boolean> task = new Callable<Boolean>() {
0145: public Boolean call() throws Exception {
0146: return AbstractDatabaseCluster.this
0147: .isAlive(database);
0148: }
0149: };
0150:
0151: futureMap.put(database, this .nonTransactionalExecutor
0152: .submit(task));
0153: }
0154:
0155: Map<Boolean, List<Database<D>>> map = new TreeMap<Boolean, List<Database<D>>>();
0156:
0157: int size = databases.size();
0158:
0159: map.put(false, new ArrayList<Database<D>>(size));
0160: map.put(true, new ArrayList<Database<D>>(size));
0161:
0162: for (Map.Entry<Database<D>, Future<Boolean>> futureMapEntry : futureMap
0163: .entrySet()) {
0164: try {
0165: map.get(futureMapEntry.getValue().get()).add(
0166: futureMapEntry.getKey());
0167: } catch (ExecutionException e) {
0168: // isAlive does not throw an exception
0169: throw new IllegalStateException(e);
0170: } catch (InterruptedException e) {
0171: Thread.currentThread().interrupt();
0172: }
0173: }
0174:
0175: return map;
0176: }
0177:
0178: boolean isAlive(Database<D> database) {
0179: try {
0180: this .test(database);
0181:
0182: return true;
0183: } catch (SQLException e) {
0184: logger.warn(Messages.getMessage(
0185: Messages.DATABASE_NOT_ALIVE, database, this ), e);
0186:
0187: return false;
0188: }
0189: }
0190:
0191: private void test(Database<D> database) throws SQLException {
0192: Connection connection = null;
0193:
0194: try {
0195: connection = database.connect(database
0196: .createConnectionFactory());
0197:
0198: Statement statement = connection.createStatement();
0199:
0200: statement.execute(this .dialect.getSimpleSQL());
0201:
0202: statement.close();
0203: } finally {
0204: if (connection != null) {
0205: try {
0206: connection.close();
0207: } catch (SQLException e) {
0208: logger.warn(e.toString(), e);
0209: }
0210: }
0211: }
0212: }
0213:
0214: /**
0215: * @see net.sf.hajdbc.DatabaseCluster#deactivate(net.sf.hajdbc.Database, net.sf.hajdbc.StateManager)
0216: */
0217: @Override
0218: public boolean deactivate(Database<D> database,
0219: StateManager stateManager) {
0220: synchronized (this .balancer) {
0221: this .unregister(database);
0222: // Reregister database mbean using "inactive" interface
0223: this .register(database, database.getInactiveMBean());
0224:
0225: boolean removed = this .balancer.remove(database);
0226:
0227: if (removed) {
0228: stateManager.remove(database.getId());
0229: }
0230:
0231: return removed;
0232: }
0233: }
0234:
0235: /**
0236: * @see net.sf.hajdbc.DatabaseCluster#getId()
0237: */
0238: @Override
0239: public String getId() {
0240: return this .id;
0241: }
0242:
0243: /**
0244: * @see net.sf.hajdbc.DatabaseClusterMBean#getVersion()
0245: */
0246: @Override
0247: public String getVersion() {
0248: return DatabaseClusterFactory.getVersion();
0249: }
0250:
0251: /**
0252: * @see net.sf.hajdbc.DatabaseCluster#activate(net.sf.hajdbc.Database, net.sf.hajdbc.StateManager)
0253: */
0254: @Override
0255: public boolean activate(Database<D> database,
0256: StateManager stateManager) {
0257: synchronized (this .balancer) {
0258: this .unregister(database);
0259: // Reregister database mbean using "active" interface
0260: this .register(database, database.getActiveMBean());
0261:
0262: if (database.isDirty()) {
0263: this .export();
0264:
0265: database.clean();
0266: }
0267:
0268: boolean added = this .balancer.add(database);
0269:
0270: if (added) {
0271: stateManager.add(database.getId());
0272: }
0273:
0274: return added;
0275: }
0276: }
0277:
0278: /**
0279: * @see net.sf.hajdbc.DatabaseClusterMBean#getActiveDatabases()
0280: */
0281: @Override
0282: public Set<String> getActiveDatabases() {
0283: Set<String> databaseSet = new TreeSet<String>();
0284:
0285: for (Database<D> database : this .balancer.all()) {
0286: databaseSet.add(database.getId());
0287: }
0288:
0289: return databaseSet;
0290: }
0291:
0292: /**
0293: * @see net.sf.hajdbc.DatabaseClusterMBean#getInactiveDatabases()
0294: */
0295: @Override
0296: public Set<String> getInactiveDatabases() {
0297: synchronized (this .databaseMap) {
0298: Set<String> databaseSet = new TreeSet<String>(
0299: this .databaseMap.keySet());
0300:
0301: for (Database<D> database : this .balancer.all()) {
0302: databaseSet.remove(database.getId());
0303: }
0304:
0305: return databaseSet;
0306: }
0307: }
0308:
0309: /**
0310: * @see net.sf.hajdbc.DatabaseCluster#getDatabase(java.lang.String)
0311: */
0312: @Override
0313: public Database<D> getDatabase(String id) {
0314: synchronized (this .databaseMap) {
0315: Database<D> database = this .databaseMap.get(id);
0316:
0317: if (database == null) {
0318: throw new IllegalArgumentException(Messages.getMessage(
0319: Messages.INVALID_DATABASE, id, this ));
0320: }
0321:
0322: return database;
0323: }
0324: }
0325:
0326: /**
0327: * @see net.sf.hajdbc.DatabaseClusterMBean#getDefaultSynchronizationStrategy()
0328: */
0329: @Override
0330: public String getDefaultSynchronizationStrategy() {
0331: return this .defaultSynchronizationStrategyId;
0332: }
0333:
0334: /**
0335: * @see net.sf.hajdbc.DatabaseClusterMBean#getSynchronizationStrategies()
0336: */
0337: @Override
0338: public Set<String> getSynchronizationStrategies() {
0339: return new TreeSet<String>(this .synchronizationStrategyMap
0340: .keySet());
0341: }
0342:
0343: /**
0344: * @see net.sf.hajdbc.DatabaseCluster#getBalancer()
0345: */
0346: @Override
0347: public Balancer<D> getBalancer() {
0348: return this .balancer;
0349: }
0350:
0351: /**
0352: * @see net.sf.hajdbc.DatabaseCluster#getTransactionalExecutor()
0353: */
0354: @Override
0355: public ExecutorService getTransactionalExecutor() {
0356: return this .transactionalExecutor;
0357: }
0358:
0359: /**
0360: * @see net.sf.hajdbc.DatabaseCluster#getNonTransactionalExecutor()
0361: */
0362: @Override
0363: public ExecutorService getNonTransactionalExecutor() {
0364: return this .nonTransactionalExecutor;
0365: }
0366:
0367: /**
0368: * @see net.sf.hajdbc.DatabaseCluster#getDialect()
0369: */
0370: @Override
0371: public Dialect getDialect() {
0372: return this .dialect;
0373: }
0374:
0375: /**
0376: * @see net.sf.hajdbc.DatabaseCluster#getDatabaseMetaDataCache()
0377: */
0378: @Override
0379: public DatabaseMetaDataCache getDatabaseMetaDataCache() {
0380: return this .databaseMetaDataCache;
0381: }
0382:
0383: /**
0384: * @see net.sf.hajdbc.DatabaseCluster#getLockManager()
0385: */
0386: @Override
0387: public LockManager getLockManager() {
0388: return this .lockManager;
0389: }
0390:
0391: /**
0392: * @see net.sf.hajdbc.DatabaseClusterMBean#isAlive(java.lang.String)
0393: */
0394: @Override
0395: public boolean isAlive(String id) {
0396: return this .isAlive(this .getDatabase(id));
0397: }
0398:
0399: /**
0400: * @see net.sf.hajdbc.DatabaseClusterMBean#deactivate(java.lang.String)
0401: */
0402: @Override
0403: public void deactivate(String databaseId) {
0404: if (this .deactivate(this .getDatabase(databaseId),
0405: this .stateManager)) {
0406: logger.info(Messages.getMessage(
0407: Messages.DATABASE_DEACTIVATED, databaseId, this ));
0408: }
0409: }
0410:
0411: /**
0412: * @see net.sf.hajdbc.DatabaseClusterMBean#activate(java.lang.String)
0413: */
0414: @Override
0415: public void activate(String databaseId) {
0416: this .activate(databaseId, this
0417: .getDefaultSynchronizationStrategy());
0418: }
0419:
0420: /**
0421: * @see net.sf.hajdbc.DatabaseClusterMBean#activate(java.lang.String, java.lang.String)
0422: */
0423: @Override
0424: public void activate(String databaseId, String strategyId) {
0425: SynchronizationStrategy strategy = this .synchronizationStrategyMap
0426: .get(strategyId);
0427:
0428: if (strategy == null) {
0429: throw new IllegalArgumentException(Messages.getMessage(
0430: Messages.INVALID_SYNC_STRATEGY, strategyId));
0431: }
0432:
0433: try {
0434: if (this .activate(this .getDatabase(databaseId), strategy)) {
0435: logger.info(Messages.getMessage(
0436: Messages.DATABASE_ACTIVATED, databaseId, this ));
0437: }
0438: } catch (SQLException e) {
0439: logger.warn(Messages
0440: .getMessage(Messages.DATABASE_ACTIVATE_FAILED,
0441: databaseId, this ), e);
0442:
0443: SQLException exception = e.getNextException();
0444:
0445: while (exception != null) {
0446: logger.error(exception.getMessage(), e);
0447:
0448: exception = exception.getNextException();
0449: }
0450:
0451: throw new IllegalStateException(e.toString());
0452: } catch (InterruptedException e) {
0453: logger.warn(e.toString(), e);
0454:
0455: Thread.currentThread().interrupt();
0456: }
0457: }
0458:
0459: protected void register(Database<D> database, DynamicMBean mbean) {
0460: try {
0461: ObjectName name = DatabaseClusterFactory.getObjectName(
0462: this .id, database.getId());
0463:
0464: this .server.registerMBean(mbean, name);
0465: } catch (JMException e) {
0466: logger.error(e.toString(), e);
0467:
0468: throw new IllegalStateException(e);
0469: }
0470: }
0471:
0472: /**
0473: * @see net.sf.hajdbc.DatabaseClusterMBean#remove(java.lang.String)
0474: */
0475: @Override
0476: public void remove(String id) {
0477: synchronized (this .databaseMap) {
0478: Database<D> database = this .getDatabase(id);
0479:
0480: if (this .balancer.all().contains(database)) {
0481: throw new IllegalStateException(Messages.getMessage(
0482: Messages.DATABASE_STILL_ACTIVE, id, this ));
0483: }
0484:
0485: this .unregister(database);
0486:
0487: this .databaseMap.remove(id);
0488:
0489: this .export();
0490: }
0491: }
0492:
0493: private void unregister(Database<D> database) {
0494: try {
0495: ObjectName name = DatabaseClusterFactory.getObjectName(
0496: this .id, database.getId());
0497:
0498: if (this .server.isRegistered(name)) {
0499: this .server.unregisterMBean(name);
0500: }
0501: } catch (JMException e) {
0502: logger.error(e.toString(), e);
0503:
0504: throw new IllegalStateException(e);
0505: }
0506: }
0507:
0508: /**
0509: * @see net.sf.hajdbc.DatabaseCluster#isActive()
0510: */
0511: @Override
0512: public boolean isActive() {
0513: return this .active;
0514: }
0515:
0516: /**
0517: * Starts this database cluster
0518: * @throws Exception if database cluster start fails
0519: */
0520: public synchronized void start() throws Exception {
0521: if (this .active)
0522: return;
0523:
0524: this .lockManager.start();
0525: this .stateManager.start();
0526:
0527: this .nonTransactionalExecutor = new ThreadPoolExecutor(
0528: this .minThreads, this .maxThreads, this .maxIdle,
0529: TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
0530: new ThreadPoolExecutor.CallerRunsPolicy());
0531:
0532: this .transactionalExecutor = this .transactionMode
0533: .equals(TransactionMode.SERIAL) ? new SynchronousExecutor()
0534: : this .nonTransactionalExecutor;
0535:
0536: Set<String> databaseSet = this .stateManager.getInitialState();
0537:
0538: if (databaseSet != null) {
0539: for (String databaseId : databaseSet) {
0540: Database<D> database = this .getDatabase(databaseId);
0541:
0542: if (database != null) {
0543: this .activate(database, this .stateManager);
0544: }
0545: }
0546: } else {
0547: for (Database<D> database : this .getAliveMap(
0548: this .databaseMap.values()).get(true)) {
0549: this .activate(database, this .stateManager);
0550: }
0551: }
0552:
0553: this .databaseMetaDataCache.setDialect(this .dialect);
0554:
0555: try {
0556: this .flushMetaDataCache();
0557: } catch (IllegalStateException e) {
0558: // Ignore - cache will initialize lazily.
0559: }
0560:
0561: if (this .failureDetectionExpression != null) {
0562: this .cronExecutor.schedule(new FailureDetectionTask(),
0563: this .failureDetectionExpression);
0564: }
0565:
0566: if (this .autoActivationExpression != null) {
0567: this .cronExecutor.schedule(new AutoActivationTask(),
0568: this .autoActivationExpression);
0569: }
0570:
0571: this .active = true;
0572: }
0573:
0574: /**
0575: * Stops this database cluster
0576: */
0577: public synchronized void stop() {
0578: if (!this .active)
0579: return;
0580:
0581: this .active = false;
0582:
0583: this .balancer.clear();
0584:
0585: this .stateManager.stop();
0586: this .lockManager.stop();
0587:
0588: this .cronExecutor.shutdownNow();
0589:
0590: if (this .nonTransactionalExecutor != null) {
0591: this .nonTransactionalExecutor.shutdownNow();
0592: }
0593:
0594: if (this .transactionalExecutor != null) {
0595: this .transactionalExecutor.shutdownNow();
0596: }
0597: }
0598:
0599: /**
0600: * @see net.sf.hajdbc.DatabaseClusterMBean#flushMetaDataCache()
0601: */
0602: @Override
0603: public void flushMetaDataCache() {
0604: Connection connection = null;
0605:
0606: try {
0607: Database<D> database = this .balancer.next();
0608:
0609: connection = database.connect(database
0610: .createConnectionFactory());
0611:
0612: this .databaseMetaDataCache.flush(connection);
0613: } catch (NoSuchElementException e) {
0614: throw new IllegalStateException(Messages.getMessage(
0615: Messages.NO_ACTIVE_DATABASES, this ));
0616: } catch (SQLException e) {
0617: throw new IllegalStateException(e.toString(), e);
0618: } finally {
0619: if (connection != null) {
0620: try {
0621: connection.close();
0622: } catch (SQLException e) {
0623: logger.warn(e.toString(), e);
0624: }
0625: }
0626: }
0627: }
0628:
0629: /**
0630: * @see net.sf.hajdbc.DatabaseCluster#isIdentityColumnDetectionEnabled()
0631: */
0632: @Override
0633: public boolean isIdentityColumnDetectionEnabled() {
0634: return this .identityColumnDetectionEnabled;
0635: }
0636:
0637: /**
0638: * @see net.sf.hajdbc.DatabaseCluster#isSequenceDetectionEnabled()
0639: */
0640: @Override
0641: public boolean isSequenceDetectionEnabled() {
0642: return this .sequenceDetectionEnabled;
0643: }
0644:
0645: /**
0646: * @see net.sf.hajdbc.DatabaseCluster#isCurrentDateEvaluationEnabled()
0647: */
0648: @Override
0649: public boolean isCurrentDateEvaluationEnabled() {
0650: return this .currentDateEvaluationEnabled;
0651: }
0652:
0653: /**
0654: * @see net.sf.hajdbc.DatabaseCluster#isCurrentTimeEvaluationEnabled()
0655: */
0656: @Override
0657: public boolean isCurrentTimeEvaluationEnabled() {
0658: return this .currentTimeEvaluationEnabled;
0659: }
0660:
0661: /**
0662: * @see net.sf.hajdbc.DatabaseCluster#isCurrentTimestampEvaluationEnabled()
0663: */
0664: @Override
0665: public boolean isCurrentTimestampEvaluationEnabled() {
0666: return this .currentTimestampEvaluationEnabled;
0667: }
0668:
0669: /**
0670: * @see net.sf.hajdbc.DatabaseCluster#isRandEvaluationEnabled()
0671: */
0672: @Override
0673: public boolean isRandEvaluationEnabled() {
0674: return this .randEvaluationEnabled;
0675: }
0676:
0677: /**
0678: * @see java.lang.Object#toString()
0679: */
0680: @Override
0681: public String toString() {
0682: return this .getId();
0683: }
0684:
0685: /**
0686: * @see java.lang.Object#equals(java.lang.Object)
0687: */
0688: @SuppressWarnings("unchecked")
0689: @Override
0690: public boolean equals(Object object) {
0691: if ((object == null) || !(object instanceof DatabaseCluster))
0692: return false;
0693:
0694: String id = ((DatabaseCluster) object).getId();
0695:
0696: return (id != null) && id.equals(this .id);
0697: }
0698:
0699: /**
0700: * @see java.lang.Object#hashCode()
0701: */
0702: @Override
0703: public int hashCode() {
0704: return this .id.hashCode();
0705: }
0706:
0707: protected DatabaseClusterDecorator getDecorator() {
0708: return this .decorator;
0709: }
0710:
0711: protected void setDecorator(DatabaseClusterDecorator decorator) {
0712: this .decorator = decorator;
0713: }
0714:
0715: protected void add(Database<D> database) {
0716: String id = database.getId();
0717:
0718: synchronized (this .databaseMap) {
0719: if (this .databaseMap.containsKey(id)) {
0720: throw new IllegalArgumentException(Messages.getMessage(
0721: Messages.DATABASE_ALREADY_EXISTS, id, this ));
0722: }
0723:
0724: this .register(database, database.getInactiveMBean());
0725:
0726: this .databaseMap.put(id, database);
0727: }
0728: }
0729:
0730: protected Iterator<Database<D>> getDatabases() {
0731: synchronized (this .databaseMap) {
0732: return this .databaseMap.values().iterator();
0733: }
0734: }
0735:
0736: /**
0737: * @see net.sf.hajdbc.DatabaseCluster#getStateManager()
0738: */
0739: @Override
0740: public StateManager getStateManager() {
0741: return this .stateManager;
0742: }
0743:
0744: /**
0745: * @see net.sf.hajdbc.DatabaseCluster#setStateManager(net.sf.hajdbc.StateManager)
0746: */
0747: @Override
0748: public void setStateManager(StateManager stateManager) {
0749: this .stateManager = stateManager;
0750: }
0751:
0752: /**
0753: * @see net.sf.hajdbc.DatabaseCluster#setLockManager(net.sf.hajdbc.LockManager)
0754: */
0755: @Override
0756: public void setLockManager(LockManager lockManager) {
0757: this .lockManager = lockManager;
0758: }
0759:
0760: /**
0761: * @see net.sf.hajdbc.DatabaseClusterMBean#getUrl()
0762: */
0763: @Override
0764: public URL getUrl() {
0765: return this .url;
0766: }
0767:
0768: private boolean activate(Database<D> database,
0769: SynchronizationStrategy strategy) throws SQLException,
0770: InterruptedException {
0771: Lock lock = this .lockManager.writeLock(LockManager.GLOBAL);
0772:
0773: lock.lockInterruptibly();
0774:
0775: try {
0776: SynchronizationContext<D> context = new SynchronizationContextImpl<D>(
0777: this , database);
0778:
0779: if (context.getActiveDatabaseSet().contains(database)) {
0780: return false;
0781: }
0782:
0783: this .test(database);
0784:
0785: try {
0786: logger.info(Messages.getMessage(
0787: Messages.DATABASE_SYNC_START, database, this ));
0788:
0789: strategy.synchronize(context);
0790:
0791: logger.info(Messages.getMessage(
0792: Messages.DATABASE_SYNC_END, database, this ));
0793:
0794: return this .activate(database, this .stateManager);
0795: } finally {
0796: context.close();
0797: }
0798: } catch (NoSuchElementException e) {
0799: return this .activate(database, this .stateManager);
0800: } finally {
0801: lock.unlock();
0802: }
0803: }
0804:
0805: /**
0806: * @see javax.management.MBeanRegistration#postDeregister()
0807: */
0808: @Override
0809: public void postDeregister() {
0810: this .stop();
0811:
0812: this .unregisterDatabases();
0813: }
0814:
0815: private void unregisterDatabases() {
0816: synchronized (this .databaseMap) {
0817: Iterator<Database<D>> databases = this .databaseMap.values()
0818: .iterator();
0819:
0820: while (databases.hasNext()) {
0821: this .unregister(databases.next());
0822:
0823: databases.remove();
0824: }
0825: }
0826: }
0827:
0828: /**
0829: * @see javax.management.MBeanRegistration#postRegister(java.lang.Boolean)
0830: */
0831: @Override
0832: public void postRegister(Boolean registered) {
0833: if (!registered) {
0834: this .postDeregister();
0835: }
0836: }
0837:
0838: /**
0839: * @see javax.management.MBeanRegistration#preDeregister()
0840: */
0841: @Override
0842: public void preDeregister() throws Exception {
0843: // Nothing to do
0844: }
0845:
0846: /**
0847: * @see javax.management.MBeanRegistration#preRegister(javax.management.MBeanServer, javax.management.ObjectName)
0848: */
0849: @Override
0850: public ObjectName preRegister(MBeanServer server, ObjectName name)
0851: throws Exception {
0852: this .server = server;
0853:
0854: InputStream inputStream = null;
0855:
0856: logger.info(Messages.getMessage(Messages.HA_JDBC_INIT, this
0857: .getVersion(), this .url));
0858:
0859: try {
0860: inputStream = this .url.openStream();
0861:
0862: IUnmarshallingContext context = BindingDirectory
0863: .getFactory(this .getClass())
0864: .createUnmarshallingContext();
0865:
0866: context.setDocument(inputStream, null);
0867:
0868: context.setUserContext(this );
0869:
0870: context.unmarshalElement();
0871:
0872: if (this .decorator != null) {
0873: this .decorator.decorate(this );
0874: }
0875:
0876: this .start();
0877:
0878: return name;
0879: } catch (IOException e) {
0880: logger.error(Messages.getMessage(Messages.CONFIG_NOT_FOUND,
0881: this .url), e);
0882:
0883: throw e;
0884: } catch (JiBXException e) {
0885: logger.error(Messages.getMessage(
0886: Messages.CONFIG_LOAD_FAILED, this .url), e);
0887:
0888: this .unregisterDatabases();
0889:
0890: throw e;
0891: } catch (Exception e) {
0892: logger.error(Messages.getMessage(
0893: Messages.CLUSTER_START_FAILED, this ), e);
0894:
0895: this .postDeregister();
0896:
0897: throw e;
0898: } finally {
0899: if (inputStream != null) {
0900: try {
0901: inputStream.close();
0902: } catch (IOException e) {
0903: logger.warn(e.toString(), e);
0904: }
0905: }
0906: }
0907: }
0908:
0909: private void export() {
0910: File file = null;
0911: WritableByteChannel outputChannel = null;
0912: FileChannel fileChannel = null;
0913:
0914: try {
0915: file = File.createTempFile("ha-jdbc", ".xml"); //$NON-NLS-1$ //$NON-NLS-2$
0916:
0917: IMarshallingContext context = BindingDirectory.getFactory(
0918: this .getClass()).createMarshallingContext();
0919:
0920: context.setIndent(1,
0921: System.getProperty("line.separator"), '\t'); //$NON-NLS-1$
0922:
0923: // This method closes the writer
0924: context.marshalDocument(this , null, null, new FileWriter(
0925: file));
0926:
0927: fileChannel = new FileInputStream(file).getChannel();
0928:
0929: outputChannel = this .getOutputChannel(this .url);
0930:
0931: fileChannel.transferTo(0, file.length(), outputChannel);
0932: } catch (Exception e) {
0933: logger.warn(Messages.getMessage(
0934: Messages.CONFIG_STORE_FAILED, this .url), e);
0935: } finally {
0936: if (outputChannel != null) {
0937: try {
0938: outputChannel.close();
0939: } catch (IOException e) {
0940: logger.warn(e.getMessage(), e);
0941: }
0942: }
0943:
0944: if (fileChannel != null) {
0945: try {
0946: fileChannel.close();
0947: } catch (IOException e) {
0948: logger.warn(e.getMessage(), e);
0949: }
0950: }
0951:
0952: if (file != null) {
0953: file.delete();
0954: }
0955: }
0956: }
0957:
0958: /**
0959: * We cannot use URLConnection for files because Sun's implementation does not support output.
0960: */
0961: private WritableByteChannel getOutputChannel(URL url)
0962: throws IOException {
0963: return this .isFile(url) ? new FileOutputStream(this .toFile(url))
0964: .getChannel()
0965: : Channels.newChannel(url.openConnection()
0966: .getOutputStream());
0967: }
0968:
0969: private boolean isFile(URL url) {
0970: return url.getProtocol().equals("file"); //$NON-NLS-1$
0971: }
0972:
0973: private File toFile(URL url) {
0974: return new File(url.getPath());
0975: }
0976:
0977: protected void addSynchronizationStrategyBuilder(
0978: SynchronizationStrategyBuilder builder) throws Exception {
0979: this .synchronizationStrategyMap.put(builder.getId(), builder
0980: .buildStrategy());
0981: }
0982:
0983: protected Iterator<SynchronizationStrategyBuilder> getSynchronizationStrategyBuilders()
0984: throws Exception {
0985: List<SynchronizationStrategyBuilder> builderList = new ArrayList<SynchronizationStrategyBuilder>(
0986: this .synchronizationStrategyMap.size());
0987:
0988: for (Map.Entry<String, SynchronizationStrategy> mapEntry : this .synchronizationStrategyMap
0989: .entrySet()) {
0990: builderList.add(SynchronizationStrategyBuilder.getBuilder(
0991: mapEntry.getKey(), mapEntry.getValue()));
0992: }
0993:
0994: return builderList.iterator();
0995: }
0996:
0997: class FailureDetectionTask implements Runnable {
0998: /**
0999: * @see java.lang.Runnable#run()
1000: */
1001: @Override
1002: public void run() {
1003: Set<Database<D>> databaseSet = AbstractDatabaseCluster.this
1004: .getBalancer().all();
1005:
1006: if (databaseSet.size() > 1) {
1007: Map<Boolean, List<Database<D>>> aliveMap = AbstractDatabaseCluster.this
1008: .getAliveMap(databaseSet);
1009:
1010: // Deactivate the dead databases, so long as at least one is alive
1011: // Skip deactivation if membership is empty in case of cluster panic
1012: if (!aliveMap.get(true).isEmpty()
1013: && !AbstractDatabaseCluster.this
1014: .getStateManager().isMembershipEmpty()) {
1015: for (Database<D> database : aliveMap.get(false)) {
1016: if (AbstractDatabaseCluster.this .deactivate(
1017: database, AbstractDatabaseCluster.this
1018: .getStateManager())) {
1019: logger.error(Messages.getMessage(
1020: Messages.DATABASE_DEACTIVATED,
1021: database, this ));
1022: }
1023: }
1024: }
1025: }
1026: }
1027: }
1028:
1029: class AutoActivationTask implements Runnable {
1030: /**
1031: * @see java.lang.Runnable#run()
1032: */
1033: @Override
1034: public void run() {
1035: for (String databaseId : AbstractDatabaseCluster.this
1036: .getInactiveDatabases()) {
1037: AbstractDatabaseCluster.this.activate(databaseId);
1038: }
1039: }
1040: }
1041: }
|