/**
 * This file or a portion of this file is licensed under the terms of
 * the Globus Toolkit Public License, found in file GTPL, or at
 * http://www.globus.org/toolkit/download/license.html. This notice must
 * appear in redistributions of this file, with or without modification.
 *
 * Redistributions of this Software, with or without modification, must
 * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
 * some other similar material which is provided with the Software (if
 * any).
 *
 * Copyright 1999-2004 University of Chicago and The University of
 * Southern California. All rights reserved.
 */
package org.griphyn.cPlanner.code.gridstart;

import org.griphyn.cPlanner.common.LogManager;
import org.griphyn.cPlanner.common.PegasusProperties;

import org.griphyn.cPlanner.code.GridStart;
import org.griphyn.cPlanner.code.POSTScript;

import org.griphyn.cPlanner.classes.ADag;
import org.griphyn.cPlanner.classes.SubInfo;
import org.griphyn.cPlanner.classes.AggregatedJob;
import org.griphyn.cPlanner.classes.PegasusFile;
import org.griphyn.cPlanner.classes.TransferJob;
import org.griphyn.cPlanner.classes.PegasusBag;
import org.griphyn.cPlanner.classes.PlannerOptions;

import org.griphyn.cPlanner.transfer.sls.SLSFactory;
import org.griphyn.cPlanner.transfer.SLS;

import org.griphyn.cPlanner.namespace.VDS;
import org.griphyn.cPlanner.namespace.Dagman;

import org.griphyn.cPlanner.poolinfo.PoolInfoProvider;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;

import java.util.Collection;
import java.util.Iterator;
import java.util.Set;

/**
 * This class runs the job directly on the grid, without wrapping it in any
 * other launcher executable.
 * It connects the job's stdin, stdout and stderr to Condor mechanisms to
 * ensure they are transported back to the submit host.
 *
 *
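 * As an illustration only (the exact values depend on the job and the
 * planner configuration), enabling a globus universe job that has no
 * stdout or stderr of its own results in Condor keys similar to the
 * following being set in the job's Condor profile namespace:
 * <pre>
 *   executable      = /path/to/the/user/executable
 *   output          = /submit/dir/jobname.out
 *   transfer_output = true
 *   error           = /submit/dir/jobname.err
 *   transfer_error  = true
 * </pre>
 *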
 * @author Karan Vahi vahi@isi.edu
 * @version $Revision: 451 $
 */

public class NoGridStart implements GridStart {

    /**
     * The basename of the class that is implementing this. Could have
     * been determined by reflection.
     */
    public static final String CLASSNAME = "NoGridStart";

    /**
     * The SHORT_NAME for this implementation.
     */
    public static final String SHORT_NAME = "none";

    /**
     * The LogManager object which is used to log all the messages.
     */
    private LogManager mLogger;

    /**
     * The object holding all the properties pertaining to Pegasus.
     */
    protected PegasusProperties mProps;

    /**
     * The submit directory where the submit files are being generated for
     * the workflow.
     */
    private String mSubmitDir;

    /**
     * The argument string containing the arguments with which the exitcode
     * is invoked on kickstart output.
     */
    private String mExitParserArguments;

    /**
     * A boolean indicating whether to generate LOF (list of filenames) files
     * or not.
     */
    private boolean mGenerateLOF;

    /**
     * A boolean indicating whether to have worker node execution or not.
     */
    private boolean mWorkerNodeExecution;

    /**
     * The handle to the SLS (second level staging) implementation.
     */
    private SLS mSLS;

    /**
     * The options passed to the planner.
     */
    private PlannerOptions mPOptions;

    /**
     * Handle to the site catalog.
     */
    private PoolInfoProvider mSiteHandle;

    /**
     * An instance variable to track if enabling is happening as part of a
     * clustered job. See Bug 21 comments on Pegasus Bugzilla.
     */
    private boolean mEnablingPartOfAggregatedJob;

    /**
     * Initializes the GridStart implementation.
     *
     * @param bag the bag of objects that is used for initialization.
     * @param dag the concrete dag so far.
     */
    public void initialize(PegasusBag bag, ADag dag) {
        mLogger = bag.getLogger();
        mSiteHandle = bag.getHandleToSiteCatalog();
        mPOptions = bag.getPlannerOptions();
        mSubmitDir = mPOptions.getSubmitDirectory();
        mProps = bag.getPegasusProperties();
        mGenerateLOF = mProps.generateLOFFiles();
        mExitParserArguments = getExitCodeArguments();

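        //worker node execution is handled by a second level staging (SLS)
        //implementation, which is loaded only if the property is set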
        mWorkerNodeExecution = mProps.executeOnWorkerNode();
        if (mWorkerNodeExecution) {
            //load SLS
            mSLS = SLSFactory.loadInstance(bag);
        }
        mEnablingPartOfAggregatedJob = false;
    }

    /**
     * Enables a collection of jobs and puts them into an AggregatedJob.
     * The assumption here is that all the jobs are being enabled by the same
     * implementation. It enables the jobs and puts them into the AggregatedJob
     * that is passed to it.
     *
     * @param aggJob the AggregatedJob into which the collection has to be
     *               integrated.
     * @param jobs   the collection of jobs (SubInfo) that need to be enabled.
     *
     * @return the AggregatedJob containing the enabled jobs.
     * @see #enable(SubInfo,boolean)
     */
    public AggregatedJob enable(AggregatedJob aggJob, Collection jobs) {
        //sanity check for the arguments
        if (aggJob.strargs != null && aggJob.strargs.length() > 0) {
            construct(aggJob, "arguments", aggJob.strargs);
        }

        //we do not want the jobs being clustered to be enabled
        //for worker node execution just yet.
        mEnablingPartOfAggregatedJob = true;

        for (Iterator it = jobs.iterator(); it.hasNext();) {
            SubInfo job = (SubInfo) it.next();
            //always pass isGlobusJob as true, as we are only
            //interested in the executable and the arguments
            this.enable(job, true);
            aggJob.add(job);
        }

        //set the flag back to false
        mEnablingPartOfAggregatedJob = false;

        return aggJob;
    }

    /**
     * Enables a job to run on the grid by launching it directly. It runs
     * the executable without going through any intermediate launcher
     * executable. It connects the job's stdin, stdout and stderr to the
     * underlying Condor mechanisms so that they are transported back to
     * the submit host.
     *
     * @param job         the <code>SubInfo</code> object containing the job
     *                    description of the job that has to be enabled on the grid.
     * @param isGlobusJob is <code>true</code>, if the job generated a
     *                    line <code>universe = globus</code>, and thus runs remotely.
     *                    Set to <code>false</code>, if the job runs on the submit
     *                    host in any way.
     *
     * @return true, as enabling always succeeds since no launcher executable
     *         needs to be located on the site where the job is scheduled.
     */
    public boolean enable(SubInfo job, boolean isGlobusJob) {
        //take care of relative submit directory if specified
        String submitDir = mSubmitDir + File.separator;
        // String submitDir = getSubmitDirectory( mSubmitDir , job) + mSeparator;

        //the executable path and arguments are put
        //in the Condor namespace and not printed to the
        //file so that they can be overridden if desired
        //later through profiles and the key transfer_executable
        construct(job, "executable", job.executable);

        //sanity check for the arguments
        if (job.strargs != null && job.strargs.length() > 0) {
            construct(job, "arguments", job.strargs);
        }

        // handle stdin
        if (job.stdIn.length() > 0) {
            construct(job, "input", submitDir + job.stdIn);
            if (isGlobusJob) {
                //this needs to be true as we want the stdin
                //to be transferred to the remote execution
                //pool, as in the case of the transfer script.
                //it would not need to be set if the stdin were
                //already prepopulated at the remote side, which
                //it is not.
                construct(job, "transfer_input", "true");
            }
        }

        if (job.stdOut.length() > 0) {
            //handle stdout
            construct(job, "output", job.stdOut);
            if (isGlobusJob) {
                construct(job, "transfer_output", "false");
            }
        } else {
            // no stdout was specified; transfer the job's output back to the submit host
            construct(job, "output", submitDir + job.jobName + ".out");
            if (isGlobusJob) {
                construct(job, "transfer_output", "true");
            }
        }

        if (job.stdErr.length() > 0) {
            //handle stderr
            construct(job, "error", job.stdErr);
            if (isGlobusJob) {
                construct(job, "transfer_error", "false");
            }
        } else {
            // no stderr was specified; transfer the job's error back to the submit host
            construct(job, "error", submitDir + job.jobName + ".err");
            if (isGlobusJob) {
                construct(job, "transfer_error", "true");
            }
        }

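        //for worker node execution, compute jobs are rewritten by the SLS
        //(second level staging) implementation: the initial directory key is
        //stripped, the job is pointed at a per-workflow scratch directory
        //under the site's wntmp (defaulting to /tmp), and the SLS
        //implementation makes the Condor modifications required to stage
        //files to and from the worker node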
        if (mWorkerNodeExecution && !mEnablingPartOfAggregatedJob) {
            if (job.getJobType() == SubInfo.COMPUTE_JOB
                    || job.getJobType() == SubInfo.STAGED_COMPUTE_JOB) {

                if (!mSLS.doesCondorModifications()) {
                    throw new RuntimeException(
                            "Second Level Staging with NoGridStart only works with Condor SLS");
                }

                String style = (String) job.vdsNS.get(VDS.STYLE_KEY);

                //remove the remote or initial dir key for the compute jobs
                String key = style.equalsIgnoreCase(VDS.GLOBUS_STYLE)
                        ? "remote_initialdir" : "initialdir";

                String directory = (String) job.condorVariables.removeKey(key);

                String destDir = mSiteHandle.getEnvironmentVariable(
                        job.getSiteHandle(), "wntmp");
                destDir = (destDir == null) ? "/tmp" : destDir;

                String relativeDir = mPOptions.getRelativeSubmitDirectory();
                String workerNodeDir = destDir + File.separator
                        + relativeDir.replaceAll("/", "-");

                //always have the remote dir set to /tmp as we are
                //banking upon kickstart to change the directory for us
                job.condorVariables.construct(key, "/tmp");

                //modify the job if required
                if (!mSLS.modifyJobForWorkerNodeExecution(job,
                        mSiteHandle.getURLPrefix(job.getSiteHandle()),
                        directory, workerNodeDir)) {
                    throw new RuntimeException("Unable to modify job "
                            + job.getName() + " for worker node execution");
                }

            }
        } //end of worker node execution

        if (mGenerateLOF) {
            //but generate the LOF files nevertheless

            //inefficient check here again, just a prototype.
            //we need to generate the -S option only for non transfer jobs
            //generate the list of filenames file for the input and output files.
            if (!(job instanceof TransferJob)) {
                generateListofFilenamesFile(job.getInputFiles(),
                                            job.getID() + ".in.lof");
            }

            //for cleanup jobs no generation of stats for output files
            if (job.getJobType() != SubInfo.CLEANUP_JOB) {
                generateListofFilenamesFile(job.getOutputFiles(),
                                            job.getID() + ".out.lof");
            }
        } //end of mGenerateLOF

        return true;
    }

    /**
     * Indicates whether the enabling mechanism can set the X bit
     * on the executable on the remote grid site, in addition to launching
     * it on the remote grid site.
     *
     * @return false, as no wrapper executable is being used.
     */
    public boolean canSetXBit() {
        return false;
    }

    /**
     * Returns the value of the vds profile key VDS.GRIDSTART_KEY
     * that results in the loading of this particular implementation.
     * It is usually the name of the implementing class without the
     * package name.
     *
     * @return the value of the profile key.
     * @see org.griphyn.cPlanner.namespace.VDS#GRIDSTART_KEY
     */
    public String getVDSKeyValue() {
        return CLASSNAME;
    }

    /**
     * Returns a short textual description in the form of the name of the class.
     *
     * @return short textual description.
     */
    public String shortDescribe() {
        return SHORT_NAME;
    }

    /**
     * Returns the SHORT_NAME for the POSTScript implementation that is
     * to be used as the default with this GridStart implementation.
     *
     * @return the identifier for the NoPOSTScript POSTScript implementation.
     *
     * @see POSTScript#shortDescribe()
     */
    public String defaultPOSTScript() {
        return NoPOSTScript.SHORT_NAME;
    }

    /**
     * Returns a boolean indicating whether to remove remote directory
     * information or not from the job. This is determined on the basis of the
     * style key that is associated with the job.
     *
     * @param job the job in question.
     *
     * @return true if the remote directory key is to be removed, false otherwise.
     */
    private boolean removeDirectoryKey(SubInfo job) {
        String style = job.vdsNS.containsKey(VDS.STYLE_KEY)
                ? (String) job.vdsNS.get(VDS.STYLE_KEY)
                : null;

        //remove the remote_initialdir only if there is a
        //condor style associated with the job
        //Karan Nov 15,2005
        return (style == null) ? false
                : style.equalsIgnoreCase(VDS.CONDOR_STYLE);
    }

    /**
     * Constructs a condor variable in the condor profile namespace
     * associated with the job. Overrides any preexisting key values.
     *
     * @param job   contains the job description.
     * @param key   the key of the profile.
     * @param value the associated value.
     */
    private void construct(SubInfo job, String key, String value) {
        job.condorVariables.construct(key, value);
    }

    /**
     * Returns a string containing the arguments with which the exitcode
     * needs to be invoked.
     *
     * @return the argument string.
     */
    private String getExitCodeArguments() {
        return mProps.getPOSTScriptArguments();
    }

    /**
     * Writes out the list of filenames file for the job.
     *
     * @param files the list of <code>PegasusFile</code> objects containing the
     *              files whose stat information is required.
     *
     * @param basename the basename of the file that is to be created.
     *
     * @return the full path to the LOF file created, else null if no file is written out.
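     *
     * For illustration only (the logical file names shown are made up), the
     * resulting file simply lists one LFN per line:
     * <pre>
     *   f.input.1
     *   f.input.2
     * </pre>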
     */
    public String generateListofFilenamesFile(Set files, String basename) {
        //sanity check
        if (files == null || files.isEmpty()) {
            return null;
        }

        String result = null;
        //write out the list of filenames file in the submit directory
        try {
            File f = new File(mSubmitDir, basename);
            FileWriter input = new FileWriter(f);
            PegasusFile pf;
            for (Iterator it = files.iterator(); it.hasNext();) {
                pf = (PegasusFile) it.next();
                input.write(pf.getLFN());
                input.write("\n");
            }
            //close the stream
            input.close();
            result = f.getAbsolutePath();

        } catch (IOException e) {
            mLogger.log("Unable to write the lof file " + basename, e,
                        LogManager.ERROR_MESSAGE_LEVEL);
        }

        return result;
    }

}
|