/**
 * This file or a portion of this file is licensed under the terms of
 * the Globus Toolkit Public License, found in file GTPL, or at
 * http://www.globus.org/toolkit/download/license.html. This notice must
 * appear in redistributions of this file, with or without modification.
 *
 * Redistributions of this Software, with or without modification, must
 * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
 * some other similar material which is provided with the Software (if
 * any).
 *
 * Copyright 1999-2004 University of Chicago and The University of
 * Southern California. All rights reserved.
 */
package org.griphyn.cPlanner.code.gridstart;

import org.griphyn.cPlanner.classes.ADag;
import org.griphyn.cPlanner.classes.SubInfo;
import org.griphyn.cPlanner.classes.AggregatedJob;
import org.griphyn.cPlanner.classes.TransferJob;
import org.griphyn.cPlanner.classes.SiteInfo;
import org.griphyn.cPlanner.classes.PegasusFile;
import org.griphyn.cPlanner.classes.FileTransfer;
import org.griphyn.cPlanner.classes.NameValue;
import org.griphyn.cPlanner.classes.PegasusBag;
import org.griphyn.cPlanner.classes.PlannerOptions;

import org.griphyn.cPlanner.common.LogManager;
import org.griphyn.cPlanner.common.PegasusProperties;

import org.griphyn.cPlanner.poolinfo.PoolInfoProvider;
import org.griphyn.cPlanner.namespace.Condor;
import org.griphyn.cPlanner.namespace.ENV;
import org.griphyn.cPlanner.namespace.VDS;

import org.griphyn.cPlanner.code.GridStart;
import org.griphyn.cPlanner.code.POSTScript;

import org.griphyn.cPlanner.code.generator.condor.CondorQuoteParser;
import org.griphyn.cPlanner.code.generator.condor.CondorQuoteParserException;

import org.griphyn.cPlanner.transfer.SLS;

import org.griphyn.cPlanner.transfer.sls.SLSFactory;
import org.griphyn.cPlanner.transfer.sls.SLSFactoryException;

import java.util.Collection;
import java.util.Iterator;
import java.util.StringTokenizer;
import java.util.Set;
import java.util.List;
import java.util.ArrayList;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import org.griphyn.cPlanner.code.CodeGeneratorException;

/**
 * This class enables a job to be run on the grid by launching it through
 * kickstart. The kickstart executable is a light-weight program which
 * connects the stdin, stdout and stderr filehandles for VDS jobs on the
 * remote site.
 * <p>
 * Sitting in between the remote scheduler and the executable, it is
 * possible for kickstart to gather additional information about the
 * executable's run-time behavior, including the exit status of jobs.
 * <p>
 * Kickstart is an executable distributed with VDS that can generally be
 * found at $PEGASUS_HOME/bin/kickstart.
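 * <p>
 * For illustration, an invocation generated by this class looks roughly
 * like the following (the transformation, site, and file names here are
 * assumed; the exact options depend on the job and the configuration):
 * <pre>
 * kickstart -n tr::preprocess -N dv::preprocess -R isi_viz \
 *           -o preprocess.out -e preprocess.err /bin/preprocess arg1 arg2
 * </pre>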
 *
 * @author Karan Vahi vahi@isi.edu
 * @version $Revision: 454 $
 */
public class Kickstart implements GridStart {

    /**
     * The suffix for the kickstart input file that is generated when the
     * invoke (-I) mechanism is used at the remote end.
     */
    public static final String KICKSTART_INPUT_SUFFIX = "arg";

    /**
     * The basename of the class that is implementing this. Could have
     * been determined by reflection.
     */
    public static final String CLASSNAME = "Kickstart";

    /**
     * The SHORT_NAME for this implementation.
     */
    public static final String SHORT_NAME = "kickstart";

    /**
     * The environment variable used to set the kickstart setup job.
     */
    public static final String KICKSTART_SETUP = "GRIDSTART_SETUP";

    /**
     * The environment variable used to set the kickstart prejob.
     */
    public static final String KICKSTART_PREJOB = "GRIDSTART_PREJOB";

    /**
     * The environment variable used to set the kickstart postjob.
     */
    public static final String KICKSTART_POSTJOB = "GRIDSTART_POSTJOB";

    /**
     * The environment variable used to set the kickstart cleanup job.
     */
    public static final String KICKSTART_CLEANUP = "GRIDSTART_CLEANUP";

    /**
     * The LogManager object which is used to log all the messages.
     */
    private LogManager mLogger;

    /**
     * The object holding all the properties pertaining to Pegasus.
     */
    private PegasusProperties mProps;

    /**
     * The options passed to the planner.
     */
    private PlannerOptions mPOptions;

    /**
     * The handle to the workflow that is being enabled.
     */
    private ADag mConcDAG;

    /**
     * Handle to the site catalog.
     */
    private PoolInfoProvider mSiteHandle;

    /**
     * The submit directory where the submit files are being generated for
     * the workflow.
     */
    private String mSubmitDir;

    /**
     * A boolean indicating whether to always use invoke or not.
     */
    private boolean mInvokeAlways;

    /**
     * A boolean indicating whether to stat files or not.
     */
    private boolean mDoStat;

    /**
     * A boolean indicating whether to generate lof files or not.
     */
    private boolean mGenerateLOF;

    /**
     * The argument length beyond which the invoke (-I) mechanism is
     * triggered.
     */
    private long mInvokeLength;

    /**
     * A boolean indicating whether to have worker node execution or not.
     */
    private boolean mWorkerNodeExecution;

    /**
     * The handle to the SLS implementor.
     */
    private SLS mSLS;

    /**
     * An instance variable to track whether enabling is happening as part of
     * a clustered job. See Bug 21 comments on Pegasus Bugzilla.
     */
    private boolean mEnablingPartOfAggregatedJob;

    /**
     * Initializes the GridStart implementation.
     *
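     * <p>A minimal call sequence, assuming a populated
     * <code>PegasusBag</code> and <code>ADag</code>, might look like:
     * <pre>
     * GridStart gridStart = new Kickstart();
     * gridStart.initialize( bag, dag );
     * boolean enabled = gridStart.enable( job, true );
     * </pre>
     *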
     * @param bag the bag of objects that is used for initialization.
     * @param dag the concrete dag so far.
     */
    public void initialize(PegasusBag bag, ADag dag) {

        mProps = bag.getPegasusProperties();
        mPOptions = bag.getPlannerOptions();
        mSubmitDir = mPOptions.getSubmitDirectory();
        mInvokeAlways = mProps.useInvokeInGridStart();
        mInvokeLength = mProps.getGridStartInvokeLength();
        mDoStat = mProps.doStatWithKickstart();
        mGenerateLOF = mProps.generateLOFFiles();
        mLogger = LogManager.getInstance();
        mConcDAG = dag;
        mSiteHandle = bag.getHandleToSiteCatalog();

        mWorkerNodeExecution = mProps.executeOnWorkerNode();
        if (mWorkerNodeExecution) {
            //load the SLS implementation
            mSLS = SLSFactory.loadInstance(bag);
        }
        mEnablingPartOfAggregatedJob = false;
    }

    /**
     * Enables a collection of jobs and puts them into an AggregatedJob.
     * The assumption here is that all the jobs are being enabled by the same
     * implementation. It enables the jobs and puts them into the AggregatedJob
     * that is passed to it.
     * However, to create a valid single XML file, it suppresses the header
     * creation for all but the first job.
     *
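     * <p>Concretely, every constituent job after the first gets
     * <code>-H</code> added to its kickstart arguments, so that the clustered
     * output forms a single well-formed XML document. For an assumed two-job
     * cluster, the launched invocations resemble:
     * <pre>
     * kickstart -n tr1 ... /bin/app1 args    (first job, header emitted)
     * kickstart -H -n tr2 ... /bin/app2 args (later jobs, header suppressed)
     * </pre>
     *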
     * @param aggJob the AggregatedJob into which the collection has to be
     *               integrated.
     * @param jobs   the collection of jobs (SubInfo) that need to be enabled.
     *
     * @return the AggregatedJob containing the enabled jobs.
     * @see #enable(SubInfo,boolean)
     */
    public AggregatedJob enable(AggregatedJob aggJob, Collection jobs) {
        boolean first = true;

        //we do not want the jobs being clustered to be enabled
        //for worker node execution just yet.
        mEnablingPartOfAggregatedJob = true;

        for (Iterator it = jobs.iterator(); it.hasNext();) {
            SubInfo job = (SubInfo) it.next();
            if (first) {
                first = false;
            } else {
                //we need to pass -H to kickstart
                //to suppress the header creation
                job.vdsNS.construct(VDS.GRIDSTART_ARGUMENTS_KEY, "-H");
            }

            //always pass isGlobus as true, since we are
            //interested only in the executable strargs.
            //seqexec does not allow setting the environment per
            //constituent job, so we cannot set the postscript removal option
            this.enable(job, true, mDoStat, false);
            aggJob.add(job);
            //checking whether any files are being transferred via
            //Condor for addition to the AggregatedJob is now taken
            //care of in the merge profiles section
            // if(job.condorVariables.containsKey(Condor.TRANSFER_IP_FILES_KEY)){
            //     aggJob.condorVariables.addIPFileForTransfer(
            //         (String)job.condorVariables.get( Condor.TRANSFER_IP_FILES_KEY) );
            // }
        }

        //set the flag back to false
        mEnablingPartOfAggregatedJob = false;

        return aggJob;
    }

    /**
     * Enables a job to run on the grid by launching it through kickstart.
     * Handles the stdin, stdout, and stderr of the job to be run on the grid.
     * It modifies the job description, and also constructs all the valid
     * options to be passed to kickstart for launching the executable.
     *
     * @param job         the <code>SubInfo</code> object containing the
     *                    description of the job that has to be enabled on the
     *                    grid.
     * @param isGlobusJob is <code>true</code>, if the job generated a
     *                    line <code>universe = globus</code>, and thus runs
     *                    remotely. Set to <code>false</code>, if the job runs
     *                    on the submit host in any way.
     *
     * @return true if enabling was successful, else false if the path to
     *         kickstart could not be determined on the site where the job is
     *         scheduled.
     */
    public boolean enable(SubInfo job, boolean isGlobusJob) {
        return this.enable(job, isGlobusJob, mDoStat, true);
    }

    /**
     * Enables a job to run on the grid by launching it through kickstart.
     * Handles the stdin, stdout, and stderr of the job to be run on the grid.
     * It modifies the job description, and also constructs all the valid
     * options to be passed to kickstart for launching the executable.
     *
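     * <p>The net effect, sketched here with assumed values, is that the
     * Condor <code>executable</code> and <code>arguments</code> keys are
     * rewritten so that kickstart launches the application:
     * <pre>
     * executable = /vds/bin/kickstart
     * arguments  = -n tr -N dv -R isi_viz -o job.out -e job.err /bin/app args
     * </pre>
     *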
     * @param job           the <code>SubInfo</code> object containing the
     *                      description of the job that has to be enabled on
     *                      the grid.
     * @param isGlobusJob   is <code>true</code>, if the job generated a
     *                      line <code>universe = globus</code>, and thus runs
     *                      remotely. Set to <code>false</code>, if the job
     *                      runs on the submit host in any way.
     * @param stat          boolean indicating whether to generate the lof
     *                      files for the kickstart stat option or not.
     * @param addPostScript boolean indicating whether to add a postscript or
     *                      not.
     *
     * @return true if enabling was successful, else false if the path to
     *         kickstart could not be determined on the site where the job is
     *         scheduled.
     */
    protected boolean enable(SubInfo job, boolean isGlobusJob,
            boolean stat, boolean addPostScript) {

        //take care of relative submit directory if specified.
        String submitDir = mSubmitDir + File.separator;
        // String submitDir = getSubmitDirectory( mSubmitDir, job ) + File.separator;

        //To get the gridstart/kickstart path on the remote
        //pool, we query with an entry for the vanilla universe.
        //In the new format the gridstart is associated with the
        //pool itself, not the (pool, condor universe) pair.
        SiteInfo site = mSiteHandle.getPoolEntry(job.executionPool,
                Condor.VANILLA_UNIVERSE);
        String gridStartPath = site.getKickstartPath();
        //sanity check
        if (gridStartPath == null) {
            return false;
        }
        StringBuffer gridStartArgs = new StringBuffer();

        // the executable is gridstart, the application becomes its argument
        gridStartArgs.append("-n ");
        gridStartArgs.append(job.getCompleteTCName());
        gridStartArgs.append(' ');
        gridStartArgs.append("-N ");
        gridStartArgs.append(job.getCompleteDVName());
        gridStartArgs.append(' ');

        // handle stdin
        if (job.stdIn.length() > 0) {

            //for the transfer script and other vds executables, the
            //input file is transferred from the submit host by Condor to
            //stdin. We fool kickstart into picking up the input file from
            //its stdin by giving the input file name as -
            if (job.logicalName
                    .equals(org.griphyn.cPlanner.transfer.implementation.Transfer.TRANSFORMATION_NAME)
                    || job.logicalName
                            .equals(org.griphyn.cPlanner.transfer.implementation.T2.TRANSFORMATION_NAME)
                    || job.logicalName
                            .equals(org.griphyn.cPlanner.cluster.aggregator.SeqExec.COLLAPSE_LOGICAL_NAME)
                    || job.logicalName
                            .equals(org.griphyn.cPlanner.cluster.aggregator.MPIExec.COLLAPSE_LOGICAL_NAME)
                    || job.logicalName
                            .equals(org.griphyn.cPlanner.engine.cleanup.Cleanup.TRANSFORMATION_NAME)) {

                //condor needs to pick up the job stdin and
                //transfer it to the remote end
                construct(job, "input", submitDir + job.getStdIn());
                gridStartArgs.append("-i ").append("-").append(' ');

            } else {
                //kickstart provides the app's *tracked* stdin
                gridStartArgs.append("-i ").append(job.stdIn).append(' ');
            }
        }

        // handle stdout
        if (job.stdOut.length() > 0) {
            // gridstart saves the app's *tracked* stdout
            gridStartArgs.append("-o ").append(job.stdOut).append(' ');
        }

        // the Condor output variable and kickstart -o option
        // must not point to the same file for any local job.
        if (job.stdOut.equals(job.jobName + ".out") && !isGlobusJob) {
            mLogger.log("Detected WAW conflict for stdout",
                    LogManager.WARNING_MESSAGE_LEVEL);
        }
        // the output of gridstart is propagated back to the submit host
        construct(job, "output", submitDir + job.jobName + ".out");

        if (isGlobusJob) {
            construct(job, "transfer_output", "true");
        }

        // handle stderr
        if (job.stdErr.length() > 0) {
            // gridstart saves the app's *tracked* stderr
            gridStartArgs.append("-e ").append(job.stdErr).append(' ');
        }

        // the Condor error variable and kickstart -e option
        // must not point to the same file for any local job.
        if (job.stdErr.equals(job.jobName + ".err") && !isGlobusJob) {
            mLogger.log("Detected WAW conflict for stderr",
                    LogManager.WARNING_MESSAGE_LEVEL);
        }
        // the error from gridstart is propagated back to the submit host
        construct(job, "error", submitDir + job.jobName + ".err");
        if (isGlobusJob) {
            construct(job, "transfer_error", "true");
        }

        //we need to pass the resource handle
        //to kickstart as an argument
        gridStartArgs.append("-R ").append(job.executionPool).append(' ');

        //handle the -w option that asks kickstart to change
        //directory before launching an executable.
        String style = (String) job.vdsNS.get(VDS.STYLE_KEY);
        if (job.vdsNS.getBooleanValue(VDS.CHANGE_DIR_KEY)
                && !mWorkerNodeExecution) {

            // Commented to take account of submitting to condor pool
            // directly or glide in nodes. However, does not work for
            // standard universe jobs. Also made change in Kickstart
            // to pick up only remote_initialdir Karan Nov 15,2005
            String directory = (style.equalsIgnoreCase(VDS.GLOBUS_STYLE)
                    || style.equalsIgnoreCase(VDS.GLIDEIN_STYLE))
                            ? (String) job.condorVariables.removeKey("remote_initialdir")
                            : (String) job.condorVariables.removeKey("initialdir");

            //pass the directory as an argument to kickstart
            gridStartArgs.append("-w ").append(directory).append(' ');
        }

        if (job.vdsNS.getBooleanValue(VDS.TRANSFER_PROXY_KEY)) {
            //just remove the remote_initialdir key;
            //the job needs to be run in the directory
            //that Condor or GRAM decides to run it in
            job.condorVariables.removeKey("remote_initialdir");
        }

        if (mWorkerNodeExecution && !mEnablingPartOfAggregatedJob) {
            enableForWorkerNodeExecution(job, gridStartArgs);
        }

        //check if the job type indicates staging of executable.
        // The -X functionality is handled by the setup jobs that
        // are added as children to the stage in jobs.
        // Karan November 22, 2005
        // if(job.getJobClassInt() == SubInfo.STAGED_COMPUTE_JOB){
        //     //add the -X flag to denote turning on
        //     gridStartArgs.append("-X ");
        // }

        //add the stat options to kickstart only for certain jobs for the
        //time being, and only if the input variable is true
        if (stat) {
            if (job.getJobType() == SubInfo.COMPUTE_JOB
                    || job.getJobType() == SubInfo.STAGED_COMPUTE_JOB
                    || job.getJobType() == SubInfo.CLEANUP_JOB
                    || job.getJobType() == SubInfo.STAGE_IN_JOB
                    || job.getJobType() == SubInfo.INTER_POOL_JOB) {

                String lof;
                List files = new ArrayList(2);

                //inefficient check here again. just a prototype.
                //we need to generate the -S option only for non transfer jobs.
                //generate the list of filenames file for the input files.
                if (!(job instanceof TransferJob)) {
                    lof = generateListofFilenamesFile(job.getInputFiles(),
                            job.getID() + ".in.lof");
                    if (lof != null) {
                        File file = new File(lof);
                        job.condorVariables.addIPFileForTransfer(lof);
                        //the argument needs just the basename, no path component
                        gridStartArgs.append("-S @").append(file.getName())
                                .append(" ");
                        files.add(file.getName());
                    }
                }

                //for cleanup jobs no generation of stats for output files
                if (job.getJobType() != SubInfo.CLEANUP_JOB) {
                    lof = generateListofFilenamesFile(job.getOutputFiles(),
                            job.getID() + ".out.lof");
                    if (lof != null) {
                        File file = new File(lof);
                        job.condorVariables.addIPFileForTransfer(lof);
                        //the argument needs just the basename, no path component
                        gridStartArgs.append("-s @").append(file.getName())
                                .append(" ");
                        files.add(file.getName());
                    }
                }
                //add a kickstart postscript that removes these files
                if (addPostScript) {
                    addCleanupPostScript(job, files);
                }
            }
        } //end of if ( stat )
        else if (mGenerateLOF) {
            //dostat is false, so no generation of the stat option,
            //but generate the lof files nevertheless

            //inefficient check here again. just a prototype.
            //we need to generate the -S option only for non transfer jobs.
            //generate the list of filenames file for the input files.
            if (!(job instanceof TransferJob)) {
                generateListofFilenamesFile(job.getInputFiles(),
                        job.getID() + ".in.lof");
            }

            //for cleanup jobs no generation of stats for output files
            if (job.getJobType() != SubInfo.CLEANUP_JOB) {
                generateListofFilenamesFile(job.getOutputFiles(),
                        job.getID() + ".out.lof");
            }
        } //end of mGenerateLOF

        //append any arguments that need to be passed to
        //kickstart directly, set elsewhere
        if (job.vdsNS.containsKey(VDS.GRIDSTART_ARGUMENTS_KEY)) {
            gridStartArgs.append(job.vdsNS.get(VDS.GRIDSTART_ARGUMENTS_KEY))
                    .append(' ');
        }

        if (mProps.generateKickstartExtraOptions() && mConcDAG != null) {
            gridStartArgs.append("-L ").append(mConcDAG.getLabel())
                    .append(" ");
            gridStartArgs.append("-T ").append(mConcDAG.getMTime())
                    .append(" ");
        }

        long argumentLength = gridStartArgs.length()
                + job.executable.length() + 1 + job.strargs.length();
        if (mInvokeAlways || argumentLength > mInvokeLength) {
            if (!useInvoke(job, gridStartArgs)) {
                mLogger.log("Unable to use invoke for job "
                        + job.getName(),
                        LogManager.ERROR_MESSAGE_LEVEL);
                return false;
            }
        } else {
            gridStartArgs.append(job.executable).append(' ')
                    .append(job.strargs);
        }

        //the executable path and arguments are put
        //in the Condor namespace and not printed to the
        //file so that they can be overridden if desired
        //later through profiles and the key transfer_executable
        construct(job, "executable", handleTransferOfExecutable(job,
                gridStartPath));
        construct(job, "arguments", gridStartArgs.toString());

        //all finished successfully
        return true;
    }

    /**
     * Changes the path to the executable depending on whether we want to
     * transfer the executable or not. If transfer_executable is set to true,
     * then the executable needs to be shipped from the submit host, i.e. the
     * local pool. This function changes the path of the executable to the one
     * on the local pool, so that it can be shipped.
     *
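     * <p>For example (path assumed for illustration), when
     * <code>transfer_executable</code> is true, the local pool's kickstart,
     * e.g. <code>/vds/bin/kickstart</code>, is returned so that Condor ships
     * it to the compute site; otherwise the remote path is returned unchanged.
     *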
     * @param job  the <code>SubInfo</code> containing the job description.
     * @param path the path to kickstart on the remote compute site.
     *
     * @return the path that needs to be set as the executable.
     */
    protected String handleTransferOfExecutable(SubInfo job, String path) {
        Condor cvar = job.condorVariables;

        if (!cvar.getBooleanValue("transfer_executable")) {
            //the executable paths are correct and
            //point to the executable on the remote pool
            return path;
        }

        SiteInfo site = mSiteHandle.getPoolEntry("local",
                Condor.VANILLA_UNIVERSE);
        String gridStartPath = site.getKickstartPath();
        if (gridStartPath == null) {
            mLogger.log(
                    "Gridstart needs to be shipped from the submit host to pool "
                            + job.getSiteHandle()
                            + ".\nNo entry for it in pool local",
                    LogManager.ERROR_MESSAGE_LEVEL);
            throw new RuntimeException(
                    "GridStart needs to be shipped from submit host to site "
                            + job.getSiteHandle() + " for job "
                            + job.getName());
        }

        return gridStartPath;
    }

    /**
     * Enables a job for worker node execution by calling out to the SLS
     * interface to do the second level staging. Also adds the appropriate
     * prejob/setup/postjob/cleanup jobs to the job if required.
     *
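     * <p>The hooks are handed to kickstart through environment variables.
     * With assumed values, the result resembles:
     * <pre>
     * GRIDSTART_PREJOB=&lt;SLS invocation staging in the sls.in mappings&gt;
     * GRIDSTART_POSTJOB=&lt;SLS invocation staging out the sls.out mappings&gt;
     * GRIDSTART_CLEANUP=/bin/rm -rf /tmp/run0001
     * </pre>
     *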
0602: * @param job the job to be enabled
0603: * @param args the arguments constructed so far.
0604: */
0605: protected void enableForWorkerNodeExecution(SubInfo job,
0606: StringBuffer args) {
0607: String style = (String) job.vdsNS.get(VDS.STYLE_KEY);
0608:
0609: if (job.getJobType() == SubInfo.COMPUTE_JOB
0610: || job.getJobType() == SubInfo.STAGED_COMPUTE_JOB) {
0611: mLogger.log("Enabling job for worker node execution "
0612: + job.getName(), LogManager.DEBUG_MESSAGE_LEVEL);
0613:
0614: //To Do handle staged compute jobs also.
0615: //and clustered jobs
0616:
0617: //remove the remote or initial dir's for the compute jobs
0618: String key = (style.equalsIgnoreCase(VDS.GLOBUS_STYLE)) ? "remote_initialdir"
0619: : "initialdir";
0620:
0621: String directory = (String) job.condorVariables
0622: .removeKey(key);
0623:
0624: String destDir = mSiteHandle.getEnvironmentVariable(job
0625: .getSiteHandle(), "wntmp");
0626: destDir = (destDir == null) ? "/tmp" : destDir;
0627:
0628: String relativeDir = mPOptions.getRelativeSubmitDirectory();
0629: String workerNodeDir = destDir + File.separator
0630: + relativeDir.replaceAll("/", "-");
0631:
0632: //pass the worker node directory as an argument to kickstart
0633: //because most jobmanagers cannot handle worker node tmp
0634: //as they check for existance on the head node
0635: StringBuffer xBitSetInvocation = null;
0636: if (!mSLS.doesCondorModifications()) {
0637: //only valid if job does not use SLS condor
0638: args.append("-W ").append(workerNodeDir).append(' ');
0639:
0640: //handle for staged compute jobs. set their X bit after
0641: // SLS has happened
                if (job.getJobType() == SubInfo.STAGED_COMPUTE_JOB) {
                    xBitSetInvocation = new StringBuffer();
                    //chmod +x actually sets the executable bit
                    xBitSetInvocation.append("/bin/chmod +x ");

                    for (Iterator it = job.getInputFiles().iterator(); it.hasNext();) {
                        PegasusFile pf = (PegasusFile) it.next();
                        if (pf.getType() == PegasusFile.EXECUTABLE_FILE) {
                            // //the below does not work as kickstart attempts to
                            // //set the X bit before running any prejobs
                            // args.append( "-X " ).append( workerNodeDir ).
                            //     append( File.separator ).append( pf.getLFN() ).append(' ');
                            xBitSetInvocation.append(pf.getLFN()).append(" ");
                        }
                    }
                }
            }

            //always have the remote dir set to /tmp as we are
            //banking upon kickstart to change the directory for us
            job.condorVariables.construct(key, "/tmp");

            //see if we need to generate a SLS input file in the submit directory
            File slsInputFile = null;
            if (mSLS.needsSLSInput(job)) {
                //generate the sls file with the mappings in the submit directory
                slsInputFile = mSLS.generateSLSInputFile(job,
                        mSLS.getSLSInputLFN(job), mSubmitDir, directory,
                        workerNodeDir);

                //constructing a setup job is not required, as kickstart
                //creates the directory
                //String setupJob = constructSetupJob( job, workerNodeDir );
                //setupJob = quote( setupJob );
                //job.envVariables.construct( this.KICKSTART_SETUP, setupJob );

                File headNodeSLS = new File(directory, slsInputFile.getName());
                String preJob = mSLS.invocationString(job, headNodeSLS);

                if (preJob != null) {
                    /*
                    //add the x bit invocation if required.
                    //this is required till the kickstart -X feature is fixed;
                    //it needs to be invoked after the prejob
                    if( xBitSetInvocation != null ){
                        if( preJob.startsWith( "/bin/bash" ) ){
                            //remove the last " and add the x bit invocation
                            if( preJob.lastIndexOf( "\"" ) == preJob.length() - 1 ){
                                preJob = preJob.substring( 0, preJob.length() - 1 );
                                xBitSetInvocation.append( "\"" );
                                preJob += " && " + xBitSetInvocation.toString();
                            }
                        }
                        else{
                            //prepend a /bin/bash -c invocation
                            preJob = "/bin/bash -c \"" + preJob + xBitSetInvocation.toString();
                        }
                    }
                    */

                    preJob = quote(preJob);
                    job.envVariables.construct(this.KICKSTART_PREJOB, preJob);
                }
            }

            //see if we need to generate a SLS output file in the submit directory
            File slsOutputFile = null;
            if (mSLS.needsSLSOutput(job)) {
                //construct the postjob that transfers the output files
                //back to the head node directory.
                //To fix later: right now a post job is only created if a
                //pre job is created.
                slsOutputFile = mSLS.generateSLSOutputFile(job,
                        mSLS.getSLSOutputLFN(job), mSubmitDir, directory,
                        workerNodeDir);

                //generate the post job
                File headNodeSLS = new File(directory, slsOutputFile.getName());
                String postJob = mSLS.invocationString(job, headNodeSLS);
                if (postJob != null) {
                    postJob = quote(postJob);
                    job.envVariables.construct(this.KICKSTART_POSTJOB, postJob);
                }
            }

            //modify the job if required
            if (!mSLS.modifyJobForWorkerNodeExecution(job,
                    mSiteHandle.getURLPrefix(job.getSiteHandle()), directory,
                    workerNodeDir)) {

                throw new RuntimeException("Unable to modify job "
                        + job.getName() + " for worker node execution");
            }

            //only have a cleanup job when not using condor modifications
            if (!mSLS.doesCondorModifications()) {
                String cleanupJob = constructCleanupJob(job, workerNodeDir);
                if (cleanupJob != null) {
                    cleanupJob = quote(cleanupJob);
                    job.envVariables.construct(this.KICKSTART_CLEANUP,
                            cleanupJob);
                }
            }
        }
    }

    /**
     * Indicates whether the enabling mechanism can set the X bit
     * on the executable on the remote grid site, in addition to launching
     * it on the remote grid site.
     *
     * @return true, indicating Kickstart can set the X bit.
     */
    public boolean canSetXBit() {
        return true;
    }

    /**
     * Returns the value of the vds profile with key as VDS.GRIDSTART_KEY,
     * that would result in the loading of this particular implementation.
     * It is usually the name of the implementing class without the
     * package name.
     *
     * @return the value of the profile key.
     * @see org.griphyn.cPlanner.namespace.VDS#GRIDSTART_KEY
     */
    public String getVDSKeyValue() {
        return this.CLASSNAME;
    }

    /**
     * Returns a short textual description in the form of the name of the class.
     *
     * @return short textual description.
     */
    public String shortDescribe() {
        return this.SHORT_NAME;
    }

    /**
     * Returns the SHORT_NAME for the POSTScript implementation that is to be
     * used as the default with this GridStart implementation.
     *
     * @return the identifier for the ExitPOST POSTScript implementation.
     *
     * @see POSTScript#shortDescribe()
     */
    public String defaultPOSTScript() {
        return ExitPOST.SHORT_NAME;
    }

    /**
     * Triggers the creation of the kickstart input file, which contains the
     * remote executable and the arguments with which it has to be invoked.
     * The kickstart input file is created in the submit directory.
     *
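     * <p>For a job with executable <code>/bin/app</code> and arguments
     * <code>-a 1 -b 2</code> (values assumed for illustration), the generated
     * <code>&lt;jobname&gt;.arg</code> file contains the executable followed
     * by one argument token per line:
     * <pre>
     * /bin/app
     * -a
     * 1
     * -b
     * 2
     * </pre>
     * <code>-I &lt;jobname&gt;.arg</code> is then appended to the kickstart
     * arguments.
     *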
     * @param job  the <code>SubInfo</code> object containing the job description.
     * @param args the arguments buffer for the gridstart invocation so far.
     *
     * @return boolean indicating whether the kickstart input file was
     *         generated or not. false in case of any error.
     */
    private boolean useInvoke(SubInfo job, StringBuffer args) {
        boolean result = true;

        String inputBaseName = job.jobName + "."
                + this.KICKSTART_INPUT_SUFFIX;

        //writing the stdin file
        try {
            FileWriter input;
            input = new FileWriter(new File(mSubmitDir, inputBaseName));
            //the first thing that goes in is the executable name
            input.write(job.executable);
            input.write("\n");
            //write out all the arguments
            //one on each line
            StringTokenizer st = new StringTokenizer(job.strargs);
            while (st.hasMoreTokens()) {
                input.write(st.nextToken());
                input.write("\n");
            }
            //close the stream
            input.close();
        } catch (Exception e) {
            mLogger.log(
                    "Unable to write the kickstart input file for job "
                            + job.getCompleteTCName() + " "
                            + e.getMessage(),
                    LogManager.ERROR_MESSAGE_LEVEL);
            return false;
        }

        //construct the list of files that need to be transferred
        //via the Condor file transfer mechanism
        String fileList;
        if (job.condorVariables.containsKey(Condor.TRANSFER_IP_FILES_KEY)) {
            //update the existing list
            fileList = (String) job.condorVariables
                    .get(Condor.TRANSFER_IP_FILES_KEY);
            if (fileList != null) {
                fileList += "," + inputBaseName;
            }
        } else {
            fileList = inputBaseName;
        }

        construct(job, Condor.TRANSFER_IP_FILES_KEY, fileList);
        construct(job, "should_transfer_files", "YES");
        construct(job, "when_to_transfer_output", "ON_EXIT");

        //add the -I argument to kickstart
        args.append("-I ").append(inputBaseName).append(" ");
        return result;
    }

    /**
     * Constructs a kickstart setup job that creates the worker node tmp
     * directory.
     *
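     * <p>For a worker node tmp directory of <code>/tmp/run0001</code>
     * (assumed for illustration), the returned invocation is
     * <code>/bin/mkdir -p /tmp/run0001</code>.
     *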
     * @param job           the job to be run.
     * @param workerNodeTmp the worker node tmp directory to run the job in.
     *
     * @return the setup job invocation string.
     */
    protected String constructSetupJob(SubInfo job, String workerNodeTmp) {
        StringBuffer setup = new StringBuffer();

        setup.append("/bin/mkdir -p ").append(workerNodeTmp);

        return setup.toString();
    }

    /**
     * Constructs a kickstart cleanup job that removes the worker node tmp
     * directory.
     *
     * @param job           the job to be run.
     * @param workerNodeTmp the worker node tmp directory to run the job in.
     *
     * @return the cleanup job invocation string.
     */
    protected String constructCleanupJob(SubInfo job,
            String workerNodeTmp) {
        StringBuffer setup = new StringBuffer();

        setup.append("/bin/rm -rf ").append(workerNodeTmp);

        return setup.toString();
    }

    /**
     * Constructs the prejob that fetches the sls file and then invokes
     * transfer again.
     *
     * @param job                 the job for which the prejob is being created.
     * @param headNodeURLPrefix   the URL prefix of the head node.
     * @param headNodeDirectory   the directory on the head node.
     * @param workerNodeDirectory the directory on the worker node.
     * @param slsFile             the basename of the sls file.
     *
     * @return String containing the prescript invocation.
     */
    protected String constructPREJob(SubInfo job,
            String headNodeURLPrefix, String headNodeDirectory,
            String workerNodeDirectory, String slsFile) {

        File headNodeSLS = new File(headNodeDirectory, slsFile);
        return mSLS.invocationString(job, headNodeSLS);

        //first we need to get the sls file to the worker node
        /*
        preJob.append( "/bin/echo -e \" " ).
            append( headNodeURLPrefix ).append( File.separator ).
            append( headNodeDirectory ).append( File.separator ).
            append( slsFile ).append( " \\n " ).
            append( "file://" ).append( workerNodeDirectory ).append( File.separator ).
            append( slsFile ).append( "\"" ).
            append( " | " ).append( transfer ).append( " base mnt " );

        preJob.append( " && " );

        //now we need to get transfer to execute this sls file
        preJob.append( transfer ).append( " base mnt < " ).append( slsFile );
        */
    }

    /**
     * Constructs the post job that fetches the sls file and then invokes
     * transfer again.
     *
     * @param job                 the job for which the post job is being created.
     * @param headNodeURLPrefix   the URL prefix of the head node.
     * @param headNodeDirectory   the directory on the head node.
     * @param workerNodeDirectory the directory on the worker node.
     * @param slsFile             the basename of the sls file.
     *
     * @return String containing the postscript invocation.
     */
    protected String constructPOSTJob(SubInfo job,
            String headNodeURLPrefix, String headNodeDirectory,
            String workerNodeDirectory, String slsFile) {

        StringBuffer postJob = new StringBuffer();

        //first figure out the path to transfer
        //hardcoded for now
        String transfer = "/nfs/home/vahi/PEGASUS/default/bin/transfer";

        //no need to figure out the proxy, as that is already done in the prejob?
        String proxy = null;
        StringBuffer proxyPath = null;
        for (Iterator it = job.getInputFiles().iterator(); it.hasNext();) {
            PegasusFile pf = (PegasusFile) it.next();
            if (pf instanceof FileTransfer
                    && pf.getLFN().equals(ENV.X509_USER_PROXY_KEY)) {
                //there is a proxy that needs to be set for the job;
                //actually set it in the prejob somehow
                proxy = ((NameValue) ((FileTransfer) pf).getDestURL())
                        .getValue();
                proxy = new File(proxy).getName();
                proxyPath = new StringBuffer();
                proxyPath.append(headNodeDirectory)
                        .append(File.separator).append(proxy);
                job.envVariables.construct(ENV.X509_USER_PROXY_KEY,
                        proxyPath.toString());
                break;
            }
        }

        //add the command to chmod the proxy
        if (proxy != null) {
            postJob.append("/bin/bash -c \"chmod 600 ")
                    .append(proxyPath.toString()).append(" && ");
        }

        postJob.append(transfer).append(" base mnt ")
                .append(headNodeDirectory).append(File.separator)
                .append(slsFile);

        if (proxy != null) {
            //add the end quote
            postJob.append("\"");
        }

        return postJob.toString();
    }

    /**
     * Writes out the list of filenames (lof) file for the job.
     *
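     * <p>The generated lof file simply lists one LFN per line, e.g.
     * (filenames assumed for illustration):
     * <pre>
     * f.a
     * f.b
     * </pre>
     *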
     * @param files    the set of <code>PegasusFile</code> objects containing
     *                 the files whose stat information is required.
     * @param basename the basename of the file that is to be created.
     *
     * @return the full path to the lof file created, else null if no file is
     *         written out.
     */
    public String generateListofFilenamesFile(Set files, String basename) {
        //sanity check
        if (files == null || files.isEmpty()) {
            return null;
        }

        String result = null;
        //writing the lof file
        try {
            File f = new File(mSubmitDir, basename);
            FileWriter input;
            input = new FileWriter(f);
            PegasusFile pf;
            for (Iterator it = files.iterator(); it.hasNext();) {
                pf = (PegasusFile) it.next();
                input.write(pf.getLFN());
                input.write("\n");
            }
            //close the stream
            input.close();
            result = f.getAbsolutePath();

        } catch (IOException e) {
            mLogger.log("Unable to write the lof file " + basename, e,
                    LogManager.ERROR_MESSAGE_LEVEL);
        }

        return result;
    }

    /**
     * Constructs a condor variable in the condor profile namespace
     * associated with the job. Overrides any preexisting key values.
     *
     * @param job   contains the job description.
     * @param key   the key of the profile.
     * @param value the associated value.
     */
    private void construct(SubInfo job, String key, String value) {
        job.condorVariables.construct(key, value);
    }

    /**
     * Condor-quotes a string.
     *
     * @param string the string to be quoted.
     *
     * @return the quoted string.
     */
    private String quote(String string) {
        String result;
        try {
            mLogger.log("Unquoted string is " + string,
                    LogManager.DEBUG_MESSAGE_LEVEL);
            result = CondorQuoteParser.quote(string, false);
            mLogger.log("Quoted string is " + result,
                    LogManager.DEBUG_MESSAGE_LEVEL);
        } catch (CondorQuoteParserException e) {
            throw new RuntimeException("CondorQuoting Problem "
                    + e.getMessage());
        }
        return result;
    }

    /**
     * Adds a /bin/rm post job to kickstart that removes the files passed.
     * The post job is added as an environment variable.
     *
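     * <p>For lof files <code>job1.in.lof</code> and <code>job1.out.lof</code>
     * (names assumed for illustration), the constructed environment variable
     * is:
     * <pre>
     * GRIDSTART_CLEANUP=/bin/rm -rf job1.in.lof job1.out.lof
     * </pre>
     *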
     * @param job   the job in which the post job needs to be added.
     * @param files the files to be deleted.
     */
    private void addCleanupPostScript(SubInfo job, List files) {
        //sanity check
        if (files == null || !mDoStat || files.isEmpty()) {
            return;
        }

        //do not add if the job already has a postscript specified
        if (job.envVariables.containsKey(this.KICKSTART_CLEANUP)) {
            mLogger.log(
                    "Not adding lof cleanup as another kickstart cleanup already exists",
                    LogManager.DEBUG_MESSAGE_LEVEL);
            return;
        }

        StringBuffer ps = new StringBuffer();
        //maybe later we might want to pick it up from the TC
        ps.append("/bin/rm -rf").append(" ");
        for (Iterator it = files.iterator(); it.hasNext();) {
            ps.append(it.next()).append(" ");
        }

        job.envVariables.construct(this.KICKSTART_CLEANUP, ps.toString());
    }

}
|