001: //** Copyright Statement ***************************************************
002: //The Salmon Open Framework for Internet Applications (SOFIA)
003: // Copyright (C) 1999 - 2002, Salmon LLC
004: //
005: // This program is free software; you can redistribute it and/or
006: // modify it under the terms of the GNU General Public License version 2
007: // as published by the Free Software Foundation;
008: //
009: // This program is distributed in the hope that it will be useful,
010: // but WITHOUT ANY WARRANTY; without even the implied warranty of
011: // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
012: // GNU General Public License for more details.
013: //
014: // You should have received a copy of the GNU General Public License
015: // along with this program; if not, write to the Free Software
016: // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
017: //
018: // For more information please visit http://www.salmonllc.com
019: //** End Copyright Statement ***************************************************
020: package com.salmonllc.sql;
021:
022: import com.salmonllc.util.MessageLog;
023:
024: import java.util.Vector;
025:
026: /**
027: * This class is can be used to pipe data from one database to another assuming that the identical tables exist in both databases
028: */
029: public class DataPipe {
030: String _source, _dest, _application;
031: Vector _tables = new Vector();
032: DataDictionary _dataDict;
033: boolean _continueOnError = false;
034: int _maxRows = -1;
035: boolean _updateTablesWithData = true;
036:
037: /**
038: * Constructs a new DataPipe
039: * @param application The application that this is running under
040: * @param source The profile name for the source database
041: * @param dest The profile name for the destination database
042: * @param tables A vector of tables to pipe over. Leave null for all tables
043: */
044: public DataPipe(String application, String source, String dest,
045: Vector tables) {
046: _source = source;
047: _dest = dest;
048: _tables = tables;
049: _application = application;
050: _dataDict = new DataDictionary(application, source);
051: if (_tables == null)
052: _tables = _dataDict.getTableNames();
053: }
054:
055: /**
056: * Constructs a new DataPipe that will pipe all data from one db to another
057: * @param application The application that this is running under
058: * @param source The profile name for the source database
059: * @param dest The profile name for the destination database
060: */
061: public DataPipe(String application, String source, String dest) {
062: this (application, source, dest, null);
063: }
064:
065: /**
066: * Set to true if you want the pipe to empty the table before copying the data to it.
067: * Set to false to skip tables that already have data in them
068: * @param updateTablesWithData
069: */
070: public void setUpdateTablesWithData(boolean updateTablesWithData) {
071: _updateTablesWithData = updateTablesWithData;
072: }
073:
074: /**
075: * Copies the data for the specifiec tables from one database to another
076: * @throws Exception
077: */
078: public void pipeData() throws Exception {
079: MessageLog.writeInfoMessage("Beginning Datapipe process", this );
080: DBConnection dest = null;
081: boolean doRollback = true;
082: try {
083: dest = DBConnection.getConnection(_application, _dest);
084: java.sql.Statement st = dest.createStatement();
085: for (int i = 0; i < _tables.size(); i++) {
086: try {
087: String table = (String) _tables.elementAt(i);
088:
089: DataStore ds = buildDs(table);
090: if (!_updateTablesWithData) {
091: int count = ds.estimateRowsRetrieved(dest);
092: if (count > 0) {
093: MessageLog
094: .writeInfoMessage(
095: "Table:"
096: + table
097: + " SKIPPED. Target table already has data.",
098: this );
099: continue;
100: }
101: }
102: if (_maxRows != -1) {
103: int count = ds.estimateRowsRetrieved();
104: if (count > _maxRows) {
105: MessageLog.writeInfoMessage("Table:"
106: + table
107: + " SKIPPED. It has too many rows("
108: + count + ")", this );
109: continue;
110: }
111: }
112: MessageLog.writeInfoMessage(
113: "Loading data for table:" + table, this );
114: ds.retrieve();
115: MessageLog.writeInfoMessage("Loaded:"
116: + ds.getRowCount() + " rows for:" + table,
117: this );
118: for (int j = 0; j < ds.getRowCount(); j++)
119: ds.setRowStatus(j,
120: DataStore.STATUS_NEW_MODIFIED);
121: dest.beginTransaction();
122: doRollback = true;
123: st.executeUpdate("DELETE FROM " + table);
124: ds.update(dest);
125: MessageLog.writeInfoMessage("Updated:"
126: + ds.getRowCount() + " rows for:" + table,
127: this );
128: doRollback = false;
129: dest.commit();
130: } catch (Exception e) {
131: if (doRollback)
132: dest.rollback();
133: if (!_continueOnError)
134: throw (e);
135: else
136: MessageLog.writeErrorMessage(
137: "Error Piping Data", e, this );
138: }
139: }
140: st.close();
141: } catch (Exception e) {
142: throw (e);
143: } finally {
144: if (dest != null)
145: dest.freeConnection();
146: }
147: }
148:
149: private DataStore buildDs(String tableName) {
150: java.util.Vector columns = _dataDict.getColumns(tableName);
151: DataStore ds = new DataStore(_application, _source);
152: ds.setUseBindForUpdate(true);
153: ds.setBatchInserts(true);
154: int nColumns = columns.size();
155: for (int i = 0; i < nColumns; i++) {
156: ColumnDefinition column = (ColumnDefinition) columns
157: .elementAt(i);
158: if (column.getDSDataType() == DataStore.DATATYPE_DATE)
159: ds.addColumn(tableName, column.getColumnName(),
160: DataStore.DATATYPE_DATETIME, true, true);
161: else if (column.getDSDataType() == DataStore.DATATYPE_DOUBLE
162: && column.getScale() < 1)
163: ds.addColumn(tableName, column.getColumnName(),
164: DataStore.DATATYPE_INT, true, true);
165: else
166: ds.addColumn(tableName, column.getColumnName(), column
167: .getDSDataType(), true, true);
168: }
169: return ds;
170: }
171:
172: /**
173: * Sets the maximum number of rows any one table will pipe over or -1 for unlimited
174: */
175: public void setMaxRows(int maxRows) {
176: _maxRows = maxRows;
177: }
178:
179: /**
180: * Set to true to continue processing even if there is an error
181: */
182: public void setContinueOnError(boolean continueOnError) {
183: _continueOnError = continueOnError;
184: }
185:
186: }
|