001: /**
002: * This file or a portion of this file is licensed under the terms of
003: * the Globus Toolkit Public License, found in file GTPL, or at
004: * http://www.globus.org/toolkit/download/license.html. This notice must
005: * appear in redistributions of this file, with or without modification.
006: *
007: * Redistributions of this Software, with or without modification, must
008: * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
009: * some other similar material which is provided with the Software (if
010: * any).
011: *
012: * Copyright 1999-2004 University of Chicago and The University of
013: * Southern California. All rights reserved.
014: */package org.griphyn.cPlanner.cluster;
015:
016: import org.griphyn.cPlanner.classes.SubInfo;
017: import org.griphyn.cPlanner.classes.AggregatedJob;
018: import org.griphyn.cPlanner.classes.PegasusFile;
019:
020: import org.griphyn.cPlanner.partitioner.Topological;
021: import org.griphyn.cPlanner.partitioner.Partition;
022:
023: import java.util.List;
024: import java.util.Set;
025: import java.util.HashSet;
026: import java.util.Iterator;
027:
028: /**
029: * The vertical cluster, that extends the Default clusterer and topologically
030: * sorts the partition before clustering the jobs into aggregated jobs.
031: *
032: * @author Karan Vahi
033: * @version $Revision: 50 $
034: */
035:
036: public class Vertical extends Abstract {
037:
038: /**
039: * A short description about the partitioner.
040: */
041: public static final String DESCRIPTION = "Topological based Vertical Clustering";
042:
043: /**
044: * The default constructor.
045: */
046: public Vertical() {
047: super ();
048: }
049:
050: /**
051: * Returns a textual description of the transfer implementation.
052: *
053: * @return a short textual description
054: */
055: public String description() {
056: return this .DESCRIPTION;
057: }
058:
059: /**
060: * Returns the nodes in the partition as a List in the topologically sorted
061: * order.
062: *
063: * @param p the partition whose nodes have to be ordered.
064: *
065: * @return an ordered List of <code>String</code> objects that are the ID's
066: * of the nodes.
067: *
068: * @throws ClustererException in case of error.
069: */
070: public List order(Partition p) throws ClustererException {
071: try {
072: return new Topological(p).sort();
073: } catch (Exception e) {
074: throw new ClustererException(
075: "Unable to sort the partition " + p.getID(), e);
076: }
077: }
078:
079: /**
080: * Determine the input and output files of the job on the basis of the
081: * order of the constituent jobs in the AggregatedJob.
082: * The input and output files are determined on the basis of topologically
083: * sorted order of the constituent jobs.
084: *
085: * @param job the <code>AggregatedJob</code>
086: *
087: * @throws ClustererException in case of error.
088: */
089: public void determineInputOutputFiles(AggregatedJob job) {
090:
091: //set the input files to null for time being
092: job.inputFiles = null;
093: job.outputFiles = null;
094:
095: Set inputFiles = new HashSet();
096: Set materializedFiles = new HashSet();
097:
098: PegasusFile file;
099:
100: //the constituent jobs are topologically sorted
101: //traverse through them and build up the ip and op files
102: for (Iterator it = job.constituentJobsIterator(); it.hasNext();) {
103: SubInfo cjob = (SubInfo) it.next();
104:
105: //traverse through input files of constituent job
106: for (Iterator fileIt = cjob.getInputFiles().iterator(); fileIt
107: .hasNext();) {
108: file = (PegasusFile) fileIt.next();
109: //add to input files if it has not been materializd
110: if (!materializedFiles.contains(file)) {
111: inputFiles.add(file);
112: }
113: }
114:
115: //traverse through output files of constituent job
116: for (Iterator fileIt = cjob.getOutputFiles().iterator(); fileIt
117: .hasNext();) {
118: file = (PegasusFile) fileIt.next();
119: //add to materialized files
120: materializedFiles.add(file);
121:
122: // //file is output only if it has to be registered or transferred
123: // if ( !file.getTransientRegFlag() ||
124: // file.getTransferFlag() != PegasusFile.TRANSFER_NOT ){
125: // outputFiles.add( file );
126: // }
127: }
128: }
129:
130: job.setInputFiles(inputFiles);
131:
132: //all the materialized files are output files for
133: //the aggregated job.
134: job.setOutputFiles(materializedFiles);
135:
136: }
137:
138: }
|