0001: package org.jgroups.protocols;
0002:
0003: import org.jgroups.Address;
0004: import org.jgroups.Message;
0005: import org.jgroups.Global;
0006: import org.jgroups.stack.IpAddress;
0007: import org.jgroups.util.BoundedList;
0008: import org.jgroups.util.Util;
0009:
0010: import java.io.IOException;
0011: import java.io.InterruptedIOException;
0012: import java.net.*;
0013: import java.util.*;
0014:
0015: /**
0016: * IP multicast transport based on UDP. Messages to the group (msg.dest == null) will
0017: * be multicast (to all group members), whereas point-to-point messages
0018: * (msg.dest != null) will be unicast to a single member. Uses a multicast and
0019: * a unicast socket.<p>
0020: * The following properties are read by the UDP protocol:
0021: * <ul>
0022: * <li> param mcast_addr - the multicast address to use; default is 228.8.8.8.
0023: * <li> param mcast_port - (int) the port that the multicast is sent on; default is 7600
0024: * <li> param ip_mcast - (boolean) flag whether to use IP multicast; default is true.
0025: * <li> param ip_ttl - the default time-to-live for multicast packets sent out on this
0026: * socket; default is 32.
0027: * <li> param use_packet_handler - boolean, defaults to false.
0028: * If set, the mcast and ucast receiver threads just put
0029: * the datagram's payload (a byte buffer) into a queue, from where a separate thread
0030: * will dequeue and handle them (unmarshal and pass up). This frees the receiver
0031: * threads from having to do message unmarshalling; this time can now be spent
0032: * receiving packets. If you have lots of retransmissions because of network
0033: * input buffer overflow, consider setting this property to true.
0034: * </ul>
0035: * @author Bela Ban
0036: * @version $Id: UDP.java,v 1.123.2.3 2007/04/27 08:03:51 belaban Exp $
0037: */
0038: public class UDP extends TP implements Runnable {
0039:
0040: /** Socket used for
0041: * <ol>
0042: * <li>sending unicast packets and
0043: * <li>receiving unicast packets
0044: * </ol>
0045: * The address of this socket will be our local address (<tt>local_addr</tt>) */
0046: DatagramSocket sock = null;
0047:
0048: /**
0049: * BoundedList<Integer> of the last 100 ports used. This is to avoid reusing a port for DatagramSocket
0050: */
0051: private static volatile BoundedList last_ports_used = null;
0052:
0053: /** Maintain a list of local ports opened by DatagramSocket. If this is 0, this option is turned off.
0054: * If bind_port is > 0, then this option will be ignored */
0055: int num_last_ports = 100;
0056:
0057: /** IP multicast socket for <em>receiving</em> multicast packets */
0058: MulticastSocket mcast_recv_sock = null;
0059:
0060: /** IP multicast socket for <em>sending</em> multicast packets */
0061: MulticastSocket mcast_send_sock = null;
0062:
0063: /** If we have multiple mcast send sockets, e.g. send_interfaces or send_on_all_interfaces enabled */
0064: MulticastSocket[] mcast_send_sockets = null;
0065:
0066: /**
0067: * Traffic class for sending unicast and multicast datagrams.
0068: * Valid values are (check {@link DatagramSocket#setTrafficClass(int)} ); for details):
0069: * <UL>
0070: * <LI><CODE>IPTOS_LOWCOST (0x02)</CODE>, <b>decimal 2</b></LI>
0071: * <LI><CODE>IPTOS_RELIABILITY (0x04)</CODE><, <b>decimal 4</b>/LI>
0072: * <LI><CODE>IPTOS_THROUGHPUT (0x08)</CODE>, <b>decimal 8</b></LI>
0073: * <LI><CODE>IPTOS_LOWDELAY (0x10)</CODE>, <b>decimal</b> 16</LI>
0074: * </UL>
0075: */
0076: int tos = 8; // valid values: 2, 4, 8 (default), 16
0077:
0078: /** The multicast address (mcast address and port) this member uses */
0079: IpAddress mcast_addr = null;
0080:
0081: /** The multicast address used for sending and receiving packets */
0082: String mcast_addr_name = "228.8.8.8";
0083:
0084: /** The multicast port used for sending and receiving packets */
0085: int mcast_port = 7600;
0086:
0087: /** The multicast receiver thread */
0088: Thread mcast_receiver = null;
0089:
0090: /** The unicast receiver thread */
0091: UcastReceiver ucast_receiver = null;
0092:
0093: /** Whether to enable IP multicasting. If false, multiple unicast datagram
0094: * packets are sent rather than one multicast packet */
0095: boolean ip_mcast = true;
0096:
0097: /** The time-to-live (TTL) for multicast datagram packets */
0098: int ip_ttl = 64;
0099:
0100: /** Send buffer size of the multicast datagram socket */
0101: int mcast_send_buf_size = 32000;
0102:
0103: /** Receive buffer size of the multicast datagram socket */
0104: int mcast_recv_buf_size = 64000;
0105:
0106: /** Send buffer size of the unicast datagram socket */
0107: int ucast_send_buf_size = 32000;
0108:
0109: /** Receive buffer size of the unicast datagram socket */
0110: int ucast_recv_buf_size = 64000;
0111:
0112: /** Usually, src addresses are nulled, and the receiver simply sets them to the address of the sender. However,
0113: * for multiple addresses on a Windows loopback device, this doesn't work
0114: * (see http://jira.jboss.com/jira/browse/JGRP-79 and the JGroups wiki for details). This must be the same
0115: * value for all members of the same group. Default is true, for performance reasons */
0116: // private boolean null_src_addresses=true;
0117:
0118: /**
0119: * Creates the UDP protocol, and initializes the
0120: * state variables, does however not start any sockets or threads.
0121: */
0122: public UDP() {
0123: }
0124:
0125: /**
0126: * Setup the Protocol instance acording to the configuration string.
0127: * The following properties are read by the UDP protocol:
0128: * <ul>
0129: * <li> param mcast_addr - the multicast address to use default is 228.8.8.8
0130: * <li> param mcast_port - (int) the port that the multicast is sent on default is 7600
0131: * <li> param ip_mcast - (boolean) flag whether to use IP multicast - default is true
0132: * <li> param ip_ttl - Set the default time-to-live for multicast packets sent out on this socket. default is 32
0133: * </ul>
0134: * @return true if no other properties are left.
0135: * false if the properties still have data in them, ie ,
0136: * properties are left over and not handled by the protocol stack
0137: */
0138: public boolean setProperties(Properties props) {
0139: String str;
0140:
0141: super .setProperties(props);
0142:
0143: str = props.getProperty("num_last_ports");
0144: if (str != null) {
0145: num_last_ports = Integer.parseInt(str);
0146: props.remove("num_last_ports");
0147: }
0148:
0149: str = Util.getProperty(new String[] { Global.UDP_MCAST_ADDR,
0150: "jboss.partition.udpGroup" }, props, "mcast_addr",
0151: false, "228.8.8.8");
0152: if (str != null)
0153: mcast_addr_name = str;
0154:
0155: str = Util.getProperty(new String[] { Global.UDP_MCAST_PORT,
0156: "jboss.partition.udpPort" }, props, "mcast_port",
0157: false, "7600");
0158: if (str != null)
0159: mcast_port = Integer.parseInt(str);
0160:
0161: str = props.getProperty("ip_mcast");
0162: if (str != null) {
0163: ip_mcast = Boolean.valueOf(str).booleanValue();
0164: props.remove("ip_mcast");
0165: }
0166:
0167: str = Util.getProperty(new String[] { Global.UDP_IP_TTL },
0168: props, "ip_ttl", false, "64");
0169: if (str != null) {
0170: ip_ttl = Integer.parseInt(str);
0171: props.remove("ip_ttl");
0172: }
0173:
0174: str = props.getProperty("tos");
0175: if (str != null) {
0176: tos = Integer.parseInt(str);
0177: props.remove("tos");
0178: }
0179:
0180: str = props.getProperty("mcast_send_buf_size");
0181: if (str != null) {
0182: mcast_send_buf_size = Integer.parseInt(str);
0183: props.remove("mcast_send_buf_size");
0184: }
0185:
0186: str = props.getProperty("mcast_recv_buf_size");
0187: if (str != null) {
0188: mcast_recv_buf_size = Integer.parseInt(str);
0189: props.remove("mcast_recv_buf_size");
0190: }
0191:
0192: str = props.getProperty("ucast_send_buf_size");
0193: if (str != null) {
0194: ucast_send_buf_size = Integer.parseInt(str);
0195: props.remove("ucast_send_buf_size");
0196: }
0197:
0198: str = props.getProperty("ucast_recv_buf_size");
0199: if (str != null) {
0200: ucast_recv_buf_size = Integer.parseInt(str);
0201: props.remove("ucast_recv_buf_size");
0202: }
0203:
0204: str = props.getProperty("null_src_addresses");
0205: if (str != null) {
0206: // null_src_addresses=Boolean.valueOf(str).booleanValue();
0207: props.remove("null_src_addresses");
0208: log
0209: .error("null_src_addresses has been deprecated, property will be ignored");
0210: }
0211:
0212: if (props.size() > 0) {
0213: log.error("the following properties are not recognized: "
0214: + props);
0215: return false;
0216: }
0217: return true;
0218: }
0219:
0220: /* ----------------------- Receiving of MCAST UDP packets ------------------------ */
0221:
0222: public void run() {
0223: DatagramPacket packet;
0224: byte receive_buf[] = new byte[65535];
0225: int offset, len, sender_port;
0226: byte[] data;
0227: InetAddress sender_addr;
0228: Address sender;
0229:
0230: // moved out of loop to avoid excessive object creations (bela March 8 2001)
0231: packet = new DatagramPacket(receive_buf, receive_buf.length);
0232:
0233: while (mcast_receiver != null && mcast_recv_sock != null) {
0234: try {
0235: packet.setData(receive_buf, 0, receive_buf.length);
0236: mcast_recv_sock.receive(packet);
0237: sender_addr = packet.getAddress();
0238: sender_port = packet.getPort();
0239: offset = packet.getOffset();
0240: len = packet.getLength();
0241: data = packet.getData();
0242: sender = new IpAddress(sender_addr, sender_port);
0243:
0244: if (len > receive_buf.length) {
0245: if (log.isErrorEnabled())
0246: log
0247: .error("size of the received packet ("
0248: + len
0249: + ") is bigger than "
0250: + "allocated buffer ("
0251: + receive_buf.length
0252: + "): will not be able to handle packet. "
0253: + "Use the FRAG protocol and make its frag_size lower than "
0254: + receive_buf.length);
0255: }
0256:
0257: receive(mcast_addr, sender, data, offset, len);
0258: } catch (SocketException sock_ex) {
0259: if (log.isTraceEnabled())
0260: log.trace("multicast socket is closed, exception="
0261: + sock_ex);
0262: break;
0263: } catch (InterruptedIOException io_ex) { // thread was interrupted
0264: } catch (Throwable ex) {
0265: if (log.isErrorEnabled())
0266: log.error("failure in multicast receive()", ex);
0267: Util.sleep(100); // so we don't get into 100% cpu spinning (should NEVER happen !)
0268: }
0269: }
0270: if (log.isDebugEnabled())
0271: log.debug("multicast thread terminated");
0272: }
0273:
0274: public String getInfo() {
0275: StringBuffer sb = new StringBuffer();
0276: sb.append("group_addr=").append(mcast_addr_name).append(':')
0277: .append(mcast_port).append("\n");
0278: return sb.toString();
0279: }
0280:
0281: public void sendToAllMembers(byte[] data, int offset, int length)
0282: throws Exception {
0283: if (ip_mcast && mcast_addr != null) {
0284: _send(mcast_addr.getIpAddress(), mcast_addr.getPort(),
0285: true, data, offset, length);
0286: } else {
0287: ArrayList mbrs = new ArrayList(members);
0288: IpAddress mbr;
0289: for (Iterator it = mbrs.iterator(); it.hasNext();) {
0290: mbr = (IpAddress) it.next();
0291: _send(mbr.getIpAddress(), mbr.getPort(), false, data,
0292: offset, length);
0293: }
0294: }
0295: }
0296:
0297: public void sendToSingleMember(Address dest, byte[] data,
0298: int offset, int length) throws Exception {
0299: _send(((IpAddress) dest).getIpAddress(), ((IpAddress) dest)
0300: .getPort(), false, data, offset, length);
0301: }
0302:
0303: public void postUnmarshalling(Message msg, Address dest,
0304: Address src, boolean multicast) {
0305: msg.setDest(dest);
0306: }
0307:
0308: public void postUnmarshallingList(Message msg, Address dest,
0309: boolean multicast) {
0310: msg.setDest(dest);
0311: }
0312:
0313: private void _send(InetAddress dest, int port, boolean mcast,
0314: byte[] data, int offset, int length) throws Exception {
0315: DatagramPacket packet = new DatagramPacket(data, offset,
0316: length, dest, port);
0317: try {
0318: if (mcast) {
0319: if (mcast_send_sock != null) {
0320: mcast_send_sock.send(packet);
0321: } else {
0322: if (mcast_send_sockets != null) {
0323: MulticastSocket s;
0324: for (int i = 0; i < mcast_send_sockets.length; i++) {
0325: s = mcast_send_sockets[i];
0326: try {
0327: s.send(packet);
0328: } catch (Exception e) {
0329: log
0330: .error("failed sending packet on socket "
0331: + s);
0332: }
0333: }
0334: } else {
0335: throw new Exception(
0336: "both mcast_send_sock and mcast_send_sockets are null");
0337: }
0338: }
0339: } else {
0340: if (sock != null)
0341: sock.send(packet);
0342: }
0343: } catch (Exception ex) {
0344: throw new Exception("dest=" + dest + ":" + port + " ("
0345: + length + " bytes)", ex);
0346: }
0347: }
0348:
0349: /* ------------------------------------------------------------------------------- */
0350:
0351: /*------------------------------ Protocol interface ------------------------------ */
0352:
0353: public String getName() {
0354: return "UDP";
0355: }
0356:
0357: /**
0358: * Creates the unicast and multicast sockets and starts the unicast and multicast receiver threads
0359: */
0360: public void start() throws Exception {
0361: if (log.isDebugEnabled())
0362: log.debug("creating sockets and starting threads");
0363: try {
0364: createSockets();
0365: } catch (Exception ex) {
0366: String tmp = "problem creating sockets (bind_addr="
0367: + bind_addr + ", mcast_addr=" + mcast_addr + ")";
0368: throw new Exception(tmp, ex);
0369: }
0370: super .start();
0371: startThreads();
0372: }
0373:
0374: public void stop() {
0375: if (log.isDebugEnabled())
0376: log.debug("closing sockets and stopping threads");
0377: stopThreads(); // will close sockets, closeSockets() is not really needed anymore, but...
0378: closeSockets(); // ... we'll leave it in there for now (doesn't do anything if already closed)
0379: super .stop();
0380: }
0381:
0382: /*--------------------------- End of Protocol interface -------------------------- */
0383:
0384: /* ------------------------------ Private Methods -------------------------------- */
0385:
0386: /**
0387: * Create UDP sender and receiver sockets. Currently there are 2 sockets
0388: * (sending and receiving). This is due to Linux's non-BSD compatibility
0389: * in the JDK port (see DESIGN).
0390: */
0391: private void createSockets() throws Exception {
0392: InetAddress tmp_addr;
0393:
0394: // bind_addr not set, try to assign one by default. This is needed on Windows
0395:
0396: // changed by bela Feb 12 2003: by default multicast sockets will be bound to all network interfaces
0397:
0398: // CHANGED *BACK* by bela March 13 2003: binding to all interfaces did not result in a correct
0399: // local_addr. As a matter of fact, comparison between e.g. 0.0.0.0:1234 (on hostA) and
0400: // 0.0.0.0:1.2.3.4 (on hostB) would fail !
0401: // if(bind_addr == null) {
0402: // InetAddress[] interfaces=InetAddress.getAllByName(InetAddress.getLocalHost().getHostAddress());
0403: // if(interfaces != null && interfaces.length > 0)
0404: // bind_addr=interfaces[0];
0405: // }
0406:
0407: if (bind_addr == null && !use_local_host) {
0408: bind_addr = Util.getFirstNonLoopbackAddress();
0409: }
0410: if (bind_addr == null)
0411: bind_addr = InetAddress.getLocalHost();
0412:
0413: if (bind_addr != null)
0414: if (log.isInfoEnabled())
0415: log.info("sockets will use interface "
0416: + bind_addr.getHostAddress());
0417:
0418: // 2. Create socket for receiving unicast UDP packets. The address and port
0419: // of this socket will be our local address (local_addr)
0420: if (bind_port > 0) {
0421: sock = createDatagramSocketWithBindPort();
0422: } else {
0423: sock = createEphemeralDatagramSocket();
0424: }
0425: if (tos > 0) {
0426: try {
0427: sock.setTrafficClass(tos);
0428: } catch (SocketException e) {
0429: log.warn("traffic class of " + tos
0430: + " could not be set, will be ignored", e);
0431: }
0432: }
0433:
0434: if (sock == null)
0435: throw new Exception("UDP.createSocket(): sock is null");
0436:
0437: local_addr = new IpAddress(sock.getLocalAddress(), sock
0438: .getLocalPort());
0439: if (additional_data != null)
0440: ((IpAddress) local_addr).setAdditionalData(additional_data);
0441:
0442: // 3. Create socket for receiving IP multicast packets
0443: if (ip_mcast) {
0444: // 3a. Create mcast receiver socket
0445: mcast_recv_sock = new MulticastSocket(mcast_port);
0446: mcast_recv_sock.setTimeToLive(ip_ttl);
0447: tmp_addr = InetAddress.getByName(mcast_addr_name);
0448: mcast_addr = new IpAddress(tmp_addr, mcast_port);
0449:
0450: if (receive_on_all_interfaces
0451: || (receive_interfaces != null && receive_interfaces
0452: .size() > 0)) {
0453: List interfaces;
0454: if (receive_interfaces != null)
0455: interfaces = receive_interfaces;
0456: else
0457: interfaces = Util.getAllAvailableInterfaces();
0458: bindToInterfaces(interfaces, mcast_recv_sock,
0459: mcast_addr.getIpAddress());
0460: } else {
0461: if (bind_addr != null)
0462: mcast_recv_sock.setInterface(bind_addr);
0463: mcast_recv_sock.joinGroup(tmp_addr);
0464: }
0465:
0466: // 3b. Create mcast sender socket
0467: if (send_on_all_interfaces
0468: || (send_interfaces != null && send_interfaces
0469: .size() > 0)) {
0470: List interfaces;
0471: NetworkInterface intf;
0472: if (send_interfaces != null)
0473: interfaces = send_interfaces;
0474: else
0475: interfaces = Util.getAllAvailableInterfaces();
0476: mcast_send_sockets = new MulticastSocket[interfaces
0477: .size()];
0478: int index = 0;
0479: for (Iterator it = interfaces.iterator(); it.hasNext();) {
0480: intf = (NetworkInterface) it.next();
0481: mcast_send_sockets[index] = new MulticastSocket();
0482: mcast_send_sockets[index].setNetworkInterface(intf);
0483: mcast_send_sockets[index].setTimeToLive(ip_ttl);
0484: if (tos > 0) {
0485: try {
0486: mcast_send_sockets[index]
0487: .setTrafficClass(tos);
0488: } catch (SocketException e) {
0489: log
0490: .warn(
0491: "traffic class of "
0492: + tos
0493: + " could not be set, will be ignored",
0494: e);
0495: }
0496: }
0497: index++;
0498: }
0499: } else {
0500: mcast_send_sock = new MulticastSocket();
0501: mcast_send_sock.setTimeToLive(ip_ttl);
0502: if (bind_addr != null)
0503: mcast_send_sock.setInterface(bind_addr);
0504:
0505: if (tos > 0) {
0506: try {
0507: mcast_send_sock.setTrafficClass(tos); // high throughput
0508: } catch (SocketException e) {
0509: log.warn("traffic class of " + tos
0510: + " could not be set, will be ignored",
0511: e);
0512: }
0513: }
0514: }
0515: }
0516:
0517: setBufferSizes();
0518: if (log.isInfoEnabled())
0519: log.info("socket information:\n" + dumpSocketInfo());
0520: }
0521:
0522: // private void bindToAllInterfaces(MulticastSocket s, InetAddress mcastAddr) throws IOException {
0523: // SocketAddress tmp_mcast_addr=new InetSocketAddress(mcastAddr, mcast_port);
0524: // Enumeration en=NetworkInterface.getNetworkInterfaces();
0525: // while(en.hasMoreElements()) {
0526: // NetworkInterface i=(NetworkInterface)en.nextElement();
0527: // for(Enumeration en2=i.getInetAddresses(); en2.hasMoreElements();) {
0528: // InetAddress addr=(InetAddress)en2.nextElement();
0529: // // if(addr.isLoopbackAddress())
0530: // // continue;
0531: // s.joinGroup(tmp_mcast_addr, i);
0532: // if(log.isTraceEnabled())
0533: // log.trace("joined " + tmp_mcast_addr + " on interface " + i.getName() + " (" + addr + ")");
0534: // break;
0535: // }
0536: // }
0537: // }
0538:
0539: /**
0540: *
0541: * @param interfaces List<NetworkInterface>. Guaranteed to have no duplicates
0542: * @param s
0543: * @param mcastAddr
0544: * @throws IOException
0545: */
0546: private void bindToInterfaces(List interfaces, MulticastSocket s,
0547: InetAddress mcastAddr) throws IOException {
0548: SocketAddress tmp_mcast_addr = new InetSocketAddress(mcastAddr,
0549: mcast_port);
0550: for (Iterator it = interfaces.iterator(); it.hasNext();) {
0551: NetworkInterface i = (NetworkInterface) it.next();
0552: for (Enumeration en2 = i.getInetAddresses(); en2
0553: .hasMoreElements();) {
0554: InetAddress addr = (InetAddress) en2.nextElement();
0555: s.joinGroup(tmp_mcast_addr, i);
0556: if (log.isTraceEnabled())
0557: log.trace("joined " + tmp_mcast_addr + " on "
0558: + i.getName() + " (" + addr + ")");
0559: break;
0560: }
0561: }
0562: }
0563:
0564: /** Creates a DatagramSocket with a random port. Because in certain operating systems, ports are reused,
0565: * we keep a list of the n last used ports, and avoid port reuse */
0566: protected DatagramSocket createEphemeralDatagramSocket()
0567: throws SocketException {
0568: DatagramSocket tmp;
0569: int localPort = 0;
0570: while (true) {
0571: tmp = new DatagramSocket(localPort, bind_addr); // first time localPort is 0
0572: if (num_last_ports <= 0)
0573: break;
0574: localPort = tmp.getLocalPort();
0575: if (last_ports_used == null)
0576: last_ports_used = new BoundedList(num_last_ports);
0577: if (last_ports_used.contains(new Integer(localPort))) {
0578: if (log.isDebugEnabled())
0579: log
0580: .debug("local port "
0581: + localPort
0582: + " already seen in this session; will try to get other port");
0583: try {
0584: tmp.close();
0585: } catch (Throwable e) {
0586: }
0587: localPort++;
0588: } else {
0589: last_ports_used.add(new Integer(localPort));
0590: break;
0591: }
0592: }
0593: return tmp;
0594: }
0595:
0596: /**
0597: * Creates a DatagramSocket when bind_port > 0. Attempts to allocate the socket with port == bind_port, and
0598: * increments until it finds a valid port, or until port_range has been exceeded
0599: * @return DatagramSocket The newly created socket
0600: * @throws Exception
0601: */
0602: protected DatagramSocket createDatagramSocketWithBindPort()
0603: throws Exception {
0604: DatagramSocket tmp = null;
0605: // 27-6-2003 bgooren, find available port in range (start_port, start_port+port_range)
0606: int rcv_port = bind_port, max_port = bind_port + port_range;
0607: while (rcv_port <= max_port) {
0608: try {
0609: tmp = new DatagramSocket(rcv_port, bind_addr);
0610: break;
0611: } catch (SocketException bind_ex) { // Cannot listen on this port
0612: rcv_port++;
0613: } catch (SecurityException sec_ex) { // Not allowed to listen on this port
0614: rcv_port++;
0615: }
0616:
0617: // Cannot listen at all, throw an Exception
0618: if (rcv_port >= max_port + 1) { // +1 due to the increment above
0619: throw new Exception(
0620: "cannot create a socket on any port in range "
0621: + bind_port + '-'
0622: + (bind_port + port_range));
0623: }
0624: }
0625: return tmp;
0626: }
0627:
0628: private String dumpSocketInfo() throws Exception {
0629: StringBuffer sb = new StringBuffer(128);
0630: sb.append("local_addr=").append(local_addr);
0631: sb.append(", mcast_addr=").append(mcast_addr);
0632: sb.append(", bind_addr=").append(bind_addr);
0633: sb.append(", ttl=").append(ip_ttl);
0634:
0635: if (sock != null) {
0636: sb.append("\nsock: bound to ");
0637: sb.append(sock.getLocalAddress().getHostAddress()).append(
0638: ':').append(sock.getLocalPort());
0639: sb.append(", receive buffer size=").append(
0640: sock.getReceiveBufferSize());
0641: sb.append(", send buffer size=").append(
0642: sock.getSendBufferSize());
0643: }
0644:
0645: if (mcast_recv_sock != null) {
0646: sb.append("\nmcast_recv_sock: bound to ");
0647: sb.append(mcast_recv_sock.getInterface().getHostAddress())
0648: .append(':').append(mcast_recv_sock.getLocalPort());
0649: sb.append(", send buffer size=").append(
0650: mcast_recv_sock.getSendBufferSize());
0651: sb.append(", receive buffer size=").append(
0652: mcast_recv_sock.getReceiveBufferSize());
0653: }
0654:
0655: if (mcast_send_sock != null) {
0656: sb.append("\nmcast_send_sock: bound to ");
0657: sb.append(mcast_send_sock.getInterface().getHostAddress())
0658: .append(':').append(mcast_send_sock.getLocalPort());
0659: sb.append(", send buffer size=").append(
0660: mcast_send_sock.getSendBufferSize());
0661: sb.append(", receive buffer size=").append(
0662: mcast_send_sock.getReceiveBufferSize());
0663: }
0664: if (mcast_send_sockets != null) {
0665: sb.append("\n").append(mcast_send_sockets.length).append(
0666: " mcast send sockets:\n");
0667: MulticastSocket s;
0668: for (int i = 0; i < mcast_send_sockets.length; i++) {
0669: s = mcast_send_sockets[i];
0670: sb.append(s.getInterface().getHostAddress())
0671: .append(':').append(s.getLocalPort());
0672: sb.append(", send buffer size=").append(
0673: s.getSendBufferSize());
0674: sb.append(", receive buffer size=").append(
0675: s.getReceiveBufferSize()).append("\n");
0676: }
0677: }
0678: return sb.toString();
0679: }
0680:
0681: void setBufferSizes() {
0682: if (sock != null)
0683: setBufferSize(sock, ucast_send_buf_size,
0684: ucast_recv_buf_size);
0685:
0686: if (mcast_recv_sock != null)
0687: setBufferSize(mcast_recv_sock, mcast_send_buf_size,
0688: mcast_recv_buf_size);
0689:
0690: if (mcast_send_sock != null)
0691: setBufferSize(mcast_send_sock, mcast_send_buf_size,
0692: mcast_recv_buf_size);
0693:
0694: if (mcast_send_sockets != null) {
0695: for (int i = 0; i < mcast_send_sockets.length; i++) {
0696: setBufferSize(mcast_send_sockets[i],
0697: mcast_send_buf_size, mcast_recv_buf_size);
0698: }
0699: }
0700: }
0701:
0702: private void setBufferSize(DatagramSocket sock, int send_buf_size,
0703: int recv_buf_size) {
0704: try {
0705: sock.setSendBufferSize(send_buf_size);
0706: } catch (Throwable ex) {
0707: if (log.isWarnEnabled())
0708: log.warn("failed setting send buffer size of "
0709: + send_buf_size + " in " + sock + ": " + ex);
0710: }
0711:
0712: try {
0713: sock.setReceiveBufferSize(recv_buf_size);
0714: } catch (Throwable ex) {
0715: if (log.isWarnEnabled())
0716: log.warn("failed setting receive buffer size of "
0717: + recv_buf_size + " in " + sock + ": " + ex);
0718: }
0719: }
0720:
0721: /**
0722: * Closed UDP unicast and multicast sockets
0723: */
0724: void closeSockets() {
0725: // 1. Close multicast socket
0726: closeMulticastSocket();
0727:
0728: // 2. Close socket
0729: closeSocket();
0730: }
0731:
0732: void closeMulticastSocket() {
0733: if (mcast_recv_sock != null) {
0734: try {
0735: if (mcast_addr != null) {
0736: mcast_recv_sock.leaveGroup(mcast_addr
0737: .getIpAddress());
0738: }
0739: mcast_recv_sock.close(); // this will cause the mcast receiver thread to break out of its loop
0740: mcast_recv_sock = null;
0741: if (log.isDebugEnabled())
0742: log.debug("multicast receive socket closed");
0743: } catch (IOException ex) {
0744: }
0745: mcast_addr = null;
0746: }
0747:
0748: if (mcast_send_sock != null) {
0749: mcast_send_sock.close();
0750: mcast_send_sock = null;
0751: if (log.isDebugEnabled())
0752: log.debug("multicast send socket closed");
0753: }
0754: if (mcast_send_sockets != null) {
0755: MulticastSocket s;
0756: for (int i = 0; i < mcast_send_sockets.length; i++) {
0757: s = mcast_send_sockets[i];
0758: s.close();
0759: if (log.isDebugEnabled())
0760: log.debug("multicast send socket " + s + " closed");
0761: }
0762: mcast_send_sockets = null;
0763: }
0764: }
0765:
0766: private void closeSocket() {
0767: if (sock != null) {
0768: sock.close();
0769: sock = null;
0770: if (log.isDebugEnabled())
0771: log.debug("socket closed");
0772: }
0773: }
0774:
0775: /**
0776: * Starts the unicast and multicast receiver threads
0777: */
0778: void startThreads() throws Exception {
0779: if (ucast_receiver == null) {
0780: //start the listener thread of the ucast_recv_sock
0781: ucast_receiver = new UcastReceiver();
0782: ucast_receiver.start();
0783: if (log.isDebugEnabled())
0784: log.debug("created unicast receiver thread");
0785: }
0786:
0787: if (ip_mcast) {
0788: if (mcast_receiver != null) {
0789: if (mcast_receiver.isAlive()) {
0790: if (log.isDebugEnabled())
0791: log
0792: .debug("did not create new multicastreceiver thread as existing "
0793: + "multicast receiver thread is still running");
0794: } else
0795: mcast_receiver = null; // will be created just below...
0796: }
0797:
0798: if (mcast_receiver == null) {
0799: mcast_receiver = new Thread(
0800: Util.getGlobalThreadGroup(), this ,
0801: "UDP mcast receiver");
0802: mcast_receiver.setPriority(Thread.MAX_PRIORITY); // needed ????
0803: mcast_receiver.setDaemon(true);
0804: mcast_receiver.start();
0805: }
0806: }
0807: }
0808:
0809: /**
0810: * Stops unicast and multicast receiver threads
0811: */
0812: void stopThreads() {
0813: Thread tmp;
0814:
0815: // 1. Stop the multicast receiver thread
0816: if (mcast_receiver != null) {
0817: if (mcast_receiver.isAlive()) {
0818: tmp = mcast_receiver;
0819: mcast_receiver = null;
0820: closeMulticastSocket(); // will cause the multicast thread to terminate
0821: tmp.interrupt();
0822: try {
0823: tmp.join(100);
0824: } catch (Exception e) {
0825: }
0826: tmp = null;
0827: }
0828: mcast_receiver = null;
0829: }
0830:
0831: // 2. Stop the unicast receiver thread
0832: if (ucast_receiver != null) {
0833: ucast_receiver.stop();
0834: ucast_receiver = null;
0835: }
0836: }
0837:
0838: protected void setThreadNames() {
0839: super .setThreadNames();
0840:
0841: if (channel_name != null) {
0842: String tmp, prefix = Global.THREAD_PREFIX;
0843: if (mcast_receiver != null) {
0844: tmp = mcast_receiver.getName();
0845: if (tmp != null && tmp.indexOf(prefix) == -1) {
0846: tmp += prefix + channel_name + ")";
0847: mcast_receiver.setName(tmp);
0848: }
0849: }
0850: if (ucast_receiver != null) {
0851: tmp = ucast_receiver.getName();
0852: if (tmp != null && tmp.indexOf(prefix) == -1) {
0853: tmp += prefix + channel_name + ")";
0854: ucast_receiver.setName(tmp);
0855: }
0856: }
0857: }
0858: }
0859:
0860: protected void unsetThreadNames() {
0861: super .unsetThreadNames();
0862: if (channel_name != null) {
0863: String tmp, prefix = Global.THREAD_PREFIX;
0864: int index;
0865:
0866: tmp = mcast_receiver != null ? mcast_receiver.getName()
0867: : null;
0868: if (tmp != null) {
0869: index = tmp.indexOf(prefix);
0870: if (index > -1) {
0871: tmp = tmp.substring(0, index);
0872: mcast_receiver.setName(tmp);
0873: }
0874: }
0875: tmp = ucast_receiver != null ? ucast_receiver.getName()
0876: : null;
0877: if (tmp != null) {
0878: index = tmp.indexOf(prefix);
0879: if (index > -1) {
0880: tmp = tmp.substring(0, index);
0881: ucast_receiver.setName(tmp);
0882: }
0883: }
0884: }
0885: }
0886:
0887: protected void handleConfigEvent(HashMap map) {
0888: boolean set_buffers = false;
0889: super .handleConfigEvent(map);
0890: if (map == null)
0891: return;
0892:
0893: if (map.containsKey("send_buf_size")) {
0894: mcast_send_buf_size = ((Integer) map.get("send_buf_size"))
0895: .intValue();
0896: ucast_send_buf_size = mcast_send_buf_size;
0897: set_buffers = true;
0898: }
0899: if (map.containsKey("recv_buf_size")) {
0900: mcast_recv_buf_size = ((Integer) map.get("recv_buf_size"))
0901: .intValue();
0902: ucast_recv_buf_size = mcast_recv_buf_size;
0903: set_buffers = true;
0904: }
0905: if (set_buffers)
0906: setBufferSizes();
0907: }
0908:
0909: /* ----------------------------- End of Private Methods ---------------------------------------- */
0910:
0911: /* ----------------------------- Inner Classes ---------------------------------------- */
0912:
0913: public class UcastReceiver implements Runnable {
0914: boolean running = true;
0915: Thread thread = null;
0916:
0917: String getName() {
0918: return thread != null ? thread.getName() : null;
0919: }
0920:
0921: void setName(String thread_name) {
0922: if (thread != null)
0923: thread.setName(thread_name);
0924: }
0925:
0926: public void start() {
0927: if (thread == null) {
0928: thread = new Thread(Util.getGlobalThreadGroup(), this ,
0929: "UDP.UcastReceiverThread");
0930: thread.setDaemon(true);
0931: running = true;
0932: thread.start();
0933: }
0934: }
0935:
0936: public void stop() {
0937: Thread tmp;
0938: if (thread != null && thread.isAlive()) {
0939: running = false;
0940: tmp = thread;
0941: thread = null;
0942: closeSocket(); // this will cause the thread to break out of its loop
0943: tmp.interrupt();
0944: try {
0945: tmp.join(500);
0946: } catch (InterruptedException e) {
0947: }
0948: tmp = null;
0949: }
0950: thread = null;
0951: }
0952:
0953: public void run() {
0954: DatagramPacket packet;
0955: byte receive_buf[] = new byte[65535];
0956: int offset, len;
0957: byte[] data;
0958: InetAddress sender_addr;
0959: int sender_port;
0960: Address sender;
0961:
0962: // moved out of loop to avoid excessive object creations (bela March 8 2001)
0963: packet = new DatagramPacket(receive_buf, receive_buf.length);
0964:
0965: while (running && thread != null && sock != null) {
0966: try {
0967: packet.setData(receive_buf, 0, receive_buf.length);
0968: sock.receive(packet);
0969: sender_addr = packet.getAddress();
0970: sender_port = packet.getPort();
0971: offset = packet.getOffset();
0972: len = packet.getLength();
0973: data = packet.getData();
0974: sender = new IpAddress(sender_addr, sender_port);
0975:
0976: if (len > receive_buf.length) {
0977: if (log.isErrorEnabled())
0978: log
0979: .error("size of the received packet ("
0980: + len
0981: + ") is bigger than allocated buffer ("
0982: + receive_buf.length
0983: + "): will not be able to handle packet. "
0984: + "Use the FRAG protocol and make its frag_size lower than "
0985: + receive_buf.length);
0986: }
0987: receive(local_addr, sender, data, offset, len);
0988: } catch (SocketException sock_ex) {
0989: if (log.isDebugEnabled())
0990: log
0991: .debug("unicast receiver socket is closed, exception="
0992: + sock_ex);
0993: break;
0994: } catch (InterruptedIOException io_ex) { // thread was interrupted
0995: } catch (Throwable ex) {
0996: if (log.isErrorEnabled())
0997: log.error("[" + local_addr
0998: + "] failed receiving unicast packet",
0999: ex);
1000: Util.sleep(100); // so we don't get into 100% cpu spinning (should NEVER happen !)
1001: }
1002: }
1003: if (log.isDebugEnabled())
1004: log.debug("unicast receiver thread terminated");
1005: }
1006: }
1007:
1008: }
|