001: /*
002: * This file or a portion of this file is licensed under the terms of
003: * the Globus Toolkit Public License, found in file GTPL, or at
004: * http://www.globus.org/toolkit/download/license.html. This notice must
005: * appear in redistributions of this file, with or without modification.
006: *
007: * Redistributions of this Software, with or without modification, must
008: * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
009: * some other similar material which is provided with the Software (if
010: * any).
011: *
012: * Copyright 1999-2004 University of Chicago and The University of
013: * Southern California. All rights reserved.
014: */
015:
016: package org.griphyn.cPlanner.engine;
017:
018: import org.griphyn.cPlanner.classes.ADag;
019: import org.griphyn.cPlanner.classes.PegasusBag;
020: import org.griphyn.cPlanner.classes.PCRelation;
021: import org.griphyn.cPlanner.classes.PlannerOptions;
022: import org.griphyn.cPlanner.classes.SubInfo;
023:
024: import org.griphyn.cPlanner.partitioner.Partitioner;
025: import org.griphyn.cPlanner.partitioner.ClustererCallback;
026:
027: import org.griphyn.cPlanner.partitioner.graph.GraphNode;
028:
029: import org.griphyn.cPlanner.cluster.ClustererFactory;
030: import org.griphyn.cPlanner.cluster.Clusterer;
031: import org.griphyn.cPlanner.cluster.ClustererException;
032:
033: import org.griphyn.cPlanner.common.LogManager;
034: import org.griphyn.cPlanner.common.PegasusProperties;
035:
036: import org.griphyn.cPlanner.parser.dax.DAX2LabelGraph;
037:
038: import java.util.Map;
039: import java.util.HashMap;
040: import java.util.Vector;
041: import java.util.Iterator;
042: import java.util.List;
043: import java.util.StringTokenizer;
044:
045: /**
046: * This collapses the nodes of the same logical name scheduled on the same
047: * pool into fewer fat nodes. The idea behind this is to collapse jobs that
048: * take a few seconds to run into a larger job, and hence reducing time because
049: * of lesser delays due to lesser number of Condor Globus interactions.
* Note that the merging of the edges for the jobs being collapsed is, at
* present, not the best implementation. Once the graph structure is correct,
* it will be modified.
053: *
054: * @author Karan Vahi vahi@isi.edu
055: * @author Mei-Hui Su mei@isi.edu
056: *
057: * @version $Revision: 450 $
058: */
059: public class NodeCollapser extends Engine {
060:
061: /**
062: * The handle to the logger object.
063: */
064: protected LogManager mLogger;
065:
066: /**
067: * The directory, where the stdin file of the fat jobs are created.
068: * It should be the submit file directory that the user mentions at
069: * runtime.
070: */
071: private String mDirectory;
072:
073: /**
074: * The internal map that contains the adjacency list representation of the
075: * Graph referred to by the workflow. This is temporary till the main ADag
076: * data structure is corrected.
077: */
078: private Map mGraph;
079:
080: /**
081: * The bag of initialization objects.
082: */
083: private PegasusBag mBag;
084:
085: /**
086: * The overloaded constructor.
087: *
088: * @param bag the bag of initialization objects.
089: *
090: */
091: public NodeCollapser(PegasusBag bag) {
092: super (bag.getPegasusProperties());
093: mBag = bag;
094: mLogger = bag.getLogger();
095: mGraph = new HashMap();
096: mPOptions = bag.getPlannerOptions();
097: setDirectory(mPOptions.getSubmitDirectory());
098: }
099:
100: /**
101: * Sets the directory where the stdin files are to be generated.
102: *
103: * @param directory the path to the directory to which it needs to be set.
104: */
105: public void setDirectory(String directory) {
106: mDirectory = (directory == null) ?
107: //user did not specify a submit file dir
108: //use the default i.e current directory
109: "."
110: :
111: //user specified directory picked up
112: directory;
113:
114: }
115:
116: /**
117: * Clusters the jobs in the workflow. It applies a series of clustering
118: * actions on the graph, as specified by the user at runtime.
119: *
120: * For each clustering action, the graph is first partitioned,
121: * and then sent to the appropriate clustering module for clustering.
122: *
123: * @param dag the scheduled dag that has to be clustered.
124: *
125: * @return ADag containing the collapsed scheduled workflow.
126: *
127: * @throws ClustererException in case of error while clustering
128: */
129: public ADag cluster(ADag dag) throws ClustererException {
130: //load the appropriate partitioner and clusterer
131: String types = mPOptions.getClusteringTechnique();
132:
133: //sanity check
134: if (types == null) {
135: //return the orginal DAG only
136: mLogger
137: .log(
138: "No clustering actions specified. Returning orginal DAG",
139: LogManager.DEBUG_MESSAGE_LEVEL);
140: return dag;
141: }
142:
143: //tokenize and get the types
144: ADag clusteredDAG = dag;
145: for (StringTokenizer st = new StringTokenizer(types, ","); st
146: .hasMoreTokens();) {
147: clusteredDAG = this .cluster(clusteredDAG, st.nextToken());
148: }
149:
150: return clusteredDAG;
151: }
152:
153: /**
154: * Clusters the jobs in the workflow. The graph is first partitioned,
155: * and then sent to the appropriate clustering module for clustering.
156: *
157: * @param dag the scheduled dag that has to be clustered.
158: * @param type the type of clustering to do.
159: *
160: * @return ADag containing the collapsed scheduled workflow.
161: *
162: * @throws ClustererException in case of error while clustering
163: */
164: public ADag cluster(ADag dag, String type)
165: throws ClustererException {
166: //convert the graph representation to a
167: //more manageable and traversal data structure that is sent
168: //to the partitioning stuff
169: Map nameIDMap = new HashMap();
170: SubInfo job;
171: for (Iterator it = dag.vJobSubInfos.iterator(); it.hasNext();) {
172: //pass the jobs to the callback
173: job = (SubInfo) it.next();
174: nameIDMap.put(job.getName(), job.getLogicalID());
175: }
176: mGraph = edgeList2Graph(dag.dagInfo.relations, nameIDMap);
177:
178: //we need to build up a partitioner graph structure to do
179: //the partitioning on the graph. Use the callback mechanism
180: //developed for the partiotioner stuff and populate it
181: //from the exisiting graph structure
182: DAX2LabelGraph d2g = new DAX2LabelGraph(mProps, mPOptions
183: .getDAX());
184:
185: //set the appropriate key that is to be used for picking up the labels
186: d2g.setLabelKey(mProps.getClustererLabelKey());
187:
188: //no need to pass any attributes
189: d2g.cbDocument(null);
190: for (Iterator it = dag.vJobSubInfos.iterator(); it.hasNext();) {
191: //pass the jobs to the callback
192: d2g.cbJob((SubInfo) it.next());
193: }
194: //pass the relations
195: for (Iterator it = mGraph.entrySet().iterator(); it.hasNext();) {
196: Map.Entry entry = (Map.Entry) it.next();
197: d2g.cbParents((String) entry.getKey(), (List) entry
198: .getValue());
199: }
200: //finished populating
201: d2g.cbDone();
202: //get the graph map
203: mGraph = (Map) d2g.getConstructedObject();
204:
205: //get the fake dummy root node
206: GraphNode root = (GraphNode) mGraph
207: .get(DAX2LabelGraph.DUMMY_NODE_ID);
208:
209: Partitioner p = ClustererFactory.loadPartitioner(mProps, type,
210: root, mGraph);
211: mLogger.log("Partitioner loaded is " + p.description(),
212: LogManager.CONFIG_MESSAGE_LEVEL);
213:
214: Clusterer c = ClustererFactory.loadClusterer(dag, mBag, type);
215:
216: mLogger.log("Clusterer loaded is " + c.description(),
217: LogManager.CONFIG_MESSAGE_LEVEL);
218: ClustererCallback cb = new ClustererCallback();
219: cb.initialize(mProps, c);
220:
221: //start the partitioner and let the fun begin!
222: p.determinePartitions(cb);
223:
224: return c.getClusteredDAG();
225: }
226:
227: /**
228: * Returns an adjacency list representation of the graph referred to by
229: * the list of edges. The map contains adjacency list with key as a child
230: * and value as the list of parents.
231: *
232: * @param relations vector of <code>PCRelation</code> objects that does
233: * the conversion.
234: * @param nameIDMap map with the key as the jobname and value as the
235: * logical id
236: *
237: * @return Map.
238: */
239: protected Map edgeList2Graph(Vector relations, Map nameIDMap) {
240: Map map = new HashMap();
241: List l = null;
242:
243: for (Iterator it = relations.iterator(); it.hasNext();) {
244: PCRelation rel = (PCRelation) it.next();
245: if (map.containsKey(nameIDMap.get(rel.child))) {
246: l = (List) map.get(nameIDMap.get(rel.child));
247: l.add(nameIDMap.get(rel.parent));
248: } else {
249: l = new java.util.LinkedList();
250: l.add(nameIDMap.get(rel.parent));
251: map.put(nameIDMap.get(rel.child), l);
252: }
253: }
254:
255: return map;
256: }
257:
258: }//end of NodeCollapser
|