Source Code Cross Referenced for Link.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » blocks » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.blocks 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        // $Id: Link.java,v 1.7.10.1 2007/04/27 08:03:56 belaban Exp $
002:
003:        package org.jgroups.blocks;
004:
005:        import org.jgroups.util.TimedWriter;
006:        import org.jgroups.util.Util;
007:        import org.apache.commons.logging.Log;
008:        import org.apache.commons.logging.LogFactory;
009:
010:        import java.io.*;
011:        import java.net.InetAddress;
012:        import java.net.ServerSocket;
013:        import java.net.Socket;
014:
015:        /**
016:         * Implements a physical link between 2 parties (point-to-point connection). For incoming traffic,
017:         * a server socket is created (bound to a given local address and port). The receiver thread does the
018:         * following: it accepts a new connection from the server socket and (on the same thread) reads messages
019:         * until the connection breaks. Then it goes back to accept(). This is done in 2 nested while-loops.
020:         * The outgoing connection is established when started. If this fails, the link is marked as not established.
021:         * This means that there is not outgoing socket.<br>
022:         * A heartbeat will be exchanged between the 2 peers periodically as long as the connection is established
023:         * (outgoing socket is okay). When the connection breaks, the heartbeat will stop and a connection establisher
024:         * thread will be started. It periodically tries to re-establish connection to the peer. When this happens
025:         * it will stop and the heartbeat thread will resume.<br>
026:         * For details see Link.txt
027:         * @author  Bela Ban, June 2000
028:         */
029:        public class Link implements  Runnable {
030:            String local_addr = null, remote_addr = null;
031:            InetAddress local = null, remote = null;
032:            int local_port = 0, remote_port = 0;
033:            ServerSocket srv_sock = null;
034:            Socket outgoing = null; // traffic to peer
035:            Socket incoming = null; // traffic from peer
036:            DataOutputStream outstream = null;
037:            DataInputStream instream = null;
038:            boolean established = false; // (incoming and outgoing) connections to peer are up and running
039:            boolean stop = false;
040:            boolean trace = false;
041:            Thread receiver_thread = null;
042:            final long receiver_thread_join_timeout = 2000;
043:            Receiver receiver = null;
044:            static final int HB_PACKET = -99;
045:            Heartbeat hb = null;
046:            long timeout = 10000; // if no heartbeat was received for timeout msecs, assume peer is dead
047:            long hb_interval = 3000; // send a heartbeat every n msecs
048:            final Object outgoing_mutex = new Object(); // sync on creation and closing of outgoing socket
049:            TimedWriter writer = null;
050:            Log log = LogFactory.getLog(getClass());
051:
052:            public interface Receiver {
053:                void receive(byte[] msg);
054:
055:                void linkDown(InetAddress local, int local_port,
056:                        InetAddress remote, int remote_port);
057:
058:                void linkUp(InetAddress local, int local_port,
059:                        InetAddress remote, int remote_port);
060:
061:                void missedHeartbeat(InetAddress local, int local_port,
062:                        InetAddress remote, int remote_port, int num_hbs);
063:
064:                void receivedHeartbeatAgain(InetAddress local, int local_port,
065:                        InetAddress remote, int remote_port);
066:            }
067:
068:            public Link(String local_addr, int local_port, String remote_addr,
069:                    int remote_port) {
070:                this .local_addr = local_addr;
071:                this .local_port = local_port;
072:                this .remote_addr = remote_addr;
073:                this .remote_port = remote_port;
074:                hb = new Heartbeat(timeout, hb_interval);
075:            }
076:
077:            public Link(String local_addr, int local_port, String remote_addr,
078:                    int remote_port, Receiver r) {
079:                this (local_addr, local_port, remote_addr, remote_port);
080:                setReceiver(r);
081:            }
082:
083:            public Link(String local_addr, int local_port, String remote_addr,
084:                    int remote_port, long timeout, long hb_interval, Receiver r) {
085:                this .local_addr = local_addr;
086:                this .local_port = local_port;
087:                this .remote_addr = remote_addr;
088:                this .remote_port = remote_port;
089:                this .timeout = timeout;
090:                this .hb_interval = hb_interval;
091:                hb = new Heartbeat(timeout, hb_interval);
092:                setReceiver(r);
093:            }
094:
095:            public void setTrace(boolean t) {
096:                trace = t;
097:            }
098:
099:            public void setReceiver(Receiver r) {
100:                receiver = r;
101:            }
102:
103:            public boolean established() {
104:                return established;
105:            }
106:
107:            public InetAddress getLocalAddress() {
108:                return local;
109:            }
110:
111:            public InetAddress getRemoteAddress() {
112:                return remote;
113:            }
114:
115:            public int getLocalPort() {
116:                return local_port;
117:            }
118:
119:            public int getRemotePort() {
120:                return remote_port;
121:            }
122:
123:            public void start() throws Exception {
124:                local = InetAddress.getByName(local_addr);
125:                remote = InetAddress.getByName(remote_addr);
126:                srv_sock = new ServerSocket(local_port, 1, local);
127:                createOutgoingConnection(hb_interval); // connection to peer established, sets established=true
128:                startReceiverThread(); // start reading from incoming socket
129:                hb.start(); // starts heartbeat (conn establisher is not yet started)
130:            }
131:
132:            public void stop() {
133:                stopReceiverThread();
134:                hb.stop();
135:                try {
136:                    srv_sock.close();
137:                } catch (Exception e) {
138:                }
139:                established = false;
140:            }
141:
142:            /** Tries to send buffer across out socket. Tries to establish connection if not yet connected. */
143:            public boolean send(byte[] buf) {
144:                if (buf == null || buf.length == 0) {
145:                    if (log.isTraceEnabled())
146:                        System.err
147:                                .println("Link.send(): buffer is null or does not contain any data !");
148:                    return false;
149:                }
150:                if (!established) { // will be set by ConnectionEstablisher when connection has been set up
151:                    if (log.isTraceEnabled())
152:                        log
153:                                .error("Link.send(): connection not established, discarding message");
154:                    return false;
155:                }
156:
157:                try {
158:                    outstream.writeInt(buf.length); // synchronized anyway
159:                    outstream.write(buf); // synchronized anyway, we don't need to sync on outstream
160:                    return true;
161:                } catch (Exception ex) { // either IOException or EOFException (subclass if IOException)
162:                    if (log.isTraceEnabled())
163:                        log.error("Link.send1(): sending failed; retrying");
164:                    return retry(buf);
165:                }
166:            }
167:
168:            boolean retry(byte[] buf) {
169:                closeOutgoingConnection(); // there something wrong, close connection
170:                if (!createOutgoingConnection()) { // ... and re-open. if this fails,
171:                    closeOutgoingConnection(); // just abort and return failure to caller
172:                    return false;
173:                } else {
174:                    try {
175:                        outstream.writeInt(buf.length);
176:                        outstream.write(buf);
177:                        return true;
178:                    } catch (Exception e) {
179:                        if (log.isTraceEnabled())
180:                            System.out
181:                                    .println("Link.send2(): failed, closing connection");
182:                        closeOutgoingConnection();
183:                        return false;
184:                    }
185:                }
186:            }
187:
188:            /** Receiver thread main loop. Accept a connection and then read on it until the connection
189:            breaks. Only then is the next connection handled. The reason is that there is only supposed
190:            to be 1 connection to this server socket at the same time. 
191:             */
192:            public void run() {
193:                int num_bytes;
194:                byte[] buf;
195:                InetAddress peer = null;
196:                int peer_port = 0;
197:
198:                while (!stop) {
199:                    try {
200:                        if (log.isTraceEnabled())
201:                            System.out.println("-- WAITING for ACCEPT");
202:                        incoming = srv_sock.accept();
203:                        instream = new DataInputStream(incoming
204:                                .getInputStream());
205:                        peer = incoming.getInetAddress();
206:                        peer_port = incoming.getPort();
207:
208:                        if (log.isTraceEnabled())
209:                            System.out.println("-- ACCEPT: incoming is "
210:                                    + printSocket(incoming));
211:
212:                        /** This piece of code would only accept connections from the peer address defined above. */
213:                        if (remote.equals(incoming.getInetAddress())) {
214:                            if (log.isTraceEnabled())
215:                                System.out
216:                                        .println("Link.run(): accepted connection from "
217:                                                + peer + ':' + peer_port);
218:                        } else {
219:                            if (log.isTraceEnabled())
220:                                log
221:                                        .error("Link.run(): rejected connection request from "
222:                                                + peer
223:                                                + ':'
224:                                                + peer_port
225:                                                + ". Address not specified as peer in link !");
226:                            closeIncomingConnection(); // only close incoming connection
227:                            continue;
228:                        }
229:
230:                        // now try to create outgoing connection
231:                        if (!established) {
232:                            createOutgoingConnection();
233:                        }
234:
235:                        while (!stop) {
236:                            try {
237:                                num_bytes = instream.readInt();
238:                                if (num_bytes == HB_PACKET) {
239:                                    hb.receivedHeartbeat();
240:                                    continue;
241:                                }
242:
243:                                buf = new byte[num_bytes];
244:                                instream.readFully(buf, 0, buf.length);
245:                                hb.receivedMessage(); // equivalent to heartbeat response (HB_PACKET)
246:                                if (receiver != null)
247:                                    receiver.receive(buf);
248:                            } catch (Exception ex) { // IOException, EOFException, SocketException
249:                                closeIncomingConnection(); // close incoming when read() fails
250:                                break;
251:                            }
252:                        }
253:                    } catch (IOException io_ex) {
254:                        receiver_thread = null;
255:                        break;
256:                    } catch (Exception e) {
257:                    }
258:                }
259:            }
260:
261:            public String toString() {
262:                StringBuffer ret = new StringBuffer();
263:                ret.append("Link <" + local_addr + ':' + local_port + " --> "
264:                        + remote_addr + ':' + remote_port + '>');
265:                ret.append(established ? " (established)"
266:                        : " (not established)");
267:                return ret.toString();
268:            }
269:
270:            public boolean equals(Object other) {
271:                Link o;
272:
273:                if (other == null)
274:                    return false;
275:                if (!(other instanceof  Link))
276:                    return false;
277:                o = (Link) other;
278:                if (local_addr.equals(o.local_addr)
279:                        && remote_addr.equals(o.remote_addr)
280:                        && local_port == o.local_port
281:                        && remote_port == o.remote_port)
282:                    return true;
283:                else
284:                    return false;
285:            }
286:
287:            public int hashCode() {
288:                return local_addr.hashCode() + remote_addr.hashCode()
289:                        + local_port + remote_port;
290:            }
291:
292:            void startReceiverThread() {
293:                stopReceiverThread();
294:                receiver_thread = new Thread(this , "Link.ReceiverThreadThread");
295:                receiver_thread.setDaemon(true);
296:                receiver_thread.start();
297:            }
298:
299:            void stopReceiverThread() {
300:                if (receiver_thread != null && receiver_thread.isAlive()) {
301:                    stop = true;
302:                    closeIncomingConnection();
303:                    try {
304:                        receiver_thread.join(receiver_thread_join_timeout);
305:                    } catch (Exception e) {
306:                    }
307:                    stop = false;
308:                }
309:                receiver_thread = null;
310:            }
311:
312:            /** Tries to create an outgoing connection. If successful, the heartbeat is started. Does <em>not</em>
313:            stop the connection establisher ! The reason is that this method is going to be called by the
314:            connection establisher as well, therefore it would kill itself ! */
315:            boolean createOutgoingConnection() {
316:                synchronized (outgoing_mutex) { // serialize access with ConnectionEstablisher
317:                    if (established) {
318:                        return true;
319:                    }
320:                    try {
321:                        // create a socket to remote:remote_port, bind to local address (choose any local port);
322:                        outgoing = new Socket(remote, remote_port, local, 0); // 0 means choose any local port
323:                        outgoing.setSoLinger(true, 1); // 1 second  // +++ ? needed ? it is off by default !
324:                        outstream = new DataOutputStream(outgoing
325:                                .getOutputStream());
326:                        if (receiver != null)
327:                            receiver.linkUp(local, local_port, remote,
328:                                    remote_port);
329:                        established = true;
330:
331:                        if (log.isTraceEnabled())
332:                            System.out.println("-- CREATE: outgoing is "
333:                                    + printSocket(outgoing));
334:
335:                        return true;
336:                    } catch (Exception e) {
337:                        established = false;
338:                        return false;
339:                    }
340:                }
341:            }
342:
343:            /** 
344:            Tries to create an outgoing connection. If successful, the heartbeat is started. Does <em>not</em>
345:            stop the connection establisher ! The reason is that this method is going to be called by the
346:            connection establisher as well, therefore it would kill itself !
347:             */
348:            boolean createOutgoingConnection(long timeout) {
349:                synchronized (outgoing_mutex) { // serialize access with ConnectionEstablisher
350:                    if (established) {
351:                        return true;
352:                    }
353:                    try {
354:                        if (writer == null)
355:                            writer = new TimedWriter();
356:
357:                        // create a socket to remote:remote_port, bind to local address (choose any local port);
358:                        // outgoing=new Socket(remote, remote_port, local, 0); // 0 means choose any local port
359:                        outgoing = writer.createSocket(local, remote,
360:                                remote_port, timeout);
361:                        outgoing.setSoLinger(true, 1); // 1 second  // +++ ? needed ? it is off by default !
362:                        outstream = new DataOutputStream(outgoing
363:                                .getOutputStream());
364:                        if (receiver != null)
365:                            receiver.linkUp(local, local_port, remote,
366:                                    remote_port);
367:                        established = true;
368:                        if (log.isTraceEnabled())
369:                            System.out.println("-- CREATE: outgoing is "
370:                                    + printSocket(outgoing));
371:                        return true;
372:                    } catch (Exception e) {
373:                        established = false;
374:                        return false;
375:                    }
376:                }
377:            }
378:
379:            /** Closes the outgoing connection */
380:            void closeOutgoingConnection() {
381:                synchronized (outgoing_mutex) {
382:                    if (!established) {
383:                        return;
384:                    }
385:                    if (outstream != null) {
386:
387:                        if (log.isTraceEnabled())
388:                            System.out.println("-- CLOSE: outgoing is "
389:                                    + printSocket(outgoing));
390:
391:                        try {
392:                            outstream.close(); // flush data before socket is closed
393:                        } catch (Exception e) {
394:                        }
395:                        outstream = null;
396:                    }
397:                    if (outgoing != null) {
398:                        try {
399:                            outgoing.close();
400:                        } catch (Exception e) {
401:                        }
402:                        outgoing = null;
403:                    }
404:                    established = false;
405:                    if (receiver != null)
406:                        receiver.linkDown(local, local_port, remote,
407:                                remote_port);
408:                }
409:            }
410:
411:            /** When the heartbeat thread detects that the peer 'hangs' (not detected by incoming.read()), 
412:                then it closes the outgoing *and* incoming socket. The latter needs to be done,
413:                so that we can return to accept() and await a new client connection request. */
414:            synchronized void closeIncomingConnection() {
415:                if (instream != null) {
416:
417:                    if (log.isTraceEnabled())
418:                        System.out.println("-- CLOSE: incoming is "
419:                                + printSocket(incoming));
420:
421:                    try {
422:                        instream.close();
423:                    } catch (Exception e) {
424:                    }
425:                    instream = null;
426:                }
427:                if (incoming != null) {
428:                    try {
429:                        incoming.close();
430:                    } catch (Exception e) {
431:                    }
432:                    incoming = null;
433:                }
434:            }
435:
436:            /** Close outgoing and incoming sockets. */
437:            synchronized void closeConnections() {
438:
439:                // 1. Closes the outgoing connection. Then the connection establisher is started. The heartbeat
440:                //    thread cannot be stopped in here, because this method is called by it !	    
441:                closeOutgoingConnection();
442:
443:                // 2. When the heartbeat thread detects that the peer 'hangs' (not detected by incoming.read()), 
444:                //    then it closes the outgoing *and* incoming socket. The latter needs to be done,
445:                //    so that we can return to accept() and await a new client connection request.
446:                closeIncomingConnection();
447:            }
448:
449:            String printSocket(Socket s) {
450:                if (s == null)
451:                    return "<null>";
452:                StringBuffer ret = new StringBuffer();
453:                ret.append(s.getLocalAddress().getHostName());
454:                ret.append(':');
455:                ret.append(s.getLocalPort());
456:                ret.append(" --> ");
457:                ret.append(s.getInetAddress().getHostName());
458:                ret.append(':');
459:                ret.append(s.getPort());
460:                return ret.toString();
461:            }
462:
463:            /**
464:               Sends heartbeats across the link as long as we are connected (established=true). Uses a TimedWriter
465:               for both sending and responding to heartbeats. The reason is that a write() might hang if the
466:               peer has not closed its end, but the connection hangs (e.g. network partition, peer was stop-a'ed,
467:               ctrl-z of peer or peer's NIC was unplumbed) and the writer buffer is filled to capacity. This way,
468:               we don't hang sending timeouts.
469:             */
470:            class Heartbeat implements  Runnable {
471:                Thread thread = null;
472:                long hb_timeout = 10000; // time to wait for heartbeats from peer, if not received -> boom !
473:                long interval = 3000; // {send a heartbeat | try to create connection} every 3 secs
474:                boolean stop_hb = false;
475:                long last_hb = System.currentTimeMillis();
476:                boolean missed_hb = false;
477:                final TimedWriter timed_writer = new TimedWriter();
478:
479:                public Heartbeat(long timeout, long hb_interval) {
480:                    this .hb_timeout = timeout;
481:                    this .interval = hb_interval;
482:                }
483:
484:                public synchronized void start() {
485:                    stop();
486:                    stop_hb = false;
487:                    missed_hb = false;
488:                    last_hb = System.currentTimeMillis();
489:                    thread = new Thread(this , "HeartbeatThread");
490:                    thread.setDaemon(true);
491:                    thread.start();
492:                }
493:
494:                public synchronized void interrupt() {
495:                    thread.interrupt();
496:                }
497:
498:                public synchronized void stop() {
499:                    if (thread != null && thread.isAlive()) {
500:                        stop_hb = true;
501:                        missed_hb = false;
502:                        thread.interrupt();
503:                        try {
504:                            thread.join(hb_timeout + 1000);
505:                        } catch (Exception e) {
506:                        }
507:                        thread = null;
508:                    }
509:                }
510:
511:                /**
512:                   When we receive a message from the peer, this means the peer is alive. Therefore we
513:                   update the time of the last heartbeat.
514:                 */
515:                public void receivedMessage() {
516:                    last_hb = System.currentTimeMillis();
517:                    if (missed_hb) {
518:                        if (receiver != null)
519:                            receiver.receivedHeartbeatAgain(local, local_port,
520:                                    remote, remote_port);
521:                        missed_hb = false;
522:                    }
523:                }
524:
525:                /** Callback, called by the Link whenever it encounters a heartbeat (HB_PACKET) */
526:                public void receivedHeartbeat() {
527:                    last_hb = System.currentTimeMillis();
528:                    if (missed_hb) {
529:                        if (receiver != null)
530:                            receiver.receivedHeartbeatAgain(local, local_port,
531:                                    remote, remote_port);
532:                        missed_hb = false;
533:                    }
534:                }
535:
536:                /**
537:                   Sends heartbeats when connection is established. Tries to establish connection when not established.
538:                   Switches between 'established' and 'not established' roles.
539:                 */
540:                public void run() {
541:                    long diff = 0, curr_time = 0, num_missed_hbs = 0;
542:
543:                    if (log.isTraceEnabled())
544:                        System.out.println("heartbeat to " + remote + ':'
545:                                + remote_port + " started");
546:                    while (!stop_hb) {
547:
548:                        if (established) { // send heartbeats
549:
550:                            // 1. Send heartbeat (use timed write)
551:                            if (outstream != null) {
552:                                try {
553:                                    timed_writer.write(outstream, HB_PACKET,
554:                                            1500);
555:                                    Thread.sleep(interval);
556:                                } catch (Exception io_ex) { // IOException and TimedWriter.Timeout
557:                                    closeOutgoingConnection(); // sets established to false
558:                                    continue;
559:                                }
560:                            } else {
561:                                established = false;
562:                                continue;
563:                            }
564:
565:                            // 2. If time of last HB received > timeout --> close connection
566:                            curr_time = System.currentTimeMillis();
567:                            diff = curr_time - last_hb;
568:
569:                            if (curr_time - last_hb > interval) {
570:                                num_missed_hbs = (curr_time - last_hb)
571:                                        / interval;
572:                                if (receiver != null)
573:                                    receiver.missedHeartbeat(local, local_port,
574:                                            remote, remote_port,
575:                                            (int) num_missed_hbs);
576:                                missed_hb = true;
577:                            }
578:
579:                            if (diff >= hb_timeout) {
580:                                if (log.isTraceEnabled())
581:                                    System.out
582:                                            .println("###### Link.Heartbeat.run(): no heartbeat receveived for "
583:                                                    + diff
584:                                                    + " msecs. Closing connections. #####");
585:                                closeConnections(); // close both incoming *and* outgoing connections
586:                            }
587:                        } else { // try to establish connection
588:                            synchronized (outgoing_mutex) { // serialize access with createOutgoingConnection()
589:                                if (established) {
590:                                    continue;
591:                                }
592:                                try {
593:                                    outgoing = timed_writer.createSocket(local,
594:                                            remote, remote_port, interval);
595:                                    outstream = new DataOutputStream(outgoing
596:                                            .getOutputStream());
597:                                    if (receiver != null)
598:                                        receiver.linkUp(local, local_port,
599:                                                remote, remote_port);
600:                                    established = true;
601:                                    if (log.isTraceEnabled())
602:                                        System.out.println("-- CREATE (CE): "
603:                                                + printSocket(outgoing));
604:                                    continue;
605:                                } catch (InterruptedException interrupted_ex) {
606:                                    continue;
607:                                } catch (Exception ex) { // IOException, TimedWriter.Timeout
608:                                    Util.sleep(interval); // returns when done or interrupted
609:                                }
610:                            }
611:                        }
612:                    }
613:                    if (log.isTraceEnabled())
614:                        System.out.println("heartbeat to " + remote + ':'
615:                                + remote_port + " stopped");
616:                    thread = null;
617:                }
618:            }
619:
620:            private static class MyReceiver implements  Link.Receiver {
621:
622:                public void receive(byte[] msg) {
623:                    System.out.println("<-- " + new String(msg));
624:                }
625:
626:                public void linkDown(InetAddress l, int lp, InetAddress r,
627:                        int rp) {
628:                    System.out.println("** linkDown(): " + r + ':' + rp);
629:                }
630:
631:                public void linkUp(InetAddress l, int lp, InetAddress r, int rp) {
632:                    System.out.println("** linkUp(): " + r + ':' + rp);
633:                }
634:
635:                public void missedHeartbeat(InetAddress l, int lp,
636:                        InetAddress r, int rp, int num) {
637:                    System.out.println("** missedHeartbeat(): " + r + ':' + rp);
638:                }
639:
640:                public void receivedHeartbeatAgain(InetAddress l, int lp,
641:                        InetAddress r, int rp) {
642:                    System.out.println("** receivedHeartbeatAgain(): " + r
643:                            + ':' + rp);
644:                }
645:            }
646:
647:            public static void main(String[] args) {
648:                String local, remote;
649:                int local_port, remote_port;
650:
651:                if (args.length != 4) {
652:                    System.err
653:                            .println("\nLink <local host> <local port> <remote host> <remote port>\n");
654:                    return;
655:                }
656:                local = args[0];
657:                remote = args[2];
658:                local_port = Integer.parseInt(args[1]);
659:                remote_port = Integer.parseInt(args[3]);
660:
661:                Link l = new Link(local, local_port, remote, remote_port,
662:                        new MyReceiver());
663:
664:                try {
665:                    l.start();
666:                    System.out.println(l);
667:
668:                    BufferedReader in = new BufferedReader(
669:                            new InputStreamReader(System.in));
670:                    while (true) {
671:                        System.out.print("> ");
672:                        System.out.flush();
673:                        String line = in.readLine();
674:                        l.send(line.getBytes());
675:                    }
676:                } catch (Exception e) {
677:                    System.err.println(e);
678:                }
679:            }
680:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.