001: /*
002: * This file or a portion of this file is licensed under the terms of
003: * the Globus Toolkit Public License, found in file GTPL, or at
004: * http://www.globus.org/toolkit/download/license.html. This notice must
005: * appear in redistributions of this file, with or without modification.
006: *
007: * Redistributions of this Software, with or without modification, must
008: * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
009: * some other similar material which is provided with the Software (if
010: * any).
011: *
012: * Copyright 1999-2004 University of Chicago and The University of
013: * Southern California. All rights reserved.
014: */
015: package org.griphyn.cPlanner.transfer.implementation;
016:
017: import org.griphyn.cPlanner.classes.TransferJob;
018: import org.griphyn.cPlanner.classes.NameValue;
019: import org.griphyn.cPlanner.classes.PlannerOptions;
020: import org.griphyn.cPlanner.classes.FileTransfer;
021:
022: import org.griphyn.cPlanner.common.LogManager;
023: import org.griphyn.cPlanner.common.PegasusProperties;
024:
025: import org.griphyn.cPlanner.namespace.VDS;
026:
027: import org.griphyn.common.classes.TCType;
028:
029: import org.griphyn.common.catalog.TransformationCatalogEntry;
030:
031: import org.griphyn.common.util.Separator;
032:
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
041:
042: /**
043: * The implementation that creates transfer jobs referring to the rft-client
044: * distributed with GT4 to do transfers between various sites.
045: * The rft-client connects to a RFT service running on a particular host.
046: * <p>
 * The rft client is always invoked on the submit host. Hence there should be an
 * entry in the transformation catalog for the logical transformation
 * <code>globus::rft</code> at site <code>local</code>. The transformation should
 * point to the client that is distributed with RFT in GT4.
051: * <p>
 * The user can tweak the options to the rft client by specifying the properties
 * in the properties files with the prefix <code>pegasus.transfer.rft</code>.
054: * The following table lists all the properties with their prefixes stripped off,
055: * that the user can specify in the properties files. The default value is used
056: * if the user does not specify a particular property.
057: * <p>
058: * <table border="1">
059: * <tr align="left"><th>property</th><th>default value</th><th>description</th></tr>
060: * <tr align="left"><th>host</th>
061: * <td>localhost</td>
062: * <td>the host-ip of the container.</td>
063: * </tr>
064: * <tr align="left"><th>port</th>
065: * <td>8080</td>
066: * <td>the port at which the container is running.</td>
067: * </tr>
068: * <tr align="left"><th>binary</th>
069: * <td>true</td>
070: * <td>whether to do transfers in binary mode or not.</td>
071: * </tr>
072: * <tr align="left"><th>bs</th>
073: * <td>16000</td>
074: * <td>block size in bytes that is transferred.</td>
075: * </tr>
076: * <tr align="left"><th>tcpbs</th>
077: * <td>16000</td>
078: * <td>specifies the size (in bytes) of the TCP buffer to be used by the
079: * underlying ftp data channels</td>
080: * </tr>
081: * <tr align="left"><th>notpt</th>
082: * <td>false</td>
 * <td>whether to disable third party transfers or not.</td>
084: * </tr>
085: * <tr align="left"><th>streams</th>
086: * <td>1</td>
087: * <td>specifies the number of parallel data connections that should be used.</td>
088: * </tr>
089: * <tr align="left"><th>DCAU</th>
090: * <td>true</td>
091: * <td>data channel authentication for ftp transfers.</td>
092: * </tr>
093: * <tr align="left"><th>processes</th>
094: * <td>1</td>
 * <td>number of files to be transferred concurrently at any given time.</td>
096: * </tr>
097: * <tr align="left"><th>retry</th>
098: * <td>3</td>
 * <td>number of times RFT retries a transfer that failed with a non-fatal error.</td>
100: * </tr>
101: * </table>
102: * <p>
 * Since RFT preserves the X bit on the files being transferred, this
 * implementation does not add any setup chmod jobs to the workflow when
 * executables are staged as part of the workflow.
108: *
109: * <p>
110: * In order to use the transfer implementation implemented by this class,
111: * <pre>
112: * - the property pegasus.transfer.*.impl must be set to value RFT.
113: * </pre>
114: *
115: * <p>
 * There should be an entry in the transformation catalog with the fully qualified
 * name <code>globus::rft</code> on the local site, as this implementation always
 * does transfers in third party mode with the rft client running on the submit host.
119: *
120: *
121: * @author Karan Vahi
122: * @version $Revision: 145 $
123: */
124: public class RFT extends AbstractMultipleFTPerXFERJob {
125:
126: /**
127: * The transformation namespace for the transfer job.
128: */
129: public static final String TRANSFORMATION_NAMESPACE = "globus";
130:
131: /**
132: * The name of the underlying transformation that is queried for in the
133: * Transformation Catalog.
134: */
135: public static final String TRANSFORMATION_NAME = "rft";
136:
137: /**
138: * The version number for the transfer job.
139: */
140: public static final String TRANSFORMATION_VERSION = null;
141:
142: /**
143: * The derivation namespace for for the transfer job.
144: */
145: public static final String DERIVATION_NAMESPACE = "globus";
146:
147: /**
148: * The name of the underlying derivation.
149: */
150: public static final String DERIVATION_NAME = "rft";
151:
152: /**
153: * The derivation version number for the transfer job.
154: */
155: public static final String DERIVATION_VERSION = "1.0";
156:
157: /**
158: * A short description of the transfer implementation.
159: */
160: public static final String DESCRIPTION = "RFT Java Client";
161:
162: /**
163: * The prefix for all the properties this mode requires.
164: */
165: public static final String PROPERTIES_PREFIX = "pegasus.transfer.rft.";
166:
167: /**
168: * The key name that denotes the host on which the RFT service is running.
169: */
170: public static final String HOST_KEY = "host";
171:
172: /**
173: * The key name that denotes the port on which the RFT service is running.
174: */
175: public static final String PORT_KEY = "port";
176:
177: /**
178: * The key name that denotes whether to do the transfers in binary mode or not.
179: */
180: public static final String BINARY_KEY = "binary";
181:
182: /**
183: * The key name that denotes the block size in bytes that is to be
184: * transferred.
185: */
186: public static final String BLOCK_SIZE_KEY = "bs";
187:
188: /**
189: * The key name that denotes the TCP buffer size in bytes.
190: */
191: public static final String TCP_BUFFER_SIZE_KEY = "tcpbs";
192:
193: /**
194: * The key name that denotes whether to use TPT (third party transfer) or not.
195: */
196: public static final String NO_TPT_KEY = "notpt";
197:
198: /**
199: * The key name that denotes the number of parallel streams to be used.
200: */
201: public static final String STREAMS_KEY = "streams";
202:
203: /**
204: * The key name that denotes whether to use Data Channel Authentication or not.
205: */
206: public static final String DCAU_KEY = "DCAU";
207:
208: /**
209: * The key name that denotes the number of files to be transferred at any
210: * given time.
211: */
212: public static final String PROCESSES_KEY = "processes";
213:
214: /**
215: * The key name that denotes the maximum number of retries that are made
216: * in case of failure.
217: */
218: public static final String RETRY_KEY = "retry";
219:
220: /**
221: * The properties object holding all the RFT specific properties specified
222: * by the user in the properties file.
223: */
224: private Properties mRFTProps;
225:
226: /**
227: * The overloaded constructor, that is called by the Factory to load the
228: * class.
229: *
230: * @param properties the properties object.
231: * @param options the options passed to the Planner.
232: */
233: public RFT(PegasusProperties properties, PlannerOptions options) {
234: super (properties, options);
235: mRFTProps = mProps.matchingSubset(PROPERTIES_PREFIX, false);
236: }
237:
238: /**
239: * Returns a boolean indicating whether the transfer protocol being used by
240: * the implementation preserves the X Bit or not while staging.
241: *
242: * @return boolean
243: */
244: public boolean doesPreserveXBit() {
245: return true;
246: }
247:
248: /**
249: * Adds the dirmanager job to the workflow, that do a chmod on the executable
250: * files that are being staged. It is empty as RFT preserves X bit permission
251: * while staging files.
252: *
253: * @param computeJobName the name pf the computeJob for which the files are
254: * being staged.
255: * @param txJobName the name of the transfer job that is staging the files.
256: * @param execFiles the executable files that are being staged.
257: * @param transferClass the class of transfer job
258: *
259: * @return boolean indicating whether any XBitJobs were succesfully added or
260: * not.
261: */
262: public boolean addSetXBitJobs(String computeJobName,
263: String txJobName, Collection execFiles, int transferClass) {
264: return false;
265: }
266:
267: /**
268: * Return a boolean indicating whether the transfers to be done always in
269: * a third party transfer mode. This always returns true, indicating
270: * transfers can only be done in a third party transfer mode.
271: *
272: * @return true
273: */
274: public boolean useThirdPartyTransferAlways() {
275: return true;
276: }
277:
278: /**
279: * Constructs the arguments to the transfer executable that need to be
280: * passed to the executable referred to in this transfer mode. Since the
281: * rft client is run on the submit host, the path to the input file
282: * to the rft client is given, instead of passing it through condor
283: * files.
284: *
285: * @param job the object containing the transfer node.
286: *
287: * @return the argument string
288: */
289: protected String generateArgumentString(TransferJob job) {
290: File f = new File(mPOptions.getSubmitDirectory(), job.stdIn);
291: StringBuffer sb = new StringBuffer();
292: sb.append(" -h ").append(
293: mRFTProps.getProperty(HOST_KEY, "localhost")).append(
294: " -r ").append(mRFTProps.getProperty(PORT_KEY, "8080"))
295: .append(" -f ").append(f.getAbsolutePath());
296:
297: return sb.toString();
298: }
299:
300: /**
301: * Resets the STDIN of the transfer job to null, as the
302: * input file is not being sent to the remote sides. There should be a
303: * generic function prepareIPFile to do this.
304: *
305: *
306: * @job the <code>TransferJob</code> that has been created.
307: */
308: public void postProcess(TransferJob job) {
309: File f = new File(mPOptions.getSubmitDirectory(), job
310: .getStdIn());
311: //add condor key transfer_input_files to transfer the file
312: job.condorVariables.addIPFileForTransfer(f.getAbsolutePath());
313: job.setStdIn("");
314: }
315:
316: /**
317: * Writes to a file on the submit host, that is passed to the rft-client
318: * as input. The rft-client is always run on the submit host, and hence
319: * can access the file.
320: *
321: * @param writer the writer to the stdin file.
322: * @param files Collection of <code>FileTransfer</code> objects containing
323: * the information about sourceam fin and destURL's.
324: *
325: */
326: protected void writeJumboStdIn(FileWriter writer, Collection files)
327: throws Exception {
328: //write out the fixed header
329: writer.write("#RFT input file generated by VDS\n");
330: writer.write("#true = binary false=ascii\n");
331: writer.write(mRFTProps.getProperty(BINARY_KEY, "true"));
332: writer.write("\n");
333: writer.write("#Block Size in Bytes\n");
334: writer.write(mRFTProps.getProperty(BLOCK_SIZE_KEY, "16000"));
335: writer.write("\n");
336: writer.write("#TCP Buffer Sizes in Bytes\n");
337: writer.write(mRFTProps
338: .getProperty(TCP_BUFFER_SIZE_KEY, "16000"));
339: writer.write("\n");
340: writer.write("#NO tpt (Third Party Transfer\n");
341: writer.write(mRFTProps.getProperty(NO_TPT_KEY, "false"));
342: writer.write("\n");
343: writer.write("#Number of parallel streams\n");
344: writer.write(mRFTProps.getProperty(STREAMS_KEY, "1"));
345: writer.write("\n");
346: writer.write("#Data Channel Authentication (DCAU)\n");
347: writer.write(mRFTProps.getProperty(DCAU_KEY, "true"));
348: writer.write("\n");
349: writer.write("#Concurrency of the request\n");
350: writer.write(mRFTProps.getProperty(PROCESSES_KEY, "1"));
351: writer.write("\n");
352: writer.write("#Grid Subject Name of Source Grid FTP Server\n");
353: writer.write("null");
354: writer.write("\n");
355: writer
356: .write("#Grid Subject Name of Destination Grid FTP Server\n");
357: writer.write("null");
358: writer.write("\n");
359: writer.write("#Transfer all or none of the transfers\n");
360: writer.write("true");
361: writer.write("\n");
362: writer.write("#Maximum number of retries\n");
363: writer.write(mRFTProps.getProperty(RETRY_KEY, "3"));
364: writer.write("\n");
365:
366: //iterating thru all the FileTransfers
367: writer.write("#Source and Destination URLS\n");
368: for (Iterator it = files.iterator(); it.hasNext();) {
369: FileTransfer ft = (FileTransfer) it.next();
370: //the FileTransfer object writes out in T2 compatible format
371: writer.write(ft.getSourceURL().getValue());
372: writer.write("\n");
373: writer.write(ft.getDestURL().getValue());
374: writer.write("\n");
375: }
376: writer.flush();
377: }
378:
379: /**
380: * Returns a textual description of the transfer implementation.
381: *
382: * @return a short textual description
383: */
384: public String getDescription() {
385: return this .DESCRIPTION;
386: }
387:
388: /**
389: * Retrieves the transformation catalog entry for the executable that is
390: * being used to transfer the files in the implementation.
391: *
392: * @param siteHandle the handle of the site where the transformation is
393: * to be searched.
394: *
395: * @return the transformation catalog entry if found, else null.
396: */
397: public TransformationCatalogEntry getTransformationCatalogEntry(
398: String siteHandle) {
399: List tcentries = null;
400: try {
401: //namespace and version are null for time being
402: tcentries = mTCHandle.getTCEntries(
403: this .TRANSFORMATION_NAMESPACE,
404: this .TRANSFORMATION_NAME,
405: this .TRANSFORMATION_VERSION, siteHandle,
406: TCType.INSTALLED);
407: } catch (Exception e) {
408: mLogger.log("Unable to retrieve entry from TC for "
409: + getCompleteTCName() + " :" + e.getMessage(),
410: LogManager.ERROR_MESSAGE_LEVEL);
411: }
412:
413: //see if any record is returned or not
414: return (tcentries == null) ? null
415: : (TransformationCatalogEntry) tcentries.get(0);
416: }
417:
418: /**
419: * Returns the environment profiles that are required for the default
420: * entry to sensibly work. Returns an empty list.
421: *
422: * @param site the site where the job is going to run.
423: *
424: * @return List of environment variables, else null in case where the
425: * required environment variables could not be found.
426: */
427: protected List getEnvironmentVariables(String site) {
428: return new ArrayList();
429: }
430:
431: /**
432: * Returns the namespace of the derivation that this implementation
433: * refers to.
434: *
435: * @return the namespace of the derivation.
436: */
437: protected String getDerivationNamespace() {
438: return this .DERIVATION_NAMESPACE;
439: }
440:
441: /**
442: * Returns the logical name of the derivation that this implementation
443: * refers to.
444: *
445: * @return the name of the derivation.
446: */
447: protected String getDerivationName() {
448: return this .DERIVATION_NAME;
449: }
450:
451: /**
452: * Returns the version of the derivation that this implementation
453: * refers to.
454: *
455: * @return the version of the derivation.
456: */
457: protected String getDerivationVersion() {
458: return this .DERIVATION_VERSION;
459: }
460:
461: /**
462: * Returns the complete name for the transformation.
463: *
464: * @return the complete name.
465: */
466: protected String getCompleteTCName() {
467: return Separator.combine(this.TRANSFORMATION_NAMESPACE,
468: this.TRANSFORMATION_NAME, this.TRANSFORMATION_VERSION);
469: }
470: }
|