001: package org.jgroups.protocols;
002:
003: import org.jgroups.Event;
004: import org.jgroups.Global;
005: import org.jgroups.Message;
006: import org.jgroups.util.Buffer;
007: import org.jgroups.util.ExposedByteArrayOutputStream;
008: import org.jgroups.util.Util;
009:
010: import java.io.*;
011: import java.net.*;
012: import java.util.Enumeration;
013: import java.util.Map;
014: import java.util.Properties;
015:
016: /**
017: * Uses its own IP multicast socket to send and receive discovery requests/responses. Can be used in
018: * conjunction with a non-UDP transport, e.g. TCP.<p>
019: * The discovery is <em>asymmetric</em>: discovery requests are broadcast via the multicast socket, and
020: * received via the multicast socket by everyone in the group. However, the discovery responses are sent
021: * back via the regular transport (e.g. TCP) to the sender (the discovery request contains the sender's
022: * regular address, e.g. 192.168.0.2:7800).
023: * @author Bela Ban
024: * @version $Id: MPING.java,v 1.19.2.1 2007/04/27 08:03:51 belaban Exp $
025: */
026: public class MPING extends PING implements Runnable {
027: MulticastSocket mcast_sock = null;
028: Thread receiver = null;
029: InetAddress bind_addr = null;
030: boolean bind_to_all_interfaces = false;
031: int ip_ttl = 16;
032: InetAddress mcast_addr = null;
033: int mcast_port = 7555;
034:
035: /** Pre-allocated byte stream. Used for serializing datagram packets. Will grow as needed */
036: final ExposedByteArrayOutputStream out_stream = new ExposedByteArrayOutputStream(
037: 512);
038: byte receive_buf[] = new byte[1024];
039:
040: public String getName() {
041: return "MPING";
042: }
043:
044: public InetAddress getBindAddr() {
045: return bind_addr;
046: }
047:
048: public void setBindAddr(InetAddress bind_addr) {
049: this .bind_addr = bind_addr;
050: }
051:
052: public boolean isBindToAllInterfaces() {
053: return bind_to_all_interfaces;
054: }
055:
056: public void setBindToAllInterfaces(boolean bind_to_all_interfaces) {
057: this .bind_to_all_interfaces = bind_to_all_interfaces;
058: }
059:
060: public int getTTL() {
061: return ip_ttl;
062: }
063:
064: public void setTTL(int ip_ttl) {
065: this .ip_ttl = ip_ttl;
066: }
067:
068: public InetAddress getMcastAddr() {
069: return mcast_addr;
070: }
071:
072: public void setMcastAddr(InetAddress mcast_addr) {
073: this .mcast_addr = mcast_addr;
074: }
075:
076: public int getMcastPort() {
077: return mcast_port;
078: }
079:
080: public void setMcastPort(int mcast_port) {
081: this .mcast_port = mcast_port;
082: }
083:
084: public boolean setProperties(Properties props) {
085: boolean ignore_systemprops = Util
086: .isBindAddressPropertyIgnored();
087: String str = Util.getProperty(new String[] { Global.BIND_ADDR,
088: Global.BIND_ADDR_OLD }, props, "bind_addr",
089: ignore_systemprops, null);
090: if (str != null) {
091: try {
092: bind_addr = InetAddress.getByName(str);
093: } catch (UnknownHostException unknown) {
094: if (log.isFatalEnabled())
095: log
096: .fatal("(bind_addr): host " + str
097: + " not known");
098: return false;
099: }
100: props.remove("bind_addr");
101: }
102:
103: str = Util.getProperty(
104: new String[] { Global.MPING_MCAST_ADDR }, props,
105: "mcast_addr", false, "230.5.6.7");
106: if (str != null) {
107: try {
108: mcast_addr = InetAddress.getByName(str);
109: } catch (UnknownHostException e) {
110: log.error("could not resolve " + str, e);
111: return false;
112: }
113: props.remove("mcast_addr");
114: }
115:
116: str = Util.getProperty(
117: new String[] { Global.MPING_MCAST_PORT }, props,
118: "mcast_port", false, "7555");
119: if (str != null) {
120: mcast_port = Integer.parseInt(str);
121: props.remove("mcast_port");
122: }
123:
124: str = Util.getProperty(new String[] { Global.MPING_IP_TTL },
125: props, "ip_ttl", false, "16");
126: if (str != null) {
127: ip_ttl = Integer.parseInt(str);
128: props.remove("ip_ttl");
129: }
130:
131: str = props.getProperty("bind_to_all_interfaces");
132: if (str != null) {
133: bind_to_all_interfaces = new Boolean(str).booleanValue();
134: props.remove("bind_to_all_interfaces");
135: }
136:
137: if (mcast_addr == null) {
138: try {
139: mcast_addr = InetAddress.getByName("230.5.6.7");
140: } catch (UnknownHostException e) {
141: log.error("failed getting default mcast address", e);
142: return false;
143: }
144: }
145: return super .setProperties(props);
146: }
147:
148: public void up(Event evt) {
149: if (evt.getType() == Event.CONFIG) {
150: if (bind_addr == null) {
151: Map config = (Map) evt.getArg();
152: bind_addr = (InetAddress) config.get("bind_addr");
153: }
154: passUp(evt);
155: return;
156: }
157: super .up(evt);
158: }
159:
160: public void start() throws Exception {
161: mcast_sock = new MulticastSocket(mcast_port);
162: mcast_sock.setTimeToLive(ip_ttl);
163:
164: if (bind_to_all_interfaces) {
165: bindToAllInterfaces();
166: // interface for outgoing packets
167: if (bind_addr != null)
168: mcast_sock.setNetworkInterface(NetworkInterface
169: .getByInetAddress(bind_addr));
170: } else {
171: if (bind_addr == null) {
172: InetAddress[] interfaces = InetAddress
173: .getAllByName(InetAddress.getLocalHost()
174: .getHostAddress());
175: if (interfaces != null && interfaces.length > 0)
176: bind_addr = interfaces[0];
177: }
178: if (bind_addr == null)
179: bind_addr = InetAddress.getLocalHost();
180:
181: if (bind_addr != null)
182: if (log.isInfoEnabled())
183: log.info("sockets will use interface "
184: + bind_addr.getHostAddress());
185:
186: if (bind_addr != null) {
187: mcast_sock.setInterface(bind_addr);
188: // mcast_sock.setNetworkInterface(NetworkInterface.getByInetAddress(bind_addr)); // JDK 1.4 specific
189: }
190: mcast_sock.joinGroup(mcast_addr);
191: }
192:
193: startReceiver();
194: super .start();
195: }
196:
197: private void bindToAllInterfaces() throws IOException {
198: SocketAddress tmp_mcast_addr = new InetSocketAddress(
199: mcast_addr, mcast_port);
200: Enumeration en = NetworkInterface.getNetworkInterfaces();
201: while (en.hasMoreElements()) {
202: NetworkInterface i = (NetworkInterface) en.nextElement();
203: for (Enumeration en2 = i.getInetAddresses(); en2
204: .hasMoreElements();) {
205: InetAddress addr = (InetAddress) en2.nextElement();
206: // if(addr.isLoopbackAddress())
207: // continue;
208: mcast_sock.joinGroup(tmp_mcast_addr, i);
209: if (log.isTraceEnabled())
210: log.trace("joined " + tmp_mcast_addr
211: + " on interface " + i.getName() + " ("
212: + addr + ")");
213: break;
214: }
215: }
216: }
217:
218: private void startReceiver() {
219: if (receiver == null || !receiver.isAlive()) {
220: receiver = new Thread(Util.getGlobalThreadGroup(), this ,
221: "ReceiverThread");
222: receiver.setDaemon(true);
223: receiver.start();
224: if (log.isTraceEnabled())
225: log.trace("receiver thread started");
226: }
227: }
228:
229: public void stop() {
230: mcast_sock.close();
231: mcast_sock = null;
232: receiver = null;
233: super .stop();
234: }
235:
236: void sendMcastDiscoveryRequest(Message msg) {
237: Buffer buf;
238: DatagramPacket packet;
239: DataOutputStream out = null;
240:
241: try {
242: if (msg.getSrc() == null)
243: msg.setSrc(local_addr);
244: out_stream.reset();
245: out = new DataOutputStream(out_stream);
246: msg.writeTo(out);
247: out.flush(); // flushes contents to out_stream
248: buf = new Buffer(out_stream.getRawBuffer(), 0, out_stream
249: .size());
250: packet = new DatagramPacket(buf.getBuf(), buf.getOffset(),
251: buf.getLength(), mcast_addr, mcast_port);
252: mcast_sock.send(packet);
253: } catch (IOException ex) {
254: log.error("failed sending discovery request", ex);
255: } finally {
256: Util.close(out);
257: }
258: }
259:
260: public void run() {
261: DatagramPacket packet = new DatagramPacket(receive_buf,
262: receive_buf.length);
263: byte[] data;
264: ByteArrayInputStream inp_stream = null;
265: DataInputStream inp = null;
266: Message msg;
267:
268: while (mcast_sock != null && receiver != null
269: && Thread.currentThread().equals(receiver)) {
270: packet.setData(receive_buf, 0, receive_buf.length);
271: try {
272: mcast_sock.receive(packet);
273: data = packet.getData();
274: inp_stream = new ByteArrayInputStream(data, 0,
275: data.length);
276: inp = new DataInputStream(inp_stream);
277: msg = new Message();
278: msg.readFrom(inp);
279: up(new Event(Event.MSG, msg));
280: } catch (SocketException socketEx) {
281: break;
282: } catch (Exception ex) {
283: log.error("failed receiving packet", ex);
284: } finally {
285: closeInputStream(inp);
286: closeInputStream(inp_stream);
287: }
288: }
289: if (log.isTraceEnabled())
290: log.trace("receiver thread terminated");
291: }
292:
293: private void closeInputStream(InputStream inp) {
294: if (inp != null)
295: try {
296: inp.close();
297: } catch (IOException e) {
298: }
299: }
300: }
|