001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.mts.std;
028:
029: import org.cougaar.core.component.ServiceBroker;
030: import org.cougaar.core.mts.AttributeConstants;
031: import org.cougaar.core.mts.Message;
032: import org.cougaar.core.mts.MessageAddress;
033: import org.cougaar.core.mts.MessageAttributes;
034: import org.cougaar.core.node.NodeIdentificationService;
035: import org.cougaar.core.service.ThreadService;
036: import org.cougaar.core.thread.Schedulable;
037: import org.cougaar.mts.base.CommFailureException;
038: import org.cougaar.mts.base.DestinationLink;
039: import org.cougaar.mts.base.DestinationLinkDelegateImplBase;
040: import org.cougaar.mts.base.MessageDeliverer;
041: import org.cougaar.mts.base.MessageDelivererDelegateImplBase;
042: import org.cougaar.mts.base.MessageReply;
043: import org.cougaar.mts.base.MisdeliveredMessageException;
044: import org.cougaar.mts.base.NameLookupException;
045: import org.cougaar.mts.base.SendQueue;
046: import org.cougaar.mts.base.StandardAspect;
047: import org.cougaar.mts.base.UnregisteredNameException;
048:
049: /**
050: * This test Aspect sends periodic 'heartbeat' messages to a specified
051: * destination, as given by the <code>dstAddr</code> parameter. Other
052: * parameters specify the delay, timeout and sendInterval of the
053: * hearbeat.
054: */
055: public class HeartBeatAspect extends StandardAspect implements
056: AttributeConstants, Runnable {
057:
058: private final String I_AM_A_HEARTBEAT_ATTRIBUTE = "i am a heartbeat";
059:
060: private MessageAddress hb_dest;
061: private long delay;
062: private long timeout;
063: private long sendInterval;
064: private long msgCount = 0;
065: private SendQueue sendq;
066: private MessageAddress us;
067:
068: public HeartBeatAspect() {
069: super ();
070: }
071:
072: public void load() {
073: super .load();
074: String dstAddr = getParameter("dstAddr", "NODE1");
075: hb_dest = MessageAddress.getMessageAddress(dstAddr);
076: delay = getParameter("delay", 1000);
077: timeout = getParameter("timeout", 1000);
078: sendInterval = getParameter("sendInterval", 500);
079:
080: }
081:
082: synchronized void maybeStartSending(SendQueue queue) {
083: if (sendq != null)
084: return;
085:
086: sendq = queue;
087:
088: ServiceBroker sb = getServiceBroker();
089:
090: NodeIdentificationService nisvc = (NodeIdentificationService) sb
091: .getService(this , NodeIdentificationService.class, null);
092: us = nisvc.getMessageAddress();
093: ThreadService tsvc = (ThreadService) sb.getService(this ,
094: ThreadService.class, null);
095: Schedulable sched = tsvc.getThread(this , this , "HeartBeater");
096: sched.schedule(10000, sendInterval);
097: }
098:
099: static class HBMessage extends Message {
100: HBMessage(MessageAddress src, MessageAddress dest) {
101: super (src, dest);
102: }
103: };
104:
105: public void run() {
106: if (sendq != null) {
107: Message message = new HBMessage(us, hb_dest);
108: AttributedMessage a_message = new AttributedMessage(message);
109: a_message.setAttribute(MESSAGE_SEND_DEADLINE_ATTRIBUTE,
110: new Long(System.currentTimeMillis() + timeout));
111: a_message.setAttribute(I_AM_A_HEARTBEAT_ATTRIBUTE,
112: new Long(msgCount++));
113: sendq.sendMessage(a_message);
114: System.out.println("Sending message " + a_message);
115: }
116: }
117:
118: //
119: // Aspect Code to implement TrafficRecord Collection
120:
121: public Object getDelegate(Object object, Class type) {
122: if (type == DestinationLink.class) {
123: return new HeartBeatDestinationLink(
124: (DestinationLink) object);
125: } else if (type == MessageDeliverer.class) {
126: return new MessageDelivererDelegate(
127: (MessageDeliverer) object);
128: } else if (type == SendQueue.class) {
129: // steal the sendqueue
130: maybeStartSending((SendQueue) object);
131:
132: return null;
133: } else {
134: return null;
135: }
136: }
137:
138: // Used to added Delay
139: public class HeartBeatDestinationLink extends
140: DestinationLinkDelegateImplBase {
141:
142: public HeartBeatDestinationLink(DestinationLink link) {
143: super (link);
144: }
145:
146: public MessageAttributes forwardMessage(
147: AttributedMessage message)
148: throws UnregisteredNameException, NameLookupException,
149: CommFailureException, MisdeliveredMessageException {
150: // Attempt to Deliver message
151: Object count = message
152: .getAttribute(I_AM_A_HEARTBEAT_ATTRIBUTE);
153: if (count != null) {
154: try {
155: Thread.sleep(delay);
156: } catch (InterruptedException ex) {
157: }
158: }
159: MessageAttributes meta = super .forwardMessage(message);
160: return meta;
161: }
162: }
163:
164: public class MessageDelivererDelegate extends
165: MessageDelivererDelegateImplBase {
166:
167: MessageDelivererDelegate(MessageDeliverer delegatee) {
168: super (delegatee);
169: }
170:
171: public MessageAttributes deliverMessage(
172: AttributedMessage message, MessageAddress dest)
173: throws MisdeliveredMessageException {
174: Object count = message
175: .getAttribute(I_AM_A_HEARTBEAT_ATTRIBUTE);
176: if (count != null) {
177: System.out.println("Recieved Heartbeat from="
178: + message.getOriginator() + "count=" + count);
179: MessageAttributes metadata = new MessageReply(message);
180: metadata.setAttribute(
181: MessageAttributes.DELIVERY_ATTRIBUTE,
182: MessageAttributes.DELIVERY_STATUS_DELIVERED);
183: return metadata;
184: } else {
185: return super.deliverMessage(message, dest);
186: }
187: }
188:
189: }
190:
191: }
|