001: // $Id: TUNNEL.java,v 1.26.2.1 2007/04/27 08:03:52 belaban Exp $
002:
003: package org.jgroups.protocols;
004:
005: import org.jgroups.*;
006: import org.jgroups.stack.IpAddress;
007: import org.jgroups.stack.Protocol;
008: import org.jgroups.stack.RouterStub;
009: import org.jgroups.util.Util;
010:
011: import java.util.Enumeration;
012: import java.util.HashMap;
013: import java.util.Properties;
014: import java.util.Vector;
015: import java.net.InetAddress;
016: import java.net.UnknownHostException;
017:
018: /**
019: * Replacement for UDP. Instead of sending packets via UDP, a TCP connection is opened to a Router
020: * (using the RouterStub client-side stub),
021: * the IP address/port of which was given using channel properties <code>router_host</code> and
022: * <code>router_port</code>. All outgoing traffic is sent via this TCP socket to the Router which
023: * distributes it to all connected TUNNELs in this group. Incoming traffic received from Router will
024: * simply be passed up the stack.
025: * <p>A TUNNEL layer can be used to penetrate a firewall, most firewalls allow creating TCP connections
026: * to the outside world, however, they do not permit outside hosts to initiate a TCP connection to a host
027: * inside the firewall. Therefore, the connection created by the inside host is reused by Router to
028: * send traffic from an outside host to a host inside the firewall.
029: * @author Bela Ban
030: */
031: public class TUNNEL extends Protocol implements Runnable {
032: final Properties properties = null;
033: String channel_name = null;
034: final Vector members = new Vector();
035: String router_host = null;
036: int router_port = 0;
037: Address local_addr = null; // sock's local addr and local port
038: Thread receiver = null;
039: RouterStub stub = new RouterStub();
040: InetAddress bind_addr = null;
041: private final Object stub_mutex = new Object();
042:
043: /** If true, messages sent to self are treated specially: unicast messages are
044: * looped back immediately, multicast messages get a local copy first and -
045: * when the real copy arrives - it will be discarded. Useful for Window
046: * media (non)sense */
047: boolean loopback = true;
048:
049: private final Reconnector reconnector = new Reconnector();
050: private final Object reconnector_mutex = new Object();
051:
052: /** If set it will be added to <tt>local_addr</tt>. Used to implement
053: * for example transport independent addresses */
054: byte[] additional_data = null;
055:
056: /** time to wait in ms between reconnect attempts */
057: long reconnect_interval = 5000;
058:
059: public TUNNEL() {
060: }
061:
062: public String toString() {
063: return "Protocol TUNNEL(local_addr=" + local_addr + ')';
064: }
065:
066: public boolean isConnected() {
067: return stub.isConnected();
068: }
069:
070: public RouterStub getRouterStub() {
071: return stub;
072: }
073:
074: /*------------------------------ Protocol interface ------------------------------ */
075:
076: public String getName() {
077: return "TUNNEL";
078: }
079:
080: public void init() throws Exception {
081: super .init();
082: }
083:
084: public void start() throws Exception {
085: super .start();
086: local_addr = stub.getLocalAddress();
087: passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
088: }
089:
090: public void stop() {
091: if (receiver != null)
092: receiver = null;
093: teardownTunnel();
094: stopReconnector();
095: local_addr = null;
096: }
097:
098: /**
099: * This prevents the up-handler thread to be created, which essentially is superfluous:
100: * messages are received from the network rather than from a layer below.
101: * DON'T REMOVE !
102: */
103: public void startUpHandler() {
104: }
105:
106: /** Setup the Protocol instance acording to the configuration string */
107: public boolean setProperties(Properties props) {
108: String str;
109:
110: super .setProperties(props);
111: str = props.getProperty("router_host");
112: if (str != null) {
113: router_host = str;
114: props.remove("router_host");
115: }
116:
117: str = props.getProperty("router_port");
118: if (str != null) {
119: router_port = Integer.parseInt(str);
120: props.remove("router_port");
121: }
122:
123: if (log.isDebugEnabled()) {
124: log.debug("router_host=" + router_host + ";router_port="
125: + router_port);
126: }
127:
128: if (router_host == null || router_port == 0) {
129: if (log.isErrorEnabled()) {
130: log
131: .error("both router_host and router_port have to be set !");
132: return false;
133: }
134: }
135:
136: str = props.getProperty("reconnect_interval");
137: if (str != null) {
138: reconnect_interval = Long.parseLong(str);
139: props.remove("reconnect_interval");
140: }
141:
142: str = props.getProperty("loopback");
143: if (str != null) {
144: loopback = Boolean.valueOf(str).booleanValue();
145: props.remove("loopback");
146: }
147:
148: boolean ignore_systemprops = Util
149: .isBindAddressPropertyIgnored();
150: str = Util.getProperty(new String[] { Global.BIND_ADDR,
151: Global.BIND_ADDR_OLD }, props, "bind_addr",
152: ignore_systemprops, null);
153: if (str != null) {
154: try {
155: bind_addr = InetAddress.getByName(str);
156: } catch (UnknownHostException unknown) {
157: log.error("(bind_addr): host " + str + " not known");
158: return false;
159: }
160: props.remove("bind_addr");
161: }
162:
163: if (bind_addr != null)
164: stub.setBindAddress(bind_addr);
165:
166: if (props.size() > 0) {
167: StringBuffer sb = new StringBuffer();
168: for (Enumeration e = props.propertyNames(); e
169: .hasMoreElements();) {
170: sb.append(e.nextElement().toString());
171: if (e.hasMoreElements()) {
172: sb.append(", ");
173: }
174: }
175: if (log.isErrorEnabled())
176: log
177: .error("The following properties are not recognized: "
178: + sb);
179: return false;
180: }
181: return true;
182: }
183:
184: /** Caller by the layer above this layer. We just pass it on to the router. */
185: public void down(Event evt) {
186: Message msg;
187: TunnelHeader hdr;
188: Address dest;
189:
190: if (evt.getType() != Event.MSG) {
191: handleDownEvent(evt);
192: return;
193: }
194:
195: hdr = new TunnelHeader(channel_name);
196: msg = (Message) evt.getArg();
197: dest = msg.getDest();
198: msg.putHeader(getName(), hdr);
199:
200: if (msg.getSrc() == null)
201: msg.setSrc(local_addr);
202:
203: if (log.isTraceEnabled())
204: log.trace(msg + ", hdrs: " + msg.getHeaders());
205:
206: // Don't send if destination is local address. Instead, switch dst and src and put in up_queue.
207: // If multicast message, loopback a copy directly to us (but still multicast). Once we receive this,
208: // we will discard our own multicast message
209: if (loopback
210: && (dest == null || dest.equals(local_addr) || dest
211: .isMulticastAddress())) {
212: Message copy = msg.copy();
213: // copy.removeHeader(name); // we don't remove the header
214: copy.setSrc(local_addr);
215: // copy.setDest(dest);
216: evt = new Event(Event.MSG, copy);
217:
218: /* Because Protocol.up() is never called by this bottommost layer, we call up() directly in the observer.
219: This allows e.g. PerfObserver to get the time of reception of a message */
220: if (observer != null)
221: observer.up(evt, up_queue.size());
222: if (log.isTraceEnabled())
223: log.trace("looped back local message " + copy);
224: passUp(evt);
225: if (dest != null && !dest.isMulticastAddress())
226: return;
227: }
228:
229: if (!stub.isConnected()) {
230: startReconnector();
231: } else {
232: if (stub.send(msg, channel_name) == false) {
233: startReconnector();
234: }
235: }
236: }
237:
238: /** Creates a TCP connection to the router */
239: void createTunnel() throws Exception {
240: if (router_host == null || router_port == 0)
241: throw new Exception(
242: "router_host and/or router_port not set correctly; tunnel cannot be created");
243:
244: synchronized (stub_mutex) {
245: stub.connect(channel_name, router_host, router_port);
246: if (additional_data != null
247: && local_addr instanceof IpAddress)
248: ((IpAddress) local_addr)
249: .setAdditionalData(additional_data);
250: }
251: }
252:
253: /** Tears the TCP connection to the router down */
254: void teardownTunnel() {
255: stub.disconnect();
256: }
257:
258: /*--------------------------- End of Protocol interface -------------------------- */
259:
260: public void run() {
261: Message msg;
262:
263: while (receiver != null
264: && Thread.currentThread().equals(receiver)) {
265: try {
266: msg = stub.receive();
267: if (msg == null) {
268: if (receiver == null)
269: break;
270: if (log.isTraceEnabled())
271: log
272: .trace("received a null message. Trying to reconnect to router");
273: if (!stub.isConnected())
274: startReconnector();
275: Util.sleep(5000);
276: continue;
277: }
278: handleIncomingMessage(msg);
279: } catch (Exception e) {
280: if (receiver == null
281: || !Thread.currentThread().equals(receiver))
282: return;
283: else {
284: if (log.isTraceEnabled())
285: log.trace("exception in receiver thread", e);
286: }
287: }
288: }
289: }
290:
291: /* ------------------------------ Private methods -------------------------------- */
292:
293: public void handleIncomingMessage(Message msg) {
294: TunnelHeader hdr = (TunnelHeader) msg.removeHeader(getName());
295:
296: // discard my own multicast loopback copy
297: if (loopback) {
298: Address dst = msg.getDest();
299: Address src = msg.getSrc();
300:
301: if (dst != null && dst.isMulticastAddress() && src != null
302: && local_addr.equals(src)) {
303: if (log.isTraceEnabled())
304: log
305: .trace("discarded own loopback multicast packet");
306: return;
307: }
308: }
309:
310: if (log.isTraceEnabled())
311: log.trace(msg + ", hdrs: " + msg.getHeaders());
312:
313: /* Discard all messages destined for a channel with a different name */
314:
315: String ch_name = hdr != null ? hdr.channel_name : null;
316: if (ch_name != null && !channel_name.equals(ch_name))
317: return;
318:
319: passUp(new Event(Event.MSG, msg));
320: }
321:
322: void handleDownEvent(Event evt) {
323: if (log.isTraceEnabled())
324: log.trace(evt);
325:
326: switch (evt.getType()) {
327:
328: case Event.TMP_VIEW:
329: case Event.VIEW_CHANGE:
330: synchronized (members) {
331: members.removeAllElements();
332: Vector tmpvec = ((View) evt.getArg()).getMembers();
333: for (int i = 0; i < tmpvec.size(); i++)
334: members.addElement(tmpvec.elementAt(i));
335: }
336: break;
337:
338: case Event.GET_LOCAL_ADDRESS: // return local address -> Event(SET_LOCAL_ADDRESS, local)
339: passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
340: break;
341:
342: case Event.SET_LOCAL_ADDRESS:
343: local_addr = (Address) evt.getArg();
344: if (local_addr instanceof IpAddress
345: && additional_data != null)
346: ((IpAddress) local_addr)
347: .setAdditionalData(additional_data);
348: break;
349:
350: case Event.CONNECT:
351: channel_name = (String) evt.getArg();
352: if (stub == null) {
353: if (log.isErrorEnabled())
354: log.error("CONNECT: router stub is null!");
355: } else {
356: try {
357: createTunnel();
358: } catch (Exception e) {
359: if (log.isErrorEnabled())
360: log
361: .error("failed connecting to GossipRouter at "
362: + router_host
363: + ":"
364: + router_port);
365: break;
366: }
367: }
368:
369: receiver = new Thread(this , "TUNNEL receiver thread");
370: receiver.setDaemon(true);
371: receiver.start();
372:
373: passUp(new Event(Event.CONNECT_OK));
374: break;
375:
376: case Event.DISCONNECT:
377: if (receiver != null) {
378: receiver = null;
379: stub.disconnect();
380: }
381: teardownTunnel();
382: passUp(new Event(Event.DISCONNECT_OK));
383: passUp(new Event(Event.SET_LOCAL_ADDRESS, null));
384: local_addr = null;
385: break;
386:
387: case Event.CONFIG:
388: if (log.isDebugEnabled())
389: log.debug("received CONFIG event: " + evt.getArg());
390: handleConfigEvent((HashMap) evt.getArg());
391: break;
392: }
393: }
394:
395: private void startReconnector() {
396: synchronized (reconnector_mutex) {
397: reconnector.start();
398: }
399: }
400:
401: private void stopReconnector() {
402: synchronized (reconnector_mutex) {
403: reconnector.stop();
404: }
405: }
406:
407: void handleConfigEvent(HashMap map) {
408: if (map == null)
409: return;
410: if (map.containsKey("additional_data"))
411: additional_data = (byte[]) map.get("additional_data");
412: }
413:
414: /* ------------------------------------------------------------------------------- */
415:
416: private class Reconnector implements Runnable {
417: Thread my_thread = null;
418:
419: public void start() {
420: synchronized (this ) {
421: if (my_thread == null || !my_thread.isAlive()) {
422: my_thread = new Thread(this , "Reconnector");
423: my_thread.setDaemon(true);
424: my_thread.start();
425: }
426: }
427: }
428:
429: public void stop() {
430: synchronized (this ) {
431: my_thread = null;
432: }
433: }
434:
435: public void run() {
436: while (Thread.currentThread().equals(my_thread)) {
437: try {
438: stub.reconnect();
439: if (log.isTraceEnabled())
440: log.trace("reconnected");
441: break;
442: } catch (Exception e) {
443: }
444: Util.sleep(reconnect_interval);
445: }
446: }
447: }
448:
449: }
|