001: package org.jgroups.tests.perf.transports;
002:
003: import org.jgroups.stack.IpAddress;
004: import org.jgroups.tests.perf.Receiver;
005: import org.jgroups.tests.perf.Transport;
006:
007: import java.io.IOException;
008: import java.net.DatagramPacket;
009: import java.net.DatagramSocket;
010: import java.net.InetAddress;
011: import java.net.MulticastSocket;
012: import java.util.Properties;
013: import java.util.Map;
014:
/**
 * @author Bela Ban, Jan 22 2004
 * @version $Id: UdpTransport.java,v 1.6 2005/07/26 11:50:21 belaban Exp $
 */
020: public class UdpTransport implements Transport {
021: Receiver receiver = null;
022: Properties config = null;
023: InetAddress mcast_addr = null;
024: int mcast_port = 7500;
025: InetAddress bind_addr = null;
026: MulticastSocket mcast_sock = null;
027: DatagramSocket ucast_sock = null;
028: IpAddress local_addr = null;
029: ReceiverThread mcast_receiver = null;
030: ReceiverThread ucast_receiver = null;
031: int max_receiver_buffer_size = 500000;
032: int max_send_buffer_size = 500000;
033:
034: public UdpTransport() {
035: }
036:
037: public Object getLocalAddress() {
038: return local_addr;
039: }
040:
041: public void create(Properties properties) throws Exception {
042: this .config = properties;
043: String mcast_addr_str = System.getProperty("udp.mcast_addr",
044: config.getProperty("mcast_addr"));
045: if (mcast_addr_str == null)
046: mcast_addr_str = "228.8.8.8";
047: mcast_addr = InetAddress.getByName(mcast_addr_str);
048:
049: String bind_addr_str = System.getProperty("udp.bind_addr",
050: config.getProperty("bind_addr"));
051: if (bind_addr_str != null) {
052: bind_addr = InetAddress.getByName(bind_addr_str);
053: } else
054: bind_addr = InetAddress.getLocalHost();
055:
056: ucast_sock = new DatagramSocket(0, bind_addr);
057: ucast_sock.setReceiveBufferSize(max_receiver_buffer_size);
058: ucast_sock.setSendBufferSize(max_send_buffer_size);
059: mcast_sock = new MulticastSocket(mcast_port);
060: mcast_sock.setReceiveBufferSize(max_receiver_buffer_size);
061: mcast_sock.setSendBufferSize(max_send_buffer_size);
062: if (bind_addr != null)
063: mcast_sock.setInterface(bind_addr);
064: mcast_sock.joinGroup(mcast_addr);
065: local_addr = new IpAddress(ucast_sock.getLocalAddress(),
066: ucast_sock.getLocalPort());
067: System.out.println("-- local_addr is " + local_addr);
068: }
069:
070: public void start() throws Exception {
071: mcast_receiver = new ReceiverThread(mcast_sock);
072: ucast_receiver = new ReceiverThread(ucast_sock);
073: mcast_receiver.start();
074: ucast_receiver.start();
075: }
076:
077: public void stop() {
078: if (mcast_receiver != null)
079: mcast_receiver.stop();
080: if (ucast_receiver != null)
081: ucast_receiver.stop();
082: }
083:
084: public void destroy() {
085: if (mcast_sock != null)
086: mcast_sock.close();
087: if (ucast_sock != null)
088: ucast_sock.close();
089: }
090:
091: public void setReceiver(Receiver r) {
092: this .receiver = r;
093: }
094:
095: public Map dumpStats() {
096: return null;
097: }
098:
099: public void send(Object destination, byte[] payload)
100: throws Exception {
101: DatagramPacket p;
102: if (destination == null) {
103: p = new DatagramPacket(payload, payload.length, mcast_addr,
104: mcast_port);
105: } else {
106: IpAddress addr = (IpAddress) destination;
107: p = new DatagramPacket(payload, payload.length, addr
108: .getIpAddress(), addr.getPort());
109:
110: }
111: ucast_sock.send(p);
112: }
113:
114: class ReceiverThread implements Runnable {
115: DatagramSocket sock;
116: Thread t = null;
117:
118: ReceiverThread(DatagramSocket sock) {
119: this .sock = sock;
120: }
121:
122: void start() throws Exception {
123: t = new Thread(this , "ReceiverThread for "
124: + sock.getLocalAddress() + ':'
125: + sock.getLocalPort());
126: t.start();
127: }
128:
129: void stop() {
130: t = null;
131: if (sock != null)
132: sock.close();
133: }
134:
135: public void run() {
136: byte[] buf = new byte[128000];
137: DatagramPacket p;
138:
139: while (t != null) {
140: p = new DatagramPacket(buf, buf.length);
141: try {
142: sock.receive(p);
143: if (receiver != null) {
144: IpAddress addr = new IpAddress(p.getAddress(),
145: p.getPort());
146: receiver.receive(addr, p.getData());
147: }
148: } catch (IOException e) {
149: if (sock == null)
150: t = null;
151: }
152: }
153: t = null;
154: }
155: }
156: }
|