/*
 * This file or a portion of this file is licensed under the terms of
 * the Globus Toolkit Public License, found in file GTPL, or at
 * http://www.globus.org/toolkit/download/license.html. This notice must
 * appear in redistributions of this file, with or without modification.
 *
 * Redistributions of this Software, with or without modification, must
 * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
 * some other similar material which is provided with the Software (if
 * any).
 *
 * Copyright 1999-2004 University of Chicago and The University of
 * Southern California. All rights reserved.
 */

package org.griphyn.cPlanner.transfer.refiner;

import org.griphyn.cPlanner.classes.ADag;
import org.griphyn.cPlanner.classes.FileTransfer;
import org.griphyn.cPlanner.classes.NameValue;
import org.griphyn.cPlanner.classes.PlannerOptions;
import org.griphyn.cPlanner.classes.SubInfo;
import org.griphyn.cPlanner.classes.GRMSJob;

import org.griphyn.cPlanner.common.LogManager;
import org.griphyn.cPlanner.common.PegasusProperties;
import org.griphyn.cPlanner.common.Utility;

import org.griphyn.cPlanner.transfer.SingleFTPerXFERJobRefiner;

import org.griphyn.cPlanner.engine.ReplicaCatalogBridge;

import java.io.File;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

/**
 * The refiner that is compatible with the GRMS system. In GRMS, the job
 * specification itself declares the input and output files that a job
 * needs. Hence, unlike the other refiners, this refiner does not add any
 * transfer nodes explicitly; it merely records the appropriate URLs on each
 * job and lets the GRMS system do the magic of transferring the files to
 * and from the execution pools. At present it does not register the output
 * files, and it DOES NOT SUPPORT RANDOM DIRECTORIES, as directories on the
 * remote side are handled by the GRMS system.
 * Please note that this refiner has to be used in conjunction with the GRMSWriter.
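 *
 * <p>
 * A minimal sketch of the calls a driver would make against this refiner;
 * the surrounding objects (dag, properties, options, job, stageInFiles,
 * stageOutFiles, rcb) are assumed to be supplied by the planner:
 * <pre>
 *   GRMS refiner = new GRMS(dag, properties, options);
 *   //records source URLs on the job instead of adding stage-in nodes
 *   refiner.addStageInXFERNodes(job, stageInFiles);
 *   //records destination URLs on the job instead of adding stage-out nodes
 *   refiner.addStageOutXFERNodes(job, stageOutFiles, rcb);
 *   //signal that the traversal of the workflow is complete
 *   refiner.done();
 * </pre>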
 *
 * @author Karan Vahi
 * @version $Revision: 50 $
 */
public class GRMS extends SingleFTPerXFERJobRefiner {

    /**
     * A short description of the transfer refinement.
     */
    public static final String DESCRIPTION = "GRMS Refiner";

    /**
     * The string holding the logging messages.
     */
    protected String mLogMsg;

    /**
     * The overloaded constructor.
     *
     * @param dag        the workflow to which transfer nodes need to be added.
     * @param properties the <code>PegasusProperties</code> object containing all
     *                   the properties required by Pegasus.
     * @param options    the options passed to the planner.
     */
    public GRMS(ADag dag, PegasusProperties properties,
                PlannerOptions options) {
        super(dag, properties, options);
        //convert all the jobs in the workflow to type GRMSJob
        Iterator it = dag.vJobSubInfos.iterator();
        List l = new ArrayList();

        while (it.hasNext()) {
            SubInfo job = (SubInfo) it.next();
            GRMSJob gjob = new GRMSJob(job);
            l.add(gjob);
            //remove through the iterator, so that the collection is not
            //modified while it is being traversed
            it.remove();
        }

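        //add the converted jobs back in a second pass; adding them during
        //the traversal above would have modified the underlying collection
        //mid-iteration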
        it = l.iterator();
        while (it.hasNext()) {
            dag.vJobSubInfos.add(it.next());
        }
    }

    /**
     * Adds the stage-in transfers for a job's input files, from the
     * locations returned by the replica catalog to the job's execution pool.
     * Since GRMS performs the transfers itself, no separate stage-in job is
     * created; instead the source URL of each file is recorded on the job
     * description.
     *
     * @param job   <code>SubInfo</code> object corresponding to the node to
     *              which the files are to be transferred.
     * @param files Collection of <code>FileTransfer</code> objects containing
     *              the information about source and destination URLs.
     */
    public void addStageInXFERNodes(SubInfo job, Collection files) {
        GRMSJob grmsJob = (GRMSJob) job;
        String url = null;
        String sourceURL = null;

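        //for each input file, rewrite its source URL to carry the extra
        //slash before the path, as GRMS requires, and record it on the job.
        //For example (assuming Utility.pruneURLPrefix returns the
        //protocol and host portion of the URL):
        //  gsiftp://host.edu/data/f.dat  becomes  gsiftp://host.edu//data/f.dat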
        for (Iterator it = files.iterator(); it.hasNext();) {
            FileTransfer ft = (FileTransfer) it.next();

            //insert the extra slash that is required by GRMS
            sourceURL = ((NameValue) ft.getSourceURL()).getValue();
            url = Utility.pruneURLPrefix(sourceURL);
            url += File.separator
                + sourceURL.substring(sourceURL.indexOf(url) + url.length());
            grmsJob.addURL(ft.getLFN(), url, 'i');
        }
    }

    /**
     * Adds the inter-pool transfer nodes that are required for transferring
     * the output files of the parents to the job's execution site. They are
     * not supported in this case.
     *
     * @param job   <code>SubInfo</code> object corresponding to the node to
     *              which the files are to be transferred.
     * @param files Collection of <code>FileTransfer</code> objects containing
     *              the information about source and destination URLs.
     */
    public void addInterSiteTXNodes(SubInfo job, Collection files) {
        throw new UnsupportedOperationException(
            "Interpool operation is not supported");
    }

    /**
     * Adds the stage-out transfer nodes that stage data to an output site
     * specified by the user.
     *
     * @param job   <code>SubInfo</code> object corresponding to the node to
     *              which the files are to be transferred.
     * @param files Collection of <code>FileTransfer</code> objects containing
     *              the information about source and destination URLs.
     * @param rcb   bridge to the Replica Catalog. Used for creating
     *              registration nodes in the workflow.
     */
    public void addStageOutXFERNodes(SubInfo job, Collection files,
                                     ReplicaCatalogBridge rcb) {
        this.addStageOutXFERNodes(job, files, rcb, false);
    }

    /**
     * For GRMS we do not need to add any push transfer nodes. Instead we
     * modify the job description to specify the URLs to which the
     * materialized files need to be pushed. The deletedLeaf flag is
     * immaterial in this case.
     *
     * @param job         <code>SubInfo</code> object corresponding to the
     *                    node to which the files are to be transferred.
     * @param files       Collection of <code>FileTransfer</code> objects
     *                    containing the information about source and
     *                    destination URLs.
     * @param rcb         bridge to the Replica Catalog. Used for creating
     *                    registration nodes in the workflow.
     * @param deletedLeaf specifies whether the node is being added for a
     *                    node deleted by the reduction engine or not.
     *                    default: false
     */
    public void addStageOutXFERNodes(SubInfo job, Collection files,
                                     ReplicaCatalogBridge rcb,
                                     boolean deletedLeaf) {
        GRMSJob grmsJob = (GRMSJob) job;

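        //record the destination URL of each output file on the job; the 'o'
        //flag marks it as an output, mirroring the 'i' flag used for inputs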
        for (Iterator it = files.iterator(); it.hasNext();) {
            FileTransfer ft = (FileTransfer) it.next();
            grmsJob.addURL(ft.getLFN(),
                           ((NameValue) ft.getDestURL()).getValue(),
                           'o');
        }
    }

    /**
     * Signals that the traversal of the workflow is done. This allows the
     * transfer mechanisms to clean up any state that they might be keeping
     * and that needs to be explicitly freed.
     */
    public void done() {
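        //this refiner keeps no per-traversal state, so there is nothing to free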
    }

    /**
     * Adds a new job to the workflow being refined.
     *
     * @param job the job to be added.
     */
    public void addJob(SubInfo job) {
        mDAG.add(job);
    }

    /**
     * Adds a new relation to the workflow being refined.
     *
     * @param parent the jobname of the parent node of the edge.
     * @param child  the jobname of the child node of the edge.
     */
    public void addRelation(String parent, String child) {
        mLogger.log("Adding relation " + parent + " -> " + child,
                    LogManager.DEBUG_MESSAGE_LEVEL);
        mDAG.addNewRelation(parent, child);
    }

    /**
     * Adds a new relation to the workflow. When the parent is a newly added
     * transfer job, parentNew should be set only the first time a relation
     * is added for it; for subsequent compute jobs that may be dependent on
     * it, it needs to be set to false.
     *
     * @param parent    the jobname of the parent node of the edge.
     * @param child     the jobname of the child node of the edge.
     * @param site      the execution pool where the transfer node is to be run.
     * @param parentNew whether the parent node being added is the new
     *                  transfer job and this method is being called for it
     *                  for the first time.
     */
    public void addRelation(String parent, String child, String site,
                            boolean parentNew) {
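        //site and parentNew are ignored here, since this refiner never adds
        //transfer nodes of its own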
        mLogger.log("Adding relation " + parent + " -> " + child,
                    LogManager.DEBUG_MESSAGE_LEVEL);
        mDAG.addNewRelation(parent, child);
    }
246:
247: /**
248: * Returns a textual description of the transfer mode.
249: *
250: * @return a short textual description
251: */
252: public String getDescription() {
253: return this.DESCRIPTION;
254: }
255:
256: }