001: /*
002: * This file or a portion of this file is licensed under the terms of
003: * the Globus Toolkit Public License, found in file GTPL, or at
004: * http://www.globus.org/toolkit/download/license.html. This notice must
005: * appear in redistributions of this file, with or without modification.
006: *
007: * Redistributions of this Software, with or without modification, must
008: * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
009: * some other similar material which is provided with the Software (if
010: * any).
011: *
012: * Copyright 1999-2004 University of Chicago and The University of
013: * Southern California. All rights reserved.
014: */
015: package org.griphyn.cPlanner.transfer.implementation;
016:
017: import org.griphyn.cPlanner.classes.TransferJob;
018: import org.griphyn.cPlanner.classes.SubInfo;
019: import org.griphyn.cPlanner.classes.NameValue;
020: import org.griphyn.cPlanner.classes.PlannerOptions;
021: import org.griphyn.cPlanner.classes.FileTransfer;
022:
023: import org.griphyn.cPlanner.common.LogManager;
024: import org.griphyn.cPlanner.common.PegasusProperties;
025:
026: import org.griphyn.cPlanner.namespace.VDS;
027: import org.griphyn.cPlanner.namespace.Condor;
028:
029: import org.griphyn.common.classes.TCType;
030:
031: import org.griphyn.common.catalog.TransformationCatalogEntry;
032:
033: import org.griphyn.common.util.Separator;
034:
035: import java.io.File;
036: import java.io.FileWriter;
037:
038: import java.util.Collection;
039: import java.util.Iterator;
040: import java.util.List;
041: import java.util.Properties;
042: import java.util.ArrayList;
043:
044: /**
045: * The implementation that creates transfer jobs referring to the c rft client
046: * executable distributed with the VDS.
047: *
048: * <p>
049: * The rft client is invoked on the submit host. Hence there should be an
050: * entry in the transformation catalog for logical transformation
051: * <code>CRFT</code> at site <code>local</code>. The transformation should point
052: * to the client that is distributed with RFT in GT4.
053: *
054: * <p>
055: * The user can tweak the options to the globus-crft client by specifying the properties
056: * in the properties files with the prefix <code>pegasus.transfer.crft</code>.
057: * The following table lists all the properties with their prefixes stripped off,
058: * that the user can specify in the properties files. The default value is used
059: * if the user does not specify a particular property. If a value is not specified,
060: * the particular option is not generated.
061: *
* <p>
* For the properties which have a default value of no default, and for which the
* user does not provide a value, the option is not propagated further to the
* client underneath. In that case, it is up to the client to construct the
* appropriate value for that property/option.
067: *
068: * <p>
069: * <table border="1">
070: * <tr align="left"><th>property</th><th>default value</th><th>description</th></tr>
071: * <tr align="left"><th>endpoint</th>
072: * <td>no default (required option)</td>
073: * <td>The endpoint to contact when creating a service.</td>
074: * </tr>
075: * <tr align="left"><th>concurrent</th>
076: * <td>no default</td>
077: * <td>The number of simultaneous transfers.</td>
078: * </tr>
079: * <tr align="left"><th>parallel</th>
080: * <td>no default</td>
081: * <td>The number of parallel sockets to use with each transfer.</td>
082: * </tr>
083: * <tr align="left"><th>tcp-bs</th>
084: * <td>no default</td>
085: * <td>specifies the size (in bytes) of the TCP buffer to be used by the
086: * underlying ftp data channels</td>
087: * </tr>
088: * <tr align="left"><th>verbose</th>
089: * <td>true</td>
090: * <td>to generate more verbose output, helpful for debugging.</td>
091: * </tr>
092: * </table>
093: *
094: * <p>
095: * It leads to the creation of the setup chmod jobs to the workflow, that appear
096: * as parents to compute jobs in case the transfer implementation does not
097: * preserve the X bit on the file being transferred. This is required for
098: * staging of executables as part of the workflow. The setup jobs are only added
099: * as children to the stage in jobs.
100: *
101: * <p>
102: * In order to use the transfer implementation implemented by this class,
103: * <pre>
104: * - the property pegasus.transfer.*.impl must be set to value CRFT.
105: * </pre>
106: *
107: * <p>
108: * There should be an entry in the transformation catalog with the fully qualified
109: * name as <code>globus::crft</code> for all the sites where workflow is run,
110: * or on the local site in case of third party transfers.
111: *
112: *
113: * @author Karan Vahi
114: * @version $Revision: 145 $
115: */
116: public class CRFT extends AbstractMultipleFTPerXFERJob {
117:
118: /**
119: * The transformation namespace for the transfer job.
120: */
121: public static final String TRANSFORMATION_NAMESPACE = "globus";
122:
123: /**
124: * The name of the underlying transformation that is queried for in the
125: * Transformation Catalog.
126: */
127: public static final String TRANSFORMATION_NAME = "crft";
128:
129: /**
130: * The version number for the transfer job.
131: */
132: public static final String TRANSFORMATION_VERSION = null;
133:
134: /**
135: * The derivation namespace for for the transfer job.
136: */
137: public static final String DERIVATION_NAMESPACE = "globus";
138:
139: /**
140: * The name of the underlying derivation.
141: */
142: public static final String DERIVATION_NAME = "crft";
143:
144: /**
145: * The derivation version number for the transfer job.
146: */
147: public static final String DERIVATION_VERSION = "1.0";
148:
149: /**
150: * A short description of the transfer implementation.
151: */
152: public static final String DESCRIPTION = "C based blocking RFT client";
153:
154: /**
155: * The prefix for all the properties this mode requires.
156: */
157: public static final String PROPERTIES_PREFIX = "pegasus.transfer.crft.";
158:
159: /**
160: * The key name that denotes the endpoint to contact when creating a
161: * service.
162: */
163: public static final String END_POINT_KEY = "endpoint";
164:
165: /**
166: * The key name that denotes to create a RFT service.
167: */
168: public static final String CREATE_KEY = "create";
169:
170: /**
171: * The key name that denotes to start a RFT service.
172: */
173: public static final String SUBMIT_KEY = "submit";
174:
175: /**
176: * The key name that denotes to monitor the request. Makes the client block.
177: */
178: public static final String MONITOR_KEY = "monitor";
179:
180: /**
181: * The key name that denotes the TCP buffer size in bytes.
182: */
183: public static final String TCP_BUFFER_SIZE_KEY = "tcp-bs";
184:
185: /**
186: * The key name that denotes whether to do verbose or not.
187: */
188: public static final String VERBOSE_KEY = "verbose";
189:
190: /**
191: * The key name that denotes the number of files to be transferred at any
192: * given time.
193: */
194: public static final String CONCURRENT_KEY = "concurrent";
195:
196: /**
197: * The key name that denotes the number of parallel sockets to use for each
198: * transfer.
199: */
200: public static final String PARALLEL_KEY = "parallel";
201:
202: /**
203: * The key name that points to the transfer file that is containing the
204: * source and destination urls.
205: */
206: public static final String TRANSFER_FILE_KEY = "transfer-file";
207:
208: /**
209: * The options delimiter that is prepended before all the options.
210: */
211: private static final String OPTIONS_DELIMITER = "--";
212:
213: /**
214: * The end point for the service.
215: */
216: private String mEndPoint;
217:
218: /**
219: * The properties object holding all the RFT specific properties specified
220: * by the user in the properties file.
221: */
222: private Properties mCRFTProps;
223:
224: /**
225: * The overloaded constructor, that is called by the Factory to load the
226: * class.
227: *
228: * @param properties the properties object.
229: * @param options the options passed to the Planner.
230: */
231: public CRFT(PegasusProperties properties, PlannerOptions options) {
232: super (properties, options);
233: mCRFTProps = mProps.matchingSubset(PROPERTIES_PREFIX, false);
234:
235: mEndPoint = mCRFTProps.getProperty(END_POINT_KEY);
236: //sanity check
237: if (mEndPoint == null || mEndPoint.length() == 0) {
238: String message = "Need to specify a non empty end point using "
239: + "the property "
240: + PROPERTIES_PREFIX
241: + END_POINT_KEY;
242: throw new RuntimeException(message);
243: }
244:
245: }
246:
247: /**
248: * Calls out to the super class method to create the main structure of the job.
249: * In addition, for the CRFT adds the specific condor magic that allows for
250: * the transfer of the input file correctly to working directory.
251: *
252: *@param job the SubInfo object for the job, in relation to which
253: * the transfer node is being added. Either the transfer
254: * node can be transferring this jobs input files to
255: * the execution pool, or transferring this job's output
256: * files to the output pool.
257: * @param files collection of <code>FileTransfer</code> objects
258: * representing the data files and staged executables to be
259: * transferred.
260: * @param execFiles subset collection of the files parameter, that identifies
261: * the executable files that are being transferred.
262: * @param txJobName the name of transfer node.
263: * @param jobClass the job Class for the newly added job. Can be one of the
264: * following:
265: * stage-in
266: * stage-out
267: * inter-pool transfer
268: *
269: * @return the created TransferJob.
270: */
271: public TransferJob createTransferJob(SubInfo job, Collection files,
272: Collection execFiles, String txJobName, int jobClass) {
273:
274: TransferJob txJob = super .createTransferJob(job, files,
275: execFiles, txJobName, jobClass);
276: File f = new File(mPOptions.getSubmitDirectory(), txJob.stdIn);
277: //add condor key transfer_input_files to transfer the file
278: txJob.condorVariables.addIPFileForTransfer(f.getAbsolutePath());
279: /*
280: //and other required condor keys
281: txJob.condorVariables.checkKeyInNS(Condor.TRANSFER_IP_FILES_KEY,
282: f.getAbsolutePath());
283: txJob.condorVariables.construct("should_transfer_files","YES");
284: txJob.condorVariables.construct("when_to_transfer_output","ON_EXIT");
285: */
286:
287: //the stdin file needs to be transferred as a file not as stdin
288: txJob.stdIn = "";
289:
290: //we want the transfer job to be run in the
291: //directory that Condor or GRAM decided to run
292: txJob.condorVariables.removeKey("remote_initialdir");
293:
294: return txJob;
295: }
296:
297: /**
298: * Returns the environment profiles that are required for the default
299: * entry to sensibly work. Returns an empty list.
300: *
301: * @param site the site where the job is going to run.
302: *
303: * @return List of environment variables, else null in case where the
304: * required environment variables could not be found.
305: */
306: protected List getEnvironmentVariables(String site) {
307: return new ArrayList();
308: }
309:
310: /**
311: * Return a boolean indicating whether the transfers to be done always in
312: * a third party transfer mode. This always returns true, indicating
313: * transfers can only be done in a third party transfer mode.
314: *
315: * A value of false does not preclude third party transfers. They still can
316: * be done, by setting the property "pegasus.transfer.*.thirdparty.sites".
317: *
318: * @return false
319: */
320: public boolean useThirdPartyTransferAlways() {
321: return false;
322: }
323:
324: /**
325: * Returns a boolean indicating whether the transfer protocol being used by
326: * the implementation preserves the X Bit or not while staging.
327: *
328: * @return boolean
329: */
330: public boolean doesPreserveXBit() {
331: return true;
332: }
333:
334: /**
335: * Adds the dirmanager job to the workflow, that do a chmod on the executable
336: * files that are being staged. It is empty as RFT preserves X bit permission
337: * while staging files.
338: *
339: * @param computeJobName the name pf the computeJob for which the files are
340: * being staged.
341: * @param txJobName the name of the transfer job that is staging the files.
342: * @param execFiles the executable files that are being staged.
343: * @param transferClass the class of transfer job
344: *
345: * @return boolean indicating whether any XBitJobs were succesfully added or
346: * not.
347: */
348: public boolean addSetXBitJobs(String computeJobName,
349: String txJobName, Collection execFiles, int transferClass) {
350: return false;
351: }
352:
353: /**
354: * Constructs the arguments to the transfer executable that need to be
355: * passed to the executable referred to in this transfer mode. Since the
356: * rft client is run on the submit host, the path to the input file
357: * to the rft client is given, instead of passing it through condor
358: * files.
359: * In addition , it SETS THE STDIN of the transfer job to null, as the
360: * input file is not being sent to the remote sides. There should be a
361: * generic function prepareIPFile to do this.
362: *
363: * @param job the object containing the transfer node.
364: *
365: * @return the argument string
366: */
367: protected String generateArgumentString(TransferJob job) {
368: File f = new File(mPOptions.getSubmitDirectory(), job.stdIn);
369:
370: StringBuffer sb = new StringBuffer();
371:
372: //construct the few default options
373: sb.append(OPTIONS_DELIMITER).append(MONITOR_KEY).append(" ")
374: .append(OPTIONS_DELIMITER).append(CREATE_KEY).append(
375: " ").append(OPTIONS_DELIMITER).append(
376: SUBMIT_KEY).append(" ").append(
377: OPTIONS_DELIMITER).append(VERBOSE_KEY).append(
378: " ");
379:
380: sb.append(construct(END_POINT_KEY, mEndPoint));
381:
382: //construct the optional long opts
383: sb.append(construct(PARALLEL_KEY, mCRFTProps
384: .getProperty(PARALLEL_KEY)));
385: sb.append(construct(CONCURRENT_KEY, mCRFTProps
386: .getProperty(CONCURRENT_KEY)));
387: sb.append(construct(TCP_BUFFER_SIZE_KEY, mCRFTProps
388: .getProperty(TCP_BUFFER_SIZE_KEY)));
389:
390: //construct the transfer file
391: sb.append(construct(TRANSFER_FILE_KEY, f.getName()));
392:
393: //setting the stdin to null. we no longer need it.
394: //if left specified, condor would try to transfer
395: //it via GASS
396: //Commented by Karan Feb 23, 06. We need the path to the stdin still.
397: //job.stdIn = "";
398: return sb.toString();
399: }
400:
401: /**
402: * Writes to a file on the submit host, that is passed to the rft-client
403: * as input. The rft-client is always run on the submit host, and hence
404: * can access the file.
405: *
406: *
407: * @param writer the writer to the stdin file.
408: * @param files Collection of <code>FileTransfer</code> objects containing
409: * the information about sourceam fin and destURL's.
410: *
411: *
412: * @throws Exception
413: */
414: protected void writeJumboStdIn(FileWriter writer, Collection files)
415: throws Exception {
416:
417: //iterating thru all the FileTransfers
418: writer.write("#Source and Destination URLS\n");
419: for (Iterator it = files.iterator(); it.hasNext();) {
420: FileTransfer ft = (FileTransfer) it.next();
421: //the FileTransfer object writes out in T2 compatible format
422: writer.write(ft.getSourceURL().getValue());
423: writer.write(" ");
424: writer.write(ft.getDestURL().getValue());
425: writer.write("\n");
426: }
427: writer.flush();
428: }
429:
430: /**
431: * Returns a textual description of the transfer implementation.
432: *
433: * @return a short textual description
434: */
435: public String getDescription() {
436: return this .DESCRIPTION;
437: }
438:
439: /**
440: * Retrieves the transformation catalog entry for the executable that is
441: * being used to transfer the files in the implementation.
442: *
443: * @param siteHandle the handle of the site where the transformation is
444: * to be searched.
445: *
446: * @return the transformation catalog entry if found, else null.
447: */
448: public TransformationCatalogEntry getTransformationCatalogEntry(
449: String siteHandle) {
450: List tcentries = null;
451: try {
452: //namespace and version are null for time being
453: tcentries = mTCHandle.getTCEntries(
454: this .TRANSFORMATION_NAMESPACE,
455: this .TRANSFORMATION_NAME,
456: this .TRANSFORMATION_VERSION, siteHandle,
457: TCType.INSTALLED);
458: } catch (Exception e) {
459: mLogger.log("Unable to retrieve entry from TC for "
460: + getCompleteTCName() + " :" + e.getMessage(),
461: LogManager.ERROR_MESSAGE_LEVEL);
462: }
463:
464: //see if any record is returned or not
465: return (tcentries == null) ? null
466: : (TransformationCatalogEntry) tcentries.get(0);
467: }
468:
469: /**
470: * Returns the namespace of the derivation that this implementation
471: * refers to.
472: *
473: * @return the namespace of the derivation.
474: */
475: protected String getDerivationNamespace() {
476: return this .DERIVATION_NAMESPACE;
477: }
478:
479: /**
480: * Returns the logical name of the derivation that this implementation
481: * refers to.
482: *
483: * @return the name of the derivation.
484: */
485: protected String getDerivationName() {
486: return this .DERIVATION_NAME;
487: }
488:
489: /**
490: * Returns the version of the derivation that this implementation
491: * refers to.
492: *
493: * @return the version of the derivation.
494: */
495: protected String getDerivationVersion() {
496: return this .DERIVATION_VERSION;
497: }
498:
499: /**
500: * Returns the complete name for the transformation.
501: *
502: * @return the complete name.
503: */
504: protected String getCompleteTCName() {
505: return Separator.combine(this .TRANSFORMATION_NAMESPACE,
506: this .TRANSFORMATION_NAME, this .TRANSFORMATION_VERSION);
507: }
508:
509: /**
510: * A helper method to generate a required argument option for the client.
511: * It is generated only if a non null value is passed.
512: *
513: * @param option the long version of the option.
514: * @param value the value for the option
515: *
516: * @return the constructed string.
517: */
518: private String construct(String option, String value) {
519: if (value == null || value.length() == 0) {
520: return "";
521: }
522: StringBuffer sb = new StringBuffer(16);
523: sb.append(OPTIONS_DELIMITER).append(option).append(" ").append(
524: value).append(" ");
525:
526: return sb.toString();
527: }
528:
529: }
|