001: /* ====================================================================
002: * The LateralNZ Software License, Version 1.0
003: *
004: * Copyright (c) 2003 LateralNZ. All rights reserved.
005: *
006: * Redistribution and use in source and binary forms, with or without
007: * modification, are permitted provided that the following conditions
008: * are met:
009: *
010: * 1. Redistributions of source code must retain the above copyright
011: * notice, this list of conditions and the following disclaimer.
012: *
013: * 2. Redistributions in binary form must reproduce the above copyright
014: * notice, this list of conditions and the following disclaimer in
015: * the documentation and/or other materials provided with the
016: * distribution.
017: *
018: * 3. The end-user documentation included with the redistribution,
019: * if any, must include the following acknowledgment:
020: * "This product includes software developed by
021: * LateralNZ (http://www.lateralnz.org/) and other third parties."
022: * Alternately, this acknowledgment may appear in the software itself,
023: * if and wherever such third-party acknowledgments normally appear.
024: *
025: * 4. The names "LateralNZ" must not be used to endorse or promote
026: * products derived from this software without prior written
027: * permission. For written permission, please
028: * contact oss@lateralnz.org.
029: *
030: * 5. Products derived from this software may not be called "Panther",
031: * or "Lateral" or "LateralNZ", nor may "PANTHER" or "LATERAL" or
032: * "LATERALNZ" appear in their name, without prior written
033: * permission of LateralNZ.
034: *
035: * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
036: * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
037: * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
038: * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
039: * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
040: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
041: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
042: * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
043: * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
044: * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
045: * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
046: * SUCH DAMAGE.
047: * ====================================================================
048: *
049: * This software consists of voluntary contributions made by many
050: * individuals on behalf of LateralNZ. For more
051: * information on Lateral, please see http://www.lateralnz.com/ or
052: * http://www.lateralnz.org
053: *
054: */
055: package org.lateralnz.messaging.broadcast;
056:
057: import java.io.IOException;
058: import java.io.Serializable;
059: import java.net.DatagramPacket;
060: import java.net.InetAddress;
061: import java.net.DatagramSocket;
062: import java.util.Arrays;
063:
064: import org.apache.log4j.Logger;
065:
066: import org.lateralnz.common.util.Constants;
067: import org.lateralnz.common.util.SystemUtils;
068: import org.lateralnz.common.wrapper.IntHolder;
069: import org.lateralnz.messaging.AbstractMessageHandler;
070: import org.lateralnz.messaging.Message;
071: import org.lateralnz.messaging.MessageHandler;
072: import org.lateralnz.messaging.util.PacketUtils;
073:
074: /**
075: * a messaging service that uses IP multicast to transmit messages
076: *
077: * @author J R Briggs
078: */
079: public class BroadcastMessageHandler extends AbstractMessageHandler
080: implements Runnable, MessageHandler, Constants, Serializable {
081: private static final Logger log = Logger
082: .getLogger(BroadcastMessageHandler.class.getName());
083: private static final int HEADER_SIZE = 12; // size of the head (ID[3], MAX[3], ORDER[3])
084: private static final int SPACER_SIZE = 20; // a spacer (just in case)
085:
086: private boolean running = true;
087:
088: private byte[] localhost2 = SystemUtils.getLocalhostIP();
089: private byte[] localhost1 = { (byte) 127, (byte) 0, (byte) 0,
090: (byte) 1 };
091: protected DatagramSocket sock; // the socket to uses for comms
092: private int maxbuf; // calculated max buffer size
093: private int receiveBufSize; // the receive buffer size for the socket
094: private IntHolder msgID = new IntHolder(0); // ID number to use with fragmented packets
095: protected Thread listenerThread;
096:
097: protected BroadcastMessageHandler(String ipaddress, int port)
098: throws Exception {
099: this (ipaddress, port, new DatagramSocket(port, InetAddress
100: .getByName(ipaddress)), false);
101: sock.setBroadcast(true);
102:
103: listenerThread.start();
104: }
105:
106: protected BroadcastMessageHandler(String ipaddress, int port,
107: DatagramSocket sock) throws Exception {
108: this (ipaddress, port, sock, true);
109: }
110:
111: protected BroadcastMessageHandler(String ipaddress, int port,
112: DatagramSocket sock, boolean start) throws Exception {
113: setAddress(InetAddress.getByName(ipaddress), port);
114: this .sock = sock;
115:
116: maxbuf = sock.getSendBufferSize() - (HEADER_SIZE + SPACER_SIZE);
117: receiveBufSize = sock.getReceiveBufferSize();
118:
119: listenerThread = new Thread(this );
120:
121: if (start) {
122: listenerThread.start();
123: }
124: }
125:
126: private final boolean isLocal(InetAddress addr) {
127: byte[] b = addr.getAddress();
128: return (Arrays.equals(b, localhost1) || Arrays.equals(b,
129: localhost2));
130: }
131:
132: /**
133: * packets are received during the run loop
134: */
135: public void run() {
136: if (log.isDebugEnabled()) {
137: log.debug("packet listener running");
138: }
139: byte[] buf = new byte[receiveBufSize];
140: DatagramPacket packet = new DatagramPacket(buf, buf.length);
141: long time = System.currentTimeMillis();
142:
143: loop: while (running) {
144: try {
145: // get the next packet
146: sock.receive(packet);
147:
148: if (log.isDebugEnabled()) {
149: log.debug("received packet");
150: }
151:
152: if (!isLocal(packet.getAddress())) {
153: Object obj = PacketUtils.reconstitute(packet,
154: receiveBufSize);
155: if (obj != null) {
156: Message msg = (Message) obj;
157: if (log.isDebugEnabled()) {
158: log.debug("received event with group "
159: + msg.getGroup());
160: }
161:
162: notifyListeners(msg);
163: }
164: }
165: // reset the packet
166: packet.setLength(buf.length);
167: } catch (Exception e) {
168: e.printStackTrace();
169: }
170:
171: PacketUtils.dumpOldData();
172: try {
173: Thread.sleep(10);
174: Thread.yield();
175: } catch (InterruptedException ie) {
176: }
177: }
178: }
179:
180: /**
181: * send a message (serialized, fragmented if necessary, then broadcast)
182: */
183: public void send(Message msg) throws IOException {
184: if (!willTransmit(msg.getGroup())) {
185: return;
186: } else if (log.isDebugEnabled()) {
187: log.debug("sending message " + msg.toString());
188: }
189:
190: try {
191: DatagramPacket[] packets = PacketUtils.split(msg, addr,
192: port, maxbuf);
193: for (int i = 0; i < packets.length; i++) {
194: if (log.isDebugEnabled()) {
195: log.debug("sending packet : " + (i + 1) + " of "
196: + packets.length);
197: }
198: sock.send(packets[i]);
199: }
200: } catch (Exception e) {
201: log.error(e);
202: }
203: }
204: }
|