001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.thread;
028:
029: import org.cougaar.util.log.Logger;
030: import org.cougaar.util.log.Logging;
031:
032: import java.util.Timer;
033: import java.util.TimerTask;
034:
035: /**
036: * The standard implementation of {@link Schedulable}. The trivial
037: * thread services use {@link TrivialSchedulable} instead.
038: */
039:
040: final class SchedulableObject implements Schedulable {
041: private long timestamp;
042: private final Object consumer;
043: private final ThreadPool pool;
044: private final Scheduler scheduler;
045: private final int lane;
046: private final Runnable runnable;
047: private final String name;
048: private int start_count;
049: private boolean cancelled;
050: private boolean queued;
051: private boolean disqualified;
052: private boolean suspending; // suspend requested but not yet achieved
053: private boolean suspended; // truly suspended
054: private SuspendCallback suspendCallback;
055: private TimerTask task;
056: private int blocking_type = SchedulableStatus.NOT_BLOCKING;
057: private String blocking_excuse;
058: private ThreadPool.PooledThread thread;
059:
060: SchedulableObject(TreeNode treeNode, Runnable runnable,
061: String name, Object consumer, int lane) {
062: this .lane = lane;
063: this .pool = treeNode.getPool(lane);
064: this .scheduler = treeNode.getScheduler(lane);
065: this .runnable = runnable;
066: if (runnable == null) {
067: Logger logger = Logging.getLogger(this );
068: if (logger.isWarnEnabled())
069: logger.warn(consumer + " gave a null Runnable");
070: }
071: if (name == null)
072: this .name = pool.generateName();
073: else
074: this .name = name;
075: this .consumer = consumer;
076: this .start_count = 0;
077: }
078:
079: public void suspend(SuspendCallback cb) {
080: boolean runCallback = false;
081: synchronized (this ) {
082: if (thread == null) {
083: // Already suspended, run the callback immediately
084: runCallback = suspended = true;
085:
086: } else if (suspendCallback != null) {
087: throw new RuntimeException(
088: "More than one SuspendCallback!");
089: } else {
090: suspendCallback = cb;
091: suspending = true;
092: }
093: }
094: if (runCallback) {
095: cb.suspended(this );
096: }
097: }
098:
099: public void resume() {
100: boolean restart = false;
101: synchronized (this ) {
102: if (!suspended) {
103: Logger logger = Logging.getLogger(getClass());
104: logger.warn("Resuming " + this
105: + ", which was not suspended");
106: return;
107: }
108: suspended = false;
109: restart = start_count > 0;
110: }
111: if (restart) {
112: SchedulableStateChangeQueue.pushStart(this );
113: }
114: }
115:
116: void run() {
117: runnable.run();
118: }
119:
120: public int getLane() {
121: return lane;
122: }
123:
124: public String getBlockingExcuse() {
125: return blocking_excuse;
126: }
127:
128: public int getBlockingType() {
129: return blocking_type;
130: }
131:
132: synchronized void setBlocking(int type, String excuse) {
133: blocking_type = type;
134: blocking_excuse = excuse;
135: }
136:
137: synchronized void clearBlocking() {
138: blocking_excuse = null;
139: blocking_type = SchedulableStatus.NOT_BLOCKING;
140: }
141:
142: public String getName() {
143: return name;
144: }
145:
146: Scheduler getScheduler() {
147: return scheduler;
148: }
149:
150: public String toString() {
151: return "<Schedulable " + (name == null ? "anonymous" : name)
152: + ">";
153: }
154:
155: public long getTimestamp() {
156: return timestamp;
157: }
158:
159: synchronized void setQueued(boolean flag) {
160: queued = flag;
161: if (flag)
162: timestamp = System.currentTimeMillis();
163: }
164:
165: boolean isQueued() {
166: return queued;
167: }
168:
169: boolean isDisqualified() {
170: return disqualified;
171: }
172:
173: synchronized void setDisqualified(boolean flag) {
174: disqualified = flag;
175: if (flag)
176: queued = false;
177: }
178:
179: public Object getConsumer() {
180: return consumer;
181: }
182:
183: void claim() {
184: // thread has started or restarted
185: scheduler.threadClaimed(this );
186: }
187:
188: // This method runs after each pass through the body. Cf
189: // reclaimNotify, which only runs when this Schedulable is the
190: // last continuation for a given pooled thread.
191: SchedulableObject reclaim(boolean reuse) {
192: // NB: The Schedulable itself can never be the continuation
193: // of its own thread!
194: SchedulableObject continuation = scheduler.threadReclaimed(
195: this , reuse);
196: if (continuation == this ) {
197: Logger logger = Logging.getLogger(getClass());
198: logger.error(this + " is its own continuation!");
199: } else if (continuation != null) {
200: end();
201: }
202: SuspendCallback cb = null;
203: synchronized (this ) {
204: if (suspending) {
205: cb = suspendCallback;
206: suspendCallback = null;
207: suspended = true;
208: suspending = false;
209: }
210: }
211: if (cb != null) {
212: cb.suspended(this );
213: }
214: return continuation;
215: }
216:
217: // Callback from the Reclaimer. This only runs when this
218: // Schedulable is the last continuation for a given pooled
219: // thread. Cf reclaim, which runs after each pass through the
220: // body.
221: void reclaimNotify() {
222: scheduler.releaseRights(scheduler);
223:
224: // The restart mechanism shouldn't be relevant unless the
225: // no continuation was found in the corresponding reclaim
226: // call.
227: end();
228:
229: }
230:
231: private void end() {
232: synchronized (this ) {
233: thread = null;
234: if (--start_count <= 0) {
235: return;
236: }
237: }
238: // Restart
239: SchedulableStateChangeQueue.pushStart(this );
240: }
241:
242: // This method is not synchronized by design.
243: void addToReclaimer() {
244: SchedulableStateChangeQueue.pushReclaim(this );
245: }
246:
247: synchronized void thread_start() {
248: start_count = 1; // forget any extra intervening start() calls
249: queued = false;
250: if (thread != null) {
251: Logger logger = Logging.getLogger(getClass());
252: logger.error(this + " already has a Thread=" + thread);
253: return;
254: }
255: while (true) {
256: try {
257: thread = pool.getThread(name);
258: timestamp = System.currentTimeMillis();
259: thread.start_running(this );
260: break; //success
261: } catch (IllegalThreadStateException e1) {
262: Logger logger = Logging.getLogger(getClass());
263: logger.error("Thread=" + thread
264: + " is already running! "
265: + "Attempting to restart Schedulable=" + this
266: + " with new thread");
267: thread = null;
268: // retry
269: } catch (RuntimeException e2) {
270: Logger logger = Logging.getLogger(getClass());
271: logger.error("Unrecoverable Thread pool error Pool="
272: + pool + "Can not start Schedulable=" + this
273: + "\n" + e2.getMessage());
274: break; // abort (maybe we should retry?)
275: }
276: }
277: }
278:
279: public void start() {
280: synchronized (this ) {
281: // If the Schedulable has been cancelled, or has already
282: // been asked to start, there's nothing further to do.
283: // Otherwise, submit a request to be started
284: //
285: // check suspended after the pre-increment since
286: // we need to know about start requests when suspended
287: if (cancelled || ++start_count != 1 || suspending
288: || suspended) {
289: return;
290: }
291: }
292: SchedulableStateChangeQueue.pushStart(this );
293: }
294:
295: // All callers should be synchronized on this
296: private TimerTask task() {
297: cancelTimer();
298: task = new TimerTask() {
299: public void run() {
300: start();
301: }
302: };
303: return task;
304: }
305:
306: private Timer timer() {
307: Timer timer = TreeNode.timer();
308: if (timer == null) {
309: Logger logger = Logging.getLogger(this );
310: if (logger.isWarnEnabled()) {
311: logger.warn("Ignoring timer.schedule(..) request,"
312: + " the timer has been stopped");
313: }
314: }
315: return timer;
316: }
317:
318: synchronized public void schedule(long delay) {
319: Timer timer = timer();
320: if (timer != null) {
321: timer.schedule(task(), delay);
322: }
323: }
324:
325: synchronized public void schedule(long delay, long interval) {
326: Timer timer = timer();
327: if (timer != null) {
328: timer.schedule(task(), delay, interval);
329: }
330: }
331:
332: synchronized public void scheduleAtFixedRate(long delay,
333: long interval) {
334: Timer timer = timer();
335: if (timer != null) {
336: timer.scheduleAtFixedRate(task(), delay, interval);
337: }
338: }
339:
340: synchronized public void cancelTimer() {
341: if (task != null)
342: task.cancel();
343: task = null;
344: }
345:
346: synchronized public int getState() {
347: // Later add a 'disqualified' state
348: if (suspended) {
349: return CougaarThread.THREAD_SUSPENDED;
350: } else if (queued) {
351: return CougaarThread.THREAD_PENDING;
352: } else if (thread != null) {
353: return CougaarThread.THREAD_RUNNING;
354: } else {
355: return CougaarThread.THREAD_DORMANT;
356: }
357: }
358:
359: synchronized public boolean cancel() {
360: cancelTimer();
361: cancelled = true;
362: start_count = 0;
363: if (thread != null) {
364: // Currently running.
365: return false;
366: }
367: if (queued)
368: scheduler.dequeue(this );
369: queued = false;
370: return true;
371: }
372:
373: // Used in logging
374: int getStartCount() {
375: return start_count;
376: }
377:
378: }
|