Source Code Cross Referenced for TransferEngine.java in  » Workflow-Engines » pegasus-2.1.0 » org » griphyn » cPlanner » engine » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Workflow Engines » pegasus 2.1.0 » org.griphyn.cPlanner.engine 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /*
0002:         * This file or a portion of this file is licensed under the terms of
0003:         * the Globus Toolkit Public License, found in file GTPL, or at
0004:         * http://www.globus.org/toolkit/download/license.html. This notice must
0005:         * appear in redistributions of this file, with or without modification.
0006:         *
0007:         * Redistributions of this Software, with or without modification, must
0008:         * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
0009:         * some other similar material which is provided with the Software (if
0010:         * any).
0011:         *
0012:         * Copyright 1999-2004 University of Chicago and The University of
0013:         * Southern California. All rights reserved.
0014:         */
0015:        package org.griphyn.cPlanner.engine;
0016:
0017:        import org.griphyn.cPlanner.classes.ADag;
0018:        import org.griphyn.cPlanner.classes.FileTransfer;
0019:        import org.griphyn.cPlanner.classes.GridFTPServer;
0020:        import org.griphyn.cPlanner.classes.NameValue;
0021:        import org.griphyn.cPlanner.classes.PegasusFile;
0022:        import org.griphyn.cPlanner.classes.PlannerOptions;
0023:        import org.griphyn.cPlanner.classes.ReplicaLocation;
0024:        import org.griphyn.cPlanner.classes.SiteInfo;
0025:        import org.griphyn.cPlanner.classes.SubInfo;
0026:
0027:        import org.griphyn.cPlanner.common.LogManager;
0028:        import org.griphyn.cPlanner.common.Utility;
0029:        import org.griphyn.cPlanner.common.PegasusProperties;
0030:
0031:        import org.griphyn.cPlanner.namespace.VDS;
0032:
0033:        import org.griphyn.cPlanner.partitioner.graph.GraphNode;
0034:        import org.griphyn.cPlanner.partitioner.graph.Graph;
0035:        import org.griphyn.cPlanner.partitioner.graph.Adapter;
0036:
0037:        import org.griphyn.cPlanner.selector.ReplicaSelector;
0038:        import org.griphyn.cPlanner.selector.replica.ReplicaSelectorFactory;
0039:
0040:        import org.griphyn.cPlanner.transfer.Refiner;
0041:        import org.griphyn.cPlanner.transfer.refiner.RefinerFactory;
0042:
0043:        import org.griphyn.common.catalog.ReplicaCatalog;
0044:        import org.griphyn.common.catalog.ReplicaCatalogEntry;
0045:
0046:        import org.griphyn.common.catalog.replica.ReplicaFactory;
0047:
0048:        import org.griphyn.common.catalog.transformation.TCMode;
0049:
0050:        import org.griphyn.common.util.FactoryException;
0051:
0052:        import org.griphyn.vdl.euryale.FileFactory;
0053:        import org.griphyn.vdl.euryale.VirtualDecimalHashedFileFactory;
0054:        import org.griphyn.vdl.euryale.VirtualFlatFileFactory;
0055:
0056:        import java.io.File;
0057:
0058:        import java.util.Enumeration;
0059:        import java.util.HashSet;
0060:        import java.util.Iterator;
0061:        import java.util.List;
0062:        import java.util.Set;
0063:        import java.util.Vector;
0064:        import java.util.Properties;
0065:        import java.io.IOException;
0066:
0067:        /**
0068:         * The transfer engine, which on the basis of the pools on which the jobs are to
0069:         * run, adds nodes to transfer the data products.
0070:         *
0071:         * @author Karan Vahi
0072:         * @author Gaurang Mehta
0073:         * @version $Revision: 290 $
0074:         *
0075:         */
public class TransferEngine extends Engine {

    /**
     * The name of the source key for the Replica Catalog Implementer that serves as
     * cache.
     */
    public static final String TRANSIENT_REPLICA_CATALOG_KEY = "file";

    /**
     * The name of the Replica Catalog Implementer that serves as the source for
     * cache files.
     */
    public static final String TRANSIENT_REPLICA_CATALOG_IMPLEMENTER = "SimpleFile";

    /**
     * The classname of the class that stores the file transfer information for
     * a transfer object.
     *
     * @see org.griphyn.cPlanner.classes.FileTransfer
     */
    private static final String FILE_TX_CLASS_NAME = "org.griphyn.cPlanner.classes.FileTransfer";

    /**
     * The DAG object to which the transfer nodes are to be added. This is the
     * reduced Dag, which is got from the Reduction Engine.
     */
    private ADag mDag;

    /**
     * The original Dag object constructed after running the DaxParser. This is
     * required to transfer files to the output pool for any leaf jobs that may
     * have been deleted.
     */
    //    private ADag mOriginalDag;

    /**
     * The bridge to the Replica Catalog.
     */
    private ReplicaCatalogBridge mRCBridge;

    /**
     * The handle to the replica selector that is to used to select the various
     * replicas.
     */
    private ReplicaSelector mReplicaSelector;

    /**
     * The handle to the transfer refiner that adds the transfer nodes into the
     * workflow.
     */
    private Refiner mTXRefiner;

    /**
     * Holds the jobs from the original dags which are deleted by the reduction
     * algorithm.
     */
    private Vector mvDelLeafJobs;

    /**
     * A SimpleFile Replica Catalog, that tracks all the files that are being
     * materialized as part of workflow execution.
     */
    private ReplicaCatalog mTransientRC;

