/*
 * This file or a portion of this file is licensed under the terms of
 * the Globus Toolkit Public License, found in file GTPL, or at
 * http://www.globus.org/toolkit/download/license.html. This notice must
 * appear in redistributions of this file, with or without modification.
 *
 * Redistributions of this Software, with or without modification, must
 * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
 * some other similar material which is provided with the Software (if
 * any).
 *
 * Copyright 1999-2004 University of Chicago and The University of
 * Southern California. All rights reserved.
 */
package org.griphyn.cPlanner.transfer.refiner;

import org.griphyn.cPlanner.classes.ADag;
import org.griphyn.cPlanner.classes.SubInfo;
import org.griphyn.cPlanner.classes.FileTransfer;
import org.griphyn.cPlanner.classes.PlannerOptions;
import org.griphyn.cPlanner.classes.NameValue;

import org.griphyn.cPlanner.common.PegasusProperties;
import org.griphyn.cPlanner.common.LogManager;

import org.griphyn.cPlanner.engine.ReplicaCatalogBridge;

import org.griphyn.cPlanner.transfer.MultipleFTPerXFERJobRefiner;

import org.griphyn.cPlanner.provenance.pasoa.XMLProducer;
import org.griphyn.cPlanner.provenance.pasoa.producer.XMLProducerFactory;

import org.griphyn.cPlanner.provenance.pasoa.PPS;
import org.griphyn.cPlanner.provenance.pasoa.pps.PPSFactory;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.Collection;
import java.util.TreeMap;
import java.util.Map;
import java.util.List;
/**
 * The default transfer refiner, which implements the multiple refinement
 * strategy. For each compute job, if required, it creates the following:
 * - a single stage-in transfer job
 * - a single stage-out transfer job
 * - a single inter-pool transfer job
 *
 * In addition, this implementation prevents file clobbering when data that
 * is shared amongst jobs is staged in to a remote site.
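 *
 * <p>For illustration only, a minimal sketch of how a planner-side driver
 * might exercise this refiner. The <code>dag</code>, <code>properties</code>
 * and <code>options</code> objects, the <code>computeJobs</code> collection
 * and the <code>inputFiles()</code> helper are assumptions for the sketch,
 * not part of this API:
 * <pre>
 *   Default refiner = new Default(dag, properties, options);
 *   for (Iterator it = computeJobs.iterator(); it.hasNext();) {
 *       SubInfo job = (SubInfo) it.next();
 *       // inputFiles(job) stands in for the Collection of
 *       // FileTransfer objects describing the job's inputs
 *       refiner.addStageInXFERNodes(job, inputFiles(job));
 *   }
 *   // signal that refinement is complete
 *   refiner.done();
 * </pre>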
 *
 * @author Karan Vahi
 * @version $Revision: 258 $
 */

public class Default extends MultipleFTPerXFERJobRefiner {

    /**
     * A short description of the transfer refinement.
     */
    public static final String DESCRIPTION = "Default Multiple Refinement ";

    /**
     * The string holding the logging messages.
     */
    protected String mLogMsg;

    /**
     * A Map containing information about which logical file has been
     * transferred to which site, and the name of the stage-in transfer node
     * that is transferring the file from the location returned by the
     * replica catalog. The key for the map is logicalfilename:sitehandle
     * (e.g. <code>f.a:isi</code>) and the value is the name of the
     * transfer node.
     */
    protected Map mFileTable;

    /**
     * The handle to the provenance store implementation.
     */
    protected PPS mPPS;

    /**
     * The overloaded constructor.
     *
     * @param dag        the workflow to which transfer nodes need to be added.
     * @param properties the <code>PegasusProperties</code> object containing
     *                   all the properties required by Pegasus.
     * @param options    the options passed to the planner.
     */
    public Default(ADag dag, PegasusProperties properties,
                   PlannerOptions options) {
        super(dag, properties, options);
        mLogMsg = null;
        mFileTable = new TreeMap();

        //load the PPS implementation
        mPPS = PPSFactory.loadPPS(this.mProps);

        mXMLStore.add("<workflow url=\"" + mPOptions.getDAX() + "\">");

        //call the begin workflow method
        try {
            mPPS.beginWorkflowRefinementStep(this, PPS.REFINEMENT_STAGE, false);
        } catch (Exception e) {
            throw new RuntimeException("PASOA Exception", e);
        }

        //clear the XML store
        mXMLStore.clear();
    }

    /**
     * Adds the stage-in transfer nodes, which transfer the input files for a
     * job from the locations returned by the replica catalog to the job's
     * execution pool.
     *
     * @param job   <code>SubInfo</code> object corresponding to the node to
     *              which the files are to be transferred.
     * @param files Collection of <code>FileTransfer</code> objects containing
     *              the information about source and destination URLs.
     */
    public void addStageInXFERNodes(SubInfo job, Collection files) {
        String jobName = job.getName();
        String pool = job.getSiteHandle();
        int counter = 0;
        String newJobName = this.STAGE_IN_PREFIX + jobName + "_" + counter;
        String key = null;
        String msg = "Adding stagein transfer nodes for job " + jobName;
        String par = null;
        Collection stagedFiles = new ArrayList(1);

        //to prevent duplicate dependencies
        java.util.HashSet tempSet = new java.util.HashSet();
        int staged = 0;
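
        //mFileTable maps lfn:site to the name of the node that makes the
        //file available at that site (a transfer job, or the set-x-bit job
        //for staged executables). A hit below means the transfer has
        //already been scheduled, so only a dependency edge is added.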
        for (Iterator it = files.iterator(); it.hasNext();) {
            FileTransfer ft = (FileTransfer) it.next();
            String lfn = ft.getLFN();

            //get the key for this lfn and pool.
            //if the key is already in the table,
            //remove the entry from the collection
            //and add a dependency in the graph
            key = this.constructFileKey(lfn, pool);
            par = (String) mFileTable.get(key);
            //System.out.println("lfn " + lfn + " par " + par);
            if (par != null) {
                it.remove();

                //check if tempSet does not contain the parent
                //fix for sonal's bug
                if (tempSet.contains(par)) {
                    mLogMsg = "IGNORING TO ADD rc pull relation from rc tx node: "
                            + par + " -> " + jobName
                            + " for transferring file " + lfn
                            + " to pool " + pool;

                    mLogger.log(mLogMsg, LogManager.DEBUG_MESSAGE_LEVEL);

                } else {
                    mLogMsg = "Adding relation " + par + " -> " + jobName
                            + " for transferring file " + lfn;
                    mLogger.log(mLogMsg, LogManager.DEBUG_MESSAGE_LEVEL);
                    addRelation(par, jobName, pool, false);
                    tempSet.add(par);
                }
            } else {
                if (ft.isTransferringExecutableFile()) {
                    //add to staged files for addition of the setup job
                    stagedFiles.add(ft);
                    //the staged executable file should have the setup job
                    //as its parent if the implementation does not preserve
                    //the x bit
                    if (mTXStageInImplementation.doesPreserveXBit()) {
                        mFileTable.put(key, newJobName);
                    } else {
                        mFileTable.put(key, mTXStageInImplementation
                                .getSetXBitJobName(jobName, staged++));
                    }
                } else {
                    //make a new entry into the table
                    mFileTable.put(key, newJobName);
                }
                //add the newJobName to the tempSet so that even
                //if the job has duplicate input files only one instance
                //of transfer is scheduled. This came up during collapsing
                //June 15th, 2004
                tempSet.add(newJobName);
            }
        }

        if (!files.isEmpty()) {
            mLogger.log(msg, LogManager.DEBUG_MESSAGE_LEVEL);
            msg = "Adding new stagein transfer node named " + newJobName;
            mLogger.log(msg, LogManager.DEBUG_MESSAGE_LEVEL);

            //add a direct dependency between the compute job
            //and the stagein job only if no executables are
            //being staged
            if (stagedFiles.isEmpty()) {
                //add the direct relation
                addRelation(newJobName, jobName, pool, true);
                SubInfo siJob = mTXStageInImplementation
                        .createTransferJob(job, files, null,
                                newJobName, SubInfo.STAGE_IN_JOB);
                addJob(siJob);

                //record the action in the provenance store
                logRefinerAction(job, siJob, files, "stage-in");
            } else {
                //the dependency to the stagein job is added via
                //the setup job that does the chmod
                SubInfo siJob = mTXStageInImplementation
                        .createTransferJob(job, files, stagedFiles,
                                newJobName, SubInfo.STAGE_IN_JOB);

                addJob(siJob);
                //record the action in the provenance store
                logRefinerAction(job, siJob, files, "stage-in");
            }
        }
    }

    /**
     * Adds the inter-pool transfer nodes that are required for transferring
     * the output files of the parents to the job's execution site.
     *
     * @param job   <code>SubInfo</code> object corresponding to the node to
     *              which the files are to be transferred.
     * @param files Collection of <code>FileTransfer</code> objects containing
     *              the information about source and destination URLs.
     */
    public void addInterSiteTXNodes(SubInfo job, Collection files) {
        String jobName = job.getName();
        int counter = 0;
        String newJobName = this.INTER_POOL_PREFIX + jobName + "_" + counter;

        String msg = "Adding inter pool nodes for job " + jobName;
        String prevParent = null;

        String lfn = null;
        String key = null;
        String par = null;
        String pool = job.getSiteHandle();

        boolean toAdd = true;

        //to prevent duplicate dependencies
        java.util.HashSet tempSet = new java.util.HashSet();

        //node construction only if there is a file to transfer
        if (!files.isEmpty()) {
            mLogger.log(msg, LogManager.DEBUG_MESSAGE_LEVEL);

            for (Iterator it = files.iterator(); it.hasNext();) {
                FileTransfer ft = (FileTransfer) it.next();
                lfn = ft.getLFN();
                //System.out.println("Trying to figure out for lfn " + lfn);

                //to ensure that duplicate edges are not added in the graph
                //between the parent of a node and the inter tx node that
                //transfers the file to the node's site:
                //get the key for this lfn and pool.
                //if the key is already in the table,
                //remove the entry from the collection
                //and add a dependency in the graph
                key = this.constructFileKey(lfn, pool);
                par = (String) mFileTable.get(key);
                //System.out.println("\nGot Key :" + key + " Value :" + par);
                if (par != null) {
                    //transfer of this file has already
                    //been scheduled onto the pool
                    it.remove();

                    //check if tempSet does not contain the parent
                    if (tempSet.contains(par)) {
                        mLogMsg = "IGNORING TO ADD interpool relation 1 from inter tx node: "
                                + par + " -> " + jobName
                                + " for transferring file " + lfn
                                + " to pool " + pool;

                        mLogger.log(mLogMsg, LogManager.DEBUG_MESSAGE_LEVEL);

                    } else {
                        mLogMsg = "Adding interpool relation 1 from inter tx node: "
                                + par + " -> " + jobName
                                + " for transferring file " + lfn
                                + " to pool " + pool;
                        mLogger.log(mLogMsg, LogManager.DEBUG_MESSAGE_LEVEL);
                        addRelation(par, jobName);
                        tempSet.add(par);
                    }
                } else {
                    //make a new entry into the table
                    mFileTable.put(key, newJobName);
                    //System.out.println("\nPut Key :" + key + " Value :" + newJobName);

                    //to ensure that duplicate edges are not added in the
                    //graph between the parent of a node and the inter tx
                    //node that transfers the file to the node's site
                    if (prevParent == null
                            || !prevParent.equalsIgnoreCase(ft.getJobName())) {

                        mLogMsg = "Adding interpool relation 2 "
                                + ft.getJobName() + " -> " + newJobName
                                + " for transferring file " + lfn
                                + " to pool " + pool;
                        mLogger.log(mLogMsg, LogManager.DEBUG_MESSAGE_LEVEL);
                        addRelation(ft.getJobName(), newJobName);
                    }

                    //we only need to add the relation between an
                    //inter tx node and a node once
                    if (toAdd) {
                        mLogMsg = "Adding interpool relation 3 "
                                + newJobName + " -> " + jobName
                                + " for transferring file " + lfn
                                + " to pool " + pool;
                        mLogger.log(mLogMsg, LogManager.DEBUG_MESSAGE_LEVEL);
                        addRelation(newJobName, jobName);
                        tempSet.add(newJobName);
                        toAdd = false;
                    }
                }

                prevParent = ft.getJobName();
            }

            //add the new job and construct its subinfo
            //only if the collection is not empty
            if (!files.isEmpty()) {
                msg = "Adding new inter pool node named " + newJobName;
                mLogger.log(msg, LogManager.DEBUG_MESSAGE_LEVEL);

                //added in make transfer node
                SubInfo interJob = mTXInterImplementation
                        .createTransferJob(job, files, null,
                                newJobName, SubInfo.INTER_POOL_JOB);

                addJob(interJob);

                this.logRefinerAction(job, interJob, files, "inter-site");
            }
        }
        tempSet = null;
    }

    /**
     * Adds the stage-out transfer nodes, which stage data to an output site
     * specified by the user.
     *
     * @param job   <code>SubInfo</code> object corresponding to the node
     *              whose output files are being staged out.
     * @param files Collection of <code>FileTransfer</code> objects containing
     *              the information about source and destination URLs.
     * @param rcb   bridge to the Replica Catalog. Used for creating
     *              registration nodes in the workflow.
     */
    public void addStageOutXFERNodes(SubInfo job, Collection files,
                                     ReplicaCatalogBridge rcb) {

        this.addStageOutXFERNodes(job, files, rcb, false);
    }

    /**
     * Adds the stage-out transfer nodes, which stage data to an output site
     * specified by the user.
     *
     * @param job         <code>SubInfo</code> object corresponding to the
     *                    node whose output files are being staged out.
     * @param files       Collection of <code>FileTransfer</code> objects
     *                    containing the information about source and
     *                    destination URLs.
     * @param rcb         bridge to the Replica Catalog. Used for creating
     *                    registration nodes in the workflow.
     * @param deletedLeaf whether the node is being added for a leaf node
     *                    deleted by the reduction engine. Defaults to false.
     */
    public void addStageOutXFERNodes(SubInfo job, Collection files,
                                     ReplicaCatalogBridge rcb,
                                     boolean deletedLeaf) {
        String jobName = job.getName();
        int counter = 0;
        String newJobName = this.STAGE_OUT_PREFIX + jobName + "_" + counter;
        String regJob = this.REGISTER_PREFIX + jobName;

        mLogMsg = "Adding output pool nodes for job " + jobName;

        //separate the files for transfer
        //and for registration
        List txFiles = new ArrayList();
        List regFiles = new ArrayList();
        for (Iterator it = files.iterator(); it.hasNext();) {
            FileTransfer ft = (FileTransfer) it.next();
            if (!ft.getTransientTransferFlag()) {
                txFiles.add(ft);
            }
            if (!ft.getTransientRegFlag()) {
                regFiles.add(ft);
            }
        }

        boolean makeTNode = !txFiles.isEmpty();
        boolean makeRNode = !regFiles.isEmpty();

        if (!files.isEmpty()) {
            mLogger.log(mLogMsg, LogManager.DEBUG_MESSAGE_LEVEL);
            mLogMsg = "Adding new output pool node named " + newJobName;
            mLogger.log(mLogMsg, LogManager.DEBUG_MESSAGE_LEVEL);

            if (makeTNode) {
                //added in make transfer node
                //mDag.addNewJob(newJobName);
                SubInfo soJob = mTXStageOutImplementation
                        .createTransferJob(job, txFiles, null,
                                newJobName, SubInfo.STAGE_OUT_JOB);
                addJob(soJob);
                if (!deletedLeaf) {
                    addRelation(jobName, newJobName);
                }
                if (makeRNode) {
                    addRelation(newJobName, regJob);
                }

                //log the refiner action
                this.logRefinerAction(job, soJob, txFiles, "stage-out");
            } else if (makeRNode) {
                addRelation(jobName, regJob);
            }
            if (makeRNode) {
                //call to make the reg subinfo
                //added in make registration node
                addJob(createRegistrationJob(regJob, job, regFiles, rcb));
            }
        }
    }

    /**
     * Creates the registration job, which registers the materialized files
     * on the output site in the Replica Catalog.
     *
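     * <p>For illustration only, the XML fragment recorded in the provenance
     * store has this general shape (the job names and the URL below are
     * hypothetical):
     * <pre>
     *   &lt;register job="register_preprocess"&gt;
     *     &lt;file lfn="f.a" site="isi"&gt;
     *       gsiftp://host.isi.edu/dir/f.a
     *     &lt;/file&gt;
     *   &lt;/register&gt;
     *   &lt;child ref="register_preprocess"&gt;
     *     &lt;parent ref="preprocess"/&gt;
     *   &lt;/child&gt;
     * </pre>
     *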
     * @param regJobName the name of the job which registers the files in the
     *                   Replica Mechanism.
     * @param job        the job whose output files are to be registered in
     *                   the Replica Mechanism.
     * @param files      Collection of <code>FileTransfer</code> objects
     *                   containing the information about source and
     *                   destination URLs.
     * @param rcb        bridge to the Replica Catalog. Used for creating
     *                   registration nodes in the workflow.
     *
     * @return the registration job.
     */
    protected SubInfo createRegistrationJob(String regJobName, SubInfo job,
                                            Collection files,
                                            ReplicaCatalogBridge rcb) {

        SubInfo regJob = rcb.makeRCRegNode(regJobName, job, files);

        //log the registration action for provenance purposes
        StringBuffer sb = new StringBuffer();
        String indent = "\t";
        sb.append(indent);
        sb.append("<register job=\"").append(regJobName).append("\"> ");
        sb.append("\n");

        //traverse through all the files
        NameValue dest;
        String newIndent = indent + "\t";
        for (Iterator it = files.iterator(); it.hasNext();) {
            FileTransfer ft = (FileTransfer) it.next();
            dest = ft.getDestURL();
            sb.append(newIndent);
            sb.append("<file ");
            appendAttribute(sb, "lfn", ft.getLFN());
            appendAttribute(sb, "site", dest.getKey());
            sb.append(">");
            sb.append("\n");
            sb.append(newIndent).append(indent);
            sb.append(dest.getValue());
            sb.append("\n");
            sb.append(newIndent);
            sb.append("</file>").append("\n");
        }
        sb.append(indent);
        sb.append("</register>").append("\n");

        //log the graph relationship
        String parent = job.getName();
        String child = regJob.getName();

        sb.append(indent);
        sb.append("<child ");
        appendAttribute(sb, "ref", child);
        sb.append(">").append("\n");

        sb.append(newIndent);
        sb.append("<parent ");
        appendAttribute(sb, "ref", parent);
        sb.append("/>").append("\n");

        sb.append(indent);
        sb.append("</child>").append("\n");

        mXMLStore.add(sb.toString());

        //log the action for creating the relationship assertions
        try {
            mPPS.registrationIntroducedFor(regJob.getName(), job.getName());
        } catch (Exception e) {
            throw new RuntimeException(
                    "PASOA Exception while logging relationship assertion for registration",
                    e);
        }

        return regJob;
    }

    /**
     * Signals that the traversal of the workflow is done. It signals to the
     * Provenance Store that refinement is complete.
     */
    public void done() {

        try {
            mPPS.endWorkflowRefinementStep(this);
        } catch (Exception e) {
            throw new RuntimeException("PASOA Exception", e);
        }
    }

    /**
     * Adds a new job to the workflow being refined.
     *
     * @param job the job to be added.
     */
    public void addJob(SubInfo job) {
        mDAG.add(job);
    }

    /**
     * Adds a new relation to the workflow being refined.
     *
     * @param parent the jobname of the parent node of the edge.
     * @param child  the jobname of the child node of the edge.
     */
    public void addRelation(String parent, String child) {
        mLogger.log("Adding relation " + parent + " -> " + child,
                LogManager.DEBUG_MESSAGE_LEVEL);
        mDAG.addNewRelation(parent, child);
    }

    /**
     * Adds a new relation to the workflow. In the case where the parent is a
     * newly added transfer job, parentNew should be set to true only the
     * first time a relation is added for it; for subsequent compute jobs
     * that may be dependent on it, it needs to be set to false. This default
     * implementation ignores both the site and the parentNew hints.
     *
     * @param parent    the jobname of the parent node of the edge.
     * @param child     the jobname of the child node of the edge.
     * @param site      the execution pool where the transfer node is to be
     *                  run.
     * @param parentNew whether the parent node is a newly added transfer job
     *                  being referenced for the first time.
     */
    public void addRelation(String parent, String child, String site,
                            boolean parentNew) {
        mLogger.log("Adding relation " + parent + " -> " + child,
                LogManager.DEBUG_MESSAGE_LEVEL);
        mDAG.addNewRelation(parent, child);
    }

    /**
     * Returns a textual description of the transfer mode.
     *
     * @return a short textual description
     */
    public String getDescription() {
        return this.DESCRIPTION;
    }

    /**
     * Records the refiner action in the Provenance Store as an XML fragment.
     *
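     * <p>For illustration only, the recorded fragment has this general shape
     * (the job names, sites and URLs below are hypothetical):
     * <pre>
     *   &lt;transfer job="stage_in_preprocess_0" type="stage-in"&gt;
     *     &lt;from site="local" lfn="f.a" url="file:///tmp/f.a"/&gt;
     *     &lt;to site="isi" lfn="f.a" url="gsiftp://host.isi.edu/dir/f.a"/&gt;
     *   &lt;/transfer&gt;
     *   &lt;child ref="preprocess"&gt;
     *     &lt;parent ref="stage_in_preprocess_0"/&gt;
     *   &lt;/child&gt;
     * </pre>
     *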
     * @param computeJob the compute job.
     * @param txJob      the associated transfer job.
     * @param files      Collection of <code>FileTransfer</code> objects
     *                   containing the file transfers.
     * @param type       the type of transfer job.
     */
    protected void logRefinerAction(SubInfo computeJob, SubInfo txJob,
                                    Collection files, String type) {
        StringBuffer sb = new StringBuffer();
        String indent = "\t";
        sb.append(indent);
        sb.append("<transfer job=\"").append(txJob.getName()).append("\" ")
                .append("type=\"").append(type).append("\">");
        sb.append("\n");

        //traverse through all the files
        NameValue source;
        NameValue dest;
        String newIndent = indent + "\t";
        for (Iterator it = files.iterator(); it.hasNext();) {
            FileTransfer ft = (FileTransfer) it.next();
            source = ft.getSourceURL();
            dest = ft.getDestURL();
            sb.append(newIndent);
            sb.append("<from ");
            appendAttribute(sb, "site", source.getKey());
            appendAttribute(sb, "lfn", ft.getLFN());
            appendAttribute(sb, "url", source.getValue());
            sb.append("/>");
            sb.append("\n");

            sb.append(newIndent);
            sb.append("<to ");
            appendAttribute(sb, "site", dest.getKey());
            appendAttribute(sb, "lfn", ft.getLFN());
            appendAttribute(sb, "url", dest.getValue());
            sb.append("/>");
            sb.append("\n");
        }
        sb.append(indent);
        sb.append("</transfer>");
        sb.append("\n");

        //log the graph relationship. for a stage-in job the transfer node
        //is the parent of the compute job; otherwise the compute job is
        //the parent of the transfer node
        String parent = (txJob.getJobType() == SubInfo.STAGE_IN_JOB)
                ? txJob.getName()
                : computeJob.getName();

        String child = (txJob.getJobType() == SubInfo.STAGE_IN_JOB)
                ? computeJob.getName()
                : txJob.getName();

        sb.append(indent);
        sb.append("<child ");
        appendAttribute(sb, "ref", child);
        sb.append(">").append("\n");

        sb.append(newIndent);
        sb.append("<parent ");
        appendAttribute(sb, "ref", parent);
        sb.append("/>").append("\n");

        sb.append(indent);
        sb.append("</child>").append("\n");

        //log the action for creating the relationship assertions
        try {
            List stagingNodes = new java.util.ArrayList(1);
            stagingNodes.add(txJob.getName());
            mPPS.stagingIntroducedFor(stagingNodes, computeJob.getName());
        } catch (Exception e) {
            throw new RuntimeException(
                    "PASOA Exception while logging relationship assertion for staging ",
                    e);
        }

        mXMLStore.add(sb.toString());
    }

    /**
     * Appends an XML attribute to the XML feed.
     *
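     * <p>For example (values hypothetical), <code>appendAttribute(sb, "lfn",
     * "f.a")</code> appends <code>lfn="f.a" </code>, including the trailing
     * space, to the buffer.
     *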
     * @param xmlFeed the buffer to which the XML is being written.
     * @param key     the attribute key.
     * @param value   the attribute value.
     */
    protected void appendAttribute(StringBuffer xmlFeed, String key,
                                   String value) {
        xmlFeed.append(key).append("=").append("\"").append(value)
                .append("\" ");
    }

    /**
     * Constructs the key for an entry in the file table. The key returned
     * is of the form lfn:siteHandle.
     *
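     * <p>For example (values hypothetical),
     * <code>constructFileKey("f.a", "isi")</code> returns
     * <code>"f.a:isi"</code>.
     *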
     * @param lfn        the logical filename of the file that has to be
     *                   transferred.
     * @param siteHandle the name of the site to which the file is being
     *                   transferred.
     *
     * @return the key for the entry to be made in the file table.
     */
    protected String constructFileKey(String lfn, String siteHandle) {
        StringBuffer sb = new StringBuffer();
        sb.append(lfn).append(":").append(siteHandle);

        return sb.toString();
    }

}
|