Source Code Cross Referenced for Algorithm.java in  » Workflow-Engines » pegasus-2.1.0 » org » griphyn » cPlanner » selector » site » heft » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Workflow Engines » pegasus 2.1.0 » org.griphyn.cPlanner.selector.site.heft 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /**
002:         * This file or a portion of this file is licensed under the terms of
003:         * the Globus Toolkit Public License, found at $PEGASUS_HOME/GTPL or
004:         * http://www.globus.org/toolkit/download/license.html.
005:         * This notice must appear in redistributions of this file
006:         * with or without modification.
007:         *
008:         * Redistributions of this Software, with or without modification, must reproduce
009:         * the GTPL in:
010:         * (1) the Software, or
011:         * (2) the Documentation or
012:         * some other similar material which is provided with the Software (if any).
013:         *
014:         * Copyright 1999-2004
015:         * University of Chicago and The University of Southern California.
016:         * All rights reserved.
017:         */package org.griphyn.cPlanner.selector.site.heft;
018:
019:        import org.griphyn.cPlanner.classes.ADag;
020:        import org.griphyn.cPlanner.classes.SubInfo;
021:        import org.griphyn.cPlanner.classes.Profile;
022:        import org.griphyn.cPlanner.classes.SiteInfo;
023:        import org.griphyn.cPlanner.classes.JobManager;
024:        import org.griphyn.cPlanner.classes.PegasusBag;
025:
026:        import org.griphyn.cPlanner.common.PegasusProperties;
027:        import org.griphyn.cPlanner.common.LogManager;
028:
029:        import org.griphyn.cPlanner.partitioner.graph.Graph;
030:        import org.griphyn.cPlanner.partitioner.graph.GraphNode;
031:        import org.griphyn.cPlanner.partitioner.graph.Adapter;
032:        import org.griphyn.cPlanner.partitioner.graph.Bag;
033:
034:        import org.griphyn.common.catalog.TransformationCatalog;
035:        import org.griphyn.common.catalog.TransformationCatalogEntry;
036:
037:        import org.griphyn.common.catalog.transformation.Mapper;
038:        import org.griphyn.common.catalog.transformation.Windward;
039:
040:        import org.griphyn.cPlanner.poolinfo.PoolInfoProvider;
041:
042:        import org.griphyn.cPlanner.namespace.VDS;
043:
044:        import edu.isi.ikcap.workflows.ac.ProcessCatalogFactory;
045:        import edu.isi.ikcap.workflows.ac.ProcessCatalog;
046:        import edu.isi.ikcap.workflows.ac.classes.TransformationCharacteristics;
047:
048:        import edu.isi.ikcap.workflows.util.FactoryException;
049:
050:        import edu.isi.ikcap.workflows.sr.util.WorkflowGenerationProvenanceCatalog;
051:
052:        import java.util.List;
053:        import java.util.Map;
054:        import java.util.HashMap;
055:        import java.util.LinkedList;
056:        import java.util.Iterator;
057:        import java.util.Comparator;
058:        import java.util.Collections;
059:        import java.util.Properties;
060:
061:        /**
062:         * The HEFT based site selector. The runtime for the job in seconds is picked
063:         * from the pegasus profile key runtime in the transformation catalog for a
064:         * transformation.
065:         *
066:         * The data communication costs between jobs if scheduled on different sites
067:         * is assumed to be fixed. Later on if required, the ability to specify this
068:         * value will be exposed via properties.
069:         *
070:         * The number of processors in a site is picked by the attribute idle-nodes
071:         * associated with the vanilla jobmanager for a site in the site catalog.
072:         *
073:         * @author Karan Vahi
074:         * @version $Revision: 426 $
075:         *
076:         * @see #AVERAGE_BANDWIDTH
077:         * @see #RUNTIME_PROFILE_KEY
078:         * @see #DEFAULT_NUMBER_OF_FREE_NODES
079:         * @see #AVERAGE_DATA_SIZE_BETWEEN_JOBS
080:         * @see org.griphyn.cPlanner.classes.JobManager#IDLE_NODES
081:         *
082:         */
083:        public class Algorithm {
084:
085:            /**
086:             * The pegasus profile key that gives us the expected runtime.
087:             */
088:            public static final String RUNTIME_PROFILE_KEY = VDS.RUNTIME_KEY;
089:
090:            /**
091:             * The property that designates which Process catalog impl to pick up.
092:             */
093:            public static final String PROCESS_CATALOG_IMPL_PROPERTY = "pegasus.catalog.transformation.windward";
094:
095:            /**
096:             * The average bandwidth between the sites, in megabytes per second.
097:             */
098:            public static final float AVERAGE_BANDWIDTH = 5;
099:
100:            /**
101:             * The average data that is transferred in between 2 jobs in the workflow.
102:             * In megabytes.
103:             */
104:            public static final float AVERAGE_DATA_SIZE_BETWEEN_JOBS = 2;
105:
106:            /**
107:             * The default number of nodes that are associated with a site if not found
108:             * in the site catalog.
109:             */
110:            public static final int DEFAULT_NUMBER_OF_FREE_NODES = 10;
111:
112:            /**
113:             * The maximum finish time possible for a job.
114:             */
115:            public static final long MAXIMUM_FINISH_TIME = Long.MAX_VALUE;
116:
117:            /**
118:             * The average communication cost between nodes.
119:             */
120:            private float mAverageCommunicationCost;
121:
122:            /**
123:             * The workflow in the graph format, that needs to be scheduled.
124:             */
125:            private Graph mWorkflow;
126:
127:            /**
128:             * Handle to the site catalog.
129:             */
130:            private PoolInfoProvider mSiteHandle;
131:
132:            /**
133:             * The list of sites where the workflow can run.
134:             */
135:            private List mSites;
136:
137:            /**
138:             * Map containing the number of free nodes for each site. The key is the site
139:             * name, and value is a <code>Site</code> object.
140:             */
141:            private Map mSiteMap;
142:
143:            /**
144:             * Handle to the TCMapper.
145:             */
146:            protected Mapper mTCMapper;
147:
148:            /**
149:             * The handle to the LogManager
150:             */
151:            private LogManager mLogger;
152:
153:            /**
154:             * The handle to the properties.
155:             */
156:            private PegasusProperties mProps;
157:
158:            //TANGRAM related variables
159:            /**
160:             * The handle to the workflow provenance catalog
161:             */
162:            private WorkflowGenerationProvenanceCatalog mWGPC;
163:
164:            /**
165:             * The request id associated with the DAX.
166:             */
167:            private String mRequestID;
168:
169:            /**
170:             * The label of the workflow.
171:             */
172:            private String mLabel;
173:
174:            /**
175:             * The handle to the Process Catalog.
176:             */
177:            private ProcessCatalog mProcessCatalog;
178:
179:            /**
180:             * The handle to the transformation catalog.
181:             */
182:            private TransformationCatalog mTCHandle;
183:
184:            /**
185:             * The default constructor.
186:             *
187:             * @param bag  the bag of Pegasus related objects.
188:             */
189:            public Algorithm(PegasusBag bag) {
190:                mProps = (PegasusProperties) bag
191:                        .get(PegasusBag.PEGASUS_PROPERTIES);
192:                mTCHandle = (TransformationCatalog) bag
193:                        .get(PegasusBag.TRANSFORMATION_CATALOG);
194:                mTCMapper = (Mapper) bag.get(PegasusBag.TRANSFORMATION_MAPPER);
195:                mLogger = (LogManager) bag.get(PegasusBag.PEGASUS_LOGMANAGER);
196:                mSiteHandle = (PoolInfoProvider) bag
197:                        .get(PegasusBag.SITE_CATALOG);
198:                mAverageCommunicationCost = (this .AVERAGE_BANDWIDTH / this .AVERAGE_DATA_SIZE_BETWEEN_JOBS);
199:
200:                mProcessCatalog = this .loadProcessCatalog(mProps
201:                        .getProperty(this .PROCESS_CATALOG_IMPL_PROPERTY),
202:                        mProps.matchingSubset(
203:                                this .PROCESS_CATALOG_IMPL_PROPERTY, false));
204:
205:                //to figure out a way to insantiate SWF
206:                //Varun needs to write out a factory
207:                mWGPC = new WorkflowGenerationProvenanceCatalog();
208:            }
209:
210:            /**
211:             * Schedules the workflow using the heft.
212:             *
213:             * @param dag   the <code>ADag</code> object containing the abstract workflow
214:             *              that needs to be mapped.
215:             * @param sites the list of candidate sites where the workflow can potentially
216:             *              execute.
217:             */
218:            public void schedule(ADag dag, List sites) {
219:                //metadata about the DAG needs to go to Graph object
220:                mLabel = dag.getLabel();
221:                mRequestID = dag.getRequestID();
222:
223:                //convert the dag into a graph representation
224:                schedule(Adapter.convert(dag), sites);
225:            }
226:
227:            /**
228:             * Load the process catalog, only if it is determined that the Transformation
229:             * Catalog description is the windward one.
230:             *
231:             * @param type  the type of process catalog
232:             * @param props contains all necessary data to establish the link.
233:             *
234:             * @return true if connected now, or false to indicate a failure.
235:             */
236:            protected ProcessCatalog loadProcessCatalog(String type,
237:                    Properties props) {
238:                ProcessCatalog result = null;
239:
240:                //only load process catalog if TC implementation loaded is of type windward.
241:                if (!(mTCHandle instanceof  Windward)) {
242:                    return result;
243:                }
244:
245:                //figure out how to specify via properties
246:                try {
247:                    result = ProcessCatalogFactory.loadInstance(type, props);
248:                } catch (FactoryException e) {
249:                    mLogger.log("Unable to connect to process catalog "
250:                            + e.convertException(),
251:                            LogManager.DEBUG_MESSAGE_LEVEL);
252:                }
253:                return result;
254:            }
255:
256:            /**
257:             * Schedules the workflow according to the HEFT algorithm.
258:             *
259:             * @param workflow  the workflow that has to be scheduled.
260:             * @param sites the list of candidate sites where the workflow can potentially
261:             *              execute.
262:
263:             */
264:            public void schedule(Graph workflow, List sites) {
265:                mWorkflow = workflow;
266:                populateSiteMap(sites);
267:
268:                //compute weighted execution times for each job
269:                for (Iterator it = workflow.nodeIterator(); it.hasNext();) {
270:                    GraphNode node = (GraphNode) it.next();
271:                    SubInfo job = (SubInfo) node.getContent();
272:
273:                    //add the heft bag to a node
274:                    Float averageComputeTime = new Float(
275:                            calculateAverageComputeTime(job));
276:                    HeftBag b = new HeftBag();
277:                    b.add(HeftBag.AVG_COMPUTE_TIME, averageComputeTime);
278:                    node.setBag(b);
279:
280:                    mLogger.log("Average Compute Time " + node.getID() + " is "
281:                            + averageComputeTime,
282:                            LogManager.DEBUG_MESSAGE_LEVEL);
283:
284:                }
285:
286:                //add a dummy root
287:                Bag bag;
288:                GraphNode dummyRoot = new GraphNode("dummy", "dummy");
289:                workflow.addRoot(dummyRoot);
290:                bag = new HeftBag();
291:                //downward rank for the root is set to 0
292:                bag.add(HeftBag.DOWNWARD_RANK, new Float(0));
293:                dummyRoot.setBag(bag);
294:
295:                //do a breadth first traversal and compute the downward ranks
296:                Iterator it = workflow.iterator();
297:                dummyRoot = (GraphNode) it.next(); //we have the dummy root
298:                Float drank;
299:                //the dummy root has a downward rank of 0
300:                dummyRoot.getBag().add(HeftBag.DOWNWARD_RANK, new Float(0));
301:                //stores the nodes in sorted ascending order
302:                List sortedNodes = new LinkedList();
303:                while (it.hasNext()) {
304:                    GraphNode node = (GraphNode) it.next();
305:                    drank = new Float(computeDownwardRank(node));
306:                    bag = node.getBag();
307:                    bag.add(HeftBag.DOWNWARD_RANK, drank);
308:                    sortedNodes.add(node);
309:                    mLogger.log("Downward rank for node " + node.getID()
310:                            + " is " + drank, LogManager.DEBUG_MESSAGE_LEVEL);
311:                }
312:
313:                //sort the node
314:                Collections.sort(sortedNodes, new HeftGraphNodeComparator());
315:
316:                //the start time and end time for the dummy root is 0
317:                dummyRoot.getBag().add(HeftBag.ACTUAL_START_TIME, new Long(0));
318:                dummyRoot.getBag().add(HeftBag.ACTUAL_FINISH_TIME, new Long(0));
319:
320:                //schedule out the sorted order of the nodes
321:                for (it = sortedNodes.iterator(); it.hasNext();) {
322:                    GraphNode current = (GraphNode) it.next();
323:                    bag = current.getBag();
324:                    mLogger.log("Scheduling node " + current.getID(),
325:                            LogManager.DEBUG_MESSAGE_LEVEL);
326:
327:                    //figure out the sites where a job can run
328:                    SubInfo job = (SubInfo) current.getContent();
329:                    List runnableSites = mTCMapper.getSiteList(job
330:                            .getTXNamespace(), job.getTXName(), job
331:                            .getTXVersion(), mSites);
332:
333:                    //for each runnable site get the estimated finish time
334:                    //and schedule job on site that minimizes the finish time
335:                    String site;
336:                    long est_result[];
337:                    long result[] = new long[2];
338:                    result[1] = this .MAXIMUM_FINISH_TIME;
339:                    for (Iterator rit = runnableSites.iterator(); rit.hasNext();) {
340:                        site = (String) rit.next();
341:                        est_result = calculateEstimatedStartAndFinishTime(
342:                                current, site);
343:
344:                        //if existing EFT is greater than the returned EFT
345:                        //set existing EFT to the returned EFT
346:                        if (result[1] > est_result[1]) {
347:                            result[0] = est_result[0];
348:                            result[1] = est_result[1];
349:                            //tentatively schedule the job for that site
350:                            bag.add(HeftBag.SCHEDULED_SITE, site);
351:                        }
352:                    }
353:
354:                    //update the site selected with the job
355:                    bag.add(HeftBag.ACTUAL_START_TIME, new Long(result[0]));
356:                    bag.add(HeftBag.ACTUAL_FINISH_TIME, new Long(result[1]));
357:                    site = (String) bag.get(HeftBag.SCHEDULED_SITE);
358:                    scheduleJob(site, result[0], result[1]);
359:
360:                    //log the information
361:                    StringBuffer sb = new StringBuffer();
362:                    sb.append("Scheduled job ").append(current.getID()).append(
363:                            " to site ").append(site).append(" with from  ")
364:                            .append(result[0]).append(" till ").append(
365:                                    result[1]);
366:
367:                    mLogger.log(sb.toString(), LogManager.DEBUG_MESSAGE_LEVEL);
368:                }//end of going through all the sorted nodes
369:
370:                //remove the dummy root
371:                mWorkflow.remove(dummyRoot.getID());
372:            }
373:
374:            /**
375:             * Returns the makespan of the scheduled workflow. It is maximum of the
376:             * actual finish times for the leaves of the scheduled workflow.
377:             *
378:             * @return long  the makespan of the workflow.
379:             */
380:            public long getMakespan() {
381:                long result = -1;
382:
383:                //compute the maximum of the actual end times of leaves
384:                for (Iterator it = mWorkflow.getLeaves().iterator(); it
385:                        .hasNext();) {
386:                    GraphNode node = (GraphNode) it.next();
387:                    Long endTime = (Long) node.getBag().get(
388:                            HeftBag.ACTUAL_FINISH_TIME);
389:                    //sanity check
390:                    if (endTime == null) {
391:                        throw new RuntimeException(
392:                                "Looks like the leave node is unscheduled "
393:                                        + node.getID());
394:                    }
395:                    if (endTime > result) {
396:                        result = endTime;
397:                    }
398:                }
399:
400:                return result;
401:            }
402:
403:            /**
404:             * Estimates the start and finish time of a job on a site.
405:             *
406:             * @param node   the node that is being scheduled
407:             * @param site  the site for which the finish time is reqd.
408:             *
409:             * @return  long[0] the estimated start time.
410:             *          long[1] the estimated finish time.
411:             */
412:            protected long[] calculateEstimatedStartAndFinishTime(
413:                    GraphNode node, String site) {
414:
415:                SubInfo job = (SubInfo) node.getContent();
416:                long[] result = new long[2];
417:
418:                //calculate the ready time for the job
419:                //that is time by which all the data needed
420:                //by the job has reached the site.
421:                long readyTime = 0;
422:                for (Iterator it = node.getParents().iterator(); it.hasNext();) {
423:                    GraphNode parent = (GraphNode) it.next();
424:                    long current = 0;
425:                    //add the parent finish time to current
426:                    current += (Long) parent.getBag().get(
427:                            HeftBag.ACTUAL_FINISH_TIME);
428:
429:                    //if the parent was scheduled on another site
430:                    //add the average data transfer time.
431:                    if (!parent.getBag().get(HeftBag.SCHEDULED_SITE).equals(
432:                            site)) {
433:                        current += this .mAverageCommunicationCost;
434:                    }
435:
436:                    if (current > readyTime) {
437:                        //ready time is maximum of all currents
438:                        readyTime = current;
439:                    }
440:                }
441:
442:                //the estimated start time is the maximum
443:                //of the ready time and available time of the site
444:                //using non insertion based policy for time being
445:                result[0] = getAvailableTime(site, readyTime);
446:
447:                // do not need it, as available time is always >= ready time
448:                //        if ( result[ 0 ] < readyTime ){
449:                //            result[ 0 ] = readyTime;
450:                //       }
451:
452:                //the estimated finish time is est + compute time on site
453:                List entries = mTCMapper.getTCList(job.getTXNamespace(), job
454:                        .getTXName(), job.getTXVersion(), site);
455:                //pick the first one for time being
456:                TransformationCatalogEntry entry = (TransformationCatalogEntry) entries
457:                        .get(0);
458:                result[1] = result[0] + getExpectedRuntime(job, entry);
459:
460:                //est now stores the estimated finish time
461:                return result;
462:            }
463:
464:            /**
465:             * Computes the downward rank of a node.
466:             *
467:             * The downward rank of node i is
468:             *                                   _    ___
469:             *           max {       rank( n ) + w  + c    }
470:             *         j E pred( i )     d  j     j    ji
471:             *
472:             *
473:             *
474:             * @param node   the <code>GraphNode</code> whose rank needs to be computed.
475:             *
476:             * @return computed rank.
477:             */
478:            protected float computeDownwardRank(GraphNode node) {
479:                float result = 0;
480:                float value = 0;
481:
482:                for (Iterator it = node.getParents().iterator(); it.hasNext();) {
483:                    GraphNode p = (GraphNode) it.next();
484:                    Bag pbag = p.getBag();
485:
486:                    value += (getFloatValue(pbag.get(HeftBag.DOWNWARD_RANK))
487:                            + getFloatValue(pbag.get(HeftBag.AVG_COMPUTE_TIME)) + mAverageCommunicationCost);
488:
489:                    if (value > result) {
490:                        result = value;
491:                    }
492:                }
493:
494:                return result;
495:            }
496:
497:            /**
498:             * Returns the average compute time in seconds for a job.
499:             *
500:             * @param job the job whose average compute time is to be computed.
501:             *
502:             * @return the weighted compute time in seconds.
503:             */
504:            protected float calculateAverageComputeTime(SubInfo job) {
505:                //get all the TC entries for the sites where a job can run
506:                List runnableSites = mTCMapper.getSiteList(
507:                        job.getTXNamespace(), job.getTXName(), job
508:                                .getTXVersion(), mSites);
509:
510:                //sanity check
511:                if (runnableSites == null || runnableSites.isEmpty()) {
512:                    throw new RuntimeException("No runnable site for job "
513:                            + job.getName());
514:                }
515:
516:                //for each runnable site get the expected runtime
517:                String site;
518:                int total_nodes = 0;
519:                int total = 0;
520:                for (Iterator it = runnableSites.iterator(); it.hasNext();) {
521:                    site = (String) it.next();
522:                    int nodes = getFreeNodesForSite(site);
523:                    List entries = mTCMapper.getTCList(job.getTXNamespace(),
524:                            job.getTXName(), job.getTXVersion(), site);
525:
526:                    //pick the first one for time being
527:                    TransformationCatalogEntry entry = (TransformationCatalogEntry) entries
528:                            .get(0);
529:                    int jobRuntime = getExpectedRuntime(job, entry);
530:                    total_nodes += nodes;
531:                    total += jobRuntime * nodes;
532:
533:                }
534:
535:                return total / total_nodes;
536:            }
537:
538:            /**
539:             * Return expected runtime.
540:             *
541:             * @param job    the job in the workflow.
542:             * @param entry  the <code>TransformationCatalogEntry</code> object.
543:             *
544:             * @return the runtime in seconds.
545:             */
546:            protected int getExpectedRuntime(SubInfo job,
547:                    TransformationCatalogEntry entry) {
548:                int result = -1;
549:
550:                //try and fetch the expected runtime from the Windward AC
551:                result = getExpectedRuntimeFromAC(job, entry);
552:                if (result > 1) {
553:                    return result;
554:                }
555:
556:                //else try and get the runtime from the profiles
557:                List profiles = entry.getProfiles(Profile.VDS);
558:                mLogger
559:                        .log(
560:                                "Fetching runtime information from profiles for job "
561:                                        + job.getName(),
562:                                LogManager.DEBUG_MESSAGE_LEVEL);
563:                if (profiles != null) {
564:                    for (Iterator it = profiles.iterator(); it.hasNext();) {
565:                        Profile p = (Profile) it.next();
566:                        if (p.getProfileKey().equals(this .RUNTIME_PROFILE_KEY)) {
567:                            result = Integer.parseInt(p.getProfileValue());
568:                            break;
569:                        }
570:                    }
571:                }
572:                //sanity check for time being
573:                if (result < 1) {
574:                    throw new RuntimeException(
575:                            "Invalid or no runtime specified");
576:                }
577:
578:                return result;
579:            }
580:
581:            /**
582:             * Return expected runtime from the AC only if the process catalog is
583:             * initialized.
584:             *
585:             * @param job    the job in the workflow.
586:             * @param entry  the TC entry
587:             *
588:             * @return the runtime in seconds.
589:             */
590:            protected int getExpectedRuntimeFromAC(SubInfo job,
591:                    TransformationCatalogEntry entry) {
592:                int result = -1;
593:                if (mProcessCatalog == null) {
594:                    return result;
595:                }
596:                //fetch the job information first
597:                List tcs = mProcessCatalog.getPredictedPerformance(mWGPC
598:                        .getJobInformation(mRequestID, mLabel, job
599:                                .getLogicalID()), entry.getResourceId(), entry
600:                        .getSysInfo().getArch().toString());
601:
602:                mLogger.log("Predicted performance for job " + job.getID()
603:                        + " is " + tcs, LogManager.DEBUG_MESSAGE_LEVEL);
604:                return tcs == null || tcs.isEmpty() ? result
605:                        : (Integer) (((TransformationCharacteristics) tcs
606:                                .get(0))
607:                                .getCharacteristic(TransformationCharacteristics.EXPECTED_RUNTIME));
608:            }
609:
610:            /**
611:             * Populates the number of free nodes for each site, by querying the
612:             * Site Catalog.
613:             *
614:             * @param sites   list of sites.
615:             */
616:            protected void populateSiteMap(List sites) {
617:                mSiteMap = new HashMap();
618:
619:                //for testing purposes
620:                mSites = sites;
621:
622:                String value = null;
623:                int nodes = 0;
624:                for (Iterator it = mSites.iterator(); it.hasNext();) {
625:                    String site = (String) it.next();
626:                    SiteInfo s = mSiteHandle.getPoolEntry(site, "vanilla");
627:                    JobManager manager = s.selectJobManager("vanilla", true);
628:                    value = (String) manager.getInfo(JobManager.IDLE_NODES);
629:
630:                    try {
631:                        nodes = (value == null) ? this .DEFAULT_NUMBER_OF_FREE_NODES
632:                                : new Integer(value).intValue();
633:
634:                    } catch (Exception e) {
635:                        nodes = this .DEFAULT_NUMBER_OF_FREE_NODES;
636:                    }
637:
638:                    mSiteMap.put(site, new Site(site, nodes));
639:                }
640:
641:            }
642:
643:            /**
644:             * Returns the freenodes for a site.
645:             *
646:             * @param site   the site identifier.
647:             *
648:             * @return number of nodes
649:             */
650:            protected int getFreeNodesForSite(String site) {
651:                if (mSiteMap.containsKey(site)) {
652:                    return ((Site) mSiteMap.get(site)).getAvailableProcessors();
653:                } else {
654:                    throw new RuntimeException(
655:                            "The number of free nodes not available for site "
656:                                    + site);
657:                }
658:            }
659:
660:            /**
661:             * Schedules a job to a site.
662:             *
663:             * @param site  the site at which to schedule
664:             * @param start the start time for job
665:             * @param end   the end time of job
666:             */
667:            protected void scheduleJob(String site, long start, long end) {
668:                Site s = (Site) mSiteMap.get(site);
669:                s.scheduleJob(start, end);
670:
671:            }
672:
673:            /**
674:             * Returns the available time for a site.
675:             *
676:             * @param site       the site at which you want to schedule the job.
677:             * @param readyTime  the time at which all the data reqd by the job will arrive at site.
678:             *
679:             * @return the available time of the site.
680:             */
681:            protected long getAvailableTime(String site, long readyTime) {
682:                if (mSiteMap.containsKey(site)) {
683:                    return ((Site) mSiteMap.get(site))
684:                            .getAvailableTime(readyTime);
685:                } else {
686:                    throw new RuntimeException(
687:                            "Site information unavailable for site " + site);
688:                }
689:
690:            }
691:
692:            /**
693:             * This method returns a String describing the site selection technique
694:             * that is being implemented by the implementing class.
695:             *
696:             * @return String
697:             */
698:            public String description() {
699:                return "Heft based Site Selector";
700:            }
701:
702:            /**
703:             * The call out to the site selector to determine on what pool the job
704:             * should be scheduled.
705:             *
706:             * @param job SubInfo the <code>SubInfo</code> object corresponding to
707:             *   the job whose execution pool we want to determine.
708:             * @param pools the list of <code>String</code> objects representing the
709:             *   execution pools that can be used.
710:             * @return if the pool is found to which the job can be mapped, a string
711:             *   of the form <code>executionpool:jobmanager</code> where the
712:             *   jobmanager can be null. If the pool is not found, then set
713:             *   poolhandle to NONE. null - if some error occured .
714:             */
715:            public String mapJob2ExecPool(SubInfo job, List pools) {
716:                return "";
717:            }
718:
719:            /**
720:             * A convenience method to get the intValue for the object passed.
721:             *
722:             * @param key   the key to be converted
723:             *
724:             * @return the floatt value if object an integer, else -1
725:             */
726:            private float getFloatValue(Object key) {
727:
728:                float k = -1;
729:                //try{
730:                k = ((Float) key).floatValue();
731:                //}
732:                //catch( Exception e ){}
733:
734:                return k;
735:
736:            }
737:        }
738:
739:        /**
740:         * Comparator for GraphNode objects that allow us to sort on basis of
741:         * the downward rank computed.
742:         */
743:        class HeftGraphNodeComparator implements  Comparator {
744:
745:            /**
746:             * Implementation of the {@link java.lang.Comparable} interface.
747:             * Compares this object with the specified object for order. Returns a
748:             * negative integer, zero, or a positive integer as this object is
749:             * less than, equal to, or greater than the specified object. The
750:             * definitions are compared by their type, and by their short ids.
751:             *
752:             * @param o1 is the object to be compared
753:             * @param o2 is the object to be compared with o1.
754:             *
755:             * @return a negative number, zero, or a positive number, if the
756:             * object compared against is less than, equals or greater than
757:             * this object.
758:             * @exception ClassCastException if the specified object's type
759:             * prevents it from being compared to this Object.
760:             */
761:            public int compare(Object o1, Object o2) {
762:                if (o1 instanceof  GraphNode && o2 instanceof  GraphNode) {
763:                    GraphNode g1 = (GraphNode) o1;
764:                    GraphNode g2 = (GraphNode) o2;
765:
766:                    float drank1 = ((Float) g1.getBag().get(
767:                            HeftBag.DOWNWARD_RANK));//.floatValue();
768:                    float drank2 = ((Float) g2.getBag().get(
769:                            HeftBag.DOWNWARD_RANK));//.floatValue();
770:
771:                    return (int) (drank1 - drank2);
772:                } else {
773:                    throw new ClassCastException("object is not a GraphNode");
774:                }
775:            }
776:
777:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.