Source Code Cross Referenced for BasicConnectionTable.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » blocks » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.blocks 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        package org.jgroups.blocks;
002:
003:        import org.jgroups.Address;
004:        import org.jgroups.Version;
005:        import org.jgroups.stack.IpAddress;
006:        import org.jgroups.util.Queue;
007:        import org.jgroups.util.QueueClosedException;
008:        import org.jgroups.util.Util;
009:        import org.apache.commons.logging.Log;
010:        import org.apache.commons.logging.LogFactory;
011:
012:        import java.net.Socket;
013:        import java.net.InetAddress;
014:        import java.net.SocketException;
015:        import java.net.ServerSocket;
016:        import java.io.DataOutputStream;
017:        import java.io.DataInputStream;
018:        import java.io.BufferedOutputStream;
019:        import java.io.BufferedInputStream;
020:        import java.io.IOException;
021:        import java.io.EOFException;
022:        import java.util.*;
023:
024:        /**
025:         * Shared class for TCP connection tables.
026:         * @author Scott Marlow
027:         */
028:        public abstract class BasicConnectionTable {
029:            final HashMap conns = new HashMap(); // keys: Addresses (peer address), values: Connection
030:            Receiver receiver = null;
031:            boolean use_send_queues = true;
032:            InetAddress bind_addr = null;
033:            Address local_addr = null; // bind_addr + port of srv_sock
034:            int srv_port = 7800;
035:            int recv_buf_size = 120000;
036:            int send_buf_size = 60000;
037:            final Vector conn_listeners = new Vector(); // listeners to be notified when a conn is established/torn down
038:            final Object recv_mutex = new Object(); // to serialize simultaneous access to receive() from multiple Connections
039:            Reaper reaper = null; // closes conns that have been idle for more than n secs
040:            long reaper_interval = 60000; // reap unused conns once a minute
041:            long conn_expire_time = 300000; // connections can be idle for 5 minutes before they are reaped
042:            int sock_conn_timeout = 1000; // max time in millis to wait for Socket.connect() to return
043:            ThreadGroup thread_group = null;
044:            protected final Log log = LogFactory.getLog(getClass());
045:            final byte[] cookie = { 'b', 'e', 'l', 'a' };
046:            boolean use_reaper = false; // by default we don't reap idle conns
047:            static final int backlog = 20; // 20 conn requests are queued by ServerSocket (addtl will be discarded)
048:            ServerSocket srv_sock = null;
049:            boolean reuse_addr = false;
050:            boolean tcp_nodelay = false;
051:            int linger = -1;
052:
053:            /**
054:             * The address which will be broadcast to the group (the externally visible address which this host should
055:             * be contacted on). If external_addr is null, it will default to the same address that the server socket is bound to.
056:             */
057:            InetAddress external_addr = null;
058:            int max_port = 0; // maximum port to bind to (if < srv_port, no limit)
059:            Thread acceptor = null; // continuously calls srv_sock.accept()
060:            boolean running = false;
061:
062:            final static long MAX_JOIN_TIMEOUT = 10000;
063:
064:            public final void setReceiver(Receiver r) {
065:                receiver = r;
066:            }
067:
068:            public void addConnectionListener(ConnectionListener l) {
069:                if (l != null && !conn_listeners.contains(l))
070:                    conn_listeners.addElement(l);
071:            }
072:
073:            public void removeConnectionListener(ConnectionListener l) {
074:                if (l != null)
075:                    conn_listeners.removeElement(l);
076:            }
077:
078:            public Address getLocalAddress() {
079:                if (local_addr == null)
080:                    local_addr = bind_addr != null ? new IpAddress(bind_addr,
081:                            srv_port) : null;
082:                return local_addr;
083:            }
084:
085:            public int getSendBufferSize() {
086:                return send_buf_size;
087:            }
088:
089:            public void setSendBufferSize(int send_buf_size) {
090:                this .send_buf_size = send_buf_size;
091:            }
092:
093:            public int getReceiveBufferSize() {
094:                return recv_buf_size;
095:            }
096:
097:            public void setReceiveBufferSize(int recv_buf_size) {
098:                this .recv_buf_size = recv_buf_size;
099:            }
100:
101:            public int getSocketConnectionTimeout() {
102:                return sock_conn_timeout;
103:            }
104:
105:            public void setSocketConnectionTimeout(int sock_conn_timeout) {
106:                this .sock_conn_timeout = sock_conn_timeout;
107:            }
108:
109:            public int getNumConnections() {
110:                return conns.size();
111:            }
112:
113:            public boolean getTcpNodelay() {
114:                return tcp_nodelay;
115:            }
116:
117:            public void setTcpNodelay(boolean tcp_nodelay) {
118:                this .tcp_nodelay = tcp_nodelay;
119:            }
120:
121:            public int getLinger() {
122:                return linger;
123:            }
124:
125:            public void setLinger(int linger) {
126:                this .linger = linger;
127:            }
128:
129:            public boolean getUseSendQueues() {
130:                return use_send_queues;
131:            }
132:
133:            public void setUseSendQueues(boolean flag) {
134:                this .use_send_queues = flag;
135:            }
136:
137:            public void start() throws Exception {
138:                running = true;
139:            }
140:
141:            public void stop() {
142:                running = false;
143:            }
144:
145:            /**
146:             Remove <code>addr</code>from connection table. This is typically triggered when a member is suspected.
147:             */
148:            public void remove(Address addr) {
149:                Connection conn;
150:
151:                synchronized (conns) {
152:                    conn = (Connection) conns.remove(addr);
153:                }
154:
155:                if (conn != null) {
156:                    try {
157:                        conn.destroy(); // won't do anything if already destroyed
158:                    } catch (Exception e) {
159:                    }
160:                }
161:                if (log.isTraceEnabled())
162:                    log.trace("removed " + addr + ", connections are "
163:                            + toString());
164:            }
165:
166:            /**
167:             * Calls the receiver callback. We serialize access to this method because it may be called concurrently
168:             * by several Connection handler threads. Therefore the receiver doesn't need to synchronize.
169:             */
170:            public void receive(Address sender, byte[] data, int offset,
171:                    int length) {
172:                if (receiver != null) {
173:                    synchronized (recv_mutex) {
174:                        receiver.receive(sender, data, offset, length);
175:                    }
176:                } else if (log.isErrorEnabled())
177:                    log.error("receiver is null (not set) !");
178:            }
179:
180:            public String toString() {
181:                StringBuffer ret = new StringBuffer();
182:                Address key;
183:                Connection val;
184:                Map.Entry entry;
185:                HashMap copy;
186:
187:                synchronized (conns) {
188:                    copy = new HashMap(conns);
189:                }
190:                ret.append("connections (" + copy.size() + "):\n");
191:                for (Iterator it = copy.entrySet().iterator(); it.hasNext();) {
192:                    entry = (Map.Entry) it.next();
193:                    key = (Address) entry.getKey();
194:                    val = (Connection) entry.getValue();
195:                    ret.append("key: " + key + ": " + val + '\n');
196:                }
197:                ret.append('\n');
198:                return ret.toString();
199:            }
200:
201:            void notifyConnectionOpened(Address peer) {
202:                if (peer == null)
203:                    return;
204:                for (int i = 0; i < conn_listeners.size(); i++)
205:                    ((ConnectionListener) conn_listeners.elementAt(i))
206:                            .connectionOpened(peer);
207:            }
208:
209:            void notifyConnectionClosed(Address peer) {
210:                if (peer == null)
211:                    return;
212:                for (int i = 0; i < conn_listeners.size(); i++)
213:                    ((ConnectionListener) conn_listeners.elementAt(i))
214:                            .connectionClosed(peer);
215:            }
216:
217:            void addConnection(Address peer, Connection c) {
218:                conns.put(peer, c);
219:                if (reaper != null && !reaper.isRunning())
220:                    reaper.start();
221:            }
222:
223:            public void send(Address dest, byte[] data, int offset, int length)
224:                    throws Exception {
225:                Connection conn;
226:                if (dest == null) {
227:                    if (log.isErrorEnabled())
228:                        log.error("destination is null");
229:                    return;
230:                }
231:
232:                if (data == null) {
233:                    log.warn("data is null; discarding packet");
234:                    return;
235:                }
236:
237:                if (!running) {
238:                    if (log.isWarnEnabled())
239:                        log
240:                                .warn("connection table is not running, discarding message to "
241:                                        + dest);
242:                    return;
243:                }
244:
245:                // 1. Try to obtain correct Connection (or create one if not yet existent)
246:                try {
247:                    conn = getConnection(dest);
248:                    if (conn == null)
249:                        return;
250:                } catch (Throwable ex) {
251:                    throw new Exception("connection to " + dest
252:                            + " could not be established", ex);
253:                }
254:
255:                // 2. Send the message using that connection
256:                try {
257:                    conn.send(data, offset, length);
258:                } catch (Throwable ex) {
259:                    if (log.isTraceEnabled())
260:                        log.trace("sending msg to " + dest + " failed ("
261:                                + ex.getClass().getName()
262:                                + "); removing from connection table", ex);
263:                    remove(dest);
264:                }
265:            }
266:
267:            abstract Connection getConnection(Address dest) throws Exception;
268:
269:            /**
270:             * Removes all connections from ConnectionTable which are not in c
271:             * @param c
272:             */
273:            //public void retainAll(Collection c) {
274:            //  conns.keySet().retainAll(c);
275:            //}
276:
277:            /**
278:             * Removes all connections from ConnectionTable which are not in current_mbrs
279:             * @param current_mbrs
280:             */
281:            public void retainAll(Collection current_mbrs) {
282:                if (current_mbrs == null)
283:                    return;
284:                HashMap copy;
285:                synchronized (conns) {
286:                    copy = new HashMap(conns);
287:                    conns.keySet().retainAll(current_mbrs);
288:                }
289:
290:                // All of the connections that were not retained must be destroyed
291:                // so that their resources are cleaned up.
292:                Map.Entry entry;
293:                for (Iterator it = copy.entrySet().iterator(); it.hasNext();) {
294:                    entry = (Map.Entry) it.next();
295:                    Object oKey = entry.getKey();
296:                    if (!current_mbrs.contains(oKey)) { // This connection NOT in the resultant connection set
297:                        Connection conn = (Connection) entry.getValue();
298:                        if (null != conn) { // Destroy this connection
299:                            if (log.isTraceEnabled())
300:                                log.trace("Destroy this orphaned connection: "
301:                                        + conn);
302:                            conn.destroy();
303:                        }
304:                    }
305:                }
306:                copy.clear();
307:            }
308:
309:            /** Used for message reception. */
310:            public interface Receiver {
311:                void receive(Address sender, byte[] data, int offset, int length);
312:            }
313:
314:            /** Used to be notified about connection establishment and teardown. */
315:            public interface ConnectionListener {
316:                void connectionOpened(Address peer_addr);
317:
318:                void connectionClosed(Address peer_addr);
319:            }
320:
321:            class Connection implements  Runnable {
322:                Socket sock = null; // socket to/from peer (result of srv_sock.accept() or new Socket())
323:                String sock_addr = null; // used for Thread.getName()
324:                DataOutputStream out = null; // for sending messages
325:                DataInputStream in = null; // for receiving messages
326:                Thread receiverThread = null; // thread for receiving messages
327:                Address peer_addr = null; // address of the 'other end' of the connection
328:                final Object send_mutex = new Object(); // serialize sends
329:                long last_access = System.currentTimeMillis(); // last time a message was sent or received
330:
331:                /** Queue<byte[]> of data to be sent to the peer of this connection */
332:                Queue send_queue = new Queue();
333:                Sender sender = new ConnectionTable.Connection.Sender();
334:                boolean is_running = false;
335:
336:                private String getSockAddress() {
337:                    if (sock_addr != null)
338:                        return sock_addr;
339:                    if (sock != null) {
340:                        StringBuffer sb = new StringBuffer();
341:                        sb.append(sock.getLocalAddress().getHostAddress())
342:                                .append(':').append(sock.getLocalPort());
343:                        sb.append(" - ").append(
344:                                sock.getInetAddress().getHostAddress()).append(
345:                                ':').append(sock.getPort());
346:                        sock_addr = sb.toString();
347:                    }
348:                    return sock_addr;
349:                }
350:
351:                Connection(Socket s, Address peer_addr) {
352:                    sock = s;
353:                    this .peer_addr = peer_addr;
354:                    try {
355:                        // out=new DataOutputStream(sock.getOutputStream());
356:                        // in=new DataInputStream(sock.getInputStream());
357:
358:                        // The change to buffered input and output stream yielded a 400% performance gain !
359:                        // bela Sept 7 2006
360:                        out = new DataOutputStream(new BufferedOutputStream(
361:                                sock.getOutputStream()));
362:                        in = new DataInputStream(new BufferedInputStream(sock
363:                                .getInputStream()));
364:                    } catch (Exception ex) {
365:                        if (log.isErrorEnabled())
366:                            log.error("exception is " + ex);
367:                    }
368:                }
369:
370:                boolean established() {
371:                    return receiverThread != null;
372:                }
373:
374:                void setPeerAddress(Address peer_addr) {
375:                    this .peer_addr = peer_addr;
376:                }
377:
378:                Address getPeerAddress() {
379:                    return peer_addr;
380:                }
381:
382:                void updateLastAccessed() {
383:                    last_access = System.currentTimeMillis();
384:                }
385:
386:                void init() {
387:                    is_running = true;
388:                    if (receiverThread == null || !receiverThread.isAlive()) {
389:                        // Roland Kurmann 4/7/2003, put in thread_group
390:                        receiverThread = new Thread(thread_group, this ,
391:                                "ConnectionTable.Connection.Receiver ["
392:                                        + getSockAddress() + "]");
393:                        receiverThread.setDaemon(true);
394:                        receiverThread.start();
395:                        if (log.isTraceEnabled())
396:                            log
397:                                    .trace("ConnectionTable.Connection.Receiver started");
398:                    }
399:
400:                }
401:
402:                void destroy() {
403:                    is_running = false;
404:                    closeSocket(); // should terminate handler as well
405:                    sender.stop();
406:                    Thread tmp = receiverThread;
407:                    receiverThread = null;
408:                    if (tmp != null) {
409:                        try {
410:                            tmp.interrupt();
411:                            tmp.join(MAX_JOIN_TIMEOUT);
412:                        } catch (InterruptedException e) {
413:                        }
414:                        if (tmp.isAlive()) {
415:                            if (log.isWarnEnabled())
416:                                log
417:                                        .warn("stopped receiver thread, but thread ("
418:                                                + tmp + ") is still alive !");
419:                        }
420:                    }
421:                }
422:
423:                /**
424:                 *
425:                 * @param data Guaranteed to be non null
426:                 * @param offset
427:                 * @param length
428:                 */
429:                void send(byte[] data, int offset, int length) {
430:                    if (!is_running) {
431:                        if (log.isWarnEnabled())
432:                            log
433:                                    .warn("Connection is not running, discarding message");
434:                        return;
435:                    }
436:                    if (use_send_queues) {
437:                        try {
438:                            // we need to copy the byte[] buffer here because the original buffer might get changed meanwhile
439:                            byte[] tmp = new byte[length];
440:                            System.arraycopy(data, offset, tmp, 0, length);
441:                            send_queue.add(tmp);
442:                            if (!sender.isRunning())
443:                                sender.start();
444:                        } catch (QueueClosedException e) {
445:                            log.error("failed adding message to send_queue", e);
446:                        }
447:                    } else
448:                        _send(data, offset, length);
449:                }
450:
451:                private void _send(byte[] data, int offset, int length) {
452:                    synchronized (send_mutex) {
453:                        try {
454:                            doSend(data, offset, length);
455:                            updateLastAccessed();
456:                        } catch (IOException io_ex) {
457:                            if (log.isWarnEnabled())
458:                                log
459:                                        .warn("peer closed connection, trying to re-send msg");
460:                            try {
461:                                doSend(data, offset, length);
462:                                updateLastAccessed();
463:                            } catch (IOException io_ex2) {
464:                                if (log.isErrorEnabled())
465:                                    log
466:                                            .error("2nd attempt to send data failed too");
467:                            } catch (Exception ex2) {
468:                                if (log.isErrorEnabled())
469:                                    log.error("exception is " + ex2);
470:                            }
471:                        } catch (InterruptedException iex) {
472:                        } catch (Throwable ex) {
473:                            if (log.isErrorEnabled())
474:                                log.error("exception is " + ex);
475:                        }
476:                    }
477:                }
478:
479:                void doSend(byte[] data, int offset, int length)
480:                        throws Exception {
481:                    try {
482:                        // we're using 'double-writes', sending the buffer to the destination in 2 pieces. this would
483:                        // ensure that, if the peer closed the connection while we were idle, we would get an exception.
484:                        // this won't happen if we use a single write (see Stevens, ch. 5.13).
485:                        if (out != null) {
486:                            out.writeInt(length); // write the length of the data buffer first
487:                            Util.doubleWrite(data, offset, length, out);
488:                            out.flush(); // may not be very efficient (but safe)
489:                        }
490:                    } catch (Exception ex) {
491:                        remove(peer_addr);
492:                        throw ex;
493:                    }
494:                }
495:
496:                /**
497:                 * Reads the peer's address. First a cookie has to be sent which has to match my own cookie, otherwise
498:                 * the connection will be refused
499:                 */
500:                Address readPeerAddress(Socket client_sock) throws Exception {
501:                    Address client_peer_addr = null;
502:                    byte[] input_cookie = new byte[cookie.length];
503:                    int client_port = client_sock != null ? client_sock
504:                            .getPort() : 0;
505:                    short version;
506:                    InetAddress client_addr = client_sock != null ? client_sock
507:                            .getInetAddress() : null;
508:
509:                    if (in != null) {
510:                        initCookie(input_cookie);
511:
512:                        // read the cookie first
513:                        in.read(input_cookie, 0, input_cookie.length);
514:                        if (!matchCookie(input_cookie))
515:                            throw new SocketException(
516:                                    "ConnectionTable.Connection.readPeerAddress(): cookie sent by "
517:                                            + client_peer_addr
518:                                            + " does not match own cookie; terminating connection");
519:                        // then read the version
520:                        version = in.readShort();
521:
522:                        if (Version.isBinaryCompatible(version) == false) {
523:                            if (log.isWarnEnabled())
524:                                log.warn(new StringBuffer("packet from ")
525:                                        .append(client_addr).append(':')
526:                                        .append(client_port).append(
527:                                                " has different version (")
528:                                        .append(version)
529:                                        .append(") from ours (").append(
530:                                                Version.version).append(
531:                                                "). This may cause problems"));
532:                        }
533:                        client_peer_addr = new IpAddress();
534:                        client_peer_addr.readFrom(in);
535:
536:                        updateLastAccessed();
537:                    }
538:                    return client_peer_addr;
539:                }
540:
541:                /**
542:                 * Send the cookie first, then the our port number. If the cookie doesn't match the receiver's cookie,
543:                 * the receiver will reject the connection and close it.
544:                 */
545:                void sendLocalAddress(Address local_addr) {
546:                    if (local_addr == null) {
547:                        if (log.isWarnEnabled())
548:                            log.warn("local_addr is null");
549:                        return;
550:                    }
551:                    if (out != null) {
552:                        try {
553:                            // write the cookie
554:                            out.write(cookie, 0, cookie.length);
555:
556:                            // write the version
557:                            out.writeShort(Version.version);
558:                            local_addr.writeTo(out);
559:                            out.flush(); // needed ?
560:                            updateLastAccessed();
561:                        } catch (Throwable t) {
562:                            if (log.isErrorEnabled())
563:                                log.error("exception is " + t);
564:                        }
565:                    }
566:                }
567:
568:                void initCookie(byte[] c) {
569:                    if (c != null)
570:                        for (int i = 0; i < c.length; i++)
571:                            c[i] = 0;
572:                }
573:
574:                boolean matchCookie(byte[] input) {
575:                    if (input == null || input.length < cookie.length)
576:                        return false;
577:                    for (int i = 0; i < cookie.length; i++)
578:                        if (cookie[i] != input[i])
579:                            return false;
580:                    return true;
581:                }
582:
583:                String printCookie(byte[] c) {
584:                    if (c == null)
585:                        return "";
586:                    return new String(c);
587:                }
588:
589:                public void run() {
590:                    byte[] buf = new byte[256]; // start with 256, increase as we go
591:                    int len = 0;
592:
593:                    while (receiverThread != null
594:                            && receiverThread.equals(Thread.currentThread())
595:                            && is_running) {
596:                        try {
597:                            if (in == null) {
598:                                if (log.isErrorEnabled())
599:                                    log.error("input stream is null !");
600:                                break;
601:                            }
602:                            len = in.readInt();
603:                            if (len > buf.length)
604:                                buf = new byte[len];
605:                            in.readFully(buf, 0, len);
606:                            updateLastAccessed();
607:                            receive(peer_addr, buf, 0, len); // calls receiver.receive(msg)
608:                        } catch (OutOfMemoryError mem_ex) {
609:                            if (log.isWarnEnabled())
610:                                log
611:                                        .warn("dropped invalid message, closing connection");
612:                            break; // continue;
613:                        } catch (EOFException eof_ex) { // peer closed connection
614:                            if (log.isTraceEnabled())
615:                                log.trace("exception is " + eof_ex);
616:                            notifyConnectionClosed(peer_addr);
617:                            break;
618:                        } catch (IOException io_ex) {
619:                            if (log.isTraceEnabled())
620:                                log.trace("exception is " + io_ex);
621:                            notifyConnectionClosed(peer_addr);
622:                            break;
623:                        } catch (Throwable e) {
624:                            if (log.isWarnEnabled())
625:                                log.warn("exception is " + e);
626:                        }
627:                    }
628:                    if (log.isTraceEnabled())
629:                        log
630:                                .trace("ConnectionTable.Connection.Receiver terminated");
631:                    receiverThread = null;
632:                    closeSocket();
633:                    // remove(peer_addr);
634:                }
635:
636:                public String toString() {
637:                    StringBuffer ret = new StringBuffer();
638:                    InetAddress local = null, remote = null;
639:                    String local_str, remote_str;
640:
641:                    if (sock == null)
642:                        ret.append("<null socket>");
643:                    else {
644:                        //since the sock variable gets set to null we want to make
645:                        //make sure we make it through here without a nullpointer exception
646:                        Socket tmp_sock = sock;
647:                        local = tmp_sock.getLocalAddress();
648:                        remote = tmp_sock.getInetAddress();
649:                        local_str = local != null ? Util.shortName(local)
650:                                : "<null>";
651:                        remote_str = remote != null ? Util.shortName(remote)
652:                                : "<null>";
653:                        ret
654:                                .append('<'
655:                                        + local_str
656:                                        + ':'
657:                                        + tmp_sock.getLocalPort()
658:                                        + " --> "
659:                                        + remote_str
660:                                        + ':'
661:                                        + tmp_sock.getPort()
662:                                        + "> ("
663:                                        + ((System.currentTimeMillis() - last_access) / 1000)
664:                                        + " secs old)");
665:                        tmp_sock = null;
666:                    }
667:
668:                    return ret.toString();
669:                }
670:
671:                void closeSocket() {
672:                    Util.close(sock); // should actually close in/out (so we don't need to close them explicitly)
673:                    sock = null;
674:                    Util.close(out); // flushes data
675:                    // removed 4/22/2003 (request by Roland Kurmann)
676:                    // out=null;
677:                    Util.close(in);
678:                }
679:
680:                class Sender implements  Runnable {
681:                    Thread senderThread;
682:                    private boolean is_it_running = false;
683:
684:                    void start() {
685:                        if (senderThread == null || !senderThread.isAlive()) {
686:                            senderThread = new Thread(thread_group, this ,
687:                                    "ConnectionTable.Connection.Sender ["
688:                                            + getSockAddress() + "]");
689:                            senderThread.setDaemon(true);
690:                            senderThread.start();
691:                            is_it_running = true;
692:                            if (log.isTraceEnabled())
693:                                log
694:                                        .trace("ConnectionTable.Connection.Sender thread started");
695:                        }
696:                    }
697:
698:                    void stop() {
699:                        is_it_running = false;
700:                        if (send_queue != null)
701:                            send_queue.close(false);
702:                        if (senderThread != null) {
703:                            Thread tmp = senderThread;
704:                            senderThread = null;
705:                            tmp.interrupt();
706:                            try {
707:                                tmp.join(MAX_JOIN_TIMEOUT);
708:                            } catch (InterruptedException e) {
709:                            }
710:                            if (tmp.isAlive()) {
711:                                if (log.isWarnEnabled())
712:                                    log
713:                                            .warn("sender thread was interrupted, but is still alive: "
714:                                                    + tmp);
715:                            }
716:                        }
717:                    }
718:
719:                    boolean isRunning() {
720:                        return is_it_running && senderThread != null;
721:                    }
722:
723:                    public void run() {
724:                        byte[] data;
725:                        while (senderThread != null
726:                                && senderThread.equals(Thread.currentThread())
727:                                && is_it_running) {
728:                            try {
729:                                data = (byte[]) send_queue.remove();
730:                                if (data == null)
731:                                    continue;
732:                                _send(data, 0, data.length);
733:                            } catch (QueueClosedException e) {
734:                                break;
735:                            }
736:                        }
737:                        is_it_running = false;
738:                        if (log.isTraceEnabled())
739:                            log
740:                                    .trace("ConnectionTable.Connection.Sender thread terminated");
741:                    }
742:                }
743:
744:            }
745:
746:            class Reaper implements  Runnable {
747:                Thread t = null;
748:
749:                Reaper() {
750:                    ;
751:                }
752:
753:                public void start() {
754:                    if (conns.size() == 0)
755:                        return;
756:                    if (t != null && !t.isAlive())
757:                        t = null;
758:                    if (t == null) {
759:                        //RKU 7.4.2003, put in threadgroup
760:                        t = new Thread(thread_group, this ,
761:                                "ConnectionTable.ReaperThread");
762:                        t.setDaemon(true); // will allow us to terminate if all remaining threads are daemons
763:                        t.start();
764:                    }
765:                }
766:
767:                public void stop() {
768:                    Thread tmp = t;
769:                    if (t != null)
770:                        t = null;
771:                    if (tmp != null) {
772:                        tmp.interrupt(); // interrupts the sleep()
773:                        try {
774:                            tmp.join(MAX_JOIN_TIMEOUT);
775:                        } catch (InterruptedException e) {
776:                        }
777:                        if (tmp.isAlive()) {
778:                            if (log.isWarnEnabled())
779:                                log
780:                                        .warn("reaper thread was interrupted, but is still alive: "
781:                                                + tmp);
782:                        }
783:                    }
784:                }
785:
786:                public boolean isRunning() {
787:                    return t != null;
788:                }
789:
790:                public void run() {
791:                    Connection value;
792:                    Map.Entry entry;
793:                    long curr_time;
794:
795:                    if (log.isInfoEnabled())
796:                        log
797:                                .info("connection reaper thread was started. Number of connections="
798:                                        + conns.size()
799:                                        + ", reaper_interval="
800:                                        + reaper_interval
801:                                        + ", conn_expire_time="
802:                                        + conn_expire_time);
803:
804:                    while (conns.size() > 0 && t != null
805:                            && t.equals(Thread.currentThread())) {
806:                        Util.sleep(reaper_interval);
807:                        if (t == null || !Thread.currentThread().equals(t))
808:                            break;
809:                        synchronized (conns) {
810:                            curr_time = System.currentTimeMillis();
811:                            for (Iterator it = conns.entrySet().iterator(); it
812:                                    .hasNext();) {
813:                                entry = (Map.Entry) it.next();
814:                                value = (Connection) entry.getValue();
815:                                if (log.isInfoEnabled())
816:                                    log
817:                                            .info("connection is "
818:                                                    + ((curr_time - value.last_access) / 1000)
819:                                                    + " seconds old (curr-time="
820:                                                    + curr_time
821:                                                    + ", last_access="
822:                                                    + value.last_access + ')');
823:                                if (value.last_access + conn_expire_time < curr_time) {
824:                                    if (log.isInfoEnabled())
825:                                        log
826:                                                .info("connection "
827:                                                        + value
828:                                                        + " has been idle for too long (conn_expire_time="
829:                                                        + conn_expire_time
830:                                                        + "), will be removed");
831:                                    value.destroy();
832:                                    it.remove();
833:                                }
834:                            }
835:                        }
836:                    }
837:                    if (log.isInfoEnabled())
838:                        log.info("reaper terminated");
839:                    t = null;
840:                }
841:            }
842:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.