001: /**
002: * This file or a portion of this file is licensed under the terms of
003: * the Globus Toolkit Public License, found in file GTPL, or at
004: * http://www.globus.org/toolkit/download/license.html. This notice must
005: * appear in redistributions of this file, with or without modification.
006: *
007: * Redistributions of this Software, with or without modification, must
008: * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
009: * some other similar material which is provided with the Software (if
010: * any).
011: *
012: * Copyright 1999-2004 University of Chicago and The University of
013: * Southern California. All rights reserved.
 */
package org.griphyn.cPlanner.cluster;
015:
016: import org.griphyn.cPlanner.classes.ADag;
017: import org.griphyn.cPlanner.classes.SubInfo;
018: import org.griphyn.cPlanner.classes.PCRelation;
019: import org.griphyn.cPlanner.classes.AggregatedJob;
020:
021: import org.griphyn.cPlanner.common.PegasusProperties;
022: import org.griphyn.cPlanner.common.LogManager;
023:
024: import org.griphyn.cPlanner.cluster.aggregator.JobAggregatorInstanceFactory;
025:
026: import org.griphyn.cPlanner.namespace.VDS;
027:
028: import org.griphyn.cPlanner.partitioner.Partition;
029:
030: import org.griphyn.cPlanner.provenance.pasoa.XMLProducer;
031: import org.griphyn.cPlanner.provenance.pasoa.producer.XMLProducerFactory;
032:
033: import org.griphyn.cPlanner.provenance.pasoa.PPS;
034: import org.griphyn.cPlanner.provenance.pasoa.pps.PPSFactory;
035:
036: import java.util.Collections;
037: import java.util.List;
038: import java.util.ArrayList;
039: import java.util.LinkedList;
040: import java.util.Map;
041: import java.util.HashMap;
042: import java.util.Iterator;
043: import java.util.Comparator;
044: import java.util.Set;
045: import java.util.StringTokenizer;
046: import org.griphyn.cPlanner.classes.PegasusBag;
047:
048: /**
049: * The horizontal clusterer, that clusters jobs on the same level.
050: *
051: * @author Karan Vahi
052: * @version $Revision: 450 $
053: */
054:
public class Horizontal implements Clusterer,
org.griphyn.cPlanner.engine.Refiner {//Refiner interface required for PASOA integration

    /**
     * The default collapse factor for collapsing jobs with the same logical
     * name scheduled onto the same execution pool. Used when neither the job
     * profiles nor the properties file supply a factor.
     */
    public static final int DEFAULT_COLLAPSE_FACTOR = 3;

    /**
     * A short description about the partitioner.
     */
    public static final String DESCRIPTION = "Horizontal Clustering";

    /**
     * A singleton access to the job comparator.
     * Accessed through the jobComparator() method.
     */
    private static Comparator mJobComparator = null;

    /**
     * The handle to the logger object.
     */
    protected LogManager mLogger;

    /**
     * The handle to the properties object holding all the properties.
     */
    protected PegasusProperties mProps;

    /**
     * The handle to the job aggregator factory, used to load the appropriate
     * JobAggregator implementation for a job.
     */
    protected JobAggregatorInstanceFactory mJobAggregatorFactory;

    /**
     * ADag object containing the jobs that have been scheduled by the site
     * selector.
     */
    private ADag mScheduledDAG;

    /**
     * Map to hold the jobs sorted by the label of jobs in dax.
     * The key is the logical job name and value is the list of jobs with that
     * logical name.
     *
     * This is no longer used, and would be removed later.
     */
    private Map mJobMap;

    /**
     * A Map to store all the job (SubInfo) objects indexed by their logical ID
     * found in the dax. This should actually be in the ADag structure.
     */
    private Map mSubInfoMap;

    /**
     * Map to hold the collapse values for the various execution pools. The
     * values are gotten from the properties file or can be gotten from the
     * resource information catalog a.k.a MDS.
     */
    private Map mCollapseMap;

    /**
     * Replacement table, that identifies the corresponding fat job for a job.
     * Maps a replaced job's name to the name of the clustered job replacing it.
     */
    private Map mReplacementTable;

    /**
     * The XML Producer object that records the actions.
     */
    private XMLProducer mXMLStore;

    /**
     * The handle to the provenance store implementation.
     */
    private PPS mPPS;
131:
132: /**
133: * Singleton access to the job comparator.
134: *
135: * @return the job comparator.
136: */
137: private Comparator jobComparator() {
138: return (mJobComparator == null) ? new JobComparator()
139: : mJobComparator;
140: }
141:
142: /**
143: * The default constructor.
144: */
145: public Horizontal() {
146: mLogger = LogManager.getInstance();
147: mJobAggregatorFactory = new JobAggregatorInstanceFactory();
148: }
149:
150: /**
151: * Returns a reference to the workflow that is being refined by the refiner.
152: *
153: *
154: * @return ADAG object.
155: */
156: public ADag getWorkflow() {
157: return this .mScheduledDAG;
158: }
159:
160: /**
161: * Returns a reference to the XMLProducer, that generates the XML fragment
162: * capturing the actions of the refiner. This is used for provenace
163: * purposes.
164: *
165: * @return XMLProducer
166: */
167: public XMLProducer getXMLProducer() {
168: return this .mXMLStore;
169: }
170:
171: /**
172: *Initializes the Clusterer impelementation
173: *
174: * @param dag the workflow that is being clustered.
175: * @param bag the bag of objects that is useful for initialization.
176: *
177: * @throws ClustererException in case of error.
178: */
179: public void initialize(ADag dag, PegasusBag bag)
180: throws ClustererException {
181: mScheduledDAG = dag;
182: mProps = bag.getPegasusProperties();
183: mJobAggregatorFactory.initialize(dag, bag);
184:
185: mJobMap = new HashMap();
186: mCollapseMap = this .constructMap(mProps.getCollapseFactors());
187: mReplacementTable = new HashMap();
188: mSubInfoMap = new HashMap();
189:
190: for (Iterator it = mScheduledDAG.vJobSubInfos.iterator(); it
191: .hasNext();) {
192: //pass the jobs to the callback
193: SubInfo job = (SubInfo) it.next();
194: mSubInfoMap.put(job.getLogicalID(), job);
195: }
196:
197: //load the PPS implementation
198: mXMLStore = XMLProducerFactory.loadXMLProducer(mProps);
199: mPPS = PPSFactory.loadPPS(this .mProps);
200:
201: mXMLStore.add("<workflow url=\"" + null + "\">");
202:
203: //call the begin workflow method
204: try {
205: mPPS.beginWorkflowRefinementStep(this ,
206: PPS.REFINEMENT_CLUSTER, false);
207: } catch (Exception e) {
208: throw new ClustererException("PASOA Exception", e);
209: }
210:
211: //clear the XML store
212: mXMLStore.clear();
213:
214: }
215:
216: /**
217: * Determine the clusters for a partition. The partition is assumed to
218: * contain independant jobs, and multiple clusters maybe created for the
219: * partition. Internally the jobs are grouped according to transformation name
220: * and then according to the execution site. Each group
221: * (having same transformation name and scheduled on same site), is then
222: * clustered.
223: * The number of clustered jobs created for each group is dependant on the
224: * following VDS profiles that can be associated with the jobs.
225: * <pre>
226: * 1) bundle (dictates the number of clustered jobs that are created)
227: * 2) collapse (the number of jobs that make a single clustered job)
228: * </pre>
229: *
230: * In case of both parameters being associated with the jobs in a group, the
231: * bundle parameter overrides collapse parameter.
232: *
233: * @param partition the partition for which the clusters need to be
234: * determined.
235: *
236: * @throws ClustererException in case of error.
237: *
238: * @see VDS#BUNDLE_KEY
239: * @see VDS#COLLAPSE_KEY
240: */
241: public void determineClusters(Partition partition)
242: throws ClustererException {
243: Set s = partition.getNodeIDs();
244: List l = new ArrayList(s.size());
245: mLogger.log("Collapsing jobs in partition " + partition.getID()
246: + " " + s, LogManager.DEBUG_MESSAGE_LEVEL);
247:
248: for (Iterator it = s.iterator(); it.hasNext();) {
249: SubInfo job = (SubInfo) mSubInfoMap.get(it.next());
250: l.add(job);
251: }
252: //group the jobs by their transformation names
253: Collections.sort(l, jobComparator());
254: //traverse through the list and collapse jobs
255: //referring to same logical transformation
256: SubInfo previous = null;
257: List clusterList = new LinkedList();
258: SubInfo job = null;
259: for (Iterator it = l.iterator(); it.hasNext();) {
260: job = (SubInfo) it.next();
261: if (previous == null
262: || job.getCompleteTCName().equals(
263: previous.getCompleteTCName())) {
264: clusterList.add(job);
265: } else {
266: //at boundary collapse jobs
267: collapseJobs(previous.getStagedExecutableBaseName(),
268: clusterList, partition.getID());
269: clusterList = new LinkedList();
270: clusterList.add(job);
271: }
272: previous = job;
273: }
274: //cluster the last clusterList
275: if (previous != null) {
276: collapseJobs(previous.getStagedExecutableBaseName(),
277: clusterList, partition.getID());
278: }
279:
280: }
281:
282: /**
283: * Am empty implementation of the callout, as state is maintained
284: * internally to determine the relations between the jobs.
285: *
286: * @param partitionID the id of a partition.
287: * @param parents the list of <code>String</code> objects that contain
288: * the id's of the parents of the partition.
289: *
290: * @throws ClustererException in case of error.
291: */
292: public void parents(String partitionID, List parents)
293: throws ClustererException {
294:
295: }
296:
297: /**
298: * Collapses the jobs having the same logical name according to the sites
299: * where they are scheduled.
300: *
301: * @param name the logical name of the jobs in the list passed to
302: * this function.
303: * @param jobs the list <code>SubInfo</code> objects corresponding
304: * to the jobs that have the same logical name.
305: * @param partitionID the ID of the partition to which the jobs belong.
306: */
307: private void collapseJobs(String name, List jobs, String partitionID) {
308: String key = null;
309: SubInfo job = null;
310: List l = null;
311: //internal map that keeps the jobs according to the execution pool
312: Map tempMap = new java.util.HashMap();
313: int[] cFactor = new int[2]; //the collapse factor for collapsing the jobs
314: cFactor[0] = 0;
315: cFactor[1] = 0;
316: AggregatedJob fatJob = null;
317:
318: mLogger.log("Collapsing jobs of type " + name,
319: LogManager.DEBUG_MESSAGE_LEVEL);
320:
321: //traverse through all the jobs and order them by the
322: //pool on which they are scheduled
323: for (Iterator it = jobs.iterator(); it.hasNext();) {
324:
325: job = (SubInfo) it.next();
326: key = job.executionPool;
327: //check if the job logical name is already in the map
328: if (tempMap.containsKey(key)) {
329: //add the job to the corresponding list.
330: l = (List) tempMap.get(key);
331: l.add(job);
332: } else {
333: //first instance of this logical name
334: l = new java.util.LinkedList();
335: l.add(job);
336: tempMap.put(key, l);
337: }
338: }
339:
340: //iterate through the built up temp map to get jobs per execution pool
341: String factor = null;
342: int size = -1;
343: //the id for the fatjobs. we want ids
344: //unique across the execution pools for a
345: //particular type of job being merged.
346: int id = 1;
347:
348: for (Iterator it = tempMap.entrySet().iterator(); it.hasNext();) {
349: Map.Entry entry = (Map.Entry) it.next();
350: l = (List) entry.getValue();
351: size = l.size();
352: //the pool name on which the job is to run is the key
353: key = (String) entry.getKey();
354:
355: if (size <= 1) {
356: //no need to collapse one job. go to the next iteration
357: mLogger.log("\t No collapsing for execution pool "
358: + key, LogManager.DEBUG_MESSAGE_LEVEL);
359: continue;
360: }
361:
362: JobAggregator aggregator = mJobAggregatorFactory
363: .loadInstance((SubInfo) l.get(0));
364: if (aggregator.entryNotInTC(key)) {
365: //no need to collapse one job. go to the next iteration
366: mLogger.log(
367: "\t No collapsing for execution pool because job aggregator entry not in tc "
368: + key, LogManager.DEBUG_MESSAGE_LEVEL);
369: continue;
370: }
371:
372: //checks made ensure that l is not empty at this point
373: cFactor = getCollapseFactor(key, (SubInfo) l.get(0), size);
374: if (cFactor[0] == 1 && cFactor[1] == 0) {
375: mLogger.log("\t Collapse factor of (" + cFactor[0]
376: + "," + cFactor[1] + ") determined for pool. "
377: + key + ". Skipping collapsing",
378: LogManager.DEBUG_MESSAGE_LEVEL);
379: continue;
380: }
381:
382: mLogger.log("\t Collapsing jobs at execution pool " + key
383: + " with collapse factor " + cFactor[0] + ","
384: + cFactor[1], LogManager.DEBUG_MESSAGE_LEVEL);
385:
386: //we do collapsing in chunks of 3 instead of picking up
387: //from the properties file. ceiling is (x + y -1)/y
388: //cFactor = (size + 2)/3;
389:
390: if (cFactor[0] >= size) {
391: //means collapse all the jobs in the list as a fat node
392: //Note: Passing a link to iterator might be more efficient, as
393: //this would only require a single traversal through the list
394: fatJob = aggregator.construct(l.subList(0, size), name,
395: constructID(partitionID, id));
396: updateReplacementTable(l.subList(0, size), fatJob);
397:
398: //increment the id
399: id++;
400: //add the fat job to the dag
401: //use the method to add, else add explicitly to DagInfo
402: mScheduledDAG.add(fatJob);
403:
404: //log the refiner action capturing the creation of the job
405: this .logRefinerAction(fatJob, aggregator);
406: } else {
407: //do collapsing in chunks of cFactor
408: int increment = 0;
409: for (int i = 0; i < size; i = i + increment) {
410: //compute the increment and decrement cFactor[1]
411: increment = (cFactor[1] > 0) ? cFactor[0] + 1
412: : cFactor[0];
413: cFactor[1]--;
414:
415: if (increment == 1) {
416: //we can exit out of the loop as we do not want
417: //any merging for single jobs
418: break;
419: } else if ((i + increment) < size) {
420: fatJob = aggregator.construct(l.subList(i, i
421: + increment), name, constructID(
422: partitionID, id));
423:
424: updateReplacementTable(l.subList(i, i
425: + increment), fatJob);
426: } else {
427: fatJob = aggregator.construct(l
428: .subList(i, size), name, constructID(
429: partitionID, id));
430: updateReplacementTable(l.subList(i, size),
431: fatJob);
432: }
433:
434: //increment the id
435: id++;
436:
437: //add the fat job to the dag
438: //use the method to add, else add explicitly to DagInfo
439: mScheduledDAG.add(fatJob);
440:
441: //log the refiner action capturing the creation of the job
442: this .logRefinerAction(fatJob, aggregator);
443: }
444: }
445:
446: }
447:
448: //explicity free the map
449: tempMap = null;
450: }
451:
452: /**
453: * Returns the clustered workflow.
454: *
455: * @return the <code>ADag</code> object corresponding to the clustered workflow.
456: *
457: * @throws ClustererException in case of error.
458: */
459: public ADag getClusteredDAG() throws ClustererException {
460: //do all the replacement of jobs in the main data structure
461: //that needs to be returned
462: replaceJobs();
463:
464: //should be in the done method. which is currently not htere in the
465: //Clusterer API
466: try {
467: mPPS.endWorkflowRefinementStep(this );
468: } catch (Exception e) {
469: throw new ClustererException(
470: "PASOA Exception while logging end of clustering refinement",
471: e);
472: }
473:
474: return mScheduledDAG;
475: }
476:
477: /**
478: * Returns a textual description of the transfer implementation.
479: *
480: * @return a short textual description
481: */
482: public String description() {
483: return this .DESCRIPTION;
484: }
485:
486: /**
487: * Records the refiner action into the Provenace Store as a XML fragment.
488: *
489: * @param clusteredJob the clustered job
490: * @param aggregator the aggregator that was used to create this clustered job
491: */
492: protected void logRefinerAction(AggregatedJob clusteredJob,
493: JobAggregator aggregator) {
494: StringBuffer sb = new StringBuffer();
495: String indent = "\t";
496: sb.append(indent);
497: sb.append("<clustered ");
498: appendAttribute(sb, "job", clusteredJob.getName());
499: appendAttribute(sb, "type", aggregator.getCollapserLFN());
500: sb.append(">").append("\n");
501:
502: //traverse through all the files
503: String newIndent = indent + "\t";
504: List jobs = new ArrayList();
505: for (Iterator it = clusteredJob.constituentJobsIterator(); it
506: .hasNext();) {
507: SubInfo job = (SubInfo) it.next();
508: jobs.add(job.getName());
509: sb.append(newIndent);
510: sb.append("<constitutent ");
511: appendAttribute(sb, "job", job.getName());
512: sb.append("/>");
513: sb.append("\n");
514: }
515: sb.append(indent);
516: sb.append("</clustered>");
517: sb.append("\n");
518:
519: //log the action for creating the relationship assertions
520: try {
521: mPPS.clusteringOf(clusteredJob.getName(), jobs);
522: } catch (Exception e) {
523: throw new RuntimeException(
524: "PASOA Exception while logging relationship assertion for clustering ",
525: e);
526: }
527:
528: mXMLStore.add(sb.toString());
529:
530: }
531:
532: /**
533: * Appends an xml attribute to the xml feed.
534: *
535: * @param xmlFeed the xmlFeed to which xml is being written
536: * @param key the attribute key
537: * @param value the attribute value
538: */
539: protected void appendAttribute(StringBuffer xmlFeed, String key,
540: String value) {
541: xmlFeed.append(key).append("=").append("\"").append(value)
542: .append("\" ");
543: }
544:
545: /**
546: * A callback that triggers the collapsing of a partition/level of a graph.
547: *
548: * @param partition the partition that needs to be collapsed.
549: *
550: */
551: /*
552: private void collapseJobs(Partition partition){
553: Set s = partition.getNodeIDs();
554: List l = new ArrayList(s.size());
555: mLogger.log("Collapsing jobs in partition " + partition.getID() +
556: " " + s,
557: LogManager.DEBUG_MESSAGE_LEVEL);
558:
559: for(Iterator it = s.iterator();it.hasNext();){
560: SubInfo job = (SubInfo)mSubInfoMap.get(it.next());
561: l.add(job);
562: }
563: //group the jobs by their transformation names
564: Collections.sort(l,jobComparator());
565: //traverse through the list and collapse jobs
566: //referring to same logical transformation
567: SubInfo previous = null;
568: List clusterList = new LinkedList();
569: SubInfo job = null;
570: for(Iterator it = l.iterator();it.hasNext();){
571: job = (SubInfo)it.next();
572: if(previous == null ||
573: job.getCompleteTCName().equals(previous.getCompleteTCName())){
574: clusterList.add(job);
575: }
576: else{
577: //at boundary collapse jobs
578: collapseJobs(previous.getStagedExecutableBaseName(),clusterList,partition.getID());
579: clusterList = new LinkedList();
580: clusterList.add(job);
581: }
582: previous = job;
583: }
584: //cluster the last clusterList
585: if(previous != null){
586: collapseJobs(previous.getStagedExecutableBaseName(), clusterList, partition.getID());
587: }
588:
589: //collapse the jobs in list l
590: // collapseJobs(job.logicalName,l,partition.getID());
591: }
592: */
593:
594: /**
595: * Returns the collapse factor, that is used to chunk up the jobs of a
596: * particular type on a pool. The collapse factor is determined by
597: * getting the collapse key in the VDS namespace/profile associated with the
598: * job in the transformation catalog. Right now tc overrides the property
599: * from the one in the properties file that specifies per pool.
600: * There are two orthogonal notions of bundling and collapsing. In case the
601: * bundle key is specified, it ends up overriding the collapse key, and
602: * the bundle value is used to generate the collapse values.
603: *
604: * @param pool the pool where the chunking up is occuring
605: * @param job the <code>SubInfo</code> object containing the job that
606: * is to be chunked up together.
607: * @param size the number of jobs that refer to the same logical
608: * transformation and are scheduled on the same execution pool.
609: *
610: * @return int array of size 2 where int[0] is the the collapse factor
611: * int[1] is the number of jobs for whom collapsing is int[0] + 1.
612: */
613: public int[] getCollapseFactor(String pool, SubInfo job, int size) {
614: String factor = null;
615: int result[] = new int[2];
616: result[1] = 0;
617:
618: //the job should have the collapse key from the TC if
619: //by the user specified
620: factor = (String) job.vdsNS.get(VDS.COLLAPSE_KEY);
621:
622: //ceiling is (x + y -1)/y
623: String bundle = (String) job.vdsNS.get(VDS.BUNDLE_KEY);
624: if (bundle != null) {
625: int b = Integer.parseInt(bundle);
626: result[0] = size / b;
627: result[1] = size % b;
628: return result;
629: //doing no boundary condition checks
630: //return (size + b -1)/b;
631: }
632:
633: //return the appropriate value
634: result[0] = (factor == null) ? ((factor = (String) mCollapseMap
635: .get(pool)) == null) ? this .DEFAULT_COLLAPSE_FACTOR : //the default value
636: Integer.parseInt(factor)//use the value in the prop file
637: :
638: //return the value found in the TC
639: Integer.parseInt(factor);
640: return result;
641:
642: }
643:
644: /**
645: * Given an integer id, returns a string id that is used for the clustered
646: * job.
647: *
648: * @param partitionID the id of the partition.
649: * @param id the integer id from which the string id has to be
650: * constructed. The id should be unique for all the
651: * clustered jobs that are formed for a particular
652: * partition.
653: *
654: * @return the id of the clustered job
655: */
656: public String constructID(String partitionID, int id) {
657: StringBuffer sb = new StringBuffer(8);
658: sb.append("P").append(partitionID).append("_");
659: sb.append("ID").append(id);
660:
661: return sb.toString();
662: }
663:
664: /**
665: * Updates the replacement table.
666: *
667: * @param jobs the List of jobs that is being replaced.
668: * @param mergedJob the mergedJob that is replacing the jobs in the list.
669: */
670: private void updateReplacementTable(List jobs, SubInfo mergedJob) {
671: if (jobs == null || jobs.isEmpty())
672: return;
673: String mergedJobName = mergedJob.jobName;
674: for (Iterator it = jobs.iterator(); it.hasNext();) {
675: SubInfo job = (SubInfo) it.next();
676: //put the entry in the replacement table
677: mReplacementTable.put(job.jobName, mergedJobName);
678: }
679:
680: }
681:
682: /**
683: * Puts the jobs in the abstract workflow into the job that is index
684: * by the logical name of the jobs.
685: */
686: private void assimilateJobs() {
687: Iterator it = mScheduledDAG.vJobSubInfos.iterator();
688: SubInfo job = null;
689: List l = null;
690: String key = null;
691:
692: while (it.hasNext()) {
693: job = (SubInfo) it.next();
694: key = job.logicalName;
695: //check if the job logical name is already in the map
696: if (mJobMap.containsKey(key)) {
697: //add the job to the corresponding list.
698: l = (List) mJobMap.get(key);
699: l.add(job);
700: } else {
701: //first instance of this logical name
702: l = new java.util.LinkedList();
703: l.add(job);
704: mJobMap.put(key, l);
705: }
706: }
707: }
708:
709: /**
710: * Constructs a map with the numbers/values for the collapsing factors to
711: * collapse the nodes of same type. The user ends up specifying these through
712: * the properties file. The value of the property is of the form
713: * poolname1=value,poolname2=value....
714: *
715: * @param propValue the value of the property got from the properties file.
716: *
717: * @return the constructed map.
718: */
719: private Map constructMap(String propValue) {
720: Map map = new java.util.TreeMap();
721:
722: if (propValue != null) {
723: StringTokenizer st = new StringTokenizer(propValue, ",");
724: while (st.hasMoreTokens()) {
725: String raw = st.nextToken();
726: int pos = raw.indexOf('=');
727: if (pos > 0) {
728: map.put(raw.substring(0, pos).trim(), raw
729: .substring(pos + 1).trim());
730: }
731: }
732: }
733:
734: return map;
735: }
736:
737: /**
738: * The relations/edges are changed in local graph structure.
739: */
740: private void replaceJobs() {
741: boolean val = false;
742: List l = null;
743: List nl = null;
744: SubInfo sub = new SubInfo();
745: String msg;
746:
747: for (Iterator it = mReplacementTable.entrySet().iterator(); it
748: .hasNext();) {
749: Map.Entry entry = (Map.Entry) it.next();
750: String key = (String) entry.getKey();
751: mLogger.log("Replacing job " + key + " with "
752: + entry.getValue(), LogManager.DEBUG_MESSAGE_LEVEL);
753: //remove the old job
754: //remove by just creating a subinfo object with the same key
755: sub.jobName = key;
756: val = mScheduledDAG.remove(sub);
757: if (val == false) {
758: throw new RuntimeException("Removal of job " + key
759: + " while clustering not successful");
760: }
761: }
762: mLogger.log("All clustered jobs removed from the workflow",
763: LogManager.DEBUG_MESSAGE_LEVEL);
764:
765: //Set mergedEdges = new java.util.HashSet();
766: //this is temp thing till the hast thing sorted out correctly
767: List mergedEdges = new java.util.ArrayList(
768: mScheduledDAG.vJobSubInfos.size());
769:
770: //traverse the edges and do appropriate replacements
771: String parent = null;
772: String child = null;
773: String value = null;
774: for (Iterator it = mScheduledDAG.dagInfo.relations.iterator(); it
775: .hasNext();) {
776: PCRelation rel = (PCRelation) it.next();
777: //replace the parent and child if there is a need
778: parent = rel.parent;
779: child = rel.child;
780:
781: msg = ("\n Replacing " + rel);
782:
783: value = (String) mReplacementTable.get(parent);
784: if (value != null) {
785: rel.parent = value;
786: }
787: value = (String) mReplacementTable.get(child);
788: if (value != null) {
789: rel.child = value;
790: }
791: msg += (" with " + rel);
792:
793: //put in the merged edges set
794: if (!mergedEdges.contains(rel)) {
795: val = mergedEdges.add(rel);
796: msg += "Add to set : " + val;
797: } else {
798: msg += "\t Duplicate Entry for " + rel;
799: }
800: mLogger.log(msg, LogManager.DEBUG_MESSAGE_LEVEL);
801: }
802:
803: //the final edges need to be updated
804: mScheduledDAG.dagInfo.relations = null;
805: mScheduledDAG.dagInfo.relations = new java.util.Vector(
806: mergedEdges);
807: }
808:
809: /**
810: * A utility method to print short description of jobs in a list.
811: *
812: * @param l the list of <code>SubInfo</code> objects
813: */
814: private void printList(List l) {
815: for (Iterator it = l.iterator(); it.hasNext();) {
816: SubInfo job = (SubInfo) it.next();
817: System.out.print(" " + /*job.getCompleteTCName() +*/
818: "[" + job.logicalId + "]");
819: }
820:
821: }
822:
823: /**
824: * A job comparator, that allows me to compare jobs according to the
825: * transformation names. It is applied to group jobs in a particular partition,
826: * according to the underlying transformation that is referred.
827: * <p>
828: * This comparator is not consistent with the SubInfo.equals(Object) method.
829: * Hence, should not be used in sorted sets or Maps.
830: */
831: private class JobComparator implements Comparator {
832:
833: /**
834: * Compares this object with the specified object for order. Returns a
835: * negative integer, zero, or a positive integer if the first argument is
836: * less than, equal to, or greater than the specified object. The
837: * SubInfo are compared by their transformation name.
838: *
839: * This implementation is not consistent with the
840: * SubInfo.equals(Object) method. Hence, should not be used in sorted
841: * Sets or Maps.
842: *
843: * @param o1 is the first object to be compared.
844: * @param o2 is the second object to be compared.
845: *
846: * @return a negative number, zero, or a positive number, if the
847: * object compared against is less than, equals or greater than
848: * this object.
849: * @exception ClassCastException if the specified object's type
850: * prevents it from being compared to this Object.
851: */
852: public int compare(Object o1, Object o2) {
853: if (o1 instanceof SubInfo && o2 instanceof SubInfo) {
854: return ((SubInfo) o1).getCompleteTCName().compareTo(
855: ((SubInfo) o2).getCompleteTCName());
856:
857: } else {
858: throw new ClassCastException(
859: "Objects being compared are not SubInfo");
860: }
861: }
862: }
863:
864: }
|