/*
 * The contents of this file are subject to the terms of the Common Development
 * and Distribution License (the License). You may not use this file except in
 * compliance with the License.
 *
 * You can obtain a copy of the License at http://www.netbeans.org/cddl.html
 * or http://www.netbeans.org/cddl.txt.
 *
 * When distributing Covered Code, include this CDDL Header Notice in each file
 * and include the License file at http://www.netbeans.org/cddl.txt.
 * If applicable, add the following below the CDDL Header, with the fields
 * enclosed by brackets [] replaced by your own identifying information:
 * "Portions Copyrighted [year] [name of copyright owner]"
 *
 * The Original Software is NetBeans. The Initial Developer of the Original
 * Software is Sun Microsystems, Inc. Portions Copyright 1997-2007 Sun
 * Microsystems, Inc. All Rights Reserved.
 */
package org.netbeans.modules.etl.codegen.impl;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.netbeans.modules.etl.codegen.ETLProcessFlowGeneratorFactory;
import org.netbeans.modules.etl.codegen.ETLStrategyBuilder;
import org.netbeans.modules.etl.codegen.ETLStrategyBuilderContext;
import org.netbeans.modules.etl.logger.Localizer;
import org.netbeans.modules.etl.logger.LogUtil;
import org.netbeans.modules.etl.utils.MessageManager;
import org.netbeans.modules.sql.framework.common.jdbc.SQLUtils;
import org.netbeans.modules.sql.framework.model.DBConnectionDefinition;
import org.netbeans.modules.sql.framework.model.SQLDBTable;
import org.netbeans.modules.sql.framework.model.SQLDefinition;
import org.netbeans.modules.sql.framework.model.TargetTable;

import com.sun.etl.engine.ETLEngine;
import com.sun.etl.engine.ETLTask;
import com.sun.etl.engine.ETLTaskNode;
import com.sun.sql.framework.exception.BaseException;
import com.sun.sql.framework.jdbc.DBConstants;
import com.sun.sql.framework.utils.StringUtil;

import net.java.hulp.i18n.Logger;

/**
 * Builds the ETL process flow and delegates to the appropriate
 * {@link ETLStrategyBuilder} implementation as required.
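 *
 * <p>Typical usage (an illustrative sketch based on this class's public methods;
 * construction of the surrounding SQLDefinition is assumed to happen elsewhere):</p>
 * <pre>
 *   PipelinedFlowGenerator generator = new PipelinedFlowGenerator(sqlDefinition);
 *   generator.applyConnectionDefinitions();
 *   ETLEngine engine = generator.getScript();
 *   String engineXml = engine.toXMLString(); // serialized flow, e.g. for debugging
 * </pre>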
 *
 * @author Ahimanikya Satapathy
 * @author Jonathan Giron
 * @version $Revision$
 */
public class PipelinedFlowGenerator extends BaseFlowGenerator {

    private static final String LOG_CATEGORY = PipelinedFlowGenerator.class.getName();
    private static final MessageManager MSG_MGR = MessageManager.getManager(ETLTaskNode.class);
    protected PipelinedStrategyBuilderImpl pipelinedBuilder;
    private static final transient Logger mLogger = LogUtil.getLogger(PipelinedFlowGenerator.class.getName());
    private static final transient Localizer mLoc = Localizer.get();

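    /**
     * Creates a pipelined flow generator for the given SQLDefinition and
     * configures the builder model to use the instance (pipeline) database.
     *
     * @param sqlD SQLDefinition describing the eTL collaboration
     * @throws BaseException if an error occurs during initialization
     */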
    public PipelinedFlowGenerator(SQLDefinition sqlD) throws BaseException {
        super(sqlD);
        this.builderModel.setUseInstanceDB(true);
    }

    @Override
    public void applyConnectionDefinitions() throws BaseException {
        super.applyConnectionDefinitions();
    }

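    /**
     * Applies the given connection definition overrides. In addition to the base
     * behavior, this marks the builder model to use the instance (pipeline)
     * database and to shut down the monitor database.
     *
     * @throws BaseException if the overrides cannot be applied
     */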
    @Override
    public void applyConnectionDefinitions(Map name2connectionDefMap, Map connDefNameMap,
            Map intDbConfigParams) throws BaseException {
        this.builderModel.setUseInstanceDB(true);
        this.builderModel.setShutdownMonitorDB(true);
        super.applyConnectionDefinitions(name2connectionDefMap, connDefNameMap, intDbConfigParams);
    }

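    /**
     * Generates the process flow and returns the resulting ETLEngine instance.
     *
     * @return ETLEngine holding the generated task network
     * @throws BaseException if flow generation fails
     */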
    public ETLEngine getScript() throws BaseException {
        mLogger.infoNoloc(mLoc.t("PRSR006: In getScript(){0}", LOG_CATEGORY));
        generateScript();
        return builderModel.getEngine();
    }

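    /**
     * Creates the wrapper task nodes that frame the per-table pipeline tasks:
     * the init and global cleanup tasks (when a pipelined builder is available),
     * the thread-collector wait node, and the statistics update task.
     *
     * @throws BaseException if a task node cannot be created
     */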
    protected void createWarapperTask() throws BaseException {
        final MessageManager dnLabelMgr = MessageManager.getManager(ETLTaskNode.class);

        if (pipelinedBuilder != null) {
            List targetTables = this.builderModel.getSqlDefinition().getTargetTables();
            this.initTask = pipelinedBuilder.buildInitTask(targetTables);
            this.initTask.setDisplayName(dnLabelMgr.getString("LBL_dn_init"));
            this.globalCleanupTask = pipelinedBuilder.buildCleanupTask(targetTables);
            this.globalCleanupTask.setDisplayName(dnLabelMgr.getString("LBL_dn_cleanup"));
        }

        this.threadCollectorWaitNode = this.builderModel.getEngine().createETLTaskNode(ETLEngine.WAIT);

        this.statsUpdateTask = this.builderModel.getEngine().createETLTaskNode(ETLEngine.UPDATE_STATS);
        this.statsUpdateTask.setDisplayName(MSG_MGR.getString("LBL_dn_updatestats"));
    }

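    /**
     * Builds the complete process flow: creates one pipeline task per target table,
     * then wires the wrapper tasks (start, init, wait, commit, update-stats, cleanup,
     * end) into the task network.
     *
     * @throws BaseException if no target table is defined or flow construction fails
     */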
    protected void generateScript() throws BaseException {
        pipelinedBuilder = getBuilder();
        List dependencies = new ArrayList();
        createWarapperTask();

        // Get the list of target tables; at least one is required.
        List targetTables = builderModel.getSqlDefinition().getTargetTables();
        if (targetTables == null || targetTables.isEmpty()) {
            throw new BaseException("Invalid eTL Collaboration: No target table defined.");
        }

        ETLStrategyBuilderContext context = new ETLStrategyBuilderContext(initTask,
                globalCleanupTask, this.statsUpdateTask, this.builderModel);

        ETLTaskNode pipelineTask = null;
        // Iterate through the target tables to generate pipeline tasks.
        Iterator it = targetTables.iterator();
        while (it.hasNext()) {
            mLogger.infoNoloc(mLoc.t("PRSR007: Looping through target tables:{0}", LOG_CATEGORY));
            TargetTable tt = (TargetTable) it.next();

            context.setTargetTable(tt);
            context.setPredecessorTask(initTask);
            context.setNextTaskOnSucess(threadCollectorWaitNode);
            context.setNextTaskOnException(statsUpdateTask);

            pipelinedBuilder.generateScriptForTable(context);
            pipelineTask = context.getLastPipelinedTask();
            dependencies.add(pipelineTask.getId());
        } // end target table loop

        // Create a commit node to collect transformer connections and commit/close them.
        ETLTaskNode commitTask = this.builderModel.getEngine().createETLTaskNode(ETLEngine.COMMIT);
        commitTask.setDisplayName(MSG_MGR.getString("LBL_dn_commit"));

        // Set the dependency list for the wait node.
        this.threadCollectorWaitNode.setDependsOn(StringUtil.createDelimitedStringFrom(dependencies));

        // Complete the task net by linking nodes.
        this.startTask.addNextETLTaskNode(ETLTask.SUCCESS, this.initTask.getId());
        this.initTask.addNextETLTaskNode(ETLTask.EXCEPTION, this.globalCleanupTask.getId());

        // Commit data first, then update statistics.
        this.threadCollectorWaitNode.addNextETLTaskNode(ETLTask.SUCCESS, commitTask.getId());

        commitTask.addNextETLTaskNode(ETLTask.SUCCESS, this.statsUpdateTask.getId());
        commitTask.addNextETLTaskNode(ETLTask.EXCEPTION, this.statsUpdateTask.getId());

        statsUpdateTask.addNextETLTaskNode(ETLTask.SUCCESS, globalCleanupTask.getId());
        statsUpdateTask.addNextETLTaskNode(ETLTask.EXCEPTION, globalCleanupTask.getId());

        globalCleanupTask.addNextETLTaskNode(ETLTask.SUCCESS, endTask.getId());
        globalCleanupTask.addNextETLTaskNode(ETLTask.EXCEPTION, endTask.getId());

        // Generate the engine XML; the serialized form is useful for debugging.
        this.builderModel.getEngine().toXMLString();
    }

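    /**
     * Returns the ETLStrategyBuilder used to generate per-target-table scripts
     * for pipelined execution.
     *
     * @return pipelined target table script builder
     * @throws BaseException if the builder cannot be obtained
     */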
    protected ETLStrategyBuilder getTargetTableScriptBuilder() throws BaseException {
        return ETLProcessFlowGeneratorFactory.getPipelinedTargetTableScriptBuilder(builderModel);
    }

    /**
     * Indicates whether the given table must be accessed by the pipeline database via a
     * dblink/remote table combination.
     *
     * @param table SQLDBTable instance to test
     * @return true if <code>table</code> should be accessed via a remote table, false
     *         otherwise
     */
    protected boolean requiresRemoteAccess(SQLDBTable table) {
        // If table is not an Axion table, create an external remote table if it doesn't
        // already exist. We handle Axion flatfiles in buildFlatfileSQLParts(), though
        // we will create a log table for target tables, regardless of DB type.
        boolean ret = true;

        if (this.builderModel.isConnectionDefinitionOverridesApplied()) {
            DBConnectionDefinition connDef = table.getParent().getConnectionDefinition();
            ret = SQLUtils.getSupportedDBType(connDef.getDBType()) != DBConstants.AXION;
        }

        return ret;
    }

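    /**
     * Selects the strategy builder for this flow: a validating builder when the
     * SQLDefinition carries validation conditions, otherwise the plain pipelined
     * target table script builder.
     *
     * @return PipelinedStrategyBuilderImpl appropriate for this collaboration
     * @throws BaseException if the builder cannot be obtained
     */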
    private PipelinedStrategyBuilderImpl getBuilder() throws BaseException {
        if (this.builderModel.getSqlDefinition().hasValidationConditions()) {
            return ETLProcessFlowGeneratorFactory.getValidatingTargetTableScriptBuilder(builderModel);
        } else {
            return ETLProcessFlowGeneratorFactory.getPipelinedTargetTableScriptBuilder(builderModel);
        }
    }
}
|