001: package org.jgroups.protocols;
002:
003: import org.jgroups.*;
004: import org.jgroups.stack.Protocol;
005:
006: import java.util.*;
007:
008: /**
009: * The Discovery protocol layer retrieves the initial membership (used by the GMS when started
010: * by sending event FIND_INITIAL_MBRS down the stack). We do this by specific subclasses, e.g. by mcasting PING
011: * requests to an IP MCAST address or, if gossiping is enabled, by contacting the GossipRouter.
012: * The responses should allow us to determine the coordinator whom we have to
013: * contact, e.g. in case we want to join the group. When we are a server (after having
014: * received the BECOME_SERVER event), we'll respond to PING requests with a PING
015: * response.<p> The FIND_INITIAL_MBRS event will eventually be answered with a
016: * FIND_INITIAL_MBRS_OK event up the stack.
017: * The following properties are available
018: * <ul>
019: * <li>timeout - the timeout (ms) to wait for the initial members, default is 3000=3 secs
020: * <li>num_initial_members - the minimum number of initial members for a FIND_INITAL_MBRS, default is 2
021: * <li>num_ping_requests - the number of GET_MBRS_REQ messages to be sent (min=1), distributed over timeout ms
022: * </ul>
023: * @author Bela Ban
024: * @version $Id: Discovery.java,v 1.16.2.1 2007/04/27 08:03:51 belaban Exp $
025: */
026: public abstract class Discovery extends Protocol {
027: final Vector members = new Vector(11);
028: Address local_addr = null;
029: String group_addr = null;
030: long timeout = 3000;
031: int num_initial_members = 2;
032: boolean is_server = false;
033: PingWaiter ping_waiter;
034:
035: /** Number of GET_MBRS_REQ messages to be sent (min=1), distributed over timeout ms */
036: int num_ping_requests = 2;
037:
038: int num_discovery_requests = 0;
039:
040: /** Called after local_addr was set */
041: public void localAddressSet(Address addr) {
042: }
043:
044: public abstract void sendGetMembersRequest();
045:
046: /** Called when CONNECT_OK has been received */
047: public void handleConnectOK() {
048: }
049:
050: public void handleDisconnect() {
051: }
052:
053: public void handleConnect() {
054: }
055:
056: public long getTimeout() {
057: return timeout;
058: }
059:
060: public void setTimeout(long timeout) {
061: this .timeout = timeout;
062: if (ping_waiter != null)
063: ping_waiter.setTimeout(timeout);
064: }
065:
066: public int getNumInitialMembers() {
067: return num_initial_members;
068: }
069:
070: public void setNumInitialMembers(int num_initial_members) {
071: this .num_initial_members = num_initial_members;
072: if (ping_waiter != null)
073: ping_waiter.setNumRsps(num_initial_members);
074: }
075:
076: public int getNumPingRequests() {
077: return num_ping_requests;
078: }
079:
080: public void setNumPingRequests(int num_ping_requests) {
081: this .num_ping_requests = num_ping_requests;
082: }
083:
084: public int getNumberOfDiscoveryRequestsSent() {
085: return num_discovery_requests;
086: }
087:
088: public Vector providedUpServices() {
089: Vector ret = new Vector(1);
090: ret.addElement(new Integer(Event.FIND_INITIAL_MBRS));
091: return ret;
092: }
093:
094: /**
095: * sets the properties of the PING protocol.
096: * The following properties are available
097: * property: timeout - the timeout (ms) to wait for the initial members, default is 3000=3 secs
098: * property: num_initial_members - the minimum number of initial members for a FIND_INITAL_MBRS, default is 2
099: * @param props - a property set
100: * @return returns true if all properties were parsed properly
101: * returns false if there are unrecnogized properties in the property set
102: */
103: public boolean setProperties(Properties props) {
104: String str;
105:
106: super .setProperties(props);
107: str = props.getProperty("timeout"); // max time to wait for initial members
108: if (str != null) {
109: timeout = Long.parseLong(str);
110: if (timeout <= 0) {
111: if (log.isErrorEnabled())
112: log.error("timeout must be > 0");
113: return false;
114: }
115: props.remove("timeout");
116: }
117:
118: str = props.getProperty("num_initial_members"); // wait for at most n members
119: if (str != null) {
120: num_initial_members = Integer.parseInt(str);
121: props.remove("num_initial_members");
122: }
123:
124: str = props.getProperty("num_ping_requests"); // number of GET_MBRS_REQ messages
125: if (str != null) {
126: num_ping_requests = Integer.parseInt(str);
127: props.remove("num_ping_requests");
128: if (num_ping_requests < 1)
129: num_ping_requests = 1;
130: }
131:
132: if (props.size() > 0) {
133: StringBuffer sb = new StringBuffer();
134: for (Enumeration e = props.propertyNames(); e
135: .hasMoreElements();) {
136: sb.append(e.nextElement().toString());
137: if (e.hasMoreElements()) {
138: sb.append(", ");
139: }
140: }
141: if (log.isErrorEnabled())
142: log
143: .error("The following properties are not recognized: "
144: + sb);
145: return false;
146: }
147: return true;
148: }
149:
150: public void resetStats() {
151: super .resetStats();
152: num_discovery_requests = 0;
153: }
154:
155: public void start() throws Exception {
156: super .start();
157: PingSender ping_sender = new PingSender(timeout,
158: num_ping_requests, this );
159: if (ping_waiter == null)
160: ping_waiter = new PingWaiter(timeout, num_initial_members,
161: this , ping_sender);
162: }
163:
164: public void stop() {
165: is_server = false;
166: if (ping_waiter != null)
167: ping_waiter.stop();
168: }
169:
170: /**
171: * Finds the initial membership
172: * @return Vector<PingRsp>
173: */
174: public Vector findInitialMembers() {
175: return ping_waiter != null ? ping_waiter.findInitialMembers()
176: : null;
177: }
178:
179: public String findInitialMembersAsString() {
180: Vector results = findInitialMembers();
181: if (results == null || results.size() == 0)
182: return "<empty>";
183: PingRsp rsp;
184: StringBuffer sb = new StringBuffer();
185: for (Iterator it = results.iterator(); it.hasNext();) {
186: rsp = (PingRsp) it.next();
187: sb.append(rsp).append("\n");
188: }
189: return sb.toString();
190: }
191:
192: /**
193: * An event was received from the layer below. Usually the current layer will want to examine
194: * the event type and - depending on its type - perform some computation
195: * (e.g. removing headers from a MSG event type, or updating the internal membership list
196: * when receiving a VIEW_CHANGE event).
197: * Finally the event is either a) discarded, or b) an event is sent down
198: * the stack using <code>PassDown</code> or c) the event (or another event) is sent up
199: * the stack using <code>PassUp</code>.
200: * <p/>
201: * For the PING protocol, the Up operation does the following things.
202: * 1. If the event is a Event.MSG then PING will inspect the message header.
203: * If the header is null, PING simply passes up the event
204: * If the header is PingHeader.GET_MBRS_REQ then the PING protocol
205: * will PassDown a PingRequest message
206: * If the header is PingHeader.GET_MBRS_RSP we will add the message to the initial members
207: * vector and wake up any waiting threads.
208: * 2. If the event is Event.SET_LOCAL_ADDR we will simple set the local address of this protocol
209: * 3. For all other messages we simple pass it up to the protocol above
210: *
211: * @param evt - the event that has been sent from the layer below
212: */
213:
214: public void up(Event evt) {
215: Message msg, rsp_msg;
216: Object obj;
217: PingHeader hdr, rsp_hdr;
218: PingRsp rsp;
219: Address coord;
220:
221: switch (evt.getType()) {
222:
223: case Event.MSG:
224: msg = (Message) evt.getArg();
225: obj = msg.getHeader(getName());
226: if (obj == null || !(obj instanceof PingHeader)) {
227: passUp(evt);
228: return;
229: }
230: hdr = (PingHeader) msg.removeHeader(getName());
231:
232: switch (hdr.type) {
233:
234: case PingHeader.GET_MBRS_REQ: // return Rsp(local_addr, coord)
235: if (local_addr != null && msg.getSrc() != null
236: && local_addr.equals(msg.getSrc())) {
237: return;
238: }
239: synchronized (members) {
240: coord = members.size() > 0 ? (Address) members
241: .firstElement() : local_addr;
242: }
243:
244: PingRsp ping_rsp = new PingRsp(local_addr, coord,
245: is_server);
246: rsp_msg = new Message(msg.getSrc(), null, null);
247: rsp_hdr = new PingHeader(PingHeader.GET_MBRS_RSP,
248: ping_rsp);
249: rsp_msg.putHeader(getName(), rsp_hdr);
250: if (log.isTraceEnabled())
251: log.trace("received GET_MBRS_REQ from "
252: + msg.getSrc() + ", sending response "
253: + rsp_hdr);
254: passDown(new Event(Event.MSG, rsp_msg));
255: return;
256:
257: case PingHeader.GET_MBRS_RSP: // add response to vector and notify waiting thread
258: rsp = hdr.arg;
259:
260: if (log.isTraceEnabled())
261: log.trace("received GET_MBRS_RSP, rsp=" + rsp);
262: ping_waiter.addResponse(rsp);
263: return;
264:
265: default:
266: if (log.isWarnEnabled())
267: log.warn("got PING header with unknown type ("
268: + hdr.type + ')');
269: return;
270: }
271:
272: case Event.SET_LOCAL_ADDRESS:
273: passUp(evt);
274: local_addr = (Address) evt.getArg();
275: localAddressSet(local_addr);
276: break;
277:
278: case Event.CONNECT_OK:
279: handleConnectOK();
280: passUp(evt);
281: break;
282:
283: default:
284: passUp(evt); // Pass up to the layer above us
285: break;
286: }
287: }
288:
289: /**
290: * An event is to be sent down the stack. The layer may want to examine its type and perform
291: * some action on it, depending on the event's type. If the event is a message MSG, then
292: * the layer may need to add a header to it (or do nothing at all) before sending it down
293: * the stack using <code>PassDown</code>. In case of a GET_ADDRESS event (which tries to
294: * retrieve the stack's address from one of the bottom layers), the layer may need to send
295: * a new response event back up the stack using <code>passUp()</code>.
296: * The PING protocol is interested in several different down events,
297: * Event.FIND_INITIAL_MBRS - sent by the GMS layer and expecting a GET_MBRS_OK
298: * Event.TMP_VIEW and Event.VIEW_CHANGE - a view change event
299: * Event.BECOME_SERVER - called after client has joined and is fully working group member
300: * Event.CONNECT, Event.DISCONNECT.
301: */
302: public void down(Event evt) {
303:
304: switch (evt.getType()) {
305:
306: case Event.FIND_INITIAL_MBRS: // sent by GMS layer, pass up a GET_MBRS_OK event
307: // sends the GET_MBRS_REQ to all members, waits 'timeout' ms or until 'num_initial_members' have been retrieved
308: num_discovery_requests++;
309: ping_waiter.start();
310: break;
311:
312: case Event.TMP_VIEW:
313: case Event.VIEW_CHANGE:
314: Vector tmp;
315: if ((tmp = ((View) evt.getArg()).getMembers()) != null) {
316: synchronized (members) {
317: members.clear();
318: members.addAll(tmp);
319: }
320: }
321: passDown(evt);
322: break;
323:
324: case Event.BECOME_SERVER: // called after client has joined and is fully working group member
325: passDown(evt);
326: is_server = true;
327: break;
328:
329: case Event.CONNECT:
330: group_addr = (String) evt.getArg();
331: passDown(evt);
332: handleConnect();
333: break;
334:
335: case Event.DISCONNECT:
336: handleDisconnect();
337: passDown(evt);
338: break;
339:
340: default:
341: passDown(evt); // Pass on to the layer below us
342: break;
343: }
344: }
345:
346: /* -------------------------- Private methods ---------------------------- */
347:
348: protected final View makeView(Vector mbrs) {
349: Address coord;
350: long id;
351: ViewId view_id = new ViewId(local_addr);
352:
353: coord = view_id.getCoordAddress();
354: id = view_id.getId();
355: return new View(coord, id, mbrs);
356: }
357:
358: }
|