Source Code Cross Referenced for Kickstart.java in  » Workflow-Engines » pegasus-2.1.0 » org » griphyn » cPlanner » code » gridstart » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Workflow Engines » pegasus 2.1.0 » org.griphyn.cPlanner.code.gridstart 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /**
0002:         * This file or a portion of this file is licensed under the terms of
0003:         * the Globus Toolkit Public License, found in file GTPL, or at
0004:         * http://www.globus.org/toolkit/download/license.html. This notice must
0005:         * appear in redistributions of this file, with or without modification.
0006:         *
0007:         * Redistributions of this Software, with or without modification, must
0008:         * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
0009:         * some other similar material which is provided with the Software (if
0010:         * any).
0011:         *
0012:         * Copyright 1999-2004 University of Chicago and The University of
0013:         * Southern California. All rights reserved.
0014:         */package org.griphyn.cPlanner.code.gridstart;
0015:
0016:        import org.griphyn.cPlanner.classes.ADag;
0017:        import org.griphyn.cPlanner.classes.SubInfo;
0018:        import org.griphyn.cPlanner.classes.AggregatedJob;
0019:        import org.griphyn.cPlanner.classes.TransferJob;
0020:        import org.griphyn.cPlanner.classes.SiteInfo;
0021:        import org.griphyn.cPlanner.classes.PegasusFile;
0022:        import org.griphyn.cPlanner.classes.FileTransfer;
0023:        import org.griphyn.cPlanner.classes.NameValue;
0024:        import org.griphyn.cPlanner.classes.PegasusBag;
0025:        import org.griphyn.cPlanner.classes.PlannerOptions;
0026:
0027:        import org.griphyn.cPlanner.common.LogManager;
0028:        import org.griphyn.cPlanner.common.PegasusProperties;
0029:
0030:        import org.griphyn.cPlanner.poolinfo.PoolInfoProvider;
0031:        import org.griphyn.cPlanner.namespace.Condor;
0032:        import org.griphyn.cPlanner.namespace.ENV;
0033:        import org.griphyn.cPlanner.namespace.VDS;
0034:
0035:        import org.griphyn.cPlanner.code.GridStart;
0036:        import org.griphyn.cPlanner.code.POSTScript;
0037:
0038:        import org.griphyn.cPlanner.code.generator.condor.CondorQuoteParser;
0039:        import org.griphyn.cPlanner.code.generator.condor.CondorQuoteParserException;
0040:
0041:        import org.griphyn.cPlanner.transfer.SLS;
0042:
0043:        import org.griphyn.cPlanner.transfer.sls.SLSFactory;
0044:        import org.griphyn.cPlanner.transfer.sls.SLSFactoryException;
0045:
0046:        import java.util.Collection;
0047:        import java.util.Iterator;
0048:        import java.util.StringTokenizer;
0049:        import java.util.Set;
0050:        import java.util.List;
0051:        import java.util.ArrayList;
0052:
0053:        import java.io.File;
0054:        import java.io.FileWriter;
0055:        import java.io.IOException;
0056:        import org.griphyn.cPlanner.code.CodeGeneratorException;
0057:
0058:        /**
0059:         * This enables a job to be run on the grid, by launching it through kickstart.
0060:         * The kickstart executable is a light-weight program which  connects  the
0061:         * stdin,  stdout  and  stderr  filehandles for VDS jobs on the remote
0062:         * site.
0063:         * <p>
0064:         * Sitting in between the remote scheduler and the executable, it is
0065:         * possible  for  kickstart  to  gather additional information about the
0066:         * executable run-time behavior, including the  exit  status  of  jobs.
0067:         * <p>
0068:         * Kickstart is an executable distributed with VDS that can generally be found
0069:         * at $PEGASUS_HOME/bin/kickstart
0070:         *
0071:         * @author Karan Vahi vahi@isi.edu
0072:         * @version $Revision: 454 $
0073:         */
0074:        public class Kickstart implements  GridStart {
0075:
    /**
     * The suffix for the kickstart input file, that is generated to use
     * invoke at the remote end.
     */
    public static final String KICKSTART_INPUT_SUFFIX = "arg";

    /**
     * The basename of the class that is implementing this. Could have
     * been determined by reflection.
     */
    public static final String CLASSNAME = "Kickstart";

    /**
     * The SHORTNAME for this implementation.
     */
    public static final String SHORT_NAME = "kickstart";

    /**
     * The environment variable used to set the Kickstart SETUP JOB.
     */
    public static final String KICKSTART_SETUP = "GRIDSTART_SETUP";

    /**
     * The environment variable used to set the Kickstart PREJOB.
     */
    public static final String KICKSTART_PREJOB = "GRIDSTART_PREJOB";

    /**
     * The environment variable used to set the Kickstart POSTJOB.
     */
    public static final String KICKSTART_POSTJOB = "GRIDSTART_POSTJOB";

    /**
     * The environment variable used to set the Kickstart CLEANUP JOB.
     */
    public static final String KICKSTART_CLEANUP = "GRIDSTART_CLEANUP";

    /**
     * The LogManager object which is used to log all the messages.
     */
    private LogManager mLogger;

    /**
     * The object holding all the properties pertaining to Pegasus.
     */
    private PegasusProperties mProps;

    /**
     * The options passed to the planner.
     */
    private PlannerOptions mPOptions;

    /**
     * The handle to the workflow that is being enabled.
     */
    private ADag mConcDAG;

    /**
     * Handle to the site catalog.
     */
    private PoolInfoProvider mSiteHandle;

    /**
     * The submit directory where the submit files are being generated for
     * the workflow.
     */
    private String mSubmitDir;

    /**
     * A boolean indicating whether to use invoke always or not.
     */
    private boolean mInvokeAlways;

    /**
     * A boolean indicating whether to stat files or not.
     */
    private boolean mDoStat;

    /**
     * A boolean indicating whether to generate lof files or not.
     */
    private boolean mGenerateLOF;

    /**
     * The invoke limit trigger. When the total argument length exceeds this,
     * kickstart's invoke (argument file) mechanism is used instead.
     */
    private long mInvokeLength;

    /**
     * A boolean indicating whether to have worker node execution or not.
     */
    private boolean mWorkerNodeExecution;

    /**
     * The handle to the SLS (second level staging) implementor. Only loaded
     * when worker node execution is enabled.
     */
    private SLS mSLS;

    /**
     * An instance variable to track if enabling is happening as part of a
     * clustered (aggregated) job.
     * See Bug 21 comments on Pegasus Bugzilla.
     */
    private boolean mEnablingPartOfAggregatedJob;
0180:            /**
0181:             * Initializes the GridStart implementation.
0182:             *
0183:             * @param bag   the bag of objects that is used for initialization.
0184:             * @param dag   the concrete dag so far.
0185:             */
0186:            public void initialize(PegasusBag bag, ADag dag) {
0187:
0188:                mProps = bag.getPegasusProperties();
0189:                mPOptions = bag.getPlannerOptions();
0190:                mSubmitDir = mPOptions.getSubmitDirectory();
0191:                mInvokeAlways = mProps.useInvokeInGridStart();
0192:                mInvokeLength = mProps.getGridStartInvokeLength();
0193:                mDoStat = mProps.doStatWithKickstart();
0194:                mGenerateLOF = mProps.generateLOFFiles();
0195:                mLogger = LogManager.getInstance();
0196:                mConcDAG = dag;
0197:                mSiteHandle = bag.getHandleToSiteCatalog();
0198:
0199:                mWorkerNodeExecution = mProps.executeOnWorkerNode();
0200:                if (mWorkerNodeExecution) {
0201:                    //load SLS
0202:                    mSLS = SLSFactory.loadInstance(bag);
0203:                }
0204:                mEnablingPartOfAggregatedJob = false;
0205:            }
0206:
0207:            /**
0208:             * Enables a collection of jobs and puts them into an AggregatedJob.
0209:             * The assumption here is that all the jobs are being enabled by the same
0210:             * implementation. It enables the jobs and puts them into the AggregatedJob
0211:             * that is passed to it.
0212:             * However, to create a valid single XML file, it suppresses the header
0213:             * creation for all but the first job.
0214:             *
0215:             * @param aggJob the AggregatedJob into which the collection has to be
0216:             *               integrated.
0217:             * @param jobs   the collection of jobs (SubInfo) that need to be enabled.
0218:             *
0219:             * @return the AggregatedJob containing the enabled jobs.
0220:             * @see #enable(SubInfo,boolean)
0221:             */
0222:            public AggregatedJob enable(AggregatedJob aggJob, Collection jobs) {
0223:                boolean first = true;
0224:
0225:                //we do not want the jobs being clustered to be enabled
0226:                //for worker node execution just yet.
0227:                mEnablingPartOfAggregatedJob = true;
0228:
0229:                for (Iterator it = jobs.iterator(); it.hasNext();) {
0230:                    SubInfo job = (SubInfo) it.next();
0231:                    if (first) {
0232:                        first = false;
0233:                    } else {
0234:                        //we need to pass -H to kickstart
0235:                        //to suppress the header creation
0236:                        job.vdsNS.construct(VDS.GRIDSTART_ARGUMENTS_KEY, "-H");
0237:                    }
0238:
0239:                    //always pass isGlobus true as always
0240:                    //interested only in executable strargs
0241:                    //due to the fact that seqexec does not allow for setting environment
0242:                    //per constitutent job, we cannot set the postscript removal option
0243:                    this .enable(job, true, mDoStat, false);
0244:                    aggJob.add(job);
0245:                    //check if any files are being transferred via
0246:                    //Condor and add to Aggregated Job
0247:                    //add condor keys to transfer files
0248:                    //This is now taken care of in the merge profiles section
0249:                    //            if(job.condorVariables.containsKey(Condor.TRANSFER_IP_FILES_KEY)){
0250:                    //              aggJob.condorVariables.addIPFileForTransfer(
0251:                    //                                          (String)job.condorVariables.get( Condor.TRANSFER_IP_FILES_KEY) );
0252:                    //
0253:                    //           }
0254:                }
0255:
0256:                //set the flag back to false
0257:                mEnablingPartOfAggregatedJob = false;
0258:
0259:                return aggJob;
0260:            }
0261:
0262:            /**
0263:             * Enables a job to run on the grid by launching it through kickstart.
0264:             * Does the stdio, and stderr handling of the job to be run on the grid.
0265:             * It modifies the job description, and also constructs all the valid
0266:             * option to be passed to kickstart for launching the executable.
0267:             *
0268:             * @param job  the <code>SubInfo</code> object containing the job description
0269:             *             of the job that has to be enabled on the grid.
0270:             * @param isGlobusJob is <code>true</code>, if the job generated a
0271:             *        line <code>universe = globus</code>, and thus runs remotely.
0272:             *        Set to <code>false</code>, if the job runs on the submit
0273:             *        host in any way.
0274:             *
0275:             * @return boolean true if enabling was successful,else false in case when
0276:             *         the path to kickstart could not be determined on the site where
0277:             *         the job is scheduled.
0278:             */
0279:            public boolean enable(SubInfo job, boolean isGlobusJob) {
0280:                return this .enable(job, isGlobusJob, mDoStat, true);
0281:            }
0282:
    /**
     * Enables a job to run on the grid by launching it through kickstart.
     * Does the stdio, and stderr handling of the job to be run on the grid.
     * It modifies the job description (Condor keys and executable/arguments),
     * and also constructs all the valid options to be passed to kickstart for
     * launching the executable.
     *
     * @param job  the <code>SubInfo</code> object containing the job description
     *             of the job that has to be enabled on the grid.
     * @param isGlobusJob is <code>true</code>, if the job generated a
     *        line <code>universe = globus</code>, and thus runs remotely.
     *        Set to <code>false</code>, if the job runs on the submit
     *        host in any way.
     * @param stat  boolean indicating whether to generate the lof files
     *                     for kickstart stat option or not.
     * @param addPostScript boolean indicating whether to add a postscript or not.
     *
     * @return boolean true if enabling was successful,else false in case when
     *         the path to kickstart could not be determined on the site where
     *         the job is scheduled.
     */
    protected boolean enable(SubInfo job, boolean isGlobusJob,
            boolean stat, boolean addPostScript) {

        //take care of relative submit directory if specified.
        //NOTE(review): mSeparator is declared outside this view —
        //presumably the platform file separator; confirm in full source.
        String submitDir = mSubmitDir + mSeparator;
        //        String submitDir = getSubmitDirectory( mSubmitDir , job) + mSeparator;

        //To get the gridstart/kickstart path on the remote
        //pool, querying with entry for vanilla universe.
        //In the new format the gridstart is associated with the
        //pool not pool, condor universe
        SiteInfo site = mSiteHandle.getPoolEntry(job.executionPool,
                Condor.VANILLA_UNIVERSE);
        String gridStartPath = site.getKickstartPath();
        //sanity check: no kickstart catalogued for the execution site
        if (gridStartPath == null) {
            return false;
        }
        StringBuffer gridStartArgs = new StringBuffer();

        // the executable is gridstart, the application becomes its argument
        // -n : the transformation name, -N : the derivation name
        gridStartArgs.append("-n ");
        gridStartArgs.append(job.getCompleteTCName());
        gridStartArgs.append(' ');
        gridStartArgs.append("-N ");
        gridStartArgs.append(job.getCompleteDVName());
        gridStartArgs.append(' ');

        // handle stdin
        if (job.stdIn.length() > 0) {

            //for using the transfer script and other vds executables the
            //input file is transferred from the submit host by Condor to
            //stdin. We fool the kickstart to pick up the input file from
            //standard stdin by giving the input file name as -
            if (job.logicalName
                    .equals(org.griphyn.cPlanner.transfer.implementation.Transfer.TRANSFORMATION_NAME)
                    || job.logicalName
                            .equals(org.griphyn.cPlanner.transfer.implementation.T2.TRANSFORMATION_NAME)
                    || job.logicalName
                            .equals(org.griphyn.cPlanner.cluster.aggregator.SeqExec.COLLAPSE_LOGICAL_NAME)
                    || job.logicalName
                            .equals(org.griphyn.cPlanner.cluster.aggregator.MPIExec.COLLAPSE_LOGICAL_NAME)
                    || job.logicalName
                            .equals(org.griphyn.cPlanner.engine.cleanup.Cleanup.TRANSFORMATION_NAME)) {

                //condor needs to pick up the job stdin and
                //transfer it to the remote end
                construct(job, "input", submitDir + job.getStdIn());
                gridStartArgs.append("-i ").append("-").append(' ');

            } else {
                //kickstart provides the app's *tracked* stdin
                gridStartArgs.append("-i ").append(job.stdIn).append(
                        ' ');
            }
        }

        // handle stdout
        if (job.stdOut.length() > 0) {
            // gridstart saves the app's *tracked* stdout
            gridStartArgs.append("-o ").append(job.stdOut).append(' ');
        }

        // the Condor output variable and kickstart -o option
        // must not point to the same file for any local job:
        // both writers would race on the same path (write-after-write).
        if (job.stdOut.equals(job.jobName + ".out") && !isGlobusJob) {
            mLogger.log("Detected WAW conflict for stdout",
                    LogManager.WARNING_MESSAGE_LEVEL);
        }
        // the output of gridstart is propagated back to the submit host
        construct(job, "output", submitDir + job.jobName + ".out");

        if (isGlobusJob) {
            construct(job, "transfer_output", "true");
        }

        // handle stderr
        if (job.stdErr.length() > 0) {
            // gridstart saves the app's *tracked* stderr
            gridStartArgs.append("-e ").append(job.stdErr).append(' ');
        }

        // the Condor error variable and kickstart -e option
        // must not point to the same file for any local job.
        if (job.stdErr.equals(job.jobName + ".err") && !isGlobusJob) {
            mLogger.log("Detected WAW conflict for stderr",
                    LogManager.WARNING_MESSAGE_LEVEL);
        }
        // the error from gridstart is propagated back to the submit host
        construct(job, "error", submitDir + job.jobName + ".err");
        if (isGlobusJob) {
            construct(job, "transfer_error", "true");
        }

        //we need to pass the resource handle (execution site name)
        //to kickstart as the -R argument
        gridStartArgs.append("-R ").append(job.executionPool).append(
                ' ');

        //handle the -w option that asks kickstart to change
        //directory before launching an executable.
        String style = (String) job.vdsNS.get(VDS.STYLE_KEY);
        if (job.vdsNS.getBooleanValue(VDS.CHANGE_DIR_KEY)
                && !mWorkerNodeExecution) {
            //NOTE(review): style is re-read here although already fetched
            //above — redundant but harmless.
            style = (String) job.vdsNS.get(VDS.STYLE_KEY);

            //            Commented to take account of submitting to condor pool
            //            directly or glide in nodes. However, does not work for
            //            standard universe jobs. Also made change in Kickstart
            //            to pick up only remote_initialdir Karan Nov 15,2005
            //the initialdir key is *removed* from the Condor profile so that
            //only kickstart (via -w), not Condor/GRAM, changes directory
            String directory = (style
                    .equalsIgnoreCase(VDS.GLOBUS_STYLE) || style
                    .equalsIgnoreCase(VDS.GLIDEIN_STYLE)) ? (String) job.condorVariables
                    .removeKey("remote_initialdir")
                    : (String) job.condorVariables
                            .removeKey("initialdir");

            //pass the directory as an argument to kickstart
            gridStartArgs.append("-w ").append(directory).append(' ');
        }

        if (job.vdsNS.getBooleanValue(VDS.TRANSFER_PROXY_KEY)) {
            //just remove the remote_initialdir key
            //the job needs to be run in the directory
            //Condor or GRAM decides to run
            job.condorVariables.removeKey("remote_initialdir");

        }

        //worker node execution is skipped for constituent jobs of a
        //clustered job (they are handled at the aggregated level)
        if (mWorkerNodeExecution && !mEnablingPartOfAggregatedJob) {
            enableForWorkerNodeExecution(job, gridStartArgs);
        }

        //check if the job type indicates staging of executable
        //        The -X functionality is handled by the setup jobs that
        //        are added as childern to the stage in jobs.
        //        Karan November 22, 2005
        //        if(job.getJobClassInt() == SubInfo.STAGED_COMPUTE_JOB){
        //            //add the -X flag to denote turning on
        //            gridStartArgs.append("-X ");
        //       }

        //add the stat options to kickstart only for certain jobs for time being
        //and if the input variable is true
        if (stat) {
            if (job.getJobType() == SubInfo.COMPUTE_JOB
                    || job.getJobType() == SubInfo.STAGED_COMPUTE_JOB
                    || job.getJobType() == SubInfo.CLEANUP_JOB
                    || job.getJobType() == SubInfo.STAGE_IN_JOB
                    || job.getJobType() == SubInfo.INTER_POOL_JOB) {

                String lof;
                List files = new ArrayList(2);

                //inefficient check here again. just a prototype
                //we need to generate -S option only for non transfer jobs
                //generate the list of filenames file for the input and output files.
                if (!(job instanceof  TransferJob)) {
                    //-S @file : stat the input files listed in the lof file
                    lof = generateListofFilenamesFile(job
                            .getInputFiles(), job.getID() + ".in.lof");
                    if (lof != null) {
                        File file = new File(lof);
                        job.condorVariables.addIPFileForTransfer(lof);
                        //arguments just need basename . no path component
                        gridStartArgs.append("-S @").append(
                                file.getName()).append(" ");
                        files.add(file.getName());
                    }
                }

                //for cleanup jobs no generation of stats for output files
                if (job.getJobType() != SubInfo.CLEANUP_JOB) {
                    //-s @file : stat the output files listed in the lof file
                    lof = generateListofFilenamesFile(job
                            .getOutputFiles(), job.getID() + ".out.lof");
                    if (lof != null) {
                        File file = new File(lof);
                        job.condorVariables.addIPFileForTransfer(lof);
                        //arguments just need basename . no path component
                        gridStartArgs.append("-s @").append(
                                file.getName()).append(" ");
                        files.add(file.getName());
                    }
                }
                //add kickstart postscript that removes these files
                if (addPostScript) {
                    addCleanupPostScript(job, files);
                }
            }
        }//end of if ( stat )
        else if (mGenerateLOF) {
            //dostat is false. so no generation of stat option
            //but generate lof files nevertheless

            //inefficient check here again. just a prototype
            //we need to generate -S option only for non transfer jobs
            //generate the list of filenames file for the input and output files.
            if (!(job instanceof  TransferJob)) {
                generateListofFilenamesFile(job.getInputFiles(), job
                        .getID()
                        + ".in.lof");
            }

            //for cleanup jobs no generation of stats for output files
            if (job.getJobType() != SubInfo.CLEANUP_JOB) {
                generateListofFilenamesFile(job.getOutputFiles(), job
                        .getID()
                        + ".out.lof");

            }
        }///end of mGenerateLOF

        //append any arguments that need to be passed
        //kickstart directly, set elsewhere
        if (job.vdsNS.containsKey(VDS.GRIDSTART_ARGUMENTS_KEY)) {
            gridStartArgs.append(
                    job.vdsNS.get(VDS.GRIDSTART_ARGUMENTS_KEY)).append(
                    ' ');
        }

        //optionally tag the record with the workflow label (-L) and
        //modification time (-T)
        if (mProps.generateKickstartExtraOptions() && mConcDAG != null) {
            gridStartArgs.append("-L ").append(mConcDAG.getLabel())
                    .append(" ");
            gridStartArgs.append("-T ").append(mConcDAG.getMTime())
                    .append(" ");
        }

        //if the argument string would be too long (or invoke is forced),
        //hand the application and its arguments to kickstart via an
        //argument file (invoke) instead of the command line
        long argumentLength = gridStartArgs.length()
                + job.executable.length() + 1 + job.strargs.length();
        if (mInvokeAlways || argumentLength > mInvokeLength) {
            if (!useInvoke(job, gridStartArgs)) {
                mLogger.log("Unable to use invoke for job ",
                        LogManager.ERROR_MESSAGE_LEVEL);
                return false;
            }
        } else {
            gridStartArgs.append(job.executable).append(' ').append(
                    job.strargs);
        }

        //the executable path and arguments are put
        //in the Condor namespace and not printed to the
        //file so that they can be overriden if desired
        //later through profiles and key transfer_executable
        construct(job, "executable", handleTransferOfExecutable(job,
                gridStartPath));
        construct(job, "arguments", gridStartArgs.toString());

        //all finished successfully
        return true;
    }
0554:
0555:            /**
0556:             * It changes the paths to the executable depending on whether we want to
0557:             * transfer the executable or not. If the transfer_executable is set to true,
0558:             * then the executable needs to be shipped from the submit host meaning the
0559:             * local pool. This function changes the path of the executable to the one on
0560:             * the local pool, so that it can be shipped.
0561:             *
0562:             * @param job   the <code>SubInfo</code> containing the job description.
0563:             * @param path  the path to kickstart on the remote compute site.
0564:             *
0565:             * @return the path that needs to be set as the executable
0566:             */
0567:            protected String handleTransferOfExecutable(SubInfo job, String path) {
0568:                Condor cvar = job.condorVariables;
0569:
0570:                if (!cvar.getBooleanValue("transfer_executable")) {
0571:                    //the executable paths are correct and
0572:                    //point to the executable on the remote pool
0573:                    return path;
0574:                }
0575:
0576:                SiteInfo site = mSiteHandle.getPoolEntry("local",
0577:                        Condor.VANILLA_UNIVERSE);
0578:                String gridStartPath = site.getKickstartPath();
0579:                if (gridStartPath == null) {
0580:                    mLogger.log(
0581:                            "Gridstart needs to be shipped from the submit host to pool"
0582:                                    + job.getSiteHandle()
0583:                                    + ".\nNo entry for it in pool local",
0584:                            LogManager.ERROR_MESSAGE_LEVEL);
0585:                    throw new RuntimeException(
0586:                            "GridStart needs to be shipped from submit host to site "
0587:                                    + job.getSiteHandle() + " for job "
0588:                                    + job.getName());
0589:
0590:                }
0591:
0592:                return gridStartPath;
0593:
0594:            }
0595:
0596:            /**
0597:             * Enables a job for worker node execution, by calling out to the SLS
0598:             * interface to do the second level staging. Also adds the appropriate
0599:             * prejob/setup job/post/cleanup jobs to the job if required.
0600:             *
0601:             *
0602:             * @param job     the job to be enabled
0603:             * @param args    the arguments constructed so far.
0604:             */
0605:            protected void enableForWorkerNodeExecution(SubInfo job,
0606:                    StringBuffer args) {
0607:                String style = (String) job.vdsNS.get(VDS.STYLE_KEY);
0608:
0609:                if (job.getJobType() == SubInfo.COMPUTE_JOB
0610:                        || job.getJobType() == SubInfo.STAGED_COMPUTE_JOB) {
0611:                    mLogger.log("Enabling job for worker node execution "
0612:                            + job.getName(), LogManager.DEBUG_MESSAGE_LEVEL);
0613:
0614:                    //To Do handle staged compute jobs also.
0615:                    //and clustered jobs
0616:
0617:                    //remove the remote or initial dir's for the compute jobs
0618:                    String key = (style.equalsIgnoreCase(VDS.GLOBUS_STYLE)) ? "remote_initialdir"
0619:                            : "initialdir";
0620:
0621:                    String directory = (String) job.condorVariables
0622:                            .removeKey(key);
0623:
0624:                    String destDir = mSiteHandle.getEnvironmentVariable(job
0625:                            .getSiteHandle(), "wntmp");
0626:                    destDir = (destDir == null) ? "/tmp" : destDir;
0627:
0628:                    String relativeDir = mPOptions.getRelativeSubmitDirectory();
0629:                    String workerNodeDir = destDir + File.separator
0630:                            + relativeDir.replaceAll("/", "-");
0631:
0632:                    //pass the worker node directory as an argument to kickstart
0633:                    //because most jobmanagers cannot handle worker node tmp
0634:                    //as they check for existance on the head node
0635:                    StringBuffer xBitSetInvocation = null;
0636:                    if (!mSLS.doesCondorModifications()) {
0637:                        //only valid if job does not use SLS condor
0638:                        args.append("-W ").append(workerNodeDir).append(' ');
0639:
0640:                        //handle for staged compute jobs. set their X bit after
0641:                        // SLS has happened
0642:                        if (job.getJobType() == SubInfo.STAGED_COMPUTE_JOB) {
0643:                            xBitSetInvocation = new StringBuffer();
0644:                            xBitSetInvocation.append("/bin/chmod 600 ");
0645:
0646:                            for (Iterator it = job.getInputFiles().iterator(); it
0647:                                    .hasNext();) {
0648:                                PegasusFile pf = (PegasusFile) it.next();
0649:                                if (pf.getType() == PegasusFile.EXECUTABLE_FILE) {
0650:                                    //                            //the below does not work as kickstart attempts to
0651:                                    //                            //set the X bit before running any prejobs
0652:                                    //                            args.append( "-X " ).append( workerNodeDir ).
0653:                                    //                                append( File.separator ).append( pf.getLFN() ).append(' ');
0654:                                    xBitSetInvocation.append(pf.getLFN())
0655:                                            .append(" ");
0656:                                }
0657:                            }
0658:                        }
0659:                    }
0660:
0661:                    //always have the remote dir set to /tmp as we are
0662:                    //banking upon kickstart to change the directory for us
0663:                    job.condorVariables.construct(key, "/tmp");
0664:
0665:                    //see if we need to generate a SLS input file in the submit directory
0666:                    File slsInputFile = null;
0667:                    if (mSLS.needsSLSInput(job)) {
0668:                        //generate the sls file with the mappings in the submit directory
0669:                        slsInputFile = mSLS.generateSLSInputFile(job, mSLS
0670:                                .getSLSInputLFN(job), mSubmitDir, directory,
0671:                                workerNodeDir);
0672:
0673:                        //construct a setup job not reqd as kickstart creating the directory
0674:                        //String setupJob = constructSetupJob( job, workerNodeDir );
0675:                        //setupJob = quote( setupJob );
0676:                        //job.envVariables.construct( this.KICKSTART_SETUP, setupJob );
0677:
0678:                        File headNodeSLS = new File(directory, slsInputFile
0679:                                .getName());
0680:                        String preJob = mSLS.invocationString(job, headNodeSLS);
0681:
0682:                        if (preJob != null) {
0683:                            /*
0684:                                //add the x bit invocation if required
0685:                                //this is required till kickstart -X feature is fixed
0686:                                //it needs to be invoked after the prejob
0687:                                if( xBitSetInvocation != null ){
0688:                                    if( preJob.startsWith( "/bin/bash" ) ){
0689:                                        //remove the last " and add the x bit invocation
0690:                                        if( preJob.lastIndexOf( "\"" ) == preJob.length() - 1 ){
0691:                                            preJob = preJob.substring( 0, preJob.length() - 1 );
0692:                                            xBitSetInvocation.append( "\"" );
0693:                                            preJob += " && " + xBitSetInvocation.toString();
0694:                                        }
0695:                                    }
0696:                                    else{
0697:                                        //prepend a /bin/bash -c invocation
0698:                                        preJob = "/bin/bash -c \"" + preJob + xBitSetInvocation.toString();
0699:                                    }
0700:                                }
0701:                             */
0702:
0703:                            preJob = quote(preJob);
0704:                            job.envVariables.construct(this .KICKSTART_PREJOB,
0705:                                    preJob);
0706:                        }
0707:                    }
0708:
0709:                    //see if we need to generate a SLS output file in the submit directory
0710:                    File slsOutputFile = null;
0711:                    if (mSLS.needsSLSOutput(job)) {
0712:                        //construct the postjob that transfers the output files
0713:                        //back to head node directory
0714:                        //to fix later. right now post job only created is pre job
0715:                        //created
0716:                        slsOutputFile = mSLS.generateSLSOutputFile(job, mSLS
0717:                                .getSLSOutputLFN(job), mSubmitDir, directory,
0718:                                workerNodeDir);
0719:
0720:                        //generate the post job
0721:                        File headNodeSLS = new File(directory, slsOutputFile
0722:                                .getName());
0723:                        String postJob = mSLS
0724:                                .invocationString(job, headNodeSLS);
0725:                        if (postJob != null) {
0726:                            postJob = quote(postJob);
0727:                            job.envVariables.construct(this .KICKSTART_POSTJOB,
0728:                                    postJob);
0729:                        }
0730:                    }
0731:
0732:                    //modify the job if required
0733:                    if (!mSLS.modifyJobForWorkerNodeExecution(job, mSiteHandle
0734:                            .getURLPrefix(job.getSiteHandle()), directory,
0735:                            workerNodeDir)) {
0736:
0737:                        throw new RuntimeException("Unable to modify job "
0738:                                + job.getName() + " for worker node execution");
0739:
0740:                    }
0741:
0742:                    //only to have cleanup job when not using condor modifications
0743:                    if (!mSLS.doesCondorModifications()) {
0744:                        String cleanupJob = constructCleanupJob(job,
0745:                                workerNodeDir);
0746:                        if (cleanupJob != null) {
0747:                            cleanupJob = quote(cleanupJob);
0748:                            job.envVariables.construct(this .KICKSTART_CLEANUP,
0749:                                    cleanupJob);
0750:                        }
0751:                    }
0752:                }
0753:            }
0754:
0755:            /**
0756:             * Indicates whether the enabling mechanism can set the X bit
0757:             * on the executable on the remote grid site, in addition to launching
0758:             * it on the remote grid site.
0759:             *
0760:             * @return true indicating Kickstart can set the X bit or not.
0761:             */
0762:            public boolean canSetXBit() {
0763:                return true;
0764:            }
0765:
0766:            /**
0767:             * Returns the value of the vds profile with key as VDS.GRIDSTART_KEY,
0768:             * that would result in the loading of this particular implementation.
0769:             * It is usually the name of the implementing class without the
0770:             * package name.
0771:             *
0772:             * @return the value of the profile key.
0773:             * @see org.griphyn.cPlanner.namespace.VDS#GRIDSTART_KEY
0774:             */
0775:            public String getVDSKeyValue() {
0776:                return this .CLASSNAME;
0777:            }
0778:
0779:            /**
0780:             * Returns a short textual description in the form of the name of the class.
0781:             *
0782:             * @return  short textual description.
0783:             */
0784:            public String shortDescribe() {
0785:                return this .SHORT_NAME;
0786:            }
0787:
0788:            /**
0789:             * Returns the SHORT_NAME for the POSTScript implementation that is used
0790:             * to be as default with this GridStart implementation.
0791:             *
0792:             * @return  the identifier for the ExitPOST POSTScript implementation.
0793:             *
0794:             * @see POSTScript#shortDescribe()
0795:             */
0796:            public String defaultPOSTScript() {
0797:                return ExitPOST.SHORT_NAME;
0798:            }
0799:
0800:            /**
0801:             * Triggers the creation of the kickstart input file, that contains the
0802:             * the remote executable and the arguments with which it has to be invoked.
0803:             * The kickstart input file is created in the submit directory.
0804:             *
0805:             * @param job  the <code>SubInfo</code> object containing the job description.
0806:             * @param args the arguments buffer for gridstart invocation so far.
0807:             *
0808:             * @return boolean indicating whether kickstart input file was generated or not.
0809:             *                 false in case of any error.
0810:             */
0811:            private boolean useInvoke(SubInfo job, StringBuffer args) {
0812:                boolean result = true;
0813:
0814:                String inputBaseName = job.jobName + "."
0815:                        + this .KICKSTART_INPUT_SUFFIX;
0816:
0817:                //writing the stdin file
0818:                try {
0819:                    FileWriter input;
0820:                    input = new FileWriter(new File(mSubmitDir, inputBaseName));
0821:                    //the first thing that goes in is the executable name
0822:                    input.write(job.executable);
0823:                    input.write("\n");
0824:                    //write out all the arguments
0825:                    //one on each line
0826:                    StringTokenizer st = new StringTokenizer(job.strargs);
0827:                    while (st.hasMoreTokens()) {
0828:                        input.write(st.nextToken());
0829:                        input.write("\n");
0830:                    }
0831:                    //close the stream
0832:                    input.close();
0833:                } catch (Exception e) {
0834:                    mLogger.log(
0835:                            "Unable to write the kickstart input file for job "
0836:                                    + job.getCompleteTCName() + " "
0837:                                    + e.getMessage(),
0838:                            LogManager.ERROR_MESSAGE_LEVEL);
0839:                    return false;
0840:                }
0841:
0842:                //construct list of files that need to be transferred
0843:                //via Condor file transfer mechanism
0844:                String fileList;
0845:                if (job.condorVariables
0846:                        .containsKey(Condor.TRANSFER_IP_FILES_KEY)) {
0847:                    //update the existing list.
0848:                    fileList = (String) job.condorVariables
0849:                            .get(Condor.TRANSFER_IP_FILES_KEY);
0850:                    if (fileList != null) {
0851:                        fileList += "," + inputBaseName;
0852:                    }
0853:                } else {
0854:                    fileList = inputBaseName;
0855:                }
0856:
0857:                construct(job, Condor.TRANSFER_IP_FILES_KEY, fileList);
0858:                construct(job, "should_transfer_files", "YES");
0859:                construct(job, "when_to_transfer_output", "ON_EXIT");
0860:
0861:                //add the -I argument to kickstart
0862:                args.append("-I ").append(inputBaseName).append(" ");
0863:                return result;
0864:            }
0865:
0866:            /**
0867:             * Constructs a kickstart setup job
0868:             *
0869:             * @param job           the job to be run.
0870:             * @param workerNodeTmp the worker node tmp to run the job in.
0871:             *
0872:             * @return String
0873:             */
0874:            protected String constructSetupJob(SubInfo job, String workerNodeTmp) {
0875:                StringBuffer setup = new StringBuffer();
0876:
0877:                setup.append("/bin/mkdir -p ").append(workerNodeTmp);
0878:
0879:                return setup.toString();
0880:            }
0881:
0882:            /**
0883:             * Constructs a kickstart setup job
0884:             *
0885:             * @param job           the job to be run.
0886:             * @param workerNodeTmp the worker node tmp to run the job in.
0887:             *
0888:             * @return String
0889:             */
0890:            protected String constructCleanupJob(SubInfo job,
0891:                    String workerNodeTmp) {
0892:                StringBuffer setup = new StringBuffer();
0893:
0894:                setup.append("/bin/rm -rf ").append(workerNodeTmp);
0895:
0896:                return setup.toString();
0897:            }
0898:
0899:            /**
0900:             * Constructs the prejob  that fetches sls file, and then invokes transfer
0901:             * again.
0902:             *
0903:             * @param job   the job for which the prejob is being created
0904:             * @param headNodeURLPrefix String
0905:             * @param headNodeDirectory String
0906:             * @param workerNodeDirectory String
0907:             * @param slsFile String
0908:             *
0909:             * @return String containing the prescript invocation
0910:             */
0911:            protected String constructPREJob(SubInfo job,
0912:                    String headNodeURLPrefix, String headNodeDirectory,
0913:                    String workerNodeDirectory, String slsFile) {
0914:
0915:                File headNodeSLS = new File(headNodeDirectory, slsFile);
0916:                return mSLS.invocationString(job, headNodeSLS);
0917:
0918:                //first we need to get the sls file to worker node
0919:                /*
0920:                preJob.append( "/bin/echo -e \" " ).
0921:                       append( headNodeURLPrefix ).append( File.separator ).
0922:                       append( headNodeDirectory ).append( File.separator ).
0923:                       append( slsFile ).append( " \\n " ).
0924:                       append( "file://" ).append( workerNodeDirectory ).append( File.separator ).
0925:                       append( slsFile ).append( "\"" ).
0926:                       append( " | " ).append( transfer ).append( " base mnt " );
0927:
0928:                preJob.append( " && " );
0929:
0930:                //now we need to get transfer to execute this sls file
0931:                preJob.append( transfer ).append( " base mnt < " ).append( slsFile );
0932:                 */
0933:
0934:            }
0935:
0936:            /**
0937:             * Constructs the post job  that fetches sls file, and then invokes transfer
0938:             * again.
0939:             *
0940:             * @param job   the job for which the prejob is being created
0941:             * @param headNodeURLPrefix String
0942:             * @param headNodeDirectory String
0943:             * @param workerNodeDirectory String
0944:             * @param slsFile String
0945:             *
0946:             * @return String containing the postscript invocation
0947:             */
0948:            protected String constructPOSTJob(SubInfo job,
0949:                    String headNodeURLPrefix, String headNodeDirectory,
0950:                    String workerNodeDirectory, String slsFile) {
0951:
0952:                StringBuffer postJob = new StringBuffer();
0953:
0954:                //first figure out the path to transfer
0955:                //hardcoded for now
0956:                String transfer = "/nfs/home/vahi/PEGASUS/default/bin/transfer";
0957:
0958:                //no need to figure out proxy as already done in prejob?
0959:                String proxy = null;
0960:                StringBuffer proxyPath = null;
0961:                for (Iterator it = job.getInputFiles().iterator(); it.hasNext();) {
0962:                    PegasusFile pf = (PegasusFile) it.next();
0963:                    if (pf instanceof  FileTransfer
0964:                            && pf.getLFN().equals(ENV.X509_USER_PROXY_KEY)) {
0965:                        //there is a proxy that needs to be set for the job
0966:                        //actually set it in prejob somehow.
0967:                        proxy = ((NameValue) ((FileTransfer) pf).getDestURL())
0968:                                .getValue();
0969:                        proxy = new File(proxy).getName();
0970:                        proxyPath = new StringBuffer();
0971:                        proxyPath.append(headNodeDirectory).append(
0972:                                File.separator).append(proxy);
0973:                        job.envVariables.construct(ENV.X509_USER_PROXY_KEY,
0974:                                proxyPath.toString());
0975:                        break;
0976:                    }
0977:                }
0978:
0979:                //add the command to chmod the proxy
0980:                if (proxy != null) {
0981:                    postJob.append("/bin/bash -c \"chmod 600 ").append(
0982:                            proxyPath.toString()).append(" && ");
0983:                }
0984:
0985:                postJob.append(transfer).append(" base mnt ").append(
0986:                        headNodeDirectory).append(File.separator).append(
0987:                        slsFile);
0988:
0989:                if (proxy != null) {
0990:                    //add the end quote
0991:                    postJob.append("\"");
0992:                }
0993:
0994:                return postJob.toString();
0995:            }
0996:
0997:            /**
0998:             * Writes out the list of filenames file for the job.
0999:             *
1000:             * @param files  the list of <code>PegasusFile</code> objects contains the files
1001:             *               whose stat information is required.
1002:             *
1003:             * @param basename   the basename of the file that is to be created
1004:             *
1005:             * @return the full path to lof file created, else null if no file is written out.
1006:             */
1007:            public String generateListofFilenamesFile(Set files, String basename) {
1008:                //sanity check
1009:                if (files == null || files.isEmpty()) {
1010:                    return null;
1011:                }
1012:
1013:                String result = null;
1014:                //writing the stdin file
1015:                try {
1016:                    File f = new File(mSubmitDir, basename);
1017:                    FileWriter input;
1018:                    input = new FileWriter(f);
1019:                    PegasusFile pf;
1020:                    for (Iterator it = files.iterator(); it.hasNext();) {
1021:                        pf = (PegasusFile) it.next();
1022:                        input.write(pf.getLFN());
1023:                        input.write("\n");
1024:                    }
1025:                    //close the stream
1026:                    input.close();
1027:                    result = f.getAbsolutePath();
1028:
1029:                } catch (IOException e) {
1030:                    mLogger.log("Unable to write the lof file " + basename, e,
1031:                            LogManager.ERROR_MESSAGE_LEVEL);
1032:                }
1033:
1034:                return result;
1035:            }
1036:
1037:            /**
1038:             * Constructs a condor variable in the condor profile namespace
1039:             * associated with the job. Overrides any preexisting key values.
1040:             *
1041:             * @param job   contains the job description.
1042:             * @param key   the key of the profile.
1043:             * @param value the associated value.
1044:             */
1045:            private void construct(SubInfo job, String key, String value) {
1046:                job.condorVariables.construct(key, value);
1047:            }
1048:
1049:            /**
1050:             * Condor Quotes a string
1051:             *
1052:             * @param string   the string to be quoted.
1053:             *
1054:             * @return quoted string.
1055:             */
1056:            private String quote(String string) {
1057:                String result;
1058:                try {
1059:                    mLogger.log("Unquoted Prejob is  " + string,
1060:                            LogManager.DEBUG_MESSAGE_LEVEL);
1061:                    result = CondorQuoteParser.quote(string, false);
1062:                    mLogger.log("Quoted Prejob is  " + result,
1063:                            LogManager.DEBUG_MESSAGE_LEVEL);
1064:                } catch (CondorQuoteParserException e) {
1065:                    throw new RuntimeException("CondorQuoting Problem "
1066:                            + e.getMessage());
1067:                }
1068:                return result;
1069:
1070:            }
1071:
1072:            /**
1073:             * Adds a /bin/rm post job to kickstart that removes the files passed.
1074:             * The post jobs is added as an environment variable.
1075:             *
1076:             * @param job   the job in which the post job needs to be added.
1077:             * @param files the files to be deleted.
1078:             */
1079:            private void addCleanupPostScript(SubInfo job, List files) {
1080:                //sanity check
1081:                if (files == null || !mDoStat || files.isEmpty()) {
1082:                    return;
1083:                }
1084:
1085:                //do not add if job already has a postscript specified
1086:                if (job.envVariables.containsKey(this .KICKSTART_CLEANUP)) {
1087:                    mLogger
1088:                            .log(
1089:                                    "Not adding lof cleanup as another kickstart cleanup already exists",
1090:                                    LogManager.DEBUG_MESSAGE_LEVEL);
1091:                    return;
1092:                }
1093:
1094:                StringBuffer ps = new StringBuffer();
1095:                //maybe later we might want to pick it up from the TC
1096:                ps.append("/bin/rm -rf").append(" ");
1097:                for (Iterator it = files.iterator(); it.hasNext();) {
1098:                    ps.append(it.next()).append(" ");
1099:                }
1100:
1101:                job.envVariables.construct(this.KICKSTART_CLEANUP, ps
1102:                        .toString());
1103:
1104:                return;
1105:            }
1106:
1107:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.