/*
 * This file or a portion of this file is licensed under the terms of
 * the Globus Toolkit Public License, found in file GTPL, or at
 * http://www.globus.org/toolkit/download/license.html. This notice must
 * appear in redistributions of this file, with or without modification.
 *
 * Redistributions of this Software, with or without modification, must
 * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
 * some other similar material which is provided with the Software (if
 * any).
 *
 * Copyright 1999-2004 University of Chicago and The University of
 * Southern California. All rights reserved.
 */
package org.griphyn.cPlanner.engine;

import org.griphyn.cPlanner.classes.ADag;
import org.griphyn.cPlanner.classes.FileTransfer;
import org.griphyn.cPlanner.classes.GridFTPServer;
import org.griphyn.cPlanner.classes.NameValue;
import org.griphyn.cPlanner.classes.PegasusFile;
import org.griphyn.cPlanner.classes.PlannerOptions;
import org.griphyn.cPlanner.classes.ReplicaLocation;
import org.griphyn.cPlanner.classes.SiteInfo;
import org.griphyn.cPlanner.classes.SubInfo;

import org.griphyn.cPlanner.common.LogManager;
import org.griphyn.cPlanner.common.PegasusProperties;
import org.griphyn.cPlanner.common.Utility;

import org.griphyn.cPlanner.namespace.VDS;

import org.griphyn.cPlanner.partitioner.graph.Adapter;
import org.griphyn.cPlanner.partitioner.graph.Graph;
import org.griphyn.cPlanner.partitioner.graph.GraphNode;

import org.griphyn.cPlanner.selector.ReplicaSelector;
import org.griphyn.cPlanner.selector.replica.ReplicaSelectorFactory;

import org.griphyn.cPlanner.transfer.Refiner;
import org.griphyn.cPlanner.transfer.refiner.RefinerFactory;

import org.griphyn.common.catalog.ReplicaCatalog;
import org.griphyn.common.catalog.ReplicaCatalogEntry;

import org.griphyn.common.catalog.replica.ReplicaFactory;

import org.griphyn.common.catalog.transformation.TCMode;

import org.griphyn.common.util.FactoryException;

import org.griphyn.vdl.euryale.FileFactory;
import org.griphyn.vdl.euryale.VirtualDecimalHashedFileFactory;
import org.griphyn.vdl.euryale.VirtualFlatFileFactory;

import java.io.File;
import java.io.IOException;

import java.util.Enumeration;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;

/**
 * The transfer engine, which on the basis of the pools on which the jobs
 * are to run, adds nodes to the workflow to transfer the data products.
 *
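 * <p>
 * An illustrative usage sketch (hypothetical driver code; the variables
 * dag, deletedLeaves, props, options and rcb are assumed to have been
 * constructed by the planner):
 * <pre>
 *     TransferEngine engine = new TransferEngine( dag, deletedLeaves,
 *                                                 props, options );
 *     //adds the stage-in, inter-site and stage-out transfer nodes
 *     engine.addTransferNodes( rcb );
 * </pre>
 *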
 * @author Karan Vahi
 * @author Gaurang Mehta
 * @version $Revision: 290 $
 *
 */
public class TransferEngine extends Engine {

    /**
     * The name of the source key for the Replica Catalog Implementer that
     * serves as the cache.
     */
    public static final String TRANSIENT_REPLICA_CATALOG_KEY = "file";

    /**
     * The name of the Replica Catalog Implementer that serves as the source
     * for cache files.
     */
    public static final String TRANSIENT_REPLICA_CATALOG_IMPLEMENTER = "SimpleFile";

    /**
     * The classname of the class that stores the file transfer information
     * for a transfer object.
     *
     * @see org.griphyn.cPlanner.classes.FileTransfer
     */
    private static final String FILE_TX_CLASS_NAME = "org.griphyn.cPlanner.classes.FileTransfer";

    /**
     * The DAG object to which the transfer nodes are to be added. This is the
     * reduced DAG, which is obtained from the Reduction Engine.
     */
    private ADag mDag;

    /**
     * The original DAG object constructed after running the DaxParser. This is
     * required to transfer files to the output pool for any leaf jobs that may
     * have been deleted.
     */
    // private ADag mOriginalDag;

    /**
     * The bridge to the Replica Catalog.
     */
    private ReplicaCatalogBridge mRCBridge;

    /**
     * The handle to the replica selector that is to be used to select the
     * various replicas.
     */
    private ReplicaSelector mReplicaSelector;

    /**
     * The handle to the transfer refiner that adds the transfer nodes into
     * the workflow.
     */
    private Refiner mTXRefiner;

    /**
     * Holds the jobs from the original DAG which were deleted by the
     * reduction algorithm.
     */
    private Vector mvDelLeafJobs;

    /**
     * A SimpleFile Replica Catalog that tracks all the files that are being
     * materialized as part of workflow execution.
     */
    private ReplicaCatalog mTransientRC;

    /**
     * The handle to the file factory that is used to create the top level
     * directories for each of the partitions.
     */
    private FileFactory mFactory;

    /**
     * The base path for the stageout directory on the output site where all
     * the files are staged out.
     */
    private String mStageOutBaseDirectory;

    /**
     * Overloaded constructor.
     *
     * @param reducedDag the reduced workflow.
     * @param vDelLJobs  list of deleted jobs.
     * @param properties the <code>PegasusProperties</code> to be used.
     * @param options    the options specified by the user to run the planner.
     */
    public TransferEngine(ADag reducedDag, Vector vDelLJobs,
                          PegasusProperties properties, PlannerOptions options) {
        //call the super class constructor for initializations
        super(properties);

        mDag = reducedDag;
        mvDelLeafJobs = vDelLJobs;
        mPOptions = options;
        mTCHandle = TCMode.loadInstance();

        try {
            mTXRefiner = RefinerFactory.loadInstance(mProps, reducedDag, options);
            mReplicaSelector = ReplicaSelectorFactory.loadInstance(mProps);
        } catch (Exception e) {
            //wrap all the exceptions into a factory exception
            throw new FactoryException("Transfer Engine ", e);
        }

        this.initializeStageOutSiteDirectoryFactory(reducedDag);

        //log some configuration messages
        mLogger.log("Transfer Refiner loaded is [" + mTXRefiner.getDescription() + "]",
                    LogManager.CONFIG_MESSAGE_LEVEL);
        mLogger.log("ReplicaSelector loaded is [" + mReplicaSelector.description() + "]",
                    LogManager.CONFIG_MESSAGE_LEVEL);
    }

    /**
     * Returns whether a site is third party enabled or not. A site is
     * determined to be third party enabled if the transfer mode is a third
     * party based transfer mode like RFT, or if it is designated a third
     * party site by the property "vds.transfer.thirdparty.sites".
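     * For example, a hypothetical property setting
     * <pre>vds.transfer.thirdparty.sites = siteA,siteB</pre>
     * would designate the (illustrative) sites siteA and siteB as third
     * party enabled.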
     *
     * @param site the site handle of the site for which you want to detect
     *             third party capability.
     * @param type the type of transfer job for which the URL is being
     *             constructed.
     *
     * @return true indicating the site is third party enabled, else
     *         false indicating the site is not third party enabled.
     */
    public boolean isSiteThirdParty(String site, int type) {
        return mTXRefiner.isSiteThirdParty(site, type);
    }

    /**
     * Returns the SubInfo object for the job specified.
     *
     * @param jobName the name of the job.
     *
     * @return the SubInfo object for the job.
     */
    private SubInfo getSubInfo(String jobName) {
        return mDag.getSubInfo(jobName);
    }

    /**
     * Adds the transfer nodes to the workflow.
     *
     * @param rcb the bridge to the ReplicaCatalog.
     */
    public void addTransferNodes(ReplicaCatalogBridge rcb) {
        mRCBridge = rcb;

        SubInfo currentJob;
        String currentJobName;
        Vector vOutPoolTX;
        // int noOfJobs = mDag.getNoOfJobs();
        // int counter = 0;
        String msg;
        String outputSite = mPOptions.getOutputSite();

        //convert the dag to a graph representation and walk it
        //in a top down manner
        Graph workflow = Adapter.convert(mDag);

        //go through each job in turn
        // Enumeration eSubs = mDagSubInfos.elements();
        // while (eSubs.hasMoreElements() && counter < noOfJobs) {
        //     counter++;
        //     currentJob = (SubInfo) eSubs.nextElement();

        boolean stageOut = ((outputSite != null) && (outputSite.trim().length() > 0));
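
        //the top down traversal ensures that a job's parents have already
        //been traversed by the time the job itself is examined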
        for (Iterator it = workflow.iterator(); it.hasNext();) {
            GraphNode node = (GraphNode) it.next();
            currentJob = (SubInfo) node.getContent();
            //set the node depth as the level
            currentJob.setLevel(node.getDepth());
            currentJobName = currentJob.getName();

            mLogger.log("", LogManager.DEBUG_MESSAGE_LEVEL);
            msg = "Job being traversed is " + currentJobName;
            mLogger.log(msg, LogManager.DEBUG_MESSAGE_LEVEL);
            msg = "To be run at " + currentJob.executionPool;
            mLogger.log(msg, LogManager.DEBUG_MESSAGE_LEVEL);

            //get the parents of that node
            Vector vParents = mDag.getParents(currentJobName);
            mLogger.log(vectorToString("Parents of job:", vParents),
                        LogManager.DEBUG_MESSAGE_LEVEL);
            processParents(currentJob, vParents);

            //transfer the node's output files
            //to the output pool
            if (stageOut /*&& !currentJob.isTemp*/) {
                vOutPoolTX = getFileTX(outputSite, currentJob);
                mTXRefiner.addStageOutXFERNodes(currentJob, vOutPoolTX, rcb);
            }
        }

        //we are done with the traversal.
        mTXRefiner.done();

        //get the deleted leaf jobs' output files to the output pool,
        //only if an output pool is specified.
        //this check should be moved upwards; it is redundant at present
        if (outputSite != null && outputSite.trim().length() > 0) {

            for (Enumeration e = this.mvDelLeafJobs.elements(); e.hasMoreElements();) {
                currentJob = (SubInfo) e.nextElement();

                //for a deleted node, to transfer its output
                //the execution pool should be set to local i.e. the submit host
                currentJob.executionPool = "local";

                vOutPoolTX = getDeletedFileTX(outputSite, currentJob);
                mTXRefiner.addStageOutXFERNodes(currentJob, vOutPoolTX, rcb, true);
            }
        }

        //close the handle to the cache file if it is written out
        closeTransientRC();
    }

    /**
     * This gets the file transfer objects corresponding to the locations of
     * files found in the replica mechanism, and transfers them to the output
     * pool asked for by the user. If the output pool path and the one returned
     * by the replica mechanism match, then that object is not transferred.
     *
     * @param pool the output pool which the user specifies at runtime.
     * @param job  the SubInfo object corresponding to the leaf job which was
     *             deleted by the Reduction algorithm.
     *
     * @return Vector of <code>FileTransfer</code> objects.
     */
    private Vector getDeletedFileTX(String pool, SubInfo job) {
        Vector vFileTX = new Vector();
        SiteInfo p = mPoolHandle.getPoolEntry(pool, "vanilla");

        for (Iterator it = job.getOutputFiles().iterator(); it.hasNext();) {
            PegasusFile pf = (PegasusFile) it.next();
            String lfn = pf.getLFN();

            ReplicaLocation rl = mRCBridge.getFileLocs(lfn);

            //selLocs are all the locations found in the ReplicaMechanism
            //corresponding to the pool
            ReplicaLocation selLocs = mReplicaSelector.selectReplicas(rl, pool);

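            //tracks whether the file already resides at the output pool,
            //in which case no transfer needs to be added for it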
            boolean flag = false;

            FileTransfer ft = null;
            //check through all the pfns returned on the pool
            for (Iterator selIt = selLocs.pfnIterator(); selIt.hasNext();) {
                ReplicaCatalogEntry selLoc = (ReplicaCatalogEntry) selIt.next();
                String sourceURL = selLoc.getPFN();
                //definite inconsistency as url prefix and mount point
                //are not picked up from the same server
                String destURL = p.getURLPrefix(true) + this.getPathOnStageoutSite(lfn);
                //+ File.separator + lfn;

                //check if the URLs match
                if (sourceURL.trim().equalsIgnoreCase(destURL.trim())) {
                    String msg = "The leaf file " + lfn
                        + " is already at the output pool " + pool;
                    mLogger.log(msg, LogManager.INFO_MESSAGE_LEVEL);
                    flag = true;
                    break;
                }

                ft = new FileTransfer(lfn, job.getName());
                ft.addSource(selLoc.getResourceHandle(), sourceURL);
                ft.addDestination(pool, destURL);

                //System.out.println("Deleted Leaf Job File transfer object " + ft);

            }
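            //note that only the transfer object constructed from the last
            //selected replica is added; earlier candidates are overwritten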
            if (!flag) { //adding the last pfn
                vFileTX.addElement(ft);
            }
        }
        return vFileTX;
    }

    /**
     * It processes a node's parents and determines if transfer nodes are to
     * be added or not. All the input files for the job are searched for in
     * the output files of the parent nodes and in the Replica Mechanism.
     *
     * @param job      the <code>SubInfo</code> object containing all the
     *                 details of the job.
     * @param vParents Vector of String objects corresponding to the parents
     *                 of the node.
     */
    private void processParents(SubInfo job, Vector vParents) {

        Set nodeIpFiles = job.getInputFiles();
        Vector vRCSearchFiles = new Vector(); //vector of PegasusFile
        Vector vIPTxFiles = new Vector();
        Vector vParentSubs = new Vector();

        //get all the output files of the parents.
        //this also populates vParentSubs with the parents' SubInfo objects
        Set parentsOutFiles = getOutputFiles(vParents, vParentSubs);

        //interpool transfer of the node's parents'
        //output files
        Vector vInterPoolFileTX = this.getInterpoolFileTX(job, vParentSubs);
        //only add if there are files to transfer
        if (!vInterPoolFileTX.isEmpty()) {
            mTXRefiner.addInterSiteTXNodes(job, vInterPoolFileTX);
        }

        //check if the node's input files are in the parents' output files.
        //if they are not, then these are to be fetched
        //from the RC based on the transiency characteristic
        for (Iterator it = nodeIpFiles.iterator(); it.hasNext();) {
            PegasusFile pf = (PegasusFile) it.next();
            if (!parentsOutFiles.contains(pf)) {
                if (!pf.getTransientTransferFlag()) {
                    vRCSearchFiles.addElement(pf);
                }
            }
        }

        if (!vRCSearchFiles.isEmpty()) {
            //get the locations from the RC
            getFilesFromRC(job, vRCSearchFiles);
        }
    }

    /**
     * This gets the Vector of FileTransfer objects for the files which have
     * to be transferred to one destination pool. It checks the transient
     * flags for the files. If the transfer transient flag is set, it means
     * the file does not have to be transferred to the destination pool.
     *
     * @param destPool the pool to which the files are to be transferred.
     * @param job      the <code>SubInfo</code> object of the job whose output
     *                 files are needed at the destination pool.
     *
     * @return Vector of <code>FileTransfer</code> objects.
     */
    private Vector getFileTX(String destPool, SubInfo job) {
        Vector vFileTX = new Vector();

        //check if there is a remote initialdir set
        String path = job.vdsNS.getStringValue(VDS.REMOTE_INITIALDIR_KEY);

        for (Iterator it = job.getOutputFiles().iterator(); it.hasNext();) {
            PegasusFile pf = (PegasusFile) it.next();
            String file = pf.getLFN();

            FileTransfer ft = this.constructFileTX(pf, job.executionPool,
                                                   destPool, job.logicalName, path);
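            //constructFileTX returns null for a fully transient file, i.e.
            //one whose transfer and registration flags are both transient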
            if (ft != null) {
                vFileTX.add(ft);
            }
        }

        return vFileTX;
    }

    /**
     * Constructs the FileTransfer object on the basis of the transiency
     * information. If the transient flag for transfer is set, the destURL for
     * the FileTransfer object would be the execution directory, as this is
     * the entry that has to be registered in the ReplicaMechanism.
     *
     * @param pf       the PegasusFile for which the transfer has to be done.
     * @param execPool the pool on which the file is created.
     * @param destPool the output pool to which the file should be transferred.
     * @param job      the name of the associated job.
     * @param path     the path that a user specifies in the profile for the
     *                 key remote_initialdir that results in the workdir being
     *                 changed for a job on an execution pool.
     *
     * @return the corresponding FileTransfer object.
     */
    private FileTransfer constructFileTX(PegasusFile pf, String execPool,
                                         String destPool, String job, String path) {

        String lfn = pf.getLFN();
        FileTransfer ft = null;

        SiteInfo ePool = mPoolHandle.getPoolEntry(execPool, "vanilla");
        SiteInfo dPool = mPoolHandle.getPoolEntry(destPool, "vanilla");
        if (ePool == null || dPool == null) {
            mLogMsg = (ePool == null) ? this.poolNotFoundMsg(execPool, "vanilla")
                                      : this.poolNotFoundMsg(destPool, "vanilla");
            mLogger.log(mLogMsg, LogManager.ERROR_MESSAGE_LEVEL);
            throw new RuntimeException(mLogMsg);
        }
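
        //the transiency flags determine one of three outcomes:
        //1. both transfer and registration transient - no transfer object
        //2. only transfer transient - source and destination both point to
        //   the execution directory (entry is only registered, not moved)
        //3. otherwise - a transfer from the execution directory on the
        //   execution pool to the output pool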

        //definite inconsistency as url prefix and mount point
        //are not picked up from the same server
        String execURL = ePool.getURLPrefix(true)
            + mPoolHandle.getExecPoolWorkDir(execPool, path)
            + File.separatorChar + lfn;

        //write out the exec url to the cache file
        trackInTransientRC(lfn, execURL, execPool);

        //if both transfer and registration
        //are transient return null
        if (pf.getTransientRegFlag() && pf.getTransientTransferFlag()) {
            return null;
        }

        //if only the transient transfer flag is set,
        //destURL and sourceURL are the same and equal
        //to the execution directory on the execPool
        if (pf.getTransientTransferFlag()) {

            ft = new FileTransfer(lfn, job, pf.getFlags());
            //set the transfer mode
            ft.setTransferFlag(pf.getTransferFlag());
            ft.addSource(execPool, execURL);
            ft.addDestination(execPool, execURL);
        }
        //the source dir is the exec dir
        //on the exec pool and the dest dir
        //would be on the output pool
        else {
            //construct the source url depending on whether it is a third party tx
            String sourceURL = isSiteThirdParty(execPool, SubInfo.STAGE_OUT_JOB)
                ? execURL
                : "file://" + mPoolHandle.getExecPoolWorkDir(execPool, path)
                    + File.separator + lfn;

            ft = new FileTransfer(lfn, job, pf.getFlags());
            //set the transfer mode
            ft.setTransferFlag(pf.getTransferFlag());

            ft.addSource(execPool, sourceURL);

            //add all the possible destination urls iterating through
            //the list of grid ftp servers associated with the dest pool.
            List l = mPoolHandle.getGridFTPServers(destPool);
            Iterator it = l.iterator();
            String destURL = null;
            boolean first = true;
            while (it.hasNext()) {
                destURL = (first) ?
                    //the first entry has to be the one in the Pool object
                    dPool.getURLPrefix(false)
                    :
                    //get it from the list
                    ((GridFTPServer) it.next()).getInfo(GridFTPServer.GRIDFTP_URL);

                if (!first && destURL.equals(dPool.getURLPrefix(false))) {
                    //ensures no duplicate entries. The gridftp server in the pool
                    //object is one of the servers in the list of gridftp servers.
                    continue;
                }

                //assumption of the same se mount point for each gridftp server
                destURL += this.getPathOnStageoutSite(lfn); //+ File.separator + lfn;

                //if the dest URL matches the execDirURL we return a
                //transfer that is flagged as not required
                if (execURL.equalsIgnoreCase(destURL)) {
                    /*ft = new FileTransfer(file, job);
                    ft.addSource(execPool, execURL);*/
                    ft.addDestination(execPool, execURL);
                    //make the transfer transient?
                    ft.setTransferFlag(PegasusFile.TRANSFER_NOT);
                    return ft;
                }

                ft.addDestination(destPool, destURL);
                first = false;
            }

        }

        return ft;
    }

    /**
     * This generates an error message for a pool that is not found in the
     * pool config file.
     *
     * @param poolName the name of the pool that is not found.
     * @param universe the condor universe.
     *
     * @return the message.
     */
    private String poolNotFoundMsg(String poolName, String universe) {
        String st = "Error: No matching entry to pool = " + poolName
            + " ,universe = " + universe
            + "\n found in the pool configuration file ";
        return st;
    }

    /**
     * This gets the Vector of FileTransfer objects for all the files which
     * have to be transferred to the destination pool in the case of interpool
     * transfers. Each FileTransfer object has the source and the destination
     * URLs. The source URI is determined from the pool on which the jobs are
     * executed.
     *
     * @param job   the job with reference to which interpool file transfers
     *              need to be determined.
     * @param nodes Vector of <code>SubInfo</code> objects for the nodes whose
     *              output files are to be transferred to the dest pool.
     *
     * @return Vector of <code>FileTransfer</code> objects.
     */
    private Vector getInterpoolFileTX(SubInfo job, Vector nodes) {
        String destPool = job.executionPool;
        //contains the remote_initialdir if specified for the job
        String destRemoteDir = job.vdsNS.getStringValue(VDS.REMOTE_INITIALDIR_KEY);

        SiteInfo desPool = mPoolHandle.getTXPoolEntry(destPool);
        SiteInfo sourcePool;

        Vector vFileTX = new Vector();

        for (Iterator it = nodes.iterator(); it.hasNext();) {
            //get the parent job
            SubInfo pJob = (SubInfo) it.next();
            sourcePool = mPoolHandle.getTXPoolEntry(pJob.executionPool);

            if (((String) sourcePool.getInfo(SiteInfo.HANDLE)).equalsIgnoreCase(destPool)) {
                //no need to add transfers, as the parent job and child
                //job are run in the same directory on the pool
                continue;
            }

            String sourceURI = null;
            String thirdPartyDestURI = desPool.getURLPrefix(true)
                + mPoolHandle.getExecPoolWorkDir(destPool, destRemoteDir);
            //definite inconsistency as url prefix and mount point
            //are not picked up from the same server
            String destURI = isSiteThirdParty(destPool, SubInfo.INTER_POOL_JOB) ?
                //construct for third party transfer
                thirdPartyDestURI
                :
                //construct for normal transfer
                "file://" + mPoolHandle.getExecPoolWorkDir(destPool, destRemoteDir);

            for (Iterator fileIt = pJob.getOutputFiles().iterator(); fileIt.hasNext();) {
                PegasusFile pf = (PegasusFile) fileIt.next();
                String outFile = pf.getLFN();

                // Not required as input files are Sets now Karan Sept 14, 2006
                // if (stringInPegVector(outFile, job.inputFiles)) {
                if (job.getInputFiles().contains(pf)) {
                    String sourceURL = null;
                    String destURL = destURI + File.separator + outFile;
                    String thirdPartyDestURL = thirdPartyDestURI + File.separator + outFile;
                    FileTransfer ft = new FileTransfer(outFile, pJob.jobName);
                    ft.addDestination(destPool, destURL);

                    //add all the possible source urls iterating through
                    //the list of grid ftp servers associated with the source pool.
                    List l = mPoolHandle.getGridFTPServers(pJob.executionPool);
                    boolean first = true;
                    for (Iterator it1 = l.iterator(); it1.hasNext();) {
                        //definite inconsistency as url prefix and mount point
                        //are not picked up from the same server
                        sourceURI = (first) ?
                            //the first entry has to be the one in the Pool object
                            sourcePool.getURLPrefix(false)
                            :
                            //get it from the list
                            ((GridFTPServer) it1.next()).getInfo(GridFTPServer.GRIDFTP_URL);

                        if (!first && sourceURI.equals(sourcePool.getURLPrefix(false))) {
                            //ensures no duplicate entries. The gridftp server in the pool
                            //object is one of the servers in the list of gridftp servers.
                            mLogger.log("Not adding inter pool file tx for " + outFile,
                                        LogManager.DEBUG_MESSAGE_LEVEL);
                            continue;
                        }

                        sourceURI += mPoolHandle.getExecPoolWorkDir(
                            pJob.executionPool,
                            pJob.vdsNS.getStringValue(VDS.REMOTE_INITIALDIR_KEY));
                        sourceURL = sourceURI + File.separator + outFile;

                        if (!(sourceURL.equalsIgnoreCase(thirdPartyDestURL))) {
                            //add the source url only if it does not match
                            //the third party destination url
                            ft.addSource(pJob.executionPool, sourceURL);
                        }
                        first = false;
                    }
                    if (ft.isValid()) {
                        //add only if there is at least
                        //a single valid transfer associated.
                        vFileTX.addElement(ft);
                    }
                }
            }

        }

        return vFileTX;
    }

    /**
     * It looks up the RCEngine Hashtable to look up the locations for the
     * files and adds nodes to transfer them. If a file is not found in
     * the Replica Catalog, the Transfer Engine flags an error and exits.
     *
     * @param job         the <code>SubInfo</code> object for whose input
     *                    files the Replica Mechanism has to be searched.
     * @param searchFiles Vector containing the PegasusFile objects
     *                    corresponding to the files that need to have their
     *                    mapping looked up from the Replica Mechanism.
     */
    private void getFilesFromRC(SubInfo job, Vector searchFiles) {
        Vector vFileTX = new Vector();
        String jobName = job.logicalName;
        String ePool = job.executionPool;
        //contains the remote_initialdir if specified for the job
        String eRemoteDir = job.vdsNS.getStringValue(VDS.REMOTE_INITIALDIR_KEY);
        String sourceURL, destURL = null;
        SiteInfo ep = mPoolHandle.getPoolEntry(ePool, "vanilla");
        //we are using the pull mode for data transfer
        String scheme = "file";

        //sAbsPath would be just the source directory absolute path
        //dAbsPath would be just the destination directory absolute path
        String dAbsPath = mPoolHandle.getExecPoolWorkDir(ePool, eRemoteDir);
        String sAbsPath = null;

        //sDirURL would be the url to the source directory.
        //dDirURL would be the url to the destination directory
        //and is always a networked url.
        //definite inconsistency as url prefix and mount point
        //are not picked up from the same server
        String dDirURL = ep.getURLPrefix(true) + dAbsPath;
        String sDirURL = null;
        //check if the execution pool is third party or not
        String destDir = (isSiteThirdParty(ePool, SubInfo.STAGE_IN_JOB)) ?
            //use the full networked url to the directory
            dDirURL
            :
            //use the default pull mode
            scheme + "://" + mPoolHandle.getExecPoolWorkDir(ePool, eRemoteDir);

        for (Iterator it = searchFiles.iterator(); it.hasNext();) {
            PegasusFile pf = (PegasusFile) it.next();
            List pfns = null;
            ReplicaLocation rl = null;

            String lfn = pf.getLFN();
            NameValue nv = null;

            //see if the pf is in fact an instance of FileTransfer
            if (pf instanceof FileTransfer) {
                //that means we should be having the source url already.
                //nv contains both the source pool and the url.
                //This happens in case of the AI Planner or transfer of executables
                nv = ((FileTransfer) pf).getSourceURL();
                destURL = ((FileTransfer) pf).removeDestURL().getValue();
                destURL = (isSiteThirdParty(ePool, SubInfo.STAGE_IN_JOB)) ?
                    //the destination URL is already third party
                    //enabled. use as it is
                    destURL
                    :
                    //explicitly convert to the file URL scheme
                    scheme + "://" + Utility.getAbsolutePath(destURL);
            } else {
                //query the replica services and get hold of the pfns
                rl = mRCBridge.getFileLocs(lfn);
                pfns = (rl == null) ? null : rl.getPFNList();
            }

            if (pfns == null && nv == null) {
                //check to see if the input file is optional
                if (pf.fileOptional()) {
                    //no need to add a transfer node for it if no location found
                    continue;
                }

                //flag an error
                throw new RuntimeException(
                    "TransferEngine.java: Can't determine a location to "
                    + "transfer input file for lfn " + lfn
                    + " for job " + job.getName());
            }

            /*
            ReplicaLocation selLoc = (nv == null)?
                //select from the various replicas
                mReplicaSelector.select( lfn, pfns, job.getSiteHandle() ):
                //we have the replica already selected
                new ReplicaLocation(nv.getValue(),nv.getKey());
            */
            ReplicaCatalogEntry selLoc = (nv == null) ?
                //select from the various replicas
                mReplicaSelector.selectReplica(rl, job.getSiteHandle())
                :
                //we have the replica already selected
                new ReplicaCatalogEntry(nv.getValue(), nv.getKey());
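            //the configured replica selector applies the user's replica
            //selection policy to pick a single physical copy of the file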

            //get the file to the job's execution pool.
            //this is assuming that there are no directory paths
            //in the pfn!!!
            sDirURL = selLoc.getPFN().substring(0,
                selLoc.getPFN().lastIndexOf(File.separator));

            //try to get the directory absolute path
            //yes i know that we are sending the url to the directory,
            //not the file.
            sAbsPath = Utility.getAbsolutePath(sDirURL);

            //the final source and destination urls to the file
            sourceURL = selLoc.getPFN();
            destURL = (destURL == null) ? destDir + File.separator + lfn : destURL;

            //we have all the chopped up combos of the urls.
            //do some funky matching on the basis of the fact
            //that each pool has one shared filesystem

            //match the source and dest 3rd party urls, or
            //match the directory url knowing that the lfn and
            //(source and dest pool) are the same
            if (sourceURL.equalsIgnoreCase(dDirURL + File.separator + lfn)
                || (selLoc.getResourceHandle().equalsIgnoreCase(ePool)
                    && lfn.equals(sourceURL.substring(
                        sourceURL.lastIndexOf(File.separator) + 1))
                    && sAbsPath.equals(dAbsPath))) {
                //do not need to add any transfer node
                mLogger.log("Not transferring ip file " + lfn
                    + " for job " + job.jobName + " to site " + ePool,
                    LogManager.DEBUG_MESSAGE_LEVEL);
                continue;
            }

            //construct the file transfer object
            FileTransfer ft = (pf instanceof FileTransfer) ? (FileTransfer) pf
                                                           : new FileTransfer(lfn, jobName);
            //the transfer mode for the file needs to be
            //propagated for optional transfers.
            ft.setTransferFlag(pf.getTransferFlag());

            //to prevent duplicate source urls
            if (ft.getSourceURL() == null) {
                ft.addSource(selLoc.getResourceHandle(), sourceURL);
            }

            //to prevent duplicate destination urls
            if (ft.getDestURL() == null) {
                ft.addDestination(ePool, destURL);
            }
            vFileTX.add(ft);
            //reset destURL so that the next file computes its own destination
            destURL = null;
        }

        //call addTransferNode
        if (!vFileTX.isEmpty()) {
            mTXRefiner.addStageInXFERNodes(job, vFileTX);
        }
    }

    /**
     * It gets the output files for all the nodes which are specified in
     * the Vector of nodes passed.
     *
     * @param nodes      Vector of job names of the nodes whose output files
     *                   are required.
     * @param parentSubs Vector of <code>SubInfo</code> objects. One passes an
     *                   empty vector as a parameter, and it is populated with
     *                   the SubInfo objects of the nodes while the output
     *                   files are being determined.
     *
     * @return Set of PegasusFile objects.
     */
    private Set getOutputFiles(Vector nodes, Vector parentSubs) {

        Set files = new HashSet();

        for (Iterator it = nodes.iterator(); it.hasNext();) {
            String jobName = (String) it.next();
            SubInfo sub = getSubInfo(jobName);
            parentSubs.addElement(sub);
            files.addAll(sub.getOutputFiles());
        }

        return files;
    }

    /**
     * Initializes the transient replica catalog.
     */
    private void initializeTransientRC() {
        mLogger.log("Initialising Transient Replica Catalog",
                    LogManager.DEBUG_MESSAGE_LEVEL);

        Properties cacheProps = mProps.getVDSProperties()
            .matchingSubset(ReplicaCatalog.c_prefix, false);
        String file = mPOptions.getSubmitDirectory()
            + File.separatorChar + getCacheFileName(mDag);

        //set the appropriate property to designate the path to the file
        cacheProps.setProperty(TRANSIENT_REPLICA_CATALOG_KEY, file);

        try {
            mTransientRC = ReplicaFactory.loadInstance(
                TRANSIENT_REPLICA_CATALOG_IMPLEMENTER, cacheProps);
        } catch (Exception e) {
            throw new RuntimeException(
                "Unable to initialize the transient replica catalog " + file, e);
        }
    }

    /**
     * Returns the full path on the remote output site where the lfn will
     * reside. Each call to this function could trigger a change in the
     * directory returned, depending upon the file factory being used.
     *
     * @param lfn the logical filename of the file.
     *
     * @return the storage mount point.
     */
    protected String getPathOnStageoutSite(String lfn) {
        String file;
        try {
            file = mFactory.createFile(lfn).toString();
        } catch (IOException e) {
            throw new RuntimeException("IOException ", e);
        }
        return file;
    }

    /**
     * Initializes the Stageout Site Directory factory.
     * The factory is used to return the relative directory that a particular
     * file needs to be staged to on the output site.
     *
     * @param workflow the workflow to which the transfer nodes need to be
     *                 added.
     */
    protected void initializeStageOutSiteDirectoryFactory(ADag workflow) {
        String outputSite = mPOptions.getOutputSite();
        boolean stageOut = ((outputSite != null) && (outputSite.trim().length() > 0));

        if (!stageOut) {
            //no initialization required
            mLogger.log("No initialization of StageOut Site Directory Factory",
                        LogManager.DEBUG_MESSAGE_LEVEL);
            return;
        }

        // create files in the directory, unless anything else is known.
        mStageOutBaseDirectory = mPoolHandle.getSeMountPoint(
            mPoolHandle.getPoolEntry(outputSite, "vanilla"));
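        //the hashed factory spreads files over levelled sub directories
        //under this base path, while the flat factory places them all
        //directly into it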

        if (mProps.useDeepStorageDirectoryStructure()) {
            // create hashed, and levelled directories
            try {
                VirtualDecimalHashedFileFactory temp = null;

                //get the total number of files that need to be staged out
                int totalFiles = 0;
                for (Iterator it = workflow.jobIterator(); it.hasNext();) {
                    SubInfo job = (SubInfo) it.next();

                    //traverse through all the job output files
                    for (Iterator opIt = job.getOutputFiles().iterator(); opIt.hasNext();) {
                        if (!((PegasusFile) opIt.next()).getTransientTransferFlag()) {
                            //means we have to stage to the output site
                            totalFiles++;
                        }
                    }
                }

                temp = new VirtualDecimalHashedFileFactory(
                    mStageOutBaseDirectory, totalFiles);

                //each stageout file has only 1 file associated with it
                temp.setMultiplicator(1);
                mFactory = temp;
            } catch (IOException e) {
                //wrap into a runtime exception and throw
                throw new RuntimeException(
                    "While initializing HashedFileFactory", e);
            }
        } else {
            try {
                //create a flat file factory
                mFactory = new VirtualFlatFileFactory(mStageOutBaseDirectory); // minimum default
            } catch (IOException ioe) {
                throw new RuntimeException(
                    "Unable to generate files in the submit directory ", ioe);
            }
        }

    }

    /**
     * Inserts an entry into the Transient RC.
     *
     * @param lfn  the logical name of the file.
     * @param pfn  the pfn.
     * @param site the site handle.
     */
    private void trackInTransientRC(String lfn, String pfn, String site) {

        //check if the cache handle is initialized
        if (mTransientRC == null) {
            this.initializeTransientRC();
        }

        mTransientRC.insert(lfn, pfn, site);
    }

    /**
     * Closes and writes out the Transient Replica Catalog.
     */
    private void closeTransientRC() {
        if (mTransientRC != null) {
            mTransientRC.close();
        }
    }

    /**
     * Constructs the basename of the cache file that is used to log the
     * transient files. The basename depends on whether a basename prefix
     * has been specified at runtime or not.
     *
     * @param adag the ADag object containing the workflow that is being
     *             concretized.
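     *
     * For example, a hypothetical workflow named blackdiamond with index 0
     * and no basename prefix would yield the cache file name
     * <code>blackdiamond_0.cache</code>.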
     *
     * @return the name of the cache file.
     */
    private String getCacheFileName(ADag adag) {
        StringBuffer sb = new StringBuffer();
        String bprefix = mPOptions.getBasenamePrefix();

        if (bprefix != null) {
            //the prefix is not null, so use it
            sb.append(bprefix);
        } else {
            //generate the prefix from the name of the dag
            sb.append(adag.dagInfo.nameOfADag).append("_").append(
                adag.dagInfo.index);
        }
        //append the suffix
        sb.append(".cache");

        return sb.toString();
    }

}