001: /*
002: * This file or a portion of this file is licensed under the terms of
003: * the Globus Toolkit Public License, found in file GTPL, or at
004: * http://www.globus.org/toolkit/download/license.html. This notice must
005: * appear in redistributions of this file, with or without modification.
006: *
007: * Redistributions of this Software, with or without modification, must
008: * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
009: * some other similar material which is provided with the Software (if
010: * any).
011: *
012: * Copyright 1999-2004 University of Chicago and The University of
013: * Southern California. All rights reserved.
014: */
015: package org.griphyn.cPlanner.transfer.implementation;
016:
017: import org.griphyn.cPlanner.classes.TransferJob;
018: import org.griphyn.cPlanner.classes.NameValue;
019: import org.griphyn.cPlanner.classes.PlannerOptions;
020: import org.griphyn.cPlanner.classes.FileTransfer;
021: import org.griphyn.cPlanner.classes.Profile;
022:
023: import org.griphyn.cPlanner.common.LogManager;
024: import org.griphyn.cPlanner.common.PegasusProperties;
025:
026: import org.griphyn.cPlanner.namespace.VDS;
027:
028: import org.griphyn.common.classes.TCType;
029:
030: import org.griphyn.common.catalog.TransformationCatalogEntry;
031:
032: import org.griphyn.common.util.Separator;
033:
034: import java.io.FileWriter;
035:
036: import java.util.Collection;
037: import java.util.Iterator;
038: import java.util.List;
039: import java.util.ArrayList;
040:
041: import java.util.StringTokenizer;
042: import java.io.File;
043:
044: /**
045: * The implementation that creates transfer jobs referring to the transfer
046: * executable distributed with the VDS.
047: *
048: * <p>
049: * The Transfer client is generally invoked on the remote execution sites, unless
050: * the user uses the thirdparty transfer option, in which case the transfer is
051: * invoked on the submit host. Hence there should be an entry in the transformation
052: * catalog for logical transformation <code>Transfer</code> at the execution sites.
053: * Transfer is distributed as part of the VDS worker package and can be found at
054: * $PEGASUS_HOME/bin/transfer.
055: * <p>
056: * It leads to the creation of the setup chmod jobs to the workflow, that appear
057: * as parents to compute jobs in case the transfer implementation does not
058: * preserve the X bit on the file being transferred. This is required for
059: * staging of executables as part of the workflow. The setup jobs are only added
060: * as children to the stage in jobs.
061: * <p>
062: * In order to use the transfer implementation implemented by this class, the
063: * property <code>pegasus.transfer.*.impl</code> must be set to
064: * value <code>Transfer</code>.
065: *
066: * @author Karan Vahi
067: * @version $Revision: 362 $
068: */
069: public class Transfer extends AbstractMultipleFTPerXFERJob {
070:
071: /**
072: * The transformation namespace for the transfer job.
073: */
074: public static final String TRANSFORMATION_NAMESPACE = "pegasus";
075:
076: /**
077: * The name of the underlying transformation that is queried for in the
078: * Transformation Catalog.
079: */
080: public static final String TRANSFORMATION_NAME = "transfer";
081:
082: /**
083: * The version number for the transfer job.
084: */
085: public static final String TRANSFORMATION_VERSION = null;
086:
087: /**
088: * The derivation namespace for for the transfer job.
089: */
090: public static final String DERIVATION_NAMESPACE = "pegasus";
091:
092: /**
093: * The name of the underlying derivation.
094: */
095: public static final String DERIVATION_NAME = "transfer";
096:
097: /**
098: * The derivation version number for the transfer job.
099: */
100: public static final String DERIVATION_VERSION = "1.0";
101:
102: /**
103: * A short description of the transfer implementation.
104: */
105: public static final String DESCRIPTION = "Pegasus Transfer Wrapper around GUC";
106:
107: /**
108: * The number of g-u-c processes that are spawned to transfer the files in
109: * one transfer job.
110: */
111: protected String mNumOfTXProcesses;
112:
113: /**
114: * The number of streams that each g-u-c process opens to do the ftp transfer.
115: */
116: protected String mNumOfTXStreams;
117:
118: /**
119: * Whether to use force option for the transfer executable or not.
120: */
121: protected boolean mUseForce;
122:
123: /**
124: * The overloaded constructor, that is called by the Factory to load the
125: * class.
126: *
127: * @param properties the properties object.
128: * @param options the options passed to the Planner.
129: */
130: public Transfer(PegasusProperties properties, PlannerOptions options) {
131: super (properties, options);
132: mNumOfTXProcesses = mProps.getNumOfTransferProcesses();
133: mNumOfTXStreams = mProps.getNumOfTransferStreams();
134: mUseForce = mProps.useForceInTransfer();
135:
136: }
137:
138: /**
139: * Return a boolean indicating whether the transfers to be done always in
140: * a third party transfer mode. A value of false, results in the
141: * direct or peer to peer transfers being done.
142: * <p>
143: * A value of false does not preclude third party transfers. They still can
144: * be done, by setting the property "pegasus.transfer.*.thirdparty.sites".
145: *
146: * @return boolean indicating whether to always use third party transfers
147: * or not.
148: *
149: * @see PegasusProperties#getThirdPartySites(String)
150: */
151: public boolean useThirdPartyTransferAlways() {
152: return false;
153: }
154:
155: /**
156: * Returns a boolean indicating whether the transfer protocol being used by
157: * the implementation preserves the X Bit or not while staging.
158: *
159: * @return boolean
160: */
161: public boolean doesPreserveXBit() {
162: return false;
163: }
164:
165: /**
166: * Returns a textual description of the transfer implementation.
167: *
168: * @return a short textual description
169: */
170: public String getDescription() {
171: return this .DESCRIPTION;
172: }
173:
174: /**
175: * Retrieves the transformation catalog entry for the executable that is
176: * being used to transfer the files in the implementation.
177: *
178: * @param siteHandle the handle of the site where the transformation is
179: * to be searched.
180: *
181: * @return the transformation catalog entry if found, else null.
182: */
183: public TransformationCatalogEntry getTransformationCatalogEntry(
184: String siteHandle) {
185: List tcentries = null;
186: try {
187: //namespace and version are null for time being
188: tcentries = mTCHandle.getTCEntries(
189: this .TRANSFORMATION_NAMESPACE,
190: this .TRANSFORMATION_NAME,
191: this .TRANSFORMATION_VERSION, siteHandle,
192: TCType.INSTALLED);
193: } catch (Exception e) {
194: mLogger.log("Unable to retrieve entry from TC for "
195: + getCompleteTCName() + " Cause:" + e,
196: LogManager.DEBUG_MESSAGE_LEVEL);
197: }
198:
199: return (tcentries == null) ? this .defaultTCEntry(
200: this .TRANSFORMATION_NAMESPACE,
201: this .TRANSFORMATION_NAME, this .TRANSFORMATION_VERSION,
202: siteHandle) : //try using a default one
203: (TransformationCatalogEntry) tcentries.get(0);
204:
205: }
206:
207: /**
208: * Returns the environment profiles that are required for the default
209: * entry to sensibly work.
210: *
211: * @param site the site where the job is going to run.
212: *
213: * @return List of environment variables, else null in case where the
214: * required environment variables could not be found.
215: */
216: protected List getEnvironmentVariables(String site) {
217: List result = new ArrayList(2);
218:
219: //create the CLASSPATH from home
220: String globus = mSCHandle.getEnvironmentVariable(site,
221: "GLOBUS_LOCATION");
222: if (globus == null) {
223: mLogger.log(
224: "GLOBUS_LOCATION not set in site catalog for site "
225: + site, LogManager.DEBUG_MESSAGE_LEVEL);
226: return null;
227: }
228:
229: //check for LD_LIBRARY_PATH
230: String ldpath = mSCHandle.getEnvironmentVariable(site,
231: "LD_LIBRARY_PATH");
232: if (ldpath == null) {
233: //construct a default LD_LIBRARY_PATH
234: ldpath = globus;
235: //remove trailing / if specified
236: ldpath = (ldpath.charAt(ldpath.length() - 1) == File.separatorChar) ? ldpath
237: .substring(0, ldpath.length() - 1)
238: : ldpath;
239:
240: ldpath = ldpath + File.separator + "lib";
241: mLogger.log(
242: "Constructed default LD_LIBRARY_PATH " + ldpath,
243: LogManager.DEBUG_MESSAGE_LEVEL);
244: }
245:
246: //we have both the environment variables
247: result.add(new Profile(Profile.ENV, "GLOBUS_LOCATION", globus));
248: result.add(new Profile(Profile.ENV, "LD_LIBRARY_PATH", ldpath));
249:
250: return result;
251: }
252:
253: /**
254: * Returns the namespace of the derivation that this implementation
255: * refers to.
256: *
257: * @return the namespace of the derivation.
258: */
259: protected String getDerivationNamespace() {
260: return this .DERIVATION_NAMESPACE;
261: }
262:
263: /**
264: * Returns the logical name of the derivation that this implementation
265: * refers to.
266: *
267: * @return the name of the derivation.
268: */
269: protected String getDerivationName() {
270: return this .DERIVATION_NAME;
271: }
272:
273: /**
274: * Returns the version of the derivation that this implementation
275: * refers to.
276: *
277: * @return the version of the derivation.
278: */
279: protected String getDerivationVersion() {
280: return this .DERIVATION_VERSION;
281: }
282:
283: /**
284: * It constructs the arguments to the transfer executable that need to be passed
285: * to the executable referred to in this transfer mode.
286: *
287: * @param job the object containing the transfer node.
288: * @return the argument string
289: */
290: protected String generateArgumentString(TransferJob job) {
291: StringBuffer sb = new StringBuffer();
292: if (job.vdsNS.containsKey(VDS.TRANSFER_ARGUMENTS_KEY)) {
293: sb.append(job.vdsNS.removeKey(VDS.TRANSFER_ARGUMENTS_KEY));
294: } else {
295: sb.append(" -P ").append(mNumOfTXProcesses).append(" -p ")
296: .append(mNumOfTXStreams);
297:
298: sb = (this .mUseForce) ? sb.append(" -f ") : sb;
299: }
300:
301: sb.append(" base-uri se-mount-point");
302:
303: return sb.toString();
304: }
305:
306: /**
307: * Writes to a FileWriter stream the stdin which goes into the magic script
308: * via standard input
309: *
310: * @param writer the writer to the stdin file.
311: * @param files Collection of <code>FileTransfer</code> objects containing
312: * the information about sourceam fin and destURL's.
313: *
314: *
315: * @throws Exception
316: */
317: protected void writeJumboStdIn(FileWriter writer, Collection files)
318: throws Exception {
319: for (Iterator it = files.iterator(); it.hasNext();) {
320: FileTransfer ft = (FileTransfer) it.next();
321: NameValue source = ft.getSourceURL();
322: //we want to leverage multiple dests if possible
323: NameValue dest = ft.getDestURL(true);
324: writer.write("#" + source.getKey() + "\n"
325: + source.getValue() + "\n#" + dest.getKey() + "\n"
326: + dest.getValue() + "\n");
327: writer.flush();
328: }
329:
330: }
331:
332: /**
333: * Returns the complete name for the transformation.
334: *
335: * @return the complete name.
336: */
337: protected String getCompleteTCName() {
338: return Separator.combine(this.TRANSFORMATION_NAMESPACE,
339: this.TRANSFORMATION_NAME, this.TRANSFORMATION_VERSION);
340: }
341: }
|