    /**
     * The handle to the file factory, that is used to create the top level
     * directories for each of the partitions.
     */
    private FileFactory mFactory;

    /**
     * The base path for the stageout directory on the output site where all
     * the files are staged out.
     */
    private String mStageOutBaseDirectory;
    /**
     * Overloaded constructor. Loads the transformation catalog handle, the
     * transfer refiner and the replica selector, and initializes the stage-out
     * directory factory for the output site.
     *
     * @param reducedDag  the reduced workflow.
     * @param vDelLJobs   list of deleted jobs.
     * @param properties  the <code>PegasusProperties</code> to be used.
     * @param options     the options specified by the user to run the planner.
     *
     * @throws FactoryException if the transfer refiner or replica selector
     *         cannot be instantiated (any underlying exception is wrapped).
     */
    public TransferEngine(ADag reducedDag, Vector vDelLJobs,
            PegasusProperties properties, PlannerOptions options) {
        //call the super class constructor for initializations
        super (properties);

        mDag = reducedDag;
        mvDelLeafJobs = vDelLJobs;
        mPOptions = options;
        mTCHandle = TCMode.loadInstance();

        try {
            mTXRefiner = RefinerFactory.loadInstance(mProps,
                    reducedDag, options);
            mReplicaSelector = ReplicaSelectorFactory
                    .loadInstance(mProps);
        } catch (Exception e) {
            //wrap all the exceptions into a factory exception
            throw new FactoryException("Transfer Engine ", e);
        }

        //sets up mFactory / mStageOutBaseDirectory (defined outside this view)
        this .initializeStageOutSiteDirectoryFactory(reducedDag);

        //log some configuration messages
        mLogger.log("Transfer Refiner loaded is ["
                + mTXRefiner.getDescription() + "]",
                LogManager.CONFIG_MESSAGE_LEVEL);
        mLogger.log("ReplicaSelector loaded is ["
                + mReplicaSelector.description() + "]",
                LogManager.CONFIG_MESSAGE_LEVEL);
    }
0190:
    /**
     * Returns whether a site is third party enabled or not. A site is determined
     * to be third party enabled if the transfer mode is a thirdparty based
     * transfer mode like RFT or if it is specified as a third party site
     * by the property "vds.transfer.thirdparty.sites". Delegates the decision
     * to the loaded transfer refiner.
     *
     * @param site  the site handle of the site for which you want to detect
     *              third party capability.
     * @param type  the type of transfer job for which the URL is being constructed.
     *
     * @return true indicating site is third party enabled, else
     *         false indicating site is not third party enabled.
     */
    public boolean isSiteThirdParty(String site, int type) {

        return mTXRefiner.isSiteThirdParty(site, type);
    }
0208:
    /**
     * Returns the SubInfo object for the job specified, looked up by name in
     * the reduced DAG.
     *
     * @param jobName  the name of the job.
     *
     * @return  the SubInfo object for a job.
     */
    private SubInfo getSubInfo(String jobName) {
        return mDag.getSubInfo(jobName);
    }
0219:
0220:            /**
0221:             * Adds the transfer nodes to the workflow.
0222:             *
0223:             * @param rcb  the bridge to the ReplicaCatalog.
0224:             */
0225:            public void addTransferNodes(ReplicaCatalogBridge rcb) {
0226:                mRCBridge = rcb;
0227:
0228:                SubInfo currentJob;
0229:                String currentJobName;
0230:                Vector vOutPoolTX;
0231:                //        int noOfJobs = mDag.getNoOfJobs();
0232:                //        int counter = 0;
0233:                String msg;
0234:                String outputSite = mPOptions.getOutputSite();
0235:
0236:                //convert the dag to a graph representation and walk it
0237:                //in a top down manner
0238:                Graph workflow = Adapter.convert(mDag);
0239:
0240:                //go through each job in turn
0241:                //        Enumeration eSubs = mDagSubInfos.elements();
0242:                //        while (eSubs.hasMoreElements() && counter < noOfJobs) {
0243:                //            counter++;
0244:                //            currentJob = (SubInfo) eSubs.nextElement();
0245:
0246:                boolean stageOut = ((outputSite != null) && (outputSite.trim()
0247:                        .length() > 0));
0248:                for (Iterator it = workflow.iterator(); it.hasNext();) {
0249:                    GraphNode node = (GraphNode) it.next();
0250:                    currentJob = (SubInfo) node.getContent();
0251:                    //set the node depth as the level
0252:                    currentJob.setLevel(node.getDepth());
0253:                    currentJobName = currentJob.getName();
0254:
0255:                    mLogger.log("", LogManager.DEBUG_MESSAGE_LEVEL);
0256:                    msg = "Job being traversed is " + currentJobName;
0257:                    mLogger.log(msg, LogManager.DEBUG_MESSAGE_LEVEL);
0258:                    msg = "To be run at " + currentJob.executionPool;
0259:                    mLogger.log(msg, LogManager.DEBUG_MESSAGE_LEVEL);
0260:
0261:                    //getting the parents of that node
0262:                    Vector vParents = mDag.getParents(currentJobName);
0263:                    mLogger.log(vectorToString("Parents of job:", vParents),
0264:                            LogManager.DEBUG_MESSAGE_LEVEL);
0265:                    processParents(currentJob, vParents);
0266:
0267:                    //transfer the nodes output files
0268:                    //to the output pool
0269:                    if (stageOut /*&& !currentJob.isTemp*/) {
0270:                        vOutPoolTX = getFileTX(outputSite, currentJob);
0271:                        mTXRefiner.addStageOutXFERNodes(currentJob, vOutPoolTX,
0272:                                rcb);
0273:                    }
0274:                }
0275:
0276:                //we are done with the traversal.
0277:                mTXRefiner.done();
0278:
0279:                //get the deleted leaf jobs o/p files to output pool
0280:                //only if output pool is specified
0281:                //should be moved upwards in the pool. redundancy at present
0282:                if (outputSite != null && outputSite.trim().length() > 0) {
0283:
0284:                    for (Enumeration e = this .mvDelLeafJobs.elements(); e
0285:                            .hasMoreElements();) {
0286:                        currentJob = (SubInfo) e.nextElement();
0287:
0288:                        //for a deleted node, to transfer it's output
0289:                        //the execution pool should be set to local i.e submit host
0290:                        currentJob.executionPool = "local";
0291:
0292:                        vOutPoolTX = getDeletedFileTX(outputSite, currentJob);
0293:                        mTXRefiner.addStageOutXFERNodes(currentJob, vOutPoolTX,
0294:                                rcb, true);
0295:                    }
0296:                }
0297:
0298:                //close the handle to the cache file if it is written
0299:                closeTransientRC();
0300:            }
0301:
0302:            /**
0303:             * This gets the file transfer objects corresponding to the location of files
0304:             * found in the replica mechanism, and transfers it to the output pool asked
0305:             * by the user. If the output pool path and the one returned by the replica
0306:             * mechanism match then that object is not transferred.
0307:             *
0308:             * @param pool    this the output pool which the user specifies at runtime.
0309:             * @param job     The SubInfo object corresponding to the leaf job which was
0310:             *                deleted by the Reduction algorithm
0311:             *
0312:             * @return        Vector of <code>FileTransfer</code> objects
0313:             */
0314:            private Vector getDeletedFileTX(String pool, SubInfo job) {
0315:                Vector vFileTX = new Vector();
0316:                SiteInfo p = mPoolHandle.getPoolEntry(pool, "vanilla");
0317:
0318:                for (Iterator it = job.getOutputFiles().iterator(); it
0319:                        .hasNext();) {
0320:                    PegasusFile pf = (PegasusFile) it.next();
0321:                    String lfn = pf.getLFN();
0322:
0323:                    ReplicaLocation rl = mRCBridge.getFileLocs(lfn);
0324:
0325:                    //selLocs are all the locations found in ReplicaMechanism corr
0326:                    //to the pool pool
0327:                    ReplicaLocation selLocs = mReplicaSelector.selectReplicas(
0328:                            rl, pool);
0329:
0330:                    boolean flag = false;
0331:
0332:                    FileTransfer ft = null;
0333:                    //checking through all the pfn's returned on the pool
0334:                    for (Iterator selIt = selLocs.pfnIterator(); selIt
0335:                            .hasNext();) {
0336:                        ReplicaCatalogEntry selLoc = (ReplicaCatalogEntry) selIt
0337:                                .next();
0338:                        String sourceURL = selLoc.getPFN();
0339:                        //definite inconsitency as url prefix and mount point
0340:                        //are not picked up from the same server
0341:                        String destURL = p.getURLPrefix(true)
0342:                                + this .getPathOnStageoutSite(lfn);
0343:                        //+                                 File.separator + lfn;
0344:
0345:                        //check if the URL's match
0346:                        if (sourceURL.trim().equalsIgnoreCase(destURL.trim())) {
0347:                            String msg = "The leaf file " + lfn
0348:                                    + " is already at the output pool " + pool;
0349:                            mLogger.log(msg, LogManager.INFO_MESSAGE_LEVEL);
0350:                            flag = true;
0351:                            break;
0352:                        }
0353:
0354:                        ft = new FileTransfer(lfn, job.getName());
0355:                        ft.addSource(selLoc.getResourceHandle(), sourceURL);
0356:                        ft.addDestination(pool, destURL);
0357:
0358:                        //System.out.println("Deleted Leaf Job File transfer object " + ft);
0359:
0360:                    }
0361:                    if (!flag) { //  adding the last pfn
0362:                        vFileTX.addElement(ft);
0363:                    }
0364:                }
0365:                return vFileTX;
0366:            }
0367:
0368:            /**
0369:             * It processes a nodes parents and determines if nodes are to be added
0370:             * or not. All the input files for the job are searched in the output files of
0371:             * the parent nodes and the Replica Mechanism.
0372:             *
0373:             * @param job       the <code>SubInfo</code> object containing all the
0374:             *                  details of the job.
0375:             * @param vParents  Vector of String objects corresponding to the Parents
0376:             *                  of the node.
0377:             */
0378:            private void processParents(SubInfo job, Vector vParents) {
0379:
0380:                Set nodeIpFiles = job.getInputFiles();
0381:                Vector vRCSearchFiles = new Vector(); //vector of PegasusFile
0382:                Vector vIPTxFiles = new Vector();
0383:                Vector vParentSubs = new Vector();
0384:
0385:                //getAll the output Files of the parents
0386:                Set parentsOutFiles = getOutputFiles(vParents, vParentSubs);
0387:
0388:                //interpool transfer of the nodes parents
0389:                //output files
0390:                Vector vInterPoolFileTX = this .getInterpoolFileTX(job,
0391:                        vParentSubs);
0392:                //only add if there are files to transfer
0393:                if (!vInterPoolFileTX.isEmpty()) {
0394:                    mTXRefiner.addInterSiteTXNodes(job, vInterPoolFileTX);
0395:                }
0396:
0397:                //check if node ip files are in the parents out files
0398:                //if files are not, then these are to be got
0399:                //from the RC based on the transiency characteristic
0400:                for (Iterator it = nodeIpFiles.iterator(); it.hasNext();) {
0401:                    PegasusFile pf = (PegasusFile) it.next();
0402:                    if (!parentsOutFiles.contains(pf)) {
0403:                        if (!pf.getTransientTransferFlag()) {
0404:                            vRCSearchFiles.addElement(pf);
0405:                        }
0406:                    }
0407:                }
0408:
0409:                if (!vRCSearchFiles.isEmpty()) {
0410:                    //get the locations from the RC
0411:                    getFilesFromRC(job, vRCSearchFiles);
0412:                }
0413:            }
0414:
0415:            /**
0416:             * This gets the Vector of FileTransfer objects for the files which have to
0417:             * be transferred to an one destination pool. It checks for the transient
0418:             * flags for files. If the transfer transient flag is set, it means the file
0419:             * does not have to be transferred to the destination pool.
0420:             *
0421:             * @param destPool The pool to which the files are to be transferred to.
0422:             * @param job      The <code>SubInfo</code>object of the job whose output files
0423:             *                 are needed at the destination pool.
0424:             *
0425:             * @return        Vector of <code>FileTransfer</code> objects
0426:             */
0427:            private Vector getFileTX(String destPool, SubInfo job) {
0428:                Vector vFileTX = new Vector();
0429:
0430:                //check if there is a remote initialdir set
0431:                String path = job.vdsNS
0432:                        .getStringValue(VDS.REMOTE_INITIALDIR_KEY);
0433:
0434:                for (Iterator it = job.getOutputFiles().iterator(); it
0435:                        .hasNext();) {
0436:                    PegasusFile pf = (PegasusFile) it.next();
0437:                    String file = pf.getLFN();
0438:
0439:                    FileTransfer ft = this .constructFileTX(pf,
0440:                            job.executionPool, destPool, job.logicalName, path);
0441:                    if (ft != null) {
0442:                        vFileTX.add(ft);
0443:
0444:                    }
0445:
0446:                }
0447:
0448:                return vFileTX;
0449:
0450:            }
0451:
    /**
     * Constructs the FileTransfer object on the basis of the transiency
     * information. If the transient flag for transfer is set, the destURL for the
     * FileTransfer object would be the execution directory, as this is the entry
     * that has to be registered in the ReplicaMechanism.
     *
     * The execution URL is always recorded in the transient replica catalog
     * (cache file), even when no transfer object is ultimately returned.
     *
     * @param pf          the PegasusFile for which the transfer has to be done.
     * @param execPool    the pool on which the file is created.
     * @param destPool    the output pool where the job should be transferred
     * @param job         the name of the associated job.
     * @param path        the path that a user specifies in the profile for key
     *                    remote_initialdir that results in the workdir being
     *                    changed for a job on a execution pool.
     *
     * @return   the corresponding FileTransfer object, or null if both the
     *           transient transfer and transient registration flags are set.
     *
     * @throws RuntimeException if either the execution pool or the destination
     *         pool has no entry in the pool configuration.
     */
    private FileTransfer constructFileTX(PegasusFile pf,
            String execPool, String destPool, String job, String path) {

        String lfn = pf.getLFN();
        FileTransfer ft = null;

        SiteInfo ePool = mPoolHandle.getPoolEntry(execPool, "vanilla");
        SiteInfo dPool = mPoolHandle.getPoolEntry(destPool, "vanilla");
        if (ePool == null || dPool == null) {
            mLogMsg = (ePool == null) ? this .poolNotFoundMsg(execPool,
                    "vanilla") : this .poolNotFoundMsg(destPool,
                    "vanilla");
            mLogger.log(mLogMsg, LogManager.ERROR_MESSAGE_LEVEL);
            throw new RuntimeException(mLogMsg);
        }

        //definite inconsitency as url prefix and mount point
        //are not picked up from the same server
        //NOTE(review): separatorChar here vs File.separator below -- same value
        //on any single platform, but the inconsistency is worth unifying
        String execURL = ePool.getURLPrefix(true)
                + mPoolHandle.getExecPoolWorkDir(execPool, path)
                + File.separatorChar + lfn;

        //write out the exec url to the cache file
        trackInTransientRC(lfn, execURL, execPool);

        //if both transfer and registration
        //are transient return null
        if (pf.getTransientRegFlag() && pf.getTransientTransferFlag()) {
            return null;
        }

        //if only transient transfer flag
        //means destURL and sourceURL
        //are same and are equal to
        //execution directory on execPool
        if (pf.getTransientTransferFlag()) {

            ft = new FileTransfer(lfn, job, pf.getFlags());
            //set the transfer mode
            ft.setTransferFlag(pf.getTransferFlag());
            ft.addSource(execPool, execURL);
            ft.addDestination(execPool, execURL);
        }
        //the source dir is the exec dir
        //on exec pool and dest dir
        //would be on the output pool
        else {
            //construct the source url depending on whether third party tx
            String sourceURL = isSiteThirdParty(execPool,
                    SubInfo.STAGE_OUT_JOB) ? execURL : "file://"
                    + mPoolHandle.getExecPoolWorkDir(execPool, path)
                    + File.separator + lfn;

            ft = new FileTransfer(lfn, job, pf.getFlags());
            //set the transfer mode
            ft.setTransferFlag(pf.getTransferFlag());

            ft.addSource(execPool, sourceURL);

            //add all the possible destination urls iterating through
            //the list of grid ftp servers associated with the dest pool.
            List l = mPoolHandle.getGridFTPServers(destPool);
            Iterator it = l.iterator();
            String destURL = null;
            boolean first = true;
            //NOTE(review): on the first pass the iterator is NOT advanced --
            //the pool object's URL prefix is used instead, and the matching
            //list entry is later skipped by the duplicate check below. The
            //list's first element is therefore consumed on the second pass.
            while (it.hasNext()) {
                destURL = (first) ?
                //the first entry has to be the one in the Pool object
                dPool.getURLPrefix(false)
                        :
                        //get it from the list
                        ((GridFTPServer) it.next())
                                .getInfo(GridFTPServer.GRIDFTP_URL);

                if (!first && destURL.equals(dPool.getURLPrefix(false))) {
                    //ensures no duplicate entries. The gridftp server in the pool
                    //object is one of the servers in the list of gridftp servers.
                    continue;
                }

                //assumption of same se mount point for each gridftp server
                destURL += this .getPathOnStageoutSite(lfn);//  + File.separator + lfn;

                //if the paths match of dest URI
                //and execDirURL we return null
                if (execURL.equalsIgnoreCase(destURL)) {
                    /*ft = new FileTransfer(file, job);
                    ft.addSource(execPool, execURL);*/
                    ft.addDestination(execPool, execURL);
                    //make the transfer transient?
                    ft.setTransferFlag(PegasusFile.TRANSFER_NOT);
                    return ft;
                }

                ft.addDestination(destPool, destURL);
                first = false;
            }

        }

        return ft;
    }
0570:
0571:            /**
0572:             * This generates a error message for pool not found in the pool
0573:             * config file.
0574:             *
0575:             * @param poolName  the name of pool that is not found.
0576:             * @param universe  the condor universe
0577:             *
0578:             * @return the message.
0579:             */
0580:            private String poolNotFoundMsg(String poolName, String universe) {
0581:                String st = "Error: No matching entry to pool = " + poolName
0582:                        + " ,universe = " + universe
0583:                        + "\n found in the pool configuration file ";
0584:                return st;
0585:
0586:            }
0587:
            /**
             * This gets the Vector of FileTransfer objects for all the files which have
             * to be transferred to the destination pool in case of Interpool transfers.
             * Each FileTransfer object has the source and the destination URLs. the
             * source URI is determined from the pool on which the jobs are executed.
             *
             * <p>Parent jobs that execute on the same pool as the child job are
             * skipped, since their outputs are already in the shared directory.
             * Only a parent output file that is also one of the job's input files
             * produces a transfer.
             *
             * @param job     the job with reference to which interpool file transfers
             *                need to be determined.
             * @param nodes   Vector of <code> SubInfo</code> objects for the nodes, whose
             *                outputfiles are to be transferred to the dest pool.
             *
             * @return        Vector of <code>FileTransfer</code> objects
             */
            private Vector getInterpoolFileTX(SubInfo job, Vector nodes) {
                String destPool = job.executionPool;
                //contains the remote_initialdir if specified for the job
                String destRemoteDir = job.vdsNS
                        .getStringValue(VDS.REMOTE_INITIALDIR_KEY);

                SiteInfo desPool = mPoolHandle.getTXPoolEntry(destPool);
                SiteInfo sourcePool;

                //accumulates one FileTransfer per required parent output file
                Vector vFileTX = new Vector();

                for (Iterator it = nodes.iterator(); it.hasNext();) {
                    //get the parent job
                    SubInfo pJob = (SubInfo) it.next();
                    sourcePool = mPoolHandle.getTXPoolEntry(pJob.executionPool);

                    if (((String) sourcePool.getInfo(SiteInfo.HANDLE))
                            .equalsIgnoreCase(destPool)) {
                        //no need to add transfers, as the parent job and child
                        //job are run in the same directory on the pool
                        continue;
                    }

                    String sourceURI = null;
                    //networked URL to the work directory on the destination pool,
                    //used for third party transfers
                    String thirdPartyDestURI = desPool.getURLPrefix(true)
                            + mPoolHandle.getExecPoolWorkDir(destPool,
                                    destRemoteDir);
                    //definite inconsistency as url prefix and mount point
                    //are not picked up from the same server
                    String destURI = isSiteThirdParty(destPool,
                            SubInfo.INTER_POOL_JOB) ?
                    //construct for third party transfer
                    thirdPartyDestURI
                            :
                            //construct for normal transfer
                            "file://"
                                    + mPoolHandle.getExecPoolWorkDir(destPool,
                                            destRemoteDir);

                    for (Iterator fileIt = pJob.getOutputFiles().iterator(); fileIt
                            .hasNext();) {
                        PegasusFile pf = (PegasusFile) fileIt.next();
                        String outFile = pf.getLFN();

                        //       Not required as input files are Sets now Karan Sept 14, 2006
                        //                if (stringInPegVector(outFile, job.inputFiles)) {
                        if (job.getInputFiles().contains(pf)) {
                            String sourceURL = null;
                            String destURL = destURI + File.separator + outFile;
                            String thirdPartyDestURL = thirdPartyDestURI
                                    + File.separator + outFile;
                            FileTransfer ft = new FileTransfer(outFile,
                                    pJob.jobName);
                            ft.addDestination(destPool, destURL);

                            //add all the possible source urls iterating through
                            //the list of grid ftp servers associated with the dest pool.
                            List l = mPoolHandle
                                    .getGridFTPServers(pJob.executionPool);
                            boolean first = true;
                            for (Iterator it1 = l.iterator(); it1.hasNext();) {
                                //definite inconsistency as url prefix and mount point
                                //are not picked up from the same server
                                //NOTE: on the first pass the list iterator is
                                //deliberately NOT advanced, so the pool object's
                                //own url prefix is considered before the listed
                                //servers; the duplicate check below then skips
                                //the matching list entry on a later pass
                                sourceURI = (first) ?
                                //the first entry has to be the one in the Pool object
                                sourcePool.getURLPrefix(false)
                                        :
                                        //get it from the list
                                        ((GridFTPServer) it1.next())
                                                .getInfo(GridFTPServer.GRIDFTP_URL);

                                if ((!first && sourceURI.equals(sourcePool
                                        .getURLPrefix(false)))) {
                                    //ensures no duplicate entries. The gridftp server in the pool
                                    //object is one of the servers in the list of gridftp servers.
                                    mLogger.log(
                                            "Not adding inter pool file tx for "
                                                    + outFile,
                                            LogManager.DEBUG_MESSAGE_LEVEL);
                                    continue;
                                }

                                //append the parent job's remote work directory to
                                //complete the source url for this server
                                sourceURI += mPoolHandle
                                        .getExecPoolWorkDir(
                                                pJob.executionPool,
                                                pJob.vdsNS
                                                        .getStringValue(VDS.REMOTE_INITIALDIR_KEY));
                                sourceURL = sourceURI + File.separator
                                        + outFile;

                                if (!(sourceURL
                                        .equalsIgnoreCase(thirdPartyDestURL))) {
                                    //add the source url only if it does not match to
                                    //the third party destination url
                                    ft.addSource(pJob.executionPool, sourceURL);
                                }
                                first = false;
                            }
                            if (ft.isValid()) {
                                //adding only if there is at least
                                //a single valid transfer associated.
                                vFileTX.addElement(ft);
                            }
                        }
                    }

                }

                return vFileTX;

            }
0712:
            /**
             * It looks up the RCEngine Hashtable to lookup the locations for the
             * files and add nodes to transfer them. If a file is not found to be in
             * the Replica Catalog the Transfer Engine flags an error and exits
             *
             * <p>For each file a replica is selected (or, for FileTransfer inputs,
             * the preselected source is used), and a transfer is added only when
             * the chosen source does not already coincide with the destination on
             * the execution pool. All assembled transfers are handed to the
             * transfer refiner as stage-in nodes.
             *
             * @param job           the <code>SubInfo</code>object for whose ipfile have
             *                      to search the Replica Mechanism for.
             * @param searchFiles   Vector containing the PegasusFile objects corresponding
             *                      to the files that need to have their mapping looked
             *                      up from the Replica Mechanism.
             *
             * @throws RuntimeException if no location can be determined for a file
             *                          that is not marked optional.
             */
            private void getFilesFromRC(SubInfo job, Vector searchFiles) {
                Vector vFileTX = new Vector();
                String jobName = job.logicalName;
                String ePool = job.executionPool;
                //contains the remote_initialdir if specified for the job
                String eRemoteDir = job.vdsNS
                        .getStringValue(VDS.REMOTE_INITIALDIR_KEY);
                String sourceURL, destURL = null;
                SiteInfo ep = mPoolHandle.getPoolEntry(ePool, "vanilla");
                //we are using the pull mode for data transfer
                String scheme = "file";

                //sAbsPath would be just the source directory absolute path
                //dAbsPath would be just the destination directory absolute path
                String dAbsPath = mPoolHandle.getExecPoolWorkDir(ePool,
                        eRemoteDir);
                String sAbsPath = null;

                //sDirURL would be the url to the source directory.
                //dDirURL would be the url to the destination directory
                //and is always a networked url.
                //definite inconsistency as url prefix and mount point
                //are not picked up from the same server
                String dDirURL = ep.getURLPrefix(true) + dAbsPath;
                String sDirURL = null;
                //check if the execution pool is third party or not
                String destDir = (isSiteThirdParty(ePool, SubInfo.STAGE_IN_JOB)) ?
                //use the full networked url to the directory
                dDirURL
                        :
                        //use the default pull mode
                        scheme
                                + "://"
                                + mPoolHandle.getExecPoolWorkDir(ePool,
                                        eRemoteDir);

                for (Iterator it = searchFiles.iterator(); it.hasNext();) {
                    PegasusFile pf = (PegasusFile) it.next();
                    List pfns = null;
                    ReplicaLocation rl = null;

                    String lfn = pf.getLFN();
                    //nv remains null unless pf carries a preselected source url
                    NameValue nv = null;

                    //see if the pf is infact an instance of FileTransfer
                    if (pf instanceof  FileTransfer) {
                        //that means we should be having the source url already.
                        //nv contains both the source pool and the url.
                        //This happens in case of AI Planner or transfer of executables
                        nv = ((FileTransfer) pf).getSourceURL();
                        destURL = ((FileTransfer) pf).removeDestURL()
                                .getValue();
                        destURL = (isSiteThirdParty(ePool, SubInfo.STAGE_IN_JOB)) ?
                        //the destination URL is already third party
                        //enabled. use as it is
                        destURL
                                :
                                //explicitly convert to file URL scheme
                                scheme + "://"
                                        + Utility.getAbsolutePath(destURL);
                    } else {
                        //query the replica services and get hold of pfn
                        rl = mRCBridge.getFileLocs(lfn);
                        pfns = (rl == null) ? null : rl.getPFNList();
                    }

                    if (pfns == null && nv == null) {
                        //check to see if the input file is optional
                        if (pf.fileOptional()) {
                            //no need to add a transfer node for it if no location found
                            continue;
                        }

                        //flag an error
                        throw new RuntimeException(
                                "TransferEngine.java: Can't determine a location to "
                                        + "transfer input file for lfn " + lfn
                                        + " for job " + job.getName());
                    }

                    /*
                    ReplicaLocation selLoc = (nv == null)?
                        //select from the various replicas
                        mReplicaSelector.select( lfn, pfns, job.getSiteHandle() ):
                        //we have the replica already selected
                        new ReplicaLocation(nv.getValue(),nv.getKey());
                     */
                    ReplicaCatalogEntry selLoc = (nv == null) ?
                    //select from the various replicas
                    mReplicaSelector.selectReplica(rl, job.getSiteHandle())
                            :
                            //we have the replica already selected
                            new ReplicaCatalogEntry(nv.getValue(), nv.getKey());

                    //get the file to the job's execution pool
                    //this is assuming that there are no directory paths
                    //in the pfn!!!
                    sDirURL = selLoc.getPFN().substring(0,
                            selLoc.getPFN().lastIndexOf(File.separator));

                    //try to get the directory absolute path
                    //yes i know that we sending the url to directory
                    //not the file.
                    sAbsPath = Utility.getAbsolutePath(sDirURL);

                    //the final source and destination url's to the file
                    sourceURL = selLoc.getPFN();
                    destURL = (destURL == null) ? destDir + File.separator
                            + lfn : destURL;

                    //we have all the chopped up combos of the urls.
                    //do some funky matching on the basis of the fact
                    //that each pool has one shared filesystem

                    //match the source and dest 3rd party urls or
                    //match the directory url knowing that lfn and
                    //(source and dest pool) are same
                    if (sourceURL.equalsIgnoreCase(dDirURL + File.separator
                            + lfn)
                            || (selLoc.getResourceHandle().equalsIgnoreCase(
                                    ePool)
                                    && lfn.equals(sourceURL.substring(sourceURL
                                            .lastIndexOf(File.separator) + 1)) && sAbsPath
                                    .equals(dAbsPath))) {
                        //do not need to add any transfer node
                        mLogger.log("Not transferring ip file " + lfn
                                + " for job " + job.jobName + " to site "
                                + ePool, LogManager.DEBUG_MESSAGE_LEVEL);
                        continue;
                    }

                    //construct the file transfer object
                    FileTransfer ft = (pf instanceof  FileTransfer) ? (FileTransfer) pf
                            : new FileTransfer(lfn, jobName);
                    //the transfer mode for the file needs to be
                    //propagated for optional transfers.
                    ft.setTransferFlag(pf.getTransferFlag());

                    //to prevent duplicate source urls
                    if (ft.getSourceURL() == null) {
                        ft.addSource(selLoc.getResourceHandle(), sourceURL);
                    }

                    //to prevent duplicate destination urls
                    if (ft.getDestURL() == null)
                        ft.addDestination(ePool, destURL);
                    vFileTX.add(ft);
                    //we need to set destURL to null
                    //so that the next iteration computes a fresh default
                    destURL = null;
                }

                //call addTransferNode
                if (!vFileTX.isEmpty()) {
                    mTXRefiner.addStageInXFERNodes(job, vFileTX);

                }
            }
0881:
0882:            /**
0883:             * It gets the output files for all the nodes which are specified in
0884:             * the Vector nodes passed.
0885:             *
0886:             * @param nodes   Vector of nodes job names whose output files are required.
0887:             *
0888:             * @param parentSubs  Vector of <code>SubInfo</code> objects. One passes an
0889:             *                    empty vector as a parameter. And this populated with
0890:             *                    SubInfo objects, of the nodes when output files are
0891:             *                    being determined.
0892:             *
0893:             * @return   Set of PegasusFile objects
0894:             */
0895:            private Set getOutputFiles(Vector nodes, Vector parentSubs) {
0896:
0897:                Set files = new HashSet();
0898:
0899:                for (Iterator it = nodes.iterator(); it.hasNext();) {
0900:                    String jobName = (String) it.next();
0901:                    SubInfo sub = getSubInfo(jobName);
0902:                    parentSubs.addElement(sub);
0903:                    files.addAll(sub.getOutputFiles());
0904:                }
0905:
0906:                return files;
0907:            }
0908:
0909:            /**
0910:             * Initializes the transient replica catalog.
0911:             */
0912:            private void initializeTransientRC() {
0913:                mLogger.log("Initialising Transient Replica Catalog",
0914:                        LogManager.DEBUG_MESSAGE_LEVEL);
0915:
0916:                Properties cacheProps = mProps.getVDSProperties()
0917:                        .matchingSubset(ReplicaCatalog.c_prefix, false);
0918:                String file = mPOptions.getSubmitDirectory()
0919:                        + File.separatorChar + getCacheFileName(mDag);
0920:
0921:                //set the appropriate property to designate path to file
0922:                cacheProps
0923:                        .setProperty(this .TRANSIENT_REPLICA_CATALOG_KEY, file);
0924:
0925:                try {
0926:                    mTransientRC = ReplicaFactory.loadInstance(
0927:                            TRANSIENT_REPLICA_CATALOG_IMPLEMENTER, cacheProps);
0928:                } catch (Exception e) {
0929:                    throw new RuntimeException(
0930:                            "Unable to initialize the transient replica catalog  "
0931:                                    + file, e);
0932:                }
0933:            }
0934:
0935:            /**
0936:             * Returns the full path on remote output site, where the lfn will reside.
0937:             * Each call to this function could trigger a change in the directory
0938:             * returned depending upon the file factory being used.
0939:             *
0940:             * @param lfn   the logical filename of the file.
0941:             *
0942:             * @return the storage mount point.
0943:             */
0944:            protected String getPathOnStageoutSite(String lfn) {
0945:                String file;
0946:                try {
0947:                    file = mFactory.createFile(lfn).toString();
0948:                } catch (IOException e) {
0949:                    throw new RuntimeException("IOException ", e);
0950:                }
0951:                return file;
0952:            }
0953:
0954:            /**
0955:             * Initialize the Stageout Site Directory factory.
0956:             * The factory is used to returns the relative directory that a particular
0957:             * file needs to be staged to on the output site.
0958:             *
0959:             * @param workflow  the workflow to which the transfer nodes need to be
0960:             *                  added.
0961:             *
0962:             */
0963:            protected void initializeStageOutSiteDirectoryFactory(ADag workflow) {
0964:                String outputSite = mPOptions.getOutputSite();
0965:                boolean stageOut = ((outputSite != null) && (outputSite.trim()
0966:                        .length() > 0));
0967:
0968:                if (!stageOut) {
0969:                    //no initialization and return
0970:                    mLogger
0971:                            .log(
0972:                                    "No initialization of StageOut Site Directory Factory",
0973:                                    LogManager.DEBUG_MESSAGE_LEVEL);
0974:                    return;
0975:                }
0976:
0977:                // create files in the directory, unless anything else is known.
0978:                mStageOutBaseDirectory = mPoolHandle
0979:                        .getSeMountPoint(mPoolHandle.getPoolEntry(outputSite,
0980:                                "vanilla"));
0981:
0982:                if (mProps.useDeepStorageDirectoryStructure()) {
0983:                    // create hashed, and levelled directories
0984:                    try {
0985:                        VirtualDecimalHashedFileFactory temp = null;
0986:
0987:                        //get the total number of files that need to be stageout
0988:                        int totalFiles = 0;
0989:                        for (Iterator it = workflow.jobIterator(); it.hasNext();) {
0990:                            SubInfo job = (SubInfo) it.next();
0991:
0992:                            //traverse through all the job output files
0993:                            for (Iterator opIt = job.getOutputFiles()
0994:                                    .iterator(); opIt.hasNext();) {
0995:                                if (!((PegasusFile) opIt.next())
0996:                                        .getTransientTransferFlag()) {
0997:                                    //means we have to stage to output site
0998:                                    totalFiles++;
0999:                                }
1000:                            }
1001:                        }
1002:
1003:                        temp = new VirtualDecimalHashedFileFactory(
1004:                                mStageOutBaseDirectory, totalFiles);
1005:
1006:                        //each stageout file  has only 1 file associated with it
1007:                        temp.setMultiplicator(1);
1008:                        mFactory = temp;
1009:                    } catch (IOException e) {
1010:                        //wrap into runtime and throw
1011:                        throw new RuntimeException(
1012:                                "While initializing HashedFileFactory", e);
1013:                    }
1014:                } else {
1015:                    try {
1016:                        //Create a flat file factory
1017:                        mFactory = new VirtualFlatFileFactory(
1018:                                mStageOutBaseDirectory); // minimum default
1019:                    } catch (IOException ioe) {
1020:                        throw new RuntimeException(
1021:                                "Unable to generate files in the submit directory ",
1022:                                ioe);
1023:                    }
1024:                }
1025:
1026:            }
1027:
1028:            /**
1029:             * Inserts an entry into the Transient RC.
1030:             *
1031:             * @param lfn  the logical name of the file.
1032:             * @param pfn  the pfn
1033:             * @param site the site handle
1034:             */
1035:            private void trackInTransientRC(String lfn, String pfn, String site) {
1036:
1037:                //check if the cache handle is initialized
1038:                if (mTransientRC == null)
1039:                    this .initializeTransientRC();
1040:
1041:                mTransientRC.insert(lfn, pfn, site);
1042:            }
1043:
1044:            /**
1045:             * Closes and writes out to the Transient Replica Catalog.
1046:             */
1047:            private void closeTransientRC() {
1048:                if (mTransientRC != null)
1049:                    mTransientRC.close();
1050:            }
1051:
1052:            /**
1053:             * Constructs the basename to the cache file that is to be used
1054:             * to log the transient files. The basename is dependant on whether the
1055:             * basename prefix has been specified at runtime or not.
1056:             *
1057:             * @param adag  the ADag object containing the workflow that is being
1058:             *              concretized.
1059:             *
1060:             * @return the name of the cache file
1061:             */
1062:            private String getCacheFileName(ADag adag) {
1063:                StringBuffer sb = new StringBuffer();
1064:                String bprefix = mPOptions.getBasenamePrefix();
1065:
1066:                if (bprefix != null) {
1067:                    //the prefix is not null using it
1068:                    sb.append(bprefix);
1069:                } else {
1070:                    //generate the prefix from the name of the dag
1071:                    sb.append(adag.dagInfo.nameOfADag).append("_").append(
1072:                            adag.dagInfo.index);
1073:                }
1074:                //append the suffix
1075:                sb.append(".cache");
1076:
1077:                return sb.toString();
1078:
1079:            }
1080:
1081:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.