/*
 * This file or a portion of this file is licensed under the terms of
 * the Globus Toolkit Public License, found in file GTPL, or at
 * http://www.globus.org/toolkit/download/license.html. This notice must
 * appear in redistributions of this file, with or without modification.
 *
 * Redistributions of this Software, with or without modification, must
 * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
 * some other similar material which is provided with the Software (if
 * any).
 *
 * Copyright 1999-2004 University of Chicago and The University of
 * Southern California. All rights reserved.
 */
package org.griphyn.cPlanner.engine;

import org.griphyn.cPlanner.classes.ADag;
import org.griphyn.cPlanner.classes.JobManager;
import org.griphyn.cPlanner.classes.SiteInfo;
import org.griphyn.cPlanner.classes.SubInfo;

import org.griphyn.cPlanner.common.LogManager;
import org.griphyn.cPlanner.common.PegasusProperties;

import org.griphyn.common.catalog.TransformationCatalogEntry;
import org.griphyn.common.catalog.transformation.TCMode;

import org.griphyn.common.classes.TCType;

import org.griphyn.common.util.Separator;

import java.util.Iterator;
import java.util.Set;
import java.util.HashSet;
import java.util.List;
import java.io.File;

/**
 * Ends up creating a cleanup dag that deletes the remote directories that
 * were created by the create dir jobs. The cleanup dag is generated in a
 * sub directory of the main directory containing the submit files of the
 * dag. The dag consists of independent jobs, with each job responsible for
 * deleting the directory for one execution pool. The current way of
 * generating the dag relies on the fact that the directory in which a job
 * executes is tied to the pool, not to the job itself.
 *
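 * <p>
 * A typical usage sketch (illustrative only; the dag and the properties
 * are assumed to come from the planner):
 * <pre>
 *     ADag dag = ...;                              //the planned workflow
 *     PegasusProperties props = ...;               //the loaded properties
 *     RemoveDirectory rd = new RemoveDirectory(dag, props);
 *     ADag cleanup = rd.generateCleanUPDAG();      //one remove dir job per site
 * </pre>
 *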
 * @author Karan Vahi
 * @version $Revision: 428 $
 * @see CreateDirectory
 */
public class RemoveDirectory extends Engine {

    /**
     * The prefix that is attached to the name of the dag for which the
     * cleanup Dag is being generated, to generate the name of the cleanup
     * Dag.
     */
    public static final String CLEANUP_DAG_PREFIX = "del_";

    /**
     * The constant suffix for the names of the remove directory nodes.
     */
    public static final String REMOVE_DIR_SUFFIX = "_rdir";

    /**
     * The logical name of the transformation that removes directories on the
     * remote execution pools.
     */
    public static final String TRANSFORMATION_NAME = "dirmanager";

    /**
     * The transformation namespace for the remove dir jobs.
     */
    public static final String TRANSFORMATION_NAMESPACE = "pegasus";

    /**
     * The version number attached to the transformation for the remove dir
     * jobs.
     */
    public static final String TRANSFORMATION_VERSION = null;

    /**
     * The derivation namespace for the remove dir jobs.
     */
    public static final String DERIVATION_NAMESPACE = "pegasus";

    /**
     * The logical name of the derivation for the remove dir jobs.
     */
    public static final String DERIVATION_NAME = "dirmanager";

    /**
     * The version number of the derivation for the remove dir jobs.
     */
    public static final String DERIVATION_VERSION = "1.0";

    /**
     * The concrete dag so far, for which the clean up dag needs to be generated.
     */
    private ADag mConcDag;

    /**
     * A convenience method to return the complete transformation name being
     * used to construct jobs in this class.
     *
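     * For example, with the constants above and a <code>null</code> version,
     * the combined name resolves to something like
     * <code>pegasus::dirmanager</code>; the exact format is determined by
     * <code>Separator.combine</code>.
     *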
     * @return the complete transformation name
     */
    public static String getCompleteTranformationName() {
        return Separator.combine(TRANSFORMATION_NAMESPACE,
                                 TRANSFORMATION_NAME,
                                 TRANSFORMATION_VERSION);
    }

    /**
     * The overloaded constructor that sets the dag for which the cleanup
     * dag has to be generated.
     *
     * @param concDag    the concrete dag for which cleanup is required.
     * @param properties the <code>PegasusProperties</code> to be used.
     */
    public RemoveDirectory(ADag concDag, PegasusProperties properties) {
        super(properties);
        mConcDag = concDag;
        mTCHandle = TCMode.loadInstance();
    }

    /**
     * Generates a cleanup DAG for the dag associated with the class. Creates
     * a cleanup node per remote pool. It looks at the ADAG to determine the
     * sites at which the jobs in the dag have been scheduled.
     *
     * @return the cleanup DAG.
     * @see org.griphyn.cPlanner.classes.ADag#getExecutionSites()
     */
    public ADag generateCleanUPDAG() {
        return this.generateCleanUPDAG(mConcDag);
    }

    /**
     * Generates a cleanup DAG for the dag object passed. Creates a cleanup
     * node per remote pool. It looks at the ADAG to determine the sites at
     * which the jobs in the dag have been scheduled.
     *
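     * <p>
     * The resulting dag contains only independent jobs, one per site
     * (illustrative sketch; site and workflow names are hypothetical):
     * <pre>
     *     //for a dag named blackdiamond with index 0, scheduled on sites
     *     //isi and ncsa, the cleanup dag contains the two jobs
     *     //   blackdiamond_0_isi_rdir
     *     //   blackdiamond_0_ncsa_rdir
     * </pre>
     *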
     * @param dag the dag for which the cleanup dag needs to be generated.
     *
     * @return the cleanup DAG.
     * @see org.griphyn.cPlanner.classes.ADag#getExecutionSites()
     */
    public ADag generateCleanUPDAG(ADag dag) {
        ADag cDAG = new ADag();
        cDAG.dagInfo.nameOfADag = CLEANUP_DAG_PREFIX + dag.dagInfo.nameOfADag;
        cDAG.dagInfo.index = dag.dagInfo.index;

        Set pools = this.getCreateDirSites(dag);
        String pool = null;
        String jobName = null;

        //remove the entry for the local pool
        //pools.remove("local");

        for (Iterator it = pools.iterator(); it.hasNext();) {
            pool = (String) it.next();
            jobName = getRemoveDirJobName(dag, pool);
            cDAG.add(makeRemoveDirJob(pool, jobName));
        }

        return cDAG;
    }

    /**
     * Retrieves the sites for which the create dir jobs were created, and
     * hence the sites that require a remove directory job. It returns all
     * the sites where the compute jobs have been scheduled.
     *
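     * For example (hypothetical sites), if the compute jobs run at
     * <code>isi</code> and <code>ncsa</code>, the returned set is
     * <code>{isi, ncsa}</code>; the special <code>stork</code> pool is
     * always pruned from the result.
     *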
     * @param dag the workflow for which the sites have to be computed.
     *
     * @return a Set containing the siteIDs of the sites where the
     *         dag has to be run.
     */
    public Set getCreateDirSites(ADag dag) {
        Set set = new HashSet();

        for (Iterator it = dag.vJobSubInfos.iterator(); it.hasNext();) {
            SubInfo job = (SubInfo) it.next();
            //add to the set only if the job is
            //being run in the work directory
            //this takes care of local site create dir
            if (job.runInWorkDirectory()) {
                set.add(job.executionPool);
            }
        }

        //remove the stork pool
        set.remove("stork");

        return set;
    }

    /**
     * Returns the name that is to be assigned to the remove directory job.
     * The name takes the workflow name into account, as that is the only
     * thing that can guarantee uniqueness of the name in the case of
     * deferred planning.
     *
     * @param dag  the dag for which the cleanup DAG is being generated.
     * @param pool the execution pool for which the remove directory job
     *             is responsible.
     *
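     * For example (hypothetical values), a dag named
     * <code>blackdiamond</code> with index <code>0</code> and pool
     * <code>isi</code> yields the job name
     * <code>blackdiamond_0_isi_rdir</code>.
     *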
     * @return String corresponding to the name of the job.
     */
    private String getRemoveDirJobName(ADag dag, String pool) {
        StringBuffer sb = new StringBuffer();
        sb.append(dag.dagInfo.nameOfADag).append("_")
          .append(dag.dagInfo.index).append("_")
          .append(pool).append(REMOVE_DIR_SUFFIX);

        return sb.toString();
    }

    /**
     * Creates a remove directory job that deletes the work directory on the
     * remote pool, using the perl executable that Gaurang wrote. The
     * executable is invoked with the <code>--remove</code> option. The name
     * of the random directory is obtained from the pool handle.
     *
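     * <p>
     * The remote invocation effectively amounts to the following (sketch;
     * the work directory path is whatever the pool handle returns for the
     * site):
     * <pre>
     *     dirmanager --verbose --remove --dir /path/to/exec/pool/workdir
     * </pre>
     *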
     * @param execPool the execution pool for which the remove dir job is to
     *                 be created.
     * @param jobName  the name that is to be assigned to the job.
     *
     * @return the remove dir job.
     */
    private SubInfo makeRemoveDirJob(String execPool, String jobName) {
        SubInfo newJob = new SubInfo();
        List entries = null;
        String execPath = null;
        TransformationCatalogEntry entry = null;
        JobManager jm = null;

        try {
            entries = mTCHandle.getTCEntries(TRANSFORMATION_NAMESPACE,
                                             TRANSFORMATION_NAME,
                                             TRANSFORMATION_VERSION,
                                             execPool,
                                             TCType.INSTALLED);
        } catch (Exception e) {
            //log the error and fall through; a default entry is
            //attempted below
            mLogger.log("Unable to retrieve entry from TC " + e.getMessage(),
                        LogManager.ERROR_MESSAGE_LEVEL);
        }
        //select the first matching entry, else try constructing a default one
        entry = (entries == null || entries.isEmpty())
                ? this.defaultTCEntry(execPool)
                : (TransformationCatalogEntry) entries.get(0);

        if (entry == null) {
            //should ideally throw a TC specific exception
            StringBuffer error = new StringBuffer();
            error.append("Could not find entry in tc for lfn ")
                 .append(getCompleteTranformationName())
                 .append(" at site ").append(execPool);

            mLogger.log(error.toString(), LogManager.ERROR_MESSAGE_LEVEL);
            throw new RuntimeException(error.toString());
        }
        execPath = entry.getPhysicalTransformation();

        SiteInfo ePool = mPoolHandle.getPoolEntry(execPool, "transfer");
        jm = ePool.selectJobManager("transfer", true);
        String argString = "--verbose --remove --dir "
                           + mPoolHandle.getExecPoolWorkDir(execPool);

        newJob.jobName = jobName;
        newJob.setTransformation(TRANSFORMATION_NAMESPACE,
                                 TRANSFORMATION_NAME,
                                 TRANSFORMATION_VERSION);

        newJob.setDerivation(DERIVATION_NAMESPACE,
                             DERIVATION_NAME,
                             DERIVATION_VERSION);

        newJob.condorUniverse = "vanilla";
        newJob.globusScheduler = jm.getInfo(JobManager.URL);
        newJob.executable = execPath;
        newJob.executionPool = execPool;
        newJob.strargs = argString;
        newJob.jobClass = SubInfo.CREATE_DIR_JOB;
        newJob.jobID = jobName;

        //the profile information from the pool catalog needs to be
        //assimilated into the job.
        newJob.updateProfiles(mPoolHandle.getPoolProfile(newJob.executionPool));

        //the profile information from the transformation
        //catalog needs to be assimilated into the job
        //overriding the one from pool catalog.
        newJob.updateProfiles(entry);

        //the profile information from the properties file
        //is assimilated overriding the one from transformation
        //catalog.
        newJob.updateProfiles(mProps);

        return newJob;
    }

    /**
     * Returns a default TC entry to be used in case an entry is not found in
     * the transformation catalog.
     *
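     * For example (hypothetical home), if <code>PEGASUS_HOME</code> at the
     * site is <code>/opt/pegasus</code>, the constructed physical
     * transformation path is <code>/opt/pegasus/bin/dirmanager</code>.
     *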
     * @param site the site for which the default entry is required.
     *
     * @return the default entry.
     */
    private TransformationCatalogEntry defaultTCEntry(String site) {
        TransformationCatalogEntry defaultTCEntry = null;
        //check if PEGASUS_HOME is set
        String home = mPoolHandle.getPegasusHome(site);
        //if PEGASUS_HOME is not set, use VDS_HOME
        home = (home == null) ? mPoolHandle.getVDS_HOME(site) : home;

        mLogger.log("Creating a default TC entry for "
                    + getCompleteTranformationName() + " at site " + site,
                    LogManager.DEBUG_MESSAGE_LEVEL);

        //if home is still null, a default entry cannot be created
        if (home == null) {
            mLogger.log("Unable to create a default entry for "
                        + getCompleteTranformationName(),
                        LogManager.DEBUG_MESSAGE_LEVEL);
            return defaultTCEntry;
        }

        //remove trailing / if specified
        home = (home.charAt(home.length() - 1) == File.separatorChar)
               ? home.substring(0, home.length() - 1)
               : home;

        //construct the path to the executable
        StringBuffer path = new StringBuffer();
        path.append(home).append(File.separator)
            .append("bin").append(File.separator)
            .append(TRANSFORMATION_NAME);

        defaultTCEntry = new TransformationCatalogEntry(
                TRANSFORMATION_NAMESPACE,
                TRANSFORMATION_NAME,
                TRANSFORMATION_VERSION);

        defaultTCEntry.setPhysicalTransformation(path.toString());
        defaultTCEntry.setResourceId(site);
        defaultTCEntry.setType(TCType.INSTALLED);

        //register back into the transformation catalog
        //so that we do not need to worry about creating it again
        try {
            mTCHandle.addTCEntry(defaultTCEntry, false);
        } catch (Exception e) {
            //just log at debug level, as the registration is only a
            //performance optimization
            mLogger.log("Unable to register in the TC the default entry "
                        + defaultTCEntry.getLogicalTransformation()
                        + " for site " + site, e,
                        LogManager.DEBUG_MESSAGE_LEVEL);
        }

        return defaultTCEntry;
    }

}
|