001: /*
002: * DataCopier.java
003: *
004: * This file is part of SQL Workbench/J, http://www.sql-workbench.net
005: *
006: * Copyright 2002-2008, Thomas Kellerer
007: * No part of this code maybe reused without the permission of the author
008: *
009: * To contact the author please send an email to: support@sql-workbench.net
010: *
011: */
012: package workbench.db.datacopy;
013:
014: import java.sql.SQLException;
015: import java.util.ArrayList;
016: import java.util.Collection;
017: import java.util.HashMap;
018: import java.util.List;
019: import java.util.Map;
020: import workbench.db.ColumnIdentifier;
021: import workbench.db.DbMetadata;
022: import workbench.db.TableCreator;
023: import workbench.db.TableDropper;
024: import workbench.db.TableIdentifier;
025: import workbench.db.WbConnection;
026: import workbench.db.importer.DataImporter;
027: import workbench.db.importer.RowDataProducer;
028: import workbench.db.importer.RowDataReceiver;
029: import workbench.db.importer.TableStatements;
030: import workbench.interfaces.BatchCommitter;
031: import workbench.interfaces.JobErrorHandler;
032: import workbench.interfaces.ProgressReporter;
033: import workbench.util.ExceptionUtil;
034: import workbench.log.LogMgr;
035: import workbench.resource.ResourceMgr;
036: import workbench.storage.RowActionMonitor;
037: import workbench.util.MessageBuffer;
038: import workbench.util.SqlUtil;
039: import workbench.util.WbThread;
040:
041: /**
042: *
043: * @author support@sql-workbench.net
044: */
045: public class DataCopier implements BatchCommitter, ProgressReporter {
046: private WbConnection sourceConnection;
047: private WbConnection targetConnection;
048:
049: private RowDataProducer sourceData;
050:
051: private TableIdentifier sourceTable;
052: private TableIdentifier targetTable;
053:
054: private DataImporter importer;
055:
056: // the columnMap will contain elements of type ColumnIdentifier
057: private HashMap<ColumnIdentifier, ColumnIdentifier> columnMap;
058:
059: private ColumnIdentifier[] targetColumnsForQuery;
060: private MessageBuffer messages = null;
061: private MessageBuffer errors = null;
062:
063: public DataCopier() {
064: this .importer = new DataImporter();
065: }
066:
067: public void reset() {
068: sourceData = null;
069: sourceTable = null;
070: sourceConnection = null;
071: targetTable = null;
072: targetColumnsForQuery = null;
073: targetTable = null;
074: targetConnection = null;
075: columnMap = null;
076: importer.clearMessages();
077: messages = new MessageBuffer();
078: errors = new MessageBuffer();
079: }
080:
081: public void setTransactionControl(boolean flag) {
082: this .importer.setTransactionControl(flag);
083: }
084:
085: public void beginMultiTableCopy() {
086: this .importer.beginMultiTable();
087: }
088:
089: public void endMultiTableCopy() {
090: this .importer.endMultiTable();
091: }
092:
093: public RowDataProducer getSource() {
094: return this .sourceData;
095: }
096:
097: public RowDataReceiver getReceiver() {
098: return this .importer;
099: }
100:
101: public void setKeyColumns(List<ColumnIdentifier> keys) {
102: this .importer.setKeyColumns(keys);
103: }
104:
105: public void setKeyColumns(String keys) {
106: this .importer.setKeyColumns(keys);
107: }
108:
109: public void setPerTableStatements(TableStatements stmt) {
110: this .importer.setPerTableStatements(stmt);
111: }
112:
113: /**
114: * Forwards the setMode() call to the DataImporter.
115: * @see workbench.db.importer.DataImporter#setMode(String)
116: */
117: public boolean setMode(String mode) {
118: return this .importer.setMode(mode);
119: }
120:
121: public void setReportInterval(int value) {
122: this .importer.setReportInterval(value);
123: }
124:
125: public void setProducer(RowDataProducer source,
126: WbConnection target, TableIdentifier targetTbl) {
127: this .sourceConnection = null;
128: this .targetConnection = target;
129: this .importer.setConnection(target);
130: this .targetTable = targetTbl;
131: this .targetColumnsForQuery = null;
132: this .sourceData = source;
133: this .importer.setProducer(source);
134: }
135:
136: /**
137: * Define the source table, the target table and the column mapping
138: * for the copy process.
139: * If the columnMapping is null, the matching columns from both tables are used.
140: * It is expected that the mapping contains String objects. The key is the name of the
141: * source column, the mapped value is the name of the target column
142: */
143: public void copyFromTable(WbConnection source, WbConnection target,
144: TableIdentifier aSourceTable, TableIdentifier aTargetTable,
145: Map<String, String> columnMapping, String additionalWhere,
146: boolean createTable, boolean dropTable) throws SQLException {
147: this .sourceConnection = source;
148: this .targetConnection = target;
149: this .importer.setConnection(target);
150: this .sourceTable = aSourceTable;
151: this .targetTable = aTargetTable;
152: this .targetColumnsForQuery = null;
153:
154: if (!this .sourceConnection.getMetadata().objectExists(
155: aSourceTable, (String) null)) {
156: this .addError(ResourceMgr.getFormattedString(
157: "ErrCopySourceTableNotFound", aSourceTable
158: .getQualifiedName()));
159: throw new SQLException("Table "
160: + aTargetTable.getTableName()
161: + " not found in source connection");
162: }
163:
164: this .initColumnMapping(columnMapping, createTable);
165:
166: if (createTable) {
167: createTable(this .columnMap.values(), dropTable);
168: }
169:
170: this .initImporterForTable(additionalWhere);
171: }
172:
173: private void createTable(Collection<ColumnIdentifier> columns,
174: boolean dropIfExists) throws SQLException {
175: if (dropIfExists) {
176: boolean exists = this .targetConnection.getMetadata()
177: .tableExists(targetTable);
178: if (exists) {
179: try {
180: TableDropper dropper = new TableDropper(
181: this .targetConnection);
182: dropper.dropTable(targetTable);
183: this .addMessage(ResourceMgr.getFormattedString(
184: "MsgCopyTableDropped", targetTable
185: .getQualifiedName()));
186: } catch (SQLException e) {
187: this
188: .addError(ResourceMgr
189: .getFormattedString(
190: "MsgCopyErrorCreatTable",
191: targetTable
192: .getTableExpression(this .targetConnection),
193: ExceptionUtil.getDisplay(e)));
194: throw e;
195: }
196: }
197: }
198:
199: try {
200: TableCreator creator = new TableCreator(
201: this .targetConnection, this .targetTable, columns);
202: creator.useDbmsDataType(this .sourceConnection
203: .getDatabaseProductName().equals(
204: this .targetConnection
205: .getDatabaseProductName()));
206: creator.createTable();
207:
208: // no need to delete rows from a newly created table
209: this .setDeleteTarget(false);
210: this .addMessage(ResourceMgr.getFormattedString(
211: "MsgCopyTableCreated", this .targetTable
212: .getTableExpression(this .targetConnection))
213: + "\n");
214: } catch (SQLException e) {
215: //LogMgr.logError("DataCopier.copyFromTable()", "Error when creating target table", e);
216: this .addError(ResourceMgr.getFormattedString(
217: "MsgCopyErrorCreatTable", targetTable
218: .getTableExpression(this .targetConnection),
219: ExceptionUtil.getDisplay(e)));
220: throw e;
221: }
222: }
223:
224: /**
225: * Copy data from a SQL SELECT result to the given target table.
226: */
227: public void copyFromQuery(WbConnection source, WbConnection target,
228: String aSourceQuery, TableIdentifier aTargetTable,
229: ColumnIdentifier[] queryColumns, boolean createTarget,
230: boolean dropTarget) throws SQLException {
231: if (queryColumns == null || queryColumns.length == 0) {
232: throw new IllegalArgumentException(
233: "Source and target column identifiers must be specified when using a SQL query!");
234: }
235: this .sourceConnection = source;
236: this .targetConnection = target;
237: this .importer.setConnection(target);
238: this .sourceTable = null;
239: this .targetTable = aTargetTable;
240: this .targetColumnsForQuery = queryColumns;
241:
242: if (createTarget) {
243: List<ColumnIdentifier> cols = new ArrayList<ColumnIdentifier>();
244: for (ColumnIdentifier col : queryColumns) {
245: cols.add(col);
246: }
247: createTable(cols, dropTarget);
248: }
249: this .initImporterForQuery(aSourceQuery);
250: }
251:
252: public void setRowActionMonitor(RowActionMonitor rowMonitor) {
253: if (rowMonitor != null) {
254: this .importer.setRowActionMonitor(rowMonitor);
255: }
256: }
257:
258: public void setDeleteTarget(boolean delete) {
259: this .importer.setDeleteTarget(delete);
260: }
261:
262: public void setContinueOnError(boolean cont) {
263: this .importer.setContinueOnError(cont);
264: }
265:
266: public void setUseBatch(boolean flag) {
267: this .importer.setUseBatch(flag);
268: }
269:
270: public void setCommitBatch(boolean flag) {
271: this .importer.setCommitBatch(flag);
272: }
273:
274: public void setBatchSize(int size) {
275: this .importer.setBatchSize(size);
276: }
277:
278: public void commitNothing() {
279: this .importer.commitNothing();
280: }
281:
282: public void setCommitEvery(int interval) {
283: this .importer.setCommitEvery(interval);
284: }
285:
286: public void startBackgroundCopy() {
287: Thread t = new WbThread("DataCopier Thread") {
288: public void run() {
289: try {
290: startCopy();
291: } catch (Throwable th) {
292: }
293: }
294: };
295: t.start();
296: }
297:
298: public long getAffectedRows() {
299: return this .importer.getAffectedRows();
300: }
301:
302: public boolean isSuccess() {
303: return this .importer.isSuccess();
304: }
305:
306: public String getErrorMessage() {
307: if (this .errors == null)
308: return null;
309: return this .errors.toString();
310: }
311:
312: public void startCopy() throws Exception {
313: try {
314: // this will call start() or sourceData.start()
315: // depending on which source we set for the importer
316: this .sourceData.setAbortOnError(!this .importer
317: .getContinueOnError());
318: this .importer.startImport();
319:
320: LogMgr.logInfo("DataCopier.start()",
321: "Copying of data finished. "
322: + this .importer.getInsertedRows()
323: + " row(s) inserted. "
324: + this .importer.getUpdatedRows()
325: + " row(s) updated.");
326: } catch (Exception e) {
327: LogMgr.logError("DataCopier.start()",
328: "Error when copying data", e);
329: String msg = ResourceMgr.getFormattedString("ErrCopy",
330: ExceptionUtil.getDisplay(e, false));
331: this .addError(msg);
332: this .importer.tableImportError();
333: throw e;
334: }
335: }
336:
337: public String getRowsUpdatedMessage() {
338: String msg = null;
339: long rows = this .importer.getUpdatedRows();
340: if (rows > 0) {
341: msg = rows + " "
342: + ResourceMgr.getString("MsgCopyNumRowsUpdated");
343: }
344: return msg;
345: }
346:
347: public String getRowsInsertedMessage() {
348: long rows = this .importer.getInsertedRows();
349: String msg = null;
350:
351: if (rows > 0) {
352: msg = rows + " "
353: + ResourceMgr.getString("MsgCopyNumRowsInserted");
354: }
355: return msg;
356: }
357:
358: public void cancel() {
359: this .sourceData.cancel();
360: }
361:
362: private void initImporterForQuery(String query) throws SQLException {
363: if (this .targetColumnsForQuery == null)
364: return;
365: this .importer.setTargetTable(this .targetTable,
366: this .targetColumnsForQuery);
367: initQuerySource(query);
368: }
369:
370: /**
371: * Send the definition of the target table to the DataImporter, and creates
372: * the approriate SELECT statement to retrieve the data from the source
373: */
374: private void initImporterForTable(String addWhere)
375: throws SQLException {
376: if (this .columnMap == null || this .columnMap.size() == 0) {
377: throw new SQLException("No columns defined");
378: }
379: int count = this .columnMap.size();
380: ColumnIdentifier[] cols = new ColumnIdentifier[count];
381:
382: int col = 0;
383:
384: StringBuilder sql = new StringBuilder(count * 25 + 30);
385: sql.append("SELECT ");
386:
387: for (Map.Entry<ColumnIdentifier, ColumnIdentifier> entry : this .columnMap
388: .entrySet()) {
389: ColumnIdentifier sid = entry.getKey();
390: ColumnIdentifier tid = entry.getValue();
391: if (col > 0) {
392: sql.append(", ");
393: }
394: sql.append(sid.getColumnName());
395: cols[col] = tid;
396: col++;
397: }
398:
399: sql.append(" FROM ");
400: sql.append(this .sourceTable
401: .getTableExpression(this .sourceConnection));
402:
403: if (addWhere != null && addWhere.trim().length() > 0) {
404: sql.append(' ');
405: String first = SqlUtil.getSqlVerb(addWhere);
406: if (!first.equals("WHERE")) {
407: sql.append(" WHERE ");
408: }
409: sql.append(addWhere);
410: }
411: initQuerySource(sql.toString());
412:
413: try {
414: this .importer.setTargetTable(this .targetTable, cols);
415: } catch (SQLException e) {
416: String msg = ResourceMgr.getFormattedString(
417: "ErrCopyTargetTableNotFound", this .targetTable
418: .getTableExpression());
419: this .addMessage(msg);
420: throw e;
421: }
422: }
423:
424: private void initQuerySource(String sql) {
425: QueryCopySource source = new QueryCopySource(
426: this .sourceConnection, sql);
427: this .sourceData = source;
428: this .importer.setProducer(source);
429: }
430:
431: private List<ColumnIdentifier> getSourceColumns()
432: throws SQLException {
433: List<ColumnIdentifier> sourceCols = null;
434: if (this .sourceTable != null) {
435: DbMetadata sourceMeta = this .sourceConnection.getMetadata();
436: TableIdentifier tbl = sourceMeta
437: .resolveSynonym(this .sourceTable);
438: sourceCols = sourceMeta.getTableColumns(tbl);
439: } else if (this .targetColumnsForQuery != null) {
440: sourceCols = new ArrayList<ColumnIdentifier>(
441: this .targetColumnsForQuery.length);
442: for (ColumnIdentifier col : targetColumnsForQuery) {
443: sourceCols.add(col);
444: }
445: }
446: return sourceCols;
447: }
448:
449: /**
450: * Initialize the column mapping between source and target table.
451: * If a mapping is provided, it is used (after checking that the columns
452: * exist in both tables).
453: * If no mapping is provided, all matching columns from both tables are copied
454: */
455: private void initColumnMapping(Map<String, String> columnMapping,
456: boolean createNew) throws SQLException {
457: List<ColumnIdentifier> sourceCols = getSourceColumns();
458: List<ColumnIdentifier> targetCols = (createNew ? null
459: : this .targetConnection.getMetadata().getTableColumns(
460: this .targetTable));
461:
462: this .columnMap = new HashMap<ColumnIdentifier, ColumnIdentifier>(
463: sourceCols.size());
464:
465: if (columnMapping != null) {
466: int colPos = 0;
467: for (Map.Entry<String, String> entry : columnMapping
468: .entrySet()) {
469: ColumnIdentifier scol = new ColumnIdentifier(entry
470: .getKey());
471: int index = sourceCols.indexOf(scol);
472: if (index > -1) {
473: ColumnIdentifier sourceCol = sourceCols.get(index);
474: ColumnIdentifier targetCol = null;
475:
476: if (createNew) {
477: // If no target column specified (e.g. using -columns=col1,col2,col3)
478: // then simply use the name from the source table
479: targetCol = sourceCol.createCopy();
480: if (entry.getValue() != null) {
481: // Mapping specified, change the name of the column to the specified value
482: targetCol.setColumnName(entry.getValue());
483: }
484:
485: // Make sure the order of the tables is preserved
486: // when creating the table later, the columns will
487: // be sorted by the position before generating the SQL
488: targetCol.setPosition(colPos);
489: colPos++;
490: } else {
491: // Find the mapped column name in the columns of the target table
492: targetCol = new ColumnIdentifier(entry
493: .getValue());
494: if (targetCols.indexOf(targetCol) == -1) {
495: targetCol = null;
496: LogMgr.logWarning(
497: "DataCopier.initColumnMapping()",
498: "Column " + targetCol
499: + " not found in table "
500: + this .targetTable
501: + ". Ignoring mapping!");
502: String msg = ResourceMgr
503: .getFormattedString(
504: "ErrCopyTargetColumnNotFound",
505: targetCol.getColumnName());
506: this .addMessage(msg);
507: }
508: }
509: if (targetCol != null)
510: this .columnMap.put(sourceCol, targetCol);
511: } else {
512: LogMgr.logWarning("DataCopier.initColumnMapping()",
513: "Column " + scol + " not found in table "
514: + this .sourceTable
515: + ". Ignoring mapping!");
516: String msg = ResourceMgr.getFormattedString(
517: "ErrCopySourceColumnNotFound", scol
518: .getColumnName());
519: this .addMessage(msg);
520: }
521: }
522: } else {
523: // Use all columns from the source table
524: for (ColumnIdentifier scol : sourceCols) {
525: columnMap.put(scol, scol.createCopy());
526: }
527: }
528: }
529:
530: private void addError(String msg) {
531: if (this .errors == null)
532: this .errors = new MessageBuffer();
533: if (this .errors.getLength() > 0)
534: this .errors.appendNewLine();
535: this .errors.append(msg);
536: }
537:
538: private void addMessage(String msg) {
539: if (this .messages == null)
540: this .messages = new MessageBuffer();
541: if (this .messages.getLength() > 0)
542: this .messages.appendNewLine();
543: this .messages.append(msg);
544: }
545:
546: public boolean hasWarnings() {
547: return this .importer.hasWarnings();
548: }
549:
550: public MessageBuffer getMessageBuffer() {
551: MessageBuffer buf = new MessageBuffer();
552: buf.append(this .messages);
553: importer.copyMessages(buf);
554: buf.append(this .errors);
555: return buf;
556: }
557:
558: public CharSequence getAllMessages() {
559: StringBuilder log = new StringBuilder(2000);
560:
561: if (this .messages != null) {
562: log.append(messages.getBuffer());
563: log.append('\n');
564: }
565: log.append(this .importer.getMessages());
566: if (this .errors != null)
567: log.append(this .errors.getBuffer());
568:
569: return log;
570: }
571:
572: public void setErrorHandler(JobErrorHandler handler) {
573: }
574: }
|