001: /*
002: * This file or a portion of this file is licensed under the terms of
003: * the Globus Toolkit Public License, found in file ../GTPL, or at
004: * http://www.globus.org/toolkit/download/license.html. This notice must
005: * appear in redistributions of this file, with or without modification.
006: *
007: * Redistributions of this Software, with or without modification, must
008: * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
009: * some other similar material which is provided with the Software (if
010: * any).
011: *
012: * Copyright 1999-2004 University of Chicago and The University of
013: * Southern California. All rights reserved.
014: */
015: package org.griphyn.vdl.dbschema;
016:
017: import java.sql.*;
018: import java.util.*;
019: import java.io.*;
020: import java.lang.reflect.*;
021: import org.griphyn.vdl.util.ChimeraProperties;
022: import org.griphyn.common.util.Separator;
023: import org.griphyn.vdl.workflow.*;
024: import org.griphyn.vdl.util.Logging;
025:
026: /**
027: * This class provides basic functionalities to interact with the
028: * backend database for workflow records. Currently, only searches
029: * that fill the workflow class are implemented.
030: *
031: * @author Jens-S. Vöckler
032: * @author Mike Wilde
033: * @version $Revision: 50 $
034: */
035: public class WorkflowSchema extends DatabaseSchema implements WF {
036: /**
037: * Default constructor for access to the WF set of tables.
038: *
039: * @param dbDriverName is the database driver name
040: */
041: public WorkflowSchema(String dbDriverName)
042: throws ClassNotFoundException, NoSuchMethodException,
043: InstantiationException, IllegalAccessException,
044: InvocationTargetException, SQLException, IOException {
045: // load the driver from the properties
046: super (dbDriverName, PROPERTY_PREFIX);
047: Logging.instance().log("dbschema", 3,
048: "done with parent schema c'tor");
049:
050: // Note: Does not rely on optional JDBC3 features
051: this .m_dbdriver.insertPreparedStatement("work.select.all",
052: "SELECT * FROM wf_work");
053: this .m_dbdriver.insertPreparedStatement("work.select.mtime",
054: "SELECT * FROM wf_work WHERE mtime >= ?");
055: this .m_dbdriver.insertPreparedStatement("work.select.sk",
056: "SELECT * FROM wf_work WHERE basedir=? AND vogroup=? "
057: + "AND workflow=? AND run=?");
058:
059: this .m_dbdriver.insertPreparedStatement("job.select.all",
060: "SELECT * FROM wf_jobstate");
061: this .m_dbdriver
062: .insertPreparedStatement(
063: "job.select.mtime",
064: "SELECT * FROM wf_jobstate WHERE wfid IN "
065: + "( SELECT id FROM wf_work WHERE mtime >= ? )");
066: this .m_dbdriver.insertPreparedStatement("job.select.sk",
067: "SELECT * FROM wf_jobstate WHERE wfid=? AND jobid=?");
068: }
069:
070: /**
071: * Converts a regular datum into an SQL timestamp.
072: * @param date is a regular Java date
073: * @return a SQL timestamp obtained from the Date.
074: */
075: protected java.sql.Timestamp toStamp(java.util.Date date) {
076: return new java.sql.Timestamp(date.getTime());
077: }
078:
079: /**
080: * Converts a SQL timestamp into a regular datum.
081: * @param date is SQL timestamp from the database
082: * @return a regular Java date
083: */
084: protected java.util.Date fromStamp(java.sql.Timestamp date) {
085: return new java.util.Date(date.getTime());
086: }
087:
088: /**
089: * Obtains all jobs that belong to a particular workflow.
090: *
091: * @param wfid is the workflow identifier for jobs.
092: * @return a list of all jobs
093: */
094: private java.util.List getAllJobs(long wfid) throws SQLException {
095: java.util.List result = new ArrayList();
096: Logging.instance()
097: .log("xaction", 1, "START load jobs for work");
098: PreparedStatement ps = m_dbdriver
099: .getPreparedStatement("job.select.work");
100:
101: if (m_dbdriver.preferString())
102: ps.setString(1, Long.toString(wfid));
103: else
104: ps.setLong(1, wfid);
105: ResultSet rs = ps.executeQuery();
106: Logging.instance()
107: .log("xaction", 1, "INTER load jobs for work");
108:
109: while (rs.next()) {
110: JobStateEntry j = new JobStateEntry(wfid, rs
111: .getString("id"));
112: j.setState(rs.getString("state"));
113: j.setModificationTime(fromStamp(rs.getTimestamp("mtime")));
114: j.setSite(rs.getString("site"));
115: result.add(j);
116: }
117: rs.close();
118:
119: Logging.instance()
120: .log("xaction", 1, "FINAL load jobs for work");
121: return result;
122: }
123:
124: /**
125: * Converts the output of a result set into a workflow
126: *
127: * @param rs is the result set of a query, which is better valid.
128: * @param withJobs if true, also add the jobs, if false, no jobs.
129: * @return a workflow instance created from the result set.
130: */
131: protected WorkEntry convertResultSet(ResultSet rs, boolean withJobs)
132: throws SQLException {
133: WorkEntry result = new WorkEntry(rs.getLong("id"), rs
134: .getString("basedir"), rs.getString("vogroup"), rs
135: .getString("workflow"), rs.getString("run"));
136: result.setCreator(rs.getString("creator"));
137: result.setCreationTime(fromStamp(rs.getTimestamp("ctime")));
138: result.setState(rs.getInt("state"));
139: result.setModificationTime(fromStamp(rs.getTimestamp("mtime")));
140:
141: if (withJobs)
142: result.setJob(getAllJobs(rs.getLong("id")));
143:
144: return result;
145: }
146:
147: /**
148: * Load a single workflow from the backend database into a Java
149: * object. The identification is based on the secondary key quadruple.
150: *
151: * @param basedir is the base directory
152: * @param vogroup is the VO group identifier
153: * @param label is the workflow label
154: * @param run is the workflow run directory
155: * @return the Workflow that was matched by the id, which may be null
156: */
157: public WorkEntry getWorkflow(String basedir, String vogroup,
158: String label, String run) throws SQLException {
159: WorkEntry result = null;
160: Logging.instance().log("xaction", 1, "START load work sk");
161:
162: PreparedStatement ps = m_dbdriver
163: .getPreparedStatement("work.select.sk");
164: int i = 1;
165: ps.setString(i++, basedir);
166: ps.setString(i++, vogroup);
167: ps.setString(i++, label);
168: ps.setString(i++, run);
169:
170: ResultSet rs = ps.executeQuery();
171: Logging.instance().log("xaction", 1, "INTER load work sk");
172:
173: if (rs.next()) {
174: result = convertResultSet(rs, true);
175: } else {
176: Logging.instance().log("wf", 0, "No workflows found");
177: }
178: rs.close();
179:
180: Logging.instance().log("xaction", 1, "FINAL load work sk");
181: return result;
182: }
183:
184: /**
185: * Loads all workflows that are fresh enough and returns a map of
186: * workflows matching. The list is indexed by the primary key of
187: * the WF table, which is the unique workflow id.
188: *
189: * @param mtime is the oldest last modification time. Use
190: * <code>null</code> for all.
191: * @return a map of workflows, indexed by their workflow id.
192: */
193: public java.util.Map getWorkflows(java.util.Date mtime)
194: throws SQLException {
195: java.util.Map result = new TreeMap();
196: Logging.instance().log("xaction", 1, "START load all work");
197:
198: PreparedStatement ps = null;
199: if (mtime == null) {
200: ps = m_dbdriver.getPreparedStatement("work.select.all");
201: } else {
202: ps = m_dbdriver.getPreparedStatement("work.select.mtime");
203: ps.setTimestamp(1, toStamp(mtime));
204: }
205:
206: ResultSet rs = ps.executeQuery();
207: Logging.instance().log("xaction", 1, "INTER load all work");
208:
209: while (rs.next()) {
210: // insert workflows without job state
211: result.put(new Long(rs.getLong("id")), convertResultSet(rs,
212: false));
213: }
214: rs.close();
215:
216: if (result.size() > 0) {
217: // now add job state
218: Logging.instance().log("xaction", 1, "START load all jobs");
219: if (mtime == null) {
220: ps = m_dbdriver.getPreparedStatement("job.select.all");
221: } else {
222: ps = m_dbdriver
223: .getPreparedStatement("job.select.mtime");
224: ps.setTimestamp(1, toStamp(mtime));
225: }
226:
227: rs = ps.executeQuery();
228: Logging.instance().log("xaction", 1, "INTER load all jobs");
229:
230: while (rs.next()) {
231: Long key = new Long(rs.getLong("wfid"));
232: JobStateEntry job = new JobStateEntry(rs
233: .getLong("wfid"), rs.getString("jobid"));
234: job.setState(rs.getString("state"));
235: job.setModificationTime(fromStamp(rs
236: .getTimestamp("mtime")));
237: job.setSite(rs.getString("site"));
238:
239: if (result.containsKey(key)) {
240: ((WorkEntry) (result.get(key))).addJob(job);
241: }
242: }
243: Logging.instance().log("xaction", 1, "FINAL load all jobs");
244: } else {
245: Logging.instance().log("wf", 0, "No workflows found");
246: }
247:
248: Logging.instance().log("xaction", 1, "FINAL load all work");
249: return result;
250: }
251: }
|