Source Code Cross Referenced for UDP.java in JGroups-2.4.1-sp3 (package org.jgroups.protocols)

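The setProperties method in the listing that follows uses a consume-and-report pattern: each recognized key is parsed and removed from the Properties object, and any keys still present at the end are reported as unrecognized. A minimal, self-contained sketch of that pattern is given here. The class UdpSettingsSketch and its apply method are hypothetical names chosen for illustration and are not part of JGroups; only the key names and defaults are taken from the listing.

```java
import java.util.Properties;

/**
 * Hypothetical stand-in for a protocol's settings; not part of JGroups.
 * Key names and defaults mirror the listing (mcast_port 7600, ip_ttl 64).
 */
final class UdpSettingsSketch {
    int mcastPort = 7600;
    int ipTtl = 64;
    boolean ipMcast = true;

    /**
     * Consumes the keys this sketch understands and removes them from props.
     * Returns true only if no keys are left over afterwards.
     */
    boolean apply(Properties props) {
        String str = props.getProperty("mcast_port");
        if (str != null) {
            mcastPort = Integer.parseInt(str);
            props.remove("mcast_port");
        }

        str = props.getProperty("ip_ttl");
        if (str != null) {
            ipTtl = Integer.parseInt(str);
            props.remove("ip_ttl");
        }

        str = props.getProperty("ip_mcast");
        if (str != null) {
            ipMcast = Boolean.valueOf(str).booleanValue();
            props.remove("ip_mcast");
        }

        // Anything still present was not recognized by this sketch.
        return props.isEmpty();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("mcast_port", "7700");
        props.setProperty("typo_key", "1"); // not recognized, so it stays behind

        UdpSettingsSketch settings = new UdpSettingsSketch();
        boolean clean = settings.apply(props);
        System.out.println("clean=" + clean + ", leftovers=" + props);
    }
}
```

Removing each key as it is consumed is what makes the final leftover check meaningful: a misspelled property name stays in the Properties object and can be reported instead of being silently ignored.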


package org.jgroups.protocols;

import org.jgroups.Address;
import org.jgroups.Message;
import org.jgroups.Global;
import org.jgroups.stack.IpAddress;
import org.jgroups.util.BoundedList;
import org.jgroups.util.Util;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.*;
import java.util.*;

/**
 * IP multicast transport based on UDP. Messages to the group (msg.dest == null) will
 * be multicast (to all group members), whereas point-to-point messages
 * (msg.dest != null) will be unicast to a single member. Uses a multicast and
 * a unicast socket.<p>
 * The following properties are read by the UDP protocol:
 * <ul>
 * <li> param mcast_addr - the multicast address to use; default is 228.8.8.8.
 * <li> param mcast_port - (int) the port that the multicast is sent on; default is 7600.
 * <li> param ip_mcast - (boolean) flag whether to use IP multicast; default is true.
 * <li> param ip_ttl - the default time-to-live for multicast packets sent out on this
 * socket; default is 64.
 * <li> param use_packet_handler - boolean, defaults to false.
 * If set, the mcast and ucast receiver threads just put
 * the datagram's payload (a byte buffer) into a queue, from where a separate thread
 * will dequeue and handle them (unmarshal and pass up). This frees the receiver
 * threads from having to do message unmarshalling; this time can now be spent
 * receiving packets. If you have lots of retransmissions because of network
 * input buffer overflow, consider setting this property to true.
 * </ul>
 * @author Bela Ban
 * @version $Id: UDP.java,v 1.123.2.3 2007/04/27 08:03:51 belaban Exp $
 */
public class UDP extends TP implements Runnable {

    /** Socket used for
     * <ol>
     * <li>sending unicast packets and
     * <li>receiving unicast packets
     * </ol>
     * The address of this socket will be our local address (<tt>local_addr</tt>) */
    DatagramSocket sock = null;

    /**
     * BoundedList<Integer> of the last 100 ports used. This is to avoid reusing a port for DatagramSocket
     */
    private static volatile BoundedList last_ports_used = null;

    /** Maintain a list of local ports opened by DatagramSocket. If this is 0, this option is turned off.
     * If bind_port is > 0, then this option will be ignored */
    int num_last_ports = 100;

    /** IP multicast socket for <em>receiving</em> multicast packets */
    MulticastSocket mcast_recv_sock = null;

    /** IP multicast socket for <em>sending</em> multicast packets */
    MulticastSocket mcast_send_sock = null;

    /** If we have multiple mcast send sockets, e.g. send_interfaces or send_on_all_interfaces enabled */
    MulticastSocket[] mcast_send_sockets = null;

    /**
     * Traffic class for sending unicast and multicast datagrams.
     * Valid values are (see {@link DatagramSocket#setTrafficClass(int)} for details):
     * <UL>
     * <LI><CODE>IPTOS_LOWCOST (0x02)</CODE>, <b>decimal 2</b></LI>
     * <LI><CODE>IPTOS_RELIABILITY (0x04)</CODE>, <b>decimal 4</b></LI>
     * <LI><CODE>IPTOS_THROUGHPUT (0x08)</CODE>, <b>decimal 8</b></LI>
     * <LI><CODE>IPTOS_LOWDELAY (0x10)</CODE>, <b>decimal 16</b></LI>
     * </UL>
     */
    int tos = 8; // valid values: 2, 4, 8 (default), 16

    /** The multicast address (mcast address and port) this member uses */
    IpAddress mcast_addr = null;

    /** The multicast address used for sending and receiving packets */
    String mcast_addr_name = "228.8.8.8";

    /** The multicast port used for sending and receiving packets */
    int mcast_port = 7600;

    /** The multicast receiver thread */
    Thread mcast_receiver = null;

    /** The unicast receiver thread */
    UcastReceiver ucast_receiver = null;

    /** Whether to enable IP multicasting. If false, multiple unicast datagram
     * packets are sent rather than one multicast packet */
    boolean ip_mcast = true;

    /** The time-to-live (TTL) for multicast datagram packets */
    int ip_ttl = 64;

    /** Send buffer size of the multicast datagram socket */
    int mcast_send_buf_size = 32000;

    /** Receive buffer size of the multicast datagram socket */
    int mcast_recv_buf_size = 64000;

    /** Send buffer size of the unicast datagram socket */
    int ucast_send_buf_size = 32000;

    /** Receive buffer size of the unicast datagram socket */
    int ucast_recv_buf_size = 64000;

    /** Usually, src addresses are nulled, and the receiver simply sets them to the address of the sender. However,
     * for multiple addresses on a Windows loopback device, this doesn't work
     * (see http://jira.jboss.com/jira/browse/JGRP-79 and the JGroups wiki for details). This must be the same
     * value for all members of the same group. Default is true, for performance reasons */
    // private boolean null_src_addresses=true;

    /**
     * Creates the UDP protocol and initializes the state variables.
     * Does not create any sockets or start any threads.
     */
    public UDP() {
    }

    /**
     * Sets up the Protocol instance according to the configuration string.
     * The following properties are read by the UDP protocol:
     * <ul>
     * <li> param mcast_addr - the multicast address to use; default is 228.8.8.8
     * <li> param mcast_port - (int) the port that the multicast is sent on; default is 7600
     * <li> param ip_mcast - (boolean) flag whether to use IP multicast; default is true
     * <li> param ip_ttl - the default time-to-live for multicast packets sent out on this socket; default is 64
     * </ul>
     * @return true if no other properties are left;
     *         false if the properties still have data in them, i.e.,
     *         properties are left over and not handled by the protocol stack
     */
    public boolean setProperties(Properties props) {
        String str;

        super.setProperties(props);

        str = props.getProperty("num_last_ports");
        if (str != null) {
            num_last_ports = Integer.parseInt(str);
            props.remove("num_last_ports");
        }

        str = Util.getProperty(new String[] { Global.UDP_MCAST_ADDR,
                "jboss.partition.udpGroup" }, props, "mcast_addr",
                false, "228.8.8.8");
        if (str != null)
            mcast_addr_name = str;

        str = Util.getProperty(new String[] { Global.UDP_MCAST_PORT,
                "jboss.partition.udpPort" }, props, "mcast_port",
                false, "7600");
        if (str != null)
            mcast_port = Integer.parseInt(str);

        str = props.getProperty("ip_mcast");
        if (str != null) {
            ip_mcast = Boolean.valueOf(str).booleanValue();
            props.remove("ip_mcast");
        }

        str = Util.getProperty(new String[] { Global.UDP_IP_TTL },
                props, "ip_ttl", false, "64");
        if (str != null) {
            ip_ttl = Integer.parseInt(str);
            props.remove("ip_ttl");
        }

        str = props.getProperty("tos");
        if (str != null) {
            tos = Integer.parseInt(str);
            props.remove("tos");
        }

        str = props.getProperty("mcast_send_buf_size");
        if (str != null) {
            mcast_send_buf_size = Integer.parseInt(str);
            props.remove("mcast_send_buf_size");
        }

        str = props.getProperty("mcast_recv_buf_size");
        if (str != null) {
            mcast_recv_buf_size = Integer.parseInt(str);
            props.remove("mcast_recv_buf_size");
        }

        str = props.getProperty("ucast_send_buf_size");
        if (str != null) {
            ucast_send_buf_size = Integer.parseInt(str);
            props.remove("ucast_send_buf_size");
        }

        str = props.getProperty("ucast_recv_buf_size");
        if (str != null) {
            ucast_recv_buf_size = Integer.parseInt(str);
            props.remove("ucast_recv_buf_size");
        }

        str = props.getProperty("null_src_addresses");
        if (str != null) {
            // null_src_addresses=Boolean.valueOf(str).booleanValue();
            props.remove("null_src_addresses");
            log.error("null_src_addresses has been deprecated, property will be ignored");
        }

        if (props.size() > 0) {
            log.error("the following properties are not recognized: "
                    + props);
            return false;
        }
        return true;
    }

    /* ----------------------- Receiving of MCAST UDP packets ------------------------ */

    public void run() {
        DatagramPacket packet;
        byte[] receive_buf = new byte[65535];
        int offset, len, sender_port;
        byte[] data;
        InetAddress sender_addr;
        Address sender;

        // moved out of loop to avoid excessive object creations (bela March 8 2001)
        packet = new DatagramPacket(receive_buf, receive_buf.length);

        while (mcast_receiver != null && mcast_recv_sock != null) {
            try {
                packet.setData(receive_buf, 0, receive_buf.length);
                mcast_recv_sock.receive(packet);
                sender_addr = packet.getAddress();
                sender_port = packet.getPort();
                offset = packet.getOffset();
                len = packet.getLength();
                data = packet.getData();
                sender = new IpAddress(sender_addr, sender_port);

                if (len > receive_buf.length) {
                    if (log.isErrorEnabled())
                        log.error("size of the received packet (" + len
                                + ") is bigger than allocated buffer ("
                                + receive_buf.length
                                + "): will not be able to handle packet. "
                                + "Use the FRAG protocol and make its frag_size lower than "
                                + receive_buf.length);
                }

                receive(mcast_addr, sender, data, offset, len);
            } catch (SocketException sock_ex) {
                if (log.isTraceEnabled())
                    log.trace("multicast socket is closed, exception="
                            + sock_ex);
                break;
            } catch (InterruptedIOException io_ex) { // thread was interrupted
            } catch (Throwable ex) {
                if (log.isErrorEnabled())
                    log.error("failure in multicast receive()", ex);
                Util.sleep(100); // so we don't get into 100% cpu spinning (should NEVER happen !)
            }
        }
        if (log.isDebugEnabled())
            log.debug("multicast thread terminated");
    }

    public String getInfo() {
        StringBuffer sb = new StringBuffer();
        sb.append("group_addr=").append(mcast_addr_name).append(':')
                .append(mcast_port).append("\n");
        return sb.toString();
    }

    public void sendToAllMembers(byte[] data, int offset, int length)
            throws Exception {
        if (ip_mcast && mcast_addr != null) {
            _send(mcast_addr.getIpAddress(), mcast_addr.getPort(),
                    true, data, offset, length);
        } else {
            // IP multicast is disabled: send one unicast datagram per member
            ArrayList mbrs = new ArrayList(members);
            IpAddress mbr;
            for (Iterator it = mbrs.iterator(); it.hasNext();) {
                mbr = (IpAddress) it.next();
                _send(mbr.getIpAddress(), mbr.getPort(), false, data,
                        offset, length);
            }
        }
    }

    public void sendToSingleMember(Address dest, byte[] data,
            int offset, int length) throws Exception {
        _send(((IpAddress) dest).getIpAddress(), ((IpAddress) dest)
                .getPort(), false, data, offset, length);
    }

    public void postUnmarshalling(Message msg, Address dest,
            Address src, boolean multicast) {
        msg.setDest(dest);
    }

    public void postUnmarshallingList(Message msg, Address dest,
            boolean multicast) {
        msg.setDest(dest);
    }

    private void _send(InetAddress dest, int port, boolean mcast,
            byte[] data, int offset, int length) throws Exception {
        DatagramPacket packet = new DatagramPacket(data, offset,
                length, dest, port);
        try {
            if (mcast) {
                if (mcast_send_sock != null) {
                    mcast_send_sock.send(packet);
                } else {
                    if (mcast_send_sockets != null) {
                        MulticastSocket s;
                        for (int i = 0; i < mcast_send_sockets.length; i++) {
                            s = mcast_send_sockets[i];
                            try {
                                s.send(packet);
                            } catch (Exception e) {
                                // keep going: a failure on one interface should not stop the others
                                log.error("failed sending packet on socket "
                                        + s, e);
                            }
                        }
                    } else {
                        throw new Exception(
                                "both mcast_send_sock and mcast_send_sockets are null");
                    }
                }
            } else {
                if (sock != null)
                    sock.send(packet);
            }
        } catch (Exception ex) {
            throw new Exception("dest=" + dest + ":" + port + " ("
                    + length + " bytes)", ex);
        }
    }

    /* ------------------------------------------------------------------------------- */

    /*------------------------------ Protocol interface ------------------------------ */

    public String getName() {
        return "UDP";
    }

    /**
     * Creates the unicast and multicast sockets and starts the unicast and multicast receiver threads
     */
    public void start() throws Exception {
        if (log.isDebugEnabled())
            log.debug("creating sockets and starting threads");
        try {
            createSockets();
        } catch (Exception ex) {
            String tmp = "problem creating sockets (bind_addr="
                    + bind_addr + ", mcast_addr=" + mcast_addr + ")";
            throw new Exception(tmp, ex);
        }
        super.start();
        startThreads();
    }

    public void stop() {
        if (log.isDebugEnabled())
            log.debug("closing sockets and stopping threads");
        stopThreads(); // will close sockets, closeSockets() is not really needed anymore, but...
        closeSockets(); // ... we'll leave it in there for now (doesn't do anything if already closed)
        super.stop();
    }

    /*--------------------------- End of Protocol interface -------------------------- */

    /* ------------------------------ Private Methods -------------------------------- */

    /**
     * Create UDP sender and receiver sockets. Currently there are 2 sockets
     * (sending and receiving). This is due to Linux's non-BSD compatibility
     * in the JDK port (see DESIGN).
     */
    private void createSockets() throws Exception {
        InetAddress tmp_addr;

        // bind_addr not set, try to assign one by default. This is needed on Windows

        // changed by bela Feb 12 2003: by default multicast sockets will be bound to all network interfaces

        // CHANGED *BACK* by bela March 13 2003: binding to all interfaces did not result in a correct
        // local_addr. As a matter of fact, comparison between e.g. 0.0.0.0:1234 (on hostA) and
        // 0.0.0.0:1.2.3.4 (on hostB) would fail !
        //        if(bind_addr == null) {
        //            InetAddress[] interfaces=InetAddress.getAllByName(InetAddress.getLocalHost().getHostAddress());
        //            if(interfaces != null && interfaces.length > 0)
        //                bind_addr=interfaces[0];
        //        }

        if (bind_addr == null && !use_local_host) {
            bind_addr = Util.getFirstNonLoopbackAddress();
        }
        if (bind_addr == null)
            bind_addr = InetAddress.getLocalHost();

        if (bind_addr != null)
            if (log.isInfoEnabled())
                log.info("sockets will use interface "
                        + bind_addr.getHostAddress());

        // 2. Create socket for receiving unicast UDP packets. The address and port
        //    of this socket will be our local address (local_addr)
        if (bind_port > 0) {
            sock = createDatagramSocketWithBindPort();
        } else {
            sock = createEphemeralDatagramSocket();
        }

        // check for null before the socket is first used
        if (sock == null)
            throw new Exception("UDP.createSocket(): sock is null");

        if (tos > 0) {
            try {
                sock.setTrafficClass(tos);
            } catch (SocketException e) {
                log.warn("traffic class of " + tos
                        + " could not be set, will be ignored", e);
            }
        }

        local_addr = new IpAddress(sock.getLocalAddress(), sock
                .getLocalPort());
        if (additional_data != null)
            ((IpAddress) local_addr).setAdditionalData(additional_data);

        // 3. Create socket for receiving IP multicast packets
        if (ip_mcast) {
            // 3a. Create mcast receiver socket
            mcast_recv_sock = new MulticastSocket(mcast_port);
            mcast_recv_sock.setTimeToLive(ip_ttl);
            tmp_addr = InetAddress.getByName(mcast_addr_name);
            mcast_addr = new IpAddress(tmp_addr, mcast_port);

            if (receive_on_all_interfaces
                    || (receive_interfaces != null && receive_interfaces
                            .size() > 0)) {
                List interfaces;
                if (receive_interfaces != null)
                    interfaces = receive_interfaces;
                else
                    interfaces = Util.getAllAvailableInterfaces();
                bindToInterfaces(interfaces, mcast_recv_sock,
                        mcast_addr.getIpAddress());
            } else {
                if (bind_addr != null)
                    mcast_recv_sock.setInterface(bind_addr);
                mcast_recv_sock.joinGroup(tmp_addr);
            }

            // 3b. Create mcast sender socket
            if (send_on_all_interfaces
                    || (send_interfaces != null && send_interfaces
                            .size() > 0)) {
                List interfaces;
                NetworkInterface intf;
                if (send_interfaces != null)
                    interfaces = send_interfaces;
                else
                    interfaces = Util.getAllAvailableInterfaces();
                mcast_send_sockets = new MulticastSocket[interfaces
                        .size()];
                int index = 0;
                for (Iterator it = interfaces.iterator(); it.hasNext();) {
                    intf = (NetworkInterface) it.next();
                    mcast_send_sockets[index] = new MulticastSocket();
                    mcast_send_sockets[index].setNetworkInterface(intf);
                    mcast_send_sockets[index].setTimeToLive(ip_ttl);
                    if (tos > 0) {
                        try {
                            mcast_send_sockets[index].setTrafficClass(tos);
                        } catch (SocketException e) {
                            log.warn("traffic class of " + tos
                                    + " could not be set, will be ignored", e);
                        }
                    }
                    index++;
                }
            } else {
                mcast_send_sock = new MulticastSocket();
                mcast_send_sock.setTimeToLive(ip_ttl);
                if (bind_addr != null)
                    mcast_send_sock.setInterface(bind_addr);

                if (tos > 0) {
                    try {
                        mcast_send_sock.setTrafficClass(tos); // high throughput
                    } catch (SocketException e) {
                        log.warn("traffic class of " + tos
                                + " could not be set, will be ignored", e);
                    }
                }
            }
        }

        setBufferSizes();
        if (log.isInfoEnabled())
            log.info("socket information:\n" + dumpSocketInfo());
    }

    //    private void bindToAllInterfaces(MulticastSocket s, InetAddress mcastAddr) throws IOException {
    //        SocketAddress tmp_mcast_addr=new InetSocketAddress(mcastAddr, mcast_port);
    //        Enumeration en=NetworkInterface.getNetworkInterfaces();
    //        while(en.hasMoreElements()) {
    //            NetworkInterface i=(NetworkInterface)en.nextElement();
    //            for(Enumeration en2=i.getInetAddresses(); en2.hasMoreElements();) {
    //                InetAddress addr=(InetAddress)en2.nextElement();
    //                // if(addr.isLoopbackAddress())
    //                // continue;
    //                s.joinGroup(tmp_mcast_addr, i);
    //                if(log.isTraceEnabled())
    //                    log.trace("joined " + tmp_mcast_addr + " on interface " + i.getName() + " (" + addr + ")");
    //                break;
    //            }
    //        }
    //    }

    /**
     * Joins the multicast group once on each of the given interfaces.
     *
     * @param interfaces List<NetworkInterface>. Guaranteed to have no duplicates
     * @param s the multicast socket that joins the group
     * @param mcastAddr the multicast group address; combined with mcast_port
     * @throws IOException if joining the group on an interface fails
     */
    private void bindToInterfaces(List interfaces, MulticastSocket s,
            InetAddress mcastAddr) throws IOException {
        SocketAddress tmp_mcast_addr = new InetSocketAddress(mcastAddr,
                mcast_port);
        for (Iterator it = interfaces.iterator(); it.hasNext();) {
            NetworkInterface i = (NetworkInterface) it.next();
            // join once per interface; the first address is only used for the trace message
            for (Enumeration en2 = i.getInetAddresses(); en2
                    .hasMoreElements();) {
                InetAddress addr = (InetAddress) en2.nextElement();
                s.joinGroup(tmp_mcast_addr, i);
                if (log.isTraceEnabled())
                    log.trace("joined " + tmp_mcast_addr + " on "
                            + i.getName() + " (" + addr + ")");
                break;
            }
        }
    }

0564:            /** Creates a DatagramSocket with a random port. Because in certain operating systems, ports are reused,
0565:             * we keep a list of the n last used ports, and avoid port reuse */
0566:            protected DatagramSocket createEphemeralDatagramSocket()
0567:                    throws SocketException {
0568:                DatagramSocket tmp;
0569:                int localPort = 0;
0570:                while (true) {
0571:                    tmp = new DatagramSocket(localPort, bind_addr); // first time localPort is 0
0572:                    if (num_last_ports <= 0)
0573:                        break;
0574:                    localPort = tmp.getLocalPort();
0575:                    if (last_ports_used == null)
0576:                        last_ports_used = new BoundedList(num_last_ports);
0577:                    if (last_ports_used.contains(new Integer(localPort))) {
0578:                        if (log.isDebugEnabled())
0579:                            log
0580:                                    .debug("local port "
0581:                                            + localPort
0582:                                            + " already seen in this session; will try to get other port");
0583:                        try {
0584:                            tmp.close();
0585:                        } catch (Throwable e) { // ignored: this socket is discarded anyway
0586:                        }
0587:                        localPort++;
0588:                    } else {
0589:                        last_ports_used.add(new Integer(localPort));
0590:                        break;
0591:                    }
0592:                }
0593:                return tmp;
0594:            }
0595:
0596:            /**
0597:             * Creates a DatagramSocket when bind_port > 0. Attempts to allocate the socket with port == bind_port, and
0598:             * increments until it finds a valid port, or until port_range has been exceeded
0599:             * @return DatagramSocket The newly created socket
0600:             * @throws Exception
0601:             */
0602:            protected DatagramSocket createDatagramSocketWithBindPort()
0603:                    throws Exception {
0604:                DatagramSocket tmp = null;
0605:                // 27-6-2003 bgooren, find an available port in the range [bind_port, bind_port+port_range]
0606:                int rcv_port = bind_port, max_port = bind_port + port_range;
0607:                while (rcv_port <= max_port) {
0608:                    try {
0609:                        tmp = new DatagramSocket(rcv_port, bind_addr);
0610:                        break;
0611:                    } catch (SocketException bind_ex) { // Cannot listen on this port
0612:                        rcv_port++;
0613:                    } catch (SecurityException sec_ex) { // Not allowed to listen on this port
0614:                        rcv_port++;
0615:                    }
0616:
0617:                    // Cannot listen at all, throw an Exception
0618:                    if (rcv_port >= max_port + 1) { // +1 due to the increment above
0619:                        throw new Exception(
0620:                                "cannot create a socket on any port in range "
0621:                                        + bind_port + '-'
0622:                                        + (bind_port + port_range));
0623:                    }
0624:                }
0625:                return tmp;
0626:            }
0627:
0628:            private String dumpSocketInfo() throws Exception {
0629:                StringBuffer sb = new StringBuffer(128);
0630:                sb.append("local_addr=").append(local_addr);
0631:                sb.append(", mcast_addr=").append(mcast_addr);
0632:                sb.append(", bind_addr=").append(bind_addr);
0633:                sb.append(", ttl=").append(ip_ttl);
0634:
0635:                if (sock != null) {
0636:                    sb.append("\nsock: bound to ");
0637:                    sb.append(sock.getLocalAddress().getHostAddress()).append(
0638:                            ':').append(sock.getLocalPort());
0639:                    sb.append(", receive buffer size=").append(
0640:                            sock.getReceiveBufferSize());
0641:                    sb.append(", send buffer size=").append(
0642:                            sock.getSendBufferSize());
0643:                }
0644:
0645:                if (mcast_recv_sock != null) {
0646:                    sb.append("\nmcast_recv_sock: bound to ");
0647:                    sb.append(mcast_recv_sock.getInterface().getHostAddress())
0648:                            .append(':').append(mcast_recv_sock.getLocalPort());
0649:                    sb.append(", send buffer size=").append(
0650:                            mcast_recv_sock.getSendBufferSize());
0651:                    sb.append(", receive buffer size=").append(
0652:                            mcast_recv_sock.getReceiveBufferSize());
0653:                }
0654:
0655:                if (mcast_send_sock != null) {
0656:                    sb.append("\nmcast_send_sock: bound to ");
0657:                    sb.append(mcast_send_sock.getInterface().getHostAddress())
0658:                            .append(':').append(mcast_send_sock.getLocalPort());
0659:                    sb.append(", send buffer size=").append(
0660:                            mcast_send_sock.getSendBufferSize());
0661:                    sb.append(", receive buffer size=").append(
0662:                            mcast_send_sock.getReceiveBufferSize());
0663:                }
0664:                if (mcast_send_sockets != null) {
0665:                    sb.append("\n").append(mcast_send_sockets.length).append(
0666:                            " mcast send sockets:\n");
0667:                    MulticastSocket s;
0668:                    for (int i = 0; i < mcast_send_sockets.length; i++) {
0669:                        s = mcast_send_sockets[i];
0670:                        sb.append(s.getInterface().getHostAddress())
0671:                                .append(':').append(s.getLocalPort());
0672:                        sb.append(", send buffer size=").append(
0673:                                s.getSendBufferSize());
0674:                        sb.append(", receive buffer size=").append(
0675:                                s.getReceiveBufferSize()).append("\n");
0676:                    }
0677:                }
0678:                return sb.toString();
0679:            }
0680:
0681:            void setBufferSizes() {
0682:                if (sock != null)
0683:                    setBufferSize(sock, ucast_send_buf_size,
0684:                            ucast_recv_buf_size);
0685:
0686:                if (mcast_recv_sock != null)
0687:                    setBufferSize(mcast_recv_sock, mcast_send_buf_size,
0688:                            mcast_recv_buf_size);
0689:
0690:                if (mcast_send_sock != null)
0691:                    setBufferSize(mcast_send_sock, mcast_send_buf_size,
0692:                            mcast_recv_buf_size);
0693:
0694:                if (mcast_send_sockets != null) {
0695:                    for (int i = 0; i < mcast_send_sockets.length; i++) {
0696:                        setBufferSize(mcast_send_sockets[i],
0697:                                mcast_send_buf_size, mcast_recv_buf_size);
0698:                    }
0699:                }
0700:            }
0701:
0702:            private void setBufferSize(DatagramSocket sock, int send_buf_size,
0703:                    int recv_buf_size) {
0704:                try {
0705:                    sock.setSendBufferSize(send_buf_size);
0706:                } catch (Throwable ex) {
0707:                    if (log.isWarnEnabled())
0708:                        log.warn("failed setting send buffer size of "
0709:                                + send_buf_size + " in " + sock + ": " + ex);
0710:                }
0711:
0712:                try {
0713:                    sock.setReceiveBufferSize(recv_buf_size);
0714:                } catch (Throwable ex) {
0715:                    if (log.isWarnEnabled())
0716:                        log.warn("failed setting receive buffer size of "
0717:                                + recv_buf_size + " in " + sock + ": " + ex);
0718:                }
0719:            }
0720:
0721:            /**
0722:             * Closes the UDP unicast and multicast sockets
0723:             */
0724:            void closeSockets() {
0725:                // 1. Close multicast socket
0726:                closeMulticastSocket();
0727:
0728:                // 2. Close socket
0729:                closeSocket();
0730:            }
0731:
0732:            void closeMulticastSocket() {
0733:                if (mcast_recv_sock != null) {
0734:                    try {
0735:                        if (mcast_addr != null) {
0736:                            mcast_recv_sock.leaveGroup(mcast_addr
0737:                                    .getIpAddress());
0738:                        }
0739:                    } catch (IOException ex) { // leaving the group failed; still close the socket below
0740:                    }
0741:                    mcast_recv_sock.close(); // this will cause the mcast receiver thread to break out of its loop
0742:                    mcast_recv_sock = null;
0743:                    if (log.isDebugEnabled())
0744:                        log.debug("multicast receive socket closed");
0745:                    mcast_addr = null;
0746:                }
0747:
0748:                if (mcast_send_sock != null) {
0749:                    mcast_send_sock.close();
0750:                    mcast_send_sock = null;
0751:                    if (log.isDebugEnabled())
0752:                        log.debug("multicast send socket closed");
0753:                }
0754:                if (mcast_send_sockets != null) {
0755:                    MulticastSocket s;
0756:                    for (int i = 0; i < mcast_send_sockets.length; i++) {
0757:                        s = mcast_send_sockets[i];
0758:                        s.close();
0759:                        if (log.isDebugEnabled())
0760:                            log.debug("multicast send socket " + s + " closed");
0761:                    }
0762:                    mcast_send_sockets = null;
0763:                }
0764:            }
0765:
0766:            private void closeSocket() {
0767:                if (sock != null) {
0768:                    sock.close();
0769:                    sock = null;
0770:                    if (log.isDebugEnabled())
0771:                        log.debug("socket closed");
0772:                }
0773:            }
0774:
0775:            /**
0776:             * Starts the unicast and multicast receiver threads
0777:             */
0778:            void startThreads() throws Exception {
0779:                if (ucast_receiver == null) {
0780:                    //start the listener thread of the ucast_recv_sock
0781:                    ucast_receiver = new UcastReceiver();
0782:                    ucast_receiver.start();
0783:                    if (log.isDebugEnabled())
0784:                        log.debug("created unicast receiver thread");
0785:                }
0786:
0787:                if (ip_mcast) {
0788:                    if (mcast_receiver != null) {
0789:                        if (mcast_receiver.isAlive()) {
0790:                            if (log.isDebugEnabled())
0791:                                log
0792:                                        .debug("did not create new multicast receiver thread as existing "
0793:                                                + "multicast receiver thread is still running");
0794:                        } else
0795:                            mcast_receiver = null; // will be created just below...
0796:                    }
0797:
0798:                    if (mcast_receiver == null) {
0799:                        mcast_receiver = new Thread(
0800:                                Util.getGlobalThreadGroup(), this,
0801:                                "UDP mcast receiver");
0802:                        mcast_receiver.setPriority(Thread.MAX_PRIORITY); // needed ????
0803:                        mcast_receiver.setDaemon(true);
0804:                        mcast_receiver.start();
0805:                    }
0806:                }
0807:            }
0808:
0809:            /**
0810:             * Stops unicast and multicast receiver threads
0811:             */
0812:            void stopThreads() {
0813:                Thread tmp;
0814:
0815:                // 1. Stop the multicast receiver thread
0816:                if (mcast_receiver != null) {
0817:                    if (mcast_receiver.isAlive()) {
0818:                        tmp = mcast_receiver;
0819:                        mcast_receiver = null;
0820:                        closeMulticastSocket(); // will cause the multicast thread to terminate
0821:                        tmp.interrupt();
0822:                        try {
0823:                            tmp.join(100);
0824:                        } catch (Exception e) {
0825:                        }
0826:                        tmp = null;
0827:                    }
0828:                    mcast_receiver = null;
0829:                }
0830:
0831:                // 2. Stop the unicast receiver thread
0832:                if (ucast_receiver != null) {
0833:                    ucast_receiver.stop();
0834:                    ucast_receiver = null;
0835:                }
0836:            }
0837:
0838:            protected void setThreadNames() {
0839:                super.setThreadNames();
0840:
0841:                if (channel_name != null) {
0842:                    String tmp, prefix = Global.THREAD_PREFIX;
0843:                    if (mcast_receiver != null) {
0844:                        tmp = mcast_receiver.getName();
0845:                        if (tmp != null && tmp.indexOf(prefix) == -1) {
0846:                            tmp += prefix + channel_name + ")";
0847:                            mcast_receiver.setName(tmp);
0848:                        }
0849:                    }
0850:                    if (ucast_receiver != null) {
0851:                        tmp = ucast_receiver.getName();
0852:                        if (tmp != null && tmp.indexOf(prefix) == -1) {
0853:                            tmp += prefix + channel_name + ")";
0854:                            ucast_receiver.setName(tmp);
0855:                        }
0856:                    }
0857:                }
0858:            }
0859:
0860:            protected void unsetThreadNames() {
0861:                super.unsetThreadNames();
0862:                if (channel_name != null) {
0863:                    String tmp, prefix = Global.THREAD_PREFIX;
0864:                    int index;
0865:
0866:                    tmp = mcast_receiver != null ? mcast_receiver.getName()
0867:                            : null;
0868:                    if (tmp != null) {
0869:                        index = tmp.indexOf(prefix);
0870:                        if (index > -1) {
0871:                            tmp = tmp.substring(0, index);
0872:                            mcast_receiver.setName(tmp);
0873:                        }
0874:                    }
0875:                    tmp = ucast_receiver != null ? ucast_receiver.getName()
0876:                            : null;
0877:                    if (tmp != null) {
0878:                        index = tmp.indexOf(prefix);
0879:                        if (index > -1) {
0880:                            tmp = tmp.substring(0, index);
0881:                            ucast_receiver.setName(tmp);
0882:                        }
0883:                    }
0884:                }
0885:            }
0886:
0887:            protected void handleConfigEvent(HashMap map) {
0888:                boolean set_buffers = false;
0889:                super.handleConfigEvent(map);
0890:                if (map == null)
0891:                    return;
0892:
0893:                if (map.containsKey("send_buf_size")) {
0894:                    mcast_send_buf_size = ((Integer) map.get("send_buf_size"))
0895:                            .intValue();
0896:                    ucast_send_buf_size = mcast_send_buf_size;
0897:                    set_buffers = true;
0898:                }
0899:                if (map.containsKey("recv_buf_size")) {
0900:                    mcast_recv_buf_size = ((Integer) map.get("recv_buf_size"))
0901:                            .intValue();
0902:                    ucast_recv_buf_size = mcast_recv_buf_size;
0903:                    set_buffers = true;
0904:                }
0905:                if (set_buffers)
0906:                    setBufferSizes();
0907:            }
0908:
0909:            /* ----------------------------- End of Private Methods ---------------------------------------- */
0910:
0911:            /* ----------------------------- Inner Classes ---------------------------------------- */
0912:
0913:            public class UcastReceiver implements Runnable {
0914:                volatile boolean running = true; // written by stop(), read by the receiver thread
0915:                Thread thread = null;
0916:
0917:                String getName() {
0918:                    return thread != null ? thread.getName() : null;
0919:                }
0920:
0921:                void setName(String thread_name) {
0922:                    if (thread != null)
0923:                        thread.setName(thread_name);
0924:                }
0925:
0926:                public void start() {
0927:                    if (thread == null) {
0928:                        thread = new Thread(Util.getGlobalThreadGroup(), this,
0929:                                "UDP.UcastReceiverThread");
0930:                        thread.setDaemon(true);
0931:                        running = true;
0932:                        thread.start();
0933:                    }
0934:                }
0935:
0936:                public void stop() {
0937:                    Thread tmp;
0938:                    if (thread != null && thread.isAlive()) {
0939:                        running = false;
0940:                        tmp = thread;
0941:                        thread = null;
0942:                        closeSocket(); // this will cause the thread to break out of its loop
0943:                        tmp.interrupt();
0944:                        try {
0945:                            tmp.join(500);
0946:                        } catch (InterruptedException e) {
0947:                        }
0948:                        tmp = null;
0949:                    }
0950:                    thread = null;
0951:                }
0952:
0953:                public void run() {
0954:                    DatagramPacket packet;
0955:                    byte receive_buf[] = new byte[65535];
0956:                    int offset, len;
0957:                    byte[] data;
0958:                    InetAddress sender_addr;
0959:                    int sender_port;
0960:                    Address sender;
0961:
0962:                    // moved out of loop to avoid excessive object creations (bela March 8 2001)
0963:                    packet = new DatagramPacket(receive_buf, receive_buf.length);
0964:
0965:                    while (running && thread != null && sock != null) {
0966:                        try {
0967:                            packet.setData(receive_buf, 0, receive_buf.length);
0968:                            sock.receive(packet);
0969:                            sender_addr = packet.getAddress();
0970:                            sender_port = packet.getPort();
0971:                            offset = packet.getOffset();
0972:                            len = packet.getLength();
0973:                            data = packet.getData();
0974:                            sender = new IpAddress(sender_addr, sender_port);
0975:
0976:                            if (len > receive_buf.length) {
0977:                                if (log.isErrorEnabled())
0978:                                    log
0979:                                            .error("size of the received packet ("
0980:                                                    + len
0981:                                                    + ") is bigger than allocated buffer ("
0982:                                                    + receive_buf.length
0983:                                                    + "): will not be able to handle packet. "
0984:                                                    + "Use the FRAG protocol and make its frag_size lower than "
0985:                                                    + receive_buf.length);
0986:                            }
0987:                            receive(local_addr, sender, data, offset, len);
0988:                        } catch (SocketException sock_ex) {
0989:                            if (log.isDebugEnabled())
0990:                                log
0991:                                        .debug("unicast receiver socket is closed, exception="
0992:                                                + sock_ex);
0993:                            break;
0994:                        } catch (InterruptedIOException io_ex) { // thread was interrupted
0995:                        } catch (Throwable ex) {
0996:                            if (log.isErrorEnabled())
0997:                                log.error("[" + local_addr
0998:                                        + "] failed receiving unicast packet",
0999:                                        ex);
1000:                            Util.sleep(100); // so we don't get into 100% cpu spinning (should NEVER happen !)
1001:                        }
1002:                    }
1003:                    if (log.isDebugEnabled())
1004:                        log.debug("unicast receiver thread terminated");
1005:                }
1006:            }
1007:
1008:        }
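
The port probing in createDatagramSocketWithBindPort() can be tried outside JGroups with a small standalone sketch. The class name PortRangeSketch and the method bindInRange are hypothetical and not part of JGroups. The sketch also assumes Java 7 or later (try-with-resources, InetAddress.getLoopbackAddress()), which the listing above predates. Unlike the listing, it caps the probe at port 65535 and does not catch SecurityException.

```java
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;

final class PortRangeSketch {

    /**
     * Tries startPort, startPort + 1, ... up to startPort + portRange
     * (capped at 65535) and returns the first socket that binds.
     */
    static DatagramSocket bindInRange(InetAddress bindAddr, int startPort, int portRange)
            throws SocketException {
        int maxPort = Math.min(startPort + portRange, 65535);
        SocketException last = null;
        for (int port = startPort; port <= maxPort; port++) {
            try {
                return new DatagramSocket(port, bindAddr);
            } catch (SocketException e) {
                last = e; // cannot bind this port: move on to the next one
            }
        }
        SocketException failure = new SocketException(
                "cannot create a socket on any port in range " + startPort + '-' + maxPort);
        if (last != null)
            failure.initCause(last);
        throw failure;
    }

    public static void main(String[] args) throws Exception {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Occupy an OS-chosen port first, then probe a range that contains it.
        try (DatagramSocket first = new DatagramSocket(0, loopback)) {
            int start = Math.min(first.getLocalPort(), 65535 - 50);
            try (DatagramSocket second = bindInRange(loopback, start, 50)) {
                System.out.println("first=" + first.getLocalPort()
                        + " second=" + second.getLocalPort());
            }
        }
    }
}
```

Returning from inside the loop replaces the separate "range exhausted" check in the original, so the failure case is only reached once every port in the range has been tried.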