001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.mts.base;
028:
029: import java.util.ArrayList;
030: import java.util.Iterator;
031: import java.util.LinkedList;
032:
033: import org.cougaar.core.component.Container;
034: import org.cougaar.core.service.ThreadService;
035: import org.cougaar.core.thread.Schedulable;
036: import org.cougaar.util.UnaryPredicate;
037: import org.cougaar.mts.std.AttributedMessage;
038:
039: /**
040: * An abstract class which manages a circular queue of messages, and
041: * runs its own thread to pop messages off that queue. The method
042: * <strong>dispatch</strong>, provided by instantiable subclasses, is
043: * invoked on each message as it's popped off. */
044: abstract class MessageQueue extends BoundComponent implements Runnable {
045:
046: // Simplified queue
047: private static class SimpleQueue extends LinkedList {
048: Object next() {
049: return removeFirst();
050: }
051: }
052:
053: private SimpleQueue queue;
054: private Schedulable thread;
055: private String name;
056: private AttributedMessage in_progress;
057: private Object in_progress_lock, queue_processing;
058:
059: MessageQueue(String name, Container container) {
060: this .name = name;
061: in_progress_lock = new Object();
062: queue_processing = new Object();
063: queue = new SimpleQueue();
064: }
065:
066: int getLane() {
067: return ThreadService.BEST_EFFORT_LANE;
068: }
069:
070: public void load() {
071: super .load();
072: thread = threadService.getThread(this , this , name, getLane());
073: }
074:
075: String getName() {
076: return name;
077: }
078:
079: public void removeMessages(UnaryPredicate pred, ArrayList removed) {
080: // only one remove can be examining the queue at a time,
081: // even if they are looking for orthogonal messages
082: synchronized (queue_processing) {
083: // Note: We are blocking processing of the queue during
084: // this time, which would usually include waiting for an
085: // in-progress message to complete. But messages can still
086: // be added to the queue.
087:
088: AttributedMessage msg = in_progress;
089: boolean matches = msg != null && pred.execute(msg);
090: if (matches) {
091: // Wait for the in-progress message to complete or fail.
092: synchronized (in_progress_lock) {
093: if (in_progress == null) {
094: // Between the time we cached the in-progress
095: // message and the time we got the lock, the
096: // message could have been sent
097: // successfully. In that case in_progress is
098: // null and we don't care about it.
099: } else if (in_progress == msg) {
100: // Since we have the lock on queue_processing and
101: // in_progress is not null, it must still be
102: // the message we cached, which we know
103: // should be removed.
104: removed.add(in_progress);
105: in_progress = null;
106: } else {
107: loggingService
108: .error("In Progress Message Changed which is impossible"
109: + msg);
110: }
111: }
112: }
113: synchronized (queue) {
114: Iterator itr = queue.iterator();
115: while (itr.hasNext()) {
116: msg = (AttributedMessage) itr.next();
117: if (pred.execute(msg)) {
118: removed.add(msg);
119: itr.remove();
120: }
121: }
122: }
123: }
124: }
125:
126: private static final long HOLD_TIME = 500;
127:
128: // Process the last failed message, if any, followed by as many
129: // items as possible from the queue, with a max time as given by
130: // HOLD_TIME.
131: public void run() {
132: long endTime = System.currentTimeMillis() + HOLD_TIME;
133: // Now process the queued items.
134: while (System.currentTimeMillis() <= endTime) {
135: if (in_progress == null) {
136: synchronized (queue_processing) {
137: synchronized (queue) {
138: if (queue.isEmpty())
139: break; // done for now
140: synchronized (in_progress_lock) {
141: in_progress = (AttributedMessage) queue
142: .next();
143: }
144: }
145: }
146: }
147:
148: // Note that in_progress could have been set to null by
149: // the time we get here, presumaly because it was killed
150: // via removeMessages, which could have run between the
151: // synchronization three lines up and this one. But this
152: // is OK. The null will be detected and the thread will
153: // simply continue processing the queue.
154: synchronized (in_progress_lock) {
155: if (in_progress == null) {
156: continue;
157: } else if (dispatch(in_progress)) {
158: // Processing succeeded, continue popping the queue
159: in_progress = null;
160: continue;
161: } else {
162: // The dispatch code has already scheduled the thread
163: // to run again later.
164: return;
165: }
166: }
167: }
168:
169: // Ran out of time or queue is empty. Restart later if any
170: // remains on the queue.
171: restartIfNotEmpty();
172: }
173:
174: // Restart the thread immediately if the queue is not empty.
175: private void restartIfNotEmpty() {
176: synchronized (queue) {
177: if (!queue.isEmpty())
178: thread.start();
179: }
180: }
181:
182: void scheduleRestart(int delay) {
183: thread.schedule(delay);
184: }
185:
186: /**
187: * Enqueue a message. */
188: void add(AttributedMessage message) {
189: synchronized (queue) {
190: queue.add(message);
191: }
192: thread.start();
193: }
194:
195: public AttributedMessage[] snapshot() {
196: synchronized (queue) {
197: AttributedMessage head = in_progress;
198: int size = queue.size();
199: if (head != null)
200: size++;
201: AttributedMessage[] ret = new AttributedMessage[size];
202: int i = 0;
203: Iterator iter = queue.iterator();
204: if (head != null)
205: ret[i++] = head;
206: while (i < size)
207: ret[i++] = (AttributedMessage) iter.next();
208: return ret;
209: }
210: }
211:
212: /**
213: * Process a dequeued message. Return value indicates success or
214: * failure or the dispatch. Failed dispatches will be tried again
215: * before any further queue entries are dispatched. */
216: abstract boolean dispatch(AttributedMessage m);
217:
218: /**
219: * Number of messages waiting in the queue.
220: */
221: public int size() {
222: return queue.size();
223: }
224:
225: }
|