001: /*
002: * This file or a portion of this file is licensed under the terms of
003: * the Globus Toolkit Public License, found in file GTPL, or at
004: * http://www.globus.org/toolkit/download/license.html. This notice must
005: * appear in redistributions of this file, with or without modification.
006: *
007: * Redistributions of this Software, with or without modification, must
008: * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
009: * some other similar material which is provided with the Software (if
010: * any).
011: *
012: * Copyright 1999-2004 University of Chicago and The University of
013: * Southern California. All rights reserved.
014: */
015:
016: package org.griphyn.cPlanner.transfer.refiner;
017:
018: import org.griphyn.cPlanner.classes.ADag;
019: import org.griphyn.cPlanner.classes.FileTransfer;
020: import org.griphyn.cPlanner.classes.NameValue;
021: import org.griphyn.cPlanner.classes.PlannerOptions;
022: import org.griphyn.cPlanner.classes.SubInfo;
023: import org.griphyn.cPlanner.classes.TransferJob;
024:
025: import org.griphyn.cPlanner.common.LogManager;
026: import org.griphyn.cPlanner.common.PegasusProperties;
027:
028: import org.griphyn.cPlanner.transfer.MultipleFTPerXFERJobRefiner;
029:
030: import org.griphyn.cPlanner.namespace.VDS;
031:
032: import org.griphyn.cPlanner.engine.ReplicaCatalogBridge;
033:
034: import java.io.File;
035:
036: import java.util.Collection;
037: import java.util.Iterator;
038: import java.util.Set;
039: import java.util.HashSet;
040:
041: import java.net.URL;
042: import java.net.MalformedURLException;
043: import org.griphyn.cPlanner.poolinfo.PoolInfoProvider;
044: import org.griphyn.cPlanner.poolinfo.SiteFactory;
045: import org.griphyn.cPlanner.poolinfo.SiteFactoryException;
046: import org.griphyn.cPlanner.code.gridstart.GridStartFactory;
047:
048: /**
 * A refiner that relies on the Condor file transfer mechanism to get the
 * raw input data to the remote working directory. It is meant for doing
 * file transfers in a Condor pool, where jobs run on the local filesystem
 * of the worker nodes.
 *
 * <p>
 * Additionally, this refiner only works with the Local replica selector,
 * which prefers file URLs from the submit host for staging.
 *
 * <p>
 * In order to use the transfer refinement implemented by this class, the
 * following properties must be set (an example is sketched below):
 * <pre>
 * - property <code>pegasus.transfer.refiner</code> must be set to
 *   value <code>Condor</code>.
 * - property <code>pegasus.selector.replica</code> must be set to
 *   value <code>Local</code>.
 * - property <code>pegasus.execute.*.filesystem.local</code> must be set
 *   to value <code>true</code>.
067: * </pre>
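 *
 * <p>
 * For example, a properties file enabling this refiner might contain the
 * following entries (a sketch; adapt the values to your own configuration):
 * <pre>
 *   pegasus.transfer.refiner              Condor
 *   pegasus.selector.replica              Local
 *   pegasus.execute.*.filesystem.local    true
 * </pre>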
068: *
069: *
070: * @author Karan Vahi
071: * @version $Revision: 455 $
072: */
073: public class Condor extends MultipleFTPerXFERJobRefiner {
074:
075: /**
076: * A short description of the transfer refinement.
077: */
078: public static final String DESCRIPTION = "Condor Transfer Refiner";
079:
080: /**
     * The string holding the logging messages.
082: */
083: protected String mLogMsg;
084:
085: /**
086: * The handle to the Site Catalog. It is instantiated in this class.
087: */
088: protected PoolInfoProvider mSCHandle;
089:
090: /**
091: * The overloaded constructor.
092: *
093: * @param dag the workflow to which transfer nodes need to be added.
094: * @param properties the <code>PegasusProperties</code> object containing all
095: * the properties required by Pegasus.
096: * @param options the options passed to the planner.
097: *
098: */
099: public Condor(ADag dag, PegasusProperties properties,
100: PlannerOptions options) {
101: super (dag, properties, options);
102:
103: /* load the catalog using the factory */
104: mSCHandle = SiteFactory.loadInstance(properties, false);
105:
106: }
107:
108: /**
109: * Adds the stage in transfer nodes which transfer the input files for a job,
110: * from the location returned from the replica catalog to the job's execution
111: * pool.
112: *
     * @param job   <code>SubInfo</code> object corresponding to the node to
     *              which the files are to be transferred.
     *
     * @param files Collection of <code>FileTransfer</code> objects containing
     *              the information about the source and destination URLs.
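     *
     * <p>
     * A minimal sketch of the URL handling performed below, using a
     * hypothetical input URL. Only <code>file:</code> URLs are accepted;
     * their path component is handed to Condor for transfer:
     * <pre>
     *   // exception handling omitted for brevity
     *   String url  = "file:///scratch/inputs/f.a";       // hypothetical
     *   String path = new java.net.URL( url ).getPath();  // "/scratch/inputs/f.a"
     *   // path then ends up in the job's transfer_input_files list via
     *   // job.condorVariables.addIPFileForTransfer( path )
     * </pre>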
118: */
119: public void addStageInXFERNodes(SubInfo job, Collection files) {
120:
121: Set inputFiles = job.getInputFiles();
122: for (Iterator it = files.iterator(); it.hasNext();) {
123: FileTransfer ft = (FileTransfer) it.next();
124:
            String url = ft.getSourceURL().getValue();

            //remove from the input files the PegasusFile object
            //corresponding to this file transfer, and add the
            //FileTransfer object in its place
            inputFiles.remove(ft);
            inputFiles.add(ft);
133:
134: //put the url in only if it is a file url
135: if (url.startsWith("file:/")) {
136: try {
137: job.condorVariables.addIPFileForTransfer(new URL(
138: url).getPath());
139: } catch (Exception e) {
                    throw new RuntimeException("Malformed source URL "
                            + url, e);
142: }
143: } else {
                throw new RuntimeException(
                        "Malformed source URL. Input URL should be a file URL "
                                + url);
147: }
148: }
149:
150: }
151:
152: /**
     * Adds the inter pool transfer nodes that are required for transferring
     * the output files of the parents to the job's execution site. They are
     * not supported in this case.
     *
     * @param job   <code>SubInfo</code> object corresponding to the node to
     *              which the files are to be transferred.
     * @param files Collection of <code>FileTransfer</code> objects containing
     *              the information about the source and destination URLs.
161: */
162: public void addInterSiteTXNodes(SubInfo job, Collection files) {
163:
164: throw new java.lang.UnsupportedOperationException(
165: "Interpool operation is not supported");
166:
167: }
168:
169: /**
     * Adds the stage-out transfer nodes that stage data to an output site
     * specified by the user.
     *
     * @param job   <code>SubInfo</code> object corresponding to the node to
     *              which the files are to be transferred.
     * @param files Collection of <code>FileTransfer</code> objects containing
     *              the information about the source and destination URLs.
     * @param rcb   bridge to the Replica Catalog. Used for creating
     *              registration nodes in the workflow.
179: *
180: */
181: public void addStageOutXFERNodes(SubInfo job, Collection files,
182: ReplicaCatalogBridge rcb) {
183:
184: Set outputFiles = job.getOutputFiles();
185: String destinationDirectory = null;
186: for (Iterator it = files.iterator(); it.hasNext();) {
187: FileTransfer ft = (FileTransfer) it.next();
188:
189: String url = ((NameValue) ft.getDestURL()).getValue();
190:
191: //put the url in only if it is a file url
192: if (url.startsWith("file:/")) {
193:
194: try {
195: destinationDirectory = new File(new URL(url)
196: .getPath()).getParent();
197: } catch (MalformedURLException ex) {
198: throw new RuntimeException("Malformed URL", ex);
199: }
200:
201: //strong disconnect here, as assuming worker node execution
202: //and having the SLS to the submit directory
203: //String pfn = "file://" + mPOptions.getSubmitDirectory() + File.separator + ft.getLFN();
204: //ft.removeSourceURL();
205: //ft.addSource( "local", pfn );
206:
207: } else {
                throw new RuntimeException(
                        "Malformed destination URL. Output URL should be a file URL "
                                + url);
211: }
212: }
213:
214: if (!files.isEmpty()) {
            String txName = this.STAGE_OUT_PREFIX + job.getName() + "_0";
            SubInfo txJob = this.createStageOutTransferJob(job, files,
                    destinationDirectory, txName);

            this.mDAG.add(txJob);
            this.addRelation(job.getName(), txName);
222: }
223:
224: }
225:
226: /**
227: * Constructs a condor file transfer job that handles multiple transfers.
228: * The job itself is a /bin/true job that does the stageout using the
229: * transfer_input_files feature.
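     *
     * <p>
     * As a rough sketch (the file names and the destination directory are
     * hypothetical, and the attribute names assume the usual mapping of the
     * Condor profile keys to submit file attributes), the generated job
     * conceptually amounts to:
     * <pre>
     *   universe              = vanilla
     *   executable            = /bin/true
     *   initialdir            = /results/run0001
     *   transfer_input_files  = /scratch/work/f.out
     *   transfer_output_files = f.out
     * </pre>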
230: *
     * @param job       the SubInfo object for the job, in relation to which
     *                  the transfer node is being added. The transfer node
     *                  can either be transferring this job's input files to
     *                  the execution pool, or transferring this job's output
     *                  files to the output pool.
     * @param files     collection of <code>FileTransfer</code> objects
     *                  representing the data files and staged executables to
     *                  be transferred.
     * @param directory the directory where the transfer job needs to be
     *                  executed.
     * @param txJobName the name of the transfer node.
241: *
242: * @return the created TransferJob.
243: */
244: private TransferJob createStageOutTransferJob(SubInfo job,
245: Collection files, String directory, String txJobName) {
246:
247: TransferJob txJob = new TransferJob();
248:
249: //want to run in the local pool in universe vanilla
250: txJob.setSiteHandle("local");
251: txJob.condorVariables.construct("universe", "vanilla");
252:
253: //the non third party site for the transfer job is
254: //always the job execution site for which the transfer
255: //job is being created.
256: txJob.setNonThirdPartySite(job.getSiteHandle());
257:
258: txJob.setName(txJobName);
259:
260: txJob.setTransformation("pegasus", "true", null);
261:
262: txJob.setDerivation("pegasus", "true", null);
263:
264: txJob.setRemoteExecutable("/bin/true");
265:
        //we don't want the job to be launched via GridStart
267: txJob.vdsNS
268: .construct(
269: VDS.GRIDSTART_KEY,
270: GridStartFactory.GRIDSTART_SHORT_NAMES[GridStartFactory.NO_GRIDSTART_INDEX]);
271:
        //add each source file as a Condor input file for transfer,
        //and its LFN basename as a Condor output file for transfer
274: for (Iterator it = files.iterator(); it.hasNext();) {
275: FileTransfer ft = (FileTransfer) it.next();
276: NameValue nv = ft.getSourceURL();
277:
278: //put the url in only if it is a file url
279: String url = nv.getValue();
280: if (url.startsWith("file:/")) {
281: try {
282: txJob.condorVariables.addIPFileForTransfer(new URL(
283: url).getPath());
284:
                    //add the basename of the file to transfer_output_files
286: txJob.condorVariables.addOPFileForTransfer(ft
287: .getLFN());
288: } catch (Exception e) {
                    throw new RuntimeException("Malformed source URL "
                            + url, e);
291: }
292: }
293:
294: }
295:
        //the initial directory is set to the directory where we need the output
297: txJob.condorVariables.construct("initialdir", directory);
298:
299: txJob.setJobType(SubInfo.STAGE_OUT_JOB);
300: txJob.setVDSSuperNode(job.jobName);
301:
302: txJob.stdErr = "";
303: txJob.stdOut = "";
304:
        //the input files of the job remain empty,
        //as we are just copying URLs
307: txJob.inputFiles = new HashSet();
308:
309: //to get the file stat information we need to put
310: //the files as output files of the transfer job
311: txJob.outputFiles = new HashSet(files);
312:
313: //the profile information from the pool catalog needs to be
314: //assimilated into the job.
315: txJob.updateProfiles(mSCHandle.getPoolProfile(txJob
316: .getSiteHandle()));
317:
        //the profile information from the properties file
        //is assimilated, overriding the one from the transformation
        //catalog.
321: txJob.updateProfiles(mProps);
322:
323: return txJob;
324: }
325:
326: /**
327: *
328: *
329: *
330: * @param job <code>SubInfo</code> object corresponding to the node to
331: * which the files are to be transferred to.
332: * @param files Collection of <code>FileTransfer</code> objects containing the
333: * information about source and destURL's.
334: * @param rcb bridge to the Replica Catalog. Used for creating registration
335: * nodes in the workflow.
336: * @param deletedLeaf to specify whether the node is being added for
337: * a deleted node by the reduction engine or not.
338: * default: false
339: */
340: public void addStageOutXFERNodes(SubInfo job, Collection files,
341: ReplicaCatalogBridge rcb, boolean deletedLeaf) {
342:
        throw new java.lang.UnsupportedOperationException(
                "Stageout operation is not supported for "
                        + this.getDescription());
346:
347: }
348:
349: /**
350: * Signals that the traversal of the workflow is done. This would allow
351: * the transfer mechanisms to clean up any state that they might be keeping
352: * that needs to be explicitly freed.
353: */
354: public void done() {
355:
356: }
357:
358: /**
359: * Add a new job to the workflow being refined.
360: *
361: * @param job the job to be added.
362: */
363: public void addJob(SubInfo job) {
364: mDAG.add(job);
365: }
366:
367: /**
     * Adds a new relation to the workflow being refined.
369: *
370: * @param parent the jobname of the parent node of the edge.
371: * @param child the jobname of the child node of the edge.
372: */
373: public void addRelation(String parent, String child) {
374: mLogger.log("Adding relation " + parent + " -> " + child,
375: LogManager.DEBUG_MESSAGE_LEVEL);
376: mDAG.addNewRelation(parent, child);
377:
378: }
379:
380: /**
     * Adds a new relation to the workflow. When the parent is a newly added
     * transfer job, parentNew should be set to true only the first time a
     * relation is added for it. For subsequent compute jobs that may be
     * dependent on it, it needs to be set to false.
     *
     * @param parent    the jobname of the parent node of the edge.
     * @param child     the jobname of the child node of the edge.
     * @param site      the execution pool where the transfer node is to be run.
     * @param parentNew whether the parent node being added is the new
     *                  transfer job and is being added for the first time.
391: */
392: public void addRelation(String parent, String child, String site,
393: boolean parentNew) {
394: mLogger.log("Adding relation " + parent + " -> " + child,
395: LogManager.DEBUG_MESSAGE_LEVEL);
396: mDAG.addNewRelation(parent, child);
397:
398: }
399:
400: /**
401: * Returns a textual description of the transfer mode.
402: *
403: * @return a short textual description
404: */
405: public String getDescription() {
406: return this.DESCRIPTION;
407: }
408:
409: }
|