001: /**
002: * This file or a portion of this file is licensed under the terms of
003: * the Globus Toolkit Public License, found at $PEGASUS_HOME/GTPL or
004: * http://www.globus.org/toolkit/download/license.html.
005: * This notice must appear in redistributions of this file
006: * with or without modification.
007: *
008: * Redistributions of this Software, with or without modification, must reproduce
009: * the GTPL in:
010: * (1) the Software, or
011: * (2) the Documentation or
012: * some other similar material which is provided with the Software (if any).
013: *
014: * Copyright 1999-2004
015: * University of Chicago and The University of Southern California.
016: * All rights reserved.
017: */package org.griphyn.cPlanner.engine;
018:
019: import org.griphyn.cPlanner.classes.ADag;
020: import org.griphyn.cPlanner.classes.JobManager;
021: import org.griphyn.cPlanner.classes.SiteInfo;
022: import org.griphyn.cPlanner.classes.SubInfo;
023:
024: import org.griphyn.cPlanner.common.LogManager;
025: import org.griphyn.cPlanner.common.PegasusProperties;
026: import org.griphyn.cPlanner.common.UserOptions;
027:
028: import org.griphyn.common.catalog.TransformationCatalogEntry;
029: import org.griphyn.common.catalog.transformation.TCMode;
030:
031: import org.griphyn.common.classes.TCType;
032:
033: import org.griphyn.common.util.DynamicLoader;
034: import org.griphyn.common.util.FactoryException;
035:
036: import java.util.List;
037: import java.util.Iterator;
038: import java.util.Set;
039: import java.util.HashSet;
040: import java.io.File;
041: import org.griphyn.common.util.Separator;
042:
/**
 * The common abstract class that identifies the basic functions that need to be
 * implemented to introduce random directories in which the jobs are executed on
 * the remote execution pools. The implementing classes are invoked when the user
 * gives the --randomdir option. The implementing classes determine where in the
 * graph the nodes creating the random directories are placed and their
 * dependencies with the rest of the nodes in the graph.
 *
 * @author Karan Vahi
 * @author Gaurang Mehta
 *
 * @version $Revision: 139 $
 */
