Source Code Cross Referenced for ReplicaCatalogBridge.java in  » Workflow-Engines » pegasus-2.1.0 » org » griphyn » cPlanner » engine » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Workflow Engines » pegasus 2.1.0 » org.griphyn.cPlanner.engine 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * This file or a portion of this file is licensed under the terms of
003:         * the Globus Toolkit Public License, found in file GTPL, or at
004:         * http://www.globus.org/toolkit/download/license.html. This notice must
005:         * appear in redistributions of this file, with or without modification.
006:         *
007:         * Redistributions of this Software, with or without modification, must
008:         * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
009:         * some other similar material which is provided with the Software (if
010:         * any).
011:         *
012:         * Copyright 1999-2004 University of Chicago and The University of
013:         * Southern California. All rights reserved.
014:         */
015:        package org.griphyn.cPlanner.engine;
016:
017:        import org.griphyn.cPlanner.classes.FileTransfer;
018:        import org.griphyn.cPlanner.classes.LRC;
019:        import org.griphyn.cPlanner.classes.NameValue;
020:        import org.griphyn.cPlanner.classes.SiteInfo;
021:        import org.griphyn.cPlanner.classes.Profile;
022:        import org.griphyn.cPlanner.classes.ReplicaLocation;
023:        import org.griphyn.cPlanner.classes.ReplicaStore;
024:        import org.griphyn.cPlanner.classes.ADag;
025:        import org.griphyn.cPlanner.classes.SubInfo;
026:        import org.griphyn.cPlanner.classes.PlannerOptions;
027:
028:        import org.griphyn.cPlanner.common.LogManager;
029:        import org.griphyn.cPlanner.common.PegasusProperties;
030:
031:        import org.griphyn.cPlanner.namespace.ENV;
032:
033:        import org.griphyn.common.catalog.ReplicaCatalog;
034:        import org.griphyn.common.catalog.TransformationCatalogEntry;
035:
036:        import org.griphyn.common.catalog.replica.ReplicaFactory;
037:
038:        import org.griphyn.common.catalog.transformation.TCMode;
039:
040:        import org.griphyn.common.classes.TCType;
041:
042:        import org.griphyn.common.util.Separator;
043:
044:        import java.io.File;
045:        import java.io.FileWriter;
046:
047:        import java.util.Collection;
048:        import java.util.Iterator;
049:        import java.util.List;
050:        import java.util.Map;
051:        import java.util.Properties;
052:        import java.util.Set;
053:        import java.util.StringTokenizer;
054:
055:        /**
056:         * This coordinates the look up to the Replica Location Service, to determine
057:         * the logical to physical mappings.
058:         *
059:         * @author Karan Vahi
060:         * @author Gaurang Mehta
061:         * @version $Revision: 456 $
062:         *
063:         */
064:        public class ReplicaCatalogBridge extends Engine //for the time being.
065:        {
066:
067:            /**
068:             * The transformation namespace for the regostration jobs.
069:             */
070:            public static final String RC_TRANSFORMATION_NS = "pegasus";
071:
072:            /**
073:             * The logical name of the transformation used.
074:             */
075:            public static final String RC_TRANSFORMATION_NAME = "rc-client";
076:
077:            /**
078:             * The logical name of the transformation used.
079:             */
080:            public static final String RC_TRANSFORMATION_VERSION = null;
081:
082:            /**
083:             * The derivation namespace for the transfer jobs.
084:             */
085:            public static final String RC_DERIVATION_NS = "pegasus";
086:
087:            /**
088:             * The derivation name for the transfer jobs.
089:             */
090:            public static final String RC_DERIVATION_NAME = "rc-client";
091:
092:            /**
093:             * The version number for the derivations for registration jobs.
094:             */
095:            public static final String RC_DERIVATION_VERSION = "1.0";
096:
097:            /**
098:             * The name of the Replica Catalog Implementer that serves as the source for
099:             * cache files.
100:             */
101:            public static final String CACHE_REPLICA_CATALOG_IMPLEMENTER = "SimpleFile";
102:
103:            /**
104:             * The name of the source key for Replica Catalog Implementer that serves as
105:             * cache
106:             */
107:            public static final String CACHE_REPLICA_CATALOG_KEY = "file";
108:
109:            /**
110:             * The name of the URL key for the replica catalog impelementer to be picked
111:             * up.
112:             */
113:            public static final String REPLICA_CATALOG_URL_KEY = "url";
114:
115:            /**
116:             * The handle to the main Replica Catalog.
117:             */
118:            private ReplicaCatalog mReplicaCatalog;
119:
120:            /**
121:             * Contains the various options to the Planner as passed by the user at
122:             * runtime.
123:             */
124:            private PlannerOptions mPOptions;
125:
126:            /**
127:             * The Vector of <code>String</code> objects containing the logical
128:             * filenames of the files whose locations are to be searched in the
129:             * Replica Catalog.
130:             */
131:            protected Set mSearchFiles;
132:
133:            /**
134:             * A boolean variable to desingnate whether the RLI queried was down or not.
135:             * By default it is up, unless it is set to true explicitly.
136:             */
137:            private boolean mRCDown;
138:
139:            /**
140:             * The replica store in which we store all the results that are queried from
141:             * the main replica catalog.
142:             */
143:            private ReplicaStore mReplicaStore;
144:
145:            /**
146:             * The replica store in which we store all the results that are queried from
147:             * the cache replica catalogs.
148:             */
149:            private ReplicaStore mCacheStore;
150:
151:            /**
152:             * A boolean indicating whether the cache file needs to be treated as a
153:             * replica catalog or not.
154:             */
155:            private boolean mTreatCacheAsRC;
156:
157:            /**
158:             * The namespace object holding the environment variables for local
159:             * pool.
160:             */
161:            private ENV mLocalEnv;
162:
163:            /**
164:             * The default tc entry.
165:             */
166:            private TransformationCatalogEntry mDefaultTCRCEntry;
167:
168:            /**
169:             * A boolean indicating whether the attempt to create a default tc entry
170:             * has happened or not.
171:             */
172:            private boolean mDefaultTCRCCreated;
173:
174:            /**
175:             * The overloaded constructor.
176:             *
177:             * @param dag         the workflow that is being worked on.
178:             * @param properties  the properties passed to the planner.
179:             * @param options     the options passed to the planner at runtime.
180:             *
181:             *
182:             */
183:            public ReplicaCatalogBridge(ADag dag, PegasusProperties properties,
184:                    PlannerOptions options) {
185:                super (properties);
186:                this .initialize(dag, properties, options);
187:            }
188:
189:            /**
190:             * Intialises the refiner.
191:             *
192:             * @param dag         the workflow that is being worked on.
193:             * @param properties  the properties passed to the planner.
194:             * @param options     the options passed to the planner at runtime.
195:             *
196:             */
197:            public void initialize(ADag dag, PegasusProperties properties,
198:                    PlannerOptions options) {
199:
200:                mProps = properties;
201:                mPOptions = options;
202:                mRCDown = false;
203:                mCacheStore = new ReplicaStore();
204:                mTreatCacheAsRC = mProps.treatCacheAsRC();
205:                mDefaultTCRCCreated = false;
206:
207:                //converting the Vector into vector of
208:                //strings just containing the logical
209:                //filenames
210:                mSearchFiles = dag.dagInfo.getLFNs(options.getForce());
211:
212:                //load the local environment variable
213:                //from pool config and property file
214:                mLocalEnv = loadLocalEnvVariables();
215:
216:                try {
217:
218:                    //make sure that RLS can be loaded from local environment
219:                    //Karan May 1 2007
220:                    mReplicaCatalog = null;
221:                    if (mSearchFiles != null && !mSearchFiles.isEmpty()) {
222:                        mReplicaCatalog = ReplicaFactory
223:                                .loadInstance(properties);
224:
225:                        //load all the mappings.
226:                        mReplicaStore = new ReplicaStore(mReplicaCatalog
227:                                .lookup(mSearchFiles));
228:                    }
229:
230:                } catch (Exception ex) {
231:                    String msg = "Problem while connecting with the Replica Catalog: ";
232:                    mLogger.log(msg + ex.getMessage(),
233:                            LogManager.ERROR_MESSAGE_LEVEL);
234:                    //set the flag to denote RLI is down
235:                    mRCDown = true;
236:                    mReplicaStore = new ReplicaStore();
237:
238:                    //exit if there is no cache overloading specified.
239:                    if (options.getCacheFiles().isEmpty()) {
240:                        throw new RuntimeException(msg, ex);
241:                    }
242:                }
243:
244:                mTCHandle = TCMode.loadInstance();
245:
246:                //incorporate the caching if any
247:                if (!options.getCacheFiles().isEmpty()) {
248:                    loadCacheFiles(options.getCacheFiles());
249:                }
250:            }
251:
252:            /**
253:             * To close the connection to replica services. This must be defined in the
254:             * case where one has not done a singleton implementation. In other
255:             * cases just do an empty implementation of this method.
256:             */
257:            public void closeConnection() {
258:                if (mReplicaCatalog != null) {
259:                    mReplicaCatalog.close();
260:                }
261:            }
262:
263:            /**
264:             * Closes the connection to the rli.
265:             */
266:            public void finalize() {
267:                this .closeConnection();
268:            }
269:
270:            /**
271:             * This returns the files for which mappings exist in the Replica Catalog.
272:             * This should return a subset of the files which are
273:             * specified in the mSearchFiles, while getting an instance to this.
274:             *
275:             * @return  a <code>Set</code> of logical file names as String objects, for
276:             *          which logical to physical mapping exists.
277:             *
278:             * @see #mSearchFiles
279:             */
280:            public Set getFilesInReplica() {
281:
282:                //check if any exist in the cache
283:                Set lfnsFound = mCacheStore.getLFNs(mSearchFiles);
284:                mLogger.log(lfnsFound.size()
285:                        + " entries found in cache of total "
286:                        + mSearchFiles.size(), LogManager.DEBUG_MESSAGE_LEVEL);
287:
288:                //check in the main replica catalog
289:                if (mRCDown || mReplicaCatalog == null) {
290:                    mLogger
291:                            .log(
292:                                    "Replica Catalog is either down or connection to it was never opened ",
293:                                    LogManager.WARNING_MESSAGE_LEVEL);
294:                    return lfnsFound;
295:                }
296:
297:                //look up from the the main replica catalog
298:                lfnsFound.addAll(mReplicaStore.getLFNs());
299:
300:                return lfnsFound;
301:
302:            }
303:
304:            /**
305:             * Returns all the locations as returned from the Replica Lookup Mechanism.
306:             *
307:             * @param lfn   The name of the logical file whose PFN mappings are
308:             *                      required.
309:             *
310:             * @return ReplicaLocation containing all the locations for that LFN
311:             *
312:             * @see org.griphyn.cPlanner.classes.ReplicaLocation
313:             */
314:            public ReplicaLocation getFileLocs(String lfn) {
315:
316:                ReplicaLocation rl = retrieveFromCache(lfn);
317:                //first check from cache
318:                if (rl != null && !mTreatCacheAsRC) {
319:                    mLogger.log("Location of file " + rl
320:                            + " retrieved from cache",
321:                            LogManager.DEBUG_MESSAGE_LEVEL);
322:                    return rl;
323:                }
324:
325:                ReplicaLocation rcEntry = mReplicaStore.getReplicaLocation(lfn);
326:                if (rl == null) {
327:                    rl = rcEntry;
328:                } else {
329:                    //merge with the ones found in cache
330:                    rl.merge(rcEntry);
331:                }
332:
333:                return rl;
334:            }
335:
336:            /**
337:             * Returns a boolean indicating whether all input files of the workflow
338:             * are in the collection of LFNs passed.
339:             *
340:             * @param lfns  collection of LFNs in which to search for existence.
341:             *
342:             * @return boolean.
343:             */
344:            /*
345:            public boolean allIPFilesInCollection( Collection lfns ){
346:               boolean result = true;
347:               String lfn;
348:               String type;
349:               for (Iterator it = mLFNMap.keySet().iterator(); it.hasNext(); ) {
350:                   lfn = (String) it.next();
351:                   type = (String) mLFNMap.get( lfn );
352:
353:                   //search for existence of input file in lfns
354:                   if ( type.equals("i") && !lfns.contains( lfn ) ) {
355:                       mLogger.log("Input LFN not found in collection " + lfn,
356:                                   LogManager.DEBUG_MESSAGE_LEVEL);
357:                       return false;
358:                   }
359:               }
360:               return result;
361:            }
362:             */
363:
364:            /**
365:             * It constructs the SubInfo object for the registration node, which
366:             * registers the materialized files on the output pool in the RLS.
367:             * Note that the relations corresponding to this node should already have
368:             * been added to the concerned <code>DagInfo</code> object.
369:             *
370:             * @param regJobName  The name of the job which registers the files in the
371:             *                    Replica Location Service.
372:             * @param job         The job whose output files are to be registered in
373:             *                    the Replica Location Service.
374:             *
375:             * @param files       Collection of <code>FileTransfer</code> objects
376:             *                    containing the information about source and
377:             *                    destination URLs. The destination
378:             *                    URLs would be our PFNs.
379:             *
380:             * @return SubInfo corresponding to the new registration node.
381:             */
382:            public SubInfo makeRCRegNode(String regJobName, SubInfo job,
383:                    Collection files) {
384:                //making the files string
385:
386:                SubInfo newJob = new SubInfo();
387:
388:                newJob.setName(regJobName);
389:                newJob.setTransformation(this .RC_TRANSFORMATION_NS,
390:                        this .RC_TRANSFORMATION_NAME,
391:                        this .RC_TRANSFORMATION_VERSION);
392:                newJob.setDerivation(this .RC_DERIVATION_NS,
393:                        this .RC_DERIVATION_NAME, this .RC_DERIVATION_VERSION);
394:
395:                SiteInfo site = mPoolHandle
396:                        .getPoolEntry(mOutputPool, "vanilla");
397:
398:                //change this function
399:                List tcentries = null;
400:                try {
401:                    tcentries = mTCHandle.getTCEntries(newJob.getTXNamespace(),
402:                            newJob.getTXName(), newJob.getTXVersion(), "local",
403:                            TCType.INSTALLED);
404:
405:                } catch (Exception e) {
406:                    mLogger.log("While retrieving entries from TC "
407:                            + e.getMessage(), LogManager.ERROR_MESSAGE_LEVEL);
408:                }
409:
410:                TransformationCatalogEntry tc;
411:
412:                if (tcentries == null || tcentries.isEmpty()) {
413:
414:                    mLogger.log("Unable to find in entry for "
415:                            + newJob.getCompleteTCName()
416:                            + " in transformation catalog on site local",
417:                            LogManager.DEBUG_MESSAGE_LEVEL);
418:                    mLogger.log("Constructing a default entry for it ",
419:                            LogManager.DEBUG_MESSAGE_LEVEL);
420:                    tc = defaultTCRCEntry();
421:
422:                    if (tc == null) {
423:                        throw new RuntimeException(
424:                                "Unable to create an entry for "
425:                                        + newJob.getCompleteTCName()
426:                                        + " on site local");
427:                    }
428:                } else {
429:                    tc = (TransformationCatalogEntry) tcentries.get(0);
430:                }
431:                newJob.setRemoteExecutable(tc.getPhysicalTransformation());
432:                newJob.setArguments(this .generateRepJobArgumentString(site,
433:                        regJobName, files));
434:                newJob.setUniverse(Engine.REGISTRATION_UNIVERSE);
435:                newJob.setSiteHandle(tc.getResourceId());
436:                newJob.setJobType(SubInfo.REPLICA_REG_JOB);
437:                newJob.setVDSSuperNode(job.getName());
438:
439:                //the profile information from the pool catalog needs to be
440:                //assimilated into the job.
441:                newJob.updateProfiles(mPoolHandle.getPoolProfile(newJob
442:                        .getSiteHandle()));
443:
444:                //the profile information from the transformation
445:                //catalog needs to be assimilated into the job
446:                //overriding the one from pool catalog.
447:                newJob.updateProfiles(tc);
448:
449:                //the profile information from the properties file
450:                //is assimilated overidding the one from transformation
451:                //catalog.
452:                newJob.updateProfiles(mProps);
453:
454:                //in order to make sure that COG picks the default proxy
455:                //correctly through condor
456:                newJob.condorVariables.construct("getenv", "true");
457:
458:                return newJob;
459:            }
460:
461:            /**
462:             * Returns a default TC entry to be used in case entry is not found in the
463:             * transformation catalog.
464:             *
465:             *
466:             *
467:             * @return  the default entry.
468:             */
469:            private TransformationCatalogEntry defaultTCRCEntry() {
470:                String site = "local";
471:                //generate only once.
472:                if (!mDefaultTCRCCreated) {
473:                    //check if PEGASUS_HOME is set
474:                    String home = mProps.getPegasusHome();
475:                    //if PEGASUS_HOME is not set, use VDS_HOME
476:                    //home = ( home == null )? mPoolHandle.getVDS_HOME( site ): home;
477:
478:                    //if home is still null
479:                    if (home == null) {
480:                        //cannot create default TC
481:                        mLogger.log("Unable to create a default entry for "
482:                                + Separator.combine(this .RC_TRANSFORMATION_NS,
483:                                        this .RC_TRANSFORMATION_NAME,
484:                                        this .RC_TRANSFORMATION_VERSION),
485:                                LogManager.DEBUG_MESSAGE_LEVEL);
486:                        //set the flag back to true
487:                        mDefaultTCRCCreated = true;
488:                        return mDefaultTCRCEntry;
489:                    }
490:                    //remove trailing / if specified
491:                    home = (home.charAt(home.length() - 1) == File.separatorChar) ? home
492:                            .substring(0, home.length() - 1)
493:                            : home;
494:
495:                    //construct the path to it
496:                    StringBuffer path = new StringBuffer();
497:                    path.append(home).append(File.separator).append("bin")
498:                            .append(File.separator).append("rc-client");
499:
500:                    //create Profiles for JAVA_HOME and CLASSPATH
501:                    String jh = mProps.getProperty("java.home");
502:                    mLogger.log("JAVA_HOME set to " + jh,
503:                            LogManager.DEBUG_MESSAGE_LEVEL);
504:                    Profile javaHome = new Profile(Profile.ENV, "JAVA_HOME", jh);
505:
506:                    Profile classpath = this .getClassPath(home);
507:                    if (classpath == null) {
508:                        return mDefaultTCRCEntry;
509:                    }
510:
511:                    mDefaultTCRCEntry = new TransformationCatalogEntry(
512:                            this .RC_TRANSFORMATION_NS,
513:                            this .RC_TRANSFORMATION_NAME,
514:                            this .RC_TRANSFORMATION_VERSION);
515:
516:                    mDefaultTCRCEntry
517:                            .setPhysicalTransformation(path.toString());
518:                    mDefaultTCRCEntry.setResourceId(site);
519:                    mDefaultTCRCEntry.setProfile(classpath);
520:                    mDefaultTCRCEntry.setProfile(javaHome);
521:                    mDefaultTCRCEntry.setProfile(new Profile(Profile.ENV,
522:                            "PEGASUS_HOME", mProps.getPegasusHome()));
523:                    //set the flag back to true
524:                    mDefaultTCRCCreated = true;
525:                }
526:                return mDefaultTCRCEntry;
527:            }
528:
529:            /**
530:             * Returns the classpath for the default rc-client entry.
531:             *
532:             * @param home   the home directory where we need to check for lib directory.
533:             *
534:             * @return the classpath in an environment profile.
535:             */
536:            private Profile getClassPath(String home) {
537:                Profile result = null;
538:
539:                //create the CLASSPATH from home
540:                String classpath = mProps.getProperty("java.class.path");
541:                if (classpath == null || classpath.trim().length() == 0) {
542:                    return result;
543:                }
544:
545:                mLogger.log("JAVA CLASSPATH SET IS " + classpath,
546:                        LogManager.DEBUG_MESSAGE_LEVEL);
547:
548:                StringBuffer cp = new StringBuffer();
549:                String prefix = home + File.separator + "lib";
550:                for (StringTokenizer st = new StringTokenizer(classpath, ":"); st
551:                        .hasMoreTokens();) {
552:                    String token = st.nextToken();
553:                    if (token.startsWith(prefix)) {
554:                        //this is a valid lib jar to put in
555:                        cp.append(token).append(":");
556:                    }
557:                }
558:
559:                if (cp.length() == 0) {
560:                    //unable to create a valid classpath
561:                    mLogger.log("Unable to create a sensible classpath from "
562:                            + home, LogManager.DEBUG_MESSAGE_LEVEL);
563:                    return result;
564:                }
565:
566:                //we have everything now
567:                result = new Profile(Profile.ENV, "CLASSPATH", cp.toString());
568:
569:                return result;
570:            }
571:
572:            /**
573:             * Generates the argument string to be given to the replica registration job.
574:             * At present by default it would be picking up the file containing the
575:             * mappings.
576:             *
577:             * @param site     the <code>SiteInfo</code> object/
578:             * @param regJob   The name of the registration job.
579:             *
580:             * @param files Collection of <code>FileTransfer</code> objects containing the
581:             *                 information about source and destURLs. The destination
582:             *                 URLs would be our PFNs.
583:             *
584:             * @return the argument string.
585:             */
586:            private String generateRepJobArgumentString(SiteInfo site,
587:                    String regJob, Collection files) {
588:                StringBuffer arguments = new StringBuffer();
589:
590:                //select a LRC. disconnect here. It should be select a RC.
591:
592:                LRC lrc = (site == null) ? null : site.selectLRC(true);
593:                if (lrc == null || lrc.getURL() == null
594:                        || lrc.getURL().length() == 0) {
595:                    throw new RuntimeException(
596:                            "The LRC URL is not specified in site catalog for site "
597:                                    + mOutputPool);
598:                }
599:
600:                //get any command line properties that may need specifying
601:                arguments.append(this .getCommandLineProperties(mProps));
602:
603:                //we have a lrc selected . construct vds.rc.url property
604:                arguments.append("-D").append(ReplicaCatalog.c_prefix).append(
605:                        ".").append(this .REPLICA_CATALOG_URL_KEY).append("=")
606:                        .append(lrc.getURL()).append(" ");
607:
608:                //append the insert option
609:                arguments.append("--insert").append(" ").append(
610:                        this .generateMappingsFile(regJob, files));
611:
612:                return arguments.toString();
613:
614:            }
615:
616:            /**
617:             * Returns the properties that need to be passed to the the rc-client
618:             * invocation on the command line . It is of the form
619:             * "-Dprop1=value1 -Dprop2=value2 .."
620:             *
621:             * @param properties   the properties object
622:             *
623:             * @return the properties list, else empty string.
624:             */
625:            protected String getCommandLineProperties(
626:                    PegasusProperties properties) {
627:                StringBuffer sb = new StringBuffer();
628:                appendProperty(sb, "pegasus.user.properties", properties
629:                        .getPropertiesInSubmitDirectory());
630:                return sb.toString();
631:            }
632:
633:            /**
634:             * Appends a property to the StringBuffer, in the java command line format.
635:             *
636:             * @param sb    the StringBuffer to append the property to.
637:             * @param key   the property.
638:             * @param value the property value.
639:             */
640:            protected void appendProperty(StringBuffer sb, String key,
641:                    String value) {
642:                sb.append("-D").append(key).append("=").append(value).append(
643:                        " ");
644:            }
645:
646:            /**
647:             * Generates the registration mappings in a text file that is generated in the
648:             * dax directory (the directory where all the condor submit files are
649:             * generated). The pool value for the mapping is the output pool specified
650:             * by the user when running Pegasus. The name of the file is regJob+.in
651:             *
652:             * @param regJob   The name of the registration job.
653:             * @param files    Collection of <code>FileTransfer</code>objects containing the
654:             *                 information about source and destURLs. The destination
655:             *                 URLs would be our PFNs.
656:             *
657:             * @return String corresponding to the path of the the file containig the
658:             *                mappings in the appropriate format.
659:             */
660:            private String generateMappingsFile(String regJob, Collection files) {
661:                String fileName = regJob + ".in";
662:                File f = null;
663:                String submitFileDir = mPOptions.getSubmitDirectory();
664:
665:                //writing the stdin file
666:                try {
667:                    f = new File(submitFileDir, fileName);
668:                    FileWriter stdIn = new FileWriter(f);
669:
670:                    for (Iterator it = files.iterator(); it.hasNext();) {
671:                        FileTransfer ft = (FileTransfer) it.next();
672:                        //checking for transient flag
673:                        if (!ft.getTransientRegFlag()) {
674:                            stdIn.write(ftToRC(ft));
675:                            stdIn.flush();
676:                        }
677:                    }
678:
679:                    stdIn.close();
680:
681:                } catch (Exception e) {
682:                    throw new RuntimeException(
683:                            "While writing out the registration file for job "
684:                                    + regJob, e);
685:                }
686:
687:                return fileName;
688:            }
689:
690:            /**
691:             * Converts a <code>FileTransfer</code> to a RC compatible string representation.
692:             *
693:             * @param ft  the <code>FileTransfer</code> object
694:             *
695:             * @return the RC version.
696:             */
697:            private String ftToRC(FileTransfer ft) {
698:                StringBuffer sb = new StringBuffer();
699:                NameValue destURL = ft.getDestURL();
700:                sb.append(ft.getLFN()).append(" ");
701:                sb.append(destURL.getValue()).append(" ");
702:                sb.append("pool=\"").append(destURL.getKey()).append("\"");
703:                sb.append("\n");
704:                return sb.toString();
705:            }
706:
707:            /**
708:             * Retrieves a location from the cache table, that contains the contents
709:             * of the cache files specified at runtime.
710:             *
711:             * @param lfn  the logical name of the file.
712:             *
713:             * @return <code>ReplicaLocation</code> object corresponding to the entry
714:             *         if found, else null.
715:             */
716:            private ReplicaLocation retrieveFromCache(String lfn) {
717:                return mCacheStore.getReplicaLocation(lfn);
718:            }
719:
720:            /**
721:             * Ends up loading the cache files so as to enable the lookup for the transient
722:             * files created by the parent jobs.
723:             *
724:             * @param cacheFiles  set of paths to the cache files.
725:             */
726:            private void loadCacheFiles(Set cacheFiles) {
727:                Properties cacheProps = mProps.getVDSProperties()
728:                        .matchingSubset(ReplicaCatalog.c_prefix, false);
729:
730:                mLogger.log("Loading transient cache files",
731:                        LogManager.INFO_MESSAGE_LEVEL);
732:
733:                ReplicaCatalog simpleFile;
734:                Map wildcardConstraint = null;
735:
736:                for (Iterator it = cacheFiles.iterator(); it.hasNext();) {
737:                    //read each of the cache file and load in memory
738:                    String file = (String) it.next();
739:                    //set the appropriate property to designate path to file
740:                    cacheProps
741:                            .setProperty(this .CACHE_REPLICA_CATALOG_KEY, file);
742:
743:                    mLogger.log("Loading cache file: " + file,
744:                            LogManager.DEBUG_MESSAGE_LEVEL);
745:                    try {
746:                        simpleFile = ReplicaFactory.loadInstance(
747:                                CACHE_REPLICA_CATALOG_IMPLEMENTER, cacheProps);
748:                    } catch (Exception e) {
749:                        mLogger.log("Unable to load cache file " + file, e,
750:                                LogManager.ERROR_MESSAGE_LEVEL);
751:                        continue;
752:                    }
753:                    //suck in all the entries into the cache replica store.
754:                    //returns an unmodifiable collection. so merging an issue..
755:                    mCacheStore.add(simpleFile.lookup(mSearchFiles));
756:
757:                    //no wildcards as we only want to load mappings for files that
758:                    //we require
759:                    //mCacheStore.add( simpleFile.lookup( wildcardConstraint ) );
760:
761:                    //close connection
762:                    simpleFile.close();
763:                }
764:
765:                mLogger.logCompletion("Loading transient cache files",
766:                        LogManager.INFO_MESSAGE_LEVEL);
767:            }
768:
769:            /**
770:             * Reads in the environment variables into memory from the properties file
771:             * and the pool catalog.
772:             *
773:             * @return  the <code>ENV</code> namespace object holding the environment
774:             *          variables.
775:             */
776:            private ENV loadLocalEnvVariables() {
777:                //assumes that pool handle, and property handle are initialized.
778:                ENV env = new ENV();
779:
780:                //load from the pool.config
781:                env.checkKeyInNS(mPoolHandle.getPoolProfile("local",
782:                        Profile.ENV));
783:                //load from property file
784:                env.checkKeyInNS(mProps.getLocalPoolEnvVar());
785:
786:                // the new RC API has a different key. if that is specified use that.
787:                //mProps.getProperty( ReplicaCatalog.c_prefix )
788:
789:                return env;
790:            }
791:
792:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.