001: package org.jbpm.db;
002:
003: import java.io.Serializable;
004: import java.util.Collection;
005: import java.util.Date;
006: import java.util.Iterator;
007: import java.util.List;
008:
009: import javax.transaction.Synchronization;
010:
011: import org.apache.commons.logging.Log;
012: import org.apache.commons.logging.LogFactory;
013: import org.hibernate.HibernateException;
014: import org.hibernate.LockMode;
015: import org.hibernate.Query;
016: import org.hibernate.Session;
017: import org.hibernate.Transaction;
018: import org.jbpm.JbpmException;
019: import org.jbpm.graph.def.Action;
020: import org.jbpm.graph.exe.ProcessInstance;
021: import org.jbpm.graph.exe.Token;
022: import org.jbpm.job.Job;
023: import org.jbpm.job.Timer;
024:
025: public class JobSession {
026:
027: private Session session;
028:
029: public JobSession(Session session) {
030: this .session = session;
031: }
032:
033: public Job getFirstAcquirableJob(String lockOwner) {
034: Job job = null;
035: try {
036: Query query = session
037: .getNamedQuery("JobSession.getFirstAcquirableJob");
038: query.setString("lockOwner", lockOwner);
039: query.setTimestamp("now", new Date());
040: query.setMaxResults(1);
041: job = (Job) query.uniqueResult();
042:
043: } catch (Exception e) {
044: log.error(e);
045: throw new JbpmException("couldn't get acquirable jobs", e);
046: }
047: return job;
048: }
049:
050: public List findExclusiveJobs(String lockOwner,
051: ProcessInstance processInstance) {
052: List jobs = null;
053: try {
054: Query query = session
055: .getNamedQuery("JobSession.findExclusiveJobs");
056: query.setString("lockOwner", lockOwner);
057: query.setTimestamp("now", new Date());
058: query.setParameter("processInstance", processInstance);
059: jobs = query.list();
060:
061: } catch (Exception e) {
062: log.error(e);
063: throw new JbpmException(
064: "couldn't find exclusive jobs for thread '"
065: + lockOwner + "' and process instance '"
066: + processInstance + "'", e);
067: }
068: return jobs;
069: }
070:
071: public Job getFirstDueJob(String lockOwner,
072: Collection jobIdsToIgnore) {
073: Job job = null;
074: try {
075: Query query = null;
076: if ((jobIdsToIgnore == null) || (jobIdsToIgnore.isEmpty())) {
077: query = session
078: .getNamedQuery("JobSession.getFirstDueJob");
079: query.setString("lockOwner", lockOwner);
080:
081: } else {
082: query = session
083: .getNamedQuery("JobSession.getFirstDueJobExlcMonitoredJobs");
084: query.setString("lockOwner", lockOwner);
085: query
086: .setParameterList("jobIdsToIgnore",
087: jobIdsToIgnore);
088:
089: }
090: query.setMaxResults(1);
091: job = (Job) query.uniqueResult();
092:
093: } catch (Exception e) {
094: log.error(e);
095: throw new JbpmException("couldn't get acquirable jobs", e);
096: }
097: return job;
098: }
099:
100: public void saveJob(Job job) {
101: session.saveOrUpdate(job);
102: if (job instanceof Timer) {
103: Timer timer = (Timer) job;
104: Action action = timer.getAction();
105: if ((action != null) && (!session.contains(action))) {
106: log.debug("cascading timer save to timer action");
107: session.save(action);
108: }
109: }
110: }
111:
112: public void reattachUnmodifiedJob(Job job) {
113: session.lock(job, LockMode.NONE);
114: }
115:
116: public void deleteJob(Job job) {
117: log.debug("deleting " + job);
118: session.delete(job);
119: }
120:
121: public Job loadJob(long jobId) {
122: try {
123: return (Job) session.load(Job.class, new Long(jobId));
124: } catch (Exception e) {
125: log.error(e);
126: throw new JbpmException(
127: "couldn't load job '" + jobId + "'", e);
128: }
129: }
130:
131: public Job getJob(long jobId) {
132: try {
133: return (Job) session.get(Job.class, new Long(jobId));
134: } catch (Exception e) {
135: log.error(e);
136: throw new JbpmException("couldn't get job '" + jobId + "'",
137: e);
138: }
139: }
140:
141: public void suspendJobs(Token token) {
142: try {
143: Query query = session
144: .getNamedQuery("JobSession.suspendJobs");
145: query.setParameter("token", token);
146: query.executeUpdate();
147:
148: } catch (Exception e) {
149: log.error(e);
150: throw new JbpmException("couldn't suspend jobs for "
151: + token, e);
152: }
153: }
154:
155: public void resumeJobs(Token token) {
156: try {
157: Query query = session
158: .getNamedQuery("JobSession.resumeJobs");
159: query.setParameter("token", token);
160: query.executeUpdate();
161:
162: } catch (Exception e) {
163: log.error(e);
164: throw new JbpmException(
165: "couldn't resume jobs for " + token, e);
166: }
167: }
168:
169: public void cancelTimersByName(String name, Token token) {
170: try {
171: // the bulk delete was replaced with a query and session.deletes on
172: // the retrieved elements to prevent stale object exceptions.
173: // With a bulk delete, the hibernate session is not aware and gives a problem
174: // if a later session.delete doesn't return 1.
175: Query query = session
176: .getNamedQuery("JobSession.getTimersByName");
177: query.setString("name", name);
178: query.setParameter("token", token);
179: List results = query.list();
180: if (results != null) {
181: Iterator iter = results.iterator();
182: while (iter.hasNext()) {
183: Timer timer = (Timer) iter.next();
184: log.debug("deleting timer " + timer + " by name "
185: + name);
186: session.delete(timer);
187: }
188: }
189:
190: } catch (Exception e) {
191: log.error(e);
192: throw new JbpmException("couldn't cancel timers '" + name
193: + "' for '" + token + "'", e);
194: }
195: }
196:
197: private class DeleteJobsSynchronization implements Synchronization,
198: Serializable {
199: private static final long serialVersionUID = 1L;
200: ProcessInstance processInstance;
201:
202: public DeleteJobsSynchronization(ProcessInstance processInstance) {
203: this .processInstance = processInstance;
204: }
205:
206: public void beforeCompletion() {
207: log.debug("deleting timers for process instance "
208: + processInstance);
209: Query query = session
210: .getNamedQuery("JobSession.deleteTimersForProcessInstance");
211: query.setParameter("processInstance", processInstance);
212: int result = query.executeUpdate();
213: log.debug(Integer.toString(result)
214: + " remaining timers for '" + processInstance
215: + "' are deleted");
216:
217: log
218: .debug("deleting execute-node-jobs for process instance "
219: + processInstance);
220: query = session
221: .getNamedQuery("JobSession.deleteExecuteNodeJobsForProcessInstance");
222: query.setParameter("processInstance", processInstance);
223: result = query.executeUpdate();
224: log.debug(Integer.toString(result)
225: + " remaining execute-node-jobs for '"
226: + processInstance + "' are deleted");
227: }
228:
229: public void afterCompletion(int arg0) {
230: }
231: }
232:
233: public void deleteJobsForProcessInstance(
234: ProcessInstance processInstance) {
235: try {
236: Transaction transaction = session.getTransaction();
237: transaction
238: .registerSynchronization(new DeleteJobsSynchronization(
239: processInstance));
240: } catch (Exception e) {
241: log.error(e);
242: throw new JbpmException("couldn't delete jobs for '"
243: + processInstance + "'", e);
244: }
245: }
246:
247: public List findJobsWithOverdueLockTime(Date treshold) {
248: Query query = session
249: .getNamedQuery("JobSession.findJobsWithOverdueLockTime");
250: query.setDate("now", treshold);
251: return query.list();
252: }
253:
254: private static Log log = LogFactory.getLog(JobSession.class);
255: }
|