001: /**
002: * This file or a portion of this file is licensed under the terms of
003: * the Globus Toolkit Public License, found in file GTPL, or at
004: * http://www.globus.org/toolkit/download/license.html. This notice must
005: * appear in redistributions of this file, with or without modification.
006: *
007: * Redistributions of this Software, with or without modification, must
008: * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
009: * some other similar material which is provided with the Software (if
010: * any).
011: *
012: * Copyright 1999-2004 University of Chicago and The University of
013: * Southern California. All rights reserved.
014: */package org.griphyn.cPlanner.cluster.aggregator;
015:
016: import org.griphyn.cPlanner.cluster.JobAggregator;
017:
018: import org.griphyn.cPlanner.classes.ADag;
019: import org.griphyn.cPlanner.classes.SubInfo;
020: import org.griphyn.cPlanner.classes.AggregatedJob;
021:
022: import org.griphyn.cPlanner.common.PegasusProperties;
023:
024: import org.griphyn.cPlanner.namespace.VDS;
025:
026: import org.griphyn.cPlanner.code.gridstart.GridStartFactory;
027:
028: import java.util.List;
029: import java.util.Iterator;
030: import org.griphyn.cPlanner.code.GridStart;
031: import org.griphyn.cPlanner.namespace.Condor;
032: import org.griphyn.cPlanner.classes.SiteInfo;
033: import org.griphyn.cPlanner.classes.PegasusBag;
034:
035: /**
036: * This class aggregates the smaller jobs in a manner such that
037: * they are launched at remote end, by mpiexec on n nodes where n is the nodecount
038: * associated with the aggregated job that is being lauched by mpiexec.
039: * The executable mpiexec is a VDS tool distributed in the VDS worker package, and
040: * can be usually found at $PEGASUS_HOME/bin/mpiexec.
041: *
042: * @author Karan Vahi vahi@isi.edu
043: * @version $Revision: 450 $
044: */
045:
046: public class MPIExec extends Abstract {
047:
048: /**
049: * The logical name of the transformation that is able to run multiple
050: * jobs sequentially.
051: */
052: public static final String COLLAPSE_LOGICAL_NAME = "mpiexec";
053:
054: /**
055: * The default constructor.
056: */
057: public MPIExec() {
058: super ();
059: }
060:
061: /**
062: *Initializes the JobAggregator impelementation
063: *
064: * @param dag the workflow that is being clustered.
065: * @param bag the bag of objects that is useful for initialization.
066: *
067: *
068: */
069: public void initialize(ADag dag, PegasusBag bag) {
070: super .initialize(dag, bag);
071: }
072:
073: /**
074: * Constructs a new aggregated job that contains all the jobs passed to it.
075: * The new aggregated job, appears as a single job in the workflow and
076: * replaces the jobs it contains in the workflow.
077: * <p>
078: * The aggregated job is executed at a site, using mpiexec that
079: * executes each of the smaller jobs in the aggregated job on n number of
080: * nodes where n is the nodecount associated with the job.
081: * All the sub jobs are in turn launched via kickstart if kickstart is
082: * installed at the site where the job resides.
083: *
084: * @param jobs the list of <code>SubInfo</code> objects that need to be
085: * collapsed. All the jobs being collapsed should be scheduled
086: * at the same pool, to maintain correct semantics.
087: * @param name the logical name of the jobs in the list passed to this
088: * function.
089: * @param id the id that is given to the new job.
090: *
091: *
092: * @return the <code>AggregatedJob</code> object corresponding to the aggregated
093: * job containing the jobs passed as List in the input,
094: * null if the list of jobs is empty
095: */
096: public AggregatedJob construct(List jobs, String name, String id) {
097: AggregatedJob mergedJob = super .construct(jobs, name, id);
098: //also put in jobType as mpi
099: mergedJob.globusRSL.checkKeyInNS("jobtype", "mpi");
100:
101: //ensure that AggregatedJob is invoked via NoGridStart
102: mergedJob.vdsNS
103: .construct(
104: VDS.GRIDSTART_KEY,
105: GridStartFactory.GRIDSTART_SHORT_NAMES[GridStartFactory.NO_GRIDSTART_INDEX]);
106:
107: return mergedJob;
108: }
109:
110: /**
111: * Enables the constitutent jobs that make up a aggregated job. Makes sure
112: * that they all are enabled via no kickstart
113: *
114: * @param mergedJob the clusteredJob
115: * @param jobs the constitutent jobs
116: *
117: * @return AggregatedJob
118: */
119: protected AggregatedJob enable(AggregatedJob mergedJob, List jobs) {
120: //we cannot invoke any of clustered jobs also via kickstart
121: //as the output will be clobbered
122: SubInfo firstJob = (SubInfo) jobs.get(0);
123: SiteInfo site = mSiteHandle.getPoolEntry(firstJob
124: .getSiteHandle(), Condor.VANILLA_UNIVERSE);
125:
126: firstJob.vdsNS
127: .construct(
128: VDS.GRIDSTART_KEY,
129: GridStartFactory.GRIDSTART_SHORT_NAMES[GridStartFactory.NO_GRIDSTART_INDEX]);
130:
131: GridStart gridStart = mGridStartFactory.loadGridStart(firstJob,
132: site.getKickstartPath());
133:
134: return gridStart.enable(mergedJob, jobs);
135: }
136:
137: /**
138: * Returns the logical name of the transformation that is used to
139: * collapse the jobs.
140: *
141: * @return the the logical name of the collapser executable.
142: * @see #COLLAPSE_LOGICAL_NAME
143: */
144: public String getCollapserLFN() {
145: return COLLAPSE_LOGICAL_NAME;
146: }
147:
148: /**
149: * Determines whether there is NOT an entry in the transformation catalog
150: * for the job aggregator executable on a particular site.
151: *
152: * @param site the site at which existence check is required.
153: *
154: * @return boolean true if an entry does not exists, false otherwise.
155: */
156: public boolean entryNotInTC(String site) {
157: return this .entryNotInTC(this .TRANSFORMATION_NAMESPACE,
158: COLLAPSE_LOGICAL_NAME, this .TRANSFORMATION_VERSION,
159: site);
160: }
161:
162: /**
163: * Returns the arguments with which the <code>AggregatedJob</code>
164: * needs to be invoked with. At present any empty argument string is
165: * returned.
166: *
167: * @param job the <code>AggregatedJob</code> for which the arguments have
168: * to be constructed.
169: *
170: * @return argument string
171: */
172: public String aggregatedJobArguments(AggregatedJob job) {
173: return "";
174: }
175:
176: /**
177: * Setter method to indicate , failure on first consitutent job should
178: * result in the abort of the whole aggregated job. Ignores any value
179: * passed, as MPIExec does not handle it for time being.
180: *
181: * @param fail indicates whether to abort or not .
182: */
183: public void setAbortOnFirstJobFailure(boolean fail) {
184:
185: }
186:
187: /**
188: * Returns a boolean indicating whether to fail the aggregated job on
189: * detecting the first failure during execution of constituent jobs.
190: *
191: * @return boolean indicating whether to fail or not.
192: */
193: public boolean abortOnFristJobFailure() {
194: return false;
195: }
196:
197: }
|