/**
 * This file or a portion of this file is licensed under the terms of
 * the Globus Toolkit Public License, found in file GTPL, or at
 * http://www.globus.org/toolkit/download/license.html. This notice must
 * appear in redistributions of this file, with or without modification.
 *
 * Redistributions of this Software, with or without modification, must
 * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
 * some other similar material which is provided with the Software (if
 * any).
 *
 * Copyright 1999-2004 University of Chicago and The University of
 * Southern California. All rights reserved.
 */
package org.griphyn.cPlanner.cluster.aggregator;

import org.griphyn.cPlanner.code.GridStart;

import org.griphyn.cPlanner.code.gridstart.GridStartFactory;
import org.griphyn.cPlanner.code.gridstart.GridStartFactoryException;

import org.griphyn.cPlanner.common.LogManager;
import org.griphyn.cPlanner.common.PegasusProperties;

import org.griphyn.cPlanner.classes.ADag;
import org.griphyn.cPlanner.classes.AggregatedJob;
import org.griphyn.cPlanner.classes.SubInfo;
import org.griphyn.cPlanner.classes.SiteInfo;
import org.griphyn.cPlanner.classes.PegasusBag;
import org.griphyn.cPlanner.classes.PlannerOptions;

import org.griphyn.cPlanner.cluster.JobAggregator;

import org.griphyn.cPlanner.namespace.Condor;
import org.griphyn.cPlanner.namespace.VDS;

import org.griphyn.cPlanner.poolinfo.PoolInfoProvider;
import org.griphyn.cPlanner.poolinfo.PoolMode;

import org.griphyn.common.util.DynamicLoader;

import org.griphyn.common.catalog.TransformationCatalog;
import org.griphyn.common.catalog.TransformationCatalogEntry;

import org.griphyn.common.catalog.transformation.TCMode;

import org.griphyn.common.classes.TCType;

import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;

import java.util.List;
import java.util.Set;
import java.util.Iterator;
import org.griphyn.common.util.Separator;

/**
 * An abstract implementation of the JobAggregator interface, which the other
 * implementations can choose to extend.
 *
 * @author Karan Vahi vahi@isi.edu
 * @version $Revision: 450 $
 *
 */
public abstract class Abstract implements JobAggregator {

    /**
     * The prefix that is prepended to the job name to construct the name of
     * the merged (fat) job that replaces the collapsed jobs.
     */
    public static final String FAT_JOB_PREFIX = "merge_";

    /**
     * The transformation namespace for the cluster jobs.
     */
    public static final String TRANSFORMATION_NAMESPACE = "pegasus";

    /**
     * The version number for the transformations for cluster jobs.
     */
    public static final String TRANSFORMATION_VERSION = null;

    /**
     * The derivation namespace for the cluster jobs.
     */
    public static final String DERIVATION_NAMESPACE = "pegasus";

    /**
     * The version number for the derivations for cluster jobs.
     */
    public static final String DERIVATION_VERSION = "1.0";

    /**
     * The directory where the stdin files of the fat jobs are created.
     * It should be the submit file directory that the user specifies at
     * runtime.
     */
    protected String mDirectory;

    /**
     * The object holding all the properties pertaining to Pegasus.
     */
    protected PegasusProperties mProps;

    /**
     * The handle to the LogManager that logs all the messages.
     */
    protected LogManager mLogger;

    /**
     * The handle to the transformation catalog.
     */
    protected TransformationCatalog mTCHandle;

    /**
     * Handle to the site catalog.
     */
    protected PoolInfoProvider mSiteHandle;

    /**
     * The handle to the ADag object that contains the workflow being
     * clustered.
     */
    protected ADag mClusteredADag;

    /**
     * The handle to the GridStart Factory.
     */
    protected GridStartFactory mGridStartFactory;

    /**
     * Bag of initialization objects.
     */
    protected PegasusBag mBag;

    /**
     * A convenience method to return the complete transformation name being
     * used to construct jobs in this class.
     *
     * @param name  the name of the transformation
     *
     * @return the complete transformation name
     */
    public static String getCompleteTranformationName(String name) {
        return Separator.combine(TRANSFORMATION_NAMESPACE, name,
                                 TRANSFORMATION_VERSION);
    }
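
    // Illustrative only: assuming Separator.combine() builds the usual fully
    // qualified name of the form NAMESPACE::NAME:VERSION and elides the
    // version when it is null, a call such as
    //     getCompleteTranformationName( "seqexec" )
    // would return something along the lines of "pegasus::seqexec".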

    /**
     * The default constructor.
     */
    public Abstract() {

    }

    /**
     * Initializes the JobAggregator implementation.
     *
     * @param dag  the workflow that is being clustered.
     * @param bag  the bag of objects that is useful for initialization.
     *
     */
    public void initialize(ADag dag, PegasusBag bag) {
        mBag = bag;
        mClusteredADag = dag;

        mLogger = bag.getLogger();
        mProps = bag.getPegasusProperties();

        mTCHandle = bag.getHandleToTransformationCatalog();
        mSiteHandle = bag.getHandleToSiteCatalog();

        setDirectory(bag.getPlannerOptions().getSubmitDirectory());

        mGridStartFactory = new GridStartFactory();
        mGridStartFactory.initialize(mBag, dag);

    }

    /**
     * Returns the arguments with which the <code>AggregatedJob</code>
     * needs to be invoked.
     *
     * @param job  the <code>AggregatedJob</code> for which the arguments have
     *             to be constructed.
     *
     * @return argument string
     */
    public abstract String aggregatedJobArguments(AggregatedJob job);
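
    // Illustrative only: a concrete implementation that hands the clustered
    // job to a sequential launcher might simply point the launcher at the
    // stdin file written out by construct(), along the lines of
    //     return " -f " + job.getStdIn();
    // (the "-f" flag is hypothetical; the actual arguments depend on the
    // clustering executable being wrapped).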

    /**
     * Enables the constituent jobs that make up an aggregated job.
     *
     * @param mergedJob  the clustered job
     * @param jobs       the constituent jobs
     *
     * @return AggregatedJob
     */
    protected abstract AggregatedJob enable(AggregatedJob mergedJob,
                                            List jobs);

    /**
     * Constructs a new aggregated job that contains all the jobs passed to it.
     * The new aggregated job appears as a single job in the workflow and
     * replaces the jobs it contains in the workflow.
     *
     * @param jobs  the list of <code>SubInfo</code> objects that need to be
     *              collapsed. All the jobs being collapsed should be scheduled
     *              at the same pool, to maintain correct semantics.
     * @param name  the logical name of the jobs in the list passed to this
     *              function.
     * @param id    the id that is given to the new job.
     *
     * @return the <code>AggregatedJob</code> object corresponding to the
     *         aggregated job containing the jobs passed as List in the input,
     *         null if the list of jobs is empty
     */
    public AggregatedJob construct(List jobs, String name, String id) {
        return construct(jobs, name, id, getCollapserLFN());
    }
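
    // Illustrative usage only (hypothetical names): a clustering module would
    // typically initialize a concrete JobAggregator and collapse a set of
    // jobs scheduled on the same pool, e.g.
    //     aggregator.initialize( dag, bag );
    //     AggregatedJob clustered = aggregator.construct( jobsOnSiteA, "findrange", "1" );
    // where jobsOnSiteA is a List of SubInfo objects mapped to the same site.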

    /**
     * Constructs a new aggregated job that contains all the jobs passed to it.
     * The new aggregated job appears as a single job in the workflow and
     * replaces the jobs it contains in the workflow.
     *
     * @param jobs      the list of <code>SubInfo</code> objects that need to be
     *                  collapsed. All the jobs being collapsed should be
     *                  scheduled at the same pool, to maintain correct semantics.
     * @param name      the logical name of the jobs in the list passed to this
     *                  function.
     * @param id        the id that is given to the new job.
     * @param mergeLFN  the logical name for the aggregated job that has to be
     *                  constructed.
     *
     * @return the <code>AggregatedJob</code> object corresponding to the
     *         aggregated job containing the jobs passed as List in the input,
     *         null if the list of jobs is empty
     */
    protected AggregatedJob construct(List jobs, String name,
                                      String id, String mergeLFN) {
        //sanity check
        if (jobs == null || jobs.isEmpty()) {
            mLogger.log("List of jobs for clustering is empty",
                        LogManager.ERROR_MESSAGE_LEVEL);
            return null;
        }

        //sanity check missing to ensure jobs are of same type
        //Right now done in NodeCollapser. But we do not need this for
        //Vertical Clumping. Karan July 28, 2005

        //To get the gridstart/kickstart path on the remote pool, query
        //with the entry for the vanilla universe. In the new format the
        //gridstart is associated with the pool alone, not the
        //(pool, condor universe) combination.
        SubInfo firstJob = (SubInfo) jobs.get(0);
        AggregatedJob mergedJob = new AggregatedJob( /*(SubInfo)jobs.get(0),*/
                                                     jobs.size());

        SubInfo job = null;
        String mergedJobName = FAT_JOB_PREFIX + name + "_" + id;
        mLogger.log("Constructing clustered job " + mergedJobName,
                    LogManager.DEBUG_MESSAGE_LEVEL);

        String stdIn = null;

        //containers for the input and output files of the fat job.
        //The Set ensures no duplication. The multiple transfer mechanism
        //already ensures no duplicate transfer of input files, so using
        //the set here is redundant.
        //Hashset not used correctly
        Set ipFiles = new java.util.HashSet();
        Set opFiles = new java.util.HashSet();

        //enable the jobs that need to be merged
        //before writing out the stdin file
        // String gridStartPath = site.getKickstartPath();
        // GridStart gridStart = mGridStartFactory.loadGridStart( firstJob, gridStartPath );
        // mergedJob = gridStart.enable( mergedJob, jobs );

        mergedJob = enable(mergedJob, jobs);

        try {
            BufferedWriter writer;
            stdIn = mergedJobName + ".in";
            writer = new BufferedWriter(new FileWriter(new File(
                    mDirectory, stdIn)));

            //traverse through the jobs to determine input/output files
            //and merge the profiles for the jobs
            boolean merge = false;
            for (Iterator it = jobs.iterator(); it.hasNext();) {
                job = (SubInfo) it.next();
                ipFiles.addAll(job.getInputFiles());
                opFiles.addAll(job.getOutputFiles());

                //merge profiles for all jobs except the first
                // if( merge ) { mergedJob.mergeProfiles( job ); }
                //merge profiles for all jobs
                mergedJob.mergeProfiles(job);

                merge = true;

                //handle stdin
                if (job instanceof AggregatedJob) {
                    //slurp in the contents of its stdin
                    File file = new File(mDirectory, job.getStdIn());
                    BufferedReader reader = new BufferedReader(
                            new FileReader(file));
                    String line;
                    while ((line = reader.readLine()) != null) {
                        writer.write(line);
                        writer.write("\n");
                    }
                    reader.close();
                    //delete the previous stdin file
                    file.delete();
                } else {
                    //write out the argument string to the
                    //stdin file for the fat job
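                    //Illustrative only: each line of the generated .in file
                    //has the form "<executable> <arguments>"; a hypothetical
                    //line might read
                    //    /usr/local/bin/findrange -a findrange -i f.b1 -o f.c1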
                    writer.write(job.condorVariables.get("executable")
                            + " "
                            + job.condorVariables.get("arguments")
                            + "\n");
                }
            }

            //closing the handle to the writer
            writer.close();
        } catch (IOException e) {
            mLogger.log("While writing the stdIn file "
                    + e.getMessage(), LogManager.ERROR_MESSAGE_LEVEL);
            throw new RuntimeException("While writing the stdIn file "
                    + stdIn, e);
        }

        //inconsistency between job name and logical name for now
        mergedJob.setName(mergedJobName);

        mergedJob.setTransformation(TRANSFORMATION_NAMESPACE,
                                    mergeLFN, TRANSFORMATION_VERSION);
        mergedJob.setDerivation(DERIVATION_NAMESPACE, mergeLFN,
                                DERIVATION_VERSION);

        mergedJob.setLogicalID(id);

        mergedJob.setSiteHandle(firstJob.getSiteHandle());
        mergedJob.setUniverse(firstJob.getUniverse());
        mergedJob.setJobManager(firstJob.getJobManager());
        mergedJob.setJobType(SubInfo.COMPUTE_JOB);

        //the compute job of the VDS supernode is this job itself
        mergedJob.setVDSSuperNode(mergedJobName);

        //the executable that the fat job refers to is the collapser
        TransformationCatalogEntry entry = getTCEntry(mergedJob);

        mergedJob.setRemoteExecutable(entry.getPhysicalTransformation());

        //overriding the input files, output files, id
        mergedJob.setInputFiles(ipFiles);
        mergedJob.setOutputFiles(opFiles);

        //stdin file is the file containing the arguments
        //for the jobs being collapsed
        mergedJob.setStdIn(stdIn);

        //explicitly set stdout to null overriding any stdout
        //that might have been inherited in the clone operation.
        //FIX for bug 142 http://bugzilla.globus.org/vds/show_bug.cgi?id=142
        mergedJob.setStdOut("");
        mergedJob.setStdErr("");

        //set the arguments for the clustered job
        mergedJob.setArguments(aggregatedJobArguments(mergedJob));

        //get hold of one of the jobs and suck in its globus namespace
        //info into the map.

        /* Not needed, as the clone method would have taken care of it.
           Karan Sept 09, 2004
        entry = getTCEntry(job);
        mergedJob.globusRSL.checkKeyInNS(entry.getProfiles(Profile.GLOBUS));
        */

        //also put in jobType as mpi
        //mergedJob.globusRSL.checkKeyinNS("jobtype","mpi");

        //the profile information from the pool catalog does not need to be
        //assimilated into the job, as the collapsed job runs on the
        //same pool as its constituent jobs
        // mergedJob.updateProfiles(mPoolHandle.getPoolProfile(mergedJob.executionPool));

        //the profile information from the transformation
        //catalog needs to be assimilated into the job,
        //overriding the one from the pool catalog.
        mergedJob.updateProfiles(entry);

        //the profile information from the properties file
        //is assimilated, overriding the one from the transformation
        //catalog.
        mergedJob.updateProfiles(mProps);

        return mergedJob;

    }

    /**
     * Helper method to get an entry from the transformation catalog for an
     * installed executable. It traverses the list of entries to return a
     * single TransformationCatalogEntry object, and throws a RuntimeException
     * with an appropriate error message if no entry is found.
     * The pool and the name are retrieved from the job object.
     *
     * @param job  the job whose corresponding TransformationCatalogEntry you want.
     *
     * @return the TransformationCatalogEntry corresponding to the entry in the
     *         TC.
     */
    protected TransformationCatalogEntry getTCEntry(SubInfo job) {
        List tcentries = null;
        TransformationCatalogEntry entry = null;
        try {
            tcentries = mTCHandle.getTCEntries(job.namespace,
                    job.logicalName, job.version, job.executionPool,
                    TCType.INSTALLED);
        } catch (Exception e) {
            mLogger.log(
                    "Unable to retrieve entry from TC for transformation "
                            + job.getCompleteTCName() + " "
                            + e.getMessage(),
                    LogManager.DEBUG_MESSAGE_LEVEL);
        }

        entry = (tcentries == null) ?
                //try using a default one
                defaultTCEntry(job.getTXName(), job.getSiteHandle()) :
                (TransformationCatalogEntry) tcentries.get(0);

        if (entry == null) {
            //no entry could be found or constructed, so throw an exception
            //should throw a TC specific exception
            StringBuffer error = new StringBuffer();
            error.append("Could not find entry in tc for lfn ").append(
                    job.getCompleteTCName()).append(" at site ")
                    .append(job.getSiteHandle());

            mLogger.log(error.toString(),
                        LogManager.ERROR_MESSAGE_LEVEL);
            throw new RuntimeException(error.toString());

        }

        return entry;

    }

    /**
     * Returns a default TC entry to be used in case an entry is not found in
     * the transformation catalog.
     *
     * @param name  the logical name of the clustering executable.
     * @param site  the site for which the default entry is required.
     *
     * @return the default entry.
     */
    private TransformationCatalogEntry defaultTCEntry(String name,
                                                      String site) {
        TransformationCatalogEntry defaultTCEntry = null;
        //check if PEGASUS_HOME is set
        String home = mSiteHandle.getPegasusHome(site);
        //if PEGASUS_HOME is not set, use VDS_HOME
        home = (home == null) ? mSiteHandle.getVDS_HOME(site) : home;

        mLogger.log("Creating a default TC entry for "
                + getCompleteTranformationName(name) + " at site "
                + site, LogManager.DEBUG_MESSAGE_LEVEL);

        //if home is still null
        if (home == null) {
            //cannot create a default TC entry
            mLogger.log("Unable to create a default entry for "
                    + getCompleteTranformationName(name),
                    LogManager.DEBUG_MESSAGE_LEVEL);
            //return null to indicate no default entry could be constructed
            return defaultTCEntry;
        }

        //remove trailing / if specified
        home = (home.charAt(home.length() - 1) == File.separatorChar) ?
                home.substring(0, home.length() - 1) :
                home;

        //construct the path to the executable
        StringBuffer path = new StringBuffer();
        path.append(home).append(File.separator).append("bin").append(
                File.separator).append(name);

        defaultTCEntry = new TransformationCatalogEntry(
                TRANSFORMATION_NAMESPACE, name,
                TRANSFORMATION_VERSION);

        defaultTCEntry.setPhysicalTransformation(path.toString());
        defaultTCEntry.setResourceId(site);
        defaultTCEntry.setType(TCType.INSTALLED);

        //register back into the transformation catalog
        //so that we do not need to worry about creating it again
        try {
            mTCHandle.addTCEntry(defaultTCEntry, false);
        } catch (Exception e) {
            //just log as debug, as this is more of a performance improvement
            //than anything else
            mLogger.log(
                    "Unable to register in the TC the default entry "
                            + defaultTCEntry.getLogicalTransformation()
                            + " for site " + site, e,
                    LogManager.DEBUG_MESSAGE_LEVEL);
        }

        return defaultTCEntry;
    }
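
    // Illustrative only: with a hypothetical PEGASUS_HOME of /opt/pegasus on
    // a site "siteA", defaultTCEntry( "seqexec", "siteA" ) would construct,
    // register and return an INSTALLED entry whose physical transformation
    // is /opt/pegasus/bin/seqexec.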

    /**
     * Determines whether there is NOT an entry in the transformation catalog
     * for a particular transformation on a particular site.
     *
     * @param namespace  the logical namespace of the transformation.
     * @param name       the logical name of the transformation.
     * @param version    the version of the transformation.
     * @param site       the site at which the existence check is required.
     *
     * @return boolean true if an entry does not exist, false otherwise.
     */
    protected boolean entryNotInTC(String namespace, String name,
                                   String version, String site) {

        //check only the pfn for existence. gmehta says this results in
        //fewer queries underneath
        List l = null;
        try {
            l = mTCHandle.getTCPhysicalNames(namespace, name, version,
                    site, TCType.INSTALLED);
        } catch (Exception e) {
            mLogger.log("Unable to retrieve entry from TC "
                    + e.getMessage(), LogManager.ERROR_MESSAGE_LEVEL);
        }

        //a double negative: return true only if no entry was found and a
        //default entry could not be constructed either
        return (l == null || l.isEmpty()) ?
                (defaultTCEntry(name, site) == null) :
                false;
    }

    /**
     * Sets the directory where the stdin files are to be generated.
     *
     * @param directory  the path to the directory to which it needs to be set.
     */
    protected void setDirectory(String directory) {
        mDirectory = (directory == null) ?
                //user did not specify a submit file dir
                //use the default i.e. current directory
                "." :
                //user specified directory picked up
                directory;

    }

}
|