Source Code Cross Referenced for InterPoolEngine.java in  » Workflow-Engines » pegasus-2.1.0 » org » griphyn » cPlanner » engine » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Workflow Engines » pegasus 2.1.0 » org.griphyn.cPlanner.engine 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /**
002:         * This file or a portion of this file is licensed under the terms of
003:         * the Globus Toolkit Public License, found at $PEGASUS_HOME/GTPL or
004:         * http://www.globus.org/toolkit/download/license.html.
005:         * This notice must appear in redistributions of this file
006:         * with or without modification.
007:         *
008:         * Redistributions of this Software, with or without modification, must reproduce
009:         * the GTPL in:
010:         * (1) the Software, or
011:         * (2) the Documentation or
012:         * some other similar material which is provided with the Software (if any).
013:         *
014:         * Copyright 1999-2004
015:         * University of Chicago and The University of Southern California.
016:         * All rights reserved.
017:         */package org.griphyn.cPlanner.engine;
018:
019:        import org.griphyn.cPlanner.classes.ADag;
020:        import org.griphyn.cPlanner.classes.FileTransfer;
021:        import org.griphyn.cPlanner.classes.PegasusFile;
022:        import org.griphyn.cPlanner.classes.JobManager;
023:        import org.griphyn.cPlanner.classes.PlannerOptions;
024:        import org.griphyn.cPlanner.classes.SiteInfo;
025:        import org.griphyn.cPlanner.classes.SubInfo;
026:        import org.griphyn.cPlanner.classes.PegasusBag;
027:
028:        import org.griphyn.cPlanner.common.LogManager;
029:        import org.griphyn.cPlanner.common.PegasusProperties;
030:
031:        import org.griphyn.cPlanner.selector.SiteSelector;
032:
033:        import org.griphyn.cPlanner.selector.site.SiteSelectorFactory;
034:
035:        import org.griphyn.cPlanner.selector.TransformationSelector;
036:
037:        import org.griphyn.cPlanner.namespace.Hints;
038:
039:        import org.griphyn.cPlanner.provenance.pasoa.XMLProducer;
040:        import org.griphyn.cPlanner.provenance.pasoa.producer.XMLProducerFactory;
041:
042:        import org.griphyn.cPlanner.provenance.pasoa.PPS;
043:        import org.griphyn.cPlanner.provenance.pasoa.pps.PPSFactory;
044:
045:        import org.griphyn.common.catalog.TransformationCatalogEntry;
046:
047:        import org.griphyn.common.catalog.transformation.TCMode;
048:
049:        import org.griphyn.common.classes.TCType;
050:
051:        import org.griphyn.common.catalog.transformation.Mapper;
052:
053:        import org.griphyn.common.util.Separator;
054:
055:        import org.griphyn.cPlanner.transfer.SLS;
056:
057:        import org.griphyn.cPlanner.transfer.sls.SLSFactory;
058:        import org.griphyn.cPlanner.transfer.sls.SLSFactoryException;
059:
060:        import java.io.File;
061:
062:        import java.util.Iterator;
063:        import java.util.List;
064:        import java.util.ArrayList;
065:        import java.util.Set;
066:        import java.util.Vector;
067:
068:        /**
069:         * This engine calls out to the Site Selector selected by the user and maps the
070:         * jobs in the workflow to the execution pools.
071:         *
072:         * @author Karan Vahi
073:         * @author Gaurang Mehta
074:         * @version $Revision: 423 $
075:         *
076:         */
077:        public class InterPoolEngine extends Engine implements  Refiner {
078:
079:            /**
080:             * ADag object corresponding to the Dag whose jobs we want to schedule.
081:             * It is either created by the constructor or passed in to it.
082:             */
083:            private ADag mDag;
084:
085:            /**
086:             * Set of the execution pools which the user has specified via the planner options.
087:             */
088:            private Set mExecPools;
089:
090:            /**
091:             * Handle to the site selector. It is loaded in scheduleJobs.
092:             */
093:            private SiteSelector mSiteSelector;
094:
095:            /**
096:             * The handle to the transformation selector, that ends up selecting
097:             * what transformations to pick up. It is reloaded on each call to selectTCEntry.
098:             */
099:            private TransformationSelector mTXSelector;
100:
101:            /**
102:             * The bag of objects passed to the Site Selector and SLS factories for initialization.
103:             */
104:            private PegasusBag mBag;
105:
106:            /**
107:             * The handle to the transformation catalog mapper object that caches the
108:             * queries to the transformation catalog, and indexes them according to
109:             * lfn's. There is no purge policy in the TC Mapper, so per se it is not a
110:             * classic cache.
111:             */
112:            private Mapper mTCMapper;
113:
114:            /**
115:             * The XML Producer object that records the actions of this refiner.
116:             */
117:            private XMLProducer mXMLStore;
118:
119:            /**
120:             * The handle to the SLS implementor. Only loaded when mWorkerNodeExecution is true.
121:             */
122:            private SLS mSLS;
123:
124:            /**
125:             * A boolean indicating whether to have worker node execution or not, as read from the properties.
126:             */
127:            private boolean mWorkerNodeExecution;
128:
129:            /**
130:             * Constructs the engine with a freshly created ADag and an empty set of
131:             * execution pools. Loads the transformation catalog handle, the TC mapper and
132:             * the XML producer, and the SLS implementation when worker node execution is on.
133:             * @param props   the properties to be used.
134:             */
135:            public InterPoolEngine(PegasusProperties props) {
136:                super (props);
137:                mDag = new ADag();
138:                mExecPools = new java.util.HashSet();
139:
140:                mTCHandle = TCMode.loadInstance();
141:                //load the transformation mapper in the mode given by the properties
142:                mTCMapper = Mapper.loadTCMapper(mProps.getTCMapperMode());
143:
144:                //initialize the bag of objects that is passed to the SLS factory below and to the site selector factory in scheduleJobs
145:                mBag = new PegasusBag();
146:                mBag.add(PegasusBag.PEGASUS_PROPERTIES, props);
147:                mBag.add(PegasusBag.TRANSFORMATION_CATALOG, mTCHandle);
148:                mBag.add(PegasusBag.TRANSFORMATION_MAPPER, mTCMapper);
149:                mBag.add(PegasusBag.PEGASUS_LOGMANAGER, mLogger);
150:                //NOTE(review): unlike the three argument constructor, PLANNER_OPTIONS and SITE_CATALOG are not added to the bag and mPOptions is not assigned here, while scheduleJobs reads mPOptions - confirm
151:                mTXSelector = null; //loaded on demand in selectTCEntry
152:                mXMLStore = XMLProducerFactory.loadXMLProducer(props);
153:
154:                mWorkerNodeExecution = props.executeOnWorkerNode();
155:                if (mWorkerNodeExecution) {
156:                    //load the SLS implementation; mSLS is not assigned otherwise
157:                    mSLS = SLSFactory.loadInstance(mBag);
158:                }
159:
160:            }
161:
162:            /**
163:             * Constructs the engine for an existing workflow and the planner options
164:             * of the user. The execution pools are taken from the options.
165:             * @param aDag      the <code>ADag</code> object corresponding to the Dag
166:             *                  for which we want to determine on which pools to run
167:             *                  the nodes of the Dag.
168:             * @param props     the properties to be used.
169:             * @param options   the options specified by the user to run the planner.
170:             *                  It is dereferenced here, so it must not be null.
171:             */
172:            public InterPoolEngine(ADag aDag, PegasusProperties props,
173:                    PlannerOptions options) {
174:                super (props);
175:                mDag = aDag;
176:                mPOptions = options;
177:                mExecPools = (Set) options.getExecutionSites();
178:                mTCHandle = TCMode.loadInstance();
179:                //load the transformation mapper in the mode given by the properties
180:                mTCMapper = Mapper.loadTCMapper(mProps.getTCMapperMode());
181:
182:                //initialize the bag of objects that is passed to the SLS factory below and to the site selector factory in scheduleJobs
183:                mBag = new PegasusBag();
184:                mBag.add(PegasusBag.PEGASUS_PROPERTIES, props);
185:                mBag.add(PegasusBag.PLANNER_OPTIONS, options);
186:                mBag.add(PegasusBag.TRANSFORMATION_CATALOG, mTCHandle);
187:                mBag.add(PegasusBag.TRANSFORMATION_MAPPER, mTCMapper);
188:                mBag.add(PegasusBag.PEGASUS_LOGMANAGER, mLogger);
189:                mBag.add(PegasusBag.SITE_CATALOG, mPoolHandle);
190:
191:                mTXSelector = null; //loaded on demand in selectTCEntry
192:                mXMLStore = XMLProducerFactory.loadXMLProducer(props);
193:
194:                mWorkerNodeExecution = props.executeOnWorkerNode();
195:                if (mWorkerNodeExecution) {
196:                    //load the SLS implementation; mSLS is not assigned otherwise
197:                    mSLS = SLSFactory.loadInstance(mBag);
198:                }
199:
200:            }
201:
202:            /**
203:             * Returns the bag of initialization objects that was populated in the constructor.
204:             *
205:             * @return the PegasusBag held by this engine (the same instance, not a copy).
206:             */
207:            public PegasusBag getPegasusBag() {
208:                return mBag;
209:            }
210:
211:            /**
212:             * Returns a reference to the workflow that is being refined by the refiner.
213:             * The reference held by this engine is returned, not a copy.
214:             *
215:             * @return the ADag object.
216:             */
217:            public ADag getWorkflow() {
218:                return this .mDag;
219:            }
220:
221:            /**
222:             * Returns a reference to the XMLProducer, that generates the XML fragment
223:             * capturing the actions of the refiner. This is used for provenance
224:             * purposes.
225:             *
226:             * @return the XMLProducer loaded in the constructor.
227:             */
228:            public XMLProducer getXMLProducer() {
229:                return this .mXMLStore;
230:            }
231:
232:            /**
233:             * Schedules the whole workflow at once by passing it and the list of
234:             * execution pools to scheduleJobs. This is where a callout to the Partitioner
235:             * should take place, so that only jobs ready to be scheduled are sent on.
236:             *
237:             */
238:            public void determineSites() {
239:                SubInfo job; //NOTE(review): never used in this method
240:
241:                //at present we schedule the whole workflow at once
242:                List jobs = convertToList(mDag.vJobSubInfos); //NOTE(review): the result is never read
243:                List pools = convertToList(mExecPools);
244:
245:                //hand over the workflow and the pools, to do the physical mapping of all the jobs
246:                scheduleJobs(mDag, pools);
247:            }
248:
249:            /**
250:             * Schedules the jobs of the workflow on the execution pools by calling out to
251:             * the site selector, and then sets the job manager and profiles for each job.
252:             * It is up to the site selector to decide if a job can run on the sites passed.
253:             * Throws a RuntimeException if a job is unmapped, or on a profile or PASOA failure.
254:             * @param dag   the abstract workflow.
255:             * @param sites the list of execution sites, specified by the user.
256:             *
257:             */
258:            public void scheduleJobs(ADag dag, List sites) {
259:                //load the site selector from the bag and let it map the workflow onto the sites
260:                mSiteSelector = SiteSelectorFactory.loadInstance(mBag);
261:                mSiteSelector.mapWorkflow(dag, sites);
262:
263:                int i = 0; //incremented once per job by the loop below, but never read
264:                StringBuffer error;
265:
266:                //load the PPS implementation
267:                PPS pps = PPSFactory.loadPPS(this .mProps);
268:
269:                mXMLStore.add("<workflow url=\"" + mPOptions.getDAX() + "\">");
270:
271:                //call the begin workflow method, wrapping any failure in a RuntimeException
272:                try {
273:                    pps.beginWorkflowRefinementStep(this ,
274:                            PPS.REFINEMENT_SITE_SELECT, false);
275:                } catch (Exception e) {
276:                    throw new RuntimeException("PASOA Exception", e);
277:                }
278:
279:                //clear the XML store after the begin step has been called
280:                mXMLStore.clear();
281:
282:                //Iterate through the jobs after the call to the site selector above,
283:                //and set the job manager, profiles and provenance for each job
284:                String site;
285:                for (Iterator it = dag.jobIterator(); it.hasNext(); i++) {
286:
287:                    SubInfo job = (SubInfo) it.next();
288:                    site = job.getSiteHandle();
289:                    //check if the user has specified any hints in the dax
290:
291:                    //          replaced with jobmanager-type
292:                    //            incorporateHint(job, "pfnUniverse");
293:                    incorporateHint(job, Hints.JOBMANAGER_UNIVERSE);
294:                    if (incorporateHint(job, "executionPool")) {
295:                        //NOTE(review): the result of incorporateProfiles is ignored here, and the SLS and provenance steps below are skipped for such jobs - confirm
296:                        incorporateProfiles(job);
297:                        continue;
298:                    }
299:
300:                    if (site == null) {
301:                        error = new StringBuffer();
302:                        error
303:                                .append("Site Selector could not map the job ")
304:                                .append(job.getCompleteTCName())
305:                                .append(
306:                                        "\nMost likely an error occured in site selector.");
307:                        mLogger.log(error.toString(),
308:                                LogManager.ERROR_MESSAGE_LEVEL);
309:                        throw new RuntimeException(error.toString());
310:                    }
311:                    String jm = job.getJobManager();
312:                    jm = ((jm == null) || jm.length() == 0) ? null : jm; //an empty job manager is treated as not set
313:
314:                    if (site.length() == 0
315:                            || site
316:                                    .equalsIgnoreCase(SiteSelector.SITE_NOT_FOUND)) {
317:                        error = new StringBuffer();
318:                        error.append("Site Selector (").append(
319:                                mSiteSelector.description()).append(
320:                                ") could not map job ").append(
321:                                job.getCompleteTCName()).append(" to any site");
322:                        mLogger.log(error.toString(),
323:                                LogManager.ERROR_MESSAGE_LEVEL);
324:                        throw new RuntimeException(error.toString());
325:                    }
326:                    job.setJobManager(jm == null ? getJobManager(site, job
327:                            .getUniverse()) : jm); //keep a job manager already set on the job, else look one up for the site and universe
328:
329:                    mLogger.log("Mapped job " + job.jobName + " to pool "
330:                            + site, LogManager.DEBUG_MESSAGE_LEVEL);
331:                    //incorporate the profiles and do the transformation
332:                    //selection; a false return is treated as a fatal error
333:                    if (!incorporateProfiles(job)) {
334:                        error = new StringBuffer();
335:                        error.append("Profiles incorrectly incorporated for ")
336:                                .append(job.getCompleteTCName());
337:
338:                        mLogger.log(error.toString(),
339:                                LogManager.ERROR_MESSAGE_LEVEL);
340:                        throw new RuntimeException(error.toString());
341:
342:                    }
343:
344:                    //modify the jobs if required for worker node execution
345:                    if (mWorkerNodeExecution) {
346:                        mSLS.modifyJobForFirstLevelStaging(job, mPOptions
347:                                .getSubmitDirectory(),
348:                                mSLS.getSLSInputLFN(job), mSLS
349:                                        .getSLSOutputLFN(job));
350:                    }
351:
352:                    //log actions as XML fragment and record the site selection with the PPS
353:                    try {
354:                        logRefinerAction(job);
355:                        pps.siteSelectionFor(job.getName(), job.getName());
356:                    } catch (Exception e) {
357:                        throw new RuntimeException("PASOA Exception", e);
358:                    }
359:
360:                }//end of mapping all jobs
361:
362:                try {
363:                    pps.endWorkflowRefinementStep(this );
364:                } catch (Exception e) {
365:                    throw new RuntimeException("PASOA Exception", e);
366:                }
367:
368:            }
369:
370:            /**
371:             * Incorporates the profiles from the various sources into the job.
372:             * The profiles are incorporated in the order pool, transformation catalog,
373:             * and properties file, with the profiles from the properties file having
374:             * the highest priority.
375:             * It is here where the transformation selector is called to select
376:             * amongst the various transformations returned by the TC Mapper.
377:             * The executable of the job is also set here, and is staged for a STATIC_BINARY entry.
378:             * @param job  the job into which the profiles are incorporated.
379:             * @throws RuntimeException if the transformation selector selects no entry.
380:             * @return true if the profiles were successfully incorporated.
381:             *         false if the TC Mapper returned no entries for the site of the job.
382:             */
383:            private boolean incorporateProfiles(SubInfo job) {
384:                TransformationCatalogEntry tcEntry = null;
385:                List tcEntries = null;
386:                String siteHandle = job.getSiteHandle();
387:
388:                //the profile information from the pool catalog needs to be
389:                //assimilated into the job.
390:                job.updateProfiles(mPoolHandle.getPoolProfile(siteHandle));
391:
392:                //query the TCMapper and get hold of all the valid TC
393:                //entries for that site
394:                tcEntries = mTCMapper.getTCList(job.namespace, job.logicalName,
395:                        job.version, siteHandle);
396:
397:                StringBuffer error;
398:                FileTransfer fTx = null; //stays null unless the selected entry is a STATIC_BINARY
399:                if (tcEntries != null && tcEntries.size() > 0) {
400:                    //select a tc entry calling out to
401:                    //the transformation selector
402:                    tcEntry = selectTCEntry(tcEntries, job, mProps
403:                            .getTXSelectorMode());
404:                    if (tcEntry == null) {
405:                        error = new StringBuffer();
406:                        error.append(
407:                                "Transformation selection operation for job  ")
408:                                .append(job.getCompleteTCName()).append(
409:                                        " for site ").append(
410:                                        job.getSiteHandle()).append(
411:                                        " unsuccessful.");
412:                        mLogger.log(error.toString(),
413:                                LogManager.ERROR_MESSAGE_LEVEL);
414:                        throw new RuntimeException(error.toString());
415:                    }
416:
417:                    //something seriously wrong in this code line below.
418:                    //Need to verify further after more runs. (Gaurang 2-7-2006).
419:                    //            tcEntry = (TransformationCatalogEntry) tcEntries.get(0);
420:                    if (tcEntry.getType().equals(TCType.STATIC_BINARY)) {
421:                        SiteInfo site = mPoolHandle.getPoolEntry(siteHandle,
422:                                "vanilla"); //NOTE(review): the result is dereferenced below without a null check
423:                        //construct a file transfer object, that is added
424:                        //as an input file to the job at the end of this method
425:                        fTx = new FileTransfer(job
426:                                .getStagedExecutableBaseName(), job.jobName);
427:                        fTx.setType(FileTransfer.EXECUTABLE_FILE);
428:                        //the physical transformation points to
429:                        //guc or the user specified transfer mechanism
430:                        //accessible url
431:                        fTx.addSource(tcEntry.getResourceId(), tcEntry
432:                                .getPhysicalTransformation());
433:                        //the destination url is the working directory for
434:                        //pool where it needs to be staged to
435:                        //always creating a third party transfer URL
436:                        //for the destination.
437:                        String stagedPath = mPoolHandle.getExecPoolWorkDir(job)
438:                                + File.separator
439:                                + job.getStagedExecutableBaseName();
440:                        fTx.addDestination(siteHandle, site.getURLPrefix(false)
441:                                + stagedPath);
442:
443:                        //added in the end now after dependant executables
444:                        //have been handled Karan May 31 2007
445:                        //job.addInputFile(fTx);
446:
447:                        if (mWorkerNodeExecution) {
448:                            //do not specify the full path as we do not know worker
449:                            //node directory
450:
451:                            if (mSLS.doesCondorModifications()) {
452:                                //we need to take the basename of the source url
453:                                //as condor file transfer mech does not allow to
454:                                //specify destination filenames
455:                                job
456:                                        .setRemoteExecutable(new File(tcEntry
457:                                                .getPhysicalTransformation())
458:                                                .getName());
459:                            } else {
460:                                //do this only when kickstart executable existence check is fixed
461:                                //Karan Nov 30 2007
462:                                //job.setRemoteExecutable(job.getStagedExecutableBaseName());
463:                                job.setRemoteExecutable(stagedPath);
464:                            }
465:                        } else {
466:                            //the jobs executable is the path to where
467:                            //the executable is going to be staged
468:                            job.executable = stagedPath;
469:                        }
470:                        //setting the job type of the job to
471:                        //denote the executable is being staged
472:                        job.setJobType(SubInfo.STAGED_COMPUTE_JOB);
473:                    } else {
474:                        //the executable needs to point to the physical
475:                        //path gotten from the selected transformation
476:                        //entry
477:                        job.executable = tcEntry.getPhysicalTransformation();
478:                    }
479:                } else {
480:                    //mismatch. should be unreachable code!!!
481:                    //as error should have been thrown in the site selector
482:                    mLogger.log("Site selector mapped job "
483:                            + job.getCompleteTCName() + " to pool "
484:                            + job.executionPool
485:                            + " for which no mapping exists in "
486:                            + "transformation mapper.",
487:                            LogManager.FATAL_MESSAGE_LEVEL);
488:                    return false;
489:                }
490:
491:                //the profile information from the transformation
492:                //catalog needs to be assimilated into the job
493:                //overriding the one from pool catalog.
494:                job.updateProfiles(tcEntry);
495:
496:                //the profile information from the properties file
497:                //is assimilated overriding the one from transformation
498:                //catalog.
499:                job.updateProfiles(mProps);
500:
501:                //handle dependant executables first, as that call removes the executable input files of the job
502:                handleDependantExecutables(job);
503:                if (fTx != null) {
504:                    //add the main executable back as input
505:                    job.addInputFile(fTx);
506:                }
507:
508:                return true;
509:            }
510:
511:            /**
512:             * Handles the dependant executables that need to be staged. Each input file of type
513:             * EXECUTABLE_FILE is removed, and for a staged job replaced by a FileTransfer where possible.
514:             * @param job the job whose input files are processed.
515:             * @throws RuntimeException if the transformation selector selects no entry.
516:             */
517:            private void handleDependantExecutables(SubInfo job) {
518:                String siteHandle = job.getSiteHandle();
519:                boolean installedTX = !(job.getJobType() == SubInfo.STAGED_COMPUTE_JOB); //true unless the job type is STAGED_COMPUTE_JOB
520:
521:                List dependantExecutables = new ArrayList(); //collected here and added back after the iteration is over
522:                for (Iterator it = job.getInputFiles().iterator(); it.hasNext();) {
523:                    PegasusFile input = (PegasusFile) it.next();
524:
525:                    if (input.getType() == PegasusFile.EXECUTABLE_FILE) {
526:
527:                        //if the main executable is installed, just remove the executable
528:                        //file requirement from the input files
529:                        if (installedTX) {
530:                            it.remove();
531:                            continue;
532:                        }
533:
534:                        //query the TCMapper and get hold of all the valid TC
535:                        //entries for that site
536:                        String lfn[] = Separator.split(input.getLFN());
537:                        List tcEntries = mTCMapper.getTCList(lfn[0], lfn[1],
538:                                lfn[2], siteHandle);
539:
540:                        StringBuffer error;
541:                        if (tcEntries != null && tcEntries.size() > 0) {
542:                            //select a tc entry calling out to
543:                            //the transformation selector , we only should stage
544:                            //never pick any installed one.
545:                            TransformationCatalogEntry tcEntry = selectTCEntry(
546:                                    tcEntries, job, "Staged");
547:                            if (tcEntry == null) {
548:                                error = new StringBuffer();
549:                                error
550:                                        .append(
551:                                                "Transformation selection operation for job  ")
552:                                        .append(job.getCompleteTCName())
553:                                        .append(" for site ").append(
554:                                                job.getSiteHandle()).append(
555:                                                " unsuccessful.");
556:                                mLogger.log(error.toString(),
557:                                        LogManager.ERROR_MESSAGE_LEVEL);
558:                                throw new RuntimeException(error.toString());
559:                            }
560:
561:                            //            tcEntry = (TransformationCatalogEntry) tcEntries.get(0);
562:                            if (tcEntry.getType().equals(TCType.STATIC_BINARY)) {
563:                                SiteInfo site = mPoolHandle.getPoolEntry(
564:                                        siteHandle, "vanilla"); //NOTE(review): the result is dereferenced below without a null check
565:                                //construct a file transfer object, that is added
566:                                //as an input file to the job after the iteration
567:
568:                                //a disconnect between the basename and the input lfn.
569:                                String basename = SubInfo
570:                                        .getStagedExecutableBaseName(lfn[0],
571:                                                lfn[1], lfn[2]);
572:
573:                                FileTransfer fTx = new FileTransfer(basename,
574:                                        job.jobName);
575:                                fTx.setType(FileTransfer.EXECUTABLE_FILE);
576:                                //the physical transformation points to
577:                                //guc or the user specified transfer mechanism
578:                                //accessible url
579:                                fTx.addSource(tcEntry.getResourceId(), tcEntry
580:                                        .getPhysicalTransformation());
581:                                //the destination url is the working directory for
582:                                //pool where it needs to be staged to
583:                                //always creating a third party transfer URL
584:                                //for the destination.
585:                                String stagedPath = mPoolHandle
586:                                        .getExecPoolWorkDir(job)
587:                                        + File.separator + basename;
588:                                fTx.addDestination(siteHandle, site
589:                                        .getURLPrefix(false)
590:                                        + stagedPath);
591:
592:                                dependantExecutables.add(fTx);
593:
594:                                //the jobs executable is the path to where
595:                                //the executable is going to be staged
596:                                //job.executable = stagedPath;
597:                                mLogger.log("Dependant Executable "
598:                                        + input.getLFN()
599:                                        + " being staged from "
600:                                        + fTx.getSourceURL(),
601:                                        LogManager.DEBUG_MESSAGE_LEVEL);
602:
603:                            }
604:                            //NOTE(review): with no TC entries, or an entry that is not a STATIC_BINARY, the file is removed below without any replacement or log message - confirm
605:                        }
606:                        it.remove();
607:                    } //end of if file is executable
608:                }
609:
610:                //add all the dependant executable FileTransfers back as input files
611:                for (Iterator it = dependantExecutables.iterator(); it
612:                        .hasNext();) {
613:                    FileTransfer file = (FileTransfer) it.next();
614:                    job.addInputFile(file);
615:                }
616:            }
617:
618:            /**
619:             * Recursively ends up calling the transformation selector accordxing to
620:             * a chain of selections that need to be performed on the list of valid
621:             * transformation catalog
622:             *
623:             * @param entries    list of <code>TransformationCatalogEntry</code> objects.
624:             * @param job        the job.
625:             * @param selectors  comma separated list of selectors.
626:             *
627:             * @return the selected <code>TransformationCatalogEntry</code> object
628:             *         null when transformation selector is unable to select any
629:             *         transformation
630:             */
631:            private TransformationCatalogEntry selectTCEntry(List entries,
632:                    SubInfo job, String selectors) {
633:                //at present there is only one selector
634:                //operation performed on the selector
635:                String selector = selectors;
636:
637:                //load the transformation selector. different
638:                //selectors may end up being loaded for different jobs.
639:                mTXSelector = TransformationSelector.loadTXSelector(selector);
640:                entries = mTXSelector.getTCEntry(entries);
641:                return (entries == null || entries.size() == 0) ? null
642:                        : entries.size() > 1 ?
643:                        //call the selector again
644:                        selectTCEntry(entries, job, selectors)
645:                                : (TransformationCatalogEntry) entries.get(0);
646:            }
647:
648:            /**
649:             * It returns a jobmanager for the given pool.
650:             *
651:             * @param site      the name of the pool.
652:             * @param universe  the universe for which you need the scheduler on that
653:             *                  particular pool.
654:             *
655:             * @return the jobmanager for that pool and universe.
656:             *         null if not found.
657:             */
658:            private String getJobManager(String site, String universe) {
659:                SiteInfo p = mPoolHandle.getPoolEntry(site, universe);
660:                JobManager jm = (p == null) ? null : p.selectJobManager(
661:                        universe, true);
662:                String result = (jm == null) ? null : jm
663:                        .getInfo(JobManager.URL);
664:
665:                if (result == null) {
666:                    StringBuffer error = new StringBuffer();
667:                    error = new StringBuffer();
668:                    error.append("Could not find a jobmanager at site (")
669:                            .append(site).append(") for universe ").append(
670:                                    universe);
671:                    mLogger.log(error.toString(),
672:                            LogManager.ERROR_MESSAGE_LEVEL);
673:                    throw new RuntimeException(error.toString());
674:
675:                }
676:
677:                return result;
678:            }
679:
680:            /**
681:             * It incorporates a hint in the namespace to the job. After the hint
682:             * is incorporated the key is deleted from the hint namespace for that
683:             * job.
684:             *
685:             * @param job  the job that needs the hint to be incorporated.
686:             * @param key  the key in the hint namespace.
687:             *
688:             * @return true  the hint was successfully incorporated.
689:             *         false the hint was not set in job or was not successfully
690:             *               incorporated.
691:             */
692:            private boolean incorporateHint(SubInfo job, String key) {
693:                //sanity check
694:                if (key.length() == 0) {
695:                    return false;
696:                }
697:
698:                switch (key.charAt(0)) {
699:                case 'e':
700:                    if (key.equals("executionPool")
701:                            && job.hints.containsKey(key)) {
702:                        //user has overridden in the dax which execution Pool to use
703:                        job.executionPool = (String) job.hints
704:                                .removeKey("executionPool");
705:
706:                        incorporateHint(job, "globusScheduler");
707:                        return true;
708:                    }
709:                    break;
710:
711:                case 'g':
712:                    if (key.equals("globusScheduler")) {
713:                        job.globusScheduler = (job.hints
714:                                .containsKey("globusScheduler")) ?
715:                        //take the globus scheduler that the user
716:                        //specified in the DAX
717:                        (String) job.hints.removeKey("globusScheduler")
718:                                :
719:                                //select one from the pool handle
720:                                mPoolHandle.getPoolEntry(job.executionPool,
721:                                        job.condorUniverse).selectJobManager(
722:                                        job.condorUniverse, true).getInfo(
723:                                        JobManager.URL);
724:
725:                        return true;
726:                    }
727:                    break;
728:
729:                /*//not required any longer
730:                case 'p':
731:
732:                    if (key.equals("pfnUniverse")) {
733:                        job.condorUniverse = job.hints.containsKey("pfnUniverse") ?
734:                            (String) job.hints.removeKey("pfnUniverse") :
735:                            job.condorUniverse;
736:
737:                        return true;
738:
739:                    }
740:                    break;
741:                 */
742:                case 'j':
743:                    if (key.equals(Hints.JOBMANAGER_UNIVERSE)) {
744:                        job.condorUniverse = job.hints
745:                                .containsKey(Hints.JOBMANAGER_UNIVERSE) ? (String) job.hints
746:                                .removeKey(Hints.JOBMANAGER_UNIVERSE)
747:                                : job.condorUniverse;
748:
749:                        return true;
750:
751:                    }
752:                    break;
753:
754:                default:
755:                    break;
756:
757:                }
758:                return false;
759:            }
760:
761:            /**
762:             * Converts a Vector to a List. It only copies by reference.
763:             * @param v Vector
764:             * @return a ArrayList
765:             */
766:            public List convertToList(Vector v) {
767:                return new java.util.ArrayList(v);
768:            }
769:
770:            /**
771:             * Converts a Set to a List. It only copies by reference.
772:             * @param s Set
773:             * @return a ArrayList
774:             */
775:            public List convertToList(Set s) {
776:                return new java.util.ArrayList(s);
777:            }
778:
779:            /**
780:             * Logs the action taken by the refiner on a job as a XML fragment in
781:             * the XML Producer.
782:             *
783:             * @param job  the <code>SubInfo</code> containing the job that was mapped
784:             *             to a site.
785:             */
786:            protected void logRefinerAction(SubInfo job) {
787:                StringBuffer sb = new StringBuffer();
788:                sb.append("\t<siteselection job=\"").append(job.getName())
789:                        .append("\">");
790:                sb.append("\n").append("\t\t");
791:                sb.append("<logicalsite>").append(job.getSiteHandle()).append(
792:                        "</logicalsite>");
793:                sb.append("\n").append("\t\t");
794:                sb.append("<jobmanager>").append(job.getJobManager()).append(
795:                        "</jobmanager>");
796:                sb.append("\n");
797:                sb.append("\t</siteselection>");
798:                sb.append("\n");
799:                mXMLStore.add(sb.toString());
800:            }
801:
802:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.