0001: /*
0002: * This file or a portion of this file is licensed under the terms of
0003: * the Globus Toolkit Public License, found in file GTPL, or at
0004: * http://www.globus.org/toolkit/download/license.html. This notice must
0005: * appear in redistributions of this file, with or without modification.
0006: *
0007: * Redistributions of this Software, with or without modification, must
0008: * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
0009: * some other similar material which is provided with the Software (if
0010: * any).
0011: *
0012: * Copyright 1999-2004 University of Chicago and The University of
0013: * Southern California. All rights reserved.
0014: */
0015: package org.griphyn.cPlanner.parser.pdax;
0016:
0017: import org.griphyn.cPlanner.code.CodeGenerator;
0018: import org.griphyn.cPlanner.code.CodeGeneratorException;
0019:
0020: import org.griphyn.cPlanner.code.generator.CodeGeneratorFactory;
0021:
0022: import org.griphyn.cPlanner.code.gridstart.GridStartFactory;
0023:
0024: import org.griphyn.cPlanner.classes.ADag;
0025: import org.griphyn.cPlanner.classes.PlannerOptions;
0026: import org.griphyn.cPlanner.classes.SubInfo;
0027: import org.griphyn.cPlanner.classes.PegasusBag;
0028:
0029: import org.griphyn.cPlanner.common.LogManager;
0030: import org.griphyn.cPlanner.common.PegasusProperties;
0031: import org.griphyn.cPlanner.common.StreamGobbler;
0032: import org.griphyn.cPlanner.common.DefaultStreamGobblerCallback;
0033: import org.griphyn.cPlanner.common.StreamGobblerCallback;
0034:
0035: import org.griphyn.cPlanner.namespace.Dagman;
0036: import org.griphyn.cPlanner.namespace.VDS;
0037:
0038: import org.griphyn.cPlanner.partitioner.Partition;
0039: import org.griphyn.cPlanner.partitioner.DAXWriter;
0040:
0041: import org.griphyn.cPlanner.poolinfo.SiteFactory;
0042:
0043: import org.griphyn.common.catalog.TransformationCatalog;
0044: import org.griphyn.common.catalog.TransformationCatalogEntry;
0045: import org.griphyn.common.catalog.transformation.TCMode;
0046:
0047: import org.griphyn.vdl.euryale.FileFactory;
0048: import org.griphyn.vdl.euryale.HashedFileFactory;
0049: import org.griphyn.vdl.euryale.FlatFileFactory;
0050:
0051: import org.griphyn.common.classes.TCType;
0052:
0053: import org.griphyn.common.util.Version;
0054: import org.griphyn.common.util.FactoryException;
0055:
0056: import java.io.File;
0057: import java.io.IOException;
0058: import java.io.OutputStream;
0059: import java.io.FileOutputStream;
0060: import java.io.FileWriter;
0061: import java.io.PrintWriter;
0062: import java.io.BufferedWriter;
0063: import java.io.FilenameFilter;
0064:
0065: import java.util.ArrayList;
0066: import java.util.HashMap;
0067: import java.util.Iterator;
0068: import java.util.List;
0069: import java.util.Map;
0070: import java.util.Properties;
0071:
0072: import java.util.regex.Pattern;
0073:
0074: import java.text.NumberFormat;
0075: import java.text.DecimalFormat;
0076:
0077: /**
0078: * This callback ends up creating the megadag that contains the smaller dags
0079: * each corresponding to one partition identified in the pdax file
0080: * generated by the partitioner.
0081: *
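* <p>
* A minimal sketch of how this callback is typically driven by a PDAX parser.
* The driving parser and all identifiers below are illustrative assumptions;
* only the callback methods themselves are defined in this class.
* <pre>{@code
* PDAX2MDAG callback = new PDAX2MDAG(pdaxDirectory, properties, options);
* callback.cbDocument(rootAttributes);       // attributes of the root pdax element
* callback.cbPartition(partition);           // invoked once per partition element
* callback.cbParents(childID, parentIDList); // invoked once per child element
* callback.cbDone();                         // triggers generation of the submit files
* ADag megaDAG = (ADag) callback.getConstructedObject();
* }</pre>
*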
0082: * @author Karan Vahi
0083: * @version $Revision: 464 $
0084: */
0085: public class PDAX2MDAG implements Callback {
0086:
0087: /**
0088: * The code generator (submit writer) that has to be loaded for now.
0089: */
0090: public static final String CODE_GENERATOR_CLASS = CodeGeneratorFactory.CONDOR_CODE_GENERATOR_CLASS;
0091:
0092: /**
0093: * The prefix for the submit directory.
0094: */
0095: public static final String SUBMIT_DIRECTORY_PREFIX = "run";
0096:
0097: /**
0098: * The number of jobs into which each job in the partition graph is
0099: * expanded.
0100: */
0101: public static final int NUM_OF_EXPANDED_JOBS = 2;
0102:
0103: /**
0104: * The index of the head job.
0105: */
0106: public static final int HEAD_INDEX = 0;
0107:
0108: /**
0109: * The index of the tail job.
0110: */
0111: public static final int TAIL_INDEX = 1;
0112:
0113: /**
0114: * The logical name with which to query the transformation catalog for
0115: * the cPlanner executable.
0116: */
0117: public static final String CPLANNER_LOGICAL_NAME = "pegasus-plan";
0118:
0119: /**
0120: * The namespace to use for condor dagman.
0121: */
0122: public static final String CONDOR_DAGMAN_NAMESPACE = "condor";
0123:
0124: /**
0125: * The logical name with which to query the transformation catalog for the
0126: * condor_dagman executable, that ends up running the mini dag as one
0127: * job.
0128: */
0129: public static final String CONDOR_DAGMAN_LOGICAL_NAME = "dagman";
0130:
0131: /**
0132: * The namespace to which the jobs in the MEGA DAG being created refer.
0133: */
0134: public static final String NAMESPACE = "pegasus";
0135:
0136: /**
0137: * The planner utility that needs to be called as a prescript.
0138: */
0139: public static final String RETRY_LOGICAL_NAME = "pegasus-plan";
0140:
0141: /**
0142: * The dagman knobs controlled through properties. They map the property name to
0143: * the corresponding dagman option.
0144: */
0145: public static final String DAGMAN_KNOBS[][] = {
0146: { "pegasus.dagman.maxpre", " -MaxPre " },
0147: { "pegasus.dagman.maxpost", " -MaxPost " },
0148: { "pegasus.dagman.maxjobs", " -MaxJobs " },
0149: { "pegasus.dagman.maxidle", " -MaxIdle " }, };
0150:
0151: /**
0152: * The file Separator to be used on the submit host.
0153: */
0154: protected static char mSeparator = File.separatorChar;
0155:
0156: /**
0157: * The directory in which the daxes corresponding to the partitions are
0158: * kept. This should be the same directory where the pdax containing the
0159: * partition graph resides.
0160: */
0161: private String mPDAXDirectory;
0162:
0163: /**
0164: * The root of the submit directory where all the submit directories for
0165: * the various partitions reside.
0166: */
0167: private String mSubmitDirectory;
0168:
0169: /**
0170: * The abstract dag object that ends up holding the megadag.
0171: */
0172: private ADag mMegaDAG;
0173:
0174: /**
0175: * The internal map that maps the partition id to the job responsible
0176: * for executing the partition.
0177: */
0178: private Map mJobMap;
0179:
0180: /**
0181: * The internal map that maps the job id of the partition to the
0182: * head and tail jobs in the linear sequence of jobs to which the partition
0183: * job is expanded.
0184: */
0185: //private Map mSequenceMap;
0186: /**
0187: * The handle to the properties file.
0188: */
0189: private PegasusProperties mProps;
0190:
0191: /**
0192: * The handle to the transformation catalog.
0193: */
0194: private TransformationCatalog mTCHandle;
0195:
0196: /**
0197: * The handle to the logging object.
0198: */
0199: private LogManager mLogger;
0200:
0201: /**
0202: * The object containing the options that were given to the concrete
0203: * planner at runtime.
0204: */
0205: private PlannerOptions mPOptions;
0206:
0207: /**
0208: * Helping clone copy of options that is used for creating runtime options
0209: * for the different partitions.
0210: */
0211: private PlannerOptions mClonedPOptions;
0212:
0213: /**
0214: * The path to the properties file that is written out and shared by
0215: * all partitions in the mega DAG.
0216: */
0217: private String mMDAGPropertiesFile;
0218:
0219: /**
0220: * The handle to the file factory, that is used to create the top level
0221: * directories for each of the partitions.
0222: */
0223: private FileFactory mFactory;
0224:
0225: /**
0226: * An instance of the default stream gobbler callback implementation that
0227: * is used for creating symbolic links.
0228: */
0229: private StreamGobblerCallback mDefaultCallback;
0230:
0231: /**
0232: * The number formatter to format the run submit dir entries.
0233: */
0234: private NumberFormat mNumFormatter;
0235:
0236: /**
0237: * The user name of the user running Pegasus.
0238: */
0239: private String mUser;
0240:
0241: /**
0242: * A flag to store whether the parsing is complete or not.
0243: */
0244: private boolean mDone;
0245:
0246: /**
0247: * Any extra arguments that need to be passed to dagman, as determined
0248: * from the properties file.
0249: */
0250: private String mDAGManKnobs;
0251:
0252: /**
0253: * Bag of initialization objects.
0254: */
0255: private PegasusBag mBag;
0256:
0257: /**
0258: * The overloaded constructor.
0259: *
0260: * @param directory the directory where the pdax and all the daxes
0261: * corresponding to the partitions reside.
0262: * @param properties the <code>PegasusProperties</code> to be used.
0263: * @param options the options passed to the planner.
0264: */
0265: public PDAX2MDAG(String directory, PegasusProperties properties,
0266: PlannerOptions options) {
0267: mPDAXDirectory = directory;
0268: mProps = properties;
0269: mLogger = LogManager.getInstance();
0270: mPOptions = options;
0271: mClonedPOptions = (options == null) ? null
0272: : (PlannerOptions) options.clone();
0273: mTCHandle = TCMode.loadInstance();
0274: mMDAGPropertiesFile = null;
0275: mNumFormatter = new DecimalFormat("0000");
0276:
0277: mDone = false;
0278: mUser = mProps.getProperty("user.name");
0279: if (mUser == null) {
0280: mUser = "user";
0281: }
0282:
0283: //initialize the transformation mapper
0284: // mTCMapper = Mapper.loadTCMapper( mProps.getTCMapperMode() );
0285:
0286: //initialize the bag of objects and load the site catalog
0287: mBag = new PegasusBag();
0288: mBag.add(PegasusBag.PEGASUS_LOGMANAGER, mLogger);
0289: mBag.add(PegasusBag.PEGASUS_PROPERTIES, mProps);
0290: mBag.add(PegasusBag.PLANNER_OPTIONS, options);
0291: mBag.add(PegasusBag.TRANSFORMATION_CATALOG, mTCHandle);
0292: // mBag.add( PegasusBag.TRANSFORMATION_MAPPER, mTCMapper );
0293: mBag.add(PegasusBag.PEGASUS_LOGMANAGER, mLogger);
0294:
0295: mBag.add(PegasusBag.SITE_CATALOG, SiteFactory.loadInstance(
0296: properties, false));
0297:
0298: //the default gobbler callback always logs to the debug level
0299: mDefaultCallback = new DefaultStreamGobblerCallback(
0300: LogManager.DEBUG_MESSAGE_LEVEL);
0301:
0302: mDAGManKnobs = constructDAGManKnobs(properties);
0303: }
0304:
0305: /**
0306: * Checks the destination location for existence, whether it can
0307: * be created if missing, and whether it is writable.
0308: *
0309: * @param dir is the new base directory to optionally create.
0310: *
0311: * @throws IOException in case of error while writing out files.
0312: */
0313: protected static void sanityCheck(File dir) throws IOException {
0314: if (dir.exists()) {
0315: // location exists
0316: if (dir.isDirectory()) {
0317: // ok, is a directory
0318: if (dir.canWrite()) {
0319: // can write, all is well
0320: return;
0321: } else {
0322: // all is there, but I cannot write to dir
0323: throw new IOException(
0324: "Cannot write to existing directory "
0325: + dir.getPath());
0326: }
0327: } else {
0328: // exists but not a directory
0329: throw new IOException("Destination " + dir.getPath()
0330: + " already "
0331: + "exists, but is not a directory.");
0332: }
0333: } else {
0334: // does not exist, try to make it
0335: if (!dir.mkdirs()) {
0336: throw new IOException(
0337: "Unable to create directory destination "
0338: + dir.getPath());
0339: }
0340: }
0341: }
0342:
0343: /**
0344: * Callback when the opening tag was parsed. This contains all
0345: * attributes and their raw values within a map. This callback can
0346: * also be used to initialize callback-specific resources.
0347: *
0348: * @param attributes is a map of attribute key to attribute value
0349: */
0350: public void cbDocument(Map attributes) {
0351: mMegaDAG = new ADag();
0352: mJobMap = new HashMap();
0353: //mSequenceMap = new HashMap();
0354:
0355: //the name of the mega dag is set to the name
0356: //attribute in the pdax
0357: mMegaDAG.dagInfo.nameOfADag = (String) attributes.get("name");
0358: mMegaDAG.dagInfo.count = (String) attributes.get("count");
0359: mMegaDAG.dagInfo.index = (String) attributes.get("index");
0360:
0361: // create files in the directory, unless anything else is known.
0362: try {
0363: //create a submit directory structure if required
0364: String relativeDir = (mPOptions
0365: .getRelativeSubmitDirectory() == null) ? this
0366: .createSubmitDirectory(mMegaDAG.getLabel(),
0367: mPOptions.getSubmitDirectory(), mUser,
0368: mPOptions.getVOGroup(),
0369: mProps.useTimestampForDirectoryStructure())
0370: : mPOptions.getRelativeSubmitDirectory();
0371:
0372: //set the directory structure
0373: mPOptions.setSubmitDirectory(mPOptions
0374: .getBaseSubmitDirectory(), relativeDir);
0375: mSubmitDirectory = mPOptions.getSubmitDirectory();
0376:
0377: //we want to set the relative directory as the base working
0378: //directory for all the partitions on the remote sites.
0379: mPOptions.setRandomDir(relativeDir);
0380:
0381: mFactory = new FlatFileFactory(mSubmitDirectory); // minimum default
0382: } catch (IOException ioe) {
0383: throw new RuntimeException(
0384: "Unable to generate files in the submit directory ",
0385: ioe);
0386: }
0387:
0388: // not in the PDAX format currently
0389: String s = (String) attributes.get("partitionCount");
0390:
0391: // create hashed, and levelled directories
0392: try {
0393: HashedFileFactory temp = null;
0394: int partCount = (s == null) ?
0395: //determine at runtime the number of partitions
0396: getPartitionCount(mPOptions.getPDAX())
0397: : Integer.parseInt(s);
0398:
0399: //if ( m_minlevel > 0 && m_minlevel > jobCount ) jobCount = m_minlevel;
0400: if (partCount > 0)
0401: temp = new HashedFileFactory(mSubmitDirectory,
0402: partCount);
0403: else
0404: temp = new HashedFileFactory(mPDAXDirectory);
0405:
0406: //each job creates the following files
0407: // - submit file
0408: // - out file
0409: // - error file
0410: // - prescript log
0411: // - the partition directory
0412: temp.setMultiplicator(5);
0413:
0414: //we want a minimum of one level always for clarity
0415: temp.setLevels(1);
0416:
0417: //for the time being and test set files per directory to 50
0418: /*
0419: temp.setFilesPerDirectory( 40 );
0420: temp.setLevelsFromTotals(partCount);
0421: */
0422:
0423: mFactory = temp;
0424:
0425: //write out all the properties into a temp file
0426: //in the root submit directory
0427: //mMDAGPropertiesFile = writeOutProperties( mSubmitDirectory );
0428: mMDAGPropertiesFile = mProps
0429: .writeOutProperties(mSubmitDirectory);
0430:
0431: } catch (NumberFormatException nfe) {
0432: String error = (s == null) ? "Unspecified number for jobCount"
0433: : "Illegal number \"" + s
0434: + "\" for partition count";
0435: throw new RuntimeException(error);
0436: } catch (IOException e) {
0437: //figure out where error happened
0438: String message = (mMDAGPropertiesFile == null) ? "Unable to write out properties file in base submit directory"
0439: : "Base directory creation";
0440:
0441: //wrap into runtime and throw
0442: throw new RuntimeException(message, e);
0443: }
0444:
0445: }
0446:
0447: /**
0448: * Callback for the partition. These partitions are completely
0449: * assembled, but each is passed separately.
0450: *
0451: * @param partition is the PDAX-style partition.
0452: */
0453: public void cbPartition(Partition partition) {
0454: String name = partition.getName();
0455: int index = partition.getIndex();
0456: ArrayList sequenceList = new ArrayList(NUM_OF_EXPANDED_JOBS);
0457: String tailJob;
0458: SubInfo job;
0459:
0460: //get the filename of the dax file containing the partition
0461: String dax = DAXWriter.getPDAXFilename(name, index);
0462:
0463: //construct the path to the file
0464: dax = mPDAXDirectory + File.separator + dax;
0465: File partitionDirectory;
0466: try {
0467: partitionDirectory = mFactory
0468: .createFile(getBaseName(partition));
0469: partitionDirectory.mkdirs();
0470:
0471: //construct a symlink to the dax file in the partition directory
0472: if (!createSymlink(dax, partitionDirectory)) {
0473: mLogger
0474: .log(
0475: "Unable to create symlinks of the dax file to submit dir",
0476: LogManager.WARNING_MESSAGE_LEVEL);
0477: }
0478: } catch (IOException e) {
0479: //wrap and throw
0480: throw new RuntimeException(
0481: "Unable to create partition submit directory ", e);
0482: }
0483:
0484: //construct the appropriate vds-submit-dag job with the
0485: //prescript set as an invocation to gencdag etc.
0486: job = constructDAGJob(partition, partitionDirectory, dax);
0487:
0488: //add to the workflow
0489: mMegaDAG.add(job);
0490:
0491: //map the partition id to the job that is constructed.
0492: mJobMap.put(partition.getID(), job);
0493:
0494: /**
0495: String jobName = getPegasusJobName(name,index);
0496: //populate the internal job map with jobname and id
0497: mJobMap.put(partition.getID(),getPegasusJobName(name,index));
0498:
0499: //add the sub info for it
0500: job = constructPegasusJob(jobName, file);
0501: mMegaDAG.add(job);
0502:
0503:
0504:
0505: //generate the dagman job that ends up submitting
0506: //the mini dag corresponding to the partition
0507: //mMegaDAG.addNewJob(getJobName(name,index));
0508: tailJob = "condor_submit_" + jobName ;
0509: job = constructCondorSubmitJob(tailJob,name,index);
0510: mMegaDAG.add(job);
0511:
0512: //put the sequence list
0513: sequenceList.add(HEAD_INDEX,jobName);
0514: sequenceList.add(TAIL_INDEX,tailJob);
0515: mSequenceMap.put(jobName,sequenceList);
0516:
0517: //add the relation between jobname and tail job
0518: mMegaDAG.addNewRelation(jobName,tailJob);
0519: */
0520:
0521: }
0522:
0523: /**
0524: * Callback for child and parent relationships from section 3. This translates
0525: * the relations between the partitions into relations between the jobs
0526: * that are responsible for the partitions. In addition, appropriate cache
0527: * file arguments are generated.
0528: *
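* <p>
* For illustration only (partition IDs and directories below are hypothetical):
* if the child's parents are the partitions with IDs ID1 and ID2, the child's
* replanner prescript arguments are extended with something resembling
* <pre>
*   --cache /submit/run0001/00/PID1/PID1.cache,/submit/run0001/00/PID2/PID2.cache
* </pre>
* pointing to the cache files generated in the parents' submit directories.
*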
0529: * @param child is the IDREF of the child element.
0530: * @param parents is a list of IDREFs of the included parents.
0531: */
0532: public void cbParents(String child, List parents) {
0533: String cacheName;
0534: String cacheArgs = null;
0535:
0536: //get hold of the constructed job for the child.
0537: //the name of the jobs are treated as ID's
0538: SubInfo cJob = getJob(child);
0539: String cID = cJob.getName();
0540:
0541: //glue in the sequences for the expanded things together
0542:
0543: if (!parents.isEmpty()) {
0544: //the megadag should not be invoked with cache option for time being
0545: cacheArgs = " --cache ";
0546: }
0547:
0548: //traverse through the parents to put in the relations
0549: //and the cache file arguments.
0550: String pID;
0551: SubInfo pJob;
0552: for (Iterator it = parents.iterator(); it.hasNext();) {
0553: //get the parent job and name
0554: pJob = (SubInfo) mJobMap.get(it.next());
0555: pID = pJob.getName();
0556:
0557: mLogger.log("Adding Relation " + pID + "->" + cID,
0558: LogManager.DEBUG_MESSAGE_LEVEL);
0559: mMegaDAG.addNewRelation(pID, cID);
0560:
0561: //we need to specify the cache files for those partitions
0562: //even if they are not constructed. there is a disconnect
0563: //as to how the names are being generated. There should be
0564: //a call to one function only.
0565: cacheName = getCacheFilePath(pJob);
0566: cacheArgs += cacheName + ",";
0567: }
0568:
0569: //stuff the arguments back into replanner prescript.
0570: //should be a callout to a different function for portability
0571: String args = cJob.getPreScriptArguments();
0572: //System.out.println("Arguments are " + args);
0573: cJob.setPreScript(cJob.getPreScriptPath(), (cacheArgs == null) ?
0574: //remains the same
0575: args
0576: :
0577: //remove the last instance of , from cache args
0578: args
0579: + cacheArgs.substring(0, cacheArgs
0580: .lastIndexOf(',')));
0581:
0582: }
0583:
0584: /**
0585: * Callback when the parsing of the document is done. This ends up
0586: * triggering the writing of the condor submit files corresponding to the
0587: * mega dag.
0588: */
0589: public void cbDone() {
0590: mDone = true;
0591:
0592: //generate the classad's options
0593: //for the Mega DAG
0594: mMegaDAG.dagInfo.generateFlowName();
0595: mMegaDAG.dagInfo.setFlowTimestamp(mPOptions.getDateTime(mProps
0596: .useExtendedTimeStamp()));
0597: mMegaDAG.dagInfo.setDAXMTime(mPOptions.getPDAX());
0598: mMegaDAG.dagInfo.generateFlowID();
0599: mMegaDAG.dagInfo.setReleaseVersion();
0600:
0601: CodeGenerator codeGenerator = null;
0602: int state = 0;
0603: try {
0604: //load the Condor Writer that understands HashedFile Factories.
0605: codeGenerator = CodeGeneratorFactory.loadInstance(mBag,
0606: CODE_GENERATOR_CLASS);
0607: state = 1;
0608: codeGenerator.generateCode(mMegaDAG);
0609:
0610: //generate only the braindump file that is required.
0611: //no spawning off the tailstatd for time being
0612: codeGenerator.startMonitoring();
0613:
0614: } catch (FactoryException fe) {
0615: throw new FactoryException("PDAX2MDAG", fe);
0616: } catch (Exception e) {
0617: throw new RuntimeException(
0618: "Error while generating code for the workflow", e);
0619: }
0620:
0621: }
0622:
0623: /**
0624: * Returns the MEGADAG that is generated.
0625: *
0626: * @return ADag object containing the mega dag
0627: */
0628: public Object getConstructedObject() {
0629: if (!mDone)
0630: throw new RuntimeException(
0631: "Method called before the megadag "
0632: + " was fully generated");
0633:
0634: return mMegaDAG;
0635: }
0636:
0637: /**
0638: * Constructs a job that plans and submits the partitioned workflow,
0639: * referred to by a Partition. The main job itself is a condor dagman job
0640: * that submits the concrete workflow. The concrete workflow is generated by
0641: * running the planner in the prescript for the job.
0642: *
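* <p>
* As a rough illustration (the partition ID is hypothetical), for a partition
* with ID ID1 the condor_dagman job ends up with an argument string resembling
* <pre>
*   -f -l . -Debug 3 -Lockfile PID1.dag.lock -Dag PID1.dag -Rescue PID1.dag.rescue -Condorlog PID1.log
* </pre>
* with any extra dagman knobs picked up from the properties appended at the
* end, and with initialdir pointing to the partition's submit directory.
*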
0643: * @param partition the partition corresponding to which the job has to be
0644: * constructed.
0645: * @param directory the submit directory where the submit files for the
0646: * partition should reside.
0647: * @param dax the absolute path to the partitioned dax file that
0648: * corresponds to this partition.
0649: *
0650: * @return the constructed DAG job.
0651: */
0652: protected SubInfo constructDAGJob(Partition partition,
0653: File directory, String dax) {
0654: //for time being use the old functions.
0655: SubInfo job = new SubInfo();
0656: //the parent directory where the submit file for condor dagman has to
0657: //reside. the submit files for the corresponding partition are one level
0658: //deeper.
0659: String parentDir = directory.getParent();
0660:
0661: //set the logical transformation
0662: job.setTransformation(CONDOR_DAGMAN_NAMESPACE,
0663: CONDOR_DAGMAN_LOGICAL_NAME, null);
0664:
0665: //set the logical derivation attributes of the job.
0666: job.setDerivation(CONDOR_DAGMAN_NAMESPACE,
0667: CONDOR_DAGMAN_LOGICAL_NAME, null);
0668:
0669: //always runs on the submit host
0670: job.setSiteHandle("local");
0671:
0672: //set the partition id only as the unique id
0673: //for the time being.
0674: // job.setName(partition.getID());
0675:
0676: //set the logical id for the job same as the partition id.
0677: job.setLogicalID(partition.getID());
0678:
0679: //figure out the relative submit directory where the dagman job should
0680: //reside. It should be one level up from the partition directory.
0681: String dir = "";
0682: dir += (parentDir.equals(mSubmitDirectory)) ?
0683: //the directory is same as the root
0684: dir
0685: :
0686: //get the relative from root
0687: parentDir.substring(mSubmitDirectory.length());
0688: // job.setSubmitDirectory(dir);
0689:
0690: //construct the name of the job as a deep lfn with a directory path
0691: StringBuffer name = new StringBuffer();
0692:
0693: //get the part from the first file separator onwards
0694: name.append((dir.indexOf(File.separatorChar) == 0) ? dir
0695: .substring(1) : dir.substring(0));
0696:
0697: //append a file separator in the end if dir was some name
0698: if (dir.length() > 1) {
0699: name.append(File.separatorChar);
0700: }
0701:
0702: //set the basename for the deep lfn
0703: name.append(partition.getID());
0704: //System.out.println (" The name is " + name.toString());
0705: job.setName(name.toString());
0706:
0707: List entries;
0708: TransformationCatalogEntry entry = null;
0709:
0710: //get the path to condor dagman
0711: try {
0712: entries = mTCHandle.getTCEntries(job.namespace,
0713: job.logicalName, job.version, job.getSiteHandle(),
0714: TCType.INSTALLED);
0715: entry = (entries == null) ? null :
0716: //Gaurang assures that if no record is found then
0717: //TC Mechanism returns null
0718: (TransformationCatalogEntry) entries.get(0);
0719: } catch (Exception e) {
0720: throw new RuntimeException(
0721: "ERROR: While accessing the Transformation Catalog",
0722: e);
0723: }
0724: if (entry == null) {
0725: //throw appropriate error
0726: throw new RuntimeException(
0727: "ERROR: Entry not found in tc for job "
0728: + job.getCompleteTCName() + " on site "
0729: + job.getSiteHandle());
0730: }
0731:
0732: //set the path to the executable and environment string
0733: job.executable = entry.getPhysicalTransformation();
0734: //the environment variables are set later automatically from the tc
0735: //job.envVariables = entry.envString;
0736:
0737: //the job itself is the main job of the super node
0738: //construct the classad specific information
0739: job.jobID = job.getName();
0740: job.jobClass = SubInfo.COMPUTE_JOB;
0741:
0742: //directory where all the dagman related files for the nested dagman
0743: //reside. Same as the directory passed as an input parameter
0744: dir = directory.getAbsolutePath();
0745:
0746: //make the initial dir point to the submit file dir for the partition
0747: //we can do this as we are running this job on the local host in the scheduler
0748: //universe. Hence, there are no shared filesystem issues.
0749: job.condorVariables.construct("initialdir", dir);
0750:
0751: //construct the argument string, with all the dagman files
0752: //being generated in the partition directory. Using basenames as
0753: //initialdir has been specified for the job.
0754: StringBuffer sb = new StringBuffer();
0755:
0756: sb.append(" -f -l . -Debug 3").append(" -Lockfile ").append(
0757: getBasename(partition, ".dag.lock")).append(" -Dag ")
0758: .append(getBasename(partition, ".dag")).append(
0759: " -Rescue ").append(
0760: getBasename(partition, ".dag.rescue")).append(
0761: " -Condorlog ").append(
0762: getBasename(partition, ".log"));
0763:
0764: //pass any dagman knobs that were specified in properties file
0765: sb.append(this.mDAGManKnobs);
0766:
0767: //put in the environment variables that are required
0768: job.envVariables.construct("_CONDOR_DAGMAN_LOG",
0769: getAbsolutePath(partition, dir, ".dag.dagman.out"));
0770: job.envVariables.construct("_CONDOR_MAX_DAGMAN_LOG", "0");
0771:
0772: //set the arguments for the job
0773: job.setArguments(sb.toString());
0774:
0775: //the environment needs to be propagated for exitcode to be picked up
0776: job.condorVariables.construct("getenv", "TRUE");
0777:
0778: job.condorVariables.construct("remove_kill_sig", "SIGUSR1");
0779:
0780: //the log file for condor dagman itself also needs to be created
0781: //it is different from the log file that is shared by jobs of
0782: //the partition. That is referred to by Condorlog
0783:
0784: // keep the log file common for all jobs and dagman albeit without
0785: // dag.dagman.log suffix
0786: // job.condorVariables.construct("log", getAbsolutePath( partition, dir,".dag.dagman.log"));
0787:
0788: // String dagName = mMegaDAG.dagInfo.nameOfADag;
0789: // String dagIndex= mMegaDAG.dagInfo.index;
0790: // job.condorVariables.construct("log", dir + mSeparator +
0791: // dagName + "_" + dagIndex + ".log");
0792:
0793: //incorporate profiles from the transformation catalog
0794: //and properties for the time being. Not from the site catalog.
0795:
0796: //the profile information from the transformation
0797: //catalog needs to be assimilated into the job
0798: //overriding the one from pool catalog.
0799: job.updateProfiles(entry);
0800:
0801: //the profile information from the properties file
0802: //is assimilated overidding the one from transformation
0803: //catalog.
0804: job.updateProfiles(mProps);
0805:
0806: //constructed the main job. now construct the prescript
0807: //the log file resides in the directory where the condor_dagman
0808: //job resides i.e the parent directory.
0809: StringBuffer log = new StringBuffer();
0810: log.append(parentDir).append(mSeparator).append(
0811: partition.getID()).append(".pre.log");
0812: //set the prescript for the job in the dagman namespace
0813: setPrescript(job, dax, log.toString());
0814:
0815: //construct the braindump file for tailstatd invocations
0816: //the dag should be same as the one passed in the arguments string!
0817: StringBuffer dag = new StringBuffer();
0818: dag.append(dir).append(mSeparator).append(
0819: getBasename(partition, ".dag"));
0820:
0821: //we do not want the job to be launched via kickstart
0822: //Fix for VDS bug number 143
0823: //http://bugzilla.globus.org/vds/show_bug.cgi?id=143
0824: job.vdsNS
0825: .construct(
0826: VDS.GRIDSTART_KEY,
0827: GridStartFactory.GRIDSTART_SHORT_NAMES[GridStartFactory.NO_GRIDSTART_INDEX]);
0828:
0829: return job;
0830: }
0831:
0832: /**
0833: * Writes out the braindump.txt file for a partition in the partition submit
0834: * directory. The braindump.txt file is used for passing to the tailstatd
0835: * daemon that monitors the state of execution of the workflow.
0836: *
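* <p>
* The file consists of simple key value lines. An illustrative rendering of
* its contents (all paths below are hypothetical) is
* <pre>
*   dax /pdax/dir/blackdiamond_ID1.dax
*   dag /submit/run0001/00/PID1/PID1.dag
*   run /submit/run0001/00/PID1
*   jsd /submit/run0001/00/PID1/jobstate.log
*   rundir PID1
*   pegasushome /opt/pegasus
*   vogroup pegasus
*   label blackdiamond
* </pre>
*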
0837: * @param directory the directory in which the braindump file needs to
0838: * be written to.
0839: * @param partition the partition for which the braindump is to be written out.
0840: * @param dax the dax file
0841: * @param dag the dag file
0842: *
0843: * @return the absolute path to the braindump.txt file written in the directory.
0844: *
0845: * @throws IOException in case of error while writing out file.
0846: */
0847: protected String writeOutBraindump(File directory,
0848: Partition partition, String dax, String dag)
0849: throws IOException {
0850:
0851: //sanity check on the directory
0852: sanityCheck(directory);
0853:
0854: //create a writer to the braindump.txt in the directory.
0855: File f = new File(directory, "braindump.txt");
0856: PrintWriter writer = new PrintWriter(new BufferedWriter(
0857: new FileWriter(f)));
0858:
0859: //store absolute path to dir just once
0860: String absPath = directory.getAbsolutePath();
0861:
0862: //assemble all the contents in a buffer before writing out
0863: StringBuffer contents = new StringBuffer();
0864: contents.append("dax ").append(dax).append("\n").append("dag ")
0865: .append(dag).append("\n").append("run ")
0866: .append(absPath).append("\n").append("jsd ").append(
0867: absPath).append(mSeparator).append(
0868: "jobstate.log").append("\n").append("rundir ")
0869: .append(directory.getName()).append("\n").append(
0870: "pegasushome ").append(mProps.getPegasusHome())
0871: .append("\n").append("vogroup pegasus").append("\n").//for time being
0872: append("label " + partition.getName());
0873:
0874: writer.write(contents.toString());
0875: writer.close();
0876:
0877: return f.getAbsolutePath();
0878: }
0879:
0880: /**
0881: * Writes out the properties to a temporary file in the directory passed.
0882: *
0883: * @param directory the directory in which the properties file needs to
0884: * be written to.
0885: *
0886: * @return the absolute path to the properties file written in the directory.
0887: *
0888: * @throws IOException in case of error while writing out file.
0889: */
0890: protected String writeOutProperties(String directory)
0891: throws IOException {
0892: File dir = new File(directory);
0893:
0894: //sanity check on the directory
0895: sanityCheck(dir);
0896:
0897: //we only want to write out the pegasus properties for the time being
0898: Properties properties = mProps.matchingSubset("pegasus", true);
0899:
0900: //create a temporary file in directory
0901: File f = File.createTempFile("pegasus.", ".properties", dir);
0902:
0903: //the header of the file
0904: StringBuffer header = new StringBuffer(64);
0905: header.append("PEGASUS USER PROPERTIES AT RUNTIME \n").append(
0906: "#ESCAPES IN VALUES ARE INTRODUCED");
0907:
0908: //create an output stream to this file and write out the properties
0909: OutputStream os = new FileOutputStream(f);
0910: properties.store(os, header.toString());
0911: os.close();
0912:
0913: return f.getAbsolutePath();
0914: }
0915:
0916: /**
0917: * Sets the prescript that ends up calling the default wrapper that
0918: * introduces retry into Pegasus for a particular job.
0919: *
0920: * @param job the job whose prescript needs to be set.
0921: * @param daxURL the path to the dax file on the filesystem.
0922: * @param log the file where the output of the prescript needs to be
0923: * redirected to.
0924: *
0925: * @see #RETRY_LOGICAL_NAME
0926: */
0927: protected void setPrescript(SubInfo job, String daxURL, String log) {
0928: setPrescript(job, daxURL, log, this.NAMESPACE,
0929: RETRY_LOGICAL_NAME, null);
0930: }
0931:
0932: /**
0933: * Sets the prescript that ends up calling the default wrapper that
0934: * introduces retry into Pegasus for a particular job.
0935: *
0936: * @param job the job whose prescript needs to be set.
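* <p>
* For illustration only (all paths below are hypothetical), the prescript set
* on the job ends up being an invocation of the replanner roughly of the form
* <pre>
*   /opt/pegasus/bin/pegasus-plan -Dpegasus.user.properties=/submit/run0001/pegasus.12345.properties
*       -Dpegasus.log.*=/submit/run0001/00/ID1.pre.log --dax /pdax/dir/blackdiamond_ID1.dax [other planner options]
* </pre>
* where the trailing options come from the cloned planner options for the
* partition.
*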
0937: * @param daxURL the path to the dax file on the filesystem.
0938: * @param log the file where the output of the prescript needs to be
0939: * redirected to.
0940: * @param namespace the namespace of the replanner utility.
0941: * @param name the logical name of the replanner.
0942: * @param version the version of the replanner to be picked up.
0943: *
0944: */
0945: protected void setPrescript(SubInfo job, String daxURL, String log,
0946: String namespace, String name, String version) {
0947: String site = job.getSiteHandle();
0948: TransformationCatalogEntry entry = null;
0949:
0950: //get the path to the script wrapper from the transformation catalog
0951: try {
0952: List entries = mTCHandle.getTCEntries(namespace, name,
0953: version, site, TCType.INSTALLED);
0954:
0955: //get the first entry from the list returned
0956: entry = (entries == null) ? null :
0957: //Gaurang assures that if no record is found then
0958: //TC Mechanism returns null
0959: ((TransformationCatalogEntry) entries.get(0));
0960: } catch (Exception e) {
0961: throw new RuntimeException(
0962: "ERROR: While accessing the Transformation Catalog",
0963: e);
0964: }
0965:
0966: //construct the prescript path
0967: StringBuffer script = new StringBuffer();
0968: if (entry == null) {
0969: //log to debug
0970: mLogger
0971: .log(
0972: "Constructing the default path to the replanner for prescript",
0973: LogManager.DEBUG_MESSAGE_LEVEL);
0974:
0975: //construct the default path to the executable
0976: script.append(mProps.getPegasusHome()).append(mSeparator)
0977: .append("bin").append(mSeparator).append(
0978: RETRY_LOGICAL_NAME);
0979: } else {
0980: script.append(entry.getPhysicalTransformation());
0981: }
0982:
0983: //the output of the prescript i.e submit files should be created
0984: //in the directory where the job is being run.
0985: mClonedPOptions.setSubmitDirectory((String) job.condorVariables
0986: .get("initialdir"));
0987:
0988: //generate the remote working directory for the partition
0989: String submit = mClonedPOptions.getSubmitDirectory(); // like /tmp/vahi/pegasus/blackdiamond/run0001/00/PID1
0990: String remoteBase = mPOptions.getRandomDir(); // like vahi/pegasus/blackdiamond/run0001
0991: String remoteWorkDir = submit.substring(submit
0992: .indexOf(remoteBase)); //gets us vahi/pegasus/blackdiamond/run0001/00/PID1
0993: mClonedPOptions.setRandomDir(remoteWorkDir);
0994: mLogger.log("Remote working directory set to " + remoteWorkDir
0995: + " for partition " + job.getID(),
0996: LogManager.DEBUG_MESSAGE_LEVEL);
0997:
0998: //set the basename for the nested dag as the ID of the job.
0999: //which is actually the basename of the deep lfn job name!!
1000: mClonedPOptions.setBasenamePrefix(getBasenamePrefix(job));
1001:
1002: //set the flag designating that the planning invocation is part
1003: //of a deferred planning run
1004: mClonedPOptions.setPartOfDeferredRun(true);
1005:
1006: //in case of deferred planning cleanup wont work
1007: //explicitly turn it off
1008: mClonedPOptions.setCleanup(false);
1009:
1010: //we want monitoring to happen
1011: mClonedPOptions.setMonitoring(true);
1012:
1013: //construct the argument string.
1014: //add the jvm options and the pegasus options if any
1015: StringBuffer arguments = new StringBuffer();
1016: arguments./*append( mPOptions.toJVMOptions())*/
1017: append(" -Dpegasus.user.properties=").append(
1018: mMDAGPropertiesFile).append(" -Dpegasus.log.*=")
1019: .append(log).
1020: //the dax argument is diff for each partition
1021: append(" --dax ").append(daxURL).
1022: //put in all the other options.
1023: append(mClonedPOptions.toOptions());
1024:
1025: //set the path and the arguments to prescript
1026: job.setPreScript(script.toString(), arguments.toString());
1027: }
1028:
1029: /**
1030: * Returns the base name of the submit directory in which the submit files
1031: * for a particular partition reside.
1032: *
1033: * @param partition the partition for which the base directory is to be
1034: * constructed.
1035: *
1036: * @return the base name of the partition.
1037: */
1038: protected String getBaseName(Partition partition) {
1039: String id = partition.getID();
1040: StringBuffer sb = new StringBuffer(id.length() + 1);
1041: sb.append('P').append(id);
1042: return sb.toString();
1043: }
1044:
1045: /**
1046: * Returns the absolute path to a dagman (usually) related file for a
1047: * particular partition in the submit directory that is passed as an input
1048: * parameter. This does not create the file, just returns an absolute path
1049: * to it. Useful for constructing argument string for condor_dagman.
1050: *
1051: * @param partition the partition for whose execution the dagman is
1052: * responsible.
1053: * @param directory the directory where the file should reside.
1054: * @param suffix the suffix for the file basename.
1055: *
1056: * @return the absolute path to a file in the submit directory.
1057: */
1058: protected String getAbsolutePath(Partition partition,
1059: String directory, String suffix) {
1060:
1061: StringBuffer sb = new StringBuffer();
1062: //add a prefix P to partition id
1063: sb.append(directory).append(mSeparator).append(
1064: getBasename(partition, suffix));
1065:
1066: return sb.toString();
1067: }
1068:
1069: /**
1070: * Returns the basename of a dagman (usually) related file for a particular
1071: * partition.
1072: *
1073: * @param partition the partition for whose execution the dagman is
1074: * responsible.
1075: * @param suffix the suffix for the file basename.
1076: *
1077: * @return the basename.
1078: */
1079: protected String getBasename(Partition partition, String suffix) {
1080: StringBuffer sb = new StringBuffer(16);
1081: //add a prefix P
1082: sb.append('P').append(partition.getID()).append(suffix);
1083: return sb.toString();
1084: }
1085:
1086: /**
1087: * Returns the basename prefix of a dagman (usually) related file for a
1088: * job that submits a nested dagman.
1089: *
1090: * @param job the job that submits a nested dagman.
1091: *
1092: * @return the basename.
1093: */
1094: protected String getBasenamePrefix(SubInfo job) {
1095: StringBuffer sb = new StringBuffer(8);
1096: //add a prefix P
1097: sb.append('P').append(job.getLogicalID());
1098: return sb.toString();
1099: }
1100:
1101: /**
1102: * Returns the full path to a cache file that corresponds for one partition.
1103: * The cache file resides in the submit directory for the partition that
1104: * the job is responsible for.
1105: *
1106: * @param job the job running on the submit host that submits the partition.
1107: *
1108: * @return the full path to the file.
1109: */
1110: protected String getCacheFilePath(SubInfo job) {
1111: StringBuffer sb = new StringBuffer();
1112:
1113: //cache file is being generated in the initialdir set for the job.
1114: //initialdir is set correctly to the submit directory for the nested dag.
1115: sb.append(job.condorVariables.get("initialdir")).append(
1116: File.separatorChar).append(getBasenamePrefix(job))
1117: .append(".cache");
1118:
1119: return sb.toString();
1120: }
1121:
1122: /**
1123: * Creates a symbolic link to the source file in the destination directory.
1124: *
1125: * @param source the source file that has to be symlinked.
1126: * @param destDir the destination directory where the symlink has to be
1127: * placed.
1128: *
1129: * @return boolean indicating whether the symlink was successfully created.
1130: */
1131: protected boolean createSymlink(String source, File destDir) {
1132: boolean result = false;
1133:
1134: //do some sanity checks on the source and the destination
1135: File f = new File(source);
1136: if (!f.exists() || !f.canRead()) {
1137: mLogger.log("The source for symlink does not exist "
1138: + source, LogManager.ERROR_MESSAGE_LEVEL);
1139: return result;
1140: }
1141: if (!destDir.exists() || !destDir.isDirectory()
1142: || !destDir.canWrite()) {
1143: mLogger.log(
1144: "The destination directory cannot be written to "
1145: + destDir, LogManager.ERROR_MESSAGE_LEVEL);
1146: return result;
1147: }
1148:
1149: try {
1150: //run the ln command to create the symlink
1151: Runtime r = Runtime.getRuntime();
1152: String command = "ln -s " + source + " "
1153: + destDir.getAbsolutePath();
1154: mLogger.log("Creating symlink " + command,
1155: LogManager.DEBUG_MESSAGE_LEVEL);
1156: Process p = r.exec(command);
1157:
1158: //spawn off the gobblers with the already initialized default callback
1159: StreamGobbler ips = new StreamGobbler(p.getInputStream(),
1160: mDefaultCallback);
1161: StreamGobbler eps = new StreamGobbler(p.getErrorStream(),
1162: mDefaultCallback);
1163:
1164: ips.start();
1165: eps.start();
1166:
1167: //wait for the threads to finish off
1168: ips.join();
1169: eps.join();
1170:
1171: //get the status
1172: int status = p.waitFor();
1173: if (status != 0) {
1174: mLogger.log("Command " + command
1175: + " exited with status " + status,
1176: LogManager.DEBUG_MESSAGE_LEVEL);
1177: return result;
1178: }
1179: result = true;
1180: } catch (IOException ioe) {
1181: mLogger.log("IOException while creating symbolic links ",
1182: ioe, LogManager.ERROR_MESSAGE_LEVEL);
1183: } catch (InterruptedException ie) {
1184: //ignore
1185: }
1186: return result;
1187: }
1188:
1189: /**
1190: * Returns the number of partitions referred to in the PDAX file.
1191: *
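* <p>
* The count is obtained by spawning a grep for the string "&lt;partition" on
* the pdax file and counting the number of matches, rather than by parsing
* the document.
*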
1192: * @param pdax the path to the pdax file.
1193: *
1194: * @return the number of partitions in the pdax file.
1195: */
1196: protected int getPartitionCount(String pdax) {
1197: int result = 0;
1198: File f = new File(pdax);
1199: if (!f.exists() || !f.canRead()) {
1200: throw new RuntimeException("PDAX File is unreadable "
1201: + pdax);
1202: }
1203:
1204: try {
1205: //set the callback and run the grep command
1206: String word = "<partition";
1207: GrepCallback c = new GrepCallback(word);
1208: Runtime r = Runtime.getRuntime();
1209: String env[] = { "PATH=/bin:/usr/bin" };
1210: String command = "grep " + word + " " + pdax;
1211: Process p = r.exec(command, env);
1212:
1213: //spawn off the gobblers
1214: StreamGobbler ips = new StreamGobbler(p.getInputStream(), c);
1215: StreamGobbler eps = new StreamGobbler(p.getErrorStream(),
1216: new StreamGobblerCallback() {
1217: //we cannot log to any of the default streams
1218: LogManager mLogger = LogManager.getInstance();
1219:
1220: public void work(String s) {
1221: mLogger.log(
1222: "Output on stream gobller error stream "
1223: + s,
1224: LogManager.DEBUG_MESSAGE_LEVEL);
1225: }
1226: });
1227: ips.start();
1228: eps.start();
1229:
1230: //wait for the threads to finish off
1231: ips.join();
1232: result = c.getCount();
1233: eps.join();
1234:
1235: //get the status
1236: int status = p.waitFor();
1237: if (status != 0) {
1238: mLogger.log("Command " + command
1239: + " exited with status " + status,
1240: LogManager.WARNING_MESSAGE_LEVEL);
1241: }
1242:
1243: } catch (IOException ioe) {
1244: mLogger.log(
1245: "IOException while determining partition count ",
1246: ioe, LogManager.ERROR_MESSAGE_LEVEL);
1247: } catch (InterruptedException ie) {
1248: //ignore
1249: }
1250: return result;
1251: }
1252:
1253: /**
1254: * Returns the job that has been constructed for a particular partition.
1255: *
1256: * @param id the partition id.
1257: *
1258: * @return the corresponding job, else null if not found.
1259: */
1260: protected SubInfo getJob(String id) {
1261: Object obj = mJobMap.get(id);
1262: return (obj == null) ? null : (SubInfo) obj;
1263: }
1264:
1265: /**
1266: * Creates the submit directory for the workflow. This is not thread safe.
1267: *
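* <p>
* For illustration (user, vogroup and label below are hypothetical): for user
* "vahi", vogroup "pegasus" and workflow label "blackdiamond", with run number
* based naming, the relative directory returned resembles
* <pre>
*   vahi/pegasus/blackdiamond/run0001
* </pre>
*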
1268: * @param label the label of the workflow being worked upon.
1269: * @param dir the base directory specified by the user.
1270: * @param user the username of the user.
1271: * @param vogroup the vogroup to which the user belongs to.
1272: * @param timestampBased boolean indicating whether to have a timestamp based dir or not
1273: *
1274: * @return the directory name created relative to the base directory passed
1275: * as input.
1276: *
1277: * @throws IOException in case the submit directory cannot be created.
1278: */
1279: protected String createSubmitDirectory(String label, String dir,
1280: String user, String vogroup, boolean timestampBased)
1281: throws IOException {
1282: File base = new File(dir);
1283: StringBuffer result = new StringBuffer();
1284:
1285: //do a sanity check on the base
1286: sanityCheck(base);
1287:
1288: //add the user name if possible
1289: base = new File(base, user);
1290: result.append(user).append(File.separator);
1291:
1292: //add the vogroup
1293: base = new File(base, vogroup);
1294: sanityCheck(base);
1295: result.append(vogroup).append(File.separator);
1296:
1297: //add the label of the DAX
1298: base = new File(base, label);
1299: sanityCheck(base);
1300: result.append(label).append(File.separator);
1301:
1302: //create the directory name
1303: StringBuffer leaf = new StringBuffer();
1304: if (timestampBased) {
1305: leaf.append(mPOptions.getDateTime(mProps
1306: .useExtendedTimeStamp()));
1307: } else {
1308: //get all the files in this directory
1309: String[] files = base
1310: .list(new RunDirectoryFilenameFilter());
1311: //find the maximum run directory
1312: int num, max = 1;
1313: for (int i = 0; i < files.length; i++) {
1314: num = Integer.parseInt(files[i]
1315: .substring(SUBMIT_DIRECTORY_PREFIX.length()));
1316: if (num + 1 > max) {
1317: max = num + 1;
1318: }
1319: }
1320:
1321: //create the directory name
1322: leaf.append(SUBMIT_DIRECTORY_PREFIX).append(
1323: mNumFormatter.format(max));
1324: }
1325: result.append(leaf.toString());
1326: base = new File(base, leaf.toString());
1327: mLogger.log("Directory to be created is "
1328: + base.getAbsolutePath(),
1329: LogManager.DEBUG_MESSAGE_LEVEL);
1330: sanityCheck(base);
1331:
1332: return result.toString();
1333: }
1334:
1335: /**
1336: * Constructs any extra arguments that need to be passed to dagman, as determined
1337: * from the properties file.
1338: *
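* <p>
* For example (the property values are hypothetical), setting
* pegasus.dagman.maxpre=2 and pegasus.dagman.maxidle=10 results in the string
* " -MaxPre 2 -MaxIdle 10" being returned and eventually appended to the
* dagman arguments.
*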
1339: * @param properties the <code>PegasusProperties</code>
1340: *
1341: * @return any arguments to be added, else empty string
1342: */
1343: protected String constructDAGManKnobs(PegasusProperties properties) {
1344: StringBuffer sb = new StringBuffer();
1345:
1346: //get all the values for the dagman knobs
1347: int value;
1348: for (int i = 0; i < this.DAGMAN_KNOBS.length; i++) {
1349: value = parseInt(properties
1350: .getProperty(this.DAGMAN_KNOBS[i][0]));
1351: if (value > 0) {
1352: //add the option
1353: sb.append(this.DAGMAN_KNOBS[i][1]);
1354: sb.append(value);
1355: }
1356: }
1357: return sb.toString();
1358:
1359: }
1360:
1361: /**
1362: * Parses a string into an integer. Invalid values are returned as -1.
1363: *
1364: * @param s the String to be parsed as integer
1365: *
1366: * @return the int value if valid, else -1
1367: */
1368: protected int parseInt(String s) {
1369: int value = -1;
1370: try {
1371: value = Integer.parseInt(s);
1372: } catch (Exception e) {
1373: //ignore
1374: }
1375: return value;
1376: }
1377:
1378: /**
1379: * A small utility method that constructs the name of the Condor files
1380: * that are generated when a dag is submitted. The default separator _ is
1381: * used.
1382: *
1383: * @param name the name attribute in the partition element of the pdax.
1384: * @param index the partition number of the partition.
1385: * @param suffix the suffix that needs to be added to the filename.
1386: *
1387: * @return the name of the condor file.
1388: */
1389: private String getCondorFileName(String name, int index,
1390: String suffix) {
1391: return getCondorFileName(name, index, suffix, "_");
1392: }
1393:
1394: /**
1395: * A small utility method that constructs the name of the Condor files
1396: * that are generated when a dag is submitted.
1397: *
1398: * @param name the name attribute in the partition element of the pdax.
1399: * @param index the partition number of the partition.
1400: * @param suffix the suffix that needs to be added to the filename
1401: * @param separator the separator that is to be used while constructing
1402: * the filename.
1403: *
1404: * @return the name of the condor file
1405: */
1406: private String getCondorFileName(String name, int index,
1407: String suffix, String separator) {
1408: StringBuffer sb = new StringBuffer();
1409: //all the files reside in the submit file
1410: //directory specified by the user.
1411: //sb.append(mPOptions.submitFileDir).append(File.separator);
1412: sb.append(name).append(separator).append(index).append(suffix);
1413:
1414: return sb.toString();
1415: }
1416:
1417: /**
1418: * An inner class that implements the StreamGobblerCallback interface to count
1419: * the occurrences of a word in a document.
1420: *
1421: */
1422: private class GrepCallback implements StreamGobblerCallback {
1423:
1424: /**
1425: * The word that is to be searched for.
1426: */
1427: private String mWord;
1428:
1429: /**
1430: * The length of the word to be searched for.
1431: */
1432: private int mWordLength;
1433:
1434: /**
1435: * The number of times the word appears.
1436: */
1437: private int mCount;
1438:
1439: /**
1440: * Overloaded Constructor.
1441: *
1442: * @param word the word to be searched for.
1443: */
1444: public GrepCallback(String word) {
1445: mWord = word;
1446: mWordLength = (word == null) ? 0 : word.length();
1447: mCount = 0;
1448: }
1449:
1450: /**
1451: * Callback whenever a line is read from the stream by the StreamGobbler.
1452: * Counts the occurrences of the word in the line, and
1453: * increments the global counter.
1454: *
1455: * @param line the line that is read.
1456: */
1457: public void work(String line) {
1458:
1459: //sanity check to prevent infinite iterations
1460: if (mWordLength == 0)
1461: return;
1462:
1463: int start = 0;
1464: int index;
1465: while ((index = line.indexOf(mWord, start)) != -1) {
1466: mCount++;
1467: start = index + mWordLength;
1468: }
1469: }
1470:
1471: /**
1472: * Returns the number of words counted so far.
1473: *
1474: * @return the number of words
1475: */
1476: public int getCount() {
1477: return mCount;
1478: }
1479:
1480: /**
1481: * Resets the internal counters.
1482: */
1483: public void reset() {
1484: mCount = 0;
1485: }
1486:
1487: }
1488: }
1489:
1490: /**
1491: * A filename filter for identifying the run directory
1492: *
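* <p>
* A minimal illustration of the accepted names (the directory argument is not
* inspected and is assumed to be any File handle):
* <pre>{@code
* FilenameFilter filter = new RunDirectoryFilenameFilter();
* boolean a = filter.accept(someDir, "run0001"); // true, prefix plus four digits
* boolean b = filter.accept(someDir, "run12");   // false, not four digits
* }</pre>
*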
1493: * @author Karan Vahi vahi@isi.edu
1494: */
1495: class RunDirectoryFilenameFilter implements FilenameFilter {
1496:
1497: /**
1498: * Stores the regular expression necessary to match run directory names.
1499: */
1500: private static final String mRegexExpression = "("
1501: + PDAX2MDAG.SUBMIT_DIRECTORY_PREFIX
1502: + ")([0-9][0-9][0-9][0-9])";
1503:
1504: /**
1505: * Stores compiled patterns at first use, quasi-Singleton.
1506: */
1507: private static Pattern mPattern = null;
1508:
1509: /**
1510: * Tests if a specified file should be included in a file list.
1511: *
1512: * @param dir the directory in which the file was found.
1513: * @param name the name of the file.
1514: *
1515: * @return true if and only if the name should be included in the file list,
1516: * false otherwise.
1517: *
1518: *
1519: */
1520: public boolean accept(File dir, String name) {
1521: //compile the pattern only once.
1522: if (mPattern == null) {
1523: mPattern = Pattern.compile(mRegexExpression);
1524: }
1525: return mPattern.matcher(name).matches();
1526: }
1527:
1528: }
|