Source Code Cross Referenced for FD_SOCK.java in Net » JGroups-2.4.1-sp3 » org » jgroups » protocols


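The class comment in the listing describes ring-based monitoring in which each member connects to "its neighbor on the right". The method that picks that neighbor (determinePingDest()) is not part of this excerpt, so the following is only a minimal sketch of one plausible selection rule: take the next entry in an ordered member list and wrap around at the end. The class RingNeighborSketch and the method rightNeighbor are hypothetical names and are not JGroups code.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper, not part of JGroups: picks the next member in a ring. */
public class RingNeighborSketch {

    /**
     * Returns the element after 'self' in 'members', wrapping around at the end.
     * Returns null when 'self' is absent or when there are fewer than 2 members
     * (with a single member there is nobody to monitor).
     */
    public static String rightNeighbor(List<String> members, String self) {
        if (members == null || members.size() < 2)
            return null;
        int index = members.indexOf(self);
        if (index < 0)
            return null;
        return members.get((index + 1) % members.size());
    }

    public static void main(String[] args) {
        List<String> members = Arrays.asList("A", "B", "C");
        System.out.println(rightNeighbor(members, "A"));
        System.out.println(rightNeighbor(members, "C")); // last member wraps to the first
        System.out.println(rightNeighbor(Arrays.asList("A"), "A"));
    }
}
```

Under this assumed rule every member is monitored by exactly one other member, which fits the stated cost of one client-side thread per member.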

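The pinger thread in the listing blocks in a single read() on the socket to the monitored peer and interprets the result: the byte 9 (NORMAL_TERMINATION) means the peer shut down deliberately, while -1 (end of stream) leads to the peer being suspected. The sketch below reproduces only that read-and-classify step on a loopback connection. The class SocketCloseSketch and its methods are hypothetical, and the real protocol additionally treats an IOException from read() as grounds for suspicion, which this sketch simply rethrows.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

/** Hypothetical demo, not part of JGroups: classifies how a monitored peer closed its socket. */
public class SocketCloseSketch {
    static final int NORMAL_TERMINATION = 9; // same value as the constant in the listing
    static final int END_OF_STREAM = -1;     // InputStream.read() returns -1 once the peer has closed

    /** Interprets the result of one blocking read() on the monitoring socket. */
    public static String classify(int c) {
        switch (c) {
        case NORMAL_TERMINATION:
            return "normal";
        case END_OF_STREAM:
            return "suspect";
        default:
            return "ignored";
        }
    }

    /**
     * Connects to a loopback server socket, lets the peer side optionally send one byte
     * and then close, and returns how the monitoring side classifies what it read.
     * Pass null to simulate a peer that closes without sending a termination byte.
     */
    public static String monitorOnce(Integer byteToSend) {
        try {
            ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
            try {
                Socket monitor = new Socket(server.getInetAddress(), server.getLocalPort());
                try {
                    Socket peer = server.accept();
                    if (byteToSend != null) {
                        peer.getOutputStream().write(byteToSend.intValue());
                        peer.getOutputStream().flush();
                    }
                    peer.close();
                    return classify(monitor.getInputStream().read());
                } finally {
                    monitor.close();
                }
            } finally {
                server.close();
            }
        } catch (IOException e) {
            throw new IllegalStateException("loopback connection failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(monitorOnce(Integer.valueOf(NORMAL_TERMINATION)));
        System.out.println(monitorOnce(null));
    }
}
```

Because the monitoring side only ever blocks in read(), no traffic flows while both peers are alive. The same property explains the limitation named in the class comment: a hung host never closes its socket, so the read never returns.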
// $Id: FD_SOCK.java,v 1.51.2.1 2007/04/27 08:03:51 belaban Exp $

package org.jgroups.protocols;

import org.jgroups.*;
import org.jgroups.stack.IpAddress;
import org.jgroups.stack.Protocol;
import org.jgroups.util.*;

import java.io.*;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.*;
import java.util.List;

/**
 * Failure detection protocol based on sockets. Failure detection is ring-based. Each member creates a
 * server socket and announces its address together with the server socket's address in a multicast. A
 * pinger thread will be started when the membership goes above 1 and will be stopped when it drops below
 * 2. The pinger thread connects to its neighbor on the right and waits until the socket is closed. When
 * the socket is closed by the monitored peer in an abnormal fashion (IOException), the neighbor will be
 * suspected.<p> The main feature of this protocol is that no ping messages need to be exchanged between
 * any 2 peers, and failure detection relies entirely on TCP sockets. The advantage is that no activity
 * will take place between 2 peers as long as they are alive (i.e. have their server sockets open).
 * The disadvantage is that hung servers or crashed routers will not cause sockets to be closed, therefore
 * they won't be detected.
 * The FD_SOCK protocol will work for groups where members are on different hosts.<p>
 * The costs involved are 2 additional threads: one that
 * monitors the client side of the socket connection (to monitor a peer) and another one that manages the
 * server socket. However, those threads will be idle as long as both peers are running.
 * @author Bela Ban May 29 2001
 */
public class FD_SOCK extends Protocol implements Runnable {
    long get_cache_timeout = 3000; // msecs to wait for the socket cache from the coordinator
    static final long get_cache_retry_timeout = 500; // msecs to wait until we retry getting the cache from coord
    long suspect_msg_interval = 5000; // (BroadcastTask): mcast SUSPECT every 5000 msecs
    int num_tries = 3; // attempts coord is solicited for socket cache until we give up
    final Vector members = new Vector(11); // list of group members (updated on VIEW_CHANGE)
    boolean srv_sock_sent = false; // has own socket been broadcast yet ?
    final Vector pingable_mbrs = new Vector(11); // mbrs from which we select ping_dest. may be subset of 'members'
    final Promise get_cache_promise = new Promise(); // used for rendezvous on GET_CACHE and GET_CACHE_RSP
    boolean got_cache_from_coord = false; // was cache already fetched ?
    Address local_addr = null; // our own address
    ServerSocket srv_sock = null; // server socket to which another member connects to monitor me

    InetAddress bind_addr = null; // the NIC on which the ServerSocket should listen

    String group_name = null; // the name of the group (set on CONNECT, nulled on DISCONNECT)

    /** @deprecated Use {@link #bind_addr} instead */
    InetAddress srv_sock_bind_addr = null; // the NIC on which the ServerSocket should listen

    private ServerSocketHandler srv_sock_handler = null; // accepts new connections on srv_sock
    IpAddress srv_sock_addr = null; // pair of server_socket:port
    Address ping_dest = null; // address of the member we monitor
    Socket ping_sock = null; // socket to the member we monitor
    InputStream ping_input = null; // input stream of the socket to the member we monitor
    Thread pinger_thread = null; // listens on ping_sock, suspects member if socket is closed
    final Object pinger_mutex = new Object();

    final Hashtable cache = new Hashtable(11); // keys=Addresses, vals=IpAddresses (socket:port)

    /** Start port for server socket (uses first available port starting at start_port). A value of 0 (default)
     * picks a random port */
    int start_port = 0;
    final Promise ping_addr_promise = new Promise(); // to fetch the ping_addr for ping_dest
    final Object sock_mutex = new Object(); // for access to ping_sock, ping_input
    TimeScheduler timer = null;
    private final BroadcastTask bcast_task = new BroadcastTask(); // to transmit SUSPECT message (until view change)
    boolean regular_sock_close = false; // used by interruptPingerThread() when new ping_dest is computed
    int num_suspect_events = 0;
    private static final int INTERRUPT = 8;
    private static final int NORMAL_TERMINATION = 9;
    private static final int ABNORMAL_TERMINATION = -1;
    private static final String name = "FD_SOCK";

    BoundedList suspect_history = new BoundedList(20);

    /** whether to use KEEP_ALIVE on the ping socket or not */
    private boolean keep_alive = true;

    private boolean running = false;

    public String getName() {
        return name;
    }

    public String getLocalAddress() {
        return local_addr != null ? local_addr.toString() : "null";
    }

    public String getMembers() {
        return members != null ? members.toString() : "null";
    }

    public String getPingableMembers() {
        return pingable_mbrs != null ? pingable_mbrs.toString() : "null";
    }

    public String getPingDest() {
        return ping_dest != null ? ping_dest.toString() : "null";
    }

    public int getNumSuspectEventsGenerated() {
        return num_suspect_events;
    }

    public String printSuspectHistory() {
        StringBuffer sb = new StringBuffer();
        for (Enumeration en = suspect_history.elements(); en.hasMoreElements();) {
            sb.append(new Date()).append(": ").append(en.nextElement()).append("\n");
        }
        return sb.toString();
    }

    public boolean setProperties(Properties props) {
        String str;

        super.setProperties(props);
        str = props.getProperty("get_cache_timeout");
        if (str != null) {
            get_cache_timeout = Long.parseLong(str);
            props.remove("get_cache_timeout");
        }

        str = props.getProperty("suspect_msg_interval");
        if (str != null) {
            suspect_msg_interval = Long.parseLong(str);
            props.remove("suspect_msg_interval");
        }

        str = props.getProperty("num_tries");
        if (str != null) {
            num_tries = Integer.parseInt(str);
            props.remove("num_tries");
        }

        str = props.getProperty("start_port");
        if (str != null) {
            start_port = Integer.parseInt(str);
            props.remove("start_port");
        }

        str = props.getProperty("keep_alive");
        if (str != null) {
            keep_alive = Boolean.valueOf(str).booleanValue();
            props.remove("keep_alive");
        }

        str = props.getProperty("srv_sock_bind_addr");
        if (str != null) {
            log.error("srv_sock_bind_addr is deprecated and will be ignored - use bind_addr instead");
            props.remove("srv_sock_bind_addr");
        }

        boolean ignore_systemprops = Util.isBindAddressPropertyIgnored();
        str = Util.getProperty(new String[] { Global.BIND_ADDR, Global.BIND_ADDR_OLD },
                props, "bind_addr", ignore_systemprops, null);
        if (str != null) {
            try {
                bind_addr = InetAddress.getByName(str);
            } catch (UnknownHostException unknown) {
                log.error("(bind_addr): host " + str + " not known");
                return false;
            }
            props.remove("bind_addr");
        }

        if (props.size() > 0) {
            log.error("the following properties are not recognized: " + props);
            return false;
        }
        return true;
    }

    public void init() throws Exception {
        srv_sock_handler = new ServerSocketHandler();
        timer = stack != null ? stack.timer : null;
        if (timer == null)
            throw new Exception("FD_SOCK.init(): timer == null");
    }

    public void start() throws Exception {
        super.start();
        running = true;
    }

    public void stop() {
        running = false;
        bcast_task.removeAll();
        stopPingerThread();
        stopServerSocket();
    }

    public void resetStats() {
        super.resetStats();
        num_suspect_events = 0;
        suspect_history.removeAll();
    }

    public void up(Event evt) {
        Message msg;
        FdHeader hdr;

        switch (evt.getType()) {

        case Event.SET_LOCAL_ADDRESS:
            local_addr = (Address) evt.getArg();
            break;

        case Event.MSG:
            msg = (Message) evt.getArg();
            hdr = (FdHeader) msg.removeHeader(name);
            if (hdr == null)
                break; // message did not originate from FD_SOCK layer, just pass up

            switch (hdr.type) {

            case FdHeader.SUSPECT:
                if (hdr.mbrs != null) {
                    if (log.isDebugEnabled())
                        log.debug("[SUSPECT] hdr=" + hdr);
                    for (int i = 0; i < hdr.mbrs.size(); i++) {
                        Address m = (Address) hdr.mbrs.elementAt(i);
                        if (local_addr != null && m.equals(local_addr)) {
                            if (log.isWarnEnabled())
                                log.warn("I was suspected by " + msg.getSrc()
                                        + "; ignoring the SUSPECT message");
                            continue;
                        }
                        passUp(new Event(Event.SUSPECT, hdr.mbrs.elementAt(i)));
                        passDown(new Event(Event.SUSPECT, hdr.mbrs.elementAt(i)));
                    }
                } else if (log.isWarnEnabled())
                    log.warn("[SUSPECT]: hdr.mbrs == null");
                break;

            // If I own the socket for 'hdr.mbr', return it. Otherwise look it up in my cache and return it
            case FdHeader.WHO_HAS_SOCK:
                if (local_addr != null && local_addr.equals(msg.getSrc()))
                    return; // don't reply to WHO_HAS bcasts sent by me !

                if (hdr.mbr == null) {
                    if (log.isErrorEnabled())
                        log.error("hdr.mbr is null");
                    return;
                }

                if (log.isTraceEnabled())
                    log.trace("who-has-sock " + hdr.mbr);

                // 1. Try my own address, maybe it's me whose socket is wanted
                if (local_addr != null && local_addr.equals(hdr.mbr)
                        && srv_sock_addr != null) {
                    sendIHaveSockMessage(msg.getSrc(), local_addr, srv_sock_addr); // unicast message to msg.getSrc()
                    return;
                }

                // 2. If I don't have it, maybe it is in the cache
                if (cache.containsKey(hdr.mbr))
                    sendIHaveSockMessage(msg.getSrc(), hdr.mbr,
                            (IpAddress) cache.get(hdr.mbr)); // ucast msg
                break;

            // Update the cache with the addr:sock_addr entry (if on the same host)
            case FdHeader.I_HAVE_SOCK:
                if (hdr.mbr == null || hdr.sock_addr == null) {
                    if (log.isErrorEnabled())
                        log.error("[I_HAVE_SOCK]: hdr.mbr is null or hdr.sock_addr == null");
                    return;
                }

                // if(!cache.containsKey(hdr.mbr))
                cache.put(hdr.mbr, hdr.sock_addr); // update the cache
                if (log.isTraceEnabled())
                    log.trace("i-have-sock: " + hdr.mbr + " --> "
                            + hdr.sock_addr + " (cache is " + cache + ')');

                if (ping_dest != null && hdr.mbr.equals(ping_dest))
                    ping_addr_promise.setResult(hdr.sock_addr);
                break;

            // Return the cache to the sender of this message
            case FdHeader.GET_CACHE:
                if (hdr.mbr == null) {
                    if (log.isErrorEnabled())
                        log.error("(GET_CACHE): hdr.mbr == null");
                    return;
                }
                // Remember the requester before 'hdr' is replaced by the response header;
                // otherwise the reply would be addressed to the new header's (unset) mbr
                Address requester = hdr.mbr;
                hdr = new FdHeader(FdHeader.GET_CACHE_RSP);
                hdr.cachedAddrs = (Hashtable) cache.clone();
                msg = new Message(requester, null, null);
                msg.putHeader(name, hdr);
                passDown(new Event(Event.MSG, msg));
                break;

            case FdHeader.GET_CACHE_RSP:
                if (hdr.cachedAddrs == null) {
                    if (log.isErrorEnabled())
                        log.error("(GET_CACHE_RSP): cache is null");
                    return;
                }
                get_cache_promise.setResult(hdr.cachedAddrs);
                break;
            }
            return;

        case Event.CONFIG:
            if (bind_addr == null) {
                Map config = (Map) evt.getArg();
                bind_addr = (InetAddress) config.get("bind_addr");
            }
            break;
        }

        passUp(evt); // pass up to the layer above us
    }

    public void down(Event evt) {
        Address mbr, tmp_ping_dest;
        View v;

        switch (evt.getType()) {

        case Event.UNSUSPECT:
            bcast_task.removeSuspectedMember((Address) evt.getArg());
            break;

        case Event.CONNECT:
            passDown(evt);
            group_name = (String) evt.getArg();
            // first available port starting at start_port (a start_port of 0 picks a random port)
            srv_sock = Util.createServerSocket(bind_addr, start_port);
            srv_sock_addr = new IpAddress(bind_addr, srv_sock.getLocalPort());
            startServerSocket();
            break;

        case Event.DISCONNECT:
            group_name = null;
            String tmp, prefix = Global.THREAD_PREFIX;
            int index;
            // strip the group-name suffix from the thread names
            tmp = srv_sock_handler != null ? srv_sock_handler.getName() : null;
            if (tmp != null) {
                index = tmp.indexOf(prefix);
                if (index > -1) {
                    tmp = tmp.substring(0, index);
                    srv_sock_handler.setName(tmp);
                }
            }
            synchronized (pinger_mutex) {
                tmp = pinger_thread != null ? pinger_thread.getName() : null;
                if (tmp != null) {
                    index = tmp.indexOf(prefix);
                    if (index > -1) {
                        tmp = tmp.substring(0, index);
                        pinger_thread.setName(tmp);
                    }
                }
            }

            stopServerSocket();

            break;

        case Event.VIEW_CHANGE:
            v = (View) evt.getArg();
            Vector new_mbrs = v.getMembers();
            passDown(evt);

            synchronized (this) {
                members.removeAllElements();
                members.addAll(new_mbrs);
                bcast_task.adjustSuspectedMembers(members);
                pingable_mbrs.removeAllElements();
                pingable_mbrs.addAll(members);
                if (log.isDebugEnabled())
                    log.debug("VIEW_CHANGE received: " + members);

                // 1. Get the addr:sock cache from the coordinator (only if not already fetched)
                if (!got_cache_from_coord) {
                    getCacheFromCoordinator();
                    got_cache_from_coord = true;
                }

                // 2. Broadcast my own addr:sock to all members so they can update their cache
                if (!srv_sock_sent) {
                    if (srv_sock_addr != null) {
                        sendIHaveSockMessage(null, // send to all members
                                local_addr, srv_sock_addr);
                        srv_sock_sent = true;
                    } else if (log.isWarnEnabled())
                        log.warn("(VIEW_CHANGE): srv_sock_addr == null");
                }

                // 3. Remove all entries in 'cache' which are not in the new membership
                for (Enumeration e = cache.keys(); e.hasMoreElements();) {
                    mbr = (Address) e.nextElement();
                    if (!members.contains(mbr))
                        cache.remove(mbr);
                }

                if (members.size() > 1) {
                    synchronized (pinger_mutex) {
                        if (pinger_thread != null && pinger_thread.isAlive()) {
                            tmp_ping_dest = determinePingDest();
                            if (ping_dest != null && tmp_ping_dest != null
                                    && !ping_dest.equals(tmp_ping_dest)) {
                                interruptPingerThread(); // allows the thread to use the new socket
                            }
                        } else
                            startPingerThread(); // only starts if not yet running
                    }
                } else {
                    ping_dest = null;
                    stopPingerThread();
                }
            }
            break;

        default:
            passDown(evt);
            break;
        }
    }

    /**
     * Runs as long as there are 2 or more members. Determines the member to be monitored and fetches its
     * server socket address (if n/a, sends a message to obtain it). It then creates a client socket and listens on
     * it until the connection breaks. If it breaks, emits a SUSPECT message. If the connection is closed regularly,
     * nothing happens. In both cases, a new member to be monitored will be chosen and monitoring continues (unless
     * there are fewer than 2 members).
     */
    public void run() {
        Address tmp_ping_dest;
        IpAddress ping_addr;
        int max_fetch_tries = 10; // number of times a socket address is to be requested before giving up

        if (log.isTraceEnabled())
            log.trace("pinger_thread started"); // +++ remove
        while (pinger_thread != null
                && Thread.currentThread().equals(pinger_thread)
                && running) {
            tmp_ping_dest = determinePingDest(); // gets the neighbor to our right
            if (log.isDebugEnabled())
                log.debug("determinePingDest()=" + tmp_ping_dest
                        + ", pingable_mbrs=" + pingable_mbrs);
            if (tmp_ping_dest == null) {
                ping_dest = null;
                synchronized (pinger_mutex) {
                    pinger_thread = null;
                }
                break;
            }
            ping_dest = tmp_ping_dest;
            ping_addr = fetchPingAddress(ping_dest);
            if (ping_addr == null) {
                if (!running)
                    break;
                if (log.isErrorEnabled())
                    log.error("socket address for " + ping_dest
                            + " could not be fetched, retrying");
                if (--max_fetch_tries <= 0)
                    break;
                Util.sleep(2000);
                continue;
            }

            if (!setupPingSocket(ping_addr)) {
                // covers use cases #7 and #8 in ManualTests.txt
                if (log.isDebugEnabled())
                    log.debug("could not create socket to " + ping_dest
                            + "; suspecting " + ping_dest);
                broadcastSuspectMessage(ping_dest);
                pingable_mbrs.removeElement(ping_dest);
                continue;
            }

            if (log.isDebugEnabled())
                log.debug("ping_dest=" + ping_dest + ", ping_sock="
                        + ping_sock + ", cache=" + cache);

            // at this point ping_input should be non-null, otherwise setupPingSocket() would have returned false
            try {
                if (ping_input != null) {
                    int c = ping_input.read(); // blocks until the peer sends a byte or the socket is closed
                    switch (c) {
                    case NORMAL_TERMINATION:
                        if (log.isDebugEnabled())
                            log.debug("peer closed socket normally");
                        synchronized (pinger_mutex) {
                            pinger_thread = null;
                        }
                        break;
                    case ABNORMAL_TERMINATION: // read() returned -1 (end of stream)
                        handleSocketClose(null);
                        break;
                    default:
                        break;
                    }
                }
            } catch (IOException ex) { // we got here when the peer closed the socket --> suspect peer and then continue
                handleSocketClose(ex);
            } catch (Throwable catch_all_the_rest) {
                log.error("exception", catch_all_the_rest);
            }
        }
        if (log.isDebugEnabled())
            log.debug("pinger thread terminated");
        synchronized (pinger_mutex) {
            pinger_thread = null;
        }
    }

    /* ----------------------------------- Private Methods -------------------------------------- */

    void handleSocketClose(Exception ex) {
        teardownPingSocket(); // make sure we have no leftovers
        if (!regular_sock_close) { // only suspect if socket was not closed regularly (by interruptPingerThread())
            if (log.isDebugEnabled())
                log.debug("peer " + ping_dest + " closed socket ("
                        + (ex != null ? ex.getClass().getName() : "eof") + ')');
            broadcastSuspectMessage(ping_dest);
            pingable_mbrs.removeElement(ping_dest);
        } else {
            if (log.isDebugEnabled())
                log.debug("socket to " + ping_dest + " was reset");
            regular_sock_close = false;
        }
    }

    /**
     * Does *not* need to be synchronized on pinger_mutex because the caller (down()) already has the mutex acquired
     */
    void startPingerThread() {
        running = true;
        if (pinger_thread == null) {
            pinger_thread = new Thread(Util.getGlobalThreadGroup(), this, "FD_SOCK Ping thread");
            pinger_thread.setDaemon(true);
            pinger_thread.start();
            if (group_name != null) {
                String tmp, prefix = Global.THREAD_PREFIX;
                tmp = pinger_thread.getName();
                if (tmp != null && tmp.indexOf(prefix) == -1) {
                    tmp += prefix + group_name + ")";
                    pinger_thread.setName(tmp);
                }
            }
        }
    }

    void stopPingerThread() {
        running = false;
        synchronized (pinger_mutex) {
            if (pinger_thread != null && pinger_thread.isAlive()) {
                regular_sock_close = true;
                pinger_thread = null;
                sendPingTermination(); // PATCH by Bruce Schuchardt (http://jira.jboss.com/jira/browse/JGRP-246)
                teardownPingSocket();
                ping_addr_promise.setResult(null);
            }
        }
    }

0592:            // PATCH: send something so the connection handler can exit
0593:            synchronized void sendPingTermination() {
0594:                sendPingSignal(NORMAL_TERMINATION);
0595:            }
0596:
0597:            void sendPingInterrupt() {
0598:                sendPingSignal(INTERRUPT);
0599:            }
0600:
0601:            synchronized void sendPingSignal(int signal) {
0602:                if (ping_sock != null) {
0603:                    try {
0604:                        OutputStream out = ping_sock.getOutputStream();
0605:                        if (out != null) {
0606:                            out.write(signal);
0607:                            out.flush();
0608:                        }
0609:                    } catch (Throwable t) {
0610:                        if (log.isTraceEnabled())
0611:                            log.trace("problem sending signal "
0612:                                    + signalToString(signal), t);
0613:                    }
0614:                }
0615:            }
0616:
0617:            /**
0618:             * Interrupts the pinger thread. The Thread.interrupt() method doesn't seem to work under Linux with JDK 1.3.1
0619:             * (JDK 1.2.2 had no problems here), therefore we close the socket (setSoLinger has to be set !) if we are
0620:             * running under Linux. This should be tested under Windows. (Solaris 8 and JDK 1.3.1 definitely works).<p>
0621:             * Oct 29 2001 (bela): completely removed Thread.interrupt(), but used socket close on all OSs. This makes this
0622:             * code portable and we don't have to check for OSs.<p/>
0623:             * Does *not* need to be synchronized on pinger_mutex because the caller (down()) already has the mutex acquired
0624:             * @see org.jgroups.tests.InterruptTest to determine whether Thread.interrupt() works for InputStream.read().
0625:             */
0626:            void interruptPingerThread() {
0627:                if (pinger_thread != null && pinger_thread.isAlive()) {
0628:                    regular_sock_close = true;
0629:                    sendPingInterrupt(); // PATCH by Bruce Schuchardt (http://jira.jboss.com/jira/browse/JGRP-246)
0630:                    teardownPingSocket(); // will wake up the pinger thread. less elegant than Thread.interrupt(), but does the job
0631:                }
0632:            }
0633:
0634:            void startServerSocket() {
0635:                if (srv_sock_handler != null) {
0636:                    srv_sock_handler.start(); // won't start if already running
0637:                    if (group_name != null) {
0638:                        String tmp, prefix = Global.THREAD_PREFIX;
0639:                        tmp = srv_sock_handler.getName();
0640:                        if (tmp != null && tmp.indexOf(prefix) == -1) {
0641:                            tmp += prefix + group_name + ")";
0642:                            srv_sock_handler.setName(tmp);
0643:                        }
0644:                    }
0645:                }
0646:            }
0647:
0648:            void stopServerSocket() {
0649:                if (srv_sock_handler != null)
0650:                    srv_sock_handler.stop();
0651:            }
0652:
0653:            /**
0654:             * Creates a socket to <code>dest</code>, and assigns it to ping_sock. Also assigns ping_input
0655:             */
0656:            boolean setupPingSocket(IpAddress dest) {
0657:                synchronized (sock_mutex) {
0658:                    if (dest == null) {
0659:                        if (log.isErrorEnabled())
0660:                            log.error("destination address is null");
0661:                        return false;
0662:                    }
0663:                    try {
0664:                        ping_sock = new Socket(dest.getIpAddress(), dest
0665:                                .getPort());
0666:                        ping_sock.setSoLinger(true, 1);
0667:                        ping_sock.setKeepAlive(keep_alive);
0668:                        ping_input = ping_sock.getInputStream();
0669:                        return true;
0670:                    } catch (Throwable ex) {
0671:                        return false;
0672:                    }
0673:                }
0674:            }
0675:
0676:            void teardownPingSocket() {
0677:                synchronized (sock_mutex) {
0678:                    if (ping_sock != null) {
0679:                        try {
0680:                            ping_sock.shutdownInput();
0681:                            ping_sock.close();
0682:                        } catch (Exception ex) {
0683:                        }
0684:                        ping_sock = null;
0685:                    }
0686:                    Util.close(ping_input);
0687:                    ping_input = null;
0688:                }
0689:            }
0690:
0691:            /**
0692:             * Determines coordinator C. If C is our own address (we are the first member), return. Else loop: send GET_CACHE
0693:             * message to coordinator and wait for GET_CACHE_RSP. Loop until a valid response arrives or num_tries attempts fail.
0694:             */
0695:            void getCacheFromCoordinator() {
0696:                Address coord;
0697:                int attempts = num_tries;
0698:                Message msg;
0699:                FdHeader hdr;
0700:                Hashtable result;
0701:
0702:                get_cache_promise.reset();
0703:                while (attempts > 0) {
0704:                    if ((coord = determineCoordinator()) != null) {
0705:                        if (coord.equals(local_addr)) { // we are the first member --> empty cache
0706:                            if (log.isDebugEnabled())
0707:                                log.debug("first member; cache is empty");
0708:                            return;
0709:                        }
0710:                        hdr = new FdHeader(FdHeader.GET_CACHE);
0711:                        hdr.mbr = local_addr;
0712:                        msg = new Message(coord, null, null);
0713:                        msg.putHeader(name, hdr);
0714:                        passDown(new Event(Event.MSG, msg));
0715:                        result = (Hashtable) get_cache_promise
0716:                                .getResult(get_cache_timeout);
0717:                        if (result != null) {
0718:                            cache.putAll(result); // replace all entries (there should be none !) in cache with the new values
0719:                            if (log.isTraceEnabled())
0720:                                log.trace("got cache from " + coord
0721:                                        + ": cache is " + cache);
0722:                            return;
0723:                        } else {
0724:                            if (log.isErrorEnabled())
0725:                                log.error("received null cache; retrying");
0726:                        }
0727:                    }
0728:
0729:                    Util.sleep(get_cache_retry_timeout);
0730:                    --attempts;
0731:                }
0732:            }
0733:
0734:            /**
0735:             * Sends a SUSPECT message to all group members. Only the coordinator (or the next member in line if the coord
0736:             * itself is suspected) will react to this message by installing a new view. To overcome the unreliability
0737:             * of the SUSPECT message (it may be lost because we are not above any retransmission layer), the following scheme
0738:             * is used: after sending the SUSPECT message, it is also added to the broadcast task, which will periodically
0739:             * re-send the SUSPECT until a view is received in which the suspected process is not a member anymore. The reason is
0740:             * that - at one point - either the coordinator or another participant taking over for a crashed coordinator, will
0741:             * react to the SUSPECT message and issue a new view, at which point the broadcast task stops.
0742:             */
0743:            void broadcastSuspectMessage(Address suspected_mbr) {
0744:                Message suspect_msg;
0745:                FdHeader hdr;
0746:
0747:                if (suspected_mbr == null)
0748:                    return;
0749:
0750:                if (log.isTraceEnabled())
0751:                    log.trace("suspecting " + suspected_mbr
0752:                            + " (own address is " + local_addr + ')');
0753:
0754:                // 1. Send a SUSPECT message right away; the broadcast task will take some time to send it (sleeps first)
0755:                hdr = new FdHeader(FdHeader.SUSPECT);
0756:                hdr.mbrs = new Vector(1);
0757:                hdr.mbrs.addElement(suspected_mbr);
0758:                suspect_msg = new Message();
0759:                suspect_msg.putHeader(name, hdr);
0760:                passDown(new Event(Event.MSG, suspect_msg));
0761:
0762:                // 2. Add to broadcast task and start latter (if not yet running). The task will end when
0763:                //    suspected members are removed from the membership
0764:                bcast_task.addSuspectedMember(suspected_mbr);
0765:                if (stats) {
0766:                    num_suspect_events++;
0767:                    suspect_history.add(suspected_mbr);
0768:                }
0769:            }
0770:
0771:            void broadcastWhoHasSockMessage(Address mbr) {
0772:                Message msg;
0773:                FdHeader hdr;
0774:
0775:                if (local_addr != null && mbr != null)
0776:                    if (log.isDebugEnabled())
0777:                        log.debug("[" + local_addr + "]: who-has " + mbr);
0778:
0779:                msg = new Message(); // bcast msg
0780:                hdr = new FdHeader(FdHeader.WHO_HAS_SOCK);
0781:                hdr.mbr = mbr;
0782:                msg.putHeader(name, hdr);
0783:                passDown(new Event(Event.MSG, msg));
0784:            }
0785:
0786:            /**
0787:             Sends or broadcasts an I_HAVE_SOCK response. If 'dst' is null, the response will be broadcast, otherwise
0788:             it will be unicast back to the requester
0789:             */
0790:            void sendIHaveSockMessage(Address dst, Address mbr, IpAddress addr) {
0791:                Message msg = new Message(dst, null, null);
0792:                FdHeader hdr = new FdHeader(FdHeader.I_HAVE_SOCK);
0793:                hdr.mbr = mbr;
0794:                hdr.sock_addr = addr;
0795:                msg.putHeader(name, hdr);
0796:                passDown(new Event(Event.MSG, msg));
0797:            }
0798:
0799:            /**
0800:             Attempts to obtain the ping_addr first from the cache, then by unicasting a request to <code>mbr</code>,
0801:             then by multicasting a request to all members.
0802:             */
0803:            private IpAddress fetchPingAddress(Address mbr) {
0804:                IpAddress ret;
0805:                Message ping_addr_req;
0806:                FdHeader hdr;
0807:
0808:                if (mbr == null) {
0809:                    if (log.isErrorEnabled())
0810:                        log.error("mbr == null");
0811:                    return null;
0812:                }
0813:                // 1. Try to get from cache. Add a little delay so that joining mbrs can send their socket address before
0814:                //    we ask them to do so
0815:                ret = (IpAddress) cache.get(mbr);
0816:                if (ret != null) {
0817:                    return ret;
0818:                }
0819:
0820:                Util.sleep(300);
0821:                if ((ret = (IpAddress) cache.get(mbr)) != null)
0822:                    return ret;
0823:
0824:                // 2. Try to get from mbr
0825:                ping_addr_promise.reset();
0826:                ping_addr_req = new Message(mbr, null, null); // unicast
0827:                hdr = new FdHeader(FdHeader.WHO_HAS_SOCK);
0828:                hdr.mbr = mbr;
0829:                ping_addr_req.putHeader(name, hdr);
0830:                passDown(new Event(Event.MSG, ping_addr_req));
0831:                if (!running)
0832:                    return null;
0833:                ret = (IpAddress) ping_addr_promise.getResult(3000);
0834:                if (ret != null) {
0835:                    return ret;
0836:                }
0837:
0838:                // 3. Try to get from all members
0839:                ping_addr_req = new Message(null); // multicast
0840:                hdr = new FdHeader(FdHeader.WHO_HAS_SOCK);
0841:                hdr.mbr = mbr;
0842:                ping_addr_req.putHeader(name, hdr);
0843:                passDown(new Event(Event.MSG, ping_addr_req));
0844:                ret = (IpAddress) ping_addr_promise.getResult(3000);
0845:                return ret;
0846:            }
0847:
0848:            Address determinePingDest() {
0849:                Address tmp;
0850:
0851:                if (pingable_mbrs == null || pingable_mbrs.size() < 2
0852:                        || local_addr == null)
0853:                    return null;
0854:                for (int i = 0; i < pingable_mbrs.size(); i++) {
0855:                    tmp = (Address) pingable_mbrs.elementAt(i);
0856:                    if (local_addr.equals(tmp)) {
0857:                        if (i + 1 >= pingable_mbrs.size())
0858:                            return (Address) pingable_mbrs.elementAt(0);
0859:                        else
0860:                            return (Address) pingable_mbrs.elementAt(i + 1);
0861:                    }
0862:                }
0863:                return null;
0864:            }
0865:
0866:            Address determineCoordinator() {
0867:                return members.size() > 0 ? (Address) members.elementAt(0)
0868:                        : null;
0869:            }
0870:
0871:            static String signalToString(int signal) {
0872:                switch (signal) {
0873:                case NORMAL_TERMINATION:
0874:                    return "NORMAL_TERMINATION";
0875:                case ABNORMAL_TERMINATION:
0876:                    return "ABNORMAL_TERMINATION";
0877:                case INTERRUPT:
0878:                    return "INTERRUPT";
0879:                default:
0880:                    return "n/a";
0881:                }
0882:            }
0883:
0884:            /* ------------------------------- End of Private Methods ------------------------------------ */
0885:
0886:            public static class FdHeader extends Header implements Streamable {
0887:                public static final byte SUSPECT = 10;
0888:                public static final byte WHO_HAS_SOCK = 11;
0889:                public static final byte I_HAVE_SOCK = 12;
0890:                public static final byte GET_CACHE = 13; // sent by joining member to coordinator
0891:                public static final byte GET_CACHE_RSP = 14; // sent by coordinator to joining member in response to GET_CACHE
0892:
0893:                byte type = SUSPECT;
0894:                Address mbr = null; // set on WHO_HAS_SOCK (requested mbr), I_HAVE_SOCK
0895:                IpAddress sock_addr; // set on I_HAVE_SOCK
0896:
0897:                // Hashtable<Address,IpAddress>
0898:                Hashtable cachedAddrs = null; // set on GET_CACHE_RSP
0899:                Vector mbrs = null; // set on SUSPECT (list of suspected members)
0900:
0901:                public FdHeader() {
0902:                } // used for externalization
0903:
0904:                public FdHeader(byte type) {
0905:                    this.type = type;
0906:                }
0907:
0908:                public FdHeader(byte type, Address mbr) {
0909:                    this.type = type;
0910:                    this.mbr = mbr;
0911:                }
0912:
0913:                public FdHeader(byte type, Vector mbrs) {
0914:                    this.type = type;
0915:                    this.mbrs = mbrs;
0916:                }
0917:
0918:                public FdHeader(byte type, Hashtable cachedAddrs) {
0919:                    this.type = type;
0920:                    this.cachedAddrs = cachedAddrs;
0921:                }
0922:
0923:                public String toString() {
0924:                    StringBuffer sb = new StringBuffer();
0925:                    sb.append(type2String(type));
0926:                    if (mbr != null)
0927:                        sb.append(", mbr=").append(mbr);
0928:                    if (sock_addr != null)
0929:                        sb.append(", sock_addr=").append(sock_addr);
0930:                    if (cachedAddrs != null)
0931:                        sb.append(", cache=").append(cachedAddrs);
0932:                    if (mbrs != null)
0933:                        sb.append(", mbrs=").append(mbrs);
0934:                    return sb.toString();
0935:                }
0936:
0937:                public static String type2String(byte type) {
0938:                    switch (type) {
0939:                    case SUSPECT:
0940:                        return "SUSPECT";
0941:                    case WHO_HAS_SOCK:
0942:                        return "WHO_HAS_SOCK";
0943:                    case I_HAVE_SOCK:
0944:                        return "I_HAVE_SOCK";
0945:                    case GET_CACHE:
0946:                        return "GET_CACHE";
0947:                    case GET_CACHE_RSP:
0948:                        return "GET_CACHE_RSP";
0949:                    default:
0950:                        return "unknown type (" + type + ')';
0951:                    }
0952:                }
0953:
0954:                public void writeExternal(ObjectOutput out) throws IOException {
0955:                    out.writeByte(type);
0956:                    out.writeObject(mbr);
0957:                    out.writeObject(sock_addr);
0958:                    out.writeObject(cachedAddrs);
0959:                    out.writeObject(mbrs);
0960:                }
0961:
0962:                public void readExternal(ObjectInput in) throws IOException,
0963:                        ClassNotFoundException {
0964:                    type = in.readByte();
0965:                    mbr = (Address) in.readObject();
0966:                    sock_addr = (IpAddress) in.readObject();
0967:                    cachedAddrs = (Hashtable) in.readObject();
0968:                    mbrs = (Vector) in.readObject();
0969:                }
0970:
0971:                public long size() {
0972:                    long retval = Global.BYTE_SIZE; // type
0973:                    retval += Util.size(mbr);
0974:                    retval += Util.size(sock_addr);
0975:
0976:                    retval += Global.INT_SIZE; // cachedAddrs size
0977:                    Map.Entry entry;
0978:                    Address key;
0979:                    IpAddress val;
0980:                    if (cachedAddrs != null) {
0981:                        for (Iterator it = cachedAddrs.entrySet().iterator(); it
0982:                                .hasNext();) {
0983:                            entry = (Map.Entry) it.next();
0984:                            if ((key = (Address) entry.getKey()) != null)
0985:                                retval += Util.size(key);
0986:                            retval += Global.BYTE_SIZE; // presence for val
0987:                            if ((val = (IpAddress) entry.getValue()) != null)
0988:                                retval += val.size();
0989:                        }
0990:                    }
0991:
0992:                    retval += Global.INT_SIZE; // mbrs size
0993:                    if (mbrs != null) {
0994:                        for (int i = 0; i < mbrs.size(); i++) {
0995:                            retval += Util.size((Address) mbrs.elementAt(i));
0996:                        }
0997:                    }
0998:
0999:                    return retval;
1000:                }
1001:
1002:                public void writeTo(DataOutputStream out) throws IOException {
1003:                    int size;
1004:                    out.writeByte(type);
1005:                    Util.writeAddress(mbr, out);
1006:                    Util.writeStreamable(sock_addr, out);
1007:                    size = cachedAddrs != null ? cachedAddrs.size() : 0;
1008:                    out.writeInt(size);
1009:                    if (size > 0) {
1010:                        for (Iterator it = cachedAddrs.entrySet().iterator(); it
1011:                                .hasNext();) {
1012:                            Map.Entry entry = (Map.Entry) it.next();
1013:                            Address key = (Address) entry.getKey();
1014:                            IpAddress val = (IpAddress) entry.getValue();
1015:                            Util.writeAddress(key, out);
1016:                            Util.writeStreamable(val, out);
1017:                        }
1018:                    }
1019:                    size = mbrs != null ? mbrs.size() : 0;
1020:                    out.writeInt(size);
1021:                    if (size > 0) {
1022:                        for (Iterator it = mbrs.iterator(); it.hasNext();) {
1023:                            Address address = (Address) it.next();
1024:                            Util.writeAddress(address, out);
1025:                        }
1026:                    }
1027:                }
1028:
1029:                public void readFrom(DataInputStream in) throws IOException,
1030:                        IllegalAccessException, InstantiationException {
1031:                    int size;
1032:                    type = in.readByte();
1033:                    mbr = Util.readAddress(in);
1034:                    sock_addr = (IpAddress) Util.readStreamable(
1035:                            IpAddress.class, in);
1036:                    size = in.readInt();
1037:                    if (size > 0) {
1038:                        if (cachedAddrs == null)
1039:                            cachedAddrs = new Hashtable();
1040:                        for (int i = 0; i < size; i++) {
1041:                            Address key = Util.readAddress(in);
1042:                            IpAddress val = (IpAddress) Util.readStreamable(
1043:                                    IpAddress.class, in);
1044:                            cachedAddrs.put(key, val);
1045:                        }
1046:                    }
1047:                    size = in.readInt();
1048:                    if (size > 0) {
1049:                        if (mbrs == null)
1050:                            mbrs = new Vector();
1051:                        for (int i = 0; i < size; i++) {
1052:                            Address addr = Util.readAddress(in);
1053:                            mbrs.add(addr);
1054:                        }
1055:                    }
1056:                }
1057:
1058:            }
1059:
1060:            /**
1061:             * Handles the server-side of a client-server socket connection. Waits until a client connects, and then hands
1062:             * the accepted socket to a new ClientConnectionHandler thread, which reads from it until the client closes the
1063:             * connection or sends a termination signal. Because each client gets its own handler thread, several clients
1064:             * can be connected at the same time, although the ring nature of the FD_SOCK protocol normally has only
1065:             * 1 client connect to its right-hand-side neighbor.
1066:             */
1067:            private class ServerSocketHandler implements Runnable {
1068:                Thread acceptor = null;
1069:                /** List<ClientConnectionHandler> */
1070:                final List clients = new ArrayList();
1071:
1072:                String getName() {
1073:                    return acceptor != null ? acceptor.getName() : null;
1074:                }
1075:
1076:                void setName(String thread_name) {
1077:                    if (acceptor != null)
1078:                        acceptor.setName(thread_name);
1079:                }
1080:
1081:                ServerSocketHandler() {
1082:                    start();
1083:                }
1084:
1085:                final void start() {
1086:                    if (acceptor == null) {
1087:                        acceptor = new Thread(Util.getGlobalThreadGroup(),
1088:                                this, "ServerSocket acceptor thread");
1089:                        acceptor.setDaemon(true);
1090:                        acceptor.start();
1091:                    }
1092:                }
1093:
1094:                final void stop() {
1095:                    if (acceptor != null && acceptor.isAlive()) {
1096:                        try {
1097:                            srv_sock.close(); // this will terminate thread, peer will receive SocketException (socket close)
1098:                        } catch (Exception ex) {
1099:                        }
1100:                    }
1101:                    synchronized (clients) {
1102:                        for (Iterator it = clients.iterator(); it.hasNext();) {
1103:                            ClientConnectionHandler handler = (ClientConnectionHandler) it
1104:                                    .next();
1105:                            handler.stopThread();
1106:                        }
1107:                        clients.clear();
1108:                    }
1109:                    acceptor = null;
1110:                }
1111:
1112:                /** Accepts client connections in a loop and starts one ClientConnectionHandler thread per connection */
1113:                public void run() {
1114:                    Socket client_sock;
1115:                    while (acceptor != null && srv_sock != null) {
1116:                        try {
1117:                            if (log.isTraceEnabled()) // +++ remove
1118:                                log.trace("waiting for client connections on "
1119:                                        + srv_sock.getInetAddress() + ":"
1120:                                        + srv_sock.getLocalPort());
1121:                            client_sock = srv_sock.accept();
1122:                            if (log.isTraceEnabled()) // +++ remove
1123:                                log.trace("accepted connection from "
1124:                                        + client_sock.getInetAddress() + ':'
1125:                                        + client_sock.getPort());
1126:                            ClientConnectionHandler client_conn_handler = new ClientConnectionHandler(
1127:                                    client_sock, clients);
1128:                            synchronized (clients) {
1129:                                clients.add(client_conn_handler);
1130:                            }
1131:                            client_conn_handler.start();
1132:                        } catch (IOException io_ex2) {
1133:                            break;
1134:                        }
1135:                    }
1136:                    acceptor = null;
1137:                }
1138:            }
1139:
1140:            /** Handles a client connection; multiple clients can connect at the same time */
1141:            private static class ClientConnectionHandler extends Thread {
1142:                Socket client_sock = null;
1143:                InputStream in;
1144:                final Object mutex = new Object();
1145:                final List clients; // shared with ServerSocketHandler so that run() removes this handler from the real list
1146:
1147:                ClientConnectionHandler(Socket client_sock, List clients) {
1148:                    setName("ClientConnectionHandler");
1149:                    setDaemon(true);
1150:                    this.client_sock = client_sock;
1151:                    this.clients = clients;
1152:                }
1153:
1154:                void stopThread() {
1155:                    synchronized (mutex) {
1156:                        if (client_sock != null) {
1157:                            try {
1158:                                OutputStream out = client_sock
1159:                                        .getOutputStream();
1160:                                out.write(NORMAL_TERMINATION);
1161:                                out.flush();
1162:                                closeClientSocket();
1163:                            } catch (Throwable t) {
1164:                            }
1165:                        }
1166:                    }
1167:                }
1168:
1169:                void closeClientSocket() {
1170:                    synchronized (mutex) {
1171:                        Util.close(client_sock);
1172:                        client_sock = null;
1173:                    }
1174:                }
1175:
1176:                public void run() {
1177:                    try {
1178:                        synchronized (mutex) {
1179:                            if (client_sock == null)
1180:                                return;
1181:                            in = client_sock.getInputStream();
1182:                        }
1183:                        int b = 0;
1184:                        do {
1185:                            b = in.read();
1186:                        } while (b != ABNORMAL_TERMINATION
1187:                                && b != NORMAL_TERMINATION);
1188:                    } catch (IOException ex) {
1189:                    } finally {
1190:                        Socket sock = client_sock; // PATCH: avoid race condition causing NPE
1191:                        if (sock != null && !sock.isClosed())
1192:                            closeClientSocket();
1193:                        synchronized (clients) {
1194:                            clients.remove(this);
1195:                        }
1196:                    }
1197:                }
1198:            }
1199:
1200:            /**
1201:             * Task that periodically broadcasts a list of suspected members to the group. Goal is not to lose
1202:             * a SUSPECT message: since these are bcast unreliably, they might get dropped. The BroadcastTask makes
1203:             * sure they are retransmitted until a view has been received which doesn't contain the suspected members
1204:             * any longer. Then the task terminates.
1205:             */
1206:            private class BroadcastTask implements TimeScheduler.Task {
1207:                final Vector suspected_mbrs = new Vector(7);
1208:                boolean stopped = false;
1209:
1210:                /** Adds a suspected member. Starts the task if not yet running */
1211:                public void addSuspectedMember(Address mbr) {
1212:                    if (mbr == null)
1213:                        return;
1214:                    if (!members.contains(mbr))
1215:                        return;
1216:                    synchronized (suspected_mbrs) {
1217:                        if (!suspected_mbrs.contains(mbr)) {
1218:                            suspected_mbrs.addElement(mbr);
1219:                            if (log.isDebugEnabled())
1220:                                log.debug("mbr=" + mbr + " (size="
1221:                                        + suspected_mbrs.size() + ')');
1222:                        }
1223:                        if (stopped && suspected_mbrs.size() > 0) {
1224:                            stopped = false;
1225:                            timer.add(this , true);
1226:                        }
1227:                    }
1228:                }
1229:
1230:                public void removeSuspectedMember(Address suspected_mbr) {
1231:                    if (suspected_mbr == null)
1232:                        return;
1233:                    if (log.isDebugEnabled())
1234:                        log.debug("member is " + suspected_mbr);
1235:                    synchronized (suspected_mbrs) {
1236:                        suspected_mbrs.removeElement(suspected_mbr);
1237:                        if (suspected_mbrs.size() == 0)
1238:                            stopped = true;
1239:                    }
1240:                }
1241:
1242:                public void removeAll() {
1243:                    synchronized (suspected_mbrs) {
1244:                        suspected_mbrs.removeAllElements();
1245:                        stopped = true;
1246:                    }
1247:                }
1248:
        /**
         * Removes all elements from suspected_mbrs that are <em>not</em> in the new membership
         */
        public void adjustSuspectedMembers(Vector new_mbrship) {
            Address suspected_mbr;

            if (new_mbrship == null || new_mbrship.size() == 0)
                return;
            synchronized (suspected_mbrs) {
                for (Iterator it = suspected_mbrs.iterator(); it.hasNext();) {
                    suspected_mbr = (Address) it.next();
                    if (!new_mbrship.contains(suspected_mbr)) {
                        it.remove();
                        if (log.isDebugEnabled())
                            log.debug("removed " + suspected_mbr
                                    + " (size=" + suspected_mbrs.size()
                                    + ')');
                    }
                }
                if (suspected_mbrs.size() == 0)
                    stopped = true;
            }
        }

        public boolean cancelled() {
            return stopped;
        }

        public long nextInterval() {
            return suspect_msg_interval;
        }

        public void run() {
            Message suspect_msg;
            FdHeader hdr;

            if (log.isDebugEnabled())
                log.debug("broadcasting SUSPECT message (suspected_mbrs="
                        + suspected_mbrs + ") to group");

            synchronized (suspected_mbrs) {
                if (suspected_mbrs.size() == 0) {
                    stopped = true;
                    if (log.isDebugEnabled())
                        log.debug("task done (no suspected members)");
                    return;
                }

                hdr = new FdHeader(FdHeader.SUSPECT);
                hdr.mbrs = (Vector) suspected_mbrs.clone();
            }
            suspect_msg = new Message(); // mcast SUSPECT to all members
            suspect_msg.putHeader(name, hdr);
            passDown(new Event(Event.MSG, suspect_msg));
            if (log.isDebugEnabled())
                log.debug("task done");
        }
    }

}
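
The retransmit-until-confirmed behaviour that the BroadcastTask Javadoc describes can be tried out in isolation. The sketch below is a simplified stand-in, not JGroups code: the class name `SuspectRebroadcaster`, the method `tick()`, and the use of plain strings as member addresses are all assumptions made for illustration. The timer and the network are left out, so `tick()` only returns the list that one scheduled run would broadcast.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified stand-in for BroadcastTask; not part of JGroups. */
public class SuspectRebroadcaster {
    private final Set<String> suspected = new LinkedHashSet<String>();
    private boolean stopped = true; // nothing to broadcast yet

    /** Records a suspect and (re)activates the task. */
    public synchronized void addSuspected(String member) {
        if (member == null)
            return;
        suspected.add(member);
        stopped = false;
    }

    /** Drops every suspect that is no longer part of the new membership. */
    public synchronized void adjust(Collection<String> newMembership) {
        if (newMembership == null || newMembership.isEmpty())
            return;
        suspected.retainAll(newMembership);
        if (suspected.isEmpty())
            stopped = true;
    }

    /** One scheduled run: returns a snapshot of the suspects to rebroadcast. */
    public synchronized List<String> tick() {
        if (suspected.isEmpty()) {
            stopped = true;
            return new ArrayList<String>();
        }
        return new ArrayList<String>(suspected);
    }

    /** Mirrors cancelled(): a scheduler would stop rescheduling once this is true. */
    public synchronized boolean cancelled() {
        return stopped;
    }
}
```

Each call to `tick()` keeps returning the same suspects until `adjust()` is given a view that no longer contains them. After that, `cancelled()` returns true, which is the point at which a scheduler would stop rerunning the task.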