/*
 * The contents of this file are subject to the terms of the Common Development
 * and Distribution License (the License). You may not use this file except in
 * compliance with the License.
 *
 * You can obtain a copy of the License at http://www.netbeans.org/cddl.html
 * or http://www.netbeans.org/cddl.txt.
 *
 * When distributing Covered Code, include this CDDL Header Notice in each file
 * and include the License file at http://www.netbeans.org/cddl.txt.
 * If applicable, add the following below the CDDL Header, with the fields
 * enclosed by brackets [] replaced by your own identifying information:
 * "Portions Copyrighted [year] [name of copyright owner]"
 *
 * The Original Software is NetBeans. The Initial Developer of the Original
 * Software is Sun Microsystems, Inc. Portions Copyright 1997-2007 Sun
 * Microsystems, Inc. All Rights Reserved.
 */
package org.netbeans.modules.etl.codegen.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import net.java.hulp.i18n.Logger;

import org.netbeans.modules.etl.codegen.ETLCodegenUtil;
import org.netbeans.modules.etl.codegen.ETLScriptBuilderModel;
import org.netbeans.modules.etl.codegen.ETLStrategyBuilderContext;
import org.netbeans.modules.etl.logger.Localizer;
import org.netbeans.modules.etl.logger.LogUtil;
import org.netbeans.modules.etl.utils.MessageManager;
import org.netbeans.modules.mashup.db.model.FlatfileDefinition;
import org.netbeans.modules.sql.framework.common.jdbc.SQLDBConnectionDefinition;
import org.netbeans.modules.sql.framework.common.jdbc.SQLUtils;
import org.netbeans.modules.sql.framework.codegen.DB;
import org.netbeans.modules.sql.framework.codegen.DBFactory;
import org.netbeans.modules.sql.framework.codegen.StatementContext;
import org.netbeans.modules.sql.framework.codegen.Statements;
import org.netbeans.modules.sql.framework.codegen.axion.AxionDB;
import org.netbeans.modules.sql.framework.codegen.axion.AxionPipelineStatements;
import org.netbeans.modules.sql.framework.codegen.axion.AxionStatements;
import org.netbeans.modules.sql.framework.model.DBConnectionDefinition;
import org.netbeans.modules.sql.framework.model.SQLConstants;
import org.netbeans.modules.sql.framework.model.SQLDBTable;
import org.netbeans.modules.sql.framework.model.SQLModelObjectFactory;
import org.netbeans.modules.sql.framework.model.TargetTable;

import com.sun.etl.engine.ETLEngine;
import com.sun.etl.engine.ETLTask;
import com.sun.etl.engine.ETLTaskNode;
import com.sun.sql.framework.exception.BaseException;
import com.sun.sql.framework.jdbc.DBConstants;
import com.sun.sql.framework.jdbc.SQLPart;
import com.sun.sql.framework.utils.StringUtil;

/**
 * Builds pipelined ETL task nodes (initialization, pipeline transform, and cleanup)
 * that stage and transform data through the internal Axion database.
 *
 * @author Jonathan Giron
 * @author Ritesh Adval
 * @version $Revision$
 */
public class PipelinedStrategyBuilderImpl extends BaseETLStrategyBuilder {

    private static final String LOG_CATEGORY = PipelinedStrategyBuilderImpl.class.getName();
    private static final MessageManager msgMgr = MessageManager.getManager(ETLTaskNode.class);
    private static final transient Logger mLogger = LogUtil.getLogger(PipelinedStrategyBuilderImpl.class.getName());
    private static final transient Localizer mLoc = Localizer.get();

    protected AxionDB db;
    protected Map linkTableMap;
    protected AxionPipelineStatements pipelineStmts;
    protected List remoteTables;
    protected AxionStatements stmts;

    public PipelinedStrategyBuilderImpl(ETLScriptBuilderModel model) throws BaseException {
        super(model);

        db = (AxionDB) DBFactory.getInstance().getDatabase(DB.AXIONDB);
        stmts = (AxionStatements) db.getStatements();
        pipelineStmts = db.getAxionPipelineStatements();
        linkTableMap = new HashMap();
        remoteTables = new ArrayList(3);
    }

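    /**
     * Adds a statement to the given statistics-update task node to select the count of
     * rejected rows for the given target table. This base implementation is a no-op;
     * subclasses override it as needed.
     *
     * @param updateStats ETLTaskNode that updates execution statistics
     * @param tt TargetTable whose rejected-row count is to be selected
     * @throws BaseException if an error occurs during statement generation
     */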
    public void addSelectRejectedRowCountStatement(ETLTaskNode updateStats, TargetTable tt)
            throws BaseException {
        // Empty; subclasses override as needed.
    }

    /**
     * Builds an ETLTaskNode to hold and execute statements associated with execution
     * cleanup for the given List of TargetTables.
     *
     * @param targetTables List of TargetTables whose cleanup statements are to be
     *            generated
     * @return ETLTaskNode containing cleanup statements to be executed
     * @throws BaseException if an error occurs during statement generation
     */
    public ETLTaskNode buildCleanupTask(List targetTables) throws BaseException {
        ETLTaskNode cleanupTask = this.getEngine().createETLTaskNode(ETLEngine.CLEANUP);
        buildCleanupStatements(cleanupTask, targetTables);

        // Add statement to shut down the pipelining database.
        SQLPart shutdownInternalDB = new SQLPart("SHUTDOWN", SQLPart.STMT_DEFRAG,
                ETLScriptBuilderModel.ETL_INSTANCE_DB_CONN_DEF_NAME);
        cleanupTask.addStatement(shutdownInternalDB);

        if (this.builderModel.isShutdownMonitorDB()) {
            shutdownInternalDB = new SQLPart("SHUTDOWN", SQLPart.STMT_DEFRAG,
                    ETLScriptBuilderModel.ETL_MONITOR_DB_CONN_DEF_NAME);
            cleanupTask.addOptionalTask(shutdownInternalDB);
        }

        return cleanupTask;
    }

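    /**
     * Builds an ETLTaskNode to hold and execute initialization statements for the given
     * List of TargetTables and their associated source tables, including creation of the
     * execution summary table if it does not already exist.
     *
     * @param targetTables List of TargetTables whose initialization statements are to be
     *            generated
     * @return ETLTaskNode containing initialization statements to be executed
     * @throws BaseException if an error occurs during statement generation
     */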
    public ETLTaskNode buildInitTask(List targetTables) throws BaseException {
        ETLEngine engine = this.getEngine();
        ETLTaskNode initTask = engine.createETLTaskNode(ETLEngine.INIT);
        Map connDefToLinkName = new HashMap();

        linkTableMap.clear();

        // Create set of all tables participating in the collaboration.
        Set tables = new HashSet(10);
        tables.addAll(targetTables);
        Iterator ttIter = targetTables.iterator();
        while (ttIter.hasNext()) {
            TargetTable tt = (TargetTable) ttIter.next();
            tables.addAll(tt.getSourceTableList());
        }

        // Create execution summary table if it does not exist.
        addCreateIfNotExistsSummaryTableStatement(initTask);

        // Now build all required initialization statements for each table.
        Iterator tblIter = tables.iterator();
        while (tblIter.hasNext()) {
            SQLDBTable table = (SQLDBTable) tblIter.next();
            buildInitializationStatements(table, initTask, connDefToLinkName);
        }

        return initTask;
    }

    /**
     * Generates the pipeline transform task for the target table referenced by the given
     * context. Apply the appropriate connection definitions (applyConnections) before
     * calling this method.
     */
    public void generateScriptForTable(ETLStrategyBuilderContext context) throws BaseException {
        mLogger.infoNoloc(mLoc.t("PRSR008: Looping through target tables:{0}", LOG_CATEGORY));
        checkTargetConnectionDefinition(context);

        TargetTable tt = context.getTargetTable();

        buildFlatfileSQLParts(context.getInitTask(), context.getGlobalCleanUpTask(),
                context.getTargetTable(), context.getModel());

        ETLTaskNode pipelineTask = buildPipelineTransformTask(context, tt);

        context.getInitTask().addNextETLTaskNode(ETLTask.SUCCESS, pipelineTask.getId());
        context.setLastPipelinedTask(pipelineTask);
        pipelineTask.addNextETLTaskNode(ETLTask.SUCCESS, context.getNextTaskOnSuccess().getId());
        pipelineTask.addNextETLTaskNode(ETLTask.EXCEPTION, context.getNextTaskOnException().getId());

        // Add query to obtain execution id assigned to new execution record.
        addSelectExecutionIdForNewExecutionRecordStatement(pipelineTask, tt);

        // Add queries to get the count of rejected rows and update the execution record
        // in the update statistics task.
        addSelectRejectedRowCountStatement(context.getStatsUpdateTask(), tt);
        addUpdateExecutionRecordPreparedStatement(context.getStatsUpdateTask(), tt);
    }

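    /**
     * Returns a displayable version of the transformation SQL for the target table
     * referenced by the given context, preceded by a descriptive comment line.
     *
     * @param context ETLStrategyBuilderContext referencing the target table
     * @return String containing the comment line and transformation SQL
     * @throws BaseException if an error occurs during statement generation
     */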
    public String getScriptToDisplay(ETLStrategyBuilderContext context) throws BaseException {
        super.checkTargetConnectionDefinition(context);

        StringBuilder buffer = new StringBuilder();
        TargetTable targetTable = context.getTargetTable();

        StatementContext stmtContext = new StatementContext();
        stmtContext.setUsingFullyQualifiedTablePrefix(false);

        String transformSQL = createTransformStatement(targetTable, stmtContext).getSQL();
        buffer.append(getCommentForTransformer(targetTable)).append("\n");
        buffer.append(transformSQL);

        return buffer.toString();
    }

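    /**
     * Builds flatfile-related initialization and cleanup SQL parts for the given target
     * table and for each of its source tables that is backed by a flatfile definition.
     *
     * @param initTask ETLTaskNode to receive initialization statements
     * @param cleanupTask ETLTaskNode to receive cleanup statements
     * @param tTable TargetTable being processed
     * @param model ETLScriptBuilderModel supplying internal DB metadata
     * @throws BaseException if an error occurs during statement generation
     */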
    void buildFlatfileSQLParts(ETLTaskNode initTask, ETLTaskNode cleanupTask, TargetTable tTable,
            ETLScriptBuilderModel model) throws BaseException {
        Collection participatingTables = tTable.getSourceTableList();
        InternalDBMetadata tgtInternalMetadata = model.getInternalMetadata(tTable);

        // For the target table.
        FlatfileDefinition trgFFDef = ETLCodegenUtil.getStcdbObjectTypeDefinition(tTable);
        if (trgFFDef != null && tgtInternalMetadata != null) {
            getFlatfileInitSQLParts(trgFFDef, tgtInternalMetadata, initTask, cleanupTask, tTable, true);
        }

        // For all source tables associated with the target table.
        for (Iterator it = participatingTables.iterator(); it.hasNext();) {
            SQLDBTable dbTable = (SQLDBTable) it.next();
            FlatfileDefinition srcFFDef = ETLCodegenUtil.getStcdbObjectTypeDefinition(dbTable);
            if (srcFFDef != null) {
                InternalDBMetadata srcInternalMetadata = model.getInternalMetadata(dbTable);
                if (srcInternalMetadata != null) {
                    getFlatfileInitSQLParts(srcFFDef, srcInternalMetadata, initTask, cleanupTask,
                            dbTable, false);
                }
            }
        }
    }

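    /**
     * Builds the pipeline transform ETLTaskNode for the given target table, including
     * statements to truncate the target table if it exists, execute the transformation,
     * and insert a new execution record.
     *
     * @param context ETLStrategyBuilderContext supplying the model and engine
     * @param tt TargetTable whose transformation statements are to be generated
     * @return ETLTaskNode containing the pipeline transformation statements
     * @throws BaseException if an error occurs during statement generation
     */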
    ETLTaskNode buildPipelineTransformTask(ETLStrategyBuilderContext context, TargetTable tt)
            throws BaseException {
        String displayName = msgMgr.getString("TEMPLATE_dn", getTaskType(), tt.getName());
        ETLTaskNode pipeline = context.getModel().getEngine().createETLTaskNode(getTaskType());
        pipeline.setDisplayName(displayName);

        StatementContext sc = new StatementContext();
        sc.setUsingUniqueTableName(true);
        sc.putClientProperty(StatementContext.USE_FULLY_QUALIFIED_TABLE, Boolean.FALSE);

        truncateTargetTableIfExists(tt, pipeline, ETLScriptBuilderModel.ETL_INSTANCE_DB_CONN_DEF_NAME,
                db.getAxionPipelineStatements(), sc);
        SQLPart insertSelectPart = createTransformStatement(tt, sc);

        insertSelectPart.setConnectionPoolName(ETLScriptBuilderModel.ETL_INSTANCE_DB_CONN_DEF_NAME);
        pipeline.addStatement(insertSelectPart);

        // Use unique table name to generate table name associated with
        // pipeline/validation task.
        sc.setUseTargetTableAliasName(false);
        pipeline.setTableName(db.getUnescapedName(db.getGeneratorFactory().generate(tt, sc)));

        // Insert new execution record for target table, tt.
        super.addInsertNewExecutionRecordStatement(pipeline, tt);

        return pipeline;
    }

    /**
     * Adds cleanup (defrag) statements to the given cleanup task node for each
     * non-remote table in the given List of TargetTables.
     *
     * @param cleanupTask ETLTaskNode to receive cleanup statements
     * @param targetTables List of TargetTables whose cleanup statements are to be
     *            generated
     * @throws BaseException if an error occurs during statement generation
     */
    protected void buildCleanupStatements(ETLTaskNode cleanupTask, List targetTables)
            throws BaseException {
        StatementContext context = new StatementContext();
        context.setUsingFullyQualifiedTablePrefix(false);

        Iterator iter = targetTables.iterator();
        while (iter.hasNext()) {
            TargetTable tt = (TargetTable) iter.next();
            // Generate defrag only for non-remote tables.
            if (!remoteTables.contains(tt)) {
                SQLPart defragPartStatement = generateDefragPart(tt, db, tt.getStatementType());
                if (defragPartStatement != null) {
                    defragPartStatement.setConnectionPoolName(
                            ETLScriptBuilderModel.ETL_INSTANCE_DB_CONN_DEF_NAME);
                    cleanupTask.addOptionalTask(defragPartStatement);
                }
            }
        }
    }

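    /**
     * Builds initialization statements for the given table, creating a DB link and a
     * remote (external) table in the pipelining database if the table requires remote
     * access.
     *
     * @param table SQLDBTable for which to generate initialization statements
     * @param initTask ETLTaskNode to receive the initialization statements
     * @param connDefToLinkName Map of connection definitions to already-created DB link
     *            names, used to avoid creating duplicate links
     * @throws BaseException if an error occurs during statement generation
     */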
    protected void buildInitializationStatements(SQLDBTable table, ETLTaskNode initTask,
            Map connDefToLinkName) throws BaseException {
        DBConnectionDefinition connDef = this.builderModel.getConnectionDefinition(table);

        if (requiresRemoteAccess(table)) {
            // Generate a unique name for the DB link, ensuring that the link name is a
            // legal SQL identifier, then generate SQL statement(s).
            String linkName = StringUtil.createSQLIdentifier(connDef.getName());

            if (!connDefToLinkName.containsValue(linkName)) {
                // Create an eTL-specific connection definition that references the
                // correct driver class, among other things. This is used solely to
                // generate organization parameters for "CREATE EXTERNALDB TABLE"
                // statements.
                SQLDBConnectionDefinition etlConnDef = SQLModelObjectFactory.getInstance()
                        .createDBConnectionDefinition(connDef);
                etlConnDef.setDriverClass(connDef.getDriverClass());
                SQLPart initPart = new SQLPart(getCreateDBLinkSQL(etlConnDef, linkName),
                        SQLPart.STMT_INITIALIZESTATEMENTS,
                        ETLScriptBuilderModel.ETL_INSTANCE_DB_CONN_DEF_NAME);

                initTask.addOptionalTask(initPart);
                connDefToLinkName.put(connDef, linkName);
            }

            // Generate a unique local name for each table and add it to the init task SQL part.
            Set linkTables = (Set) linkTableMap.get(linkName);
            if (linkTables == null) {
                linkTables = new HashSet();
                linkTableMap.put(linkName, linkTables);
            }
            linkTables.add(table);
            remoteTables.add(table);

            StatementContext context = new StatementContext();
            context.setUsingUniqueTableName(table, true);
            context.setUsingFullyQualifiedTablePrefix(false);
            String localName = db.getUnescapedName(db.getGeneratorFactory().generate(table, context));

            SQLPart initPart = new SQLPart(getCreateRemoteTableSQL(table, localName, linkName),
                    SQLPart.STMT_INITIALIZESTATEMENTS,
                    ETLScriptBuilderModel.ETL_INSTANCE_DB_CONN_DEF_NAME);
            initTask.addOptionalTask(initPart);

            // FIXME: create remount statement in Axion statement generator.
            // FIXME: Do we need remount here?
            SQLPart remountPart = new SQLPart("REMOUNT EXTERNAL TABLE "
                    + (StringUtil.isNullString(localName) ? table.getName() : localName),
                    SQLPart.STMT_INITIALIZESTATEMENTS,
                    ETLScriptBuilderModel.ETL_INSTANCE_DB_CONN_DEF_NAME);
            initTask.addOptionalTask(remountPart);
        }
    }

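    /**
     * Adds a "create table if it does not exist" statement for the given target table to
     * the given task node, provided the table is flagged for target table creation.
     *
     * @param table TargetTable to be created if it does not exist
     * @param taskNode ETLTaskNode to receive the create statement
     * @param trgtConnName name of the target connection pool against which the statement
     *            is executed
     * @param targetDB DB instance used to generate the statements
     * @throws BaseException if an error occurs during statement generation
     */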
    protected void createTargetTableIfNotExists(TargetTable table, ETLTaskNode taskNode,
            String trgtConnName, DB targetDB) throws BaseException {
        if (table.isCreateTargetTable()) {
            StringBuilder sqlBuffer = new StringBuilder(200);

            Statements tStmts = targetDB.getStatements();
            StatementContext context = new StatementContext();

            context.clearAllUsingTempTableName();

            SQLPart ifExists = tStmts.getTableExistsStatement(table, context);
            SQLPart doCreate = tStmts.getCreateStatement(table, context);

            sqlBuffer.append(ifExists.getSQL()).append(SQLPart.STATEMENT_SEPARATOR);
            sqlBuffer.append(doCreate.getSQL());

            SQLPart createIfExists = new SQLPart(sqlBuffer.toString(),
                    SQLPart.STMT_CREATEBEFOREPROCESS, trgtConnName);
            taskNode.addStatement(createIfExists);
        }
    }

    /**
     * Creates a SQLPart containing the appropriate transformation statement for the
     * given TargetTable instance, using the given StatementContext to assist in
     * statement generation.
     *
     * @param tt TargetTable whose transformation statement is to be generated
     * @param context StatementContext to use in generating the transformation statement
     * @return SQLPart containing the transformation statement
     * @throws BaseException if an error occurs during statement generation
     */
    protected SQLPart createTransformStatement(TargetTable tt, StatementContext context)
            throws BaseException {
        SQLPart insertSelectPart;

        Statements statements = this.pipelineStmts;

        int statementType = tt.getStatementType();
        switch (statementType) {
            case SQLConstants.UPDATE_STATEMENT:
                insertSelectPart = statements.getUpdateStatement(tt, context);
                break;

            case SQLConstants.INSERT_UPDATE_STATEMENT:
                // Generate merge-select statement for transformation.
                insertSelectPart = statements.getMergeStatement(tt, context);
                break;

            case SQLConstants.DELETE_STATEMENT:
                context.putClientProperty("useWhere", Boolean.TRUE);
                insertSelectPart = statements.getDeleteStatement(tt, context);
                break;

            default:
                List srcTblList = tt.getSourceTableList();
                if (srcTblList == null || srcTblList.isEmpty()) {
                    insertSelectPart = statements.getStaticInsertStatement(tt, context);
                } else {
                    // Generate insert-select statement for transformation.
                    context.putClientProperty("useWhere", Boolean.TRUE);
                    insertSelectPart = statements.getInsertSelectStatement(tt, context);
                }
                break;
        }

        context.setUseTargetTableAliasName(false);
        insertSelectPart.setTableName(db.getUnescapedName(db.getGeneratorFactory().generate(tt, context)));
        return insertSelectPart;
    }

    /**
     * Generates a transformer comment line for the given target table.
     *
     * @param targetTable TargetTable for which to generate the comment
     * @return comment String
     * @throws BaseException if an error occurs while resolving the connection definition
     */
    @Override
    protected String getCommentForTransformer(TargetTable targetTable) throws BaseException {
        return MSG_MGR.getString("DISPLAY_INSERT_TARGET_PIPELINE",
                targetTable.getParent().getModelName(),
                this.builderModel.getConnectionDefinition(targetTable).getDBType());
    }

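    /**
     * Generates the SQL for a "create DB link" statement using the connection parameters
     * in the given connection definition.
     *
     * @param connDef DBConnectionDefinition supplying connection parameters for the link
     * @param linkName name to assign to the DB link
     * @return SQL statement string that creates the DB link
     * @throws BaseException if an error occurs during statement generation
     */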
    protected String getCreateDBLinkSQL(DBConnectionDefinition connDef, String linkName)
            throws BaseException {
        StringBuilder stmtBuf = new StringBuilder(50);

        // Generate check for link existence + drop statement if necessary.
        // Generate a "create DB link" statement using connection parameters in the
        // connection definition.
        stmtBuf.append(stmts.getCreateDBLinkStatement(connDef, linkName).getSQL());

        return stmtBuf.toString();
    }

    /**
     * Generates a drop external table statement for the given SQLDBTable.
     *
     * @param table SQLDBTable for which to generate a drop external table statement
     * @param localName local name of the table as used in the Axion database; may be
     *            different from the table name contained in <code>table</code>
     * @param ifExists true if the statement should include an "IF EXISTS" qualifier
     * @param context StatementContext to use in generating the statement
     * @return SQL statement representing the drop external table statement for the
     *         SQLDBTable
     * @throws BaseException if an error occurs during statement generation
     */
    protected String getDropExternalTableSQL(SQLDBTable table, String localName, boolean ifExists,
            StatementContext context) throws BaseException {
        return stmts.getDropExternalTableStatement(table, localName, ifExists, context).getSQL();
    }

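    /**
     * Returns the task type used when creating transform task nodes; this implementation
     * returns ETLEngine.PIPELINE.
     *
     * @return task type String
     */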
    protected String getTaskType() {
        return ETLEngine.PIPELINE;
    }

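    /**
     * Indicates whether the given table must be accessed through a remote (external)
     * table in the pipelining database. When connection definition overrides are
     * applied, only non-Axion tables require remote access; otherwise remote access is
     * always assumed.
     *
     * @param table SQLDBTable to evaluate
     * @return true if the table requires remote access, false otherwise
     */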
    protected boolean requiresRemoteAccess(SQLDBTable table) {
        if (this.builderModel.isConnectionDefinitionOverridesApplied()) {
            // If the table is not an Axion table, create an external remote table if it
            // doesn't already exist. We handle Axion flatfiles in buildFlatfileSQLParts(),
            // though we will create a log table for target tables, regardless of DB type.
            DBConnectionDefinition connDef = table.getParent().getConnectionDefinition();
            return (SQLUtils.getSupportedDBType(connDef.getDBType()) != DBConstants.AXION);
        } else {
            return true;
        }
    }

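    /**
     * Generates the SQL for a "create external table" statement that references the
     * given DB link, using the table's own name as the local name if none is supplied.
     *
     * @param table SQLDBTable for which to generate the statement
     * @param localName local name of the table in the Axion database; may be null or empty
     * @param linkName name of the DB link the external table references
     * @return SQL statement string that creates the remote (external) table
     * @throws BaseException if an error occurs during statement generation
     */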
    private String getCreateRemoteTableSQL(SQLDBTable table, String localName, String linkName)
            throws BaseException {
        StringBuilder stmtBuf = new StringBuilder(50);
        if (StringUtil.isNullString(localName)) {
            localName = table.getName();
        }

        // Generate a "create external table" statement that references its DB link.
        stmtBuf.append(stmts.getCreateRemoteTableStatement(table, localName, linkName).getSQL());
        return stmtBuf.toString();
    }
}