001: /**
002: * This file or a portion of this file is licensed under the terms of
003: * the Globus Toolkit Public License, found in file GTPL, or at
004: * http://www.globus.org/toolkit/download/license.html. This notice must
005: * appear in redistributions of this file, with or without modification.
006: *
007: * Redistributions of this Software, with or without modification, must
008: * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
009: * some other similar material which is provided with the Software (if
010: * any).
011: *
012: * Copyright 1999-2004 University of Chicago and The University of
013: * Southern California. All rights reserved.
014: */package org.griphyn.cPlanner.engine.cleanup;
015:
016: import org.griphyn.cPlanner.classes.SubInfo;
017: import org.griphyn.cPlanner.classes.PlannerOptions;
018: import org.griphyn.cPlanner.classes.PegasusFile;
019:
020: import org.griphyn.cPlanner.common.PegasusProperties;
021: import org.griphyn.cPlanner.common.LogManager;
022:
023: import org.griphyn.cPlanner.namespace.Condor;
024:
025: import org.griphyn.cPlanner.poolinfo.PoolInfoProvider;
026: import org.griphyn.cPlanner.poolinfo.SiteFactory;
027: import org.griphyn.cPlanner.poolinfo.SiteFactoryException;
028:
029: import org.griphyn.common.catalog.TransformationCatalog;
030: import org.griphyn.common.catalog.TransformationCatalogEntry;
031:
032: import org.griphyn.common.catalog.transformation.TransformationFactory;
033: import org.griphyn.common.catalog.transformation.TransformationFactoryException;
034:
035: import org.griphyn.common.classes.TCType;
036:
037: import org.griphyn.common.util.Separator;
038:
039: import java.util.List;
040: import java.util.Iterator;
041: import java.util.HashSet;
042: import java.util.ArrayList;
043:
044: import java.io.BufferedWriter;
045: import java.io.FileWriter;
046: import java.io.File;
047: import java.io.IOException;
048:
049: /**
050: * Use's RM to do removal of the files on the remote sites.
051: *
052: * @author Karan Vahi
053: * @version $Revision: 141 $
054: */
055: public class Cleanup implements Implementation {
056:
057: /**
058: * The transformation namespace for the job.
059: */
060: public static final String TRANSFORMATION_NAMESPACE = "pegasus";
061:
062: /**
063: * The default priority key associated with the cleanup jobs.
064: */
065: public static final String DEFAULT_PRIORITY_KEY = "1000";
066:
067: /**
068: * The name of the underlying transformation that is queried for in the
069: * Transformation Catalog.
070: */
071: public static final String TRANSFORMATION_NAME = "cleanup";
072:
073: /**
074: * The version number for the job.
075: */
076: public static final String TRANSFORMATION_VERSION = null;
077:
078: /**
079: * The derivation namespace for the job.
080: */
081: public static final String DERIVATION_NAMESPACE = "pegasus";
082:
083: /**
084: * The name of the underlying derivation.
085: */
086: public static final String DERIVATION_NAME = "cleanup";
087:
088: /**
089: * The derivation version number for the job.
090: */
091: public static final String DERIVATION_VERSION = null;
092:
093: /**
094: * A short description of the transfer implementation.
095: */
096: public static final String DESCRIPTION = "A cleanup script that reads from the stdin the list of files"
097: + " to be cleaned, with one file per line";
098:
099: /**
100: * The handle to the transformation catalog.
101: */
102: protected TransformationCatalog mTCHandle;
103:
104: /**
105: * Handle to the site catalog.
106: */
107: protected PoolInfoProvider mSiteHandle;
108:
109: /**
110: * The handle to the properties passed to Pegasus.
111: */
112: private PegasusProperties mProps;
113:
114: /**
115: * The submit directory where the output files have to be written.
116: */
117: private String mSubmitDirectory;
118:
119: /**
120: * The handle to the logger.
121: */
122: private LogManager mLogger;
123:
124: /**
125: * A convenience method to return the complete transformation name being
126: * used to construct jobs in this class.
127: *
128: * @return the complete transformation name
129: */
130: public static String getCompleteTranformationName() {
131: return Separator.combine(TRANSFORMATION_NAMESPACE,
132: TRANSFORMATION_NAME, TRANSFORMATION_VERSION);
133: }
134:
135: /**
136: * Creates a new instance of InPlace
137: *
138: * @param properties the properties passed to the planner.
139: * @param options the options passed to the planner.
140: *
141: */
142: public Cleanup(PegasusProperties properties, PlannerOptions options) {
143: mLogger = LogManager.getInstance();
144: mProps = properties;
145: mSubmitDirectory = options.getSubmitDirectory();
146:
147: /* load the site catalog using the factory */
148: try {
149: mSiteHandle = SiteFactory.loadInstance(properties, false);
150: } catch (SiteFactoryException e) {
151: throw new RuntimeException("Unable to load Site Catalog "
152: + e.convertException(), e);
153: }
154:
155: /* load the transformation catalog using the factory */
156: try {
157: mTCHandle = TransformationFactory.loadInstance(properties);
158: } catch (TransformationFactoryException e) {
159: throw new RuntimeException(
160: "Unable to load Transformation Catalog "
161: + e.convertException(), e);
162: }
163:
164: }
165:
166: /**
167: * Creates a cleanup job that removes the files from remote working directory.
168: * This will eventually make way to it's own interface.
169: *
170: * @param id the identifier to be assigned to the job.
171: * @param files the list of <code>PegasusFile</code> that need to be
172: * cleaned up.
173: * @param job the primary compute job with which this cleanup job is associated.
174: *
175: * @return the cleanup job.
176: */
177: public SubInfo createCleanupJob(String id, List files, SubInfo job) {
178:
179: //we want to run the clnjob in the same directory
180: //as the compute job. So we clone.
181: SubInfo cJob = (SubInfo) job.clone();
182: cJob.setJobType(SubInfo.CLEANUP_JOB);
183: cJob.setName(id);
184: cJob.setArguments("");
185:
186: //inconsistency between job name and logical name for now
187: cJob.setTransformation(this .TRANSFORMATION_NAMESPACE,
188: this .TRANSFORMATION_NAME, this .TRANSFORMATION_VERSION);
189:
190: cJob.setDerivation(this .DERIVATION_NAMESPACE,
191: this .DERIVATION_NAME, this .DERIVATION_VERSION);
192:
193: cJob.setLogicalID(id);
194:
195: //set the list of files as input files
196: //to change function signature to reflect a set only
197: cJob.setInputFiles(new HashSet(files));
198:
199: //the compute job of the VDS supernode is this job itself
200: cJob.setVDSSuperNode(job.getID());
201:
202: //set the path to the rm executable
203: TransformationCatalogEntry entry = this .getTCEntry(job
204: .getSiteHandle());
205: cJob.setRemoteExecutable(entry.getPhysicalTransformation());
206:
207: //prepare the stdin for the cleanup job
208: String stdIn = id + ".in";
209: try {
210: BufferedWriter writer;
211: writer = new BufferedWriter(new FileWriter(new File(
212: mSubmitDirectory, stdIn)));
213:
214: for (Iterator it = files.iterator(); it.hasNext();) {
215: PegasusFile file = (PegasusFile) it.next();
216: writer.write(file.getLFN());
217: writer.write("\n");
218: }
219:
220: //closing the handle to the writer
221: writer.close();
222: } catch (IOException e) {
223: mLogger.log("While writing the stdIn file "
224: + e.getMessage(), LogManager.ERROR_MESSAGE_LEVEL);
225: throw new RuntimeException("While writing the stdIn file "
226: + stdIn, e);
227: }
228:
229: //we want to run the job on fork jobmanager
230: //SiteInfo site = mSiteHandle.getTXPoolEntry( cJob.getSiteHandle() );
231: //JobManager jobmanager = site.selectJobManager( Engine.TRANSFER_UNIVERSE, true );
232: //cJob.globusScheduler = (jobmanager == null) ?
233: // null :
234: // jobmanager.getInfo(JobManager.URL);
235:
236: //set the stdin file for the job
237: cJob.setStdIn(stdIn);
238:
239: //the cleanup job is a clone of compute
240: //need to reset the profiles first
241: cJob.resetProfiles();
242:
243: //the profile information from the pool catalog needs to be
244: //assimilated into the job.
245: cJob.updateProfiles(mSiteHandle.getPoolProfile(job
246: .getSiteHandle()));
247:
248: //the profile information from the transformation
249: //catalog needs to be assimilated into the job
250: //overriding the one from pool catalog.
251: cJob.updateProfiles(entry);
252:
253: //the profile information from the properties file
254: //is assimilated overidding the one from transformation
255: //catalog.
256: cJob.updateProfiles(mProps);
257:
258: //let us put some priority for the cleaunup jobs
259: cJob.condorVariables.construct(Condor.PRIORITY_KEY,
260: DEFAULT_PRIORITY_KEY);
261:
262: //a remote hack that only works for condor pools
263: //cJob.globusRSL.construct( "condorsubmit",
264: // "(priority " + DEFAULT_PRIORITY_KEY + ")");
265: return cJob;
266: }
267:
268: /**
269: * Returns the TCEntry object for the rm executable on a grid site.
270: *
271: * @param site the site corresponding to which the entry is required.
272: *
273: * @return the TransformationCatalogEntry corresponding to the site.
274: */
275: protected TransformationCatalogEntry getTCEntry(String site) {
276: List tcentries = null;
277: TransformationCatalogEntry entry = null;
278: try {
279: tcentries = mTCHandle
280: .getTCEntries(this .TRANSFORMATION_NAMESPACE,
281: this .TRANSFORMATION_NAME,
282: this .TRANSFORMATION_VERSION, site,
283: TCType.INSTALLED);
284: } catch (Exception e) { /* empty catch */
285: }
286:
287: entry = (tcentries == null) ? this .defaultTCEntry(site) : //try using a default one
288: (TransformationCatalogEntry) tcentries.get(0);
289:
290: if (entry == null) {
291: //NOW THROWN AN EXCEPTION
292:
293: //should throw a TC specific exception
294: StringBuffer error = new StringBuffer();
295: error.append("Could not find entry in tc for lfn ").append(
296: this .getCompleteTranformationName()).append(
297: " at site ").append(site);
298:
299: mLogger.log(error.toString(),
300: LogManager.ERROR_MESSAGE_LEVEL);
301: throw new RuntimeException(error.toString());
302:
303: }
304:
305: return entry;
306:
307: }
308:
309: /**
310: * Returns a default TC entry to be used in case entry is not found in the
311: * transformation catalog.
312: *
313: * @param site the site for which the default entry is required.
314: *
315: *
316: * @return the default entry.
317: */
318: private TransformationCatalogEntry defaultTCEntry(String site) {
319: TransformationCatalogEntry defaultTCEntry = null;
320: //check if PEGASUS_HOME is set
321: String home = mSiteHandle.getPegasusHome(site);
322:
323: mLogger.log("Creating a default TC entry for "
324: + this .getCompleteTranformationName() + " at site "
325: + site, LogManager.DEBUG_MESSAGE_LEVEL);
326:
327: //if home is still null
328: if (home == null) {
329: //cannot create default TC
330: mLogger.log("Unable to create a default entry for "
331: + Separator.combine(this .TRANSFORMATION_NAMESPACE,
332: this .TRANSFORMATION_NAME,
333: this .TRANSFORMATION_VERSION),
334: LogManager.DEBUG_MESSAGE_LEVEL);
335: //set the flag back to true
336: return defaultTCEntry;
337: }
338:
339: //remove trailing / if specified
340: home = (home.charAt(home.length() - 1) == File.separatorChar) ? home
341: .substring(0, home.length() - 1)
342: : home;
343:
344: //construct the path to it
345: StringBuffer path = new StringBuffer();
346: path.append(home).append(File.separator).append("bin").append(
347: File.separator).append(this .TRANSFORMATION_NAME);
348:
349: defaultTCEntry = new TransformationCatalogEntry(
350: this .TRANSFORMATION_NAMESPACE,
351: this .TRANSFORMATION_NAME, this .TRANSFORMATION_VERSION);
352:
353: defaultTCEntry.setPhysicalTransformation(path.toString());
354: defaultTCEntry.setResourceId(site);
355: defaultTCEntry.setType(TCType.INSTALLED);
356:
357: //register back into the transformation catalog
358: //so that we do not need to worry about creating it again
359: try {
360: mTCHandle.addTCEntry(defaultTCEntry, false);
361: } catch (Exception e) {
362: //just log as debug. as this is more of a performance improvement
363: //than anything else
364: mLogger.log(
365: "Unable to register in the TC the default entry "
366: + defaultTCEntry.getLogicalTransformation()
367: + " for site " + site, e,
368: LogManager.DEBUG_MESSAGE_LEVEL);
369: }
370:
371: return defaultTCEntry;
372: }
373:
374: }
|