001: package org.jbpm.job.executor;
002:
003: import java.io.Serializable;
004: import java.net.InetAddress;
005: import java.util.ArrayList;
006: import java.util.Collections;
007: import java.util.HashMap;
008: import java.util.HashSet;
009: import java.util.Iterator;
010: import java.util.List;
011: import java.util.Map;
012: import java.util.Set;
013:
014: import org.apache.commons.logging.Log;
015: import org.apache.commons.logging.LogFactory;
016: import org.jbpm.JbpmConfiguration;
017:
018: public class JobExecutor implements Serializable {
019:
020: private static final long serialVersionUID = 1L;
021:
022: JbpmConfiguration jbpmConfiguration;
023: String name;
024: int nbrOfThreads;
025: int idleInterval;
026: int maxIdleInterval;
027: int historyMaxSize;
028:
029: int maxLockTime;
030: int lockMonitorInterval;
031: int lockBufferTime;
032:
033: Map threads = new HashMap();
034: LockMonitorThread lockMonitorThread;
035: Map monitoredJobIds = Collections.synchronizedMap(new HashMap());
036:
037: boolean isStarted = false;
038:
039: public synchronized void start() {
040: if (!isStarted) {
041: log.debug("starting thread group '" + name + "'...");
042: for (int i = 0; i < nbrOfThreads; i++) {
043: startThread();
044: }
045: isStarted = true;
046: } else {
047: log.debug("ignoring start: thread group '" + name
048: + "' is already started'");
049: }
050:
051: lockMonitorThread = new LockMonitorThread(jbpmConfiguration,
052: lockMonitorInterval, maxLockTime, lockBufferTime);
053: }
054:
055: /**
056: * signals to all threads in this job executor to stop. It may be that
057: * threads are in the middle of something and they will finish that firts.
058: * Use {@link #stopAndJoin()} in case you want a method that blocks until
059: * all the threads are actually finished.
060: * @return a list of all the stopped threads. In case no threads were stopped
061: * an empty list will be returned.
062: */
063: public synchronized List stop() {
064: List stoppedThreads = new ArrayList(threads.size());
065: if (isStarted) {
066: log.debug("stopping thread group '" + name + "'...");
067: for (int i = 0; i < nbrOfThreads; i++) {
068: stoppedThreads.add(stopThread());
069: }
070: isStarted = false;
071: } else {
072: log.debug("ignoring stop: thread group '" + name
073: + "' not started");
074: }
075: return stoppedThreads;
076: }
077:
078: public void stopAndJoin() throws InterruptedException {
079: Iterator iter = stop().iterator();
080: while (iter.hasNext()) {
081: Thread thread = (Thread) iter.next();
082: thread.join();
083: }
084: }
085:
086: protected synchronized void startThread() {
087: String threadName = getNextThreadName();
088: Thread thread = new JobExecutorThread(threadName, this ,
089: jbpmConfiguration, idleInterval, maxIdleInterval,
090: maxLockTime, historyMaxSize);
091: threads.put(threadName, thread);
092: log.debug("starting new job executor thread '" + threadName
093: + "'");
094: thread.start();
095: }
096:
097: protected String getNextThreadName() {
098: return getThreadName(threads.size() + 1);
099: }
100:
101: protected String getLastThreadName() {
102: return getThreadName(threads.size());
103: }
104:
105: private String getThreadName(int index) {
106: return name + ":" + getHostName() + ":" + index;
107: }
108:
109: private String getHostName() {
110: try {
111: return InetAddress.getLocalHost().getHostAddress();
112: } catch (Exception e) {
113: return "unknown";
114: }
115: }
116:
117: protected synchronized Thread stopThread() {
118: String threadName = getLastThreadName();
119: JobExecutorThread thread = (JobExecutorThread) threads
120: .remove(threadName);
121: log.debug("removing job executor thread '" + threadName + "'");
122: thread.setActive(false);
123: thread.interrupt();
124: return thread;
125: }
126:
127: public Set getMonitoredJobIds() {
128: return new HashSet(monitoredJobIds.values());
129: }
130:
131: public void addMonitoredJobId(String threadName, long jobId) {
132: monitoredJobIds.put(threadName, new Long(jobId));
133: }
134:
135: public void removeMonitoredJobId(String threadName) {
136: monitoredJobIds.remove(threadName);
137: }
138:
139: public int getHistoryMaxSize() {
140: return historyMaxSize;
141: }
142:
143: public int getIdleInterval() {
144: return idleInterval;
145: }
146:
147: public boolean isStarted() {
148: return isStarted;
149: }
150:
151: public JbpmConfiguration getJbpmConfiguration() {
152: return jbpmConfiguration;
153: }
154:
155: public int getMaxIdleInterval() {
156: return maxIdleInterval;
157: }
158:
159: public String getName() {
160: return name;
161: }
162:
163: public int getSize() {
164: return nbrOfThreads;
165: }
166:
167: public Map getThreads() {
168: return threads;
169: }
170:
171: public int getMaxLockTime() {
172: return maxLockTime;
173: }
174:
175: public int getLockBufferTime() {
176: return lockBufferTime;
177: }
178:
179: public int getLockMonitorInterval() {
180: return lockMonitorInterval;
181: }
182:
183: public int getNbrOfThreads() {
184: return nbrOfThreads;
185: }
186:
187: private static Log log = LogFactory.getLog(JobExecutor.class);
188: }
|