001: // $Id: PING.java,v 1.30.2.1 2007/04/27 08:03:51 belaban Exp $
002:
003: package org.jgroups.protocols;
004:
005: import org.jgroups.*;
006: import org.jgroups.stack.GossipClient;
007: import org.jgroups.stack.IpAddress;
008: import org.jgroups.util.List;
009: import org.jgroups.util.Util;
010:
011: import java.net.InetAddress;
012: import java.net.UnknownHostException;
013: import java.util.*;
014:
015: /**
016: * The PING protocol layer retrieves the initial membership (used by the GMS when started
017: * by sending event FIND_INITIAL_MBRS down the stack). We do this by mcasting PING
018: * requests to an IP MCAST address (or, if gossiping is enabled, by contacting the GossipRouter).
019: * The responses should allow us to determine the coordinator whom we have to
020: * contact, e.g. in case we want to join the group. When we are a server (after having
021: * received the BECOME_SERVER event), we'll respond to PING requests with a PING
022: * response.<p> The FIND_INITIAL_MBRS event will eventually be answered with a
023: * FIND_INITIAL_MBRS_OK event up the stack.
024: * The following properties are available
025: * property: gossip_host - if you are using GOSSIP then this defines the host of the GossipRouter, default is null
026: * property: gossip_port - if you are using GOSSIP then this defines the port of the GossipRouter, default is null
027: */
028: public class PING extends Discovery {
029: String gossip_host = null;
030: int gossip_port = 0;
031: long gossip_refresh = 20000; // time in msecs after which the entry in GossipRouter will be refreshed
032: GossipClient client;
033: int port_range = 1; // number of ports to be probed for initial membership
034: List initial_hosts = null; // hosts to be contacted for the initial membership
035: public static final String name = "PING";
036:
037: public String getName() {
038: return name;
039: }
040:
041: /**
042: * sets the properties of the PING protocol.
043: * The following properties are available
044: * property: timeout - the timeout (ms) to wait for the initial members, default is 3000=3 secs
045: * property: num_initial_members - the minimum number of initial members for a FIND_INITAL_MBRS, default is 2
046: * property: gossip_host - if you are using GOSSIP then this defines the host of the GossipRouter, default is null
047: * property: gossip_port - if you are using GOSSIP then this defines the port of the GossipRouter, default is null
048: *
049: * @param props - a property set containing only PING properties
050: * @return returns true if all properties were parsed properly
051: * returns false if there are unrecnogized properties in the property set
052: */
053: public boolean setProperties(Properties props) {
054: String str;
055:
056: str = props.getProperty("gossip_host");
057: if (str != null) {
058: gossip_host = str;
059: props.remove("gossip_host");
060: }
061:
062: str = props.getProperty("gossip_port");
063: if (str != null) {
064: gossip_port = Integer.parseInt(str);
065: props.remove("gossip_port");
066: }
067:
068: str = props.getProperty("gossip_refresh");
069: if (str != null) {
070: gossip_refresh = Long.parseLong(str);
071: props.remove("gossip_refresh");
072: }
073:
074: if (gossip_host != null && gossip_port != 0) {
075: try {
076: client = new GossipClient(new IpAddress(InetAddress
077: .getByName(gossip_host), gossip_port),
078: gossip_refresh);
079: } catch (Exception e) {
080: if (log.isErrorEnabled())
081: log
082: .error("creation of GossipClient failed, exception="
083: + e);
084: return false; // will cause stack creation to abort
085: }
086: }
087:
088: str = props.getProperty("port_range"); // if member cannot be contacted on base port,
089: if (str != null) { // how many times can we increment the port
090: port_range = Integer.parseInt(str);
091: if (port_range < 1) {
092: port_range = 1;
093: }
094: props.remove("port_range");
095: }
096:
097: str = props.getProperty("initial_hosts");
098: if (str != null) {
099: props.remove("initial_hosts");
100: try {
101: initial_hosts = createInitialHosts(str);
102: } catch (UnknownHostException e) {
103: if (log.isErrorEnabled())
104: log
105: .error(
106: "failed constructing initial list of hosts",
107: e);
108: return false;
109: }
110: }
111:
112: return super .setProperties(props);
113: }
114:
115: public void stop() {
116: super .stop();
117: if (client != null) {
118: client.stop();
119: }
120: }
121:
122: public void localAddressSet(Address addr) {
123: // Add own address to initial_hosts if not present: we must always be able to ping ourself !
124: if (initial_hosts != null && local_addr != null) {
125: List hlist;
126: boolean inInitialHosts = false;
127: for (Enumeration en = initial_hosts.elements(); en
128: .hasMoreElements()
129: && !inInitialHosts;) {
130: hlist = (List) en.nextElement();
131: if (hlist.contains(local_addr)) {
132: inInitialHosts = true;
133: }
134: }
135: if (!inInitialHosts) {
136: hlist = new List();
137: hlist.add(local_addr);
138: initial_hosts.add(hlist);
139: if (log.isDebugEnabled())
140: log.debug("adding my address (" + local_addr
141: + ") to initial_hosts; initial_hosts="
142: + initial_hosts);
143: }
144: }
145: }
146:
147: public void handleConnect() {
148: if (client != null)
149: client.register(group_addr, local_addr);
150: }
151:
152: public void handleDisconnect() {
153: if (client != null)
154: client.stop();
155: }
156:
157: public void sendGetMembersRequest() {
158: Message msg;
159: PingHeader hdr;
160: java.util.List gossip_rsps;
161:
162: if (client != null) {
163: gossip_rsps = client.getMembers(group_addr);
164: if (gossip_rsps != null && gossip_rsps.size() > 0) {
165: // Set a temporary membership in the UDP layer, so that the following multicast
166: // will be sent to all of them
167: Event view_event = new Event(Event.TMP_VIEW,
168: makeView(new Vector(gossip_rsps)));
169: passDown(view_event); // needed e.g. by failure detector or UDP
170: } else {
171: passUp(new Event(Event.FIND_INITIAL_MBRS_OK, null));
172: return;
173: }
174:
175: if (gossip_rsps.size() > 0) {
176: for (Iterator it = gossip_rsps.iterator(); it.hasNext();) {
177: Address dest = (Address) it.next();
178: msg = new Message(dest, null, null); // unicast msg
179: msg.putHeader(getName(), new PingHeader(
180: PingHeader.GET_MBRS_REQ, null));
181: passDown(new Event(Event.MSG, msg));
182: }
183: }
184:
185: Util.sleep(500);
186: } else {
187: if (initial_hosts != null && initial_hosts.size() > 0) {
188: IpAddress h;
189: List hlist;
190: msg = new Message(null);
191: msg.putHeader(getName(), new PingHeader(
192: PingHeader.GET_MBRS_REQ, null));
193: for (Enumeration en = initial_hosts.elements(); en
194: .hasMoreElements();) {
195: hlist = (List) en.nextElement();
196: boolean isMember = false;
197: for (Enumeration hen = hlist.elements(); hen
198: .hasMoreElements()
199: && !isMember;) {
200: h = (IpAddress) hen.nextElement();
201: msg.setDest(h);
202: if (log.isTraceEnabled())
203: log
204: .trace("[FIND_INITIAL_MBRS] sending PING request to "
205: + msg.getDest());
206: passDown(new Event(Event.MSG, msg.copy()));
207: }
208: }
209: } else {
210: // 1. Mcast GET_MBRS_REQ message
211: hdr = new PingHeader(PingHeader.GET_MBRS_REQ, null);
212: msg = new Message(null); // mcast msg
213: msg.putHeader(getName(), hdr); // needs to be getName(), so we might get "MPING" !
214: sendMcastDiscoveryRequest(msg);
215: }
216: }
217: }
218:
219: void sendMcastDiscoveryRequest(Message discovery_request) {
220: passDown(new Event(Event.MSG, discovery_request));
221: }
222:
223: /* -------------------------- Private methods ---------------------------- */
224:
225: /**
226: * Input is "daddy[8880],sindhu[8880],camille[5555]. Return List of IpAddresses
227: */
228: private List createInitialHosts(String l)
229: throws UnknownHostException {
230: List tmp = new List();
231: StringTokenizer tok = new StringTokenizer(l, ",");
232: String t;
233:
234: while (tok.hasMoreTokens()) {
235: try {
236: t = tok.nextToken();
237: String host = t.substring(0, t.indexOf('['));
238: int port = Integer.parseInt(t.substring(
239: t.indexOf('[') + 1, t.indexOf(']')));
240: List hosts = new List();
241: for (int i = port; i < port + port_range; i++) {
242: hosts.add(new IpAddress(host, i));
243: }
244: tmp.add(hosts);
245: } catch (NumberFormatException e) {
246: if (log.isErrorEnabled())
247: log.error("exeption is " + e);
248: }
249: }
250: return tmp;
251: }
252:
253: }
|