001: /*
002: * This file or a portion of this file is licensed under the terms of
003: * the Globus Toolkit Public License, found in file GTPL, or at
004: * http://www.globus.org/toolkit/download/license.html. This notice must
005: * appear in redistributions of this file, with or without modification.
006: *
007: * Redistributions of this Software, with or without modification, must
008: * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
009: * some other similar material which is provided with the Software (if
010: * any).
011: *
012: * Copyright 1999-2004 University of Chicago and The University of
013: * Southern California. All rights reserved.
014: */
015:
016: package org.griphyn.cPlanner.code.generator;
017:
import org.griphyn.cPlanner.classes.ADag;
import org.griphyn.cPlanner.classes.GRMSJob;
import org.griphyn.cPlanner.classes.NameValue;
import org.griphyn.cPlanner.classes.PegasusBag;
import org.griphyn.cPlanner.classes.PlannerOptions;
import org.griphyn.cPlanner.classes.SiteInfo;
import org.griphyn.cPlanner.classes.SubInfo;

import org.griphyn.cPlanner.code.CodeGenerator;
import org.griphyn.cPlanner.code.CodeGeneratorException;

import org.griphyn.cPlanner.common.LogManager;
import org.griphyn.cPlanner.common.PegasusProperties;

import org.griphyn.cPlanner.namespace.ENV;

import org.griphyn.cPlanner.poolinfo.PoolInfoProvider;
import org.griphyn.cPlanner.poolinfo.PoolMode;

import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;

