/**
 * This file or a portion of this file is licensed under the terms of
 * the Globus Toolkit Public License, found at $PEGASUS_HOME/GTPL or
 * http://www.globus.org/toolkit/download/license.html.
 * This notice must appear in redistributions of this file
 * with or without modification.
 *
 * Redistributions of this Software, with or without modification, must reproduce
 * the GTPL in:
 * (1) the Software, or
 * (2) the Documentation or
 * some other similar material which is provided with the Software (if any).
 *
 * Copyright 1999-2004
 * University of Chicago and The University of Southern California.
 * All rights reserved.
 */

package org.griphyn.cPlanner.engine;

import java.util.Iterator;
import java.util.Set;
import java.util.Vector;

import org.griphyn.cPlanner.classes.ADag;
import org.griphyn.cPlanner.classes.PlannerOptions;
import org.griphyn.cPlanner.classes.PegasusBag;

import org.griphyn.cPlanner.common.LogManager;
import org.griphyn.cPlanner.common.PegasusProperties;

import org.griphyn.common.catalog.transformation.TCMode;

/**
 * The central class that calls out to the various other components of Pegasus.
 *
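 * <p>A minimal usage sketch. How the three inputs are constructed is assumed
 * to be the caller's responsibility (e.g. the command line front end); only
 * methods declared in this class are invoked below.
 * <pre>
 *     ADag dag                = ...; //parsed from the dag file
 *     PegasusProperties props = ...; //the properties to use
 *     PlannerOptions options  = ...; //the options specified by the user
 *
 *     MainEngine engine = new MainEngine(dag, props, options);
 *     ADag planned      = engine.runPlanner();
 *     ADag cleanup      = engine.getCleanupDAG(); //null unless random directories were used
 * </pre>
 *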
 * @author Karan Vahi
 * @author Gaurang Mehta
 * @version $Revision: 450 $
 *
 * @see org.griphyn.cPlanner.classes.ReplicaLocations
 */
public class MainEngine extends Engine {

    /**
     * The original Dag object, constructed by parsing the dag file.
     */
    private ADag mOriginalDag;

    /**
     * The reduced Dag object, obtained from the Reduction Engine.
     */
    private ADag mReducedDag;

    /**
     * The cleanup dag for the final concrete dag.
     */
    private ADag mCleanupDag;

    /**
     * The pools on which the Dag should be executed, as specified by the user.
     */
    private Set mExecPools;

    /**
     * The pool to which all the output data should be transferred.
     */
    private String mOutputPool;

    /**
     * The bridge to the Replica Catalog.
     */
    private ReplicaCatalogBridge mRCBridge;

    /**
     * The handle to the InterPool Engine that calls out to the Site Selector
     * and maps the jobs.
     */
    private InterPoolEngine mIPEng;

    /**
     * The handle to the Reduction Engine that performs reduction on the graph.
     */
    private ReductionEngine mRedEng;

    /**
     * The handle to the Transfer Engine that adds the transfer nodes in the
     * graph to transfer the files from one site to another.
     */
    private TransferEngine mTransEng;

    /**
     * The engine that creates the random directories in the remote
     * execution pools.
     */
    private CreateDirectory mCreateEng;

    /**
     * The engine that creates the cleanup dag for the concrete dag.
     */
    private RemoveDirectory mRemoveEng;

    /**
     * The handle to the Authentication Engine that performs the authentication
     * with the various sites.
     */
    private AuthenticateEngine mAuthEng;

    /**
     * The handle to the node collapser.
     */
    private NodeCollapser mNodeCollapser;

    /**
     * The bag of objects that is populated as the planner is run.
     */
    private PegasusBag mBag;

    /**
     * This constructor initialises the class variables to the values passed.
     * The pool names specified should be present in the pool.config file.
     *
     * @param orgDag  the dag to be worked on.
     * @param props   the properties to be used.
     * @param options the options specified by the user to run the planner.
     */
    public MainEngine(ADag orgDag, PegasusProperties props,
                      PlannerOptions options) {
        super(props);
        mOriginalDag = orgDag;
        this.mPOptions = options;
        mExecPools = (Set) mPOptions.getExecutionSites();
        mOutputPool = mPOptions.getOutputSite();
        mTCHandle = TCMode.loadInstance();

        if (mOutputPool != null && mOutputPool.length() > 0) {
            Engine.mOutputPool = mOutputPool;
        }
    }

    /**
     * The main function which calls the other engines and does the necessary work.
     *
     * @return the planned workflow.
     */
    public ADag runPlanner() {
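        //the overall pipeline: authenticate against the sites, reduce the
        //workflow, map jobs to sites, optionally cluster, graft transfer
        //nodes, and optionally add directory creation and cleanup jobs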
        //do the authentication against the pools
        if (mPOptions.authenticationSet()) {
            mAuthEng = new AuthenticateEngine(
                mProps,
                new java.util.HashSet(mPOptions.getExecutionSites()));

            mLogger.log("Authenticating Sites",
                        LogManager.INFO_MESSAGE_LEVEL);
            Set authenticatedSet = mAuthEng.authenticate();
            if (authenticatedSet.isEmpty()) {
                StringBuffer error = new StringBuffer();
                error.append("Unable to authenticate against any site. ")
                     .append("Probably your credentials were not generated")
                     .append(" or have expired");
                throw new RuntimeException(error.toString());
            }
            mLogger.log("Sites authenticated are "
                        + setToString(authenticatedSet, ","),
                        LogManager.DEBUG_MESSAGE_LEVEL);
            mLogger.logCompletion("Authenticating Sites",
                                  LogManager.INFO_MESSAGE_LEVEL);
            mPOptions.setExecutionSites(authenticatedSet);
        }

        Vector vDelLeafJobs = new Vector();
        String message = null;
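        //set up the bridge to the replica catalog; it is consulted both for
        //workflow reduction and while grafting the transfer nodes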
        mRCBridge = new ReplicaCatalogBridge(mOriginalDag, mProps,
                                             mPOptions);

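        //reduce the workflow against the replica catalog, removing jobs
        //whose output files are already available, and remember the deleted
        //leaf jobs for the transfer engine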
        mRedEng = new ReductionEngine(mOriginalDag, mProps, mPOptions);
        mReducedDag = mRedEng.reduceDag(mRCBridge);
        vDelLeafJobs = mRedEng.getDeletedLeafJobs();
        mRedEng = null;

        //unmark arg strings
        //unmarkArgs();
        message = "Doing site selection";
        mLogger.log(message, LogManager.INFO_MESSAGE_LEVEL);
        mIPEng = new InterPoolEngine(mReducedDag, mProps, mPOptions);
        mIPEng.determineSites();
        mBag = mIPEng.getPegasusBag();
        mIPEng = null;
        mLogger.logCompletion(message, LogManager.INFO_MESSAGE_LEVEL);

        //do the node clustering
        if (mPOptions.getClusteringTechnique() != null) {
            message = "Clustering the jobs in the workflow";
            mLogger.log(message, LogManager.INFO_MESSAGE_LEVEL);
            mNodeCollapser = new NodeCollapser(mBag);

            try {
                mReducedDag = mNodeCollapser.cluster(mReducedDag);
            } catch (Exception e) {
                throw new RuntimeException(message, e);
            }

            mNodeCollapser = null;
            mLogger.logCompletion(message,
                                  LogManager.INFO_MESSAGE_LEVEL);
        }

        message = "Grafting transfer nodes in the workflow";
        mLogger.log(message, LogManager.INFO_MESSAGE_LEVEL);
        mTransEng = new TransferEngine(mReducedDag, vDelLeafJobs,
                                       mProps, mPOptions);
        mTransEng.addTransferNodes(mRCBridge);
        mTransEng = null;
        mLogger.logCompletion(message, LogManager.INFO_MESSAGE_LEVEL);

        //close the connection to RLI explicitly
        mRCBridge.closeConnection();

        if (mPOptions.generateRandomDirectory()) {
            //add the nodes that create random directories at the remote
            //execution pools
            message = "Grafting the remote work directory creation jobs "
                      + "in the workflow";
            mLogger.log(message, LogManager.INFO_MESSAGE_LEVEL);
            mCreateEng = CreateDirectory.loadCreateDirectoryInstance(
                mProps.getCreateDirClass(), mReducedDag, mProps);
            mCreateEng.addCreateDirectoryNodes();
            mCreateEng = null;
            mLogger.logCompletion(message,
                                  LogManager.INFO_MESSAGE_LEVEL);

            //create the cleanup dag
            message = "Generating the cleanup workflow";
            mLogger.log(message, LogManager.INFO_MESSAGE_LEVEL);
            mRemoveEng = new RemoveDirectory(mReducedDag, mProps);
            mCleanupDag = mRemoveEng.generateCleanUPDAG();
            mLogger.logCompletion(message,
                                  LogManager.INFO_MESSAGE_LEVEL);
        }

        //add the cleanup nodes in place
        if (mPOptions.getCleanup()) { /* should be exposed via command line option */
            message = "Adding cleanup jobs in the workflow";
            mLogger.log(message, LogManager.INFO_MESSAGE_LEVEL);
            CleanupEngine cEngine = new CleanupEngine(mProps, mPOptions);
            mReducedDag = cEngine.addCleanupJobs(mReducedDag);
            mLogger.logCompletion(message,
                                  LogManager.INFO_MESSAGE_LEVEL);
        }

        return mReducedDag;
    }

    /**
     * Returns the cleanup dag for the concrete dag.
     *
     * @return the cleanup dag if the random directory option is given,
     *         null otherwise.
     */
    public ADag getCleanupDAG() {
        return mCleanupDag;
    }

    /**
     * Returns the bag of initialization objects.
     *
     * @return PegasusBag
     */
    public PegasusBag getPegasusBag() {
        return mBag;
    }

    /**
     * Unmarks the arguments that are tagged in the DaxParser. At present there
     * is no tagging.
     *
     * @deprecated
     */
    private void unmarkArgs() {
        /*Enumeration e = mReducedDag.vJobSubInfos.elements();
        while(e.hasMoreElements()){
            SubInfo sub = (SubInfo)e.nextElement();
            sub.strargs = new String(removeMarkups(sub.strargs));
        }*/
    }

    /**
     * A small helper method that displays the contents of a Set in a String.
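     * For example, a set containing "isi" and "sdsc" with delimiter ","
     * results in the String "isi,sdsc".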
     *
     * @param s     the Set whose contents need to be displayed.
     * @param delim the delimiter between the members of the set.
     * @return String
     */
    public String setToString(Set s, String delim) {
        StringBuffer sb = new StringBuffer();
        for (Iterator it = s.iterator(); it.hasNext(); ) {
            sb.append((String) it.next()).append(delim);
        }
        //strip the trailing delimiter, if any
        String result = sb.toString();
        result = (result.length() > 0)
                 ? result.substring(0, result.lastIndexOf(delim))
                 : result;
        return result;
    }

}
|