001: /*
002: * This file or a portion of this file is licensed under the terms of
003: * the Globus Toolkit Public License, found in file GTPL, or at
004: * http://www.globus.org/toolkit/download/license.html. This notice must
005: * appear in redistributions of this file, with or without modification.
006: *
007: * Redistributions of this Software, with or without modification, must
008: * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
009: * some other similar material which is provided with the Software (if
010: * any).
011: *
012: * Copyright 1999-2004 University of Chicago and The University of
013: * Southern California. All rights reserved.
014: */
015:
016: package org.griphyn.cPlanner.transfer.implementation;
017:
018: import org.griphyn.cPlanner.classes.SubInfo;
019: import org.griphyn.cPlanner.classes.TransferJob;
020: import org.griphyn.cPlanner.classes.PlannerOptions;
021: import org.griphyn.cPlanner.classes.FileTransfer;
022: import org.griphyn.cPlanner.classes.SiteInfo;
023: import org.griphyn.cPlanner.classes.JobManager;
024:
025: import org.griphyn.cPlanner.common.LogManager;
026: import org.griphyn.cPlanner.common.PegasusProperties;
027:
028: import org.griphyn.cPlanner.transfer.SingleFTPerXFERJob;
029:
030: import org.griphyn.common.catalog.TransformationCatalogEntry;
031:
032: import java.util.Collection;
033: import java.util.HashSet;
034: import java.util.Vector;
035: import java.util.Iterator;
036:
037: /**
 * An abstract base class for transfer implementations that can handle only a
 * single file transfer per transfer job.
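 * <p>
 * Concrete implementations supply the derivation details and the argument
 * string. A minimal sketch of such a subclass (all names below are
 * hypothetical and purely for illustration; remaining abstract methods
 * inherited from <code>Abstract</code> are omitted):
 * <pre>
 * public class SimpleCopy extends AbstractSingleFTPerXFERJob {
 *
 *     public SimpleCopy(PegasusProperties properties, PlannerOptions options) {
 *         super(properties, options);
 *     }
 *
 *     protected String getDerivationNamespace() { return "example"; }
 *     protected String getDerivationName()      { return "simplecopy"; }
 *     protected String getDerivationVersion()   { return "1.0"; }
 *     protected String getCompleteTCName()      { return "example::simplecopy:1.0"; }
 *
 *     protected String generateArgumentString(TransferJob job, FileTransfer file) {
 *         //build the source and destination URL pair for the transfer tool here
 *         return "";
 *     }
 * }
 * </pre>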
040: *
041: * @author Karan Vahi
042: * @version $Revision: 50 $
043: */
044:
045: public abstract class AbstractSingleFTPerXFERJob extends Abstract
046: implements SingleFTPerXFERJob {
047:
048: /**
049: * The overloaded constructor, that is called by the Factory to load the
050: * class.
051: *
052: * @param properties the properties object.
053: * @param options the options passed to the Planner.
054: */
055: public AbstractSingleFTPerXFERJob(PegasusProperties properties,
056: PlannerOptions options) {
        super(properties, options);
058: }
059:
060: /**
061: * Constructs a general transfer job that handles single transfers per
     * transfer job. There are appropriate callouts to generate the
     * implementation specific details. It throws an error unless it is asked
     * to create a transfer job for exactly one file transfer.
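     *
     * <p>
     * A hypothetical invocation (all names below are illustrative only),
     * assuming a single <code>FileTransfer</code> has been queued:
     * <pre>
     *    Collection files = new Vector();
     *    files.add(fileTransfer);
     *    TransferJob txJob = implementation.createTransferJob(
     *        computeJob, files, null, "stage_in_job_1", SubInfo.STAGE_IN_JOB);
     * </pre>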
065: *
     * @param job         the SubInfo object for the job, in relation to which
     *                    the transfer node is being added. Either the transfer
     *                    node can be transferring this job's input files to
     *                    the execution pool, or transferring this job's output
     *                    files to the output pool.
071: * @param files collection of <code>FileTransfer</code> objects
072: * representing the data files and staged executables to be
073: * transferred.
074: * @param execFiles subset collection of the files parameter, that identifies
075: * the executable files that are being transferred.
     * @param txJobName   the name of the transfer node.
     * @param jobClass    the job class for the newly added job. Can be one of
     *                    the following:
     *                         stage-in
     *                         stage-out
     *                         inter-pool transfer
082: *
083: * @return the created TransferJob.
084: */
085: public TransferJob createTransferJob(SubInfo job, Collection files,
086: Collection execFiles, String txJobName, int jobClass) {
087:
        if (files.size() != 1) {
            //this implementation handles exactly one transfer per job;
            //log an error and throw
            StringBuffer error = new StringBuffer();

            error.append("Transfer Implementation ").append(
                this.getDescription()).append(
                " supports only a single transfer per transfer job ");
            mLogger.log(error.toString(),
                        LogManager.ERROR_MESSAGE_LEVEL);
            throw new RuntimeException(error.toString());
        }
100:
        Iterator it = files.iterator();
        FileTransfer ft = (FileTransfer) it.next();
        return this.createTransferJob(job, ft, execFiles, txJobName,
                                      jobClass);
105: }
106:
107: /**
108: * Constructs a general transfer job that handles single transfers per
109: * transfer job. There are appropriate callouts to generate the implementation
110: * specific details.
111: *
     * @param job         the SubInfo object for the job, in relation to which
     *                    the transfer node is being added. Either the transfer
     *                    node can be transferring this job's input files to
     *                    the execution pool, or transferring this job's output
     *                    files to the output pool.
     * @param file        the <code>FileTransfer</code> object representing the
     *                    data file or staged executable to be transferred.
     * @param execFiles   subset collection of the files parameter, that
     *                    identifies the executable files that are being
     *                    transferred.
     * @param txJobName   the name of the transfer node.
     * @param jobClass    the job class for the newly added job. Can be one of
     *                    the following:
     *                         stage-in
     *                         stage-out
     *                         inter-pool transfer
128: *
129: * @return the created TransferJob.
130: */
131: public TransferJob createTransferJob(SubInfo job,
132: FileTransfer file, Collection execFiles, String txJobName,
133: int jobClass) {
134:
135: TransferJob txJob = new TransferJob();
136: SiteInfo ePool;
137: JobManager jobmanager;
138:
        //site where the transfer job is scheduled to run.
        //for a third party transfer it makes sense to run on
        //the local site, unless the third party transfer has been
        //explicitly designated to run on the remote site.
        String tPool = mRefiner.isSiteThirdParty(job.getSiteHandle(), jobClass)
                       ? (mRefiner.runTPTOnRemoteSite(job.getSiteHandle(), jobClass)
                             ? job.getSiteHandle()
                             : "local")
                       : job.getSiteHandle();
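        //in summary: a non third party transfer always runs at the job's
        //execution site; a third party transfer runs at "local" unless it
        //has been explicitly designated to run on the remote site.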
150:
151: //the non third party site for the transfer job is
152: //always the job execution site for which the transfer
153: //job is being created.
154: txJob.setNonThirdPartySite(job.getSiteHandle());
155:
        //check first for an entry for the transfer universe;
        //if there is none, fall back to the globus universe.
        ePool = mSCHandle.getTXPoolEntry(tPool);
159:
160: txJob.jobName = txJobName;
161: txJob.executionPool = tPool;
162: txJob.condorUniverse = "globus";
163:
        TransformationCatalogEntry tcEntry =
            this.getTransformationCatalogEntry(tPool);
        if (tcEntry == null) {
            //should throw a transformation catalog specific exception
            StringBuffer error = new StringBuffer();
            error.append("Could not find an entry in the transformation catalog for lfn ").
                  append(getCompleteTCName()).append(" at site ").append(
                  txJob.getSiteHandle());
            mLogger.log(error.toString(),
                        LogManager.ERROR_MESSAGE_LEVEL);
            throw new RuntimeException(error.toString());
        }
177:
178: txJob.namespace = tcEntry.getLogicalNamespace();
179: txJob.logicalName = tcEntry.getLogicalName();
180: txJob.version = tcEntry.getLogicalVersion();
181:
        txJob.dvName = this.getDerivationName();
        txJob.dvNamespace = this.getDerivationNamespace();
        txJob.dvVersion = this.getDerivationVersion();
185:
        //the jobmanager should in fact only be set
        //for non third party pools
        jobmanager = ePool.selectJobManager(this.TRANSFER_UNIVERSE, true);
        txJob.globusScheduler = (jobmanager == null)
                                ? null
                                : jobmanager.getInfo(JobManager.URL);
192:
193: txJob.jobClass = jobClass;
194: txJob.jobID = job.jobName;
195:
196: txJob.stdErr = "";
197: txJob.stdOut = "";
198:
199: txJob.executable = tcEntry.getPhysicalTransformation();
200:
        //the input and output files remain empty
        //as we are just copying URLs
        txJob.inputFiles = new HashSet();
        txJob.outputFiles = new HashSet();
205:
206: //no stdin file is written out
207:
208: //the profile information from the pool catalog needs to be
209: //assimilated into the job.
210: txJob.updateProfiles(mSCHandle.getPoolProfile(tPool));
211:
212: //the profile information from the transformation
213: //catalog needs to be assimilated into the job
214: //overriding the one from pool catalog.
215: txJob.updateProfiles(tcEntry);
216:
        //the profile information from the properties file
        //is assimilated last, overriding the one from the
        //transformation catalog.
220: txJob.updateProfiles(mProps);
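        //net effect of the three updates above: properties override the
        //transformation catalog, which in turn overrides the pool catalog.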
221:
        //take care of transfer of proxies
        this.checkAndTransferProxy(txJob);

        //apply the priority to the transfer job
        this.applyPriority(txJob);

        //construct the arguments to the transfer script.
        //they can only be generated after the profiles
        //have been incorporated.
        txJob.strargs = this.generateArgumentString(txJob, file);
232:
        if (execFiles != null) {
            //we need to add setup jobs to change the XBit
            super.addSetXBitJobs(job, txJob, execFiles);
        }
237: return txJob;
238: }
239:
240: /**
241: * Returns the namespace of the derivation that this implementation
242: * refers to.
243: *
244: * @return the namespace of the derivation.
245: */
246: protected abstract String getDerivationNamespace();
247:
248: /**
249: * Returns the logical name of the derivation that this implementation
250: * refers to.
251: *
252: * @return the name of the derivation.
253: */
254: protected abstract String getDerivationName();
255:
256: /**
257: * Returns the version of the derivation that this implementation
258: * refers to.
259: *
260: * @return the version of the derivation.
261: */
262: protected abstract String getDerivationVersion();
263:
264: /**
     * Constructs the arguments to the transfer executable that need to be
     * passed to the executable referred to in this transfer mode.
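     *
     * <p>For example, an implementation wrapping a simple URL copy tool might
     * (purely hypothetically) return the source and destination URLs of the
     * transfer separated by whitespace.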
267: *
268: * @param job the job containing the transfer node.
     * @param file  the FileTransfer that needs to be carried out.
     *
     * @return the argument string.
271: */
272: protected abstract String generateArgumentString(TransferJob job,
273: FileTransfer file);
274:
275: /**
276: * Returns the complete name for the transformation that the implementation
277: * is using.
278: *
279: * @return the complete name.
280: */
281: protected abstract String getCompleteTCName();
282:
283: }
|