Source Code Cross Referenced for CPlanner.java in  » Workflow-Engines » pegasus-2.1.0 » org » griphyn » cPlanner » toolkit » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Workflow Engines » pegasus 2.1.0 » org.griphyn.cPlanner.toolkit 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /*
0002:         * This file or a portion of this file is licensed under the terms of
0003:         * the Globus Toolkit Public License, found in file GTPL, or at
0004:         * http://www.globus.org/toolkit/download/license.html. This notice must
0005:         * appear in redistributions of this file, with or without modification.
0006:         *
0007:         * Redistributions of this Software, with or without modification, must
0008:         * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
0009:         * some other similar material which is provided with the Software (if
0010:         * any).
0011:         *
0012:         * Copyright 1999-2004 University of Chicago and The University of
0013:         * Southern California. All rights reserved.
0014:         */
0015:
0016:        package org.griphyn.cPlanner.toolkit;
0017:
0018:        import org.griphyn.cPlanner.code.CodeGenerator;
0019:        import org.griphyn.cPlanner.code.generator.CodeGeneratorFactory;
0020:
0021:        import org.griphyn.cPlanner.classes.ADag;
0022:        import org.griphyn.cPlanner.classes.DagInfo;
0023:        import org.griphyn.cPlanner.classes.NameValue;
0024:        import org.griphyn.cPlanner.classes.PlannerMetrics;
0025:        import org.griphyn.cPlanner.classes.PlannerOptions;
0026:        import org.griphyn.cPlanner.classes.PegasusBag;
0027:
0028:        import org.griphyn.cPlanner.common.PegasusProperties;
0029:        import org.griphyn.cPlanner.common.UserOptions;
0030:        import org.griphyn.cPlanner.common.LogManager;
0031:        import org.griphyn.cPlanner.common.StreamGobbler;
0032:        import org.griphyn.cPlanner.common.StreamGobblerCallback;
0033:        import org.griphyn.cPlanner.common.DefaultStreamGobblerCallback;
0034:
0035:        import org.griphyn.cPlanner.engine.MainEngine;
0036:
0037:        import org.griphyn.cPlanner.poolinfo.PoolMode;
0038:
0039:        import org.griphyn.cPlanner.parser.dax.Callback;
0040:        import org.griphyn.cPlanner.parser.dax.DAXCallbackFactory;
0041:
0042:        import org.griphyn.cPlanner.parser.pdax.PDAXCallbackFactory;
0043:
0044:        import org.griphyn.cPlanner.parser.DaxParser;
0045:        import org.griphyn.cPlanner.parser.PDAXParser;
0046:
0047:        import org.griphyn.common.catalog.work.WorkFactory;
0048:        import org.griphyn.common.catalog.WorkCatalog;
0049:
0050:        import org.griphyn.common.util.Version;
0051:        import org.griphyn.common.util.Currently;
0052:        import org.griphyn.common.util.FactoryException;
0053:
0054:        import gnu.getopt.Getopt;
0055:        import gnu.getopt.LongOpt;
0056:
0057:        import java.io.File;
0058:        import java.io.IOException;
0059:        import java.io.FilenameFilter;
0060:        import java.io.FileOutputStream;
0061:        import java.io.FileWriter;
0062:
0063:        import java.nio.channels.FileChannel;
0064:        import java.nio.channels.FileLock;
0065:
0066:        import java.util.Collection;
0067:        import java.util.List;
0068:        import java.util.Date;
0069:        import java.util.Set;
0070:        import java.util.Map;
0071:        import java.util.HashSet;
0072:        import java.util.Iterator;
0073:
0074:        import java.util.regex.Pattern;
0075:
0076:        import java.text.NumberFormat;
0077:        import java.text.DecimalFormat;
0078:        import org.griphyn.vdl.euryale.VTorInUseException;
0079:
0080:        /**
0081:         * This is the main program for the Pegasus. It parses the options specified
0082:         * by the user and calls out to the appropriate components to parse the abstract
0083:         * plan, concretize it and then write the submit files.
0084:         *
0085:         * @author Gaurang Mehta
0086:         * @author Karan Vahi
0087:         * @version $Revision: 464 $
0088:         */
0089:
0090:        public class CPlanner extends Executable {
0091:
0092:            /**
0093:             * The default megadag mode that is used for generation of megadags in
0094:             * deferred planning.
0095:             */
0096:            public static final String DEFAULT_MEGADAG_MODE = "dag";
0097:
0098:            /**
0099:             * The prefix for the submit directory.
0100:             */
0101:            public static final String SUBMIT_DIRECTORY_PREFIX = "run";
0102:
0103:            /**
0104:             * The basename of the directory that contains the submit files for the
0105:             * cleanup DAG for the concrete dag generated for the workflow.
0106:             */
0107:            public static final String CLEANUP_DIR = "cleanup";
0108:
0109:            /**
0110:             * The message to be logged in case of an empty executable workflow.
0111:             */
0112:            private static final String EMPTY_FINAL_WORKFLOW_MESSAGE = "\n\n\n"
0113:                    + "The executable workflow generated contains no nodes.\n"
0114:                    + "It seems that the output files are already at the output site. \n"
0115:                    + "To regenerate the output data from scratch specify --force option."
0116:                    + "\n\n\n";
0117:
0118:            /**
0119:             * The message to be logged when the workflow has been planned successfully.
0120:             */
0121:            private static final String SUCCESS_MESSAGE = "\n\n\n"
0122:                    + "I have concretized your abstract workflow. The workflow has been entered \n"
0123:                    + "into the workflow database with a state of \"planned\". The next step is \n"
0124:                    + "to start or execute your workflow. The invocation required is"
0125:                    + "\n\n\n";
0126:
0127:            /**
0128:             * The object containing all the options passed to the Concrete Planner.
0129:             */
0130:            private PlannerOptions mPOptions;
0131:
0132:            /**
0133:             * The PlannerMetrics object storing the metrics about this planning instance.
0134:             */
0135:            private PlannerMetrics mPMetrics;
0136:
0137:            /**
0138:             * The number formatter to format the run submit dir entries.
0139:             */
0140:            private NumberFormat mNumFormatter;
0141:
0142:            /**
0143:             * The user name of the user running Pegasus.
0144:             */
0145:            private String mUser;
0146:
0147:            /**
0148:             * Default constructor.
0149:             */
0150:            public CPlanner() {
0151:                super ();
0152:                mLogMsg = new String();
0153:                mVersion = Version.instance().toString();
0154:                mNumFormatter = new DecimalFormat("0000");
0155:
0156:                this .mPOptions = new PlannerOptions();
0157:                mPOptions.setSubmitDirectory(".", null);
0158:                mPOptions.setExecutionSites(new java.util.HashSet());
0159:                mPOptions.setOutputSite("");
0160:
0161:                mUser = mProps.getProperty("user.name");
0162:                if (mUser == null) {
0163:                    mUser = "user";
0164:                }
0165:
0166:                mPMetrics = new PlannerMetrics();
0167:                mPMetrics.setUser(mUser);
0168:            }
0169:
0170:            /**
0171:             * The main program for the CPlanner.
0172:             *
0173:             *
0174:             * @param args the main arguments passed to the planner.
0175:             */
0176:            public static void main(String[] args) {
0177:
0178:                CPlanner cPlanner = new CPlanner();
0179:                int result = 0;
0180:                double starttime = new Date().getTime();
0181:                double execTime = -1;
0182:
0183:                try {
0184:                    cPlanner.executeCommand(args);
0185:                } catch (FactoryException fe) {
0186:                    cPlanner.log(fe.convertException(),
0187:                            LogManager.FATAL_MESSAGE_LEVEL);
0188:                    result = 2;
0189:                } catch (RuntimeException rte) {
0190:                    //catch all runtime exceptions including our own that
0191:                    //are thrown that may have chained causes
0192:                    cPlanner.log(convertException(rte),
0193:                            LogManager.FATAL_MESSAGE_LEVEL);
0194:                    result = 1;
0195:                } catch (Exception e) {
0196:                    //unaccounted for exceptions
0197:                    cPlanner
0198:                            .log(e.getMessage(), LogManager.FATAL_MESSAGE_LEVEL);
0199:                    result = 3;
0200:                } finally {
0201:                    double endtime = new Date().getTime();
0202:                    execTime = (endtime - starttime) / 1000;
0203:                }
0204:
0205:                // warn about non zero exit code
0206:                if (result != 0) {
0207:                    cPlanner.log("Non-zero exit-code " + result,
0208:                            LogManager.WARNING_MESSAGE_LEVEL);
0209:                } else {
0210:                    //log the time taken to execute
0211:                    cPlanner.log("Time taken to execute is " + execTime
0212:                            + " seconds", LogManager.INFO_MESSAGE_LEVEL);
0213:                }
0214:
0215:                System.exit(result);
0216:            }
0217:
0218:            /**
0219:             * Loads all the properties that are needed by this class.
0220:             */
0221:            public void loadProperties() {
0222:
0223:            }
0224:
0225:            /**
0226:             * Executes the command on the basis of the options specified.
0227:             *
0228:             * @param args the command line options.
0229:             */
0230:            public void executeCommand(String[] args) {
0231:                executeCommand(parseCommandLineArguments(args));
0232:            }
0233:
0234:            /**
0235:             * Executes the command on the basis of the options specified.
0236:             *
0237:             * @param options the command line options.
0238:             */
0239:            public void executeCommand(PlannerOptions options) {
0240:                String message = new String();
0241:                mPOptions = options;
0242:
0243:                //print help if asked for
0244:                if (mPOptions.getHelp()) {
0245:                    printLongVersion();
0246:                    return;
0247:                }
0248:
0249:                //set the logging level only if -v was specified
0250:                //else bank upon the the default logging level
0251:                if (mPOptions.getLoggingLevel() > 0) {
0252:                    mLogger.setLevel(mPOptions.getLoggingLevel());
0253:                }
0254:
0255:                //do sanity check on dax file
0256:                String dax = mPOptions.getDAX();
0257:                String pdax = mPOptions.getPDAX();
0258:                String baseDir = mPOptions.getBaseSubmitDirectory();
0259:
0260:                if (dax == null && pdax == null) {
0261:                    mLogger
0262:                            .log(
0263:                                    "\nNeed to specify either a dax file ( using --dax )  or a pdax file (using --pdax) to plan",
0264:                                    LogManager.INFO_MESSAGE_LEVEL);
0265:                    this .printShortVersion();
0266:                    return;
0267:                }
0268:
0269:                if (mPOptions.getPartitioningType() != null) {
0270:                    // partition and plan the workflow
0271:                    doPartitionAndPlan(mProps, options);
0272:                    return;
0273:                }
0274:
0275:                //populate planner metrics
0276:                mPMetrics.setStartTime(new Date());
0277:                mPMetrics.setVOGroup(mPOptions.getVOGroup());
0278:                mPMetrics
0279:                        .setBaseSubmitDirectory(mPOptions.getSubmitDirectory());
0280:                mPMetrics.setDAX(mPOptions.getDAX());
0281:
0282:                UserOptions opts = UserOptions.getInstance(mPOptions);
0283:
0284:                //try to get hold of the vds properties
0285:                //set in the jvm that user specifed at command line
0286:                mPOptions.setVDSProperties(mProps.getMatchingProperties(
0287:                        "pegasus.", false));
0288:
0289:                List allVDSProps = mProps.getMatchingProperties("pegasus.",
0290:                        false);
0291:                mLogger.log("Pegasus Properties set by the user",
0292:                        LogManager.CONFIG_MESSAGE_LEVEL);
0293:                for (java.util.Iterator it = allVDSProps.iterator(); it
0294:                        .hasNext();) {
0295:                    NameValue nv = (NameValue) it.next();
0296:                    mLogger.log(nv.toString(), LogManager.CONFIG_MESSAGE_LEVEL);
0297:
0298:                }
0299:
0300:                //check if sites set by user. If user has not specified any sites then
0301:                //load all sites from site catalog.
0302:                Collection eSites = mPOptions.getExecutionSites();
0303:                if (eSites.isEmpty()) {
0304:                    mLogger
0305:                            .log(
0306:                                    "No sites given by user. Will use sites from the site catalog",
0307:                                    LogManager.DEBUG_MESSAGE_LEVEL);
0308:                    List sitelist = (PoolMode.loadPoolInstance(PoolMode
0309:                            .getImplementingClass(mProps.getPoolMode()), mProps
0310:                            .getPoolFile(), PoolMode.SINGLETON_LOAD))
0311:                            .getPools();
0312:
0313:                    if (sitelist != null) {
0314:                        if (sitelist.contains("local")) {
0315:                            sitelist.remove("local");
0316:                        }
0317:                        if (sitelist.size() >= 1) {
0318:                            Set siteset = new HashSet(sitelist);
0319:                            mPOptions.setExecutionSites(siteset);
0320:                            eSites = mPOptions.getExecutionSites();
0321:                        } else {
0322:                            throw new RuntimeException(
0323:                                    "Only local site is available. "
0324:                                            + " Make sure your site catalog contains more sites");
0325:                        }
0326:                    } else {
0327:                        throw new RuntimeException(
0328:                                "No sites present in the site catalog. "
0329:                                        + "Please make sure your site catalog is correctly populated");
0330:                    }
0331:                }
0332:
0333:                if (dax == null && pdax != null && !eSites.isEmpty()) {
0334:                    //do the deferreed planning by parsing
0335:                    //the partition graph in the pdax file.
0336:                    doDeferredPlanning();
0337:                } else if (pdax == null && dax != null && !eSites.isEmpty()) {
0338:
0339:                    Callback cb = DAXCallbackFactory.loadInstance(mProps, dax,
0340:                            "DAX2CDAG");
0341:
0342:                    DaxParser daxParser = new DaxParser(dax, mProps, cb);
0343:
0344:                    ADag orgDag = (ADag) cb.getConstructedObject();
0345:
0346:                    //generate the flow ids for the classads information
0347:                    orgDag.dagInfo.generateFlowName();
0348:                    orgDag.dagInfo.setFlowTimestamp(mPOptions
0349:                            .getDateTime(mProps.useExtendedTimeStamp()));
0350:                    orgDag.dagInfo.setDAXMTime(dax);
0351:                    orgDag.dagInfo.generateFlowID();
0352:                    orgDag.dagInfo.setReleaseVersion();
0353:
0354:                    //write out a the relevant properties to submit directory
0355:                    int state = 0;
0356:                    String relativeDir; //the submit directory relative to the base specified
0357:                    try {
0358:                        //create the base directory if required
0359:                        relativeDir = (mPOptions.partOfDeferredRun()) ? null
0360:                                : (mPOptions.getRelativeSubmitDirectory() == null) ?
0361:                                //create our own relative dir
0362:                                createSubmitDirectory(
0363:                                        orgDag,
0364:                                        baseDir,
0365:                                        mUser,
0366:                                        mPOptions.getVOGroup(),
0367:                                        mProps
0368:                                                .useTimestampForDirectoryStructure())
0369:                                        : mPOptions
0370:                                                .getRelativeSubmitDirectory();
0371:                        mPOptions.setSubmitDirectory(baseDir, relativeDir);
0372:                        state++;
0373:                        mProps.writeOutProperties(mPOptions
0374:                                .getSubmitDirectory());
0375:
0376:                        mPMetrics.setRelativeSubmitDirectory(mPOptions
0377:                                .getRelativeSubmitDirectory());
0378:
0379:                        //also log in the planner metrics where the properties are
0380:                        mPMetrics.setProperties(mProps
0381:                                .getPropertiesInSubmitDirectory());
0382:                    } catch (IOException ioe) {
0383:                        String error = (state == 0) ? "Unable to write  to directory"
0384:                                : "Unable to write out properties to directory";
0385:                        throw new RuntimeException(error
0386:                                + mPOptions.getSubmitDirectory(), ioe);
0387:
0388:                    }
0389:
0390:                    //check if a random directory is specified by the user
0391:                    if (mPOptions.generateRandomDirectory()
0392:                            && mPOptions.getRandomDir() == null) {
0393:                        //user has specified the random dir name but wants
0394:                        //to go with default name which is the flow id
0395:                        //for the workflow unless a basename is specified.
0396:                        mPOptions.setRandomDir(getRandomDirectory(orgDag));
0397:                    } else if (mPOptions.getRandomDir() != null) {
0398:                        //keep the name that the user passed
0399:                    } else if (relativeDir != null) {
0400:                        //the relative directory constructed on the submit host
0401:                        //is the one required for remote sites
0402:                        mPOptions.setRandomDir(relativeDir);
0403:                    }
0404:
0405:                    //populate the singleton instance for user options
0406:                    //UserOptions opts = UserOptions.getInstance(mPOptions);
0407:                    MainEngine cwmain = new MainEngine(orgDag, mProps,
0408:                            mPOptions);
0409:
0410:                    ADag finalDag = cwmain.runPlanner();
0411:                    DagInfo ndi = finalDag.dagInfo;
0412:
0413:                    //store the workflow metrics from the final dag into
0414:                    //the planner metrics
0415:                    mPMetrics.setWorkflowMetrics(finalDag.getWorkflowMetrics());
0416:
0417:                    //we only need the script writer for daglite megadag generator mode
0418:                    CodeGenerator codeGenerator = null;
0419:                    codeGenerator = CodeGeneratorFactory.loadInstance(cwmain
0420:                            .getPegasusBag());
0421:
0422:                    //before generating the codes for the workflow check
0423:                    //for emtpy workflows
0424:                    if (finalDag.isEmpty()) {
0425:
0426:                        mLogger.log(EMPTY_FINAL_WORKFLOW_MESSAGE,
0427:                                LogManager.INFO_MESSAGE_LEVEL);
0428:                        return;
0429:                    }
0430:
0431:                    message = "Generating codes for the concrete workflow";
0432:                    log(message, LogManager.INFO_MESSAGE_LEVEL);
0433:                    try {
0434:                        codeGenerator.generateCode(finalDag);
0435:
0436:                        //generate only the braindump file that is required.
0437:                        //no spawning off the tailstatd for time being
0438:                        codeGenerator.startMonitoring();
0439:
0440:                        /*
0441:                        if (mPOptions.monitorWorkflow()) {
0442:                            //submit files successfully generated.
0443:                            //spawn off the monitoring daemon
0444:                            codeGenerator.startMonitoring();
0445:                        }
0446:                         */
0447:                    } catch (Exception e) {
0448:                        throw new RuntimeException("Unable to generate code", e);
0449:                    }
0450:                    mLogger.logCompletion(message,
0451:                            LogManager.INFO_MESSAGE_LEVEL);
0452:
0453:                    //create the submit files for cleanup dag if
0454:                    //random dir option specified
0455:                    if (mPOptions.generateRandomDirectory()) {
0456:                        ADag cleanupDAG = cwmain.getCleanupDAG();
0457:                        PlannerOptions cleanupOptions = (PlannerOptions) mPOptions
0458:                                .clone();
0459:
0460:                        //submit files are generated in a subdirectory
0461:                        //of the submit directory
0462:                        message = "Generating code for the cleanup workflow";
0463:                        mLogger.log(message, LogManager.INFO_MESSAGE_LEVEL);
0464:                        //set the submit directory in the planner options for cleanup wf
0465:                        cleanupOptions.setSubmitDirectory(cleanupOptions
0466:                                .getSubmitDirectory(), this .CLEANUP_DIR);
0467:                        PegasusBag bag = cwmain.getPegasusBag();
0468:                        bag.add(PegasusBag.PLANNER_OPTIONS, cleanupOptions);
0469:                        codeGenerator = CodeGeneratorFactory
0470:                                .loadInstance(cwmain.getPegasusBag());
0471:
0472:                        try {
0473:                            codeGenerator.generateCode(cleanupDAG);
0474:                        } catch (Exception e) {
0475:                            throw new RuntimeException(
0476:                                    "Unable to generate code", e);
0477:                        }
0478:
0479:                        mLogger.logCompletion(message,
0480:                                LogManager.INFO_MESSAGE_LEVEL);
0481:                    }
0482:
0483:                    //log entry in to the work catalog
0484:                    boolean nodatabase = logEntryInWorkCatalog(finalDag,
0485:                            baseDir, relativeDir);
0486:
0487:                    //write out  the planner metrics  to global log
0488:                    mPMetrics.setEndTime(new Date());
0489:                    writeOutMetrics(mPMetrics);
0490:
0491:                    if (mPOptions.submitToScheduler()) {//submit the jobs
0492:                        StringBuffer invocation = new StringBuffer();
0493:                        //construct the path to the bin directory
0494:                        invocation.append(mProps.getPegasusHome()).append(
0495:                                File.separator).append("bin").append(
0496:                                File.separator).append(
0497:                                getPegasusRunInvocation(nodatabase));
0498:
0499:                        boolean submit = submitWorkflow(invocation.toString());
0500:                        if (!submit) {
0501:                            throw new RuntimeException(
0502:                                    "Unable to submit the workflow using pegasus-run");
0503:                        }
0504:                    } else {
0505:                        //log the success message
0506:                        this .logSuccessfulCompletion(nodatabase);
0507:                    }
0508:                } else {
0509:                    printShortVersion();
0510:                    throw new RuntimeException(
0511:                            "Invalid combination of arguments passed");
0512:                }
0513:            }
0514:
0515:            /**
0516:             * Parses the command line arguments using GetOpt and returns a
0517:             * <code>PlannerOptions</code> contains all the options passed by the
0518:             * user at the command line.
0519:             *
0520:             * @param args  the arguments passed by the user at command line.
0521:             *
0522:             * @return the  options.
0523:             */
0524:            public PlannerOptions parseCommandLineArguments(String[] args) {
0525:                LongOpt[] longOptions = generateValidOptions();
0526:
0527:                Getopt g = new Getopt("pegasus-plan", args,
0528:                        "vhfSnzpVr::aD:d:s:o:P:c:C:b:g:2:", longOptions, false);
0529:                g.setOpterr(false);
0530:
0531:                int option = 0;
0532:                PlannerOptions options = new PlannerOptions();
0533:
0534:                while ((option = g.getopt()) != -1) {
0535:                    //System.out.println("Option tag " + (char)option);
0536:                    switch (option) {
0537:
0538:                    case 1://monitor
0539:                        options.setMonitoring(true);
0540:                        break;
0541:
0542:                    case 'z'://deferred
0543:                        options.setPartOfDeferredRun(true);
0544:                        break;
0545:
0546:                    case 'a'://authenticate
0547:                        options.setAuthentication(true);
0548:                        break;
0549:
0550:                    case 'b'://optional basename
0551:                        options.setBasenamePrefix(g.getOptarg());
0552:                        break;
0553:
0554:                    case 'c'://cache
0555:                        options.setCacheFiles(g.getOptarg());
0556:                        break;
0557:
0558:                    case 'C'://cluster
0559:                        options.setClusteringTechnique(g.getOptarg());
0560:                        break;
0561:
0562:                    case 'd'://dax
0563:                        options.setDAX(g.getOptarg());
0564:                        break;
0565:
0566:                    case 'D': //dir
0567:                        options.setSubmitDirectory(g.getOptarg(), null);
0568:                        break;
0569:
0570:                    case '2'://relative-dir
0571:                        options.setRelativeSubmitDirectory(g.getOptarg());
0572:                        break;
0573:
0574:                    case 'f'://force
0575:                        options.setForce(true);
0576:                        break;
0577:
0578:                    case 'g': //group
0579:                        options.setVOGroup(g.getOptarg());
0580:                        break;
0581:
0582:                    case 'h'://help
0583:                        options.setHelp(true);
0584:                        break;
0585:
0586:                    case 'm'://megadag option
0587:                        options.setMegaDAGMode(g.getOptarg());
0588:                        break;
0589:
0590:                    case 'n'://nocleanup option
0591:                        options.setCleanup(false);
0592:                        break;
0593:
0594:                    case 'o'://output
0595:                        options.setOutputSite(g.getOptarg());
0596:                        break;
0597:
0598:                    case 'p'://partition and plan
0599:                        options.setPartitioningType("Whole");
0600:                        break;
0601:
0602:                    case 'P'://pdax file
0603:                        options.setPDAX(g.getOptarg());
0604:                        break;
0605:
0606:                    case 'r'://randomdir
0607:                        options.setRandomDir(g.getOptarg());
0608:                        break;
0609:
0610:                    case 'S'://submit option
0611:                        options.setSubmitToScheduler(true);
0612:                        break;
0613:
0614:                    case 's'://sites
0615:                        options.setExecutionSites(g.getOptarg());
0616:                        break;
0617:
0618:                    case 'v'://verbose
0619:                        options.incrementLogging();
0620:                        break;
0621:
0622:                    case 'V'://version
0623:                        mLogger.log(getGVDSVersion(),
0624:                                LogManager.INFO_MESSAGE_LEVEL);
0625:                        System.exit(0);
0626:
0627:                    default: //same as help
0628:                        printShortVersion();
0629:                        throw new RuntimeException(
0630:                                "Incorrect option or option usage " + option);
0631:
0632:                    }
0633:                }
0634:                return options;
0635:
0636:            }
0637:
0638:            /**
0639:             * Logs an entry in the work catalog about the workflow being planned.
0640:             *
0641:             * @param dag ADag
0642:             * @param baseDir String
0643:             * @param relativeDir String
0644:             *
0645:             * @return boolean
0646:             */
0647:            protected boolean logEntryInWorkCatalog(ADag dag, String baseDir,
0648:                    String relativeDir) {
0649:                //connect to the work catalog and populate
0650:                //an entry in it for the current workflow
0651:                WorkCatalog wc = null;
0652:                boolean nodatabase = false;
0653:                try {
0654:                    wc = WorkFactory.loadInstance(mProps);
0655:                } catch (Exception e) {
0656:                    //just log and proceed
0657:                    mLogger.log("Ignoring: " + convertException(e),
0658:                            LogManager.DEBUG_MESSAGE_LEVEL);
0659:                    nodatabase = true;
0660:                }
0661:                if (wc != null) {
0662:                    wc.insert(baseDir, mPOptions.getVOGroup(), dag.getLabel(),
0663:                            new File(relativeDir == null ? "." : relativeDir)
0664:                                    .getName(), mUser, Currently.parse(dag
0665:                                    .getMTime()), Currently.parse(dag.dagInfo
0666:                                    .getFlowTimestamp()), -2);
0667:                    try {
0668:                        wc.close();
0669:                    } catch (Exception e) {
0670:                        /*ignore */
0671:                        nodatabase = true;
0672:                    }
0673:                }
0674:                return nodatabase;
0675:            }
0676:
0677:            /**
0678:             * Submits the workflow for execution using pegasus-run, a wrapper around
0679:             * pegasus-submit-dag.
0680:             *
0681:             * @param invocation    the pegasus run invocation
0682:             *
0683:             * @return boolean indicating whether could successfully submit the workflow or not.
0684:             */
0685:            public boolean submitWorkflow(String invocation) {
0686:                boolean result = false;
0687:                try {
0688:                    //set the callback and run the pegasus-run command
0689:                    Runtime r = Runtime.getRuntime();
0690:
0691:                    mLogger.log("Executing  " + invocation,
0692:                            LogManager.DEBUG_MESSAGE_LEVEL);
0693:                    Process p = r.exec(invocation);
0694:
0695:                    //spawn off the gobblers with the already initialized default callback
0696:                    StreamGobbler ips = new StreamGobbler(p.getInputStream(),
0697:                            new DefaultStreamGobblerCallback(
0698:                                    LogManager.INFO_MESSAGE_LEVEL));
0699:                    StreamGobbler eps = new StreamGobbler(p.getErrorStream(),
0700:                            new DefaultStreamGobblerCallback(
0701:                                    LogManager.ERROR_MESSAGE_LEVEL));
0702:
0703:                    ips.start();
0704:                    eps.start();
0705:
0706:                    //wait for the threads to finish off
0707:                    ips.join();
0708:                    eps.join();
0709:
0710:                    //get the status
0711:                    int status = p.waitFor();
0712:
0713:                    mLogger.log("Submission of workflow exited with status "
0714:                            + status, LogManager.DEBUG_MESSAGE_LEVEL);
0715:
0716:                    result = (status == 0) ? true : false;
0717:                } catch (IOException ioe) {
0718:                    mLogger.log("IOException while running tailstatd ", ioe,
0719:                            LogManager.ERROR_MESSAGE_LEVEL);
0720:                } catch (InterruptedException ie) {
0721:                    //ignore
0722:                }
0723:                return result;
0724:
0725:            }
0726:
0727:            /**
0728:             * Partitions and plans the workflow. First step of merging DAGMan and
0729:             * Condor
0730:             *
0731:             * @param properties   the properties passed to the planner.
0732:             * @param options      the options passed to the planner.
0733:             */
0734:            protected void doPartitionAndPlan(PegasusProperties properties,
0735:                    PlannerOptions options) {
0736:                //we first need to get the label of DAX
0737:                Callback cb = DAXCallbackFactory.loadInstance(properties,
0738:                        options.getDAX(), "DAX2Metadata");
0739:                try {
0740:                    DaxParser daxParser = new DaxParser(options.getDAX(),
0741:                            properties, cb);
0742:                } catch (Exception e) {
0743:                    //ignore
0744:                }
0745:                Map metadata = (Map) cb.getConstructedObject();
0746:                String label = (String) metadata.get("name");
0747:
0748:                String baseDir = options.getBaseSubmitDirectory();
0749:                String relativeDir = null;
0750:                //construct the submit directory structure
0751:                try {
0752:                    relativeDir = (options.getRelativeSubmitDirectory() == null) ?
0753:                    //create our own relative dir
0754:                    createSubmitDirectory(label, baseDir, mUser, options
0755:                            .getVOGroup(), properties
0756:                            .useTimestampForDirectoryStructure())
0757:                            : options.getRelativeSubmitDirectory();
0758:                } catch (IOException ioe) {
0759:                    String error = "Unable to write  to directory";
0760:                    throw new RuntimeException(error
0761:                            + options.getSubmitDirectory(), ioe);
0762:
0763:                }
0764:
0765:                options.setSubmitDirectory(baseDir, relativeDir);
0766:                mLogger.log("Submit Directory for workflow is "
0767:                        + options.getSubmitDirectory(),
0768:                        LogManager.DEBUG_MESSAGE_LEVEL);
0769:
0770:                //now let us run partitiondax
0771:                mLogger.log("Partitioning Workflow",
0772:                        LogManager.INFO_MESSAGE_LEVEL);
0773:                PartitionDAX partitionDAX = new PartitionDAX();
0774:                File dir = new File(options.getSubmitDirectory(), "dax");
0775:                String pdax = partitionDAX.partitionDAX(properties, options
0776:                        .getDAX(), dir.getAbsolutePath(), options
0777:                        .getPartitioningType());
0778:
0779:                mLogger.log("PDAX file generated is " + pdax,
0780:                        LogManager.DEBUG_MESSAGE_LEVEL);
0781:                mLogger.logCompletion("Partitioning Workflow",
0782:                        LogManager.INFO_MESSAGE_LEVEL);
0783:
0784:                //now run pegasus-plan with pdax option
0785:                CPlanner pegasusPlan = new CPlanner();
0786:                options.setDAX(null);
0787:                options.setPDAX(pdax);
0788:                options.setPartitioningType(null);
0789:
0790:                pegasusPlan.executeCommand(options);
0791:
0792:            }
0793:
0794:            /**
0795:             * Sets the basename of the random directory that is created on the remote
0796:             * sites per workflow. The name is generated by default from teh flow ID,
0797:             * unless a basename prefix is specifed at runtime in the planner options.
0798:             *
0799:             * @param dag  the DAG containing the abstract workflow.
0800:             *
0801:             * @return  the basename of the random directory.
0802:             */
0803:            protected String getRandomDirectory(ADag dag) {
0804:
0805:                //constructing the name of the dagfile
0806:                StringBuffer sb = new StringBuffer();
0807:                String bprefix = mPOptions.getBasenamePrefix();
0808:                if (bprefix != null) {
0809:                    //the prefix is not null using it
0810:                    sb.append(bprefix);
0811:                    sb.append("-");
0812:                    //append timestamp to generate some uniqueness
0813:                    sb.append(dag.dagInfo.getFlowTimestamp());
0814:                } else {
0815:                    //use the flow ID that contains the timestamp and the name both.
0816:                    sb.append(dag.dagInfo.flowID);
0817:                }
0818:                return sb.toString();
0819:            }
0820:
0821:            /**
0822:             * Tt generates the LongOpt which contain the valid options that the command
0823:             * will accept.
0824:             *
0825:             * @return array of <code>LongOpt</code> objects , corresponding to the valid
0826:             * options
0827:             */
0828:            public LongOpt[] generateValidOptions() {
0829:                LongOpt[] longopts = new LongOpt[22];
0830:
0831:                longopts[0] = new LongOpt("dir", LongOpt.REQUIRED_ARGUMENT,
0832:                        null, 'D');
0833:                longopts[1] = new LongOpt("dax", LongOpt.REQUIRED_ARGUMENT,
0834:                        null, 'd');
0835:                longopts[2] = new LongOpt("sites", LongOpt.REQUIRED_ARGUMENT,
0836:                        null, 's');
0837:                longopts[3] = new LongOpt("output", LongOpt.REQUIRED_ARGUMENT,
0838:                        null, 'o');
0839:                longopts[4] = new LongOpt("verbose", LongOpt.NO_ARGUMENT, null,
0840:                        'v');
0841:                longopts[5] = new LongOpt("help", LongOpt.NO_ARGUMENT, null,
0842:                        'h');
0843:                longopts[6] = new LongOpt("force", LongOpt.NO_ARGUMENT, null,
0844:                        'f');
0845:                longopts[7] = new LongOpt("submit", LongOpt.NO_ARGUMENT, null,
0846:                        'S');
0847:                longopts[8] = new LongOpt("version", LongOpt.NO_ARGUMENT, null,
0848:                        'V');
0849:                longopts[9] = new LongOpt("randomdir",
0850:                        LongOpt.OPTIONAL_ARGUMENT, null, 'r');
0851:                longopts[10] = new LongOpt("authenticate", LongOpt.NO_ARGUMENT,
0852:                        null, 'a');
0853:                //deferred planning options
0854:                longopts[11] = new LongOpt("pdax", LongOpt.REQUIRED_ARGUMENT,
0855:                        null, 'P');
0856:                longopts[12] = new LongOpt("cache", LongOpt.REQUIRED_ARGUMENT,
0857:                        null, 'c');
0858:                longopts[13] = new LongOpt("megadag",
0859:                        LongOpt.REQUIRED_ARGUMENT, null, 'm');
0860:                //collapsing for mpi
0861:                longopts[14] = new LongOpt("cluster",
0862:                        LongOpt.REQUIRED_ARGUMENT, null, 'C');
0863:                //more deferred planning stuff
0864:                longopts[15] = new LongOpt("basename",
0865:                        LongOpt.REQUIRED_ARGUMENT, null, 'b');
0866:                longopts[16] = new LongOpt("monitor", LongOpt.NO_ARGUMENT,
0867:                        null, 1);
0868:                longopts[17] = new LongOpt("nocleanup", LongOpt.NO_ARGUMENT,
0869:                        null, 'n');
0870:                longopts[18] = new LongOpt("group", LongOpt.REQUIRED_ARGUMENT,
0871:                        null, 'g');
0872:                longopts[19] = new LongOpt("deferred", LongOpt.NO_ARGUMENT,
0873:                        null, 'z');
0874:                longopts[20] = new LongOpt("relative-dir",
0875:                        LongOpt.REQUIRED_ARGUMENT, null, '2');
0876:                longopts[21] = new LongOpt("pap", LongOpt.NO_ARGUMENT, null,
0877:                        'p');
0878:                return longopts;
0879:            }
0880:
0881:            /**
0882:             * Prints out a short description of what the command does.
0883:             */
0884:            public void printShortVersion() {
0885:                String text = "\n $Id: CPlanner.java 464 2008-02-07 23:30:44Z vahi $ "
0886:                        + "\n "
0887:                        + getGVDSVersion()
0888:                        + "\n Usage : pegasus-plan [-Dprop  [..]] -d|-P <dax file|pdax file> "
0889:                        + " [-s site[,site[..]]] [-b prefix] [-c f1[,f2[..]]] [-f] [-m style] " /*<dag|noop|daglite>]*/
0890:                        + "\n [-a] [-b basename] [-C t1[,t2[..]]  [-D  <base dir  for o/p files>] "
0891:                        + " [ --relative-dir <relative directory to base directory> ][-g <vogroup>] [-o <output site>] "
0892:                        + "\n [-r[dir name]] [--monitor] [-S] [-n]  [-v] [-V] [-h]";
0893:
0894:                System.out.println(text);
0895:            }
0896:
0897:            /**
0898:             * Prints the long description, displaying in detail what the various options
0899:             * to the command stand for.
0900:             */
0901:            public void printLongVersion() {
0902:
0903:                String text = "\n $Id: CPlanner.java 464 2008-02-07 23:30:44Z vahi $ "
0904:                        + "\n "
0905:                        + getGVDSVersion()
0906:                        + "\n pegasus-plan - The main class which is used to run  Pegasus. "
0907:                        + "\n Usage: pegasus-plan [-Dprop  [..]] --dax|--pdax <file> [--sites <execution sites>] "
0908:                        + "\n [--authenticate] [--basename prefix] [--cache f1[,f2[..]] [--cluster t1[,t2[..]] "
0909:                        + "\n [--dir <dir for o/p files>] [--force] [--group vogroup] [--megadag style] [--monitor] [--nocleanup] "
0910:                        + "\n [--output output site] [--randomdir=[dir name]] [--verbose] [--version][--help] "
0911:                        + "\n"
0912:                        + "\n Mandatory Options "
0913:                        + "\n -d |-P fn "
0914:                        + "\n --dax|--pdax       the path to the dax file containing the abstract workflow "
0915:                        + "\n                    or the path to the pdax file containing the partition graph "
0916:                        + "\n                    generated by the partitioner."
0917:                        + "\n Other Options  "
0918:                        + "\n -a |--authenticate turn on authentication against remote sites ."
0919:                        + "\n -b |--basename     the basename prefix while constructing the per workflow files like .dag etc."
0920:                        + "\n -c |--cache        comma separated list of replica cache files."
0921:                        + "\n -C |--cluster      comma separated list of clustering techniques to be applied to the workflow to "
0922:                        + "\n                    to cluster jobs in to larger jobs, to avoid scheduling overheads."
0923:                        + "\n -D |--dir          the directory where to generate the concrete workflow."
0924:                        + "\n --relative-dir     the relative directory to the base directory where to generate the concrete workflow."
0925:                        + "\n -f |--force        skip reduction of the workflow, resulting in build style dag."
0926:                        + "\n -g |--group        the VO Group to which the user belongs "
0927:                        + "\n -m |--megadag      type of style to use while generating the megadag in deferred planning."
0928:                        + "\n -o |--output       the output site where the data products during workflow execution are transferred to."
0929:                        + "\n -s |--sites        comma separated list of executions sites on which to map the workflow."
0930:                        + "\n -r |--randomdir    create random directories on remote execution sites in which jobs are executed"
0931:                        + "\n                    can optionally specify the basename of the remote directories"
0932:                        + "\n     --monitor      monitor the execution of the workflow, using workflow monitor daemon like tailstatd."
0933:                        + "\n -n |--nocleanup    generates only the separate cleanup workflow. Does not add cleanup nodes to the concrete workflow."
0934:                        + "\n -S |--submit       submit the executable workflow generated"
0935:                        + "\n -v |--verbose      increases the verbosity of messages about what is going on"
0936:                        + "\n -V |--version      displays the version of the Pegasus Workflow Management System"
0937:                        + "\n -h |--help         generates this help."
0938:                        + "\n The following exitcodes are produced"
0939:                        + "\n 0 concrete planner planner was able to generate a concretized workflow"
0940:                        + "\n 1 an error occured. In most cases, the error message logged should give a"
0941:                        + "\n   clear indication as to where  things went wrong."
0942:                        + "\n 2 an error occured while loading a specific module implementation at runtime"
0943:                        + "\n ";
0944:
0945:                System.out.println(text);
0946:                //mLogger.log(text,LogManager.INFO_MESSAGE_LEVEL);
0947:            }
0948:
0949:            /**
0950:             * This ends up invoking the deferred planning code, that generates
0951:             * the MegaDAG that is used to submit the partitioned daxes in layers.
0952:             */
0953:            private void doDeferredPlanning() {
0954:                String mode = mPOptions.getMegaDAGMode();
0955:                mode = (mode == null) ? DEFAULT_MEGADAG_MODE : mode;
0956:
0957:                String file = mPOptions.getPDAX();
0958:
0959:                //get the name of the directory from the file
0960:                String directory = new File(file).getParent();
0961:                //System.out.println("Directory in which partitioned daxes are " + directory);
0962:
0963:                int errorStatus = 1;
0964:                ADag megaDAG = null;
0965:                try {
0966:                    //load the correct callback handler
0967:                    org.griphyn.cPlanner.parser.pdax.Callback c = PDAXCallbackFactory
0968:                            .loadInstance(mProps, mPOptions, directory);
0969:                    errorStatus = 2;
0970:
0971:                    //this is a bug. Should not be called. To be corrected by Karan
0972:                    UserOptions y = UserOptions.getInstance(mPOptions);
0973:
0974:                    //start the parsing and let the fun begin
0975:                    PDAXParser p = new PDAXParser(file, mProps);
0976:                    p.setCallback(c);
0977:                    p.startParser(file);
0978:
0979:                    megaDAG = (ADag) c.getConstructedObject();
0980:                } catch (FactoryException fe) {
0981:                    //just rethrow for time being. we need error status as 2
0982:                    throw fe;
0983:                } catch (Exception e) {
0984:                    String message;
0985:                    switch (errorStatus) {
0986:                    case 1:
0987:                        message = "Unable to load the PDAX Callback ";
0988:                        break;
0989:
0990:                    case 2:
0991:                        message = "Unable to parse the PDAX file ";
0992:                        break;
0993:
0994:                    default:
0995:                        //unreachable code
0996:                        message = "Unknown Error ";
0997:                        break;
0998:                    }
0999:                    throw new RuntimeException(message, e);
1000:                }
1001:
1002:                this .logSuccessfulCompletion(this .logEntryInWorkCatalog(
1003:                        megaDAG, mPOptions.getBaseSubmitDirectory(), mPOptions
1004:                                .getRelativeSubmitDirectory()));
1005:
1006:            }
1007:
1008:            /**
1009:             * Creates the submit directory for the workflow. This is not thread safe.
1010:             *
1011:             * @param dag     the workflow being worked upon.
1012:             * @param dir     the base directory specified by the user.
1013:             * @param user    the username of the user.
1014:             * @param vogroup the vogroup to which the user belongs to.
1015:             * @param timestampBased boolean indicating whether to have a timestamp based dir or not
1016:             *
1017:             * @return  the directory name created relative to the base directory passed
1018:             *          as input.
1019:             *
1020:             * @throws IOException in case of unable to create submit directory.
1021:             */
1022:            protected String createSubmitDirectory(ADag dag, String dir,
1023:                    String user, String vogroup, boolean timestampBased)
1024:                    throws IOException {
1025:
1026:                return createSubmitDirectory(dag.getLabel(), dir, user,
1027:                        vogroup, timestampBased);
1028:            }
1029:
1030:            /**
1031:             * Creates the submit directory for the workflow. This is not thread safe.
1032:             *
1033:             * @param label   the label of the workflow
1034:             * @param dir     the base directory specified by the user.
1035:             * @param user    the username of the user.
1036:             * @param vogroup the vogroup to which the user belongs to.
1037:             * @param timestampBased boolean indicating whether to have a timestamp based dir or not
1038:             *
1039:             * @return  the directory name created relative to the base directory passed
1040:             *          as input.
1041:             *
1042:             * @throws IOException in case of unable to create submit directory.
1043:             */
1044:            protected String createSubmitDirectory(String label, String dir,
1045:                    String user, String vogroup, boolean timestampBased)
1046:                    throws IOException {
1047:                File base = new File(dir);
1048:                StringBuffer result = new StringBuffer();
1049:
1050:                //do a sanity check on the base
1051:                sanityCheck(base);
1052:
1053:                //add the user name if possible
1054:                base = new File(base, user);
1055:                result.append(user).append(File.separator);
1056:
1057:                //add the vogroup
1058:                base = new File(base, vogroup);
1059:                sanityCheck(base);
1060:                result.append(vogroup).append(File.separator);
1061:
1062:                //add the label of the DAX
1063:                base = new File(base, label);
1064:                sanityCheck(base);
1065:                result.append(label).append(File.separator);
1066:
1067:                //create the directory name
1068:                StringBuffer leaf = new StringBuffer();
1069:                if (timestampBased) {
1070:                    leaf.append(mPOptions.getDateTime(mProps
1071:                            .useExtendedTimeStamp()));
1072:                } else {
1073:                    //get all the files in this directory
1074:                    String[] files = base
1075:                            .list(new RunDirectoryFilenameFilter());
1076:                    //find the maximum run directory
1077:                    int num, max = 1;
1078:                    for (int i = 0; i < files.length; i++) {
1079:                        num = Integer.parseInt(files[i]
1080:                                .substring(SUBMIT_DIRECTORY_PREFIX.length()));
1081:                        if (num + 1 > max) {
1082:                            max = num + 1;
1083:                        }
1084:                    }
1085:
1086:                    //create the directory name
1087:                    leaf.append(SUBMIT_DIRECTORY_PREFIX).append(
1088:                            mNumFormatter.format(max));
1089:                }
1090:                result.append(leaf.toString());
1091:                base = new File(base, leaf.toString());
1092:                mLogger.log("Directory to be created is "
1093:                        + base.getAbsolutePath(),
1094:                        LogManager.DEBUG_MESSAGE_LEVEL);
1095:                sanityCheck(base);
1096:
1097:                return result.toString();
1098:            }
1099:
1100:            /**
1101:             * Checks the destination location for existence, if it can
1102:             * be created, if it is writable etc.
1103:             *
1104:             * @param dir is the new base directory to optionally create.
1105:             *
1106:             * @throws IOException in case of error while writing out files.
1107:             */
1108:            protected static void sanityCheck(File dir) throws IOException {
1109:                if (dir.exists()) {
1110:                    // location exists
1111:                    if (dir.isDirectory()) {
1112:                        // ok, isa directory
1113:                        if (dir.canWrite()) {
1114:                            // can write, all is well
1115:                            return;
1116:                        } else {
1117:                            // all is there, but I cannot write to dir
1118:                            throw new IOException(
1119:                                    "Cannot write to existing directory "
1120:                                            + dir.getPath());
1121:                        }
1122:                    } else {
1123:                        // exists but not a directory
1124:                        throw new IOException("Destination " + dir.getPath()
1125:                                + " already "
1126:                                + "exists, but is not a directory.");
1127:                    }
1128:                } else {
1129:                    // does not exist, try to make it
1130:                    if (!dir.mkdirs()) {
1131:                        throw new IOException("Unable to create  directory "
1132:                                + dir.getPath());
1133:                    }
1134:                }
1135:            }
1136:
1137:            /**
1138:             * Writes out the planner metrics to the global log.
1139:             *
1140:             * @param pm  the metrics to be written out.
1141:             *
1142:             * @return boolean
1143:             */
1144:            protected boolean writeOutMetrics(PlannerMetrics pm) {
1145:                boolean result = false;
1146:                if (mProps.writeOutMetrics()) {
1147:                    File log = new File(mProps.getMetricsLogFile());
1148:
1149:                    //do a sanity check on the directory
1150:                    try {
1151:                        sanityCheck(log.getParentFile());
1152:                        //open the log file in append mode
1153:                        FileOutputStream fos = new FileOutputStream(log, true);
1154:
1155:                        //get an exclusive lock
1156:                        FileLock fl = fos.getChannel().lock();
1157:                        try {
1158:                            mLogger.log("Logging Planner Metrics to " + log,
1159:                                    LogManager.DEBUG_MESSAGE_LEVEL);
1160:                            //write out to the planner metrics to fos
1161:                            fos.write(pm.toString().getBytes());
1162:                        } finally {
1163:                            fl.release();
1164:                            fos.close();
1165:                        }
1166:
1167:                    } catch (IOException ioe) {
1168:                        mLogger.log("Unable to write out planner metrics ",
1169:                                ioe, LogManager.DEBUG_MESSAGE_LEVEL);
1170:                        return false;
1171:                    }
1172:
1173:                    result = true;
1174:                }
1175:                return result;
1176:            }
1177:
1178:            /**
1179:             * Logs the successful completion message.
1180:             *
1181:             * @param nodatabase boolean indicating whether to add a nodatabase option or not.
1182:             */
1183:            private void logSuccessfulCompletion(boolean nodatabase) {
1184:                StringBuffer message = new StringBuffer();
1185:                message.append(this .SUCCESS_MESSAGE).append("").append(
1186:                        getPegasusRunInvocation(nodatabase)).append("\n\n");
1187:                mLogger.log(message.toString(), LogManager.INFO_MESSAGE_LEVEL);
1188:
1189:            }
1190:
1191:            /**
1192:             * Returns the pegasus-run invocation on the workflow planned.
1193:             *
1194:             * @param nodatabase boolean indicating whether to add a nodatabase option or not.
1195:             *
1196:             * @return  the pegasus-run invocation
1197:             */
1198:            private String getPegasusRunInvocation(boolean nodatabase) {
1199:                StringBuffer result = new StringBuffer();
1200:
1201:                result.append("pegasus-run ").append(
1202:                        "-Dpegasus.user.properties=").append(
1203:                        mProps.getPropertiesInSubmitDirectory()).append(
1204:                        nodatabase ? " --nodatabase" : "").append(" ").append(
1205:                        mPOptions.getSubmitDirectory());
1206:
1207:                return result.toString();
1208:
1209:            }
1210:
1211:        }
1212:
1213:        /**
1214:         * A filename filter for identifying the run directory
1215:         *
1216:         * @author Karan Vahi vahi@isi.edu
1217:         */
1218:        class RunDirectoryFilenameFilter implements  FilenameFilter {
1219:
1220:            /**
1221:             * Store the regular expressions necessary to parse kickstart output files
1222:             */
1223:            private static final String mRegexExpression = "("
1224:                    + CPlanner.SUBMIT_DIRECTORY_PREFIX
1225:                    + ")([0-9][0-9][0-9][0-9])";
1226:
1227:            /**
1228:             * Stores compiled patterns at first use, quasi-Singleton.
1229:             */
1230:            private static Pattern mPattern = null;
1231:
1232:            /***
1233:             * Tests if a specified file should be included in a file list.
1234:             *
1235:             * @param dir the directory in which the file was found.
1236:             * @param name - the name of the file.
1237:             *
1238:             * @return  true if and only if the name should be included in the file list
1239:             *          false otherwise.
1240:             *
1241:             *
1242:             */
1243:            public boolean accept(File dir, String name) {
1244:                //compile the pattern only once.
1245:                if (mPattern == null) {
1246:                    mPattern = Pattern.compile(mRegexExpression);
1247:                }
1248:                return mPattern.matcher(name).matches();
1249:            }
1250:
1251:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.