001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.agent;
028:
029: import java.io.PrintStream;
030: import java.util.ArrayList;
031: import java.util.Collection;
032: import java.util.List;
033: import org.cougaar.core.agent.service.MessageSwitchService;
034: import org.cougaar.core.blackboard.BlackboardForAgent;
035: import org.cougaar.core.component.Component;
036: import org.cougaar.core.component.ServiceBroker;
037: import org.cougaar.core.component.ServiceRevokedListener;
038: import org.cougaar.core.mts.Message;
039: import org.cougaar.core.mts.MessageAddress;
040: import org.cougaar.core.mts.MessageHandler;
041: import org.cougaar.core.service.AgentIdentificationService;
042: import org.cougaar.core.service.LoggingService;
043: import org.cougaar.core.service.ThreadService;
044: import org.cougaar.core.thread.Schedulable;
045: import org.cougaar.util.GenericStateModelAdapter;
046:
047: /**
048: * This component buffers blackboard messages while the agent is
049: * loading, plus switches threads when receiving messages to avoid
050: * blocking the message transport thread.
051: */
052: public final class QueueHandler extends GenericStateModelAdapter
053: implements Component {
054:
055: private ServiceBroker sb;
056:
057: private LoggingService log;
058: private MessageSwitchService mss;
059:
060: private BlackboardForAgent bb;
061:
062: private MessageAddress localAgent;
063:
064: private QueueHandlerBody body;
065: private boolean isStarted;
066: private Object lock = new Object();
067:
068: public void setServiceBroker(ServiceBroker sb) {
069: this .sb = sb;
070: }
071:
072: public void load() {
073: super .load();
074:
075: log = (LoggingService) sb.getService(this ,
076: LoggingService.class, null);
077:
078: AgentIdentificationService ais = (AgentIdentificationService) sb
079: .getService(this , AgentIdentificationService.class,
080: null);
081: if (ais != null) {
082: localAgent = ais.getMessageAddress();
083: sb.releaseService(this , AgentIdentificationService.class,
084: ais);
085: }
086:
087: mss = (MessageSwitchService) sb.getService(this ,
088: MessageSwitchService.class, null);
089:
090: // register message handler to observe all incoming messages
091: MessageHandler mh = new MessageHandler() {
092: public boolean handleMessage(Message message) {
093: if (message instanceof ClusterMessage) {
094: // internal message queue
095: getHandler().addMessage((ClusterMessage) message);
096: return true;
097: } else {
098: return false;
099: }
100: }
101: };
102: mss.addMessageHandler(mh);
103: }
104:
105: public void start() {
106: super .start();
107:
108: // get blackboard service
109: //
110: // this is delayed until "start()" because the queue handler
111: // is loaded after the blackboard. Messages are buffered
112: // between the top-level "load()" MTS unpend and our "start()",
113: // then released by "startThread()".
114: bb = (BlackboardForAgent) sb.getService(this ,
115: BlackboardForAgent.class, null);
116: if (bb == null) {
117: throw new RuntimeException(
118: "Unable to obtain BlackboardForAgent");
119: }
120:
121: startThread();
122:
123: }
124:
125: public void suspend() {
126: super .suspend();
127: stopThread();
128: bb.suspend();
129: }
130:
131: public void resume() {
132: super .resume();
133: bb.resume();
134: startThread();
135: }
136:
137: public void stop() {
138: super .stop();
139: stopThread();
140: if (bb != null) {
141: sb.releaseService(this , BlackboardForAgent.class, null);
142: bb = null;
143: }
144: }
145:
146: public void unload() {
147: super .unload();
148:
149: if (mss != null) {
150: // mss.unregister?
151: sb.releaseService(this , MessageSwitchService.class, mss);
152: mss = null;
153: }
154: }
155:
156: private void startThread() {
157: synchronized (lock) {
158: if (!isStarted) {
159: getHandler().start();
160: isStarted = true;
161: }
162: }
163: }
164:
165: private void stopThread() {
166: synchronized (lock) {
167: if (isStarted) {
168: getHandler().halt();
169: isStarted = false;
170: body = null;
171: }
172: }
173: }
174:
175: private final void receiveMessages(List messages) {
176: try {
177: bb.receiveMessages(messages);
178: } catch (Exception e) {
179: log
180: .error(
181: "Uncaught Exception while handling Queued Messages",
182: e);
183: }
184: }
185:
186: private QueueHandlerBody getHandler() {
187: synchronized (lock) {
188: if (body == null) {
189: QueueClient qc = new QueueClient() {
190: public MessageAddress getMessageAddress() {
191: return localAgent;
192: }
193:
194: public void receiveQueuedMessages(List messages) {
195: receiveMessages(messages);
196: }
197: };
198: ThreadService tsvc = (ThreadService) sb.getService(
199: this , ThreadService.class, null);
200: body = new QueueHandlerBody(qc, tsvc);
201: sb.releaseService(this , ThreadService.class, tsvc);
202: }
203: return body;
204: }
205: }
206:
207: interface QueueClient {
208: MessageAddress getMessageAddress();
209:
210: void receiveQueuedMessages(List messages);
211: }
212:
213: private static final class QueueHandlerBody implements Runnable {
214: private QueueClient client;
215: private final List queue = new ArrayList();
216: private final List msgs = new ArrayList();
217: private boolean ready = false;
218: private boolean active = false;
219: private Schedulable sched;
220:
221: public QueueHandlerBody(QueueClient client, ThreadService tsvc) {
222: this .client = client;
223: sched = tsvc.getThread(this , this , client
224: .getMessageAddress()
225: + "/RQ");
226: }
227:
228: void start() {
229: synchronized (queue) {
230: ready = true;
231: sched.start();
232: }
233: }
234:
235: public void halt() {
236: synchronized (queue) {
237: ready = false;
238: sched.cancel();
239: while (active) {
240: try {
241: queue.wait();
242: } catch (InterruptedException ie) {
243: }
244: }
245: }
246: client = null;
247: }
248:
249: public void run() {
250: synchronized (queue) {
251: if (!ready) {
252: return;
253: }
254: if (queue.isEmpty()) {
255: return;
256: }
257:
258: active = true;
259: msgs.addAll(queue);
260: queue.clear();
261: }
262: if (!msgs.isEmpty()) {
263: client.receiveQueuedMessages(msgs);
264: msgs.clear();
265: }
266: synchronized (queue) {
267: active = false;
268: queue.notify(); // only used for halt()
269: }
270: }
271:
272: public void addMessage(ClusterMessage m) {
273: synchronized (queue) {
274: queue.add(m);
275: if (ready)
276: sched.start(); // restart
277: }
278: }
279: }
280: }
|