001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.agent;
028:
029: import java.io.Serializable;
030: import java.util.ArrayList;
031: import java.util.Iterator;
032: import java.util.List;
033: import org.cougaar.bootstrap.SystemProperties;
034: import org.cougaar.core.agent.service.MessageSwitchService;
035: import org.cougaar.core.component.Component;
036: import org.cougaar.core.component.ServiceBroker;
037: import org.cougaar.core.component.ServiceProvider;
038: import org.cougaar.core.component.ServiceRevokedListener;
039: import org.cougaar.core.mts.AgentState;
040: import org.cougaar.core.mts.Attributes;
041: import org.cougaar.core.mts.Message;
042: import org.cougaar.core.mts.MessageAddress;
043: import org.cougaar.core.mts.MessageHandler;
044: import org.cougaar.core.mts.MessageTransportClient;
045: import org.cougaar.core.persist.PersistenceClient;
046: import org.cougaar.core.persist.PersistenceIdentity;
047: import org.cougaar.core.persist.PersistenceService;
048: import org.cougaar.core.persist.RehydrationData;
049: import org.cougaar.core.service.AgentIdentificationService;
050: import org.cougaar.core.service.LoggingService;
051: import org.cougaar.core.service.MessageTransportService;
052: import org.cougaar.core.wp.WhitePagesMessage;
053: import org.cougaar.util.GenericStateModelAdapter;
054: import org.cougaar.util.log.Logging;
055:
056: /**
057: * This component registers the agent in the
058: * {@link MessageTransportService} and forwards all received
059: * messages through the {@link MessageSwitchService} to the
060: * agent-internal message handlers.
061: *
062: * @property org.cougaar.core.agent.showTraffic
063: * If <em>true</em>, shows '+' and '-' on message sends and receives
064: * except for white pages messages. If <em>wp</em>, then shows
065: * both the above +/- for regular send/receive and W/w for white
066: * pages send/receive.
067: *
068: * @property org.cougaar.core.agent.quiet
069: * Makes standard output as quiet as possible.
070: */
071: public final class MessageSwitch extends GenericStateModelAdapter
072: implements Component {
073:
074: // maybe move to parameters
075: private static final boolean showTraffic;
076: private static final boolean showWhitePagesTraffic;
077: static {
078: boolean isQuiet = SystemProperties
079: .getBoolean("org.cougaar.core.agent.quiet");
080: if (isQuiet) {
081: showTraffic = false;
082: showWhitePagesTraffic = false;
083: } else {
084: String trafficParam = SystemProperties.getProperty(
085: "org.cougaar.core.agent.showTraffic", "true");
086: if ("true".equalsIgnoreCase(trafficParam)) {
087: showTraffic = true;
088: showWhitePagesTraffic = false;
089: } else if ("wp".equalsIgnoreCase(trafficParam)) {
090: showTraffic = true;
091: showWhitePagesTraffic = true;
092: } else {
093: showTraffic = false;
094: showWhitePagesTraffic = false;
095: }
096: }
097: }
098:
099: private ServiceBroker sb;
100:
101: private LoggingService log;
102: private MessageTransportService messenger;
103: private ReconcileAddressWatcherService raws;
104:
105: private PersistenceService ps;
106: private PersistenceClient pc;
107:
108: private MessageSwitchService mss;
109:
110: private MessageSwitchUnpendServiceProvider msusp;
111:
112: private MessageSwitchShutdownServiceProvider msssp;
113:
114: private MessageAddress localAgent;
115: private long localIncarnation;
116:
117: private MessageTransportClient mtsClientAdapter;
118:
119: private MessageSwitchImpl msi;
120: private MessageSwitchServiceProvider mssp;
121:
122: // state from "suspend()", which is copied into the mobility state
123: private List unsentMessages;
124: private AgentState mtsState;
125:
126: public void setServiceBroker(ServiceBroker sb) {
127: this .sb = sb;
128: }
129:
130: public void load() {
131: super .load();
132:
133: log = (LoggingService) sb.getService(this ,
134: LoggingService.class, null);
135:
136: AgentIdentificationService ais = (AgentIdentificationService) sb
137: .getService(this , AgentIdentificationService.class,
138: null);
139: if (ais != null) {
140: localAgent = ais.getMessageAddress();
141: sb.releaseService(this , AgentIdentificationService.class,
142: ais);
143: }
144:
145: raws = (ReconcileAddressWatcherService) sb.getService(this ,
146: ReconcileAddressWatcherService.class, null);
147: if (raws == null) {
148: throw new RuntimeException(
149: "Unable to obtain ReconcileAddressWatcherService");
150: }
151:
152: register_persistence();
153: Object o = rehydrate();
154: if (o instanceof AgentMessageTransportState) {
155: // fill in prior (mobile) identity
156: AgentMessageTransportState amts = (AgentMessageTransportState) o;
157: unsentMessages = amts.unsentMessages;
158: mtsState = amts.mtsState;
159: }
160: o = null;
161:
162: find_local_incarnation();
163: load_create_message_switch();
164: load_internal_register_with_mts();
165: resume_resend_unsent_messages();
166: load_add_message_switch_service();
167: load_add_outgoing_traffic_watcher();
168:
169: msusp = new MessageSwitchUnpendServiceProvider();
170: sb.addService(MessageSwitchUnpendService.class, msusp);
171:
172: msssp = new MessageSwitchShutdownServiceProvider();
173: sb.addService(MessageSwitchShutdownService.class, msssp);
174:
175: // called later via MessageSwitchUnpendService:
176: //load_unpend_messages();
177: }
178:
179: public void suspend() {
180: super .suspend();
181: // called earlier via MessageSwitchShutdownService:
182: //suspend_unregister_from_mts();
183: suspend_shutdown_mts();
184: }
185:
186: public void resume() {
187: super .resume();
188: resume_register_with_mts();
189: // called later via MessageSwitchShutdownService:
190: //resume_resend_unsent_messages();
191: }
192:
193: public void unload() {
194: super .unload();
195:
196: if (msssp != null) {
197: sb.revokeService(MessageSwitchShutdownService.class, msssp);
198: msssp = null;
199: }
200: if (msusp != null) {
201: sb.revokeService(MessageSwitchUnpendService.class, msusp);
202: msusp = null;
203: }
204:
205: unload_remove_outgoing_traffic_watcher();
206: unload_revoke_message_switch_service();
207:
208: unregister_persistence();
209:
210: if (raws != null) {
211: sb.releaseService(this ,
212: ReconcileAddressWatcherService.class, raws);
213: raws = null;
214: }
215: }
216:
217: private void find_local_incarnation() {
218: // get the local agent's incarnation number from the
219: // TopologyService
220: if (log.isInfoEnabled()) {
221: log.info("Finding the local agent's incarnation");
222: }
223:
224: TopologyService ts = (TopologyService) sb.getService(this ,
225: TopologyService.class, null);
226: if (ts != null) {
227: localIncarnation = ts.getIncarnationNumber();
228: sb.releaseService(this , TopologyService.class, ts);
229: }
230: }
231:
232: private void load_create_message_switch() {
233: msi = new MessageSwitchImpl(log);
234: }
235:
236: private void load_internal_register_with_mts() {
237: // register with the message transport
238: if (log.isInfoEnabled()) {
239: log.info("Registering with the message transport");
240: }
241:
242: mtsClientAdapter = new MessageTransportClient() {
243: public void receiveMessage(Message message) {
244: receive(message);
245: }
246:
247: public MessageAddress getMessageAddress() {
248: return localAgent;
249: }
250:
251: public long getIncarnationNumber() {
252: return localIncarnation;
253: }
254: };
255:
256: messenger = (MessageTransportService) sb.getService(
257: mtsClientAdapter, MessageTransportService.class, null);
258: if (messenger == null) {
259: throw new RuntimeException(
260: "Unable to obtain MessageTransportService");
261: }
262:
263: if (mtsState != null) {
264: messenger.getAgentState().mergeAttributes(mtsState);
265: mtsState = null;
266: }
267:
268: messenger.registerClient(mtsClientAdapter);
269: }
270:
271: private void load_add_message_switch_service() {
272: mssp = new MessageSwitchServiceProvider();
273: sb.addService(MessageSwitchService.class, mssp);
274: }
275:
276: private void load_add_outgoing_traffic_watcher() {
277: if (!showTraffic) {
278: return;
279: }
280:
281: mss = (MessageSwitchService) sb.getService(this ,
282: MessageSwitchService.class, null);
283:
284: // register message handler to observe all incoming messages
285: MessageHandler mh = new MessageHandler() {
286: public boolean handleMessage(Message message) {
287: if (message instanceof WhitePagesMessage) {
288: if (showWhitePagesTraffic) {
289: Logging.printDot("w");
290: }
291: } else {
292: Logging.printDot("-");
293: }
294: return false; // don't ever consume it
295: }
296: };
297: mss.addMessageHandler(mh);
298: }
299:
300: private Object captureState() {
301: if (getModelState() == ACTIVE) {
302: if (log.isDebugEnabled()) {
303: log.debug("Ignoring persist while active");
304: }
305: return null;
306: }
307:
308: AgentMessageTransportState amts = new AgentMessageTransportState();
309: amts.unsentMessages = unsentMessages;
310: amts.mtsState = mtsState;
311: return amts;
312: }
313:
314: private void register_persistence() {
315: // get persistence
316: pc = new PersistenceClient() {
317: public PersistenceIdentity getPersistenceIdentity() {
318: String id = getClass().getName();
319: return new PersistenceIdentity(id);
320: }
321:
322: public List getPersistenceData() {
323: Object o = captureState();
324: // must return mutable list!
325: List l = new ArrayList(1);
326: l.add(o);
327: return l;
328: }
329: };
330: ps = (PersistenceService) sb.getService(pc,
331: PersistenceService.class, null);
332: }
333:
334: private void unregister_persistence() {
335: if (ps != null) {
336: sb.releaseService(pc, PersistenceService.class, ps);
337: ps = null;
338: pc = null;
339: }
340: }
341:
342: private Object rehydrate() {
343: RehydrationData rd = ps.getRehydrationData();
344: if (rd == null) {
345: if (log.isInfoEnabled()) {
346: log.info("No rehydration data found");
347: }
348: return null;
349: }
350:
351: List l = rd.getObjects();
352: rd = null;
353: int lsize = (l == null ? 0 : l.size());
354: if (lsize < 1) {
355: if (log.isInfoEnabled()) {
356: log.info("Invalid rehydration list? " + l);
357: }
358: return null;
359: }
360: Object o = l.get(0);
361: if (o == null) {
362: if (log.isInfoEnabled()) {
363: log.info("Null rehydration state?");
364: }
365: return null;
366: }
367:
368: if (log.isInfoEnabled()) {
369: log.info("Found rehydrated state");
370: if (log.isDetailEnabled()) {
371: log.detail("state is " + o);
372: }
373: }
374:
375: return o;
376: }
377:
378: private void load_unpend_messages() {
379: msi.unpendMessages();
380: }
381:
382: private void suspend_unregister_from_mts() {
383: // shutdown the MTS
384: if (log.isInfoEnabled()) {
385: log.info("Shutting down message transport");
386: }
387:
388: if (messenger != null) {
389: messenger.unregisterClient(mtsClientAdapter);
390: }
391: }
392:
393: private void suspend_shutdown_mts() {
394: if (messenger == null) {
395: unsentMessages = null;
396: mtsState = null;
397: return;
398: }
399:
400: // flush outgoing messages, block further input.
401: // get a list of unsent messages.
402: unsentMessages = messenger.flushMessages();
403:
404: // get MTS-internal state for this agent
405: mtsState = messenger.getAgentState();
406:
407: // release messenger, remove agent name-server entry.
408: sb.releaseService(mtsClientAdapter,
409: MessageTransportService.class, messenger);
410: messenger = null;
411: }
412:
413: private void resume_register_with_mts() {
414: // re-register MTS
415: if (messenger == null) {
416: if (log.isInfoEnabled()) {
417: log
418: .info("Registering with the message transport service");
419: }
420: messenger = (MessageTransportService) sb.getService(
421: mtsClientAdapter, MessageTransportService.class,
422: null);
423:
424: if (mtsState != null) {
425: messenger.getAgentState().mergeAttributes(mtsState);
426: mtsState = null;
427: }
428:
429: messenger.registerClient(mtsClientAdapter);
430: }
431:
432: if (log.isInfoEnabled()) {
433: log.info("Resuming message transport");
434: }
435: }
436:
437: private void resume_resend_unsent_messages() {
438: // send all unsent messages
439: List l = unsentMessages;
440: unsentMessages = null;
441: int n = (l == null ? 0 : l.size());
442: for (int i = 0; i < n; i++) {
443: Message cmi = (Message) l.get(i);
444: send(cmi);
445: }
446: }
447:
448: private void unload_remove_outgoing_traffic_watcher() {
449: if (!showTraffic) {
450: return;
451: }
452:
453: if (mss != null) {
454: // mss.unregister?
455: sb.releaseService(this , MessageSwitchService.class, mss);
456: mss = null;
457: }
458: }
459:
460: private void unload_revoke_message_switch_service() {
461: sb.revokeService(MessageSwitchService.class, mssp);
462: mssp = null;
463: }
464:
465: private void recordSend(MessageAddress addr) {
466: raws.sentMessageTo(addr);
467: }
468:
469: private void recordReceive(MessageAddress addr) {
470: raws.receivedMessageFrom(addr);
471: }
472:
473: private void send(Message message) {
474: if (message instanceof ClusterMessage) {
475: recordSend(message.getTarget());
476: }
477:
478: if (showTraffic) {
479: if (message instanceof WhitePagesMessage) {
480: if (showWhitePagesTraffic) {
481: Logging.printDot("W");
482: }
483: } else {
484: Logging.printDot("+");
485: }
486: }
487: try {
488: if (messenger == null) {
489: if (log.isWarnEnabled()) {
490: log.warn("MessageTransport unavailable, dropped: "
491: + message);
492: }
493: return;
494: }
495: messenger.sendMessage(message);
496: } catch (Exception ex) {
497: if (log.isErrorEnabled()) {
498: log.error("Problem sending message", ex);
499: }
500: }
501: }
502:
503: private void receive(Message message) {
504: if (message instanceof ClusterMessage) {
505: recordReceive(message.getTarget());
506: }
507:
508: if (message.getTarget()
509: .equals(MessageAddress.MULTICAST_SOCIETY)) {
510: if (log.isWarnEnabled()) {
511: log.warn("Ignoring message received by " + localAgent
512: + " with target MULTICAST_SOCIETY");
513: }
514: return;
515: }
516:
517: try {
518: // messageHandler will now pend and warn about unhandled messages
519: msi.handleMessage(message);
520: } catch (Exception e) {
521: log.error("Uncaught Exception while handling Message ("
522: + message.getClass() + "): " + message, e);
523: }
524: }
525:
526: private class MessageSwitchServiceProvider implements
527: ServiceProvider {
528: private final MessageSwitchService mss;
529:
530: public MessageSwitchServiceProvider() {
531: mss = new MessageSwitchService() {
532: public void sendMessage(Message m) {
533: send(m);
534: }
535:
536: public void addMessageHandler(MessageHandler mh) {
537: msi.addMessageHandler(mh);
538: }
539:
540: public MessageAddress getMessageAddress() {
541: return localAgent;
542: }
543: };
544: }
545:
546: public Object getService(ServiceBroker sb, Object requestor,
547: Class serviceClass) {
548: if (MessageSwitchService.class
549: .isAssignableFrom(serviceClass)) {
550: return mss;
551: } else {
552: return null;
553: }
554: }
555:
556: public void releaseService(ServiceBroker sb, Object requestor,
557: Class serviceClass, Object service) {
558: }
559: }
560:
561: private final class MessageSwitchUnpendServiceProvider implements
562: ServiceProvider {
563: private final MessageSwitchUnpendService msus;
564:
565: public MessageSwitchUnpendServiceProvider() {
566: msus = new MessageSwitchUnpendService() {
567: public void unpendMessages() {
568: load_unpend_messages();
569: }
570: };
571: }
572:
573: public Object getService(ServiceBroker sb, Object requestor,
574: Class serviceClass) {
575: if (MessageSwitchUnpendService.class
576: .isAssignableFrom(serviceClass)) {
577: return msus;
578: } else {
579: return null;
580: }
581: }
582:
583: public void releaseService(ServiceBroker sb, Object requestor,
584: Class serviceClass, Object service) {
585: }
586: }
587:
588: private final class MessageSwitchShutdownServiceProvider implements
589: ServiceProvider {
590: private final MessageSwitchShutdownService msss;
591:
592: public MessageSwitchShutdownServiceProvider() {
593: msss = new MessageSwitchShutdownService() {
594: public void shutdown() {
595: suspend_unregister_from_mts();
596: }
597:
598: public void restore() {
599: resume_resend_unsent_messages();
600: }
601: };
602: }
603:
604: public Object getService(ServiceBroker sb, Object requestor,
605: Class serviceClass) {
606: if (MessageSwitchShutdownService.class
607: .isAssignableFrom(serviceClass)) {
608: return msss;
609: } else {
610: return null;
611: }
612: }
613:
614: public void releaseService(ServiceBroker sb, Object requestor,
615: Class serviceClass, Object service) {
616: }
617: }
618:
619: private static final class AgentMessageTransportState implements
620: java.io.Serializable {
621: public List unsentMessages;
622: public AgentState mtsState;
623: }
624:
625: /**
626: * MessageSwitchImpl is a MessageHandler which calls an ordered
627: * list of other MessageHandler instances in order until
628: * one returns a true value from handle.
629: */
630: private static final class MessageSwitchImpl implements
631: MessageHandler {
632: private final LoggingService log;
633: /** List of MessageHandler instances */
634: private final List handlers = new ArrayList(11);
635: /** list of pending (unhandled) messages - protected by lock on handlers. */
636: private List pendingMessages = new ArrayList(11);
637:
638: public MessageSwitchImpl(LoggingService log) {
639: this .log = log;
640: }
641:
642: public boolean handleMessage(Message m) {
643: synchronized (handlers) {
644: for (int i = 0, l = handlers.size(); i < l; i++) {
645: MessageHandler h = (MessageHandler) handlers.get(i);
646: if (h.handleMessage(m))
647: return true;
648: }
649: pendMessage(m);
650: }
651: return false;
652: }
653:
654: public void addMessageHandler(MessageHandler mh) {
655: synchronized (handlers) {
656: handlers.add(mh);
657: resubmitPendingMessages(mh);
658: }
659: }
660:
661: public void removeMessageHandler(MessageHandler mh) {
662: synchronized (handlers) {
663: handlers.remove(mh);
664: }
665: }
666:
667: // must be called within synchronized(handlers), e.g. only from addMessageHandler
668: private void resubmitPendingMessages(MessageHandler mh) {
669: if (pendingMessages == null) {
670: return;
671: }
672: for (Iterator it = pendingMessages.iterator(); it.hasNext();) {
673: Message m = (Message) it.next();
674: try {
675: boolean handled = mh.handleMessage(m);
676: if (handled) {
677: if (log.isInfoEnabled()) {
678: log
679: .info("Handled previously unhandled Message ("
680: + m.getClass() + "): " + m);
681: }
682: it.remove();
683: } else {
684: // probably not worth the effort...
685: if (log.isDebugEnabled()) {
686: log
687: .debug("Still not handling pending message "
688: + m + " with handler " + mh);
689: }
690: }
691: } catch (Exception e) {
692: log.error(
693: "Uncaught Exception while resubmitting pending Message ("
694: + m.getClass() + "): " + m, e);
695: }
696: }
697: }
698:
699: // must be called within synchronized(handlers)
700: private void pendMessage(Message m) {
701: if (pendingMessages == null) {
702: logUnhandledMessage(m);
703: } else {
704: if (log.isInfoEnabled()) {
705: log.info("Delaying unhandled Message ("
706: + m.getClass() + "): " + m);
707: }
708: pendingMessages.add(m);
709: }
710: }
711:
712: private void logUnhandledMessage(Message m) {
713: log.error("Dropping unhandled Message (" + m.getClass()
714: + "): " + m);
715: }
716:
717: private void unpendMessages() {
718: List ms;
719: synchronized (handlers) {
720: assert pendingMessages != null;
721: ms = pendingMessages;
722: pendingMessages = null;
723: }
724:
725: for (Iterator it = ms.iterator(); it.hasNext();) {
726: Message m = (Message) it.next();
727: logUnhandledMessage(m);
728: }
729: }
730: }
731: }
|