0001: /*
0002: * This file or a portion of this file is licensed under the terms of
0003: * the Globus Toolkit Public License, found in file GTPL, or at
0004: * http://www.globus.org/toolkit/download/license.html. This notice must
0005: * appear in redistributions of this file, with or without modification.
0006: *
0007: * Redistributions of this Software, with or without modification, must
0008: * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
0009: * some other similar material which is provided with the Software (if
0010: * any).
0011: *
0012: * Copyright 1999-2004 University of Chicago and The University of
0013: * Southern California. All rights reserved.
0014: */
0015:
0016: package org.griphyn.cPlanner.toolkit;
0017:
0018: import org.griphyn.cPlanner.code.CodeGenerator;
0019: import org.griphyn.cPlanner.code.generator.CodeGeneratorFactory;
0020:
0021: import org.griphyn.cPlanner.classes.ADag;
0022: import org.griphyn.cPlanner.classes.DagInfo;
0023: import org.griphyn.cPlanner.classes.NameValue;
0024: import org.griphyn.cPlanner.classes.PlannerMetrics;
0025: import org.griphyn.cPlanner.classes.PlannerOptions;
0026: import org.griphyn.cPlanner.classes.PegasusBag;
0027:
0028: import org.griphyn.cPlanner.common.PegasusProperties;
0029: import org.griphyn.cPlanner.common.UserOptions;
0030: import org.griphyn.cPlanner.common.LogManager;
0031: import org.griphyn.cPlanner.common.StreamGobbler;
0032: import org.griphyn.cPlanner.common.StreamGobblerCallback;
0033: import org.griphyn.cPlanner.common.DefaultStreamGobblerCallback;
0034:
0035: import org.griphyn.cPlanner.engine.MainEngine;
0036:
0037: import org.griphyn.cPlanner.poolinfo.PoolMode;
0038:
0039: import org.griphyn.cPlanner.parser.dax.Callback;
0040: import org.griphyn.cPlanner.parser.dax.DAXCallbackFactory;
0041:
0042: import org.griphyn.cPlanner.parser.pdax.PDAXCallbackFactory;
0043:
0044: import org.griphyn.cPlanner.parser.DaxParser;
0045: import org.griphyn.cPlanner.parser.PDAXParser;
0046:
0047: import org.griphyn.common.catalog.work.WorkFactory;
0048: import org.griphyn.common.catalog.WorkCatalog;
0049:
0050: import org.griphyn.common.util.Version;
0051: import org.griphyn.common.util.Currently;
0052: import org.griphyn.common.util.FactoryException;
0053:
0054: import gnu.getopt.Getopt;
0055: import gnu.getopt.LongOpt;
0056:
0057: import java.io.File;
0058: import java.io.IOException;
0059: import java.io.FilenameFilter;
0060: import java.io.FileOutputStream;
0061: import java.io.FileWriter;
0062:
0063: import java.nio.channels.FileChannel;
0064: import java.nio.channels.FileLock;
0065:
0066: import java.util.Collection;
0067: import java.util.List;
0068: import java.util.Date;
0069: import java.util.Set;
0070: import java.util.Map;
0071: import java.util.HashSet;
0072: import java.util.Iterator;
0073:
0074: import java.util.regex.Pattern;
0075:
0076: import java.text.NumberFormat;
0077: import java.text.DecimalFormat;
0078: import org.griphyn.vdl.euryale.VTorInUseException;
0079:
0080: /**
* This is the main program for Pegasus. It parses the options specified
* by the user and calls out to the appropriate components to parse the abstract
* plan, concretize it, and write out the submit files.
0084: *
0085: * @author Gaurang Mehta
0086: * @author Karan Vahi
0087: * @version $Revision: 464 $
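*
* <p>A minimal usage sketch; the DAX file and site names below are hypothetical,
* and the calls are the programmatic equivalent of running
* <code>pegasus-plan --dax diamond.dax --sites isi_viz --output local --dir dags</code>:
* <pre>
*   CPlanner planner = new CPlanner();
*   PlannerOptions options = planner.parseCommandLineArguments(
*       new String[]{ "--dax", "diamond.dax", "--sites", "isi_viz",
*                     "--output", "local", "--dir", "dags" } );
*   planner.executeCommand( options );
* </pre>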
0088: */
0089:
0090: public class CPlanner extends Executable {
0091:
0092: /**
0093: * The default megadag mode that is used for generation of megadags in
0094: * deferred planning.
0095: */
0096: public static final String DEFAULT_MEGADAG_MODE = "dag";
0097:
0098: /**
0099: * The prefix for the submit directory.
0100: */
0101: public static final String SUBMIT_DIRECTORY_PREFIX = "run";
0102:
0103: /**
0104: * The basename of the directory that contains the submit files for the
* cleanup DAG corresponding to the concrete DAG generated for the workflow.
0106: */
0107: public static final String CLEANUP_DIR = "cleanup";
0108:
0109: /**
* The message to be logged in case of an empty executable workflow.
0111: */
0112: private static final String EMPTY_FINAL_WORKFLOW_MESSAGE = "\n\n\n"
0113: + "The executable workflow generated contains no nodes.\n"
0114: + "It seems that the output files are already at the output site. \n"
0115: + "To regenerate the output data from scratch specify --force option."
0116: + "\n\n\n";
0117:
0118: /**
* The final success message that is to be logged.
0120: */
0121: private static final String SUCCESS_MESSAGE = "\n\n\n"
0122: + "I have concretized your abstract workflow. The workflow has been entered \n"
0123: + "into the workflow database with a state of \"planned\". The next step is \n"
0124: + "to start or execute your workflow. The invocation required is"
0125: + "\n\n\n";
0126:
0127: /**
0128: * The object containing all the options passed to the Concrete Planner.
0129: */
0130: private PlannerOptions mPOptions;
0131:
0132: /**
0133: * The PlannerMetrics object storing the metrics about this planning instance.
0134: */
0135: private PlannerMetrics mPMetrics;
0136:
0137: /**
0138: * The number formatter to format the run submit dir entries.
0139: */
0140: private NumberFormat mNumFormatter;
0141:
0142: /**
0143: * The user name of the user running Pegasus.
0144: */
0145: private String mUser;
0146:
0147: /**
0148: * Default constructor.
0149: */
0150: public CPlanner() {
0151: super ();
0152: mLogMsg = new String();
0153: mVersion = Version.instance().toString();
0154: mNumFormatter = new DecimalFormat("0000");
0155:
0156: this .mPOptions = new PlannerOptions();
0157: mPOptions.setSubmitDirectory(".", null);
0158: mPOptions.setExecutionSites(new java.util.HashSet());
0159: mPOptions.setOutputSite("");
0160:
0161: mUser = mProps.getProperty("user.name");
0162: if (mUser == null) {
0163: mUser = "user";
0164: }
0165:
0166: mPMetrics = new PlannerMetrics();
0167: mPMetrics.setUser(mUser);
0168: }
0169:
0170: /**
0171: * The main program for the CPlanner.
0172: *
0173: *
0174: * @param args the main arguments passed to the planner.
0175: */
0176: public static void main(String[] args) {
0177:
0178: CPlanner cPlanner = new CPlanner();
0179: int result = 0;
0180: double starttime = new Date().getTime();
0181: double execTime = -1;
0182:
0183: try {
0184: cPlanner.executeCommand(args);
0185: } catch (FactoryException fe) {
0186: cPlanner.log(fe.convertException(),
0187: LogManager.FATAL_MESSAGE_LEVEL);
0188: result = 2;
0189: } catch (RuntimeException rte) {
0190: //catch all runtime exceptions including our own that
0191: //are thrown that may have chained causes
0192: cPlanner.log(convertException(rte),
0193: LogManager.FATAL_MESSAGE_LEVEL);
0194: result = 1;
0195: } catch (Exception e) {
0196: //unaccounted for exceptions
0197: cPlanner
0198: .log(e.getMessage(), LogManager.FATAL_MESSAGE_LEVEL);
0199: result = 3;
0200: } finally {
0201: double endtime = new Date().getTime();
0202: execTime = (endtime - starttime) / 1000;
0203: }
0204:
0205: // warn about non zero exit code
0206: if (result != 0) {
0207: cPlanner.log("Non-zero exit-code " + result,
0208: LogManager.WARNING_MESSAGE_LEVEL);
0209: } else {
0210: //log the time taken to execute
0211: cPlanner.log("Time taken to execute is " + execTime
0212: + " seconds", LogManager.INFO_MESSAGE_LEVEL);
0213: }
0214:
0215: System.exit(result);
0216: }
0217:
0218: /**
0219: * Loads all the properties that are needed by this class.
0220: */
0221: public void loadProperties() {
0222:
0223: }
0224:
0225: /**
0226: * Executes the command on the basis of the options specified.
0227: *
0228: * @param args the command line options.
0229: */
0230: public void executeCommand(String[] args) {
0231: executeCommand(parseCommandLineArguments(args));
0232: }
0233:
0234: /**
0235: * Executes the command on the basis of the options specified.
0236: *
0237: * @param options the command line options.
0238: */
0239: public void executeCommand(PlannerOptions options) {
0240: String message = new String();
0241: mPOptions = options;
0242:
0243: //print help if asked for
0244: if (mPOptions.getHelp()) {
0245: printLongVersion();
0246: return;
0247: }
0248:
0249: //set the logging level only if -v was specified
//else bank upon the default logging level
0251: if (mPOptions.getLoggingLevel() > 0) {
0252: mLogger.setLevel(mPOptions.getLoggingLevel());
0253: }
0254:
0255: //do sanity check on dax file
0256: String dax = mPOptions.getDAX();
0257: String pdax = mPOptions.getPDAX();
0258: String baseDir = mPOptions.getBaseSubmitDirectory();
0259:
0260: if (dax == null && pdax == null) {
0261: mLogger
0262: .log(
0263: "\nNeed to specify either a dax file ( using --dax ) or a pdax file (using --pdax) to plan",
0264: LogManager.INFO_MESSAGE_LEVEL);
0265: this .printShortVersion();
0266: return;
0267: }
0268:
0269: if (mPOptions.getPartitioningType() != null) {
0270: // partition and plan the workflow
0271: doPartitionAndPlan(mProps, options);
0272: return;
0273: }
0274:
0275: //populate planner metrics
0276: mPMetrics.setStartTime(new Date());
0277: mPMetrics.setVOGroup(mPOptions.getVOGroup());
0278: mPMetrics
0279: .setBaseSubmitDirectory(mPOptions.getSubmitDirectory());
0280: mPMetrics.setDAX(mPOptions.getDAX());
0281:
0282: UserOptions opts = UserOptions.getInstance(mPOptions);
0283:
0284: //try to get hold of the vds properties
//set in the jvm that the user specified at command line
0286: mPOptions.setVDSProperties(mProps.getMatchingProperties(
0287: "pegasus.", false));
0288:
0289: List allVDSProps = mProps.getMatchingProperties("pegasus.",
0290: false);
0291: mLogger.log("Pegasus Properties set by the user",
0292: LogManager.CONFIG_MESSAGE_LEVEL);
0293: for (java.util.Iterator it = allVDSProps.iterator(); it
0294: .hasNext();) {
0295: NameValue nv = (NameValue) it.next();
0296: mLogger.log(nv.toString(), LogManager.CONFIG_MESSAGE_LEVEL);
0297:
0298: }
0299:
0300: //check if sites set by user. If user has not specified any sites then
0301: //load all sites from site catalog.
0302: Collection eSites = mPOptions.getExecutionSites();
0303: if (eSites.isEmpty()) {
0304: mLogger
0305: .log(
0306: "No sites given by user. Will use sites from the site catalog",
0307: LogManager.DEBUG_MESSAGE_LEVEL);
0308: List sitelist = (PoolMode.loadPoolInstance(PoolMode
0309: .getImplementingClass(mProps.getPoolMode()), mProps
0310: .getPoolFile(), PoolMode.SINGLETON_LOAD))
0311: .getPools();
0312:
0313: if (sitelist != null) {
0314: if (sitelist.contains("local")) {
0315: sitelist.remove("local");
0316: }
0317: if (sitelist.size() >= 1) {
0318: Set siteset = new HashSet(sitelist);
0319: mPOptions.setExecutionSites(siteset);
0320: eSites = mPOptions.getExecutionSites();
0321: } else {
0322: throw new RuntimeException(
0323: "Only local site is available. "
0324: + " Make sure your site catalog contains more sites");
0325: }
0326: } else {
0327: throw new RuntimeException(
0328: "No sites present in the site catalog. "
0329: + "Please make sure your site catalog is correctly populated");
0330: }
0331: }
0332:
0333: if (dax == null && pdax != null && !eSites.isEmpty()) {
//do the deferred planning by parsing
0335: //the partition graph in the pdax file.
0336: doDeferredPlanning();
0337: } else if (pdax == null && dax != null && !eSites.isEmpty()) {
0338:
0339: Callback cb = DAXCallbackFactory.loadInstance(mProps, dax,
0340: "DAX2CDAG");
0341:
0342: DaxParser daxParser = new DaxParser(dax, mProps, cb);
0343:
0344: ADag orgDag = (ADag) cb.getConstructedObject();
0345:
0346: //generate the flow ids for the classads information
0347: orgDag.dagInfo.generateFlowName();
0348: orgDag.dagInfo.setFlowTimestamp(mPOptions
0349: .getDateTime(mProps.useExtendedTimeStamp()));
0350: orgDag.dagInfo.setDAXMTime(dax);
0351: orgDag.dagInfo.generateFlowID();
0352: orgDag.dagInfo.setReleaseVersion();
0353:
//write out the relevant properties to the submit directory
0355: int state = 0;
0356: String relativeDir; //the submit directory relative to the base specified
0357: try {
0358: //create the base directory if required
0359: relativeDir = (mPOptions.partOfDeferredRun()) ? null
0360: : (mPOptions.getRelativeSubmitDirectory() == null) ?
0361: //create our own relative dir
0362: createSubmitDirectory(
0363: orgDag,
0364: baseDir,
0365: mUser,
0366: mPOptions.getVOGroup(),
0367: mProps
0368: .useTimestampForDirectoryStructure())
0369: : mPOptions
0370: .getRelativeSubmitDirectory();
0371: mPOptions.setSubmitDirectory(baseDir, relativeDir);
0372: state++;
0373: mProps.writeOutProperties(mPOptions
0374: .getSubmitDirectory());
0375:
0376: mPMetrics.setRelativeSubmitDirectory(mPOptions
0377: .getRelativeSubmitDirectory());
0378:
0379: //also log in the planner metrics where the properties are
0380: mPMetrics.setProperties(mProps
0381: .getPropertiesInSubmitDirectory());
0382: } catch (IOException ioe) {
String error = (state == 0) ? "Unable to write to directory "
: "Unable to write out properties to directory ";
0385: throw new RuntimeException(error
0386: + mPOptions.getSubmitDirectory(), ioe);
0387:
0388: }
0389:
0390: //check if a random directory is specified by the user
0391: if (mPOptions.generateRandomDirectory()
0392: && mPOptions.getRandomDir() == null) {
//user has turned on the random dir option but has not specified
//a name, so use the default name, which is the flow id for the
//workflow unless a basename prefix is specified.
0396: mPOptions.setRandomDir(getRandomDirectory(orgDag));
0397: } else if (mPOptions.getRandomDir() != null) {
0398: //keep the name that the user passed
0399: } else if (relativeDir != null) {
0400: //the relative directory constructed on the submit host
0401: //is the one required for remote sites
0402: mPOptions.setRandomDir(relativeDir);
0403: }
0404:
0405: //populate the singleton instance for user options
0406: //UserOptions opts = UserOptions.getInstance(mPOptions);
0407: MainEngine cwmain = new MainEngine(orgDag, mProps,
0408: mPOptions);
0409:
0410: ADag finalDag = cwmain.runPlanner();
0411: DagInfo ndi = finalDag.dagInfo;
0412:
0413: //store the workflow metrics from the final dag into
0414: //the planner metrics
0415: mPMetrics.setWorkflowMetrics(finalDag.getWorkflowMetrics());
0416:
0417: //we only need the script writer for daglite megadag generator mode
0418: CodeGenerator codeGenerator = null;
0419: codeGenerator = CodeGeneratorFactory.loadInstance(cwmain
0420: .getPegasusBag());
0421:
//before generating the code for the workflow check
//for empty workflows
0424: if (finalDag.isEmpty()) {
0425:
0426: mLogger.log(EMPTY_FINAL_WORKFLOW_MESSAGE,
0427: LogManager.INFO_MESSAGE_LEVEL);
0428: return;
0429: }
0430:
0431: message = "Generating codes for the concrete workflow";
0432: log(message, LogManager.INFO_MESSAGE_LEVEL);
0433: try {
0434: codeGenerator.generateCode(finalDag);
0435:
0436: //generate only the braindump file that is required.
0437: //no spawning off the tailstatd for time being
0438: codeGenerator.startMonitoring();
0439:
0440: /*
0441: if (mPOptions.monitorWorkflow()) {
0442: //submit files successfully generated.
0443: //spawn off the monitoring daemon
0444: codeGenerator.startMonitoring();
0445: }
0446: */
0447: } catch (Exception e) {
0448: throw new RuntimeException("Unable to generate code", e);
0449: }
0450: mLogger.logCompletion(message,
0451: LogManager.INFO_MESSAGE_LEVEL);
0452:
0453: //create the submit files for cleanup dag if
0454: //random dir option specified
0455: if (mPOptions.generateRandomDirectory()) {
0456: ADag cleanupDAG = cwmain.getCleanupDAG();
0457: PlannerOptions cleanupOptions = (PlannerOptions) mPOptions
0458: .clone();
0459:
0460: //submit files are generated in a subdirectory
0461: //of the submit directory
0462: message = "Generating code for the cleanup workflow";
0463: mLogger.log(message, LogManager.INFO_MESSAGE_LEVEL);
0464: //set the submit directory in the planner options for cleanup wf
0465: cleanupOptions.setSubmitDirectory(cleanupOptions
0466: .getSubmitDirectory(), this .CLEANUP_DIR);
0467: PegasusBag bag = cwmain.getPegasusBag();
0468: bag.add(PegasusBag.PLANNER_OPTIONS, cleanupOptions);
0469: codeGenerator = CodeGeneratorFactory
0470: .loadInstance(cwmain.getPegasusBag());
0471:
0472: try {
0473: codeGenerator.generateCode(cleanupDAG);
0474: } catch (Exception e) {
0475: throw new RuntimeException(
0476: "Unable to generate code", e);
0477: }
0478:
0479: mLogger.logCompletion(message,
0480: LogManager.INFO_MESSAGE_LEVEL);
0481: }
0482:
0483: //log entry in to the work catalog
0484: boolean nodatabase = logEntryInWorkCatalog(finalDag,
0485: baseDir, relativeDir);
0486:
0487: //write out the planner metrics to global log
0488: mPMetrics.setEndTime(new Date());
0489: writeOutMetrics(mPMetrics);
0490:
0491: if (mPOptions.submitToScheduler()) {//submit the jobs
0492: StringBuffer invocation = new StringBuffer();
0493: //construct the path to the bin directory
0494: invocation.append(mProps.getPegasusHome()).append(
0495: File.separator).append("bin").append(
0496: File.separator).append(
0497: getPegasusRunInvocation(nodatabase));
0498:
0499: boolean submit = submitWorkflow(invocation.toString());
0500: if (!submit) {
0501: throw new RuntimeException(
0502: "Unable to submit the workflow using pegasus-run");
0503: }
0504: } else {
0505: //log the success message
0506: this .logSuccessfulCompletion(nodatabase);
0507: }
0508: } else {
0509: printShortVersion();
0510: throw new RuntimeException(
0511: "Invalid combination of arguments passed");
0512: }
0513: }
0514:
0515: /**
* Parses the command line arguments using GetOpt and returns a
* <code>PlannerOptions</code> object that contains all the options passed by
* the user at the command line.
0519: *
0520: * @param args the arguments passed by the user at command line.
0521: *
0522: * @return the options.
0523: */
0524: public PlannerOptions parseCommandLineArguments(String[] args) {
0525: LongOpt[] longOptions = generateValidOptions();
0526:
0527: Getopt g = new Getopt("pegasus-plan", args,
0528: "vhfSnzpVr::aD:d:s:o:P:c:C:b:g:2:", longOptions, false);
0529: g.setOpterr(false);
0530:
0531: int option = 0;
0532: PlannerOptions options = new PlannerOptions();
0533:
0534: while ((option = g.getopt()) != -1) {
0535: //System.out.println("Option tag " + (char)option);
0536: switch (option) {
0537:
0538: case 1://monitor
0539: options.setMonitoring(true);
0540: break;
0541:
0542: case 'z'://deferred
0543: options.setPartOfDeferredRun(true);
0544: break;
0545:
0546: case 'a'://authenticate
0547: options.setAuthentication(true);
0548: break;
0549:
0550: case 'b'://optional basename
0551: options.setBasenamePrefix(g.getOptarg());
0552: break;
0553:
0554: case 'c'://cache
0555: options.setCacheFiles(g.getOptarg());
0556: break;
0557:
0558: case 'C'://cluster
0559: options.setClusteringTechnique(g.getOptarg());
0560: break;
0561:
0562: case 'd'://dax
0563: options.setDAX(g.getOptarg());
0564: break;
0565:
0566: case 'D': //dir
0567: options.setSubmitDirectory(g.getOptarg(), null);
0568: break;
0569:
0570: case '2'://relative-dir
0571: options.setRelativeSubmitDirectory(g.getOptarg());
0572: break;
0573:
0574: case 'f'://force
0575: options.setForce(true);
0576: break;
0577:
0578: case 'g': //group
0579: options.setVOGroup(g.getOptarg());
0580: break;
0581:
0582: case 'h'://help
0583: options.setHelp(true);
0584: break;
0585:
0586: case 'm'://megadag option
0587: options.setMegaDAGMode(g.getOptarg());
0588: break;
0589:
0590: case 'n'://nocleanup option
0591: options.setCleanup(false);
0592: break;
0593:
0594: case 'o'://output
0595: options.setOutputSite(g.getOptarg());
0596: break;
0597:
0598: case 'p'://partition and plan
0599: options.setPartitioningType("Whole");
0600: break;
0601:
0602: case 'P'://pdax file
0603: options.setPDAX(g.getOptarg());
0604: break;
0605:
0606: case 'r'://randomdir
0607: options.setRandomDir(g.getOptarg());
0608: break;
0609:
0610: case 'S'://submit option
0611: options.setSubmitToScheduler(true);
0612: break;
0613:
0614: case 's'://sites
0615: options.setExecutionSites(g.getOptarg());
0616: break;
0617:
0618: case 'v'://verbose
0619: options.incrementLogging();
0620: break;
0621:
0622: case 'V'://version
0623: mLogger.log(getGVDSVersion(),
0624: LogManager.INFO_MESSAGE_LEVEL);
0625: System.exit(0);
0626:
0627: default: //same as help
0628: printShortVersion();
0629: throw new RuntimeException(
0630: "Incorrect option or option usage " + option);
0631:
0632: }
0633: }
0634: return options;
0635:
0636: }
0637:
0638: /**
0639: * Logs an entry in the work catalog about the workflow being planned.
0640: *
* @param dag the concrete workflow that was planned.
* @param baseDir the base submit directory.
* @param relativeDir the submit directory relative to the base directory.
*
* @return true if the entry could not be logged in the work catalog (no database), false otherwise.
0646: */
0647: protected boolean logEntryInWorkCatalog(ADag dag, String baseDir,
0648: String relativeDir) {
0649: //connect to the work catalog and populate
0650: //an entry in it for the current workflow
0651: WorkCatalog wc = null;
0652: boolean nodatabase = false;
0653: try {
0654: wc = WorkFactory.loadInstance(mProps);
0655: } catch (Exception e) {
0656: //just log and proceed
0657: mLogger.log("Ignoring: " + convertException(e),
0658: LogManager.DEBUG_MESSAGE_LEVEL);
0659: nodatabase = true;
0660: }
0661: if (wc != null) {
0662: wc.insert(baseDir, mPOptions.getVOGroup(), dag.getLabel(),
0663: new File(relativeDir == null ? "." : relativeDir)
0664: .getName(), mUser, Currently.parse(dag
0665: .getMTime()), Currently.parse(dag.dagInfo
0666: .getFlowTimestamp()), -2);
0667: try {
0668: wc.close();
0669: } catch (Exception e) {
0670: /*ignore */
0671: nodatabase = true;
0672: }
0673: }
0674: return nodatabase;
0675: }
0676:
0677: /**
0678: * Submits the workflow for execution using pegasus-run, a wrapper around
0679: * pegasus-submit-dag.
0680: *
0681: * @param invocation the pegasus run invocation
0682: *
* @return boolean indicating whether the workflow could be submitted successfully or not.
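*
* <p>A usage sketch from within this class, assuming a hypothetical Pegasus
* installation under <code>/opt/pegasus</code> (in <code>executeCommand</code> the
* prefix is actually built from <code>mProps.getPegasusHome()</code>):
* <pre>
*   String invocation = "/opt/pegasus/bin/" + getPegasusRunInvocation( false );
*   boolean submitted = submitWorkflow( invocation );
* </pre>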
0684: */
0685: public boolean submitWorkflow(String invocation) {
0686: boolean result = false;
0687: try {
0688: //set the callback and run the pegasus-run command
0689: Runtime r = Runtime.getRuntime();
0690:
0691: mLogger.log("Executing " + invocation,
0692: LogManager.DEBUG_MESSAGE_LEVEL);
0693: Process p = r.exec(invocation);
0694:
0695: //spawn off the gobblers with the already initialized default callback
0696: StreamGobbler ips = new StreamGobbler(p.getInputStream(),
0697: new DefaultStreamGobblerCallback(
0698: LogManager.INFO_MESSAGE_LEVEL));
0699: StreamGobbler eps = new StreamGobbler(p.getErrorStream(),
0700: new DefaultStreamGobblerCallback(
0701: LogManager.ERROR_MESSAGE_LEVEL));
0702:
0703: ips.start();
0704: eps.start();
0705:
0706: //wait for the threads to finish off
0707: ips.join();
0708: eps.join();
0709:
0710: //get the status
0711: int status = p.waitFor();
0712:
0713: mLogger.log("Submission of workflow exited with status "
0714: + status, LogManager.DEBUG_MESSAGE_LEVEL);
0715:
result = (status == 0);
0717: } catch (IOException ioe) {
0718: mLogger.log("IOException while running tailstatd ", ioe,
0719: LogManager.ERROR_MESSAGE_LEVEL);
0720: } catch (InterruptedException ie) {
0721: //ignore
0722: }
0723: return result;
0724:
0725: }
0726:
0727: /**
0728: * Partitions and plans the workflow. First step of merging DAGMan and
0729: * Condor
0730: *
0731: * @param properties the properties passed to the planner.
0732: * @param options the options passed to the planner.
0733: */
0734: protected void doPartitionAndPlan(PegasusProperties properties,
0735: PlannerOptions options) {
0736: //we first need to get the label of DAX
0737: Callback cb = DAXCallbackFactory.loadInstance(properties,
0738: options.getDAX(), "DAX2Metadata");
0739: try {
0740: DaxParser daxParser = new DaxParser(options.getDAX(),
0741: properties, cb);
0742: } catch (Exception e) {
0743: //ignore
0744: }
0745: Map metadata = (Map) cb.getConstructedObject();
0746: String label = (String) metadata.get("name");
0747:
0748: String baseDir = options.getBaseSubmitDirectory();
0749: String relativeDir = null;
0750: //construct the submit directory structure
0751: try {
0752: relativeDir = (options.getRelativeSubmitDirectory() == null) ?
0753: //create our own relative dir
0754: createSubmitDirectory(label, baseDir, mUser, options
0755: .getVOGroup(), properties
0756: .useTimestampForDirectoryStructure())
0757: : options.getRelativeSubmitDirectory();
0758: } catch (IOException ioe) {
0759: String error = "Unable to write to directory";
0760: throw new RuntimeException(error
0761: + options.getSubmitDirectory(), ioe);
0762:
0763: }
0764:
0765: options.setSubmitDirectory(baseDir, relativeDir);
0766: mLogger.log("Submit Directory for workflow is "
0767: + options.getSubmitDirectory(),
0768: LogManager.DEBUG_MESSAGE_LEVEL);
0769:
0770: //now let us run partitiondax
0771: mLogger.log("Partitioning Workflow",
0772: LogManager.INFO_MESSAGE_LEVEL);
0773: PartitionDAX partitionDAX = new PartitionDAX();
0774: File dir = new File(options.getSubmitDirectory(), "dax");
0775: String pdax = partitionDAX.partitionDAX(properties, options
0776: .getDAX(), dir.getAbsolutePath(), options
0777: .getPartitioningType());
0778:
0779: mLogger.log("PDAX file generated is " + pdax,
0780: LogManager.DEBUG_MESSAGE_LEVEL);
0781: mLogger.logCompletion("Partitioning Workflow",
0782: LogManager.INFO_MESSAGE_LEVEL);
0783:
0784: //now run pegasus-plan with pdax option
0785: CPlanner pegasusPlan = new CPlanner();
0786: options.setDAX(null);
0787: options.setPDAX(pdax);
0788: options.setPartitioningType(null);
0789:
0790: pegasusPlan.executeCommand(options);
0791:
0792: }
0793:
0794: /**
* Constructs the basename of the random directory that is created on the remote
* sites per workflow. The name is generated by default from the flow ID,
* unless a basename prefix is specified at runtime in the planner options.
0798: *
0799: * @param dag the DAG containing the abstract workflow.
0800: *
0801: * @return the basename of the random directory.
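*
* <p>For illustration: with a hypothetical basename prefix <code>blackdiamond</code>
* the returned basename is <code>blackdiamond-&lt;flow timestamp&gt;</code>; without a
* prefix the flow ID of the workflow is returned as-is.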
0802: */
0803: protected String getRandomDirectory(ADag dag) {
0804:
//construct the basename of the random directory
0806: StringBuffer sb = new StringBuffer();
0807: String bprefix = mPOptions.getBasenamePrefix();
0808: if (bprefix != null) {
//the prefix is not null, so use it
0810: sb.append(bprefix);
0811: sb.append("-");
0812: //append timestamp to generate some uniqueness
0813: sb.append(dag.dagInfo.getFlowTimestamp());
0814: } else {
0815: //use the flow ID that contains the timestamp and the name both.
0816: sb.append(dag.dagInfo.flowID);
0817: }
0818: return sb.toString();
0819: }
0820:
0821: /**
* Generates the <code>LongOpt</code> objects that contain the valid options that
* the command will accept.
0824: *
0825: * @return array of <code>LongOpt</code> objects , corresponding to the valid
0826: * options
0827: */
0828: public LongOpt[] generateValidOptions() {
0829: LongOpt[] longopts = new LongOpt[22];
0830:
0831: longopts[0] = new LongOpt("dir", LongOpt.REQUIRED_ARGUMENT,
0832: null, 'D');
0833: longopts[1] = new LongOpt("dax", LongOpt.REQUIRED_ARGUMENT,
0834: null, 'd');
0835: longopts[2] = new LongOpt("sites", LongOpt.REQUIRED_ARGUMENT,
0836: null, 's');
0837: longopts[3] = new LongOpt("output", LongOpt.REQUIRED_ARGUMENT,
0838: null, 'o');
0839: longopts[4] = new LongOpt("verbose", LongOpt.NO_ARGUMENT, null,
0840: 'v');
0841: longopts[5] = new LongOpt("help", LongOpt.NO_ARGUMENT, null,
0842: 'h');
0843: longopts[6] = new LongOpt("force", LongOpt.NO_ARGUMENT, null,
0844: 'f');
0845: longopts[7] = new LongOpt("submit", LongOpt.NO_ARGUMENT, null,
0846: 'S');
0847: longopts[8] = new LongOpt("version", LongOpt.NO_ARGUMENT, null,
0848: 'V');
0849: longopts[9] = new LongOpt("randomdir",
0850: LongOpt.OPTIONAL_ARGUMENT, null, 'r');
0851: longopts[10] = new LongOpt("authenticate", LongOpt.NO_ARGUMENT,
0852: null, 'a');
0853: //deferred planning options
0854: longopts[11] = new LongOpt("pdax", LongOpt.REQUIRED_ARGUMENT,
0855: null, 'P');
0856: longopts[12] = new LongOpt("cache", LongOpt.REQUIRED_ARGUMENT,
0857: null, 'c');
0858: longopts[13] = new LongOpt("megadag",
0859: LongOpt.REQUIRED_ARGUMENT, null, 'm');
0860: //collapsing for mpi
0861: longopts[14] = new LongOpt("cluster",
0862: LongOpt.REQUIRED_ARGUMENT, null, 'C');
0863: //more deferred planning stuff
0864: longopts[15] = new LongOpt("basename",
0865: LongOpt.REQUIRED_ARGUMENT, null, 'b');
0866: longopts[16] = new LongOpt("monitor", LongOpt.NO_ARGUMENT,
0867: null, 1);
0868: longopts[17] = new LongOpt("nocleanup", LongOpt.NO_ARGUMENT,
0869: null, 'n');
0870: longopts[18] = new LongOpt("group", LongOpt.REQUIRED_ARGUMENT,
0871: null, 'g');
0872: longopts[19] = new LongOpt("deferred", LongOpt.NO_ARGUMENT,
0873: null, 'z');
0874: longopts[20] = new LongOpt("relative-dir",
0875: LongOpt.REQUIRED_ARGUMENT, null, '2');
0876: longopts[21] = new LongOpt("pap", LongOpt.NO_ARGUMENT, null,
0877: 'p');
0878: return longopts;
0879: }
0880:
0881: /**
0882: * Prints out a short description of what the command does.
0883: */
0884: public void printShortVersion() {
0885: String text = "\n $Id: CPlanner.java 464 2008-02-07 23:30:44Z vahi $ "
0886: + "\n "
0887: + getGVDSVersion()
0888: + "\n Usage : pegasus-plan [-Dprop [..]] -d|-P <dax file|pdax file> "
0889: + " [-s site[,site[..]]] [-b prefix] [-c f1[,f2[..]]] [-f] [-m style] " /*<dag|noop|daglite>]*/
0890: + "\n [-a] [-b basename] [-C t1[,t2[..]] [-D <base dir for o/p files>] "
0891: + " [ --relative-dir <relative directory to base directory> ][-g <vogroup>] [-o <output site>] "
0892: + "\n [-r[dir name]] [--monitor] [-S] [-n] [-v] [-V] [-h]";
0893:
0894: System.out.println(text);
0895: }
0896:
0897: /**
0898: * Prints the long description, displaying in detail what the various options
0899: * to the command stand for.
0900: */
0901: public void printLongVersion() {
0902:
0903: String text = "\n $Id: CPlanner.java 464 2008-02-07 23:30:44Z vahi $ "
0904: + "\n "
0905: + getGVDSVersion()
0906: + "\n pegasus-plan - The main class which is used to run Pegasus. "
0907: + "\n Usage: pegasus-plan [-Dprop [..]] --dax|--pdax <file> [--sites <execution sites>] "
0908: + "\n [--authenticate] [--basename prefix] [--cache f1[,f2[..]] [--cluster t1[,t2[..]] "
0909: + "\n [--dir <dir for o/p files>] [--force] [--group vogroup] [--megadag style] [--monitor] [--nocleanup] "
0910: + "\n [--output output site] [--randomdir=[dir name]] [--verbose] [--version][--help] "
0911: + "\n"
0912: + "\n Mandatory Options "
0913: + "\n -d |-P fn "
0914: + "\n --dax|--pdax the path to the dax file containing the abstract workflow "
0915: + "\n or the path to the pdax file containing the partition graph "
0916: + "\n generated by the partitioner."
0917: + "\n Other Options "
0918: + "\n -a |--authenticate turn on authentication against remote sites ."
0919: + "\n -b |--basename the basename prefix while constructing the per workflow files like .dag etc."
0920: + "\n -c |--cache comma separated list of replica cache files."
0921: + "\n -C |--cluster comma separated list of clustering techniques to be applied to the workflow to "
0922: + "\n to cluster jobs in to larger jobs, to avoid scheduling overheads."
0923: + "\n -D |--dir the directory where to generate the concrete workflow."
0924: + "\n --relative-dir the relative directory to the base directory where to generate the concrete workflow."
0925: + "\n -f |--force skip reduction of the workflow, resulting in build style dag."
0926: + "\n -g |--group the VO Group to which the user belongs "
0927: + "\n -m |--megadag type of style to use while generating the megadag in deferred planning."
0928: + "\n -o |--output the output site where the data products during workflow execution are transferred to."
0929: + "\n -s |--sites comma separated list of executions sites on which to map the workflow."
0930: + "\n -r |--randomdir create random directories on remote execution sites in which jobs are executed"
0931: + "\n can optionally specify the basename of the remote directories"
0932: + "\n --monitor monitor the execution of the workflow, using workflow monitor daemon like tailstatd."
0933: + "\n -n |--nocleanup generates only the separate cleanup workflow. Does not add cleanup nodes to the concrete workflow."
0934: + "\n -S |--submit submit the executable workflow generated"
0935: + "\n -v |--verbose increases the verbosity of messages about what is going on"
0936: + "\n -V |--version displays the version of the Pegasus Workflow Management System"
0937: + "\n -h |--help generates this help."
0938: + "\n The following exitcodes are produced"
0939: + "\n 0 concrete planner planner was able to generate a concretized workflow"
0940: + "\n 1 an error occured. In most cases, the error message logged should give a"
0941: + "\n clear indication as to where things went wrong."
0942: + "\n 2 an error occured while loading a specific module implementation at runtime"
0943: + "\n ";
0944:
0945: System.out.println(text);
0946: //mLogger.log(text,LogManager.INFO_MESSAGE_LEVEL);
0947: }
0948:
0949: /**
* This ends up invoking the deferred planning code that generates
* the MegaDAG used to submit the partitioned daxes in layers.
0952: */
0953: private void doDeferredPlanning() {
0954: String mode = mPOptions.getMegaDAGMode();
0955: mode = (mode == null) ? DEFAULT_MEGADAG_MODE : mode;
0956:
0957: String file = mPOptions.getPDAX();
0958:
0959: //get the name of the directory from the file
0960: String directory = new File(file).getParent();
0961: //System.out.println("Directory in which partitioned daxes are " + directory);
0962:
0963: int errorStatus = 1;
0964: ADag megaDAG = null;
0965: try {
0966: //load the correct callback handler
0967: org.griphyn.cPlanner.parser.pdax.Callback c = PDAXCallbackFactory
0968: .loadInstance(mProps, mPOptions, directory);
0969: errorStatus = 2;
0970:
0971: //this is a bug. Should not be called. To be corrected by Karan
0972: UserOptions y = UserOptions.getInstance(mPOptions);
0973:
0974: //start the parsing and let the fun begin
0975: PDAXParser p = new PDAXParser(file, mProps);
0976: p.setCallback(c);
0977: p.startParser(file);
0978:
0979: megaDAG = (ADag) c.getConstructedObject();
0980: } catch (FactoryException fe) {
0981: //just rethrow for time being. we need error status as 2
0982: throw fe;
0983: } catch (Exception e) {
0984: String message;
0985: switch (errorStatus) {
0986: case 1:
0987: message = "Unable to load the PDAX Callback ";
0988: break;
0989:
0990: case 2:
0991: message = "Unable to parse the PDAX file ";
0992: break;
0993:
0994: default:
0995: //unreachable code
0996: message = "Unknown Error ";
0997: break;
0998: }
0999: throw new RuntimeException(message, e);
1000: }
1001:
1002: this .logSuccessfulCompletion(this .logEntryInWorkCatalog(
1003: megaDAG, mPOptions.getBaseSubmitDirectory(), mPOptions
1004: .getRelativeSubmitDirectory()));
1005:
1006: }
1007:
1008: /**
1009: * Creates the submit directory for the workflow. This is not thread safe.
1010: *
1011: * @param dag the workflow being worked upon.
1012: * @param dir the base directory specified by the user.
1013: * @param user the username of the user.
* @param vogroup the vogroup to which the user belongs.
1015: * @param timestampBased boolean indicating whether to have a timestamp based dir or not
1016: *
1017: * @return the directory name created relative to the base directory passed
1018: * as input.
1019: *
* @throws IOException in case the submit directory cannot be created.
1021: */
1022: protected String createSubmitDirectory(ADag dag, String dir,
1023: String user, String vogroup, boolean timestampBased)
1024: throws IOException {
1025:
1026: return createSubmitDirectory(dag.getLabel(), dir, user,
1027: vogroup, timestampBased);
1028: }
1029:
1030: /**
1031: * Creates the submit directory for the workflow. This is not thread safe.
1032: *
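* <p>A sketch of the relative directory returned, using hypothetical values for
* the user (<code>vahi</code>), vogroup (<code>pegasus</code>) and workflow label
* (<code>diamond</code>):
* <pre>
*   vahi/pegasus/diamond/run0001          (default, run-number based leaf)
*   vahi/pegasus/diamond/&lt;timestamp&gt;      (if timestampBased is true)
* </pre>
*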
1033: * @param label the label of the workflow
1034: * @param dir the base directory specified by the user.
1035: * @param user the username of the user.
* @param vogroup the vogroup to which the user belongs.
1037: * @param timestampBased boolean indicating whether to have a timestamp based dir or not
1038: *
1039: * @return the directory name created relative to the base directory passed
1040: * as input.
1041: *
* @throws IOException in case the submit directory cannot be created.
1043: */
1044: protected String createSubmitDirectory(String label, String dir,
1045: String user, String vogroup, boolean timestampBased)
1046: throws IOException {
1047: File base = new File(dir);
1048: StringBuffer result = new StringBuffer();
1049:
1050: //do a sanity check on the base
1051: sanityCheck(base);
1052:
1053: //add the user name if possible
1054: base = new File(base, user);
1055: result.append(user).append(File.separator);
1056:
1057: //add the vogroup
1058: base = new File(base, vogroup);
1059: sanityCheck(base);
1060: result.append(vogroup).append(File.separator);
1061:
1062: //add the label of the DAX
1063: base = new File(base, label);
1064: sanityCheck(base);
1065: result.append(label).append(File.separator);
1066:
1067: //create the directory name
1068: StringBuffer leaf = new StringBuffer();
1069: if (timestampBased) {
1070: leaf.append(mPOptions.getDateTime(mProps
1071: .useExtendedTimeStamp()));
1072: } else {
1073: //get all the files in this directory
1074: String[] files = base
1075: .list(new RunDirectoryFilenameFilter());
1076: //find the maximum run directory
1077: int num, max = 1;
1078: for (int i = 0; i < files.length; i++) {
1079: num = Integer.parseInt(files[i]
1080: .substring(SUBMIT_DIRECTORY_PREFIX.length()));
1081: if (num + 1 > max) {
1082: max = num + 1;
1083: }
1084: }
1085:
1086: //create the directory name
1087: leaf.append(SUBMIT_DIRECTORY_PREFIX).append(
1088: mNumFormatter.format(max));
1089: }
1090: result.append(leaf.toString());
1091: base = new File(base, leaf.toString());
1092: mLogger.log("Directory to be created is "
1093: + base.getAbsolutePath(),
1094: LogManager.DEBUG_MESSAGE_LEVEL);
1095: sanityCheck(base);
1096:
1097: return result.toString();
1098: }
1099:
1100: /**
1101: * Checks the destination location for existence, if it can
1102: * be created, if it is writable etc.
1103: *
1104: * @param dir is the new base directory to optionally create.
1105: *
1106: * @throws IOException in case of error while writing out files.
1107: */
1108: protected static void sanityCheck(File dir) throws IOException {
1109: if (dir.exists()) {
1110: // location exists
1111: if (dir.isDirectory()) {
// ok, is a directory
1113: if (dir.canWrite()) {
1114: // can write, all is well
1115: return;
1116: } else {
1117: // all is there, but I cannot write to dir
1118: throw new IOException(
1119: "Cannot write to existing directory "
1120: + dir.getPath());
1121: }
1122: } else {
1123: // exists but not a directory
1124: throw new IOException("Destination " + dir.getPath()
1125: + " already "
1126: + "exists, but is not a directory.");
1127: }
1128: } else {
1129: // does not exist, try to make it
1130: if (!dir.mkdirs()) {
1131: throw new IOException("Unable to create directory "
1132: + dir.getPath());
1133: }
1134: }
1135: }
1136:
1137: /**
1138: * Writes out the planner metrics to the global log.
1139: *
1140: * @param pm the metrics to be written out.
1141: *
* @return true if the metrics were written out, false otherwise.
1143: */
1144: protected boolean writeOutMetrics(PlannerMetrics pm) {
1145: boolean result = false;
1146: if (mProps.writeOutMetrics()) {
1147: File log = new File(mProps.getMetricsLogFile());
1148:
1149: //do a sanity check on the directory
1150: try {
1151: sanityCheck(log.getParentFile());
1152: //open the log file in append mode
1153: FileOutputStream fos = new FileOutputStream(log, true);
1154:
1155: //get an exclusive lock
1156: FileLock fl = fos.getChannel().lock();
1157: try {
1158: mLogger.log("Logging Planner Metrics to " + log,
1159: LogManager.DEBUG_MESSAGE_LEVEL);
1160: //write out to the planner metrics to fos
1161: fos.write(pm.toString().getBytes());
1162: } finally {
1163: fl.release();
1164: fos.close();
1165: }
1166:
1167: } catch (IOException ioe) {
1168: mLogger.log("Unable to write out planner metrics ",
1169: ioe, LogManager.DEBUG_MESSAGE_LEVEL);
1170: return false;
1171: }
1172:
1173: result = true;
1174: }
1175: return result;
1176: }
1177:
1178: /**
1179: * Logs the successful completion message.
1180: *
1181: * @param nodatabase boolean indicating whether to add a nodatabase option or not.
1182: */
1183: private void logSuccessfulCompletion(boolean nodatabase) {
1184: StringBuffer message = new StringBuffer();
message.append(SUCCESS_MESSAGE).append(
getPegasusRunInvocation(nodatabase)).append("\n\n");
1187: mLogger.log(message.toString(), LogManager.INFO_MESSAGE_LEVEL);
1188:
1189: }
1190:
1191: /**
* Returns the pegasus-run invocation for the workflow that was planned.
1193: *
1194: * @param nodatabase boolean indicating whether to add a nodatabase option or not.
1195: *
1196: * @return the pegasus-run invocation
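*
* <p>For illustration, the returned string has the form (paths are placeholders):
* <pre>
*   pegasus-run -Dpegasus.user.properties=&lt;properties file in submit directory&gt; [--nodatabase] &lt;submit directory&gt;
* </pre>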
1197: */
1198: private String getPegasusRunInvocation(boolean nodatabase) {
1199: StringBuffer result = new StringBuffer();
1200:
1201: result.append("pegasus-run ").append(
1202: "-Dpegasus.user.properties=").append(
1203: mProps.getPropertiesInSubmitDirectory()).append(
1204: nodatabase ? " --nodatabase" : "").append(" ").append(
1205: mPOptions.getSubmitDirectory());
1206:
1207: return result.toString();
1208:
1209: }
1210:
1211: }
1212:
1213: /**
* A filename filter for identifying the run submit directories created by the planner.
1215: *
1216: * @author Karan Vahi vahi@isi.edu
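*
* <p>A usage sketch (the base directory path below is hypothetical):
* <pre>
*   String[] runs = new File( "/path/to/submit/base" ).list( new RunDirectoryFilenameFilter() );
* </pre>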
1217: */
1218: class RunDirectoryFilenameFilter implements FilenameFilter {
1219:
/**
* The regular expression used to identify run submit directories, i.e. names
* consisting of the run prefix followed by four digits (e.g. run0001).
*/
1223: private static final String mRegexExpression = "("
1224: + CPlanner.SUBMIT_DIRECTORY_PREFIX
1225: + ")([0-9][0-9][0-9][0-9])";
1226:
1227: /**
1228: * Stores compiled patterns at first use, quasi-Singleton.
1229: */
1230: private static Pattern mPattern = null;
1231:
/**
* Tests if a specified file should be included in a file list.
*
* @param dir the directory in which the file was found.
* @param name the name of the file.
1237: *
* @return true if and only if the name should be included in the file list,
* false otherwise.
*
1242: */
1243: public boolean accept(File dir, String name) {
1244: //compile the pattern only once.
1245: if (mPattern == null) {
1246: mPattern = Pattern.compile(mRegexExpression);
1247: }
1248: return mPattern.matcher(name).matches();
1249: }
1250:
1251: }