001: /*
002: * HA-JDBC: High-Availability JDBC
003: * Copyright (c) 2004-2007 Paul Ferraro
004: *
005: * This library is free software; you can redistribute it and/or modify it
006: * under the terms of the GNU Lesser General Public License as published by the
007: * Free Software Foundation; either version 2.1 of the License, or (at your
008: * option) any later version.
009: *
010: * This library is distributed in the hope that it will be useful, but WITHOUT
011: * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
012: * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
013: * for more details.
014: *
015: * You should have received a copy of the GNU Lesser General Public License
016: * along with this library; if not, write to the Free Software Foundation,
017: * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
018: *
019: * Contact: ferraro@users.sourceforge.net
020: */
021: package net.sf.hajdbc.sync;
022:
023: import java.sql.Connection;
024: import java.sql.PreparedStatement;
025: import java.sql.ResultSet;
026: import java.sql.SQLException;
027: import java.sql.Statement;
028: import java.util.ArrayList;
029: import java.util.Arrays;
030: import java.util.Collection;
031: import java.util.Collections;
032: import java.util.LinkedHashSet;
033: import java.util.List;
034: import java.util.Map;
035: import java.util.Set;
036: import java.util.TreeMap;
037: import java.util.concurrent.Callable;
038: import java.util.concurrent.ExecutionException;
039: import java.util.concurrent.ExecutorService;
040: import java.util.concurrent.Future;
041:
042: import net.sf.hajdbc.Dialect;
043: import net.sf.hajdbc.Messages;
044: import net.sf.hajdbc.SynchronizationContext;
045: import net.sf.hajdbc.SynchronizationStrategy;
046: import net.sf.hajdbc.TableProperties;
047: import net.sf.hajdbc.UniqueConstraint;
048: import net.sf.hajdbc.util.SQLExceptionFactory;
049: import net.sf.hajdbc.util.Strings;
050:
051: import org.slf4j.Logger;
052: import org.slf4j.LoggerFactory;
053:
054: /**
055: * Database-independent synchronization strategy that only updates differences between two databases.
056: * This strategy is best used when there are <em>few</em> differences between the active database and the inactive database (i.e. barely out of sync).
057: * The following algorithm is used:
058: * <ol>
059: * <li>Drop the foreign keys on the inactive database (to avoid integrity constraint violations)</li>
060: * <li>For each database table:
061: * <ol>
062: * <li>Drop the unique constraints on the table (to avoid integrity constraint violations)</li>
063: * <li>Find the primary key(s) of the table</li>
064: * <li>Query all rows in the inactive database table, sorting by the primary key(s)</li>
065: * <li>Query all rows on the active database table</li>
066: * <li>For each row in table:
067: * <ol>
068: * <li>If primary key of the rows are the same, determine whether or not row needs to be updated</li>
069: * <li>Otherwise, determine whether row should be deleted, or a new row is to be inserted</li>
070: * </ol>
071: * </li>
072: * <li>Re-create the unique constraints on the table (to avoid integrity constraint violations)</li>
073: * </ol>
074: * </li>
075: * <li>Re-create the foreign keys on the inactive database</li>
076: * <li>Synchronize sequences</li>
077: * </ol>
078: * @author Paul Ferraro
079: * @version $Revision: 2012 $
080: * @since 1.0
081: */
082: public class DifferentialSynchronizationStrategy implements
083: SynchronizationStrategy {
084: private static Logger logger = LoggerFactory
085: .getLogger(DifferentialSynchronizationStrategy.class);
086:
087: private int fetchSize = 0;
088: private int maxBatchSize = 100;
089:
090: /**
091: * @see net.sf.hajdbc.SynchronizationStrategy#synchronize(net.sf.hajdbc.SynchronizationContext)
092: */
093: @Override
094: public <D> void synchronize(SynchronizationContext<D> context)
095: throws SQLException {
096: Map<Short, String> primaryKeyColumnMap = new TreeMap<Short, String>();
097: Set<Integer> primaryKeyColumnIndexSet = new LinkedHashSet<Integer>();
098:
099: Connection sourceConnection = context.getConnection(context
100: .getSourceDatabase());
101: Connection targetConnection = context.getConnection(context
102: .getTargetDatabase());
103:
104: Dialect dialect = context.getDialect();
105:
106: ExecutorService executor = context.getExecutor();
107:
108: targetConnection.setAutoCommit(true);
109:
110: SynchronizationSupport.dropForeignKeys(context);
111:
112: try {
113: for (TableProperties table : context
114: .getDatabaseProperties().getTables()) {
115: SynchronizationSupport.dropUniqueConstraints(context,
116: table);
117:
118: targetConnection.setAutoCommit(false);
119:
120: primaryKeyColumnMap.clear();
121: primaryKeyColumnIndexSet.clear();
122:
123: String tableName = table.getName();
124:
125: UniqueConstraint primaryKey = table.getPrimaryKey();
126:
127: if (primaryKey == null) {
128: throw new SQLException(Messages.getMessage(
129: Messages.PRIMARY_KEY_REQUIRED, this
130: .getClass().getName(), tableName));
131: }
132:
133: List<String> primaryKeyColumnList = primaryKey
134: .getColumnList();
135:
136: Collection<String> columns = table.getColumns();
137:
138: // List of colums for select statement - starting with primary key
139: List<String> columnList = new ArrayList<String>(columns
140: .size());
141:
142: columnList.addAll(primaryKeyColumnList);
143:
144: for (String column : columns) {
145: if (!primaryKeyColumnList.contains(column)) {
146: columnList.add(column);
147: }
148: }
149:
150: List<String> nonPrimaryKeyColumnList = columnList
151: .subList(primaryKeyColumnList.size(),
152: columnList.size());
153:
154: String commaDelimitedColumns = Strings.join(columnList,
155: Strings.PADDED_COMMA);
156:
157: // Retrieve table rows in primary key order
158: final String selectSQL = "SELECT " + commaDelimitedColumns + " FROM " + tableName + " ORDER BY " + Strings.join(primaryKeyColumnList, Strings.PADDED_COMMA); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
159:
160: final Statement targetStatement = targetConnection
161: .createStatement();
162:
163: targetStatement.setFetchSize(this .fetchSize);
164:
165: logger.debug(selectSQL);
166:
167: Callable<ResultSet> callable = new Callable<ResultSet>() {
168: public ResultSet call() throws SQLException {
169: return targetStatement.executeQuery(selectSQL);
170: }
171: };
172:
173: Future<ResultSet> future = executor.submit(callable);
174:
175: Statement sourceStatement = sourceConnection
176: .createStatement();
177: sourceStatement.setFetchSize(this .fetchSize);
178:
179: ResultSet sourceResultSet = sourceStatement
180: .executeQuery(selectSQL);
181:
182: ResultSet inactiveResultSet = future.get();
183:
184: String primaryKeyWhereClause = " WHERE " + Strings.join(primaryKeyColumnList, " = ? AND ") + " = ?"; //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
185:
186: // Construct DELETE SQL
187: String deleteSQL = "DELETE FROM " + tableName + primaryKeyWhereClause; //$NON-NLS-1$
188:
189: logger.debug(deleteSQL);
190:
191: PreparedStatement deleteStatement = targetConnection
192: .prepareStatement(deleteSQL);
193:
194: // Construct INSERT SQL
195: String insertSQL = "INSERT INTO " + tableName + " (" + commaDelimitedColumns + ") VALUES (" + Strings.join(Collections.nCopies(columnList.size(), Strings.QUESTION), Strings.PADDED_COMMA) + ")"; //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$
196:
197: logger.debug(insertSQL);
198:
199: PreparedStatement insertStatement = targetConnection
200: .prepareStatement(insertSQL);
201:
202: // Construct UPDATE SQL
203: String updateSQL = "UPDATE " + tableName + " SET " + Strings.join(nonPrimaryKeyColumnList, " = ?, ") + " = ?" + primaryKeyWhereClause; //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$
204:
205: logger.debug(updateSQL);
206:
207: PreparedStatement updateStatement = targetConnection
208: .prepareStatement(updateSQL);
209:
210: boolean hasMoreActiveResults = sourceResultSet.next();
211: boolean hasMoreInactiveResults = inactiveResultSet
212: .next();
213:
214: int insertCount = 0;
215: int updateCount = 0;
216: int deleteCount = 0;
217:
218: while (hasMoreActiveResults || hasMoreInactiveResults) {
219: int compare = 0;
220:
221: if (!hasMoreActiveResults) {
222: compare = 1;
223: } else if (!hasMoreInactiveResults) {
224: compare = -1;
225: } else {
226: for (int i = 1; i <= primaryKeyColumnList
227: .size(); ++i) {
228: Object activeObject = sourceResultSet
229: .getObject(i);
230: Object inactiveObject = inactiveResultSet
231: .getObject(i);
232:
233: // We assume that the primary keys column types are Comparable
234: compare = this .compare(activeObject,
235: inactiveObject);
236:
237: if (compare != 0) {
238: break;
239: }
240: }
241: }
242:
243: if (compare > 0) {
244: deleteStatement.clearParameters();
245:
246: for (int i = 1; i <= primaryKeyColumnList
247: .size(); ++i) {
248: int type = dialect.getColumnType(table
249: .getColumnProperties(columnList
250: .get(i - 1)));
251:
252: deleteStatement.setObject(i,
253: inactiveResultSet.getObject(i),
254: type);
255: }
256:
257: deleteStatement.addBatch();
258:
259: deleteCount += 1;
260:
261: if ((deleteCount % this .maxBatchSize) == 0) {
262: deleteStatement.executeBatch();
263: deleteStatement.clearBatch();
264: }
265: } else if (compare < 0) {
266: insertStatement.clearParameters();
267:
268: for (int i = 1; i <= columnList.size(); ++i) {
269: int type = dialect.getColumnType(table
270: .getColumnProperties(columnList
271: .get(i - 1)));
272:
273: Object object = SynchronizationSupport
274: .getObject(sourceResultSet, i, type);
275:
276: if (sourceResultSet.wasNull()) {
277: insertStatement.setNull(i, type);
278: } else {
279: insertStatement.setObject(i, object,
280: type);
281: }
282: }
283:
284: insertStatement.addBatch();
285:
286: insertCount += 1;
287:
288: if ((insertCount % this .maxBatchSize) == 0) {
289: insertStatement.executeBatch();
290: insertStatement.clearBatch();
291: }
292: } else // if (compare == 0)
293: {
294: updateStatement.clearParameters();
295:
296: boolean updated = false;
297:
298: for (int i = primaryKeyColumnList.size() + 1; i <= columnList
299: .size(); ++i) {
300: int type = dialect.getColumnType(table
301: .getColumnProperties(columnList
302: .get(i - 1)));
303:
304: Object activeObject = SynchronizationSupport
305: .getObject(sourceResultSet, i, type);
306: Object inactiveObject = SynchronizationSupport
307: .getObject(inactiveResultSet, i,
308: type);
309:
310: int index = i - primaryKeyColumnList.size();
311:
312: if (sourceResultSet.wasNull()) {
313: updateStatement.setNull(index, type);
314:
315: updated |= !inactiveResultSet.wasNull();
316: } else {
317: updateStatement.setObject(index,
318: activeObject, type);
319:
320: updated |= inactiveResultSet.wasNull();
321: updated |= !equals(activeObject,
322: inactiveObject);
323: }
324: }
325:
326: if (updated) {
327: for (int i = 1; i <= primaryKeyColumnList
328: .size(); ++i) {
329: int type = dialect.getColumnType(table
330: .getColumnProperties(columnList
331: .get(i - 1)));
332:
333: updateStatement.setObject(i
334: + nonPrimaryKeyColumnList
335: .size(),
336: inactiveResultSet.getObject(i),
337: type);
338: }
339:
340: updateStatement.addBatch();
341:
342: updateCount += 1;
343:
344: if ((updateCount % this .maxBatchSize) == 0) {
345: updateStatement.executeBatch();
346: updateStatement.clearBatch();
347: }
348: }
349: }
350:
351: if (hasMoreActiveResults && (compare <= 0)) {
352: hasMoreActiveResults = sourceResultSet.next();
353: }
354:
355: if (hasMoreInactiveResults && (compare >= 0)) {
356: hasMoreInactiveResults = inactiveResultSet
357: .next();
358: }
359: }
360:
361: if ((deleteCount % this .maxBatchSize) > 0) {
362: deleteStatement.executeBatch();
363: }
364:
365: deleteStatement.close();
366:
367: if ((insertCount % this .maxBatchSize) > 0) {
368: insertStatement.executeBatch();
369: }
370:
371: insertStatement.close();
372:
373: if ((updateCount % this .maxBatchSize) > 0) {
374: updateStatement.executeBatch();
375: }
376:
377: updateStatement.close();
378:
379: targetStatement.close();
380: sourceStatement.close();
381:
382: targetConnection.commit();
383:
384: targetConnection.setAutoCommit(true);
385:
386: SynchronizationSupport.restoreUniqueConstraints(
387: context, table);
388:
389: logger.info(Messages.getMessage(Messages.INSERT_COUNT,
390: insertCount, tableName));
391: logger.info(Messages.getMessage(Messages.UPDATE_COUNT,
392: updateCount, tableName));
393: logger.info(Messages.getMessage(Messages.DELETE_COUNT,
394: deleteCount, tableName));
395: }
396: } catch (ExecutionException e) {
397: SynchronizationSupport.rollback(targetConnection);
398:
399: throw SQLExceptionFactory.createSQLException(e.getCause());
400: } catch (InterruptedException e) {
401: SynchronizationSupport.rollback(targetConnection);
402:
403: throw SQLExceptionFactory.createSQLException(e.getCause());
404: } catch (SQLException e) {
405: SynchronizationSupport.rollback(targetConnection);
406:
407: throw e;
408: }
409:
410: SynchronizationSupport.restoreForeignKeys(context);
411:
412: SynchronizationSupport.synchronizeIdentityColumns(context);
413: SynchronizationSupport.synchronizeSequences(context);
414: }
415:
416: private boolean equals(Object object1, Object object2) {
417: if ((object1 instanceof byte[]) && (object2 instanceof byte[])) {
418: byte[] bytes1 = (byte[]) object1;
419: byte[] bytes2 = (byte[]) object2;
420:
421: if (bytes1.length != bytes2.length) {
422: return false;
423: }
424:
425: return Arrays.equals(bytes1, bytes2);
426: }
427:
428: return object1.equals(object2);
429: }
430:
431: @SuppressWarnings("unchecked")
432: private int compare(Object object1, Object object2) {
433: return ((Comparable) object1).compareTo(object2);
434: }
435:
436: /**
437: * @return the fetchSize.
438: */
439: public int getFetchSize() {
440: return this .fetchSize;
441: }
442:
443: /**
444: * @param fetchSize the fetchSize to set.
445: */
446: public void setFetchSize(int fetchSize) {
447: this .fetchSize = fetchSize;
448: }
449:
450: /**
451: * @return Returns the maxBatchSize.
452: */
453: public int getMaxBatchSize() {
454: return this .maxBatchSize;
455: }
456:
457: /**
458: * @param maxBatchSize The maxBatchSize to set.
459: */
460: public void setMaxBatchSize(int maxBatchSize) {
461: this.maxBatchSize = maxBatchSize;
462: }
463: }
|