001: /*
002: * This file or a portion of this file is licensed under the terms of
003: * the Globus Toolkit Public License, found in file GTPL, or at
004: * http://www.globus.org/toolkit/download/license.html. This notice must
005: * appear in redistributions of this file, with or without modification.
006: *
007: * Redistributions of this Software, with or without modification, must
008: * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
009: * some other similar material which is provided with the Software (if
010: * any).
011: *
012: * Copyright 1999-2004 University of Chicago and The University of
013: * Southern California. All rights reserved.
014: */
015: package org.griphyn.cPlanner.partitioner;
016:
017: import org.griphyn.cPlanner.common.LogManager;
018: import org.griphyn.cPlanner.common.PegasusProperties;
019:
020: import org.griphyn.cPlanner.partitioner.graph.GraphNode;
021: import org.griphyn.cPlanner.partitioner.graph.Bag;
022: import org.griphyn.cPlanner.partitioner.graph.LabelBag;
023:
024: import java.util.Collections;
025: import java.util.Comparator;
026: import java.util.Iterator;
027: import java.util.LinkedList;
028: import java.util.List;
029: import java.util.ArrayList;
030: import java.util.Map;
031: import java.util.HashMap;
032: import java.util.Set;
033: import java.util.HashSet;
034:
035: /**
036: * Horizontal based partitioning scheme, that allows the user to configure the
037: * number of partitions per transformation name per level.
038: * To set the size of the partition per transformation, the following properties
039: * need to be set
040: * <pre>
041: * pegasus.partitioner.horizontal.collapse.[txName]
042: * pegasus.partitioner.horizontal.bundle.[txName]
043: * </pre>
044: *
045: * The bundle value designates the number of partitions per transformation per level.
046: * The collapse values designates the number of nodes in a partitioning referring
047: * to a particular transformation. If both are specified, then bundle value takes
048: * precedence.
049: *
050: * @author Karan Vahi
051: * @version $Revision: 50 $
052: */
053: public class Horizontal extends BFS {
054:
055: /**
056: * A short description about the partitioner.
057: */
058: public static final String DESCRIPTION = "Configurable Level Based Partitioning";
059:
060: /**
061: * The default collapse factor for collapsing jobs with same logical name
062: * scheduled onto the same execution pool.
063: */
064: public static final int DEFAULT_COLLAPSE_FACTOR = 3;
065:
066: /**
067: * A map indexed by the partition ID. Each value is a partition object.
068: */
069: private Map mPartitionMap;
070:
071: /**
072: * A static instance of GraphNode comparator.
073: */
074: private GraphNodeComparator mNodeComparator;
075:
076: /**
077: * The global counter that is used to assign ID's to the partitions.
078: */
079: private int mIDCounter;
080:
081: /**
082: * Singleton access to the job comparator.
083: *
084: * @return the job comparator.
085: */
086: private Comparator nodeComparator() {
087: return (mNodeComparator == null) ? new GraphNodeComparator()
088: : mNodeComparator;
089: }
090:
091: /**
092: * The overloaded constructor.
093: *
094: * @param root the dummy root node of the graph.
095: * @param graph the map containing all the nodes of the graph keyed by
096: * the logical id of the nodes.
097: * @param properties the properties passed to the planner.
098: */
099: public Horizontal(GraphNode root, Map graph,
100: PegasusProperties properties) {
101: super (root, graph, properties);
102: mIDCounter = 0;
103: mPartitionMap = new HashMap(10);
104: }
105:
106: /**
107: * Returns a textual description of the partitioner implementation.
108: *
109: * @return a short textual description
110: */
111: public String description() {
112: return this .DESCRIPTION;
113: }
114:
115: /**
116: * Given a list of jobs, constructs (one or more) partitions out of it.
117: * Calls out to the partitioner callback, for each of the partitions
118: * constructed.
119: *
120: * @param c the parititoner callback
121: * @param nodes the list of <code>GraphNode</code> objects on a particular level.
122: * @param level the level as determined from the root of the workflow.
123: */
124: protected void constructPartitions(Callback c, List nodes, int level) {
125: //group the nodes by their logical names
126: Collections.sort(nodes, nodeComparator());
127: //traverse through the list and collapse jobs
128: //referring to same logical transformation
129: GraphNode previous = null;
130: List clusterList = new LinkedList();
131: GraphNode node = null;
132:
133: for (Iterator it = nodes.iterator(); it.hasNext();) {
134: node = (GraphNode) it.next();
135: if (previous == null
136: || node.getName().equals(previous.getName())) {
137: clusterList.add(node);
138: } else {
139: //at boundary collapse jobs
140: constructPartitions(c, clusterList, level, previous
141: .getName());
142: clusterList = new LinkedList();
143: clusterList.add(node);
144: }
145: previous = node;
146: }
147: //cluster the last clusterList
148: if (previous != null) {
149: constructPartitions(c, clusterList, level, previous
150: .getName());
151: }
152:
153: }
154:
155: /**
156: * Given a list of jobs, constructs (one or more) partitions out of it.
157: * Calls out to the partitioner callback, for each of the partitions
158: * constructed.
159: *
160: * @param c the parititoner callback
161: * @param nodes the list of <code>GraphNode</code> objects on a particular level,
162: * referring to the same transformation underneath.
163: * @param level the level as determined from the root of the workflow.
164: * @param name the transformation name
165: */
166: protected void constructPartitions(Callback c, List nodes,
167: int level, String name) {
168: //figure out number of jobs that go into one partition
169: int[] cFactor = new int[2];
170: cFactor[0] = 0;
171: cFactor[1] = 0;
172:
173: int size = nodes.size();
174: cFactor = this .getCollapseFactor(name, size);
175:
176: StringBuffer message = new StringBuffer();
177:
178: if (cFactor[0] == 0 && cFactor[1] == 0) {
179: message.append("\t Collapse factor of ").append(cFactor[0])
180: .append(",").append(cFactor[1]).append(
181: " determined for transformation ").append(
182: name);
183: mLogger.log(message.toString(),
184: LogManager.DEBUG_MESSAGE_LEVEL);
185: return;
186: }
187:
188: message.append("Partitioning jobs of type ").append(name)
189: .append(" at level ").append(level).append(
190: " wth collapse factor ").append(cFactor[0])
191: .append(",").append(cFactor[1]);
192:
193: mLogger.log(message.toString(), LogManager.DEBUG_MESSAGE_LEVEL);
194:
195: Partition p;
196: if (cFactor[0] >= size) {
197: //means put all the nodes in one partition
198: //we want to ignore the dummy node partition
199: p = createPartition(nodes);
200: c.cbPartition(p);
201: } else {
202: //do collapsing in chunks of cFactor
203: int increment = 0;
204: int toIndex;
205: for (int i = 0; i < size; i = i + increment) {
206: //compute the increment and decrement cFactor[1]
207: increment = (cFactor[1] > 0) ? cFactor[0] + 1
208: : cFactor[0];
209: cFactor[1]--;
210:
211: //determine the toIndex for creating the partition
212: toIndex = ((i + increment) < size) ? i + increment
213: : size;
214:
215: p = createPartition(nodes.subList(i, toIndex));
216: c.cbPartition(p);
217: }
218: }
219:
220: }
221:
222: /**
223: * Calls out to the callback with appropriate relations between the partitions
224: * constructed for the levels. This is an empty implementation, as we
225: * do our own book-keeping in this partitioner to determine the relations
226: * between the partitions.
227: *
228: * @param c the parititoner callback
229: * @param parent the parent level
230: * @param child the child level.
231: *
232: * @see #done( Callback )
233: */
234: protected void constructLevelRelations(Callback c, int parent,
235: int child) {
236:
237: }
238:
239: /**
240: * Indicates that we are done with the traversal of the graph. Determines
241: * the relations between the partitions constructed and calls out to the
242: * appropriate callback function
243: *
244: * @param c the partitioner callback
245: */
246: protected void done(Callback c) {
247: GraphNode node;
248: GraphNode parent;
249:
250: mLogger.log("Determining relations between partitions",
251: LogManager.INFO_MESSAGE_LEVEL);
252: //construct the relations
253: for (Iterator it = mPartitionMap.entrySet().iterator(); it
254: .hasNext();) {
255: Map.Entry entry = (Map.Entry) it.next();
256: Partition p = (Partition) entry.getValue();
257: List roots = p.getRootNodes();
258: Set parentPartitions = new HashSet(roots.size());
259:
260: //get the Root nodes for each partition and
261: //for each root, determine the partitions of it's parents
262: for (Iterator rootIt = roots.iterator(); rootIt.hasNext();) {
263: node = (GraphNode) rootIt.next();
264: for (Iterator parentsIt = node.getParents().iterator(); parentsIt
265: .hasNext();) {
266: parent = (GraphNode) parentsIt.next();
267: //the parents partition id is parent for the
268: //partition containing the root
269: parentPartitions.add(parent.getBag().get(
270: LabelBag.PARTITION_KEY));
271: }
272: }
273: //write out all the parents of the partition
274: if (!parentPartitions.isEmpty()) {
275: c.cbParents(p.getID(), new ArrayList(parentPartitions));
276: }
277: }
278: mLogger.logCompletion(
279: "Determining relations between partitions",
280: LogManager.INFO_MESSAGE_LEVEL);
281:
282: //done with the partitioning
283: c.cbDone();
284: }
285:
286: /**
287: * Returns the collapse factor, that is used to determine the number of nodes
288: * going in a partition. The collapse factor is determined by
289: * getting the collapse and the bundle values specified for the transformations
290: * in the properties file.
291: *
292: * There are two orthogonal notions of bundling and collapsing. In case the
293: * bundle key is specified, it ends up overriding the collapse key, and
294: * the bundle value is used to generate the collapse values.
295: *
296: * If both are not specified or null, then collapseFactor is set to size.
297: *
298: * @param txName the logical transformation name
299: * @param size the number of jobs that refer to the same logical
300: * transformation and are scheduled on the same execution pool.
301: *
302: * @return int array of size 2 where :-
303: * int[0] is the the collapse factor (number of nodes in a partition)
304: * int[1] is the number of parititons for whom collapsing is int[0] + 1.
305: */
306: protected int[] getCollapseFactor(String txName, int size) {
307: String factor = null;
308: String bundle = null;
309: int result[] = new int[2];
310: result[1] = 0;
311:
312: //the job should have the collapse key from the TC if
313: //by the user specified
314: try {
315: //ceiling is (x + y -1)/y
316: bundle = mProps.getHorizontalPartitionerBundleValue(txName);
317: if (bundle != null) {
318: int b = Integer.parseInt(bundle);
319: result[0] = size / b;
320: result[1] = size % b;
321: return result;
322: //doing no boundary condition checks
323: //return (size + b -1)/b;
324: }
325:
326: factor = mProps
327: .getHorizontalPartitionerCollapseValue(txName);
328: //return the appropriate value
329: result[0] = (factor == null) ? size : //then collapse factor is same as size
330: Integer.parseInt(factor); //use the value in the prop file
331: } catch (NumberFormatException e) {
332: //set bundle to size
333: StringBuffer error = new StringBuffer();
334:
335: if (factor == null) {
336: error.append("Bundle value (").append(bundle).append(
337: ")");
338: } else {
339: error.append("Collapse value (").append(factor).append(
340: ")");
341: }
342: error.append(" for transformation ").append(txName).append(
343: " is not a number");
344: mLogger.log(error.toString(),
345: LogManager.DEBUG_MESSAGE_LEVEL);
346: result[0] = size;
347: }
348: return result;
349:
350: }
351:
352: /**
353: * Creates a partition out of a list of nodes. Also stores it in the internal
354: * partition map to track partitions later on. Associates the partition ID
355: * with each of the nodes making the partition also.
356: *
357: * @param nodes the list of <code>GraphNodes</code> making the partition.
358: *
359: * @return the partition out of those nodes.
360: */
361: protected Partition createPartition(List nodes) {
362: //increment the ID counter before getting the ID
363: this .incrementIDCounter();
364: String id = getPartitionID(this .idCounter());
365: Partition p = new Partition(nodes, id);
366: p.setIndex(this .idCounter());
367: p.constructPartition();
368:
369: mPartitionMap.put(p.getID(), p);
370:
371: //associate the ID with all the nodes
372: for (Iterator it = nodes.iterator(); it.hasNext();) {
373: GraphNode node = (GraphNode) it.next();
374: Bag b = new LabelBag();
375: b.add(LabelBag.PARTITION_KEY, id);
376: node.setBag(b);
377: }
378:
379: //log a message
380: StringBuffer message = new StringBuffer();
381: message.append("Partition ").append(p.getID()).append(" is :")
382: .append(p.getNodeIDs());
383: mLogger.log(message.toString(), LogManager.DEBUG_MESSAGE_LEVEL);
384:
385: return p;
386: }
387:
388: /**
389: * Increments the ID counter by 1.
390: */
391: private void incrementIDCounter() {
392: mIDCounter++;
393: }
394:
395: /**
396: * Returns the current value of the ID counter.
397: */
398: private int idCounter() {
399: return mIDCounter;
400: }
401:
402: /**
403: * Constructs the id for the partition.
404: *
405: * @param id an integer ID.
406: *
407: * @return the ID for the Partition.
408: */
409: private String getPartitionID(int id) {
410: StringBuffer sb = new StringBuffer(5);
411: sb.append("ID").append(id);
412: return sb.toString();
413: }
414:
415: /**
416: * A GraphNode comparator, that allows me to compare nodes according to the
417: * transformation logical names. It is applied to group jobs in a particular partition,
418: * according to the underlying transformation that is referred.
419: *
420: */
421: private class GraphNodeComparator implements Comparator {
422:
423: /**
424: * Compares this object with the specified object for order. Returns a
425: * negative integer, zero, or a positive integer if the first argument is
426: * less than, equal to, or greater than the specified object. The
427: * SubInfo are compared by their transformation name.
428: *
429: * This implementation is not consistent with the
430: * SubInfo.equals(Object) method. Hence, should not be used in sorted
431: * Sets or Maps.
432: *
433: * @param o1 is the first object to be compared.
434: * @param o2 is the second object to be compared.
435: *
436: * @return a negative number, zero, or a positive number, if the
437: * object compared against is less than, equals or greater than
438: * this object.
439: * @exception ClassCastException if the specified object's type
440: * prevents it from being compared to this Object.
441: */
442: public int compare(Object o1, Object o2) {
443: if (o1 instanceof GraphNode && o2 instanceof GraphNode) {
444: return ((GraphNode) o1).getName().compareTo(
445: ((GraphNode) o2).getName());
446:
447: } else {
448: throw new ClassCastException(
449: "Objects being compared are not GraphNode");
450: }
451: }
452: }
453:
454: }
|