001: /*
002: * This file or a portion of this file is licensed under the terms of
003: * the Globus Toolkit Public License, found in file GTPL, or at
004: * http://www.globus.org/toolkit/download/license.html. This notice must
005: * appear in redistributions of this file, with or without modification.
006: *
007: * Redistributions of this Software, with or without modification, must
008: * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
009: * some other similar material which is provided with the Software (if
010: * any).
011: *
012: * Copyright 1999-2004 University of Chicago and The University of
013: * Southern California. All rights reserved.
014: */
015:
016: package org.griphyn.cPlanner.engine;
017:
018: import org.griphyn.cPlanner.classes.ADag;
019: import org.griphyn.cPlanner.classes.DagInfo;
020: import org.griphyn.cPlanner.classes.PCRelation;
021: import org.griphyn.cPlanner.classes.PegasusFile;
022: import org.griphyn.cPlanner.classes.PlannerOptions;
023: import org.griphyn.cPlanner.classes.SubInfo;
024:
025: import org.griphyn.cPlanner.common.LogManager;
026: import org.griphyn.cPlanner.common.PegasusProperties;
027:
028: import org.griphyn.cPlanner.provenance.pasoa.XMLProducer;
029: import org.griphyn.cPlanner.provenance.pasoa.producer.XMLProducerFactory;
030:
031: import org.griphyn.cPlanner.provenance.pasoa.PPS;
032: import org.griphyn.cPlanner.provenance.pasoa.pps.PPSFactory;
033:
034: import java.util.Enumeration;
035: import java.util.Set;
036: import java.util.HashSet;
037: import java.util.Vector;
038: import java.util.Iterator;
039:
040: /**
041: *
 * Reduction engine for Planner 2.
 * Given an ADAG, it looks up the Replica Catalog,
 * determines which output files are already present
 * in the Replica Catalog, and on the basis of those
 * reduces the dag.
047: *
048: * @author Karan Vahi
049: * @author Gaurang Mehta
050: * @version $Revision: 258 $
051: *
052: */
053:
054: public class ReductionEngine extends Engine implements Refiner {
055:
056: /**
057: * the original dag object which
058: * needs to be reduced on the basis of
059: * the results returned from the
060: * Replica Catalog
061: */
062: private ADag mOriginalDag;
063:
064: /**
065: * the dag relations of the
066: * orginal dag
067: */
068: private Vector mOrgDagRelations;
069:
070: /**
071: * the reduced dag object which is
072: * returned.
073: */
074: private ADag mReducedDag;
075:
076: /**
077: * the jobs which are found to be in
078: * the Replica Catalog. These are
079: * the jobs whose output files are at
080: * some location in the Replica Catalog.
081: * This does not include the jobs which
082: * are deleted by applying the reduction
083: * algorithm
084: */
085: private Vector mOrgJobsInRC;
086:
087: /**
088: * the jobs which are deleted due
089: * to the application of the
090: * Reduction algorithm. These do
091: * not include the jobs whose output
092: * files are in the RC. These are
093: * the ones which are deleted due
094: * to cascade delete
095: */
096: private Vector mAddJobsDeleted;
097:
098: /**
099: * all deleted jobs. This
100: * is mOrgJobsInRC + mAddJobsDeleted.
101: */
102: private Vector mAllDeletedJobs;
103:
104: /**
105: * the files whose locations are
106: * returned from the ReplicaCatalog
107: */
108: private Set mFilesInRC;
109:
110: /**
111: * The XML Producer object that records the actions.
112: */
113: private XMLProducer mXMLStore;
114:
115: /**
116: * The workflow object being worked upon.
117: */
118: private ADag mWorkflow;
119:
120: /**
121: * The constructor
122: *
123: * @param orgDag The original Dag object
124: * @param properties the <code>PegasusProperties</code> to be used.
125: * @param options The options specified by
126: * the user to run the
127: * planner
128: */
129: public ReductionEngine(ADag orgDag, PegasusProperties properties,
130: PlannerOptions options) {
131: super (properties);
132: this .mPOptions = options;
133: mOriginalDag = orgDag;
134: mOrgDagRelations = mOriginalDag.dagInfo.relations;
135: mOrgJobsInRC = new Vector();
136: mAddJobsDeleted = new Vector();
137: mAllDeletedJobs = new Vector();
138: mXMLStore = XMLProducerFactory.loadXMLProducer(properties);
139: mWorkflow = orgDag;
140: }
141:
142: /**
143: * Returns a reference to the workflow that is being refined by the refiner.
144: *
145: *
146: * @return ADAG object.
147: */
148: public ADag getWorkflow() {
149: return this .mWorkflow;
150: }
151:
152: /**
153: * Returns a reference to the XMLProducer, that generates the XML fragment
154: * capturing the actions of the refiner. This is used for provenace
155: * purposes.
156: *
157: * @return XMLProducer
158: */
159: public XMLProducer getXMLProducer() {
160: return this .mXMLStore;
161: }
162:
163: /**
164: * Reduces the workflow on the basis of the existence of lfn's in the
165: * replica catalog. The existence of files, is determined via the bridge.
166: *
167: * @param rcb instance of the replica catalog bridge.
168: *
169: * @return the reduced dag
170: *
171: */
172: public ADag reduceDag(ReplicaCatalogBridge rcb) {
173:
174: //search for the replicas of
175: //the files. The search list
176: //is already present in Replica
177: //Mechanism classes
178: mFilesInRC = rcb.getFilesInReplica();
179:
180: //we reduce the dag only if the
181: //force option is not specified.
182: if (mPOptions.getForce())
183: return mOriginalDag;
184:
185: //load the PPS implementation
186: PPS pps = PPSFactory.loadPPS(this .mProps);
187:
188: //mXMLStore.add( "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" );
189: mXMLStore.add("<workflow url=\"" + mPOptions.getDAX() + "\">");
190:
191: //call the begin workflow method
192: try {
193: pps.beginWorkflowRefinementStep(this ,
194: PPS.REFINEMENT_REDUCE, true);
195: } catch (Exception e) {
196: throw new RuntimeException("PASOA Exception", e);
197: }
198:
199: //clear the XML store
200: mXMLStore.clear();
201:
202: mLogger.log("Reducing the workflow",
203: LogManager.DEBUG_MESSAGE_LEVEL);
204: mOrgJobsInRC = getJobsInRC(mOriginalDag.vJobSubInfos,
205: mFilesInRC);
206: mAllDeletedJobs = (Vector) mOrgJobsInRC.clone();
207: firstPass(mOrgJobsInRC);
208: secondPass();
209: firstPass(mAddJobsDeleted);
210:
211: mLogMsg = "Nodes/Jobs Deleted from the Workflow during reduction ";
212: mLogger.log(mLogMsg, LogManager.DEBUG_MESSAGE_LEVEL);
213: for (Enumeration e = mAllDeletedJobs.elements(); e
214: .hasMoreElements();) {
215: String deletedJob = (String) e.nextElement();
216: mLogger.log("\t" + deletedJob,
217: LogManager.DEBUG_MESSAGE_LEVEL);
218: mXMLStore.add("<removed job = \"" + deletedJob + "\"/>");
219: mXMLStore.add("\n");
220: }
221: mLogger.logCompletion(mLogMsg, LogManager.DEBUG_MESSAGE_LEVEL);
222: mReducedDag = makeRedDagObject(mOriginalDag, mAllDeletedJobs);
223:
224: //call the end workflow method for pasoa interactions
225: try {
226: mWorkflow = mReducedDag;
227:
228: for (Iterator it = mWorkflow.jobIterator(); it.hasNext();) {
229: SubInfo job = (SubInfo) it.next();
230: pps.isIdenticalTo(job.getName(), job.getName());
231: }
232:
233: pps.endWorkflowRefinementStep(this );
234: } catch (Exception e) {
235: throw new RuntimeException("PASOA Exception", e);
236: }
237:
238: mLogger.logCompletion("Reducing the workflow",
239: LogManager.DEBUG_MESSAGE_LEVEL);
240: return mReducedDag;
241: }
242:
243: /**
244: * This determines the jobs which are in
245: * the RC corresponding to the files found
246: * in the Replica Catalog. A job is said to
247: * be in the RC if all the outfiles for
248: * that job are found to be in the RC.
249: * A job in RC can be removed from the Dag
250: * and the Dag correspondingly reduced.
251: *
252: * @param vSubInfos Vector of <code>SubInfo</code>
253: * objects corresponding to all
254: * the jobs of a Abstract Dag
255: *
256: * @param filesInRC Set of <code>String</code>
257: * objects corresponding to the
258: * logical filenames of files
259: * which are found to be in the
260: * Replica Catalog
261: *
262: * @return a Vector of jobNames (Strings)
263: *
264: * @see org.griphyn.cPlanner.classes.ReplicaLocations
265: * @see org.griphyn.cPlanner.classes.SubInfo
266: */
267: private Vector getJobsInRC(Vector vSubInfos, Set filesInRC) {
268: SubInfo subInfo;
269: Set vJobOutputFiles;
270: String jobName;
271: Vector vJobsInReplica = new Vector();
272: int noOfOutputFilesInJob = 0;
273: int noOfSuccessfulMatches = 0;
274:
275: if (vSubInfos.isEmpty()) {
276: String msg = "ReductionEngine: The set of jobs in the workflow "
277: + "\n is empty.";
278: mLogger.log(msg, LogManager.DEBUG_MESSAGE_LEVEL);
279: return new Vector();
280: }
281:
282: Enumeration e = vSubInfos.elements();
283: mLogger.log("Jobs whose o/p files already exist",
284: LogManager.DEBUG_MESSAGE_LEVEL);
285: while (e.hasMoreElements()) {
286: //getting submit information about each submit file of a job
287: subInfo = (SubInfo) e.nextElement();
288: jobName = subInfo.jobName;
289: //System.out.println(jobName);
290:
291: if (!subInfo.outputFiles.isEmpty()) {
292: vJobOutputFiles = subInfo.getOutputFiles();
293: } else {
294: vJobOutputFiles = new HashSet();
295: }
296:
297: /* Commented on Oct10. This ended up making the
298: Planner doing duplicate transfers
299: if(subInfo.stdOut.length()>0)
300: vJobOutputFiles.addElement(subInfo.stdOut);
301: */
302:
303: //determine the no of output files for that job
304: if (vJobOutputFiles.isEmpty()) {
305: mLogger.log("Job " + subInfo.getName()
306: + " has no o/p files",
307: LogManager.DEBUG_MESSAGE_LEVEL);
308: }
309:
310: for (Iterator temp = vJobOutputFiles.iterator(); temp
311: .hasNext();) {
312: temp.next();
313: noOfOutputFilesInJob++;
314: }
315:
316: //traversing through the output files of that particular job
317: for (Iterator en = vJobOutputFiles.iterator(); en.hasNext();) {
318: PegasusFile pf = (PegasusFile) en.next();
319: //jobName = pf.getLFN();
320: //if(stringInList(jobName,filesInRC)){
321: if (filesInRC.contains(pf.getLFN())) {
322: noOfSuccessfulMatches++;
323: }
324: }
325:
326: //if the noOfOutputFilesInJob and noOfSuccessfulMatches are equal
327: //this means that all files required by job is in RC
328: if (noOfOutputFilesInJob == noOfSuccessfulMatches) {
329: mLogger.log("\t" + subInfo.jobName,
330: LogManager.DEBUG_MESSAGE_LEVEL);
331: vJobsInReplica.addElement(subInfo.jobName);
332: }
333: //reinitialise the variables
334: noOfSuccessfulMatches = 0;
335: noOfOutputFilesInJob = 0;
336: }
337: mLogger.logCompletion("Jobs whose o/p files already exist",
338: LogManager.DEBUG_MESSAGE_LEVEL);
339: return vJobsInReplica;
340:
341: }
342:
343: /**
344: * If a job is deleted it marks
345: * all the relations related to that
346: * job as deleted
347: *
348: * @param vDelJobs the vector containing the names
349: * of the deleted jobs whose relations
350: * we want to nullify
351: */
352: private void firstPass(Vector vDelJobs) {
353: Enumeration edeljobs = vDelJobs.elements();
354: while (edeljobs.hasMoreElements()) {
355: String deljob = (String) edeljobs.nextElement();
356:
357: Enumeration epcrel = mOrgDagRelations.elements();
358: while (epcrel.hasMoreElements()) {
359: PCRelation pcrc = (PCRelation) epcrel.nextElement();
360: if ((pcrc.child.equalsIgnoreCase(deljob))
361: || (pcrc.parent.equalsIgnoreCase(deljob))) {
362: pcrc.isDeleted = true;
363: }
364: }
365: }
366:
367: }
368:
369: /**
370: * In the second pass we find all the
371: * parents of the nodes which have been
372: * found to be in the RC.
373: * Corresponding to each parent, we find
374: * the corresponding siblings for that
375: * deleted job.
376: * If all the siblings are deleted, we
377: * can delete that parent.
378: */
379: private void secondPass() {
380: Enumeration eDelJobs = mAllDeletedJobs.elements();
381: Enumeration ePcRel;
382: Enumeration eParents;
383: String node;
384: String parentNodeName;
385: PCRelation currentRelPair;
386:
387: Vector vParents = new Vector();//all parents of a particular node
388: Vector vSiblings = new Vector();
389:
390: while (eDelJobs.hasMoreElements()) {
391: node = (String) eDelJobs.nextElement();
392:
393: //getting the parents of that node
394: vParents = this .getNodeParents(node);
395:
396: //now for each parent checking if the siblings are deleted
397: //if yes then delete the node
398: eParents = vParents.elements();
399: while (eParents.hasMoreElements()) {
400: parentNodeName = (String) eParents.nextElement();
401:
402: //getting all the siblings for parentNodeName
403: vSiblings = this .getChildren(parentNodeName);
404:
405: //now we checking if all the siblings are in vdeljobs
406: Enumeration temp = vSiblings.elements();
407: boolean siblingsDeleted = true;
408: while (temp.hasMoreElements()) {
409: if (stringInVector((String) temp.nextElement(),
410: mAllDeletedJobs)) {
411: //means the sibling has been marked deleted
412: } else {
413: siblingsDeleted = false;
414: }
415: }
416:
417: //if all siblings are deleted add the job to vdeljobs
418: if (siblingsDeleted) {
419:
420: //only add if the parentNodeName is not already in the list
421: if (!stringInVector(parentNodeName, mAllDeletedJobs)) {
422: String msg = "Deleted Node :" + parentNodeName;
423: mLogger
424: .log(msg,
425: LogManager.DEBUG_MESSAGE_LEVEL);
426: mAddJobsDeleted.addElement(new String(
427: parentNodeName));
428: mAllDeletedJobs.addElement(new String(
429: parentNodeName));
430:
431: }
432: }
433:
434: //clearing the siblings vector for that parent
435: vSiblings.clear();
436:
437: }//end of while(eParents.hasMoreElements()){
438:
439: //clearing the parents Vector for that job
440: vParents.clear();
441:
442: }//end of while(eDelJobs.hasMoreElements)
443: }
444:
445: /**
446: * Gets all the parents of a particular node.
447: *
448: * @param node the name of the job whose parents are to be found.
449: *
450: * @return Vector corresponding to the parents of the node.
451: */
452: private Vector getNodeParents(String node) {
453: //getting the parents of that node
454: return mOriginalDag.getParents(node);
455: }
456:
457: /**
458: * Gets all the children of a particular node.
459: *
460: * @param node the name of the node whose children we want to find.
461: *
462: * @return Vector containing the children of the node.
463: */
464: private Vector getChildren(String node) {
465: return mOriginalDag.getChildren(node);
466: }
467:
468: /**
469: * All the deleted jobs which
470: * happen to be leaf nodes. This
471: * entails that the output files
472: * of these jobs be transferred
473: * from the location returned
474: * by the Replica Catalog to the
475: * pool specified.
476: * This is a subset of mAllDeletedJobs
477: * Also to determine the deleted
478: * leaf jobs it refers the original
479: * dag, not the reduced dag.
480: *
481: * @return Vector containing the <code>SubInfo</code> of deleted leaf jobs.
482: */
483: public Vector getDeletedLeafJobs() {
484: Vector delLeafJobs = new Vector();
485:
486: mLogger.log("Finding deleted leaf jobs",
487: LogManager.DEBUG_MESSAGE_LEVEL);
488: for (Iterator it = mAllDeletedJobs.iterator(); it.hasNext();) {
489: String job = (String) it.next();
490: if (getChildren(job).isEmpty()) {
491: //means a leaf job
492: String msg = "Found deleted leaf job :" + job;
493: mLogger.log(msg, LogManager.DEBUG_MESSAGE_LEVEL);
494: delLeafJobs.addElement(mOriginalDag.getSubInfo(job));
495:
496: }
497: }
498: mLogger.logCompletion("Finding deleted leaf jobs",
499: LogManager.DEBUG_MESSAGE_LEVEL);
500: return delLeafJobs;
501: }
502:
503: /**
504: * makes the Reduced Dag object which
505: * corresponding to the deleted jobs
506: * which are specified.
507: *
508: * Note : We are plainly copying the
509: * inputFiles and the outputFiles. After
510: * reduction this changes but since we
511: * need those only to look up the RC,
512: * which we have done.
513: *
514: * @param orgDag the original Dag
515: * @param vDelJobs the Vector containing the
516: * names of the jobs whose
517: * SubInfos and Relations we
518: * want to remove.
519: *
520: * @return the reduced dag, which doesnot
521: * have the deleted jobs
522: *
523: */
524: public ADag makeRedDagObject(ADag orgDag, Vector vDelJobs) {
525: ADag redDag = new ADag();
526: redDag.dagInfo = constructNewDagInfo(mOriginalDag.dagInfo,
527: vDelJobs);
528: redDag.vJobSubInfos = constructNewSubInfos(
529: mOriginalDag.vJobSubInfos, vDelJobs);
530: return redDag;
531: }
532:
533: /**
534: * Constructs a DagInfo object for the
535: * decomposed Dag on the basis of the jobs
536: * which are deleted from the DAG by the
537: * reduction algorithm
538: *
539: * Note : We are plainly copying the
540: * inputFiles and the outputFiles. After reduction
541: * this changes but since we need those
542: * only to look up the RC, which we have done.
543: *
544: * @param dagInfo the object which is reduced on
545: * the basis of vDelJobs
546: *
547: * @param vDelJobs Vector containing the logical file
548: * names of jobs which are to
549: * be deleted
550: *
551: * @return the DagInfo object corresponding
552: * to the Decomposed Dag
553: *
554: */
555:
556: private DagInfo constructNewDagInfo(DagInfo dagInfo, Vector vDelJobs) {
557: DagInfo newDagInfo = (DagInfo) dagInfo.clone();
558: String jobName;
559:
560: PCRelation currentRelation;
561: String parentName;
562: String childName;
563: boolean deleted;
564:
565: //populating DagJobs
566: newDagInfo.dagJobs = new Vector();
567: Enumeration e = dagInfo.dagJobs.elements();
568: while (e.hasMoreElements()) {
569: jobName = (String) e.nextElement();
570: if (!stringInVector(jobName, vDelJobs)) {
571: //that job is to be executed so we add it
572: newDagInfo.dagJobs.addElement(new String(jobName));
573: }
574: }
575:
576: //populating PCRelation Vector
577: newDagInfo.relations = new Vector();
578: e = dagInfo.relations.elements();
579: while (e.hasMoreElements()) {
580: currentRelation = (PCRelation) e.nextElement();
581: parentName = new String(currentRelation.parent);
582: childName = new String(currentRelation.child);
583:
584: if (!(currentRelation.isDeleted)) {//the pair has not been marked deleted
585: newDagInfo.relations.addElement(new PCRelation(
586: parentName, childName, false));
587: }
588: }
589:
590: return newDagInfo;
591:
592: }//end of function
593:
594: /**
595: * constructs the Vector of subInfo objects
596: * corresponding to the reduced ADAG.
597: *
598: * It also modifies the strargs to remove
599: * them up of markup and display correct paths
600: * to the filenames
601: *
602: *
603: * @param vSubInfos the SubInfo object including
604: * the jobs which are not needed
605: * after the execution of the
606: * reduction algorithm
607: *
608: * @param vDelJobs the jobs which are deleted by
609: * the reduction algo as their
610: * output files are in the Replica Catalog
611: *
612: * @return the SubInfo objects except the ones
613: * for the deleted jobs
614: *
615: */
616:
617: private Vector constructNewSubInfos(Vector vSubInfos,
618: Vector vDelJobs) {
619: Vector vNewSubInfos = new Vector();
620: SubInfo newSubInfo;
621: SubInfo currentSubInfo;
622: String jobName;
623:
624: Enumeration e = vSubInfos.elements();
625:
626: while (e.hasMoreElements()) {
627: currentSubInfo = (SubInfo) e.nextElement();
628: jobName = currentSubInfo.jobName;
629: //we add only if the jobName is not in vDelJobs
630: if (!stringInVector(jobName, vDelJobs)) {
631: newSubInfo = (SubInfo) currentSubInfo.clone();
632: //adding to Vector
633: vNewSubInfos.addElement(newSubInfo);
634: }
635:
636: }//end of while
637: return vNewSubInfos;
638: }
639:
640: }
|