001: /*
002: * This file or a portion of this file is licensed under the terms of
003: * the Globus Toolkit Public License, found in file GTPL, or at
004: * http://www.globus.org/toolkit/download/license.html. This notice must
005: * appear in redistributions of this file, with or without modification.
006: *
007: * Redistributions of this Software, with or without modification, must
008: * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
009: * some other similar material which is provided with the Software (if
010: * any).
011: *
012: * Copyright 1999-2004 University of Chicago and The University of
013: * Southern California. All rights reserved.
014: */
015:
016: package org.griphyn.vdl.directive;
017:
018: import java.io.*;
019: import org.griphyn.vdl.directive.WorkflowJob;
020: import org.griphyn.vdl.util.*;
021: import java.sql.SQLException;
022:
023: /**
024: *
025: * A <code>Workflow</code> object defines a context for running a
026: * derivation graph on the Grid as a DAG, and managing its execution.
027: * It serves as a front-end to an associated shell script (by default
028: * located in $PEGASUS_HOME/bin/vds-Workflow-script-runwf)
029: *
030: * The workflow to be executed is designated by its terminal derivation (DV).
031: *
032: * The Workflow instance is returned by the class method Workflow.run().
033: *
034: * The Workflow class variables contain status cached from period
035: * queries of the workflow database.
036: *
037: * @author Douglas Scheftner
038: * @author Mike Wilde
039: * @author Eric Gilbert
040: * @version $Revision: 50 $
041: *
042: */
043:
044: public class Workflow {
045:
046: /* Class Variables */
047:
048: public static String runwfCmd = "/home/dscheftn/vds/bin/vds-Workflow-script-runwf";
049: public static String wfstatCmd = "/home/dscheftn/vds/bin/vds-Workflow-script-wfstat";
050:
051: public static String defaultVOGroup = "quarknet";
052:
053: public static String logicalFileNameBase = "/export/d1/dscheftn/quarknet_testing/runs";
054:
055: /** parent directory for run-dir tree ala vds-plan/vds-run. */
056: public static String defaultBaseDir = "/no/base/dir";
057:
058: public static String rlsURL = "rls://terminable.uchicago.edu";
059:
060: public static final int MAXWF = 100000;
061: public static Workflow[] workflows = new Workflow[MAXWF];
062: public static int nworkflows;
063:
064: public static long millisecsToRefreshStatus = 30000; // 30 secs between auto-refresh of status
065: public static long timeOfLastRefresh = 0; // really want this publically read-only
066:
067: // private static String voGroup;
068:
069: /* Instance Variables */
070:
071: /* sample data from database:
072:
073: id | basedir | vogroup | workflow | run | creator |
074: ----+-----------------+---------+------------+---------+---------+
075: 1 | /home/wilde/run | ivdgl1 | test | run0001 | wilde |
076:
077: ctime | state | mtime
078: ------------------------+-------+------------------------
079: 2005-08-20 13:25:27-05 | -2 | 2005-08-20 13:28:09-05
080:
081: */
082:
083: /* Instance variables that mirror the database fields */
084:
085: public String wfid;
086: public String basedir;
087: public String vogroup;
088: public String wfname;
089: public String run;
090: public String creator;
091: public String ctime;
092: public String state;
093: public String exitstatus;
094: public String mtime;
095:
096: /* Instance variables to track workflow state */
097:
098: public static final int WFMAXJOBS = 20000; /* FIX: can we avoid this hard limit? */
099: public WorkflowJob[] jobs;
100: public int njobs;
101: public String tmpdir;
102: public String errorMessage;
103:
104: /* Class Methods */
105:
106: public static Workflow run(String namespace, String dvName) {
107: Process p;
108: int rc;
109: Reader is;
110: StringBuffer sb = new StringBuffer();
111: char[] b = new char[100000];
112: int n;
113:
114: Workflow wf = new Workflow();
115:
116: wf.basedir = defaultBaseDir;
117: wf.vogroup = defaultVOGroup;
118: wf.wfname = dvName;
119:
120: try {
121: System.out.println("Running Process " + namespace + " "
122: + dvName);
123:
124: String[] cmd = { runwfCmd, defaultVOGroup,
125: logicalFileNameBase, defaultBaseDir, rlsURL,
126: namespace, dvName };
127:
128: p = Runtime.getRuntime().exec(cmd);
129:
130: InputStream out = p.getInputStream();
131: InputStreamReader r = new InputStreamReader(out);
132: BufferedReader in = new BufferedReader(r);
133:
134: wf.tmpdir = in.readLine();
135: System.out
136: .println("output from runwf: tmpdir=" + wf.tmpdir);
137: wf.run = in.readLine();
138: System.out.println("output from runwf: run=" + wf.run);
139: wf.errorMessage = in.readLine();
140: System.out.println("output from runwf: errorMessage="
141: + wf.errorMessage);
142:
143: rc = p.waitFor();
144: System.out.println("Process returned rc=" + rc);
145:
146: return (wf);
147: } catch (Exception e) {
148: System.out.println("Prepare: Exception: " + e.toString());
149: return wf;
150: }
151: }
152:
153: public static boolean refresh() {
154: Process p;
155: int rc;
156: Reader is;
157: StringBuffer sb = new StringBuffer();
158: char[] b = new char[100000];
159: int n;
160:
161: /* Run status command to get workflow states */
162:
163: try {
164:
165: p = Runtime.getRuntime().exec(wfstatCmd);
166:
167: InputStream out = p.getInputStream();
168: InputStreamReader r = new InputStreamReader(out);
169: BufferedReader in = new BufferedReader(r);
170:
171: String line;
172: nworkflows = 0;
173: while ((line = in.readLine()) != null) {
174:
175: Workflow w = new Workflow();
176: String[] t = line.split("\\|");
177: int nt = t.length;
178: if (nt > 1)
179: w.wfid = t[1];
180: if (nt > 2)
181: w.basedir = t[2];
182: if (nt > 3)
183: w.vogroup = t[3];
184: if (nt > 4)
185: w.wfname = t[4];
186: if (nt > 5)
187: w.run = t[5];
188: if (nt > 6)
189: w.creator = t[6];
190: if (nt > 7)
191: w.ctime = t[7];
192: if (nt > 8) {
193: switch (Integer.parseInt(t[8], 10)) {
194: case -2:
195: w.state = "WFSTATE_PLANNED";
196: w.exitstatus = "";
197: break;
198: case -1:
199: w.state = "WFSTATE_RUNNING";
200: w.exitstatus = "";
201: break;
202: default:
203: w.state = "WFSTATE_FINISHED";
204: w.exitstatus = t[8];
205: break;
206: }
207: }
208: if (nt > 9)
209: w.mtime = t[9];
210:
211: if (nworkflows < (MAXWF)) {
212: workflows[nworkflows++] = w;
213: } else {
214: return false;
215: }
216: }
217: rc = p.waitFor();
218: return true;
219: } catch (Exception e) {
220: System.out.println("WorkflowJob.refresh: Exception: "
221: + e.toString());
222: return false;
223: }
224: }
225:
226: /* Instance Methods */
227:
228: /**
229: * Sets the status fields in a Workflow instance.
230: * @return true if status was successfully obtained, false if not.
231: */
232: public boolean updateStatus() {
233: boolean rc;
234:
235: long now = System.currentTimeMillis();
236: if (now > (timeOfLastRefresh + millisecsToRefreshStatus)) {
237: rc = Workflow.refresh();
238: rc = WorkflowJob.refresh();
239: timeOfLastRefresh = now;
240: }
241:
242: for (int i = 0; i < nworkflows; i++) {
243: if (workflows[i].basedir.equals(basedir)
244: && workflows[i].vogroup.equals(vogroup)
245: && workflows[i].wfname.equals(wfname)
246: && workflows[i].run.equals(run)) {
247: wfid = workflows[i].wfid;
248: creator = workflows[i].creator;
249: ctime = workflows[i].ctime;
250: state = workflows[i].state;
251: exitstatus = workflows[i].exitstatus;
252: mtime = workflows[i].mtime;
253: break;
254: }
255: }
256:
257: if (wfid == null)
258: return false;
259:
260: // System.out.println ("basedir = " + basedir + " vogroup = " + vogroup + " wfname = " + wfname + " run = " + run + " wfid = " + wfid + " creator = " + creator + " ctime = " + ctime + " state = " + state + " exitstatus = " + exitstatus + " mtime = " + mtime);
261:
262: jobs = new WorkflowJob[WFMAXJOBS];
263: njobs = 0;
264: for (int i = 0; i < WorkflowJob.njobs; i++) {
265: if (WorkflowJob.jobs[i].wfid.equals(wfid))
266: if (njobs < WFMAXJOBS)
267: jobs[njobs++] = WorkflowJob.jobs[i];
268: }
269: return true;
270: }
271:
272: public String toWFStatusString() {
273: updateStatus();
274: return "wfid=" + wfid + " run=" + run + " ctime=" + ctime
275: + " state=" + state + " exitstatus=" + exitstatus
276: + " mtime=" + mtime + " refreshed=" + timeOfLastRefresh;
277: }
278:
279: public String toJobStatusString() {
280: updateStatus();
281: if (njobs == 0)
282: return "";
283: String s = jobs[0].asStatusString();
284: for (int i = 1; i < njobs; i++) {
285: s += "\n" + jobs[i].asStatusString();
286: }
287: return (s);
288: }
289:
290: public String toDetailStatusString() {
291: return toWFStatusString() + "\n" + toJobStatusString();
292: }
293:
294: public int stop() {
295: return 0;
296: }
297:
298: public int cleanup() {
299: return 0;
300: }
301: }
|