001: /*
002: * HA-JDBC: High-Availability JDBC
003: * Copyright (c) 2004-2007 Paul Ferraro
004: *
005: * This library is free software; you can redistribute it and/or modify it
006: * under the terms of the GNU Lesser General Public License as published by the
007: * Free Software Foundation; either version 2.1 of the License, or (at your
008: * option) any later version.
009: *
010: * This library is distributed in the hope that it will be useful, but WITHOUT
011: * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
012: * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
013: * for more details.
014: *
015: * You should have received a copy of the GNU Lesser General Public License
016: * along with this library; if not, write to the Free Software Foundation,
017: * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
018: *
019: * Contact: ferraro@users.sourceforge.net
020: */
021: package net.sf.hajdbc.sync;
022:
023: import java.sql.Connection;
024: import java.sql.PreparedStatement;
025: import java.sql.ResultSet;
026: import java.sql.SQLException;
027: import java.sql.Statement;
028: import java.util.Collection;
029: import java.util.Collections;
030: import java.util.concurrent.Callable;
031: import java.util.concurrent.ExecutionException;
032: import java.util.concurrent.ExecutorService;
033: import java.util.concurrent.Future;
034:
035: import net.sf.hajdbc.Dialect;
036: import net.sf.hajdbc.Messages;
037: import net.sf.hajdbc.SynchronizationContext;
038: import net.sf.hajdbc.SynchronizationStrategy;
039: import net.sf.hajdbc.TableProperties;
040: import net.sf.hajdbc.util.SQLExceptionFactory;
041: import net.sf.hajdbc.util.Strings;
042:
043: import org.slf4j.Logger;
044: import org.slf4j.LoggerFactory;
045:
046: /**
047: * Database-independent synchronization strategy that only updates differences between two databases.
048: * This strategy is best used when there are <em>many</em> differences between the active database and the inactive database (i.e. very much out of sync).
049: * The following algorithm is used:
050: * <ol>
051: * <li>Drop the foreign keys on the inactive database (to avoid integrity constraint violations)</li>
052: * <li>For each database table:
053: * <ol>
054: * <li>Delete all rows in the inactive database table</li>
055: * <li>Query all rows on the active database table</li>
056: * <li>For each row in active database table:
057: * <ol>
058: * <li>Insert new row into inactive database table</li>
059: * </ol>
060: * </li>
061: * </ol>
062: * </li>
063: * <li>Re-create the foreign keys on the inactive database</li>
064: * <li>Synchronize sequences</li>
065: * </ol>
066: * @author Paul Ferraro
067: * @version $Revision: 2012 $
068: * @since 1.0
069: */
070: public class FullSynchronizationStrategy implements
071: SynchronizationStrategy {
072: private static Logger logger = LoggerFactory
073: .getLogger(FullSynchronizationStrategy.class);
074:
075: private int maxBatchSize = 100;
076: private int fetchSize = 0;
077:
078: /**
079: * @see net.sf.hajdbc.SynchronizationStrategy#synchronize(net.sf.hajdbc.SynchronizationContext)
080: */
081: @Override
082: public <D> void synchronize(SynchronizationContext<D> context)
083: throws SQLException {
084: Connection sourceConnection = context.getConnection(context
085: .getSourceDatabase());
086: Connection targetConnection = context.getConnection(context
087: .getTargetDatabase());
088:
089: Dialect dialect = context.getDialect();
090: ExecutorService executor = context.getExecutor();
091:
092: targetConnection.setAutoCommit(true);
093:
094: SynchronizationSupport.dropForeignKeys(context);
095:
096: targetConnection.setAutoCommit(false);
097:
098: try {
099: for (TableProperties table : context
100: .getDatabaseProperties().getTables()) {
101: String tableName = table.getName();
102: Collection<String> columns = table.getColumns();
103:
104: String commaDelimitedColumns = Strings.join(columns,
105: Strings.PADDED_COMMA);
106:
107: final String selectSQL = "SELECT " + commaDelimitedColumns + " FROM " + tableName; //$NON-NLS-1$ //$NON-NLS-2$
108:
109: final Statement selectStatement = sourceConnection
110: .createStatement();
111: selectStatement.setFetchSize(this .fetchSize);
112:
113: Callable<ResultSet> callable = new Callable<ResultSet>() {
114: public ResultSet call() throws SQLException {
115: return selectStatement.executeQuery(selectSQL);
116: }
117: };
118:
119: Future<ResultSet> future = executor.submit(callable);
120:
121: String deleteSQL = dialect.getTruncateTableSQL(table);
122:
123: logger.debug(deleteSQL);
124:
125: Statement deleteStatement = targetConnection
126: .createStatement();
127:
128: int deletedRows = deleteStatement
129: .executeUpdate(deleteSQL);
130:
131: logger.info(Messages.getMessage(Messages.DELETE_COUNT,
132: deletedRows, tableName));
133:
134: deleteStatement.close();
135:
136: ResultSet resultSet = future.get();
137:
138: String insertSQL = "INSERT INTO " + tableName + " (" + commaDelimitedColumns + ") VALUES (" + Strings.join(Collections.nCopies(columns.size(), Strings.QUESTION), Strings.PADDED_COMMA) + ")"; //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$
139:
140: logger.debug(insertSQL);
141:
142: PreparedStatement insertStatement = targetConnection
143: .prepareStatement(insertSQL);
144: int statementCount = 0;
145:
146: while (resultSet.next()) {
147: int index = 0;
148:
149: for (String column : columns) {
150: index += 1;
151:
152: int type = dialect.getColumnType(table
153: .getColumnProperties(column));
154:
155: Object object = SynchronizationSupport
156: .getObject(resultSet, index, type);
157:
158: if (resultSet.wasNull()) {
159: insertStatement.setNull(index, type);
160: } else {
161: insertStatement.setObject(index, object,
162: type);
163: }
164: }
165:
166: insertStatement.addBatch();
167: statementCount += 1;
168:
169: if ((statementCount % this .maxBatchSize) == 0) {
170: insertStatement.executeBatch();
171: insertStatement.clearBatch();
172: }
173:
174: insertStatement.clearParameters();
175: }
176:
177: if ((statementCount % this .maxBatchSize) > 0) {
178: insertStatement.executeBatch();
179: }
180:
181: logger.info(Messages.getMessage(Messages.INSERT_COUNT,
182: statementCount, tableName));
183:
184: insertStatement.close();
185: selectStatement.close();
186:
187: targetConnection.commit();
188: }
189: } catch (InterruptedException e) {
190: SynchronizationSupport.rollback(targetConnection);
191:
192: SQLExceptionFactory.createSQLException(e);
193: } catch (ExecutionException e) {
194: SynchronizationSupport.rollback(targetConnection);
195:
196: SQLExceptionFactory.createSQLException(e.getCause());
197: } catch (SQLException e) {
198: SynchronizationSupport.rollback(targetConnection);
199:
200: throw e;
201: }
202:
203: targetConnection.setAutoCommit(true);
204:
205: SynchronizationSupport.restoreForeignKeys(context);
206:
207: SynchronizationSupport.synchronizeIdentityColumns(context);
208: SynchronizationSupport.synchronizeSequences(context);
209: }
210:
211: /**
212: * @return the fetchSize.
213: */
214: public int getFetchSize() {
215: return this .fetchSize;
216: }
217:
218: /**
219: * @param fetchSize the fetchSize to set.
220: */
221: public void setFetchSize(int fetchSize) {
222: this .fetchSize = fetchSize;
223: }
224:
225: /**
226: * @return the maxBatchSize.
227: */
228: public int getMaxBatchSize() {
229: return this .maxBatchSize;
230: }
231:
232: /**
233: * @param maxBatchSize the maxBatchSize to set.
234: */
235: public void setMaxBatchSize(int maxBatchSize) {
236: this.maxBatchSize = maxBatchSize;
237: }
238: }
|