001: /**
002: * This file or a portion of this file is licensed under the terms of
003: * the Globus Toolkit Public License, found in file GTPL, or at
004: * http://www.globus.org/toolkit/download/license.html. This notice must
005: * appear in redistributions of this file, with or without modification.
006: *
007: * Redistributions of this Software, with or without modification, must
008: * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
009: * some other similar material which is provided with the Software (if
010: * any).
011: *
012: * Copyright 1999-2004 University of Chicago and The University of
013: * Southern California. All rights reserved.
014: */package org.griphyn.cPlanner.cluster;
015:
016: import org.griphyn.cPlanner.classes.ADag;
017: import org.griphyn.cPlanner.classes.SubInfo;
018: import org.griphyn.cPlanner.classes.AggregatedJob;
019: import org.griphyn.cPlanner.classes.PCRelation;
020:
021: import org.griphyn.cPlanner.common.PegasusProperties;
022: import org.griphyn.cPlanner.common.LogManager;
023:
024: import org.griphyn.cPlanner.cluster.JobAggregator;
025:
026: import org.griphyn.cPlanner.cluster.aggregator.JobAggregatorInstanceFactory;
027:
028: import org.griphyn.cPlanner.partitioner.Partition;
029:
030: import java.util.Collection;
031: import java.util.Vector;
032: import java.util.List;
033: import java.util.ArrayList;
034: import java.util.Map;
035: import java.util.HashMap;
036: import java.util.Iterator;
037: import java.util.Set;
038: import org.griphyn.cPlanner.classes.PegasusBag;
039:
040: /**
041: * An abstract clusterer that the other clusterers can extend. The abstract
042: * implementation treats each partition as a single cluster. It has callouts
043: * to determine the ordering of the jobs in the cluster, and the input/output
044: * files for the clustered jobs.
045: *
046: * @author Karan Vahi
047: * @version $Revision: 450 $
048: */
049:
050: public abstract class Abstract implements Clusterer {
051:
052: /**
053: * A Map to store all the job(SubInfo) objects indexed by their logical ID found in
054: * the dax. This should actually be in the ADag structure.
055: */
056: protected Map mSubInfoMap;
057:
058: /**
059: * A Map that indexes the partition ID to the name of clustered job.
060: */
061: protected Map mPartitionClusterMap;
062:
063: /**
064: * The handle to the logger object.
065: */
066: protected LogManager mLogger;
067:
068: /**
069: * The handle to the properties object holding all the properties.
070: */
071: protected PegasusProperties mProps;
072:
073: /**
074: * The handle to the job aggregator factory.
075: */
076: protected JobAggregatorInstanceFactory mJobAggregatorFactory;
077:
078: /**
079: * The collection of relations, that is constructed for the clustered
080: * workflow.
081: */
082: protected Collection mClusteredRelations;
083:
084: /**
085: * ADag object containing the jobs that have been scheduled by the site
086: * selector.
087: */
088: protected ADag mScheduledDAG;
089:
090: /**
091: * The Abstract constructor.
092: */
093: public Abstract() {
094: mLogger = LogManager.getInstance();
095: mJobAggregatorFactory = new JobAggregatorInstanceFactory();
096: }
097:
098: /**
099: * Returns the nodes in the partition as a List in a particular order.
100: * The iterator of the list returns the nodes in the order determined by
101: * the clusterer.
102: *
103: * @param p the partition whose nodes have to be ordered.
104: *
105: * @return an ordered List of <code>String</code> objects that are the ID's
106: * of the nodes.
107: *
108: * @throws ClustererException in case of error.
109: */
110: public abstract List order(Partition p) throws ClustererException;
111:
112: /**
113: * Determine the input and output files of the job on the basis of the
114: * order of the constituent jobs in the AggregatedJob.
115: *
116: * @param job the <code>AggregatedJob</code>
117: *
118: * @throws ClustererException in case of error.
119: */
120: public abstract void determineInputOutputFiles(AggregatedJob job);
121:
122: /*{
123: //by default we do not care about order
124: List l = new ArrayList( p.getNodeIDs().size() );
125: for( Iterator it = p.getNodeIDs().iterator(); it.hasNext();){
126: l.add( it.next() );
127: }
128: return l;
129: }
130: */
131:
132: /**
133: *Initializes the Clusterer impelementation
134: *
135: * @param dag the workflow that is being clustered.
136: * @param bag the bag of objects that is useful for initialization.
137: *
138: * @throws ClustererException in case of error.
139: */
140: public void initialize(ADag dag, PegasusBag bag)
141: throws ClustererException {
142:
143: mScheduledDAG = dag;
144: mProps = bag.getPegasusProperties();
145: mJobAggregatorFactory.initialize(dag, bag);
146:
147: mClusteredRelations = new Vector(
148: dag.dagInfo.relations.size() / 2);
149:
150: mSubInfoMap = new HashMap(dag.vJobSubInfos.size());
151: mPartitionClusterMap = new HashMap();
152:
153: for (Iterator it = mScheduledDAG.vJobSubInfos.iterator(); it
154: .hasNext();) {
155: SubInfo job = (SubInfo) it.next();
156: addJob(job);
157: }
158: }
159:
160: /**
161: * It creates a single clustered job for the partition. If there is only
162: * one job in the partition, then no clustering happens.
163: *
164: * @param partition the partition for which the clusters need to be
165: * determined.
166: *
167: * @throws ClustererException if the clustering executable is not installed
168: * on the remote site or if all the jobs in the partition are not
169: * scheduled on the same site.
170: */
171: public void determineClusters(Partition partition)
172: throws ClustererException {
173: String pID = partition.getID();
174:
175: //do the ordering on the partition as required.
176: List nodes = order(partition);
177:
178: List l = new ArrayList(nodes.size());
179:
180: mLogger.log(
181: "Clustering jobs in partition " + pID + " " + nodes,
182: LogManager.DEBUG_MESSAGE_LEVEL);
183:
184: String prevSite = null;
185: String currSite = null;
186: for (Iterator it = nodes.iterator(); it.hasNext();) {
187: SubInfo job = (SubInfo) mSubInfoMap.get(it.next());
188: currSite = job.getSiteHandle();
189: l.add(job);
190:
191: //sanity check to ensure jobs are scheduled on same site.
192: if (prevSite == null || currSite.equals(prevSite)) {
193: prevSite = currSite;
194: continue;
195: } else {
196: throw new ClustererException("Jobs in the partition "
197: + partition.getID()
198: + " not scheduled on the same site!");
199: }
200:
201: }
202:
203: int size = l.size();
204: SubInfo firstJob = (SubInfo) l.get(0);
205:
206: // System.out.println( " Job to be clustered is " + firstJob);
207:
208: if (size == 1) {
209: //no need to collapse one job. go to the next iteration
210: mLogger.log("\t No clustering for partition " + pID,
211: LogManager.DEBUG_MESSAGE_LEVEL);
212: associate(partition, firstJob);
213: return;
214: }
215:
216: //do the ordering of the list
217:
218: JobAggregator aggregator = mJobAggregatorFactory
219: .loadInstance(firstJob);
220: if (aggregator.entryNotInTC(currSite)) {
221: throw new ClustererException(
222: "No installed aggregator executable found for partition "
223: + pID + " at site " + currSite);
224: }
225:
226: AggregatedJob clusteredJob = aggregator.construct(l, firstJob
227: .getStagedExecutableBaseName(), this
228: .constructClusteredJobID(partition));
229:
230: //replace the jobs in the partition with the clustered job
231: //in the original workflow
232: for (Iterator it = l.iterator(); it.hasNext();) {
233: SubInfo job = (SubInfo) it.next();
234: mLogger.log("Replacing job " + job.getName() + " with "
235: + clusteredJob.getName(),
236: LogManager.DEBUG_MESSAGE_LEVEL);
237:
238: //remove the old job
239: if (!mScheduledDAG.remove(job)) {
240: String msg = "Removal of job " + job.getName()
241: + " while clustering not successful";
242: throw new ClustererException(msg);
243: }
244: }
245:
246: //get the correct input and output files for the job
247: this .determineInputOutputFiles(clusteredJob);
248:
249: //System.out.println(" Clustered Job is " + clusteredJob );
250:
251: mScheduledDAG.add(clusteredJob);
252:
253: associate(partition, clusteredJob);
254: }
255:
256: /**
257: * Associates the relations between the partitions with the corresponding
258: * relations between the clustered jobs that are created for each Partition.
259: *
260: * @param partitionID the id of a partition.
261: * @param parents the list of <code>String</code> objects that contain
262: * the id's of the parents of the partition.
263: *
264: * @throws ClustererException in case of clustered job not being found for a partition.
265: */
266: public void parents(String partitionID, List parents)
267: throws ClustererException {
268: String error = "No cluster job for partition ";
269: SubInfo clusteredNode = clusteredJob(partitionID);
270: SubInfo parentClusteredNode;
271:
272: //throw error if not found
273: if (clusteredNode == null) {
274: throw new ClustererException(error + partitionID);
275: }
276:
277: for (Iterator it = parents.iterator(); it.hasNext();) {
278: String parent = (String) it.next();
279: parentClusteredNode = clusteredJob(parent);
280:
281: //throw error if not found
282: if (clusteredNode == null) {
283: throw new ClustererException(error + parent);
284: }
285:
286: //add a relation between these clustered jobs
287: mClusteredRelations.add(new PCRelation(parentClusteredNode
288: .getName(), clusteredNode.getName()));
289: }
290:
291: }
292:
293: /**
294: * Returns the clustered workflow.
295: *
296: * @return the <code>ADag</code> object corresponding to the clustered workflow.
297: *
298: * @throws ClustererException in case of error.
299: */
300: public ADag getClusteredDAG() throws ClustererException {
301: //replace the relations of the original DAG and return
302: mScheduledDAG.dagInfo.relations = null;
303: mScheduledDAG.dagInfo.relations = (Vector) mClusteredRelations;
304:
305: return mScheduledDAG;
306: }
307:
308: /**
309: * Returns the ID for the clustered job corresponding to a partition.
310: *
311: * @param partition the partition.
312: *
313: * @return the ID of the clustered job
314: */
315: protected String constructClusteredJobID(Partition partition) {
316: return partition.getID();
317: }
318:
319: /**
320: * Adds jobs to the internal map of jobs that is maintained by the clusterer.
321: *
322: * @param job the job being added
323: */
324: protected void addJob(SubInfo job) {
325: mSubInfoMap.put(job.getLogicalID(), job);
326: }
327:
328: /**
329: * Returns the job object corresponding to the id of the job.
330: *
331: * @param id the id of the job
332: *
333: * @return the corresponding job.
334: */
335: protected SubInfo getJob(String id) {
336: return (SubInfo) mSubInfoMap.get(id);
337: }
338:
339: /**
340: * Maps the partition to the corresponding clustered job.
341: *
342: * @param p the partition being clustered.
343: * @param job the corresponding clustered job.
344: */
345: protected void associate(Partition p, SubInfo job) {
346: mPartitionClusterMap.put(p.getID(), job);
347: }
348:
349: /**
350: * Returns the job corresponding to a partition.
351: *
352: * @param p the partition for which the clustered job is reqd.
353: *
354: * @return the corresponding job, else null in case of job is not found.
355: */
356: protected SubInfo clusteredJob(Partition p) {
357: return this .clusteredJob(p.getID());
358: }
359:
360: /**
361: * Returns the job corresponding to a partition.
362: *
363: * @param id the partition id.
364: *
365: * @return the corresponding job, else null in case of job is not found.
366: */
367: protected SubInfo clusteredJob(String id) {
368: Object obj = mPartitionClusterMap.get(id);
369: return (obj == null) ? null : (SubInfo) obj;
370: }
371:
372: }
|