001: /*
002: * Copyright 2006 Pentaho Corporation. All rights reserved.
003: * This software was developed by Pentaho Corporation and is provided under the terms
004: * of the Mozilla Public License, Version 1.1, or any later version. You may not use
005: * this file except in compliance with the license. If you need a copy of the license,
006: * please go to http://www.mozilla.org/MPL/MPL-1.1.txt. The Original Code is the Pentaho
007: * BI Platform. The Initial Developer is Pentaho Corporation.
008: *
009: * Software distributed under the Mozilla Public License is distributed on an "AS IS"
010: * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. Please refer to
011: * the license for the specific language governing your rights and limitations.
012: *
013: * @created Jan 2, 2006
014: * @author Matt Casters
015: */
016:
017: package org.pentaho.plugin.kettle;
018:
019: import java.io.File;
020: import java.util.ArrayList;
021: import java.util.Iterator;
022: import java.util.List;
023:
024: import org.apache.commons.logging.Log;
025: import org.apache.commons.logging.LogFactory;
026: import org.pentaho.commons.connection.memory.MemoryMetaData;
027: import org.pentaho.commons.connection.memory.MemoryResultSet;
028: import org.pentaho.core.solution.IActionResource;
029: import org.pentaho.core.system.PentahoSystem;
030: import org.pentaho.core.util.XmlHelper;
031: import org.pentaho.core.util.XmlW3CHelper;
032: import org.pentaho.di.core.exception.KettleException;
033: import org.pentaho.di.core.exception.KettleStepException;
034: import org.pentaho.di.core.exception.KettleValueException;
035: import org.pentaho.di.core.logging.Log4jStringAppender;
036: import org.pentaho.di.core.logging.LogWriter;
037: import org.pentaho.di.core.row.RowMetaInterface;
038: import org.pentaho.di.core.row.ValueMetaInterface;
039: import org.pentaho.di.job.Job;
040: import org.pentaho.di.job.JobMeta;
041: import org.pentaho.di.repository.RepositoriesMeta;
042: import org.pentaho.di.repository.Repository;
043: import org.pentaho.di.repository.RepositoryDirectory;
044: import org.pentaho.di.repository.RepositoryMeta;
045: import org.pentaho.di.repository.UserInfo;
046: import org.pentaho.di.trans.StepLoader;
047: import org.pentaho.di.trans.Trans;
048: import org.pentaho.di.trans.TransMeta;
049: import org.pentaho.di.trans.step.RowListener;
050: import org.pentaho.di.trans.step.StepMetaDataCombi;
051: import org.pentaho.messages.Messages;
052: import org.pentaho.plugin.ComponentBase;
053:
054: /**
055: * KettleComponent shows a list of available transformations in the root of the choosen repository.
056: *
057: * @author Matt
058: *
059: */
060: public class KettleComponent extends ComponentBase implements
061: RowListener {
062:
063: private static final long serialVersionUID = 8217343898202366129L;
064:
065: private static final String DIRECTORY = "directory"; //$NON-NLS-1$
066:
067: private static final String TRANSFORMATION = "transformation"; //$NON-NLS-1$
068:
069: private static final String JOB = "job"; //$NON-NLS-1$
070:
071: private static final String TRANSFORMFILE = "transformation-file"; //$NON-NLS-1$
072:
073: private static final String JOBFILE = "job-file"; //$NON-NLS-1$
074:
075: private static final String IMPORTSTEP = "importstep"; //$NON-NLS-1$
076:
077: /**
078: * The repositories.xml file location, if empty take the default $HOME/.kettle/repositories.xml
079: */
080: private String repositoriesXMLFile;
081:
082: /** The name of the repository to use */
083: private String repositoryName;
084:
085: /** The username to login with */
086: private String username;
087:
088: MemoryResultSet results;
089:
090: /** The password to login with */
091: private String password;
092:
093: private Log4jStringAppender kettleUserAppender;
094:
095: public Log getLogger() {
096: return LogFactory.getLog(KettleComponent.class);
097: }
098:
099: protected boolean validateSystemSettings() {
100: // set pentaho.solutionpath so that it can be used in file paths
101: boolean useRepository = PentahoSystem
102: .getSystemSetting(
103: "kettle/settings.xml", "repository.type", "files").equals("rdbms"); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$
104: if (useRepository) {
105: repositoriesXMLFile = PentahoSystem
106: .getSystemSetting(
107: "kettle/settings.xml", "repositories.xml.file", null); //$NON-NLS-1$ //$NON-NLS-2$
108: repositoryName = PentahoSystem.getSystemSetting(
109: "kettle/settings.xml", "repository.name", null); //$NON-NLS-1$ //$NON-NLS-2$
110: username = PentahoSystem.getSystemSetting(
111: "kettle/settings.xml", "repository.userid", ""); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
112: password = PentahoSystem.getSystemSetting(
113: "kettle/settings.xml", "repository.password", ""); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
114: // Check the Kettle settings...
115: if ("".equals(repositoryName) || username.equals("")) { //$NON-NLS-1$ //$NON-NLS-2$
116: // looks like the Kettle stuff is not configured yet...
117: // see if we can provide feedback to the user...
118:
119: error(Messages
120: .getErrorString("Kettle.ERROR_0001_SERVER_SETTINGS_NOT_SET")); //$NON-NLS-1$
121: return false;
122: }
123:
124: boolean ok = (repositoryName != null && repositoryName
125: .length() > 0);
126: ok = ok || (username != null && username.length() > 0);
127:
128: return ok;
129: }
130: return true;
131: }
132:
133: public boolean init() {
134: return true;
135:
136: }
137:
138: public boolean validateAction() {
139:
140: if (isDefinedResource(TRANSFORMFILE)
141: || isDefinedResource(JOBFILE)) {
142: return true;
143: }
144:
145: boolean useRepository = PentahoSystem
146: .getSystemSetting(
147: "kettle/settings.xml", "repository.type", "files").equals("rdbms"); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$
148:
149: if (!useRepository) {
150: error(Messages
151: .getErrorString("Kettle.ERROR_0019_REPOSITORY_TYPE_FILES")); //$NON-NLS-1$
152: return false;
153: }
154:
155: if (isDefinedInput(DIRECTORY)
156: && (isDefinedInput(TRANSFORMATION) || isDefinedInput(JOB))) {
157: return true;
158: }
159:
160: if (!isDefinedInput(DIRECTORY)) {
161: error(Messages
162: .getErrorString(
163: "Kettle.ERROR_0002_DIR_OR_FILE__NOT_DEFINED", getActionName())); //$NON-NLS-1$
164: return false;
165: } else {
166: if (!isDefinedInput(TRANSFORMATION)) {
167: error(Messages
168: .getErrorString(
169: "Kettle.ERROR_0003_TRANS_NOT_DEFINED", getActionName())); //$NON-NLS-1$
170: return false;
171: }
172: }
173:
174: return false;
175:
176: }
177:
178: /**
179: * Execute the specified transformation in the choosen repository.
180: */
181: public boolean executeAction() {
182:
183: if (debug)
184: debug(Messages.getString("Kettle.DEBUG_START")); //$NON-NLS-1$
185:
186: TransMeta transMeta = null;
187: JobMeta jobMeta = null;
188: LogWriter logWriter = null;
189:
190: kettleUserAppender = LogWriter.createStringAppender();
191: try {
192: logWriter = LogWriter.getInstance(
193: "Kettle-pentaho", false, LogWriter.LOG_LEVEL_BASIC); //$NON-NLS-1$
194: } catch (Throwable t) {
195:
196: }
197:
198: // this use is now considered obsolete, as we prefer the action-sequence inputs since they
199: // now maintain order
200: boolean running = true;
201: int index = 1;
202: ArrayList parameterList = new ArrayList();
203: while (running) {
204: if (isDefinedInput("parameter" + index)) { //$NON-NLS-1$
205: String value = null;
206: String inputName = getInputStringValue("parameter" + index); //$NON-NLS-1$
207: // see if we have an input with this name
208: if (isDefinedInput(inputName)) {
209: value = getInputStringValue(inputName);
210: }
211: parameterList.add(value);
212: } else {
213: running = false;
214: }
215: index++;
216: }
217:
218: // initialize environment variables
219: KettleSystemListener.environmentInit(getSession());
220:
221: // this is the preferred way to provide inputs to the KetteComponent, the order of inputs is now preserved
222: Iterator inputNamesIter = getInputNames().iterator();
223: while (inputNamesIter.hasNext()) {
224: String name = (String) inputNamesIter.next();
225: String value = null;
226: if (isDefinedInput(name)) {
227: value = getInputStringValue(name);
228: }
229: if (!parameterList.contains(value)) {
230: parameterList.add(value);
231: }
232: }
233:
234: String parameters[] = (String[]) parameterList
235: .toArray(new String[parameterList.size()]);
236:
237: String solutionPath = PentahoSystem.getApplicationContext()
238: .getFileOutputPath(""); //$NON-NLS-1$
239: solutionPath = solutionPath.replaceAll("\\\\", "\\\\\\\\"); //$NON-NLS-1$ //$NON-NLS-2$
240: solutionPath = solutionPath.replaceAll("\\$", "\\\\\\$"); //$NON-NLS-1$ //$NON-NLS-2$
241:
242: Repository repository = connectToRepository(logWriter);
243: boolean result = false;
244: logWriter.addAppender(kettleUserAppender);
245: try {
246: if (isDefinedInput(DIRECTORY)) {
247: String directoryName = getInputStringValue(DIRECTORY);
248:
249: if (repository == null) {
250: return false;
251: }
252:
253: if (isDefinedInput(TRANSFORMATION)) {
254: String transformationName = getInputStringValue(TRANSFORMATION);
255: transMeta = loadTransformFromRepository(
256: directoryName, transformationName,
257: repository, logWriter);
258: if (transMeta != null) {
259: transMeta.setArguments(parameters);
260: } else {
261: return false;
262: }
263: } else if (isDefinedInput(JOB)) {
264: String jobName = getInputStringValue(JOB);
265: jobMeta = loadJobFromRepository(directoryName,
266: jobName, repository, logWriter);
267: if (jobMeta != null) {
268: jobMeta.setArguments(parameters);
269: } else {
270: return false;
271: }
272: }
273: } else if (isDefinedResource(TRANSFORMFILE)) {
274: IActionResource transformResource = getResource(TRANSFORMFILE);
275: String fileAddress = getActualFileName(transformResource);
276:
277: try {
278: if (fileAddress != null) { // We have an actual loadable filesystem and file
279: transMeta = new TransMeta(fileAddress,
280: repository, true);
281: transMeta.setFilename(fileAddress);
282: } else {
283: String jobXmlStr = getResourceAsString(getResource(TRANSFORMFILE));
284: jobXmlStr = jobXmlStr
285: .replaceAll(
286: "\\$\\{pentaho.solutionpath\\}", solutionPath); //$NON-NLS-1$
287: jobXmlStr = jobXmlStr
288: .replaceAll(
289: "\\%\\%pentaho.solutionpath\\%\\%", solutionPath); //$NON-NLS-1$
290: org.w3c.dom.Document doc = XmlW3CHelper
291: .getDomFromString(jobXmlStr);
292: // create a tranformation from the document
293: transMeta = new TransMeta(doc.getFirstChild(),
294: repository);
295: }
296: } catch (Exception e) {
297: error(
298: Messages
299: .getErrorString(
300: "Kettle.ERROR_0015_BAD_RESOURCE", TRANSFORMFILE, fileAddress), e); //$NON-NLS-1$
301: return false;
302: }
303:
304: if (transMeta == null) {
305: error(Messages
306: .getErrorString(
307: "Kettle.ERROR_0015_BAD_RESOURCE", TRANSFORMFILE, fileAddress)); //$NON-NLS-1$
308: info(kettleUserAppender.getBuffer().toString());
309: return false;
310: } else { // Don't forget to set the parameters here as well...
311: transMeta.setArguments(parameters);
312: transMeta.setFilename(solutionPath + fileAddress);
313: }
314: } else if (isDefinedResource(JOBFILE)) {
315: String fileAddress = ""; //$NON-NLS-1$
316: try {
317: fileAddress = getResource(JOBFILE).getAddress();
318: String jobXmlStr = XmlHelper
319: .getContentFromSolutionResource(fileAddress);
320: jobXmlStr = jobXmlStr
321: .replaceAll(
322: "\\$\\{pentaho.solutionpath\\}", solutionPath); //$NON-NLS-1$
323: jobXmlStr = jobXmlStr
324: .replaceAll(
325: "\\%\\%pentaho.solutionpath\\%\\%", solutionPath); //$NON-NLS-1$
326: org.w3c.dom.Document doc = XmlW3CHelper
327: .getDomFromString(jobXmlStr);
328: if (doc == null) {
329: error(Messages
330: .getErrorString(
331: "Kettle.ERROR_0015_BAD_RESOURCE", JOBFILE, fileAddress)); //$NON-NLS-1$
332: info(kettleUserAppender.getBuffer().toString());
333: return false;
334: }
335: // create a job from the document
336: try {
337: repository = connectToRepository(logWriter);
338: // if we get a valid repository its great, if not try it without
339: jobMeta = new JobMeta(logWriter, doc
340: .getFirstChild(), repository, null);
341: jobMeta.setFilename(solutionPath + fileAddress);
342: } catch (Exception e) {
343: error(
344: Messages
345: .getString("Kettle.ERROR_0023_NO_META"), e); //$NON-NLS-1$
346: } finally {
347: if (repository != null) {
348: if (debug)
349: debug(Messages
350: .getString("Kettle.DEBUG_DISCONNECTING")); //$NON-NLS-1$
351: repository.disconnect();
352: }
353: }
354: } catch (Exception e) {
355: error(
356: Messages
357: .getErrorString(
358: "Kettle.ERROR_0015_BAD_RESOURCE", JOBFILE, fileAddress), e); //$NON-NLS-1$
359: return false;
360: }
361: if (jobMeta == null) {
362: error(Messages
363: .getErrorString(
364: "Kettle.ERROR_0015_BAD_RESOURCE", JOBFILE, fileAddress)); //$NON-NLS-1$
365: info(kettleUserAppender.getBuffer().toString());
366: return false;
367: } else {
368: jobMeta.setArguments(parameters);
369: jobMeta.setFilename(solutionPath + fileAddress);
370: }
371:
372: }
373:
374: // OK, we have the information, let's load and execute the
375: // transformation or job
376:
377: if (transMeta != null) {
378: result = executeTransformation(transMeta, logWriter);
379: }
380: if (jobMeta != null) {
381: result = executeJob(jobMeta, repository, logWriter);
382: }
383:
384: } finally {
385: logWriter.removeAppender(kettleUserAppender);
386: if (repository != null) {
387: if (debug)
388: debug(Messages
389: .getString("Kettle.DEBUG_DISCONNECTING")); //$NON-NLS-1$
390: repository.disconnect();
391: }
392: }
393: return result;
394:
395: }
396:
397: private String getActualFileName(IActionResource resource) {
398: String fileAddress = null;
399:
400: // Is it a hardcoded path?
401: if ((resource.getSourceType() == IActionResource.FILE_RESOURCE)) {
402: fileAddress = resource.getAddress();
403: }
404: // Is it a solution relative path?
405: else if (resource.getSourceType() == IActionResource.SOLUTION_FILE_RESOURCE) {
406: fileAddress = PentahoSystem.getApplicationContext()
407: .getSolutionPath(resource.getAddress());
408: }
409:
410: // Can it be loaded? this may not be true if using the DB Based repos
411: if (fileAddress != null) {
412: File file = new File(fileAddress);
413: if (!file.exists() || !file.isFile()) {
414: fileAddress = null;
415: }
416: }
417: return (fileAddress);
418: }
419:
420: private boolean executeTransformation(TransMeta transMeta,
421: LogWriter logWriter) {
422: boolean success = true;
423: Trans trans = null;
424: try {
425: if (transMeta != null) {
426: try {
427: trans = new Trans(transMeta);
428: } catch (Throwable t) {
429: error(
430: Messages
431: .getErrorString("Kettle.ERROR_0010_BAD_TRANSFORMATION_METADATA"), t); //$NON-NLS-1$
432: return false;
433: }
434:
435: }
436: if (trans == null) {
437: error(Messages
438: .getErrorString("Kettle.ERROR_0010_BAD_TRANSFORMATION_METADATA")); //$NON-NLS-1$
439: error(kettleUserAppender.getBuffer().toString());
440: return false;
441: }
442: if (trans != null) {
443: // OK, we have the transformation, now run it!
444: if (debug)
445: debug(Messages
446: .getString("Kettle.DEBUG_PREPARING_TRANSFORMATION")); //$NON-NLS-1$
447: try {
448: trans.prepareExecution(transMeta.getArguments());
449: } catch (Throwable t) {
450: error(
451: Messages
452: .getErrorString("Kettle.ERROR_0011_TRANSFORMATION_PREPARATION_FAILED"), t); //$NON-NLS-1$
453: return false;
454: }
455:
456: String stepName = null;
457: String outputName = null;
458: if (debug)
459: debug(Messages
460: .getString("Kettle.DEBUG_FINDING_STEP_IMPORTER")); //$NON-NLS-1$
461: if (isDefinedInput(IMPORTSTEP)) {
462: try {
463: // get the name of the step that we are going to listen
464: // to
465: stepName = getInputStringValue(IMPORTSTEP);
466: if (getOutputNames().size() == 1) {
467: outputName = (String) getOutputNames()
468: .iterator().next();
469: }
470: boolean foundStep = false;
471: if (stepName != null && outputName != null) {
472: List stepList = trans.getSteps();
473: // find the specified step
474: for (int stepNo = 0; stepNo < stepList
475: .size(); stepNo++) {
476: // get the next step
477: StepMetaDataCombi step = (StepMetaDataCombi) stepList
478: .get(stepNo);
479: if (step.stepname.equals(stepName)) {
480: if (debug)
481: debug(Messages
482: .getString("Kettle.DEBUG_FOUND_STEP_IMPORTER")); //$NON-NLS-1$
483: // this is the step we are looking for
484: if (debug)
485: debug(Messages
486: .getString("Kettle.DEBUG_GETTING_STEP_METADATA")); //$NON-NLS-1$
487: RowMetaInterface row = transMeta
488: .getStepFields(stepName);
489: // create the metadata that the Pentaho
490: // result set needs
491: String fieldNames[] = row
492: .getFieldNames();
493: String columns[][] = new String[1][fieldNames.length];
494: for (int column = 0; column < fieldNames.length; column++) {
495: columns[0][column] = fieldNames[column];
496: }
497: if (debug)
498: debug(Messages
499: .getString("Kettle.DEBUG_CREATING_RESULTSET_METADATA")); //$NON-NLS-1$
500:
501: MemoryMetaData metaData = new MemoryMetaData(
502: columns, null);
503: results = new MemoryResultSet(
504: metaData);
505: // add ourself as a row listener
506: step.step.addRowListener(this );
507: foundStep = true;
508: break;
509: }
510: }
511:
512: }
513: if (!foundStep) {
514: error(Messages
515: .getErrorString("Kettle.ERROR_0012_ROW_LISTENER_CREATE_FAILED")); //$NON-NLS-1$
516: }
517: } catch (Exception e) {
518: error(
519: Messages
520: .getErrorString("Kettle.ERROR_0012_ROW_LISTENER_CREATE_FAILED"), e); //$NON-NLS-1$
521: return false;
522: }
523: }
524:
525: try {
526: if (debug)
527: debug(Messages
528: .getString("Kettle.DEBUG_STARTING_TRANSFORMATION")); //$NON-NLS-1$
529: trans.startThreads();
530: } catch (Throwable t) {
531: error(
532: Messages
533: .getErrorString("Kettle.ERROR_0013_TRANSFORMATION_START_FAILED"), t); //$NON-NLS-1$
534: return false;
535: }
536:
537: try {
538: // It's running in a separate tread to allow monitoring,
539: // etc.
540: if (debug)
541: debug(Messages
542: .getString("Kettle.DEBUG_TRANSFORMATION_RUNNING")); //$NON-NLS-1$
543: trans.waitUntilFinished();
544: trans.endProcessing("end");
545: } catch (Throwable t) {
546: error(Messages
547: .getErrorString("Kettle.ERROR_0014_ERROR_DURING_EXECUTE")); //$NON-NLS-1$
548: return false;
549: }
550: // Dump the Kettle log...
551: info(kettleUserAppender.getBuffer().toString());
552: if (outputName != null && results != null) {
553: if (debug)
554: debug(Messages
555: .getString("Kettle.DEBUG_SETTING_OUTPUT")); //$NON-NLS-1$
556: setOutputValue(outputName, results);
557: }
558: }
559: } catch (Exception e) {
560: error(Messages.getErrorString(
561: "Kettle.ERROR_0008_ERROR_RUNNING", e.toString()), e); //$NON-NLS-1$
562: success = false;
563: }
564: // NOT REQUIRED FOR PDI 3
565: //finally {
566: // if (trans != null) {
567: // LocalVariables.getInstance().removeKettleVariables(Thread.currentThread().getName());
568: // }
569: // }
570: return success;
571:
572: }
573:
574: private boolean executeJob(JobMeta jobMeta, Repository repository,
575: LogWriter logWriter) {
576: boolean success = true;
577: Job job = null;
578: try {
579: if (jobMeta != null) {
580: try {
581: job = new Job(logWriter, StepLoader.getInstance(),
582: repository, jobMeta);
583: } catch (Throwable t) {
584: error(
585: Messages
586: .getErrorString("Kettle.ERROR_0021_BAD_JOB_METADATA"), t); //$NON-NLS-1$
587: return false;
588: }
589:
590: }
591: if (job == null) {
592: error(Messages
593: .getErrorString("Kettle.ERROR_0021_BAD_JOB_METADATA")); //$NON-NLS-1$
594: info(kettleUserAppender.getBuffer().toString());
595: return false;
596: }
597: if (job != null) {
598: try {
599: if (debug)
600: debug(Messages
601: .getString("Kettle.DEBUG_STARTING_JOB")); //$NON-NLS-1$
602: job.start();
603: } catch (Throwable t) {
604: error(
605: Messages
606: .getErrorString("Kettle.ERROR_002_JOB_START_FAILED"), t); //$NON-NLS-1$
607: return false;
608: }
609:
610: try {
611: // It's running in a separate tread to allow monitoring,
612: // etc.
613: if (debug)
614: debug(Messages
615: .getString("Kettle.DEBUG_JOB_RUNNING")); //$NON-NLS-1$
616: job.waitUntilFinished(5000000);
617: job.endProcessing("end", job.getResult()); //$NON-NLS-1$
618: if (job.getErrors() > 0) {
619: error(Messages
620: .getErrorString("Kettle.ERROR_0014_ERROR_DURING_EXECUTE")); //$NON-NLS-1$
621: info(kettleUserAppender.getBuffer().toString());
622: return false;
623: }
624: } catch (Throwable t) {
625: error(
626: Messages
627: .getErrorString("Kettle.ERROR_0014_ERROR_DURING_EXECUTE"), t); //$NON-NLS-1$
628: return false;
629: }
630: // Dump the Kettle log...
631: info(kettleUserAppender.getBuffer().toString());
632: }
633: } catch (Exception e) {
634: error(Messages.getErrorString(
635: "Kettle.ERROR_0008_ERROR_RUNNING", e.toString()), e); //$NON-NLS-1$
636: success = false;
637: }
638: // finally {
639: // if (job != null) {
640: // LocalVariables.getInstance().removeKettleVariables(Thread.currentThread().getName());
641: // }
642: // }
643: return success;
644:
645: }
646:
647: private TransMeta loadTransformFromRepository(String directoryName,
648: String transformationName, Repository repository,
649: LogWriter logWriter) {
650: if (debug)
651: debug(Messages.getString(
652: "Kettle.DEBUG_DIRECTORY", directoryName)); //$NON-NLS-1$
653: if (debug)
654: debug(Messages.getString(
655: "Kettle.DEBUG_TRANSFORMATION", transformationName)); //$NON-NLS-1$
656: TransMeta transMeta = null;
657: try {
658:
659: if (debug)
660: debug(Messages
661: .getString("Kettle.DEBUG_FINDING_DIRECTORY")); //$NON-NLS-1$
662:
663: // Find the directory specified.
664: RepositoryDirectory repositoryDirectory = null;
665: try {
666: repositoryDirectory = repository.getDirectoryTree()
667: .findDirectory(directoryName);
668: } catch (Throwable t) {
669: error(
670: Messages
671: .getErrorString(
672: "Kettle.ERROR_0006_DIRECTORY_NOT_FOUND", directoryName), t); //$NON-NLS-1$
673: return null;
674: }
675: if (repositoryDirectory == null) {
676: error(Messages
677: .getErrorString(
678: "Kettle.ERROR_0006_DIRECTORY_NOT_FOUND", directoryName)); //$NON-NLS-1$
679: return null;
680: }
681:
682: if (repositoryDirectory != null) {
683: if (debug)
684: debug(Messages
685: .getString("Kettle.DEBUG_GETTING_TRANSFORMATION_METADATA")); //$NON-NLS-1$
686:
687: try {
688: // Load the transformation from the repository
689: transMeta = new TransMeta(repository,
690: transformationName, repositoryDirectory);
691: } catch (Throwable t) {
692: error(
693: Messages
694: .getErrorString(
695: "Kettle.ERROR_0009_TRANSFROMATION_METADATA_NOT_FOUND", repositoryDirectory + "/" + transformationName), t); //$NON-NLS-1$ //$NON-NLS-2$
696: return null;
697: }
698: if (transMeta == null) {
699: error(Messages
700: .getErrorString(
701: "Kettle.ERROR_0009_TRANSFROMATION_METADATA_NOT_FOUND", repositoryDirectory + "/" + transformationName)); //$NON-NLS-1$ //$NON-NLS-2$
702: info(kettleUserAppender.getBuffer().toString());
703: return null;
704: } else {
705: return transMeta;
706: }
707: }
708:
709: if (debug)
710: debug(kettleUserAppender.getBuffer().toString());
711: // OK, close shop!
712:
713: } catch (Throwable e) {
714: error(Messages.getErrorString(
715: "Kettle.ERROR_0008_ERROR_RUNNING", e.toString()), e); //$NON-NLS-1$
716: }
717: return null;
718: }
719:
720: private JobMeta loadJobFromRepository(String directoryName,
721: String jobName, Repository repository, LogWriter logWriter) {
722: if (debug)
723: debug(Messages.getString(
724: "Kettle.DEBUG_DIRECTORY", directoryName)); //$NON-NLS-1$
725: if (debug)
726: debug(Messages.getString("Kettle.DEBUG_JOB", jobName)); //$NON-NLS-1$
727: JobMeta jobMeta = null;
728: try {
729:
730: if (debug)
731: debug(Messages
732: .getString("Kettle.DEBUG_FINDING_DIRECTORY")); //$NON-NLS-1$
733:
734: // Find the directory specified.
735: RepositoryDirectory repositoryDirectory = null;
736: try {
737: repositoryDirectory = repository.getDirectoryTree()
738: .findDirectory(directoryName);
739: } catch (Throwable t) {
740: error(
741: Messages
742: .getErrorString(
743: "Kettle.ERROR_0006_DIRECTORY_NOT_FOUND", directoryName), t); //$NON-NLS-1$
744: return null;
745: }
746: if (repositoryDirectory == null) {
747: error(Messages
748: .getErrorString(
749: "Kettle.ERROR_0006_DIRECTORY_NOT_FOUND", directoryName)); //$NON-NLS-1$
750: return null;
751: }
752:
753: if (repositoryDirectory != null) {
754: if (debug)
755: debug(Messages
756: .getString("Kettle.DEBUG_GETTING_JOB_METADATA")); //$NON-NLS-1$
757:
758: try {
759: // Load the transformation from the repository
760: jobMeta = new JobMeta(logWriter, repository,
761: jobName, repositoryDirectory);
762: } catch (Throwable t) {
763: error(
764: Messages
765: .getErrorString(
766: "Kettle.ERROR_0020_JOB_METADATA_NOT_FOUND", repositoryDirectory + "/" + jobName), t); //$NON-NLS-1$ //$NON-NLS-2$
767: return null;
768: }
769: if (jobMeta == null) {
770: error(Messages
771: .getErrorString(
772: "Kettle.ERROR_0020_JOB_METADATA_NOT_FOUND", repositoryDirectory + "/" + jobName)); //$NON-NLS-1$ //$NON-NLS-2$
773: info(kettleUserAppender.getBuffer().toString());
774: return null;
775: } else {
776: return jobMeta;
777: }
778: }
779:
780: if (debug)
781: debug(kettleUserAppender.getBuffer().toString());
782: // OK, close shop!
783:
784: } catch (Throwable e) {
785: error(Messages.getErrorString(
786: "Kettle.ERROR_0008_ERROR_RUNNING", e.toString()), e); //$NON-NLS-1$
787: }
788: return null;
789: }
790:
791: private Repository connectToRepository(LogWriter logWriter) {
792: boolean useRepository = PentahoSystem
793: .getSystemSetting(
794: "kettle/settings.xml", "repository.type", "files").equals("rdbms"); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$
795:
796: if (!useRepository) {
797: return null;
798: }
799:
800: try {
801: if (debug)
802: debug(Messages
803: .getString("Kettle.DEBUG_META_REPOSITORY")); //$NON-NLS-1$
804:
805: RepositoriesMeta repositoriesMeta = null;
806: try {
807: repositoriesMeta = new RepositoriesMeta(logWriter);
808: } catch (Throwable t) {
809: error(
810: Messages
811: .getErrorString("Kettle.ERROR_0007_BAD_META_REPOSITORY"), t); //$NON-NLS-1$
812: return null;
813: }
814: if (repositoriesMeta == null) {
815: error(Messages
816: .getErrorString("Kettle.ERROR_0007_BAD_META_REPOSITORY")); //$NON-NLS-1$
817: info(kettleUserAppender.getBuffer().toString());
818: return null;
819: }
820:
821: if (debug)
822: debug(Messages
823: .getString("Kettle.DEBUG_POPULATING_META")); //$NON-NLS-1$
824: try {
825: // TODO: add support for specified repositories.xml files...
826: repositoriesMeta.readData(); // Read from the default $HOME/.kettle/repositories.xml file.
827: } catch (Throwable t) {
828: error(
829: Messages
830: .getErrorString("Kettle.ERROR_0018_META_REPOSITORY_NOT_POPULATED"), t); //$NON-NLS-1$
831: return null;
832: }
833: if (repositoriesXMLFile != null
834: && !"".equals(repositoriesXMLFile)) //$NON-NLS-1$
835: {
836: error(Messages
837: .getErrorString("Kettle.ERROR_0017_XML_REPOSITORY_NOT_SUPPORTED")); //$NON-NLS-1$
838: info(kettleUserAppender.getBuffer().toString());
839: return null;
840: }
841:
842: if (debug)
843: debug(Messages
844: .getString("Kettle.DEBUG_FINDING_REPOSITORY")); //$NON-NLS-1$
845: // Find the specified repository.
846: RepositoryMeta repositoryMeta = null;
847: try {
848: repositoryMeta = repositoriesMeta
849: .findRepository(repositoryName);
850: } catch (Throwable t) {
851: error(
852: Messages
853: .getErrorString(
854: "Kettle.ERROR_0004_REPOSITORY_NOT_FOUND", repositoryName), t); //$NON-NLS-1$
855: return null;
856: }
857:
858: if (repositoryMeta == null) {
859: error(Messages
860: .getErrorString(
861: "Kettle.ERROR_0004_REPOSITORY_NOT_FOUND", repositoryName)); //$NON-NLS-1$
862: info(kettleUserAppender.getBuffer().toString());
863: return null;
864: }
865:
866: if (debug)
867: debug(Messages
868: .getString("Kettle.DEBUG_GETTING_REPOSITORY")); //$NON-NLS-1$
869: Repository repository = null;
870: UserInfo userInfo = null;
871: try {
872: repository = new Repository(logWriter, repositoryMeta,
873: userInfo);
874: } catch (Throwable t) {
875: error(
876: Messages
877: .getErrorString("Kettle.ERROR_0016_COULD_NOT_GET_REPOSITORY_INSTANCE"), t); //$NON-NLS-1$
878: return null;
879: }
880:
881: // OK, now try the username and password
882: if (debug)
883: debug(Messages.getString("Kettle.DEBUG_CONNECTING")); //$NON-NLS-1$
884: if (repository.connect(getClass().getName())) {
885: try {
886: userInfo = new UserInfo(repository, username,
887: password);
888: } catch (KettleException e) {
889: userInfo = null;
890: } finally {
891: }
892: } else {
893: error(Messages
894: .getErrorString("Kettle.ERROR_0005_LOGIN_FAILED")); //$NON-NLS-1$
895: info(kettleUserAppender.getBuffer().toString());
896: return null;
897: }
898:
899: // OK, the repository is open and ready to use.
900: if (debug)
901: debug(Messages
902: .getString("Kettle.DEBUG_FINDING_DIRECTORY")); //$NON-NLS-1$
903:
904: return repository;
905:
906: } catch (Throwable e) {
907: error(Messages.getErrorString(
908: "Kettle.ERROR_0008_ERROR_RUNNING", e.toString()), e); //$NON-NLS-1$
909: }
910: return null;
911: }
912:
913: public void done() {
914:
915: }
916:
917: public void rowReadEvent(RowMetaInterface row, Object[] values) {
918: }
919:
920: public void rowWrittenEvent(RowMetaInterface rowMeta, Object[] row)
921: throws KettleStepException {
922:
923: if (results == null) {
924: return;
925: }
926: try {
927: Object pentahoRow[] = new Object[results.getColumnCount()];
928: for (int columnNo = 0; columnNo < results.getColumnCount(); columnNo++) {
929: ValueMetaInterface valueMeta = rowMeta
930: .getValueMeta(columnNo);
931:
932: switch (valueMeta.getType()) {
933: case ValueMetaInterface.TYPE_BIGNUMBER:
934: pentahoRow[columnNo] = rowMeta.getBigNumber(row,
935: columnNo);
936: break;
937: case ValueMetaInterface.TYPE_BOOLEAN:
938: pentahoRow[columnNo] = rowMeta.getBoolean(row,
939: columnNo);
940: break;
941: case ValueMetaInterface.TYPE_DATE:
942: pentahoRow[columnNo] = rowMeta.getDate(row,
943: columnNo);
944: break;
945: case ValueMetaInterface.TYPE_INTEGER:
946: pentahoRow[columnNo] = rowMeta.getInteger(row,
947: columnNo);
948: break;
949: case ValueMetaInterface.TYPE_NONE:
950: pentahoRow[columnNo] = rowMeta.getString(row,
951: columnNo);
952: break;
953: case ValueMetaInterface.TYPE_NUMBER:
954: pentahoRow[columnNo] = rowMeta.getNumber(row,
955: columnNo);
956: break;
957: case ValueMetaInterface.TYPE_STRING:
958: pentahoRow[columnNo] = rowMeta.getString(row,
959: columnNo);
960: break;
961: default:
962: pentahoRow[columnNo] = rowMeta.getString(row,
963: columnNo);
964: }
965: }
966: results.addRow(pentahoRow);
967: } catch (KettleValueException e) {
968: throw new KettleStepException(e);
969: }
970: }
971:
972: public void errorRowWrittenEvent(RowMetaInterface arg0,
973: Object[] arg1) {
974:
975: }
976:
977: }
|