001: // $Id: McastDiscovery.java,v 1.1 2005/06/23 13:31:10 belaban Exp $
002:
003: package org.jgroups.tests;
004:
005: import org.jgroups.util.Util;
006:
007: import java.io.Serializable;
008: import java.net.*;
009: import java.util.*;
010:
011: /**
012: * Discovers all neighbors in an IP multicast environment by using expanding ring multicasts (increasing TTL).
013: * The sender multicasts a discovery packet on all available network interfaces, while also listening on
014: * all interfaces. The discovery packet contains the sender's address, which is the address and port of the
015: * interface on which the packet was sent. A receiver replies with an ACK back to the sender's address and port.
016: * After n responses or m milliseconds, the sender terminates and computes the network interfaces which should be used.
017: * The network interface is the intersection of the interface variable of all ACKs received.
018: * @author Bela Ban July 26 2002
019: * @version $Revision: 1.1 $
020: */
021: public class McastDiscovery {
022: int ttl = 32;
023: List handlers = new ArrayList();
024: InetAddress mcast_addr = null;
025: int mcast_port = 5000;
026: long interval = 2000; // time between sends
027: McastSender mcast_sender = null;
028: boolean running = true;
029: HashMap map = new HashMap(); // keys=interface (InetAddress), values=List of receivers (InetAddress)
030:
031: class McastSender extends Thread {
032:
033: McastSender() {
034: super ();
035: setName("McastSender");
036: setDaemon(true);
037: }
038:
039: public void run() {
040: MessageHandler handler;
041: while (running) {
042: for (Iterator it = handlers.iterator(); it.hasNext();) {
043: handler = (MessageHandler) it.next();
044: handler.sendDiscoveryRequest(ttl);
045: }
046: try {
047: sleep(interval);
048: } catch (Exception ex) {
049: }
050: }
051: }
052: }
053:
054: public McastDiscovery(InetAddress mcast_addr, int mcast_port,
055: long interval, int ttl) {
056: this .mcast_addr = mcast_addr;
057: this .mcast_port = mcast_port;
058: this .interval = interval;
059: this .ttl = ttl;
060: }
061:
062: public void start() throws Exception {
063: NetworkInterface intf;
064: InetAddress bind_addr;
065: MessageHandler handler;
066:
067: for (Enumeration en = NetworkInterface.getNetworkInterfaces(); en
068: .hasMoreElements();) {
069: intf = (NetworkInterface) en.nextElement();
070: for (Enumeration en2 = intf.getInetAddresses(); en2
071: .hasMoreElements();) {
072: bind_addr = (InetAddress) en2.nextElement();
073: map.put(bind_addr, new ArrayList());
074: System.out.println("-- creating receiver for "
075: + bind_addr);
076: handler = new MessageHandler(bind_addr);
077: handlers.add(handler);
078: handler.start();
079: }
080: }
081:
082: // Now start sending mcast discovery messages:
083: mcast_sender = new McastSender();
084: mcast_sender.start();
085:
086: System.out.println("press key to stop");
087: System.out.flush();
088: System.in.read();
089:
090: printValidInterfaces();
091: }
092:
093: void printValidInterfaces() {
094: InetAddress bind_interface;
095: Map.Entry entry;
096: List all_mbrs = new ArrayList();
097: List l;
098: InetSocketAddress tmp_addr;
099: HashMap map_copy = (HashMap) map.clone();
100: SortedSet s;
101: Stack st;
102: Result r;
103:
104: System.out
105: .println("\n========================================================");
106: System.out
107: .println("Responses received ordered by interface:\n");
108: for (Iterator it = map.entrySet().iterator(); it.hasNext();) {
109: entry = (Map.Entry) it.next();
110: bind_interface = (InetAddress) entry.getKey();
111: System.out.println(bind_interface.getHostAddress() + ":\t "
112: + entry.getValue());
113: }
114:
115: for (Iterator it = map.values().iterator(); it.hasNext();) {
116: l = (List) it.next();
117: for (Iterator it2 = l.iterator(); it2.hasNext();) {
118: tmp_addr = (InetSocketAddress) it2.next();
119: if (!all_mbrs.contains(tmp_addr))
120: all_mbrs.add(tmp_addr);
121: }
122: }
123:
124: for (Iterator it = all_mbrs.iterator(); it.hasNext();) {
125: tmp_addr = (InetSocketAddress) it.next();
126:
127: // tmp_mbr has to be in all values (Lists) of map, remove entry from map if not
128: for (Iterator it2 = map.entrySet().iterator(); it2
129: .hasNext();) {
130: entry = (Map.Entry) it2.next();
131: l = (List) entry.getValue();
132:
133: if (!l.contains(tmp_addr)) {
134: //System.out.println("Member " + tmp_addr + " did not respond to interface " + entry.getKey() +
135: // ", removing interface");
136: it2.remove(); // remove the entry (key plus value) from map
137: }
138: }
139: }
140:
141: if (map.size() > 0)
142: System.out.println("\n-- Valid interfaces are "
143: + map.keySet() + '\n');
144: else {
145: System.out
146: .println("\nNo valid interfaces found, listing interfaces by number of responses/interface:\n"
147: + "(it is best to use the interface with the most responses)");
148:
149: s = new TreeSet();
150: for (Iterator it = map_copy.entrySet().iterator(); it
151: .hasNext();) {
152: entry = (Map.Entry) it.next();
153: r = new Result((InetAddress) entry.getKey(),
154: ((List) entry.getValue()).size());
155: s.add(r);
156: }
157:
158: st = new Stack();
159: for (Iterator it = s.iterator(); it.hasNext();) {
160: st.push(it.next());
161: }
162:
163: while (!st.empty())
164: System.out.println("-- " + st.pop());
165:
166: }
167:
168: System.out
169: .println("\nUse of any of the above interfaces in \"UDP(bind_addr=<interface>)\" will "
170: + "guarantee that the members will find each other");
171: System.out
172: .println("========================================================\n\n");
173: }
174:
175: class MessageHandler {
176: MulticastSocket mcast_sock = null; // for receiving mcast discovery messages and sending back unicast discovery responses
177: DatagramSocket sock = null;
178: McastReceiver mcast_receiver = null;
179: UcastReceiver ucast_receiver = null;
180: InetAddress local_addr = null;
181: int local_port = 0;
182:
183: class McastReceiver extends Thread {
184: byte[] buf;
185: DatagramPacket mcast_packet, rsp_packet;
186: DiscoveryRequest req;
187: DiscoveryResponse rsp;
188:
189: McastReceiver() {
190: super ();
191: setName("McastReceiver");
192: setDaemon(true);
193: }
194:
195: public void run() {
196: while (running) {
197: buf = new byte[16000];
198: mcast_packet = new DatagramPacket(buf, buf.length);
199: try {
200: mcast_sock.receive(mcast_packet);
201: req = (DiscoveryRequest) Util
202: .objectFromByteBuffer(mcast_packet
203: .getData());
204: System.out.println("<-- " + req);
205:
206: // send response back to req.sender_addr
207: rsp = new DiscoveryResponse(
208: new InetSocketAddress(local_addr,
209: local_port), req.sender_addr
210: .getAddress());
211: buf = Util.objectToByteBuffer(rsp);
212: rsp_packet = new DatagramPacket(buf,
213: buf.length, req.sender_addr);
214: sock.send(rsp_packet);
215: } catch (Exception ex) {
216: System.err.println("McastReceiver.run(): " + ex
217: + ", rsp_packet="
218: + rsp_packet.getSocketAddress()
219: + ", length=" + rsp_packet.getLength()
220: + " bytes");
221: ex.printStackTrace();
222: }
223: }
224: }
225: }
226:
227: class UcastReceiver extends Thread {
228:
229: UcastReceiver() {
230: super ();
231: setName("UcastReceiver");
232: setDaemon(true);
233: }
234:
235: public void run() {
236: DatagramPacket packet;
237: byte[] buf;
238: DiscoveryResponse rsp;
239: List l;
240: InetAddress bind_interface;
241: InetSocketAddress responder_addr;
242:
243: while (running) {
244: try {
245: buf = new byte[16000];
246: packet = new DatagramPacket(buf, buf.length);
247: sock.receive(packet);
248: rsp = (DiscoveryResponse) Util
249: .objectFromByteBuffer(packet.getData());
250: System.out.println("<== " + rsp);
251: bind_interface = rsp.interface_used;
252: l = (List) map.get(bind_interface);
253: if (l == null)
254: map.put(bind_interface,
255: (l = new ArrayList()));
256: responder_addr = rsp.discovery_responder;
257: if (!l.contains(responder_addr))
258: l.add(responder_addr);
259: } catch (Exception ex) {
260: System.err
261: .println("UcastReceiver.run(): " + ex);
262: }
263: }
264: }
265: }
266:
267: MessageHandler(InetAddress bind_interface) throws Exception {
268: mcast_sock = new MulticastSocket(mcast_port);
269: mcast_sock.setInterface(bind_interface);
270: mcast_sock.setTimeToLive(ttl);
271: mcast_sock.joinGroup(mcast_addr);
272: sock = new DatagramSocket(0, bind_interface);
273: local_addr = sock.getLocalAddress();
274: local_port = sock.getLocalPort();
275: }
276:
277: void start() throws Exception {
278: running = true;
279:
280: // 1. start listening on unicast socket. when discovery response received --> ad to map hashmap
281: ucast_receiver = new UcastReceiver();
282: ucast_receiver.start();
283:
284: // 2. start listening on mcast socket. when discovery request received --> send ack
285: mcast_receiver = new McastReceiver();
286: mcast_receiver.start();
287: }
288:
289: void stop() {
290: running = false;
291:
292: if (mcast_sock != null) {
293: mcast_sock.close();
294: mcast_sock = null;
295: }
296: if (sock != null) {
297: sock.close();
298: sock = null;
299: }
300: }
301:
302: void sendDiscoveryRequest(int ttl) {
303: DiscoveryRequest req;
304: byte[] buf;
305: DatagramPacket packet;
306:
307: req = new DiscoveryRequest(local_addr, local_port);
308: System.out.println("--> " + req);
309:
310: try {
311: buf = Util.objectToByteBuffer(req);
312: packet = new DatagramPacket(buf, buf.length,
313: mcast_addr, mcast_port);
314: mcast_sock.send(packet);
315: } catch (Exception ex) {
316: System.err
317: .println("McastDiscovery.sendDiscoveryRequest(): "
318: + ex);
319: }
320: }
321:
322: }
323:
324: public static void main(String[] args) {
325: int ttl = 32; // ttl to use for IP mcast packets
326: String mcast_addr = "228.8.8.8"; // multicast address to use
327: int mcast_port = 5000; // port to use for mcast socket
328: long interval = 2000; // time between mcast requests
329:
330: for (int i = 0; i < args.length; i++) {
331: if ("-help".equals(args[i])) {
332: help();
333: return;
334: }
335: if ("-mcast_addr".equals(args[i])) {
336: mcast_addr = args[++i];
337: continue;
338: }
339: if ("-mcast_port".equals(args[i])) {
340: mcast_port = Integer.parseInt(args[++i]);
341: continue;
342: }
343: if ("-interval".equals(args[i])) {
344: interval = Long.parseLong(args[++i]);
345: continue;
346: }
347: if ("-ttl".equals(args[i])) {
348: ttl = Integer.parseInt(args[++i]);
349: continue;
350: }
351: help();
352: return;
353: }
354:
355: try {
356: new McastDiscovery(InetAddress.getByName(mcast_addr),
357: mcast_port, interval, ttl).start();
358: } catch (Exception ex) {
359: ex.printStackTrace();
360: }
361: }
362:
363: static void help() {
364: System.out
365: .println("McastDiscovery [-mcast_addr <multicast address>] [-mcast_port <port>]"
366: + " [-interval <time between mcasts (msecs)>] [-ttl <ttl>]");
367: }
368:
369: }
370:
371: abstract class DiscoveryPacket implements Serializable {
372:
373: }
374:
375: class DiscoveryRequest extends DiscoveryPacket {
376: InetSocketAddress sender_addr = null;
377:
378: DiscoveryRequest(InetAddress addr, int port) {
379: sender_addr = new InetSocketAddress(addr, port);
380: }
381:
382: public String toString() {
383: return "DiscoveryRequest [sender_addr=" + sender_addr + ']';
384: }
385:
386: }
387:
388: class DiscoveryResponse extends DiscoveryPacket {
389: InetSocketAddress discovery_responder = null; // address of member who responds to discovery request
390: InetAddress interface_used = null;
391:
392: DiscoveryResponse(InetSocketAddress discovery_responder,
393: InetAddress interface_used) {
394: this .discovery_responder = discovery_responder;
395: this .interface_used = interface_used;
396: }
397:
398: public String toString() {
399: return "DiscoveryResponse [discovery_responder="
400: + discovery_responder + ", interface_used="
401: + interface_used + ']';
402: }
403: }
404:
405: class Result implements Comparable {
406: InetAddress bind_interface = null;
407: int num_responses = 0;
408:
409: Result(InetAddress bind_interface, int num_responses) {
410: this .bind_interface = bind_interface;
411: this .num_responses = num_responses;
412: }
413:
414: public int compareTo(Object other) {
415: Result oth = (Result) other;
416: return num_responses == oth.num_responses ? 0
417: : (num_responses < oth.num_responses ? -1 : 1);
418: }
419:
420: public String toString() {
421: return bind_interface.getHostAddress() + ":\t " + num_responses;
422: }
423: }
|