Source Code Cross Referenced for Default.java in  » Workflow-Engines » pegasus-2.1.0 » org » griphyn » cPlanner » transfer » refiner » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Workflow Engines » pegasus 2.1.0 » org.griphyn.cPlanner.transfer.refiner 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * This file or a portion of this file is licensed under the terms of
003:         * the Globus Toolkit Public License, found in file GTPL, or at
004:         * http://www.globus.org/toolkit/download/license.html. This notice must
005:         * appear in redistributions of this file, with or without modification.
006:         *
007:         * Redistributions of this Software, with or without modification, must
008:         * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
009:         * some other similar material which is provided with the Software (if
010:         * any).
011:         *
012:         * Copyright 1999-2004 University of Chicago and The University of
013:         * Southern California. All rights reserved.
014:         */
015:        package org.griphyn.cPlanner.transfer.refiner;
016:
017:        import org.griphyn.cPlanner.classes.ADag;
018:        import org.griphyn.cPlanner.classes.SubInfo;
019:        import org.griphyn.cPlanner.classes.FileTransfer;
020:        import org.griphyn.cPlanner.classes.PlannerOptions;
021:        import org.griphyn.cPlanner.classes.NameValue;
022:
023:        import org.griphyn.cPlanner.common.PegasusProperties;
024:        import org.griphyn.cPlanner.common.LogManager;
025:
026:        import org.griphyn.cPlanner.engine.ReplicaCatalogBridge;
027:
028:        import org.griphyn.cPlanner.transfer.MultipleFTPerXFERJobRefiner;
029:
030:        import org.griphyn.cPlanner.provenance.pasoa.XMLProducer;
031:        import org.griphyn.cPlanner.provenance.pasoa.producer.XMLProducerFactory;
032:
033:        import org.griphyn.cPlanner.provenance.pasoa.PPS;
034:        import org.griphyn.cPlanner.provenance.pasoa.pps.PPSFactory;
035:
036:        import java.util.ArrayList;
037:        import java.util.Iterator;
038:        import java.util.Collection;
039:        import java.util.TreeMap;
040:        import java.util.Map;
041:        import java.util.List;
042:
043:        /**
044:         * The default transfer refiner, an implementation of the multiple file
045:         * transfers per transfer job refiner. For each compute job it creates, if required,
046:         *          - a single stagein transfer job
047:         *          - a single stageout transfer job
048:         *          - a single interpool transfer job
049:         *
050:         * In addition, this implementation prevents file clobbering while staging in data
051:         * to a remote site that is shared amongst jobs.
052:         *
053:         * @author Karan Vahi
054:         * @version $Revision: 258 $
055:         */
056:
057:        public class Default extends MultipleFTPerXFERJobRefiner {
058:
059:            /**
060:             * A short description of the transfer refinement, returned by getDescription().
061:             */
062:            public static final String DESCRIPTION = "Default Multiple Refinement ";
063:
064:            /**
065:             * Holds the log message most recently composed by this refiner.
066:             */
067:            protected String mLogMsg;
068:
069:            /**
070:             * Tracks which logical file is already scheduled for transfer to which
071:             * site. Each key is built by constructFileKey(lfn, siteHandle) (documented
072:             * so far as logicalfilename:sitehandle); each value is the name of the job
073:             * recorded for that file: the stagein or inter site transfer node, or the
074:             * set x bit job for a staged executable whose transfer does not preserve
075:             * the x bit. A later job needing the same file on the same site is made
076:             * a child of the recorded job instead of getting a second transfer.
077:             */
078:            protected Map mFileTable;
079:
080:            /**
081:             * The handle to the provenance store implementation.
082:             */
083:            protected PPS mPPS;
084:
085:            /**
086:             * The overloaded constructor.
087:             *
088:             * @param dag        the workflow to which transfer nodes need to be added.
089:             * @param properties the <code>PegasusProperties</code> object containing all
090:             *                   the properties required by Pegasus.
091:             * @param options    the options passed to the planner.
092:             *
093:             */
094:            public Default(ADag dag, PegasusProperties properties,
095:                    PlannerOptions options) {
096:                super (dag, properties, options);
097:                mLogMsg = null;
098:                mFileTable = new TreeMap();
099:
100:                //load the PPS implementation
101:                mPPS = PPSFactory.loadPPS(this .mProps);
102:
103:                mXMLStore.add("<workflow url=\"" + mPOptions.getDAX() + "\">");
104:
105:                //call the begin workflow method
106:                try {
107:                    mPPS.beginWorkflowRefinementStep(this ,
108:                            PPS.REFINEMENT_STAGE, false);
109:                } catch (Exception e) {
110:                    throw new RuntimeException("PASOA Exception", e);
111:                }
112:
113:                //clear the XML store
114:                mXMLStore.clear();
115:
116:            }
117:
118:            /**
119:             * Adds the stage in transfer nodes which transfer the input files for a job,
120:             * from the location returned from the replica catalog to the job's execution
121:             * pool.
122:             *
123:             * @param job   <code>SubInfo</code> object corresponding to the node to
124:             *              which the files are to be transferred to.
125:             * @param files Collection of <code>FileTransfer</code> objects containing the
126:             *              information about source and destURL's.
127:             */
128:            public void addStageInXFERNodes(SubInfo job, Collection files) {
129:                String jobName = job.getName();
130:                String pool = job.getSiteHandle();
131:                int counter = 0;
132:                String newJobName = this .STAGE_IN_PREFIX + jobName + "_"
133:                        + counter;
134:                String key = null;
135:                String msg = "Adding stagein transfer nodes for job " + jobName;
136:                String par = null;
137:                Collection stagedFiles = new ArrayList(1);
138:
139:                //to prevent duplicate dependencies
140:                java.util.HashSet tempSet = new java.util.HashSet();
141:                int staged = 0;
142:                for (Iterator it = files.iterator(); it.hasNext();) {
143:                    FileTransfer ft = (FileTransfer) it.next();
144:                    String lfn = ft.getLFN();
145:
146:                    //get the key for this lfn and pool
147:                    //if the key already in the table
148:                    //then remove the entry from
149:                    //the Vector and add a dependency
150:                    //in the graph
151:                    key = this .constructFileKey(lfn, pool);
152:                    par = (String) mFileTable.get(key);
153:                    //System.out.println("lfn " + lfn + " par " + par);
154:                    if (par != null) {
155:                        it.remove();
156:
157:                        //check if tempSet does not contain the parent
158:                        //fix for sonal's bug
159:                        if (tempSet.contains(par)) {
160:                            mLogMsg = "IGNORING TO ADD rc pull relation from rc tx node: "
161:                                    + par
162:                                    + " -> "
163:                                    + jobName
164:                                    + " for transferring file "
165:                                    + lfn
166:                                    + " to pool " + pool;
167:
168:                            mLogger
169:                                    .log(mLogMsg,
170:                                            LogManager.DEBUG_MESSAGE_LEVEL);
171:
172:                        } else {
173:                            mLogMsg = /*"Adding relation " + par + " -> " + jobName +*/
174:                            " For transferring file " + lfn;
175:                            mLogger
176:                                    .log(mLogMsg,
177:                                            LogManager.DEBUG_MESSAGE_LEVEL);
178:                            addRelation(par, jobName, pool, false);
179:                            tempSet.add(par);
180:                        }
181:                    } else {
182:                        if (ft.isTransferringExecutableFile()) {
183:                            //add to staged files for adding of
184:                            //set up job.
185:                            stagedFiles.add(ft);
186:                            //the staged execution file should be having the setup
187:                            //job as parent if it does not preserve x bit
188:                            if (mTXStageInImplementation.doesPreserveXBit()) {
189:                                mFileTable.put(key, newJobName);
190:                            } else {
191:                                mFileTable.put(key, mTXStageInImplementation
192:                                        .getSetXBitJobName(jobName, staged++));
193:                            }
194:                        } else {
195:                            //make a new entry into the table
196:                            mFileTable.put(key, newJobName);
197:                        }
198:                        //add the newJobName to the tempSet so that even
199:                        //if the job has duplicate input files only one instance
200:                        //of transfer is scheduled. This came up during collapsing
201:                        //June 15th, 2004
202:                        tempSet.add(newJobName);
203:                    }
204:                }
205:
206:                if (!files.isEmpty()) {
207:                    mLogger.log(msg, LogManager.DEBUG_MESSAGE_LEVEL);
208:                    msg = "Adding new stagein transfer node named "
209:                            + newJobName;
210:                    mLogger.log(msg, LogManager.DEBUG_MESSAGE_LEVEL);
211:
212:                    //add a direct dependency between compute job
213:                    //and stagein job only if there is no
214:                    //executables being staged
215:                    if (stagedFiles.isEmpty()) {
216:                        //add the direct relation
217:                        addRelation(newJobName, jobName, pool, true);
218:                        SubInfo siJob = mTXStageInImplementation
219:                                .createTransferJob(job, files, null,
220:                                        newJobName, SubInfo.STAGE_IN_JOB);
221:                        addJob(siJob);
222:
223:                        //record the action in the provenance store.
224:                        logRefinerAction(job, siJob, files, "stage-in");
225:                    } else {
226:                        //the dependency to stage in job is added via the
227:                        //the setup job that does the chmod
228:                        SubInfo siJob = mTXStageInImplementation
229:                                .createTransferJob(job, files, stagedFiles,
230:                                        newJobName, SubInfo.STAGE_IN_JOB);
231:
232:                        addJob(siJob);
233:                        //record the action in the provenance store.
234:                        logRefinerAction(job, siJob, files, "stage-in");
235:                    }
236:
237:                }
238:
239:            }
240:
241:            /**
242:             * Adds the inter pool transfer nodes that are required for  transferring
243:             * the output files of the parents to the jobs execution site.
244:             *
245:             * @param job   <code>SubInfo</code> object corresponding to the node to
246:             *              which the files are to be transferred to.
247:             * @param files Collection of <code>FileTransfer</code> objects containing the
248:             *              information about source and destURL's.
249:             */
250:            public void addInterSiteTXNodes(SubInfo job, Collection files) {
251:                String jobName = job.getName();
252:                int counter = 0;
253:                String newJobName = this .INTER_POOL_PREFIX + jobName + "_"
254:                        + counter;
255:
256:                String msg = "Adding inter pool nodes for job " + jobName;
257:                String prevParent = null;
258:
259:                String lfn = null;
260:                String key = null;
261:                String par = null;
262:                String pool = job.getSiteHandle();
263:
264:                boolean toAdd = true;
265:
266:                //to prevent duplicate dependencies
267:                java.util.HashSet tempSet = new java.util.HashSet();
268:
269:                //node construction only if there is
270:                //a file to transfer
271:                if (!files.isEmpty()) {
272:                    mLogger.log(msg, LogManager.DEBUG_MESSAGE_LEVEL);
273:
274:                    for (Iterator it = files.iterator(); it.hasNext();) {
275:                        FileTransfer ft = (FileTransfer) it.next();
276:                        lfn = ft.getLFN();
277:                        //System.out.println("Trying to figure out for lfn " + lfn);
278:
279:                        //to ensure that duplicate edges
280:                        //are not added in the graph
281:                        //between the parent of a node and the
282:                        //inter tx node that transfers the file
283:                        //to the node site.
284:
285:                        //get the key for this lfn and pool
286:                        //if the key already in the table
287:                        //then remove the entry from
288:                        //the Vector and add a dependency
289:                        //in the graph
290:                        key = this .constructFileKey(lfn, pool);
291:                        par = (String) mFileTable.get(key);
292:                        //System.out.println("\nGot Key :" + key + " Value :" + par );
293:                        if (par != null) {
294:                            //transfer of this file
295:                            //has already been scheduled
296:                            //onto the pool
297:                            it.remove();
298:
299:                            //check if tempSet does not contain the parent
300:                            if (tempSet.contains(par)) {
301:                                mLogMsg = "IGNORING TO ADD interpool relation 1 from inter tx node: "
302:                                        + par
303:                                        + " -> "
304:                                        + jobName
305:                                        + " for transferring file "
306:                                        + lfn
307:                                        + " to pool " + pool;
308:
309:                                mLogger.log(mLogMsg,
310:                                        LogManager.DEBUG_MESSAGE_LEVEL);
311:
312:                            } else {
313:                                mLogMsg = "Adding interpool relation 1 from inter tx node: "
314:                                        + par
315:                                        + " -> "
316:                                        + jobName
317:                                        + " for transferring file "
318:                                        + lfn
319:                                        + " to pool " + pool;
320:                                mLogger.log(mLogMsg,
321:                                        LogManager.DEBUG_MESSAGE_LEVEL);
322:                                addRelation(par, jobName);
323:                                tempSet.add(par);
324:                            }
325:                        } else {
326:                            //make a new entry into the table
327:                            mFileTable.put(key, newJobName);
328:                            //System.out.println("\nPut Key :" + key + " Value :" + newJobName );
329:
330:                            //to ensure that duplicate edges
331:                            //are not added in the graph
332:                            //between the parent of a node and the
333:                            //inter tx node that transfers the file
334:                            //to the node site.
335:                            if (prevParent == null
336:                                    || !prevParent.equalsIgnoreCase(ft
337:                                            .getJobName())) {
338:
339:                                mLogMsg = "Adding interpool relation 2"
340:                                        + ft.getJobName() + " -> " + newJobName
341:                                        + " for transferring file " + lfn
342:                                        + " to pool " + pool;
343:                                mLogger.log(mLogMsg,
344:                                        LogManager.DEBUG_MESSAGE_LEVEL);
345:                                addRelation(ft.getJobName(), newJobName);
346:                            }
347:
348:                            //we only need to add the relation between a
349:                            //inter tx node and a node once.
350:                            if (toAdd) {
351:                                mLogMsg = "Adding interpool relation 3"
352:                                        + newJobName + " -> " + jobName
353:                                        + " for transferring file " + lfn
354:                                        + " to pool " + pool;
355:                                mLogger.log(mLogMsg,
356:                                        LogManager.DEBUG_MESSAGE_LEVEL);
357:                                addRelation(newJobName, jobName);
358:                                tempSet.add(newJobName);
359:                                toAdd = false;
360:                            }
361:
362:                        }
363:
364:                        prevParent = ft.getJobName();
365:                    }
366:
367:                    //add the new job and construct it's
368:                    //subinfo only if the vector is not
369:                    //empty
370:                    if (!files.isEmpty()) {
371:                        msg = "Adding new inter pool node named " + newJobName;
372:                        mLogger.log(msg, LogManager.DEBUG_MESSAGE_LEVEL);
373:
374:                        //added in make transfer node
375:                        SubInfo interJob = mTXInterImplementation
376:                                .createTransferJob(job, files, null,
377:                                        newJobName, SubInfo.INTER_POOL_JOB);
378:
379:                        addJob(interJob);
380:
381:                        this .logRefinerAction(job, interJob, files,
382:                                "inter-site");
383:                    }
384:
385:                }
386:                tempSet = null;
387:
388:            }
389:
390:            /**
391:             * Adds the stageout transfer nodes, that stage data to an output site
392:             * specified by the user.
393:             *
394:             * @param job   <code>SubInfo</code> object corresponding to the node to
395:             *              which the files are to be transferred to.
396:             * @param files Collection of <code>FileTransfer</code> objects containing the
397:             *              information about source and destURL's.
398:             * @param rcb   bridge to the Replica Catalog. Used for creating registration
399:             *              nodes in the workflow.
400:             *
401:             */
402:            public void addStageOutXFERNodes(SubInfo job, Collection files,
403:                    ReplicaCatalogBridge rcb) {
404:
405:                this .addStageOutXFERNodes(job, files, rcb, false);
406:            }
407:
408:            /**
409:             * Adds the stageout transfer nodes, that stage data to an output site
410:             * specified by the user.
411:             *
412:             * @param job   <code>SubInfo</code> object corresponding to the node to
413:             *              which the files are to be transferred to.
414:             * @param files Collection of <code>FileTransfer</code> objects containing the
415:             *              information about source and destURL's.
416:             * @param rcb   bridge to the Replica Catalog. Used for creating registration
417:             *              nodes in the workflow.
418:             * @param deletedLeaf to specify whether the node is being added for
419:             *                      a deleted node by the reduction engine or not.
420:             *                      default: false
421:             */
422:            public void addStageOutXFERNodes(SubInfo job, Collection files,
423:                    ReplicaCatalogBridge rcb, boolean deletedLeaf) {
424:                String jobName = job.getName();
425:                int counter = 0;
426:                String newJobName = this .STAGE_OUT_PREFIX + jobName + "_"
427:                        + counter;
428:                String regJob = this .REGISTER_PREFIX + jobName;
429:
430:                mLogMsg = "Adding output pool nodes for job " + jobName;
431:
432:                //separate the files for transfer
433:                //and for registration
434:                List txFiles = new ArrayList();
435:                List regFiles = new ArrayList();
436:                for (Iterator it = files.iterator(); it.hasNext();) {
437:                    FileTransfer ft = (FileTransfer) it.next();
438:                    if (!ft.getTransientTransferFlag()) {
439:                        txFiles.add(ft);
440:                    }
441:                    if (!ft.getTransientRegFlag()) {
442:                        regFiles.add(ft);
443:                    }
444:                }
445:
446:                boolean makeTNode = !txFiles.isEmpty();
447:                boolean makeRNode = !regFiles.isEmpty();
448:
449:                if (!files.isEmpty()) {
450:                    mLogger.log(mLogMsg, LogManager.DEBUG_MESSAGE_LEVEL);
451:                    mLogMsg = "Adding new output pool node named " + newJobName;
452:                    mLogger.log(mLogMsg, LogManager.DEBUG_MESSAGE_LEVEL);
453:
454:                    if (makeTNode) {
455:                        //added in make transfer node
456:                        //mDag.addNewJob(newJobName);
457:                        SubInfo soJob = mTXStageOutImplementation
458:                                .createTransferJob(job, txFiles, null,
459:                                        newJobName, SubInfo.STAGE_OUT_JOB);
460:                        addJob(soJob);
461:                        if (!deletedLeaf) {
462:                            addRelation(jobName, newJobName);
463:                        }
464:                        if (makeRNode) {
465:                            addRelation(newJobName, regJob);
466:                        }
467:
468:                        //log the refiner action
469:                        this .logRefinerAction(job, soJob, txFiles, "stage-out");
470:                    } else if (!makeTNode && makeRNode) {
471:                        addRelation(jobName, regJob);
472:
473:                    }
474:                    if (makeRNode) {
475:                        //call to make the reg subinfo
476:                        //added in make registration node
477:                        addJob(createRegistrationJob(regJob, job, regFiles, rcb));
478:                    }
479:
480:                }
481:
482:            }
483:
484:            /**
485:             * Creates the registration jobs, which registers the materialized files on
486:             * the output site in the Replica Catalog.
487:             *
488:             * @param regJobName  The name of the job which registers the files in the
489:             *                    Replica Mechanism.
490:             * @param job         The job whose output files are to be registered in the
491:             *                    Replica Mechanism.
492:             * @param files       Collection of <code>FileTransfer</code> objects containing
493:             *                    the information about source and destURL's.
494:             * @param rcb   bridge to the Replica Catalog. Used for creating registration
495:             *              nodes in the workflow.
496:             *
497:             *
498:             * @return the registration job.
499:             */
500:            protected SubInfo createRegistrationJob(String regJobName,
501:                    SubInfo job, Collection files, ReplicaCatalogBridge rcb) {
502:
503:                SubInfo regJob = rcb.makeRCRegNode(regJobName, job, files);
504:
505:                //log the registration action for provenance purposes
506:                StringBuffer sb = new StringBuffer();
507:                String indent = "\t";
508:                sb.append(indent);
509:                sb.append("<register job=\"").append(regJobName).append("\"> ");
510:                sb.append("\n");
511:
512:                //traverse through all the files
513:                NameValue dest;
514:                String newIndent = indent + "\t";
515:                for (Iterator it = files.iterator(); it.hasNext();) {
516:                    FileTransfer ft = (FileTransfer) it.next();
517:                    dest = ft.getDestURL();
518:                    sb.append(newIndent);
519:                    sb.append("<file ");
520:                    appendAttribute(sb, "lfn", ft.getLFN());
521:                    appendAttribute(sb, "site", dest.getKey());
522:                    sb.append(">");
523:                    sb.append("\n");
524:                    sb.append(newIndent).append(indent);
525:                    sb.append(dest.getValue());
526:                    sb.append("\n");
527:                    sb.append(newIndent);
528:                    sb.append("</file>").append("\n");
529:                }
530:                sb.append(indent);
531:                sb.append("</register>").append("\n");
532:
533:                //log the graph relationship
534:                String parent = job.getName();
535:                String child = regJob.getName();
536:
537:                sb.append(indent);
538:                sb.append("<child ");
539:                appendAttribute(sb, "ref", child);
540:                sb.append(">").append("\n");
541:
542:                sb.append(newIndent);
543:                sb.append("<parent ");
544:                appendAttribute(sb, "ref", parent);
545:                sb.append("/>").append("\n");
546:
547:                sb.append(indent);
548:                sb.append("</child>").append("\n");
549:
550:                mXMLStore.add(sb.toString());
551:
552:                //log the action for creating the relationship assertions
553:                try {
554:                    mPPS.registrationIntroducedFor(regJob.getName(), job
555:                            .getName());
556:                } catch (Exception e) {
557:                    throw new RuntimeException(
558:                            "PASOA Exception while logging relationship assertion for registration",
559:                            e);
560:                }
561:
562:                return regJob;
563:            }
564:
565:            /**
566:             * Signals that the traversal of the workflow is done. It signals to the
567:             * Provenace Store, that refinement is complete.
568:             */
569:            public void done() {
570:
571:                try {
572:                    mPPS.endWorkflowRefinementStep(this );
573:                } catch (Exception e) {
574:                    throw new RuntimeException("PASOA Exception", e);
575:                }
576:
577:            }
578:
579:            /**
580:             * Add a new job to the workflow being refined.
581:             *
582:             * @param job  the job to be added.
583:             */
584:            public void addJob(SubInfo job) {
585:                mDAG.add(job);
586:            }
587:
588:            /**
589:             * Adds a new relation to the workflow being refiner.
590:             *
591:             * @param parent    the jobname of the parent node of the edge.
592:             * @param child     the jobname of the child node of the edge.
593:             */
594:            public void addRelation(String parent, String child) {
595:                mLogger.log("Adding relation " + parent + " -> " + child,
596:                        LogManager.DEBUG_MESSAGE_LEVEL);
597:                mDAG.addNewRelation(parent, child);
598:
599:            }
600:
601:            /**
602:             * Adds a new relation to the workflow. In the case when the parent is a
603:             * transfer job that is added, the parentNew should be set only the first
604:             * time a relation is added. For subsequent compute jobs that may be
605:             * dependent on this, it needs to be set to false.
606:             *
607:             * @param parent    the jobname of the parent node of the edge.
608:             * @param child     the jobname of the child node of the edge.
609:             * @param site      the execution pool where the transfer node is to be run.
610:             * @param parentNew the parent node being added, is the new transfer job
611:             *                  and is being called for the first time.
612:             */
613:            public void addRelation(String parent, String child, String site,
614:                    boolean parentNew) {
615:                mLogger.log("Adding relation " + parent + " -> " + child,
616:                        LogManager.DEBUG_MESSAGE_LEVEL);
617:                mDAG.addNewRelation(parent, child);
618:
619:            }
620:
621:            /**
622:             * Returns a textual description of the transfer mode.
623:             *
624:             * @return a short textual description
625:             */
626:            public String getDescription() {
627:                return this .DESCRIPTION;
628:            }
629:
630:            /**
631:             * Records the refiner action into the Provenance Store as an XML fragment.
632:             *
633:             * @param computeJob   the compute job.
634:             * @param txJob        the associated transfer job.
635:             * @param files        list of <code>FileTransfer</code> objects containing file transfers.
636:             * @param type         the type of transfer job
637:             */
638:            protected void logRefinerAction(SubInfo computeJob, SubInfo txJob,
639:                    Collection files, String type) {
640:                StringBuffer sb = new StringBuffer();
641:                String indent = "\t";
642:                sb.append(indent);
643:                sb.append("<transfer job=\"").append(txJob.getName()).append(
644:                        "\" ").append("type=\"").append(type).append("\">");
645:                sb.append("\n");
646:
647:                //traverse through all the files
648:                NameValue source;
649:                NameValue dest;
650:                String newIndent = indent + "\t";
651:                for (Iterator it = files.iterator(); it.hasNext();) {
652:                    FileTransfer ft = (FileTransfer) it.next();
653:                    source = ft.getSourceURL();
654:                    dest = ft.getDestURL();
655:                    sb.append(newIndent);
656:                    sb.append("<from ");
657:                    appendAttribute(sb, "site", source.getKey());
658:                    appendAttribute(sb, "lfn", ft.getLFN());
659:                    appendAttribute(sb, "url", source.getValue());
660:                    sb.append("/>");
661:                    sb.append("\n");
662:
663:                    sb.append(newIndent);
664:                    sb.append("<to ");
665:                    appendAttribute(sb, "site", dest.getKey());
666:                    appendAttribute(sb, "lfn", ft.getLFN());
667:                    appendAttribute(sb, "url", dest.getValue());
668:                    sb.append("/>");
669:                    sb.append("\n");
670:                }
671:                sb.append(indent);
672:                sb.append("</transfer>");
673:                sb.append("\n");
674:
675:                //log the graph relationship
676:                String parent = (txJob.getJobType() == SubInfo.STAGE_IN_JOB) ? txJob
677:                        .getName()
678:                        : computeJob.getName();
679:
680:                String child = (txJob.getJobType() == SubInfo.STAGE_IN_JOB) ? computeJob
681:                        .getName()
682:                        : txJob.getName();
683:
684:                sb.append(indent);
685:                sb.append("<child ");
686:                appendAttribute(sb, "ref", child);
687:                sb.append(">").append("\n");
688:
689:                sb.append(newIndent);
690:                sb.append("<parent ");
691:                appendAttribute(sb, "ref", parent);
692:                sb.append("/>").append("\n");
693:
694:                sb.append(indent);
695:                sb.append("</child>").append("\n");
696:
697:                //log the action for creating the relationship assertions
698:                try {
699:                    List stagingNodes = new java.util.ArrayList(1);
700:                    stagingNodes.add(txJob.getName());
701:                    mPPS.stagingIntroducedFor(stagingNodes, computeJob
702:                            .getName());
703:                } catch (Exception e) {
704:                    throw new RuntimeException(
705:                            "PASOA Exception while logging relationship assertion for staging ",
706:                            e);
707:                }
708:
709:                mXMLStore.add(sb.toString());
710:
711:            }
712:
713:            /**
714:             * Appends an xml attribute to the xml feed.
715:             *
716:             * @param xmlFeed  the xmlFeed to which xml is being written
717:             * @param key   the attribute key
718:             * @param value the attribute value
719:             */
720:            protected void appendAttribute(StringBuffer xmlFeed, String key,
721:                    String value) {
722:                xmlFeed.append(key).append("=").append("\"").append(value)
723:                        .append("\" ");
724:            }
725:
726:            /**
727:             * Constructs the key for an entry to the file table. The key returned
728:             * is lfn:siteHandle
729:             *
730:             * @param lfn         the logical filename of the file that has to be
731:             *                    transferred.
732:             * @param siteHandle  the name of the site to which the file is being
733:             *                    transferred.
734:             *
735:             * @return      the key for the entry to be  made in the filetable.
736:             */
737:            protected String constructFileKey(String lfn, String siteHandle) {
738:                StringBuffer sb = new StringBuffer();
739:                sb.append(lfn).append(":").append(siteHandle);
740:
741:                return sb.toString();
742:            }
743:
744:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.