001: /*
002: * This file or a portion of this file is licensed under the terms of
003: * the Globus Toolkit Public License, found in file GTPL, or at
004: * http://www.globus.org/toolkit/download/license.html. This notice must
005: * appear in redistributions of this file, with or without modification.
006: *
007: * Redistributions of this Software, with or without modification, must
008: * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
009: * some other similar material which is provided with the Software (if
010: * any).
011: *
012: * Copyright 1999-2004 University of Chicago and The University of
013: * Southern California. All rights reserved.
014: */
015:
016: package org.griphyn.cPlanner.transfer.implementation;
017:
018: import org.griphyn.cPlanner.classes.ADag;
019: import org.griphyn.cPlanner.classes.SubInfo;
020: import org.griphyn.cPlanner.classes.TransferJob;
021: import org.griphyn.cPlanner.classes.PlannerOptions;
022: import org.griphyn.cPlanner.classes.FileTransfer;
023: import org.griphyn.cPlanner.classes.SiteInfo;
024: import org.griphyn.cPlanner.classes.JobManager;
025: import org.griphyn.cPlanner.classes.NameValue;
026: import org.griphyn.cPlanner.classes.PegasusBag;
027:
028: import org.griphyn.cPlanner.common.LogManager;
029: import org.griphyn.cPlanner.common.PegasusProperties;
030:
031: import org.griphyn.cPlanner.transfer.Implementation;
032: import org.griphyn.cPlanner.transfer.MultipleFTPerXFERJob;
033:
034: import org.griphyn.common.catalog.TransformationCatalogEntry;
035:
036: import org.griphyn.common.classes.TCType;
037:
038: import org.griphyn.common.util.Separator;
039:
040: import org.griphyn.cPlanner.cluster.aggregator.JobAggregatorFactory;
041: import org.griphyn.cPlanner.cluster.JobAggregator;
042:
043: import java.io.File;
044: import java.io.FileWriter;
045:
046: import java.util.Collection;
047: import java.util.HashSet;
048: import java.util.List;
049: import java.util.LinkedList;
050: import java.util.Iterator;
051: import org.griphyn.cPlanner.transfer.Refiner;
052: import org.griphyn.cPlanner.classes.Profile;
053: import java.util.ArrayList;
054:
055: /**
056: * A Windward implementation that uses the seqexec client to execute
057: *
058: * -DC Transfer client to fetch the raw data sources
059: * -Pegasus transfer client to fetch the patterns from the pattern catalog.
060: *
061: * @author Karan Vahi
062: * @version $Revision: 450 $
063: */
064:
065: public class Windward extends Abstract implements MultipleFTPerXFERJob {
066:
067: /**
068: * The prefix to identify the raw data sources.
069: */
070: public static final String DATA_SOURCE_PREFIX = "DS";
071:
072: /**
073: * A short description of the transfer implementation.
074: */
075: public static final String DESCRIPTION = "Seqexec Transfer Wrapper around Pegasus Transfer and DC Transfer Client";
076:
077: /**
078: * The transformation namespace for for the transfer job.
079: */
080: public static final String TRANSFORMATION_NAMESPACE = "windward";
081:
082: /**
083: * The name of the underlying transformation that is queried for in the
084: * Transformation Catalog.
085: */
086: public static final String TRANSFORMATION_NAME = "dc-transfer";
087:
088: /**
089: * The version number for the transfer job.
090: */
091: public static final String TRANSFORMATION_VERSION = null;
092:
093: /**
094: * The derivation namespace for for the transfer job.
095: */
096: public static final String DERIVATION_NAMESPACE = "windward";
097:
098: /**
099: * The name of the underlying derivation.
100: */
101: public static final String DERIVATION_NAME = "dc-transfer";
102:
103: /**
104: * The derivation version number for the transfer job.
105: */
106: public static final String DERIVATION_VERSION = null;
107:
108: /**
109: * The handle to the transfer implementation.
110: */
111: private Transfer mPegasusTransfer;
112:
113: /**
114: * The seqexec job aggregator.
115: */
116: private JobAggregator mSeqExecAggregator;
117:
118: /**
119: * The overloaded constructor, that is called by the Factory to load the
120: * class.
121: *
122: * @param properties the properties object.
123: * @param options the options passed to the Planner.
124: */
125: public Windward(PegasusProperties properties, PlannerOptions options) {
126: super (properties, options);
127:
128: //should probably go through the factory
129: mPegasusTransfer = new Transfer(properties, options);
130:
131: //just to pass the label have to send an empty ADag.
132: //should be fixed
133: ADag dag = new ADag();
134: dag.dagInfo.setLabel("windward");
135: PegasusBag bag = new PegasusBag();
136: bag.add(PegasusBag.PEGASUS_PROPERTIES, properties);
137: bag.add(PegasusBag.PEGASUS_LOGMANAGER, mLogger);
138: bag.add(PegasusBag.PLANNER_OPTIONS, options);
139:
140: mSeqExecAggregator = JobAggregatorFactory.loadInstance(
141: JobAggregatorFactory.SEQ_EXEC_CLASS, dag, bag);
142: }
143:
144: /**
145: * Sets the callback to the refiner, that has loaded this implementation.
146: *
147: * @param refiner the transfer refiner that loaded the implementation.
148: */
149: public void setRefiner(Refiner refiner) {
150: super .setRefiner(refiner);
151: //also set the refiner for hte internal pegasus transfer
152: mPegasusTransfer.setRefiner(refiner);
153: }
154:
155: /**
156: *
157: *
158: * @param job the SubInfo object for the job, in relation to which
159: * the transfer node is being added. Either the transfer
160: * node can be transferring this jobs input files to
161: * the execution pool, or transferring this job's output
162: * files to the output pool.
163: * @param files collection of <code>FileTransfer</code> objects
164: * representing the data files and staged executables to be
165: * transferred.
166: * @param execFiles subset collection of the files parameter, that identifies
167: * the executable files that are being transferred.
168: * @param txJobName the name of transfer node.
169: * @param jobClass the job Class for the newly added job. Can be one of the
170: * following:
171: * stage-in
172: * stage-out
173: * inter-pool transfer
174: *
175: * @return the created TransferJob.
176: */
177: public TransferJob createTransferJob(SubInfo job, Collection files,
178: Collection execFiles, String txJobName, int jobClass) {
179:
180: //iterate through all the files and identify the patterns
181: //and the other data sources
182: Collection rawDataSources = new LinkedList();
183: Collection patterns = new LinkedList();
184:
185: for (Iterator it = files.iterator(); it.hasNext();) {
186: FileTransfer ft = (FileTransfer) it.next();
187: if (ft.getLFN().startsWith(DATA_SOURCE_PREFIX)
188: && !ft.getSourceURL().getValue().endsWith(".zip")) {
189: //it a raw data source that will have to be ingested
190: rawDataSources.add(ft);
191: } else {
192: //everything else is a pattern
193: patterns.add(ft);
194: }
195: }
196:
197: List txJobs = new LinkedList();
198:
199: //use the Pegasus Transfer to handle the patterns
200: TransferJob patternTXJob = null;
201: String patternTXJobStdin = null;
202: if (!patterns.isEmpty()) {
203: patternTXJob = mPegasusTransfer.createTransferJob(job,
204: patterns, null, txJobName, jobClass);
205:
206: //get the stdin and set it as lof in the arguments
207: patternTXJobStdin = patternTXJob.getStdIn();
208: StringBuffer patternArgs = new StringBuffer();
209: patternArgs.append(patternTXJob.getArguments()).append(" ")
210: .append(patternTXJobStdin);
211: patternTXJob.setArguments(patternArgs.toString());
212: patternTXJob.setStdIn("");
213: txJobs.add(patternTXJob);
214: }
215:
216: TransformationCatalogEntry tcEntry = this
217: .getTransformationCatalogEntry(job.getSiteHandle());
218: if (tcEntry == null) {
219: //should throw a TC specific exception
220: StringBuffer error = new StringBuffer();
221: error.append("Could not find entry in tc for lfn ").append(
222: getCompleteTCName()).append(" at site ").append(
223: job.getSiteHandle());
224: mLogger.log(error.toString(),
225: LogManager.ERROR_MESSAGE_LEVEL);
226: throw new RuntimeException(error.toString());
227: }
228:
229: //this should in fact only be set
230: // for non third party pools
231: //we first check if there entry for transfer universe,
232: //if no then go for globus
233: SiteInfo ePool = mSCHandle.getTXPoolEntry(job.getSiteHandle());
234: JobManager jobmanager = ePool.selectJobManager(
235: this .TRANSFER_UNIVERSE, true);
236:
237: //use the DC transfer client to handle the data sources
238: for (Iterator it = rawDataSources.iterator(); it.hasNext();) {
239: FileTransfer ft = (FileTransfer) it.next();
240: TransferJob dcTXJob = new TransferJob();
241:
242: dcTXJob.namespace = tcEntry.getLogicalNamespace();
243: dcTXJob.logicalName = tcEntry.getLogicalName();
244: dcTXJob.version = tcEntry.getLogicalVersion();
245:
246: dcTXJob.dvNamespace = this .DERIVATION_NAMESPACE;
247: dcTXJob.dvName = this .DERIVATION_NAME;
248: dcTXJob.dvVersion = this .DERIVATION_VERSION;
249:
250: dcTXJob.setRemoteExecutable(tcEntry
251: .getPhysicalTransformation());
252:
253: dcTXJob.globusScheduler = (jobmanager == null) ? null
254: : jobmanager.getInfo(JobManager.URL);
255:
256: dcTXJob.setArguments(quote(((NameValue) ft.getSourceURL())
257: .getValue())
258: + " "
259: + quote(((NameValue) ft.getDestURL()).getValue()));
260: dcTXJob.setStdIn("");
261: dcTXJob.setStdOut("");
262: dcTXJob.setStdErr("");
263: dcTXJob.setSiteHandle(job.getSiteHandle());
264:
265: //the profile information from the transformation
266: //catalog needs to be assimilated into the job
267: dcTXJob.updateProfiles(tcEntry);
268:
269: txJobs.add(dcTXJob);
270: }
271:
272: //now lets merge all these jobs
273: SubInfo merged = mSeqExecAggregator.construct(txJobs,
274: "transfer", txJobName);
275: TransferJob txJob = new TransferJob(merged);
276:
277: //set the name of the merged job back to the name of
278: //transfer job passed in the function call
279: txJob.setName(txJobName);
280: txJob.setJobType(jobClass);
281:
282: //if a pattern job was constructed add the pattern stdin
283: //as an input file for condor to transfer
284: if (patternTXJobStdin != null) {
285: txJob.condorVariables
286: .addIPFileForTransfer(patternTXJobStdin);
287: }
288: //take care of transfer of proxies
289: this .checkAndTransferProxy(txJob);
290:
291: //apply the priority to the transfer job
292: this .applyPriority(txJob);
293:
294: if (execFiles != null) {
295: //we need to add setup jobs to change the XBit
296: super .addSetXBitJobs(job, txJob, execFiles);
297: }
298:
299: return txJob;
300: }
301:
302: /**
303: * Returns a textual description of the transfer implementation.
304: *
305: * @return a short textual description
306: */
307: public String getDescription() {
308: return this .DESCRIPTION;
309: }
310:
311: /**
312: * Returns a boolean indicating whether the transfer protocol being used by
313: * the implementation preserves the X Bit or not while staging.
314: *
315: * @return boolean
316: */
317: public boolean doesPreserveXBit() {
318: return false;
319: }
320:
321: /**
322: * Return a boolean indicating whether the transfers to be done always in
323: * a third party transfer mode. A value of false, results in the
324: * direct or peer to peer transfers being done.
325: * <p>
326: * A value of false does not preclude third party transfers. They still can
327: * be done, by setting the property "pegasus.transfer.*.thirdparty.sites".
328: *
329: * @return boolean indicating whether to always use third party transfers
330: * or not.
331: *
332: * @see PegasusProperties#getThirdPartySites(String)
333: */
334: public boolean useThirdPartyTransferAlways() {
335: return false;
336: }
337:
338: /**
339: * Retrieves the transformation catalog entry for the executable that is
340: * being used to transfer the files in the implementation.
341: *
342: * @param siteHandle the handle of the site where the transformation is
343: * to be searched.
344: *
345: * @return the transformation catalog entry if found, else null.
346: */
347: public TransformationCatalogEntry getTransformationCatalogEntry(
348: String siteHandle) {
349: List tcentries = null;
350: try {
351: //namespace and version are null for time being
352: tcentries = mTCHandle.getTCEntries(
353: this .TRANSFORMATION_NAMESPACE,
354: this .TRANSFORMATION_NAME,
355: this .TRANSFORMATION_VERSION, siteHandle,
356: TCType.INSTALLED);
357: } catch (Exception e) {
358: mLogger.log("Unable to retrieve entry from TC for "
359: + getCompleteTCName() + " Cause:" + e,
360: LogManager.DEBUG_MESSAGE_LEVEL);
361: }
362:
363: return (tcentries == null) ? this .defaultTCEntry(
364: this .TRANSFORMATION_NAMESPACE,
365: this .TRANSFORMATION_NAME, this .TRANSFORMATION_VERSION,
366: siteHandle) : //try using a default one
367: (TransformationCatalogEntry) tcentries.get(0);
368:
369: }
370:
371: /**
372: * Quotes a URL and returns it
373: *
374: * @param url String
375: * @return quoted url
376: */
377: protected String quote(String url) {
378: StringBuffer q = new StringBuffer();
379: q.append("'").append(url).append("'");
380: return q.toString();
381: }
382:
383: /**
384: * Returns a default TC entry to be used for the DC transfer client.
385: *
386: * @param namespace the namespace of the transfer transformation
387: * @param name the logical name of the transfer transformation
388: * @param version the version of the transfer transformation
389: *
390: * @param site the site for which the default entry is required.
391: *
392: *
393: * @return the default entry.
394: */
395: protected TransformationCatalogEntry defaultTCEntry(
396: String namespace, String name, String version, String site) {
397:
398: TransformationCatalogEntry defaultTCEntry = null;
399: //check if DC_HOME is set
400: String dcHome = mSCHandle.getEnvironmentVariable(site,
401: "DC_HOME");
402:
403: mLogger.log("Creating a default TC entry for "
404: + Separator.combine(namespace, name, version)
405: + " at site " + site, LogManager.DEBUG_MESSAGE_LEVEL);
406:
407: //if home is still null
408: if (dcHome == null) {
409: //cannot create default TC
410: mLogger.log("Unable to create a default entry for "
411: + Separator.combine(namespace, name, version)
412: + " as DC_HOME is not set in Site Catalog",
413: LogManager.DEBUG_MESSAGE_LEVEL);
414: //set the flag back to true
415: return defaultTCEntry;
416: }
417:
418: //get the essential environment variables required to get
419: //it to work correctly
420: List envs = this .getEnvironmentVariables(site);
421: if (envs == null) {
422: //cannot create default TC
423: mLogger
424: .log(
425: "Unable to create a default entry for as could not construct necessary environment "
426: + Separator.combine(namespace,
427: name, version),
428: LogManager.DEBUG_MESSAGE_LEVEL);
429: //set the flag back to true
430: return defaultTCEntry;
431: }
432: //add the DC home to environments
433: envs.add(new Profile(Profile.ENV, "DC_HOME", dcHome));
434:
435: //remove trailing / if specified
436: dcHome = (dcHome.charAt(dcHome.length() - 1) == File.separatorChar) ? dcHome
437: .substring(0, dcHome.length() - 1)
438: : dcHome;
439:
440: //construct the path to the jar
441: StringBuffer path = new StringBuffer();
442: path.append(dcHome).append(File.separator).append("bin")
443: .append(File.separator).append("dc-transfer");
444:
445: defaultTCEntry = new TransformationCatalogEntry(namespace,
446: name, version);
447:
448: defaultTCEntry.setPhysicalTransformation(path.toString());
449: defaultTCEntry.setResourceId(site);
450: defaultTCEntry.setType(TCType.INSTALLED);
451: defaultTCEntry.setProfiles(envs);
452:
453: //register back into the transformation catalog
454: //so that we do not need to worry about creating it again
455: try {
456: mTCHandle.addTCEntry(defaultTCEntry, false);
457: } catch (Exception e) {
458: //just log as debug. as this is more of a performance improvement
459: //than anything else
460: mLogger.log(
461: "Unable to register in the TC the default entry "
462: + defaultTCEntry.getLogicalTransformation()
463: + " for site " + site, e,
464: LogManager.DEBUG_MESSAGE_LEVEL);
465: }
466: mLogger.log("Created entry with path "
467: + defaultTCEntry.getPhysicalTransformation(),
468: LogManager.DEBUG_MESSAGE_LEVEL);
469: return defaultTCEntry;
470: }
471:
472: /**
473: * Returns the environment profiles that are required for the default
474: * entry to sensibly work.
475: *
476: * @param site the site where the job is going to run.
477: *
478: * @return List of environment variables, else null in case where the
479: * required environment variables could not be found.
480: */
481: protected List getEnvironmentVariables(String site) {
482: List result = new ArrayList(1);
483:
484: //create the CLASSPATH from home
485: String java = mSCHandle.getEnvironmentVariable(site,
486: "JAVA_HOME");
487: if (java == null) {
488: mLogger.log("JAVA_HOME not set in site catalog for site "
489: + site, LogManager.DEBUG_MESSAGE_LEVEL);
490: return null;
491: }
492:
493: //we have both the environment variables
494: result.add(new Profile(Profile.ENV, "JAVA_HOME", java));
495:
496: return result;
497: }
498:
499: /**
500: * Returns the complete name for the transformation.
501: *
502: * @return the complete name.
503: */
504: protected String getCompleteTCName() {
505: return Separator.combine(this.TRANSFORMATION_NAMESPACE,
506: this.TRANSFORMATION_NAME, this.TRANSFORMATION_VERSION);
507: }
508:
509: }
|