/**
 * This file or a portion of this file is licensed under the terms of
 * the Globus Toolkit Public License, found in file GTPL, or at
 * http://www.globus.org/toolkit/download/license.html. This notice must
 * appear in redistributions of this file, with or without modification.
 *
 * Redistributions of this Software, with or without modification, must
 * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
 * some other similar material which is provided with the Software (if
 * any).
 *
 * Copyright 1999-2004 University of Chicago and The University of
 * Southern California. All rights reserved.
 */

package org.griphyn.cPlanner.cluster.aggregator;

import org.griphyn.cPlanner.classes.ADag;
import org.griphyn.cPlanner.classes.AggregatedJob;
import org.griphyn.cPlanner.classes.PegasusBag;
import org.griphyn.cPlanner.classes.SiteInfo;
import org.griphyn.cPlanner.classes.SubInfo;

import org.griphyn.cPlanner.cluster.JobAggregator;

import org.griphyn.cPlanner.code.GridStart;
import org.griphyn.cPlanner.code.gridstart.ExitPOST;
import org.griphyn.cPlanner.code.gridstart.GridStartFactory;

import org.griphyn.cPlanner.common.LogManager;
import org.griphyn.cPlanner.common.PegasusProperties;

import org.griphyn.cPlanner.namespace.Condor;
import org.griphyn.cPlanner.namespace.Dagman;
import org.griphyn.cPlanner.namespace.VDS;

import java.io.File;
import java.util.List;

/**
 * This class aggregates smaller jobs such that they are launched
 * sequentially on a single node at the remote end, using seqexec. The
 * seqexec executable is a VDS tool distributed in the VDS worker
 * package, and can usually be found at $PEGASUS_HOME/bin/seqexec.
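 * <p>
 * As a rough usage sketch (the variable names are illustrative only;
 * <code>dag</code>, <code>bag</code> and <code>jobs</code> are assumed
 * to be populated by the surrounding planner code):
 * <pre>
 *     JobAggregator aggregator = new SeqExec();
 *     aggregator.initialize(dag, bag);
 *     //collapse the jobs, all scheduled on the same site, into one job
 *     AggregatedJob mergedJob = aggregator.construct(jobs, "analyze", "ID1");
 * </pre>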
 *
 * @author Karan Vahi vahi@isi.edu
 * @version $Revision: 451 $
 */
public class SeqExec extends Abstract {

    /**
     * The logical name of the transformation that is able to run multiple
     * jobs sequentially.
     */
    public static final String COLLAPSE_LOGICAL_NAME = "seqexec";

    /**
     * The suffix to be applied to the seqexec progress report file.
     */
    public static final String SEQEXEC_PROGRESS_REPORT_SUFFIX = ".prg";

    /**
     * Flag indicating whether a single global log file or a per-job log
     * file is used.
     */
    private boolean mGlobalLog;

    /**
     * Flag indicating whether to fail on first hard error or not.
     */
    private boolean mFailOnFirstError;

    /**
     * The default constructor.
     */
    public SeqExec() {
        super();
    }

    /**
     * Initializes the JobAggregator implementation.
     *
     * @param dag the workflow that is being clustered.
     * @param bag the bag of objects that is useful for initialization.
     */
    public void initialize(ADag dag, PegasusBag bag) {
        super.initialize(dag, bag);
        mFailOnFirstError = false;
        mGlobalLog = bag.getPegasusProperties().jobAggregatorLogGlobal();
    }

    /**
     * Constructs a new aggregated job that contains all the jobs passed to
     * it. The new aggregated job appears as a single job in the workflow
     * and replaces the jobs it contains.
     * <p>
     * Seqexec uses kickstart to invoke each of the smaller constituent
     * jobs, and the kickstart output appears on the stdout of seqexec.
     * Hence, seqexec itself is not kickstarted; instead, an appropriate
     * postscript is constructed to be invoked on the job.
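     * <p>
     * Concretely, if the constituent jobs are launched via kickstart, the
     * DAGMan postscript of the merged job is set to exitpost
     * ($PEGASUS_HOME/bin/exitpost), which is presumed to examine the
     * kickstart records on the job's stdout to determine success or failure.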
     *
     * @param jobs the list of <code>SubInfo</code> objects that need to be
     *             collapsed. All the jobs being collapsed should be
     *             scheduled at the same pool, to maintain correct semantics.
     * @param name the logical name of the jobs in the list passed to this
     *             function.
     * @param id   the id that is given to the new job.
     *
     * @return the <code>AggregatedJob</code> object corresponding to the
     *         aggregated job containing the jobs passed as List in the
     *         input, null if the list of jobs is empty.
     */
    public AggregatedJob construct(List jobs, String name, String id) {
        AggregatedJob mergedJob = super.construct(jobs, name, id);

        //ensure that the AggregatedJob is invoked via NoGridStart
        mergedJob.vdsNS.construct(
            VDS.GRIDSTART_KEY,
            GridStartFactory.GRIDSTART_SHORT_NAMES[GridStartFactory.NO_GRIDSTART_INDEX]);

        SubInfo firstJob = (SubInfo) jobs.get(0);
        StringBuffer message = new StringBuffer();
        message.append(" POSTScript for merged job ")
               .append(mergedJob.getName()).append(" ");

        //tinker with the postscript only if the merged job has not already
        //been set to use a specific postscript
        if (!mergedJob.dagmanVariables.containsKey(Dagman.POST_SCRIPT_KEY)) {
            //the gridstart key is always populated
            String gridstart = (String) firstJob.vdsNS.get(VDS.GRIDSTART_KEY);
            if (gridstart.equalsIgnoreCase(
                    GridStartFactory.GRIDSTART_SHORT_NAMES[GridStartFactory.KICKSTART_INDEX])) {
                //ensure $PEGASUS_HOME/bin/exitpost is invoked,
                //as the constituent jobs are launched by kickstart
                mergedJob.dagmanVariables.construct(
                    Dagman.POST_SCRIPT_KEY, ExitPOST.SHORT_NAME);
            }
        }
        message.append(mergedJob.dagmanVariables.get(Dagman.POST_SCRIPT_KEY));
        mLogger.log(message.toString(), LogManager.DEBUG_MESSAGE_LEVEL);

        //log the input files of the merged job instead of printing to stdout
        mLogger.log("Input files of merged job " + mergedJob.getInputFiles(),
                    LogManager.DEBUG_MESSAGE_LEVEL);

        return mergedJob;
    }

    /**
     * Enables the constituent jobs that make up an aggregated job.
     *
     * @param mergedJob the clustered job.
     * @param jobs      the constituent jobs.
     *
     * @return the enabled <code>AggregatedJob</code>.
     */
    protected AggregatedJob enable(AggregatedJob mergedJob, List jobs) {
        SubInfo firstJob = (SubInfo) jobs.get(0);
        SiteInfo site = mSiteHandle.getPoolEntry(firstJob.getSiteHandle(),
                                                 Condor.VANILLA_UNIVERSE);
        GridStart gridStart = mGridStartFactory.loadGridStart(firstJob,
                                                              site.getKickstartPath());

        //explicitly set the gridstart key so as to enable the correct
        //generation of the postscript for the aggregated job
        firstJob.vdsNS.construct(VDS.GRIDSTART_KEY, gridStart.getVDSKeyValue());

        return gridStart.enable(mergedJob, jobs);
    }

    /**
     * Returns the logical name of the transformation that is used to
     * collapse the jobs.
     *
     * @return the logical name of the collapser executable.
     * @see #COLLAPSE_LOGICAL_NAME
     */
    public String getCollapserLFN() {
        return COLLAPSE_LOGICAL_NAME;
    }

    /**
     * Determines whether there is NO entry in the transformation catalog
     * for the job aggregator executable on a particular site.
     *
     * @param site the site at which the existence check is required.
     *
     * @return true if an entry does not exist, false otherwise.
     */
    public boolean entryNotInTC(String site) {
        return this.entryNotInTC(this.TRANSFORMATION_NAMESPACE,
                                 COLLAPSE_LOGICAL_NAME,
                                 this.TRANSFORMATION_VERSION,
                                 site);
    }

    /**
     * Returns the arguments with which the <code>AggregatedJob</code>
     * needs to be invoked.
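     * <p>
     * For illustration, with abort-on-first-failure enabled and per-job
     * progress logs, an aggregated job named (hypothetically)
     * <code>merge_analyze_ID1</code> results in the argument string
     * <code> -f  -R merge_analyze_ID1.prg</code>.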
     *
     * @param job the <code>AggregatedJob</code> for which the arguments
     *            have to be constructed.
     *
     * @return the argument string.
     */
    public String aggregatedJobArguments(AggregatedJob job) {
        StringBuffer arguments = new StringBuffer();

        //do we need to fail hard on first error
        if (this.abortOnFristJobFailure()) {
            arguments.append(" -f ");
        }

        //track the progress of the seqexec job
        arguments.append(" -R ").append(logFile(job));

        return arguments.toString();
    }

    /**
     * Setter method to indicate whether a failure of the first constituent
     * job should result in the abort of the whole aggregated job.
     *
     * @param fail indicates whether to abort or not.
     */
    public void setAbortOnFirstJobFailure(boolean fail) {
        mFailOnFirstError = fail;
    }

    /**
     * Returns a boolean indicating whether to fail the aggregated job on
     * detecting the first failure during execution of the constituent jobs.
     *
     * @return boolean indicating whether to fail or not.
     */
    public boolean abortOnFristJobFailure() {
        return mFailOnFirstError;
    }

    /**
     * Returns the name of the log file to be used on the remote site for
     * the seqexec job. Depending upon the property settings, the log file
     * is either named after the workflow label (a log common to all
     * seqexec jobs in the clustered workflow) or after the individual
     * seqexec job itself.
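     * <p>
     * For illustration (the names are hypothetical): with a global log, a
     * workflow labelled <code>blackdiamond</code> yields the log file
     * <code>blackdiamond.prg</code>; otherwise a job named
     * <code>merge_analyze_ID1</code> yields <code>merge_analyze_ID1.prg</code>.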
     *
     * @param job the <code>AggregatedJob</code>.
     *
     * @return the path to the log file.
     */
    protected String logFile(AggregatedJob job) {
        StringBuffer sb = new StringBuffer(32);
        if (mGlobalLog) {
            //the basename of the log file is derived from the dag label
            sb.append(mClusteredADag.dagInfo.getLabel());
        } else {
            //the log file is named after the seqexec job
            sb.append(job.getName());
        }
        sb.append(SEQEXEC_PROGRESS_REPORT_SUFFIX);
        return sb.toString();
    }
}