001: /*
002: * This file or a portion of this file is licensed under the terms of
003: * the Globus Toolkit Public License, found in file GTPL, or at
004: * http://www.globus.org/toolkit/download/license.html. This notice must
005: * appear in redistributions of this file, with or without modification.
006: *
007: * Redistributions of this Software, with or without modification, must
008: * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
009: * some other similar material which is provided with the Software (if
010: * any).
011: *
012: * Copyright 1999-2004 University of Chicago and The University of
013: * Southern California. All rights reserved.
014: */
015: package org.griphyn.cPlanner.transfer.implementation;
016:
017: import org.griphyn.cPlanner.classes.TransferJob;
018: import org.griphyn.cPlanner.classes.Profile;
019: import org.griphyn.cPlanner.classes.PlannerOptions;
020: import org.griphyn.cPlanner.classes.FileTransfer;
021:
022: import org.griphyn.cPlanner.common.LogManager;
023: import org.griphyn.cPlanner.common.PegasusProperties;
024:
025: import org.griphyn.cPlanner.namespace.VDS;
026:
027: import org.griphyn.common.classes.TCType;
028:
029: import org.griphyn.common.catalog.TransformationCatalogEntry;
030:
031: import org.griphyn.common.util.Separator;
032:
033: import java.io.FileWriter;
034:
035: import java.util.Collection;
036: import java.util.Iterator;
037: import java.util.List;
038: import java.util.ArrayList;
039:
040: import java.io.File;
041:
042: /**
 * The implementation that creates transfer jobs referring to the T2
 * executable distributed with the VDS. T2 extends the multiple-transfer
 * capability provided by the transfer executable, adding conditional/optional
 * transfers and retries against alternative URLs when alternative sources or
 * destinations are specified.
047: * <p>
048: * The T2 client is generally invoked on the remote execution sites, unless the
049: * user uses the thirdparty transfer option, in which case the T2 is invoked on
050: * the submit host. Hence there should be an entry in the transformation catalog
051: * for logical transformation <code>T2</code> at the execution sites.
052: * T2 is distributed as part of the VDS worker package and can be found at
053: * $PEGASUS_HOME/bin/T2.
054: * <p>
055: * It leads to the creation of the setup chmod jobs to the workflow, that appear
056: * as parents to compute jobs in case the transfer implementation does not
057: * preserve the X bit on the file being transferred. This is required for
058: * staging of executables as part of the workflow. The setup jobs are only added
059: * as children to the stage in jobs.
060: *
061: * <p>
062: * In order to use the transfer implementation implemented by this class,
063: * <pre>
064: * - the property pegasus.transfer.*.impl must be set to value T2.
065: * </pre>
066: *
067: * <p>
068: * There should be an entry in the transformation catalog with the fully qualified
069: * name as <code>pegasus::T2</code> for all the sites where workflow is run,
070: * or on the local site in case of third party transfers.
071: *
072: * <p>
073: * The arguments with which the client is invoked can be specified
074: * <pre>
075: * - by specifying the property pegasus.transfer.arguments
076: * - associating the VDS profile key transfer.arguments
077: * </pre>
078:
079: *
080: * @author Karan Vahi
081: * @version $Revision: 145 $
082: */
083: public class T2 extends AbstractMultipleFTPerXFERJob {
084:
085: /**
086: * The transformation namespace for the transfer job.
087: */
088: public static final String TRANSFORMATION_NAMESPACE = "pegasus";
089:
090: /**
091: * The name of the underlying transformation that is queried for in the
092: * Transformation Catalog.
093: */
094: public static final String TRANSFORMATION_NAME = "T2";
095:
096: /**
097: * The version number for the transfer job.
098: */
099: public static final String TRANSFORMATION_VERSION = null;
100:
101: /**
102: * The derivation namespace for for the transfer job.
103: */
104: public static final String DERIVATION_NAMESPACE = "pegasus";
105:
106: /**
107: * The name of the underlying derivation.
108: */
109: public static final String DERIVATION_NAME = "T2";
110:
111: /**
112: * The derivation version number for the transfer job.
113: */
114: public static final String DERIVATION_VERSION = "1.0";
115:
116: /**
117: * A short description of the transfer implementation.
118: */
119: public static final String DESCRIPTION = "Pegasus T2";
120:
121: /**
122: * The number of g-u-c processes that are spawned to transfer the files in
123: * one transfer job.
124: */
125: protected String mNumOfTXProcesses;
126:
127: /**
128: * The number of streams that each g-u-c process opens to do the ftp transfer.
129: */
130: protected String mNumOfTXStreams;
131:
132: /**
133: * Whether to use force option for the transfer executable or not.
134: */
135: protected boolean mUseForce;
136:
137: /**
138: * The overloaded constructor, that is called by the Factory to load the
139: * class.
140: *
141: * @param properties the properties object.
142: * @param options the options passed to the Planner.
143: */
144: public T2(PegasusProperties properties, PlannerOptions options) {
145: super (properties, options);
146: mNumOfTXProcesses = mProps.getNumOfTransferProcesses();
147: mNumOfTXStreams = mProps.getNumOfTransferStreams();
148: mUseForce = mProps.useForceInTransfer();
149:
150: }
151:
152: /**
153: * Return a boolean indicating whether the transfers to be done always in
154: * a third party transfer mode. A value of false, results in the
155: * direct or peer to peer transfers being done.
156: * <p>
157: * A value of false does not preclude third party transfers. They still can
158: * be done, by setting the property "pegasus.transfer.*.thirdparty.sites".
159: *
160: * @return boolean indicating whether to always use third party transfers
161: * or not.
162: *
163: * @see PegasusProperties#getThirdPartySites(String)
164: */
165: public boolean useThirdPartyTransferAlways() {
166: return false;
167: }
168:
169: /**
170: * Returns a boolean indicating whether the transfer protocol being used by
171: * the implementation preserves the X Bit or not while staging.
172: *
173: * @return boolean
174: */
175: public boolean doesPreserveXBit() {
176: return false;
177: }
178:
179: /**
180: * Returns a textual description of the transfer implementation.
181: *
182: * @return a short textual description
183: */
184: public String getDescription() {
185: return this .DESCRIPTION;
186: }
187:
188: /**
189: * Retrieves the transformation catalog entry for the executable that is
190: * being used to transfer the files in the implementation.
191: *
192: * @param siteHandle the handle of the site where the transformation is
193: * to be searched.
194: *
195: * @return the transformation catalog entry if found, else null.
196: */
197: public TransformationCatalogEntry getTransformationCatalogEntry(
198: String siteHandle) {
199: List tcentries = null;
200: try {
201: //namespace and version are null for time being
202: tcentries = mTCHandle.getTCEntries(
203: this .TRANSFORMATION_NAMESPACE,
204: this .TRANSFORMATION_NAME,
205: this .TRANSFORMATION_VERSION, siteHandle,
206: TCType.INSTALLED);
207: } catch (Exception e) {
208: mLogger.log("Unable to retrieve entry from TC for "
209: + getCompleteTCName() + " Cause:" + e,
210: LogManager.DEBUG_MESSAGE_LEVEL);
211: }
212:
213: return (tcentries == null) ? this .defaultTCEntry(
214: this .TRANSFORMATION_NAMESPACE,
215: this .TRANSFORMATION_NAME, this .TRANSFORMATION_VERSION,
216: siteHandle) : //try using a default one
217: (TransformationCatalogEntry) tcentries.get(0);
218:
219: }
220:
221: /**
222: * Returns the namespace of the derivation that this implementation
223: * refers to.
224: *
225: * @return the namespace of the derivation.
226: */
227: protected String getDerivationNamespace() {
228: return this .DERIVATION_NAMESPACE;
229: }
230:
231: /**
232: * Returns the logical name of the derivation that this implementation
233: * refers to.
234: *
235: * @return the name of the derivation.
236: */
237: protected String getDerivationName() {
238: return this .DERIVATION_NAME;
239: }
240:
241: /**
242: * Returns the version of the derivation that this implementation
243: * refers to.
244: *
245: * @return the version of the derivation.
246: */
247: protected String getDerivationVersion() {
248: return this .DERIVATION_VERSION;
249: }
250:
251: /**
252: * It constructs the arguments to the transfer executable that need to be passed
253: * to the executable referred to in this transfer mode.
254: *
255: * @param job the object containing the transfer node.
256: * @return the argument string
257: */
258: protected String generateArgumentString(TransferJob job) {
259: StringBuffer sb = new StringBuffer();
260: if (job.vdsNS.containsKey(VDS.TRANSFER_ARGUMENTS_KEY)) {
261: sb.append(job.vdsNS.removeKey(VDS.TRANSFER_ARGUMENTS_KEY));
262: } else {
263: sb.append(" -P ").append(mNumOfTXProcesses).append(" -p ")
264: .append(mNumOfTXStreams);
265:
266: sb = (this .mUseForce) ? sb.append(" -f ") : sb;
267: }
268:
269: sb.append(" base-uri se-mount-point");
270:
271: return sb.toString();
272: }
273:
274: /**
275: * Writes to a FileWriter stream the stdin which T2 takes via standard input.
276: *
277: * @param writer the writer to the stdin file.
278: * @param files Collection of <code>FileTransfer</code> objects containing
279: * the information about sourceam fin and destURL's.
280: *
281: * @see org.griphyn.cPlanner.classes.FileTransfer#toString()
282: * @throws java.lang.Exception
283: */
284: protected void writeJumboStdIn(FileWriter writer, Collection files)
285: throws Exception {
286:
287: for (Iterator it = files.iterator(); it.hasNext();) {
288: FileTransfer ft = (FileTransfer) it.next();
289: //the FileTransfer object writes out in T2 compatible format
290: writer.write(ft.toString());
291: writer.write("\n");
292: writer.flush();
293: }
294: }
295:
296: /**
297: * Returns the complete name for the transformation.
298: *
299: * @return the complete name.
300: */
301: protected String getCompleteTCName() {
302: return Separator.combine(this .TRANSFORMATION_NAMESPACE,
303: this .TRANSFORMATION_NAME, this .TRANSFORMATION_VERSION);
304: }
305:
306: /**
307: * Returns the environment profiles that are required for the default
308: * entry to sensibly work.
309: *
310: * @param site the site where the job is going to run.
311: *
312: * @return List of environment variables, else null in case where the
313: * required environment variables could not be found.
314: */
315: protected List getEnvironmentVariables(String site) {
316: List result = new ArrayList(2);
317:
318: //create the CLASSPATH from home
319: String globus = mSCHandle.getEnvironmentVariable(site,
320: "GLOBUS_LOCATION");
321: if (globus == null) {
322: mLogger.log(
323: "GLOBUS_LOCATION not set in site catalog for site "
324: + site, LogManager.DEBUG_MESSAGE_LEVEL);
325: return null;
326: }
327:
328: //check for LD_LIBRARY_PATH
329: String ldpath = mSCHandle.getEnvironmentVariable(site,
330: "LD_LIBRARY_PATH");
331: if (ldpath == null) {
332: //construct a default LD_LIBRARY_PATH
333: ldpath = globus;
334: //remove trailing / if specified
335: ldpath = (ldpath.charAt(ldpath.length() - 1) == File.separatorChar) ? ldpath
336: .substring(0, ldpath.length() - 1)
337: : ldpath;
338:
339: ldpath = ldpath + File.separator + "lib";
340: mLogger.log(
341: "Constructed default LD_LIBRARY_PATH " + ldpath,
342: LogManager.DEBUG_MESSAGE_LEVEL);
343: }
344:
345: //we have both the environment variables
346: result.add(new Profile(Profile.ENV, "GLOBUS_LOCATION", globus));
347: result.add(new Profile(Profile.ENV, "LD_LIBRARY_PATH", ldpath));
348:
349: return result;
350: }
351:
352: }
|