056: public abstract class CreateDirectory extends Engine {
057:
058: /**
059: * Constant suffix for the names of the create directory nodes.
060: */
061: public static final String CREATE_DIR_SUFFIX = "_cdir";
062:
063: /**
064: * The name of the package in which all the implementing classes are.
065: */
066: public static final String PACKAGE_NAME = "org.griphyn.cPlanner.engine.";
067:
068: /**
069: * The transformation namespace for the create dir jobs.
070: */
071: public static final String TRANSFORMATION_NAMESPACE = "pegasus";
072:
073: /**
074: * The logical name of the transformation that creates directories on the
075: * remote execution pools.
076: */
077: public static final String TRANSFORMATION_NAME = "dirmanager";
078:
079: /**
080: * The version number for the derivations for create dir jobs.
081: */
082: public static final String TRANSFORMATION_VERSION = null;
083:
084: /**
085: * The derivation namespace for the create dir jobs.
086: */
087: public static final String DERIVATION_NAMESPACE = "pegasus";
088:
089: /**
090: * The logical name of the transformation that creates directories on the
091: * remote execution pools.
092: */
093: public static final String DERIVATION_NAME = "dirmanager";
094:
095: /**
096: * The version number for the derivations for create dir jobs.
097: */
098: public static final String DERIVATION_VERSION = "1.0";
099:
100: /**
101: * It is a reference to the Concrete Dag so far.
102: */
103: protected ADag mCurrentDag;
104:
105: /**
106: * The handle to the options specified by the user at runtime. The name of
107: * the random directory is picked up from here.
108: */
109: protected UserOptions mUserOpts;
110:
111: /**
112: * The handle to the logging object, that is used to log the messages.
113: */
114: protected LogManager mLogger;
115:
116: /**
117: * A convenience method to return the complete transformation name being
118: * used to construct jobs in this class.
119: *
120: * @return the complete transformation name
121: */
122: public static String getCompleteTranformationName() {
123: return Separator.combine(TRANSFORMATION_NAMESPACE,
124: TRANSFORMATION_NAME, TRANSFORMATION_VERSION);
125: }
126:
127: /**
128: * Loads the implementing class corresponding to the mode specified by the
129: * user at runtime.
130: *
131: * @param className The name of the class that implements the mode. It is the
132: * name of the class, not the complete name with package.
133: * That is added by itself.
134: * @param concDag the workflow.
135: * @param properties the <code>PegasusProperties</code> to be used.
136: *
137: * @return instance of a CreateDirecctory implementation
138: *
139: * @throws FactoryException that nests any error that
140: * might occur during the instantiation of the implementation.
141: */
142: public static CreateDirectory loadCreateDirectoryInstance(
143: String className, ADag concDag, PegasusProperties properties)
144: throws FactoryException {
145:
146: //prepend the package name
147: className = PACKAGE_NAME + className;
148:
149: //try loading the class dynamically
150: CreateDirectory cd = null;
151: DynamicLoader dl = new DynamicLoader(className);
152: try {
153: Object argList[] = new Object[2];
154: argList[0] = concDag;
155: argList[1] = properties;
156: cd = (CreateDirectory) dl.instantiate(argList);
157: } catch (Exception e) {
158: throw new FactoryException(
159: "Instantiating Create Directory", className, e);
160: }
161:
162: return cd;
163: }
164:
165: /**
166: * A pratically nothing constructor !
167: *
168: *
169: * @param properties the <code>PegasusProperties</code> to be used.
170: */
171: protected CreateDirectory(PegasusProperties properties) {
172: super (properties);
173: mCurrentDag = null;
174: mUserOpts = UserOptions.getInstance();
175: mTCHandle = TCMode.loadInstance();
176: mLogger = LogManager.getInstance();
177: }
178:
179: /**
180: * Default constructor.
181: *
182: * @param concDag The concrete dag so far.
183: * @param properties the <code>PegasusProperties</code> to be used.
184: */
185: protected CreateDirectory(ADag concDag, PegasusProperties properties) {
186: super (properties);
187: mCurrentDag = concDag;
188: mUserOpts = UserOptions.getInstance();
189: mTCHandle = TCMode.loadInstance();
190: mLogger = LogManager.getInstance();
191: }
192:
193: /**
194: * It modifies the concrete dag passed in the constructor and adds the create
195: * random directory nodes to it at the root level. These directory nodes have
196: * a common child that acts as a concatenating job and ensures that Condor
197: * does not start staging in the data before the directories have been added.
198: * The root nodes in the unmodified dag are now chidren of this concatenating
199: * dummy job.
200: */
201: public abstract void addCreateDirectoryNodes();
202:
203: /**
204: * It returns the name of the create directory job, that is to be assigned.
205: * The name takes into account the workflow name while constructing it, as
206: * that is thing that can guarentee uniqueness of name in case of deferred
207: * planning.
208: *
209: * @param pool the execution pool for which the create directory job
210: * is responsible.
211: *
212: * @return String corresponding to the name of the job.
213: */
214: protected String getCreateDirJobName(String pool) {
215: StringBuffer sb = new StringBuffer();
216: sb.append(mCurrentDag.dagInfo.nameOfADag).append("_").append(
217: mCurrentDag.dagInfo.index).append("_").append(pool)
218: .append(this .CREATE_DIR_SUFFIX);
219:
220: return sb.toString();
221: }
222:
223: /**
224: * Retrieves the sites for which the create dir jobs need to be created.
225: * It returns all the sites where the compute jobs have been scheduled.
226: *
227: *
228: * @return a Set containing a list of siteID's of the sites where the
229: * dag has to be run.
230: */
231: protected Set getCreateDirSites() {
232: Set set = new HashSet();
233:
234: for (Iterator it = mCurrentDag.vJobSubInfos.iterator(); it
235: .hasNext();) {
236: SubInfo job = (SubInfo) it.next();
237: //add to the set only if the job is
238: //being run in the work directory
239: //this takes care of local site create dir
240: if (job.runInWorkDirectory()) {
241: set.add(job.executionPool);
242: }
243: }
244:
245: //remove the stork pool
246: set.remove("stork");
247:
248: return set;
249: }
250:
251: /**
252: * It creates a make directory job that creates a directory on the remote pool
253: * using the perl executable that Gaurang wrote. It access mkdir underneath.
254: * It gets the name of the random directory from the Pool handle. This method
255: * does not update the internal graph structure of the workflow to add the
256: * node. That is done separately.
257: *
258: * @param execPool the execution pool for which the create dir job is to be
259: * created.
260: * @param jobName the name that is to be assigned to the job.
261: *
262: * @return create dir job.
263: */
264: protected SubInfo makeCreateDirJob(String execPool, String jobName) {
265: SubInfo newJob = new SubInfo();
266: List entries = null;
267: String execPath = null;
268: TransformationCatalogEntry entry = null;
269: JobManager jobManager = null;
270:
271: try {
272: entries = mTCHandle.getTCEntries(
273: this .TRANSFORMATION_NAMESPACE,
274: this .TRANSFORMATION_NAME,
275: this .TRANSFORMATION_VERSION, execPool,
276: TCType.INSTALLED);
277: } catch (Exception e) {
278: //non sensical catching
279: mLogger.log("Unable to retrieve entries from TC "
280: + e.getMessage(), LogManager.DEBUG_MESSAGE_LEVEL);
281: }
282:
283: entry = (entries == null) ? this .defaultTCEntry(execPool) : //try using a default one
284: (TransformationCatalogEntry) entries.get(0);
285:
286: if (entry == null) {
287: //NOW THROWN AN EXCEPTION
288:
289: //should throw a TC specific exception
290: StringBuffer error = new StringBuffer();
291: error.append("Could not find entry in tc for lfn ").append(
292: this .getCompleteTranformationName()).append(
293: " at site ").append(execPool);
294:
295: mLogger.log(error.toString(),
296: LogManager.ERROR_MESSAGE_LEVEL);
297: throw new RuntimeException(error.toString());
298: }
299:
300: execPath = entry.getPhysicalTransformation();
301:
302: SiteInfo ePool = mPoolHandle.getPoolEntry(execPool, "transfer");
303: jobManager = ePool.selectJobManager("transfer", true);
304: String argString = "--create --dir "
305: + mPoolHandle.getExecPoolWorkDir(execPool);
306:
307: newJob.jobName = jobName;
308: newJob.setTransformation(this .TRANSFORMATION_NAMESPACE,
309: this .TRANSFORMATION_NAME, this .TRANSFORMATION_VERSION);
310: newJob.setDerivation(this .DERIVATION_NAMESPACE,
311: this .DERIVATION_NAME, this .DERIVATION_VERSION);
312: newJob.condorUniverse = "vanilla";
313: newJob.globusScheduler = jobManager.getInfo(JobManager.URL);
314: newJob.executable = execPath;
315: newJob.executionPool = execPool;
316: newJob.strargs = argString;
317: newJob.jobClass = SubInfo.CREATE_DIR_JOB;
318: newJob.jobID = jobName;
319:
320: //the profile information from the pool catalog needs to be
321: //assimilated into the job.
322: newJob.updateProfiles(mPoolHandle
323: .getPoolProfile(newJob.executionPool));
324:
325: //the profile information from the transformation
326: //catalog needs to be assimilated into the job
327: //overriding the one from pool catalog.
328: newJob.updateProfiles(entry);
329:
330: //the profile information from the properties file
331: //is assimilated overidding the one from transformation
332: //catalog.
333: newJob.updateProfiles(mProps);
334:
335: return newJob;
336:
337: }
338:
339: /**
340: * Returns a default TC entry to be used in case entry is not found in the
341: * transformation catalog.
342: *
343: * @param site the site for which the default entry is required.
344: *
345: *
346: * @return the default entry.
347: */
348: private TransformationCatalogEntry defaultTCEntry(String site) {
349: TransformationCatalogEntry defaultTCEntry = null;
350: //check if PEGASUS_HOME is set
351: String home = mPoolHandle.getPegasusHome(site);
352: //if PEGASUS_HOME is not set, use VDS_HOME
353: home = (home == null) ? mPoolHandle.getVDS_HOME(site) : home;
354:
355: mLogger.log("Creating a default TC entry for "
356: + this .getCompleteTranformationName() + " at site "
357: + site, LogManager.DEBUG_MESSAGE_LEVEL);
358:
359: //if home is still null
360: if (home == null) {
361: //cannot create default TC
362: mLogger.log("Unable to create a default entry for "
363: + this .getCompleteTranformationName(),
364: LogManager.DEBUG_MESSAGE_LEVEL);
365: //set the flag back to true
366: return defaultTCEntry;
367: }
368:
369: //remove trailing / if specified
370: home = (home.charAt(home.length() - 1) == File.separatorChar) ? home
371: .substring(0, home.length() - 1)
372: : home;
373:
374: //construct the path to it
375: StringBuffer path = new StringBuffer();
376: path.append(home).append(File.separator).append("bin").append(
377: File.separator).append(this .TRANSFORMATION_NAME);
378:
379: defaultTCEntry = new TransformationCatalogEntry(
380: this .TRANSFORMATION_NAMESPACE,
381: this .TRANSFORMATION_NAME, this .TRANSFORMATION_VERSION);
382:
383: defaultTCEntry.setPhysicalTransformation(path.toString());
384: defaultTCEntry.setResourceId(site);
385: defaultTCEntry.setType(TCType.INSTALLED);
386:
387: //register back into the transformation catalog
388: //so that we do not need to worry about creating it again
389: try {
390: mTCHandle.addTCEntry(defaultTCEntry, false);
391: } catch (Exception e) {
392: //just log as debug. as this is more of a performance improvement
393: //than anything else
394: mLogger.log(
395: "Unable to register in the TC the default entry "
396: + defaultTCEntry.getLogicalTransformation()
397: + " for site " + site, e,
398: LogManager.DEBUG_MESSAGE_LEVEL);
399: }
400:
401: return defaultTCEntry;
402: }
403:
404: }
|