001: /*
002: * <copyright>
003: *
004: * Copyright 2002-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.yp;
028:
029: import org.cougaar.core.mts.Message;
030: import org.cougaar.core.service.ThreadService;
031: import org.cougaar.core.thread.Schedulable;
032: import org.cougaar.util.CircularQueue;
033: import org.cougaar.util.log.Logger;
034:
035: /** A handler of messages via a queue **/
036: public class ServiceThread {
037: public interface Callback {
038: void dispatch(Message m);
039: }
040:
041: private final Logger logger;
042: private final String name;
043: private final Callback callback;
044:
045: public ServiceThread(Callback callback, Logger logger, String name) {
046: this .callback = callback;
047: this .logger = logger;
048: this .name = name;
049: }
050:
051: /** queue for incoming YP Messages - dequeued by the service pseudo-thread **/
052: private final CircularQueue inQ = new CircularQueue(11);
053:
054: /** The service thread **/
055: private Schedulable thread = null;
056:
057: public void start(ThreadService threadService) {
058: // set up the service thread
059: thread = threadService.getThread(this , new Runnable() {
060: public void run() {
061: try {
062: cycle();
063: } catch (Throwable e) {
064: logger.error("Uncaught exception for " + callback,
065: e);
066: }
067: }
068: }, name);
069: }
070:
071: // useful counters to tell if everything is getting dealt with
072: private int ic = 0; // sync on inQ
073: private int oc = 0; // sync on inQ
074:
075: /** Used to (re)start the service thread to handle queued YP messages **/
076: private void wake() {
077: thread.start();
078: }
079:
080: /** queue an incoming message for later handling **/
081: public void addMessage(Message m) {
082: synchronized (inQ) {
083: ic++;
084: if (logger.isDebugEnabled()) {
085: logger.debug(name + " Queuing Message " + ic);
086: }
087:
088: inQ.add(m);
089: }
090: // should this be in the sync?
091: wake();
092: }
093:
094: private final static boolean batchRequests = false;
095:
096: private void cycle() {
097: if (batchRequests) {
098: // service everything we can
099: while (true) {
100: if (!dispatchNext()) {
101: return;
102: }
103: }
104: } else {
105: if (!dispatchNext()) {
106: return;
107: }
108: wake();
109: }
110: }
111:
112: private boolean dispatchNext() {
113: Message m;
114: // get the next message
115: synchronized (inQ) {
116: m = (Message) inQ.next();
117: if (m == null) {
118: // exit the loop if we are done
119: return false;
120: }
121:
122: oc++;
123: if (logger.isDebugEnabled()) {
124: logger.debug(name + " Handling Message " + oc);
125: }
126: }
127:
128: // handle it outside the inQ lock
129: callback.dispatch(m);
130: return true;
131: }
132: }
|