001: // $Id: ClientGmsImpl.java,v 1.34.2.2 2007/04/27 08:03:55 belaban Exp $
002:
003: package org.jgroups.protocols.pbcast;
004:
005: import org.jgroups.*;
006: import org.jgroups.protocols.PingRsp;
007: import org.jgroups.util.Promise;
008: import org.jgroups.util.Util;
009:
010: import java.util.*;
011:
012: /**
013: * Client part of GMS. Whenever a new member wants to join a group, it starts in the CLIENT role.
014: * No multicasts to the group will be received and processed until the member has been joined and
015: * turned into a SERVER (either coordinator or participant, mostly just participant). This class
016: * only implements <code>Join</code> (called by clients who want to join a certain group, and
017: * <code>ViewChange</code> which is called by the coordinator that was contacted by this client, to
018: * tell the client what its initial membership is.
019: * @author Bela Ban
020: * @version $Revision: 1.34.2.2 $
021: */
022: public class ClientGmsImpl extends GmsImpl {
023: private final Vector initial_mbrs = new Vector(11);
024: private boolean initial_mbrs_received = false;
025: private final Promise join_promise = new Promise();
026:
027: public ClientGmsImpl(GMS g) {
028: super (g);
029: }
030:
031: public void init() throws Exception {
032: super .init();
033: synchronized (initial_mbrs) {
034: initial_mbrs.clear();
035: initial_mbrs_received = false;
036: }
037: join_promise.reset();
038: }
039:
040: /**
041: * Joins this process to a group. Determines the coordinator and sends a unicast
042: * handleJoin() message to it. The coordinator returns a JoinRsp and then broadcasts the new view, which
043: * contains a message digest and the current membership (including the joiner). The joiner is then
044: * supposed to install the new view and the digest and starts accepting mcast messages. Previous
045: * mcast messages were discarded (this is done in PBCAST).<p>
046: * If successful, impl is changed to an instance of ParticipantGmsImpl.
047: * Otherwise, we continue trying to send join() messages to the coordinator,
048: * until we succeed (or there is no member in the group. In this case, we create our own singleton group).
049: * <p>When GMS.disable_initial_coord is set to true, then we won't become coordinator on receiving an initial
050: * membership of 0, but instead will retry (forever) until we get an initial membership of > 0.
051: * @param mbr Our own address (assigned through SET_LOCAL_ADDRESS)
052: */
053: public void join(Address mbr) {
054: Address coord;
055: JoinRsp rsp;
056: Digest tmp_digest;
057: View tmp_view;
058: leaving = false;
059:
060: join_promise.reset();
061: while (!leaving) {
062: findInitialMembers();
063: if (log.isDebugEnabled())
064: log.debug("initial_mbrs are " + initial_mbrs);
065: if (initial_mbrs.size() == 0) {
066: if (gms.disable_initial_coord) {
067: if (log.isTraceEnabled())
068: log
069: .trace("received an initial membership of 0, but cannot become coordinator "
070: + "(disable_initial_coord=true), will retry fetching the initial membership");
071: continue;
072: }
073: if (log.isDebugEnabled())
074: log
075: .debug("no initial members discovered: creating group as first member");
076: becomeSingletonMember(mbr);
077: return;
078: }
079:
080: coord = determineCoord(initial_mbrs);
081: if (coord == null) { // e.g. because we have all clients only
082: if (gms.handle_concurrent_startup == false) {
083: if (log.isTraceEnabled())
084: log
085: .trace("handle_concurrent_startup is false; ignoring responses of initial clients");
086: becomeSingletonMember(mbr);
087: return;
088: }
089:
090: if (log.isTraceEnabled())
091: log
092: .trace("could not determine coordinator from responses "
093: + initial_mbrs);
094:
095: // so the member to become singleton member (and thus coord) is the first of all clients
096: Set clients = new TreeSet(); // sorted
097: clients.add(mbr); // add myself again (was removed by findInitialMembers())
098: for (int i = 0; i < initial_mbrs.size(); i++) {
099: PingRsp pingRsp = (PingRsp) initial_mbrs
100: .elementAt(i);
101: Address client_addr = pingRsp.getAddress();
102: if (client_addr != null)
103: clients.add(client_addr);
104: }
105: if (log.isTraceEnabled())
106: log.trace("clients to choose new coord from are: "
107: + clients);
108: Address new_coord = (Address) clients.iterator().next();
109: if (new_coord.equals(mbr)) {
110: if (log.isTraceEnabled())
111: log
112: .trace("I ("
113: + mbr
114: + ") am the first of the clients, will become coordinator");
115: becomeSingletonMember(mbr);
116: return;
117: } else {
118: if (log.isTraceEnabled())
119: log
120: .trace("I ("
121: + mbr
122: + ") am not the first of the clients, "
123: + "waiting for another client to become coordinator");
124: Util.sleep(500);
125: }
126: continue;
127: }
128:
129: try {
130: if (log.isDebugEnabled())
131: log.debug("sending handleJoin(" + mbr + ") to "
132: + coord);
133: sendJoinMessage(coord, mbr);
134: rsp = (JoinRsp) join_promise
135: .getResult(gms.join_timeout);
136:
137: if (rsp == null) {
138: if (log.isWarnEnabled())
139: log.warn("join(" + mbr + ") sent to " + coord
140: + " timed out, retrying");
141: } else {
142: // 1. check whether JOIN was rejected
143: String failure = rsp.getFailReason();
144: if (failure != null)
145: throw new SecurityException(failure);
146:
147: // 2. Install digest
148: tmp_digest = rsp.getDigest();
149: tmp_view = rsp.getView();
150: if (tmp_digest == null || tmp_view == null) {
151: if (log.isErrorEnabled())
152: log
153: .error("JoinRsp has a null view or digest: view="
154: + tmp_view
155: + ", digest="
156: + tmp_digest
157: + ", skipping it");
158: } else {
159: tmp_digest.incrementHighSeqno(coord); // see DESIGN for an explanantion
160: gms.setDigest(tmp_digest);
161:
162: if (log.isDebugEnabled())
163: log.debug("[" + gms.local_addr
164: + "]: JoinRsp=" + tmp_view
165: + " [size=" + tmp_view.size()
166: + "]\n\n");
167:
168: if (!installView(tmp_view)) {
169: if (log.isErrorEnabled())
170: log
171: .error("view installation failed, retrying to join group");
172: Util.sleep(gms.join_retry_timeout);
173: continue;
174: }
175:
176: // send VIEW_ACK to sender of view
177: Message view_ack = new Message(coord, null,
178: null);
179: GMS.GmsHeader tmphdr = new GMS.GmsHeader(
180: GMS.GmsHeader.VIEW_ACK, tmp_view);
181: view_ack.putHeader(GMS.name, tmphdr);
182: gms.passDown(new Event(Event.MSG, view_ack));
183:
184: gms.passUp(new Event(Event.BECOME_SERVER));
185: gms.passDown(new Event(Event.BECOME_SERVER));
186: return;
187: }
188: }
189: } catch (SecurityException security_ex) {
190: throw security_ex;
191: } catch (Throwable e) {
192: if (log.isDebugEnabled())
193: log.debug("exception=" + e + ", retrying");
194: }
195:
196: Util.sleep(gms.join_retry_timeout);
197: }
198: }
199:
200: public void leave(Address mbr) {
201: leaving = true;
202: wrongMethod("leave");
203: }
204:
205: public void handleJoinResponse(JoinRsp join_rsp) {
206: join_promise.setResult(join_rsp); // will wake up join() method
207: }
208:
209: public void handleLeaveResponse() {
210: }
211:
212: public void suspect(Address mbr) {
213: }
214:
215: public void unsuspect(Address mbr) {
216: }
217:
218: public void handleMembershipChange(Collection newMembers,
219: Collection leavingMembers, Collection suspectedMembers) {
220: }
221:
222: /**
223: * Does nothing. Discards all views while still client.
224: */
225: public synchronized void handleViewChange(View new_view,
226: Digest digest) {
227: if (log.isTraceEnabled())
228: log.trace("view " + new_view.getVid()
229: + " is discarded as we are not a participant");
230: }
231:
232: /**
233: * Called by join(). Installs the view returned by calling Coord.handleJoin() and
234: * becomes coordinator.
235: */
236: private boolean installView(View new_view) {
237: Vector mems = new_view.getMembers();
238: if (log.isDebugEnabled())
239: log.debug("new_view=" + new_view);
240: if (gms.local_addr == null || mems == null
241: || !mems.contains(gms.local_addr)) {
242: if (log.isErrorEnabled())
243: log.error("I (" + gms.local_addr
244: + ") am not member of " + mems
245: + ", will not install view");
246: return false;
247: }
248: gms.installView(new_view);
249: gms.becomeParticipant();
250: gms.passUp(new Event(Event.BECOME_SERVER));
251: gms.passDown(new Event(Event.BECOME_SERVER));
252: return true;
253: }
254:
255: /** Returns immediately. Clients don't handle suspect() requests */
256: // public void handleSuspect(Address mbr) {
257: // }
258:
259: public boolean handleUpEvent(Event evt) {
260: Vector tmp;
261:
262: switch (evt.getType()) {
263:
264: case Event.FIND_INITIAL_MBRS_OK:
265: tmp = (Vector) evt.getArg();
266: synchronized (initial_mbrs) {
267: if (tmp != null && tmp.size() > 0) {
268: initial_mbrs.addAll(tmp);
269: }
270: initial_mbrs_received = true;
271: initial_mbrs.notifyAll();
272: }
273: return false; // don't pass up the stack
274: }
275: return true;
276: }
277:
278: /* --------------------------- Private Methods ------------------------------------ */
279:
280: void sendJoinMessage(Address coord, Address mbr) {
281: Message msg;
282: GMS.GmsHeader hdr;
283:
284: msg = new Message(coord, null, null);
285: hdr = new GMS.GmsHeader(GMS.GmsHeader.JOIN_REQ, mbr);
286: msg.putHeader(gms.getName(), hdr);
287: gms.passDown(new Event(Event.MSG, msg));
288: }
289:
290: /**
291: * Pings initial members. Removes self before returning vector of initial members.
292: * Uses IP multicast or gossiping, depending on parameters.
293: */
294: void findInitialMembers() {
295: PingRsp ping_rsp;
296:
297: synchronized (initial_mbrs) {
298: initial_mbrs.removeAllElements();
299: initial_mbrs_received = false;
300: gms.passDown(new Event(Event.FIND_INITIAL_MBRS));
301:
302: // the initial_mbrs_received flag is needed when passDown() is executed on the same thread, so when
303: // it returns, a response might actually have been received (even though the initial_mbrs might still be empty)
304: if (initial_mbrs_received == false) {
305: try {
306: initial_mbrs.wait();
307: } catch (Exception e) {
308: }
309: }
310:
311: for (int i = 0; i < initial_mbrs.size(); i++) {
312: ping_rsp = (PingRsp) initial_mbrs.elementAt(i);
313: if (ping_rsp.own_addr != null && gms.local_addr != null
314: && ping_rsp.own_addr.equals(gms.local_addr)) {
315: initial_mbrs.removeElementAt(i);
316: break;
317: }
318: }
319: }
320: }
321:
322: /**
323: The coordinator is determined by a majority vote. If there are an equal number of votes for
324: more than 1 candidate, we determine the winner randomly.
325: */
326: private Address determineCoord(Vector mbrs) {
327: PingRsp mbr;
328: Hashtable votes;
329: int count, most_votes;
330: Address winner = null, tmp;
331:
332: if (mbrs == null || mbrs.size() < 1)
333: return null;
334:
335: votes = new Hashtable(5);
336:
337: // count *all* the votes (unlike the 2000 election)
338: for (int i = 0; i < mbrs.size(); i++) {
339: mbr = (PingRsp) mbrs.elementAt(i);
340: if (mbr.is_server && mbr.coord_addr != null) {
341: if (!votes.containsKey(mbr.coord_addr))
342: votes.put(mbr.coord_addr, new Integer(1));
343: else {
344: count = ((Integer) votes.get(mbr.coord_addr))
345: .intValue();
346: votes.put(mbr.coord_addr, new Integer(count + 1));
347: }
348: }
349: }
350:
351: if (votes.size() > 1) {
352: if (log.isWarnEnabled())
353: log
354: .warn("there was more than 1 candidate for coordinator: "
355: + votes);
356: } else {
357: if (log.isDebugEnabled())
358: log.debug("election results: " + votes);
359: }
360:
361: // determine who got the most votes
362: most_votes = 0;
363: for (Enumeration e = votes.keys(); e.hasMoreElements();) {
364: tmp = (Address) e.nextElement();
365: count = ((Integer) votes.get(tmp)).intValue();
366: if (count > most_votes) {
367: winner = tmp;
368: // fixed July 15 2003 (patch submitted by Darren Hobbs, patch-id=771418)
369: most_votes = count;
370: }
371: }
372: votes.clear();
373: return winner;
374: }
375:
376: void becomeSingletonMember(Address mbr) {
377: Digest initial_digest;
378: ViewId view_id;
379: Vector mbrs = new Vector(1);
380:
381: // set the initial digest (since I'm the first member)
382: initial_digest = new Digest(1); // 1 member (it's only me)
383: initial_digest.add(gms.local_addr, 0, 0); // initial seqno mcast by me will be 1 (highest seen +1)
384: gms.setDigest(initial_digest);
385:
386: view_id = new ViewId(mbr); // create singleton view with mbr as only member
387: mbrs.addElement(mbr);
388: gms.installView(new View(view_id, mbrs));
389: gms.becomeCoordinator(); // not really necessary - installView() should do it
390:
391: gms.passUp(new Event(Event.BECOME_SERVER));
392: gms.passDown(new Event(Event.BECOME_SERVER));
393: if (log.isDebugEnabled())
394: log.debug("created group (first member). My view is "
395: + gms.view_id + ", impl is "
396: + gms.getImpl().getClass().getName());
397: }
398:
399: }
|