001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.thread;
028:
029: import org.cougaar.bootstrap.SystemProperties;
030: import org.cougaar.util.CircularQueue;
031: import org.cougaar.util.log.Logger;
032: import org.cougaar.util.log.Logging;
033:
034: final class SchedulableStateChangeQueue extends Thread {
035:
036: // At least one thread must be a non-daemon thread, otherwise the JVM
037: // will exit. We'll mark this thread as our non-daemon "keep-alive".
038: private static final boolean DAEMON = SystemProperties
039: .getBoolean("org.cougaar.core.thread.daemon");
040:
041: private static SchedulableStateChangeQueue singleton;
042:
043: static void startThread() {
044: singleton = new SchedulableStateChangeQueue();
045: singleton.start();
046: }
047:
048: static void stopThread() {
049: SchedulableStateChangeQueue instance = singleton;
050: if (instance == null) {
051: return;
052: }
053: singleton = null;
054: instance.quit();
055: try {
056: instance.join();
057: } catch (InterruptedException ie) {
058: // don't care
059: }
060: }
061:
062: static void pushStart(SchedulableObject sched) {
063: push(sched, SchedulableLifecyle.Start);
064: }
065:
066: static void pushReclaim(SchedulableObject sched) {
067: push(sched, SchedulableLifecyle.Reclaim);
068: }
069:
070: private static void push(SchedulableObject schedulable,
071: SchedulableLifecyle operation) {
072: SchedulableStateChangeQueue instance = singleton;
073: if (instance == null) {
074: Logger logger = Logging
075: .getLogger(SchedulableStateChangeQueue.class);
076: if (logger.isWarnEnabled()) {
077: logger
078: .warn("Ignoring enqueue request on stopped thread");
079: }
080: return;
081: }
082: // XXX: Creating a new queue entry every time is potentially expensive
083: instance.add(new QueueEntry(schedulable, operation));
084: }
085:
086: private final CircularQueue<QueueEntry> queue;
087: private final Object lock;
088: private boolean should_stop;
089:
090: private SchedulableStateChangeQueue() {
091: super ("Thread Start/Stop Queue");
092: setDaemon(DAEMON);
093: queue = new CircularQueue<QueueEntry>();
094: lock = new Object();
095: }
096:
097: private void quit() {
098: synchronized (lock) {
099: should_stop = true;
100: lock.notify();
101: }
102: }
103:
104: private void add(QueueEntry entry) {
105: synchronized (lock) {
106: // TODO turn the validity check into an assertion that only runs during testing
107: if (validToAdd(entry)) {
108: queue.add(entry);
109: lock.notify();
110: }
111: }
112: }
113:
114: /**
115: * Verify that the Schedulable isn't already on the
116: * queue. This is potentially expensive and in theory
117: * should never happen. But it does, so until we know
118: * why, we need to check.
119: */
120: private boolean validToAdd(QueueEntry entry) {
121: SchedulableObject schedulable = entry.schedulable;
122: for (QueueEntry e : queue) {
123: if (e.schedulable == schedulable) {
124: Logger logger = Logging
125: .getLogger(SchedulableStateChangeQueue.class);
126: logger.error(schedulable
127: + " is already in the queue with "
128: + e.operation + ", new op is "
129: + entry.operation);
130: // XXX: Figure out why this happens !!
131: return false;
132: }
133: }
134: return true;
135: }
136:
137: public void run() {
138: while (true) {
139: QueueEntry entry = null;
140: synchronized (lock) {
141: while (true) {
142: if (should_stop) {
143: return;
144: }
145: if (!queue.isEmpty()) {
146: break;
147: }
148: try {
149: lock.wait();
150: } catch (InterruptedException ex) {
151: }
152: }
153: entry = queue.next();
154: }
155: entry.doWork();
156: }
157: }
158:
159: private static class QueueEntry {
160: final SchedulableObject schedulable;
161: final SchedulableLifecyle operation;
162:
163: QueueEntry(SchedulableObject schedulable, SchedulableLifecyle op) {
164: this .schedulable = schedulable;
165: this .operation = op;
166: }
167:
168: void doWork() {
169: operation.doWork(schedulable);
170: }
171: }
172: }
|