Source Code Cross Referenced for AbstractDatabaseCluster.java in  » Database-JDBC-Connection-Pool » HA-JDBC » net » sf » hajdbc » sql » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Database JDBC Connection Pool » HA JDBC » net.sf.hajdbc.sql 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /*
0002:         * HA-JDBC: High-Availability JDBC
0003:         * Copyright (c) 2004-2007 Paul Ferraro
0004:         * 
0005:         * This library is free software; you can redistribute it and/or modify it 
0006:         * under the terms of the GNU Lesser General Public License as published by the 
0007:         * Free Software Foundation; either version 2.1 of the License, or (at your 
0008:         * option) any later version.
0009:         * 
0010:         * This library is distributed in the hope that it will be useful, but WITHOUT
0011:         * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 
0012:         * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License 
0013:         * for more details.
0014:         * 
0015:         * You should have received a copy of the GNU Lesser General Public License
0016:         * along with this library; if not, write to the Free Software Foundation, 
0017:         * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0018:         * 
0019:         * Contact: ferraro@users.sourceforge.net
0020:         */
0021:        package net.sf.hajdbc.sql;
0022:
0023:        import java.io.File;
0024:        import java.io.FileInputStream;
0025:        import java.io.FileOutputStream;
0026:        import java.io.FileWriter;
0027:        import java.io.IOException;
0028:        import java.io.InputStream;
0029:        import java.net.URL;
0030:        import java.nio.channels.Channels;
0031:        import java.nio.channels.FileChannel;
0032:        import java.nio.channels.WritableByteChannel;
0033:        import java.sql.Connection;
0034:        import java.sql.SQLException;
0035:        import java.sql.Statement;
0036:        import java.util.ArrayList;
0037:        import java.util.Collection;
0038:        import java.util.HashMap;
0039:        import java.util.Iterator;
0040:        import java.util.List;
0041:        import java.util.Map;
0042:        import java.util.NoSuchElementException;
0043:        import java.util.Set;
0044:        import java.util.TreeMap;
0045:        import java.util.TreeSet;
0046:        import java.util.concurrent.Callable;
0047:        import java.util.concurrent.ExecutionException;
0048:        import java.util.concurrent.ExecutorService;
0049:        import java.util.concurrent.Future;
0050:        import java.util.concurrent.SynchronousQueue;
0051:        import java.util.concurrent.ThreadPoolExecutor;
0052:        import java.util.concurrent.TimeUnit;
0053:        import java.util.concurrent.locks.Lock;
0054:
0055:        import javax.management.DynamicMBean;
0056:        import javax.management.JMException;
0057:        import javax.management.MBeanRegistration;
0058:        import javax.management.MBeanServer;
0059:        import javax.management.ObjectName;
0060:
0061:        import net.sf.hajdbc.Balancer;
0062:        import net.sf.hajdbc.Database;
0063:        import net.sf.hajdbc.DatabaseCluster;
0064:        import net.sf.hajdbc.DatabaseClusterDecorator;
0065:        import net.sf.hajdbc.DatabaseClusterFactory;
0066:        import net.sf.hajdbc.DatabaseClusterMBean;
0067:        import net.sf.hajdbc.DatabaseMetaDataCache;
0068:        import net.sf.hajdbc.Dialect;
0069:        import net.sf.hajdbc.LockManager;
0070:        import net.sf.hajdbc.Messages;
0071:        import net.sf.hajdbc.StateManager;
0072:        import net.sf.hajdbc.SynchronizationContext;
0073:        import net.sf.hajdbc.SynchronizationStrategy;
0074:        import net.sf.hajdbc.local.LocalLockManager;
0075:        import net.sf.hajdbc.local.LocalStateManager;
0076:        import net.sf.hajdbc.sync.SynchronizationContextImpl;
0077:        import net.sf.hajdbc.sync.SynchronizationStrategyBuilder;
0078:        import net.sf.hajdbc.util.concurrent.CronThreadPoolExecutor;
0079:        import net.sf.hajdbc.util.concurrent.SynchronousExecutor;
0080:
0081:        import org.jibx.runtime.BindingDirectory;
0082:        import org.jibx.runtime.IMarshallingContext;
0083:        import org.jibx.runtime.IUnmarshallingContext;
0084:        import org.jibx.runtime.JiBXException;
0085:        import org.quartz.CronExpression;
0086:        import org.slf4j.Logger;
0087:        import org.slf4j.LoggerFactory;
0088:
0089:        /**
0090:         * @author  Paul Ferraro
0091:         * @param <D> either java.sql.Driver or javax.sql.DataSource
0092:         * @since   1.0
0093:         */
0094:        public abstract class AbstractDatabaseCluster<D> implements 
0095:                DatabaseCluster<D>, DatabaseClusterMBean, MBeanRegistration {
0096:            static Logger logger = LoggerFactory
0097:                    .getLogger(AbstractDatabaseCluster.class);
0098:
0099:            private String id;
0100:            private Balancer<D> balancer;
0101:            private Dialect dialect;
0102:            private DatabaseMetaDataCache databaseMetaDataCache;
0103:            private String defaultSynchronizationStrategyId;
0104:            private CronExpression failureDetectionExpression;
0105:            private CronExpression autoActivationExpression;
0106:            private int minThreads;
0107:            private int maxThreads;
0108:            private int maxIdle;
0109:            private TransactionMode transactionMode;
0110:            private boolean identityColumnDetectionEnabled;
0111:            private boolean sequenceDetectionEnabled;
0112:            private boolean currentDateEvaluationEnabled;
0113:            private boolean currentTimeEvaluationEnabled;
0114:            private boolean currentTimestampEvaluationEnabled;
0115:            private boolean randEvaluationEnabled;
0116:
0117:            private MBeanServer server;
0118:            private URL url;
0119:            private Map<String, SynchronizationStrategy> synchronizationStrategyMap = new HashMap<String, SynchronizationStrategy>();
0120:            private DatabaseClusterDecorator decorator;
0121:            private Map<String, Database<D>> databaseMap = new HashMap<String, Database<D>>();
0122:            private ExecutorService transactionalExecutor;
0123:            private ExecutorService nonTransactionalExecutor;
0124:            private CronThreadPoolExecutor cronExecutor = new CronThreadPoolExecutor(
0125:                    2);
0126:            private LockManager lockManager = new LocalLockManager();
0127:            private StateManager stateManager = new LocalStateManager(this );
0128:            private volatile boolean active = false;
0129:
0130:            protected AbstractDatabaseCluster(String id, URL url) {
0131:                this .id = id;
0132:                this .url = url;
0133:            }
0134:
0135:            /**
0136:             * @see net.sf.hajdbc.DatabaseCluster#getAliveMap(java.util.Collection)
0137:             */
0138:            @Override
0139:            public Map<Boolean, List<Database<D>>> getAliveMap(
0140:                    Collection<Database<D>> databases) {
0141:                Map<Database<D>, Future<Boolean>> futureMap = new TreeMap<Database<D>, Future<Boolean>>();
0142:
0143:                for (final Database<D> database : databases) {
0144:                    Callable<Boolean> task = new Callable<Boolean>() {
0145:                        public Boolean call() throws Exception {
0146:                            return AbstractDatabaseCluster.this 
0147:                                    .isAlive(database);
0148:                        }
0149:                    };
0150:
0151:                    futureMap.put(database, this .nonTransactionalExecutor
0152:                            .submit(task));
0153:                }
0154:
0155:                Map<Boolean, List<Database<D>>> map = new TreeMap<Boolean, List<Database<D>>>();
0156:
0157:                int size = databases.size();
0158:
0159:                map.put(false, new ArrayList<Database<D>>(size));
0160:                map.put(true, new ArrayList<Database<D>>(size));
0161:
0162:                for (Map.Entry<Database<D>, Future<Boolean>> futureMapEntry : futureMap
0163:                        .entrySet()) {
0164:                    try {
0165:                        map.get(futureMapEntry.getValue().get()).add(
0166:                                futureMapEntry.getKey());
0167:                    } catch (ExecutionException e) {
0168:                        // isAlive does not throw an exception
0169:                        throw new IllegalStateException(e);
0170:                    } catch (InterruptedException e) {
0171:                        Thread.currentThread().interrupt();
0172:                    }
0173:                }
0174:
0175:                return map;
0176:            }
0177:
0178:            boolean isAlive(Database<D> database) {
0179:                try {
0180:                    this .test(database);
0181:
0182:                    return true;
0183:                } catch (SQLException e) {
0184:                    logger.warn(Messages.getMessage(
0185:                            Messages.DATABASE_NOT_ALIVE, database, this ), e);
0186:
0187:                    return false;
0188:                }
0189:            }
0190:
0191:            private void test(Database<D> database) throws SQLException {
0192:                Connection connection = null;
0193:
0194:                try {
0195:                    connection = database.connect(database
0196:                            .createConnectionFactory());
0197:
0198:                    Statement statement = connection.createStatement();
0199:
0200:                    statement.execute(this .dialect.getSimpleSQL());
0201:
0202:                    statement.close();
0203:                } finally {
0204:                    if (connection != null) {
0205:                        try {
0206:                            connection.close();
0207:                        } catch (SQLException e) {
0208:                            logger.warn(e.toString(), e);
0209:                        }
0210:                    }
0211:                }
0212:            }
0213:
0214:            /**
0215:             * @see net.sf.hajdbc.DatabaseCluster#deactivate(net.sf.hajdbc.Database, net.sf.hajdbc.StateManager)
0216:             */
0217:            @Override
0218:            public boolean deactivate(Database<D> database,
0219:                    StateManager stateManager) {
0220:                synchronized (this .balancer) {
0221:                    this .unregister(database);
0222:                    // Reregister database mbean using "inactive" interface
0223:                    this .register(database, database.getInactiveMBean());
0224:
0225:                    boolean removed = this .balancer.remove(database);
0226:
0227:                    if (removed) {
0228:                        stateManager.remove(database.getId());
0229:                    }
0230:
0231:                    return removed;
0232:                }
0233:            }
0234:
0235:            /**
0236:             * @see net.sf.hajdbc.DatabaseCluster#getId()
0237:             */
0238:            @Override
0239:            public String getId() {
0240:                return this .id;
0241:            }
0242:
0243:            /**
0244:             * @see net.sf.hajdbc.DatabaseClusterMBean#getVersion()
0245:             */
0246:            @Override
0247:            public String getVersion() {
0248:                return DatabaseClusterFactory.getVersion();
0249:            }
0250:
0251:            /**
0252:             * @see net.sf.hajdbc.DatabaseCluster#activate(net.sf.hajdbc.Database, net.sf.hajdbc.StateManager)
0253:             */
0254:            @Override
0255:            public boolean activate(Database<D> database,
0256:                    StateManager stateManager) {
0257:                synchronized (this .balancer) {
0258:                    this .unregister(database);
0259:                    // Reregister database mbean using "active" interface
0260:                    this .register(database, database.getActiveMBean());
0261:
0262:                    if (database.isDirty()) {
0263:                        this .export();
0264:
0265:                        database.clean();
0266:                    }
0267:
0268:                    boolean added = this .balancer.add(database);
0269:
0270:                    if (added) {
0271:                        stateManager.add(database.getId());
0272:                    }
0273:
0274:                    return added;
0275:                }
0276:            }
0277:
0278:            /**
0279:             * @see net.sf.hajdbc.DatabaseClusterMBean#getActiveDatabases()
0280:             */
0281:            @Override
0282:            public Set<String> getActiveDatabases() {
0283:                Set<String> databaseSet = new TreeSet<String>();
0284:
0285:                for (Database<D> database : this .balancer.all()) {
0286:                    databaseSet.add(database.getId());
0287:                }
0288:
0289:                return databaseSet;
0290:            }
0291:
0292:            /**
0293:             * @see net.sf.hajdbc.DatabaseClusterMBean#getInactiveDatabases()
0294:             */
0295:            @Override
0296:            public Set<String> getInactiveDatabases() {
0297:                synchronized (this .databaseMap) {
0298:                    Set<String> databaseSet = new TreeSet<String>(
0299:                            this .databaseMap.keySet());
0300:
0301:                    for (Database<D> database : this .balancer.all()) {
0302:                        databaseSet.remove(database.getId());
0303:                    }
0304:
0305:                    return databaseSet;
0306:                }
0307:            }
0308:
0309:            /**
0310:             * @see net.sf.hajdbc.DatabaseCluster#getDatabase(java.lang.String)
0311:             */
0312:            @Override
0313:            public Database<D> getDatabase(String id) {
0314:                synchronized (this .databaseMap) {
0315:                    Database<D> database = this .databaseMap.get(id);
0316:
0317:                    if (database == null) {
0318:                        throw new IllegalArgumentException(Messages.getMessage(
0319:                                Messages.INVALID_DATABASE, id, this ));
0320:                    }
0321:
0322:                    return database;
0323:                }
0324:            }
0325:
0326:            /**
0327:             * @see net.sf.hajdbc.DatabaseClusterMBean#getDefaultSynchronizationStrategy()
0328:             */
0329:            @Override
0330:            public String getDefaultSynchronizationStrategy() {
0331:                return this .defaultSynchronizationStrategyId;
0332:            }
0333:
0334:            /**
0335:             * @see net.sf.hajdbc.DatabaseClusterMBean#getSynchronizationStrategies()
0336:             */
0337:            @Override
0338:            public Set<String> getSynchronizationStrategies() {
0339:                return new TreeSet<String>(this .synchronizationStrategyMap
0340:                        .keySet());
0341:            }
0342:
0343:            /**
0344:             * @see net.sf.hajdbc.DatabaseCluster#getBalancer()
0345:             */
0346:            @Override
0347:            public Balancer<D> getBalancer() {
0348:                return this .balancer;
0349:            }
0350:
0351:            /**
0352:             * @see net.sf.hajdbc.DatabaseCluster#getTransactionalExecutor()
0353:             */
0354:            @Override
0355:            public ExecutorService getTransactionalExecutor() {
0356:                return this .transactionalExecutor;
0357:            }
0358:
0359:            /**
0360:             * @see net.sf.hajdbc.DatabaseCluster#getNonTransactionalExecutor()
0361:             */
0362:            @Override
0363:            public ExecutorService getNonTransactionalExecutor() {
0364:                return this .nonTransactionalExecutor;
0365:            }
0366:
0367:            /**
0368:             * @see net.sf.hajdbc.DatabaseCluster#getDialect()
0369:             */
0370:            @Override
0371:            public Dialect getDialect() {
0372:                return this .dialect;
0373:            }
0374:
0375:            /**
0376:             * @see net.sf.hajdbc.DatabaseCluster#getDatabaseMetaDataCache()
0377:             */
0378:            @Override
0379:            public DatabaseMetaDataCache getDatabaseMetaDataCache() {
0380:                return this .databaseMetaDataCache;
0381:            }
0382:
0383:            /**
0384:             * @see net.sf.hajdbc.DatabaseCluster#getLockManager()
0385:             */
0386:            @Override
0387:            public LockManager getLockManager() {
0388:                return this .lockManager;
0389:            }
0390:
0391:            /**
0392:             * @see net.sf.hajdbc.DatabaseClusterMBean#isAlive(java.lang.String)
0393:             */
0394:            @Override
0395:            public boolean isAlive(String id) {
0396:                return this .isAlive(this .getDatabase(id));
0397:            }
0398:
0399:            /**
0400:             * @see net.sf.hajdbc.DatabaseClusterMBean#deactivate(java.lang.String)
0401:             */
0402:            @Override
0403:            public void deactivate(String databaseId) {
0404:                if (this .deactivate(this .getDatabase(databaseId),
0405:                        this .stateManager)) {
0406:                    logger.info(Messages.getMessage(
0407:                            Messages.DATABASE_DEACTIVATED, databaseId, this ));
0408:                }
0409:            }
0410:
0411:            /**
0412:             * @see net.sf.hajdbc.DatabaseClusterMBean#activate(java.lang.String)
0413:             */
0414:            @Override
0415:            public void activate(String databaseId) {
0416:                this .activate(databaseId, this 
0417:                        .getDefaultSynchronizationStrategy());
0418:            }
0419:
0420:            /**
0421:             * @see net.sf.hajdbc.DatabaseClusterMBean#activate(java.lang.String, java.lang.String)
0422:             */
0423:            @Override
0424:            public void activate(String databaseId, String strategyId) {
0425:                SynchronizationStrategy strategy = this .synchronizationStrategyMap
0426:                        .get(strategyId);
0427:
0428:                if (strategy == null) {
0429:                    throw new IllegalArgumentException(Messages.getMessage(
0430:                            Messages.INVALID_SYNC_STRATEGY, strategyId));
0431:                }
0432:
0433:                try {
0434:                    if (this .activate(this .getDatabase(databaseId), strategy)) {
0435:                        logger.info(Messages.getMessage(
0436:                                Messages.DATABASE_ACTIVATED, databaseId, this ));
0437:                    }
0438:                } catch (SQLException e) {
0439:                    logger.warn(Messages
0440:                            .getMessage(Messages.DATABASE_ACTIVATE_FAILED,
0441:                                    databaseId, this ), e);
0442:
0443:                    SQLException exception = e.getNextException();
0444:
0445:                    while (exception != null) {
0446:                        logger.error(exception.getMessage(), e);
0447:
0448:                        exception = exception.getNextException();
0449:                    }
0450:
0451:                    throw new IllegalStateException(e.toString());
0452:                } catch (InterruptedException e) {
0453:                    logger.warn(e.toString(), e);
0454:
0455:                    Thread.currentThread().interrupt();
0456:                }
0457:            }
0458:
0459:            protected void register(Database<D> database, DynamicMBean mbean) {
0460:                try {
0461:                    ObjectName name = DatabaseClusterFactory.getObjectName(
0462:                            this .id, database.getId());
0463:
0464:                    this .server.registerMBean(mbean, name);
0465:                } catch (JMException e) {
0466:                    logger.error(e.toString(), e);
0467:
0468:                    throw new IllegalStateException(e);
0469:                }
0470:            }
0471:
0472:            /**
0473:             * @see net.sf.hajdbc.DatabaseClusterMBean#remove(java.lang.String)
0474:             */
0475:            @Override
0476:            public void remove(String id) {
0477:                synchronized (this .databaseMap) {
0478:                    Database<D> database = this .getDatabase(id);
0479:
0480:                    if (this .balancer.all().contains(database)) {
0481:                        throw new IllegalStateException(Messages.getMessage(
0482:                                Messages.DATABASE_STILL_ACTIVE, id, this ));
0483:                    }
0484:
0485:                    this .unregister(database);
0486:
0487:                    this .databaseMap.remove(id);
0488:
0489:                    this .export();
0490:                }
0491:            }
0492:
0493:            private void unregister(Database<D> database) {
0494:                try {
0495:                    ObjectName name = DatabaseClusterFactory.getObjectName(
0496:                            this .id, database.getId());
0497:
0498:                    if (this .server.isRegistered(name)) {
0499:                        this .server.unregisterMBean(name);
0500:                    }
0501:                } catch (JMException e) {
0502:                    logger.error(e.toString(), e);
0503:
0504:                    throw new IllegalStateException(e);
0505:                }
0506:            }
0507:
0508:            /**
0509:             * @see net.sf.hajdbc.DatabaseCluster#isActive()
0510:             */
0511:            @Override
0512:            public boolean isActive() {
0513:                return this .active;
0514:            }
0515:
0516:            /**
0517:             * Starts this database cluster
0518:             * @throws Exception if database cluster start fails
0519:             */
0520:            public synchronized void start() throws Exception {
0521:                if (this .active)
0522:                    return;
0523:
0524:                this .lockManager.start();
0525:                this .stateManager.start();
0526:
0527:                this .nonTransactionalExecutor = new ThreadPoolExecutor(
0528:                        this .minThreads, this .maxThreads, this .maxIdle,
0529:                        TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
0530:                        new ThreadPoolExecutor.CallerRunsPolicy());
0531:
0532:                this .transactionalExecutor = this .transactionMode
0533:                        .equals(TransactionMode.SERIAL) ? new SynchronousExecutor()
0534:                        : this .nonTransactionalExecutor;
0535:
0536:                Set<String> databaseSet = this .stateManager.getInitialState();
0537:
0538:                if (databaseSet != null) {
0539:                    for (String databaseId : databaseSet) {
0540:                        Database<D> database = this .getDatabase(databaseId);
0541:
0542:                        if (database != null) {
0543:                            this .activate(database, this .stateManager);
0544:                        }
0545:                    }
0546:                } else {
0547:                    for (Database<D> database : this .getAliveMap(
0548:                            this .databaseMap.values()).get(true)) {
0549:                        this .activate(database, this .stateManager);
0550:                    }
0551:                }
0552:
0553:                this .databaseMetaDataCache.setDialect(this .dialect);
0554:
0555:                try {
0556:                    this .flushMetaDataCache();
0557:                } catch (IllegalStateException e) {
0558:                    // Ignore - cache will initialize lazily.
0559:                }
0560:
0561:                if (this .failureDetectionExpression != null) {
0562:                    this .cronExecutor.schedule(new FailureDetectionTask(),
0563:                            this .failureDetectionExpression);
0564:                }
0565:
0566:                if (this .autoActivationExpression != null) {
0567:                    this .cronExecutor.schedule(new AutoActivationTask(),
0568:                            this .autoActivationExpression);
0569:                }
0570:
0571:                this .active = true;
0572:            }
0573:
0574:            /**
0575:             * Stops this database cluster
0576:             */
0577:            public synchronized void stop() {
0578:                if (!this .active)
0579:                    return;
0580:
0581:                this .active = false;
0582:
0583:                this .balancer.clear();
0584:
0585:                this .stateManager.stop();
0586:                this .lockManager.stop();
0587:
0588:                this .cronExecutor.shutdownNow();
0589:
0590:                if (this .nonTransactionalExecutor != null) {
0591:                    this .nonTransactionalExecutor.shutdownNow();
0592:                }
0593:
0594:                if (this .transactionalExecutor != null) {
0595:                    this .transactionalExecutor.shutdownNow();
0596:                }
0597:            }
0598:
0599:            /**
0600:             * @see net.sf.hajdbc.DatabaseClusterMBean#flushMetaDataCache()
0601:             */
0602:            @Override
0603:            public void flushMetaDataCache() {
0604:                Connection connection = null;
0605:
0606:                try {
0607:                    Database<D> database = this .balancer.next();
0608:
0609:                    connection = database.connect(database
0610:                            .createConnectionFactory());
0611:
0612:                    this .databaseMetaDataCache.flush(connection);
0613:                } catch (NoSuchElementException e) {
0614:                    throw new IllegalStateException(Messages.getMessage(
0615:                            Messages.NO_ACTIVE_DATABASES, this ));
0616:                } catch (SQLException e) {
0617:                    throw new IllegalStateException(e.toString(), e);
0618:                } finally {
0619:                    if (connection != null) {
0620:                        try {
0621:                            connection.close();
0622:                        } catch (SQLException e) {
0623:                            logger.warn(e.toString(), e);
0624:                        }
0625:                    }
0626:                }
0627:            }
0628:
0629:            /**
0630:             * @see net.sf.hajdbc.DatabaseCluster#isIdentityColumnDetectionEnabled()
0631:             */
0632:            @Override
0633:            public boolean isIdentityColumnDetectionEnabled() {
0634:                return this .identityColumnDetectionEnabled;
0635:            }
0636:
0637:            /**
0638:             * @see net.sf.hajdbc.DatabaseCluster#isSequenceDetectionEnabled()
0639:             */
0640:            @Override
0641:            public boolean isSequenceDetectionEnabled() {
0642:                return this .sequenceDetectionEnabled;
0643:            }
0644:
0645:            /**
0646:             * @see net.sf.hajdbc.DatabaseCluster#isCurrentDateEvaluationEnabled()
0647:             */
0648:            @Override
0649:            public boolean isCurrentDateEvaluationEnabled() {
0650:                return this .currentDateEvaluationEnabled;
0651:            }
0652:
0653:            /**
0654:             * @see net.sf.hajdbc.DatabaseCluster#isCurrentTimeEvaluationEnabled()
0655:             */
0656:            @Override
0657:            public boolean isCurrentTimeEvaluationEnabled() {
0658:                return this .currentTimeEvaluationEnabled;
0659:            }
0660:
0661:            /**
0662:             * @see net.sf.hajdbc.DatabaseCluster#isCurrentTimestampEvaluationEnabled()
0663:             */
0664:            @Override
0665:            public boolean isCurrentTimestampEvaluationEnabled() {
0666:                return this .currentTimestampEvaluationEnabled;
0667:            }
0668:
0669:            /**
0670:             * @see net.sf.hajdbc.DatabaseCluster#isRandEvaluationEnabled()
0671:             */
0672:            @Override
0673:            public boolean isRandEvaluationEnabled() {
0674:                return this .randEvaluationEnabled;
0675:            }
0676:
0677:            /**
0678:             * @see java.lang.Object#toString()
0679:             */
0680:            @Override
0681:            public String toString() {
0682:                return this .getId();
0683:            }
0684:
0685:            /**
0686:             * @see java.lang.Object#equals(java.lang.Object)
0687:             */
0688:            @SuppressWarnings("unchecked")
0689:            @Override
0690:            public boolean equals(Object object) {
0691:                if ((object == null) || !(object instanceof  DatabaseCluster))
0692:                    return false;
0693:
0694:                String id = ((DatabaseCluster) object).getId();
0695:
0696:                return (id != null) && id.equals(this .id);
0697:            }
0698:
0699:            /**
0700:             * @see java.lang.Object#hashCode()
0701:             */
0702:            @Override
0703:            public int hashCode() {
0704:                return this .id.hashCode();
0705:            }
0706:
0707:            protected DatabaseClusterDecorator getDecorator() {
0708:                return this .decorator;
0709:            }
0710:
0711:            protected void setDecorator(DatabaseClusterDecorator decorator) {
0712:                this .decorator = decorator;
0713:            }
0714:
0715:            protected void add(Database<D> database) {
0716:                String id = database.getId();
0717:
0718:                synchronized (this .databaseMap) {
0719:                    if (this .databaseMap.containsKey(id)) {
0720:                        throw new IllegalArgumentException(Messages.getMessage(
0721:                                Messages.DATABASE_ALREADY_EXISTS, id, this ));
0722:                    }
0723:
0724:                    this .register(database, database.getInactiveMBean());
0725:
0726:                    this .databaseMap.put(id, database);
0727:                }
0728:            }
0729:
0730:            protected Iterator<Database<D>> getDatabases() {
0731:                synchronized (this .databaseMap) {
0732:                    return this .databaseMap.values().iterator();
0733:                }
0734:            }
0735:
0736:            /**
0737:             * @see net.sf.hajdbc.DatabaseCluster#getStateManager()
0738:             */
0739:            @Override
0740:            public StateManager getStateManager() {
0741:                return this .stateManager;
0742:            }
0743:
0744:            /**
0745:             * @see net.sf.hajdbc.DatabaseCluster#setStateManager(net.sf.hajdbc.StateManager)
0746:             */
0747:            @Override
0748:            public void setStateManager(StateManager stateManager) {
0749:                this .stateManager = stateManager;
0750:            }
0751:
0752:            /**
0753:             * @see net.sf.hajdbc.DatabaseCluster#setLockManager(net.sf.hajdbc.LockManager)
0754:             */
0755:            @Override
0756:            public void setLockManager(LockManager lockManager) {
0757:                this .lockManager = lockManager;
0758:            }
0759:
0760:            /**
0761:             * @see net.sf.hajdbc.DatabaseClusterMBean#getUrl()
0762:             */
0763:            @Override
0764:            public URL getUrl() {
0765:                return this .url;
0766:            }
0767:
0768:            private boolean activate(Database<D> database,
0769:                    SynchronizationStrategy strategy) throws SQLException,
0770:                    InterruptedException {
0771:                Lock lock = this .lockManager.writeLock(LockManager.GLOBAL);
0772:
0773:                lock.lockInterruptibly();
0774:
0775:                try {
0776:                    SynchronizationContext<D> context = new SynchronizationContextImpl<D>(
0777:                            this , database);
0778:
0779:                    if (context.getActiveDatabaseSet().contains(database)) {
0780:                        return false;
0781:                    }
0782:
0783:                    this .test(database);
0784:
0785:                    try {
0786:                        logger.info(Messages.getMessage(
0787:                                Messages.DATABASE_SYNC_START, database, this ));
0788:
0789:                        strategy.synchronize(context);
0790:
0791:                        logger.info(Messages.getMessage(
0792:                                Messages.DATABASE_SYNC_END, database, this ));
0793:
0794:                        return this .activate(database, this .stateManager);
0795:                    } finally {
0796:                        context.close();
0797:                    }
0798:                } catch (NoSuchElementException e) {
0799:                    return this .activate(database, this .stateManager);
0800:                } finally {
0801:                    lock.unlock();
0802:                }
0803:            }
0804:
0805:            /**
0806:             * @see javax.management.MBeanRegistration#postDeregister()
0807:             */
0808:            @Override
0809:            public void postDeregister() {
0810:                this .stop();
0811:
0812:                this .unregisterDatabases();
0813:            }
0814:
0815:            private void unregisterDatabases() {
0816:                synchronized (this .databaseMap) {
0817:                    Iterator<Database<D>> databases = this .databaseMap.values()
0818:                            .iterator();
0819:
0820:                    while (databases.hasNext()) {
0821:                        this .unregister(databases.next());
0822:
0823:                        databases.remove();
0824:                    }
0825:                }
0826:            }
0827:
0828:            /**
0829:             * @see javax.management.MBeanRegistration#postRegister(java.lang.Boolean)
0830:             */
0831:            @Override
0832:            public void postRegister(Boolean registered) {
0833:                if (!registered) {
0834:                    this .postDeregister();
0835:                }
0836:            }
0837:
0838:            /**
0839:             * @see javax.management.MBeanRegistration#preDeregister()
0840:             */
0841:            @Override
0842:            public void preDeregister() throws Exception {
0843:                // Nothing to do
0844:            }
0845:
0846:            /**
0847:             * @see javax.management.MBeanRegistration#preRegister(javax.management.MBeanServer, javax.management.ObjectName)
0848:             */
0849:            @Override
0850:            public ObjectName preRegister(MBeanServer server, ObjectName name)
0851:                    throws Exception {
0852:                this .server = server;
0853:
0854:                InputStream inputStream = null;
0855:
0856:                logger.info(Messages.getMessage(Messages.HA_JDBC_INIT, this 
0857:                        .getVersion(), this .url));
0858:
0859:                try {
0860:                    inputStream = this .url.openStream();
0861:
0862:                    IUnmarshallingContext context = BindingDirectory
0863:                            .getFactory(this .getClass())
0864:                            .createUnmarshallingContext();
0865:
0866:                    context.setDocument(inputStream, null);
0867:
0868:                    context.setUserContext(this );
0869:
0870:                    context.unmarshalElement();
0871:
0872:                    if (this .decorator != null) {
0873:                        this .decorator.decorate(this );
0874:                    }
0875:
0876:                    this .start();
0877:
0878:                    return name;
0879:                } catch (IOException e) {
0880:                    logger.error(Messages.getMessage(Messages.CONFIG_NOT_FOUND,
0881:                            this .url), e);
0882:
0883:                    throw e;
0884:                } catch (JiBXException e) {
0885:                    logger.error(Messages.getMessage(
0886:                            Messages.CONFIG_LOAD_FAILED, this .url), e);
0887:
0888:                    this .unregisterDatabases();
0889:
0890:                    throw e;
0891:                } catch (Exception e) {
0892:                    logger.error(Messages.getMessage(
0893:                            Messages.CLUSTER_START_FAILED, this ), e);
0894:
0895:                    this .postDeregister();
0896:
0897:                    throw e;
0898:                } finally {
0899:                    if (inputStream != null) {
0900:                        try {
0901:                            inputStream.close();
0902:                        } catch (IOException e) {
0903:                            logger.warn(e.toString(), e);
0904:                        }
0905:                    }
0906:                }
0907:            }
0908:
0909:            private void export() {
0910:                File file = null;
0911:                WritableByteChannel outputChannel = null;
0912:                FileChannel fileChannel = null;
0913:
0914:                try {
0915:                    file = File.createTempFile("ha-jdbc", ".xml"); //$NON-NLS-1$ //$NON-NLS-2$
0916:
0917:                    IMarshallingContext context = BindingDirectory.getFactory(
0918:                            this .getClass()).createMarshallingContext();
0919:
0920:                    context.setIndent(1,
0921:                            System.getProperty("line.separator"), '\t'); //$NON-NLS-1$
0922:
0923:                    // This method closes the writer
0924:                    context.marshalDocument(this , null, null, new FileWriter(
0925:                            file));
0926:
0927:                    fileChannel = new FileInputStream(file).getChannel();
0928:
0929:                    outputChannel = this .getOutputChannel(this .url);
0930:
0931:                    fileChannel.transferTo(0, file.length(), outputChannel);
0932:                } catch (Exception e) {
0933:                    logger.warn(Messages.getMessage(
0934:                            Messages.CONFIG_STORE_FAILED, this .url), e);
0935:                } finally {
0936:                    if (outputChannel != null) {
0937:                        try {
0938:                            outputChannel.close();
0939:                        } catch (IOException e) {
0940:                            logger.warn(e.getMessage(), e);
0941:                        }
0942:                    }
0943:
0944:                    if (fileChannel != null) {
0945:                        try {
0946:                            fileChannel.close();
0947:                        } catch (IOException e) {
0948:                            logger.warn(e.getMessage(), e);
0949:                        }
0950:                    }
0951:
0952:                    if (file != null) {
0953:                        file.delete();
0954:                    }
0955:                }
0956:            }
0957:
0958:            /**
0959:             * We cannot use URLConnection for files because Sun's implementation does not support output.
0960:             */
0961:            private WritableByteChannel getOutputChannel(URL url)
0962:                    throws IOException {
0963:                return this .isFile(url) ? new FileOutputStream(this .toFile(url))
0964:                        .getChannel()
0965:                        : Channels.newChannel(url.openConnection()
0966:                                .getOutputStream());
0967:            }
0968:
0969:            private boolean isFile(URL url) {
0970:                return url.getProtocol().equals("file"); //$NON-NLS-1$
0971:            }
0972:
0973:            private File toFile(URL url) {
0974:                return new File(url.getPath());
0975:            }
0976:
0977:            protected void addSynchronizationStrategyBuilder(
0978:                    SynchronizationStrategyBuilder builder) throws Exception {
0979:                this .synchronizationStrategyMap.put(builder.getId(), builder
0980:                        .buildStrategy());
0981:            }
0982:
0983:            protected Iterator<SynchronizationStrategyBuilder> getSynchronizationStrategyBuilders()
0984:                    throws Exception {
0985:                List<SynchronizationStrategyBuilder> builderList = new ArrayList<SynchronizationStrategyBuilder>(
0986:                        this .synchronizationStrategyMap.size());
0987:
0988:                for (Map.Entry<String, SynchronizationStrategy> mapEntry : this .synchronizationStrategyMap
0989:                        .entrySet()) {
0990:                    builderList.add(SynchronizationStrategyBuilder.getBuilder(
0991:                            mapEntry.getKey(), mapEntry.getValue()));
0992:                }
0993:
0994:                return builderList.iterator();
0995:            }
0996:
0997:            class FailureDetectionTask implements  Runnable {
0998:                /**
0999:                 * @see java.lang.Runnable#run()
1000:                 */
1001:                @Override
1002:                public void run() {
1003:                    Set<Database<D>> databaseSet = AbstractDatabaseCluster.this 
1004:                            .getBalancer().all();
1005:
1006:                    if (databaseSet.size() > 1) {
1007:                        Map<Boolean, List<Database<D>>> aliveMap = AbstractDatabaseCluster.this 
1008:                                .getAliveMap(databaseSet);
1009:
1010:                        // Deactivate the dead databases, so long as at least one is alive
1011:                        // Skip deactivation if membership is empty in case of cluster panic
1012:                        if (!aliveMap.get(true).isEmpty()
1013:                                && !AbstractDatabaseCluster.this 
1014:                                        .getStateManager().isMembershipEmpty()) {
1015:                            for (Database<D> database : aliveMap.get(false)) {
1016:                                if (AbstractDatabaseCluster.this .deactivate(
1017:                                        database, AbstractDatabaseCluster.this 
1018:                                                .getStateManager())) {
1019:                                    logger.error(Messages.getMessage(
1020:                                            Messages.DATABASE_DEACTIVATED,
1021:                                            database, this ));
1022:                                }
1023:                            }
1024:                        }
1025:                    }
1026:                }
1027:            }
1028:
1029:            class AutoActivationTask implements  Runnable {
1030:                /**
1031:                 * @see java.lang.Runnable#run()
1032:                 */
1033:                @Override
1034:                public void run() {
1035:                    for (String databaseId : AbstractDatabaseCluster.this
1036:                            .getInactiveDatabases()) {
1037:                        AbstractDatabaseCluster.this.activate(databaseId);
1038:                    }
1039:                }
1040:            }
1041:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.