001: package org.jbpm.command;
002:
003: import java.io.PrintWriter;
004: import java.io.StringWriter;
005: import java.util.ArrayList;
006: import java.util.Collection;
007: import java.util.Date;
008: import java.util.Iterator;
009: import java.util.List;
010:
011: import org.apache.commons.logging.Log;
012: import org.apache.commons.logging.LogFactory;
013: import org.hibernate.Hibernate;
014: import org.hibernate.StaleStateException;
015: import org.jbpm.JbpmContext;
016: import org.jbpm.JbpmException;
017: import org.jbpm.db.JobSession;
018: import org.jbpm.job.Job;
019: import org.jbpm.job.Timer;
020:
021: /**
022: * Execute all overdue Jobs (may be enhanced with more attributes in future if
023: * needed)
024: *
025: * @author ruecker
026: */
027: public class ExecuteJobsCommand implements Command {
028:
029: private static final long serialVersionUID = -2457066688404533959L;
030:
031: private static final Log log = LogFactory
032: .getLog(ExecuteJobsCommand.class);
033:
034: private static final int maxLockTime = 60000;
035:
036: private transient JbpmContext jbpmContext;
037:
038: public Object execute(JbpmContext jbpmContext) throws Exception {
039: this .jbpmContext = jbpmContext;
040: try {
041: Collection acquiredJobs = acquireJobs();
042:
043: if (!acquiredJobs.isEmpty()) {
044: Iterator iter = acquiredJobs.iterator();
045: while (iter.hasNext()) {
046: Job job = (Job) iter.next();
047: executeJob(job);
048: }
049: }
050:
051: // Job job = jbpmContext.getJobSession().getFirstAcquirableJob("");
052: // if (job != null) {
053: // log.info("execution job: " + job);
054: // job.execute(jbpmContext);
055: // }
056: } catch (JbpmException ex) {
057: log.warn("exception while executing job", ex);
058: }
059: this .jbpmContext = null;
060: return null;
061: }
062:
063: private String getName() {
064: return this .toString();
065: }
066:
067: protected Collection acquireJobs() {
068: Collection acquiredJobs = null;
069: Collection jobsToLock = new ArrayList();
070: log.debug("acquiring jobs for execution...");
071:
072: try {
073: JobSession jobSession = jbpmContext.getJobSession();
074: log.debug("querying for acquirable job...");
075: Job job = jobSession.getFirstAcquirableJob(getName());
076: if (job != null) {
077: if (job.isExclusive()) {
078: log
079: .debug("exclusive acquirable job found ("
080: + job
081: + "). querying for other exclusive jobs to lock them all in one tx...");
082: List otherExclusiveJobs = jobSession
083: .findExclusiveJobs(getName(), job
084: .getProcessInstance());
085: jobsToLock.addAll(otherExclusiveJobs);
086: log
087: .debug("trying to obtain a process-instance exclusive locks for '"
088: + otherExclusiveJobs + "'");
089: } else {
090: log.debug("trying to obtain a lock for '" + job
091: + "'");
092: jobsToLock.add(job);
093: }
094:
095: Iterator iter = jobsToLock.iterator();
096: while (iter.hasNext()) {
097: job = (Job) iter.next();
098: job.setLockOwner(getName());
099: job.setLockTime(new Date());
100: // jbpmContext.getSession().update(job);
101: }
102:
103: // HACKY HACK : this is a workaround for a hibernate problem that is fixed in hibernate 3.2.1
104: if (job instanceof Timer) {
105: Hibernate.initialize(((Timer) job)
106: .getGraphElement());
107: }
108:
109: } else {
110: log.debug("no acquirable jobs in job table");
111: }
112:
113: acquiredJobs = jobsToLock;
114: log.debug("obtained locks on following jobs: "
115: + acquiredJobs);
116:
117: } catch (StaleStateException e) {
118: log.debug("couldn't acquire lock on job(s): " + jobsToLock);
119: }
120: return acquiredJobs;
121: }
122:
123: protected void executeJob(Job job) {
124: JobSession jobSession = jbpmContext.getJobSession();
125: job = jobSession.loadJob(job.getId());
126:
127: try {
128: log.debug("executing job " + job);
129: if (job.execute(jbpmContext)) {
130: jobSession.deleteJob(job);
131: }
132:
133: } catch (Exception e) {
134: log.debug("exception while executing '" + job + "'", e);
135: StringWriter sw = new StringWriter();
136: e.printStackTrace(new PrintWriter(sw));
137: job.setException(sw.toString());
138: job.setRetries(job.getRetries() - 1);
139: }
140:
141: // if this job is locked too long
142: long totalLockTimeInMillis = System.currentTimeMillis()
143: - job.getLockTime().getTime();
144: if (totalLockTimeInMillis > maxLockTime) {
145: jbpmContext.setRollbackOnly();
146: }
147:
148: }
149:
150: protected Date getNextDueDate() {
151: Date nextDueDate = null;
152: JobSession jobSession = jbpmContext.getJobSession();
153: Job job = jobSession.getFirstDueJob(getName(), new ArrayList());
154: if (job != null) {
155: nextDueDate = job.getDueDate();
156: }
157: return nextDueDate;
158: }
159: }
|