import java.util.Iterator;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.Vector;
045:
046: /**
047: * This generates the submit files in the xml format that can be used to submit
048: * the workflow to a GRMS server.
049: *
050: * @author Karan Vahi
051: * @version $Revision: 410 $
052: */
053: public class GRMS extends Abstract {
054:
055: /**
056: * The "official" namespace URI of the GRMS workflow schema.
057: */
058: public static final String SCHEMA_NAMESPACE = "";
059:
060: /**
061: * The "not-so-official" location URL of the GRMS workflow schema definition.
062: */
063: public static final String SCHEMA_LOCATION = "";
064:
065: /**
066: * The workflow schema to which this writer conforms.
067: */
068: public static final String SCHEMA = "grms-workflow-schema_10.xsd";
069:
070: /**
071: * The prefix that needs to be added to the stdout to make GRMS aware of
072: * a kickstart output.
073: */
074: public static final String STDOUT_PREFIX = "kickstart__exitcode__of__";
075:
076: /**
077: * The version to report.
078: */
079: public static final String SCHEMA_VERSION = "10";
080:
081: /**
082: * The LogManager object which is used to log all the messages.
083: */
084: private LogManager mLogger;
085:
086: /**
087: * The handle to the output file that is being written to.
088: */
089: private PrintWriter mWriteHandle;
090:
091: /**
092: * The handle to the properties file.
093: */
094: private PegasusProperties mProps;
095:
096: /**
097: * Handle to the pool provider.
098: */
099: private PoolInfoProvider mPoolHandle;
100:
101: /**
102: * The default constructor.
103: */
104: public GRMS() {
105: super ();
106: }
107:
108: /**
109: * Initializes the Code Generator implementation.
110: *
111: * @param bag the bag of initialization objects.
112: *
113: * @throws CodeGeneratorException in case of any error occuring code generation.
114: */
115: public void initialize(PegasusBag bag)
116: throws CodeGeneratorException {
117: super .initialize(bag);
118: mLogger = bag.getLogger();
119:
120: //get the handle to pool file
121: mPoolHandle = bag.getHandleToSiteCatalog();
122:
123: }
124:
125: /**
126: * Generates the code for the concrete workflow in the GRMS input format.
127: * The GRMS input format is xml based. One XML file is generated per
128: * workflow.
129: *
130: * @param dag the concrete workflow.
131: *
132: * @throws CodeGeneratorException in case of any error occuring code generation.
133: */
134: public void generateCode(ADag dag) throws CodeGeneratorException {
135: String opFileName = this .mSubmitFileDir + File.separator
136: + dag.dagInfo.nameOfADag + ".xml";
137: initializeWriteHandle(opFileName);
138: SubInfo job = null;
139:
140: writeString("\n<grmsjob "
141: + " xmlns:xsi = \"http://www.w3.org/2001/XMLSchema-instance\" "
142: + " xsi:noNamespaceSchemaLocation=\"" + SCHEMA + "\""
143: + " appid=\"Pegasus\">");
144: for (Iterator it = dag.vJobSubInfos.iterator(); it.hasNext();) {
145: job = (SubInfo) it.next();
146: writeString(jobToXML(dag, job));
147: }
148: writeString("\n</grmsjob>");
149: mWriteHandle.close();
150: }
151:
152: /**
153: * Generates the code for a single job in the input format of the workflow
154: * executor being used.
155: *
156: * @param dag the dag of which the job is a part of.
157: * @param job the <code>SubInfo</code> object holding the information about
158: * that particular job.
159: *
160: * @throws CodeGeneratorException in case of any error occuring code generation.
161: */
162: public void generateCode(ADag dag, SubInfo job)
163: throws CodeGeneratorException {
164: throw new CodeGeneratorException(
165: new UnsupportedOperationException(
166: "Method generateCode( ADag, SubInfo) not yet implemented."));
167: }
168:
169: /**
170: * It returns the corresponding xml description for a particular job.
171: *
172: * @param dag the dag of which the job is a part of.
173: * @param job object containing job info.
174: *
175: * @return the string containing the xml description.
176: */
177: protected String jobToXML(ADag dag, SubInfo job) {
178: StringBuffer sb = new StringBuffer();
179: boolean gridstart = true;
180:
181: sb.append("\n");
182: sb.append("\t<task taskid = \"").append(job.jobName).append(
183: "\"").append(" persistent=\"").append("true").append(
184: "\">");
185:
186: //put the resource on which to run the job
187: sb.append("\n\t\t<resource>");
188: sb.append("\n\t\t\t<hostname>").append(job.globusScheduler)
189: .append("</hostname>");
190: sb.append("\n\t\t</resource>");
191:
192: //at present only one process is launched by the jobmanager
193: //while launching the job at GRMS site. No MPI
194: if (gridstart) {
195: //launch the application at the GRMS end through gridstart.
196: //i.e gridstart is the launching application
197: sb.append(gridstart(job));
198: } else {
199: sb.append(executableToXML(job.executable, job.strargs,
200: null, null, null));
201: }
202:
203: //set the environment variables
204: //hmm they have to appear in the executables tag
205: //sb.append(envToXML(job.envVariables));
206:
207: //write in the relations
208: sb.append(relationsToXML(dag, job));
209:
210: sb.append("\n\t</task>");
211: return sb.toString();
212: }
213:
214: /**
215: * This launches a particular job through gridstart, and accordingly
216: * changes the stdio , stdin and stderr handling for the launched job.
217: * The stdout and stderr of gridstart is propagated back to the submit host
218: * to the directory where the output description was generated in tune
219: * with how we do things with condor. Assumption is that either the submit
220: * file dir is on shared file system, or on the file system accessible to the
221: * gridftp server specified on the local pool.
222: *
223: * @param job the job description.
224: *
225: * @return the xml description containing the executable that is to be
226: * launched and it's arguments.
227: */
228: private String gridstart(SubInfo job) {
229: SiteInfo site = mPoolHandle.getPoolEntry(job.executionPool,
230: "vanilla");
231: SiteInfo submitSite = mPoolHandle.getPoolEntry("local",
232: "vanilla");
233: String gridStartPath = site.getKickstartPath();
234: boolean isGlobusJob = true;
235:
236: //
237: // with gridstart section
238: //
239: StringBuffer gridStartArgs = new StringBuffer();
240:
241: //the executable is gridstart, the application becomes its argument
242: //writer.println("executable = " + gridStartPath);
243: gridStartArgs.append("-n ");
244:
245: gridStartArgs.append(job.getCompleteTCName());
246: gridStartArgs.append(' ');
247: gridStartArgs.append("-N ");
248: gridStartArgs.append(job.getCompleteDVName());
249: gridStartArgs.append(' ');
250:
251: // HANDLING stdin for the moment
252:
253: if (job.stdIn.length() > 0) {
254:
255: //for using the transfer script the
256: //input file is transferred from the
257: //submit host by Condor to stdin.
258: //We fool the kickstart to pick up
259: //the input file from standard stdin
260: //by giving the input file name as -
261: if (job.logicalName
262: .equals(org.griphyn.cPlanner.transfer.implementation.Transfer.TRANSFORMATION_NAME)
263: || job.logicalName
264: .equals(org.griphyn.cPlanner.transfer.implementation.T2.TRANSFORMATION_NAME)
265: || job.logicalName
266: .equals(org.griphyn.cPlanner.cluster.aggregator.SeqExec.COLLAPSE_LOGICAL_NAME)
267: || job.logicalName
268: .equals(org.griphyn.cPlanner.cluster.aggregator.MPIExec.COLLAPSE_LOGICAL_NAME)) {
269:
270: // handle stdin
271: if (job.stdIn.length() > 0) {
272: // the output of gridstart is propagated back to the submit host
273: // to the submit file dir at the submit host
274: String stdIn = submitSite.getURLPrefix(false)
275: + File.separatorChar + mSubmitFileDir
276: + File.separator + job.jobName + ".in";
277: job.stdIn = stdIn;
278:
279: //writer.println("input = " + job.stdIn);
280: gridStartArgs.append("-i ").append("-").append(' ');
281: } else {
282: //error in Pegasus Code
283: mLogger
284: .log(
285: "Input file not generated for transfer job",
286: LogManager.ERROR_MESSAGE_LEVEL);
287: }
288:
289: } else {
290: // gridstart provides the app's *tracked* stdin
291: gridStartArgs.append("-i ").append(job.stdIn).append(
292: ' ');
293: }
294: }
295:
296: // handle stdout
297: if (job.stdOut.length() > 0) {
298: // gridstart saves the app's *tracked* stdout
299: gridStartArgs.append("-o ").append(job.stdOut).append(' ');
300: }
301: // the GRMS output variable and kickstart -o option
302: // must not point to the same file for any local job.
303: if (job.stdOut.equals(job.jobName + ".xml") && !isGlobusJob) {
304: System.err
305: .println("WARNING! Detected WAW conflict for stdout");
306: }
307: // the output of gridstart is propagated back to the submit host
308: // to the submit file dir at the submit host
309: String stdout = submitSite.getURLPrefix(false)
310: + File.separatorChar + mSubmitFileDir + File.separator
311: + STDOUT_PREFIX + job.jobName + ".xml";
312: job.stdOut = stdout;
313:
314: //handle the stderr
315: if (job.stdErr.length() > 0) {
316: // gridstart saves the app's *tracked* stderr
317: gridStartArgs.append("-e ").append(job.stdErr).append(' ');
318: }
319: // the GRMS error variable and kickstart -e option
320: // must not point to the same file for any local job.
321: if (job.stdErr.equals(job.jobName + ".err") && !isGlobusJob) {
322: System.err
323: .println("WARNING! Detected WAW conflict for stderr");
324: }
325:
326: // the error of gridstart is propagated back to the submit host
327: // to the submit file dir at the submit host
328: String stderr = submitSite.getURLPrefix(false)
329: + File.separatorChar + mSubmitFileDir + File.separator
330: + job.jobName + ".err";
331: job.stdErr = stderr;
332:
333: //GRMS invokes the job in it's own directory
334: //make kickstart change to a directory that can be
335: //tracked through the VDS by pass -w to kickstart
336: gridStartArgs.append("-w ").append(
337: mPoolHandle.getExecPoolWorkDir(job)).append(' ');
338:
339: gridStartArgs.append(job.executable).append(' ').append(
340: job.strargs);
341:
342: //now the application to be launced is now kickstart instead of the
343: //original application
344: job.executable = gridStartPath;
345: job.strargs = gridStartArgs.toString();
346: return executableToXML(job);
347:
348: }
349:
350: /**
351: * This method returns the xml description for the executable that is to be
352: * executed, that includes the arguments with which it is to be invoked,
353: * the path to the executable and location of it's stdout , stdin and stderr.
354: *
355: * @param job the GRMS job.
356: *
357: * @return String
358: */
359: protected String executableToXML(SubInfo job) {
360: StringBuffer sb = new StringBuffer();
361:
362: sb.append("\n\t\t<executable type=\"single\" count=\"1\">");
363: sb.append("\n\t\t\t<execfile name=\"kickstart\" >").append(
364: "\n\t\t\t\t<url>").append("file:///").append(
365: job.executable).append("</url>").append(
366: "\n\t\t\t</execfile>");
367:
368: //copy the arguments
369: sb.append(argumentsToXML(job));
370:
371: if (job.stdIn != null && job.stdIn.length() > 0)
372: sb.append(stdInToXML(job.stdIn));
373: if (job.stdOut != null && job.stdOut.length() > 0)
374: sb.append(stdOutToXML(job.stdOut));
375: if (job.stdErr != null && job.stdErr.length() > 0)
376: sb.append(stdErrToXML(job.stdErr));
377:
378: //set the environment variables
379: sb.append(envToXML(job.envVariables));
380:
381: sb.append("\n\t\t</executable>");
382:
383: return sb.toString();
384: }
385:
386: /**
387: * This method returns the xml description for the executable that is to be
388: * executed, that includes the arguments with which it is to be invoked,
389: * the path to the executable and location of it's stdout , stdin and stderr.
390: *
391: * @param path the path to the executable.
392: * @param args the arguments to the executable.
393: * @param stdin the url for the stdin of the job.
394: * @param stdout the url for the stdin of the job.
395: * @param stderr the url for the stdin of the job.
396: *
397: * @return String
398: */
399: protected String executableToXML(String path, String args,
400: String stdin, String stdout, String stderr) {
401: StringBuffer sb = new StringBuffer();
402:
403: sb.append("\n\t\t<executable type=\"single\" count=\"1\">");
404: sb
405: .append(
406: "\n\t\t\t<file name=\"exec-file\" type=\"executable\">")
407: .append("\n\t\t\t\t<url>").append("file:///").append(
408: path).append("</url>")
409: .append("\n\t\t\t</file>");
410:
411: //copy the arguments
412: sb.append(argumentsToXML(null));
413:
414: if (stdin != null && stdin.length() > 0)
415: sb.append(stdInToXML(stdin));
416: if (stdout != null && stdout.length() > 0)
417: sb.append(stdOutToXML(stdout));
418: if (stderr != null && stderr.length() > 0)
419: sb.append(stdErrToXML(stderr));
420:
421: sb.append("\n\t\t</executable>");
422:
423: return sb.toString();
424: }
425:
426: /**
427: * This method returns the xml description for specifying the stdout.
428: *
429: * @param url the url to the stdout file.
430: *
431: * @return the xml description.
432: */
433: protected String stdInToXML(String url) {
434: StringBuffer sb = new StringBuffer();
435: sb.append("\n\t\t\t").append("<stdin>");
436: sb.append("\n\t\t\t\t").append("<url>").append(url).append(
437: "</url>");
438: sb.append("\n\t\t\t").append("</stdin>");
439: return sb.toString();
440: }
441:
442: /**
443: * This method returns the xml description for specifying the stdout.
444: *
445: * @param url the url to the stdout file.
446: *
447: * @return the xml description.
448: */
449: protected String stdOutToXML(String url) {
450: StringBuffer sb = new StringBuffer();
451: sb.append("\n\t\t\t").append("<stdout>");
452: sb.append("\n\t\t\t\t").append("<url>").append(url).append(
453: "</url>");
454: sb.append("\n\t\t\t").append("</stdout>");
455: return sb.toString();
456: }
457:
458: /**
459: * This method returins the xml description for specifying the stdout.
460: *
461: * @param url the url to the stdout file.
462: *
463: * @return the xml description.
464: */
465: protected String stdErrToXML(String url) {
466: StringBuffer sb = new StringBuffer();
467: sb.append("\n\t\t\t").append("<stderr>");
468: sb.append("\n\t\t\t\t").append("<url>").append(url).append(
469: "</url>");
470: sb.append("\n\t\t\t").append("</stderr>");
471: return sb.toString();
472: }
473:
474: /**
475: * This method returns the xml description of the arguments that are passed
476: * to the transformation that is being invoked.
477: *
478: * @param job the job description.
479: *
480: * @return the xml description of the arguments.
481: */
482: protected String argumentsToXML(SubInfo job) {
483: StringBuffer sb = new StringBuffer();
484: StringTokenizer st = new StringTokenizer(job.strargs);
485:
486: //typecast it to a GRMS job
487: Iterator it;
488:
489: sb.append("\n\t\t\t<arguments>");
490: //all the arguments in different tags
491: while (st.hasMoreTokens()) {
492: sb.append("\n\t\t\t\t<value>").append(st.nextToken())
493: .append("</value>");
494: }
495: //write in the input files
496: // Not doing it as using expilicit transfers
497: // instead of relying on GRMS to do it
498: /*
499: it = ((GRMSJob)job).iterator('i');
500: while(it.hasNext()){
501: NameValue nv = (NameValue)it.next();
502: sb.append(urlToXML(nv.getKey(),nv.getValue(),'i'));
503: }
504: //write in the output files
505: it = ((GRMSJob)job).iterator('o');
506: while(it.hasNext()){
507: NameValue nv = (NameValue)it.next();
508: sb.append(urlToXML(nv.getKey(),nv.getValue(),'o'));
509: }
510: */
511: sb.append("\n\t\t\t</arguments>");
512:
513: return sb.toString();
514: }
515:
516: /**
517: * This method returns the xml description of the url.
518: *
519: * @param lfn the logical name of the file associated with the url.
520: * @param url the url
521: * @param type i input url
522: * o output url
523: *
524: * @return String
525: */
526: protected String urlToXML(String lfn, String url, char type) {
527: StringBuffer sb = new StringBuffer();
528:
529: sb.append("\n\t\t\t").append("<file name = \"").append(lfn)
530: .append("\" type=\"");
531:
532: switch (type) {
533: case 'i':
534: sb.append("in");
535: break;
536: case 'o':
537: sb.append("out");
538: break;
539: default:
540: return null;
541: }
542:
543: sb.append("\">");
544: sb.append("\n\t\t\t\t").append("<url>").append(url).append(
545: "</url>");
546: sb.append("\n\t\t\t").append("</file>");
547:
548: return sb.toString();
549: }
550:
551: /**
552: * This method returns the xml description of the relations between the jobs.
553: * It refers to the associated ADag object with this class to get hold of the
554: * parents to the job.
555: *
556: * @param dag the dag of which the job is a part of.
557: * @param job the <code>SubInfo</code> object containing the job description.
558: *
559: * @return the xml element if there are any dependencies of the job
560: * else an empty string.
561: */
562: protected String relationsToXML(ADag dag, SubInfo job) {
563: StringBuffer sb = new StringBuffer();
564: Vector parents = dag.getParents(job.getName());
565: if (parents.isEmpty())
566: return sb.toString();
567:
568: sb.append("\n\t\t<workflow>");
569: for (Iterator it = parents.iterator(); it.hasNext();) {
570: //all the parents should be finished before invokeing the
571: //child.
572: sb.append("\n\t\t\t<parent triggerState=\"FINISHED\">")
573: .append(it.next()).append("</parent>");
574: }
575: sb.append("\n\t\t</workflow>");
576: return sb.toString();
577: }
578:
579: /**
580: * This method returns the xml description of the environment variables
581: * associated with the job.
582: *
583: * @param env the <code>EnvNS</code> object that contains the environment
584: * variables for the job.
585: *
586: * @return the xml element if there are any environment variables
587: * else an empty string.
588: */
589: protected String envToXML(ENV env) {
590: StringBuffer st = new StringBuffer();
591: String key = null;
592: String value = null;
593: Set s = env.keySet();
594: Iterator it = (s == null) ? null : s.iterator();
595: if (it == null)
596: return new String();
597:
598: st.append("\n\t\t\t<environment>");
599: while (it.hasNext()) {
600: key = (String) it.next();
601: value = (String) env.get(key);
602: st.append("\n\t\t\t\t<variable name =\"").append(key)
603: .append("\">").append(value).append("</variable>");
604: }
605: st.append("\n\t\t\t</environment>");
606: return st.toString();
607:
608: }
609:
610: /**
611: * Returns the xml header for the output xml file.
612: *
613: * @return String
614: */
615: private String getXMLHeader() {
616: String st = "<?xml version=\"1.0\" encoding=\"UTF-8\"?> ";
617: return st;
618:
619: }
620:
621: /**
622: * It initializes the write handle to the output file.
623: *
624: * @param filename the name of the file to which you want the write handle.
625: */
626: private void initializeWriteHandle(String filename) {
627: try {
628: mWriteHandle = new PrintWriter(new FileWriter(filename));
629: mLogger.log("Writing to file " + filename,
630: LogManager.DEBUG_MESSAGE_LEVEL);
631: } catch (Exception e) {
632: mLogger.log("Error while initialising handle to file "
633: + e.getMessage(), LogManager.FATAL_MESSAGE_LEVEL);
634: System.exit(1);
635: }
636:
637: writeString(this .getXMLHeader());
638: }
639:
640: /**
641: * Writes a string to the associated write handle with the class
642: *
643: * @param st the string to be written.
644: */
645: private void writeString(String st) {
646: //try{
647: //write the xml header
648: mWriteHandle.println(st);
649: /*}
650: catch(IOException ex){
651: System.out.println("Error while writing to xml " + ex.getMessage());
652: }*/
653: }
654:
655: }
|