/*
 * This file or a portion of this file is licensed under the terms of
 * the Globus Toolkit Public License, found in file GTPL, or at
 * http://www.globus.org/toolkit/download/license.html. This notice must
 * appear in redistributions of this file, with or without modification.
 *
 * Redistributions of this Software, with or without modification, must
 * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
 * some other similar material which is provided with the Software (if
 * any).
 *
 * Copyright 1999-2004 University of Chicago and The University of
 * Southern California. All rights reserved.
 */

package org.griphyn.vdl.directive;

import java.io.*;
import java.util.*;
import org.griphyn.vdl.dax.*;
import org.griphyn.vdl.classes.LFN;
import org.griphyn.vdl.parser.DAXParser;
import org.griphyn.vdl.util.Logging;
import org.griphyn.vdl.planner.*;

/**
 * This class makes concrete plans for a DAX when planning with the
 * shell planner.
 *
 * @author Jens-S. Vöckler
 * @author Yong Zhao
 * @version $Revision: 50 $
 *
 * @see org.griphyn.vdl.planner.Scriptor
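 *
 * <p>A minimal usage sketch; the DAX file name and output directory are
 * illustrative assumptions, not mandated by this class:</p>
 *
 * <pre>
 *   InputStream dax = new FileInputStream("blackdiamond.dax");
 *   Derive derive = new Derive();
 *   boolean ok = derive.genShellScripts(dax, "run-dir", false, true);
 * </pre>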
 */
public class Derive extends Directive {
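    /**
     * Default constructor. All initialization is delegated to the base
     * {@link Directive} constructor, whose exceptions are propagated.
     *
     * @throws IOException propagated from the base constructor
     * @throws MissingResourceException propagated from the base constructor
     */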
    public Derive() throws IOException, MissingResourceException {
        super();
    }

    /**
     * Generates shell scripts for the workflow described by the DAX.
     * One shell script is generated for each derivation, plus a control
     * script that drives the execution of these shell scripts according
     * to their dependencies.
     *
     * @param dax is the InputStream for the DAX representation
     * @param dir is the directory name in which to generate these scripts
     * @param build specifies whether to force build mode
     * @param register specifies whether to register output files
     * @return true if successful, false otherwise
     */
    public boolean genShellScripts(InputStream dax, String dir,
                                   boolean build, boolean register)
        throws java.sql.SQLException, IOException, InterruptedException {
        return genShellScripts(dax, dir, build, register, null);
    }

    /**
     * Generates shell scripts for the workflow described by the DAX.
     * One shell script is generated for each derivation, plus a control
     * script that drives the execution of these shell scripts according
     * to their dependencies.
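     *
     * <p>A hedged invocation sketch; <code>dax</code> is an already opened
     * InputStream, and the kickstart location shown is an illustrative
     * assumption. Passing an empty string disables kickstart:</p>
     *
     * <pre>
     *   boolean ok = genShellScripts(dax, "run-dir", false, false,
     *                                "/opt/vds/bin/kickstart");
     * </pre>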
     *
     * @param dax is the InputStream for the DAX representation
     * @param dir is the directory name in which to generate these scripts
     * @param build specifies whether to force build mode
     * @param register specifies whether to register output files
     * @param kickstart_path specifies the location of kickstart. If null,
     *        kickstart will not be used.
     * @return true if successful, false otherwise
     */
    public boolean genShellScripts(InputStream dax, String dir,
                                   boolean build, boolean register,
                                   String kickstart_path)
        throws java.sql.SQLException, IOException, InterruptedException {
        // sanity check -- is there a destination directory
        if (dir == null || dir.equals("")) {
            m_logger.log("planner", 0,
                "Output directory not specified, using default: test");
            dir = "test";
        } else {
            m_logger.log("planner", 0, "Using output directory " + dir);
        }

        // parse the DAX file
        m_logger.log("planner", 1, "Initializing dax parser");
        DAXParser daxparser = new DAXParser(m_props.getDAXSchemaLocation());
        m_logger.log("planner", 1, "parsing the dax...");
        ADAG adag = daxparser.parse(dax);

        // sanity check -- do we have a DAX
        if (adag == null) {
            m_logger.log("planner", 0, "failed parsing the dax.");
            return false;
        }

        // check output directory -- does it exist?
        File f = new File(dir);
        if (f.exists()) {
            if (!f.isDirectory()) {
                m_logger.log("planner", 0,
                    "ERROR: '" + dir + "' is not a directory!");
                throw new IOException(dir + " is not a directory!");
            }
        } else {
            m_logger.log("planner", 0,
                "directory '" + dir + "' does not exist. Creating.");
            f.mkdirs();
        }

        // connect to the replica catalog
        RCWrapper rc = null;
        try {
            rc = new RCWrapper();
        } catch (Exception e) {
            throw new Error(e.getMessage());
        }
        m_logger.log("planner", 2, "Using RC " + rc.getName());

        // connect to the transformation catalog
        TCWrapper tc = new TCWrapper();
        m_logger.log("planner", 2, "Using TC " + tc.getName());

        // connect to the site catalog, optional
        SCWrapper sc = new SCWrapper();
        m_logger.log("planner", 2, "Using SC " + sc.getName());

        // look up all filenames in the replica catalog and populate the
        // filename map that is passed around.
        m_logger.log("planner", 1, "processing logical filenames");
        HashMap filenameMap = new HashMap();
        for (Iterator i = adag.iterateFilename(); i.hasNext();) {
            Filename fn = (Filename) i.next();
            String lfn = fn.getFilename();
            String pfn = rc.lookup("local", lfn);
            if (pfn == null) {
                // the lfn->pfn mapping is not in the RC
                m_logger.log("planner", 1, "Info: Failed to find LFN "
                    + lfn + " in RC, assuming PFN==LFN");
                pfn = lfn;
            }
            filenameMap.put(lfn, pfn);
        }

        // convert adag to graph
        Graph graph = DAX2Graph.DAG2Graph(adag);

        // to build or to make?
        if (build) {
            // build mode
            m_logger.log("planner", 0,
                "Running in build mode, DAG pruning skipped");
        } else {
            // make mode
            m_logger.log("planner", 0,
                "Checking nodes whose outputs already exist");

            // check output file existence; if all output files of a node
            // already exist, the node can be cut from the DAG
            boolean cut;

            // perform a reverse topological sort on the graph, i.e. visit
            // the last-finishing jobs first.
            Topology rtp = new Topology(graph.reverseGraph());

            // map of all files known to exist
            HashMap existMap = new HashMap();

            // map of files to add to the exist map for this stage
            HashMap addMap = new HashMap();

            // map of files to remove from the exist map for this stage
            HashMap removeMap = new HashMap();

            String jobs[];

            // whether we are still dealing with the last-finishing stage
            boolean last = true;

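            // Prune the DAG stage by stage, starting from the final stage of
            // the reversed graph: a job is cut when all of its output files
            // already exist, and its input files are then assumed to exist
            // as well. If an entire stage can be cut, everything upstream of
            // it is cut, too.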
            while ((jobs = rtp.stageSort()) != null) {
                int number = jobs.length;
                int count = 0;

                for (int i = 0; i < number; i++) {
                    String jobID = jobs[i];
                    cut = true;
                    Job job = adag.getJob(jobID);

                    // map of the input files of this job
                    HashMap inputMap = new HashMap();

                    for (Iterator e = job.iterateUses(); e.hasNext();) {
                        Filename fn = (Filename) e.next();
                        String lfn = fn.getFilename();

                        // check the map of existing files first
                        if (!existMap.containsKey(lfn)) {
                            // look up the lfn in the filename map
                            String pfn = (String) filenameMap.get(lfn);
                            if (pfn == null) {
                                // lfn is not in the filename list
                                m_logger.log("planner", 0,
                                    "ERROR: File '" + lfn
                                    + "' is not in the <filename> list, "
                                    + "please check the DAX!");
                                return false;
                            }

                            // check whether the output file exists
                            if (fn.getLink() == LFN.OUTPUT) {
                                File fp = new File(pfn);
                                if (!fp.exists()) {
                                    // some output file does not exist
                                    cut = false;
                                }
                            }
                            if (fn.getLink() == LFN.INPUT) {
                                inputMap.put(lfn, pfn);
                            }
                        }
                    }

                    if (cut) {
                        // cut this node
                        m_logger.log("planner", 1, "Removed job "
                            + jobID + " from DAG");
                        graph.removeVertex(jobID);

                        // assume all input files (outputs of upper stages) exist
                        addMap.putAll(inputMap);
                        count++;
                    } else {
                        // the job is not cut, so its input files cannot be
                        // assumed to exist
                        removeMap.putAll(inputMap);
                    }
                } // for each job in this stage

                if (count == number) {
                    // output files for all the jobs in this stage exist
                    if (last) {
                        // this is the last stage, no need to run the dag
                        m_logger.log("planner", 0,
                            "All output files already exist, "
                            + "no computation is needed!");
                        return true;
                    }

                    // cut all the upper stage jobs
                    while ((jobs = rtp.stageSort()) != null) {
                        for (int i = 0; i < jobs.length; i++) {
                            m_logger.log("planner", 1, "Removed job "
                                + jobs[i] + " from DAG");
                            graph.removeVertex(jobs[i]);
                        }
                    }
                } else {
                    if (count == 0) {
                        // none gets cut in this stage
                        last = false;
                        continue;
                    }

                    // put assumed existing files into the exist map
                    existMap.putAll(addMap);
                    TreeSet temp = new TreeSet(removeMap.keySet());
                    for (Iterator it = temp.iterator(); it.hasNext();) {
                        String lfn = (String) it.next();
                        existMap.remove(lfn);
                    }
                }
                // now the last stage has been processed
                last = false;
            } // end while
        } // end else

        // perform a topological sort on the graph
        Topology tp = new Topology(graph);

        // get the topmost jobs
        String[] jobs = tp.stageSort();

        // the DAX may be invalid (empty, or containing a cycle)
        if (jobs == null) {
            m_logger.log("planner", 0,
                "ERROR: No starting job(s) found, "
                + "please check the DAX file!");
            return false;
        }

        // create a Scriptor instance
        Scriptor spt = new Scriptor(dir, adag, rc, sc, tc, filenameMap,
            m_props.getDataDir());
        spt.setRegister(register);

        // Only set the kickstart path if the CLI argument was specified.
        // However, permit "-k ''" to remove kickstart invocations.
        if (kickstart_path != null) {
            int x = kickstart_path.trim().length();
            spt.setKickstart(x == 0 ? null : kickstart_path);
        }

        String ctrlScript = spt.initializeControlScript();

        // check the involved LFNs for these first-stage jobs
        for (int i = 0; i < jobs.length; i++) {
            // process each job and check input file existence
            String scriptFile = spt.processJob(jobs[i], true);
            if (scriptFile == null) {
                m_logger.log("planner", 0,
                    "ERROR: failed processing job " + jobs[i]);
                return false;
            }
        }

        spt.intermediateControlScript();

        // process jobs in the following stages
        while ((jobs = tp.stageSort()) != null) {
            for (int i = 0; i < jobs.length; i++) {
                // not the first stage, no need to check input file existence
                String scriptFile = spt.processJob(jobs[i], false);
                if (scriptFile == null) {
                    m_logger.log("planner", 0,
                        "ERROR: failed processing job " + jobs[i]);
                    return false;
                }
            }

            spt.intermediateControlScript();
        }

        spt.finalizeControlScript();
        m_logger.log("planner", 0, "DAG processed successfully");
        m_logger.log("planner", 1, "changing file permission");
        changePermission(dir);

        spt = null;
        if (rc != null)
            rc.close();
        if (sc != null)
            sc.close();
        if (tc != null)
            tc.close();
        m_logger.log("planner", 0, "To run the DAG, execute '"
            + ctrlScript + "' in directory " + dir);
        return true;
    }

    /**
     * Helper method to change the permissions of the generated shell
     * scripts to be executable.
     *
     * @param dir is the directory containing the generated scripts
     * @return the exit status of the chmod invocation, or -1 if the
     *         platform does not look like a Unix system
     * @throws IOException if the chmod process cannot be started
     * @throws InterruptedException if the wait for chmod is interrupted
     */
    protected static int changePermission(String dir)
        throws IOException, InterruptedException {
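        // heuristically detect a Unix-like platform from the standard
        // separator properties before attempting to run chmod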
        if (System.getProperty("line.separator").equals("\n")
            && System.getProperty("file.separator").equals("/")
            && System.getProperty("path.separator").equals(":")) {
            String[] me = new String[3];
            me[0] = "/bin/sh";
            me[1] = "-c";
            me[2] = "chmod 0755 *.sh";

            Process p = Runtime.getRuntime().exec(me, null, new File(dir));
            return p.waitFor();
        } else {
            return -1;
        }
    }
}