001: /*
002: * The contents of this file are subject to the terms of the Common Development
003: * and Distribution License (the License). You may not use this file except in
004: * compliance with the License.
005: *
006: * You can obtain a copy of the License at http://www.netbeans.org/cddl.html
007: * or http://www.netbeans.org/cddl.txt.
008: *
009: * When distributing Covered Code, include this CDDL Header Notice in each file
010: * and include the License file at http://www.netbeans.org/cddl.txt.
011: * If applicable, add the following below the CDDL Header, with the fields
012: * enclosed by brackets [] replaced by your own identifying information:
013: * "Portions Copyrighted [year] [name of copyright owner]"
014: *
015: * The Original Software is NetBeans. The Initial Developer of the Original
016: * Software is Sun Microsystems, Inc. Portions Copyright 1997-2007 Sun
017: * Microsystems, Inc. All Rights Reserved.
018: */
019: package org.netbeans.modules.etl.codegen.impl;
020:
021: import java.util.ArrayList;
022: import java.util.Collection;
023: import java.util.HashMap;
024: import java.util.Iterator;
025: import java.util.List;
026: import java.util.Map;
027: import java.util.Properties;
028:
029: import org.netbeans.modules.etl.codegen.ETLCodegenUtil;
030: import org.netbeans.modules.etl.codegen.ETLScriptBuilderModel;
031: import org.netbeans.modules.etl.codegen.ETLStrategyBuilder;
032: import org.netbeans.modules.etl.codegen.ETLStrategyBuilderContext;
033: import org.netbeans.modules.etl.codegen.PatternFinder;
034: import org.netbeans.modules.etl.utils.MessageManager;
035: import org.netbeans.modules.mashup.db.model.FlatfileDBTable;
036: import org.netbeans.modules.mashup.db.model.FlatfileDefinition;
037: import org.netbeans.modules.sql.framework.common.jdbc.SQLDBConnectionDefinition;
038: import org.netbeans.modules.sql.framework.codegen.DB;
039: import org.netbeans.modules.sql.framework.codegen.DBFactory;
040: import org.netbeans.modules.sql.framework.codegen.StatementContext;
041: import org.netbeans.modules.sql.framework.codegen.Statements;
042: import org.netbeans.modules.sql.framework.codegen.axion.AxionDB;
043: import org.netbeans.modules.sql.framework.model.SQLConstants;
044: import org.netbeans.modules.sql.framework.model.SQLDBTable;
045: import org.netbeans.modules.sql.framework.model.SourceTable;
046: import org.netbeans.modules.sql.framework.model.TargetTable;
047:
048: import com.sun.etl.engine.ETLEngine;
049: import com.sun.etl.engine.ETLTaskNode;
050: import com.sun.sql.framework.exception.BaseException;
051: import com.sun.sql.framework.jdbc.DBConnectionFactory;
052: import com.sun.sql.framework.jdbc.SQLPart;
053: import com.sun.sql.framework.utils.AttributeMap;
054: import com.sun.sql.framework.utils.StringUtil;
055: import org.netbeans.modules.mashup.db.model.impl.FlatfileDBTableImpl;
056: import org.netbeans.modules.sql.framework.model.DBConnectionDefinition;
057: import org.netbeans.modules.sql.framework.model.DBTable;
058:
059: /**
060: * Base class for all ETLStrategyBuilder classes.
061: *
062: * @author Ahimanikya Satapathy
063: * @version $Revision$
064: */
065: public abstract class BaseETLStrategyBuilder implements
066: ETLStrategyBuilder {
067:
068: protected static final MessageManager MSG_MGR = MessageManager
069: .getManager("org.netbeans.modules.etl.codegen.impl");
070: private static final String RUNTIME_INPUTS_MAP = "runtimeInputsMap";
071:
072: public static Properties getConnectionPropertiesFrom(
073: DBConnectionDefinition def) {
074: Properties props = null;
075: if (def instanceof SQLDBConnectionDefinition) {
076: props = ((SQLDBConnectionDefinition) def)
077: .getConnectionProperties();
078: } else {
079: props = new Properties();
080: props.put(DBConnectionFactory.PROP_DBTYPE, def.getDBType());
081: props.put(DBConnectionFactory.PROP_URL, def
082: .getConnectionURL());
083: props.put(DBConnectionFactory.PROP_USERNAME, def
084: .getUserName());
085: props.put(DBConnectionFactory.PROP_PASSWORD, def
086: .getPassword());
087: }
088: return props;
089: }
090:
091: protected ETLScriptBuilderModel builderModel = null;
092: protected DBConnectionFactory connFactory = DBConnectionFactory
093: .getInstance();
094: protected DBConnectionDefinition targtConDef = null;
095:
096: public BaseETLStrategyBuilder(ETLScriptBuilderModel model)
097: throws BaseException {
098: this .builderModel = model;
099: }
100:
101: public void setETLScriptBuilderModel(ETLScriptBuilderModel model) {
102: this .builderModel = model;
103: }
104:
105: protected void addCreateIfNotExistsSummaryTableStatement(
106: ETLTaskNode initTask) throws BaseException {
107: DB db = DBFactory.getInstance().getDatabase(DB.AXIONDB);
108: Statements stmts = db.getStatements();
109:
110: SQLPart createSummaryTablePart = stmts
111: .getCreateLogSummaryTableStatement(builderModel
112: .isMemoryMonitorDB());
113: createSummaryTablePart
114: .setType(SQLPart.STMT_CREATELOGSUMMARYTABLE);
115: createSummaryTablePart
116: .setConnectionPoolName(ETLScriptBuilderModel.ETL_MONITOR_DB_CONN_DEF_NAME);
117:
118: initTask.addStatement(createSummaryTablePart);
119: }
120:
121: /**
122: * @see org.netbeans.modules.etl.codegen.ETLStrategyBuilder#addInsertNewExecutionRecordStatement(org.netbeans.modules.etl.engine.ETLTaskNode)
123: */
124: protected void addInsertNewExecutionRecordStatement(
125: ETLTaskNode taskNode, TargetTable tt) throws BaseException {
126: DB db = DBFactory.getInstance().getDatabase(DB.AXIONDB);
127:
128: StatementContext context = new StatementContext();
129: context.setUsingUniqueTableName(true);
130:
131: Statements stmts = db.getStatements();
132: SQLPart insertStartDateIntoSummaryTablePart = stmts
133: .getInsertStartDateIntoSummaryTableStatement(tt,
134: context);
135: insertStartDateIntoSummaryTablePart
136: .setConnectionPoolName(ETLScriptBuilderModel.ETL_MONITOR_DB_CONN_DEF_NAME);
137:
138: context.setUsingFullyQualifiedTablePrefix(false);
139: insertStartDateIntoSummaryTablePart.setTableName(db
140: .getUnescapedName(db.getGeneratorFactory().generate(tt,
141: context)));
142:
143: taskNode.addStatement(insertStartDateIntoSummaryTablePart);
144: }
145:
146: protected void addSelectExecutionIdForNewExecutionRecordStatement(
147: ETLTaskNode pipeline, TargetTable tt) throws BaseException {
148: DB db = DBFactory.getInstance().getDatabase(DB.AXIONDB);
149:
150: StatementContext context = new StatementContext();
151: context.setUsingUniqueTableName(true);
152: context.setUsingFullyQualifiedTablePrefix(false);
153:
154: Statements stmts = db.getStatements();
155: SQLPart selectExecutionIdPart = stmts
156: .getSelectExecutionIdFromSummaryTableStatement(tt,
157: context);
158: selectExecutionIdPart
159: .setConnectionPoolName(ETLScriptBuilderModel.ETL_MONITOR_DB_CONN_DEF_NAME);
160:
161: selectExecutionIdPart.setTableName(db.getUnescapedName(db
162: .getGeneratorFactory().generate(tt, context)));
163:
164: pipeline.addStatement(selectExecutionIdPart);
165: }
166:
167: /**
168: * @see org.netbeans.modules.etl.codegen.ETLStrategyBuilder#addUpdateExecutionRecordPreparedStatement(org.netbeans.modules.etl.engine.ETLTaskNode,
169: * TargetTable)
170: */
171: protected void addUpdateExecutionRecordPreparedStatement(
172: ETLTaskNode updateStats, TargetTable tt)
173: throws BaseException {
174: DB db = DBFactory.getInstance().getDatabase(DB.AXIONDB);
175:
176: StatementContext context = new StatementContext();
177: context.setUsingFullyQualifiedTablePrefix(false);
178: context.setUsingUniqueTableName(true);
179:
180: Statements stmts = db.getStatements();
181: SQLPart updateEndDatePart = stmts
182: .getUpdateEndDateInSummaryTableStatement(tt, context);
183: updateEndDatePart
184: .setConnectionPoolName(ETLScriptBuilderModel.ETL_MONITOR_DB_CONN_DEF_NAME);
185:
186: String tableName = db.getUnescapedName(db.getGeneratorFactory()
187: .generate(tt, context));
188: updateEndDatePart.setTableName(tableName);
189: updateStats.addTableSpecificStatement(tableName,
190: updateEndDatePart);
191: }
192:
193: protected boolean areAllAssociatedTablesInternal(TargetTable tt)
194: throws BaseException {
195: Collection participatingTables = tt.getSourceTableList();
196: return PatternFinder.isInternalDBTable(tt)
197: && PatternFinder
198: .allDBTablesAreInternal(participatingTables
199: .iterator());
200: }
201:
202: protected void checkTargetConnectionDefinition(
203: ETLStrategyBuilderContext context) throws BaseException {
204: targtConDef = context.getModel().getConnectionDefinition(
205: context.getTargetTable());
206:
207: if (targtConDef == null) {
208: throw new BaseException(
209: "Target table connection definition is null.");
210: }
211: }
212:
213: protected SQLPart createSQLPart(String sqlString, String key,
214: String conDefnName) {
215: SQLPart sqlPart = new SQLPart(sqlString, key, conDefnName);
216: return sqlPart;
217: }
218:
219: protected void createTargetTableIfNotExists(TargetTable table,
220: ETLTaskNode taskNode, String trgtConnName, DB targetDB,
221: StatementContext context) throws BaseException {
222: if (table.isCreateTargetTable()
223: && (!(targetDB instanceof AxionDB))) {
224: StringBuilder sqlBuffer = new StringBuilder(200);
225:
226: Statements stmts = targetDB.getStatements();
227:
228: context.clearAllUsingTempTableName();
229:
230: SQLPart ifExists = stmts.getTableExistsStatement(table,
231: context);
232: SQLPart doCreate = stmts.getCreateStatement(table, context);
233:
234: sqlBuffer.append(ifExists.getSQL()).append(
235: SQLPart.STATEMENT_SEPARATOR);
236: sqlBuffer.append(doCreate.getSQL());
237:
238: SQLPart createIfExists = new SQLPart(sqlBuffer.toString(),
239: SQLPart.STMT_CREATEBEFOREPROCESS, trgtConnName);
240: taskNode.addStatement(createIfExists);
241: }
242: }
243:
244: protected void createTransformerSQLPart(TargetTable tt,
245: boolean useSourceFilter, ETLTaskNode transformerTask,
246: String trgtConnName, ETLTaskNode cleanupTask, DB targetDB,
247: DB statsDB) throws BaseException {
248:
249: StatementContext context = new StatementContext();
250: context.setUsingFullyQualifiedTablePrefix(false);
251: context.setUsingUniqueTableName(true);
252: final String statsTableName = statsDB.getUnescapedName(statsDB
253: .getGeneratorFactory().generate(tt, context));
254: transformerTask.setTableName(statsTableName);
255:
256: int statementType = tt.getStatementType();
257:
258: SQLPart insertSelectPart = null;
259: SQLPart defragPartStatement = null;
260:
261: List srcTblList = tt.getSourceTableList();
262:
263: insertSelectPart = generateTransformerSQLPart(tt,
264: useSourceFilter, targetDB, statementType, srcTblList);
265: insertSelectPart.setTableName(statsTableName);
266: insertSelectPart.setConnectionPoolName(trgtConnName);
267: transformerTask.addStatement(insertSelectPart);
268:
269: // defrag if required.
270: defragPartStatement = generateDefragPart(tt, targetDB,
271: statementType);
272: if (defragPartStatement != null) {
273: defragPartStatement.setConnectionPoolName(trgtConnName);
274: cleanupTask.addStatement(defragPartStatement);
275: }
276:
277: StatementContext tgtContext = new StatementContext();
278: if ((this .builderModel.isConnectionDefinitionOverridesApplied())
279: && (PatternFinder.isInternalDBTable(tt))) {
280: tgtContext.setUsingUniqueTableName(tt, true);
281: }
282: createTargetTableIfNotExists(tt, transformerTask, trgtConnName,
283: targetDB, tgtContext);
284: truncateTargetTableIfExists(tt, transformerTask, trgtConnName,
285: targetDB.getStatements(), tgtContext);
286:
287: // Insert new execution record for target table, tt.
288: addInsertNewExecutionRecordStatement(transformerTask, tt);
289:
290: // Add query to obtain execution id assigned to new execution record.
291: addSelectExecutionIdForNewExecutionRecordStatement(
292: transformerTask, tt);
293: }
294:
295: protected void createCorrelatedUpdateSQLParts(TargetTable tt,
296: ETLTaskNode correlatedQueryExecutor, String trgtConnName,
297: DB targetDB, DB statsDB, boolean genInsertSelect)
298: throws BaseException {
299: SQLPart insertSelect = null;
300: SQLPart select = null;
301: SQLPart update = null;
302:
303: StatementContext context = new StatementContext();
304: context.setUsingFullyQualifiedTablePrefix(false);
305: context.setUsingUniqueTableName(true);
306: final String statsTableName = statsDB.getUnescapedName(statsDB
307: .getGeneratorFactory().generate(tt, context));
308: correlatedQueryExecutor.setTableName(statsTableName);
309:
310: Map map = getCorrelatedUpdateQueries(tt, targetDB,
311: genInsertSelect);
312: insertSelect = (SQLPart) map.get(SQLPart.STMT_INSERTSELECT);
313: select = (SQLPart) map.get(SQLPart.STMT_CORRELATED_SELECT);
314: update = (SQLPart) map.get(SQLPart.STMT_CORRELATED_UPDATE);
315:
316: select.setTableName(statsTableName);
317: select.setConnectionPoolName(trgtConnName);
318:
319: update.setTableName(statsTableName);
320: update.setConnectionPoolName(trgtConnName);
321:
322: correlatedQueryExecutor.addStatement(select);
323: correlatedQueryExecutor.addStatement(update);
324:
325: if (genInsertSelect) {
326: insertSelect.setTableName(statsTableName);
327: insertSelect.setConnectionPoolName(trgtConnName);
328: correlatedQueryExecutor.addStatement(insertSelect);
329: }
330:
331: StatementContext tgtContext = new StatementContext();
332: createTargetTableIfNotExists(tt, correlatedQueryExecutor,
333: trgtConnName, targetDB, tgtContext);
334: truncateTargetTableIfExists(tt, correlatedQueryExecutor,
335: trgtConnName, targetDB.getStatements(), tgtContext);
336:
337: // Insert new execution record for target table, tt.
338: addInsertNewExecutionRecordStatement(correlatedQueryExecutor,
339: tt);
340:
341: // Add query to obtain execution id assigned to new execution record.
342: addSelectExecutionIdForNewExecutionRecordStatement(
343: correlatedQueryExecutor, tt);
344:
345: AttributeMap attrMap = new AttributeMap();
346: attrMap.put("batchSize", tt.getBatchSize() + ""); // NOI18N
347: correlatedQueryExecutor.setAttributeMap(attrMap);
348: }
349:
350: protected Map getCorrelatedUpdateQueries(TargetTable tt,
351: DB targetDB, boolean genInsertSelect) throws BaseException {
352: Map ret = new HashMap();
353: StatementContext context;
354: List srcTblList = tt.getSourceTableList();
355:
356: Statements stmts = targetDB.getStatements();
357: context = new StatementContext();
358:
359: Iterator iter = srcTblList.iterator();
360: while (iter.hasNext()) {
361: SourceTable srcTable = (SourceTable) iter.next();
362: if (isExtractionRequired(srcTable, tt)) {
363: context.setUsingTempTableName(srcTable, true);
364: } else {
365: useUniqueNameIfRequired(srcTable, context);
366: }
367: }
368: context.putClientProperty(RUNTIME_INPUTS_MAP, this .builderModel
369: .getEngine().getInputAttrMap());
370: ret = stmts.getCorrelatedUpdateStatement(tt, context);
371:
372: if (genInsertSelect) {
373: context.putClientProperty("useWhere", Boolean.TRUE);
374: SQLPart insertSelectPart = stmts.getInsertSelectStatement(
375: tt, context);
376: ret.put(SQLPart.STMT_INSERTSELECT, insertSelectPart);
377: }
378: return ret;
379: }
380:
381: // Generate defrag when required.
382: protected SQLPart generateDefragPart(TargetTable tt, DB targetDB,
383: int statementType) throws BaseException {
384: SQLPart defragPartStatement = null;
385: StatementContext localContext = new StatementContext();
386: localContext.setUsingFullyQualifiedTablePrefix(false);
387: localContext.setUsingUniqueTableName(true);
388:
389: switch (statementType) {
390: case SQLConstants.UPDATE_STATEMENT:
391: case SQLConstants.INSERT_UPDATE_STATEMENT:
392: case SQLConstants.DELETE_STATEMENT:
393: defragPartStatement = targetDB.getStatements()
394: .getDefragStatement(tt, localContext);
395: break;
396:
397: default:
398: break;
399: }
400: return defragPartStatement;
401: }
402:
403: protected final void useUniqueNameIfRequired(SQLDBTable table,
404: StatementContext sc) throws BaseException {
405: if ((this .builderModel.isConnectionDefinitionOverridesApplied())
406: && (PatternFinder.isInternalDBTable(table))) {
407: sc.setUsingUniqueTableName(table, true);
408: }
409: }
410:
411: // Generate Insert Select Part
412: protected SQLPart generateTransformerSQLPart(TargetTable tt,
413: boolean useSourceFilter, DB targetDB, int statementType,
414: List srcTblList) throws BaseException {
415: Statements stmts = targetDB.getStatements();
416: StatementContext context = new StatementContext();
417:
418: Iterator iter = srcTblList.iterator();
419: while (iter.hasNext()) {
420: SourceTable srcTable = (SourceTable) iter.next();
421: if (isExtractionRequired(srcTable, tt)) {
422: context.setUsingTempTableName(srcTable, true);
423: } else {
424: useUniqueNameIfRequired(srcTable, context);
425: }
426: }
427:
428: useUniqueNameIfRequired(tt, context);
429:
430: SQLPart insertSelectPart;
431: switch (statementType) {
432: case SQLConstants.UPDATE_STATEMENT:
433: insertSelectPart = stmts.getUpdateStatement(tt, context);
434: break;
435:
436: case SQLConstants.INSERT_UPDATE_STATEMENT:
437: // Generate merge-select statement for transformation.
438: insertSelectPart = stmts.getMergeStatement(tt, context);
439: break;
440:
441: case SQLConstants.DELETE_STATEMENT:
442: context.putClientProperty("useWhere", Boolean.TRUE);
443: insertSelectPart = stmts.getDeleteStatement(tt, context);
444: break;
445:
446: default:
447: if (srcTblList == null || srcTblList.size() == 0) {
448: insertSelectPart = stmts.getStaticInsertStatement(tt,
449: context);
450: } else {
451: // Generate insert select statement for transformation.
452: context.putClientProperty("useWhere", Boolean
453: .valueOf(useSourceFilter));
454: insertSelectPart = stmts.getInsertSelectStatement(tt,
455: context);
456: }
457: break;
458: }
459: return insertSelectPart;
460: }
461:
462: protected SQLPart generateSQLPart(FlatfileDBTable flatfileDBTable,
463: String tableName, String staticDirectory, String stmtType,
464: String connPoolName, String flatfileRuntimeFilePath,
465: boolean isDynamicPath, boolean createDataFileIfNotExist) {
466:
467: SQLPart sqlPart = null;
468:
469: // NOTE: getCreateStatementSQL() and getDropStatementSQL() were modified not to
470: // have the side effect of assigning tableName as flatfileDBTable's new name. If
471: // this is required, then cast FlatfileDBTable to FlatfileDBTableImpl and call
472: // setName().
473: if (stmtType.equals("DROP")) {
474: sqlPart = createSQLPart(FlatfileDBTableImpl
475: .getDropStatementSQL(tableName), stmtType,
476: connPoolName);
477: } else if (stmtType.equals("CREATE")) {
478: sqlPart = createSQLPart(flatfileDBTable
479: .getCreateStatementSQL(staticDirectory, tableName,
480: flatfileRuntimeFilePath, isDynamicPath,
481: createDataFileIfNotExist), stmtType,
482: connPoolName);
483: }
484:
485: // Provide a fully-qualified filename, appending staticDirectory as the path, as
486: // default value if dynamic path is true; otherwise, use just the sample filename
487: // as supplied from the Database.
488: String defaultValue = (isDynamicPath ? staticDirectory + "/"
489: : "")
490: + flatfileDBTable.getFileName();
491: sqlPart.setDefaultValue(defaultValue);
492:
493: sqlPart.setTableName(tableName);
494: sqlPart.setConnectionPoolName(connPoolName);
495:
496: return sqlPart;
497: }
498:
499: /**
500: * Generate Transformer comment line for a given target table
501: *
502: * @param targetTable
503: * @return Comment String
504: */
505: protected String getCommentForTransformer(TargetTable targetTable)
506: throws BaseException {
507: return MSG_MGR.getString("DISPLAY_INSERT_TARGET", targetTable
508: .getParent().getModelName(), this .builderModel
509: .getConnectionDefinition(targetTable).getDBType());
510: }
511:
512: /**
513: * @param srcType
514: * @return
515: * @throws BaseException
516: */
517: protected DB getDBFor(DBConnectionDefinition srcConDefn)
518: throws BaseException {
519: int srcType = connFactory
520: .getDatabaseVersion(getConnectionPropertiesFrom(srcConDefn));
521: return DBFactory.getInstance().getDatabase(srcType);
522: }
523:
524: protected ETLEngine getEngine() {
525: return builderModel.getEngine();
526: }
527:
528: protected Statements getStatementsForTableDB(DBTable table)
529: throws BaseException {
530: DBConnectionDefinition cd = table.getParent()
531: .getConnectionDefinition();
532: DB db = getDBFor(cd);
533: return db.getStatements();
534: }
535:
536: protected Statements getStatementsForTableDB(DBTable table,
537: ETLStrategyBuilderContext context) throws BaseException {
538: DBConnectionDefinition cd = context.getModel()
539: .getConnectionDefinition(table);
540: DB db = getDBFor(cd);
541: return db.getStatements();
542: }
543:
544: protected Statements getStatementsForTargetTableDB(
545: ETLStrategyBuilderContext context) throws BaseException {
546: DBTable table = context.getTargetTable();
547: DBConnectionDefinition cd = context.getModel()
548: .getConnectionDefinition(table);
549: DB db = getDBFor(cd);
550: return db.getStatements();
551: }
552:
553: protected void getFlatfileInitSQLParts(FlatfileDefinition ffdb,
554: InternalDBMetadata ffMetadata, boolean isAllDBTypeInternal,
555: ETLTaskNode initTask, ETLTaskNode cleanupTask,
556: String connPoolName, SQLDBTable flatfileRuntime,
557: boolean createDataFileIfNotExist) {
558: String staticDirectory = ffMetadata.getStaticDirectory();
559: boolean isDynamicFilePath = ffMetadata.isDynamicFilePath();
560:
561: String tableName = null;
562: String flatfileRuntimeFilePath = flatfileRuntime
563: .getRuntimeArgumentName();
564: String oId = flatfileRuntime.getUniqueTableName();
565: FlatfileDBTable flatfileTable = (FlatfileDBTable) ffdb
566: .getTable(flatfileRuntime.getName());
567:
568: // if (isAllDBTypeInternal) {
569: tableName = oId;
570: // } else {
571: // tableName = flatfileTable.getName();
572: // }
573:
574: if (flatfileTable != null) {
575: SQLPart dropPart = generateSQLPart(flatfileTable,
576: tableName, staticDirectory, "DROP", connPoolName,
577: flatfileRuntimeFilePath, isDynamicFilePath,
578: createDataFileIfNotExist);
579: SQLPart createPart = generateSQLPart(flatfileTable,
580: tableName, staticDirectory, "CREATE", connPoolName,
581: flatfileRuntimeFilePath, isDynamicFilePath,
582: createDataFileIfNotExist);
583:
584: initTask.addOptionalTask(dropPart);
585: initTask.addOptionalTask(createPart);
586: cleanupTask.addOptionalTask(dropPart);
587: }
588: }
589:
590: protected void getFlatfileInitSQLParts(FlatfileDefinition ffdb,
591: InternalDBMetadata internalMetadata, ETLTaskNode initTask,
592: ETLTaskNode cleanupTask, SQLDBTable flatfileRuntime,
593: boolean createDataFileIfNotExist) {
594: String staticDirectory = internalMetadata.getStaticDirectory();
595: boolean isDynamicFilePath = internalMetadata
596: .isDynamicFilePath();
597:
598: FlatfileDBTable flatfileTable = (FlatfileDBTable) ffdb
599: .getTable(flatfileRuntime.getName());
600: String tableName = flatfileRuntime.getUniqueTableName();
601: String flatfileRuntimeFilePath = flatfileRuntime
602: .getRuntimeArgumentName();
603:
604: if (flatfileTable != null) {
605: SQLPart dropPart = generateSQLPart(
606: flatfileTable,
607: tableName,
608: staticDirectory,
609: "DROP",
610: ETLScriptBuilderModel.ETL_INSTANCE_DB_CONN_DEF_NAME,
611: flatfileRuntimeFilePath, isDynamicFilePath,
612: createDataFileIfNotExist);
613: SQLPart createPart = generateSQLPart(
614: flatfileTable,
615: tableName,
616: staticDirectory,
617: "CREATE",
618: ETLScriptBuilderModel.ETL_INSTANCE_DB_CONN_DEF_NAME,
619: flatfileRuntimeFilePath, isDynamicFilePath,
620: createDataFileIfNotExist);
621:
622: initTask.addOptionalTask(dropPart);
623: initTask.addOptionalTask(createPart);
624: cleanupTask.addOptionalTask(dropPart);
625: }
626: }
627:
628: protected String getTargetConnName() {
629:
630: if (targtConDef.getDriverClass().equals(
631: "org.axiondb.jdbc.AxionDriver")
632: && builderModel
633: .isConnectionDefinitionOverridesApplied()) {
634: return ETLScriptBuilderModel.ETL_INSTANCE_DB_CONN_DEF_NAME;
635: }
636: return targtConDef.getName();
637: }
638:
639: protected Statements getTargetStatements() throws BaseException {
640: DB aDb = getDBFor(targtConDef);
641: return aDb.getStatements();
642: }
643:
644: protected String getTransformerSQL(TargetTable targetTable,
645: DB targetDB, List srcTblList, boolean useSourceFilter)
646: throws BaseException {
647: String sql = null;
648: int statementType = targetTable.getStatementType();
649: if ((targetDB.getDBType() == DB.JDBCDB)
650: && (targetTable.getSourceTableList().size() != 0)
651: && ((statementType == SQLConstants.UPDATE_STATEMENT) || (statementType == SQLConstants.INSERT_UPDATE_STATEMENT))) {
652: Map map = this
653: .getCorrelatedUpdateQueries(
654: targetTable,
655: targetDB,
656: (statementType == SQLConstants.INSERT_UPDATE_STATEMENT));
657: SQLPart select = (SQLPart) map
658: .get(SQLPart.STMT_CORRELATED_SELECT);
659: SQLPart update = (SQLPart) map
660: .get(SQLPart.STMT_CORRELATED_UPDATE);
661: sql = select.getSQL();
662: sql = sql + "\n\n\n" + update.getSQL();
663: } else {
664: SQLPart insertSelectPart = generateTransformerSQLPart(
665: targetTable, useSourceFilter, targetDB,
666: statementType, srcTblList);
667: sql = insertSelectPart.getSQL();
668: }
669:
670: return sql;
671: }
672:
673: protected boolean isExtractionRequired(SourceTable sourceTable,
674: TargetTable targetTable) throws BaseException {
675: return (!PatternFinder.isFromSameDB(sourceTable, targetTable,
676: this .builderModel));
677: }
678:
679: protected List makeUniqueTableNames(
680: ETLStrategyBuilderContext context) throws BaseException {
681: Iterator iter = context.getTargetTable().getSourceTableList()
682: .iterator();
683: List oldUserDefinedList = new ArrayList();
684: while (iter.hasNext()) {
685: SQLDBTable dbTable = (SQLDBTable) iter.next();
686: oldUserDefinedList.add(dbTable.getUserDefinedTableName());
687: makeTableNameUnique(dbTable);
688: }
689:
690: SQLDBTable tt = context.getTargetTable();
691: oldUserDefinedList.add(tt.getUserDefinedTableName());
692: makeTableNameUnique(tt);
693: return oldUserDefinedList;
694: }
695:
696: protected void populateInitTask(ETLTaskNode initTask,
697: ETLTaskNode cleanupTask, TargetTable tTable)
698: throws BaseException {
699: DBConnectionDefinition dbCondefn = builderModel
700: .getConnectionDefinition(tTable);
701: String tgtConnPoolName = dbCondefn.getName();
702:
703: InternalDBMetadata tgtInternalMetadata = builderModel
704: .getInternalMetadata(tTable);
705: if (tgtInternalMetadata == null) {
706: // Placeholder instance.
707: tgtInternalMetadata = new InternalDBMetadata(null, false,
708: null);
709: }
710:
711: int targetDBType = connFactory
712: .getDatabaseVersion(getConnectionPropertiesFrom(dbCondefn));
713: String poolName = (targetDBType == DB.AXIONDB) ? ETLScriptBuilderModel.ETL_INSTANCE_DB_CONN_DEF_NAME
714: : tgtConnPoolName;
715: populateFlatfileMetadata(tTable, poolName, tgtInternalMetadata,
716: initTask, cleanupTask);
717:
718: DB db = DBFactory.getInstance().getDatabase(targetDBType);
719: StatementContext context = new StatementContext();
720: SQLPart initStatements = db.getStatements()
721: .getInitializationStatements(context);
722:
723: if (initStatements != null) {
724: initStatements.setConnectionPoolName(poolName);
725: initTask.addStatement(initStatements);
726: }
727: }
728:
729: protected void populateFlatfileMetadata(TargetTable tTable,
730: String tgtConnPoolName,
731: InternalDBMetadata tgtInternalMetadata,
732: ETLTaskNode initTask, ETLTaskNode cleanupTask)
733: throws BaseException {
734: Collection participatingTables = tTable.getSourceTableList();
735: boolean isAllDBTypeInternal = PatternFinder
736: .isInternalDBTable(tTable)
737: && PatternFinder
738: .allDBTablesAreInternal(participatingTables
739: .iterator());
740:
741: // For Target Table
742: FlatfileDefinition tgtDB = ETLCodegenUtil
743: .getStcdbObjectTypeDefinition(tTable);
744: if (tgtDB != null && tgtInternalMetadata != null
745: && tgtInternalMetadata.getStaticDirectory() != null) {
746: getFlatfileInitSQLParts(tgtDB, tgtInternalMetadata,
747: isAllDBTypeInternal, initTask, cleanupTask,
748: tgtConnPoolName, tTable, true);
749: }
750:
751: // For all SourceTables associated with Target table
752: for (Iterator it = participatingTables.iterator(); it.hasNext();) {
753: SQLDBTable dbTable = (SQLDBTable) it.next();
754: FlatfileDefinition srcDB = ETLCodegenUtil
755: .getFFDefinition(dbTable);
756: if (srcDB != null) {
757: DBConnectionDefinition srcCondefn = builderModel
758: .getConnectionDefinition(dbTable);
759: InternalDBMetadata srcInternalMetadata = builderModel
760: .getInternalMetadata(dbTable);
761: String conDefnName = getConDefnName(srcCondefn);
762:
763: if (srcInternalMetadata != null) {
764: if (isAllDBTypeInternal) {
765: conDefnName = tgtConnPoolName;
766: }
767: getFlatfileInitSQLParts(srcDB, srcInternalMetadata,
768: isAllDBTypeInternal, initTask, cleanupTask,
769: conDefnName, dbTable, false);
770: }
771: }
772: }
773: }
774:
775: protected String getConDefnName(DBConnectionDefinition srcCondefn) {
776: if (srcCondefn.getDriverClass().equals(
777: "org.axiondb.jdbc.AxionDriver")
778: && builderModel
779: .isConnectionDefinitionOverridesApplied()) {
780: return ETLScriptBuilderModel.ETL_INSTANCE_DB_CONN_DEF_NAME;
781: }
782: return srcCondefn.getName();
783: }
784:
785: protected void truncateTargetTableIfExists(TargetTable tt,
786: ETLTaskNode taskNode, String trgtConnName,
787: Statements stmts, StatementContext sc) throws BaseException {
788: if (tt.isTruncateBeforeLoad()) {
789: truncateTableIfExists((SQLDBTable) tt, taskNode,
790: trgtConnName, stmts, sc);
791: }
792: }
793:
794: protected void truncateTableIfExists(SQLDBTable dbt,
795: ETLTaskNode taskNode, String trgtConnName,
796: Statements stmts, StatementContext sc) throws BaseException {
797: SQLPart ifExists = stmts.getTableExistsStatement(dbt, sc);
798: String existsSql = (ifExists != null) ? ifExists.getSQL()
799: .trim() : null;
800: StringBuilder sqlBuffer = StringUtil.isNullString(existsSql) ? new StringBuilder(
801: 200)
802: : new StringBuilder(existsSql);
803:
804: SQLPart doTruncate = stmts.getTruncateStatement(dbt, sc);
805: String truncateSql = doTruncate.getSQL().trim();
806: if (!StringUtil.isNullString(truncateSql)) {
807: if (sqlBuffer.length() != 0) {
808: sqlBuffer.append(SQLPart.STATEMENT_SEPARATOR);
809: }
810:
811: sqlBuffer.append(truncateSql);
812: SQLPart truncateIfExists = new SQLPart(
813: sqlBuffer.toString(),
814: SQLPart.STMT_TRUNCATEBEFOREPROCESS, trgtConnName);
815: taskNode.addStatement(truncateIfExists);
816: } else {
817: truncateSql = "";
818: }
819:
820: StatementContext context = new StatementContext();
821: context.putClientProperty("useWhere", Boolean.FALSE);
822: SQLPart doDelete = stmts.getDeleteStatement(dbt, context);
823: String deleteSql = doDelete.getSQL().trim();
824:
825: // Only add the fallback delete statement if it is not identical to the
826: // truncate statement. The delete and truncate statements are identical for
827: // those DBs which do not have a separate 'truncate' command.
828: if (!StringUtil.isNullString(deleteSql)
829: && !deleteSql.equalsIgnoreCase(truncateSql)) {
830: sqlBuffer = StringUtil.isNullString(existsSql) ? new StringBuilder(
831: 200)
832: : new StringBuilder(existsSql);
833: if (sqlBuffer.length() != 0) {
834: sqlBuffer.append(SQLPart.STATEMENT_SEPARATOR);
835: }
836:
837: sqlBuffer.append(deleteSql);
838:
839: SQLPart deleteIfExists = new SQLPart(sqlBuffer.toString(),
840: SQLPart.STMT_DELETEBEFOREPROCESS, trgtConnName);
841: taskNode.addStatement(deleteIfExists);
842: }
843: }
844:
845: private void makeTableNameUnique(SQLDBTable dbTable) {
846: String name = dbTable.getUniqueTableName();
847: dbTable.setUserDefinedTableName(name);
848: }
849: }
|