0001: /*
0002: * DataImporter.java
0003: *
0004: * This file is part of SQL Workbench/J, http://www.sql-workbench.net
0005: *
0006: * Copyright 2002-2008, Thomas Kellerer
0007: * No part of this code maybe reused without the permission of the author
0008: *
0009: * To contact the author please send an email to: support@sql-workbench.net
0010: *
0011: */
0012: package workbench.db.importer;
0013:
0014: import java.io.BufferedInputStream;
0015: import java.io.ByteArrayInputStream;
0016: import java.io.Closeable;
0017: import java.io.File;
0018: import java.io.FileInputStream;
0019: import java.io.IOException;
0020: import java.io.InputStream;
0021: import java.sql.Blob;
0022: import java.sql.PreparedStatement;
0023: import java.sql.SQLException;
0024: import java.sql.Statement;
0025: import java.util.Arrays;
0026: import java.util.List;
0027: import workbench.db.ColumnIdentifier;
0028: import workbench.db.DbMetadata;
0029: import workbench.db.TableCreator;
0030: import workbench.db.TableIdentifier;
0031: import workbench.db.WbConnection;
0032: import workbench.interfaces.Committer;
0033: import workbench.interfaces.ProgressReporter;
0034: import workbench.util.ExceptionUtil;
0035: import workbench.interfaces.Interruptable;
0036: import workbench.log.LogMgr;
0037: import workbench.resource.ResourceMgr;
0038: import workbench.storage.RowActionMonitor;
0039: import workbench.util.FileUtil;
0040: import workbench.util.SqlUtil;
0041: import workbench.util.StringUtil;
0042: import java.io.Reader;
0043: import java.sql.Clob;
0044: import java.sql.Savepoint;
0045: import java.sql.Types;
0046: import java.util.LinkedList;
0047: import workbench.interfaces.BatchCommitter;
0048: import workbench.interfaces.ImportFileParser;
0049: import workbench.util.EncodingUtil;
0050: import workbench.util.MessageBuffer;
0051: import workbench.util.WbThread;
0052:
0053: /**
0054: * Import data that is provided from a {@link RowDataProducer} into
0055: * a table in the database.
0056: *
0057: * @see workbench.sql.wbcommands.WbImport
0058: * @see workbench.sql.wbcommands.WbCopy
0059: * @see workbench.db.datacopy.DataCopier
0060: *
0061: * @author support@sql-workbench.net
0062: */
0063: public class DataImporter implements Interruptable, RowDataReceiver,
0064: ProgressReporter, BatchCommitter {
0065: public static final int MODE_INSERT = 0;
0066: public static final int MODE_UPDATE = 1;
0067: public static final int MODE_INSERT_UPDATE = 2;
0068: public static final int MODE_UPDATE_INSERT = 3;
0069:
0070: private WbConnection dbConn;
0071: private String insertSql;
0072: private String updateSql;
0073:
0074: private RowDataProducer source;
0075: private PreparedStatement insertStatement;
0076: private PreparedStatement updateStatement;
0077:
0078: private TableIdentifier targetTable = null;
0079:
0080: private int commitEvery = 0;
0081:
0082: private boolean deleteTarget = false;
0083: private boolean createTarget = false;
0084: private boolean continueOnError = true;
0085:
0086: private long totalRows = 0;
0087: private long updatedRows = 0;
0088: private long insertedRows = 0;
0089: private long currentImportRow = 0;
0090: private int mode = MODE_INSERT;
0091: private boolean useBatch = false;
0092: private int batchSize = -1;
0093: private boolean supportsBatch = false;
0094: private boolean canCommitInBatch = true;
0095: private boolean commitBatch = false;
0096:
0097: private boolean hasErrors = false;
0098: private boolean hasWarnings = false;
0099: private int reportInterval = 1;
0100: private MessageBuffer messages;
0101: private String targetSchema;
0102:
0103: private int colCount;
0104: private boolean useTruncate = false;
0105: private int totalTables = -1;
0106: private int currentTable = -1;
0107: private boolean transactionControl = true;
0108: private boolean useSetNull = false;
0109:
0110: // this array will map the columns for updating the target table
0111: // the index into this array will be the index
0112: // from the row data array supplied by the producer.
0113: // (which should be the same order as the columns in targetColumns)
0114: // the value of that index position is the index
0115: // for the setXXX() method for the prepared statement
0116: // to update the table
0117: private int[] columnMap = null;
0118:
0119: private ColumnIdentifier[] targetColumns;
0120: private List<ColumnIdentifier> keyColumns;
0121:
0122: // A map that stores constant values for the import.
0123: // e.g. for columns not part of the input file.
0124: private ConstantColumnValues columnConstants;
0125:
0126: private RowActionMonitor progressMonitor;
0127: private boolean isRunning = false;
0128: private ImportFileParser parser;
0129:
0130: // Use for partial imports
0131: private long startRow = 0;
0132: private long endRow = Long.MAX_VALUE;
0133: private boolean partialImportEnded = false;
0134:
0135: // Additional WHERE clause for UPDATE statements
0136: private String whereClauseForUpdate;
0137: private BadfileWriter badWriter;
0138: private String badfileName;
0139:
0140: private boolean useSavepoint;
0141: private Savepoint insertSavepoint;
0142: private Savepoint updateSavepoint;
0143:
0144: private boolean checkRealClobLength = false;
0145: private boolean isOracle = false;
0146: private boolean multiTable = false;
0147: private List<Closeable> batchStreams;
0148: private boolean batchRunning = false;
0149: private TableStatements tableStatements;
0150:
0151: public DataImporter() {
0152: this .messages = new MessageBuffer();
0153: }
0154:
0155: public void setConnection(WbConnection aConn) {
0156: this .dbConn = aConn;
0157: if (dbConn == null)
0158: return;
0159: this .supportsBatch = this .dbConn.getMetadata()
0160: .supportsBatchUpdates();
0161: this .useBatch = this .useBatch && supportsBatch;
0162: this .useSavepoint = this .dbConn.getDbSettings()
0163: .useSavepointForImport();
0164: this .useSavepoint = this .useSavepoint
0165: && !this .dbConn.getAutoCommit();
0166: if (useSavepoint && !this .dbConn.supportsSavepoints()) {
0167: LogMgr
0168: .logWarning(
0169: "DataImporter.setConnection",
0170: "A savepoint should be used for each statement but the driver does not support savepoints!");
0171: this .useSavepoint = false;
0172: }
0173: this .checkRealClobLength = this .dbConn.getDbSettings()
0174: .needsExactClobLength();
0175: this .isOracle = this .dbConn.getMetadata().isOracle();
0176: this .useSetNull = this .dbConn.getDbSettings().useSetNull();
0177: }
0178:
0179: public void setTransactionControl(boolean flag) {
0180: this .transactionControl = flag;
0181: }
0182:
0183: public void setRowActionMonitor(RowActionMonitor rowMonitor) {
0184: this .progressMonitor = rowMonitor;
0185: if (this .progressMonitor != null) {
0186: this .progressMonitor
0187: .setMonitorType(RowActionMonitor.MONITOR_INSERT);
0188: }
0189: }
0190:
0191: public void setProducer(RowDataProducer producer) {
0192: this .source = producer;
0193: this .source.setReceiver(this );
0194: this .source.setAbortOnError(!this .continueOnError);
0195: if (producer instanceof ImportFileParser) {
0196: this .parser = (ImportFileParser) producer;
0197: }
0198: }
0199:
0200: /**
0201: * Define statements that should be executed before an import
0202: * for a table starts and after the last record has been inserted.
0203: *
0204: * @param stmt the statement definitions. May be null
0205: */
0206: public void setPerTableStatements(TableStatements stmt) {
0207: if (stmt != null && stmt.hasStatements()) {
0208: this .tableStatements = stmt;
0209: } else {
0210: this .tableStatements = null;
0211: }
0212: }
0213:
0214: public void beginMultiTable() {
0215: this .multiTable = true;
0216: }
0217:
0218: public void endMultiTable() {
0219: this .multiTable = false;
0220: if (this .progressMonitor != null)
0221: this .progressMonitor.jobFinished();
0222: }
0223:
0224: public void setStartRow(long row) {
0225: if (row >= 0)
0226: this .startRow = row;
0227: else
0228: this .startRow = 0;
0229: }
0230:
0231: public void setEndRow(long row) {
0232: if (row >= 0)
0233: this .endRow = row;
0234: else
0235: this .endRow = Long.MAX_VALUE;
0236: }
0237:
0238: public void setCommitBatch(boolean flag) {
0239: this .commitBatch = flag;
0240: if (flag) {
0241: this .commitEvery = 0;
0242: }
0243: }
0244:
0245: /**
0246: * Do not commit any changes after finishing the import
0247: */
0248: public void commitNothing() {
0249: this .commitBatch = false;
0250: this .commitEvery = Committer.NO_COMMIT_FLAG;
0251: }
0252:
0253: public RowDataProducer getProducer() {
0254: return this .source;
0255: }
0256:
0257: /**
0258: * Set the commit interval.
0259: * When this parameter is set, commitBatch is set to false.
0260: *
0261: * @param aCount the interval in which commits should be sent
0262: */
0263: public void setCommitEvery(int aCount) {
0264: if (aCount > 0 || aCount == Committer.NO_COMMIT_FLAG) {
0265: this .commitBatch = false;
0266: }
0267: this .commitEvery = aCount;
0268: }
0269:
0270: public int getCommitEvery() {
0271: return this .commitEvery;
0272: }
0273:
0274: public boolean getContinueOnError() {
0275: return this .continueOnError;
0276: }
0277:
0278: public void setContinueOnError(boolean flag) {
0279: this .continueOnError = flag;
0280: }
0281:
0282: public boolean getDeleteTarget() {
0283: return deleteTarget;
0284: }
0285:
0286: public void setBatchSize(int size) {
0287: this .batchSize = size;
0288: }
0289:
0290: public void setBadfileName(String fname) {
0291: this .badfileName = fname;
0292: }
0293:
0294: public void setWhereClauseForUpdate(String clause) {
0295: if (StringUtil.isEmptyString(clause)) {
0296: this .whereClauseForUpdate = null;
0297: } else {
0298: this .whereClauseForUpdate = clause;
0299: }
0300: }
0301:
0302: /**
0303: * Controls creation of target table for imports where the
0304: * producer can retrieve a full table definition (i.e. XML files
0305: * created with SQL Workbench)
0306: *
0307: * @see #createTarget()
0308: * @see #setTargetTable(workbench.db.TableIdentifier, workbench.db.ColumnIdentifier[])
0309: */
0310: public void setCreateTarget(boolean flag) {
0311: this .createTarget = flag;
0312: }
0313:
0314: /**
0315: * Controls deletion of the target table.
0316: */
0317: public void setDeleteTarget(boolean deleteTarget) {
0318: this .deleteTarget = deleteTarget;
0319: }
0320:
0321: /**
0322: * Use batch updates if the driver supports this
0323: */
0324: public void setUseBatch(boolean flag) {
0325: if (this .isModeInsertUpdate() || this .isModeUpdateInsert())
0326: return;
0327:
0328: if (flag && !this .supportsBatch) {
0329: LogMgr
0330: .logWarning(
0331: "DataImporter.setUseBatch()",
0332: "JDBC driver does not support batch updates. Ignoring request to use batch updates");
0333: this .messages.append(ResourceMgr
0334: .getString("MsgJDBCDriverNoBatch")
0335: + "\n");
0336: }
0337:
0338: if (this .dbConn != null) {
0339: this .useBatch = flag && this .supportsBatch;
0340: } else {
0341: // we cannot yet decide if the driver supports batch updates.
0342: // this will be checked if the connection is set
0343: this .useBatch = flag;
0344: }
0345: }
0346:
0347: public boolean getUseBatch() {
0348: return this .useBatch;
0349: }
0350:
0351: public void setModeInsert() {
0352: this .mode = MODE_INSERT;
0353: }
0354:
0355: public void setModeUpdate() {
0356: this .mode = MODE_UPDATE;
0357: }
0358:
0359: public void setModeInsertUpdate() {
0360: this .mode = MODE_INSERT_UPDATE;
0361: this .useBatch = false;
0362: }
0363:
0364: public void setModeUpdateInsert() {
0365: this .mode = MODE_UPDATE_INSERT;
0366: this .useBatch = false;
0367: }
0368:
0369: public boolean isModeInsert() {
0370: return (this .mode == MODE_INSERT);
0371: }
0372:
0373: public boolean isModeUpdate() {
0374: return (this .mode == MODE_UPDATE);
0375: }
0376:
0377: public boolean isModeInsertUpdate() {
0378: return (this .mode == MODE_INSERT_UPDATE);
0379: }
0380:
0381: public boolean isModeUpdateInsert() {
0382: return (this .mode == MODE_UPDATE_INSERT);
0383: }
0384:
0385: public static int estimateReportIntervalFromFileSize(File file) {
0386: try {
0387: long records = FileUtil.estimateRecords(file, 10);
0388: if (records < 100) {
0389: return 1;
0390: } else if (records < 10000) {
0391: return 10;
0392: } else if (records < 250000) {
0393: return 100;
0394: } else {
0395: return 1000;
0396: }
0397: } catch (Exception e) {
0398: LogMgr
0399: .logError(
0400: "DataImporter.estimateReportIntervalFromFileSize()",
0401: "Error when checking input file", e);
0402: return 0;
0403: }
0404: }
0405:
0406: public void setMode(int mode) {
0407: if (mode == MODE_INSERT)
0408: this .setModeInsert();
0409: else if (mode == MODE_UPDATE)
0410: this .setModeUpdate();
0411: else if (mode == MODE_INSERT_UPDATE)
0412: this .setModeInsertUpdate();
0413: else if (mode == MODE_UPDATE_INSERT)
0414: this .setModeUpdateInsert();
0415: }
0416:
0417: /**
0418: * Return the numer mode value based on keywords.
0419: *
0420: * Valid mode definitions are:
0421: * <ul>
0422: * <li>insert</li>
0423: * <li>update</li>
0424: * <li>insert,update</li>
0425: * <li>update,insert</li>
0426: * </ul>
0427: * The mode string is not case sensitive (INSERT is the same as insert)
0428: * @return -1 if the value is not valid
0429: *
0430: * @see #getModeValue(String)
0431: * @see #MODE_INSERT
0432: * @see #MODE_UPDATE
0433: * @see #MODE_INSERT_UPDATE
0434: * @see #MODE_UPDATE_INSERT
0435: */
0436: public static int getModeValue(String mode) {
0437: if (mode == null)
0438: return -1;
0439: mode = mode.trim().toLowerCase();
0440: if (mode.indexOf(',') == -1) {
0441: // only one keyword supplied
0442: if ("insert".equals(mode)) {
0443: return MODE_INSERT;
0444: } else if ("update".equals(mode)) {
0445: return MODE_UPDATE;
0446: } else {
0447: return -1;
0448: }
0449: } else {
0450: List l = StringUtil.stringToList(mode, ",");
0451: String first = (String) l.get(0);
0452: String second = (String) l.get(1);
0453: if ("insert".equals(first) && "update".equals(second)) {
0454: return MODE_INSERT_UPDATE;
0455: } else if ("update".equals(first)
0456: && "insert".equals(second)) {
0457: return MODE_UPDATE_INSERT;
0458: } else {
0459: return -1;
0460: }
0461: }
0462:
0463: }
0464:
0465: /**
0466: * Define the mode by supplying keywords.
0467: * @return true if the passed string is valid, false otherwise
0468: * @see #getModeValue(String)
0469: */
0470: public boolean setMode(String mode) {
0471: int modevalue = getModeValue(mode);
0472: if (modevalue == -1)
0473: return false;
0474: setMode(modevalue);
0475: return true;
0476: }
0477:
0478: /**
0479: * Define column constants for the import.
0480: * It is expected that the value object is already converted to the correct
0481: * class. DataImporter will not convert the passed values in any way.
0482: */
0483: public void setConstantColumnValues(
0484: ConstantColumnValues constantValues) {
0485: this .columnConstants = null;
0486: if (constantValues != null
0487: && constantValues.getColumnCount() > 0) {
0488: this .columnConstants = constantValues;
0489: }
0490: }
0491:
0492: /**
0493: * Define the key columns by supplying a comma separated
0494: * list of column names
0495: */
0496: public void setKeyColumns(String aColumnList) {
0497: List cols = StringUtil.stringToList(aColumnList, ",");
0498: int count = cols.size();
0499: this .keyColumns = new LinkedList<ColumnIdentifier>();
0500: for (int i = 0; i < count; i++) {
0501: ColumnIdentifier col = new ColumnIdentifier((String) cols
0502: .get(i));
0503: keyColumns.add(col);
0504: }
0505: }
0506:
0507: /**
0508: * Set the key columns for the target table to be used
0509: * for update mode.
0510: * The list has to contain objects of type {@link workbench.db.ColumnIdentifier}
0511: */
0512: public void setKeyColumns(List<ColumnIdentifier> cols) {
0513: this .keyColumns = cols;
0514: }
0515:
0516: private boolean hasKeyColumns() {
0517: return (this .keyColumns != null && keyColumns.size() > 0);
0518: }
0519:
0520: public void startBackgroundImport() {
0521: if (this .source == null)
0522: return;
0523: Thread t = new WbThread("WbImport Thread") {
0524: public void run() {
0525: try {
0526: startImport();
0527: } catch (Throwable th) {
0528: }
0529: }
0530: };
0531: t.setPriority(Thread.MIN_PRIORITY);
0532: t.start();
0533: }
0534:
0535: /**
0536: * Start the import
0537: */
0538: public void startImport() throws IOException, SQLException,
0539: Exception {
0540: if (this .source == null)
0541: return;
0542: this .isRunning = true;
0543: this .canCommitInBatch = true;
0544: this .batchRunning = false;
0545:
0546: // When using UPDATE/INSERT or INSERT/UPDATE
0547: // we cannot use batch mode as we immediately need
0548: // the result of the first statement to decide
0549: // whether we have to send another one
0550: if (this .useBatch
0551: && (this .isModeInsertUpdate() || this
0552: .isModeUpdateInsert())) {
0553: this .useBatch = false;
0554: this .messages.append(ResourceMgr
0555: .getString("ErrImportNoBatchMode"));
0556: }
0557:
0558: try {
0559: this .source.start();
0560: } catch (CycleErrorException e) {
0561: this .hasErrors = true;
0562: messages.append(ResourceMgr.getString("ErrImpCycle"));
0563: messages.append(" (" + e.getRootTable() + ")");
0564: this .messages.append(this .source.getMessages());
0565: throw e;
0566: } catch (Exception e) {
0567: this .hasErrors = true;
0568: this .messages.append(this .source.getMessages());
0569: throw e;
0570: }
0571: }
0572:
0573: public static boolean isDeleteTableAllowed(int mode) {
0574: return mode == MODE_INSERT;
0575: }
0576:
0577: /**
0578: * Deletes the target table by issuing a DELETE FROM ...
0579: */
0580: private void deleteTarget() throws SQLException {
0581: if (this .targetTable == null)
0582: return;
0583: String deleteSql = null;
0584:
0585: if (!this .isModeInsert()) {
0586: LogMgr
0587: .logWarning("DataImporter.deleteTarget()",
0588: "Target table will not be deleted because import mode is not set to 'insert'");
0589: this .messages.append(ResourceMgr
0590: .getString("ErrImpNoDeleteUpd"));
0591: this .messages.appendNewLine();
0592: return;
0593: }
0594:
0595: if (this .useTruncate) {
0596: deleteSql = "TRUNCATE TABLE "
0597: + this .targetTable.getTableExpression(this .dbConn);
0598: } else {
0599: deleteSql = "DELETE FROM "
0600: + this .targetTable.getTableExpression(this .dbConn);
0601: }
0602: Statement stmt = this .dbConn.createStatement();
0603: LogMgr.logInfo("DataImporter.deleteTarget()", "Executing: ["
0604: + deleteSql + "] to delete target table...");
0605: int rows = stmt.executeUpdate(deleteSql);
0606: if (this .useTruncate) {
0607: String msg = ResourceMgr.getString(
0608: "MsgImportTableTruncated").replaceAll("%table%",
0609: this .targetTable.getTableExpression(this .dbConn));
0610: this .messages.append(msg);
0611: this .messages.appendNewLine();
0612: } else {
0613: this .messages.append(rows + " "
0614: + ResourceMgr.getString("MsgImporterRowsDeleted")
0615: + " "
0616: + this .targetTable.getTableExpression(this .dbConn)
0617: + "\n");
0618: }
0619: }
0620:
0621: private void createTarget() throws SQLException {
0622: TableCreator creator = new TableCreator(this .dbConn,
0623: this .targetTable, Arrays.asList(this .targetColumns));
0624: creator.useDbmsDataType(true);
0625: creator.createTable();
0626: String table = creator.getTable().getTableName();
0627: String msg = StringUtil
0628: .replace(ResourceMgr
0629: .getString("MsgImporterTableCreated"),
0630: "%table%", table);
0631: this .messages.append(msg);
0632: }
0633:
0634: public void setUseTruncate(boolean flag) {
0635: this .useTruncate = flag;
0636: }
0637:
0638: public boolean isRunning() {
0639: return this .isRunning;
0640: }
0641:
0642: public boolean isSuccess() {
0643: return !hasErrors;
0644: }
0645:
0646: public boolean hasWarnings() {
0647: return this .hasWarnings;
0648: }
0649:
0650: public long getAffectedRows() {
0651: return this .totalRows;
0652: }
0653:
0654: public long getInsertedRows() {
0655: return this .insertedRows;
0656: }
0657:
0658: public long getUpdatedRows() {
0659: return this .updatedRows;
0660: }
0661:
0662: /**
0663: * This method is called if cancelExecution() is called
0664: * to check if the user should confirm the cancelling of the import
0665: */
0666: public boolean confirmCancel() {
0667: return true;
0668: }
0669:
0670: public void cancelExecution() {
0671: this .isRunning = false;
0672: if (this .batchRunning) {
0673: try {
0674: if (this .insertStatement != null)
0675: this .insertStatement.cancel();
0676: } catch (Throwable th) {
0677:
0678: }
0679: }
0680: this .source.cancel();
0681: this .messages.append(ResourceMgr
0682: .getString("MsgImportCancelled")
0683: + "\n");
0684: }
0685:
0686: public void setTableCount(int total) {
0687: this .totalTables = total;
0688: }
0689:
0690: public void setCurrentTable(int current) {
0691: this .currentTable = current;
0692: }
0693:
0694: private int errorCount = 0;
0695: private boolean errorLimitAdded = false;
0696:
0697: private void addError(String msg) {
0698: if (errorCount < 5000) {
0699: this .messages.append(msg);
0700: } else {
0701: if (!errorLimitAdded) {
0702: messages.appendNewLine();
0703: messages.append(ResourceMgr
0704: .getString("MsgImpTooManyError"));
0705: messages.appendNewLine();
0706: errorLimitAdded = true;
0707: }
0708: }
0709: }
0710:
0711: public void recordRejected(String record) {
0712: if (badWriter != null && record != null) {
0713: badWriter.recordRejected(record);
0714: }
0715: }
0716:
0717: public boolean shouldProcessNextRow() {
0718: if (currentImportRow + 1 < startRow)
0719: return false;
0720: if (currentImportRow + 1 > endRow)
0721: return false;
0722: return true;
0723: }
0724:
0725: public void nextRowSkipped() {
0726: this .currentImportRow++;
0727: }
0728:
0729: /**
0730: * Callback function for RowDataProducer. The order in the data array
0731: * has to be the same as initially passed in the setTargetTable() method.
0732: */
0733: public void processRow(Object[] row) throws SQLException {
0734: if (row == null)
0735: return;
0736: if (row.length != this .colCount) {
0737: throw new SQLException(
0738: "Invalid row data received. Size of row array does not match column count");
0739: }
0740:
0741: currentImportRow++;
0742: if (currentImportRow < startRow)
0743: return;
0744: if (currentImportRow > endRow) {
0745: LogMgr.logInfo("DataImporter.processRow()",
0746: "Import limit (" + this .endRow
0747: + ") reached. Stopping import");
0748: String msg = ResourceMgr.getString("MsgPartialImportEnded");
0749: msg = StringUtil.replace(msg, "%rowlimit%", Long
0750: .toString(endRow));
0751: this .messages.append(msg);
0752: this .messages.appendNewLine();
0753: this .source.stop();
0754: return;
0755: }
0756:
0757: if (this .progressMonitor != null
0758: && this .reportInterval > 0
0759: && (currentImportRow == 1 || currentImportRow
0760: % reportInterval == 0)) {
0761: if (this .totalTables > 0) {
0762: StringBuilder msg = new StringBuilder(this .targetTable
0763: .getTableName().length() + 20);
0764: msg.append(this .targetTable.getTableName());
0765: msg.append(" [");
0766: msg.append(this .currentTable);
0767: msg.append('/');
0768: msg.append(this .totalTables);
0769: msg.append(']');
0770: progressMonitor.setCurrentObject(msg.toString(),
0771: currentImportRow, -1);
0772: } else {
0773: progressMonitor.setCurrentObject(this .targetTable
0774: .getTableName(), currentImportRow, -1);
0775: }
0776: }
0777:
0778: int rows = 0;
0779: try {
0780: switch (this .mode) {
0781: case MODE_INSERT:
0782: rows = this .insertRow(row, useSavepoint
0783: && continueOnError);
0784: break;
0785:
0786: case MODE_INSERT_UPDATE:
0787: boolean inserted = false;
0788: // in case of an Exception we are retrying the row
0789: // with an update. Theoretically the only expected
0790: // exception should indicate a primary key violation,
0791: // but as we don't analyze the exception, we will
0792: // try the update, for any exception. If the exception
0793: // was not a key violation, the update will most probably
0794: // fail as well.
0795: try {
0796: rows = this .insertRow(row, useSavepoint);
0797: inserted = true;
0798: } catch (Exception e) {
0799: //LogMgr.logDebug("DataImporter.processRow()", "Error inserting row, trying update");
0800: inserted = false;
0801: }
0802:
0803: if (!inserted) {
0804: rows = this .updateRow(row, useSavepoint
0805: && continueOnError);
0806: }
0807: break;
0808:
0809: case MODE_UPDATE_INSERT:
0810: // an exception is not expected when updating the row
0811: // if the row does not exist, the update counter should be
0812: // zero. If the update violates any constraints, then the
0813: // INSERT will fail as well, so any exception thrown, indicates
0814: // an error with this row, so we will not proceed with the insert
0815: rows = this .updateRow(row, useSavepoint
0816: && continueOnError);
0817: if (rows <= 0)
0818: rows = this .insertRow(row, useSavepoint
0819: && continueOnError);
0820: break;
0821:
0822: case MODE_UPDATE:
0823: rows = this .updateRow(row, useSavepoint
0824: && continueOnError);
0825: break;
0826: }
0827: this .totalRows += rows;
0828: } catch (OutOfMemoryError oome) {
0829: this .hasErrors = true;
0830: closeStatements();
0831: this .messages.clear();
0832: System.gc();
0833: this .messages.append(ResourceMgr
0834: .getString("MsgOutOfMemoryGeneric"));
0835: this .messages.appendNewLine();
0836: if (this .batchSize > 0) {
0837: LogMgr
0838: .logError(
0839: "DataImporter.processRow()",
0840: "Not enough memory to hold statement batch! Use the -batchSize parameter to reduce the batch size!",
0841: null);
0842: this .messages.append(ResourceMgr
0843: .getString("MsgOutOfMemoryJdbcBatch"));
0844: this .messages.appendNewLine();
0845: this .messages.appendNewLine();
0846: } else {
0847: LogMgr.logError("DataImporter.processRow()",
0848: "Not enough memory to run this import!", null);
0849: }
0850: throw new SQLException("Not enough memory!");
0851: } catch (SQLException e) {
0852: this .hasErrors = true;
0853: LogMgr.logError("DataImporter.processRow()",
0854: "Error importing row " + currentImportRow + ": "
0855: + ExceptionUtil.getDisplay(e), null);
0856: if (this .badWriter == null) {
0857: String value = this .getValueDisplay(row);
0858: this .addError(ResourceMgr.getString("ErrImportingRow")
0859: + " " + currentImportRow + "\n");
0860: this .addError(ResourceMgr
0861: .getString("ErrImportErrorMsg")
0862: + " " + e.getMessage() + "\n");
0863: this .addError(ResourceMgr.getString("ErrImportValues")
0864: + " " + value + "\n\n");
0865: if (errorLimitAdded) {
0866: LogMgr.logError("DataImporter.processRow()",
0867: "Values: " + value, null);
0868: }
0869: }
0870: errorCount++;
0871: if (!this .continueOnError)
0872: throw e;
0873: String rec = this .source.getLastRecord();
0874: if (rec == null) {
0875: rec = this .getValueDisplay(row);
0876: }
0877: recordRejected(rec);
0878: }
0879:
0880: if (this .useBatch && this .batchSize > 0
0881: && ((this .totalRows % this .batchSize) == 0)) {
0882: try {
0883: this .executeBatch();
0884: } catch (OutOfMemoryError oome) {
0885: this .hasErrors = true;
0886: closeStatements();
0887: this .messages = new MessageBuffer();
0888: System.gc();
0889: this .messages.append(ResourceMgr
0890: .getString("MsgOutOfMemoryGeneric"));
0891: throw new SQLException("Not enough memory!");
0892: } catch (SQLException e) {
0893: this .hasErrors = true;
0894: LogMgr.logError("DataImporter.processRow()",
0895: "Error executing batch after "
0896: + currentImportRow + " rows", e);
0897: this .addError(ResourceMgr
0898: .getString("ErrImportExecuteBatchQueue")
0899: + "\n");
0900: this .addError(e.getMessage());
0901: if (!this .continueOnError)
0902: throw e;
0903: }
0904: }
0905:
0906: if (this .commitEvery > 0
0907: && ((this .totalRows % this .commitEvery) == 0)
0908: && !this .dbConn.getAutoCommit()) {
0909: try {
0910: // Oracle seems to have a problem with adding a different SQL statement
0911: // to the batch of a prepared Statement (works fine with PostgreSQL)
0912: if (this .useBatch) {
0913: if (canCommitInBatch) {
0914: PreparedStatement stmt = null;
0915: if (this .isModeInsert()) {
0916: stmt = this .insertStatement;
0917: } else if (this .isModeUpdate()) {
0918: stmt = this .updateStatement;
0919: }
0920:
0921: try {
0922: if (stmt != null)
0923: stmt.addBatch("COMMIT");
0924: } catch (Exception e) {
0925: LogMgr
0926: .logWarning(
0927: "DataImporter.processRow()",
0928: "Error when adding COMMIT to batch. This does not seem to be supported by the server: "
0929: + ExceptionUtil
0930: .getDisplay(e));
0931: String msg = ResourceMgr.getString(
0932: "ErrCommitInBatch").replaceAll(
0933: "%error%", e.getMessage())
0934: + "\n";
0935: this .messages.append(msg);
0936: this .hasWarnings = true;
0937: this .canCommitInBatch = false;
0938: }
0939: }
0940: } else {
0941: this .dbConn.commit();
0942: }
0943: } catch (SQLException e) {
0944: String error = ExceptionUtil.getDisplay(e);
0945: this .messages.append(error);
0946: this .messages.appendNewLine();
0947: this .hasErrors = true;
0948: if (!continueOnError)
0949: throw e;
0950: }
0951: }
0952: }
0953:
0954: private void setUpdateSavepoint() {
0955: try {
0956: this .updateSavepoint = this .dbConn.getSqlConnection()
0957: .setSavepoint();
0958: } catch (Exception e) {
0959: LogMgr.logError("DataImporter",
0960: "Could not create pre-update Savepoint", e);
0961: }
0962: }
0963:
0964: private void setInsertSavepoint() {
0965: try {
0966: this .insertSavepoint = this .dbConn.getSqlConnection()
0967: .setSavepoint();
0968: } catch (Exception e) {
0969: LogMgr.logError("DataImporter",
0970: "Could not set pre-insert Savepoint", e);
0971: }
0972: }
0973:
0974: private void rollbackUpdate() {
0975: rollbackToSavepoint(updateSavepoint);
0976: updateSavepoint = null;
0977: }
0978:
0979: private void rollbackInsert() {
0980: rollbackToSavepoint(insertSavepoint);
0981: insertSavepoint = null;
0982: }
0983:
0984: private void rollbackToSavepoint(Savepoint savepoint) {
0985: if (savepoint == null)
0986: return;
0987: try {
0988: this .dbConn.getSqlConnection().rollback(savepoint);
0989: } catch (Exception e) {
0990: LogMgr.logError("DataImporter.rollbackToSavePoint()",
0991: "Error when performing rollback to savepoint", e);
0992: }
0993: }
0994:
0995: private void releaseInsertSavepoint() {
0996: releaseSavepoint(insertSavepoint);
0997: insertSavepoint = null;
0998: }
0999:
1000: private void releaseUpdateSavepoint() {
1001: releaseSavepoint(updateSavepoint);
1002: updateSavepoint = null;
1003: }
1004:
1005: private void releaseSavepoint(Savepoint savepoint) {
1006: if (savepoint == null)
1007: return;
1008: try {
1009: this .dbConn.getSqlConnection().releaseSavepoint(savepoint);
1010: } catch (Throwable th) {
1011: LogMgr.logError("DataImporter.processrow()",
1012: "Error when releasing savepoint", th);
1013: }
1014: }
1015:
1016: private String getValueDisplay(Object[] row) {
1017: int count = row.length;
1018: StringBuilder values = new StringBuilder(count * 20);
1019: values.append('[');
1020:
1021: for (int i = 0; i < count; i++) {
1022: if (i > 0)
1023: values.append(',');
1024: if (row[i] == null) {
1025: values.append("NULL");
1026: } else {
1027: values.append(row[i].toString());
1028: }
1029: }
1030: values.append(']');
1031: return values.toString();
1032: }
1033:
1034: /**
1035: * Insert a row of data into the target table.
1036: * This method relies on insertStatement correctly initialized with
1037: * all parameters at the correct location.
1038: */
1039: private int insertRow(Object[] row, boolean useSP)
1040: throws SQLException {
1041: try {
1042: if (useSP)
1043: setInsertSavepoint();
1044: int rows = processRowData(this .insertStatement, row,
1045: this .useBatch, false);
1046: if (!this .useBatch) {
1047: this .insertedRows += rows;
1048: }
1049: releaseInsertSavepoint();
1050: return rows;
1051: } catch (SQLException e) {
1052: if (useSP) {
1053: rollbackInsert();
1054: }
1055: throw e;
1056: }
1057: }
1058:
1059: /**
1060: * Update the data in the target table using the PreparedStatement
1061: * available in updateStatement
1062: */
1063: private int updateRow(Object[] row, boolean useSP)
1064: throws SQLException {
1065: try {
1066: if (useSP)
1067: setUpdateSavepoint();
1068: int rows = processRowData(this .updateStatement, row,
1069: this .useBatch, true);
1070: if (!this .useBatch) {
1071: this .updatedRows += rows;
1072: }
1073: releaseUpdateSavepoint();
1074: return rows;
1075: } catch (SQLException e) {
1076: if (useSP) {
1077: rollbackUpdate();
1078: }
1079: throw e;
1080: }
1081: }
1082:
1083: private int processRowData(PreparedStatement pstmt, Object[] row,
1084: boolean addBatch, boolean useColMap) throws SQLException {
1085: List<Closeable> streams = new LinkedList<Closeable>();
1086:
1087: for (int i = 0; i < row.length; i++) {
1088: int colIndex = i + 1;
1089: if (useColMap) {
1090: // The colIndex points to the correct location in the PreparedStatement
1091: // when using UPDATE with different column names
1092: colIndex = this .columnMap[i] + 1;
1093: }
1094:
1095: int targetSqlType = this .targetColumns[i].getDataType();
1096: String targetDbmsType = this .targetColumns[i].getDbmsType();
1097:
1098: if (row[i] == null) {
1099: if (useSetNull) {
1100: pstmt.setNull(colIndex, targetSqlType);
1101: } else {
1102: pstmt.setObject(colIndex, null);
1103: }
1104: } else if (SqlUtil.isClobType(targetSqlType)
1105: || "LONG".equals(targetDbmsType)
1106: || "CLOB".equals(targetDbmsType)) {
1107: Reader in = null;
1108: int size = -1;
1109:
1110: if (row[i] instanceof Clob) {
1111: Clob clob = (Clob) row[i];
1112: in = clob.getCharacterStream();
1113: streams.add(in);
1114: } else if (row[i] instanceof File) {
1115: ImportFileHandler handler = (this .parser != null ? parser
1116: .getFileHandler()
1117: : null);
1118: String encoding = (handler != null ? handler
1119: .getEncoding() : null);
1120: if (encoding == null) {
1121: encoding = (this .parser != null ? parser
1122: .getEncoding() : EncodingUtil
1123: .getDefaultEncoding());
1124: }
1125:
1126: File f = (File) row[i];
1127: try {
1128: if (handler != null) {
1129: in = EncodingUtil
1130: .createReader(handler
1131: .getAttachedFileStream(f),
1132: encoding);
1133:
1134: // Apache Derby needs the exact length in characters
1135: // which might not be the file size if a multi-byte encoding is used
1136: if (checkRealClobLength) {
1137: size = (int) handler
1138: .getCharacterLength(f);
1139: } else {
1140: size = (int) handler.getLength(f);
1141: }
1142: streams.add(in);
1143: } else {
1144: if (!f.isAbsolute()) {
1145: File sourcefile = new File(this .parser
1146: .getSourceFilename());
1147: f = new File(
1148: sourcefile.getParentFile(), f
1149: .getName());
1150: }
1151: in = EncodingUtil.createBufferedReader(f,
1152: encoding);
1153: streams.add(in);
1154:
1155: // Apache Derby needs the exact length in characters
1156: // which might not be the file size if a multi-byte encoding is used
1157: if (checkRealClobLength) {
1158: size = (int) FileUtil
1159: .getCharacterLength(f, encoding);
1160: } else {
1161: size = (int) f.length();
1162: }
1163: }
1164: } catch (IOException ex) {
1165: hasErrors = true;
1166: String msg = ResourceMgr.getFormattedString(
1167: "ErrFileNotAccessible", f
1168: .getAbsolutePath(), ex
1169: .getMessage());
1170: messages.append(msg);
1171: throw new SQLException(ex.getMessage());
1172: }
1173: } else {
1174: // this assumes that the JDBC driver will actually
1175: // implement the toString() for whatever object
1176: // it created when reading that column!
1177: String value = row[i].toString();
1178: in = null;
1179: pstmt.setObject(colIndex, value);
1180: }
1181:
1182: if (in != null) {
1183: // For Oracle, this will only work with Oracle 10g drivers.
1184: // Oracle 9i drivers do not implement the setCharacterStream()
1185: // and associated methods properly
1186: pstmt.setCharacterStream(colIndex, in, size);
1187: }
1188: } else if (SqlUtil.isBlobType(targetSqlType)
1189: || "BLOB".equals(targetDbmsType)) {
1190: InputStream in = null;
1191: int len = -1;
1192: if (row[i] instanceof File) {
1193: // When importing files created by SQL Workbench/J
1194: // blobs will be "passed" as File objects pointing to the external file
1195: ImportFileHandler handler = (this .parser != null ? parser
1196: .getFileHandler()
1197: : null);
1198: File f = (File) row[i];
1199: try {
1200: if (handler != null) {
1201: in = new BufferedInputStream(handler
1202: .getAttachedFileStream(f));
1203: len = (int) handler.getLength(f);
1204: } else {
1205: if (!f.isAbsolute()) {
1206: File sourcefile = new File(this .parser
1207: .getSourceFilename());
1208: f = new File(
1209: sourcefile.getParentFile(), f
1210: .getName());
1211: }
1212: in = new BufferedInputStream(
1213: new FileInputStream(f), 64 * 1024);
1214: len = (int) f.length();
1215: }
1216: } catch (IOException ex) {
1217: hasErrors = true;
1218: String msg = ResourceMgr.getFormattedString(
1219: "ErrFileNotAccessible", f
1220: .getAbsolutePath(), ex
1221: .getMessage());
1222: messages.append(msg);
1223: throw new SQLException(ex.getMessage());
1224: }
1225: streams.add(in);
1226: } else if (row[i] instanceof Blob) {
1227: Blob b = (Blob) row[i];
1228: in = b.getBinaryStream();
1229: streams.add(in);
1230: len = (int) b.length();
1231: } else if (row[i] instanceof byte[]) {
1232: byte[] buffer = (byte[]) row[i];
1233: in = new ByteArrayInputStream(buffer);
1234: len = buffer.length;
1235: }
1236:
1237: if (in != null && len > -1) {
1238: pstmt.setBinaryStream(colIndex, in, len);
1239: } else {
1240: pstmt.setNull(colIndex, Types.BLOB);
1241: this .messages
1242: .append(ResourceMgr.getFormattedString(
1243: "MsgBlobNotRead", i + 1));
1244: this .messages.appendNewLine();
1245: }
1246: } else {
1247: if (isOracle && targetSqlType == java.sql.Types.DATE
1248: && row[i] instanceof java.sql.Date) {
1249: java.sql.Timestamp ts = new java.sql.Timestamp(
1250: ((java.sql.Date) row[i]).getTime());
1251: pstmt.setTimestamp(colIndex, ts);
1252: } else {
1253: pstmt.setObject(colIndex, row[i]);
1254: }
1255: }
1256: }
1257:
1258: if (this .columnConstants != null
1259: && pstmt == this .insertStatement) {
1260: int count = this .columnConstants.getColumnCount();
1261: int colIndex = row.length + 1;
1262: for (int i = 0; i < count; i++) {
1263: if (!this .columnConstants.isFunctionCall(i)) {
1264: columnConstants.setParameter(pstmt, colIndex, i);
1265: colIndex++;
1266: }
1267: }
1268: }
1269:
1270: int rows = 0;
1271: if (addBatch) {
1272: pstmt.addBatch();
1273: if (this .batchStreams == null) {
1274: this .batchStreams = streams;
1275: } else {
1276: this .batchStreams.addAll(streams);
1277: }
1278:
1279: // let's assume the batch statement affects at least one row.
1280: // If this is not done, the rowcount will never be increased
1281: // in batchmode and thus each row will be committed even if
1282: // a different commit frequency is selected.
1283: // Thanks to Pascal for pointing this out!
1284: rows = 1;
1285: } else {
1286: try {
1287: rows = pstmt.executeUpdate();
1288: } finally {
1289: FileUtil.closeStreams(streams);
1290: }
1291: }
1292: return rows;
1293: }
1294:
1295: private void checkConstantValues() throws SQLException {
1296: if (this .columnConstants == null)
1297: return;
1298: for (ColumnIdentifier col : this .targetColumns) {
1299: if (this .columnConstants.removeColumn(col)) {
1300: String msg = ResourceMgr.getFormattedString(
1301: "MsgImporterConstIgnored", col.getColumnName());
1302: this .messages.append(msg);
1303: this .messages.appendNewLine();
1304: if (this .continueOnError) {
1305: LogMgr.logWarning(
1306: "DataImporter.checkConstanValues()", msg);
1307: } else {
1308: throw new SQLException(msg);
1309: }
1310: }
1311: }
1312: }
1313:
1314: /**
1315: * Callback function from the RowDataProducer
1316: */
1317: public void setTargetTable(TableIdentifier table,
1318: ColumnIdentifier[] columns) throws SQLException {
1319: // be prepared to import more then one table...
1320: if (this .isRunning && this .targetTable != null) {
1321: try {
1322: this .finishTable();
1323: this .totalRows = 0;
1324: this .currentImportRow = 0;
1325: this .updatedRows = 0;
1326: this .insertedRows = 0;
1327: } catch (SQLException e) {
1328: this .totalRows = -1;
1329: this .currentImportRow = -1;
1330: this .updatedRows = -1;
1331: this .insertedRows = -1;
1332: if (!this .continueOnError) {
1333: this .hasErrors = true;
1334: throw e;
1335: }
1336: }
1337: }
1338:
1339: this .errorCount = 0;
1340: this .errorLimitAdded = false;
1341:
1342: try {
1343: this .targetTable = table.createCopy();
1344: this .targetColumns = columns;
1345: this .colCount = this .targetColumns.length;
1346:
1347: if (this .parser != null) {
1348: String msg = ResourceMgr.getFormattedString(
1349: "MsgImportingFile", this .parser
1350: .getSourceFilename(), this .targetTable
1351: .getTableName());
1352: this .messages.append(msg);
1353: this .messages.appendNewLine();
1354: }
1355:
1356: if (this .createTarget) {
1357: try {
1358: this .createTarget();
1359: } catch (SQLException e) {
1360: String msg = ResourceMgr
1361: .getString("ErrImportTableNotCreated");
1362: msg = StringUtil.replace(msg, "%table%",
1363: this .targetTable
1364: .getTableExpression(this .dbConn));
1365: msg = StringUtil.replace(msg, "%error%",
1366: ExceptionUtil.getDisplay(e));
1367: this .messages.append(msg);
1368: this .messages.appendNewLine();
1369: LogMgr.logError("DataImporter.setTargetTable()",
1370: "Could not create target: "
1371: + this .targetTable, e);
1372: this .hasErrors = true;
1373: throw e;
1374: }
1375: }
1376:
1377: try {
1378: this .checkTable();
1379: } catch (SQLException e) {
1380: String msg = ResourceMgr.getFormattedString(
1381: "ErrImportTableNotFound", this .targetTable
1382: .getTableExpression());
1383: if (parser != null) {
1384: String s = ResourceMgr
1385: .getString("ErrImportFileNotProcessed");
1386: msg = msg
1387: + " "
1388: + StringUtil.replace(s, "%filename%",
1389: this .parser.getSourceFilename());
1390: }
1391: this .hasErrors = true;
1392: this .messages.append(msg);
1393: this .messages.appendNewLine();
1394: this .targetTable = null;
1395: throw e;
1396: }
1397:
1398: checkConstantValues();
1399:
1400: if (this .mode != MODE_UPDATE) {
1401: this .prepareInsertStatement();
1402: }
1403: if (this .mode != MODE_INSERT) {
1404: this .prepareUpdateStatement();
1405: }
1406:
1407: if (this .deleteTarget) {
1408: try {
1409: this .deleteTarget();
1410: } catch (SQLException e) {
1411: this .hasErrors = true;
1412: String msg = ResourceMgr
1413: .getString("ErrDeleteTableData");
1414: msg = msg.replaceAll("%table%", table.toString());
1415: msg = msg.replaceAll("%error%", ExceptionUtil
1416: .getDisplay(e));
1417: this .messages.append(msg);
1418: this .messages.appendNewLine();
1419:
1420: LogMgr.logError("DataImporter.setTargetTable()",
1421: "Could not delete contents of table "
1422: + this .targetTable, e);
1423: if (!this .continueOnError) {
1424: throw e;
1425: }
1426: }
1427: }
1428:
1429: this .currentImportRow = 0;
1430: this .totalRows = 0;
1431:
1432: if (this .reportInterval == 0
1433: && this .progressMonitor != null) {
1434: this .progressMonitor
1435: .setMonitorType(RowActionMonitor.MONITOR_PLAIN);
1436: this .progressMonitor.setCurrentObject(ResourceMgr
1437: .getString("MsgImportingTableData")
1438: + " "
1439: + this .targetTable
1440: + " ("
1441: + this .getModeString() + ")", -1, -1);
1442: }
1443: if (LogMgr.isInfoEnabled()) {
1444: LogMgr
1445: .logInfo("DataImporter.setTargetTable()",
1446: "Starting import for table "
1447: + this .targetTable
1448: .getTableExpression());
1449: }
1450:
1451: if (this .badfileName != null) {
1452: this .badWriter = new BadfileWriter(this .badfileName,
1453: this .targetTable, "UTF8");
1454: } else {
1455: this .badWriter = null;
1456: }
1457:
1458: if (this .tableStatements != null) {
1459: this .tableStatements.runPreTableStatement(dbConn,
1460: targetTable);
1461: }
1462: } catch (RuntimeException th) {
1463: this .hasErrors = true;
1464: LogMgr
1465: .logError("DataImporter.setTargetTable()",
1466: "Error when setting target table "
1467: + this .targetTable
1468: .getTableExpression(), th);
1469: throw th;
1470: }
1471: }
1472:
1473: private String getModeString() {
1474: if (this .isModeInsert())
1475: return "insert";
1476: if (this .isModeUpdate())
1477: return "update";
1478: if (this .isModeInsertUpdate())
1479: return "insert/update";
1480: if (this .isModeUpdateInsert())
1481: return "update/insert";
1482: return "";
1483: }
1484:
1485: private void checkTable() throws SQLException {
1486: if (this .dbConn == null)
1487: return;
1488: if (this .targetTable == null)
1489: return;
1490:
1491: DbMetadata meta = this .dbConn.getMetadata();
1492: boolean exists = meta.tableExists(this .targetTable);
1493: if (!exists) {
1494: throw new SQLException("Table "
1495: + this .targetTable.getTableExpression(this .dbConn)
1496: + " not found!");
1497: }
1498: }
1499:
1500: /**
1501: * Prepare the statement to be used for inserts.
1502: * targetTable and targetColumns have to be initialized before calling this!
1503: */
1504: private void prepareInsertStatement() throws SQLException {
1505: StringBuilder text = new StringBuilder(
1506: this .targetColumns.length * 50);
1507: StringBuilder parms = new StringBuilder(
1508: targetColumns.length * 20);
1509:
1510: String sql = dbConn.getDbSettings().getInsertForImport();
1511: if (!StringUtil.isEmptyString(sql)) {
1512: text.append(sql);
1513: text.append(' ');
1514: } else {
1515: text.append("INSERT INTO ");
1516: }
1517: text.append(targetTable.getTableExpression(this .dbConn));
1518: text.append(" (");
1519: for (int i = 0; i < this .colCount; i++) {
1520: if (i > 0) {
1521: text.append(',');
1522: parms.append(',');
1523: }
1524: text.append(this .targetColumns[i].getColumnName());
1525: parms.append('?');
1526: }
1527: if (this .columnConstants != null) {
1528: int cols = columnConstants.getColumnCount();
1529: for (int i = 0; i < cols; i++) {
1530: text.append(',');
1531: text.append(columnConstants.getColumn(i)
1532: .getColumnName());
1533: parms.append(',');
1534: if (columnConstants.isFunctionCall(i)) {
1535: parms.append(columnConstants.getFunctionLiteral(i));
1536: } else {
1537: parms.append('?');
1538: }
1539: }
1540: }
1541: text.append(") VALUES (");
1542: text.append(parms);
1543: text.append(')');
1544:
1545: try {
1546: this .insertSql = text.toString();
1547: this .insertStatement = this .dbConn.getSqlConnection()
1548: .prepareStatement(this .insertSql);
1549: LogMgr.logInfo("DataImporter.prepareInsertStatement()",
1550: "Statement for insert: " + this .insertSql);
1551: } catch (SQLException e) {
1552: LogMgr.logError("DataImporter.prepareInsertStatement()",
1553: "Error when preparing INSERT statement: "
1554: + this .insertSql, e);
1555: this .messages.append(ResourceMgr
1556: .getString("ErrImportInitTargetFailed"));
1557: this .messages.append(ExceptionUtil.getDisplay(e));
1558: this .insertStatement = null;
1559: this .hasErrors = true;
1560: throw e;
1561: }
1562: }
1563:
1564: /**
1565: * Prepare the statement to be used for updates
1566: * targetTable and targetColumns have to be initialized before calling this!
1567: */
1568: private void prepareUpdateStatement() throws SQLException {
1569: if (!this .hasKeyColumns()) {
1570: this .retrieveKeyColumns();
1571: if (!this .hasKeyColumns()) {
1572: this .messages.append(ResourceMgr
1573: .getString("ErrImportNoKeyForUpdate"));
1574: this .messages.appendNewLine();
1575: throw new SQLException(
1576: "No key columns defined for update mode");
1577: }
1578: }
1579:
1580: this .columnMap = new int[this .colCount];
1581: int pkIndex = this .colCount - this .keyColumns.size();
1582: int pkCount = 0;
1583: int colIndex = 0;
1584: StringBuilder sql = new StringBuilder(this .colCount * 20 + 80);
1585: StringBuilder where = new StringBuilder(
1586: this .keyColumns.size() * 10);
1587: sql.append("UPDATE ");
1588: sql.append(this .targetTable.getTableExpression(this .dbConn));
1589: sql.append(" SET ");
1590: where.append(" WHERE ");
1591: boolean pkAdded = false;
1592: for (int i = 0; i < this .colCount; i++) {
1593: ColumnIdentifier col = this .targetColumns[i];
1594: if (keyColumns.contains(col)) {
1595: this .columnMap[i] = pkIndex;
1596: if (pkAdded)
1597: where.append(" AND ");
1598: else
1599: pkAdded = true;
1600: where.append(col.getColumnName());
1601: where.append(" = ?");
1602: pkIndex++;
1603: pkCount++;
1604: } else {
1605: this .columnMap[i] = colIndex;
1606: if (colIndex > 0) {
1607: sql.append(", ");
1608: }
1609: sql.append(col.getColumnName());
1610: sql.append(" = ?");
1611: colIndex++;
1612: }
1613: }
1614: if (!pkAdded) {
1615: LogMgr
1616: .logError(
1617: "DataImporter.prepareUpdateStatement()",
1618: "No primary key columns defined! Update mode not available\n",
1619: null);
1620: this .messages.append(ResourceMgr
1621: .getString("ErrImportNoKeyForUpdate"));
1622: this .messages.appendNewLine();
1623: this .updateSql = null;
1624: this .updateStatement = null;
1625: throw new SQLException(
1626: "No key columns defined for update mode");
1627: }
1628: if (pkCount != this .keyColumns.size()) {
1629: LogMgr
1630: .logError(
1631: "DataImporter.prepareUpdateStatement()",
1632: "At least one of the supplied primary key columns was not found in the target table!\n",
1633: null);
1634: this .messages.append(ResourceMgr
1635: .getString("ErrImportUpdateKeyColumnNotFound")
1636: + "\n");
1637: this .updateSql = null;
1638: this .updateStatement = null;
1639: throw new SQLException(
1640: "Not enough key columns defined for update mode");
1641: }
1642:
1643: if (colIndex == 0) {
1644: LogMgr
1645: .logError(
1646: "DataImporter.prepareUpdateStatement()",
1647: "Only PK columns defined! Update mode is not available!",
1648: null);
1649: this .messages.append(ResourceMgr
1650: .getString("ErrImportOnlyKeyColumnsForUpdate"));
1651: this .updateSql = null;
1652: this .updateStatement = null;
1653: throw new SQLException(
1654: "Only key columns defined for update mode");
1655: }
1656:
1657: sql.append(where);
1658: if (!StringUtil.isEmptyString(this .whereClauseForUpdate)) {
1659: boolean addBracket = false;
1660: if (!this .whereClauseForUpdate.trim().toUpperCase()
1661: .startsWith("AND")
1662: && !this .whereClauseForUpdate.trim().toUpperCase()
1663: .startsWith("OR")) {
1664: sql.append(" AND (");
1665: addBracket = true;
1666: } else {
1667: sql.append(' ');
1668: }
1669: sql.append(this .whereClauseForUpdate.trim());
1670: if (addBracket)
1671: sql.append(")");
1672: }
1673:
1674: try {
1675: this .updateSql = sql.toString();
1676: this .updateStatement = this .dbConn.getSqlConnection()
1677: .prepareStatement(this .updateSql);
1678: LogMgr.logInfo("DataImporter.prepareUpdateStatement()",
1679: "Statement for update: " + this .updateSql);
1680: } catch (SQLException e) {
1681: LogMgr.logError("DataImporter.prepareUpdateStatement()",
1682: "Error when preparing UPDATE statement", e);
1683: this .messages.append(ResourceMgr
1684: .getString("ErrImportInitTargetFailed"));
1685: this .messages.append(ExceptionUtil.getDisplay(e));
1686: this .updateStatement = null;
1687: this .hasErrors = true;
1688: throw e;
1689: }
1690: return;
1691: }
1692:
1693: /**
1694: * If the key columns have not been defined externally through {@link #setKeyColumns(List)}
1695: * this method is used to retrieve the key columns for the target table
1696: */
1697: private void retrieveKeyColumns() {
1698: try {
1699: List<ColumnIdentifier> cols = this .dbConn.getMetadata()
1700: .getTableColumns(this .targetTable);
1701: this .keyColumns = new LinkedList<ColumnIdentifier>();
1702: for (ColumnIdentifier col : cols) {
1703: if (col.isPkColumn()) {
1704: this .keyColumns.add(col);
1705: }
1706: }
1707: } catch (SQLException e) {
1708: LogMgr.logError("DataImporter.retrieveKeyColumns()",
1709: "Error when retrieving key columns", e);
1710: this .columnMap = null;
1711: this .keyColumns = null;
1712: }
1713: }
1714:
1715: private void executeBatch() throws SQLException {
1716: if (!this .useBatch)
1717: return;
1718:
1719: try {
1720: this .batchRunning = true;
1721: if (this .isModeInsert() && this .insertStatement != null) {
1722: int rows[] = this .insertStatement.executeBatch();
1723: if (rows != null) {
1724: for (int i = 0; i < rows.length; i++) {
1725: // Oracle does not seem to report the correct number
1726: // so, if we get a SUCCESS_NO_INFO status, we'll simply
1727: // assume that one row has been inserted
1728: if (rows[i] == Statement.SUCCESS_NO_INFO)
1729: this .insertedRows++;
1730: else if (rows[i] >= 0)
1731: this .insertedRows += rows[i];
1732: }
1733: }
1734: this .insertStatement.clearBatch();
1735: } else if (this .isModeUpdate()
1736: && this .updateStatement != null) {
1737: int rows[] = this .updateStatement.executeBatch();
1738: if (rows != null) {
1739: for (int i = 0; i < rows.length; i++) {
1740: if (rows[i] == Statement.SUCCESS_NO_INFO)
1741: this .updatedRows++;
1742: else if (rows[i] >= 0)
1743: this .updatedRows += rows[i];
1744: }
1745: }
1746: this .updateStatement.clearBatch();
1747: }
1748: } finally {
1749: this .batchRunning = false;
1750: if (this .batchStreams != null) {
1751: FileUtil.closeStreams(batchStreams);
1752: batchStreams.clear();
1753: batchStreams = null;
1754: }
1755: }
1756:
1757: if (this .commitBatch && !this .dbConn.getAutoCommit()) {
1758: this .dbConn.commit();
1759: }
1760: }
1761:
1762: private void finishTable() throws SQLException {
1763: boolean commitNeeded = this .transactionControl
1764: && !dbConn.getAutoCommit()
1765: && (this .commitEvery != Committer.NO_COMMIT_FLAG);
1766:
1767: try {
1768: if (this .useBatch) {
1769: this .executeBatch();
1770:
1771: // If the batch is executed and committed, there is no
1772: // need to send another commit. In fact some DBMS don't like
1773: // a commit or rollback if no transaction was started.
1774: if (commitBatch)
1775: commitNeeded = false;
1776: }
1777:
1778: this .closeStatements();
1779:
1780: if (this .tableStatements != null) {
1781: this .tableStatements.runPostTableStatement(dbConn,
1782: targetTable);
1783: }
1784:
1785: if (commitNeeded) {
1786: LogMgr.logInfo("DataImporter.finishTable()", this
1787: .getAffectedRows()
1788: + " row(s) imported. Committing changes");
1789: this .dbConn.commit();
1790: } else if (!transactionControl) {
1791: LogMgr
1792: .logInfo(
1793: "DataImporter.finishTable()",
1794: this .getAffectedRows()
1795: + " row(s) imported. Transaction control disabled. No commit sent to server");
1796: }
1797:
1798: this .messages.append(this .source.getMessages());
1799: if (this .insertedRows > -1) {
1800: this .messages.append(this .insertedRows
1801: + " "
1802: + ResourceMgr
1803: .getString("MsgCopyNumRowsInserted"));
1804: this .messages.appendNewLine();
1805: }
1806: if (this .updatedRows > -1) {
1807: this .messages.append(this .updatedRows
1808: + " "
1809: + ResourceMgr
1810: .getString("MsgCopyNumRowsUpdated"));
1811: }
1812: if (this .badWriter != null && badWriter.getRows() > 0) {
1813: this .messages.appendNewLine();
1814: this .messages.append(this .badWriter.getMessage());
1815: }
1816: this .messages.appendNewLine();
1817: this .hasErrors = this .source.hasErrors();
1818: this .hasWarnings = this .source.hasWarnings();
1819: } catch (SQLException e) {
1820: if (commitNeeded) {
1821: try {
1822: this .dbConn.rollback();
1823: } catch (Throwable ignore) {
1824: }
1825: }
1826: LogMgr.logError("DataImporter.finishTable()",
1827: "Error commiting changes", e);
1828: this .hasErrors = true;
1829: this .messages.append(ExceptionUtil.getDisplay(e));
1830: this .messages.appendNewLine();
1831: throw e;
1832: }
1833: }
1834:
1835: /**
1836: * Return the messages generated during import.
1837: * Calling this, clears the message buffer
1838: * @return the message buffer.
1839: * @see workbench.util.MessageBuffer#getBuffer()
1840: */
1841: public CharSequence getMessages() {
1842: return messages.getBuffer();
1843: }
1844:
1845: public void copyMessages(MessageBuffer target) {
1846: target.append(this .messages);
1847: clearMessages();
1848: }
1849:
1850: public void clearMessages() {
1851: this .messages.clear();
1852: }
1853:
1854: /**
1855: * Callback from the RowDataProducer
1856: */
1857: public void importFinished() {
1858: if (!isRunning)
1859: return;
1860: try {
1861: this .finishTable();
1862: } catch (SQLException sql) {
1863: // already logged in finishTable()
1864: } catch (Exception e) {
1865: // log all others...
1866: LogMgr.logError("DataImporter.importFinished()",
1867: "Error when commiting changes", e);
1868: this .messages.append(ExceptionUtil.getDisplay(e));
1869: this .hasErrors = true;
1870: } finally {
1871: this .isRunning = false;
1872: if (!multiTable) {
1873: if (this .progressMonitor != null)
1874: this .progressMonitor.jobFinished();
1875: }
1876: }
1877:
1878: this .hasErrors = this .hasErrors || this .source.hasErrors();
1879: this .hasWarnings = this .hasWarnings
1880: || this .source.hasWarnings();
1881: }
1882:
1883: private void cleanupRollback() {
1884: try {
1885: this .closeStatements();
1886: if (this .transactionControl && !this .dbConn.getAutoCommit()) {
1887: LogMgr.logInfo("DataImporter.cleanupRollback()",
1888: "Rollback changes");
1889: this .dbConn.rollback();
1890: this .updatedRows = 0;
1891: this .insertedRows = 0;
1892: }
1893: } catch (Exception e) {
1894: LogMgr.logError("DataImporter.cleanupRollback()",
1895: "Error on rollback", e);
1896: this .messages.append(ExceptionUtil.getDisplay(e));
1897: this .hasErrors = true;
1898: }
1899: this .isRunning = false;
1900: //this.messages.append(this.source.getMessages());
1901: if (this .progressMonitor != null)
1902: this .progressMonitor.jobFinished();
1903: }
1904:
1905: public void tableImportError() {
1906: cleanupRollback();
1907: }
1908:
1909: public void importCancelled() {
1910: if (!isRunning)
1911: return;
1912: if (this .partialImportEnded) {
1913: this .importFinished();
1914: return;
1915: }
1916:
1917: cleanupRollback();
1918: this .hasErrors = this .hasErrors || this .source.hasErrors();
1919: this .hasWarnings = this .hasWarnings
1920: || this .source.hasWarnings();
1921:
1922: }
1923:
1924: private void closeStatements() {
1925: if (this .insertStatement != null) {
1926: try {
1927: this .insertStatement.clearBatch();
1928: } catch (Throwable th) {
1929: }
1930: try {
1931: this .insertStatement.close();
1932: } catch (Throwable th) {
1933: }
1934: }
1935: if (this .updateStatement != null) {
1936: try {
1937: this .updateStatement.clearBatch();
1938: } catch (Throwable th) {
1939: }
1940: try {
1941: this .updateStatement.close();
1942: } catch (Throwable th) {
1943: }
1944: }
1945: }
1946:
1947: public void setReportInterval(int interval) {
1948: if (interval > 0) {
1949: this .reportInterval = interval;
1950: } else {
1951: this .reportInterval = 0;
1952: }
1953: }
1954:
1955: public String getTargetSchema() {
1956: return targetSchema;
1957: }
1958:
1959: public void setTargetSchema(String targetSchema) {
1960: this.targetSchema = targetSchema;
1961: }
1962:
1963: }
|