001: package org.jbpm.job.executor;
002:
003: import java.io.PrintWriter;
004: import java.io.StringWriter;
005: import java.util.ArrayList;
006: import java.util.Collection;
007: import java.util.Date;
008: import java.util.Iterator;
009: import java.util.List;
010:
011: import org.apache.commons.logging.Log;
012: import org.apache.commons.logging.LogFactory;
013: import org.hibernate.Hibernate;
014: import org.hibernate.StaleStateException;
015: import org.jbpm.JbpmConfiguration;
016: import org.jbpm.JbpmContext;
017: import org.jbpm.db.JobSession;
018: import org.jbpm.job.Job;
019: import org.jbpm.job.Timer;
020: import org.jbpm.persistence.JbpmPersistenceException;
021: import org.jbpm.persistence.db.StaleObjectLogConfigurer;
022:
023: public class JobExecutorThread extends Thread {
024:
025: public JobExecutorThread(String name, JobExecutor jobExecutor,
026: JbpmConfiguration jbpmConfiguration, int idleInterval,
027: int maxIdleInterval, long maxLockTime, int maxHistory) {
028: super (name);
029: this .jobExecutor = jobExecutor;
030: this .jbpmConfiguration = jbpmConfiguration;
031: this .idleInterval = idleInterval;
032: this .maxIdleInterval = maxIdleInterval;
033: this .maxLockTime = maxLockTime;
034: this .maxHistory = maxHistory;
035: }
036:
037: JobExecutor jobExecutor;
038: JbpmConfiguration jbpmConfiguration;
039: int idleInterval;
040: int maxIdleInterval;
041: long maxLockTime;
042: int maxHistory;
043:
044: Collection history = new ArrayList();
045: int currentIdleInterval;
046: boolean isActive = true;
047:
048: public void run() {
049: try {
050: currentIdleInterval = idleInterval;
051: while (isActive) {
052: try {
053: Collection acquiredJobs = acquireJobs();
054:
055: if (!acquiredJobs.isEmpty()) {
056: Iterator iter = acquiredJobs.iterator();
057: while (iter.hasNext() && isActive) {
058: Job job = (Job) iter.next();
059: executeJob(job);
060: }
061:
062: } else { // no jobs acquired
063: if (isActive) {
064: long waitPeriod = getWaitPeriod();
065: if (waitPeriod > 0) {
066: synchronized (jobExecutor) {
067: jobExecutor.wait(waitPeriod);
068: }
069: }
070: }
071: }
072:
073: // no exception so resetting the currentIdleInterval
074: currentIdleInterval = idleInterval;
075:
076: } catch (InterruptedException e) {
077: log.info((isActive ? "active" : "inactivated")
078: + " job executor thread '" + getName()
079: + "' got interrupted");
080: } catch (Exception e) {
081: log.error(
082: "exception in job executor thread. waiting "
083: + currentIdleInterval
084: + " milliseconds", e);
085: try {
086: synchronized (jobExecutor) {
087: jobExecutor.wait(currentIdleInterval);
088: }
089: } catch (InterruptedException e2) {
090: log
091: .debug(
092: "delay after exception got interrupted",
093: e2);
094: }
095: // after an exception, the current idle interval is doubled to prevent
096: // continuous exception generation when e.g. the db is unreachable
097: currentIdleInterval = currentIdleInterval * 2;
098: }
099: }
100: } catch (Throwable t) {
101: t.printStackTrace();
102: } finally {
103: log.info(getName() + " leaves cyberspace");
104: }
105: }
106:
107: protected Collection acquireJobs() {
108: Collection acquiredJobs = null;
109: synchronized (jobExecutor) {
110: Collection jobsToLock = new ArrayList();
111: log.debug("acquiring jobs for execution...");
112: JbpmContext jbpmContext = jbpmConfiguration
113: .createJbpmContext();
114: try {
115: try {
116: JobSession jobSession = jbpmContext.getJobSession();
117: log.debug("querying for acquirable job...");
118: Job job = jobSession
119: .getFirstAcquirableJob(getName());
120: if (job != null) {
121: if (job.isExclusive()) {
122: log
123: .debug("exclusive acquirable job found ("
124: + job
125: + "). querying for other exclusive jobs to lock them all in one tx...");
126: List otherExclusiveJobs = jobSession
127: .findExclusiveJobs(getName(), job
128: .getProcessInstance());
129: jobsToLock.addAll(otherExclusiveJobs);
130: log
131: .debug("trying to obtain a process-instance exclusive locks for '"
132: + otherExclusiveJobs + "'");
133: } else {
134: log.debug("trying to obtain a lock for '"
135: + job + "'");
136: jobsToLock.add(job);
137: }
138:
139: Iterator iter = jobsToLock.iterator();
140: while (iter.hasNext()) {
141: job = (Job) iter.next();
142: job.setLockOwner(getName());
143: job.setLockTime(new Date());
144: // jbpmContext.getSession().update(job);
145: }
146:
147: // HACKY HACK : this is a workaround for a hibernate problem that is fixed in hibernate 3.2.1
148: if (job instanceof Timer) {
149: Hibernate.initialize(((Timer) job)
150: .getGraphElement());
151: }
152:
153: } else {
154: log.debug("no acquirable jobs in job table");
155: }
156:
157: } finally {
158: jbpmContext.close();
159: }
160: acquiredJobs = jobsToLock;
161: log.debug("obtained locks on following jobs: "
162: + acquiredJobs);
163:
164: } catch (StaleStateException e) {
165: log.debug("couldn't acquire lock on job(s): "
166: + jobsToLock);
167: }
168: }
169: return acquiredJobs;
170: }
171:
172: protected void executeJob(Job job) {
173: JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
174: try {
175: JobSession jobSession = jbpmContext.getJobSession();
176: job = jobSession.loadJob(job.getId());
177:
178: try {
179: log.debug("executing job " + job);
180: if (job.execute(jbpmContext)) {
181: jobSession.deleteJob(job);
182: }
183:
184: } catch (Exception e) {
185: log.debug("exception while executing '" + job + "'", e);
186: StringWriter sw = new StringWriter();
187: e.printStackTrace(new PrintWriter(sw));
188: job.setException(sw.toString());
189: job.setRetries(job.getRetries() - 1);
190: }
191:
192: // if this job is locked too long
193: long totalLockTimeInMillis = System.currentTimeMillis()
194: - job.getLockTime().getTime();
195: if (totalLockTimeInMillis > maxLockTime) {
196: jbpmContext.setRollbackOnly();
197: }
198:
199: } finally {
200: try {
201: jbpmContext.close();
202: } catch (JbpmPersistenceException e) {
203: // if this is a stale object exception, the jbpm configuration has control over the logging
204: if ("org.hibernate.StaleObjectStateException".equals(e
205: .getCause().getClass().getName())) {
206: log
207: .info("problem committing job execution transaction: optimistic locking failed");
208: StaleObjectLogConfigurer.staleObjectExceptionsLog
209: .error(
210: "problem committing job execution transaction: optimistic locking failed",
211: e);
212: } else {
213: log
214: .error(
215: "problem committing job execution transaction",
216: e);
217: }
218: } catch (RuntimeException e) {
219: log.error(
220: "problem committing job execution transaction",
221: e);
222:
223: throw e;
224: }
225: }
226: }
227:
228: protected Date getNextDueDate() {
229: Date nextDueDate = null;
230: JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
231: try {
232: JobSession jobSession = jbpmContext.getJobSession();
233: Collection jobIdsToIgnore = jobExecutor
234: .getMonitoredJobIds();
235: Job job = jobSession.getFirstDueJob(getName(),
236: jobIdsToIgnore);
237: if (job != null) {
238: nextDueDate = job.getDueDate();
239: jobExecutor.addMonitoredJobId(getName(), job.getId());
240: }
241: } finally {
242: jbpmContext.close();
243: }
244: return nextDueDate;
245: }
246:
247: protected long getWaitPeriod() {
248: long interval = currentIdleInterval;
249: Date nextDueDate = getNextDueDate();
250: if (nextDueDate != null) {
251: long currentTimeMillis = System.currentTimeMillis();
252: long nextDueDateTime = nextDueDate.getTime();
253: if (nextDueDateTime < currentTimeMillis
254: + currentIdleInterval) {
255: interval = nextDueDateTime - currentTimeMillis;
256: }
257: }
258: if (interval < 0) {
259: interval = 0;
260: }
261: return interval;
262: }
263:
264: public void setActive(boolean isActive) {
265: this .isActive = isActive;
266: }
267:
268: private static Log log = LogFactory.getLog(JobExecutorThread.class);
269: }
|