Source Code Cross Referenced for Bundle.java in Workflow-Engines » pegasus-2.1.0 » org.griphyn.cPlanner.transfer.refiner



001:        /*
002:         * This file or a portion of this file is licensed under the terms of
003:         * the Globus Toolkit Public License, found in file GTPL, or at
004:         * http://www.globus.org/toolkit/download/license.html. This notice must
005:         * appear in redistributions of this file, with or without modification.
006:         *
007:         * Redistributions of this Software, with or without modification, must
008:         * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
009:         * some other similar material which is provided with the Software (if
010:         * any).
011:         *
012:         * Copyright 1999-2004 University of Chicago and The University of
013:         * Southern California. All rights reserved.
014:         */
015:
016:        package org.griphyn.cPlanner.transfer.refiner;
017:
018:        import org.griphyn.cPlanner.classes.ADag;
019:        import org.griphyn.cPlanner.classes.PlannerOptions;
020:        import org.griphyn.cPlanner.classes.SubInfo;
021:        import org.griphyn.cPlanner.classes.FileTransfer;
022:
023:        import org.griphyn.cPlanner.common.PegasusProperties;
024:        import org.griphyn.cPlanner.common.LogManager;
025:
026:        import org.griphyn.cPlanner.namespace.VDS;
027:
028:        import org.griphyn.common.catalog.TransformationCatalogEntry;
029:
030:        import org.griphyn.cPlanner.transfer.Refiner;
031:
032:        import java.util.Collection;
033:        import java.util.List;
034:        import java.util.ArrayList;
035:        import java.util.Vector;
036:        import java.util.Iterator;
037:        import java.util.Map;
038:        import java.util.HashMap;
039:        import java.util.Set;
040:        import java.util.HashSet;
041:        import org.griphyn.cPlanner.engine.ReplicaCatalogBridge;
042:
043:        /**
044:         * An extension of the default refiner that allows the user to specify
045:         * the number of transfer nodes per execution site for stagein and stageout.
046:         *
047:         * @author Karan Vahi
048:         * @version $Revision: 226 $
049:         */
050:
051:        public class Bundle extends Default {
052:
053:            /**
054:             * A short description of the transfer refinement.
055:             */
056:            public static final String DESCRIPTION = "Bundle Mode (stagein files distributed amongst bundles)";
057:
058:            /**
059:             * The default bundling factor that identifies the number of transfer jobs
060:             * that are being created per execution pool for staging in data for
061:             * the workflow.
062:             */
063:            public static final String DEFAULT_STAGE_IN_BUNDLE_FACTOR = "1";
064:
065:            /**
066:             * The default bundling factor that identifies the number of transfer jobs
067:             * that are being created per execution pool while staging data out.
068:             */
069:            public static final String DEFAULT_STAGE_OUT_BUNDLE_FACTOR = "1";
070:
071:            /**
072:             * The map containing the list of stage in transfer jobs that are being
073:             * created for the workflow, indexed by the execution pool name.
074:             */
075:            private Map mStageInMap;
076:
077:            /**
078:             * The map indexed by compute job names that contains the list of stagein job
079:             * names that are being added during the traversal of the workflow. This is
080:             * used to construct the relations that need to be added to the workflow, once
081:             * the traversal is done.
082:             */
083:            private Map mRelationsMap;
084:
085:            /**
086:             * The map containing the stage in bundle values indexed by the name of the
087:             * pool. If the bundle value is not specified, then null is stored.
088:             */
089:            private Map mSIBundleMap;
090:
091:            /**
092:             * The map indexed by staged executable logical name. Each entry is the
093:             * name of the corresponding setup job that changes the XBit on the staged
094:             * file.
095:             */
096:            private Map mSetupMap;
097:
098:            /**
099:             * A map indexed by site name that contains the stage out
100:             * PoolTransfer object for that site. This is per level of the workflow.
101:             */
102:            private Map mStageOutMapPerLevel;
103:
104:            /**
105:             * The current level of the jobs being traversed.
106:             */
107:            private int mCurrentLevel;
108:
109:            /**
110:             * The handle to the replica catalog bridge.
111:             */
112:            private ReplicaCatalogBridge mRCB;
113:
114:            /**
115:             * The overloaded constructor.
116:             *
117:             * @param dag        the workflow to which transfer nodes need to be added.
118:             * @param properties the <code>PegasusProperties</code> object containing all
119:             *                   the properties required by Pegasus.
120:             * @param options    the options passed to the planner.
121:             *
122:             */
123:            public Bundle(ADag dag, PegasusProperties properties,
124:                    PlannerOptions options) {
125:                super (dag, properties, options);
126:                mStageInMap = new HashMap(options.getExecutionSites().size());
127:                mSIBundleMap = new HashMap();
128:                mRelationsMap = new HashMap();
129:                mSetupMap = new HashMap();
130:                mCurrentLevel = -1;
131:            }
132:
133:            /**
134:             * Adds the stage in transfer nodes which transfer the input files for a job,
135:             * from the location returned from the replica catalog to the job's execution
136:             * pool.
137:             *
138:             * @param job   <code>SubInfo</code> object corresponding to the node to
139:             *              which the files are to be transferred.
140:             * @param files Collection of <code>FileTransfer</code> objects containing the
141:             *              information about the source and destination URLs.
142:             */
143:            public void addStageInXFERNodes(SubInfo job, Collection files) {
144:                String jobName = job.getName();
145:                String siteHandle = job.getSiteHandle();
146:                String key = null;
147:                String par = null;
148:                int bundle = -1;
149:
150:                //to prevent duplicate dependencies
151:                Set tempSet = new HashSet();
152:
153:                int staged = 0;
154:                Collection stagedFiles = new ArrayList();
155:                Collection stageInExecJobs = new ArrayList();//stores the list of jobs that are transferring the staged executable files
156:                for (Iterator it = files.iterator(); it.hasNext();) {
157:                    FileTransfer ft = (FileTransfer) it.next();
158:                    String lfn = ft.getLFN();
159:
160:                    //get the key for this lfn and pool
161:                    //if the key already in the table
162:                    //then remove the entry from
163:                    //the Vector and add a dependency
164:                    //in the graph
165:                    key = this .constructFileKey(lfn, siteHandle);
166:                    par = (String) mFileTable.get(key);
167:                    //System.out.println("lfn " + lfn + " par " + par);
168:                    if (par != null) {
169:                        it.remove();
170:
171:                        //check if tempSet does not contain the parent
172:                        //fix for sonal's bug
173:                        tempSet.add(par);
174:
175:                        if (ft.isTransferringExecutableFile()) {
176:                            //currently we have only one file to be staged per
177:                            //compute job. Taking a shortcut in determining
178:                            //the name of the setXBit job
179:                            String xBitJobName = (String) mSetupMap.get(key);
180:                            if (xBitJobName == null) {
181:                                throw new RuntimeException(
182:                                        "Internal Pegasus Error while "
183:                                                + "constructing bundled stagein jobs");
184:                            }
185:                            //add relation xbitjob->computejob
186:                            this .addRelation(xBitJobName, jobName);
187:                        }
188:
189:                    } else {
190:                        //get the name of the transfer job
191:                        boolean contains = mStageInMap.containsKey(siteHandle);
192:                        //following pieces need rearrangement!
193:                        if (!contains) {
194:                            bundle = getSISiteBundleValue(siteHandle, job.vdsNS
195:                                    .getStringValue(VDS.BUNDLE_STAGE_IN_KEY));
196:                            mSIBundleMap.put(siteHandle, Integer
197:                                    .toString(bundle));
198:                        }
199:                        PoolTransfer pt = (contains) ? (PoolTransfer) mStageInMap
200:                                .get(siteHandle)
201:                                : new PoolTransfer(siteHandle, bundle);
202:                        if (!contains) {
203:                            mStageInMap.put(siteHandle, pt);
204:                        }
205:                        //add the FT to the appropriate transfer job.
206:                        String newJobName = pt.addTransfer(ft);
207:
208:                        if (ft.isTransferringExecutableFile()) {
209:                            //currently we have only one file to be staged per
210:                            //compute job
211:                            //                    Collection execFiles = new ArrayList(1);
212:                            //                    execFiles.add(ft);
213:                            //add both the name of the stagein job and the executable file
214:                            stageInExecJobs.add(newJobName);
215:                            stagedFiles.add(ft);
216:
217:                            //                    mTXStageInImplementation.addSetXBitJobs(job, newJobName,
218:                            //                                                            execFiles,
219:                            //                                                            SubInfo.STAGE_IN_JOB);
220:                            mLogger.log("Entered "
221:                                    + key
222:                                    + "->"
223:                                    + mTXStageInImplementation
224:                                            .getSetXBitJobName(job.getName(),
225:                                                    staged),
226:                                    LogManager.DEBUG_MESSAGE_LEVEL);
227:                            mSetupMap.put(key, mTXStageInImplementation
228:                                    .getSetXBitJobName(job.getName(), staged));
229:                            staged++;
230:                        }
231:
232:                        //make a new entry into the table
233:                        mFileTable.put(key, newJobName);
234:                        //add the newJobName to the tempSet so that even
235:                        //if the job has duplicate input files only one instance
236:                        //of transfer is scheduled. This came up during collapsing
237:                        //June 15th, 2004
238:                        tempSet.add(newJobName);
239:
240:                    }
241:                }
242:
243:                //if there were any staged files
244:                //add the setXBitJobs for them
245:                int index = 0;
246:                Iterator jobIt = stageInExecJobs.iterator();
247:                for (Iterator it = stagedFiles.iterator(); it.hasNext(); index++) {
248:                    Collection execFiles = new ArrayList(1);
249:                    execFiles.add(it.next());
250:                    mTXStageInImplementation.addSetXBitJobs(job, (String) jobIt
251:                            .next(), execFiles, SubInfo.STAGE_IN_JOB, index);
252:
253:                }
254:
255:                //add the temp set to the relations
256:                //relations are added to the workflow in the end.
257:                mRelationsMap.put(jobName, tempSet);
258:
259:            }
260:
261:            /**
262:             * Adds the stageout transfer nodes, that stage data to an output site
263:             * specified by the user.
264:             *
265:             * @param job   <code>SubInfo</code> object corresponding to the node to
266:             *              which the files are to be transferred.
267:             * @param files Collection of <code>FileTransfer</code> objects containing the
268:             *              information about the source and destination URLs.
269:             * @param rcb   bridge to the Replica Catalog. Used for creating registration
270:             *              nodes in the workflow.
271:             * @param deletedLeaf to specify whether the node is being added for
272:             *                      a node deleted by the reduction engine or not.
273:             *                      default: false
274:             */
275:            public void addStageOutXFERNodes(SubInfo job, Collection files,
276:                    ReplicaCatalogBridge rcb, boolean deletedLeaf) {
277:
278:                //initializing rcb till the change in function signature happens
279:                //needs to be passed during refiner initialization
280:                mRCB = rcb;
281:
282:                //sanity check
283:                if (files.isEmpty()) {
284:                    return;
285:                }
286:
287:                String jobName = job.getName();
288:                //        String regJob = this.REGISTER_PREFIX + jobName;
289:
290:                mLogMsg = "Adding output pool nodes for job " + jobName;
291:
292:                //separate the files for transfer
293:                //and for registration
294:                List txFiles = new ArrayList();
295:                List regFiles = new ArrayList();
296:                for (Iterator it = files.iterator(); it.hasNext();) {
297:                    FileTransfer ft = (FileTransfer) it.next();
298:                    if (!ft.getTransientTransferFlag()) {
299:                        txFiles.add(ft);
300:                    }
301:                    if (!ft.getTransientRegFlag()) {
302:                        regFiles.add(ft);
303:                    }
304:                }
305:
306:                boolean makeTNode = !txFiles.isEmpty();
307:                boolean makeRNode = !regFiles.isEmpty();
308:
309:                int level = job.getLevel();
310:                String site = job.getSiteHandle();
311:                int bundleValue = getSOSiteBundleValue(site, job.vdsNS
312:                        .getStringValue(VDS.BUNDLE_STAGE_OUT_KEY));
313:
314:                if (level != mCurrentLevel) {
315:                    mCurrentLevel = level;
316:                    //we are starting on a new level of the workflow.
317:                    //reinitialize stuff
318:                    this .resetStageOutMap();
319:                }
320:
321:                TransferContainer soTC = null;
322:                if (makeTNode) {
323:
324:                    //get the appropriate pool transfer object for the site
325:                    PoolTransfer pt = this .getStageOutPoolTransfer(site,
326:                            bundleValue);
327:                    //we add all the file transfers to the pool transfer
328:                    soTC = pt
329:                            .addTransfer(txFiles, level, SubInfo.STAGE_OUT_JOB);
330:                    String soJob = soTC.getTXName();
331:
332:                    if (!deletedLeaf) {
333:                        //need to add a relation between a compute and stage-out
334:                        //job only if the compute job was not reduced.
335:                        addRelation(jobName, soJob);
336:                    }
337:                    //moved to the resetStageOut method
338:                    //            if (makeRNode) {
339:                    //                addRelation( soJob, soTC.getRegName() );
340:                    //            }
341:                } else if (makeRNode) {
342:                    //add an empty file transfer
343:                    //get the appropriate pool transfer object for the site
344:                    PoolTransfer pt = this .getStageOutPoolTransfer(site,
345:                            bundleValue);
346:                    //we add all the file transfers to the pool transfer
347:                    soTC = pt.addTransfer(new Vector(), level,
348:                            SubInfo.STAGE_OUT_JOB);
349:
350:                    //direct link between compute job and registration job
351:                    addRelation(jobName, soTC.getRegName());
352:
353:                }
354:                if (makeRNode) {
355:                    soTC.addRegistrationFiles(regFiles);
356:                    //call to make the reg subinfo
357:                    //added in make registration node
358:                    //           addJob(createRegistrationJob(regJob, job, regFiles, rcb));
359:                }
360:
361:            }
362:
363:            /**
364:             * Signals that the traversal of the workflow is done. At this point the
365:             * transfer nodes are actually constructed by traversing through the transfer
366:             * containers, and the stdin files of the transfer jobs are written.
367:             */
368:            public void done() {
369:                //traverse through the stagein map and
370:                //add transfer nodes per pool
371:                String key;
372:                String value;
373:                PoolTransfer pt;
374:                TransferContainer tc;
375:                Map.Entry entry;
376:                SubInfo job = new SubInfo();
377:
378:                for (Iterator it = mStageInMap.entrySet().iterator(); it
379:                        .hasNext();) {
380:                    entry = (Map.Entry) it.next();
381:                    key = (String) entry.getKey();
382:                    pt = (PoolTransfer) entry.getValue();
383:                    mLogger.log("Adding stage in transfer nodes for pool "
384:                            + key, LogManager.DEBUG_MESSAGE_LEVEL);
385:
386:                    for (Iterator pIt = pt.getTransferContainerIterator(); pIt
387:                            .hasNext();) {
388:                        tc = (TransferContainer) pIt.next();
389:                        if (tc == null) {
390:                            //break out
391:                            break;
392:                        }
393:                        mLogger.log("Adding stagein transfer node "
394:                                + tc.getTXName(),
395:                                LogManager.DEBUG_MESSAGE_LEVEL);
396:                        //added in make transfer node
397:                        //mDag.addNewJob(tc.getName());
398:                        //we just need the execution pool in the job object
399:                        job.executionPool = key;
400:                        addJob(mTXStageInImplementation.createTransferJob(job,
401:                                tc.getFileTransfers(), null, tc.getTXName(),
402:                                SubInfo.STAGE_IN_JOB));
403:
404:                    }
405:                }
406:
407:                //adding relations that tie in the stagein
408:                //jobs to the compute jobs.
409:                for (Iterator it = mRelationsMap.entrySet().iterator(); it
410:                        .hasNext();) {
411:                    entry = (Map.Entry) it.next();
412:                    key = (String) entry.getKey();
413:                    mLogger.log("Adding relations for job " + key,
414:                            LogManager.DEBUG_MESSAGE_LEVEL);
415:                    for (Iterator pIt = ((Collection) entry.getValue())
416:                            .iterator(); pIt.hasNext();) {
417:                        value = (String) pIt.next();
418:                        mLogMsg = "Adding relation " + value + " -> " + key;
419:                        mLogger.log(mLogMsg, LogManager.DEBUG_MESSAGE_LEVEL);
420:                        //                mDag.addNewRelation(value,key);
421:                        addRelation(value, key);
422:                    }
423:                }
424:
425:                //reset the stageout map too
426:                this .resetStageOutMap();
427:            }
428:
429:            /**
430:             * Returns a textual description of the transfer mode.
431:             *
432:             * @return a short textual description
433:             */
434:            public String getDescription() {
435:                return this .DESCRIPTION;
436:            }
437:
438:            /**
439:             * Determines the bundle factor for a particular site on the basis of the
440:             * stage in bundle value associated with the underlying transfer
441:             * transformation in the transformation catalog. If the key is not found,
442:             * then the default value is returned. In case of the default value being
443:             * null the global default is returned.
444:             *
445:             * @param site    the site at which the value is desired.
446:             * @param deflt   the default value.
447:             *
448:             * @return the bundle factor.
449:             *
450:             * @see #DEFAULT_STAGE_IN_BUNDLE_FACTOR
451:             */
452:            protected int getSISiteBundleValue(String site, String deflt) {
453:                //this should be parameterised Karan Dec 20,2005
454:                TransformationCatalogEntry entry = mTXStageInImplementation
455:                        .getTransformationCatalogEntry(site);
456:                SubInfo sub = new SubInfo();
457:                String value = (deflt == null) ? this .DEFAULT_STAGE_IN_BUNDLE_FACTOR
458:                        : deflt;
459:
460:                if (entry != null) {
461:                    sub.updateProfiles(entry);
462:                    value = (sub.vdsNS.containsKey(VDS.BUNDLE_STAGE_IN_KEY)) ? sub.vdsNS
463:                            .getStringValue(VDS.BUNDLE_STAGE_IN_KEY)
464:                            : value;
465:                }
466:
467:                return Integer.parseInt(value);
468:            }
469:
470:            /**
471:             * Determines the bundle factor for a particular site on the basis of the
472:             * stage out bundle value associated with the underlying transfer
473:             * transformation in the transformation catalog. If the key is not found,
474:             * then the default value is returned. In case of the default value being
475:             * null the global default is returned.
476:             *
477:             * @param site    the site at which the value is desired.
478:             * @param deflt   the default value.
479:             *
480:             * @return the bundle factor.
481:             *
482:             * @see #DEFAULT_STAGE_OUT_BUNDLE_FACTOR
483:             */
484:            protected int getSOSiteBundleValue(String site, String deflt) {
485:                //this should be parameterised Karan Dec 20,2005
486:                TransformationCatalogEntry entry = mTXStageInImplementation
487:                        .getTransformationCatalogEntry(site);
488:                SubInfo sub = new SubInfo();
489:                String value = (deflt == null) ? this .DEFAULT_STAGE_OUT_BUNDLE_FACTOR
490:                        : deflt;
491:
492:                if (entry != null) {
493:                    sub.updateProfiles(entry);
494:                    value = (sub.vdsNS.containsKey(VDS.BUNDLE_STAGE_OUT_KEY)) ? sub.vdsNS
495:                            .getStringValue(VDS.BUNDLE_STAGE_OUT_KEY)
496:                            : value;
497:                }
498:
499:                return Integer.parseInt(value);
500:            }
501:
502:            /**
503:             * Returns the appropriate pool transfer for a particular site.
504:             *
505:             * @param site  the site for which the PoolTransfer is required.
506:             * @param num   the number of stageout jobs required for that pool.
507:             *
508:             * @return the PoolTransfer
509:             */
510:            public PoolTransfer getStageOutPoolTransfer(String site, int num) {
511:
512:                if (this .mStageOutMapPerLevel.containsKey(site)) {
513:                    return (PoolTransfer) this .mStageOutMapPerLevel.get(site);
514:                } else {
515:                    PoolTransfer pt = new PoolTransfer(site, num);
516:                    this .mStageOutMapPerLevel.put(site, pt);
517:                    return pt;
518:                }
519:            }
520:
521:            /**
522:             * Resets the stage out map.
523:             */
524:            private void resetStageOutMap() {
525:                if (this .mStageOutMapPerLevel != null) {
526:                    //before flushing add the stageout nodes to the workflow
527:                    SubInfo job = new SubInfo();
528:
529:                    for (Iterator it = mStageOutMapPerLevel.values().iterator(); it
530:                            .hasNext();) {
531:                        PoolTransfer pt = (PoolTransfer) it.next();
532:                        job.setSiteHandle(pt.mPool);
533:
534:                        mLogger.log(
535:                                "Adding jobs for staging out data from site "
536:                                        + pt.mPool,
537:                                LogManager.DEBUG_MESSAGE_LEVEL);
538:
539:                        //traverse through all the TransferContainers
540:                        for (Iterator tcIt = pt.getTransferContainerIterator(); tcIt
541:                                .hasNext();) {
542:                            TransferContainer tc = (TransferContainer) tcIt
543:                                    .next();
544:                            if (tc == null) {
545:                                //break out
546:                                break;
547:                            }
548:
549:                            //add the stageout job if required
550:                            SubInfo soJob = null;
551:                            if (!tc.getFileTransfers().isEmpty()) {
552:                                mLogger.log("Adding stage-out job "
553:                                        + tc.getTXName(),
554:                                        LogManager.DEBUG_MESSAGE_LEVEL);
555:                                soJob = mTXStageOutImplementation
556:                                        .createTransferJob(job, tc
557:                                                .getFileTransfers(), null, tc
558:                                                .getTXName(),
559:                                                SubInfo.STAGE_OUT_JOB);
560:                                addJob(soJob);
561:                            }
562:
563:                            //add registration job if required
564:                            if (!tc.getRegistrationFiles().isEmpty()) {
565:
566:                                //add relation to stage out if the stageout job was created
567:                                if (soJob != null) {
568:                                    //make the stageout job the super node for the registration job
569:                                    job.setName(soJob.getName());
570:                                    addRelation(tc.getTXName(), tc.getRegName());
571:                                }
572:
573:                                mLogger.log("Adding registration job "
574:                                        + tc.getRegName(),
575:                                        LogManager.DEBUG_MESSAGE_LEVEL);
576:                                addJob(createRegistrationJob(tc.getRegName(),
577:                                        job, tc.getRegistrationFiles(), mRCB));
578:
579:                            }
580:
581:                        }
582:                    }
583:                }
584:
585:                mStageOutMapPerLevel = new HashMap();
586:            }
587:
588:            /**
589:             * A container class for storing the name of the transfer job and the list of
590:             * file transfers that the job is responsible for.
591:             */
592:            private class TransferContainer {
593:
594:                /**
595:                 * The name of the transfer job.
596:                 */
597:                private String mTXName;
598:
599:                /**
600:                 * The name of the registration job.
601:                 */
602:                private String mRegName;
603:
604:                /**
605:                 * The collection of <code>FileTransfer</code> objects containing the
606:                 * transfers the job is responsible for.
607:                 */
608:                private Collection mFileTXList;
609:
610:                /**
611:                 * The collection of <code>FileTransfer</code> objects containing the
612:                 * files that need to be registered.
613:                 */
614:                private Collection mRegFiles;
615:
616:                /**
617:                 * The type of the transfers the job is responsible for.
618:                 */
619:                private int mTransferType;
620:
621:                /**
622:                 * The default constructor.
623:                 */
624:                public TransferContainer() {
625:                    mTXName = null;
626:                    mRegName = null;
627:                    mFileTXList = new Vector();
628:                    mRegFiles = new Vector();
629:                    mTransferType = SubInfo.STAGE_IN_JOB;
630:                }
631:
632:                /**
633:                 * Sets the name of the transfer job.
634:                 *
635:                 * @param name  the name of the transfer job.
636:                 */
637:                public void setTXName(String name) {
638:                    mTXName = name;
639:                }
640:
641:                /**
642:                 * Sets the name of the registration job.
643:                 *
644:                 * @param name  the name of the registration job.
645:                 */
646:                public void setRegName(String name) {
647:                    mRegName = name;
648:                }
649:
650:                /**
651:                 * Adds a file transfer to the underlying collection.
652:                 *
653:                 * @param transfer  the <code>FileTransfer</code> containing the
654:                 *                  information about a single transfer.
655:                 */
656:                public void addTransfer(FileTransfer transfer) {
657:                    mFileTXList.add(transfer);
658:                }
659:
660:                /**
661:                 * Adds a collection of file transfers to the underlying collection.
662:                 *
663:                 * @param files   collection of <code>FileTransfer</code>.
664:                 */
665:                public void addTransfer(Collection files) {
666:                    mFileTXList.addAll(files);
667:                }
668:
669:                /**
670:                 * Adds a collection of <code>FileTransfer</code> objects to the underlying collection of
671:                 * files to be registered.
672:                 *
673:                 * @param files   collection of <code>FileTransfer</code>.
674:                 */
675:                public void addRegistrationFiles(Collection files) {
676:                    mRegFiles.addAll(files);
677:                }
678:
679:                /**
680:                 * Sets the transfer type for the transfers associated.
681:                 *
682:                 * @param type  type of transfer.
683:                 */
684:                public void setTransferType(int type) {
685:                    mTransferType = type;
686:                }
687:
688:                /**
689:                 * Returns the name of the transfer job.
690:                 *
691:                 * @return name of the transfer job.
692:                 */
693:                public String getTXName() {
694:                    return mTXName;
695:                }
696:
697:                /**
698:                 * Returns the name of the registration job.
699:                 *
700:                 * @return name of the registration job.
701:                 */
702:                public String getRegName() {
703:                    return mRegName;
704:                }
705:
706:                /**
707:                 * Returns the collection of transfers associated with this transfer
708:                 * container.
709:                 *
710:                 * @return a collection of <code>FileTransfer</code> objects.
711:                 */
712:                public Collection getFileTransfers() {
713:                    return mFileTXList;
714:                }
715:
716:                /**
717:                 * Returns the collection of registration files associated with this transfer
718:                 * container.
719:                 *
720:                 * @return a collection of <code>FileTransfer</code> objects.
721:                 */
722:                public Collection getRegistrationFiles() {
723:                    return mRegFiles;
724:                }
725:
726:            }
727:
728:            /**
729:             * A container to store the transfers that need to be done on a single pool.
730:             * The transfers are stored over a collection of Transfer Containers with
731:             * each transfer container responsible for one transfer job.
732:             */
733:            private class PoolTransfer {
734:
735:                /**
736:                 * The maximum number of transfer jobs that are allowed for this
737:                 * particular pool.
738:                 */
739:                private int mCapacity;
740:
741:                /**
742:                 * The index of the job to which the next transfer for the pool would
743:                 * be scheduled.
744:                 */
745:                private int mNext;
746:
747:                /**
748:                 * The pool for which these transfers are grouped.
749:                 */
750:                private String mPool;
751:
752:                /**
753:                 * The list of <code>TransferContainer</code> that correspond to
754:                 * each transfer job.
755:                 */
756:                private List mTXContainers;
757:
758:                /**
759:                 * The default constructor.
760:                 */
761:                public PoolTransfer() {
762:                    mCapacity = 0;
763:                    mNext = -1;
764:                    mPool = null;
765:                    mTXContainers = null;
766:                }
767:
768:                /**
769:                 * Convenience constructor.
770:                 *
771:                 * @param pool    the pool name for which transfers are being grouped.
772:                 * @param number  the number of transfer jobs that are going to be created
773:                 *                for the pool.
774:                 */
775:                public PoolTransfer(String pool, int number) {
776:                    mCapacity = number;
777:                    mNext = 0;
778:                    mPool = pool;
779:                    mTXContainers = new ArrayList(number);
780:                    //initialize to null
781:                    for (int i = 0; i < number; i++) {
782:                        mTXContainers.add(null);
783:                    }
784:                }
785:
786:                /**
787:                 * Adds a collection of <code>FileTransfer</code> objects to the
788:                 * appropriate TransferContainer. The collection is added to a single
789:                 * TransferContainer, and the pointer is then updated to the next container.
790:                 *
791:                 * @param files  the collection of <code>FileTransfer</code> objects to be added.
792:                 * @param level  the level of the workflow
793:                 * @param type   the type of transfer job
794:                 *
795:                 * @return  the Transfer Container to which the job file transfers were added.
796:                 */
797:                public TransferContainer addTransfer(Collection files,
798:                        int level, int type) {
799:                    //we add the transfer to the container pointed
800:                    //by next
801:                    Object obj = mTXContainers.get(mNext);
802:                    TransferContainer tc = null;
803:                    if (obj == null) {
804:                        //on demand add a new transfer container to the end
805:                        //is there a scope for gaps??
806:                        tc = new TransferContainer();
807:                        tc.setTXName(getTXJobName(mNext, type, level));
808:                        //add the name for the registration job that may be associated
809:                        tc.setRegName(getRegJobName(mNext, level));
810:                        mTXContainers.set(mNext, tc);
811:                    } else {
812:                        tc = (TransferContainer) obj;
813:                    }
814:                    tc.addTransfer(files);
815:
816:                    //update the next pointer to maintain
817:                    //round robin status
818:                    mNext = (mNext < (mCapacity - 1)) ? mNext + 1 : 0;
819:
820:                    return tc;
821:                }
822:
823:                /**
824:                 * Adds a file transfer to the appropriate TransferContainer.
825:                 * The file transfers are added in a round robin manner underneath.
826:                 *
827:                 * @param transfer  the <code>FileTransfer</code> containing the
828:                 *                  information about a single transfer.
829:                 *
830:                 * @return  the name of the transfer job to which the transfer is added.
831:                 */
832:                public String addTransfer(FileTransfer transfer) {
833:                    //we add the transfer to the container pointed
834:                    //by next
835:                    Object obj = mTXContainers.get(mNext);
836:                    TransferContainer tc = null;
837:                    if (obj == null) {
838:                        //on demand add a new transfer container to the end
839:                        //is there a scope for gaps??
840:                        tc = new TransferContainer();
841:                        tc.setTXName(getTXJobName(mNext, SubInfo.STAGE_IN_JOB));
842:                        mTXContainers.set(mNext, tc);
843:                    } else {
844:                        tc = (TransferContainer) obj;
845:                    }
846:                    tc.addTransfer(transfer);
847:
848:                    //update the next pointer to maintain
849:                    //round robin status
850:                    mNext = (mNext < (mCapacity - 1)) ? mNext + 1 : 0;
851:
852:                    return tc.getTXName();
853:                }
854:
855:                /**
856:                 * Returns the iterator to the list of transfer containers.
857:                 *
858:                 * @return the iterator.
859:                 */
860:                public Iterator getTransferContainerIterator() {
861:                    return mTXContainers.iterator();
862:                }
863:
864:                /**
865:                 * Generates the name of the transfer job that is unique for the given
866:                 * workflow.
867:                 *
868:                 * @param counter  the index for the transfer job.
869:                 * @param type     the type of transfer job.
870:                 * @param level    the level of the workflow.
871:                 *
872:                 * @return the name of the transfer job.
873:                 */
874:                private String getTXJobName(int counter, int type, int level) {
875:                    StringBuffer sb = new StringBuffer();
876:                    switch (type) {
877:                    case SubInfo.STAGE_IN_JOB:
878:                        sb.append(Refiner.STAGE_IN_PREFIX);
879:                        break;
880:
881:                    case SubInfo.STAGE_OUT_JOB:
882:                        sb.append(Refiner.STAGE_OUT_PREFIX);
883:                        break;
884:
885:                    default:
886:                        throw new RuntimeException("Wrong type specified "
887:                                + type);
888:                    }
889:
890:                    sb.append(mPool).append("_").append(level).append("_")
891:                            .append(counter);
892:
893:                    return sb.toString();
894:                }
895:
896:                /**
897:                 * Generates the name of the registration job that is unique for the given
898:                 * workflow.
899:                 *
900:                 * @param counter  the index for the registration job.
901:                 * @param level    the level of the workflow.
902:                 *
903:                 * @return the name of the registration job.
904:                 */
905:                private String getRegJobName(int counter, int level) {
906:                    StringBuffer sb = new StringBuffer();
907:                    sb.append(Refiner.REGISTER_PREFIX);
908:
909:                    sb.append(mPool).append("_").append(level).append("_")
910:                            .append(counter);
911:
912:                    return sb.toString();
913:                }
914:
915:                /**
916:                 * Generates the name of the transfer job that is unique for the given
917:                 * workflow.
918:                 *
919:                 * @param counter  the index for the transfer job.
920:                 * @param type     the type of transfer job.
921:                 *
922:                 * @return the name of the transfer job.
923:                 */
924:                private String getTXJobName(int counter, int type) {
925:                    StringBuffer sb = new StringBuffer();
926:                    switch (type) {
927:                    case SubInfo.STAGE_IN_JOB:
928:                        sb.append(Refiner.STAGE_IN_PREFIX);
929:                        break;
930:
931:                    case SubInfo.STAGE_OUT_JOB:
932:                        sb.append(Refiner.STAGE_OUT_PREFIX);
933:                        break;
934:
935:                    default:
936:                        throw new RuntimeException("Wrong type specified "
937:                                + type);
938:                    }
939:                    sb.append(mPool).append("_").append(counter);
940:
941:                    return sb.toString();
942:                }
943:
944:            }
945:
946:        }
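
The listing above is the refiner exactly as shipped with pegasus-2.1.0. As a reading aid, the following is a minimal sketch, not part of the original source, of how a caller might drive the Bundle refiner: construct it, feed it the stage-in and stage-out file transfers for each compute job during the workflow traversal, and call done() at the end so that the bundled transfer jobs and their relations are actually added to the DAG. Only the Bundle constructor, addStageInXFERNodes, addStageOutXFERNodes and done come from the listing; the driver class name and the stageIns/stageOuts helpers are hypothetical placeholders for whatever the real transfer engine supplies.

    import java.util.Collection;
    import java.util.Collections;

    import org.griphyn.cPlanner.classes.ADag;
    import org.griphyn.cPlanner.classes.PlannerOptions;
    import org.griphyn.cPlanner.classes.SubInfo;
    import org.griphyn.cPlanner.common.PegasusProperties;
    import org.griphyn.cPlanner.engine.ReplicaCatalogBridge;
    import org.griphyn.cPlanner.transfer.refiner.Bundle;

    /** Hypothetical driver illustrating the call sequence expected by Bundle. */
    public class BundleUsageSketch {

        public static void refine(ADag dag,
                                  PegasusProperties properties,
                                  PlannerOptions options,
                                  ReplicaCatalogBridge rcb,
                                  Collection computeJobs) {

            Bundle refiner = new Bundle(dag, properties, options);

            for (Object o : computeJobs) {
                SubInfo job = (SubInfo) o;

                //stageIns()/stageOuts() stand in for the Collections of
                //FileTransfer objects that the transfer engine computes per job
                Collection stageIns = stageIns(job);
                Collection stageOuts = stageOuts(job);

                if (!stageIns.isEmpty()) {
                    refiner.addStageInXFERNodes(job, stageIns);
                }
                //the last argument is true only for jobs deleted by the reduction engine
                refiner.addStageOutXFERNodes(job, stageOuts, rcb, false);
            }

            //constructs the bundled stage-in jobs per pool and adds the
            //relations accumulated during the traversal
            refiner.done();
        }

        private static Collection stageIns(SubInfo job) { return Collections.EMPTY_LIST; }

        private static Collection stageOuts(SubInfo job) { return Collections.EMPTY_LIST; }
    }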
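
getSISiteBundleValue and getSOSiteBundleValue resolve the bundle factor for a site in two steps: start from the value carried in the job's VDS profile if one was supplied (the deflt argument), falling back to the class default of "1", and then let a value attached to the transformation catalog entry of the transfer transformation override either of those. The class below is a condensed restatement of that precedence for reading purposes only; the transformation catalog lookup is replaced by a plain parameter, and the class and method names are made up.

    public final class BundleFactorSketch {

        /**
         * Condensed restatement of the precedence in Bundle.getSISiteBundleValue():
         * transformation catalog profile value > job profile value > class default "1".
         * tcValue stands in for the value looked up from the profiles of the
         * TransformationCatalogEntry; in the real method it is only consulted
         * when an entry exists for the site.
         */
        static int resolveBundleFactor(String jobProfileValue, String tcValue) {
            String value = (jobProfileValue == null) ? "1" : jobProfileValue;
            if (tcValue != null) {
                value = tcValue;   //the catalog entry value wins over the job profile
            }
            return Integer.parseInt(value);
        }

        public static void main(String[] args) {
            System.out.println(resolveBundleFactor(null, null)); //1  (global default)
            System.out.println(resolveBundleFactor("4", null));  //4  (job profile value)
            System.out.println(resolveBundleFactor("4", "8"));   //8  (catalog override)
        }
    }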
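
PoolTransfer spreads the transfers for a site over a fixed number of transfer jobs by advancing mNext round-robin: mNext = (mNext < mCapacity - 1) ? mNext + 1 : 0. Since PoolTransfer is a private inner class, the fragment below restates just that placement logic in isolation so the bundling effect is easy to see; the class name, the list of LFNs, and the job labels are illustrative only.

    import java.util.ArrayList;
    import java.util.List;

    /** Standalone illustration of the round-robin placement used by PoolTransfer. */
    public class RoundRobinBundleSketch {

        public static void main(String[] args) {
            int capacity = 2;   //the bundle factor resolved for the site
            String[] lfns = { "f.a", "f.b", "f.c", "f.d", "f.e" };

            //one bucket per prospective transfer job
            List<List<String>> buckets = new ArrayList<List<String>>();
            for (int i = 0; i < capacity; i++) {
                buckets.add(new ArrayList<String>());
            }

            int next = 0;   //mirrors PoolTransfer.mNext
            for (String lfn : lfns) {
                buckets.get(next).add(lfn);
                next = (next < capacity - 1) ? next + 1 : 0;
            }

            //with 5 files and a bundle factor of 2 this prints
            //  transfer job 0 -> [f.a, f.c, f.e]
            //  transfer job 1 -> [f.b, f.d]
            for (int i = 0; i < capacity; i++) {
                System.out.println("transfer job " + i + " -> " + buckets.get(i));
            }
        }
    }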