001: // $Id: TCPGOSSIP.java,v 1.20.2.1 2007/04/27 08:03:51 belaban Exp $
002:
003: package org.jgroups.protocols;
004:
005: import org.jgroups.Address;
006: import org.jgroups.Event;
007: import org.jgroups.Message;
008: import org.jgroups.stack.GossipClient;
009: import org.jgroups.stack.IpAddress;
010:
011: import java.util.*;
012: import java.net.UnknownHostException;
013:
014: /**
015: * The TCPGOSSIP protocol layer retrieves the initial membership (used by the GMS when started
016: * by sending event FIND_INITIAL_MBRS down the stack).
017: * We do this by contacting one or more GossipRouters, which must be running at well-known
018: * addresses:ports. The responses should allow us to determine the coordinator whom we have to
019: * contact, e.g. in case we want to join the group. When we are a server (after having
020: * received the BECOME_SERVER event), we'll respond to TCPGOSSIP requests with a TCPGOSSIP
021: * response.<p> The FIND_INITIAL_MBRS event will eventually be answered with a
022: * FIND_INITIAL_MBRS_OK event up the stack.
023: *
024: * @author Bela Ban
025: */
026: public class TCPGOSSIP extends Discovery {
027: Vector initial_hosts = null; // (list of IpAddresses) hosts to be contacted for the initial membership
028: GossipClient gossip_client = null; // accesses the GossipRouter(s) to find initial mbrship
029:
030: // we need to refresh the registration with the GossipRouter(s) periodically,
031: // so that our entries are not purged from the cache
032: long gossip_refresh_rate = 20000;
033:
034: final static Vector EMPTY_VECTOR = new Vector();
035: final static String name = "TCPGOSSIP";
036:
037: public String getName() {
038: return name;
039: }
040:
041: public boolean setProperties(Properties props) {
042: String str;
043: str = props.getProperty("gossip_refresh_rate"); // wait for at most n members
044: if (str != null) {
045: gossip_refresh_rate = Integer.parseInt(str);
046: props.remove("gossip_refresh_rate");
047: }
048:
049: str = props.getProperty("initial_hosts");
050: if (str != null) {
051: props.remove("initial_hosts");
052: try {
053: initial_hosts = createInitialHosts(str);
054: } catch (UnknownHostException ex) {
055: if (log.isErrorEnabled())
056: log.error("failed creating initial hosts", ex);
057: return false;
058: }
059: }
060:
061: if (initial_hosts == null || initial_hosts.size() == 0) {
062: if (log.isErrorEnabled())
063: log
064: .error("initial_hosts must contain the address of at least one GossipRouter");
065: return false;
066: }
067: return super .setProperties(props);
068: }
069:
070: public void start() throws Exception {
071: super .start();
072: if (gossip_client == null)
073: gossip_client = new GossipClient(initial_hosts,
074: gossip_refresh_rate);
075: }
076:
077: public void stop() {
078: super .stop();
079: if (gossip_client != null) {
080: gossip_client.stop();
081: gossip_client = null;
082: }
083: }
084:
085: public void destroy() {
086: if (gossip_client != null) {
087: gossip_client.destroy();
088: gossip_client = null;
089: }
090: }
091:
092: public void handleConnectOK() {
093: if (group_addr == null || local_addr == null) {
094: if (log.isErrorEnabled())
095: log
096: .error("[CONNECT_OK]: group_addr or local_addr is null. "
097: + "cannot register with GossipRouter(s)");
098: } else {
099: if (log.isTraceEnabled())
100: log
101: .trace("[CONNECT_OK]: registering "
102: + local_addr + " under " + group_addr
103: + " with GossipRouter");
104: gossip_client.register(group_addr, local_addr);
105: }
106: }
107:
108: public void sendGetMembersRequest() {
109: Message msg, copy;
110: PingHeader hdr;
111: List tmp_mbrs;
112: Address mbr_addr;
113:
114: if (group_addr == null) {
115: if (log.isErrorEnabled())
116: log
117: .error("[FIND_INITIAL_MBRS]: group_addr is null, cannot get mbrship");
118: passUp(new Event(Event.FIND_INITIAL_MBRS_OK, EMPTY_VECTOR));
119: return;
120: }
121: if (log.isTraceEnabled())
122: log.trace("fetching members from GossipRouter(s)");
123: tmp_mbrs = gossip_client.getMembers(group_addr);
124: if (tmp_mbrs == null || tmp_mbrs.size() == 0) {
125: if (log.isErrorEnabled())
126: log
127: .error("[FIND_INITIAL_MBRS]: gossip client found no members");
128: passUp(new Event(Event.FIND_INITIAL_MBRS_OK, EMPTY_VECTOR));
129: return;
130: }
131: if (log.isTraceEnabled())
132: log.trace("consolidated mbrs from GossipRouter(s) are "
133: + tmp_mbrs);
134:
135: // 1. 'Mcast' GET_MBRS_REQ message
136: hdr = new PingHeader(PingHeader.GET_MBRS_REQ, null);
137: msg = new Message(null);
138: msg.putHeader(name, hdr);
139:
140: for (Iterator it = tmp_mbrs.iterator(); it.hasNext();) {
141: mbr_addr = (Address) it.next();
142: copy = msg.copy();
143: copy.setDest(mbr_addr);
144: if (log.isTraceEnabled())
145: log
146: .trace("[FIND_INITIAL_MBRS] sending PING request to "
147: + copy.getDest());
148: passDown(new Event(Event.MSG, copy));
149: }
150: }
151:
152: /* -------------------------- Private methods ---------------------------- */
153:
154: /**
155: * Input is "daddy[8880],sindhu[8880],camille[5555]. Return list of IpAddresses
156: */
157: private Vector createInitialHosts(String l)
158: throws UnknownHostException {
159: Vector tmp = new Vector();
160: String host;
161: int port;
162: IpAddress addr;
163: StringTokenizer tok = new StringTokenizer(l, ",");
164: String t;
165:
166: while (tok.hasMoreTokens()) {
167: try {
168: t = tok.nextToken();
169: host = t.substring(0, t.indexOf('['));
170: port = Integer.parseInt(t.substring(t.indexOf('[') + 1,
171: t.indexOf(']')));
172: addr = new IpAddress(host, port);
173: tmp.addElement(addr);
174: } catch (NumberFormatException e) {
175: if (log.isErrorEnabled())
176: log.error("exeption is " + e);
177: }
178: }
179:
180: return tmp;
181: }
182:
183: }
|