001: /*
002: * This file or a portion of this file is licensed under the terms of
003: * the Globus Toolkit Public License, found in file GTPL, or at
004: * http://www.globus.org/toolkit/download/license.html. This notice must
005: * appear in redistributions of this file, with or without modification.
006: *
007: * Redistributions of this Software, with or without modification, must
008: * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
009: * some other similar material which is provided with the Software (if
010: * any).
011: *
012: * Copyright 1999-2004 University of Chicago and The University of
013: * Southern California. All rights reserved.
014: */
015:
016: package org.griphyn.cPlanner.transfer.implementation;
017:
018: import org.griphyn.cPlanner.classes.TransferJob;
019: import org.griphyn.cPlanner.classes.FileTransfer;
020: import org.griphyn.cPlanner.classes.PlannerOptions;
021: import org.griphyn.cPlanner.classes.NameValue;
022:
023: import org.griphyn.cPlanner.common.LogManager;
024: import org.griphyn.cPlanner.common.PegasusProperties;
025:
026: import org.griphyn.cPlanner.namespace.VDS;
027:
028: import org.griphyn.common.util.Separator;
029:
030: import org.griphyn.common.catalog.TransformationCatalogEntry;
031:
032: import org.griphyn.common.classes.TCType;
033:
034: import java.util.List;
035: import java.util.Collection;
036: import java.util.Iterator;
037:
038: import java.io.File;
039: import java.io.FileWriter;
040: import java.util.ArrayList;
041:
042: /**
043: * The implementation that is used to create transfer jobs that callout to
044: * the SRMCP client, that can transfer files to and from a SRM server.
045: *
046: * <p>
047: * In order to use the transfer implementation implemented by this class,
048: * <pre>
049: * - the property pegasus.transfer.*.impl must be set to value SRM.
050: * </pre>
051: *
052: * <p>
053: * There should be an entry in the transformation catalog with the fully qualified
054: * name as <code>SRM::srmcp</code> for all the sites where workflow is run,
055: * or on the local site in case of third party transfers.
056: *
057: * <p>
058: * The arguments with which the client is invoked can be specified
059: * <pre>
060: * - by specifying the property pegasus.transfer.arguments
061: * - associating the VDS profile key transfer.arguments
062: * </pre>
063: *
064: * @author Karan Vahi
065: * @version $Revision: 145 $
066: */
067: public class SRM extends AbstractMultipleFTPerXFERJob {
068:
069: /**
070: * The transformation namespace for the transfer job.
071: */
072: public static final String TRANSFORMATION_NAMESPACE = "srm";
073:
074: /**
075: * The name of the underlying transformation that is queried for in the
076: * Transformation Catalog.
077: */
078: public static final String TRANSFORMATION_NAME = "srmcp";
079:
080: /**
081: * The version number for the transfer job.
082: */
083: public static final String TRANSFORMATION_VERSION = null;
084:
085: /**
086: * The derivation namespace for for the transfer job.
087: */
088: public static final String DERIVATION_NAMESPACE = "srm";
089:
090: /**
091: * The name of the underlying derivation.
092: */
093: public static final String DERIVATION_NAME = "srmcp";
094:
095: /**
096: * The derivation version number for the transfer job.
097: */
098: public static final String DERIVATION_VERSION = "1.20";
099:
100: /**
101: * A short description of the transfer implementation.
102: */
103: public static final String DESCRIPTION = "SRM Client that talks to SRM server and does only one "
104: + "transfer per invocation";
105:
106: /**
107: * The old file URL scheme that needs to be replaced.
108: */
109: private static final String OLD_FILE_URL_SCHEME = "file:///";
110:
111: /**
112: * The new file URL scheme that replaces the old one.
113: */
114: private static final String NEW_FILE_URL_SCHEME = "file:////";
115:
116: /**
117: * The overloaded constructor, that is called by the Factory to load the
118: * class.
119: *
120: * @param properties the properties object.
121: * @param options the options passed to the Planner.
122: */
123: public SRM(PegasusProperties properties, PlannerOptions options) {
124: super (properties, options);
125: }
126:
127: /**
128: * Writes to a FileWriter stream the input for the SRM job.
129: *
130: * @param writer the writer to the stdin file.
131: * @param files Collection of <code>FileTransfer</code> objects containing
132: * the information about sourceam fin and destURL's.
133: *
134: *
135: * @throws Exception
136: */
137: protected void writeJumboStdIn(FileWriter writer, Collection files)
138: throws Exception {
139: for (Iterator it = files.iterator(); it.hasNext();) {
140: FileTransfer ft = (FileTransfer) it.next();
141: NameValue source = ft.getSourceURL();
142: NameValue dest = ft.getDestURL();
143: writer.write(sanitizeURL(source.getValue()));
144: writer.write(" ");
145: writer.write(sanitizeURL(dest.getValue()));
146: writer.write("\n");
147: writer.flush();
148: }
149:
150: }
151:
152: /**
153: * Constructs the arguments to the transfer executable that need to be
154: * passed to the executable referred to in this transfer mode. The STDIN
155: * of the job is passed as an argument.
156: *
157: * @param job the object containing the transfer node.
158: *
159: * @return the argument string
160: */
161: protected String generateArgumentString(TransferJob job) {
162: StringBuffer sb = new StringBuffer();
163:
164: if (job.vdsNS.containsKey(VDS.TRANSFER_ARGUMENTS_KEY)) {
165: sb.append(job.vdsNS.removeKey(VDS.TRANSFER_ARGUMENTS_KEY));
166: }
167:
168: sb.append(" -copyjobfile ").append(job.getStdIn());
169:
170: return sb.toString();
171: }
172:
173: /**
174: * Makes sure the stdin is transferred by the Condor File Transfer
175: * Mechanism. In addition, the stdin is set to null, after the file has
176: * been marked for transfer by Condor File Transfer Mechanism.
177: *
178: * @job the <code>TransferJob</code> that has been created.
179: */
180: public void postProcess(TransferJob job) {
181: File f = new File(mPOptions.getSubmitDirectory(), job
182: .getStdIn());
183: //add condor key transfer_input_files to transfer the file
184: job.condorVariables.addIPFileForTransfer(f.getAbsolutePath());
185: job.setStdIn("");
186: }
187:
188: /**
189: * Returns a boolean indicating whether the transfer protocol being used by
190: * the implementation preserves the X Bit or not while staging.
191: *
192: * @return boolean
193: */
194: public boolean doesPreserveXBit() {
195: return false;
196: }
197:
198: /**
199: * Return a boolean indicating whether the transfers to be done always in
200: * a third party transfer mode. A value of false, results in the
201: * direct or peer to peer transfers being done.
202: * <p>
203: * A value of false does not preclude third party transfers. They still can
204: * be done, by setting the property "pegasus.transfer.*.thirdparty.sites".
205: *
206: * @return boolean indicating whether to always use third party transfers
207: * or not.
208: *
209: * @see PegasusProperties#getThirdPartySites(String)
210: */
211: public boolean useThirdPartyTransferAlways() {
212: return false;
213: }
214:
215: /**
216: * Returns a textual description of the transfer implementation.
217: *
218: * @return a short textual description
219: */
220: public String getDescription() {
221: return this .DESCRIPTION;
222: }
223:
224: /**
225: * Retrieves the transformation catalog entry for the executable that is
226: * being used to transfer the files in the implementation.
227: *
228: * @param siteHandle the handle of the site where the transformation is
229: * to be searched.
230: *
231: * @return the transformation catalog entry if found, else null.
232: */
233: public TransformationCatalogEntry getTransformationCatalogEntry(
234: String siteHandle) {
235: List tcentries = null;
236: try {
237: //namespace and version are null for time being
238: tcentries = mTCHandle.getTCEntries(
239: this .TRANSFORMATION_NAMESPACE,
240: this .TRANSFORMATION_NAME,
241: this .TRANSFORMATION_VERSION, siteHandle,
242: TCType.INSTALLED);
243: } catch (Exception e) {
244: mLogger.log("Unable to retrieve entry from TC for "
245: + getCompleteTCName() + " :" + e.getMessage(),
246: LogManager.ERROR_MESSAGE_LEVEL);
247: }
248:
249: //see if any record is returned or not
250: return (tcentries == null) ? null
251: : (TransformationCatalogEntry) tcentries.get(0);
252: }
253:
254: /**
255: * Returns the environment profiles that are required for the default
256: * entry to sensibly work. Returns an empty list.
257: *
258: * @param site the site where the job is going to run.
259: *
260: * @return List of environment variables, else null in case where the
261: * required environment variables could not be found.
262: */
263: protected List getEnvironmentVariables(String site) {
264: return new ArrayList();
265: }
266:
267: /**
268: * Returns the complete name for the transformation.
269: *
270: * @return the complete name.
271: */
272: protected String getCompleteTCName() {
273: return Separator.combine(this .TRANSFORMATION_NAMESPACE,
274: this .TRANSFORMATION_NAME, this .TRANSFORMATION_VERSION);
275: }
276:
277: /**
278: * Returns the namespace of the derivation that this implementation
279: * refers to.
280: *
281: * @return the namespace of the derivation.
282: */
283: protected String getDerivationNamespace() {
284: return this .DERIVATION_NAMESPACE;
285: }
286:
287: /**
288: * Returns the logical name of the derivation that this implementation
289: * refers to.
290: *
291: * @return the name of the derivation.
292: */
293: protected String getDerivationName() {
294: return this .DERIVATION_NAME;
295: }
296:
297: /**
298: * Returns the version of the derivation that this implementation
299: * refers to.
300: *
301: * @return the version of the derivation.
302: */
303: protected String getDerivationVersion() {
304: return this .DERIVATION_VERSION;
305: }
306:
307: /**
308: * Method that sanitizes file URL's to match with the SRM client.
309: * It replaces file:/// with file://// in the URL.
310: *
311: * @param url the URL to be santized.
312: *
313: * @return the sanitized URL if it is a file URL, else the URL that is
314: * passed.
315: */
316: private String sanitizeURL(String url) {
317: if (url.startsWith(this .OLD_FILE_URL_SCHEME)) {
318: //check if there is already a /
319: if (url.length() > 8) {
320: char c = url.charAt(8);
321: if (c != '/') {//should actually be File.separator
322: //insert the 4th /
323: return this .NEW_FILE_URL_SCHEME + url.substring(8);
324: }
325: }
326: }
327: return url;
328: }
329: }
|