001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.jetspeed.scheduler;
018:
019: import java.util.List;
020:
021: import org.apache.commons.logging.Log;
022: import org.apache.commons.logging.LogFactory;
023:
024: /**
025: * Service for a cron like scheduler.
026: *
027: * @author <a href="mailto:mbryson@mont.mindspring.com">Dave Bryson</a>
028: * @version $Id: AbstractScheduler.java 516448 2007-03-09 16:25:47Z ate $
029: */
030: public abstract class AbstractScheduler implements Scheduler {
031: private final static Log log = LogFactory
032: .getLog(MemoryBasedScheduler.class);
033:
034: /**
035: * The queue.
036: */
037: protected JobQueue scheduleQueue = null;
038:
039: /**
040: * The main loop for starting jobs.
041: */
042: protected MainLoop mainLoop;
043:
044: /**
045: * The thread used to process commands.
046: */
047: protected Thread thread;
048:
049: /**
050: * Creates a new instance.
051: */
052: public AbstractScheduler() {
053: mainLoop = null;
054: thread = null;
055: }
056:
057: public void start() {
058: }
059:
060: public void stop() {
061: if (getThread() != null) {
062: getThread().interrupt();
063: }
064: }
065:
066: /**
067: * Get a specific Job from Storage.
068: *
069: * @param oid The int id for the job.
070: * @return A JobEntry.
071: * @exception Exception, a generic exception.
072: */
073: public abstract JobEntry getJob(int oid) throws Exception;
074:
075: /**
076: * Add a new job to the queue. Before adding a job, calculate the runtime
077: * to make sure the entry will be placed at the right order in the queue.
078: *
079: * @param je A JobEntry with the job to add.
080: * @exception Exception, a generic exception.
081: */
082: public abstract void addJob(JobEntry je) throws Exception;
083:
084: /**
085: * Remove a job from the queue.
086: *
087: * @param je A JobEntry with the job to remove.
088: * @exception Exception, a generic exception.
089: */
090: public abstract void removeJob(JobEntry je) throws Exception;
091:
092: /**
093: * Modify a Job.
094: *
095: * @param je A JobEntry with the job to modify
096: * @exception Exception, a generic exception.
097: */
098: public abstract void updateJob(JobEntry je) throws Exception;
099:
100: /**
101: * List jobs in the queue. This is used by the scheduler UI.
102: *
103: * @return A List of jobs.
104: */
105: public List listJobs() {
106: return scheduleQueue.list();
107: }
108:
109: /**
110: * Return the thread being used to process commands, or null if
111: * there is no such thread. You can use this to invoke any
112: * special methods on the thread, for example, to interrupt it.
113: *
114: * @return A Thread.
115: */
116: public synchronized Thread getThread() {
117: return thread;
118: }
119:
120: /**
121: * Set thread to null to indicate termination.
122: */
123: private synchronized void clearThread() {
124: thread = null;
125: }
126:
127: /**
128: * Start (or restart) a thread to process commands, or wake up an
129: * existing thread if one is already running. This method can be
130: * invoked if the background thread crashed due to an
131: * unrecoverable exception in an executed command.
132: */
133: public synchronized void restart() {
134: if (thread == null) {
135: // Create the the housekeeping thread of the scheduler. It will wait
136: // for the time when the next task needs to be started, and then
137: // launch a worker thread to execute the task.
138: thread = new Thread(mainLoop, Scheduler.SERVICE_NAME);
139: // Indicate that this is a system thread. JVM will quit only when there
140: // are no more active user threads. Settings threads spawned internally
141: // by CPS as daemons allows commandline applications
142: // to terminate in an orderly manner.
143: thread.setDaemon(true);
144: thread.start();
145: } else {
146: notify();
147: }
148: }
149:
150: /**
151: * Return the next Job to execute, or null if thread is
152: * interrupted.
153: *
154: * @return A JobEntry.
155: * @exception Exception, a generic exception.
156: */
157: private synchronized JobEntry nextJob() throws Exception {
158: try {
159: while (!Thread.interrupted()) {
160: // Grab the next job off the queue.
161: JobEntry je = scheduleQueue.getNext();
162:
163: if (je == null) {
164: // Queue must be empty. Wait on it.
165: wait();
166: } else {
167: long now = System.currentTimeMillis();
168: long when = je.getNextRuntime();
169:
170: if (when > now) {
171: // Wait till next runtime.
172: wait(when - now);
173: } else {
174: // Update the next runtime for the job.
175: scheduleQueue.updateQueue(je);
176: // Return the job to run it.
177: return je;
178: }
179: }
180: }
181: } catch (InterruptedException ex) {
182: }
183:
184: // On interrupt.
185: return null;
186: }
187:
188: /**
189: * Inner class. This is isolated in its own Runnable class just
190: * so that the main class need not implement Runnable, which would
191: * allow others to directly invoke run, which is not supported.
192: */
193: protected class MainLoop implements Runnable {
194: /**
195: * Method to run the class.
196: */
197: public void run() {
198: try {
199: for (;;) {
200: JobEntry je = nextJob();
201: if (je != null) {
202: // Start the thread to run the job.
203: Runnable wt = new WorkerThread(je);
204: Thread helper = new Thread(wt);
205: helper.start();
206: } else {
207: break;
208: }
209: }
210: } catch (Exception e) {
211: // Log error.
212: log.error("Error running a Scheduled Job: " + e);
213: } finally {
214: clearThread();
215: }
216: }
217: }
218: }
|