/*
 * This file or a portion of this file is licensed under the terms of
 * the Globus Toolkit Public License, found in file GTPL, or at
 * http://www.globus.org/toolkit/download/license.html. This notice must
 * appear in redistributions of this file, with or without modification.
 *
 * Redistributions of this Software, with or without modification, must
 * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
 * some other similar material which is provided with the Software (if
 * any).
 *
 * Copyright 1999-2004 University of Chicago and The University of
 * Southern California. All rights reserved.
 */

package org.griphyn.cPlanner.transfer.refiner;

import org.griphyn.cPlanner.classes.ADag;
import org.griphyn.cPlanner.classes.FileTransfer;
import org.griphyn.cPlanner.classes.PlannerOptions;
import org.griphyn.cPlanner.classes.SubInfo;

import org.griphyn.cPlanner.common.LogManager;
import org.griphyn.cPlanner.common.PegasusProperties;

import org.griphyn.cPlanner.engine.ReplicaCatalogBridge;

import org.griphyn.cPlanner.transfer.SingleFTPerXFERJobRefiner;

import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.TreeMap;

/**
 * The default single refiner, which always creates one transfer job per
 * file transfer that is required. If a compute job requires 3 input files,
 * it creates 3 independent stage-in jobs for that job.
 *
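 * For example, a compute job that requires three input files results in
 * three stage-in jobs, named by appending the stage-in prefix and a
 * running counter to the compute job's name.
 *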
 * @author Karan Vahi
 * @version $Revision: 50 $
 */
public class SDefault extends SingleFTPerXFERJobRefiner {

    /**
     * A short description of the transfer refinement.
     */
    public static final String DESCRIPTION = "Default Single Refinement ";

    /**
     * The string holding the logging messages.
     */
    protected String mLogMsg;

    /**
     * A Map containing information about which logical file has been
     * transferred to which site, and the name of the stage-in transfer node
     * that is transferring the file from the location returned by
     * the replica catalog.
     * The key for the map is <code>lfn:siteHandle</code> and the value is
     * the name of the transfer node.
     *
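     * For example, file <code>f.a</code> staged to site <code>isi</code>
     * yields the key <code>f.a:isi</code> (names purely illustrative).
     *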
     */
    protected Map mFileTable;

    /**
     * The overloaded constructor.
     *
     * @param dag        the workflow to which transfer nodes need to be added.
     * @param properties the <code>PegasusProperties</code> object containing all
     *                   the properties required by Pegasus.
     * @param options    the options passed to the planner.
     */
    public SDefault(ADag dag, PegasusProperties properties,
                    PlannerOptions options) {
        super(dag, properties, options);
        mFileTable = new TreeMap();
    }

    /**
     * Adds the stage-in transfer nodes which transfer the input files for a
     * job from the location returned by the replica catalog to the job's
     * execution pool. It creates a stage-in job for each file to be
     * transferred.
     *
     * @param job   <code>SubInfo</code> object corresponding to the node to
     *              which the files are to be transferred.
     * @param files Collection of <code>FileTransfer</code> objects containing
     *              the information about the source and destination URLs.
     */
    public void addStageInXFERNodes(SubInfo job, Collection files) {

        String jobName = job.getName();
        String newJob;
        String pool = job.getSiteHandle();
        int counter = 0;
        String key = null;
        String msg = "Adding stage in transfer nodes for job "
                     + jobName;
        String par = null;

        mLogger.log(msg, LogManager.DEBUG_MESSAGE_LEVEL);
        for (Iterator it = files.iterator(); it.hasNext();) {
            Collection stagedFiles = null;
            FileTransfer ft = (FileTransfer) it.next();
            String lfn = ft.getLFN();
            newJob = this.STAGE_IN_PREFIX + jobName + "_" + counter;

            //get the key for this lfn and pool.
            //if the key is already in the table,
            //remove the entry from the collection
            //and add a dependency in the graph
            key = this.constructFileKey(lfn, pool);
            par = (String) mFileTable.get(key);
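            //a non-null value means the file is already being staged to this
            //site; par is the job (transfer node or chmod setup job) to depend on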
            if (par != null) {
                it.remove();
                /*mLogMsg = "Adding relation " + par + " -> " + jobName +
                           " for transferring file " + lfn;
                mLogger.log(mLogMsg,LogManager.DEBUG_MESSAGE_LEVEL);*/
                addRelation(par, jobName, pool, false);
            } else {
                if (ft.isTransferringExecutableFile()) {
                    //add to staged files, so that a setup (chmod)
                    //job gets added for it
                    stagedFiles = new ArrayList(1);
                    stagedFiles.add(ft);
                    //the staged executable file should have the setup
                    //job as its parent if the x bit is not preserved
                    if (mTXStageInImplementation.doesPreserveXBit()) {
                        mFileTable.put(key, newJob);
                    } else {
                        mFileTable.put(key, mTXStageInImplementation
                                .getSetXBitJobName(jobName, 0));
                    }
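                    //later jobs requiring this executable then depend on the
                    //chmod job, ensuring the x bit is set before the file is used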
                } else {
                    //make a new entry into the table
                    mFileTable.put(key, newJob);
                }

                //construct the stagein transfer node
                msg = "Adding new stagein transfer node named "
                      + newJob;
                mLogger.log(msg, LogManager.DEBUG_MESSAGE_LEVEL);
                counter++;

                //add a direct dependency between the compute job
                //and the stagein job only if no executables
                //are being staged
                List file = new ArrayList(1);
                file.add(ft);
                if (stagedFiles == null) {
                    //add the direct relation
                    addRelation(newJob, jobName, pool, true);
                    addJob(mTXStageInImplementation.createTransferJob(
                            job, file, null, newJob,
                            SubInfo.STAGE_IN_JOB));
                } else {
                    //the dependency to the stagein job is added via
                    //the setup job that does the chmod
                    addJob(mTXStageInImplementation.createTransferJob(
                            job, file, stagedFiles, newJob,
                            SubInfo.STAGE_IN_JOB));
                }
            }
        }
    }

    /**
     * Adds the inter pool transfer nodes that are required for transferring
     * the output files of the parents to the job's execution site.
     *
     * @param job   <code>SubInfo</code> object corresponding to the node to
     *              which the files are to be transferred.
     * @param files Collection of <code>FileTransfer</code> objects containing
     *              the information about the source and destination URLs.
     */
    public void addInterSiteTXNodes(SubInfo job, Collection files) {

        String jobName = job.getName();
        String newJob;
        int counter = 0;
        String msg = "Adding inter pool transfer nodes for job "
                     + job.getName();

        if (!files.isEmpty()) {
            mLogger.log(msg, LogManager.DEBUG_MESSAGE_LEVEL);
        }
        for (Iterator it = files.iterator(); it.hasNext();) {
            FileTransfer ft = (FileTransfer) it.next();
            newJob = this.INTER_POOL_PREFIX + jobName + "_" + counter;
            msg = "Adding new inter pool node named " + newJob;
            mLogger.log(msg, LogManager.DEBUG_MESSAGE_LEVEL);
            counter++;
            //added in make transfer node
            //mDag.addNewJob(newJob);
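            //the inter pool job runs after the parent job that produced the
            //file, and before the job that consumes it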
            addRelation(ft.getJobName(), newJob);
            addRelation(newJob, jobName);

            //call to make the subinfo
            List file = new ArrayList(1);
            file.add(ft);
            addJob(mTXInterImplementation.createTransferJob(job, file,
                    null, newJob, SubInfo.INTER_POOL_JOB));
        }

    }

    /**
     * Adds the stage-out transfer nodes that stage data to an output site
     * specified by the user.
     *
     * @param job   <code>SubInfo</code> object corresponding to the node
     *              whose files are to be transferred.
     * @param files Collection of <code>FileTransfer</code> objects containing
     *              the information about the source and destination URLs.
     * @param rcb   bridge to the Replica Catalog, used for creating
     *              registration nodes in the workflow.
     */
    public void addStageOutXFERNodes(SubInfo job, Collection files,
                                     ReplicaCatalogBridge rcb) {
        this.addStageOutXFERNodes(job, files, rcb, false);
    }

    /**
     * Adds the stage-out transfer nodes that stage data to an output site
     * specified by the user.
     *
     * @param job         <code>SubInfo</code> object corresponding to the node
     *                    whose files are to be transferred.
     * @param files       Collection of <code>FileTransfer</code> objects
     *                    containing the information about the source and
     *                    destination URLs.
     * @param rcb         bridge to the Replica Catalog, used for creating
     *                    registration nodes in the workflow.
     * @param deletedLeaf whether the node is being added for a node deleted
     *                    by the reduction engine. Defaults to false.
     */
    public void addStageOutXFERNodes(SubInfo job, Collection files,
                                     ReplicaCatalogBridge rcb, boolean deletedLeaf) {

        String jobName = job.getName();
        String newJob;
        String regJob = this.REGISTER_PREFIX + jobName;
        int counter = 1;
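        //a single registration job (regJob) registers all eligible files for
        //this compute job; note the stage-out counter starts at 1, not 0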

        mLogMsg = "Adding stageout transfer nodes for job "
                  + jobName;

        if (!files.isEmpty()) {
            mLogger.log(mLogMsg, LogManager.DEBUG_MESSAGE_LEVEL);
        }
        List regFiles = new ArrayList();
        for (Iterator it = files.iterator(); it.hasNext();) {
            FileTransfer ft = (FileTransfer) it.next();
            newJob = this.STAGE_OUT_PREFIX + jobName + "_" + counter;
            counter++;
            mLogMsg = "Adding new stageout transfer node named "
                      + newJob;
            mLogger.log(mLogMsg, LogManager.DEBUG_MESSAGE_LEVEL);

            //call to make the subinfo
            if (!ft.getTransientTransferFlag()) {
                //do not add the relation if this is a deleted leaf
                if (!deletedLeaf) {
                    addRelation(jobName, newJob);
                }
                if (!ft.getTransientRegFlag()) {
                    addRelation(newJob, regJob);
                    regFiles.add(ft);
                }

                List file = new ArrayList(1);
                file.add(ft);
                addJob(mTXStageOutImplementation.createTransferJob(job,
                        file, null, newJob, SubInfo.STAGE_OUT_JOB));
            }
            //no transfer node, but a registration node
            else if (!ft.getTransientRegFlag()) {
                addRelation(jobName, regJob);
                regFiles.add(ft);
            }

        }

        //create the registration job if required
        if (!regFiles.isEmpty()) {
            addJob(createRegistrationJob(regJob, job, regFiles, rcb));
        }

    }

    /**
     * Creates the registration job, which registers the materialized files
     * on the output site in the Replica Catalog.
     *
     * @param regJobName the name of the job that registers the files in the
     *                   Replica Mechanism.
     * @param job        the job whose output files are to be registered in
     *                   the Replica Mechanism.
     * @param files      Collection of <code>FileTransfer</code> objects
     *                   containing the information about the source and
     *                   destination URLs.
     * @param rcb        bridge to the Replica Catalog, used for creating
     *                   registration nodes in the workflow.
     *
     * @return the registration job.
     */
    protected SubInfo createRegistrationJob(String regJobName,
            SubInfo job, Collection files, ReplicaCatalogBridge rcb) {

        return rcb.makeRCRegNode(regJobName, job, files);
    }

    /**
     * Signals that the traversal of the workflow is done, allowing the
     * transfer mechanisms to clean up any state they might be keeping
     * that needs to be explicitly freed.
     */
    public void done() {
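        //nothing to clean up in this default implementation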

    }

    /**
     * Add a new job to the workflow being refined.
     *
     * @param job the job to be added.
     */
    public void addJob(SubInfo job) {
        mDAG.add(job);
    }

    /**
     * Adds a new relation to the workflow being refined.
     *
     * @param parent the jobname of the parent node of the edge.
     * @param child  the jobname of the child node of the edge.
     */
    public void addRelation(String parent, String child) {
        mLogger.log("Adding relation " + parent + " -> " + child,
                LogManager.DEBUG_MESSAGE_LEVEL);
        mDAG.addNewRelation(parent, child);

    }

    /**
     * Adds a new relation to the workflow. When the parent is a newly added
     * transfer job, parentNew should be set to true only the first time a
     * relation is added for it. For subsequent compute jobs that may be
     * dependent on it, it needs to be set to false.
     *
     * @param parent    the jobname of the parent node of the edge.
     * @param child     the jobname of the child node of the edge.
     * @param site      the execution pool where the transfer node is to be run.
     * @param parentNew whether the parent node being added is a new transfer
     *                  job, being linked for the first time.
     */
    public void addRelation(String parent, String child, String site,
                            boolean parentNew) {
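        //the site and parentNew hints are not used by this default refiner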
370: mLogger.log("Adding relation " + parent + " -> " + child,
371: LogManager.DEBUG_MESSAGE_LEVEL);
372: mDAG.addNewRelation(parent, child);
373:
374: }
375:
376: /**
377: * Returns a textual description of the transfer mode.
378: *
379: * @return a short textual description
380: */
381: public String getDescription() {
382: return this .DESCRIPTION;
383: }

    /**
     * Constructs the key for an entry in the file table. The key returned
     * is <code>lfn:siteHandle</code>.
     *
     * @param lfn        the logical filename of the file that has to be
     *                   transferred.
     * @param siteHandle the name of the site to which the file is being
     *                   transferred.
     *
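     * For example, <code>constructFileKey("f.a", "isi")</code> returns
     * <code>f.a:isi</code> (the lfn and site name here are illustrative).
     *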
     * @return the key for the entry to be made in the file table.
     */
    protected String constructFileKey(String lfn, String siteHandle) {
        StringBuffer sb = new StringBuffer();
        sb.append(lfn).append(":").append(siteHandle);

        return sb.toString();
    }

}