Source Code for Condor.java in Workflow Engines » pegasus-2.1.0 » org.griphyn.cPlanner.transfer.refiner

/*
 * This file or a portion of this file is licensed under the terms of
 * the Globus Toolkit Public License, found in file GTPL, or at
 * http://www.globus.org/toolkit/download/license.html. This notice must
 * appear in redistributions of this file, with or without modification.
 *
 * Redistributions of this Software, with or without modification, must
 * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
 * some other similar material which is provided with the Software (if
 * any).
 *
 * Copyright 1999-2004 University of Chicago and The University of
 * Southern California. All rights reserved.
 */

package org.griphyn.cPlanner.transfer.refiner;

import org.griphyn.cPlanner.classes.ADag;
import org.griphyn.cPlanner.classes.FileTransfer;
import org.griphyn.cPlanner.classes.NameValue;
import org.griphyn.cPlanner.classes.PlannerOptions;
import org.griphyn.cPlanner.classes.SubInfo;
import org.griphyn.cPlanner.classes.TransferJob;

import org.griphyn.cPlanner.common.LogManager;
import org.griphyn.cPlanner.common.PegasusProperties;

import org.griphyn.cPlanner.transfer.MultipleFTPerXFERJobRefiner;

import org.griphyn.cPlanner.namespace.VDS;

import org.griphyn.cPlanner.engine.ReplicaCatalogBridge;

import java.io.File;

import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
import java.util.HashSet;

import java.net.URL;
import java.net.MalformedURLException;

import org.griphyn.cPlanner.poolinfo.PoolInfoProvider;
import org.griphyn.cPlanner.poolinfo.SiteFactory;
import org.griphyn.cPlanner.poolinfo.SiteFactoryException;
import org.griphyn.cPlanner.code.gridstart.GridStartFactory;
/**
 * A refiner that relies on the Condor file transfer mechanism to get the
 * raw input data to the remote working directory. It is to be used for
 * doing the file transfers in a Condor pool, while trying to run on the
 * local filesystem of the worker nodes.
 *
 * <p>
 * Additionally, this refiner only works with the Local replica selector,
 * which prefers file URLs from the submit host for staging.
 *
 * <p>
 * In order to use the transfer refinement implemented by this class,
 * <pre>
 *     - property <code>pegasus.transfer.refiner</code> must be set to
 *       value <code>Condor</code>.
 *     - property <code>pegasus.selector.replica</code> must be set to
 *       value <code>Local</code>.
 *     - property <code>pegasus.execute.*.filesystem.local</code> must be
 *       set to value <code>true</code>.
 * </pre>
 *
 * @author Karan Vahi
 * @version $Revision: 455 $
 */
public class Condor extends MultipleFTPerXFERJobRefiner {
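
    // For reference, a minimal sketch of a pegasus properties file that
    // enables this refiner, using the property names and values documented
    // in the class Javadoc above:
    //
    //     pegasus.transfer.refiner           = Condor
    //     pegasus.selector.replica           = Local
    //     pegasus.execute.*.filesystem.local = true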

    /**
     * A short description of the transfer refinement.
     */
    public static final String DESCRIPTION = "Condor Transfer Refiner";

    /**
     * The string holding the logging messages.
     */
    protected String mLogMsg;

    /**
     * The handle to the Site Catalog. It is instantiated in this class.
     */
    protected PoolInfoProvider mSCHandle;

    /**
     * The overloaded constructor.
     *
     * @param dag        the workflow to which transfer nodes need to be added.
     * @param properties the <code>PegasusProperties</code> object containing all
     *                   the properties required by Pegasus.
     * @param options    the options passed to the planner.
     */
    public Condor(ADag dag, PegasusProperties properties,
            PlannerOptions options) {
        super(dag, properties, options);

        /* load the site catalog using the factory */
        mSCHandle = SiteFactory.loadInstance(properties, false);
    }
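
    // A hypothetical usage sketch (the real call site is Pegasus' refiner
    // machinery, not shown here): construct the refiner, then hand it jobs
    // and their FileTransfer collections as the workflow is traversed.
    //
    //     Condor refiner = new Condor(dag, properties, options);
    //     refiner.addStageInXFERNodes(job, stageInFiles);
    //     refiner.done();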

    /**
     * Adds the stage-in transfer nodes which transfer the input files for a
     * job, from the location returned by the replica catalog to the job's
     * execution pool.
     *
     * @param job   <code>SubInfo</code> object corresponding to the node to
     *              which the files are to be transferred.
     * @param files Collection of <code>FileTransfer</code> objects containing
     *              the information about the source and destination URLs.
     */
    public void addStageInXFERNodes(SubInfo job, Collection files) {

        Set inputFiles = job.getInputFiles();
        for (Iterator it = files.iterator(); it.hasNext();) {
            FileTransfer ft = (FileTransfer) it.next();

            String url = ((NameValue) ft.getSourceURL()).getValue();

            //remove from the input files the PegasusFile object
            //corresponding to this FileTransfer, and add the
            //FileTransfer object in its place
            inputFiles.remove(ft);
            inputFiles.add(ft);

            //put the url in only if it is a file url
            if (url.startsWith("file:/")) {
                try {
                    job.condorVariables.addIPFileForTransfer(new URL(url)
                            .getPath());
                } catch (MalformedURLException e) {
                    throw new RuntimeException("Malformed source URL " + url,
                            e);
                }
            } else {
                throw new RuntimeException(
                        "Malformed source URL. Input URL should be a file URL "
                                + url);
            }
        }
    }
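
    // For illustration: given a hypothetical source URL such as
    //
    //     file:///scratch/inputs/f.in
    //
    // the loop above registers /scratch/inputs/f.in via
    // addIPFileForTransfer(), which is expected to surface as a
    // transfer_input_files entry in the generated Condor submit file.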

    /**
     * Adds the inter-pool transfer nodes that are required for transferring
     * the output files of the parents to the job's execution site. They are
     * not supported in this case.
     *
     * @param job   <code>SubInfo</code> object corresponding to the node to
     *              which the files are to be transferred.
     * @param files Collection of <code>FileTransfer</code> objects containing
     *              the information about the source and destination URLs.
     */
    public void addInterSiteTXNodes(SubInfo job, Collection files) {

        throw new UnsupportedOperationException(
                "Interpool operation is not supported");
    }

    /**
     * Adds the stage-out transfer nodes that stage data to an output site
     * specified by the user.
     *
     * @param job   <code>SubInfo</code> object corresponding to the node to
     *              which the files are to be transferred.
     * @param files Collection of <code>FileTransfer</code> objects containing
     *              the information about the source and destination URLs.
     * @param rcb   bridge to the Replica Catalog. Used for creating
     *              registration nodes in the workflow.
     */
    public void addStageOutXFERNodes(SubInfo job, Collection files,
            ReplicaCatalogBridge rcb) {

        Set outputFiles = job.getOutputFiles();
        String destinationDirectory = null;
        for (Iterator it = files.iterator(); it.hasNext();) {
            FileTransfer ft = (FileTransfer) it.next();

            String url = ((NameValue) ft.getDestURL()).getValue();

            //put the url in only if it is a file url
            if (url.startsWith("file:/")) {

                try {
                    destinationDirectory = new File(new URL(url).getPath())
                            .getParent();
                } catch (MalformedURLException ex) {
                    throw new RuntimeException("Malformed URL", ex);
                }

                //strong disconnect here, as we are assuming worker node
                //execution with the SLS staging to the submit directory
                //String pfn = "file://" + mPOptions.getSubmitDirectory() + File.separator + ft.getLFN();
                //ft.removeSourceURL();
                //ft.addSource( "local", pfn );

            } else {
                throw new RuntimeException(
                        "Malformed destination URL. Output URL should be a file URL "
                                + url);
            }
        }

        if (!files.isEmpty()) {
            String txName = this.STAGE_OUT_PREFIX + job.getName() + "_0";
            SubInfo txJob = this.createStageOutTransferJob(job, files,
                    destinationDirectory, txName);

            this.mDAG.add(txJob);
            this.addRelation(job.getName(), txName);
        }
    }

    /**
     * Constructs a Condor file transfer job that handles multiple transfers.
     * The job itself is a /bin/true job that does the stage-out using the
     * transfer_input_files feature.
     *
     * @param job         the SubInfo object for the job, in relation to which
     *                    the transfer node is being added. Either the transfer
     *                    node can be transferring this job's input files to
     *                    the execution pool, or transferring this job's output
     *                    files to the output pool.
     * @param files       collection of <code>FileTransfer</code> objects
     *                    representing the data files and staged executables to
     *                    be transferred.
     * @param directory   the directory where the transfer job needs to be
     *                    executed.
     * @param txJobName   the name of the transfer node.
     *
     * @return  the created TransferJob.
     */
    private TransferJob createStageOutTransferJob(SubInfo job,
            Collection files, String directory, String txJobName) {

        TransferJob txJob = new TransferJob();

        //want to run in the local pool in universe vanilla
        txJob.setSiteHandle("local");
        txJob.condorVariables.construct("universe", "vanilla");

        //the non third party site for the transfer job is
        //always the job execution site for which the transfer
        //job is being created.
        txJob.setNonThirdPartySite(job.getSiteHandle());

        txJob.setName(txJobName);

        txJob.setTransformation("pegasus", "true", null);

        txJob.setDerivation("pegasus", "true", null);

        txJob.setRemoteExecutable("/bin/true");

        //we don't want the job to be launched via GridStart
        txJob.vdsNS.construct(VDS.GRIDSTART_KEY,
                GridStartFactory.GRIDSTART_SHORT_NAMES[GridStartFactory.NO_GRIDSTART_INDEX]);

        //add each source file as a Condor input file for the transfer job
        for (Iterator it = files.iterator(); it.hasNext();) {
            FileTransfer ft = (FileTransfer) it.next();
            NameValue nv = ft.getSourceURL();

            //put the url in only if it is a file url
            String url = nv.getValue();
            if (url.startsWith("file:/")) {
                try {
                    txJob.condorVariables.addIPFileForTransfer(new URL(url)
                            .getPath());

                    //add the basename of the file for transfer_output_files,
                    //as Mei suggests
                    txJob.condorVariables.addOPFileForTransfer(ft.getLFN());
                } catch (MalformedURLException e) {
                    throw new RuntimeException("Malformed source URL " + url,
                            e);
                }
            }
        }

        //the initial directory is set to the directory where we need
        //the output
        txJob.condorVariables.construct("initialdir", directory);

        txJob.setJobType(SubInfo.STAGE_OUT_JOB);
        txJob.setVDSSuperNode(job.jobName);

        txJob.stdErr = "";
        txJob.stdOut = "";

        //the i/p and o/p files remain empty
        //as we are just copying urls
        txJob.inputFiles = new HashSet();

        //to get the file stat information we need to put
        //the files as output files of the transfer job
        txJob.outputFiles = new HashSet(files);

        //the profile information from the pool catalog needs to be
        //assimilated into the job.
        txJob.updateProfiles(mSCHandle.getPoolProfile(txJob.getSiteHandle()));

        //the profile information from the properties file
        //is assimilated, overriding the one from the transformation
        //catalog.
        txJob.updateProfiles(mProps);

        return txJob;
    }
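
    // As an illustrative sketch (all paths hypothetical), a stage-out job
    // built above could translate into Condor submit-file entries along
    // these lines:
    //
    //     universe              = vanilla
    //     executable            = /bin/true
    //     initialdir            = /output/site/run0001
    //     transfer_input_files  = /submit/host/scratch/f.out
    //     transfer_output_files = f.out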

    /**
     * Adds the stage-out transfer nodes that stage data to an output site
     * specified by the user. This variant is not supported by this refiner.
     *
     * @param job   <code>SubInfo</code> object corresponding to the node to
     *              which the files are to be transferred.
     * @param files Collection of <code>FileTransfer</code> objects containing
     *              the information about the source and destination URLs.
     * @param rcb   bridge to the Replica Catalog. Used for creating
     *              registration nodes in the workflow.
     * @param deletedLeaf whether the node is being added for a node deleted
     *                    by the reduction engine. Default: false.
     */
    public void addStageOutXFERNodes(SubInfo job, Collection files,
            ReplicaCatalogBridge rcb, boolean deletedLeaf) {

        throw new UnsupportedOperationException(
                "Stageout operation is not supported for "
                        + this.getDescription());
    }

    /**
     * Signals that the traversal of the workflow is done. This allows the
     * transfer mechanisms to clean up any state that they might be keeping
     * that needs to be explicitly freed.
     */
    public void done() {

    }

    /**
     * Adds a new job to the workflow being refined.
     *
     * @param job  the job to be added.
     */
    public void addJob(SubInfo job) {
        mDAG.add(job);
    }

    /**
     * Adds a new relation to the workflow being refined.
     *
     * @param parent    the jobname of the parent node of the edge.
     * @param child     the jobname of the child node of the edge.
     */
    public void addRelation(String parent, String child) {
        mLogger.log("Adding relation " + parent + " -> " + child,
                LogManager.DEBUG_MESSAGE_LEVEL);
        mDAG.addNewRelation(parent, child);
    }

    /**
     * Adds a new relation to the workflow. In the case where the parent is a
     * newly added transfer job, parentNew should be set to true only the
     * first time a relation is added. For subsequent compute jobs that may
     * be dependent on it, it needs to be set to false.
     *
     * @param parent    the jobname of the parent node of the edge.
     * @param child     the jobname of the child node of the edge.
     * @param site      the execution pool where the transfer node is to be run.
     * @param parentNew whether the parent node is the new transfer job and
     *                  the relation is being added for the first time.
     */
    public void addRelation(String parent, String child, String site,
            boolean parentNew) {
        mLogger.log("Adding relation " + parent + " -> " + child,
                LogManager.DEBUG_MESSAGE_LEVEL);
        mDAG.addNewRelation(parent, child);
    }

    /**
     * Returns a textual description of the transfer mode.
     *
     * @return a short textual description
     */
    public String getDescription() {
        return DESCRIPTION;
    }

}