/*
 * This file or a portion of this file is licensed under the terms of
 * the Globus Toolkit Public License, found in file GTPL, or at
 * http://www.globus.org/toolkit/download/license.html. This notice must
 * appear in redistributions of this file, with or without modification.
 *
 * Redistributions of this Software, with or without modification, must
 * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
 * some other similar material which is provided with the Software (if
 * any).
 *
 * Copyright 1999-2004 University of Chicago and The University of
 * Southern California. All rights reserved.
 */

package org.griphyn.cPlanner.transfer.implementation;

import org.griphyn.cPlanner.classes.SubInfo;
import org.griphyn.cPlanner.classes.TransferJob;
import org.griphyn.cPlanner.classes.PlannerOptions;
import org.griphyn.cPlanner.classes.SiteInfo;
import org.griphyn.cPlanner.classes.JobManager;

import org.griphyn.cPlanner.common.LogManager;
import org.griphyn.cPlanner.common.PegasusProperties;

import org.griphyn.cPlanner.transfer.MultipleFTPerXFERJob;

import org.griphyn.common.catalog.TransformationCatalogEntry;

import org.griphyn.common.classes.TCType;

import org.griphyn.common.util.Separator;

import java.io.File;
import java.io.FileWriter;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Iterator;
/**
 * An abstract base class for transfer implementations that can handle
 * multiple file transfers in a single transfer job.
 *
 * @author Karan Vahi
 * @version $Revision: 145 $
 */
public abstract class AbstractMultipleFTPerXFERJob extends Abstract
    implements MultipleFTPerXFERJob {

    /**
     * The overloaded constructor that is called by the Factory to load the
     * class.
     *
     * @param properties the properties object.
     * @param options    the options passed to the Planner.
     */
    public AbstractMultipleFTPerXFERJob(PegasusProperties properties,
                                        PlannerOptions options) {
        super(properties, options);
    }

    /**
     * Constructs a general transfer job that handles multiple transfers per
     * transfer job. There are appropriate callouts to generate the
     * implementation specific details.
     *
     * @param job       the SubInfo object for the job, in relation to which
     *                  the transfer node is being added. The transfer node
     *                  either transfers this job's input files to the
     *                  execution pool, or this job's output files to the
     *                  output pool.
     * @param files     collection of <code>FileTransfer</code> objects
     *                  representing the data files and staged executables to
     *                  be transferred.
     * @param execFiles subset collection of the files parameter, that
     *                  identifies the executable files that are being
     *                  transferred.
     * @param txJobName the name of the transfer node.
     * @param jobClass  the job class for the newly added job. Can be one of
     *                  the following:
     *                  stage-in,
     *                  stage-out,
     *                  inter-pool transfer.
     *
     * @return the created TransferJob.
     */
    public TransferJob createTransferJob(SubInfo job, Collection files,
                                         Collection execFiles,
                                         String txJobName, int jobClass) {
        TransferJob txJob = new TransferJob();
        SiteInfo ePool;
        JobManager jobmanager;

        //site where the transfer job is scheduled to run. For a third
        //party transfer it makes sense to schedule on the local host,
        //unless it is explicitly designated to run TPT on the remote site.
        String tPool;
        if (mRefiner.isSiteThirdParty(job.getSiteHandle(), jobClass)) {
            //check if the third party transfer has to run on the remote site
            tPool = mRefiner.runTPTOnRemoteSite(job.getSiteHandle(), jobClass)
                    ? job.getSiteHandle()
                    : "local";
        } else {
            tPool = job.getSiteHandle();
        }

        //the non third party site for the transfer job is
        //always the job execution site for which the transfer
        //job is being created.
        txJob.setNonThirdPartySite(job.getSiteHandle());

        //we first check if there is an entry for the transfer universe,
        //if not then we fall back to globus
        ePool = mSCHandle.getTXPoolEntry(tPool);

        txJob.jobName = txJobName;
        txJob.executionPool = tPool;
        txJob.condorUniverse = "globus";

        TransformationCatalogEntry tcEntry =
            this.getTransformationCatalogEntry(tPool);
        if (tcEntry == null) {
            //should throw a TC specific exception
            StringBuffer error = new StringBuffer();
            error.append("Could not find entry in tc for lfn ")
                 .append(getCompleteTCName())
                 .append(" at site ")
                 .append(txJob.getSiteHandle());
            mLogger.log(error.toString(), LogManager.ERROR_MESSAGE_LEVEL);
            throw new RuntimeException(error.toString());
        }

        txJob.namespace = tcEntry.getLogicalNamespace();
        txJob.logicalName = tcEntry.getLogicalName();
        txJob.version = tcEntry.getLogicalVersion();

        txJob.dvName = this.getDerivationName();
        txJob.dvNamespace = this.getDerivationNamespace();
        txJob.dvVersion = this.getDerivationVersion();

        //this should in fact only be set
        //for non third party pools
        jobmanager = ePool.selectJobManager(this.TRANSFER_UNIVERSE, true);
        txJob.globusScheduler = (jobmanager == null)
                                ? null
                                : jobmanager.getInfo(JobManager.URL);

        txJob.jobClass = jobClass;
        txJob.jobID = job.jobName;

        txJob.stdErr = "";
        txJob.stdOut = "";

        txJob.executable = tcEntry.getPhysicalTransformation();

        //the input files remain empty,
        //as we are just copying URLs
        txJob.inputFiles = new HashSet();

        //to get the file stat information we need to add
        //the files as output files of the transfer job
        txJob.outputFiles = new HashSet(files);

        try {
            txJob.stdIn = prepareSTDIN(txJobName, files);
        } catch (Exception e) {
            mLogger.log("Unable to write the stdIn file for job "
                        + txJob.getCompleteTCName() + " " + e.getMessage(),
                        LogManager.ERROR_MESSAGE_LEVEL);
        }

        //the profile information from the pool catalog needs to be
        //assimilated into the job.
        txJob.updateProfiles(mSCHandle.getPoolProfile(tPool));

        //the profile information from the transformation
        //catalog needs to be assimilated into the job,
        //overriding the one from the pool catalog.
        txJob.updateProfiles(tcEntry);

        //the profile information from the properties file
        //is assimilated, overriding the one from the
        //transformation catalog.
        txJob.updateProfiles(mProps);

        //take care of transfer of proxies
        this.checkAndTransferProxy(txJob);

        //apply the priority to the transfer job
        this.applyPriority(txJob);

        //construct the arguments to the transfer script;
        //they only have to be incorporated after the
        //profile incorporation
        txJob.strargs = this.generateArgumentString(txJob);

        if (execFiles != null) {
            //we need to add setup jobs to change the XBit
            super.addSetXBitJobs(job, txJob, execFiles);
        }

        //a callout that allows the derived transfer implementation
        //classes to do their own specific stuff on the job
        this.postProcess(txJob);

        return txJob;
    }
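
    //Illustrative usage from a transfer refiner; a sketch only. The
    //variable names and the SubInfo.STAGE_IN_JOB constant below are
    //assumptions for illustration, not code exercised by this class:
    //
    //  TransferJob stageIn = implementation.createTransferJob(
    //          computeJob,        //job whose files are being staged
    //          fileTransfers,     //Collection of FileTransfer objects
    //          stagedExecutables, //executable subset of files, or null
    //          "stage_in_" + computeJob.jobName,
    //          SubInfo.STAGE_IN_JOB);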

    /**
     * Returns a default TC entry to be used in case an entry is not found in
     * the transformation catalog.
     *
     * @param namespace the namespace of the transfer transformation
     * @param name      the logical name of the transfer transformation
     * @param version   the version of the transfer transformation
     * @param site      the site for which the default entry is required.
     *
     * @return the default entry.
     */
    protected TransformationCatalogEntry defaultTCEntry(
        String namespace, String name, String version, String site) {

        TransformationCatalogEntry defaultTCEntry = null;
        //check if PEGASUS_HOME is set
        String home = mSCHandle.getPegasusHome(site);
        //if PEGASUS_HOME is not set, use VDS_HOME
        home = (home == null) ? mSCHandle.getVDS_HOME(site) : home;

        mLogger.log("Creating a default TC entry for "
                    + Separator.combine(namespace, name, version)
                    + " at site " + site, LogManager.DEBUG_MESSAGE_LEVEL);

        //if home is still null
        if (home == null) {
            //cannot create a default TC entry
            mLogger.log("Unable to create a default entry for "
                        + Separator.combine(namespace, name, version)
                        + " as PEGASUS_HOME or VDS_HOME is not set in Site Catalog",
                        LogManager.DEBUG_MESSAGE_LEVEL);
            //return null, as no default entry can be constructed
            return defaultTCEntry;
        }

        //get the essential environment variables required
        //for the entry to work correctly
        List envs = this.getEnvironmentVariables(site);
        if (envs == null) {
            //cannot create a default TC entry
            mLogger.log("Unable to create a default entry for "
                        + Separator.combine(namespace, name, version)
                        + " as the necessary environment could not be constructed",
                        LogManager.DEBUG_MESSAGE_LEVEL);
            //return null, as no default entry can be constructed
            return defaultTCEntry;
        }

        //remove the trailing file separator if specified
        home = (home.charAt(home.length() - 1) == File.separatorChar)
               ? home.substring(0, home.length() - 1)
               : home;

        //construct the path to the transfer executable
        StringBuffer path = new StringBuffer();
        path.append(home).append(File.separator)
            .append("bin").append(File.separator)
            .append(name);

        defaultTCEntry = new TransformationCatalogEntry(namespace,
                                                        name, version);

        defaultTCEntry.setPhysicalTransformation(path.toString());
        defaultTCEntry.setResourceId(site);
        defaultTCEntry.setType(TCType.INSTALLED);
        defaultTCEntry.setProfiles(envs);

        //register back into the transformation catalog
        //so that we do not need to worry about creating it again
        try {
            mTCHandle.addTCEntry(defaultTCEntry, false);
        } catch (Exception e) {
            //just log at debug level, as registering the entry is more of
            //a performance improvement than anything else
            mLogger.log("Unable to register in the TC the default entry "
                        + defaultTCEntry.getLogicalTransformation()
                        + " for site " + site, e,
                        LogManager.DEBUG_MESSAGE_LEVEL);
        }
        mLogger.log("Created entry with path "
                    + defaultTCEntry.getPhysicalTransformation(),
                    LogManager.DEBUG_MESSAGE_LEVEL);
        return defaultTCEntry;
    }
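
    //A sketch of how a TC lookup typically ties into the fallback above;
    //the getTransformationCatalogEntry used by createTransferJob lives in
    //a derived or related class, and the mTCHandle.getTCEntries signature
    //shown here is an assumption:
    //
    //  public TransformationCatalogEntry getTransformationCatalogEntry(String site) {
    //      List entries = null;
    //      try {
    //          entries = mTCHandle.getTCEntries(namespace, name, version,
    //                                           site, TCType.INSTALLED);
    //      } catch (Exception e) {
    //          //treat a lookup failure as an entry not being found
    //      }
    //      return (entries == null)
    //             ? this.defaultTCEntry(namespace, name, version, site) //fallback
    //             : (TransformationCatalogEntry) entries.get(0);
    //  }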

    /**
     * Returns the environment profiles that are required for the default
     * entry to work sensibly.
     *
     * @param site the site where the job is going to run.
     *
     * @return List of environment variables, else null in case the
     *         required environment variables could not be found.
     */
    protected abstract List getEnvironmentVariables(String site);
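
    //A minimal sketch of what a concrete implementation might return.
    //Both the Profile usage and the lookupEnv() helper are assumptions
    //for illustration: a Profile models an environment variable as a
    //(namespace, key, value) triple, and lookupEnv() stands in for
    //however the site catalog is queried at the given site.
    //
    //  protected List getEnvironmentVariables(String site) {
    //      String globusLocation = lookupEnv(site, "GLOBUS_LOCATION"); //hypothetical helper
    //      if (globusLocation == null) {
    //          //a required variable is missing, so no default entry is possible
    //          return null;
    //      }
    //      List envs = new java.util.ArrayList(1);
    //      envs.add(new Profile(Profile.ENV, "GLOBUS_LOCATION", globusLocation));
    //      return envs;
    //  }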

    /**
     * An optional method that allows the derived classes to do their own
     * post processing on the transfer job before it is returned to
     * the calling module.
     *
     * @param job the <code>TransferJob</code> that has been created.
     */
    public void postProcess(TransferJob job) {

    }
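
    //A sketch of how a derived implementation might use this callout, for
    //example to tag the job with an implementation specific profile. The
    //profile namespace accessor and key below are hypothetical:
    //
    //  public void postProcess(TransferJob job) {
    //      //mark the job so downstream code can identify the transfer tool
    //      job.vdsNS.construct("transfer.impl", "my-transfer-tool");
    //  }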

    /**
     * Prepares the stdin for the transfer job. Usually involves writing out
     * a text file that Condor transfers to the remote end.
     *
     * @param name  the name of the transfer job.
     * @param files Collection of <code>FileTransfer</code> objects containing
     *              the information about the source and destination URLs.
     *
     * @return the path to the prepared stdin file.
     *
     * @throws Exception in case of error.
     */
    protected String prepareSTDIN(String name, Collection files)
        throws Exception {
        //write out the stdin file in the submit directory
        String basename = name + ".in";
        FileWriter stdIn = new FileWriter(new File(mPOptions.getSubmitDirectory(),
                                                   basename));
        try {
            writeJumboStdIn(stdIn, files);
        } finally {
            //ensure the stream is closed, even if writing fails
            stdIn.close();
        }
        return basename;
    }

    /**
     * Returns the namespace of the derivation that this implementation
     * refers to.
     *
     * @return the namespace of the derivation.
     */
    protected abstract String getDerivationNamespace();

    /**
     * Returns the logical name of the derivation that this implementation
     * refers to.
     *
     * @return the name of the derivation.
     */
    protected abstract String getDerivationName();

    /**
     * Returns the version of the derivation that this implementation
     * refers to.
     *
     * @return the version of the derivation.
     */
    protected abstract String getDerivationVersion();

    /**
     * Constructs the arguments that need to be passed to the transfer
     * executable referred to in this transfer mode.
     *
     * @param job the object containing the transfer node.
     *
     * @return the argument string.
     */
    protected abstract String generateArgumentString(TransferJob job);
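
    //A sketch of a trivial implementation. The "-f" flag naming the stdin
    //file is purely hypothetical; a real implementation derives its
    //arguments from the transfer tool being wrapped and from the profiles
    //already incorporated into the job:
    //
    //  protected String generateArgumentString(TransferJob job) {
    //      return "-f " + job.stdIn;
    //  }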

    /**
     * Writes to a FileWriter stream the stdin that is fed to the magic
     * script via standard input.
     *
     * @param stdIn the writer to the stdin file.
     * @param files Collection of <code>FileTransfer</code> objects containing
     *              the information about the source and destination URLs.
     *
     * @throws Exception in case of error.
     */
    protected abstract void writeJumboStdIn(FileWriter stdIn,
                                            Collection files) throws Exception;
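
    //A minimal sketch of a concrete implementation, assuming the transfer
    //tool reads one source URL line followed by one destination URL line
    //per transfer from its stdin; the exact stdin format and the
    //FileTransfer accessors shown are assumptions:
    //
    //  protected void writeJumboStdIn(FileWriter stdIn, Collection files)
    //          throws Exception {
    //      for (Iterator it = files.iterator(); it.hasNext(); ) {
    //          FileTransfer ft = (FileTransfer) it.next();
    //          stdIn.write(ft.getSourceURL().getValue() + "\n");
    //          stdIn.write(ft.getDestURL().getValue() + "\n");
    //      }
    //      stdIn.flush();
    //  }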

    /**
     * Returns the complete name for the transformation that the
     * implementation is using.
     *
     * @return the complete name.
     */
    protected abstract String getCompleteTCName();

}