001: /*
002: * <copyright>
003: *
004: * Copyright 2002-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.wp.resolver;
028:
029: import java.util.ArrayList;
030: import java.util.Collection;
031: import java.util.List;
032: import org.cougaar.core.agent.service.MessageSwitchService;
033: import org.cougaar.core.component.Component;
034: import org.cougaar.core.component.Service;
035: import org.cougaar.core.component.ServiceBroker;
036: import org.cougaar.core.component.ServiceRevokedListener;
037: import org.cougaar.core.mts.Message;
038: import org.cougaar.core.mts.MessageAddress;
039: import org.cougaar.core.mts.MessageHandler;
040: import org.cougaar.core.service.AgentIdentificationService;
041: import org.cougaar.core.service.LoggingService;
042: import org.cougaar.core.service.ThreadService;
043: import org.cougaar.core.thread.Schedulable;
044: import org.cougaar.util.GenericStateModelAdapter;
045:
046: /**
047: * This component is a base class that handles {@link
048: * MessageSwitchService} details for the {@link ClientTransport}.
049: * <p>
050: * This is nearly generic; with a bit more work it could be a
051: * useful generic base class.
052: */
053: public abstract class TransportBase extends GenericStateModelAdapter
054: implements Component {
055:
056: protected ServiceBroker sb;
057: protected LoggingService logger;
058: protected MessageAddress agentId;
059: protected ThreadService threadService;
060:
061: private MessageSwitchService messageSwitchService;
062:
063: private final Object sendLock = new Object();
064: private List sendQueue;
065:
066: private Schedulable receiveThread;
067: private final List receiveQueue = new ArrayList();
068: private final List receiveTmp = new ArrayList();
069:
070: public void setServiceBroker(ServiceBroker sb) {
071: this .sb = sb;
072: }
073:
074: public void setLoggingService(LoggingService logger) {
075: this .logger = logger;
076: }
077:
078: public void setThreadService(ThreadService threadService) {
079: this .threadService = threadService;
080: }
081:
082: public void load() {
083: super .load();
084:
085: if (logger.isDebugEnabled()) {
086: logger.debug("Loading resolver remote handler");
087: }
088:
089: // which agent are we in?
090: AgentIdentificationService ais = (AgentIdentificationService) sb
091: .getService(this , AgentIdentificationService.class,
092: null);
093: agentId = ais.getMessageAddress();
094: sb.releaseService(this , AgentIdentificationService.class, ais);
095:
096: Runnable receiveRunner = new Runnable() {
097: public void run() {
098: // assert (thread == receiveThread);
099: receiveNow();
100: }
101: };
102: receiveThread = threadService.getThread(this , receiveRunner,
103: "White pages client handle incoming responses");
104:
105: // register our message switch (now or later)
106: ServiceFinder.Callback sfc = new ServiceFinder.Callback() {
107: public void foundService(Service s) {
108: TransportBase.this .foundService(s);
109: }
110: };
111: ServiceFinder.findServiceLater(sb, MessageSwitchService.class,
112: null, sfc);
113: }
114:
115: public void unload() {
116: MessageSwitchService mss;
117: synchronized (sendLock) {
118: mss = messageSwitchService;
119: messageSwitchService = null;
120: }
121: if (mss != null) {
122: //mss.removeMessageHandler(myMessageHandler);
123: sb.releaseService(this , MessageSwitchService.class, mss);
124: mss = null;
125: }
126:
127: if (threadService != null) {
128: // halt our threads?
129: sb.releaseService(this , ThreadService.class, threadService);
130: threadService = null;
131: }
132: if (logger != null) {
133: sb.releaseService(this , LoggingService.class, logger);
134: logger = null;
135: }
136:
137: super .unload();
138: }
139:
140: private void foundService(Service s) {
141: // service broker now has the MessageSwitchService
142: //
143: // should we do this in a separate thread?
144: if (hasMessageTransport()) {
145: if (logger.isErrorEnabled()) {
146: logger.error("Already obtained our message switch");
147: }
148: return;
149: }
150: if (!(s instanceof MessageSwitchService)) {
151: if (logger.isErrorEnabled()) {
152: logger.error("Unable to obtain MessageSwitchService");
153: }
154: return;
155: }
156: MessageSwitchService mss = (MessageSwitchService) s;
157: MessageHandler myMessageHandler = new MessageHandler() {
158: public boolean handleMessage(Message m) {
159: return receive(m);
160: }
161: };
162: mss.addMessageHandler(myMessageHandler);
163: if (logger.isInfoEnabled()) {
164: logger.info("Registered with message transport");
165: }
166: synchronized (sendLock) {
167: messageSwitchService = mss;
168: }
169: foundMessageTransport();
170: }
171:
172: protected void foundMessageTransport() {
173: flushSendQueueLater();
174: }
175:
176: protected boolean hasMessageTransport() {
177: synchronized (sendLock) {
178: return (messageSwitchService != null);
179: }
180: }
181:
182: protected void sendOrQueue(Message m) {
183: List l = null;
184: MessageSwitchService mss;
185: synchronized (sendLock) {
186: mss = messageSwitchService;
187: if (mss == null) {
188: if (m != null) {
189: // queue to send once the MTS is up
190: if (sendQueue == null) {
191: sendQueue = new ArrayList();
192: }
193: sendQueue.add(m);
194: }
195: return;
196: } else if (sendQueue != null) {
197: // flush pending messages
198: l = sendQueue;
199: sendQueue = null;
200: } else {
201: // typical case
202: }
203: }
204: if (l != null) {
205: // flush pending messages
206: for (int i = 0, n = l.size(); i < n; i++) {
207: Message qm = (Message) l.get(i);
208: send(mss, qm);
209: }
210: }
211: if (m != null) {
212: send(mss, m);
213: }
214: }
215:
216: private void send(MessageSwitchService mss, Message m) {
217: // assert (messageSwitchService != null);
218: if (logger.isDetailEnabled()) {
219: logger.detail("sending message: " + m);
220: }
221: mss.sendMessage(m);
222: }
223:
224: private void flushSendQueueLater() {
225: // send queued messages
226: Runnable flushSendQueueRunner = new Runnable() {
227: public void run() {
228: sendOrQueue(null);
229: }
230: };
231: Schedulable flushSendQueueThread = threadService.getThread(
232: this , flushSendQueueRunner,
233: "Flush queued output messages");
234: flushSendQueueThread.start();
235: }
236:
237: //
238: // receive:
239: //
240:
241: protected abstract boolean shouldReceive(Message m);
242:
243: protected boolean receive(Message m) {
244: if (shouldReceive(m)) {
245: receiveLater(m);
246: return true;
247: }
248: return false;
249: }
250:
251: private void receiveLater(Message m) {
252: // queue to run in our thread
253: synchronized (receiveQueue) {
254: receiveQueue.add(m);
255: }
256: receiveThread.start();
257: }
258:
259: private void receiveNow() {
260: synchronized (receiveQueue) {
261: if (receiveQueue.isEmpty()) {
262: if (logger.isDetailEnabled()) {
263: logger.detail("input queue is empty");
264: }
265: return;
266: }
267: receiveTmp.addAll(receiveQueue);
268: receiveQueue.clear();
269: }
270: receiveNow(receiveTmp);
271: receiveTmp.clear();
272: }
273:
274: protected void receiveNow(List l) {
275: for (int i = 0, n = l.size(); i < n; i++) {
276: Message m = (Message) l.get(i);
277: receiveNow(m);
278: }
279: }
280:
281: protected abstract void receiveNow(Message m);
282:
283: }
|