0001: // $Id: FD_SOCK.java,v 1.51.2.1 2007/04/27 08:03:51 belaban Exp $
0002:
0003: package org.jgroups.protocols;
0004:
0005: import org.jgroups.*;
0006: import org.jgroups.stack.IpAddress;
0007: import org.jgroups.stack.Protocol;
0008: import org.jgroups.util.*;
0009:
0010: import java.io.*;
0011: import java.net.InetAddress;
0012: import java.net.ServerSocket;
0013: import java.net.Socket;
0014: import java.net.UnknownHostException;
0015: import java.util.*;
0016: import java.util.List;
0017:
0018: /**
0019: * Failure detection protocol based on sockets. Failure detection is ring-based. Each member creates a
0020: * server socket and announces its address together with the server socket's address in a multicast. A
0021: * pinger thread will be started when the membership goes above 1 and will be stopped when it drops below
0022: * 2. The pinger thread connects to its neighbor on the right and waits until the socket is closed. When
0023: * the socket is closed by the monitored peer in an abnormal fashion (IOException), the neighbor will be
0024: * suspected.<p> The main feature of this protocol is that no ping messages need to be exchanged between
0025: * any 2 peers, and failure detection relies entirely on TCP sockets. The advantage is that no activity
0026: * will take place between 2 peers as long as they are alive (i.e. have their server sockets open).
0027: * The disadvantage is that hung servers or crashed routers will not cause sockets to be closed, therefore
0028: * they won't be detected.
0029: * The FD_SOCK protocol will work for groups where members are on different hosts<p>
0030: * The costs involved are 2 additional threads: one that
0031: * monitors the client side of the socket connection (to monitor a peer) and another one that manages the
0032: * server socket. However, those threads will be idle as long as both peers are running.
0033: * @author Bela Ban May 29 2001
0034: */
0035: public class FD_SOCK extends Protocol implements Runnable {
0036: long get_cache_timeout = 3000; // msecs to wait for the socket cache from the coordinator
0037: static final long get_cache_retry_timeout = 500; // msecs to wait until we retry getting the cache from coord
0038: long suspect_msg_interval = 5000; // (BroadcastTask): mcast SUSPECT every 5000 msecs
0039: int num_tries = 3; // attempts coord is solicited for socket cache until we give up
0040: final Vector members = new Vector(11); // list of group members (updated on VIEW_CHANGE)
0041: boolean srv_sock_sent = false; // has own socket been broadcast yet ?
0042: final Vector pingable_mbrs = new Vector(11); // mbrs from which we select ping_dest. may be subset of 'members'
0043: final Promise get_cache_promise = new Promise(); // used for rendezvous on GET_CACHE and GET_CACHE_RSP
0044: boolean got_cache_from_coord = false; // was cache already fetched ?
0045: Address local_addr = null; // our own address
0046: ServerSocket srv_sock = null; // server socket to which another member connects to monitor me
0047:
0048: InetAddress bind_addr = null; // the NIC on which the ServerSocket should listen
0049:
0050: String group_name = null; // the name of the group (set on CONNECT, nulled on DISCONNECT)
0051:
/** @deprecated Use {@link #bind_addr} instead */
0053: InetAddress srv_sock_bind_addr = null; // the NIC on which the ServerSocket should listen
0054:
0055: private ServerSocketHandler srv_sock_handler = null; // accepts new connections on srv_sock
0056: IpAddress srv_sock_addr = null; // pair of server_socket:port
0057: Address ping_dest = null; // address of the member we monitor
0058: Socket ping_sock = null; // socket to the member we monitor
0059: InputStream ping_input = null; // input stream of the socket to the member we monitor
0060: Thread pinger_thread = null; // listens on ping_sock, suspects member if socket is closed
0061: final Object pinger_mutex = new Object();
0062:
0063: final Hashtable cache = new Hashtable(11); // keys=Addresses, vals=IpAddresses (socket:port)
0064:
0065: /** Start port for server socket (uses first available port starting at start_port). A value of 0 (default)
0066: * picks a random port */
0067: int start_port = 0;
0068: final Promise ping_addr_promise = new Promise(); // to fetch the ping_addr for ping_dest
0069: final Object sock_mutex = new Object(); // for access to ping_sock, ping_input
0070: TimeScheduler timer = null;
0071: private final BroadcastTask bcast_task = new BroadcastTask(); // to transmit SUSPECT message (until view change)
0072: boolean regular_sock_close = false; // used by interruptPingerThread() when new ping_dest is computed
0073: int num_suspect_events = 0;
0074: private static final int INTERRUPT = 8;
0075: private static final int NORMAL_TERMINATION = 9;
0076: private static final int ABNORMAL_TERMINATION = -1;
0077: private static final String name = "FD_SOCK";
0078:
0079: BoundedList suspect_history = new BoundedList(20);
0080:
0081: /** whether to use KEEP_ALIVE on the ping socket or not */
0082: private boolean keep_alive = true;
0083:
0084: private boolean running = false;
0085:
0086: public String getName() {
0087: return name;
0088: }
0089:
0090: public String getLocalAddress() {
0091: return local_addr != null ? local_addr.toString() : "null";
0092: }
0093:
0094: public String getMembers() {
0095: return members != null ? members.toString() : "null";
0096: }
0097:
0098: public String getPingableMembers() {
0099: return pingable_mbrs != null ? pingable_mbrs.toString()
0100: : "null";
0101: }
0102:
0103: public String getPingDest() {
0104: return ping_dest != null ? ping_dest.toString() : "null";
0105: }
0106:
0107: public int getNumSuspectEventsGenerated() {
0108: return num_suspect_events;
0109: }
0110:
0111: public String printSuspectHistory() {
0112: StringBuffer sb = new StringBuffer();
0113: for (Enumeration en = suspect_history.elements(); en
0114: .hasMoreElements();) {
0115: sb.append(new Date()).append(": ").append(en.nextElement())
0116: .append("\n");
0117: }
0118: return sb.toString();
0119: }
0120:
0121: public boolean setProperties(Properties props) {
0122: String str;
0123:
0124: super .setProperties(props);
0125: str = props.getProperty("get_cache_timeout");
0126: if (str != null) {
0127: get_cache_timeout = Long.parseLong(str);
0128: props.remove("get_cache_timeout");
0129: }
0130:
0131: str = props.getProperty("suspect_msg_interval");
0132: if (str != null) {
0133: suspect_msg_interval = Long.parseLong(str);
0134: props.remove("suspect_msg_interval");
0135: }
0136:
0137: str = props.getProperty("num_tries");
0138: if (str != null) {
0139: num_tries = Integer.parseInt(str);
0140: props.remove("num_tries");
0141: }
0142:
0143: str = props.getProperty("start_port");
0144: if (str != null) {
0145: start_port = Integer.parseInt(str);
0146: props.remove("start_port");
0147: }
0148:
0149: str = props.getProperty("keep_alive");
0150: if (str != null) {
0151: keep_alive = new Boolean(str).booleanValue();
0152: props.remove("keep_alive");
0153: }
0154:
0155: str = props.getProperty("srv_sock_bind_addr");
0156: if (str != null) {
0157: log
0158: .error("srv_sock_bind_addr is deprecated and will be ignored - use bind_addr instead");
0159: props.remove("srv_sock_bind_addr");
0160: }
0161:
0162: boolean ignore_systemprops = Util
0163: .isBindAddressPropertyIgnored();
0164: str = Util.getProperty(new String[] { Global.BIND_ADDR,
0165: Global.BIND_ADDR_OLD }, props, "bind_addr",
0166: ignore_systemprops, null);
0167: if (str != null) {
0168: try {
0169: bind_addr = InetAddress.getByName(str);
0170: } catch (UnknownHostException unknown) {
0171: log.error("(bind_addr): host " + str + " not known");
0172: return false;
0173: }
0174: props.remove("bind_addr");
0175: }
0176:
0177: if (props.size() > 0) {
0178: log.error("the following properties are not recognized: "
0179: + props);
0180: return false;
0181: }
0182: return true;
0183: }
0184:
0185: public void init() throws Exception {
0186: srv_sock_handler = new ServerSocketHandler();
0187: timer = stack != null ? stack.timer : null;
0188: if (timer == null)
0189: throw new Exception("FD_SOCK.init(): timer == null");
0190: }
0191:
0192: public void start() throws Exception {
0193: super .start();
0194: running = true;
0195: }
0196:
0197: public void stop() {
0198: running = false;
0199: bcast_task.removeAll();
0200: stopPingerThread();
0201: stopServerSocket();
0202: }
0203:
0204: public void resetStats() {
0205: super .resetStats();
0206: num_suspect_events = 0;
0207: suspect_history.removeAll();
0208: }
0209:
0210: public void up(Event evt) {
0211: Message msg;
0212: FdHeader hdr;
0213:
0214: switch (evt.getType()) {
0215:
0216: case Event.SET_LOCAL_ADDRESS:
0217: local_addr = (Address) evt.getArg();
0218: break;
0219:
0220: case Event.MSG:
0221: msg = (Message) evt.getArg();
0222: hdr = (FdHeader) msg.removeHeader(name);
0223: if (hdr == null)
0224: break; // message did not originate from FD_SOCK layer, just pass up
0225:
0226: switch (hdr.type) {
0227:
0228: case FdHeader.SUSPECT:
0229: if (hdr.mbrs != null) {
0230: if (log.isDebugEnabled())
0231: log.debug("[SUSPECT] hdr=" + hdr);
0232: for (int i = 0; i < hdr.mbrs.size(); i++) {
0233: Address m = (Address) hdr.mbrs.elementAt(i);
0234: if (local_addr != null && m.equals(local_addr)) {
0235: if (log.isWarnEnabled())
0236: log
0237: .warn("I was suspected by "
0238: + msg.getSrc()
0239: + "; ignoring the SUSPECT message");
0240: continue;
0241: }
0242: passUp(new Event(Event.SUSPECT, hdr.mbrs
0243: .elementAt(i)));
0244: passDown(new Event(Event.SUSPECT, hdr.mbrs
0245: .elementAt(i)));
0246: }
0247: } else if (log.isWarnEnabled())
0248: log.warn("[SUSPECT]: hdr.mbrs == null");
0249: break;
0250:
0251: // If I have the sock for 'hdr.mbr', return it. Otherwise look it up in my cache and return it
0252: case FdHeader.WHO_HAS_SOCK:
0253: if (local_addr != null
0254: && local_addr.equals(msg.getSrc()))
0255: return; // don't reply to WHO_HAS bcasts sent by me !
0256:
0257: if (hdr.mbr == null) {
0258: if (log.isErrorEnabled())
0259: log.error("hdr.mbr is null");
0260: return;
0261: }
0262:
0263: if (log.isTraceEnabled())
0264: log.trace("who-has-sock " + hdr.mbr);
0265:
0266: // 1. Try my own address, maybe it's me whose socket is wanted
0267: if (local_addr != null && local_addr.equals(hdr.mbr)
0268: && srv_sock_addr != null) {
0269: sendIHaveSockMessage(msg.getSrc(), local_addr,
0270: srv_sock_addr); // unicast message to msg.getSrc()
0271: return;
0272: }
0273:
0274: // 2. If I don't have it, maybe it is in the cache
0275: if (cache.containsKey(hdr.mbr))
0276: sendIHaveSockMessage(msg.getSrc(), hdr.mbr,
0277: (IpAddress) cache.get(hdr.mbr)); // ucast msg
0278: break;
0279:
0280: // Update the cache with the addr:sock_addr entry (if on the same host)
0281: case FdHeader.I_HAVE_SOCK:
0282: if (hdr.mbr == null || hdr.sock_addr == null) {
0283: if (log.isErrorEnabled())
0284: log
0285: .error("[I_HAVE_SOCK]: hdr.mbr is null or hdr.sock_addr == null");
0286: return;
0287: }
0288:
0289: // if(!cache.containsKey(hdr.mbr))
0290: cache.put(hdr.mbr, hdr.sock_addr); // update the cache
0291: if (log.isTraceEnabled())
0292: log.trace("i-have-sock: " + hdr.mbr + " --> "
0293: + hdr.sock_addr + " (cache is " + cache
0294: + ')');
0295:
0296: if (ping_dest != null && hdr.mbr.equals(ping_dest))
0297: ping_addr_promise.setResult(hdr.sock_addr);
0298: break;
0299:
0300: // Return the cache to the sender of this message
0301: case FdHeader.GET_CACHE:
0302: if (hdr.mbr == null) {
0303: if (log.isErrorEnabled())
0304: log.error("(GET_CACHE): hdr.mbr == null");
0305: return;
0306: }
0307: hdr = new FdHeader(FdHeader.GET_CACHE_RSP);
0308: hdr.cachedAddrs = (Hashtable) cache.clone();
0309: msg = new Message(hdr.mbr, null, null);
0310: msg.putHeader(name, hdr);
0311: passDown(new Event(Event.MSG, msg));
0312: break;
0313:
0314: case FdHeader.GET_CACHE_RSP:
0315: if (hdr.cachedAddrs == null) {
0316: if (log.isErrorEnabled())
0317: log.error("(GET_CACHE_RSP): cache is null");
0318: return;
0319: }
0320: get_cache_promise.setResult(hdr.cachedAddrs);
0321: break;
0322: }
0323: return;
0324:
0325: case Event.CONFIG:
0326: if (bind_addr == null) {
0327: Map config = (Map) evt.getArg();
0328: bind_addr = (InetAddress) config.get("bind_addr");
0329: }
0330: break;
0331: }
0332:
0333: passUp(evt); // pass up to the layer above us
0334: }
0335:
0336: public void down(Event evt) {
0337: Address mbr, tmp_ping_dest;
0338: View v;
0339:
0340: switch (evt.getType()) {
0341:
0342: case Event.UNSUSPECT:
0343: bcast_task.removeSuspectedMember((Address) evt.getArg());
0344: break;
0345:
0346: case Event.CONNECT:
0347: passDown(evt);
0348: group_name = (String) evt.getArg();
0349: srv_sock = Util.createServerSocket(bind_addr, start_port); // grab a random unused port above 10000
0350: srv_sock_addr = new IpAddress(bind_addr, srv_sock
0351: .getLocalPort());
0352: startServerSocket();
0353: break;
0354:
0355: case Event.DISCONNECT:
0356: group_name = null;
0357: String tmp,
0358: prefix = Global.THREAD_PREFIX;
0359: int index;
0360: tmp = srv_sock_handler != null ? srv_sock_handler.getName()
0361: : null;
0362: if (tmp != null) {
0363: index = tmp.indexOf(prefix);
0364: if (index > -1) {
0365: tmp = tmp.substring(0, index);
0366: srv_sock_handler.setName(tmp);
0367: }
0368: }
0369: synchronized (pinger_mutex) {
0370: tmp = pinger_thread != null ? pinger_thread.getName()
0371: : null;
0372: if (tmp != null) {
0373: index = tmp.indexOf(prefix);
0374: if (index > -1) {
0375: tmp = tmp.substring(0, index);
0376: pinger_thread.setName(tmp);
0377: }
0378: }
0379: }
0380:
0381: stopServerSocket();
0382:
0383: break;
0384:
0385: case Event.VIEW_CHANGE:
0386: v = (View) evt.getArg();
0387: Vector new_mbrs = v.getMembers();
0388: passDown(evt);
0389:
0390: synchronized (this ) {
0391: members.removeAllElements();
0392: members.addAll(new_mbrs);
0393: bcast_task.adjustSuspectedMembers(members);
0394: pingable_mbrs.removeAllElements();
0395: pingable_mbrs.addAll(members);
0396: if (log.isDebugEnabled())
0397: log.debug("VIEW_CHANGE received: " + members);
0398:
0399: // 1. Get the addr:pid cache from the coordinator (only if not already fetched)
0400: if (!got_cache_from_coord) {
0401: getCacheFromCoordinator();
0402: got_cache_from_coord = true;
0403: }
0404:
0405: // 2. Broadcast my own addr:sock to all members so they can update their cache
0406: if (!srv_sock_sent) {
0407: if (srv_sock_addr != null) {
0408: sendIHaveSockMessage(null, // send to all members
0409: local_addr, srv_sock_addr);
0410: srv_sock_sent = true;
0411: } else if (log.isWarnEnabled())
0412: log
0413: .warn("(VIEW_CHANGE): srv_sock_addr == null");
0414: }
0415:
0416: // 3. Remove all entries in 'cache' which are not in the new membership
0417: for (Enumeration e = cache.keys(); e.hasMoreElements();) {
0418: mbr = (Address) e.nextElement();
0419: if (!members.contains(mbr))
0420: cache.remove(mbr);
0421: }
0422:
0423: if (members.size() > 1) {
0424: synchronized (pinger_mutex) {
0425: if (pinger_thread != null
0426: && pinger_thread.isAlive()) {
0427: tmp_ping_dest = determinePingDest();
0428: if (ping_dest != null
0429: && tmp_ping_dest != null
0430: && !ping_dest.equals(tmp_ping_dest)) {
0431: interruptPingerThread(); // allows the thread to use the new socket
0432: }
0433: } else
0434: startPingerThread(); // only starts if not yet running
0435: }
0436: } else {
0437: ping_dest = null;
0438: stopPingerThread();
0439: }
0440: }
0441: break;
0442:
0443: default:
0444: passDown(evt);
0445: break;
0446: }
0447: }
0448:
/**
 * Body of the pinger thread. Loops while this thread is still the current pinger thread and the
 * protocol is running. In each round it determines the member to be monitored and fetches its
 * server socket address (if n/a, sends a message to obtain it). Then it creates a client socket and
 * listens on it until the connection breaks. If it breaks, a SUSPECT message is emitted. If the
 * peer signals a normal termination, the thread ends. Otherwise a new member to be monitored is
 * chosen and monitoring continues (unless there is no member left to monitor).
 */
public void run() {
    Address tmp_ping_dest;
    IpAddress ping_addr;
    int max_fetch_tries = 10; // number of times a socket address is to be requested before giving up

    if (log.isTraceEnabled())
        log.trace("pinger_thread started"); // +++ remove
    while (pinger_thread != null
            && Thread.currentThread().equals(pinger_thread)
            && running) {
        tmp_ping_dest = determinePingDest(); // gets the neighbor to our right
        if (log.isDebugEnabled())
            log.debug("determinePingDest()=" + tmp_ping_dest
                    + ", pingable_mbrs=" + pingable_mbrs);
        if (tmp_ping_dest == null) {
            // nobody to monitor: terminate the thread
            ping_dest = null;
            synchronized (pinger_mutex) {
                pinger_thread = null;
            }
            break;
        }
        ping_dest = tmp_ping_dest;
        ping_addr = fetchPingAddress(ping_dest);
        if (ping_addr == null) {
            if (!running)
                break;
            if (log.isErrorEnabled())
                log.error("socket address for " + ping_dest
                        + " could not be fetched, retrying");
            // the retry budget is shared by all rounds of this thread; it is never reset
            if (--max_fetch_tries <= 0)
                break;
            Util.sleep(2000);
            continue;
        }

        if (!setupPingSocket(ping_addr)) {
            // covers use cases #7 and #8 in ManualTests.txt
            if (log.isDebugEnabled())
                log.debug("could not create socket to " + ping_dest
                        + "; suspecting " + ping_dest);
            broadcastSuspectMessage(ping_dest);
            pingable_mbrs.removeElement(ping_dest); // the next round picks a different neighbor
            continue;
        }

        if (log.isDebugEnabled())
            log.debug("ping_dest=" + ping_dest + ", ping_sock="
                    + ping_sock + ", cache=" + cache);

        // at this point ping_input is expected to be non-null: setupPingSocket() returned true
        try {
            if (ping_input != null) {
                // blocks until the peer sends a byte, the stream ends (-1) or the socket is closed
                int c = ping_input.read();
                switch (c) {
                    case NORMAL_TERMINATION:
                        if (log.isDebugEnabled())
                            log.debug("peer closed socket normally");
                        // clearing pinger_thread makes the loop condition fail, ending the thread
                        synchronized (pinger_mutex) {
                            pinger_thread = null;
                        }
                        break;
                    case ABNORMAL_TERMINATION: // -1, i.e. end of stream
                        handleSocketClose(null);
                        break;
                    default:
                        // any other byte: start the next round
                        break;
                }
            }
        } catch (IOException ex) { // we got here when the peer closed the socket --> suspect peer and then continue
            handleSocketClose(ex);
        } catch (Throwable catch_all_the_rest) {
            log.error("exception", catch_all_the_rest);
        }
    }
    if (log.isDebugEnabled())
        log.debug("pinger thread terminated");
    synchronized (pinger_mutex) {
        pinger_thread = null;
    }
}
0536:
0537: /* ----------------------------------- Private Methods -------------------------------------- */
0538:
0539: void handleSocketClose(Exception ex) {
0540: teardownPingSocket(); // make sure we have no leftovers
0541: if (!regular_sock_close) { // only suspect if socket was not closed regularly (by interruptPingerThread())
0542: if (log.isDebugEnabled())
0543: log
0544: .debug("peer "
0545: + ping_dest
0546: + " closed socket ("
0547: + (ex != null ? ex.getClass().getName()
0548: : "eof") + ')');
0549: broadcastSuspectMessage(ping_dest);
0550: pingable_mbrs.removeElement(ping_dest);
0551: } else {
0552: if (log.isDebugEnabled())
0553: log.debug("socket to " + ping_dest + " was reset");
0554: regular_sock_close = false;
0555: }
0556: }
0557:
0558: /**
0559: * Does *not* need to be synchronized on pinger_mutex because the caller (down()) already has the mutex acquired
0560: */
0561: void startPingerThread() {
0562: running = true;
0563: if (pinger_thread == null) {
0564: pinger_thread = new Thread(Util.getGlobalThreadGroup(),
0565: this , "FD_SOCK Ping thread");
0566: pinger_thread.setDaemon(true);
0567: pinger_thread.start();
0568: if (group_name != null) {
0569: String tmp, prefix = Global.THREAD_PREFIX;
0570: tmp = pinger_thread.getName();
0571: if (tmp != null && tmp.indexOf(prefix) == -1) {
0572: tmp += prefix + group_name + ")";
0573: pinger_thread.setName(tmp);
0574: }
0575: }
0576: }
0577: }
0578:
0579: void stopPingerThread() {
0580: running = false;
0581: synchronized (pinger_mutex) {
0582: if (pinger_thread != null && pinger_thread.isAlive()) {
0583: regular_sock_close = true;
0584: pinger_thread = null;
0585: sendPingTermination(); // PATCH by Bruce Schuchardt (http://jira.jboss.com/jira/browse/JGRP-246)
0586: teardownPingSocket();
0587: ping_addr_promise.setResult(null);
0588: }
0589: }
0590: }
0591:
0592: // PATCH: send something so the connection handler can exit
0593: synchronized void sendPingTermination() {
0594: sendPingSignal(NORMAL_TERMINATION);
0595: }
0596:
0597: void sendPingInterrupt() {
0598: sendPingSignal(INTERRUPT);
0599: }
0600:
0601: synchronized void sendPingSignal(int signal) {
0602: if (ping_sock != null) {
0603: try {
0604: OutputStream out = ping_sock.getOutputStream();
0605: if (out != null) {
0606: out.write(signal);
0607: out.flush();
0608: }
0609: } catch (Throwable t) {
0610: if (log.isTraceEnabled())
0611: log.trace("problem sending signal "
0612: + signalToString(signal), t);
0613: }
0614: }
0615: }
0616:
0617: /**
0618: * Interrupts the pinger thread. The Thread.interrupt() method doesn't seem to work under Linux with JDK 1.3.1
0619: * (JDK 1.2.2 had no problems here), therefore we close the socket (setSoLinger has to be set !) if we are
0620: * running under Linux. This should be tested under Windows. (Solaris 8 and JDK 1.3.1 definitely works).<p>
0621: * Oct 29 2001 (bela): completely removed Thread.interrupt(), but used socket close on all OSs. This makes this
0622: * code portable and we don't have to check for OSs.<p/>
0623: * Does *not* need to be synchronized on pinger_mutex because the caller (down()) already has the mutex acquired
0624: * @see org.jgroups.tests.InterruptTest to determine whether Thread.interrupt() works for InputStream.read().
0625: */
0626: void interruptPingerThread() {
0627: if (pinger_thread != null && pinger_thread.isAlive()) {
0628: regular_sock_close = true;
0629: sendPingInterrupt(); // PATCH by Bruce Schuchardt (http://jira.jboss.com/jira/browse/JGRP-246)
0630: teardownPingSocket(); // will wake up the pinger thread. less elegant than Thread.interrupt(), but does the job
0631: }
0632: }
0633:
0634: void startServerSocket() {
0635: if (srv_sock_handler != null) {
0636: srv_sock_handler.start(); // won't start if already running
0637: if (group_name != null) {
0638: String tmp, prefix = Global.THREAD_PREFIX;
0639: tmp = srv_sock_handler.getName();
0640: if (tmp != null && tmp.indexOf(prefix) == -1) {
0641: tmp += prefix + group_name + ")";
0642: srv_sock_handler.setName(tmp);
0643: }
0644: }
0645: }
0646: }
0647:
0648: void stopServerSocket() {
0649: if (srv_sock_handler != null)
0650: srv_sock_handler.stop();
0651: }
0652:
0653: /**
0654: * Creates a socket to <code>dest</code>, and assigns it to ping_sock. Also assigns ping_input
0655: */
0656: boolean setupPingSocket(IpAddress dest) {
0657: synchronized (sock_mutex) {
0658: if (dest == null) {
0659: if (log.isErrorEnabled())
0660: log.error("destination address is null");
0661: return false;
0662: }
0663: try {
0664: ping_sock = new Socket(dest.getIpAddress(), dest
0665: .getPort());
0666: ping_sock.setSoLinger(true, 1);
0667: ping_sock.setKeepAlive(keep_alive);
0668: ping_input = ping_sock.getInputStream();
0669: return true;
0670: } catch (Throwable ex) {
0671: return false;
0672: }
0673: }
0674: }
0675:
0676: void teardownPingSocket() {
0677: synchronized (sock_mutex) {
0678: if (ping_sock != null) {
0679: try {
0680: ping_sock.shutdownInput();
0681: ping_sock.close();
0682: } catch (Exception ex) {
0683: }
0684: ping_sock = null;
0685: }
0686: Util.close(ping_input);
0687: ping_input = null;
0688: }
0689: }
0690:
0691: /**
0692: * Determines coordinator C. If C is null and we are the first member, return. Else loop: send GET_CACHE message
0693: * to coordinator and wait for GET_CACHE_RSP response. Loop until valid response has been received.
0694: */
0695: void getCacheFromCoordinator() {
0696: Address coord;
0697: int attempts = num_tries;
0698: Message msg;
0699: FdHeader hdr;
0700: Hashtable result;
0701:
0702: get_cache_promise.reset();
0703: while (attempts > 0) {
0704: if ((coord = determineCoordinator()) != null) {
0705: if (coord.equals(local_addr)) { // we are the first member --> empty cache
0706: if (log.isDebugEnabled())
0707: log.debug("first member; cache is empty");
0708: return;
0709: }
0710: hdr = new FdHeader(FdHeader.GET_CACHE);
0711: hdr.mbr = local_addr;
0712: msg = new Message(coord, null, null);
0713: msg.putHeader(name, hdr);
0714: passDown(new Event(Event.MSG, msg));
0715: result = (Hashtable) get_cache_promise
0716: .getResult(get_cache_timeout);
0717: if (result != null) {
0718: cache.putAll(result); // replace all entries (there should be none !) in cache with the new values
0719: if (log.isTraceEnabled())
0720: log.trace("got cache from " + coord
0721: + ": cache is " + cache);
0722: return;
0723: } else {
0724: if (log.isErrorEnabled())
0725: log.error("received null cache; retrying");
0726: }
0727: }
0728:
0729: Util.sleep(get_cache_retry_timeout);
0730: --attempts;
0731: }
0732: }
0733:
0734: /**
0735: * Sends a SUSPECT message to all group members. Only the coordinator (or the next member in line if the coord
0736: * itself is suspected) will react to this message by installing a new view. To overcome the unreliability
0737: * of the SUSPECT message (it may be lost because we are not above any retransmission layer), the following scheme
0738: * is used: after sending the SUSPECT message, it is also added to the broadcast task, which will periodically
0739: * re-send the SUSPECT until a view is received in which the suspected process is not a member anymore. The reason is
0740: * that - at one point - either the coordinator or another participant taking over for a crashed coordinator, will
0741: * react to the SUSPECT message and issue a new view, at which point the broadcast task stops.
0742: */
0743: void broadcastSuspectMessage(Address suspected_mbr) {
0744: Message suspect_msg;
0745: FdHeader hdr;
0746:
0747: if (suspected_mbr == null)
0748: return;
0749:
0750: if (log.isTraceEnabled())
0751: log.trace("suspecting " + suspected_mbr
0752: + " (own address is " + local_addr + ')');
0753:
0754: // 1. Send a SUSPECT message right away; the broadcast task will take some time to send it (sleeps first)
0755: hdr = new FdHeader(FdHeader.SUSPECT);
0756: hdr.mbrs = new Vector(1);
0757: hdr.mbrs.addElement(suspected_mbr);
0758: suspect_msg = new Message();
0759: suspect_msg.putHeader(name, hdr);
0760: passDown(new Event(Event.MSG, suspect_msg));
0761:
0762: // 2. Add to broadcast task and start latter (if not yet running). The task will end when
0763: // suspected members are removed from the membership
0764: bcast_task.addSuspectedMember(suspected_mbr);
0765: if (stats) {
0766: num_suspect_events++;
0767: suspect_history.add(suspected_mbr);
0768: }
0769: }
0770:
0771: void broadcastWhoHasSockMessage(Address mbr) {
0772: Message msg;
0773: FdHeader hdr;
0774:
0775: if (local_addr != null && mbr != null)
0776: if (log.isDebugEnabled())
0777: log.debug("[" + local_addr + "]: who-has " + mbr);
0778:
0779: msg = new Message(); // bcast msg
0780: hdr = new FdHeader(FdHeader.WHO_HAS_SOCK);
0781: hdr.mbr = mbr;
0782: msg.putHeader(name, hdr);
0783: passDown(new Event(Event.MSG, msg));
0784: }
0785:
0786: /**
0787: Sends or broadcasts a I_HAVE_SOCK response. If 'dst' is null, the reponse will be broadcast, otherwise
0788: it will be unicast back to the requester
0789: */
0790: void sendIHaveSockMessage(Address dst, Address mbr, IpAddress addr) {
0791: Message msg = new Message(dst, null, null);
0792: FdHeader hdr = new FdHeader(FdHeader.I_HAVE_SOCK);
0793: hdr.mbr = mbr;
0794: hdr.sock_addr = addr;
0795: msg.putHeader(name, hdr);
0796: passDown(new Event(Event.MSG, msg));
0797: }
0798:
0799: /**
0800: Attempts to obtain the ping_addr first from the cache, then by unicasting q request to <code>mbr</code>,
0801: then by multicasting a request to all members.
0802: */
0803: private IpAddress fetchPingAddress(Address mbr) {
0804: IpAddress ret;
0805: Message ping_addr_req;
0806: FdHeader hdr;
0807:
0808: if (mbr == null) {
0809: if (log.isErrorEnabled())
0810: log.error("mbr == null");
0811: return null;
0812: }
0813: // 1. Try to get from cache. Add a little delay so that joining mbrs can send their socket address before
0814: // we ask them to do so
0815: ret = (IpAddress) cache.get(mbr);
0816: if (ret != null) {
0817: return ret;
0818: }
0819:
0820: Util.sleep(300);
0821: if ((ret = (IpAddress) cache.get(mbr)) != null)
0822: return ret;
0823:
0824: // 2. Try to get from mbr
0825: ping_addr_promise.reset();
0826: ping_addr_req = new Message(mbr, null, null); // unicast
0827: hdr = new FdHeader(FdHeader.WHO_HAS_SOCK);
0828: hdr.mbr = mbr;
0829: ping_addr_req.putHeader(name, hdr);
0830: passDown(new Event(Event.MSG, ping_addr_req));
0831: if (!running)
0832: return null;
0833: ret = (IpAddress) ping_addr_promise.getResult(3000);
0834: if (ret != null) {
0835: return ret;
0836: }
0837:
0838: // 3. Try to get from all members
0839: ping_addr_req = new Message(null); // multicast
0840: hdr = new FdHeader(FdHeader.WHO_HAS_SOCK);
0841: hdr.mbr = mbr;
0842: ping_addr_req.putHeader(name, hdr);
0843: passDown(new Event(Event.MSG, ping_addr_req));
0844: ret = (IpAddress) ping_addr_promise.getResult(3000);
0845: return ret;
0846: }
0847:
0848: Address determinePingDest() {
0849: Address tmp;
0850:
0851: if (pingable_mbrs == null || pingable_mbrs.size() < 2
0852: || local_addr == null)
0853: return null;
0854: for (int i = 0; i < pingable_mbrs.size(); i++) {
0855: tmp = (Address) pingable_mbrs.elementAt(i);
0856: if (local_addr.equals(tmp)) {
0857: if (i + 1 >= pingable_mbrs.size())
0858: return (Address) pingable_mbrs.elementAt(0);
0859: else
0860: return (Address) pingable_mbrs.elementAt(i + 1);
0861: }
0862: }
0863: return null;
0864: }
0865:
0866: Address determineCoordinator() {
0867: return members.size() > 0 ? (Address) members.elementAt(0)
0868: : null;
0869: }
0870:
0871: static String signalToString(int signal) {
0872: switch (signal) {
0873: case NORMAL_TERMINATION:
0874: return "NORMAL_TERMINATION";
0875: case ABNORMAL_TERMINATION:
0876: return "ABNORMAL_TERMINATION";
0877: case INTERRUPT:
0878: return "INTERRUPT";
0879: default:
0880: return "n/a";
0881: }
0882: }
0883:
0884: /* ------------------------------- End of Private Methods ------------------------------------ */
0885:
0886: public static class FdHeader extends Header implements Streamable {
0887: public static final byte SUSPECT = 10;
0888: public static final byte WHO_HAS_SOCK = 11;
0889: public static final byte I_HAVE_SOCK = 12;
0890: public static final byte GET_CACHE = 13; // sent by joining member to coordinator
0891: public static final byte GET_CACHE_RSP = 14; // sent by coordinator to joining member in response to GET_CACHE
0892:
0893: byte type = SUSPECT;
0894: Address mbr = null; // set on WHO_HAS_SOCK (requested mbr), I_HAVE_SOCK
0895: IpAddress sock_addr; // set on I_HAVE_SOCK
0896:
0897: // Hashtable<Address,IpAddress>
0898: Hashtable cachedAddrs = null; // set on GET_CACHE_RSP
0899: Vector mbrs = null; // set on SUSPECT (list of suspected members)
0900:
0901: public FdHeader() {
0902: } // used for externalization
0903:
/**
 * Creates a header of the given type; all other fields keep their defaults.
 *
 * @param type one of the type constants of this class (e.g. SUSPECT)
 */
public FdHeader(byte type) {
    super();
    this.type = type;
}
0907:
/**
 * Creates a header of the given type which refers to a single member.
 *
 * @param type one of the type constants of this class
 * @param mbr  the member this header refers to
 */
public FdHeader(byte type, Address mbr) {
    this(type);
    this.mbr = mbr;
}
0912:
/**
 * Creates a header of the given type which carries a list of members.
 *
 * @param type one of the type constants of this class
 * @param mbrs the list of members (the list of suspected members for SUSPECT)
 */
public FdHeader(byte type, Vector mbrs) {
    this(type);
    this.mbrs = mbrs;
}
0917:
/**
 * Creates a header of the given type which carries a socket address cache.
 *
 * @param type        one of the type constants of this class
 * @param cachedAddrs the cache (member address to socket address)
 */
public FdHeader(byte type, Hashtable cachedAddrs) {
    this(type);
    this.cachedAddrs = cachedAddrs;
}
0922:
0923: public String toString() {
0924: StringBuffer sb = new StringBuffer();
0925: sb.append(type2String(type));
0926: if (mbr != null)
0927: sb.append(", mbr=").append(mbr);
0928: if (sock_addr != null)
0929: sb.append(", sock_addr=").append(sock_addr);
0930: if (cachedAddrs != null)
0931: sb.append(", cache=").append(cachedAddrs);
0932: if (mbrs != null)
0933: sb.append(", mbrs=").append(mbrs);
0934: return sb.toString();
0935: }
0936:
0937: public static String type2String(byte type) {
0938: switch (type) {
0939: case SUSPECT:
0940: return "SUSPECT";
0941: case WHO_HAS_SOCK:
0942: return "WHO_HAS_SOCK";
0943: case I_HAVE_SOCK:
0944: return "I_HAVE_SOCK";
0945: case GET_CACHE:
0946: return "GET_CACHE";
0947: case GET_CACHE_RSP:
0948: return "GET_CACHE_RSP";
0949: default:
0950: return "unknown type (" + type + ')';
0951: }
0952: }
0953:
0954: public void writeExternal(ObjectOutput out) throws IOException {
0955: out.writeByte(type);
0956: out.writeObject(mbr);
0957: out.writeObject(sock_addr);
0958: out.writeObject(cachedAddrs);
0959: out.writeObject(mbrs);
0960: }
0961:
0962: public void readExternal(ObjectInput in) throws IOException,
0963: ClassNotFoundException {
0964: type = in.readByte();
0965: mbr = (Address) in.readObject();
0966: sock_addr = (IpAddress) in.readObject();
0967: cachedAddrs = (Hashtable) in.readObject();
0968: mbrs = (Vector) in.readObject();
0969: }
0970:
0971: public long size() {
0972: long retval = Global.BYTE_SIZE; // type
0973: retval += Util.size(mbr);
0974: retval += Util.size(sock_addr);
0975:
0976: retval += Global.INT_SIZE; // cachedAddrs size
0977: Map.Entry entry;
0978: Address key;
0979: IpAddress val;
0980: if (cachedAddrs != null) {
0981: for (Iterator it = cachedAddrs.entrySet().iterator(); it
0982: .hasNext();) {
0983: entry = (Map.Entry) it.next();
0984: if ((key = (Address) entry.getKey()) != null)
0985: retval += Util.size(key);
0986: retval += Global.BYTE_SIZE; // presence for val
0987: if ((val = (IpAddress) entry.getValue()) != null)
0988: retval += val.size();
0989: }
0990: }
0991:
0992: retval += Global.INT_SIZE; // mbrs size
0993: if (mbrs != null) {
0994: for (int i = 0; i < mbrs.size(); i++) {
0995: retval += Util.size((Address) mbrs.elementAt(i));
0996: }
0997: }
0998:
0999: return retval;
1000: }
1001:
1002: public void writeTo(DataOutputStream out) throws IOException {
1003: int size;
1004: out.writeByte(type);
1005: Util.writeAddress(mbr, out);
1006: Util.writeStreamable(sock_addr, out);
1007: size = cachedAddrs != null ? cachedAddrs.size() : 0;
1008: out.writeInt(size);
1009: if (size > 0) {
1010: for (Iterator it = cachedAddrs.entrySet().iterator(); it
1011: .hasNext();) {
1012: Map.Entry entry = (Map.Entry) it.next();
1013: Address key = (Address) entry.getKey();
1014: IpAddress val = (IpAddress) entry.getValue();
1015: Util.writeAddress(key, out);
1016: Util.writeStreamable(val, out);
1017: }
1018: }
1019: size = mbrs != null ? mbrs.size() : 0;
1020: out.writeInt(size);
1021: if (size > 0) {
1022: for (Iterator it = mbrs.iterator(); it.hasNext();) {
1023: Address address = (Address) it.next();
1024: Util.writeAddress(address, out);
1025: }
1026: }
1027: }
1028:
1029: public void readFrom(DataInputStream in) throws IOException,
1030: IllegalAccessException, InstantiationException {
1031: int size;
1032: type = in.readByte();
1033: mbr = Util.readAddress(in);
1034: sock_addr = (IpAddress) Util.readStreamable(
1035: IpAddress.class, in);
1036: size = in.readInt();
1037: if (size > 0) {
1038: if (cachedAddrs == null)
1039: cachedAddrs = new Hashtable();
1040: for (int i = 0; i < size; i++) {
1041: Address key = Util.readAddress(in);
1042: IpAddress val = (IpAddress) Util.readStreamable(
1043: IpAddress.class, in);
1044: cachedAddrs.put(key, val);
1045: }
1046: }
1047: size = in.readInt();
1048: if (size > 0) {
1049: if (mbrs == null)
1050: mbrs = new Vector();
1051: for (int i = 0; i < size; i++) {
1052: Address addr = Util.readAddress(in);
1053: mbrs.add(addr);
1054: }
1055: }
1056: }
1057:
1058: }
1059:
1060: /**
1061: * Handles the server-side of a client-server socket connection. Waits until a client connects, and then loops
1062: * until that client closes the connection. Note that there is no new thread spawned for the listening on the
1063: * client socket, therefore there can only be 1 client connection at the same time. Subsequent clients attempting
1064: * to create a connection will be blocked until the first client closes its connection. This should not be a problem
1065: * as the ring nature of the FD_SOCK protocol always has only 1 client connect to its right-hand-side neighbor.
1066: */
1067: private class ServerSocketHandler implements Runnable {
1068: Thread acceptor = null;
1069: /** List<ClientConnectionHandler> */
1070: final List clients = new ArrayList();
1071:
1072: String getName() {
1073: return acceptor != null ? acceptor.getName() : null;
1074: }
1075:
1076: void setName(String thread_name) {
1077: if (acceptor != null)
1078: acceptor.setName(thread_name);
1079: }
1080:
1081: ServerSocketHandler() {
1082: start();
1083: }
1084:
1085: final void start() {
1086: if (acceptor == null) {
1087: acceptor = new Thread(Util.getGlobalThreadGroup(),
1088: this , "ServerSocket acceptor thread");
1089: acceptor.setDaemon(true);
1090: acceptor.start();
1091: }
1092: }
1093:
1094: final void stop() {
1095: if (acceptor != null && acceptor.isAlive()) {
1096: try {
1097: srv_sock.close(); // this will terminate thread, peer will receive SocketException (socket close)
1098: } catch (Exception ex) {
1099: }
1100: }
1101: synchronized (clients) {
1102: for (Iterator it = clients.iterator(); it.hasNext();) {
1103: ClientConnectionHandler handler = (ClientConnectionHandler) it
1104: .next();
1105: handler.stopThread();
1106: }
1107: clients.clear();
1108: }
1109: acceptor = null;
1110: }
1111:
1112: /** Only accepts 1 client connection at a time (saving threads) */
1113: public void run() {
1114: Socket client_sock;
1115: while (acceptor != null && srv_sock != null) {
1116: try {
1117: if (log.isTraceEnabled()) // +++ remove
1118: log.trace("waiting for client connections on "
1119: + srv_sock.getInetAddress() + ":"
1120: + srv_sock.getLocalPort());
1121: client_sock = srv_sock.accept();
1122: if (log.isTraceEnabled()) // +++ remove
1123: log.trace("accepted connection from "
1124: + client_sock.getInetAddress() + ':'
1125: + client_sock.getPort());
1126: ClientConnectionHandler client_conn_handler = new ClientConnectionHandler(
1127: client_sock, clients);
1128: synchronized (clients) {
1129: clients.add(client_conn_handler);
1130: }
1131: client_conn_handler.start();
1132: } catch (IOException io_ex2) {
1133: break;
1134: }
1135: }
1136: acceptor = null;
1137: }
1138: }
1139:
1140: /** Handles a client connection; multiple client can connect at the same time */
1141: private static class ClientConnectionHandler extends Thread {
1142: Socket client_sock = null;
1143: InputStream in;
1144: final Object mutex = new Object();
1145: final List clients = new ArrayList();
1146:
1147: ClientConnectionHandler(Socket client_sock, List clients) {
1148: setName("ClientConnectionHandler");
1149: setDaemon(true);
1150: this .client_sock = client_sock;
1151: this .clients.addAll(clients);
1152: }
1153:
1154: void stopThread() {
1155: synchronized (mutex) {
1156: if (client_sock != null) {
1157: try {
1158: OutputStream out = client_sock
1159: .getOutputStream();
1160: out.write(NORMAL_TERMINATION);
1161: out.flush();
1162: closeClientSocket();
1163: } catch (Throwable t) {
1164: }
1165: }
1166: }
1167: }
1168:
1169: void closeClientSocket() {
1170: synchronized (mutex) {
1171: Util.close(client_sock);
1172: client_sock = null;
1173: }
1174: }
1175:
1176: public void run() {
1177: try {
1178: synchronized (mutex) {
1179: if (client_sock == null)
1180: return;
1181: in = client_sock.getInputStream();
1182: }
1183: int b = 0;
1184: do {
1185: b = in.read();
1186: } while (b != ABNORMAL_TERMINATION
1187: && b != NORMAL_TERMINATION);
1188: } catch (IOException ex) {
1189: } finally {
1190: Socket sock = client_sock; // PATCH: avoid race condition causing NPE
1191: if (sock != null && !sock.isClosed())
1192: closeClientSocket();
1193: synchronized (clients) {
1194: clients.remove(this );
1195: }
1196: }
1197: }
1198: }
1199:
1200: /**
1201: * Task that periodically broadcasts a list of suspected members to the group. Goal is not to lose
1202: * a SUSPECT message: since these are bcast unreliably, they might get dropped. The BroadcastTask makes
1203: * sure they are retransmitted until a view has been received which doesn't contain the suspected members
1204: * any longer. Then the task terminates.
1205: */
1206: private class BroadcastTask implements TimeScheduler.Task {
1207: final Vector suspected_mbrs = new Vector(7);
1208: boolean stopped = false;
1209:
1210: /** Adds a suspected member. Starts the task if not yet running */
1211: public void addSuspectedMember(Address mbr) {
1212: if (mbr == null)
1213: return;
1214: if (!members.contains(mbr))
1215: return;
1216: synchronized (suspected_mbrs) {
1217: if (!suspected_mbrs.contains(mbr)) {
1218: suspected_mbrs.addElement(mbr);
1219: if (log.isDebugEnabled())
1220: log.debug("mbr=" + mbr + " (size="
1221: + suspected_mbrs.size() + ')');
1222: }
1223: if (stopped && suspected_mbrs.size() > 0) {
1224: stopped = false;
1225: timer.add(this , true);
1226: }
1227: }
1228: }
1229:
1230: public void removeSuspectedMember(Address suspected_mbr) {
1231: if (suspected_mbr == null)
1232: return;
1233: if (log.isDebugEnabled())
1234: log.debug("member is " + suspected_mbr);
1235: synchronized (suspected_mbrs) {
1236: suspected_mbrs.removeElement(suspected_mbr);
1237: if (suspected_mbrs.size() == 0)
1238: stopped = true;
1239: }
1240: }
1241:
1242: public void removeAll() {
1243: synchronized (suspected_mbrs) {
1244: suspected_mbrs.removeAllElements();
1245: stopped = true;
1246: }
1247: }
1248:
1249: /**
1250: * Removes all elements from suspected_mbrs that are <em>not</em> in the new membership
1251: */
1252: public void adjustSuspectedMembers(Vector new_mbrship) {
1253: Address suspected_mbr;
1254:
1255: if (new_mbrship == null || new_mbrship.size() == 0)
1256: return;
1257: synchronized (suspected_mbrs) {
1258: for (Iterator it = suspected_mbrs.iterator(); it
1259: .hasNext();) {
1260: suspected_mbr = (Address) it.next();
1261: if (!new_mbrship.contains(suspected_mbr)) {
1262: it.remove();
1263: if (log.isDebugEnabled())
1264: log.debug("removed " + suspected_mbr
1265: + " (size=" + suspected_mbrs.size()
1266: + ')');
1267: }
1268: }
1269: if (suspected_mbrs.size() == 0)
1270: stopped = true;
1271: }
1272: }
1273:
1274: public boolean cancelled() {
1275: return stopped;
1276: }
1277:
1278: public long nextInterval() {
1279: return suspect_msg_interval;
1280: }
1281:
1282: public void run() {
1283: Message suspect_msg;
1284: FdHeader hdr;
1285:
1286: if (log.isDebugEnabled())
1287: log
1288: .debug("broadcasting SUSPECT message (suspected_mbrs="
1289: + suspected_mbrs + ") to group");
1290:
1291: synchronized (suspected_mbrs) {
1292: if (suspected_mbrs.size() == 0) {
1293: stopped = true;
1294: if (log.isDebugEnabled())
1295: log.debug("task done (no suspected members)");
1296: return;
1297: }
1298:
1299: hdr = new FdHeader(FdHeader.SUSPECT);
1300: hdr.mbrs = (Vector) suspected_mbrs.clone();
1301: }
1302: suspect_msg = new Message(); // mcast SUSPECT to all members
1303: suspect_msg.putHeader(name, hdr);
1304: passDown(new Event(Event.MSG, suspect_msg));
1305: if (log.isDebugEnabled())
1306: log.debug("task done");
1307: }
1308: }
1309:
1310: }
|