Source Code Cross Referenced for PDAX2MDAG.java in  » Workflow-Engines » pegasus-2.1.0 » org » griphyn » cPlanner » parser » pdax » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Workflow Engines » pegasus 2.1.0 » org.griphyn.cPlanner.parser.pdax 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /*
0002:         * This file or a portion of this file is licensed under the terms of
0003:         * the Globus Toolkit Public License, found in file GTPL, or at
0004:         * http://www.globus.org/toolkit/download/license.html. This notice must
0005:         * appear in redistributions of this file, with or without modification.
0006:         *
0007:         * Redistributions of this Software, with or without modification, must
0008:         * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
0009:         * some other similar material which is provided with the Software (if
0010:         * any).
0011:         *
0012:         * Copyright 1999-2004 University of Chicago and The University of
0013:         * Southern California. All rights reserved.
0014:         */
0015:        package org.griphyn.cPlanner.parser.pdax;
0016:
0017:        import org.griphyn.cPlanner.code.CodeGenerator;
0018:        import org.griphyn.cPlanner.code.CodeGeneratorException;
0019:
0020:        import org.griphyn.cPlanner.code.generator.CodeGeneratorFactory;
0021:
0022:        import org.griphyn.cPlanner.code.gridstart.GridStartFactory;
0023:
0024:        import org.griphyn.cPlanner.classes.ADag;
0025:        import org.griphyn.cPlanner.classes.PlannerOptions;
0026:        import org.griphyn.cPlanner.classes.SubInfo;
0027:        import org.griphyn.cPlanner.classes.PegasusBag;
0028:
0029:        import org.griphyn.cPlanner.common.LogManager;
0030:        import org.griphyn.cPlanner.common.PegasusProperties;
0031:        import org.griphyn.cPlanner.common.StreamGobbler;
0032:        import org.griphyn.cPlanner.common.DefaultStreamGobblerCallback;
0033:        import org.griphyn.cPlanner.common.StreamGobblerCallback;
0034:
0035:        import org.griphyn.cPlanner.namespace.Dagman;
0036:        import org.griphyn.cPlanner.namespace.VDS;
0037:
0038:        import org.griphyn.cPlanner.partitioner.Partition;
0039:        import org.griphyn.cPlanner.partitioner.DAXWriter;
0040:
0041:        import org.griphyn.cPlanner.poolinfo.SiteFactory;
0042:
0043:        import org.griphyn.common.catalog.TransformationCatalog;
0044:        import org.griphyn.common.catalog.TransformationCatalogEntry;
0045:        import org.griphyn.common.catalog.transformation.TCMode;
0046:
0047:        import org.griphyn.vdl.euryale.FileFactory;
0048:        import org.griphyn.vdl.euryale.HashedFileFactory;
0049:        import org.griphyn.vdl.euryale.FlatFileFactory;
0050:
0051:        import org.griphyn.common.classes.TCType;
0052:
0053:        import org.griphyn.common.util.Version;
0054:        import org.griphyn.common.util.FactoryException;
0055:
0056:        import java.io.File;
0057:        import java.io.IOException;
0058:        import java.io.OutputStream;
0059:        import java.io.FileOutputStream;
0060:        import java.io.FileWriter;
0061:        import java.io.PrintWriter;
0062:        import java.io.BufferedWriter;
0063:        import java.io.FilenameFilter;
0064:
0065:        import java.util.ArrayList;
0066:        import java.util.HashMap;
0067:        import java.util.Iterator;
0068:        import java.util.List;
0069:        import java.util.Map;
0070:        import java.util.Properties;
0071:
0072:        import java.util.regex.Pattern;
0073:
0074:        import java.text.NumberFormat;
0075:        import java.text.DecimalFormat;
0076:
0077:        /**
0078:         * This callback ends up creating the megadag that contains the smaller dags
0079:         * each corresponding to the one level as identified in the pdax file
0080:         * generated by the partitioner.
0081:         *
0082:         * @author Karan Vahi
0083:         * @version $Revision: 464 $
0084:         */
0085:        public class PDAX2MDAG implements  Callback {
0086:
0087:            /**
0088:             * The SubmitWriter that has to be loaded for now.
0089:             */
0090:            public static final String CODE_GENERATOR_CLASS = CodeGeneratorFactory.CONDOR_CODE_GENERATOR_CLASS;
0091:
0092:            /**
0093:             * The prefix for the submit directory.
0094:             */
0095:            public static final String SUBMIT_DIRECTORY_PREFIX = "run";
0096:
0097:            /**
0098:             * The number of jobs into which each job in the partition graph is
0099:             * expanded to.
0100:             */
0101:            public static final int NUM_OF_EXPANDED_JOBS = 2;
0102:
0103:            /**
0104:             * The index of the head job.
0105:             */
0106:            public static final int HEAD_INDEX = 0;
0107:
0108:            /**
0109:             * The index of the tail job.
0110:             */
0111:            public static final int TAIL_INDEX = 1;
0112:
0113:            /**
0114:             * The logical name with which to query the transformation catalog for
0115:             * cPlanner executable.
0116:             */
0117:            public static final String CPLANNER_LOGICAL_NAME = "pegasus-plan";
0118:
0119:            /**
0120:             * The namespace to use for condor dagman.
0121:             */
0122:            public static final String CONDOR_DAGMAN_NAMESPACE = "condor";
0123:
0124:            /**
0125:             * The logical name with which to query the transformation catalog for the
0126:             * condor_dagman executable, that ends up running the mini dag as one
0127:             * job.
0128:             */
0129:            public static final String CONDOR_DAGMAN_LOGICAL_NAME = "dagman";
0130:
0131:            /**
0132:             * The namespace to which the job in the MEGA DAG being created refer to.
0133:             */
0134:            public static final String NAMESPACE = "pegasus";
0135:
0136:            /**
0137:             * The planner utility that needs to be called as a prescript.
0138:             */
0139:            public static final String RETRY_LOGICAL_NAME = "pegasus-plan";
0140:
0141:            /**
0142:             * The dagman knobs controlled through property. They map the property name to
0143:             * the corresponding dagman option.
0144:             */
0145:            public static final String DAGMAN_KNOBS[][] = {
0146:                    { "pegasus.dagman.maxpre", " -MaxPre " },
0147:                    { "pegasus.dagman.maxpost", " -MaxPost " },
0148:                    { "pegasus.dagman.maxjobs", " -MaxJobs " },
0149:                    { "pegasus.dagman.maxidle", " -MaxIdle " }, };
0150:
0151:            /**
0152:             * The file Separator to be used on the submit host.
0153:             */
0154:            protected static char mSeparator = File.separatorChar;
0155:
0156:            /**
0157:             * The directory in which the daxes corresponding to the partitions are
0158:             * kept. This should be the same directory where the pdax containing the
0159:             * partition graph resides.
0160:             */
0161:            private String mPDAXDirectory;
0162:
0163:            /**
0164:             * The root of the submit directory where all the submit directories for
0165:             * the various partitions reside.
0166:             */
0167:            private String mSubmitDirectory;
0168:
0169:            /**
0170:             * The abstract dag object that ends up holding the megadag.
0171:             */
0172:            private ADag mMegaDAG;
0173:
0174:            /**
0175:             * The internal map that maps the partition id to the job responsible
0176:             * for executing the partition.
0177:             */
0178:            private Map mJobMap;
0179:
0180:            /**
0181:             * The internal map that maps the job id of the partition to the
0182:             * head and tail jobs in the linear sequence of jobs into which the
0183:             * partition job is expanded.
0184:             */
0185:            //private Map mSequenceMap;
0186:            /**
0187:             * The handle to the properties file.
0188:             */
0189:            private PegasusProperties mProps;
0190:
0191:            /**
0192:             * The handle to the transformation catalog.
0193:             */
0194:            private TransformationCatalog mTCHandle;
0195:
0196:            /**
0197:             * The handle to the logging object.
0198:             */
0199:            private LogManager mLogger;
0200:
0201:            /**
0202:             * The object containing the options that were given to the concrete
0203:             * planner at runtime.
0204:             */
0205:            private PlannerOptions mPOptions;
0206:
0207:            /**
0208:             * Helping clone copy of options that is used for creating runtime options
0209:             * for the different partitions.
0210:             */
0211:            private PlannerOptions mClonedPOptions;
0212:
0213:            /**
0214:             * The path to the properties file that is written out and shared by
0215:             * all partitions in the mega DAG.
0216:             */
0217:            private String mMDAGPropertiesFile;
0218:
0219:            /**
0220:             * The handle to the file factory, that is  used to create the top level
0221:             * directories for each of the partitions.
0222:             */
0223:            private FileFactory mFactory;
0224:
0225:            /**
0226:             * An instance of the default stream gobbler callback implementation that
0227:             * is used for creating symbolic links.
0228:             */
0229:            private StreamGobblerCallback mDefaultCallback;
0230:
0231:            /**
0232:             * The number formatter to format the run submit dir entries.
0233:             */
0234:            private NumberFormat mNumFormatter;
0235:
0236:            /**
0237:             * The user name of the user running Pegasus.
0238:             */
0239:            private String mUser;
0240:
0241:            /**
0242:             * A flag to store whether the parsing is complete or not.
0243:             */
0244:            private boolean mDone;
0245:
0246:            /**
0247:             * Any extra arguments that need to be passed to dagman, as determined
0248:             * from the properties file.
0249:             */
0250:            private String mDAGManKnobs;
0251:
0252:            /**
0253:             * Bag of initialization objects.
0254:             */
0255:            private PegasusBag mBag;
0256:
0257:            /**
0258:             * The overloaded constructor.
0259:             *
0260:             * @param directory the directory where the pdax and all the daxes
0261:             *                  corresponding to the partitions reside.
0262:             * @param properties the <code>PegasusProperties</code> to be used.
0263:             * @param options    the options passed to the planner.
0264:             */
0265:            public PDAX2MDAG(String directory, PegasusProperties properties,
0266:                    PlannerOptions options) {
0267:                mPDAXDirectory = directory;
0268:                mProps = properties;
0269:                mLogger = LogManager.getInstance();
0270:                mPOptions = options;
0271:                mClonedPOptions = (options == null) ? null // a null options object is tolerated here
0272:                        : (PlannerOptions) options.clone();
0273:                mTCHandle = TCMode.loadInstance();
0274:                mMDAGPropertiesFile = null;
0275:                mNumFormatter = new DecimalFormat("0000"); // zero-padded to at least four digits
0276:
0277:                mDone = false;
0278:                mUser = mProps.getProperty("user.name");
0279:                if (mUser == null) {
0280:                    mUser = "user"; // placeholder used when the property is not set
0281:                }
0282:
0283:                //transformation mapper initialization is currently disabled
0284:                //        mTCMapper   = Mapper.loadTCMapper( mProps.getTCMapperMode() );
0285:
0286:                //initialize the bag of objects and register the site catalog
0287:                mBag = new PegasusBag();
0288:                mBag.add(PegasusBag.PEGASUS_LOGMANAGER, mLogger);
0289:                mBag.add(PegasusBag.PEGASUS_PROPERTIES, mProps);
0290:                mBag.add(PegasusBag.PLANNER_OPTIONS, options);
0291:                mBag.add(PegasusBag.TRANSFORMATION_CATALOG, mTCHandle);
0292:                //        mBag.add( PegasusBag.TRANSFORMATION_MAPPER, mTCMapper );
0293:                mBag.add(PegasusBag.PEGASUS_LOGMANAGER, mLogger); // NOTE(review): repeats the add made a few lines up - looks redundant, confirm
0294:
0295:                mBag.add(PegasusBag.SITE_CATALOG, SiteFactory.loadInstance(
0296:                        properties, false));
0297:
0298:                //the default gobbler callback is created with the debug message level
0299:                mDefaultCallback = new DefaultStreamGobblerCallback(
0300:                        LogManager.DEBUG_MESSAGE_LEVEL);
0301:
0302:                mDAGManKnobs = constructDAGManKnobs(properties);
0303:            }
0304:
0305:            /**
0306:             * Checks the destination location for existence, if it can
0307:             * be created, if it is writable etc.
0308:             *
0309:             * @param dir is the new base directory to optionally create.
0310:             *
0311:             * @throws IOException in case of error while writing out files.
0312:             */
0313:            protected static void sanityCheck(File dir) throws IOException {
0314:                if (dir.exists()) {
0315:                    // location exists
0316:                    if (dir.isDirectory()) {
0317:                        // ok, it is a directory
0318:                        if (dir.canWrite()) {
0319:                            // can write, all is well
0320:                            return;
0321:                        } else {
0322:                            // all is there, but I cannot write to dir
0323:                            throw new IOException(
0324:                                    "Cannot write to existing directory "
0325:                                            + dir.getPath());
0326:                        }
0327:                    } else {
0328:                        // exists but is not a directory (e.g. a plain file)
0329:                        throw new IOException("Destination " + dir.getPath()
0330:                                + " already "
0331:                                + "exists, but is not a directory.");
0332:                    }
0333:                } else {
0334:                    // does not exist, try to make it together with any missing parents
0335:                    if (!dir.mkdirs()) {
0336:                        throw new IOException(
0337:                                "Unable to create directory destination "
0338:                                        + dir.getPath());
0339:                    }
0340:                }
0341:            }
0342:
0343:            /**
0344:             * Callback when the opening tag was parsed. This contains all
0345:             * attributes and their raw values within a map. This callback can
0346:             * also be used to initialize callback-specific resources.
0347:             *
0348:             * @param attributes is a map of attribute key to attribute value
0349:             */
0350:            public void cbDocument(Map attributes) {
0351:                mMegaDAG = new ADag();
0352:                mJobMap = new HashMap();
0353:                //mSequenceMap = new HashMap();
0354:
0355:                //the name of the mega dag is set to the name
0356:                //attribute in the pdax
0357:                mMegaDAG.dagInfo.nameOfADag = (String) attributes.get("name");
0358:                mMegaDAG.dagInfo.count = (String) attributes.get("count");
0359:                mMegaDAG.dagInfo.index = (String) attributes.get("index");
0360:
0361:                // create files in the directory, unless anything else is known.
0362:                try {
0363:                    //create a submit directory structure unless a relative one was given
0364:                    String relativeDir = (mPOptions
0365:                            .getRelativeSubmitDirectory() == null) ? this 
0366:                            .createSubmitDirectory(mMegaDAG.getLabel(),
0367:                                    mPOptions.getSubmitDirectory(), mUser,
0368:                                    mPOptions.getVOGroup(),
0369:                                    mProps.useTimestampForDirectoryStructure())
0370:                            : mPOptions.getRelativeSubmitDirectory();
0371:
0372:                    //set the directory structure
0373:                    mPOptions.setSubmitDirectory(mPOptions
0374:                            .getBaseSubmitDirectory(), relativeDir);
0375:                    mSubmitDirectory = mPOptions.getSubmitDirectory();
0376:
0377:                    //we want to set the relative directory as the base working
0378:                    //directory for all the partitions on the remote sites.
0379:                    mPOptions.setRandomDir(relativeDir);
0380:
0381:                    mFactory = new FlatFileFactory(mSubmitDirectory); // minimum default, replaced further down
0382:                } catch (IOException ioe) {
0383:                    throw new RuntimeException(
0384:                            "Unable to generate files in the submit directory ",
0385:                            ioe);
0386:                }
0387:
0388:                // not in the PDAX format currently; when absent the count is computed below
0389:                String s = (String) attributes.get("partitionCount");
0390:
0391:                // create hashed, and levelled directories
0392:                try {
0393:                    HashedFileFactory temp = null;
0394:                    int partCount = (s == null) ?
0395:                    //determine at runtime the number of partitions
0396:                    getPartitionCount(mPOptions.getPDAX())
0397:                            : Integer.parseInt(s);
0398:
0399:                    //if ( m_minlevel > 0 && m_minlevel > jobCount ) jobCount = m_minlevel;
0400:                    if (partCount > 0)
0401:                        temp = new HashedFileFactory(mSubmitDirectory,
0402:                                partCount);
0403:                    else
0404:                        temp = new HashedFileFactory(mPDAXDirectory); // NOTE(review): rooted at the PDAX directory, not the submit directory - confirm intended
0405:
0406:                    //each job creates the following files
0407:                    //  - submit file
0408:                    //  - out file
0409:                    //  - error file
0410:                    //  - prescript log
0411:                    //  - the partition directory
0412:                    temp.setMultiplicator(5); // one per item listed above
0413:
0414:                    //we want a minimum of one level always for clarity
0415:                    temp.setLevels(1);
0416:
0417:                    //explicit files-per-directory tuning is disabled for the time being
0418:                    /*
0419:                    temp.setFilesPerDirectory( 40 );
0420:                    temp.setLevelsFromTotals(partCount);
0421:                     */
0422:
0423:                    mFactory = temp;
0424:
0425:                    //write out all the properties into a temp file
0426:                    //in the root submit directory
0427:                    //mMDAGPropertiesFile = writeOutProperties( mSubmitDirectory );
0428:                    mMDAGPropertiesFile = mProps
0429:                            .writeOutProperties(mSubmitDirectory);
0430:
0431:                } catch (NumberFormatException nfe) {
0432:                    String error = (s == null) ? "Unspecified number for jobCount"
0433:                            : "Illegal number \"" + s
0434:                                    + "\" for partition count";
0435:                    throw new RuntimeException(error, nfe); // keep the cause, like the other handlers in this method
0436:                } catch (IOException e) {
0437:                    //pick a message based on whether the properties file path was set yet
0438:                    String message = (mMDAGPropertiesFile == null) ? "Unable to write out properties file in base submit directory"
0439:                            : "Base directory creation";
0440:
0441:                    //wrap into runtime and throw
0442:                    throw new RuntimeException(message, e);
0443:                }
0444:
0445:            }
0446:
0447:            /**
0448:             * Callback for a partition. These partitions are completely
0449:             * assembled, but each is passed separately.
0450:             *
0451:             * @param partition is the PDAX-style partition.
0452:             */
0453:            public void cbPartition(Partition partition) {
0454:                String name = partition.getName();
0455:                int index = partition.getIndex();
0456:                ArrayList sequenceList = new ArrayList(NUM_OF_EXPANDED_JOBS); // NOTE(review): only referenced by the commented-out code below
0457:                String tailJob; // NOTE(review): only referenced by the commented-out code below
0458:                SubInfo job;
0459:
0460:                //get the filename of the dax file containing the partition
0461:                String dax = DAXWriter.getPDAXFilename(name, index);
0462:
0463:                //construct the path to the file
0464:                dax = mPDAXDirectory + File.separator + dax;
0465:                File partitionDirectory;
0466:                try {
0467:                    partitionDirectory = mFactory
0468:                            .createFile(getBaseName(partition));
0469:                    partitionDirectory.mkdirs(); // NOTE(review): the boolean result is not checked
0470:
0471:                    //construct a symlink to the dax file in the partition directory;
0472:                    //a failure here is only logged as a warning
0473:                    if (!createSymlink(dax, partitionDirectory)) {
0474:                        mLogger
0475:                                .log(
0476:                                        "Unable to create symlinks of the dax file to submit dir",
0477:                                        LogManager.WARNING_MESSAGE_LEVEL);
0478:                    }
0479:                } catch (IOException e) {
0480:                    //wrap and throw
0481:                    throw new RuntimeException(
0482:                            "Unable to create partition submit directory ", e);
0483:                }
0484:
0485:                //construct the appropriate vds-submit-dag job with the
0486:                //prescript set as an invocation to gencdag etc.
0487:                job = constructDAGJob(partition, partitionDirectory, dax);
0488:
0489:                //add to the workflow
0490:                mMegaDAG.add(job);
0491:
0492:                //map the partition id to the job that is constructed.
0493:                mJobMap.put(partition.getID(), job);
0494:
0495:                /**
0496:                String jobName = getPegasusJobName(name,index);
0497:                //populate the internal job map with jobname and id
0498:                mJobMap.put(partition.getID(),getPegasusJobName(name,index));
0499:
0500:                //add the sub info for it
0501:                job = constructPegasusJob(jobName, file);
0502:                mMegaDAG.add(job);
0503:
0504:
0505:
0506:                //generate the dagman job that ends up submitting
0507:                //the mini dag corresponding to the partition
0508:                //mMegaDAG.addNewJob(getJobName(name,index));
0509:                tailJob = "condor_submit_" + jobName ;
0510:                job = constructCondorSubmitJob(tailJob,name,index);
0511:                mMegaDAG.add(job);
0512:
0513:                //put the sequence list
0514:                sequenceList.add(HEAD_INDEX,jobName);
0515:                sequenceList.add(TAIL_INDEX,tailJob);
0516:                mSequenceMap.put(jobName,sequenceList);
0517:
0518:                //add the relation between jobname and tail job
0519:                mMegaDAG.addNewRelation(jobName,tailJob);
0520:                 */
0521:
0522:            }
0522:
0523:            /**
0524:             * Callback for child and parent relationships from section 3. This ties
0525:             * in the relations between the partitions to the relations between the jobs
0526:             * that are responsible for partitions. In addition, appropriate cache
0527:             * file arguments are generated.
0528:             *
0529:             * @param child is the IDREF of the child element.
0530:             * @param parents is a list of IDREFs of the included parents.
0531:             */
0532:            public void cbParents(String child, List parents) {
0533:                String cacheName;
0534:                String cacheArgs = null; // stays null when the child has no parents
0535:
0536:                //get hold of the constructed job for the child.
0537:                //the name of the jobs are treated as ID's
0538:                SubInfo cJob = getJob(child); // NOTE(review): dereferenced right away - confirm getJob never returns null
0539:                String cID = cJob.getName();
0540:
0541:                //glue in the sequences for the expanded things together
0542:
0543:                if (!parents.isEmpty()) {
0544:                    //start the cache option; the parents' cache file paths are appended in the loop below
0545:                    cacheArgs = " --cache ";
0546:                }
0547:
0548:                //traverse through the parents to put in the relations
0549:                //and the cache file arguments.
0550:                String pID;
0551:                SubInfo pJob;
0552:                for (Iterator it = parents.iterator(); it.hasNext();) {
0553:                    //get the parent job and name
0554:                    pJob = (SubInfo) mJobMap.get(it.next()); // NOTE(review): null for an id missing from mJobMap, which would fail on the next line
0555:                    pID = pJob.getName();
0556:
0557:                    mLogger.log("Adding Relation " + pID + "->" + cID,
0558:                            LogManager.DEBUG_MESSAGE_LEVEL);
0559:                    mMegaDAG.addNewRelation(pID, cID);
0560:
0561:                    //we need to specify the cache files for those partitions
0562:                    //even if they are not constructed. there is a disconnect
0563:                    //as to how the names are being generated. There should be
0564:                    //a call to one function only.
0565:                    cacheName = getCacheFilePath(pJob);
0566:                    cacheArgs += cacheName + ","; // comma separated; the trailing comma is trimmed below
0567:                }
0568:
0569:                //stuff the arguments back into replanner prescript.
0570:                //should be a callout to a different function for portability
0571:                String args = cJob.getPreScriptArguments();
0572:                //System.out.println("Arguments are " + args);
0573:                cJob.setPreScript(cJob.getPreScriptPath(), (cacheArgs == null) ?
0574:                //no parents: the prescript arguments remain the same
0575:                args
0576:                        :
0577:                        //append the cache option minus its trailing comma
0578:                        args
0579:                                + cacheArgs.substring(0, cacheArgs
0580:                                        .lastIndexOf(',')));
0581:
0582:            }
0583:
0584:            /**
0585:             * Callback when the parsing of the document is done. This ends up
0586:             * triggering the writing of the condor submit files corresponding to the
0587:             * mega dag.
0588:             */
0589:            public void cbDone() {
0590:                mDone = true; // checked by getConstructedObject()
0591:
0592:                //generate the classad's options
0593:                //for the Mega DAG
0594:                mMegaDAG.dagInfo.generateFlowName();
0595:                mMegaDAG.dagInfo.setFlowTimestamp(mPOptions.getDateTime(mProps
0596:                        .useExtendedTimeStamp()));
0597:                mMegaDAG.dagInfo.setDAXMTime(mPOptions.getPDAX());
0598:                mMegaDAG.dagInfo.generateFlowID();
0599:                mMegaDAG.dagInfo.setReleaseVersion();
0600:
0601:                CodeGenerator codeGenerator = null;
0602:                int state = 0; // NOTE(review): assigned below but never read in this method
0603:                try {
0604:                    //load the Condor Writer that understands HashedFile Factories.
0605:                    codeGenerator = CodeGeneratorFactory.loadInstance(mBag,
0606:                            CODE_GENERATOR_CLASS);
0607:                    state = 1;
0608:                    codeGenerator.generateCode(mMegaDAG);
0609:
0610:                    //generate only the braindump file that is required.
0611:                    //no spawning off the tailstatd for time being
0612:                    codeGenerator.startMonitoring();
0613:
0614:                } catch (FactoryException fe) {
0615:                    //rethrown as a FactoryException wrapping the original one
0616:                    throw new FactoryException("PDAX2MDAG", fe);
0617:                } catch (Exception e) {
0618:                    //anything else is wrapped into a RuntimeException with its cause
0619:                    throw new RuntimeException(
0620:                            "Error while generating code for the workflow", e);
0621:                }
0622:
0623:            }
0622:
0623:            /**
0624:             * Returns the MEGADAG that is generated
0625:             *
0626:             * @return  ADag object containing the mega DAG
0627:             */
0628:            public Object getConstructedObject() {
0629:                if (!mDone) // mDone is set to true in cbDone()
0630:                    throw new RuntimeException(
0631:                            "Method called before the megadag "
0632:                                    + " was fully generated");
0633:
0634:                return mMegaDAG; // declared as Object; the instance is the ADag built by the callbacks
0635:            }
0636:
0637:            /**
0638:             * Constructs a job that plans and submits the partitioned workflow,
0639:             * referred to by a Partition. The main job itself is a condor dagman job
0640:             * that submits the concrete workflow. The concrete workflow is generated by
0641:             * running the planner in the prescript for the job.
0642:             *
0643:             * @param partition   the partition corresponding to which the job has to be
0644:             *                    constructed.
0645:             * @param directory   the submit directory where the submit files for the
0646:             *                    partition should reside.
0647:             * @param dax         the absolute path to the partitioned dax file that
0648:             *                    corresponds to this partition.
0649:             *
0650:             * @return the constructed DAG job.
0651:             */
0652:            protected SubInfo constructDAGJob(Partition partition,
0653:                    File directory, String dax) {
0654:                //for time being use the old functions.
0655:                SubInfo job = new SubInfo();
0656:                //the parent directory where the submit file for condor dagman has to
0657:                //reside. the submit files for the corresponding partition are one level
0658:                //deeper.
0659:                String parentDir = directory.getParent();
0660:
0661:                //set the logical transformation
0662:                job.setTransformation(CONDOR_DAGMAN_NAMESPACE,
0663:                        CONDOR_DAGMAN_LOGICAL_NAME, null);
0664:
0665:                //set the logical derivation attributes of the job.
0666:                job.setDerivation(CONDOR_DAGMAN_NAMESPACE,
0667:                        CONDOR_DAGMAN_LOGICAL_NAME, null);
0668:
0669:                //always runs on the submit host
0670:                job.setSiteHandle("local");
0671:
0672:                //set the partition id only as the unique id
0673:                //for the time being.
0674:                //        job.setName(partition.getID());
0675:
0676:                //set the logical id for the job same as the partition id.
0677:                job.setLogicalID(partition.getID());
0678:
0679:                //figure out the relative submit directory where the dagman job should
0680:                //reside. It should be one level up from the partition directory.
0681:                String dir = "";
0682:                dir += (parentDir.equals(mSubmitDirectory)) ?
0683:                //the directory is same as the root
0684:                dir
0685:                        :
0686:                        //get the relative from root
0687:                        parentDir.substring(mSubmitDirectory.length());
0688:                //        job.setSubmitDirectory(dir);
0689:
0690:                //construct the name of the job as a deep lfn with a directory path
0691:                StringBuffer name = new StringBuffer();
0692:
0693:                //get the part from the first file separator onwards
0694:                name.append((dir.indexOf(File.separatorChar) == 0) ? dir
0695:                        .substring(1) : dir.substring(0));
0696:
0697:                //append a file separator in the end if dir was some name
0698:                if (dir.length() > 1) {
0699:                    name.append(File.separatorChar);
0700:                }
0701:
0702:                //set the basename for the deep lfn
0703:                name.append(partition.getID());
0704:                //System.out.println (" The name is " + name.toString());
0705:                job.setName(name.toString());
0706:
0707:                List entries;
0708:                TransformationCatalogEntry entry = null;
0709:
0710:                //get the path to condor dagman
0711:                try {
0712:                    entries = mTCHandle.getTCEntries(job.namespace,
0713:                            job.logicalName, job.version, job.getSiteHandle(),
0714:                            TCType.INSTALLED);
0715:                    entry = (entries == null) ? null :
0716:                    //Gaurang assures that if no record is found then
0717:                            //TC Mechanism returns null
0718:                            (TransformationCatalogEntry) entries.get(0);
0719:                } catch (Exception e) {
0720:                    throw new RuntimeException(
0721:                            "ERROR: While accessing the Transformation Catalog",
0722:                            e);
0723:                }
0724:                if (entry == null) {
0725:                    //throw appropriate error
0726:                    throw new RuntimeException(
0727:                            "ERROR: Entry not found in tc for job "
0728:                                    + job.getCompleteTCName() + " on site "
0729:                                    + job.getSiteHandle());
0730:                }
0731:
0732:                //set the path to the executable and environment string
0733:                job.executable = entry.getPhysicalTransformation();
0734:                //the environment variable are set later automatically from the tc
0735:                //job.envVariables = entry.envString;
0736:
0737:                //the job itself is the main job of the super node
0738:                //construct the classad specific information
0739:                job.jobID = job.getName();
0740:                job.jobClass = SubInfo.COMPUTE_JOB;
0741:
0742:                //directory where all the dagman related files for the nested dagman
0743:                //reside. Same as the directory passed as an input parameter
0744:                dir = directory.getAbsolutePath();
0745:
0746:                //make the initial dir point to the submit file dir for the partition
0747:                //we can do this as we are running this job both on local host, and scheduler
0748:                //universe. Hence, no issues of shared filesystem or anything.
0749:                job.condorVariables.construct("initialdir", dir);
0750:
0751:                //construct the argument string, with all the dagman files
0752:                //being generated in the partition directory. Using basenames as
0753:                //initialdir has been specified for the job.
0754:                StringBuffer sb = new StringBuffer();
0755:
0756:                sb.append(" -f -l . -Debug 3").append(" -Lockfile ").append(
0757:                        getBasename(partition, ".dag.lock")).append(" -Dag ")
0758:                        .append(getBasename(partition, ".dag")).append(
0759:                                " -Rescue ").append(
0760:                                getBasename(partition, ".dag.rescue")).append(
0761:                                " -Condorlog ").append(
0762:                                getBasename(partition, ".log"));
0763:
0764:                //pass any dagman knobs that were specified in properties file
0765:                sb.append(this .mDAGManKnobs);
0766:
0767:                //put in the environment variables that are required
0768:                job.envVariables.construct("_CONDOR_DAGMAN_LOG",
0769:                        getAbsolutePath(partition, dir, ".dag.dagman.out"));
0770:                job.envVariables.construct("_CONDOR_MAX_DAGMAN_LOG", "0");
0771:
0772:                //set the arguments for the job
0773:                job.setArguments(sb.toString());
0774:
0775:                //the environment need to be propogated for exitcode to be picked up
0776:                job.condorVariables.construct("getenv", "TRUE");
0777:
0778:                job.condorVariables.construct("remove_kill_sig", "SIGUSR1");
0779:
0780:                //the log file for condor dagman for the dagman also needs to be created
0781:                //it is different from the log file that is shared by jobs of
0782:                //the partition. That is referred to by Condorlog
0783:
0784:                //       keep the log file common for all jobs and dagman albeit without
0785:                //       dag.dagman.log suffix
0786:                //       job.condorVariables.construct("log", getAbsolutePath( partition, dir,".dag.dagman.log"));
0787:
0788:                //       String dagName = mMegaDAG.dagInfo.nameOfADag;
0789:                //       String dagIndex= mMegaDAG.dagInfo.index;
0790:                //       job.condorVariables.construct("log", dir + mSeparator +
0791:                //                                     dagName + "_" + dagIndex + ".log");
0792:
0793:                //incorporate profiles from the transformation catalog
0794:                //and properties for the time being. Not from the site catalog.
0795:
0796:                //the profile information from the transformation
0797:                //catalog needs to be assimilated into the job
0798:                //overriding the one from pool catalog.
0799:                job.updateProfiles(entry);
0800:
0801:                //the profile information from the properties file
0802:                //is assimilated overidding the one from transformation
0803:                //catalog.
0804:                job.updateProfiles(mProps);
0805:
0806:                //constructed the main job. now construct the prescript
0807:                //the log file resides in the directory where the condor_dagman
0808:                //job resides i.e the parent directory.
0809:                StringBuffer log = new StringBuffer();
0810:                log.append(parentDir).append(mSeparator).append(
0811:                        partition.getID()).append(".pre.log");
0812:                //set the prescript for the job in the dagman namespace
0813:                setPrescript(job, dax, log.toString());
0814:
0815:                //construct the braindump file for tailstatd invocations
0816:                //the dag should be same as the one passed in the arguments string!
0817:                StringBuffer dag = new StringBuffer();
0818:                dag.append(dir).append(mSeparator).append(
0819:                        getBasename(partition, ".dag"));
0820:
0821:                //we do not want the job to be launched via kickstart
0822:                //Fix for VDS bug number 143
0823:                //http://bugzilla.globus.org/vds/show_bug.cgi?id=143
0824:                job.vdsNS
0825:                        .construct(
0826:                                VDS.GRIDSTART_KEY,
0827:                                GridStartFactory.GRIDSTART_SHORT_NAMES[GridStartFactory.NO_GRIDSTART_INDEX]);
0828:
0829:                return job;
0830:            }
0831:
0832:            /**
0833:             * Writes out the braindump.txt file for a partition in the partition submit
0834:             * directory. The braindump.txt file is used for passing to the tailstatd
0835:             * daemon that monitors the state of execution of the workflow.
0836:             *
0837:             * @param directory   the directory in which the braindump file needs to
0838:             *                    be written to.
0839:             * @param partition   the partition for which the braindump is to be written out.
0840:             * @param dax         the dax file
0841:             * @param dag         the dag file
0842:             *
0843:             * @return the absolute path to the braindump file.txt written in the directory.
0844:             *
0845:             * @throws IOException in case of error while writing out file.
0846:             */
0847:            protected String writeOutBraindump(File directory,
0848:                    Partition partition, String dax, String dag)
0849:                    throws IOException {
0850:
0851:                //sanity check on the directory
0852:                sanityCheck(directory);
0853:
0854:                //create a writer to the braindump.txt in the directory.
0855:                File f = new File(directory, "braindump.txt");
0856:                PrintWriter writer = new PrintWriter(new BufferedWriter(
0857:                        new FileWriter(f)));
0858:
0859:                //store absolute path to dir just once
0860:                String absPath = directory.getAbsolutePath();
0861:
0862:                //assemble all the contents in a buffer before writing out
0863:                StringBuffer contents = new StringBuffer();
0864:                contents.append("dax ").append(dax).append("\n").append("dag ")
0865:                        .append(dag).append("\n").append("run ")
0866:                        .append(absPath).append("\n").append("jsd ").append(
0867:                                absPath).append(mSeparator).append(
0868:                                "jobstate.log").append("\n").append("rundir ")
0869:                        .append(directory.getName()).append("\n").append(
0870:                                "pegasushome ").append(mProps.getPegasusHome())
0871:                        .append("\n").append("vogroup pegasus").append("\n").//for time being
0872:                        append("label " + partition.getName());
0873:
0874:                writer.write(contents.toString());
0875:                writer.close();
0876:
0877:                return f.getAbsolutePath();
0878:            }
0879:
0880:            /**
0881:             * Writes out the properties to a temporary file in the directory passed.
0882:             *
0883:             * @param directory   the directory in which the properties file needs to
0884:             *                    be written to.
0885:             *
0886:             * @return the absolute path to the properties file written in the directory.
0887:             *
0888:             * @throws IOException in case of error while writing out file.
0889:             */
0890:            protected String writeOutProperties(String directory)
0891:                    throws IOException {
0892:                File dir = new File(directory);
0893:
0894:                //sanity check on the directory
0895:                sanityCheck(dir);
0896:
0897:                //we only want to write out the VDS properties for time being
0898:                Properties properties = mProps.matchingSubset("pegasus", true);
0899:
0900:                //create a temporary file in directory
0901:                File f = File.createTempFile("pegasus.", ".properties", dir);
0902:
0903:                //the header of the file
0904:                StringBuffer header = new StringBuffer(64);
0905:                header.append("PEGASUS USER PROPERTIES AT RUNTIME \n").append(
0906:                        "#ESCAPES IN VALUES ARE INTRODUCED");
0907:
0908:                //create an output stream to this file and write out the properties
0909:                OutputStream os = new FileOutputStream(f);
0910:                properties.store(os, header.toString());
0911:                os.close();
0912:
0913:                return f.getAbsolutePath();
0914:            }
0915:
0916:            /**
0917:             * Sets the prescript that ends up calling to the default wrapper that
0918:             * introduces retry into Pegasus for a particular job.
0919:             *
0920:             * @param job      the job whose prescript needs to be set.
0921:             * @param daxURL   the path to the dax file on the filesystem.
0922:             * @param log      the file where the output of the prescript needs to be
0923:             *                 redirected to.
0924:             *
0925:             * @see #RETRY_LOGICAL_NAME
0926:             */
0927:            protected void setPrescript(SubInfo job, String daxURL, String log) {
0928:                setPrescript(job, daxURL, log, this .NAMESPACE,
0929:                        RETRY_LOGICAL_NAME, null);
0930:            }
0931:
0932:            /**
0933:             * Sets the prescript that ends up calling to the default wrapper that
0934:             * introduces retry into Pegasus for a particular job.
0935:             *
0936:             * @param job       the job whose prescript needs to be set.
0937:             * @param daxURL    the path to the dax file on the filesystem.
0938:             * @param log       the file where the output of the prescript needs to be
0939:             *                  redirected to.
0940:             * @param namespace the namespace of the replanner utility.
0941:             * @param name      the logical name of the replanner.
0942:             * @param version   the version of the replanner to be picked up.
0943:             *
0944:             */
0945:            protected void setPrescript(SubInfo job, String daxURL, String log,
0946:                    String namespace, String name, String version) {
0947:                String site = job.getSiteHandle();
0948:                TransformationCatalogEntry entry = null;
0949:
0950:                //get the path to script wrapper from the
0951:                try {
0952:                    List entries = mTCHandle.getTCEntries(namespace, name,
0953:                            version, site, TCType.INSTALLED);
0954:
0955:                    //get the first entry from the list returned
0956:                    entry = (entries == null) ? null :
0957:                    //Gaurang assures that if no record is found then
0958:                            //TC Mechanism returns null
0959:                            ((TransformationCatalogEntry) entries.get(0));
0960:                } catch (Exception e) {
0961:                    throw new RuntimeException(
0962:                            "ERROR: While accessing the Transformation Catalog",
0963:                            e);
0964:                }
0965:
0966:                //construct the prescript path
0967:                StringBuffer script = new StringBuffer();
0968:                if (entry == null) {
0969:                    //log to debug
0970:                    mLogger
0971:                            .log(
0972:                                    "Constructing the default path to the replanner for prescript",
0973:                                    LogManager.DEBUG_MESSAGE_LEVEL);
0974:
0975:                    //construct the default path to the executable
0976:                    script.append(mProps.getPegasusHome()).append(mSeparator)
0977:                            .append("bin").append(mSeparator).append(
0978:                                    RETRY_LOGICAL_NAME);
0979:                } else {
0980:                    script.append(entry.getPhysicalTransformation());
0981:                }
0982:
0983:                //the output of the prescript i.e submit files should be created
0984:                //in the directory where the job is being run.
0985:                mClonedPOptions.setSubmitDirectory((String) job.condorVariables
0986:                        .get("initialdir"));
0987:
0988:                //generate the remote working directory for the paritition
0989:                String submit = mClonedPOptions.getSubmitDirectory(); // like /tmp/vahi/pegasus/blackdiamond/run0001/00/PID1
0990:                String remoteBase = mPOptions.getRandomDir(); // like vahi/pegasus/blackdiamond/run0001
0991:                String remoteWorkDir = submit.substring(submit
0992:                        .indexOf(remoteBase)); //gets us vahi/pegasus/blackdiamond/run0001/00/PID1
0993:                mClonedPOptions.setRandomDir(remoteWorkDir);
0994:                mLogger.log("Remote working directory set to " + remoteWorkDir
0995:                        + " for partition " + job.getID(),
0996:                        LogManager.DEBUG_MESSAGE_LEVEL);
0997:
0998:                //set the basename for the nested dag as the ID of the job.
0999:                //which is actually the basename of the deep lfn job name!!
1000:                mClonedPOptions.setBasenamePrefix(getBasenamePrefix(job));
1001:
1002:                //set the flag designating that the planning invocation is part
1003:                //of a deferred planning run
1004:                mClonedPOptions.setPartOfDeferredRun(true);
1005:
1006:                //in case of deferred planning cleanup wont work
1007:                //explicitly turn it off
1008:                mClonedPOptions.setCleanup(false);
1009:
1010:                //we want monitoring to happen
1011:                mClonedPOptions.setMonitoring(true);
1012:
1013:                //construct the argument string.
1014:                //add the jvm options and the pegasus options if any
1015:                StringBuffer arguments = new StringBuffer();
1016:                arguments./*append( mPOptions.toJVMOptions())*/
1017:                append(" -Dpegasus.user.properties=").append(
1018:                        mMDAGPropertiesFile).append(" -Dpegasus.log.*=")
1019:                        .append(log).
1020:                        //the dax argument is diff for each partition
1021:                        append(" --dax ").append(daxURL).
1022:                        //put in all the other options.
1023:                        append(mClonedPOptions.toOptions());
1024:
1025:                //set the path and the arguments to prescript
1026:                job.setPreScript(script.toString(), arguments.toString());
1027:            }
1028:
1029:            /**
1030:             * Returns the base name of the submit directory in which the submit files
1031:             * for a particular partition reside.
1032:             *
1033:             * @param partition  the partition for which the base directory is to be
1034:             *                   constructed.
1035:             *
1036:             * @return the base name of the partition.
1037:             */
1038:            protected String getBaseName(Partition partition) {
1039:                String id = partition.getID();
1040:                StringBuffer sb = new StringBuffer(id.length() + 1);
1041:                sb.append('P').append(id);
1042:                return sb.toString();
1043:            }
1044:
1045:            /**
1046:             * Returns the absolute path to a dagman (usually) related file for a
1047:             * particular partition in the submit directory that is passed as an input
1048:             * parameter. This does not create the file, just returns an absolute path
1049:             * to it. Useful for constructing argument string for condor_dagman.
1050:             *
1051:             * @param partition  the partition for which the dagman is responsible for
1052:             *                   execution.
1053:             * @param directory  the directory where the file should reside.
1054:             * @param suffix     the suffix for the file basename.
1055:             *
1056:             * @return  the absolute path to a file in the submit directory.
1057:             */
1058:            protected String getAbsolutePath(Partition partition,
1059:                    String directory, String suffix) {
1060:
1061:                StringBuffer sb = new StringBuffer();
1062:                //add a prefix P to partition id
1063:                sb.append(directory).append(mSeparator).append(
1064:                        getBasename(partition, suffix));
1065:
1066:                return sb.toString();
1067:            }
1068:
1069:            /**
1070:             * Returns the basename of a dagman (usually) related file for a particular
1071:             * partition.
1072:             *
1073:             * @param partition  the partition for which the dagman is responsible for
1074:             *                   execution.
1075:             * @param suffix     the suffix for the file basename.
1076:             *
1077:             * @return the basename.
1078:             */
1079:            protected String getBasename(Partition partition, String suffix) {
1080:                StringBuffer sb = new StringBuffer(16);
1081:                //add a prefix P
1082:                sb.append('P').append(partition.getID()).append(suffix);
1083:                return sb.toString();
1084:            }
1085:
1086:            /**
1087:             * Returns the basename prefix of a dagman (usually) related file for a
1088:             * a job that submits nested dagman.
1089:             *
1090:             * @param job    the job that submits a nested dagman.
1091:             *
1092:             * @return the basename.
1093:             */
1094:            protected String getBasenamePrefix(SubInfo job) {
1095:                StringBuffer sb = new StringBuffer(8);
1096:                //add a prefix P
1097:                sb.append('P').append(job.getLogicalID());
1098:                return sb.toString();
1099:            }
1100:
1101:            /**
1102:             * Returns the full path to a cache file that corresponds for one partition.
1103:             * The cache file resides in the submit directory for the partition for which
1104:             * the job is responsible for.
1105:             *
1106:             * @param job  the job running on the submit host that submits the partition.
1107:             *
1108:             * @return the full path to the file.
1109:             */
1110:            protected String getCacheFilePath(SubInfo job) {
1111:                StringBuffer sb = new StringBuffer();
1112:
1113:                //cache file is being generated in the initialdir set for the job.
1114:                //intialdir is set correctly to the submit directory for nested dag.
1115:                sb.append(job.condorVariables.get("initialdir")).append(
1116:                        File.separatorChar).append(getBasenamePrefix(job))
1117:                        .append(".cache");
1118:
1119:                return sb.toString();
1120:            }
1121:
1122:            /**
1123:             * Returns the number of partitions referred to in the PDAX file.
1124:             *
1125:             * @param source  the source file that has to be symlinked.
1126:             * @param destDir the destination directory where the symlink has to be
1127:             *                placed.
1128:             *
1129:             * @return the number of partitions in the pdax file.
1130:             */
1131:            protected boolean createSymlink(String source, File destDir) {
1132:                boolean result = false;
1133:
1134:                //do some sanity checks on the source and the destination
1135:                File f = new File(source);
1136:                if (!f.exists() || !f.canRead()) {
1137:                    mLogger.log("The source for symlink does not exist "
1138:                            + source, LogManager.ERROR_MESSAGE_LEVEL);
1139:                    return result;
1140:                }
1141:                if (!destDir.exists() || !destDir.isDirectory()
1142:                        || !destDir.canWrite()) {
1143:                    mLogger.log(
1144:                            "The destination directory cannot be written to "
1145:                                    + destDir, LogManager.ERROR_MESSAGE_LEVEL);
1146:                    return result;
1147:                }
1148:
1149:                try {
1150:                    //set the callback and run the grep command
1151:                    Runtime r = Runtime.getRuntime();
1152:                    String command = "ln -s " + source + " "
1153:                            + destDir.getAbsolutePath();
1154:                    mLogger.log("Creating symlink " + command,
1155:                            LogManager.DEBUG_MESSAGE_LEVEL);
1156:                    Process p = r.exec(command);
1157:
1158:                    //spawn off the gobblers with the already initialized default callback
1159:                    StreamGobbler ips = new StreamGobbler(p.getInputStream(),
1160:                            mDefaultCallback);
1161:                    StreamGobbler eps = new StreamGobbler(p.getErrorStream(),
1162:                            mDefaultCallback);
1163:
1164:                    ips.start();
1165:                    eps.start();
1166:
1167:                    //wait for the threads to finish off
1168:                    ips.join();
1169:                    eps.join();
1170:
1171:                    //get the status
1172:                    int status = p.waitFor();
1173:                    if (status != 0) {
1174:                        mLogger.log("Command " + command
1175:                                + " exited with status " + status,
1176:                                LogManager.DEBUG_MESSAGE_LEVEL);
1177:                        return result;
1178:                    }
1179:                    result = true;
1180:                } catch (IOException ioe) {
1181:                    mLogger.log("IOException while creating symbolic links ",
1182:                            ioe, LogManager.ERROR_MESSAGE_LEVEL);
1183:                } catch (InterruptedException ie) {
1184:                    //ignore
1185:                }
1186:                return result;
1187:            }
1188:
1189:            /**
1190:             * Returns the number of partitions referred to in the PDAX file.
1191:             *
1192:             * @param pdax  the path to the pdax file.
1193:             *
1194:             * @return the number of partitions in the pdax file.
1195:             */
1196:            protected int getPartitionCount(String pdax) {
1197:                int result = 0;
1198:                File f = new File(pdax);
1199:                if (!f.exists() || !f.canRead()) {
1200:                    throw new RuntimeException("PDAX File is unreadable "
1201:                            + pdax);
1202:                }
1203:
1204:                try {
1205:                    //set the callback and run the grep command
1206:                    String word = "<partition";
1207:                    GrepCallback c = new GrepCallback(word);
1208:                    Runtime r = Runtime.getRuntime();
1209:                    String env[] = { "PATH=/bin:/usr/bin" };
1210:                    String command = "grep " + word + " " + pdax;
1211:                    Process p = r.exec(command, env);
1212:
1213:                    //spawn off the gobblers
1214:                    StreamGobbler ips = new StreamGobbler(p.getInputStream(), c);
1215:                    StreamGobbler eps = new StreamGobbler(p.getErrorStream(),
1216:                            new StreamGobblerCallback() {
1217:                                //we cannot log to any of the default stream
1218:                                LogManager mLogger = LogManager.getInstance();
1219:
1220:                                public void work(String s) {
1221:                                    mLogger.log(
1222:                                            "Output on stream gobller error stream "
1223:                                                    + s,
1224:                                            LogManager.DEBUG_MESSAGE_LEVEL);
1225:                                }
1226:                            });
1227:                    ips.start();
1228:                    eps.start();
1229:
1230:                    //wait for the threads to finish off
1231:                    ips.join();
1232:                    result = c.getCount();
1233:                    eps.join();
1234:
1235:                    //get the status
1236:                    int status = p.waitFor();
1237:                    if (status != 0) {
1238:                        mLogger.log("Command " + command
1239:                                + " exited with status " + status,
1240:                                LogManager.WARNING_MESSAGE_LEVEL);
1241:                    }
1242:
1243:                } catch (IOException ioe) {
1244:                    mLogger.log(
1245:                            "IOException while determining partition count ",
1246:                            ioe, LogManager.ERROR_MESSAGE_LEVEL);
1247:                } catch (InterruptedException ie) {
1248:                    //ignore
1249:                }
1250:                return result;
1251:            }
1252:
1253:            /**
1254:             * Returns the job that has been constructed for a particular partition.
1255:             *
1256:             * @param id  the partition id.
1257:             *
1258:             * @return the corresponding job, else null if not found.
1259:             */
1260:            protected SubInfo getJob(String id) {
1261:                Object obj = mJobMap.get(id);
1262:                return (obj == null) ? null : (SubInfo) obj;
1263:            }
1264:
1265:            /**
1266:             * Creates the submit directory for the workflow. This is not thread safe.
1267:             *
1268:             * @param label   the label of the workflow being worked upon.
1269:             * @param dir     the base directory specified by the user.
1270:             * @param user    the username of the user.
1271:             * @param vogroup the vogroup to which the user belongs to.
1272:             * @param timestampBased boolean indicating whether to have a timestamp based dir or not
1273:             *
1274:             * @return  the directory name created relative to the base directory passed
1275:             *          as input.
1276:             *
1277:             * @throws IOException in case of unable to create submit directory.
1278:             */
1279:            protected String createSubmitDirectory(String label, String dir,
1280:                    String user, String vogroup, boolean timestampBased)
1281:                    throws IOException {
1282:                File base = new File(dir);
1283:                StringBuffer result = new StringBuffer();
1284:
1285:                //do a sanity check on the base
1286:                sanityCheck(base);
1287:
1288:                //add the user name if possible
1289:                base = new File(base, user);
1290:                result.append(user).append(File.separator);
1291:
1292:                //add the vogroup
1293:                base = new File(base, vogroup);
1294:                sanityCheck(base);
1295:                result.append(vogroup).append(File.separator);
1296:
1297:                //add the label of the DAX
1298:                base = new File(base, label);
1299:                sanityCheck(base);
1300:                result.append(label).append(File.separator);
1301:
1302:                //create the directory name
1303:                StringBuffer leaf = new StringBuffer();
1304:                if (timestampBased) {
1305:                    leaf.append(mPOptions.getDateTime(mProps
1306:                            .useExtendedTimeStamp()));
1307:                } else {
1308:                    //get all the files in this directory
1309:                    String[] files = base
1310:                            .list(new RunDirectoryFilenameFilter());
1311:                    //find the maximum run directory
1312:                    int num, max = 1;
1313:                    for (int i = 0; i < files.length; i++) {
1314:                        num = Integer.parseInt(files[i]
1315:                                .substring(SUBMIT_DIRECTORY_PREFIX.length()));
1316:                        if (num + 1 > max) {
1317:                            max = num + 1;
1318:                        }
1319:                    }
1320:
1321:                    //create the directory name
1322:                    leaf.append(SUBMIT_DIRECTORY_PREFIX).append(
1323:                            mNumFormatter.format(max));
1324:                }
1325:                result.append(leaf.toString());
1326:                base = new File(base, leaf.toString());
1327:                mLogger.log("Directory to be created is "
1328:                        + base.getAbsolutePath(),
1329:                        LogManager.DEBUG_MESSAGE_LEVEL);
1330:                sanityCheck(base);
1331:
1332:                return result.toString();
1333:            }
1334:
1335:            /**
1336:             * Constructs Any extra arguments that need to be passed to dagman, as determined
1337:             * from the properties file.
1338:             *
1339:             * @param properties   the <code>PegasusProperties</code>
1340:             *
1341:             * @return any arguments to be added, else empty string
1342:             */
1343:            protected String constructDAGManKnobs(PegasusProperties properties) {
1344:                StringBuffer sb = new StringBuffer();
1345:
1346:                //get all the values for the dagman knows
1347:                int value;
1348:                for (int i = 0; i < this .DAGMAN_KNOBS.length; i++) {
1349:                    value = parseInt(properties
1350:                            .getProperty(this .DAGMAN_KNOBS[i][0]));
1351:                    if (value > 0) {
1352:                        //add the option
1353:                        sb.append(this .DAGMAN_KNOBS[i][1]);
1354:                        sb.append(value);
1355:                    }
1356:                }
1357:                return sb.toString();
1358:
1359:            }
1360:
1361:            /**
1362:             * Parses a string into an integer. Non valid values returned as -1
1363:             *
1364:             * @param s  the String to be parsed as integer
1365:             *
1366:             * @return the int value if valid, else -1
1367:             */
1368:            protected int parseInt(String s) {
1369:                int value = -1;
1370:                try {
1371:                    value = Integer.parseInt(s);
1372:                } catch (Exception e) {
1373:                    //ignore
1374:                }
1375:                return value;
1376:            }
1377:
1378:            /**
1379:             * A small utility method that constructs the name of the Condor files
1380:             * that are generated when a dag is submitted. The default separator _ is
1381:             * used.
1382:             *
1383:             * @param name  the name attribute in the partition element of the pdax.
1384:             * @param index the partition number of the partition.
1385:             * @param suffix the suffix that needs to be added to the filename.
1386:             *
1387:             * @return the name of the condor file.
1388:             */
1389:            private String getCondorFileName(String name, int index,
1390:                    String suffix) {
1391:                return getCondorFileName(name, index, suffix, "_");
1392:            }
1393:
1394:            /**
1395:             * A small utility method that constructs the name of the Condor files
1396:             * that are generated when a dag is submitted.
1397:             *
1398:             * @param name  the name attribute in the partition element of the pdax.
1399:             * @param index the partition number of the partition.
1400:             * @param suffix the suffix that needs to be added to the filename
1401:             * @param separator the separator that is to be used while constructing
1402:             *                  the filename.
1403:             *
1404:             * @return the name of the condor file
1405:             */
1406:            private String getCondorFileName(String name, int index,
1407:                    String suffix, String separator) {
1408:                StringBuffer sb = new StringBuffer();
1409:                //all the files reside in the submit file
1410:                //directory specified by the user.
1411:                //sb.append(mPOptions.submitFileDir).append(File.separator);
1412:                sb.append(name).append(separator).append(index).append(suffix);
1413:
1414:                return sb.toString();
1415:            }
1416:
1417:            /**
1418:             * An inner class, that implements the StreamGobblerCallback to count
1419:             * the occurences of a word in a document.
1420:             *
1421:             */
1422:            private class GrepCallback implements  StreamGobblerCallback {
1423:
1424:                /**
1425:                 * The word that is to be searched for.
1426:                 */
1427:                private String mWord;
1428:
1429:                /**
1430:                 * The length of the word to be searched for.
1431:                 */
1432:                private int mWordLength;
1433:
1434:                /**
1435:                 * The number of times the word appears.
1436:                 */
1437:                private int mCount;
1438:
1439:                /**
1440:                 * Overloaded Constructor.
1441:                 *
1442:                 * @param word  the word to be searched for.
1443:                 */
1444:                public GrepCallback(String word) {
1445:                    mWord = word;
1446:                    mWordLength = (word == null) ? 0 : word.length();
1447:                    mCount = 0;
1448:                }
1449:
1450:                /**
1451:                 * Callback whenever a line is read from the stream by the StreamGobbler.
1452:                 * Counts the occurences of the word that are in the line, and
1453:                 * increments to the global counter.
1454:                 *
1455:                 * @param line   the line that is read.
1456:                 */
1457:                public void work(String line) {
1458:
1459:                    //sanity check to prevent infinite iterations
1460:                    if (mWordLength == 0)
1461:                        return;
1462:
1463:                    int start = 0;
1464:                    int index;
1465:                    while ((index = line.indexOf(mWord, start)) != -1) {
1466:                        mCount++;
1467:                        start = index + mWordLength;
1468:                    }
1469:                }
1470:
1471:                /**
1472:                 * Returns the number of words counted so far.
1473:                 *
1474:                 * @return the number of words
1475:                 */
1476:                public int getCount() {
1477:                    return mCount;
1478:                }
1479:
1480:                /**
1481:                 * Resets the internal counters.
1482:                 */
1483:                public void reset() {
1484:                    mCount = 0;
1485:                }
1486:
1487:            }
1488:        }
1489:
1490:        /**
1491:         * A filename filter for identifying the run directory
1492:         *
1493:         * @author Karan Vahi vahi@isi.edu
1494:         */
1495:        class RunDirectoryFilenameFilter implements  FilenameFilter {
1496:
1497:            /**
1498:             * Store the regular expressions necessary to parse kickstart output files
1499:             */
1500:            private static final String mRegexExpression = "("
1501:                    + PDAX2MDAG.SUBMIT_DIRECTORY_PREFIX
1502:                    + ")([0-9][0-9][0-9][0-9])";
1503:
1504:            /**
1505:             * Stores compiled patterns at first use, quasi-Singleton.
1506:             */
1507:            private static Pattern mPattern = null;
1508:
1509:            /***
1510:             * Tests if a specified file should be included in a file list.
1511:             *
1512:             * @param dir the directory in which the file was found.
1513:             * @param name - the name of the file.
1514:             *
1515:             * @return  true if and only if the name should be included in the file list
1516:             *          false otherwise.
1517:             *
1518:             *
1519:             */
1520:            public boolean accept(File dir, String name) {
1521:                //compile the pattern only once.
1522:                if (mPattern == null) {
1523:                    mPattern = Pattern.compile(mRegexExpression);
1524:                }
1525:                return mPattern.matcher(name).matches();
1526:            }
1527:
1528:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.