Source Code Cross Referenced for SeqExec.java in » Workflow-Engines » pegasus-2.1.0 » org.griphyn.cPlanner.cluster.aggregator


/**
 * This file or a portion of this file is licensed under the terms of
 * the Globus Toolkit Public License, found in file GTPL, or at
 * http://www.globus.org/toolkit/download/license.html. This notice must
 * appear in redistributions of this file, with or without modification.
 *
 * Redistributions of this Software, with or without modification, must
 * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
 * some other similar material which is provided with the Software (if
 * any).
 *
 * Copyright 1999-2004 University of Chicago and The University of
 * Southern California. All rights reserved.
 */
package org.griphyn.cPlanner.cluster.aggregator;
import org.griphyn.cPlanner.cluster.JobAggregator;

import org.griphyn.cPlanner.code.GridStart;
import org.griphyn.cPlanner.code.gridstart.GridStartFactory;
import org.griphyn.cPlanner.code.gridstart.ExitPOST;

import org.griphyn.cPlanner.classes.ADag;
import org.griphyn.cPlanner.classes.AggregatedJob;
import org.griphyn.cPlanner.classes.PegasusBag;
import org.griphyn.cPlanner.classes.SiteInfo;
import org.griphyn.cPlanner.classes.SubInfo;

import org.griphyn.cPlanner.common.PegasusProperties;
import org.griphyn.cPlanner.common.LogManager;

import org.griphyn.cPlanner.namespace.Condor;
import org.griphyn.cPlanner.namespace.Dagman;
import org.griphyn.cPlanner.namespace.VDS;

import java.io.File;
import java.util.List;
/**
 * This class aggregates smaller jobs such that they are launched at the
 * remote end, sequentially on a single node, using seqexec. The seqexec
 * executable is a VDS tool distributed in the VDS worker package, and can
 * usually be found at $PEGASUS_HOME/bin/seqexec.
 *
 * @author Karan Vahi vahi@isi.edu
 * @version $Revision: 451 $
 */
public class SeqExec extends Abstract {

    /**
     * The logical name of the transformation that is able to run multiple
     * jobs sequentially.
     */
    public static final String COLLAPSE_LOGICAL_NAME = "seqexec";

    /**
     * The suffix to be applied to the seqexec progress report file.
     */
    public static final String SEQEXEC_PROGRESS_REPORT_SUFFIX = ".prg";

    /**
     * Flag indicating whether to write a global log file or a per-job log file.
     */
    private boolean mGlobalLog;

    /**
     * Flag indicating whether to fail on the first hard error or not.
     */
    private boolean mFailOnFirstError;

    /**
     * The default constructor.
     */
    public SeqExec() {
        super();
    }
078:
    /**
     * Initializes the JobAggregator implementation.
     *
     * @param dag  the workflow that is being clustered.
     * @param bag  the bag of objects that is useful for initialization.
     */
    public void initialize(ADag dag, PegasusBag bag) {
        super.initialize(dag, bag);
        mFailOnFirstError = false;
        mGlobalLog = bag.getPegasusProperties().jobAggregatorLogGlobal();
    }
092:
    /**
     * Constructs a new aggregated job that contains all the jobs passed to it.
     * The new aggregated job appears as a single job in the workflow and
     * replaces the jobs it contains in the workflow.
     * <p>
     * seqexec uses kickstart to invoke each of the smaller constituent
     * jobs. The kickstart output appears on the stdout of seqexec; hence
     * seqexec itself is not kickstarted. At the same time, an appropriate
     * postscript is constructed to be invoked on the job.
     *
     * @param jobs the list of <code>SubInfo</code> objects that need to be
     *             collapsed. All the jobs being collapsed should be scheduled
     *             at the same pool, to maintain correct semantics.
     * @param name the logical name of the jobs in the list passed to this
     *             function.
     * @param id   the id that is given to the new job.
     *
     * @return the <code>AggregatedJob</code> object corresponding to the
     *         aggregated job containing the jobs passed as a List in the input,
     *         null if the list of jobs is empty.
     */
    public AggregatedJob construct(List jobs, String name, String id) {
        AggregatedJob mergedJob = super.construct(jobs, name, id);
        //ensure that the AggregatedJob is invoked via NoGridStart
        mergedJob.vdsNS.construct(
                VDS.GRIDSTART_KEY,
                GridStartFactory.GRIDSTART_SHORT_NAMES[GridStartFactory.NO_GRIDSTART_INDEX]);

        SubInfo firstJob = (SubInfo) jobs.get(0);
        StringBuffer message = new StringBuffer();
        message.append(" POSTScript for merged job ").append(
                mergedJob.getName()).append(" ");

        //should we tinker with the postscript for this job?
        if (mergedJob.dagmanVariables.containsKey(Dagman.POST_SCRIPT_KEY)) {
            //the merged job has already been set to have a specific postscript
            //no tinkering
        } else {
            //we need to tinker
            //gridstart is always populated
            String gridstart = (String) firstJob.vdsNS.get(VDS.GRIDSTART_KEY);
            if (gridstart.equalsIgnoreCase(
                    GridStartFactory.GRIDSTART_SHORT_NAMES[GridStartFactory.KICKSTART_INDEX])) {
                //ensure $PEGASUS_HOME/bin/exitpost is invoked,
                //as the constituent jobs are being invoked by kickstart
                mergedJob.dagmanVariables.construct(
                        Dagman.POST_SCRIPT_KEY, ExitPOST.SHORT_NAME);
            }
        }
        message.append(mergedJob.dagmanVariables.get(Dagman.POST_SCRIPT_KEY));
        mLogger.log(message.toString(), LogManager.DEBUG_MESSAGE_LEVEL);

        mLogger.log("Input files of merged job " + mergedJob.getInputFiles(),
                LogManager.DEBUG_MESSAGE_LEVEL);
        return mergedJob;
    }
154:
    /**
     * Enables the constituent jobs that make up an aggregated job.
     *
     * @param mergedJob the clustered job.
     * @param jobs      the constituent jobs.
     *
     * @return AggregatedJob
     */
    protected AggregatedJob enable(AggregatedJob mergedJob, List jobs) {
        SubInfo firstJob = (SubInfo) jobs.get(0);
        SiteInfo site = mSiteHandle.getPoolEntry(firstJob.getSiteHandle(),
                Condor.VANILLA_UNIVERSE);
        GridStart gridStart = mGridStartFactory.loadGridStart(firstJob,
                site.getKickstartPath());

        //explicitly set the gridstart key
        //to enable the correct generation of the postscript for
        //the aggregated job
        firstJob.vdsNS.construct(VDS.GRIDSTART_KEY, gridStart.getVDSKeyValue());

        return gridStart.enable(mergedJob, jobs);
    }
178:
    /**
     * Returns the logical name of the transformation that is used to
     * collapse the jobs.
     *
     * @return the logical name of the collapser executable.
     * @see #COLLAPSE_LOGICAL_NAME
     */
    public String getCollapserLFN() {
        return COLLAPSE_LOGICAL_NAME;
    }

    /**
     * Determines whether there is NOT an entry in the transformation catalog
     * for the job aggregator executable on a particular site.
     *
     * @param site the site at which the existence check is required.
     *
     * @return true if an entry does not exist, false otherwise.
     */
    public boolean entryNotInTC(String site) {
        return this.entryNotInTC(this.TRANSFORMATION_NAMESPACE,
                COLLAPSE_LOGICAL_NAME, this.TRANSFORMATION_VERSION, site);
    }
203:
    /**
     * Returns the arguments with which the <code>AggregatedJob</code>
     * needs to be invoked.
     *
     * @param job the <code>AggregatedJob</code> for which the arguments have
     *            to be constructed.
     *
     * @return the argument string.
     */
    public String aggregatedJobArguments(AggregatedJob job) {
        StringBuffer arguments = new StringBuffer();

        //do we need to fail hard on the first error?
        if (this.abortOnFristJobFailure()) {
            arguments.append(" -f ");
        }

        //track the progress of the seqexec job
        arguments.append(" -R ").append(logFile(job));

        return arguments.toString();
    }
226:
    /**
     * Setter method to indicate that failure of the first constituent job
     * should result in the abort of the whole aggregated job.
     *
     * @param fail indicates whether to abort or not.
     */
    public void setAbortOnFirstJobFailure(boolean fail) {
        mFailOnFirstError = fail;
    }

    /**
     * Returns a boolean indicating whether to fail the aggregated job on
     * detecting the first failure during execution of constituent jobs.
     *
     * @return boolean indicating whether to fail or not.
     */
    public boolean abortOnFristJobFailure() {
        return mFailOnFirstError;
    }
247:
    /**
     * Returns the name of the log file to be used on the remote site for the
     * seqexec job. Depending upon the property settings, either assigns a
     * common global log file, whose basename is derived from the label of the
     * clustered DAG, or a per-job log file named after the seqexec job.
     *
     * @param job the <code>AggregatedJob</code>.
     *
     * @return the path to the log file.
     */
    protected String logFile(AggregatedJob job) {
        StringBuffer sb = new StringBuffer(32);
        if (mGlobalLog) {
            //the basename of the log file is derived from the dag name
            sb.append(this.mClusteredADag.dagInfo.getLabel());
        } else {
            //per seqexec job name
            sb.append(job.getName());
        }
        sb.append(SEQEXEC_PROGRESS_REPORT_SUFFIX);
        return sb.toString();
    }
}
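To illustrate what this aggregator ultimately hands to the remote seqexec invocation, the standalone sketch below reimplements the derivation performed by aggregatedJobArguments() and logFile(): `-f` when aborting on the first constituent job failure, and `-R <logfile>` for the progress report, with the log basename coming from either the DAG label (global log) or the job name (per-job log), plus the `.prg` suffix. This is a minimal illustrative sketch, not Pegasus code; the class name `SeqExecArgsSketch` and its method signatures are hypothetical.

```java
/**
 * Illustrative sketch (hypothetical class, not part of Pegasus): derives
 * the seqexec command-line arguments the same way SeqExec does.
 */
public class SeqExecArgsSketch {

    /** Mirrors SeqExec.SEQEXEC_PROGRESS_REPORT_SUFFIX. */
    static final String SUFFIX = ".prg";

    /**
     * Derives the progress-report file name: the DAG label for a global
     * log, or the job's own name for a per-job log, plus the suffix.
     */
    static String logFile(boolean globalLog, String dagLabel, String jobName) {
        return (globalLog ? dagLabel : jobName) + SUFFIX;
    }

    /**
     * Builds the seqexec invocation arguments: an optional -f flag to
     * fail hard on the first error, and -R pointing at the log file.
     */
    static String arguments(boolean abortOnFirstFailure, String logFile) {
        StringBuilder sb = new StringBuilder();
        if (abortOnFirstFailure) {
            sb.append(" -f ");
        }
        sb.append(" -R ").append(logFile);
        return sb.toString();
    }

    public static void main(String[] args) {
        // per-job log for a merged job named "merge_ID1"
        String log = logFile(false, "diamond", "merge_ID1");
        System.out.println(arguments(true, log));
    }
}
```

Running the sketch prints the argument string ` -f  -R merge_ID1.prg`, matching the shape of the string that aggregatedJobArguments() returns for a job with abort-on-first-failure enabled and per-job logging.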