/*
 * $Id: JobPoller.java,v 1.6 2004/01/24 22:02:16 ajzeneski Exp $
 *
 * Copyright (c) 2002 The Open For Business Project - www.ofbiz.org
 *
 * Permission is hereby granted, free of charge, to any person obtaining a
 * copy of this software and associated documentation files (the "Software"),
 * to deal in the Software without restriction, including without limitation
 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
 * and/or sell copies of the Software, and to permit persons to whom the
 * Software is furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included
 * in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
 * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
 * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT
 * OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
 * THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 *
 */
025: package org.ofbiz.service.job;
026:
027: import java.util.*;
028:
029: import org.ofbiz.service.config.ServiceConfigUtil;
030: import org.ofbiz.base.util.Debug;
031:
032: /**
033: * JobPoller - Polls for persisted jobs to run.
034: *
035: * @author <a href="mailto:jaz@ofbiz.org">Andy Zeneski</a>
036: * @author <a href="mailto:">Magnus Rosenquist</a>
037: * @version $Revision: 1.6 $
038: * @since 2.0
039: */
040: public class JobPoller implements Runnable {
041:
042: public static final String module = JobPoller.class.getName();
043:
044: public static final int MIN_THREADS = 1;
045: public static final int MAX_THREADS = 15;
046: public static final int MAX_JOBS = 3;
047: public static final int POLL_WAIT = 20000;
048: //public static final long MAX_TTL = 18000000;
049:
050: protected Thread thread = null;
051: protected LinkedList pool = null;
052: protected LinkedList run = null;
053: protected JobManager jm = null;
054:
055: protected volatile boolean isRunning = false;
056:
057: /**
058: * Creates a new JobScheduler
059: * @param jm JobManager associated with this scheduler
060: */
061: public JobPoller(JobManager jm) {
062: this .jm = jm;
063: this .run = new LinkedList();
064:
065: // create the thread pool
066: this .pool = createThreadPool();
067:
068: // re-load crashed jobs
069: this .jm.reloadCrashedJobs();
070:
071: // start the thread only if polling is enabled
072: if (pollEnabled()) {
073:
074: // create the poller thread
075: thread = new Thread(this , this .toString());
076: thread.setDaemon(false);
077:
078: // start the poller
079: this .isRunning = true;
080: thread.start();
081: }
082: }
083:
084: protected JobPoller() {
085: }
086:
087: public synchronized void run() {
088: if (Debug.infoOn())
089: Debug.logInfo("JobPoller: (" + thread.getName()
090: + ") Thread Running...", module);
091: try {
092: // wait 30 seconds before the first poll
093: wait(30000);
094: } catch (InterruptedException e) {
095: }
096: while (isRunning) {
097: try {
098: // grab a list of jobs to run.
099: Iterator poll = jm.poll();
100:
101: while (poll.hasNext()) {
102: Job job = (Job) poll.next();
103:
104: if (job.isValid())
105: queueNow(job);
106: }
107: wait(pollWaitTime());
108: } catch (InterruptedException e) {
109: Debug.logError(e, module);
110: stop();
111: }
112: }
113: if (Debug.infoOn())
114: Debug.logInfo("JobPoller: (" + thread.getName()
115: + ") Thread ending...", module);
116: }
117:
118: /**
119: * Returns the JobManager
120: */
121: public JobManager getManager() {
122: return jm;
123: }
124:
125: /**
126: * Stops the JobPoller
127: */
128: public void stop() {
129: isRunning = false;
130: destroyThreadPool();
131: }
132:
133: public List getPoolState() {
134: List stateList = new ArrayList();
135: Iterator i = this .pool.iterator();
136: while (i.hasNext()) {
137: JobInvoker invoker = (JobInvoker) i.next();
138: Map stateMap = new HashMap();
139: stateMap.put("threadName", invoker.getName());
140: stateMap.put("jobName", invoker.getJobName());
141: stateMap.put("serviceName", invoker.getServiceName());
142: stateMap.put("runTime", new Long(invoker
143: .getCurrentRuntime()));
144: stateMap.put("status", new Integer(invoker
145: .getCurrentStatus()));
146: stateList.add(stateMap);
147: }
148: return stateList;
149: }
150:
151: /**
152: * Stops all threads in the threadPool and clears
153: * the pool as final step.
154: */
155: private void destroyThreadPool() {
156: Debug.logInfo("Destroying thread pool...", module);
157: Iterator it = pool.iterator();
158: while (it.hasNext()) {
159: JobInvoker ji = (JobInvoker) it.next();
160: ji.stop();
161: }
162: pool.clear();
163: }
164:
165: public synchronized void killThread(String threadName) {
166: JobInvoker inv = findThread(threadName);
167: if (inv != null) {
168: inv.kill();
169: this .pool.remove(inv);
170: }
171: }
172:
173: private JobInvoker findThread(String threadName) {
174: Iterator i = this .pool.iterator();
175: while (i.hasNext()) {
176: JobInvoker inv = (JobInvoker) i.next();
177: if (threadName.equals(inv.getName())) {
178: return inv;
179: }
180: }
181: return null;
182: }
183:
184: /**
185: * Returns the next job to run
186: */
187: public synchronized Job next() {
188: if (run.size() > 0)
189: return (Job) run.removeFirst();
190: return null;
191: }
192:
193: /**
194: * Adds a job to the RUN queue
195: */
196: public synchronized void queueNow(Job job) {
197: run.add(job);
198: if (Debug.verboseOn())
199: Debug.logVerbose("New run queue size: " + run.size(),
200: module);
201: if (run.size() > pool.size() && pool.size() < maxThreads()) {
202: int calcSize = (run.size() / jobsPerThread())
203: - (pool.size());
204: int addSize = calcSize > maxThreads() ? maxThreads()
205: : calcSize;
206:
207: for (int i = 0; i < addSize; i++) {
208: JobInvoker iv = new JobInvoker(this , invokerWaitTime());
209: pool.add(iv);
210: }
211: }
212: }
213:
214: /**
215: * Removes a thread from the pool.
216: * @param invoker The invoker to remove.
217: */
218: public synchronized void removeThread(JobInvoker invoker) {
219: pool.remove(invoker);
220: invoker.stop();
221: if (pool.size() < minThreads()) {
222: for (int i = 0; i < minThreads() - pool.size(); i++) {
223: JobInvoker iv = new JobInvoker(this , invokerWaitTime());
224: pool.add(iv);
225: }
226: }
227: }
228:
229: // Creates the invoker pool
230: private LinkedList createThreadPool() {
231: LinkedList threadPool = new LinkedList();
232:
233: while (threadPool.size() < minThreads()) {
234: JobInvoker iv = new JobInvoker(this , invokerWaitTime());
235: threadPool.add(iv);
236: }
237:
238: return threadPool;
239: }
240:
241: private int maxThreads() {
242: int max = MAX_THREADS;
243:
244: try {
245: max = Integer.parseInt(ServiceConfigUtil.getElementAttr(
246: "thread-pool", "max-threads"));
247: } catch (NumberFormatException nfe) {
248: Debug.logError(
249: "Problems reading values from serviceengine.xml file ["
250: + nfe.toString() + "]. Using defaults.",
251: module);
252: }
253: return max;
254: }
255:
256: private int minThreads() {
257: int min = MIN_THREADS;
258:
259: try {
260: min = Integer.parseInt(ServiceConfigUtil.getElementAttr(
261: "thread-pool", "min-threads"));
262: } catch (NumberFormatException nfe) {
263: Debug.logError(
264: "Problems reading values from serviceengine.xml file ["
265: + nfe.toString() + "]. Using defaults.",
266: module);
267: }
268: return min;
269: }
270:
271: private int jobsPerThread() {
272: int jobs = MAX_JOBS;
273:
274: try {
275: jobs = Integer.parseInt(ServiceConfigUtil.getElementAttr(
276: "thread-pool", "jobs"));
277: } catch (NumberFormatException nfe) {
278: Debug.logError(
279: "Problems reading values from serviceengine.xml file ["
280: + nfe.toString() + "]. Using defaults.",
281: module);
282: }
283: return jobs;
284: }
285:
286: private int invokerWaitTime() {
287: int wait = JobInvoker.WAIT_TIME;
288:
289: try {
290: wait = Integer.parseInt(ServiceConfigUtil.getElementAttr(
291: "thread-pool", "wait-millis"));
292: } catch (NumberFormatException nfe) {
293: Debug.logError(
294: "Problems reading values from serviceengine.xml file ["
295: + nfe.toString() + "]. Using defaults.",
296: module);
297: }
298: return wait;
299: }
300:
301: private int pollWaitTime() {
302: int poll = POLL_WAIT;
303:
304: try {
305: poll = Integer.parseInt(ServiceConfigUtil.getElementAttr(
306: "thread-pool", "poll-db-millis"));
307: } catch (NumberFormatException nfe) {
308: Debug.logError(
309: "Problems reading values from serviceengine.xml file ["
310: + nfe.toString() + "]. Using defaults.",
311: module);
312: }
313: return poll;
314: }
315:
316: private boolean pollEnabled() {
317: String enabled = ServiceConfigUtil.getElementAttr(
318: "thread-pool", "poll-enabled");
319:
320: if (enabled.equalsIgnoreCase("false"))
321: return false;
322:
323: // also make sure we have a delegator to use for polling
324: if (jm.getDelegator() == null) {
325: Debug
326: .logWarning(
327: "No delegator referenced; not starting job poller.",
328: module);
329: return false;
330: }
331:
332: return true;
333: }
334: }
|