001: /*
002: * This file or a portion of this file is licensed under the terms of
003: * the Globus Toolkit Public License, found in file GTPL, or at
004: * http://www.globus.org/toolkit/download/license.html. This notice must
005: * appear in redistributions of this file, with or without modification.
006: *
007: * Redistributions of this Software, with or without modification, must
008: * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
009: * some other similar material which is provided with the Software (if
010: * any).
011: *
012: * Copyright 1999-2004 University of Chicago and The University of
013: * Southern California. All rights reserved.
014: */
015:
016: package org.griphyn.cPlanner.transfer.refiner;
017:
018: import org.griphyn.cPlanner.classes.ADag;
019: import org.griphyn.cPlanner.classes.SubInfo;
020: import org.griphyn.cPlanner.classes.PlannerOptions;
021:
022: import org.griphyn.cPlanner.namespace.VDS;
023:
024: import org.griphyn.cPlanner.common.LogManager;
025: import org.griphyn.cPlanner.common.PegasusProperties;
026:
027: import org.griphyn.cPlanner.poolinfo.PoolInfoProvider;
028: import org.griphyn.cPlanner.poolinfo.PoolMode;
029:
030: import org.griphyn.common.catalog.TransformationCatalogEntry;
031:
032: import java.util.Map;
033: import java.util.HashMap;
034: import java.util.Iterator;
035: import java.util.List;
036: import java.util.ArrayList;
037: import java.util.LinkedList;
038:
039: /**
040: * This transfer refiner builds upon the Default Refiner.
041: * The defaul Refiner allows the transfer of multiple files in a single
042: * condor job. However, it adds the stage in transfer nodes in parallel leading
043: * to multiple invocation of the globus-url-copy at remote execution pools,
044: * while running huge workflows.
045: * This refiner, tries to circumvent this problem by chaining up the stagein jobs
046: * instead of scheduling in parallel. This works best only when the top level
047: * of the workflow requires stage in jobs. The correct way is that the traversal
048: * needs to be done breath first in the TransferEngine.java.
049: *
050: * @author Karan Vahi
051: * @author Gaurang Mehta
052: *
053: * @version $Revision: 50 $
054: */
055: public class Chain extends Default {
056:
057: /**
058: * The default bundling factor that identifies the number of transfer jobs
059: * that are being created per execution pool for the workflow.
060: */
061: public static final String DEFAULT_BUNDLE_FACTOR = "1";
062:
063: /**
064: * The handle to the Site Catalog. It is instantiated in this class.
065: */
066: protected PoolInfoProvider mSCHandle;
067:
068: /**
069: * The map containing the stage in bundle values indexed by the name of the
070: * site. If the bundle value is not specified, then null is stored.
071: */
072: private Map mSIBundleMap;
073:
074: /**
075: * A map indexed by execution sites. Each value is a SiteTransfer object,
076: * that contains the Bundles of stagin transfer jobs.
077: *
078: * @see TransferChain
079: */
080: private Map mStageInMap;
081:
082: /**
083: * A short description of the transfer refinement.
084: */
085: public static final String DESCRIPTION = "Chain Mode (the stage in jobs being chained together in bundles";
086:
087: /**
088: * The overloaded constructor.
089: *
090: * @param dag the workflow to which transfer nodes need to be added.
091: * @param properties the <code>PegasusProperties</code> object containing all
092: * the properties required by Pegasus.
093: * @param options the options passed to the planner.
094: *
095: */
096: public Chain(ADag dag, PegasusProperties properties,
097: PlannerOptions options) {
098: super (dag, properties, options);
099: //specifying initial capacity.
100: //adding one to account for local pool
101: mStageInMap = new HashMap(
102: options.getExecutionSites().size() + 1);
103: mSIBundleMap = new HashMap();
104:
105: //load the site catalog
106: String poolFile = mProps.getPoolFile();
107: String poolClass = PoolMode.getImplementingClass(mProps
108: .getPoolMode());
109: mSCHandle = PoolMode.loadPoolInstance(poolClass, poolFile,
110: PoolMode.SINGLETON_LOAD);
111:
112: }
113:
114: /**
115: * Adds a new relation to the workflow. In the case when the parent is a
116: * transfer job that is added, the parentNew should be set only the first
117: * time a relation is added. For subsequent compute jobs that maybe
118: * dependant on this, it needs to be set to false.
119: *
120: * @param parent the jobname of the parent node of the edge.
121: * @param child the jobname of the child node of the edge.
122: * @param site the execution site where the transfer node is to be run.
123: * @param parentNew the parent node being added, is the new transfer job
124: * and is being called for the first time.
125: */
126: public void addRelation(String parent, String child, String site,
127: boolean parentNew) {
128:
129: addRelation(parent, child);
130: // mDAG.addNewRelation(parent,child);
131:
132: if (parentNew) {
133: //a new transfer job is being added
134: //figure out the correct bundle to
135: //put in
136: List l = null;
137: if (mStageInMap.containsKey(site)) {
138: //get the SiteTransfer for the site
139: SiteTransfer old = (SiteTransfer) mStageInMap.get(site);
140: //put the parent in the appropriate bundle
141: //and get the pointer to the last element in
142: //the chain before the parent is added.
143: String last = old.addTransfer(parent);
144: if (last != null) {
145: //the parent is now the last element in the chain
146: //continue the chain forward
147: //adding the last link in the chain
148: this .addRelation(last, parent, site, false);
149: }
150: } else {
151: //create a new SiteTransfer for the job
152: //determine the bundle for the site
153: int bundle;
154: if (mSIBundleMap.containsKey(site)) {
155: bundle = ((Integer) mSIBundleMap.get(site))
156: .intValue();
157: } else {
158: bundle = getSiteBundleValue(site,
159: VDS.CHAIN_STAGE_IN_KEY);
160: //put the value into the map
161: mSIBundleMap.put(site, new Integer(bundle));
162: }
163: SiteTransfer siteTX = new SiteTransfer(site, bundle);
164: siteTX.addTransfer(parent);
165: mStageInMap.put(site, siteTX);
166: }
167: }
168: }
169:
170: /**
171: * Determines the bundle factor for a particular site on the basis of the
172: * key associcated with the underlying transfer transformation in the
173: * transformation catalog. If none specified in transformation catalog then
174: * one is picked up from the site catalog. If the key is not found in the
175: * site catalog too , then the global default is returned.
176: *
177: * @param site the site at which the transfer job is being run.
178: * @param key the bundle key whose value needs to be searched.
179: *
180: * @return the bundle factor.
181: *
182: * @see #DEFAULT_BUNDLE_FACTOR
183: */
184: public int getSiteBundleValue(String site, String key) {
185: String value = this .DEFAULT_BUNDLE_FACTOR;
186: //construct a sudo transfer job object
187: //and populate the profiles in it.
188: SubInfo sub = new SubInfo();
189: //assimilate the profile information from the
190: //site catalog into the job.
191: sub.updateProfiles(mSCHandle.getPoolProfile(site));
192:
193: //this should be parameterised Karan Dec 20,2005
194: TransformationCatalogEntry entry = mTXStageInImplementation
195: .getTransformationCatalogEntry(site);
196:
197: //assimilate the profile information from transformation catalog
198: if (entry != null) {
199: sub.updateProfiles(entry);
200: }
201:
202: value = (sub.vdsNS.containsKey(key)) ? sub.vdsNS
203: .getStringValue(key) : value;
204: return Integer.parseInt(value);
205: }
206:
207: /**
208: * Prints out the bundles and chains that have been constructed.
209: */
210: public void done() {
211: //print out all the Site transfers that you have
212: mLogger.log("Chains of stagein jobs per sites are ",
213: LogManager.DEBUG_MESSAGE_LEVEL);
214: for (Iterator it = mStageInMap.entrySet().iterator(); it
215: .hasNext();) {
216: Map.Entry entry = (Map.Entry) it.next();
217: mLogger.log(entry.getKey() + " " + entry.getValue(),
218: LogManager.DEBUG_MESSAGE_LEVEL);
219: }
220: }
221:
222: /**
223: * Returns a textual description of the transfer mode.
224: *
225: * @return a short textual description
226: */
227: public String getDescription() {
228: return this .DESCRIPTION;
229: }
230:
231: /**
232: * A container to manage the transfer jobs that are needed to be done on a
233: * single site. The container maintains the bundles and controls the
234: * distribution of a transfer job amongst the bundles in a round robin manner.
235: * Each bundle itself is actually a chain of transfer jobs.
236: */
237: private class SiteTransfer {
238:
239: /**
240: * The maximum number of transfer jobs that are allowed for this
241: * particular site. This should correspond to the bundle factor.
242: */
243: private int mCapacity;
244:
245: /**
246: * The index of the bundle to which the next transfer for the site would
247: * be added to.
248: */
249: private int mNext;
250:
251: /**
252: * The site for which these transfers are grouped.
253: */
254: private String mSite;
255:
256: /**
257: * The list of <code>Chain</code> object. Each bundle is actually a chain
258: * of transfer nodes.
259: */
260: private List mBundles;
261:
262: /**
263: * The default constructor.
264: */
265: public SiteTransfer() {
266: mCapacity = 1;
267: mNext = -1;
268: mSite = null;
269: mBundles = null;
270: }
271:
272: /**
273: * Convenience constructor.
274: *
275: * @param pool the pool name for which transfers are being grouped.
276: * @param bundle the number of logical bundles that are to be created
277: * per site. it directly translates to the number of transfer
278: * jobs that can be running at a particular site
279: */
280: public SiteTransfer(String pool, int bundle) {
281: mCapacity = bundle;
282: mNext = 0;
283: mSite = pool;
284: mBundles = new ArrayList(bundle);
285: //intialize to null
286: for (int i = 0; i < bundle; i++) {
287: mBundles.add(null);
288: }
289: }
290:
291: /**
292: * Adds a file transfer to the appropriate TransferChain.
293: * The file transfers are added in a round robin manner underneath.
294: *
295: * @param txJobName the name of the transfer job.
296: *
297: * @return the last transfer job in the chain before the current job
298: * was added, null in case the job is the first in the chain
299: */
300: public String addTransfer(String txJobName) {
301: //hmmm i could alternatively add using the
302: //iterator and move iterator around.
303:
304: //we add the transfer to the chain pointed
305: //by next
306: Object obj = mBundles.get(mNext);
307: TransferChain chain = null;
308: String last = null;
309: if (obj == null) {
310: //on demand add a new chain to the end
311: //is there a scope for gaps??
312: chain = new TransferChain();
313: mBundles.set(mNext, chain);
314: } else {
315: chain = (TransferChain) obj;
316: }
317: //we have the chain to which we want
318: //to add the transfer job. Get the
319: //current last job in the chain before
320: //adding the transfer job to the chain
321: last = chain.getLast();
322: chain.add(txJobName);
323: //update the next pointer to maintain
324: //round robin status
325: mNext = (mNext < (mCapacity - 1)) ? mNext + 1 : 0;
326: return last;
327: }
328:
329: /**
330: * Returns the textual description of the object.
331: *
332: * @return the textual description.
333: */
334: public String toString() {
335: StringBuffer sb = new StringBuffer(32);
336: boolean first = true;
337: sb.append("Site ").append(mSite);
338: int num = 1;
339: for (Iterator it = mBundles.iterator(); it.hasNext(); num++) {
340: sb.append("\n").append(num).append(" :").append(
341: it.next());
342: }
343: return sb.toString();
344: }
345:
346: }
347:
348: /**
349: * A shallow container class, that contains the list of the names of the
350: * transfer jobs and can return the last job in the list.
351: */
352: private class TransferChain {
353:
354: /**
355: * The linked list that maintians the chain of names of the transfer
356: * jobs.
357: */
358: private LinkedList mChain;
359:
360: /**
361: * The default constructor.
362: */
363: public TransferChain() {
364: mChain = new LinkedList();
365: }
366:
367: /**
368: * Adds to the end of the chain. Allows null to be added.
369: *
370: * @param name the name of the transfer job.
371: */
372: public void add(String name) {
373: mChain.addLast(name);
374: }
375:
376: /**
377: * Returns the last element in the chain.
378: *
379: * @return the last element in the chain, null if the chain is empty
380: */
381: public String getLast() {
382: String last = null;
383: try {
384: last = (String) mChain.getLast();
385: } catch (java.util.NoSuchElementException e) {
386:
387: }
388: return last;
389: }
390:
391: /**
392: * Returns the textual description of the object.
393: *
394: * @return the textual description.
395: */
396: public String toString() {
397: StringBuffer sb = new StringBuffer(32);
398: boolean first = true;
399: for (Iterator it = mChain.iterator(); it.hasNext();) {
400: if (first) {
401: first = false;
402: } else {
403: sb.append("->");
404: }
405: sb.append(it.next());
406: }
407: return sb.toString();
408: }
409:
410: }
411:
412: }
|