001: /*
002: * $Id: JobManager.java,v 1.12 2004/01/24 19:37:53 ajzeneski Exp $
003: *
004: * Copyright (c) 2001, 2002 The Open For Business Project - www.ofbiz.org
005: *
006: * Permission is hereby granted, free of charge, to any person obtaining a
007: * copy of this software and associated documentation files (the "Software"),
008: * to deal in the Software without restriction, including without limitation
009: * the rights to use, copy, modify, merge, publish, distribute, sublicense,
010: * and/or sell copies of the Software, and to permit persons to whom the
011: * Software is furnished to do so, subject to the following conditions:
012: *
013: * The above copyright notice and this permission notice shall be included
014: * in all copies or substantial portions of the Software.
015: *
016: * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
017: * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
018: * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
019: * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
020: * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT
021: * OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
022: * THE USE OR OTHER DEALINGS IN THE SOFTWARE.
023: *
024: */
025: package org.ofbiz.service.job;
026:
027: import java.io.IOException;
028: import java.util.ArrayList;
029: import java.util.Collection;
030: import java.util.Date;
031: import java.util.Iterator;
032: import java.util.List;
033: import java.util.Map;
034: import java.sql.Timestamp;
035:
036: import org.ofbiz.base.util.Debug;
037: import org.ofbiz.base.util.UtilDateTime;
038: import org.ofbiz.base.util.UtilMisc;
039: import org.ofbiz.base.util.UtilProperties;
040: import org.ofbiz.entity.GenericDelegator;
041: import org.ofbiz.entity.GenericEntityException;
042: import org.ofbiz.entity.GenericValue;
043: import org.ofbiz.entity.condition.EntityCondition;
044: import org.ofbiz.entity.condition.EntityConditionList;
045: import org.ofbiz.entity.condition.EntityExpr;
046: import org.ofbiz.entity.condition.EntityOperator;
047: import org.ofbiz.entity.serialize.SerializeException;
048: import org.ofbiz.entity.serialize.XmlSerializer;
049: import org.ofbiz.entity.transaction.GenericTransactionException;
050: import org.ofbiz.entity.transaction.TransactionUtil;
051: import org.ofbiz.service.DispatchContext;
052: import org.ofbiz.service.GenericDispatcher;
053: import org.ofbiz.service.GenericServiceException;
054: import org.ofbiz.service.LocalDispatcher;
055: import org.ofbiz.service.calendar.RecurrenceInfo;
056: import org.ofbiz.service.calendar.RecurrenceInfoException;
057: import org.ofbiz.service.config.ServiceConfigUtil;
058:
059: /**
060: * JobManager
061: *
062: * @author <a href="mailto:jaz@ofbiz.org">Andy Zeneski</a>
063: * @version $Revision: 1.12 $
064: * @since 2.0
065: */
066: public class JobManager {
067:
068: public static final String module = JobManager.class.getName();
069: public static final String dispatcherName = "JobDispatcher";
070:
071: protected GenericDelegator delegator;
072: protected JobPoller jp;
073:
074: /** Creates a new JobManager object. */
075: public JobManager(GenericDelegator delegator) {
076: this .delegator = delegator;
077: jp = new JobPoller(this );
078: }
079:
080: /** Queues a Job to run now. */
081: public void runJob(Job job) throws JobManagerException {
082: if (job.isValid())
083: jp.queueNow(job);
084: }
085:
086: /** Returns the ServiceDispatcher. */
087: public LocalDispatcher getDispatcher() {
088: LocalDispatcher this Dispatcher = null;
089: try {
090: this Dispatcher = GenericDispatcher.getLocalDispatcher(
091: dispatcherName, delegator);
092: } catch (GenericServiceException e) {
093: Debug.logError(e, module);
094: }
095: return this Dispatcher;
096: }
097:
098: /** Returns the GenericDelegator. */
099: public GenericDelegator getDelegator() {
100: return this .delegator;
101: }
102:
103: public synchronized Iterator poll() {
104: List poll = new ArrayList();
105: Collection jobEnt = null;
106:
107: // sort the results by time
108: List order = UtilMisc.toList("runTime");
109:
110: // basic query
111: List expressions = UtilMisc.toList(new EntityExpr("runTime",
112: EntityOperator.LESS_THAN_EQUAL_TO, UtilDateTime
113: .nowTimestamp()), new EntityExpr(
114: "startDateTime", EntityOperator.EQUALS, null),
115: new EntityExpr("cancelDateTime", EntityOperator.EQUALS,
116: null));
117:
118: // limit to just defined pools
119: List pools = ServiceConfigUtil.getRunPools();
120: List poolsExpr = UtilMisc.toList(new EntityExpr("poolId",
121: EntityOperator.EQUALS, null));
122: if (pools != null) {
123: Iterator poolsIter = pools.iterator();
124: while (poolsIter.hasNext()) {
125: String poolName = (String) poolsIter.next();
126: poolsExpr.add(new EntityExpr("poolId",
127: EntityOperator.EQUALS, poolName));
128: }
129: }
130:
131: // make the conditions
132: EntityCondition baseCondition = new EntityConditionList(
133: expressions, EntityOperator.AND);
134: EntityCondition poolCondition = new EntityConditionList(
135: poolsExpr, EntityOperator.OR);
136: EntityCondition mainCondition = new EntityConditionList(
137: UtilMisc.toList(baseCondition, poolCondition),
138: EntityOperator.AND);
139:
140: // we will loop until we have no more to do
141: boolean pollDone = false;
142:
143: while (!pollDone) {
144: boolean beganTransaction;
145: try {
146: beganTransaction = TransactionUtil.begin();
147: } catch (GenericTransactionException e) {
148: Debug
149: .logError(
150: e,
151: "Unable to start transaction; not polling for jobs",
152: module);
153: return null;
154: }
155: if (!beganTransaction) {
156: Debug
157: .logError(
158: "Unable to poll for jobs; transaction was not started by this process",
159: module);
160: return null;
161: }
162:
163: try {
164: jobEnt = delegator.findByCondition("JobSandbox",
165: mainCondition, null, order);
166: } catch (GenericEntityException ee) {
167: Debug.logError(ee, "Cannot load jobs from datasource.",
168: module);
169: } catch (Exception e) {
170: Debug.logError(e, "Unknown error.", module);
171: }
172:
173: if (jobEnt != null && jobEnt.size() > 0) {
174: Iterator i = jobEnt.iterator();
175:
176: while (i.hasNext()) {
177: GenericValue v = (GenericValue) i.next();
178: DispatchContext dctx = getDispatcher()
179: .getDispatchContext();
180:
181: if (dctx == null) {
182: Debug
183: .logError(
184: "Unable to locate DispatchContext object; not running job!",
185: module);
186: continue;
187: }
188: Job job = new PersistedServiceJob(dctx, v, null); // todo fix the requester
189: poll.add(job);
190: }
191: } else {
192: pollDone = true;
193: }
194:
195: // finished this run; commit the transaction
196: try {
197: TransactionUtil.commit(beganTransaction);
198: } catch (GenericTransactionException e) {
199: Debug.logError(e, module);
200: }
201:
202: }
203: return poll.iterator();
204: }
205:
206: public synchronized void reloadCrashedJobs() {
207: String instanceId = UtilProperties.getPropertyValue(
208: "general.properties", "unique.instanceId", "ofbiz0");
209: List toStore = new ArrayList();
210: List crashed = null;
211:
212: List exprs = UtilMisc.toList(new EntityExpr("startDateTime",
213: EntityOperator.NOT_EQUAL, null));
214: exprs.add(new EntityExpr("finishDateTime",
215: EntityOperator.EQUALS, null));
216: exprs.add(new EntityExpr("cancelDateTime",
217: EntityOperator.EQUALS, null));
218: exprs.add(new EntityExpr("runByInstanceId",
219: EntityOperator.EQUALS, instanceId));
220: try {
221: crashed = delegator.findByAnd("JobSandbox", exprs, UtilMisc
222: .toList("startDateTime"));
223: } catch (GenericEntityException e) {
224: Debug.logError(e, "Unable to load crashed jobs", module);
225: }
226:
227: if (crashed != null && crashed.size() > 0) {
228: Iterator i = crashed.iterator();
229: while (i.hasNext()) {
230: GenericValue job = (GenericValue) i.next();
231: long runtime = job.getTimestamp("runTime").getTime();
232: RecurrenceInfo ri = JobManager.getRecurrenceInfo(job);
233: if (ri != null) {
234: long next = ri.next();
235: if (next <= runtime) {
236: Timestamp now = UtilDateTime.nowTimestamp();
237: // only re-schedule if there is no new recurrences since last run
238: Debug.log("Scheduling Job : " + job, module);
239: GenericValue newJob = new GenericValue(job);
240: newJob.set("runTime", now);
241: newJob.set("startDateTime", null);
242: toStore.add(newJob);
243:
244: // set the cancel time on the old job to the same as the re-schedule time
245: job.set("cancelDateTime", now);
246: toStore.add(job);
247: }
248: }
249: }
250:
251: if (toStore.size() > 0) {
252: try {
253: delegator.storeAll(toStore);
254: } catch (GenericEntityException e) {
255: Debug.logError(e, module);
256: }
257: if (Debug.infoOn())
258: Debug.logInfo("-- " + toStore.size()
259: + " jobs re-scheduled", module);
260: }
261:
262: } else {
263: if (Debug.infoOn())
264: Debug.logInfo("No crashed jobs to re-schedule", module);
265: }
266: }
267:
268: /**
269: * Schedule a job to start at a specific time with specific recurrence info
270: *@param serviceName The name of the service to invoke
271: *@param context The context for the service
272: *@param startTime The time in milliseconds the service should run
273: *@param frequency The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc)
274: *@param interval The interval of the frequency recurrence
275: *@param count The number of times to repeat
276: */
277: public void schedule(String serviceName, Map context,
278: long startTime, int frequency, int interval, int count)
279: throws JobManagerException {
280: schedule(serviceName, context, startTime, frequency, interval,
281: count, 0);
282: }
283:
284: /**
285: * Schedule a job to start at a specific time with specific recurrence info
286: *@param serviceName The name of the service to invoke
287: *@param context The context for the service
288: *@param startTime The time in milliseconds the service should run
289: *@param frequency The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc)
290: *@param interval The interval of the frequency recurrence
291: *@param endTime The time in milliseconds the service should expire
292: */
293: public void schedule(String serviceName, Map context,
294: long startTime, int frequency, int interval, long endTime)
295: throws JobManagerException {
296: schedule(serviceName, context, startTime, frequency, interval,
297: -1, endTime);
298: }
299:
300: /**
301: * Schedule a job to start at a specific time with specific recurrence info
302: *@param serviceName The name of the service to invoke
303: *@param context The context for the service
304: *@param startTime The time in milliseconds the service should run
305: *@param frequency The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc)
306: *@param interval The interval of the frequency recurrence
307: *@param count The number of times to repeat
308: *@param endTime The time in milliseconds the service should expire
309: */
310: public void schedule(String serviceName, Map context,
311: long startTime, int frequency, int interval, int count,
312: long endTime) throws JobManagerException {
313: schedule(null, serviceName, context, startTime, frequency,
314: interval, count, endTime);
315: }
316:
317: /**
318: * Schedule a job to start at a specific time with specific recurrence info
319: *@param poolName The name of the pool to run the service from
320: *@param serviceName The name of the service to invoke
321: *@param context The context for the service
322: *@param startTime The time in milliseconds the service should run
323: *@param frequency The frequency of the recurrence (HOURLY,DAILY,MONTHLY,etc)
324: *@param interval The interval of the frequency recurrence
325: *@param count The number of times to repeat
326: *@param endTime The time in milliseconds the service should expire
327: */
328: public void schedule(String poolName, String serviceName,
329: Map context, long startTime, int frequency, int interval,
330: int count, long endTime) throws JobManagerException {
331: String dataId = null;
332: String infoId = null;
333: String jobName = new String(new Long((new Date().getTime()))
334: .toString());
335:
336: if (delegator == null) {
337: Debug.logWarning(
338: "No delegator referenced; cannot schedule job.",
339: module);
340: return;
341: }
342:
343: try {
344: dataId = delegator.getNextSeqId("RuntimeData").toString();
345: GenericValue runtimeData = delegator.makeValue(
346: "RuntimeData", UtilMisc.toMap("runtimeDataId",
347: dataId));
348:
349: runtimeData.set("runtimeInfo", XmlSerializer
350: .serialize(context));
351: delegator.create(runtimeData);
352: } catch (GenericEntityException ee) {
353: throw new JobManagerException(ee.getMessage(), ee);
354: } catch (SerializeException se) {
355: throw new JobManagerException(se.getMessage(), se);
356: } catch (IOException ioe) {
357: throw new JobManagerException(ioe.getMessage(), ioe);
358: }
359: try {
360: RecurrenceInfo info = RecurrenceInfo.makeInfo(delegator,
361: startTime, frequency, interval, count);
362:
363: infoId = info.primaryKey();
364: } catch (RecurrenceInfoException e) {
365: throw new JobManagerException(e.getMessage(), e);
366: }
367: Map jFields = UtilMisc.toMap("jobName", jobName, "runTime",
368: new java.sql.Timestamp(startTime), "serviceName",
369: serviceName, "recurrenceInfoId", infoId,
370: "runtimeDataId", dataId);
371:
372: // set the pool ID
373: if (poolName != null && poolName.length() > 0) {
374: jFields.put("poolId", poolName);
375: } else {
376: jFields.put("poolId", ServiceConfigUtil.getSendPool());
377: }
378:
379: GenericValue jobV = null;
380:
381: try {
382: jobV = delegator.makeValue("JobSandbox", jFields);
383: delegator.create(jobV);
384: } catch (GenericEntityException e) {
385: throw new JobManagerException(e.getMessage(), e);
386: }
387: }
388:
389: /**
390: * Kill a JobInvoker Thread.
391: * @param threadName Name of the JobInvoker Thread to kill.
392: */
393: public void killThread(String threadName) {
394: jp.killThread(threadName);
395: }
396:
397: /**
398: * Get a List of each threads current state.
399: * @return List containing a Map of each thread's state.
400: */
401: public List processList() {
402: return jp.getPoolState();
403: }
404:
405: /** Close out the scheduler thread. */
406: public void finalize() {
407: if (jp != null) {
408: jp.stop();
409: jp = null;
410: Debug.logInfo("JobManager: Stopped Scheduler Thread.",
411: module);
412: }
413: }
414:
415: /** gets the recurrence info object for a job. */
416: public static RecurrenceInfo getRecurrenceInfo(GenericValue job) {
417: try {
418: if (job != null) {
419: GenericValue ri = job.getRelatedOne("RecurrenceInfo");
420:
421: if (ri != null) {
422: return new RecurrenceInfo(ri);
423: } else {
424: return null;
425: }
426: } else {
427: return null;
428: }
429: } catch (GenericEntityException e) {
430: e.printStackTrace();
431: Debug
432: .logError(
433: e,
434: "Problem getting RecurrenceInfo entity from JobSandbox",
435: module);
436: } catch (RecurrenceInfoException re) {
437: re.printStackTrace();
438: Debug.logError(re,
439: "Problem creating RecurrenceInfo instance: "
440: + re.getMessage(), module);
441: }
442: return null;
443: }
444:
445: }
|