001: /*
002: * HA-JDBC: High-Availability JDBC
003: * Copyright (c) 2004-2007 Paul Ferraro
004: *
005: * This library is free software; you can redistribute it and/or modify it
006: * under the terms of the GNU Lesser General Public License as published by the
007: * Free Software Foundation; either version 2.1 of the License, or (at your
008: * option) any later version.
009: *
010: * This library is distributed in the hope that it will be useful, but WITHOUT
011: * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
012: * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
013: * for more details.
014: *
015: * You should have received a copy of the GNU Lesser General Public License
016: * along with this library; if not, write to the Free Software Foundation,
017: * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
018: *
019: * Contact: ferraro@users.sourceforge.net
020: */
021: package net.sf.hajdbc.sync;
022:
023: import java.sql.Connection;
024: import java.sql.SQLException;
025: import java.util.HashMap;
026: import java.util.Map;
027: import java.util.Set;
028: import java.util.concurrent.ExecutorService;
029: import java.util.concurrent.Executors;
030:
031: import net.sf.hajdbc.Balancer;
032: import net.sf.hajdbc.Database;
033: import net.sf.hajdbc.DatabaseCluster;
034: import net.sf.hajdbc.DatabaseProperties;
035: import net.sf.hajdbc.Dialect;
036: import net.sf.hajdbc.SynchronizationContext;
037: import net.sf.hajdbc.util.concurrent.DaemonThreadFactory;
038:
039: import org.slf4j.Logger;
040: import org.slf4j.LoggerFactory;
041:
042: /**
043: * @author Paul Ferraro
044: * @param <D>
045: */
046: public class SynchronizationContextImpl<D> implements
047: SynchronizationContext<D> {
048: private static Logger logger = LoggerFactory
049: .getLogger(SynchronizationContextImpl.class);
050:
051: private Set<Database<D>> activeDatabaseSet;
052: private Database<D> sourceDatabase;
053: private Database<D> targetDatabase;
054: private DatabaseCluster<D> cluster;
055: private DatabaseProperties databaseProperties;
056: private Map<Database<D>, Connection> connectionMap = new HashMap<Database<D>, Connection>();
057: private ExecutorService executor;
058:
059: /**
060: * @param cluster
061: * @param database
062: * @throws SQLException
063: */
064: public SynchronizationContextImpl(DatabaseCluster<D> cluster,
065: Database<D> database) throws SQLException {
066: this .cluster = cluster;
067:
068: Balancer<D> balancer = cluster.getBalancer();
069:
070: this .sourceDatabase = balancer.next();
071: this .activeDatabaseSet = balancer.all();
072: this .targetDatabase = database;
073: this .executor = Executors.newFixedThreadPool(
074: this .activeDatabaseSet.size(), DaemonThreadFactory
075: .getInstance());
076: this .databaseProperties = cluster.getDatabaseMetaDataCache()
077: .getDatabaseProperties(
078: this .getConnection(this .targetDatabase));
079: }
080:
081: /**
082: * @see net.sf.hajdbc.SynchronizationContext#getConnection(net.sf.hajdbc.Database)
083: */
084: @Override
085: public Connection getConnection(Database<D> database)
086: throws SQLException {
087: synchronized (this .connectionMap) {
088: Connection connection = this .connectionMap.get(database);
089:
090: if (connection == null) {
091: connection = database.connect(database
092: .createConnectionFactory());
093:
094: this .connectionMap.put(database, connection);
095: }
096:
097: return connection;
098: }
099: }
100:
101: /**
102: * @see net.sf.hajdbc.SynchronizationContext#getSourceDatabase()
103: */
104: @Override
105: public Database<D> getSourceDatabase() {
106: return this .sourceDatabase;
107: }
108:
109: /**
110: * @see net.sf.hajdbc.SynchronizationContext#getTargetDatabase()
111: */
112: @Override
113: public Database<D> getTargetDatabase() {
114: return this .targetDatabase;
115: }
116:
117: /**
118: * @see net.sf.hajdbc.SynchronizationContext#getActiveDatabaseSet()
119: */
120: @Override
121: public Set<Database<D>> getActiveDatabaseSet() {
122: return this .activeDatabaseSet;
123: }
124:
125: /**
126: * @see net.sf.hajdbc.SynchronizationContext#getDatabaseProperties()
127: */
128: @Override
129: public DatabaseProperties getDatabaseProperties() {
130: return this .databaseProperties;
131: }
132:
133: /**
134: * @see net.sf.hajdbc.SynchronizationContext#getDialect()
135: */
136: @Override
137: public Dialect getDialect() {
138: return this .cluster.getDialect();
139: }
140:
141: /**
142: * @see net.sf.hajdbc.SynchronizationContext#getExecutor()
143: */
144: @Override
145: public ExecutorService getExecutor() {
146: return this .executor;
147: }
148:
149: /**
150: * @see net.sf.hajdbc.SynchronizationContext#close()
151: */
152: @Override
153: public void close() {
154: for (Connection connection : this .connectionMap.values()) {
155: if (connection != null) {
156: try {
157: if (!connection.isClosed()) {
158: connection.close();
159: }
160: } catch (SQLException e) {
161: logger.warn(e.toString(), e);
162: }
163: }
164: }
165:
166: this.executor.shutdown();
167: }
168: }
|