001: package org.jbpm.job.executor;
002:
003: import java.util.Date;
004: import java.util.Iterator;
005: import java.util.List;
006:
007: import org.apache.commons.logging.Log;
008: import org.apache.commons.logging.LogFactory;
009: import org.jbpm.JbpmConfiguration;
010: import org.jbpm.JbpmContext;
011: import org.jbpm.db.JobSession;
012: import org.jbpm.job.Job;
013:
014: public class LockMonitorThread extends Thread {
015:
016: JbpmConfiguration jbpmConfiguration;
017: int lockMonitorInterval;
018: int maxLockTime;
019: int lockBufferTime;
020:
021: boolean isActive = true;
022:
023: public LockMonitorThread(JbpmConfiguration jbpmConfiguration,
024: int lockMonitorInterval, int maxLockTime, int lockBufferTime) {
025: this .jbpmConfiguration = jbpmConfiguration;
026: this .lockMonitorInterval = lockMonitorInterval;
027: this .maxLockTime = maxLockTime;
028: this .lockBufferTime = lockBufferTime;
029: }
030:
031: public void run() {
032: try {
033: while (isActive) {
034: try {
035: unlockOverdueJobs();
036: if ((isActive) && (lockMonitorInterval > 0)) {
037: sleep(lockMonitorInterval);
038: }
039: } catch (InterruptedException e) {
040: log.info("lock monitor thread '" + getName()
041: + "' got interrupted");
042: } catch (Exception e) {
043: log.error(
044: "exception in lock monitor thread. waiting "
045: + lockMonitorInterval
046: + " milliseconds", e);
047: try {
048: sleep(lockMonitorInterval);
049: } catch (InterruptedException e2) {
050: log
051: .debug(
052: "delay after exception got interrupted",
053: e2);
054: }
055: }
056: }
057: } catch (Throwable t) {
058: t.printStackTrace();
059: } finally {
060: log.info(getName() + " leaves cyberspace");
061: }
062: }
063:
064: protected void unlockOverdueJobs() {
065: JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
066: try {
067: JobSession jobSession = jbpmContext.getJobSession();
068:
069: Date treshold = new Date(System.currentTimeMillis()
070: - maxLockTime - lockBufferTime);
071: List jobsWithOverdueLockTime = jobSession
072: .findJobsWithOverdueLockTime(treshold);
073: Iterator iter = jobsWithOverdueLockTime.iterator();
074: while (iter.hasNext()) {
075: Job job = (Job) iter.next();
076: // unlock
077: log.debug("unlocking " + job + " owned by thread "
078: + job.getLockOwner());
079: job.setLockOwner(null);
080: job.setLockTime(null);
081: jobSession.saveJob(job);
082: }
083:
084: } finally {
085: try {
086: jbpmContext.close();
087: } catch (RuntimeException e) {
088: log.error(
089: "problem committing job execution transaction",
090: e);
091: throw e;
092: }
093: }
094: }
095:
096: public void setActive(boolean isActive) {
097: this .isActive = isActive;
098: }
099:
100: private static Log log = LogFactory.getLog(LockMonitorThread.class);
101: }
|