001: /**
002: * This file or a portion of this file is licensed under the terms of
003: * the Globus Toolkit Public License, found in file GTPL, or at
004: * http://www.globus.org/toolkit/download/license.html. This notice must
005: * appear in redistributions of this file, with or without modification.
006: *
007: * Redistributions of this Software, with or without modification, must
008: * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
009: * some other similar material which is provided with the Software (if
010: * any).
011: *
012: * Copyright 1999-2004 University of Chicago and The University of
013: * Southern California. All rights reserved.
 */
package org.griphyn.cPlanner.transfer.sls;
015:
016: import org.griphyn.cPlanner.classes.PegasusBag;
017: import org.griphyn.cPlanner.classes.FileTransfer;
018: import org.griphyn.cPlanner.classes.SubInfo;
019: import org.griphyn.cPlanner.classes.PegasusFile;
020: import org.griphyn.cPlanner.namespace.ENV;
021:
022: import org.griphyn.cPlanner.common.PegasusProperties;
023: import org.griphyn.cPlanner.common.LogManager;
024:
025: import org.griphyn.cPlanner.code.GridStart;
026:
027: import org.griphyn.cPlanner.transfer.SLS;
028:
029: import org.griphyn.common.catalog.TransformationCatalog;
030: import org.griphyn.common.catalog.TransformationCatalogEntry;
031:
032: import org.griphyn.cPlanner.poolinfo.PoolInfoProvider;
033:
034: import java.io.File;
035: import java.io.IOException;
036: import java.io.FileWriter;
037:
038: import java.util.Iterator;
039: import java.util.Set;
040: import org.griphyn.cPlanner.classes.Profile;
041: import java.util.List;
042: import org.griphyn.common.classes.TCType;
043: import org.griphyn.common.util.Separator;
044: import java.util.ArrayList;
045:
046: /**
047: * This uses the transfer executable distributed with Pegasus to do the
048: * second level staging.
049: *
050: * @author Karan Vahi
051: * @version $Revision: 414 $
052: */
053: public class Transfer implements SLS {
054:
055: /**
056: * The transformation namespace for the transfer job.
057: */
058: public static final String TRANSFORMATION_NAMESPACE = "pegasus";
059:
060: /**
061: * The name of the underlying transformation that is queried for in the
062: * Transformation Catalog.
063: */
064: public static final String TRANSFORMATION_NAME = "transfer";
065:
066: /**
067: * The version number for the transfer job.
068: */
069: public static final String TRANSFORMATION_VERSION = null;
070:
071: /**
072: * The derivation namespace for for the transfer job.
073: */
074: public static final String DERIVATION_NAMESPACE = "pegasus";
075:
076: /**
077: * The name of the underlying derivation.
078: */
079: public static final String DERIVATION_NAME = "transfer";
080:
081: /**
082: * The derivation version number for the transfer job.
083: */
084: public static final String DERIVATION_VERSION = "1.0";
085:
086: /**
087: * A short description of the transfer implementation.
088: */
089: public static final String DESCRIPTION = "Pegasus Transfer Wrapper around GUC";
090:
091: /**
092: * The handle to the site catalog.
093: */
094: protected PoolInfoProvider mSiteHandle;
095:
096: /**
097: * The handle to the transformation catalog.
098: */
099: protected TransformationCatalog mTCHandle;
100:
101: /**
102: * The handle to the properties.
103: */
104: protected PegasusProperties mProps;
105:
106: /**
107: * The handle to the logging manager.
108: */
109: protected LogManager mLogger;
110:
111: /**
112: * The path to local user proxy.
113: */
114: protected String mLocalUserProxy;
115:
116: /**
117: * The basename of the proxy
118: */
119: protected String mLocalUserProxyBasename;
120:
121: /**
122: * The local url prefix for the submit host.
123: */
124: protected String mLocalURLPrefix;
125:
126: /**
127: * The default constructor.
128: */
129: public Transfer() {
130: }
131:
132: /**
133: * Initializes the SLS implementation.
134: *
135: * @param bag the bag of objects. Contains access to catalogs etc.
136: */
137: public void initialize(PegasusBag bag) {
138: mProps = bag.getPegasusProperties();
139: mLogger = bag.getLogger();
140: mSiteHandle = bag.getHandleToSiteCatalog();
141: mTCHandle = bag.getHandleToTransformationCatalog();
142: mLocalUserProxy = getPathToUserProxy();
143: mLocalUserProxyBasename = (mLocalUserProxy == null) ? null
144: : new File(mLocalUserProxy).getName();
145:
146: mLocalURLPrefix = mSiteHandle.getURLPrefix("local");
147: }
148:
149: /**
150: * Returns a boolean whether the SLS implementation does a condor based
151: * modification or not. By condor based modification we mean whether it
152: * uses condor specific classads to achieve the second level staging or not.
153: *
154: * @return false
155: */
156: public boolean doesCondorModifications() {
157: return false;
158: }
159:
160: /**
161: * Constructs a command line invocation for a job, with a given sls file.
162: * The SLS maybe null. In the case where SLS impl does not read from a file,
163: * it is advised to create a file in generateSLSXXX methods, and then read
164: * the file in this function and put it on the command line.
165: *
166: * @param job the job that is being sls enabled
167: * @param slsFile the slsFile can be null
168: *
169: * @return invocation string
170: */
171: public String invocationString(SubInfo job, File slsFile) {
172: //sanity check
173: if (slsFile == null) {
174: return null;
175: }
176:
177: StringBuffer invocation = new StringBuffer();
178:
179: TransformationCatalogEntry entry = this
180: .getTransformationCatalogEntry(job.getSiteHandle());
181: if (entry == null) {
182: //cannot create an invocation
183: return null;
184:
185: }
186:
187: //we need to set the x bit on proxy correctly first
188: if (mLocalUserProxyBasename != null) {
189: invocation.append("/bin/bash -c \"chmod 600 ").append(
190: slsFile.getParent()).append(File.separator).append(
191: mLocalUserProxyBasename).append(" && ");
192: }
193: invocation.append(entry.getPhysicalTransformation()).append(
194: " base mnt ").append(slsFile.getAbsolutePath());
195:
196: if (mLocalUserProxyBasename != null) {
197: invocation.append("\"");
198: }
199:
200: return invocation.toString();
201:
202: }
203:
204: /**
205: * Returns a boolean indicating whether it will an input file for a job
206: * to do the transfers. Transfer reads from stdin the file transfers that
207: * it needs to do. Always returns true, as we need to transfer the proxy
208: * always.
209: *
210: * @param job the job being detected.
211: *
212: * @return true
213: */
214: public boolean needsSLSInput(SubInfo job) {
215: return true;
216: }
217:
218: /**
219: * Returns a boolean indicating whether it will an output file for a job
220: * to do the transfers. Transfer reads from stdin the file transfers that
221: * it needs to do.
222: *
223: * @param job the job being detected.
224: *
225: * @return true
226: */
227: public boolean needsSLSOutput(SubInfo job) {
228: Set files = job.getOutputFiles();
229: return !(files == null || files.isEmpty());
230: }
231:
232: /**
233: * Returns the LFN of sls input file.
234: *
235: * @param job SubInfo
236: *
237: * @return the name of the sls input file.
238: */
239: public String getSLSInputLFN(SubInfo job) {
240: StringBuffer lfn = new StringBuffer();
241: lfn.append("sls_").append(job.getName()).append(".in");
242: return lfn.toString();
243: }
244:
245: /**
246: * Returns the LFN of sls output file.
247: *
248: * @param job SubInfo
249: *
250: * @return the name of the sls input file.
251: */
252: public String getSLSOutputLFN(SubInfo job) {
253: StringBuffer lfn = new StringBuffer();
254: lfn.append("sls_").append(job.getName()).append(".out");
255: return lfn.toString();
256: }
257:
258: /**
259: * Generates a second level staging file of the input files to the worker
260: * node directory.
261: *
262: * @param job job for which the file is being created
263: * @param fileName name of the file that needs to be written out.
264: * @param submitDir submit directory where it has to be written out.
265: * @param headNodeDirectory directory on the head node of the compute site.
266: * @param workerNodeDirectory worker node directory
267: *
268: * @return the full path to lof file created, else null if no file is
269: * written out.
270: */
271: public File generateSLSInputFile(SubInfo job, String fileName,
272: String submitDir, String headNodeDirectory,
273: String workerNodeDirectory) {
274:
275: //sanity check
276: if (!needsSLSInput(job)) {
277: mLogger.log("Not Writing out a SLS input file for job "
278: + job.getName(), LogManager.DEBUG_MESSAGE_LEVEL);
279: return null;
280: }
281:
282: Set files = job.getInputFiles();
283: File sls = null;
284:
285: //figure out the remote site's headnode gridftp server
286: //and the working directory on it.
287: //the below should be cached somehow
288: String sourceURLPrefix = mSiteHandle.getURLPrefix(job
289: .getSiteHandle());
290: //String sourceDir = mSiteHandle.getExecPoolWorkDir( job );
291: String sourceDir = headNodeDirectory;
292: String destDir = workerNodeDirectory;
293:
294: //writing the stdin file
295: try {
296: sls = new File(submitDir, fileName);
297: FileWriter input = new FileWriter(sls);
298: PegasusFile pf;
299:
300: //To do. distinguish the sls file from the other input files
301: for (Iterator it = files.iterator(); it.hasNext();) {
302: pf = (PegasusFile) it.next();
303:
304: if (pf.getLFN().equals(ENV.X509_USER_PROXY_KEY)) {
305: //ignore the proxy file for time being
306: //as we picking it from the head node directory
307: continue;
308: }
309:
310: input.write(sourceURLPrefix);
311: input.write(File.separator);
312: input.write(sourceDir);
313: input.write(File.separator);
314: input.write(pf.getLFN());
315: input.write("\n");
316:
317: //destination
318: input.write("file://");
319: input.write(destDir);
320: input.write(File.separator);
321: input.write(pf.getLFN());
322: input.write("\n");
323:
324: }
325: //close the stream
326: input.close();
327:
328: } catch (IOException e) {
329: mLogger.log("Unable to write the sls file for job "
330: + job.getName(), e, LogManager.ERROR_MESSAGE_LEVEL);
331: }
332: return sls;
333: }
334:
335: /**
336: * Generates a second level staging file of the input files to the worker
337: * node directory.
338: *
339: * @param job the job for which the file is being created
340: * @param fileName the name of the file that needs to be written out.
341: * @param submitDir the submit directory where it has to be written out.
342: * @param headNodeDirectory the directory on the head node of the
343: * compute site.
344: * @param workerNodeDirectory the worker node directory
345: *
346: * @return the full path to lof file created, else null if no file is
347: * written out.
348: *
349: */
350: public File generateSLSOutputFile(SubInfo job, String fileName,
351: String submitDir, String headNodeDirectory,
352: String workerNodeDirectory) {
353:
354: //sanity check
355: if (!needsSLSOutput(job)) {
356: mLogger.log("Not Writing out a SLS output file for job "
357: + job.getName(), LogManager.DEBUG_MESSAGE_LEVEL);
358: return null;
359: }
360:
361: File sls = null;
362: Set files = job.getOutputFiles();
363:
364: //figure out the remote site's headnode gridftp server
365: //and the working directory on it.
366: //the below should be cached somehow
367: String destURLPrefix = mSiteHandle.getURLPrefix(job
368: .getSiteHandle());
369: //String sourceDir = mSiteHandle.getExecPoolWorkDir( job );
370: String destDir = headNodeDirectory;
371: String sourceDir = workerNodeDirectory;
372:
373: //writing the stdin file
374: try {
375: StringBuffer name = new StringBuffer();
376: name.append("sls_").append(job.getName()).append(".out");
377: sls = new File(submitDir, name.toString());
378: FileWriter input = new FileWriter(sls);
379: PegasusFile pf;
380:
381: //To do. distinguish the sls file from the other input files
382: for (Iterator it = files.iterator(); it.hasNext();) {
383: pf = (PegasusFile) it.next();
384:
385: //source
386: input.write("file://");
387: input.write(sourceDir);
388: input.write(File.separator);
389: input.write(pf.getLFN());
390: input.write("\n");
391:
392: //destination
393: input.write(destURLPrefix);
394: input.write(File.separator);
395: input.write(destDir);
396: input.write(File.separator);
397: input.write(pf.getLFN());
398: input.write("\n");
399:
400: }
401: //close the stream
402: input.close();
403:
404: } catch (IOException e) {
405: mLogger.log("Unable to write the sls output file for job "
406: + job.getName(), e, LogManager.ERROR_MESSAGE_LEVEL);
407: }
408:
409: return sls;
410:
411: }
412:
413: /**
414: * Modifies a job for the first level staging to headnode.This is to add
415: * any files that needs to be staged to the head node for a job specific
416: * to the SLS implementation. If any file needs to be added, a <code>FileTransfer</code>
417: * object should be created and added as an input or an output file.
418: *
419: *
420: * @param job the job
421: * @param submitDir the submit directory
422: * @param slsInputLFN the sls input file if required, that is used for
423: * staging in from the head node to worker node directory.
424: * @param slsOutputLFN the sls output file if required, that is used
425: * for staging in from the head node to worker node directory.
426: * @return boolean
427: */
428: public boolean modifyJobForFirstLevelStaging(SubInfo job,
429: String submitDir, String slsInputLFN, String slsOutputLFN) {
430:
431: String separator = File.separator;
432:
433: //incorporate the sls input file if required
434: if (slsInputLFN != null) {
435:
436: FileTransfer ft = new FileTransfer(slsInputLFN, job
437: .getName());
438:
439: //the source sls is to be sent across from the local site
440: //using the grid ftp server at local site.
441: StringBuffer sourceURL = new StringBuffer();
442: sourceURL.append(mLocalURLPrefix).append(separator).append(
443: submitDir).append(separator).append(slsInputLFN);
444: ft.addSource("local", sourceURL.toString());
445:
446: //the destination URL is the working directory on the filesystem
447: //on the head node where the job is to be run.
448: StringBuffer destURL = new StringBuffer();
449: destURL.append(
450: mSiteHandle.getURLPrefix(job.getSiteHandle()))
451: .append(separator).append(
452: mSiteHandle.getExecPoolWorkDir(job))
453: .append(separator).append(slsInputLFN);
454: ft.addDestination(job.getSiteHandle(), destURL.toString());
455:
456: //add this as input file for the job
457: job.addInputFile(ft);
458: }
459:
460: //add the sls out file as input to the job
461: if (slsOutputLFN != null) {
462: FileTransfer ft = new FileTransfer(slsOutputLFN, job
463: .getName());
464:
465: //the source sls is to be sent across from the local site
466: //using the grid ftp server at local site.
467: StringBuffer sourceURL = new StringBuffer();
468: sourceURL.append(mLocalURLPrefix).append(separator).append(
469: submitDir).append(separator).append(slsOutputLFN);
470:
471: ft.addSource("local", sourceURL.toString());
472:
473: //the destination URL is the working directory on the filesystem
474: //on the head node where the job is to be run.
475: StringBuffer destURL = new StringBuffer();
476: destURL.append(
477: mSiteHandle.getURLPrefix(job.getSiteHandle()))
478: .append(separator).append(
479: mSiteHandle.getExecPoolWorkDir(job))
480: .append(separator).append(slsOutputLFN);
481: ft.addDestination(job.getSiteHandle(), destURL.toString());
482:
483: //add this as input file for the job
484: job.addInputFile(ft);
485: }
486:
487: //add the proxy as input file if required.
488: if (mLocalUserProxy != null) {
489: FileTransfer proxy = new FileTransfer(
490: ENV.X509_USER_PROXY_KEY, job.getName());
491: StringBuffer sourceURL = new StringBuffer();
492: sourceURL.append(mLocalURLPrefix).append(mLocalUserProxy);
493: proxy.addSource("local", sourceURL.toString());
494:
495: StringBuffer destURL = new StringBuffer();
496: destURL.append(
497: mSiteHandle.getURLPrefix(job.getSiteHandle()))
498: .append(separator).append(
499: mSiteHandle.getExecPoolWorkDir(job))
500: .append(separator).append(mLocalUserProxyBasename);
501: proxy.addDestination(job.getSiteHandle(), destURL
502: .toString());
503: job.addInputFile(proxy);
504: }
505:
506: return true;
507:
508: }
509:
510: /**
511: * Modifies a compute job for second level staging. The only modification
512: * it does is add the appropriate environment varialbes to the job
513: *
514: * @param job the job to be modified.
515: * @param headNodeURLPrefix the url prefix for the server on the headnode
516: * @param headNodeDirectory the directory on the headnode, where the
517: * input data is read from and the output data written out.
518: * @param workerNodeDirectory the directory in the worker node tmp
519: *
520: * @return boolean indicating whether job was successfully modified or
521: * not.
522: *
523: */
524: public boolean modifyJobForWorkerNodeExecution(SubInfo job,
525: String headNodeURLPrefix, String headNodeDirectory,
526: String workerNodeDirectory) {
527:
528: List envs = this .getEnvironmentVariables(job.getSiteHandle());
529:
530: if (envs == null || envs.isEmpty()) {
531: //cannot create default TC
532: mLogger.log("Unable to set the necessary environment "
533: + Separator.combine(this .TRANSFORMATION_NAMESPACE,
534: this .TRANSFORMATION_NAME,
535: this .TRANSFORMATION_VERSION),
536: LogManager.DEBUG_MESSAGE_LEVEL);
537: return false;
538: }
539:
540: for (Iterator it = envs.iterator(); it.hasNext();) {
541: job.envVariables.checkKeyInNS((Profile) it.next());
542: }
543:
544: return true;
545:
546: }
547:
548: /**
549: * Returns the path to the user proxy from the pool configuration file and
550: * the properties file. The value in the properties file overrides the
551: * value from the pool configuration file.
552: *
553: * @return path to user proxy on local pool.
554: * null if no path is found.
555: */
556: protected String getPathToUserProxy() {
557: List l = mSiteHandle.getPoolProfile("local", Profile.ENV);
558: String proxy = null;
559:
560: if (l != null) {
561: //try to get the path to the proxy on local pool
562: for (Iterator it = l.iterator(); it.hasNext();) {
563: Profile p = (Profile) it.next();
564: proxy = p.getProfileKey().equalsIgnoreCase(
565: ENV.X509_USER_PROXY_KEY) ? p.getProfileValue()
566: : proxy;
567: }
568: }
569:
570: //overload from the properties file
571: ENV env = new ENV();
572: env.checkKeyInNS(mProps, "local");
573: proxy = env.containsKey(ENV.X509_USER_PROXY_KEY) ? (String) env
574: .get(ENV.X509_USER_PROXY_KEY) : proxy;
575:
576: return proxy;
577: }
578:
579: /**
580: * Retrieves the transformation catalog entry for the executable that is
581: * being used to transfer the files in the implementation.
582: *
583: * @param siteHandle the handle of the site where the transformation is
584: * to be searched.
585: *
586: * @return the transformation catalog entry if found, else null.
587: */
588: public TransformationCatalogEntry getTransformationCatalogEntry(
589: String siteHandle) {
590: List tcentries = null;
591: try {
592: //namespace and version are null for time being
593: tcentries = mTCHandle.getTCEntries(
594: this .TRANSFORMATION_NAMESPACE,
595: this .TRANSFORMATION_NAME,
596: this .TRANSFORMATION_VERSION, siteHandle,
597: TCType.INSTALLED);
598: } catch (Exception e) {
599: mLogger.log("Unable to retrieve entry from TC for "
600: + Separator.combine(this .TRANSFORMATION_NAMESPACE,
601: this .TRANSFORMATION_NAME,
602: this .TRANSFORMATION_VERSION) + " Cause:"
603: + e, LogManager.DEBUG_MESSAGE_LEVEL);
604: }
605:
606: return (tcentries == null) ? this .defaultTCEntry(
607: this .TRANSFORMATION_NAMESPACE,
608: this .TRANSFORMATION_NAME, this .TRANSFORMATION_VERSION,
609: siteHandle) : //try using a default one
610: (TransformationCatalogEntry) tcentries.get(0);
611:
612: }
613:
614: /**
615: * Returns a default TC entry to be used in case entry is not found in the
616: * transformation catalog.
617: *
618: * @param namespace the namespace of the transfer transformation
619: * @param name the logical name of the transfer transformation
620: * @param version the version of the transfer transformation
621: *
622: * @param site the site for which the default entry is required.
623: *
624: *
625: * @return the default entry.
626: */
627: protected TransformationCatalogEntry defaultTCEntry(
628: String namespace, String name, String version, String site) {
629:
630: TransformationCatalogEntry defaultTCEntry = null;
631: //check if PEGASUS_HOME is set
632: String home = mSiteHandle.getPegasusHome(site);
633: //if PEGASUS_HOME is not set, use VDS_HOME
634: home = (home == null) ? mSiteHandle.getVDS_HOME(site) : home;
635:
636: mLogger.log("Creating a default TC entry for "
637: + Separator.combine(namespace, name, version)
638: + " at site " + site, LogManager.DEBUG_MESSAGE_LEVEL);
639:
640: //if home is still null
641: if (home == null) {
642: //cannot create default TC
643: mLogger
644: .log(
645: "Unable to create a default entry for "
646: + Separator.combine(namespace,
647: name, version)
648: + " as PEGASUS_HOME or VDS_HOME is not set in Site Catalog",
649: LogManager.DEBUG_MESSAGE_LEVEL);
650: //set the flag back to true
651: return defaultTCEntry;
652: }
653:
654: //remove trailing / if specified
655: home = (home.charAt(home.length() - 1) == File.separatorChar) ? home
656: .substring(0, home.length() - 1)
657: : home;
658:
659: //construct the path to it
660: StringBuffer path = new StringBuffer();
661: path.append(home).append(File.separator).append("bin").append(
662: File.separator).append(name);
663:
664: defaultTCEntry = new TransformationCatalogEntry(namespace,
665: name, version);
666:
667: defaultTCEntry.setPhysicalTransformation(path.toString());
668: defaultTCEntry.setResourceId(site);
669: defaultTCEntry.setType(TCType.INSTALLED);
670:
671: //register back into the transformation catalog
672: //so that we do not need to worry about creating it again
673: try {
674: mTCHandle.addTCEntry(defaultTCEntry, false);
675: } catch (Exception e) {
676: //just log as debug. as this is more of a performance improvement
677: //than anything else
678: mLogger.log(
679: "Unable to register in the TC the default entry "
680: + defaultTCEntry.getLogicalTransformation()
681: + " for site " + site, e,
682: LogManager.DEBUG_MESSAGE_LEVEL);
683: }
684: mLogger.log("Created entry with path "
685: + defaultTCEntry.getPhysicalTransformation(),
686: LogManager.DEBUG_MESSAGE_LEVEL);
687: return defaultTCEntry;
688: }
689:
690: /**
691: * Returns the environment profiles that are required for the default
692: * entry to sensibly work.
693: *
694: * @param site the site where the job is going to run.
695: *
696: * @return List of environment variables, else null in case where the
697: * required environment variables could not be found.
698: */
699: protected List getEnvironmentVariables(String site) {
700: List result = new ArrayList(2);
701:
702: //create the CLASSPATH from home
703: String globus = mSiteHandle.getEnvironmentVariable(site,
704: "GLOBUS_LOCATION");
705: if (globus == null) {
706: mLogger.log(
707: "GLOBUS_LOCATION not set in site catalog for site "
708: + site, LogManager.DEBUG_MESSAGE_LEVEL);
709: return null;
710: }
711:
712: //check for LD_LIBRARY_PATH
713: String ldpath = mSiteHandle.getEnvironmentVariable(site,
714: "LD_LIBRARY_PATH");
715: if (ldpath == null) {
716: //construct a default LD_LIBRARY_PATH
717: ldpath = globus;
718: //remove trailing / if specified
719: ldpath = (ldpath.charAt(ldpath.length() - 1) == File.separatorChar) ? ldpath
720: .substring(0, ldpath.length() - 1)
721: : ldpath;
722:
723: ldpath = ldpath + File.separator + "lib";
724: mLogger.log(
725: "Constructed default LD_LIBRARY_PATH " + ldpath,
726: LogManager.DEBUG_MESSAGE_LEVEL);
727: }
728:
729: //we have both the environment variables
730: result.add(new Profile(Profile.ENV, "GLOBUS_LOCATION", globus));
731: result.add(new Profile(Profile.ENV, "LD_LIBRARY_PATH", ldpath));
732:
733: return result;
734: }
735:
736: }
|