001: /*******************************************************************************
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *******************************************************************************/package org.ofbiz.service.job;
019:
020: import java.util.Date;
021:
022: import org.ofbiz.service.config.ServiceConfigUtil;
023: import org.ofbiz.base.util.Debug;
024: import org.ofbiz.base.util.UtilDateTime;
025: import org.ofbiz.entity.transaction.TransactionUtil;
026: import org.ofbiz.entity.transaction.GenericTransactionException;
027:
028: /**
029: * JobInvoker
030: */
031: public class JobInvoker implements Runnable {
032:
033: public static final String module = JobInvoker.class.getName();
034: public static final long THREAD_TTL = 18000000;
035: public static final int WAIT_TIME = 750;
036:
037: private JobPoller jp = null;
038: private Thread thread = null;
039: private Date created = null;
040: private String name = null;
041: private int count = 0;
042: private int wait = 0;
043:
044: private volatile boolean run = false;
045: private volatile Job currentJob = null;
046: private volatile int statusCode = 0;
047: private volatile long jobStart = 0;
048:
049: public JobInvoker(JobPoller jp) {
050: this (jp, WAIT_TIME);
051: }
052:
053: public JobInvoker(JobPoller jp, int wait) {
054: this .created = new Date();
055: this .run = true;
056: this .count = 0;
057: this .jp = jp;
058: this .wait = wait;
059:
060: // service dispatcher delegator name (for thread name)
061: String delegatorName = jp.getManager().getDelegator()
062: .getDelegatorName();
063:
064: // get a new thread
065: this .thread = new Thread(this );
066: this .name = delegatorName + "-invoker-" + this .thread.getName();
067:
068: this .thread.setDaemon(false);
069: this .thread.setName(this .name);
070:
071: if (Debug.verboseOn())
072: Debug.logVerbose("JobInoker: Starting Invoker Thread -- "
073: + thread.getName(), module);
074: this .thread.start();
075: }
076:
077: protected JobInvoker() {
078: }
079:
080: /**
081: * Tells the thread to stop after the next job.
082: */
083: public void stop() {
084: run = false;
085: }
086:
087: /**
088: * Wakes up this thread.
089: */
090: public void wakeUp() {
091: notifyAll();
092: }
093:
094: /**
095: * Gets the number of times this thread was used.
096: * @return The number of times used.
097: */
098: public int getUsage() {
099: return count;
100: }
101:
102: /**
103: * Gets the time when this thread was created.
104: * @return Time in milliseconds when this was created.
105: */
106: public long getTime() {
107: return created.getTime();
108: }
109:
110: /**
111: * Gets the name of this JobInvoker.
112: * @return Name of the invoker.
113: */
114: public String getName() {
115: return this .name;
116: }
117:
118: /**
119: * Gets the status code for this thread (0 = sleeping, 1 = running job)
120: * @return 0 for sleeping or 1 when running a job.
121: */
122: public int getCurrentStatus() {
123: return this .statusCode;
124: }
125:
126: /**
127: * Gets the total time the current job has been running or 0 when sleeping.
128: * @return Total time the curent job has been running.
129: */
130: public long getCurrentRuntime() {
131: if (this .jobStart > 0) {
132: long now = System.currentTimeMillis();
133: return now - this .jobStart;
134: } else {
135: return 0;
136: }
137: }
138:
139: /**
140: * Get the current running job's ID.
141: * @return String ID of the current running job.
142: */
143: public String getJobId() {
144: if (this .statusCode == 1) {
145: if (this .currentJob != null) {
146: return this .currentJob.getJobId();
147: } else {
148: return "WARNING: Invalid Job!";
149: }
150: } else {
151: return null;
152: }
153: }
154:
155: /**
156: * Get the current running job's name.
157: * @return String name of the current running job.
158: */
159: public String getJobName() {
160: if (this .statusCode == 1) {
161: if (this .currentJob != null) {
162: return this .currentJob.getJobName();
163: } else {
164: return "WARNING: Invalid Job!";
165: }
166: } else {
167: return null;
168: }
169: }
170:
171: /**
172: * Returns the name of the service being run.
173: * @return The name of the service being run.
174: */
175: public String getServiceName() {
176: String serviceName = null;
177: if (this .statusCode == 1) {
178: if (this .currentJob != null) {
179: if (this .currentJob instanceof GenericServiceJob) {
180: GenericServiceJob gsj = (GenericServiceJob) this .currentJob;
181: try {
182: serviceName = gsj.getServiceName();
183: } catch (InvalidJobException e) {
184: Debug.logError(e, module);
185: }
186: }
187: }
188: }
189: return serviceName;
190: }
191:
192: /**
193: * Kill this invoker thread.s
194: */
195: public void kill() {
196: this .stop();
197: this .statusCode = -1;
198: this .thread.interrupt();
199: this .thread = null;
200: }
201:
202: public synchronized void run() {
203: while (run) {
204: Job job = jp.next();
205:
206: if (job == null) {
207: try {
208: wait(wait);
209: } catch (InterruptedException ie) {
210: Debug.logError(ie,
211: "JobInvoker.run() : InterruptedException",
212: module);
213: stop();
214: }
215: } else {
216: Debug.log("Invoker: " + thread.getName()
217: + " received job -- " + job.getJobName()
218: + " from poller - " + jp.toString(), module);
219:
220: // setup the current job settings
221: this .currentJob = job;
222: this .statusCode = 1;
223: this .jobStart = System.currentTimeMillis();
224:
225: // execute the job
226: if (Debug.verboseOn())
227: Debug.logVerbose("Invoker: " + thread.getName()
228: + " executing job -- " + job.getJobName(),
229: module);
230: try {
231: job.exec();
232: } catch (InvalidJobException e) {
233: Debug.logWarning(e.getMessage(), module);
234: }
235: if (Debug.verboseOn())
236: Debug.logVerbose("Invoker: " + thread.getName()
237: + " finished executing job -- "
238: + job.getJobName(), module);
239:
240: // clear the current job settings
241: this .currentJob = null;
242: this .statusCode = 0;
243: this .jobStart = 0;
244:
245: // sanity check; make sure we don't have any transactions in place
246: try {
247: // roll back current TX first
248: if (TransactionUtil.isTransactionInPlace()) {
249: Debug
250: .logWarning(
251: "*** NOTICE: JobInvoker finished w/ a transaction in place! Rolling back.",
252: module);
253: TransactionUtil.rollback();
254: }
255:
256: // now resume/rollback any suspended txs
257: if (TransactionUtil.suspendedTransactionsHeld()) {
258: int suspended = TransactionUtil
259: .cleanSuspendedTransactions();
260: Debug
261: .logWarning(
262: "Resumed/Rolled Back ["
263: + suspended
264: + "] transactions.",
265: module);
266: }
267: } catch (GenericTransactionException e) {
268: Debug.logWarning(e, module);
269: }
270:
271: // increment the count
272: count++;
273: if (Debug.verboseOn())
274: Debug.logVerbose("Invoker: " + thread.getName()
275: + " (" + count + ") total.", module);
276: }
277: long diff = (new Date().getTime() - this .getTime());
278:
279: if (getTTL() > 0 && diff > getTTL())
280: jp.removeThread(this );
281: }
282: if (Debug.verboseOn())
283: Debug
284: .logVerbose(
285: "Invoker: " + thread.getName()
286: + " dead -- "
287: + UtilDateTime.nowTimestamp(),
288: module);
289: }
290:
291: private long getTTL() {
292: long ttl = THREAD_TTL;
293:
294: try {
295: ttl = Long.parseLong(ServiceConfigUtil.getElementAttr(
296: "thread-pool", "ttl"));
297: } catch (NumberFormatException nfe) {
298: Debug.logError(
299: "Problems reading values from serviceengine.xml file ["
300: + nfe.toString() + "]. Using defaults.",
301: module);
302: }
303: return ttl;
304: }
305: }
|