001: /*
002: * HA-JDBC: High-Availability JDBC
003: * Copyright (c) 2004-2007 Paul Ferraro
004: *
005: * This library is free software; you can redistribute it and/or modify it
006: * under the terms of the GNU Lesser General Public License as published by the
007: * Free Software Foundation; either version 2.1 of the License, or (at your
008: * option) any later version.
009: *
010: * This library is distributed in the hope that it will be useful, but WITHOUT
011: * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
012: * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
013: * for more details.
014: *
015: * You should have received a copy of the GNU Lesser General Public License
016: * along with this library; if not, write to the Free Software Foundation,
017: * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
018: *
019: * Contact: ferraro@users.sourceforge.net
020: */
021: package net.sf.hajdbc.sync;
022:
023: import java.sql.Connection;
024: import java.sql.ResultSet;
025: import java.sql.SQLException;
026: import java.sql.Statement;
027: import java.sql.Types;
028: import java.text.MessageFormat;
029: import java.util.Collection;
030: import java.util.HashMap;
031: import java.util.Map;
032: import java.util.Set;
033: import java.util.concurrent.Callable;
034: import java.util.concurrent.ExecutionException;
035: import java.util.concurrent.ExecutorService;
036: import java.util.concurrent.Future;
037:
038: import net.sf.hajdbc.Database;
039: import net.sf.hajdbc.Dialect;
040: import net.sf.hajdbc.ForeignKeyConstraint;
041: import net.sf.hajdbc.Messages;
042: import net.sf.hajdbc.SequenceProperties;
043: import net.sf.hajdbc.SynchronizationContext;
044: import net.sf.hajdbc.TableProperties;
045: import net.sf.hajdbc.UniqueConstraint;
046: import net.sf.hajdbc.util.SQLExceptionFactory;
047: import net.sf.hajdbc.util.Strings;
048:
049: import org.slf4j.Logger;
050: import org.slf4j.LoggerFactory;
051:
052: /**
053: * @author Paul Ferraro
054: *
055: */
056: public final class SynchronizationSupport {
057: private static Logger logger = LoggerFactory
058: .getLogger(SynchronizationSupport.class);
059:
060: private SynchronizationSupport() {
061: // Hide
062: }
063:
064: /**
065: * Drop all foreign key constraints on the target database
066: * @param <D>
067: * @param context a synchronization context
068: * @throws SQLException if database error occurs
069: */
070: public static <D> void dropForeignKeys(
071: SynchronizationContext<D> context) throws SQLException {
072: Collection<TableProperties> tables = context
073: .getDatabaseProperties().getTables();
074:
075: Dialect dialect = context.getDialect();
076:
077: Connection connection = context.getConnection(context
078: .getTargetDatabase());
079:
080: Statement statement = connection.createStatement();
081:
082: for (TableProperties table : tables) {
083: for (ForeignKeyConstraint constraint : table
084: .getForeignKeyConstraints()) {
085: String sql = dialect
086: .getDropForeignKeyConstraintSQL(constraint);
087:
088: logger.debug(sql);
089:
090: statement.addBatch(sql);
091: }
092: }
093:
094: statement.executeBatch();
095: statement.close();
096: }
097:
098: /**
099: * Restores all foreign key constraints on the target database
100: * @param <D>
101: * @param context a synchronization context
102: * @throws SQLException if database error occurs
103: */
104: public static <D> void restoreForeignKeys(
105: SynchronizationContext<D> context) throws SQLException {
106: Collection<TableProperties> tables = context
107: .getDatabaseProperties().getTables();
108:
109: Dialect dialect = context.getDialect();
110:
111: Connection connection = context.getConnection(context
112: .getTargetDatabase());
113:
114: Statement statement = connection.createStatement();
115:
116: for (TableProperties table : tables) {
117: for (ForeignKeyConstraint constraint : table
118: .getForeignKeyConstraints()) {
119: String sql = dialect
120: .getCreateForeignKeyConstraintSQL(constraint);
121:
122: logger.debug(sql);
123:
124: statement.addBatch(sql);
125: }
126: }
127:
128: statement.executeBatch();
129: statement.close();
130: }
131:
132: /**
133: * Synchronizes the sequences on the target database with the source database.
134: * @param <D>
135: * @param context a synchronization context
136: * @throws SQLException if database error occurs
137: */
138: public static <D> void synchronizeSequences(
139: final SynchronizationContext<D> context)
140: throws SQLException {
141: Collection<SequenceProperties> sequences = context
142: .getDatabaseProperties().getSequences();
143:
144: if (!sequences.isEmpty()) {
145: Database<D> sourceDatabase = context.getSourceDatabase();
146:
147: Set<Database<D>> databases = context.getActiveDatabaseSet();
148:
149: ExecutorService executor = context.getExecutor();
150:
151: Dialect dialect = context.getDialect();
152:
153: Map<SequenceProperties, Long> sequenceMap = new HashMap<SequenceProperties, Long>();
154: Map<Database<D>, Future<Long>> futureMap = new HashMap<Database<D>, Future<Long>>();
155:
156: for (SequenceProperties sequence : sequences) {
157: final String sql = dialect
158: .getNextSequenceValueSQL(sequence);
159:
160: logger.debug(sql);
161:
162: for (final Database<D> database : databases) {
163: Callable<Long> task = new Callable<Long>() {
164: public Long call() throws SQLException {
165: Statement statement = context
166: .getConnection(database)
167: .createStatement();
168: ResultSet resultSet = statement
169: .executeQuery(sql);
170:
171: resultSet.next();
172:
173: long value = resultSet.getLong(1);
174:
175: statement.close();
176:
177: return value;
178: }
179: };
180:
181: futureMap.put(database, executor.submit(task));
182: }
183:
184: try {
185: Long sourceValue = futureMap.get(sourceDatabase)
186: .get();
187:
188: sequenceMap.put(sequence, sourceValue);
189:
190: for (Database<D> database : databases) {
191: if (!database.equals(sourceDatabase)) {
192: Long value = futureMap.get(database).get();
193:
194: if (!value.equals(sourceValue)) {
195: throw new SQLException(
196: Messages
197: .getMessage(
198: Messages.SEQUENCE_OUT_OF_SYNC,
199: sequence,
200: database,
201: value,
202: sourceDatabase,
203: sourceValue));
204: }
205: }
206: }
207: } catch (InterruptedException e) {
208: throw SQLExceptionFactory.createSQLException(e);
209: } catch (ExecutionException e) {
210: throw SQLExceptionFactory.createSQLException(e
211: .getCause());
212: }
213: }
214:
215: Connection targetConnection = context.getConnection(context
216: .getTargetDatabase());
217: Statement targetStatement = targetConnection
218: .createStatement();
219:
220: for (SequenceProperties sequence : sequences) {
221: String sql = dialect.getAlterSequenceSQL(sequence,
222: sequenceMap.get(sequence) + 1);
223:
224: logger.debug(sql);
225:
226: targetStatement.addBatch(sql);
227: }
228:
229: targetStatement.executeBatch();
230: targetStatement.close();
231: }
232: }
233:
234: /**
235: * @param <D>
236: * @param context
237: * @throws SQLException
238: */
239: public static <D> void synchronizeIdentityColumns(
240: SynchronizationContext<D> context) throws SQLException {
241: Statement sourceStatement = context.getConnection(
242: context.getSourceDatabase()).createStatement();
243: Statement targetStatement = context.getConnection(
244: context.getTargetDatabase()).createStatement();
245:
246: Dialect dialect = context.getDialect();
247:
248: for (TableProperties table : context.getDatabaseProperties()
249: .getTables()) {
250: Collection<String> columns = table.getIdentityColumns();
251:
252: if (!columns.isEmpty()) {
253: String selectSQL = MessageFormat
254: .format(
255: "SELECT max({0}) FROM {1}", Strings.join(columns, "), max("), table.getName()); //$NON-NLS-1$ //$NON-NLS-2$
256:
257: logger.debug(selectSQL);
258:
259: Map<String, Long> map = new HashMap<String, Long>();
260:
261: ResultSet resultSet = sourceStatement
262: .executeQuery(selectSQL);
263:
264: if (resultSet.next()) {
265: int i = 0;
266:
267: for (String column : columns) {
268: map.put(column, resultSet.getLong(++i));
269: }
270: }
271:
272: resultSet.close();
273:
274: if (!map.isEmpty()) {
275: for (Map.Entry<String, Long> mapEntry : map
276: .entrySet()) {
277: String alterSQL = dialect
278: .getAlterIdentityColumnSQL(table, table
279: .getColumnProperties(mapEntry
280: .getKey()), mapEntry
281: .getValue() + 1);
282:
283: if (alterSQL != null) {
284: logger.debug(alterSQL);
285:
286: targetStatement.addBatch(alterSQL);
287: }
288: }
289:
290: targetStatement.executeBatch();
291: }
292: }
293: }
294:
295: sourceStatement.close();
296: targetStatement.close();
297: }
298:
299: /**
300: * @param <D>
301: * @param context
302: * @param table
303: * @throws SQLException
304: */
305: public static <D> void dropUniqueConstraints(
306: SynchronizationContext<D> context, TableProperties table)
307: throws SQLException {
308: Collection<UniqueConstraint> constraints = table
309: .getUniqueConstraints();
310:
311: constraints.remove(table.getPrimaryKey());
312:
313: Dialect dialect = context.getDialect();
314:
315: Connection connection = context.getConnection(context
316: .getTargetDatabase());
317:
318: Statement statement = connection.createStatement();
319:
320: // Drop unique constraints on the current table
321: for (UniqueConstraint constraint : constraints) {
322: String sql = dialect.getDropUniqueConstraintSQL(constraint);
323:
324: logger.debug(sql);
325:
326: statement.addBatch(sql);
327: }
328:
329: statement.executeBatch();
330: statement.close();
331: }
332:
333: /**
334: * @param <D>
335: * @param context
336: * @param table
337: * @throws SQLException
338: */
339: public static <D> void restoreUniqueConstraints(
340: SynchronizationContext<D> context, TableProperties table)
341: throws SQLException {
342: Collection<UniqueConstraint> constraints = table
343: .getUniqueConstraints();
344:
345: constraints.remove(table.getPrimaryKey());
346:
347: Dialect dialect = context.getDialect();
348:
349: Connection connection = context.getConnection(context
350: .getTargetDatabase());
351:
352: Statement statement = connection.createStatement();
353:
354: // Drop unique constraints on the current table
355: for (UniqueConstraint constraint : constraints) {
356: String sql = dialect
357: .getCreateUniqueConstraintSQL(constraint);
358:
359: logger.debug(sql);
360:
361: statement.addBatch(sql);
362: }
363:
364: statement.executeBatch();
365: statement.close();
366: }
367:
368: /**
369: * @param connection
370: */
371: public static void rollback(Connection connection) {
372: try {
373: connection.rollback();
374: connection.setAutoCommit(true);
375: } catch (SQLException e) {
376: logger.warn(e.toString(), e);
377: }
378: }
379:
380: /**
381: * Helper method for {@link java.sql.ResultSet#getObject(int)} with special handling for large objects.
382: * @param resultSet
383: * @param index
384: * @param type
385: * @return the object of the specified type at the specified index from the specified result set
386: * @throws SQLException
387: */
388: public static Object getObject(ResultSet resultSet, int index,
389: int type) throws SQLException {
390: switch (type) {
391: case Types.BLOB: {
392: return resultSet.getBlob(index);
393: }
394: case Types.CLOB: {
395: return resultSet.getClob(index);
396: }
397: default: {
398: return resultSet.getObject(index);
399: }
400: }
401: }
402: }
|