001: /*
002: * This file or a portion of this file is licensed under the terms of
003: * the Globus Toolkit Public License, found in file GTPL, or at
004: * http://www.globus.org/toolkit/download/license.html. This notice must
005: * appear in redistributions of this file, with or without modification.
006: *
007: * Redistributions of this Software, with or without modification, must
008: * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
009: * some other similar material which is provided with the Software (if
010: * any).
011: *
012: * Copyright 1999-2004 University of Chicago and The University of
013: * Southern California. All rights reserved.
014: */
015:
016: package org.griphyn.cPlanner.transfer.refiner;
017:
018: import org.griphyn.cPlanner.classes.ADag;
019: import org.griphyn.cPlanner.classes.PlannerOptions;
020: import org.griphyn.cPlanner.classes.SubInfo;
021: import org.griphyn.cPlanner.classes.FileTransfer;
022:
023: import org.griphyn.cPlanner.common.PegasusProperties;
024: import org.griphyn.cPlanner.common.LogManager;
025:
026: import org.griphyn.cPlanner.namespace.VDS;
027:
028: import org.griphyn.common.catalog.TransformationCatalogEntry;
029:
030: import org.griphyn.cPlanner.transfer.Refiner;
031:
032: import java.util.Collection;
033: import java.util.List;
034: import java.util.ArrayList;
035: import java.util.Vector;
036: import java.util.Iterator;
037: import java.util.Map;
038: import java.util.HashMap;
039: import java.util.Set;
040: import java.util.HashSet;
041: import org.griphyn.cPlanner.engine.ReplicaCatalogBridge;
042:
043: /**
044: * An extension of the default refiner, that allows the user to specify
045: * the number of transfer nodes per execution site for stagein and stageout.
046: *
047: * @author Karan Vahi
048: * @version $Revision: 226 $
049: */
050:
051: public class Bundle extends Default {
052:
/**
 * A short description of the transfer refinement.
 */
public static final String DESCRIPTION = "Bundle Mode (stagein files distributed amongst bundles)";

/**
 * The default bundling factor that identifies the number of transfer jobs
 * that are being created per execution pool for staging in data for
 * the workflow. Stored as a string; it is parsed to an int in
 * <code>getSISiteBundleValue</code>.
 */
public static final String DEFAULT_STAGE_IN_BUNDLE_FACTOR = "1";

/**
 * The default bundling factor that identifies the number of transfer jobs
 * that are being created per execution pool while staging data out.
 * Stored as a string; it is parsed to an int in
 * <code>getSOSiteBundleValue</code>.
 */
public static final String DEFAULT_STAGE_OUT_BUNDLE_FACTOR = "1";

/**
 * The map indexed by execution site handle. Each value is the
 * <code>PoolTransfer</code> object that holds the stage-in transfer
 * containers (one per stage-in job) for that site.
 */
private Map mStageInMap;

/**
 * The map indexed by compute job name. Each value is the set of names of
 * the jobs (stage-in jobs) that the compute job has to depend upon. It is
 * filled during the traversal of the workflow, and is used in
 * <code>done()</code> to add the relations to the workflow once the
 * traversal is over.
 */
private Map mRelationsMap;

/**
 * The map containing the stage-in bundle values indexed by the site
 * handle. The value is stored as a string, at the time the
 * <code>PoolTransfer</code> object for the site is first created.
 * NOTE(review): the map is only written to in this class, never read.
 */
private Map mSIBundleMap;

/**
 * The map indexed by the file key (constructed from the logical name of
 * the staged executable and the site handle). Each entry is the name of
 * the corresponding setup job, that changes the XBit on the staged file.
 */
private Map mSetupMap;

/**
 * A map indexed by site name, that contains the pointer to the stage out
 * PoolTransfer objects for that site. This is per level of the workflow.
 * It is not created in the constructor; it is (re)created each time
 * <code>resetStageOutMap()</code> is called.
 */
private Map mStageOutMapPerLevel;

/**
 * The current level of the jobs being traversed. Initialized to -1 in the
 * constructor.
 */
private int mCurrentLevel;

/**
 * The handle to the replica catalog bridge. It is set on each call to
 * <code>addStageOutXFERNodes</code>.
 */
private ReplicaCatalogBridge mRCB;
113:
114: /**
115: * The overloaded constructor.
116: *
117: * @param dag the workflow to which transfer nodes need to be added.
118: * @param properties the <code>PegasusProperties</code> object containing all
119: * the properties required by Pegasus.
120: * @param options the options passed to the planner.
121: *
122: */
123: public Bundle(ADag dag, PegasusProperties properties,
124: PlannerOptions options) {
125: super (dag, properties, options);
126: mStageInMap = new HashMap(options.getExecutionSites().size());
127: mSIBundleMap = new HashMap();
128: mRelationsMap = new HashMap();
129: mSetupMap = new HashMap();
130: mCurrentLevel = -1;
131: }
132:
133: /**
134: * Adds the stage in transfer nodes which transfer the input files for a job,
135: * from the location returned from the replica catalog to the job's execution
136: * pool.
137: *
138: * @param job <code>SubInfo</code> object corresponding to the node to
139: * which the files are to be transferred to.
140: * @param files Collection of <code>FileTransfer</code> objects containing the
141: * information about source and destURL's.
142: */
143: public void addStageInXFERNodes(SubInfo job, Collection files) {
144: String jobName = job.getName();
145: String siteHandle = job.getSiteHandle();
146: String key = null;
147: String par = null;
148: int bundle = -1;
149:
150: //to prevent duplicate dependencies
151: Set tempSet = new HashSet();
152:
153: int staged = 0;
154: Collection stagedFiles = new ArrayList();
155: Collection stageInExecJobs = new ArrayList();//store list of jobs that are transferring the stage file
156: for (Iterator it = files.iterator(); it.hasNext();) {
157: FileTransfer ft = (FileTransfer) it.next();
158: String lfn = ft.getLFN();
159:
160: //get the key for this lfn and pool
161: //if the key already in the table
162: //then remove the entry from
163: //the Vector and add a dependency
164: //in the graph
165: key = this .constructFileKey(lfn, siteHandle);
166: par = (String) mFileTable.get(key);
167: //System.out.println("lfn " + lfn + " par " + par);
168: if (par != null) {
169: it.remove();
170:
171: //check if tempSet does not contain the parent
172: //fix for sonal's bug
173: tempSet.add(par);
174:
175: if (ft.isTransferringExecutableFile()) {
176: //currently we have only one file to be staged per
177: //compute job . Taking a short cut in determining
178: //the name of setXBit job
179: String xBitJobName = (String) mSetupMap.get(key);
180: if (key == null) {
181: throw new RuntimeException(
182: "Internal Pegasus Error while "
183: + "constructing bundled stagein jobs");
184: }
185: //add relation xbitjob->computejob
186: this .addRelation(xBitJobName, jobName);
187: }
188:
189: } else {
190: //get the name of the transfer job
191: boolean contains = mStageInMap.containsKey(siteHandle);
192: //following pieces need rearragnement!
193: if (!contains) {
194: bundle = getSISiteBundleValue(siteHandle, job.vdsNS
195: .getStringValue(VDS.BUNDLE_STAGE_IN_KEY));
196: mSIBundleMap.put(siteHandle, Integer
197: .toString(bundle));
198: }
199: PoolTransfer pt = (contains) ? (PoolTransfer) mStageInMap
200: .get(siteHandle)
201: : new PoolTransfer(siteHandle, bundle);
202: if (!contains) {
203: mStageInMap.put(siteHandle, pt);
204: }
205: //add the FT to the appropriate transfer job.
206: String newJobName = pt.addTransfer(ft);
207:
208: if (ft.isTransferringExecutableFile()) {
209: //currently we have only one file to be staged per
210: //compute job
211: // Collection execFiles = new ArrayList(1);
212: // execFiles.add(ft);
213: //add both the name of the stagein job and the executable file
214: stageInExecJobs.add(newJobName);
215: stagedFiles.add(ft);
216:
217: // mTXStageInImplementation.addSetXBitJobs(job, newJobName,
218: // execFiles,
219: // SubInfo.STAGE_IN_JOB);
220: mLogger.log("Entered "
221: + key
222: + "->"
223: + mTXStageInImplementation
224: .getSetXBitJobName(job.getName(),
225: staged),
226: LogManager.DEBUG_MESSAGE_LEVEL);
227: mSetupMap.put(key, mTXStageInImplementation
228: .getSetXBitJobName(job.getName(), staged));
229: staged++;
230: }
231:
232: //make a new entry into the table
233: mFileTable.put(key, newJobName);
234: //add the newJobName to the tempSet so that even
235: //if the job has duplicate input files only one instance
236: //of transfer is scheduled. This came up during collapsing
237: //June 15th, 2004
238: tempSet.add(newJobName);
239:
240: }
241: }
242:
243: //if there were any staged files
244: //add the setXBitJobs for them
245: int index = 0;
246: Iterator jobIt = stageInExecJobs.iterator();
247: for (Iterator it = stagedFiles.iterator(); it.hasNext(); index++) {
248: Collection execFiles = new ArrayList(1);
249: execFiles.add(it.next());
250: mTXStageInImplementation.addSetXBitJobs(job, (String) jobIt
251: .next(), execFiles, SubInfo.STAGE_IN_JOB, index);
252:
253: }
254:
255: //add the temp set to the relations
256: //relations are added to the workflow in the end.
257: mRelationsMap.put(jobName, tempSet);
258:
259: }
260:
261: /**
262: * Adds the stageout transfer nodes, that stage data to an output site
263: * specified by the user.
264: *
265: * @param job <code>SubInfo</code> object corresponding to the node to
266: * which the files are to be transferred to.
267: * @param files Collection of <code>FileTransfer</code> objects containing the
268: * information about source and destURL's.
269: * @param rcb bridge to the Replica Catalog. Used for creating registration
270: * nodes in the workflow.
271: * @param deletedLeaf to specify whether the node is being added for
272: * a deleted node by the reduction engine or not.
273: * default: false
274: */
275: public void addStageOutXFERNodes(SubInfo job, Collection files,
276: ReplicaCatalogBridge rcb, boolean deletedLeaf) {
277:
278: //initializing rcb till the change in function signature happens
279: //needs to be passed during refiner initialization
280: mRCB = rcb;
281:
282: //sanity check
283: if (files.isEmpty()) {
284: return;
285: }
286:
287: String jobName = job.getName();
288: // String regJob = this.REGISTER_PREFIX + jobName;
289:
290: mLogMsg = "Adding output pool nodes for job " + jobName;
291:
292: //separate the files for transfer
293: //and for registration
294: List txFiles = new ArrayList();
295: List regFiles = new ArrayList();
296: for (Iterator it = files.iterator(); it.hasNext();) {
297: FileTransfer ft = (FileTransfer) it.next();
298: if (!ft.getTransientTransferFlag()) {
299: txFiles.add(ft);
300: }
301: if (!ft.getTransientRegFlag()) {
302: regFiles.add(ft);
303: }
304: }
305:
306: boolean makeTNode = !txFiles.isEmpty();
307: boolean makeRNode = !regFiles.isEmpty();
308:
309: int level = job.getLevel();
310: String site = job.getSiteHandle();
311: int bundleValue = getSOSiteBundleValue(site, job.vdsNS
312: .getStringValue(VDS.BUNDLE_STAGE_OUT_KEY));
313:
314: if (level != mCurrentLevel) {
315: mCurrentLevel = level;
316: //we are starting on a new level of the workflow.
317: //reinitialize stuff
318: this .resetStageOutMap();
319: }
320:
321: TransferContainer soTC = null;
322: if (makeTNode) {
323:
324: //get the appropriate pool transfer object for the site
325: PoolTransfer pt = this .getStageOutPoolTransfer(site,
326: bundleValue);
327: //we add all the file transfers to the pool transfer
328: soTC = pt
329: .addTransfer(txFiles, level, SubInfo.STAGE_OUT_JOB);
330: String soJob = soTC.getTXName();
331:
332: if (!deletedLeaf) {
333: //need to add a relation between a compute and stage-out
334: //job only if the compute job was not reduced.
335: addRelation(jobName, soJob);
336: }
337: //moved to the resetStageOut method
338: // if (makeRNode) {
339: // addRelation( soJob, soTC.getRegName() );
340: // }
341: } else if (makeRNode) {
342: //add an empty file transfer
343: //get the appropriate pool transfer object for the site
344: PoolTransfer pt = this .getStageOutPoolTransfer(site,
345: bundleValue);
346: //we add all the file transfers to the pool transfer
347: soTC = pt.addTransfer(new Vector(), level,
348: SubInfo.STAGE_OUT_JOB);
349:
350: //direct link between compute job and registration job
351: addRelation(jobName, soTC.getRegName());
352:
353: }
354: if (makeRNode) {
355: soTC.addRegistrationFiles(regFiles);
356: //call to make the reg subinfo
357: //added in make registration node
358: // addJob(createRegistrationJob(regJob, job, regFiles, rcb));
359: }
360:
361: }
362:
363: /**
364: * Signals that the traversal of the workflow is done. At this point the
365: * transfer nodes are actually constructed traversing through the transfer
366: * containers and the stdin of the transfer jobs written.
367: */
368: public void done() {
369: //traverse through the stagein map and
370: //add transfer nodes per pool
371: String key;
372: String value;
373: PoolTransfer pt;
374: TransferContainer tc;
375: Map.Entry entry;
376: SubInfo job = new SubInfo();
377:
378: for (Iterator it = mStageInMap.entrySet().iterator(); it
379: .hasNext();) {
380: entry = (Map.Entry) it.next();
381: key = (String) entry.getKey();
382: pt = (PoolTransfer) entry.getValue();
383: mLogger.log("Adding stage in transfer nodes for pool "
384: + key, LogManager.DEBUG_MESSAGE_LEVEL);
385:
386: for (Iterator pIt = pt.getTransferContainerIterator(); pIt
387: .hasNext();) {
388: tc = (TransferContainer) pIt.next();
389: if (tc == null) {
390: //break out
391: break;
392: }
393: mLogger.log("Adding stagein transfer node "
394: + tc.getTXName(),
395: LogManager.DEBUG_MESSAGE_LEVEL);
396: //added in make transfer node
397: //mDag.addNewJob(tc.getName());
398: //we just need the execution pool in the job object
399: job.executionPool = key;
400: addJob(mTXStageInImplementation.createTransferJob(job,
401: tc.getFileTransfers(), null, tc.getTXName(),
402: SubInfo.STAGE_IN_JOB));
403:
404: }
405: }
406:
407: //adding relations that tie in the stagin
408: //jobs to the compute jobs.
409: for (Iterator it = mRelationsMap.entrySet().iterator(); it
410: .hasNext();) {
411: entry = (Map.Entry) it.next();
412: key = (String) entry.getKey();
413: mLogger.log("Adding relations for job " + key,
414: LogManager.DEBUG_MESSAGE_LEVEL);
415: for (Iterator pIt = ((Collection) entry.getValue())
416: .iterator(); pIt.hasNext();) {
417: value = (String) pIt.next();
418: mLogMsg = "Adding relation " + value + " -> " + key;
419: mLogger.log(mLogMsg, LogManager.DEBUG_MESSAGE_LEVEL);
420: // mDag.addNewRelation(value,key);
421: addRelation(value, key);
422: }
423: }
424:
425: //reset the stageout map too
426: this .resetStageOutMap();
427: }
428:
429: /**
430: * Returns a textual description of the transfer mode.
431: *
432: * @return a short textual description
433: */
434: public String getDescription() {
435: return this .DESCRIPTION;
436: }
437:
438: /**
439: * Determines the bundle factor for a particular site on the basis of the
440: * stage in bundle value associcated with the underlying transfer
441: * transformation in the transformation catalog. If the key is not found,
442: * then the default value is returned. In case of the default value being
443: * null the global default is returned.
444: *
445: * @param site the site at which the value is desired.
446: * @param deflt the default value.
447: *
448: * @return the bundle factor.
449: *
450: * @see #DEFAULT_BUNDLE_STAGE_IN_FACTOR
451: */
452: protected int getSISiteBundleValue(String site, String deflt) {
453: //this should be parameterised Karan Dec 20,2005
454: TransformationCatalogEntry entry = mTXStageInImplementation
455: .getTransformationCatalogEntry(site);
456: SubInfo sub = new SubInfo();
457: String value = (deflt == null) ? this .DEFAULT_STAGE_IN_BUNDLE_FACTOR
458: : deflt;
459:
460: if (entry != null) {
461: sub.updateProfiles(entry);
462: value = (sub.vdsNS.containsKey(VDS.BUNDLE_STAGE_IN_KEY)) ? sub.vdsNS
463: .getStringValue(VDS.BUNDLE_STAGE_IN_KEY)
464: : value;
465: }
466:
467: return Integer.parseInt(value);
468: }
469:
470: /**
471: * Determines the bundle factor for a particular site on the basis of the
472: * stage out bundle value associcated with the underlying transfer
473: * transformation in the transformation catalog. If the key is not found,
474: * then the default value is returned. In case of the default value being
475: * null the global default is returned.
476: *
477: * @param site the site at which the value is desired.
478: * @param deflt the default value.
479: *
480: * @return the bundle factor.
481: *
482: * @see #DEFAULT_STAGE_OUT_BUNDLE_FACTOR
483: */
484: protected int getSOSiteBundleValue(String site, String deflt) {
485: //this should be parameterised Karan Dec 20,2005
486: TransformationCatalogEntry entry = mTXStageInImplementation
487: .getTransformationCatalogEntry(site);
488: SubInfo sub = new SubInfo();
489: String value = (deflt == null) ? this .DEFAULT_STAGE_OUT_BUNDLE_FACTOR
490: : deflt;
491:
492: if (entry != null) {
493: sub.updateProfiles(entry);
494: value = (sub.vdsNS.containsKey(VDS.BUNDLE_STAGE_OUT_KEY)) ? sub.vdsNS
495: .getStringValue(VDS.BUNDLE_STAGE_OUT_KEY)
496: : value;
497: }
498:
499: return Integer.parseInt(value);
500: }
501:
502: /**
503: * Returns the appropriate pool transfer for a particular site.
504: *
505: * @param site the site for which the PT is reqd.
506: * @param num the number of Stageout jobs required for that Pool.
507: *
508: * @return the PoolTransfer
509: */
510: public PoolTransfer getStageOutPoolTransfer(String site, int num) {
511:
512: if (this .mStageOutMapPerLevel.containsKey(site)) {
513: return (PoolTransfer) this .mStageOutMapPerLevel.get(site);
514: } else {
515: PoolTransfer pt = new PoolTransfer(site, num);
516: this .mStageOutMapPerLevel.put(site, pt);
517: return pt;
518: }
519: }
520:
521: /**
522: * Resets the stage out map.
523: */
524: private void resetStageOutMap() {
525: if (this .mStageOutMapPerLevel != null) {
526: //before flushing add the stageout nodes to the workflow
527: SubInfo job = new SubInfo();
528:
529: for (Iterator it = mStageOutMapPerLevel.values().iterator(); it
530: .hasNext();) {
531: PoolTransfer pt = (PoolTransfer) it.next();
532: job.setSiteHandle(pt.mPool);
533:
534: mLogger.log(
535: "Adding jobs for staging out data from site "
536: + pt.mPool,
537: LogManager.DEBUG_MESSAGE_LEVEL);
538:
539: //traverse through all the TransferContainers
540: for (Iterator tcIt = pt.getTransferContainerIterator(); tcIt
541: .hasNext();) {
542: TransferContainer tc = (TransferContainer) tcIt
543: .next();
544: if (tc == null) {
545: //break out
546: break;
547: }
548:
549: //add the stageout job if required
550: SubInfo soJob = null;
551: if (!tc.getFileTransfers().isEmpty()) {
552: mLogger.log("Adding stage-out job "
553: + tc.getTXName(),
554: LogManager.DEBUG_MESSAGE_LEVEL);
555: soJob = mTXStageOutImplementation
556: .createTransferJob(job, tc
557: .getFileTransfers(), null, tc
558: .getTXName(),
559: SubInfo.STAGE_OUT_JOB);
560: addJob(soJob);
561: }
562:
563: //add registration job if required
564: if (!tc.getRegistrationFiles().isEmpty()) {
565:
566: //add relation to stage out if the stageout job was created
567: if (soJob != null) {
568: //make the stageout job the super node for the registration job
569: job.setName(soJob.getName());
570: addRelation(tc.getTXName(), tc.getRegName());
571: }
572:
573: mLogger.log("Adding registration job "
574: + tc.getRegName(),
575: LogManager.DEBUG_MESSAGE_LEVEL);
576: addJob(createRegistrationJob(tc.getRegName(),
577: job, tc.getRegistrationFiles(), mRCB));
578:
579: }
580:
581: }
582: }
583: }
584:
585: mStageOutMapPerLevel = new HashMap();
586: }
587:
588: /**
589: * A container class for storing the name of the transfer job, the list of
590: * file transfers that the job is responsible for.
591: */
592: private class TransferContainer {
593:
594: /**
595: * The name of the transfer job.
596: */
597: private String mTXName;
598:
599: /**
600: * The name of the registration job.
601: */
602: private String mRegName;
603:
604: /**
605: * The collection of <code>FileTransfer</code> objects containing the
606: * transfers the job is responsible for.
607: */
608: private Collection mFileTXList;
609:
610: /**
611: * The collection of <code>FileTransfer</code> objects containing the
612: * files that need to be registered.
613: */
614: private Collection mRegFiles;
615:
616: /**
617: * The type of the transfers the job is responsible for.
618: */
619: private int mTransferType;
620:
621: /**
622: * The default constructor.
623: */
624: public TransferContainer() {
625: mTXName = null;
626: mRegName = null;
627: mFileTXList = new Vector();
628: mRegFiles = new Vector();
629: mTransferType = SubInfo.STAGE_IN_JOB;
630: }
631:
632: /**
633: * Sets the name of the transfer job.
634: *
635: * @param name the name of the transfer job.
636: */
637: public void setTXName(String name) {
638: mTXName = name;
639: }
640:
641: /**
642: * Sets the name of the registration job.
643: *
644: * @param name the name of the transfer job.
645: */
646: public void setRegName(String name) {
647: mRegName = name;
648: }
649:
650: /**
651: * Adds a file transfer to the underlying collection.
652: *
653: * @param transfer the <code>FileTransfer</code> containing the
654: * information about a single transfer.
655: */
656: public void addTransfer(FileTransfer transfer) {
657: mFileTXList.add(transfer);
658: }
659:
660: /**
661: * Adds a file transfer to the underlying collection.
662: *
663: * @param files collection of <code>FileTransfer</code>.
664: */
665: public void addTransfer(Collection files) {
666: mFileTXList.addAll(files);
667: }
668:
669: /**
670: * Adds a Collection of File transfer to the underlying collection of
671: * files to be registered.
672: *
673: * @param files collection of <code>FileTransfer</code>.
674: */
675: public void addRegistrationFiles(Collection files) {
676: mRegFiles.addAll(files);
677: }
678:
679: /**
680: * Sets the transfer type for the transfers associated.
681: *
682: * @param type type of transfer.
683: */
684: public void setTransferType(int type) {
685: mTransferType = type;
686: }
687:
688: /**
689: * Returns the name of the transfer job.
690: *
691: * @return name of the transfer job.
692: */
693: public String getTXName() {
694: return mTXName;
695: }
696:
697: /**
698: * Returns the name of the registration job.
699: *
700: * @return name of the registration job.
701: */
702: public String getRegName() {
703: return mRegName;
704: }
705:
706: /**
707: * Returns the collection of transfers associated with this transfer
708: * container.
709: *
710: * @return a collection of <code>FileTransfer</code> objects.
711: */
712: public Collection getFileTransfers() {
713: return mFileTXList;
714: }
715:
716: /**
717: * Returns the collection of registration files associated with this transfer
718: * container.
719: *
720: * @return a collection of <code>FileTransfer</code> objects.
721: */
722: public Collection getRegistrationFiles() {
723: return mRegFiles;
724: }
725:
726: }
727:
728: /**
729: * A container to store the transfers that need to be done on a single pool.
730: * The transfers are stored over a collection of Transfer Containers with
731: * each transfer container responsible for one transfer job.
732: */
733: private class PoolTransfer {
734:
735: /**
736: * The maximum number of transfer jobs that are allowed for this
737: * particular pool.
738: */
739: private int mCapacity;
740:
741: /**
742: * The index of the job to which the next transfer for the pool would
743: * be scheduled.
744: */
745: private int mNext;
746:
747: /**
748: * The pool for which these transfers are grouped.
749: */
750: private String mPool;
751:
752: /**
753: * The list of <code>TransferContainer</code> that correspond to
754: * each transfer job.
755: */
756: private List mTXContainers;
757:
758: /**
759: * The default constructor.
760: */
761: public PoolTransfer() {
762: mCapacity = 0;
763: mNext = -1;
764: mPool = null;
765: mTXContainers = null;
766: }
767:
768: /**
769: * Convenience constructor.
770: *
771: * @param pool the pool name for which transfers are being grouped.
772: * @param number the number of transfer jobs that are going to be created
773: * for the pool.
774: */
775: public PoolTransfer(String pool, int number) {
776: mCapacity = number;
777: mNext = 0;
778: mPool = pool;
779: mTXContainers = new ArrayList(number);
780: //intialize to null
781: for (int i = 0; i < number; i++) {
782: mTXContainers.add(null);
783: }
784: }
785:
786: /**
787: * Adds a a collection of <code>FileTransfer</code> objects to the
788: * appropriate TransferContainer. The collection is added to a single
789: * TransferContainer, and the pointer is then updated to the next container.
790: *
791: * @param files the collection <code>FileTransfer</code> to be added.
792: * @param level the level of the workflow
793: * @param type the type of transfer job
794: *
795: * @return the Transfer Container to which the job file transfers were added.
796: */
797: public TransferContainer addTransfer(Collection files,
798: int level, int type) {
799: //we add the transfer to the container pointed
800: //by next
801: Object obj = mTXContainers.get(mNext);
802: TransferContainer tc = null;
803: if (obj == null) {
804: //on demand add a new transfer container to the end
805: //is there a scope for gaps??
806: tc = new TransferContainer();
807: tc.setTXName(getTXJobName(mNext, type, level));
808: //add the name for the registration job that maybe associated
809: tc.setRegName(getRegJobName(mNext, level));
810: mTXContainers.set(mNext, tc);
811: } else {
812: tc = (TransferContainer) obj;
813: }
814: tc.addTransfer(files);
815:
816: //update the next pointer to maintain
817: //round robin status
818: mNext = (mNext < (mCapacity - 1)) ? mNext + 1 : 0;
819:
820: return tc;
821: }
822:
823: /**
824: * Adds a file transfer to the appropriate TransferContainer.
825: * The file transfers are added in a round robin manner underneath.
826: *
827: * @param transfer the <code>FileTransfer</code> containing the
828: * information about a single transfer.
829: *
830: * @return the name of the transfer job to which the transfer is added.
831: */
832: public String addTransfer(FileTransfer transfer) {
833: //we add the transfer to the container pointed
834: //by next
835: Object obj = mTXContainers.get(mNext);
836: TransferContainer tc = null;
837: if (obj == null) {
838: //on demand add a new transfer container to the end
839: //is there a scope for gaps??
840: tc = new TransferContainer();
841: tc.setTXName(getTXJobName(mNext, SubInfo.STAGE_IN_JOB));
842: mTXContainers.set(mNext, tc);
843: } else {
844: tc = (TransferContainer) obj;
845: }
846: tc.addTransfer(transfer);
847:
848: //update the next pointer to maintain
849: //round robin status
850: mNext = (mNext < (mCapacity - 1)) ? mNext + 1 : 0;
851:
852: return tc.getTXName();
853: }
854:
855: /**
856: * Returns the iterator to the list of transfer containers.
857: *
858: * @return the iterator.
859: */
860: public Iterator getTransferContainerIterator() {
861: return mTXContainers.iterator();
862: }
863:
864: /**
865: * Generates the name of the transfer job, that is unique for the given
866: * workflow.
867: *
868: * @param counter the index for the transfer job.
869: * @param type the type of transfer job.
870: * @param level the level of the workflow.
871: *
872: * @return the name of the transfer job.
873: */
874: private String getTXJobName(int counter, int type, int level) {
875: StringBuffer sb = new StringBuffer();
876: switch (type) {
877: case SubInfo.STAGE_IN_JOB:
878: sb.append(Refiner.STAGE_IN_PREFIX);
879: break;
880:
881: case SubInfo.STAGE_OUT_JOB:
882: sb.append(Refiner.STAGE_OUT_PREFIX);
883: break;
884:
885: default:
886: throw new RuntimeException("Wrong type specified "
887: + type);
888: }
889:
890: sb.append(mPool).append("_").append(level).append("_")
891: .append(counter);
892:
893: return sb.toString();
894: }
895:
896: /**
897: * Generates the name of the transfer job, that is unique for the given
898: * workflow.
899: *
900: * @param counter the index for the registration job.
901: * @param level the level of the workflow.
902: *
903: * @return the name of the transfer job.
904: */
905: private String getRegJobName(int counter, int level) {
906: StringBuffer sb = new StringBuffer();
907: sb.append(Refiner.REGISTER_PREFIX);
908:
909: sb.append(mPool).append("_").append(level).append("_")
910: .append(counter);
911:
912: return sb.toString();
913: }
914:
915: /**
916: * Generates the name of the transfer job, that is unique for the given
917: * workflow.
918: *
919: * @param counter the index for the transfer job.
920: * @param type the type of transfer job.
921: *
922: * @return the name of the transfer job.
923: */
924: private String getTXJobName(int counter, int type) {
925: StringBuffer sb = new StringBuffer();
926: switch (type) {
927: case SubInfo.STAGE_IN_JOB:
928: sb.append(Refiner.STAGE_IN_PREFIX);
929: break;
930:
931: case SubInfo.STAGE_OUT_JOB:
932: sb.append(Refiner.STAGE_OUT_PREFIX);
933: break;
934:
935: default:
936: throw new RuntimeException("Wrong type specified "
937: + type);
938: }
939: sb.append(mPool).append("_").append(counter);
940:
941: return sb.toString();
942: }
943:
944: }
945:
946: }
|