001: package org.apache.turbine.services.schedule;
002:
003: /*
004: * Licensed to the Apache Software Foundation (ASF) under one
005: * or more contributor license agreements. See the NOTICE file
006: * distributed with this work for additional information
007: * regarding copyright ownership. The ASF licenses this file
008: * to you under the Apache License, Version 2.0 (the
009: * "License"); you may not use this file except in compliance
010: * with the License. You may obtain a copy of the License at
011: *
012: * http://www.apache.org/licenses/LICENSE-2.0
013: *
014: * Unless required by applicable law or agreed to in writing,
015: * software distributed under the License is distributed on an
016: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017: * KIND, either express or implied. See the License for the
018: * specific language governing permissions and limitations
019: * under the License.
020: */
021:
022: import java.util.Iterator;
023: import java.util.List;
024:
025: import javax.servlet.ServletConfig;
026:
027: import org.apache.commons.logging.Log;
028: import org.apache.commons.logging.LogFactory;
029:
030: import org.apache.torque.TorqueException;
031: import org.apache.torque.util.Criteria;
032:
033: import org.apache.turbine.services.InitializationException;
034: import org.apache.turbine.services.TurbineBaseService;
035: import org.apache.turbine.util.TurbineException;
036:
037: /**
038: * Service for a cron like scheduler.
039: *
040: * @author <a href="mailto:mbryson@mont.mindspring.com">Dave Bryson</a>
041: * @author <a href="mailto:quintonm@bellsouth.net">Quinton McCombs</a>
042: * @version $Id: TurbineSchedulerService.java 534527 2007-05-02 16:10:59Z tv $
043: */
044: public class TurbineSchedulerService extends TurbineBaseService
045: implements ScheduleService {
046: /** Logging */
047: private static Log log = LogFactory
048: .getLog(ScheduleService.LOGGER_NAME);
049:
050: /** The queue */
051: protected JobQueue scheduleQueue = null;
052:
053: /** Current status of the scheduler */
054: private boolean enabled = false;
055:
056: /** The main loop for starting jobs. */
057: protected MainLoop mainLoop;
058:
059: /** The thread used to process commands. */
060: protected Thread thread;
061:
062: /**
063: * Creates a new instance.
064: */
065: public TurbineSchedulerService() {
066: mainLoop = null;
067: thread = null;
068: }
069:
070: /**
071: * Initializes the SchedulerService.
072: *
073: * @throws InitializationException Something went wrong in the init
074: * stage
075: */
076: public void init() throws InitializationException {
077: try {
078: setEnabled(getConfiguration().getBoolean("enabled", true));
079: scheduleQueue = new JobQueue();
080: mainLoop = new MainLoop();
081:
082: // Load all from cold storage.
083: List jobs = JobEntryPeer.doSelect(new Criteria());
084:
085: if (jobs != null && jobs.size() > 0) {
086: Iterator it = jobs.iterator();
087: while (it.hasNext()) {
088: ((JobEntry) it.next()).calcRunTime();
089: }
090: scheduleQueue.batchLoad(jobs);
091:
092: restart();
093: }
094:
095: setInit(true);
096: } catch (Exception e) {
097: String errorMessage = "Could not initialize the scheduler service";
098: log.error(errorMessage, e);
099: throw new InitializationException(errorMessage, e);
100: }
101: }
102:
103: /**
104: * Called the first time the Service is used.<br>
105: *
106: * Load all the jobs from cold storage. Add jobs to the queue
107: * (sorted in ascending order by runtime) and start the scheduler
108: * thread.
109: *
110: * @param config A ServletConfig.
111: * @deprecated use init() instead.
112: */
113: public void init(ServletConfig config)
114: throws InitializationException {
115: init();
116: }
117:
118: /**
119: * Shutdowns the service.
120: *
121: * This methods interrupts the housekeeping thread.
122: */
123: public void shutdown() {
124: if (getThread() != null) {
125: getThread().interrupt();
126: }
127: }
128:
129: /**
130: * Get a specific Job from Storage.
131: *
132: * @param oid The int id for the job.
133: * @return A JobEntry.
134: * @exception TurbineException job could not be retreived.
135: */
136: public JobEntry getJob(int oid) throws TurbineException {
137: try {
138: JobEntry je = JobEntryPeer.retrieveByPK(oid);
139: return scheduleQueue.getJob(je);
140: } catch (TorqueException e) {
141: String errorMessage = "Error retrieving job from persistent storage.";
142: log.error(errorMessage, e);
143: throw new TurbineException(errorMessage, e);
144: }
145: }
146:
147: /**
148: * Add a new job to the queue.
149: *
150: * @param je A JobEntry with the job to add.
151: * @throws TurbineException job could not be added
152: */
153: public void addJob(JobEntry je) throws TurbineException {
154: updateJob(je);
155: }
156:
157: /**
158: * Remove a job from the queue.
159: *
160: * @param je A JobEntry with the job to remove.
161: * @exception TurbineException job could not be removed
162: */
163: public void removeJob(JobEntry je) throws TurbineException {
164: try {
165: // First remove from DB.
166: Criteria c = new Criteria().add(JobEntryPeer.JOB_ID, je
167: .getPrimaryKey());
168: JobEntryPeer.doDelete(c);
169:
170: // Remove from the queue.
171: scheduleQueue.remove(je);
172:
173: // restart the scheduler
174: restart();
175: } catch (Exception e) {
176: String errorMessage = "Problem removing Scheduled Job: "
177: + je.getTask();
178: log.error(errorMessage, e);
179: throw new TurbineException(errorMessage, e);
180: }
181: }
182:
183: /**
184: * Add or update a job.
185: *
186: * @param je A JobEntry with the job to modify
187: * @throws TurbineException job could not be updated
188: */
189: public void updateJob(JobEntry je) throws TurbineException {
190: try {
191: je.calcRunTime();
192:
193: // Update the queue.
194: if (je.isNew()) {
195: scheduleQueue.add(je);
196: } else {
197: scheduleQueue.modify(je);
198: }
199:
200: je.save();
201:
202: restart();
203: } catch (Exception e) {
204: String errorMessage = "Problem updating Scheduled Job: "
205: + je.getTask();
206: log.error(errorMessage, e);
207: throw new TurbineException(errorMessage, e);
208: }
209: }
210:
211: /**
212: * List jobs in the queue. This is used by the scheduler UI.
213: *
214: * @return A List of jobs.
215: */
216: public List listJobs() {
217: return scheduleQueue.list();
218: }
219:
220: /**
221: * Sets the enabled status of the scheduler
222: *
223: * @param enabled
224: *
225: */
226: protected void setEnabled(boolean enabled) {
227: this .enabled = enabled;
228: }
229:
230: /**
231: * Determines if the scheduler service is currently enabled.
232: *
233: * @return Status of the scheduler service.
234: */
235: public boolean isEnabled() {
236: return enabled;
237: }
238:
239: /**
240: * Starts or restarts the scheduler if not already running.
241: */
242: public synchronized void startScheduler() {
243: setEnabled(true);
244: restart();
245: }
246:
247: /**
248: * Stops the scheduler if it is currently running.
249: */
250: public synchronized void stopScheduler() {
251: log.info("Stopping job scheduler");
252: Thread thread = getThread();
253: if (thread != null) {
254: thread.interrupt();
255: }
256: enabled = false;
257: }
258:
259: /**
260: * Return the thread being used to process commands, or null if
261: * there is no such thread. You can use this to invoke any
262: * special methods on the thread, for example, to interrupt it.
263: *
264: * @return A Thread.
265: */
266: public synchronized Thread getThread() {
267: return thread;
268: }
269:
270: /**
271: * Set thread to null to indicate termination.
272: */
273: private synchronized void clearThread() {
274: thread = null;
275: }
276:
277: /**
278: * Start (or restart) a thread to process commands, or wake up an
279: * existing thread if one is already running. This method can be
280: * invoked if the background thread crashed due to an
281: * unrecoverable exception in an executed command.
282: */
283: public synchronized void restart() {
284: if (enabled) {
285: log.info("Starting job scheduler");
286: if (thread == null) {
287: // Create the the housekeeping thread of the scheduler. It will wait
288: // for the time when the next task needs to be started, and then
289: // launch a worker thread to execute the task.
290: thread = new Thread(mainLoop,
291: ScheduleService.SERVICE_NAME);
292: // Indicate that this is a system thread. JVM will quit only when there
293: // are no more enabled user threads. Settings threads spawned internally
294: // by Turbine as daemons allows commandline applications using Turbine
295: // to terminate in an orderly manner.
296: thread.setDaemon(true);
297: thread.start();
298: } else {
299: notify();
300: }
301: }
302: }
303:
304: /**
305: * Return the next Job to execute, or null if thread is
306: * interrupted.
307: *
308: * @return A JobEntry.
309: * @exception TurbineException a generic exception.
310: */
311: private synchronized JobEntry nextJob() throws TurbineException {
312: try {
313: while (!Thread.interrupted()) {
314: // Grab the next job off the queue.
315: JobEntry je = scheduleQueue.getNext();
316:
317: if (je == null) {
318: // Queue must be empty. Wait on it.
319: wait();
320: } else {
321: long now = System.currentTimeMillis();
322: long when = je.getNextRuntime();
323:
324: if (when > now) {
325: // Wait till next runtime.
326: wait(when - now);
327: } else {
328: // Update the next runtime for the job.
329: scheduleQueue.updateQueue(je);
330: // Return the job to run it.
331: return je;
332: }
333: }
334: }
335: } catch (InterruptedException ex) {
336: }
337:
338: // On interrupt.
339: return null;
340: }
341:
342: /**
343: * Inner class. This is isolated in its own Runnable class just
344: * so that the main class need not implement Runnable, which would
345: * allow others to directly invoke run, which is not supported.
346: */
347: protected class MainLoop implements Runnable {
348: /**
349: * Method to run the class.
350: */
351: public void run() {
352: String taskName = null;
353: try {
354: while (enabled) {
355: JobEntry je = nextJob();
356: if (je != null) {
357: taskName = je.getTask();
358:
359: // Start the thread to run the job.
360: Runnable wt = new WorkerThread(je);
361: Thread helper = new Thread(wt);
362: helper.start();
363: } else {
364: break;
365: }
366: }
367: } catch (Exception e) {
368: log.error("Error running a Scheduled Job: " + taskName,
369: e);
370: enabled = false;
371: } finally {
372: clearThread();
373: }
374: }
375: }
376: }
|