/*
 * This file or a portion of this file is licensed under the terms of
 * the Globus Toolkit Public License, found in file GTPL, or at
 * http://www.globus.org/toolkit/download/license.html. This notice must
 * appear in redistributions of this file, with or without modification.
 *
 * Redistributions of this Software, with or without modification, must
 * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
 * some other similar material which is provided with the Software (if
 * any).
 *
 * Copyright 1999-2004 University of Chicago and The University of
 * Southern California. All rights reserved.
 */
package org.griphyn.cPlanner.transfer.implementation;

import org.griphyn.cPlanner.classes.SubInfo;
import org.griphyn.cPlanner.classes.TransferJob;
import org.griphyn.cPlanner.classes.NameValue;
import org.griphyn.cPlanner.classes.PlannerOptions;
import org.griphyn.cPlanner.classes.FileTransfer;
import org.griphyn.cPlanner.classes.SiteInfo;
import org.griphyn.cPlanner.classes.JobManager;

import org.griphyn.cPlanner.common.LogManager;
import org.griphyn.cPlanner.common.PegasusProperties;

import org.griphyn.cPlanner.namespace.VDS;

import org.griphyn.common.catalog.TransformationCatalogEntry;

import org.griphyn.common.util.Separator;

import java.util.Collection;
import java.util.Iterator;
import java.util.HashSet;

/**
 * The implementation that creates transfer jobs referring to the Stork data
 * placement scheduler, which can handle only one transfer per job.
 *
 * <p>
 * Stork is directly invoked by DAGMan. The appropriate Stork modules need to
 * be installed on the submit host.
 *
 * <p>
 * It leads to the addition of setup chmod jobs to the workflow, which appear
 * as parents of the compute jobs, in case the transfer implementation does
 * not preserve the X bit on the files being transferred. This is required
 * for staging executables as part of the workflow. The setup jobs are added
 * only as children of the stage-in jobs.
 *
 * <p>
 * In order to use this transfer implementation, the property
 * <code>vds.transfer.*.impl</code> must be set to the value
 * <code>Stork</code>.
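 *
 * <p>
 * For example, in the properties file passed to the planner:
 * <pre>
 * vds.transfer.*.impl = Stork
 * </pre>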
 *
 * @author Karan Vahi
 * @version $Revision: 458 $
 */
public class Stork extends AbstractSingleFTPerXFERJob {

    /**
     * The transformation namespace for the transfer job.
     */
    public static final String TRANSFORMATION_NAMESPACE = null;

    /**
     * The name of the underlying transformation that is queried for in the
     * Transformation Catalog.
     */
    public static final String TRANSFORMATION_NAME = "stork";

    /**
     * The version number for the transfer job.
     */
    public static final String TRANSFORMATION_VERSION = null;

    /**
     * The derivation namespace for the transfer job.
     */
    public static final String DERIVATION_NAMESPACE = "condor";

    /**
     * The name of the underlying derivation.
     */
    public static final String DERIVATION_NAME = "stork";

    /**
     * The derivation version number for the transfer job.
     */
    public static final String DERIVATION_VERSION = "1.0";

    /**
     * A short description of the transfer implementation.
     */
    public static final String DESCRIPTION =
        "Stork Data Placement Scheduler that does only one transfer per invocation";

    /**
     * The overloaded constructor, which is called by the Factory to load the
     * class.
     *
     * @param properties the properties object.
     * @param options    the options passed to the Planner.
     */
    public Stork(PegasusProperties properties, PlannerOptions options) {
        super(properties, options);
    }

    /**
     * Returns a boolean indicating whether the transfers are always to be
     * done in a third party transfer mode. A value of false results in
     * direct or peer-to-peer transfers being done.
     * <p>
     * A value of false does not preclude third party transfers. They can
     * still be done by setting the property
     * "vds.transfer.*.thirdparty.sites".
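     * <p>
     * For example (the site handles below are purely illustrative, and the
     * value is assumed to be a comma-separated list of site handles):
     * <pre>
     * vds.transfer.*.thirdparty.sites = siteA,siteB
     * </pre>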
     *
     * @return boolean indicating whether to always use third party
     *         transfers or not.
     *
     * @see PegasusProperties#getThirdPartySites(String)
     */
    public boolean useThirdPartyTransferAlways() {
        return true;
    }

    /**
     * Returns a boolean indicating whether the transfer protocol used by
     * the implementation preserves the X bit of the files being staged.
     *
     * @return boolean
     */
    public boolean doesPreserveXBit() {
        return false;
    }

    /**
     * Returns a textual description of the transfer implementation.
     *
     * @return a short textual description
     */
    public String getDescription() {
        return DESCRIPTION;
    }

    /**
     * Constructs a general transfer job that handles a single transfer per
     * transfer job. There are appropriate callouts to generate the
     * implementation specific details. It throws an error if asked to
     * create a transfer job for more than one transfer.
     *
     * @param job       the SubInfo object for the job, in relation to which
     *                  the transfer node is being added. The transfer node
     *                  either transfers this job's input files to the
     *                  execution pool, or transfers this job's output files
     *                  to the output pool.
     * @param file      the <code>FileTransfer</code> object representing
     *                  the data file or staged executable to be transferred.
     * @param execFiles collection that identifies the executable files
     *                  being transferred, for which set X bit jobs may need
     *                  to be added.
     * @param txJobName the name of the transfer node.
     * @param jobClass  the job class for the newly added job. Can be one of
     *                  the following:
     *                      stage-in
     *                      stage-out
     *                      inter-pool transfer
     *
     * @return the created TransferJob.
     */
    public TransferJob createTransferJob(SubInfo job,
                                         FileTransfer file,
                                         Collection execFiles,
                                         String txJobName,
                                         int jobClass) {

        TransferJob txJob = new TransferJob();
        SiteInfo ePool;
        JobManager jobmanager;

        //Stork does the transfer. Hence, set the transfer pool to stork.
        String tPool = "stork";

        //the non third party site for the transfer job is
        //always the job execution site for which the transfer
        //job is being created.
        txJob.setNonThirdPartySite(job.getSiteHandle());

        //we first check whether there is an entry for the transfer
        //universe; if not, we fall back to globus
        // ePool = mSCHandle.getTXPoolEntry(tPool);

        txJob.jobName = txJobName;
        txJob.executionPool = tPool;
        txJob.condorUniverse = "globus";

        /* TransformationCatalogEntry tcEntry = this.getTransformationCatalogEntry(tPool);
        if (tcEntry == null) {
            //should throw a TC specific exception
            StringBuffer error = new StringBuffer();
            error.append( "Could not find entry in tc for lfn " ).append( getCompleteTCName() )
                 .append( " at site " ).append( txJob.getSiteHandle() );
            mLogger.log( error.toString(), LogManager.ERROR_MESSAGE_LEVEL );
            throw new RuntimeException( error.toString() );
        }
        */

        txJob.namespace = TRANSFORMATION_NAMESPACE;
        txJob.logicalName = TRANSFORMATION_NAME;
        txJob.version = TRANSFORMATION_VERSION;

        txJob.dvName = this.getDerivationName();
        txJob.dvNamespace = this.getDerivationNamespace();
        txJob.dvVersion = this.getDerivationVersion();

        //this should in fact only be set
        //for non third party pools
        // jobmanager = ePool.selectJobManager(this.TRANSFER_UNIVERSE, true);
        // txJob.globusScheduler = (jobmanager == null) ?
        //                         null :
        //                         jobmanager.getInfo(JobManager.URL);

        //associate the transfer job with the compute job
        //for which it is being added
        txJob.jobClass = jobClass;
        txJob.jobID = job.jobName;

        txJob.stdErr = "";
        txJob.stdOut = "";

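        //no executable is set on the job; the transfer itself is
        //carried out by Stork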
        txJob.executable = null;

        //the input and output files remain empty,
        //as we are just copying URLs
        txJob.inputFiles = new HashSet();
        txJob.outputFiles = new HashSet();

        //no stdin file is written out

        //the profile information from the pool catalog needs to be
        //assimilated into the job.
        txJob.updateProfiles(mSCHandle.getPoolProfile(tPool));

        //the profile information from the transformation
        //catalog needs to be assimilated into the job,
        //overriding the one from the pool catalog.
        //txJob.updateProfiles(tcEntry);

        //the profile information from the properties file
        //is assimilated, overriding the one from the transformation
        //catalog.
        txJob.updateProfiles(mProps);

        //take care of transfer of proxies
        this.checkAndTransferProxy(txJob);

        //apply the priority to the transfer job
        this.applyPriority(txJob);

        //construct the arguments to the transfer script;
        //they can only be incorporated after the
        //profiles have been incorporated
        txJob.strargs = this.generateArgumentString(txJob, file);

        if (execFiles != null) {
            //we need to add setup jobs to change the X bit
            super.addSetXBitJobs(job, txJob, execFiles);
        }
        return txJob;
    }

    /**
     * Retrieves the transformation catalog entry for the executable that is
     * used to transfer the files in this implementation. The entry does not
     * refer to any physical path.
     *
     * @param siteHandle the handle of the site where the transformation is
     *                   to be searched for.
     *
     * @return the transformation catalog entry.
     */
    public TransformationCatalogEntry getTransformationCatalogEntry(
            String siteHandle) {
        return new TransformationCatalogEntry(TRANSFORMATION_NAMESPACE,
                                              TRANSFORMATION_NAME,
                                              TRANSFORMATION_VERSION);
    }

    /**
     * Returns the namespace of the derivation that this implementation
     * refers to.
     *
     * @return the namespace of the derivation.
     */
    protected String getDerivationNamespace() {
        return DERIVATION_NAMESPACE;
    }

    /**
     * Returns the logical name of the derivation that this implementation
     * refers to.
     *
     * @return the name of the derivation.
     */
    protected String getDerivationName() {
        return DERIVATION_NAME;
    }

    /**
     * Returns the version of the derivation that this implementation
     * refers to.
     *
     * @return the version of the derivation.
     */
    protected String getDerivationVersion() {
        return DERIVATION_VERSION;
    }
    /**
     * Constructs the arguments that need to be passed to the transfer
     * executable referred to in this transfer mode.
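     * <p>
     * For example, for purely illustrative source and destination URLs, the
     * generated argument string would be:
     * <pre>
     * gsiftp://source.example.org/path/to/file
     * gsiftp://destination.example.org/path/to/file
     * </pre>
     * with any value of the transfer arguments profile key prepended.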
     *
     * @param job  the transfer job that is being created.
     * @param file the FileTransfer that needs to be done.
     *
     * @return the argument string
     */
    protected String generateArgumentString(TransferJob job,
                                            FileTransfer file) {
        StringBuffer sb = new StringBuffer();
        if (job.vdsNS.containsKey(VDS.TRANSFER_ARGUMENTS_KEY)) {
            sb.append(job.vdsNS.removeKey(VDS.TRANSFER_ARGUMENTS_KEY));
        }
        sb.append(((NameValue) file.getSourceURL()).getValue())
          .append("\n")
          .append(((NameValue) file.getDestURL()).getValue());
        return sb.toString();
    }

    /**
     * Returns the complete name for the transformation.
     *
     * @return the complete name.
     */
    protected String getCompleteTCName() {
        return Separator.combine(TRANSFORMATION_NAMESPACE,
                                 TRANSFORMATION_NAME,
                                 TRANSFORMATION_VERSION);
    }
}