/*
 * This file or a portion of this file is licensed under the terms of
 * the Globus Toolkit Public License, found in file GTPL, or at
 * http://www.globus.org/toolkit/download/license.html. This notice must
 * appear in redistributions of this file, with or without modification.
 *
 * Redistributions of this Software, with or without modification, must
 * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
 * some other similar material which is provided with the Software (if
 * any).
 *
 * Copyright 1999-2004 University of Chicago and The University of
 * Southern California. All rights reserved.
 */
package org.griphyn.cPlanner.transfer.implementation;

import org.griphyn.cPlanner.classes.TransferJob;
import org.griphyn.cPlanner.classes.PlannerOptions;
import org.griphyn.cPlanner.classes.SubInfo;
import org.griphyn.cPlanner.classes.JobManager;
import org.griphyn.cPlanner.classes.SiteInfo;
import org.griphyn.cPlanner.classes.FileTransfer;
import org.griphyn.cPlanner.classes.NameValue;

import org.griphyn.cPlanner.common.PegasusProperties;
import org.griphyn.cPlanner.common.LogManager;

import org.griphyn.common.util.Separator;

import org.griphyn.common.catalog.TransformationCatalogEntry;

import java.util.Iterator;
import java.util.List;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;

import java.io.FileWriter;

import java.net.URL;

/**
 * A prototype implementation that leverages the Condor file transfer
 * mechanism to do the transfer to the remote directory. Currently, this
 * will only work for staging in data to a remote site from the submit
 * host.
 * <p>
 * Additionally, this will only work with the Local replica selector, which
 * prefers file URLs from the submit host for staging.
 *
 * <p>
 * In order to use the transfer implementation provided by this class,
 * <pre>
 * - property <code>pegasus.transfer.stagein.impl</code> must be set to
 *   value <code>Condor</code>.
 * - property <code>pegasus.selector.replica</code> must be set to value
 *   <code>Local</code>.
 * </pre>
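 * <p>
 * For example, the following properties snippet (illustrative; the rest of
 * the configuration depends on the deployment) selects this implementation:
 * <pre>
 * pegasus.transfer.stagein.impl = Condor
 * pegasus.selector.replica      = Local
 * </pre>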
 *
 * @author Karan Vahi
 * @version $Revision: 387 $
 */
public class Condor extends AbstractMultipleFTPerXFERJob {

    /**
     * The transformation namespace for the transfer job.
     */
    public static final String TRANSFORMATION_NAMESPACE = "pegasus";

    /**
     * The name of the underlying transformation that is queried for in the
     * Transformation Catalog.
     */
    public static final String TRANSFORMATION_NAME = "true";

    /**
     * The version number for the transfer job.
     */
    public static final String TRANSFORMATION_VERSION = null;

    /**
     * The derivation namespace for the transfer job.
     */
    public static final String DERIVATION_NAMESPACE = "pegasus";

    /**
     * The name of the underlying derivation.
     */
    public static final String DERIVATION_NAME = "true";

    /**
     * The derivation version number for the transfer job.
     */
    public static final String DERIVATION_VERSION = "1.0";

    /**
     * A short description of the transfer implementation.
     */
    public static final String DESCRIPTION = "Condor File Transfer Mechanism";

    /**
     * The overloaded constructor, that is called by the Factory to load the
     * class.
     *
     * @param properties  the properties object.
     * @param options     the options passed to the Planner.
     */
    public Condor(PegasusProperties properties, PlannerOptions options) {
        super(properties, options);
    }

    /**
     * Returns a boolean indicating whether the transfer protocol used by
     * the implementation preserves the X bit while staging.
     *
     * @return false
     */
    public boolean doesPreserveXBit() {
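        //Condor file transfers do not preserve the executable bit, which is
        //why setXBit jobs are added separately for staged executables in
        //createTransferJob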
        return false;
    }

    /**
     * Constructs the arguments that need to be passed to the transfer
     * executable referred to in this transfer mode.
     *
     * @param job  the object containing the transfer node.
     * @return the argument string
     */
    protected String generateArgumentString(TransferJob job) {
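        //no arguments are required; the placeholder /bin/true executable
        //ignores its command line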
        return "";
    }

    /**
     * Returns the complete name for the transformation.
     *
     * @return the complete name.
     */
    protected String getCompleteTCName() {
        return Separator.combine(Condor.TRANSFORMATION_NAMESPACE,
                                 Condor.TRANSFORMATION_NAME,
                                 Condor.TRANSFORMATION_VERSION);
    }

    /**
     * Returns the logical name of the derivation that this implementation
     * refers to.
     *
     * @return the name of the derivation.
     */
    protected String getDerivationName() {
        return Condor.DERIVATION_NAME;
    }

    /**
     * Returns the namespace of the derivation that this implementation
     * refers to.
     *
     * @return the namespace of the derivation.
     */
    protected String getDerivationNamespace() {
        return Condor.DERIVATION_NAMESPACE;
    }

    /**
     * Returns the version of the derivation that this implementation refers
     * to.
     *
     * @return the version of the derivation.
     */
    protected String getDerivationVersion() {
        return Condor.DERIVATION_VERSION;
    }

    /**
     * Returns a textual description of the transfer implementation.
     *
     * @return a short textual description
     */
    public String getDescription() {
        return Condor.DESCRIPTION;
    }

    /**
     * Returns the environment profiles that are required for the default
     * entry to sensibly work. There are no variables to be returned for
     * this case.
     *
     * @param site  the site where the job is going to run.
     * @return an empty list
     */
    protected List getEnvironmentVariables(String site) {
        return new ArrayList(0);
    }

    /**
     * Constructs a Condor file transfer job that handles multiple transfers.
     * The job itself is a <code>/bin/true</code> job; the actual transfer of
     * the input files from the submit host is delegated to Condor's file
     * transfer mechanism.
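     * <p>
     * Conceptually, the submit description that results for such a job
     * resembles the following sketch (illustrative only; the exact keys are
     * generated by the planner and Condor layers, and the file paths are
     * hypothetical):
     * <pre>
     * executable           = /bin/true
     * transfer_input_files = /path/on/submit/host/f.a,/path/on/submit/host/f.b
     * universe             = globus
     * </pre>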
     *
     * @param job        the SubInfo object for the job, in relation to which
     *                   the transfer node is being added. The transfer node
     *                   can either be transferring this job's input files to
     *                   the execution pool, or transferring this job's
     *                   output files to the output pool.
     * @param files      collection of <code>FileTransfer</code> objects
     *                   representing the data files and staged executables
     *                   to be transferred.
     * @param execFiles  subset collection of the files parameter, that
     *                   identifies the executable files that are being
     *                   transferred.
     * @param txJobName  the name of the transfer node.
     * @param jobClass   the job Class for the newly added job. Can be one
     *                   of the following:
     *                   stage-in
     *                   stage-out
     *                   inter-pool transfer
     *
     * @return the created TransferJob.
     */
    public TransferJob createTransferJob(SubInfo job, Collection files,
                                         Collection execFiles,
                                         String txJobName, int jobClass) {

        //sanity check
        if (jobClass != SubInfo.STAGE_IN_JOB) {
            throw new RuntimeException(
                "Condor file transfer can only be used for stagein");
        }

        TransferJob txJob = new TransferJob();

        //run job always on the site where the compute job runs
        txJob.setSiteHandle(job.getSiteHandle());

        //the non third party site for the transfer job is always the job
        //execution site for which the transfer job is being created.
        txJob.setNonThirdPartySite(job.getSiteHandle());

        txJob.setName(txJobName);
        txJob.setUniverse("globus");

        txJob.setTransformation(Condor.TRANSFORMATION_NAMESPACE,
                                Condor.TRANSFORMATION_NAME,
                                Condor.TRANSFORMATION_VERSION);

        txJob.setDerivation(Condor.DERIVATION_NAMESPACE,
                            Condor.DERIVATION_NAME,
                            Condor.DERIVATION_VERSION);

        txJob.setRemoteExecutable("/bin/true");
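        //the executable is just a placeholder; the actual data movement is
        //performed by Condor's file transfer mechanism on the input files
        //registered below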

        //add the input files for transfer, since we are only creating
        //stage-in jobs
        for (Iterator it = files.iterator(); it.hasNext();) {
            FileTransfer ft = (FileTransfer) it.next();
            NameValue nv = ft.getSourceURL();
            //sanity check first
            if (!nv.getKey().equals("local")) {
                throw new RuntimeException(
                    "Condor file transfer can only do stagein from local site. "
                    + "Unable to transfer " + ft);
            }
            //put the url in only if it is a file url
            String url = nv.getValue();
            if (url.startsWith("file:/")) {
                try {
                    txJob.condorVariables.addIPFileForTransfer(
                        new URL(url).getPath());
                } catch (Exception e) {
                    throw new RuntimeException("Malformed source URL " + url,
                                               e);
                }
            }
        }

        //this should in fact only be set for non third party pools.
        //we first check if there is an entry for the transfer universe;
        //if not, we fall back to the globus universe.
        SiteInfo ePool = mSCHandle.getTXPoolEntry(txJob.getSiteHandle());
        JobManager jobmanager = ePool.selectJobManager(this.TRANSFER_UNIVERSE,
                                                       true);
        txJob.setJobManager((jobmanager == null)
                            ? null
                            : jobmanager.getInfo(JobManager.URL));

        txJob.setJobType(jobClass);
        txJob.setVDSSuperNode(job.jobName);

        txJob.stdErr = "";
        txJob.stdOut = "";

        //the input files remain empty, as we are just copying URLs
        txJob.inputFiles = new HashSet();

        //to get the file stat information we need to put the files as
        //output files of the transfer job
        txJob.outputFiles = new HashSet(files);

        //the profile information from the pool catalog needs to be
        //assimilated into the job.
        txJob.updateProfiles(mSCHandle.getPoolProfile(txJob.getSiteHandle()));

        //the profile information from the transformation catalog would
        //normally be assimilated next, overriding the one from the pool
        //catalog. It is skipped here, as this implementation has no
        //transformation catalog entry.
        // txJob.updateProfiles(tcEntry);

        //the profile information from the properties file is assimilated,
        //overriding the one from the transformation catalog.
        txJob.updateProfiles(mProps);

        //apply the priority to the transfer job
        this.applyPriority(txJob);

        //the arguments to the transfer script only have to be constructed
        //after the profile incorporation
        txJob.strargs = this.generateArgumentString(txJob);

        if (execFiles != null) {
            //we need to add setup jobs to change the XBit
            super.addSetXBitJobs(job, txJob, execFiles);
        }

        return txJob;
    }

    /**
     * Retrieves the transformation catalog entry for the executable that is
     * being used to transfer the files in the implementation.
     *
     * @param siteHandle  the handle of the site where the transformation is
     *                    to be searched.
     * @return the transformation catalog entry if found, else null.
     */
    public TransformationCatalogEntry getTransformationCatalogEntry(
            String siteHandle) {
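        //no catalog lookup is done; the job runs /bin/true, which is
        //assumed to already exist on the remote site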
        return null;
    }

    /**
     * Returns a boolean indicating whether the transfers are always done in
     * third party transfer mode. FIXME: this should really convey NEVER, as
     * Condor file transfers never run in third party mode.
     *
     * @return boolean indicating whether to always use third party
     *         transfers or not.
     */
    public boolean useThirdPartyTransferAlways() {
        return false;
    }

    /**
     * Writes to a FileWriter stream the stdin that is fed to the transfer
     * executable via standard input. For this implementation nothing needs
     * to be written out.
     *
     * @param stdIn  the writer to the stdin file.
     * @param files  Collection of <code>FileTransfer</code> objects
     *               containing the information about the source and
     *               destination URLs.
     * @throws Exception
     */
    protected void writeJumboStdIn(FileWriter stdIn, Collection files)
            throws Exception {
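        //intentionally a no-op; Condor stages the files itself, so there is
        //no stdin to generate for the transfer job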
    }
}