/**
 * This file or a portion of this file is licensed under the terms of
 * the Globus Toolkit Public License, found at $PEGASUS_HOME/GTPL or
 * http://www.globus.org/toolkit/download/license.html.
 * This notice must appear in redistributions of this file
 * with or without modification.
 *
 * Redistributions of this Software, with or without modification, must reproduce
 * the GTPL in:
 * (1) the Software, or
 * (2) the Documentation or
 * some other similar material which is provided with the Software (if any).
 *
 * Copyright 1999-2004
 * University of Chicago and The University of Southern California.
 * All rights reserved.
 */
package org.griphyn.cPlanner.engine;

import org.griphyn.cPlanner.classes.ADag;
import org.griphyn.cPlanner.classes.FileTransfer;
import org.griphyn.cPlanner.classes.PegasusFile;
import org.griphyn.cPlanner.classes.JobManager;
import org.griphyn.cPlanner.classes.PlannerOptions;
import org.griphyn.cPlanner.classes.SiteInfo;
import org.griphyn.cPlanner.classes.SubInfo;
import org.griphyn.cPlanner.classes.PegasusBag;

import org.griphyn.cPlanner.common.LogManager;
import org.griphyn.cPlanner.common.PegasusProperties;

import org.griphyn.cPlanner.namespace.Hints;

import org.griphyn.cPlanner.provenance.pasoa.XMLProducer;
import org.griphyn.cPlanner.provenance.pasoa.producer.XMLProducerFactory;
import org.griphyn.cPlanner.provenance.pasoa.PPS;
import org.griphyn.cPlanner.provenance.pasoa.pps.PPSFactory;

import org.griphyn.cPlanner.selector.SiteSelector;
import org.griphyn.cPlanner.selector.TransformationSelector;
import org.griphyn.cPlanner.selector.site.SiteSelectorFactory;

import org.griphyn.cPlanner.transfer.SLS;
import org.griphyn.cPlanner.transfer.sls.SLSFactory;
import org.griphyn.cPlanner.transfer.sls.SLSFactoryException;

import org.griphyn.common.catalog.TransformationCatalogEntry;
import org.griphyn.common.catalog.transformation.Mapper;
import org.griphyn.common.catalog.transformation.TCMode;
import org.griphyn.common.classes.TCType;
import org.griphyn.common.util.Separator;

import java.io.File;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.Vector;

/**
 * This engine calls out to the Site Selector selected by the user and maps the
 * jobs in the workflow to the execution pools.
 *
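 * <p>
 * A minimal usage sketch (hypothetical driver code; inside the planner these
 * calls are made by the refinement machinery itself, and <code>props</code>
 * and <code>options</code> are assumed to be already populated):
 * <pre>
 *   ADag dag = ...;   //the parsed abstract workflow
 *   InterPoolEngine engine = new InterPoolEngine(dag, props, options);
 *   engine.determineSites();            //map each job to an execution site
 *   ADag mapped = engine.getWorkflow(); //the site-annotated workflow
 * </pre>
 *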
 * @author Karan Vahi
 * @author Gaurang Mehta
 * @version $Revision: 423 $
 *
 */
public class InterPoolEngine extends Engine implements Refiner {

    /**
     * ADag object corresponding to the Dag whose jobs we want to schedule.
     */
    private ADag mDag;

    /**
     * Set of the execution pools that the user has specified.
     */
    private Set mExecPools;

    /**
     * Handle to the site selector.
     */
    private SiteSelector mSiteSelector;

    /**
     * The handle to the transformation selector, which selects the
     * transformation to use from the candidates returned by the TC Mapper.
     */
    private TransformationSelector mTXSelector;

    /**
     * The bag of objects that is required for the Site Selector initialization.
     */
    private PegasusBag mBag;

    /**
     * The handle to the transformation catalog mapper object that caches
     * queries to the transformation catalog and indexes them by LFN. The
     * TC Mapper has no purge policy, so strictly speaking it is not a
     * classic cache.
     */
    private Mapper mTCMapper;

    /**
     * The XML Producer object that records the actions.
     */
    private XMLProducer mXMLStore;

    /**
     * The handle to the SLS implementor.
     */
    private SLS mSLS;

    /**
     * A boolean indicating whether worker node execution is enabled.
     */
    private boolean mWorkerNodeExecution;

    /**
     * Constructor.
     *
     * @param props the properties to be used.
     */
    public InterPoolEngine(PegasusProperties props) {
        super(props);
        mDag = new ADag();
        mExecPools = new java.util.HashSet();

        mTCHandle = TCMode.loadInstance();
        //initialize the transformation mapper
        mTCMapper = Mapper.loadTCMapper(mProps.getTCMapperMode());

        //initialize the bag of objects and load the site selector
        mBag = new PegasusBag();
        mBag.add(PegasusBag.PEGASUS_PROPERTIES, props);
        mBag.add(PegasusBag.TRANSFORMATION_CATALOG, mTCHandle);
        mBag.add(PegasusBag.TRANSFORMATION_MAPPER, mTCMapper);
        mBag.add(PegasusBag.PEGASUS_LOGMANAGER, mLogger);

        mTXSelector = null;
        mXMLStore = XMLProducerFactory.loadXMLProducer(props);

        mWorkerNodeExecution = props.executeOnWorkerNode();
        if (mWorkerNodeExecution) {
            //load the SLS implementation
            mSLS = SLSFactory.loadInstance(mBag);
        }
    }

    /**
     * Overloaded constructor.
     *
     * @param aDag    the <code>ADag</code> object corresponding to the Dag
     *                whose nodes are to be mapped to execution pools.
     * @param props   the properties to be used.
     * @param options the options specified by the user to run the planner.
     */
    public InterPoolEngine(ADag aDag, PegasusProperties props,
                           PlannerOptions options) {
        super(props);
        mDag = aDag;
        mPOptions = options;
        mExecPools = (Set) options.getExecutionSites();
        mTCHandle = TCMode.loadInstance();
        //initialize the transformation mapper
        mTCMapper = Mapper.loadTCMapper(mProps.getTCMapperMode());

        //initialize the bag of objects and load the site selector
        mBag = new PegasusBag();
        mBag.add(PegasusBag.PEGASUS_PROPERTIES, props);
        mBag.add(PegasusBag.PLANNER_OPTIONS, options);
        mBag.add(PegasusBag.TRANSFORMATION_CATALOG, mTCHandle);
        mBag.add(PegasusBag.TRANSFORMATION_MAPPER, mTCMapper);
        mBag.add(PegasusBag.PEGASUS_LOGMANAGER, mLogger);
        mBag.add(PegasusBag.SITE_CATALOG, mPoolHandle);

        mTXSelector = null;
        mXMLStore = XMLProducerFactory.loadXMLProducer(props);

        mWorkerNodeExecution = props.executeOnWorkerNode();
        if (mWorkerNodeExecution) {
            //load the SLS implementation
            mSLS = SLSFactory.loadInstance(mBag);
        }
    }

    /**
     * Returns the bag of initialization objects.
     *
     * @return PegasusBag
     */
    public PegasusBag getPegasusBag() {
        return mBag;
    }

    /**
     * Returns a reference to the workflow that is being refined by the refiner.
     *
     * @return ADag object.
     */
    public ADag getWorkflow() {
        return this.mDag;
    }

    /**
     * Returns a reference to the XMLProducer, which generates the XML fragment
     * capturing the actions of the refiner. This is used for provenance
     * purposes.
     *
     * @return XMLProducer
     */
    public XMLProducer getXMLProducer() {
        return this.mXMLStore;
    }

    /**
     * This is where the callout to the Partitioner should take place; the
     * Partitioner would break the workflow into clusters and hand the site
     * selector only those jobs that are ready to be scheduled. At present
     * the whole workflow is scheduled at once.
     *
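     * A sketch of the intended flow once partitioning is in place (the
     * partitioner callout below is hypothetical, not an existing API):
     * <pre>
     *   //for each cluster of ready jobs handed back by the partitioner:
     *   //    scheduleJobs(cluster, convertToList(mExecPools));
     * </pre>
     *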
     */
    public void determineSites() {
        //at present we schedule the whole workflow at once
        List pools = convertToList(mExecPools);

        //go through all the jobs making up the Adag to do the physical mapping
        scheduleJobs(mDag, pools);
    }

    /**
     * Schedules a list of jobs on the execution pools by calling out to the
     * site selector specified. It is up to the site selector to determine if
     * a job can be run on the list of sites passed.
     *
     * @param dag   the abstract workflow.
     * @param sites the list of execution sites, specified by the user.
     *
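     * For example, this is how the method is currently invoked:
     * <pre>
     *   //as called from determineSites()
     *   scheduleJobs(mDag, convertToList(mExecPools));
     * </pre>
     *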
     */
    public void scheduleJobs(ADag dag, List sites) {

        mSiteSelector = SiteSelectorFactory.loadInstance(mBag);
        mSiteSelector.mapWorkflow(dag, sites);

        StringBuffer error;

        //load the PPS implementation
        PPS pps = PPSFactory.loadPPS(this.mProps);

        mXMLStore.add("<workflow url=\"" + mPOptions.getDAX() + "\">");

        //call the begin workflow method
        try {
            pps.beginWorkflowRefinementStep(this, PPS.REFINEMENT_SITE_SELECT, false);
        } catch (Exception e) {
            throw new RuntimeException("PASOA Exception", e);
        }

        //clear the XML store
        mXMLStore.clear();

        //iterate through the jobs and hand them to
        //the site selector if required
        String site;
        for (Iterator it = dag.jobIterator(); it.hasNext();) {

            SubInfo job = (SubInfo) it.next();
            site = job.getSiteHandle();

            //check if the user has specified any hints in the dax

            //replaced with jobmanager-type
            //incorporateHint(job, "pfnUniverse");
            incorporateHint(job, Hints.JOBMANAGER_UNIVERSE);
            if (incorporateHint(job, "executionPool")) {
                incorporateProfiles(job);
                continue;
            }

            if (site == null) {
                error = new StringBuffer();
                error.append("Site Selector could not map the job ")
                     .append(job.getCompleteTCName())
                     .append("\nMost likely an error occurred in the site selector.");
                mLogger.log(error.toString(), LogManager.ERROR_MESSAGE_LEVEL);
                throw new RuntimeException(error.toString());
            }

            String jm = job.getJobManager();
            jm = ((jm == null) || jm.length() == 0) ? null : jm;

            if (site.length() == 0
                    || site.equalsIgnoreCase(SiteSelector.SITE_NOT_FOUND)) {
                error = new StringBuffer();
                error.append("Site Selector (").append(mSiteSelector.description())
                     .append(") could not map job ").append(job.getCompleteTCName())
                     .append(" to any site");
                mLogger.log(error.toString(), LogManager.ERROR_MESSAGE_LEVEL);
                throw new RuntimeException(error.toString());
            }

            job.setJobManager(jm == null ? getJobManager(site, job.getUniverse()) : jm);

            mLogger.log("Mapped job " + job.jobName + " to pool " + site,
                        LogManager.DEBUG_MESSAGE_LEVEL);
            //incorporate the profiles and
            //do transformation selection
            if (!incorporateProfiles(job)) {
                error = new StringBuffer();
                error.append("Profiles incorrectly incorporated for ")
                     .append(job.getCompleteTCName());
                mLogger.log(error.toString(), LogManager.ERROR_MESSAGE_LEVEL);
                throw new RuntimeException(error.toString());
            }

            //modify the jobs if required for worker node execution
            if (mWorkerNodeExecution) {
                mSLS.modifyJobForFirstLevelStaging(job,
                        mPOptions.getSubmitDirectory(),
                        mSLS.getSLSInputLFN(job),
                        mSLS.getSLSOutputLFN(job));
            }

            //log actions as XML fragment
            try {
                logRefinerAction(job);
                pps.siteSelectionFor(job.getName(), job.getName());
            } catch (Exception e) {
                throw new RuntimeException("PASOA Exception", e);
            }

        }//end of mapping all jobs

        try {
            pps.endWorkflowRefinementStep(this);
        } catch (Exception e) {
            throw new RuntimeException("PASOA Exception", e);
        }
    }

    /**
     * Incorporates the profiles from the various sources into the job.
     * The profiles are incorporated in the order pool, transformation catalog,
     * and properties file, with the profiles from the properties file having
     * the highest priority. This is also where the transformation selector is
     * called to select among the various transformations returned by the
     * TC Mapper.
     *
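     * For example, if the same profile key were set in all three sources
     * (the key and values below are purely illustrative), the properties
     * value wins:
     * <pre>
     *   pool (site) catalog    : env.GLOBUS_LOCATION=/opt/globus-a
     *   transformation catalog : env.GLOBUS_LOCATION=/opt/globus-b
     *   properties file        : env.GLOBUS_LOCATION=/opt/globus-c
     *
     *   //after incorporateProfiles(job), the job sees /opt/globus-c
     * </pre>
     *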
     * @param job the job into which the profiles are to be incorporated.
     *
     * @return true if the profiles were successfully incorporated,
     *         false otherwise.
     */
    private boolean incorporateProfiles(SubInfo job) {
        TransformationCatalogEntry tcEntry = null;
        List tcEntries = null;
        String siteHandle = job.getSiteHandle();

        //the profile information from the pool catalog needs to be
        //assimilated into the job
        job.updateProfiles(mPoolHandle.getPoolProfile(siteHandle));

        //query the TCMapper and get hold of all the valid TC
        //entries for that site
        tcEntries = mTCMapper.getTCList(job.namespace, job.logicalName,
                                        job.version, siteHandle);

        StringBuffer error;
        FileTransfer fTx = null;
        if (tcEntries != null && tcEntries.size() > 0) {
            //select a tc entry by calling out to
            //the transformation selector
            tcEntry = selectTCEntry(tcEntries, job, mProps.getTXSelectorMode());
            if (tcEntry == null) {
                error = new StringBuffer();
                error.append("Transformation selection operation for job ")
                     .append(job.getCompleteTCName())
                     .append(" for site ").append(job.getSiteHandle())
                     .append(" unsuccessful.");
                mLogger.log(error.toString(), LogManager.ERROR_MESSAGE_LEVEL);
                throw new RuntimeException(error.toString());
            }

            //something seriously wrong in this code line below.
            //Need to verify further after more runs. (Gaurang 2-7-2006).
            //tcEntry = (TransformationCatalogEntry) tcEntries.get(0);
            if (tcEntry.getType().equals(TCType.STATIC_BINARY)) {
                SiteInfo site = mPoolHandle.getPoolEntry(siteHandle, "vanilla");

                //construct a file transfer object and add it
                //as an input file to the job in the dag
                fTx = new FileTransfer(job.getStagedExecutableBaseName(),
                                       job.jobName);
                fTx.setType(FileTransfer.EXECUTABLE_FILE);

                //the physical transformation points to
                //guc or the user specified transfer mechanism
                //accessible url
                fTx.addSource(tcEntry.getResourceId(),
                              tcEntry.getPhysicalTransformation());

                //the destination url is the working directory for the
                //pool where it needs to be staged to.
                //always create a third party transfer URL
                //for the destination.
                String stagedPath = mPoolHandle.getExecPoolWorkDir(job)
                        + File.separator
                        + job.getStagedExecutableBaseName();
                fTx.addDestination(siteHandle,
                                   site.getURLPrefix(false) + stagedPath);

                //added in the end now after dependant executables
                //have been handled Karan May 31 2007
                //job.addInputFile(fTx);

                if (mWorkerNodeExecution) {
                    //do not specify the full path as we do not know the worker
                    //node directory

                    if (mSLS.doesCondorModifications()) {
                        //we need to take the basename of the source url
                        //as the condor file transfer mechanism does not allow
                        //us to specify destination filenames
                        job.setRemoteExecutable(
                                new File(tcEntry.getPhysicalTransformation()).getName());
                    } else {
                        //do this only when the kickstart executable existence check is fixed
                        //Karan Nov 30 2007
                        //job.setRemoteExecutable(job.getStagedExecutableBaseName());
                        job.setRemoteExecutable(stagedPath);
                    }
                } else {
                    //the job's executable is the path to where
                    //the executable is going to be staged
                    job.executable = stagedPath;
                }

                //set the job type to denote that
                //the executable is being staged
                job.setJobType(SubInfo.STAGED_COMPUTE_JOB);
            } else {
                //the executable needs to point to the physical
                //path obtained from the selected transformation
                //entry
                job.executable = tcEntry.getPhysicalTransformation();
            }
        } else {
            //mismatch. should be unreachable code!!!
            //as an error should have been thrown in the site selector
            mLogger.log("Site selector mapped job "
                    + job.getCompleteTCName() + " to pool "
                    + job.executionPool
                    + " for which no mapping exists in the "
                    + "transformation mapper.",
                    LogManager.FATAL_MESSAGE_LEVEL);
            return false;
        }

        //the profile information from the transformation
        //catalog needs to be assimilated into the job,
        //overriding the one from the pool catalog.
        job.updateProfiles(tcEntry);

        //the profile information from the properties file
        //is assimilated, overriding the one from the transformation
        //catalog.
        job.updateProfiles(mProps);

        //handle dependant executables
        handleDependantExecutables(job);
        if (fTx != null) {
            //add the main executable back as input
            job.addInputFile(fTx);
        }

        return true;
    }

    /**
     * Handles the dependent executables that need to be staged: any input
     * file of type EXECUTABLE_FILE is either dropped (if the main executable
     * is installed) or replaced by a <code>FileTransfer</code> that stages
     * the binary to the work directory of the execution site.
     *
     * @param job the job whose dependent executables have to be handled.
     *
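     * A sketch of the effect on the job's input file list (the LFN and the
     * URLs are purely illustrative):
     * <pre>
     *   before: pegasus::keg:1.0   (type EXECUTABLE_FILE)
     *   after : FileTransfer
     *             source: gridftp://repo.example.org/bin/keg
     *             dest  : gridftp://site.example.org/workdir/keg
     * </pre>
     *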
     */
    private void handleDependantExecutables(SubInfo job) {
        String siteHandle = job.getSiteHandle();
        boolean installedTX = !(job.getJobType() == SubInfo.STAGED_COMPUTE_JOB);

        List dependantExecutables = new ArrayList();
        for (Iterator it = job.getInputFiles().iterator(); it.hasNext();) {
            PegasusFile input = (PegasusFile) it.next();

            if (input.getType() == PegasusFile.EXECUTABLE_FILE) {

                //if the main executable is installed, just remove the executable
                //file requirement from the input files
                if (installedTX) {
                    it.remove();
                    continue;
                }

                //query the TCMapper and get hold of all the valid TC
                //entries for that site
                String lfn[] = Separator.split(input.getLFN());
                List tcEntries = mTCMapper.getTCList(lfn[0], lfn[1], lfn[2],
                                                     siteHandle);

                StringBuffer error;
                if (tcEntries != null && tcEntries.size() > 0) {
                    //select a tc entry by calling out to the
                    //transformation selector; we should only stage,
                    //never pick any installed one.
                    TransformationCatalogEntry tcEntry = selectTCEntry(
                            tcEntries, job, "Staged");
                    if (tcEntry == null) {
                        error = new StringBuffer();
                        error.append("Transformation selection operation for job ")
                             .append(job.getCompleteTCName())
                             .append(" for site ").append(job.getSiteHandle())
                             .append(" unsuccessful.");
                        mLogger.log(error.toString(), LogManager.ERROR_MESSAGE_LEVEL);
                        throw new RuntimeException(error.toString());
                    }

                    //tcEntry = (TransformationCatalogEntry) tcEntries.get(0);
                    if (tcEntry.getType().equals(TCType.STATIC_BINARY)) {
                        SiteInfo site = mPoolHandle.getPoolEntry(siteHandle,
                                                                 "vanilla");
                        //construct a file transfer object and add it
                        //as an input file to the job in the dag

                        //a disconnect between the basename and the input lfn.
                        String basename = SubInfo.getStagedExecutableBaseName(
                                lfn[0], lfn[1], lfn[2]);

                        FileTransfer fTx = new FileTransfer(basename, job.jobName);
                        fTx.setType(FileTransfer.EXECUTABLE_FILE);

                        //the physical transformation points to
                        //guc or the user specified transfer mechanism
                        //accessible url
                        fTx.addSource(tcEntry.getResourceId(),
                                      tcEntry.getPhysicalTransformation());

                        //the destination url is the working directory for the
                        //pool where it needs to be staged to.
                        //always create a third party transfer URL
                        //for the destination.
                        String stagedPath = mPoolHandle.getExecPoolWorkDir(job)
                                + File.separator + basename;
                        fTx.addDestination(siteHandle,
                                           site.getURLPrefix(false) + stagedPath);

                        dependantExecutables.add(fTx);

                        //the job's executable is the path to where
                        //the executable is going to be staged
                        //job.executable = stagedPath;
                        mLogger.log("Dependant Executable " + input.getLFN()
                                + " being staged from " + fTx.getSourceURL(),
                                LogManager.DEBUG_MESSAGE_LEVEL);
                    }
                }
                it.remove();
            } //end of if file is executable
        }

        //add all the dependant executable FileTransfers back as input files
        for (Iterator it = dependantExecutables.iterator(); it.hasNext();) {
            FileTransfer file = (FileTransfer) it.next();
            job.addInputFile(file);
        }
    }

    /**
     * Recursively ends up calling the transformation selector according to
     * a chain of selections that need to be performed on the list of valid
     * transformation catalog entries.
     *
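     * For instance, with a selector that narrows the candidate list but may
     * return more than one entry, the method recurses until a single entry
     * (or none) remains (the entry names are illustrative):
     * <pre>
     *   [e1, e2, e3] --selector--> [e1, e3] --selector--> [e1]   //e1 returned
     *   [e4, e5]     --selector--> []                            //null returned
     * </pre>
     *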
     * @param entries   list of <code>TransformationCatalogEntry</code> objects.
     * @param job       the job.
     * @param selectors comma separated list of selectors.
     *
     * @return the selected <code>TransformationCatalogEntry</code> object,
     *         or null when the transformation selector is unable to select
     *         any transformation.
     */
    private TransformationCatalogEntry selectTCEntry(List entries,
                                                     SubInfo job,
                                                     String selectors) {
        //at present there is only one selector
        //operation performed on the selector
        String selector = selectors;

        //load the transformation selector. different
        //selectors may end up being loaded for different jobs.
        mTXSelector = TransformationSelector.loadTXSelector(selector);
        entries = mTXSelector.getTCEntry(entries);
        return (entries == null || entries.size() == 0)
                ? null
                : entries.size() > 1
                        //call the selector again
                        ? selectTCEntry(entries, job, selectors)
                        : (TransformationCatalogEntry) entries.get(0);
    }

    /**
     * Returns a jobmanager for the given pool.
     *
     * @param site     the name of the pool.
     * @param universe the universe for which you need the jobmanager on that
     *                 particular pool.
     *
     * @return the jobmanager for that pool and universe; a RuntimeException
     *         is thrown if none is found.
     */
    private String getJobManager(String site, String universe) {
        SiteInfo p = mPoolHandle.getPoolEntry(site, universe);
        JobManager jm = (p == null) ? null : p.selectJobManager(universe, true);
        String result = (jm == null) ? null : jm.getInfo(JobManager.URL);

        if (result == null) {
            StringBuffer error = new StringBuffer();
            error.append("Could not find a jobmanager at site (")
                 .append(site).append(") for universe ").append(universe);
            mLogger.log(error.toString(), LogManager.ERROR_MESSAGE_LEVEL);
            throw new RuntimeException(error.toString());
        }

        return result;
    }

    /**
     * Incorporates a hint in the hints namespace into the job. After the hint
     * is incorporated, the key is deleted from the hint namespace for that
     * job.
     *
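     * For example, a hint profile in the DAX along the lines of (the site
     * name is illustrative)
     * <pre>
     *   &lt;profile namespace="hints" key="executionPool"&gt;isi_condor&lt;/profile&gt;
     * </pre>
     * pins the job to the site isi_condor, after which the key is removed
     * from the job's hints namespace.
     *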
     * @param job the job that needs the hint to be incorporated.
     * @param key the key in the hint namespace.
     *
     * @return true if the hint was successfully incorporated,
     *         false if the hint was not set in the job or could not be
     *         incorporated.
     */
    private boolean incorporateHint(SubInfo job, String key) {
        //sanity check
        if (key.length() == 0) {
            return false;
        }

        switch (key.charAt(0)) {
            case 'e':
                if (key.equals("executionPool") && job.hints.containsKey(key)) {
                    //user has overridden in the dax which execution pool to use
                    job.executionPool = (String) job.hints.removeKey("executionPool");

                    incorporateHint(job, "globusScheduler");
                    return true;
                }
                break;

            case 'g':
                if (key.equals("globusScheduler")) {
                    job.globusScheduler = (job.hints.containsKey("globusScheduler"))
                            //take the globus scheduler that the user
                            //specified in the DAX
                            ? (String) job.hints.removeKey("globusScheduler")
                            //select one from the pool handle
                            : mPoolHandle.getPoolEntry(job.executionPool,
                                    job.condorUniverse).selectJobManager(
                                    job.condorUniverse, true).getInfo(JobManager.URL);

                    return true;
                }
                break;

            /*//not required any longer
            case 'p':
                if (key.equals("pfnUniverse")) {
                    job.condorUniverse = job.hints.containsKey("pfnUniverse")
                            ? (String) job.hints.removeKey("pfnUniverse")
                            : job.condorUniverse;

                    return true;
                }
                break;
            */

            case 'j':
                if (key.equals(Hints.JOBMANAGER_UNIVERSE)) {
                    job.condorUniverse = job.hints.containsKey(Hints.JOBMANAGER_UNIVERSE)
                            ? (String) job.hints.removeKey(Hints.JOBMANAGER_UNIVERSE)
                            : job.condorUniverse;

                    return true;
                }
                break;

            default:
                break;
        }
        return false;
    }

    /**
     * Converts a Vector to a List. It only copies by reference.
     *
     * @param v Vector
     *
     * @return an ArrayList
     */
    public List convertToList(Vector v) {
        return new java.util.ArrayList(v);
    }

    /**
     * Converts a Set to a List. It only copies by reference.
     *
     * @param s Set
     *
     * @return an ArrayList
     */
    public List convertToList(Set s) {
        return new java.util.ArrayList(s);
    }

    /**
     * Logs the action taken by the refiner on a job as an XML fragment in
     * the XML Producer.
     *
     * @param job the <code>SubInfo</code> containing the job that was mapped
     *            to a site.
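     *
     * The fragment added to the XML Producer has the following shape (the
     * job, site, and jobmanager values below are examples):
     * <pre>
     *   &lt;siteselection job="preprocess_ID000001"&gt;
     *       &lt;logicalsite&gt;isi_condor&lt;/logicalsite&gt;
     *       &lt;jobmanager&gt;smarty.isi.edu/jobmanager-condor&lt;/jobmanager&gt;
     *   &lt;/siteselection&gt;
     * </pre>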
     */
    protected void logRefinerAction(SubInfo job) {
        StringBuffer sb = new StringBuffer();
        sb.append("\t<siteselection job=\"").append(job.getName()).append("\">");
        sb.append("\n").append("\t\t");
        sb.append("<logicalsite>").append(job.getSiteHandle()).append("</logicalsite>");
        sb.append("\n").append("\t\t");
        sb.append("<jobmanager>").append(job.getJobManager()).append("</jobmanager>");
        sb.append("\n");
        sb.append("\t</siteselection>");
        sb.append("\n");
        mXMLStore.add(sb.toString());
    }

}