001: /* ====================================================================
002: * The LateralNZ Software License, Version 1.0
003: *
004: * Copyright (c) 2003 LateralNZ. All rights reserved.
005: *
006: * Redistribution and use in source and binary forms, with or without
007: * modification, are permitted provided that the following conditions
008: * are met:
009: *
010: * 1. Redistributions of source code must retain the above copyright
011: * notice, this list of conditions and the following disclaimer.
012: *
013: * 2. Redistributions in binary form must reproduce the above copyright
014: * notice, this list of conditions and the following disclaimer in
015: * the documentation and/or other materials provided with the
016: * distribution.
017: *
018: * 3. The end-user documentation included with the redistribution,
019: * if any, must include the following acknowledgment:
020: * "This product includes software developed by
021: * LateralNZ (http://www.lateralnz.org/) and other third parties."
022: * Alternately, this acknowledgment may appear in the software itself,
023: * if and wherever such third-party acknowledgments normally appear.
024: *
025: * 4. The names "LateralNZ" must not be used to endorse or promote
026: * products derived from this software without prior written
027: * permission. For written permission, please
028: * contact oss@lateralnz.org.
029: *
030: * 5. Products derived from this software may not be called "Panther",
031: * or "Lateral" or "LateralNZ", nor may "PANTHER" or "LATERAL" or
032: * "LATERALNZ" appear in their name, without prior written
033: * permission of LateralNZ.
034: *
035: * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
036: * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
037: * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
038: * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
039: * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
040: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
041: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
042: * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
043: * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
044: * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
045: * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
046: * SUCH DAMAGE.
047: * ====================================================================
048: *
049: * This software consists of voluntary contributions made by many
050: * individuals on behalf of LateralNZ. For more
051: * information on Lateral, please see http://www.lateralnz.com/ or
052: * http://www.lateralnz.org
053: *
054: */
055: package org.lateralnz.messaging.lrmp;
056:
057: import java.io.ByteArrayInputStream;
058: import java.io.IOException;
059: import java.io.Serializable;
060: import java.util.HashMap;
061:
062: import org.apache.log4j.Logger;
063:
064: import inria.net.lrmp.*;
065:
066: import org.lateralnz.common.model.TimestampedObject;
067: import org.lateralnz.common.util.Constants;
068: import org.lateralnz.common.util.NumericUtils;
069: import org.lateralnz.common.util.ObjectUtils;
070: import org.lateralnz.messaging.AbstractMessageHandler;
071: import org.lateralnz.messaging.Message;
072: import org.lateralnz.messaging.MessageHandler;
073: import org.lateralnz.messaging.util.PacketUtils;
074:
075: /**
076: * a messaging service that uses IP multicast to transmit messages
077: *
078: * @author J R Briggs
079: */
080: public class LRMPMessageHandler extends AbstractMessageHandler
081: implements LrmpEventHandler, MessageHandler, Constants,
082: Serializable {
083: private static final Logger log = Logger
084: .getLogger(LRMPMessageHandler.class.getName());
085:
086: Lrmp lrmp;
087: LrmpEntity sender = null;
088: String outfile = null;
089:
090: private HashMap receivedPackets = new HashMap();
091:
092: protected LRMPMessageHandler(String ipaddress, int port)
093: throws Exception {
094: LrmpProfile profile = new LrmpProfile();
095:
096: profile.setEventHandler(this );
097:
098: profile.reliability = LrmpProfile.NoLoss;
099:
100: profile.ordered = true;
101:
102: profile.throughput = LrmpProfile.AdaptedThroughput;
103:
104: profile.minRate = 8;
105: profile.maxRate = 64;
106:
107: profile.sendWindowSize = 64;
108: profile.rcvWindowSize = 64;
109:
110: try {
111: lrmp = new Lrmp(ipaddress, port, 32, profile);
112: } catch (LrmpException e) {
113: System.err.println("Failed to create Lrmp - " + e);
114: System.exit(1);
115: }
116:
117: lrmp.start();
118:
119: }
120:
121: public void send(Message msg) throws IOException {
122: if (!willTransmit(msg.getGroup())) {
123: if (log.isDebugEnabled()) {
124: log.debug("transmit disabled for " + msg.getGroup());
125: }
126: return;
127: } else if (log.isDebugEnabled()) {
128: log.debug("sending message " + msg.toString());
129: }
130:
131: try {
132:
133: LrmpPacket pack = new LrmpPacket();
134: int offset = pack.getOffset();
135: byte buffer[] = pack.getDataBuffer();
136:
137: // buffer[offset] = 0;
138: // offset += headerLen;
139: byte[] packetid = PacketUtils.getPacketID();
140: int headerLen = 1 + packetid.length;
141: // System.arraycopy(packetid, 0, buffer, offset, packetid.length);
142: // pack.setDataLength(packetid.length + headerLen);
143: // lrmp.send(pack);
144:
145: ByteArrayInputStream in = new ByteArrayInputStream(
146: ObjectUtils.serialize(msg));
147: int pos = 0;
148: while (in.available() > 0) {
149: pack = new LrmpPacket();
150: offset = pack.getOffset();
151: buffer = pack.getDataBuffer();
152:
153: int maxLen = pack.getMaxDataLength() - headerLen;
154:
155: int len = in.read(buffer, offset + headerLen, maxLen);
156: pack.setDataLength(len + headerLen);
157:
158: System.arraycopy(packetid, 0, buffer, offset + 1,
159: packetid.length);
160: if (in.available() > 0) {
161: buffer[offset] = 1;
162: } else {
163: buffer[offset] = 2;
164: }
165:
166: lrmp.send(pack);
167: }
168: } catch (IOException ioe) {
169: throw ioe;
170: } catch (Exception e) {
171: e.printStackTrace();
172: throw new IOException(e.getMessage());
173: }
174: }
175:
176: public void processData(LrmpPacket pack) {
177: if (!pack.isReliable()) {
178: log.error("unreliable packet " + pack);
179: return;
180: }
181:
182: if (sender == null) {
183: sender = pack.getSource();
184: }
185: // else if (sender.getID() != pack.getSource().getID()) {
186: // log.warn("wrong sender " + pack.getSource());
187: // return;
188: // }
189:
190: byte buffer[] = pack.getDataBuffer();
191: int offset = pack.getOffset();
192: int length = pack.getDataLength();
193: int packetid = NumericUtils.toInt(buffer, offset + 1, 4);
194: String key = pack.getAddress().getHostAddress() + packetid;
195:
196: // actual data
197: byte[] tmpbuf = new byte[buffer.length - (offset + 5)];
198: System.arraycopy(buffer, offset + 5, tmpbuf, 0, tmpbuf.length);
199:
200: TimestampedObject to;
201: byte[] receivedBuf;
202: if (receivedPackets.containsKey(key)) {
203: to = (TimestampedObject) receivedPackets.get(key);
204: receivedBuf = (byte[]) to.obj;
205: } else {
206: receivedBuf = new byte[0];
207: to = new TimestampedObject();
208: }
209:
210: byte[] newbuf = new byte[receivedBuf.length + tmpbuf.length];
211: System.arraycopy(receivedBuf, 0, newbuf, 0, receivedBuf.length);
212: System.arraycopy(tmpbuf, 0, newbuf, receivedBuf.length,
213: tmpbuf.length);
214: to.obj = newbuf;
215:
216: if (buffer[offset] == 0) {
217: receivedPackets.put(key, to);
218: } else {
219: receivedPackets.remove(key);
220:
221: try {
222: Object obj = ObjectUtils.deserialize(newbuf);
223: Message msg = (Message) obj;
224: if (log.isDebugEnabled()) {
225: log.debug("received event with group "
226: + msg.getGroup());
227: }
228: notifyListeners(msg);
229: } catch (Exception e) {
230: e.printStackTrace();
231: }
232: }
233: }
234:
235: public void processEvent(int event, Object obj) {
236: switch (event) {
237: case LrmpEventHandler.UNRECOVERABLE_SEQUENCE_ERROR:
238: LrmpErrorEvent err = (LrmpErrorEvent) obj;
239: if (err.source == sender) {
240: log.error("reception failure");
241: sender = null;
242: }
243: break;
244: case LrmpEventHandler.END_OF_SEQUENCE:
245: LrmpEntity s = (LrmpEntity) obj;
246: if (s == sender) {
247: log.error("sender gone");
248: sender = null;
249: }
250: break;
251: default:
252: break;
253: }
254: }
255: }
|