/*
 * This file or a portion of this file is licensed under the terms of
 * the Globus Toolkit Public License, found in file GTPL, or at
 * http://www.globus.org/toolkit/download/license.html. This notice must
 * appear in redistributions of this file, with or without modification.
 *
 * Redistributions of this Software, with or without modification, must
 * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
 * some other similar material which is provided with the Software (if
 * any).
 *
 * Copyright 1999-2004 University of Chicago and The University of
 * Southern California. All rights reserved.
 */

package org.griphyn.cPlanner.engine.cleanup;

import org.griphyn.cPlanner.classes.SubInfo;
import org.griphyn.cPlanner.classes.PegasusFile;
import org.griphyn.cPlanner.classes.PlannerOptions;

import org.griphyn.cPlanner.common.PegasusProperties;
import org.griphyn.cPlanner.common.LogManager;

import org.griphyn.cPlanner.namespace.Condor;

import org.griphyn.cPlanner.partitioner.graph.GraphNode;
import org.griphyn.cPlanner.partitioner.graph.Graph;
import org.griphyn.cPlanner.partitioner.graph.MapGraph;

import java.util.Map;
import java.util.Iterator;
import java.util.HashMap;
import java.util.List;
import java.util.LinkedList;
import java.util.ArrayList;
import java.util.Set;
import java.util.TreeSet;
import java.util.HashSet;
/**
 * This strategy adds the cleanup jobs into the workflow itself (in place),
 * so that the files used by the jobs at a site are removed once the jobs
 * that require them have finished.
 *
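 * <p>
 * A minimal usage sketch (the variable names are illustrative and the
 * executable workflow <code>Graph</code> is assumed to be already populated):
 * <pre>
 *     InPlace cleanup = new InPlace( properties, options );
 *     workflow = cleanup.addCleanupJobs( workflow );
 * </pre>
 *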
 * @author Arun Ramakrishnan
 * @author Karan Vahi
 *
 * @version $Revision: 50 $
 */
public class InPlace implements Strategy {

    /**
     * The prefix for the ID of a cleanup job, i.e. the prefix concatenated
     * with the ID of the associated compute job forms the ID of the cleanup
     * job.
     */
    public static final String CLEANUP_JOB_PREFIX = "cln_";

    /**
     * The mapping of a siteHandle (String) to the Set of GraphNode objects
     * for all the jobs mapped to that site.
     */
    private HashMap mResMap;

    /**
     * The mapping of a siteHandle (String) to the Set of GraphNode objects
     * for the subset of jobs mapped to that site that are leaves in the
     * workflow.
     */
    private HashMap mResMapLeaves;

    /**
     * The mapping of a siteHandle (String) to the Set of GraphNode objects
     * for the subset of jobs mapped to that site that are roots in the
     * workflow.
     */
    private HashMap mResMapRoots;

    /**
     * The maximum depth of any job in the workflow. It is used to size the
     * array based priority queue of jobs.
     */
    private int mMaxDepth;

    /**
     * The HashSet of files that should not be cleaned up.
     */
    private HashSet mDoNotClean;

    /**
     * The handle to the Implementation instance that creates the jobs for us.
     */
    private Implementation mImpl;

    /**
     * The handle to the properties passed to Pegasus.
     */
    private PegasusProperties mProps;

    /**
     * The handle to the logging object used for logging.
     */
    private LogManager mLogger;

    /**
     * Creates a new instance of InPlace.
     *
     * @param properties the properties passed to the planner.
     * @param options    the options passed to the planner.
     */
    public InPlace(PegasusProperties properties, PlannerOptions options) {
        mProps = properties;
        mLogger = LogManager.getInstance();

        mImpl = new Cleanup(properties, options);

        //initialize the internal structures
        mResMap = new HashMap();
        mResMapLeaves = new HashMap();
        mResMapRoots = new HashMap();
        mDoNotClean = new HashSet();
        mMaxDepth = 0;
    }

    /**
     * Adds cleanup jobs to the workflow.
     *
     * @param workflow the workflow to add the cleanup jobs to.
     *
     * @return the workflow with cleanup jobs added to it.
     */
    public Graph addCleanupJobs(Graph workflow) {
        //reset the internal data structures
        reset();

        //add the priorities to all the jobs
        applyJobPriorities(workflow);

        //determine the files that should not be removed from the site where
        //they are produced, i.e. a file A produced by a job J should not be
        //removed if J has no stage out job and A has its transient transfer
        //flag set to false
        for (Iterator it = workflow.nodeIterator(); it.hasNext();) {
            GraphNode _GN = (GraphNode) it.next();
            SubInfo _SI = (SubInfo) _GN.getContent();
            //only for compute and staged compute jobs
            if (!(_SI.getJobType() == SubInfo.COMPUTE_JOB
                  || _SI.getJobType() == SubInfo.STAGED_COMPUTE_JOB)) {
                continue;
            }

            //if the compute job has a stage out job, then all the files
            //produced by it can be removed, so skip such cases
            boolean job_has_stageout = false;
            for (Iterator itjc = _GN.getChildren().iterator(); itjc.hasNext();) {
                SubInfo _SIchild = (SubInfo) ((GraphNode) itjc.next()).getContent();
                if (_SIchild.getJobType() == SubInfo.STAGE_OUT_JOB) {
                    job_has_stageout = true;
                    break;
                }
            }
            if (job_has_stageout) {
                continue;
            }

            //else add the files with the transient transfer flag set to false
            //to the do not clean set
            Set _ofiles = _SI.getOutputFiles();
            for (Iterator itof = _ofiles.iterator(); itof.hasNext();) {
                PegasusFile of = (PegasusFile) itof.next();
                if (!of.getTransientTransferFlag()) {
                    this.mDoNotClean.add(of);
                }
            }
        }

        //mLogger.log( "The input workflow " + workflow,
        //             LogManager.DEBUG_MESSAGE_LEVEL );

        //set the depth and ResMap values iteratively
        setDepth_ResMap(workflow.getRoots());

        mLogger.log("Number of sites " + mResMap.size(),
                    LogManager.DEBUG_MESSAGE_LEVEL);

        //output for debug
        StringBuffer message = new StringBuffer();
        for (Iterator it = mResMap.entrySet().iterator(); it.hasNext();) {
            Map.Entry entry = (Map.Entry) it.next();
            message.append("Site ").append((String) entry.getKey())
                   .append(" count jobs = ").append(((Set) entry.getValue()).size());
            mLogger.log(message.toString(), LogManager.DEBUG_MESSAGE_LEVEL);

            Set jobs = (Set) entry.getValue();
            for (Iterator jit = jobs.iterator(); jit.hasNext();) {
                mLogger.log("* " + ((GraphNode) jit.next()).getID(),
                            LogManager.DEBUG_MESSAGE_LEVEL);
            }
            message = new StringBuffer();
        }

        //for each site do the process of adding cleanup jobs
        for (Iterator it = mResMap.entrySet().iterator(); it.hasNext();) {
            Map.Entry entry = (Map.Entry) it.next();
            addCleanUpJobs((String) entry.getKey(), (Set) entry.getValue(), workflow);
        }

        //mLogger.log( "The resultant workflow with cleanup jobs " + workflow,
        //             LogManager.DEBUG_MESSAGE_LEVEL );

        return workflow;
    }

    /**
     * Resets the internal data structures.
     */
    protected void reset() {
        mResMap.clear();
        mResMapLeaves.clear();
        mResMapRoots.clear();
        mDoNotClean.clear();
        mMaxDepth = 0;
    }

    /**
     * A BFS implementation that sets the depth value of each job (roots have
     * depth 1) and populates mResMap, which maps each site to the set of jobs
     * assigned to it.
     *
     * @param roots the List of GraphNode objects that are the roots of the
     *              workflow.
     */
    private void setDepth_ResMap(List roots) {
        LinkedList que = new LinkedList();
        que.addAll(roots);

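        //all the roots of the workflow start at depth 1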
        for (int i = 0; i < que.size(); i++) {
            ((GraphNode) que.get(i)).setDepth(1);
        }

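        //BFS over the workflow: record each job against its execution site
        //and push its children with an updated depth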
        while (que.size() >= 1) {
            GraphNode curGN = (GraphNode) que.removeFirst();

            //debug
            /*
            System.out.print( curGN.getDepth() + " " + ((SubInfo)curGN.getContent()).getSiteHandle() + " " );
            if( curGN.getChildren() == null )
                System.out.print( "0" );
            else
                System.out.print( curGN.getChildren().size() );
            */

            //populate mResMap with the site to which the job is mapped
            SubInfo si = (SubInfo) curGN.getContent();
            if (!mResMap.containsKey(si.getSiteHandle())) {
                mResMap.put(si.getSiteHandle(), new HashSet());
            }
            ((Set) mResMap.get(si.getSiteHandle())).add(curGN);
            //System.out.println( " site count=" + ((Set)mResMap.get( si.getSiteHandle() )).size() );

            //now set the depth of the children. a child is updated only if it
            //has not been seen yet, or a longer path to it has been found
            for (Iterator it = curGN.getChildren().iterator(); it.hasNext();) {
                GraphNode child = (GraphNode) it.next();
                if (!(child.getDepth() == -1 || child.getDepth() < curGN.getDepth() + 1)) {
                    continue;
                }

                child.setDepth(curGN.getDepth() + 1);
                if (child.getDepth() > mMaxDepth) {
                    mMaxDepth = child.getDepth();
                }
                que.addLast(child);
            }

        }

    }

    /**
     * Adds cleanup jobs for the portion of the workflow scheduled on a
     * particular site. A best first strategy, based on the depth of the jobs
     * in the workflow, is used to decide where to add the cleanup jobs.
     *
     * @param site     the site handle.
     * @param leaves   the set of jobs (GraphNode objects) scheduled on the site.
     * @param workflow the Graph into which new cleanup jobs can be added.
     */
    private void addCleanUpJobs(String site, Set leaves, Graph workflow) {

        mLogger.log(site + " " + leaves.size(), LogManager.DEBUG_MESSAGE_LEVEL);
        //if( !site.equals(new String("k")) )return;

        //maps the LFN of a file (String) to the GraphNode of the cleanup job
        //that removes it
        HashMap cleanedBy = new HashMap();

        //the below in case we get rid of the primitive java 1.4
        //PriorityQueue<GraphNode> pQ=new PriorityQueue<GraphNode>(resMap.get(site).size(),GraphNode_ORDER);

        StringBuffer message = new StringBuffer();
        message.append("Jobs scheduled at site ").append(site).append(" are ");
        for (Iterator it = leaves.iterator(); it.hasNext();) {
            message.append(((GraphNode) it.next()).getID()).append(" ");
        }
        mLogger.log(message.toString(), LogManager.DEBUG_MESSAGE_LEVEL);

        //pQA is an array of Sets of GraphNode objects, indexed by depth.
        //it acts as a simple bucket based priority queue.
        Set[] pQA = new Set[mMaxDepth + 1];
        for (int i = 0; i < pQA.length; i++) {
            pQA[i] = new HashSet();
        }

        //populate the priority array pQA with all the jobs, bucketed by depth
        for (Iterator it = leaves.iterator(); it.hasNext();) {
            GraphNode gN = (GraphNode) it.next();
            pQA[gN.getDepth()].add(gN);
        }

        //start the best first cleanup job addition, deepest jobs first
        for (int curP = mMaxDepth; curP >= 0; curP--) {

            //process all elements in the current priority
            while (pQA[curP].size() >= 1) {
                GraphNode curGN = (GraphNode) pQA[curP].iterator().next();
                pQA[curP].remove(curGN);
                SubInfo curGN_SI = (SubInfo) curGN.getContent();

                if (!typeNeedsCleanUp(curGN_SI.getJobType())) {
                    continue;
                }

                //using the job's input file set directly would corrupt it,
                //so work on a copy and add the output files to it as well
                //Set fileSet = curGN_SI.getInputFiles();
                Set fileSet = new HashSet(curGN_SI.getInputFiles());
                fileSet.addAll(curGN_SI.getOutputFiles());

                //remove the files in fileSet that are in this.mDoNotClean
                Set fileSet2 = new HashSet(fileSet);
                for (Iterator itfs2 = fileSet2.iterator(); itfs2.hasNext();) {
                    Object _dum_pf = itfs2.next();
                    if (this.mDoNotClean.contains(_dum_pf)) {
                        fileSet.remove(_dum_pf);
                    }
                }

                //create a placeholder GraphNode for the cleanup job. the SubInfo
                //content is created and set only if the node ends up with at
                //least one parent. the files it cleans up are collected in
                //cleanupFiles and handed to the Implementation.
                GraphNode nuGN = new GraphNode(generateCleanupID(curGN_SI),
                                               curGN_SI.getTXName());
                //                             InPlace.CLEANUP_JOB_PREFIX + curGN.getName(),
                //                             InPlace.CLEANUP_JOB_PREFIX + curGN.getName() );

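                //the list of files that the new cleanup node will be
                //responsible for removing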
                List cleanupFiles = new LinkedList();
                for (Iterator it = fileSet.iterator(); it.hasNext();) {
                    PegasusFile file = (PegasusFile) it.next();

                    //check if the file is already set up to be cleaned up
                    if (cleanedBy.containsKey(file.getLFN())) {
                        //make the existing cleanup job a child of this job, so
                        //that the file is removed only after this job has run
                        GraphNode child = (GraphNode) cleanedBy.get(file.getLFN());
                        if (!child.getParents().contains(curGN)) {
                            child.addParent(curGN);
                        }
                        if (!curGN.getChildren().contains(child)) {
                            curGN.addChild(child);
                        }
                    } else {
                        //the new cleanup node becomes responsible for this file
                        //nuSI.addInputFile( file );
                        cleanupFiles.add(file);
                        cleanedBy.put(file.getLFN(), nuGN);

                        if (!curGN.getChildren().contains(nuGN)) {
                            curGN.addChild(nuGN);
                        }
                        if (!nuGN.getParents().contains(curGN)) {
                            nuGN.addParent(curGN);
                        }
                    }
                }// all the files

                //create a cleanup job only if the cleanup node has any parents
                if (nuGN.getParents().size() >= 1) {
                    mLogger.log("Adding cleanup job with ID " + nuGN.getID(),
                                LogManager.DEBUG_MESSAGE_LEVEL);
                    SubInfo cleanupJob = mImpl.createCleanupJob(nuGN.getID(),
                                                                cleanupFiles,
                                                                curGN_SI);
                    //if the corresponding compute job has any transfer or stage
                    //out jobs as children, add them as parents of the cleanup job
                    for (Iterator itc = curGN.getChildren().iterator(); itc.hasNext();) {
                        GraphNode curGNchild = (GraphNode) itc.next();
                        SubInfo itc_si = (SubInfo) curGNchild.getContent();
                        if (itc_si != null
                            && (itc_si.getJobType() == SubInfo.STAGE_OUT_JOB
                                || itc_si.getJobType() == SubInfo.INTER_POOL_JOB)) {

                            nuGN.addParent(curGNchild);
                            curGNchild.addChild(nuGN);
                        }
                    }

                    //add the job as the content of the graph node
                    //and the node itself to the Graph
                    nuGN.setContent(cleanupJob);
                    workflow.addNode(nuGN);
                }
            }
        }

        //log which file is cleaned up by which cleanup job
        mLogger.log("", LogManager.DEBUG_MESSAGE_LEVEL);
        mLogger.log("CLEANUP LIST", LogManager.DEBUG_MESSAGE_LEVEL);
        for (Iterator it = cleanedBy.keySet().iterator(); it.hasNext();) {
            String lfn = (String) it.next();
            GraphNode cl_GN = (GraphNode) cleanedBy.get(lfn);
            SubInfo cl_si = (SubInfo) cl_GN.getContent();
            //Arun please use a StringBuffer first
            //Karan March 13, 2007
            mLogger.log("file:" + lfn + " site:" + cl_si.getSiteHandle()
                        + " " + cl_GN.getID(),
                        LogManager.DEBUG_MESSAGE_LEVEL);
        }

        //reduce dependencies. for each cleanup job X, look at its parents.
        //for each parent Y, walk up through the ancestors of Y. if another
        //parent Z of X is found among them, then the direct edge from Z to
        //the cleanup job is redundant and can be removed.
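        //counter used only for the debug log below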
        int num = 0;
        for (Iterator it = cleanedBy.values().iterator(); it.hasNext();) {
            num++;
            mLogger.log(" cleanup job counter = " + num,
                        LogManager.DEBUG_MESSAGE_LEVEL);
            GraphNode cl_GN = (GraphNode) it.next();
            //SubInfo cl_si=(SubInfo)cl_GN.getContent();
            List cl_GNparents = cl_GN.getParents();
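            //parents of the cleanup job whose direct edge to it has been found
            //to be redundant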
            List redundant = new LinkedList();
            HashSet visit = new HashSet();
            for (Iterator itp = cl_GN.getParents().iterator(); itp.hasNext();) {
                //walk up the ancestors of this parent
                LinkedList mque = new LinkedList();
                mque.add(itp.next());

                while (mque.size() > 0) {
                    GraphNode popGN = (GraphNode) mque.removeFirst();

                    if (visit.contains(popGN)) {
                        continue;
                    }
                    visit.add(popGN);

                    for (Iterator itp_pop = popGN.getParents().iterator(); itp_pop.hasNext();) {
                        GraphNode pop_pGN = (GraphNode) itp_pop.next();
                        //check if it is redundant. if so, add it to the redundant list
                        if (cl_GNparents.contains(pop_pGN)) {
                            redundant.add(pop_pGN);
                        } else {
                            //mque.addAll( pop_pGN.getParents() );
                            for (Iterator itgp = pop_pGN.getParents().iterator(); itgp.hasNext();) {
                                GraphNode gpGN = (GraphNode) itgp.next();
                                if (!visit.contains(gpGN)) {
                                    mque.add(gpGN);
                                }
                            }
                        }
                    }
                }
            }

            //remove all the redundant edges that were found
            for (Iterator itr = redundant.iterator(); itr.hasNext();) {
                GraphNode r_GN = (GraphNode) itr.next();
                cl_GN.removeParent(r_GN);
                r_GN.removeChild(cl_GN);
            }
        }

    }

    /**
     * Adds job priorities to the jobs in the workflow on the basis of
     * their levels in the traversal order given by the iterator. Later on
     * this would be moved into a separate refiner.
     *
     * @param workflow the workflow on which to apply the job priorities.
     */
    protected void applyJobPriorities(Graph workflow) {

        for (Iterator it = workflow.iterator(); it.hasNext();) {
            GraphNode node = (GraphNode) it.next();
            SubInfo job = (SubInfo) node.getContent();

            //log to debug
            StringBuffer sb = new StringBuffer();
            sb.append("Applying priority of ").append(node.getDepth())
              .append(" to ").append(job.getID());
            mLogger.log(sb.toString(), LogManager.DEBUG_MESSAGE_LEVEL);

            //apply a priority to the job, overwriting any preexisting priority
            job.condorVariables.construct(Condor.PRIORITY_KEY,
                                          new Integer(node.getDepth()).toString());

            //for compute and staged compute jobs the priority should also be
            //forwarded to the remote job manager.
            //the below hack only works for condor pools
            //if( job.getJobType() == SubInfo.COMPUTE_JOB ||
            //    job.getJobType() == SubInfo.STAGED_COMPUTE_JOB ){
            //    job.globusRSL.construct( "condorsubmit",
            //                             "(priority " + node.getDepth() + ")");
            //}
        }
        return;
    }

    /**
     * Returns the identifier that is to be assigned to the cleanup job.
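     * For example, with the default prefix a compute job with ID
     * <code>ID000001</code> yields the cleanup job ID <code>cln_ID000001</code>.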
     *
     * @param job the job with which the cleanup job is primarily associated.
     *
     * @return the identifier for a cleanup job.
     */
    protected String generateCleanupID(SubInfo job) {
        StringBuffer sb = new StringBuffer();
        sb.append(CLEANUP_JOB_PREFIX).append(job.getID());
        return sb.toString();
    }

    /**
     * Checks to see which job types need to be looked at for cleanup.
     * Currently only COMPUTE_JOB and STAGED_COMPUTE_JOB require cleanup;
     * the handling of STAGE_OUT_JOB and INTER_POOL_JOB is commented out.
     *
     * @param type the type of the job.
     *
     * @return true if the job type requires cleanup, false otherwise.
     */
    protected boolean typeNeedsCleanUp(int type) {
        return (type == SubInfo.COMPUTE_JOB
                /*|| type == SubInfo.STAGE_OUT_JOB
                  || type == SubInfo.INTER_POOL_JOB*/
                || type == SubInfo.STAGED_COMPUTE_JOB);
    }

}
|