Source Code Cross Referenced for TUNNEL.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » protocols » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.protocols 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        // $Id: TUNNEL.java,v 1.26.2.1 2007/04/27 08:03:52 belaban Exp $
002:
003:        package org.jgroups.protocols;
004:
005:        import org.jgroups.*;
006:        import org.jgroups.stack.IpAddress;
007:        import org.jgroups.stack.Protocol;
008:        import org.jgroups.stack.RouterStub;
009:        import org.jgroups.util.Util;
010:
011:        import java.util.Enumeration;
012:        import java.util.HashMap;
013:        import java.util.Properties;
014:        import java.util.Vector;
015:        import java.net.InetAddress;
016:        import java.net.UnknownHostException;
017:
018:        /**
019:         * Replacement for UDP. Instead of sending packets via UDP, a TCP connection is opened to a Router
020:         * (using the RouterStub client-side stub),
021:         * the IP address/port of which was given using channel properties <code>router_host</code> and
022:         * <code>router_port</code>. All outgoing traffic is sent via this TCP socket to the Router which
023:         * distributes it to all connected TUNNELs in this group. Incoming traffic received from Router will
024:         * simply be passed up the stack.
025:         * <p>A TUNNEL layer can be used to penetrate a firewall, most firewalls allow creating TCP connections
026:         * to the outside world, however, they do not permit outside hosts to initiate a TCP connection to a host
027:         * inside the firewall. Therefore, the connection created by the inside host is reused by Router to
028:         * send traffic from an outside host to a host inside the firewall.
029:         * @author Bela Ban
030:         */
031:        public class TUNNEL extends Protocol implements  Runnable {
032:            final Properties properties = null;
033:            String channel_name = null;
034:            final Vector members = new Vector();
035:            String router_host = null;
036:            int router_port = 0;
037:            Address local_addr = null; // sock's local addr and local port
038:            Thread receiver = null;
039:            RouterStub stub = new RouterStub();
040:            InetAddress bind_addr = null;
041:            private final Object stub_mutex = new Object();
042:
043:            /** If true, messages sent to self are treated specially: unicast messages are
044:             * looped back immediately, multicast messages get a local copy first and -
045:             * when the real copy arrives - it will be discarded. Useful for Window
046:             * media (non)sense */
047:            boolean loopback = true;
048:
049:            private final Reconnector reconnector = new Reconnector();
050:            private final Object reconnector_mutex = new Object();
051:
052:            /** If set it will be added to <tt>local_addr</tt>. Used to implement
053:             * for example transport independent addresses */
054:            byte[] additional_data = null;
055:
056:            /** time to wait in ms between reconnect attempts */
057:            long reconnect_interval = 5000;
058:
059:            public TUNNEL() {
060:            }
061:
062:            public String toString() {
063:                return "Protocol TUNNEL(local_addr=" + local_addr + ')';
064:            }
065:
066:            public boolean isConnected() {
067:                return stub.isConnected();
068:            }
069:
070:            public RouterStub getRouterStub() {
071:                return stub;
072:            }
073:
074:            /*------------------------------ Protocol interface ------------------------------ */
075:
076:            public String getName() {
077:                return "TUNNEL";
078:            }
079:
080:            public void init() throws Exception {
081:                super .init();
082:            }
083:
084:            public void start() throws Exception {
085:                super .start();
086:                local_addr = stub.getLocalAddress();
087:                passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
088:            }
089:
090:            public void stop() {
091:                if (receiver != null)
092:                    receiver = null;
093:                teardownTunnel();
094:                stopReconnector();
095:                local_addr = null;
096:            }
097:
098:            /**
099:             * This prevents the up-handler thread to be created, which essentially is superfluous:
100:             * messages are received from the network rather than from a layer below.
101:             * DON'T REMOVE ! 
102:             */
103:            public void startUpHandler() {
104:            }
105:
106:            /** Setup the Protocol instance acording to the configuration string */
107:            public boolean setProperties(Properties props) {
108:                String str;
109:
110:                super .setProperties(props);
111:                str = props.getProperty("router_host");
112:                if (str != null) {
113:                    router_host = str;
114:                    props.remove("router_host");
115:                }
116:
117:                str = props.getProperty("router_port");
118:                if (str != null) {
119:                    router_port = Integer.parseInt(str);
120:                    props.remove("router_port");
121:                }
122:
123:                if (log.isDebugEnabled()) {
124:                    log.debug("router_host=" + router_host + ";router_port="
125:                            + router_port);
126:                }
127:
128:                if (router_host == null || router_port == 0) {
129:                    if (log.isErrorEnabled()) {
130:                        log
131:                                .error("both router_host and router_port have to be set !");
132:                        return false;
133:                    }
134:                }
135:
136:                str = props.getProperty("reconnect_interval");
137:                if (str != null) {
138:                    reconnect_interval = Long.parseLong(str);
139:                    props.remove("reconnect_interval");
140:                }
141:
142:                str = props.getProperty("loopback");
143:                if (str != null) {
144:                    loopback = Boolean.valueOf(str).booleanValue();
145:                    props.remove("loopback");
146:                }
147:
148:                boolean ignore_systemprops = Util
149:                        .isBindAddressPropertyIgnored();
150:                str = Util.getProperty(new String[] { Global.BIND_ADDR,
151:                        Global.BIND_ADDR_OLD }, props, "bind_addr",
152:                        ignore_systemprops, null);
153:                if (str != null) {
154:                    try {
155:                        bind_addr = InetAddress.getByName(str);
156:                    } catch (UnknownHostException unknown) {
157:                        log.error("(bind_addr): host " + str + " not known");
158:                        return false;
159:                    }
160:                    props.remove("bind_addr");
161:                }
162:
163:                if (bind_addr != null)
164:                    stub.setBindAddress(bind_addr);
165:
166:                if (props.size() > 0) {
167:                    StringBuffer sb = new StringBuffer();
168:                    for (Enumeration e = props.propertyNames(); e
169:                            .hasMoreElements();) {
170:                        sb.append(e.nextElement().toString());
171:                        if (e.hasMoreElements()) {
172:                            sb.append(", ");
173:                        }
174:                    }
175:                    if (log.isErrorEnabled())
176:                        log
177:                                .error("The following properties are not recognized: "
178:                                        + sb);
179:                    return false;
180:                }
181:                return true;
182:            }
183:
184:            /** Caller by the layer above this layer. We just pass it on to the router. */
185:            public void down(Event evt) {
186:                Message msg;
187:                TunnelHeader hdr;
188:                Address dest;
189:
190:                if (evt.getType() != Event.MSG) {
191:                    handleDownEvent(evt);
192:                    return;
193:                }
194:
195:                hdr = new TunnelHeader(channel_name);
196:                msg = (Message) evt.getArg();
197:                dest = msg.getDest();
198:                msg.putHeader(getName(), hdr);
199:
200:                if (msg.getSrc() == null)
201:                    msg.setSrc(local_addr);
202:
203:                if (log.isTraceEnabled())
204:                    log.trace(msg + ", hdrs: " + msg.getHeaders());
205:
206:                // Don't send if destination is local address. Instead, switch dst and src and put in up_queue.
207:                // If multicast message, loopback a copy directly to us (but still multicast). Once we receive this,
208:                // we will discard our own multicast message
209:                if (loopback
210:                        && (dest == null || dest.equals(local_addr) || dest
211:                                .isMulticastAddress())) {
212:                    Message copy = msg.copy();
213:                    // copy.removeHeader(name); // we don't remove the header
214:                    copy.setSrc(local_addr);
215:                    // copy.setDest(dest);
216:                    evt = new Event(Event.MSG, copy);
217:
218:                    /* Because Protocol.up() is never called by this bottommost layer, we call up() directly in the observer.
219:                       This allows e.g. PerfObserver to get the time of reception of a message */
220:                    if (observer != null)
221:                        observer.up(evt, up_queue.size());
222:                    if (log.isTraceEnabled())
223:                        log.trace("looped back local message " + copy);
224:                    passUp(evt);
225:                    if (dest != null && !dest.isMulticastAddress())
226:                        return;
227:                }
228:
229:                if (!stub.isConnected()) {
230:                    startReconnector();
231:                } else {
232:                    if (stub.send(msg, channel_name) == false) {
233:                        startReconnector();
234:                    }
235:                }
236:            }
237:
238:            /** Creates a TCP connection to the router */
239:            void createTunnel() throws Exception {
240:                if (router_host == null || router_port == 0)
241:                    throw new Exception(
242:                            "router_host and/or router_port not set correctly; tunnel cannot be created");
243:
244:                synchronized (stub_mutex) {
245:                    stub.connect(channel_name, router_host, router_port);
246:                    if (additional_data != null
247:                            && local_addr instanceof  IpAddress)
248:                        ((IpAddress) local_addr)
249:                                .setAdditionalData(additional_data);
250:                }
251:            }
252:
253:            /** Tears the TCP connection to the router down */
254:            void teardownTunnel() {
255:                stub.disconnect();
256:            }
257:
258:            /*--------------------------- End of Protocol interface -------------------------- */
259:
260:            public void run() {
261:                Message msg;
262:
263:                while (receiver != null
264:                        && Thread.currentThread().equals(receiver)) {
265:                    try {
266:                        msg = stub.receive();
267:                        if (msg == null) {
268:                            if (receiver == null)
269:                                break;
270:                            if (log.isTraceEnabled())
271:                                log
272:                                        .trace("received a null message. Trying to reconnect to router");
273:                            if (!stub.isConnected())
274:                                startReconnector();
275:                            Util.sleep(5000);
276:                            continue;
277:                        }
278:                        handleIncomingMessage(msg);
279:                    } catch (Exception e) {
280:                        if (receiver == null
281:                                || !Thread.currentThread().equals(receiver))
282:                            return;
283:                        else {
284:                            if (log.isTraceEnabled())
285:                                log.trace("exception in receiver thread", e);
286:                        }
287:                    }
288:                }
289:            }
290:
291:            /* ------------------------------ Private methods -------------------------------- */
292:
293:            public void handleIncomingMessage(Message msg) {
294:                TunnelHeader hdr = (TunnelHeader) msg.removeHeader(getName());
295:
296:                // discard my own multicast loopback copy
297:                if (loopback) {
298:                    Address dst = msg.getDest();
299:                    Address src = msg.getSrc();
300:
301:                    if (dst != null && dst.isMulticastAddress() && src != null
302:                            && local_addr.equals(src)) {
303:                        if (log.isTraceEnabled())
304:                            log
305:                                    .trace("discarded own loopback multicast packet");
306:                        return;
307:                    }
308:                }
309:
310:                if (log.isTraceEnabled())
311:                    log.trace(msg + ", hdrs: " + msg.getHeaders());
312:
313:                /* Discard all messages destined for a channel with a different name */
314:
315:                String ch_name = hdr != null ? hdr.channel_name : null;
316:                if (ch_name != null && !channel_name.equals(ch_name))
317:                    return;
318:
319:                passUp(new Event(Event.MSG, msg));
320:            }
321:
322:            void handleDownEvent(Event evt) {
323:                if (log.isTraceEnabled())
324:                    log.trace(evt);
325:
326:                switch (evt.getType()) {
327:
328:                case Event.TMP_VIEW:
329:                case Event.VIEW_CHANGE:
330:                    synchronized (members) {
331:                        members.removeAllElements();
332:                        Vector tmpvec = ((View) evt.getArg()).getMembers();
333:                        for (int i = 0; i < tmpvec.size(); i++)
334:                            members.addElement(tmpvec.elementAt(i));
335:                    }
336:                    break;
337:
338:                case Event.GET_LOCAL_ADDRESS: // return local address -> Event(SET_LOCAL_ADDRESS, local)
339:                    passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
340:                    break;
341:
342:                case Event.SET_LOCAL_ADDRESS:
343:                    local_addr = (Address) evt.getArg();
344:                    if (local_addr instanceof  IpAddress
345:                            && additional_data != null)
346:                        ((IpAddress) local_addr)
347:                                .setAdditionalData(additional_data);
348:                    break;
349:
350:                case Event.CONNECT:
351:                    channel_name = (String) evt.getArg();
352:                    if (stub == null) {
353:                        if (log.isErrorEnabled())
354:                            log.error("CONNECT:  router stub is null!");
355:                    } else {
356:                        try {
357:                            createTunnel();
358:                        } catch (Exception e) {
359:                            if (log.isErrorEnabled())
360:                                log
361:                                        .error("failed connecting to GossipRouter at "
362:                                                + router_host
363:                                                + ":"
364:                                                + router_port);
365:                            break;
366:                        }
367:                    }
368:
369:                    receiver = new Thread(this , "TUNNEL receiver thread");
370:                    receiver.setDaemon(true);
371:                    receiver.start();
372:
373:                    passUp(new Event(Event.CONNECT_OK));
374:                    break;
375:
376:                case Event.DISCONNECT:
377:                    if (receiver != null) {
378:                        receiver = null;
379:                        stub.disconnect();
380:                    }
381:                    teardownTunnel();
382:                    passUp(new Event(Event.DISCONNECT_OK));
383:                    passUp(new Event(Event.SET_LOCAL_ADDRESS, null));
384:                    local_addr = null;
385:                    break;
386:
387:                case Event.CONFIG:
388:                    if (log.isDebugEnabled())
389:                        log.debug("received CONFIG event: " + evt.getArg());
390:                    handleConfigEvent((HashMap) evt.getArg());
391:                    break;
392:                }
393:            }
394:
395:            private void startReconnector() {
396:                synchronized (reconnector_mutex) {
397:                    reconnector.start();
398:                }
399:            }
400:
401:            private void stopReconnector() {
402:                synchronized (reconnector_mutex) {
403:                    reconnector.stop();
404:                }
405:            }
406:
407:            void handleConfigEvent(HashMap map) {
408:                if (map == null)
409:                    return;
410:                if (map.containsKey("additional_data"))
411:                    additional_data = (byte[]) map.get("additional_data");
412:            }
413:
414:            /* ------------------------------------------------------------------------------- */
415:
416:            private class Reconnector implements  Runnable {
417:                Thread my_thread = null;
418:
419:                public void start() {
420:                    synchronized (this ) {
421:                        if (my_thread == null || !my_thread.isAlive()) {
422:                            my_thread = new Thread(this , "Reconnector");
423:                            my_thread.setDaemon(true);
424:                            my_thread.start();
425:                        }
426:                    }
427:                }
428:
429:                public void stop() {
430:                    synchronized (this ) {
431:                        my_thread = null;
432:                    }
433:                }
434:
435:                public void run() {
436:                    while (Thread.currentThread().equals(my_thread)) {
437:                        try {
438:                            stub.reconnect();
439:                            if (log.isTraceEnabled())
440:                                log.trace("reconnected");
441:                            break;
442:                        } catch (Exception e) {
443:                        }
444:                        Util.sleep(reconnect_interval);
445:                    }
446:                }
447:            }
448:
449:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.