001: /*
002: * QueryCopySource.java
003: *
004: * This file is part of SQL Workbench/J, http://www.sql-workbench.net
005: *
006: * Copyright 2002-2008, Thomas Kellerer
007: * No part of this code may be reused without the permission of the author
008: *
009: * To contact the author please send an email to: support@sql-workbench.net
010: *
011: */
012: package workbench.db.datacopy;
013:
014: import java.sql.ResultSet;
015: import java.sql.SQLException;
016: import java.sql.Statement;
017: import workbench.db.WbConnection;
018: import workbench.db.importer.RowDataProducer;
019: import workbench.db.importer.RowDataReceiver;
020: import workbench.interfaces.JobErrorHandler;
021: import workbench.log.LogMgr;
022: import workbench.storage.ResultInfo;
023: import workbench.storage.RowData;
024: import workbench.util.MessageBuffer;
025: import workbench.util.SqlUtil;
026: import workbench.util.ValueConverter;
027:
028: /**
029: * @author support@sql-workbench.net
030: */
031: public class QueryCopySource implements RowDataProducer {
032: private RowDataReceiver receiver;
033: private boolean keepRunning = true;
034: private boolean regularStop = false;
035: private WbConnection sourceConnection;
036: private Statement retrieveStatement;
037: private String retrieveSql;
038: private boolean abortOnError;
039: private boolean hasErrors = false;
040: private boolean hasWarnings = false;
041: private RowData currentRow;
042:
043: public QueryCopySource(WbConnection source, String sql) {
044: this .sourceConnection = source;
045: this .retrieveSql = sql;
046: }
047:
048: public boolean hasErrors() {
049: return this .hasErrors;
050: }
051:
052: public boolean hasWarnings() {
053: return this .hasWarnings;
054: }
055:
056: public void setValueConverter(ValueConverter converter) {
057: }
058:
059: public void setReceiver(RowDataReceiver rec) {
060: this .receiver = rec;
061: }
062:
063: public void start() throws Exception {
064: LogMgr.logDebug("QueryCopySource.start()", "Using SQL: "
065: + this .retrieveSql);
066:
067: ResultSet rs = null;
068: this .keepRunning = true;
069: this .regularStop = false;
070: try {
071: this .retrieveStatement = this .sourceConnection
072: .createStatementForQuery();
073: rs = this .retrieveStatement.executeQuery(this .retrieveSql);
074: ResultInfo info = new ResultInfo(rs.getMetaData(),
075: this .sourceConnection);
076: int colCount = info.getColumnCount();
077: currentRow = new RowData(colCount);
078: while (this .keepRunning && rs.next()) {
079: // RowData will make some transformation
080: // on the data read from the database
081: // which works around some bugs in the Oracle
082: // JDBC driver. Especially it will supply
083: // CLOB data as a String which I hope will be
084: // more flexible when copying from Oracle
085: // to other systems
086: currentRow.read(rs, info);
087: if (!keepRunning)
088: break;
089:
090: try {
091: this .receiver.processRow(currentRow.getData());
092: } catch (SQLException e) {
093: if (abortOnError)
094: throw e;
095: }
096: }
097:
098: // if keepRunning == false, cancel() was
099: // called and we have to tell that the Importer
100: // in order to do a rollback
101: if (this .keepRunning || regularStop) {
102: // When copying a schema, we should not send an importFinished()
103: // so that the DataImporter reports the table counts correctly
104: this .receiver.importFinished();
105: } else {
106: this .receiver.importCancelled();
107: }
108: } finally {
109: SqlUtil.closeAll(rs, retrieveStatement);
110: }
111: }
112:
113: public String getLastRecord() {
114: if (currentRow == null)
115: return null;
116: return currentRow.toString();
117: }
118:
119: public void stop() {
120: this .regularStop = true;
121: cancel();
122: }
123:
124: public void cancel() {
125: this .keepRunning = false;
126: try {
127: this .retrieveStatement.cancel();
128: } catch (Exception e) {
129: LogMgr.logWarning("QueryCopySource.cancel()",
130: "Error when cancelling retrieve", e);
131: }
132:
133: }
134:
135: public boolean isCancelled() {
136: return !keepRunning && !regularStop;
137: }
138:
139: public MessageBuffer getMessages() {
140: return null;
141: }
142:
143: public void setAbortOnError(boolean flag) {
144: this .abortOnError = flag;
145: }
146:
147: public void setCheckDependencies(boolean flag) {
148: // not supported
149: }
150:
151: public void setErrorHandler(JobErrorHandler handler) {
152: }
153: }
|