001: // $Id: ClientGmsImpl.java,v 1.12.6.1 2007/04/27 08:03:51 belaban Exp $
002:
003: package org.jgroups.protocols;
004:
005: import org.jgroups.Address;
006: import org.jgroups.Event;
007: import org.jgroups.View;
008: import org.jgroups.ViewId;
009: import org.jgroups.blocks.GroupRequest;
010: import org.jgroups.blocks.MethodCall;
011: import org.jgroups.util.Util;
012:
013: import java.util.Enumeration;
014: import java.util.Hashtable;
015: import java.util.Vector;
016:
017: /**
018: * Client part of GMS. Whenever a new member wants to join a group, it starts in the CLIENT role.
019: * No multicasts to the group will be received and processed until the member has been joined and
020: * turned into a SERVER (either coordinator or participant, mostly just participant). This class
021: * only implements <code>Join</code> (called by clients who want to join a certain group, and
022: * <code>ViewChange</code> which is called by the coordinator that was contacted by this client, to
023: * tell the client what its initial membership is.
024: *
025: * @author Bela Ban
026: * @version $Revision: 1.12.6.1 $
027: */
028: public class ClientGmsImpl extends GmsImpl {
029: final Vector initial_mbrs = new Vector(7);
030: final Object view_installation_mutex = new Object();
031: boolean joined = false;
032:
033: public ClientGmsImpl(GMS g) {
034: gms = g;
035: }
036:
037: public void init() {
038: initial_mbrs.removeAllElements();
039: joined = false;
040: }
041:
042: /**
043: * Will generate a CONNECT_OK event. Determines the coordinator and sends a unicast
044: * join() message to it. If successful, we wait for a ViewChange (can time out).
045: * If view change is received, impl is changed to an instance of ParticipantGmsImpl.
046: * Otherwise, we continue trying to send join() messages to the coordinator,
047: * until we succeed (or there is no member in the group. In this case, we create
048: * our own singleton group).
049: * <p>When GMS.disable_initial_coord is set to true, then we won't become coordinator on receiving an initial
050: * membership of 0, but instead will retry (forever) until we get an initial membership of > 0.
051: *
052: * @param mbr Our own address (assigned through SET_LOCAL_ADDRESS)
053: */
054: public void join(Address mbr) {
055: Address coord;
056: Event view_evt;
057:
058: while (!joined) {
059: findInitialMembers();
060: if (joined) {
061: if (log.isInfoEnabled())
062: log.info("joined successfully");
063: return;
064: }
065: if (initial_mbrs.size() == 0) {
066: if (gms.disable_initial_coord) {
067: if (log.isInfoEnabled())
068: log
069: .info("received an initial membership of 0, but "
070: + "cannot become coordinator (disable_initial_coord="
071: + gms.disable_initial_coord
072: + "), will retry fetching the initial membership");
073: continue;
074: }
075: joined = true;
076: gms.view_id = new ViewId(mbr); // create singleton view with mbr as only member
077: gms.mbrs.add(mbr);
078: view_evt = new Event(Event.VIEW_CHANGE, GMS.makeView(
079: gms.mbrs.getMembers(), gms.view_id));
080: gms.passDown(view_evt);
081: gms.passUp(view_evt);
082: gms.becomeCoordinator();
083:
084: gms.passUp(new Event(Event.BECOME_SERVER));
085: gms.passDown(new Event(Event.BECOME_SERVER));
086: if (log.isInfoEnabled())
087: log.info("created group (first member)");
088: break;
089: }
090:
091: coord = determineCoord(initial_mbrs);
092: if (coord == null) {
093: if (log.isWarnEnabled())
094: log
095: .warn("could not determine coordinator from responses "
096: + initial_mbrs);
097: continue;
098: }
099:
100: synchronized (view_installation_mutex) {
101: try {
102: if (log.isInfoEnabled())
103: log.info("sending handleJoin() to " + coord);
104: MethodCall call = new MethodCall("handleJoin",
105: new Object[] { mbr },
106: new Class[] { Address.class });
107: gms.callRemoteMethod(coord, call,
108: GroupRequest.GET_NONE, 0);
109: view_installation_mutex.wait(gms.join_timeout); // wait for view -> handleView()
110: } catch (Exception e) {
111: if (log.isErrorEnabled())
112: log.error("exception is " + e);
113: continue;
114: }
115: } // end synchronized
116:
117: if (joined) {
118: if (log.isInfoEnabled())
119: log.info("joined successfully");
120: return; // --> SUCCESS
121: } else {
122: if (log.isInfoEnabled())
123: log.info("failed, retrying");
124: Util.sleep(gms.join_retry_timeout);
125: }
126:
127: } // end while
128: }
129:
130: public void leave(Address mbr) {
131: wrongMethod("leave");
132: }
133:
134: public void suspect(Address mbr) {
135: // wrongMethod("suspect");
136: }
137:
138: public void merge(Vector other_coords) {
139: wrongMethod("merge");
140: }
141:
142: public boolean handleJoin(Address mbr) {
143: wrongMethod("handleJoin");
144: return false;
145: }
146:
147: /**
148: * Returns false. Clients don't handle leave() requests
149: */
150: public void handleLeave(Address mbr, boolean suspected) {
151: wrongMethod("handleLeave");
152: }
153:
154: /**
155: * Install the first view in which we are a member. This is essentially a confirmation
156: * of our JOIN request (see join() above).
157: */
158: public void handleViewChange(ViewId new_view, Vector mems) {
159: if (gms.local_addr != null && mems != null
160: && mems.contains(gms.local_addr)) {
161: synchronized (view_installation_mutex) { // wait until JOIN is sent (above)
162: joined = true;
163: view_installation_mutex.notifyAll();
164: gms.installView(new_view, mems);
165: gms.becomeParticipant();
166: gms.passUp(new Event(Event.BECOME_SERVER));
167: gms.passDown(new Event(Event.BECOME_SERVER));
168: }
169: synchronized (initial_mbrs) { // in case findInitialMembers() is still running:
170: initial_mbrs.notifyAll(); // this will unblock it
171: }
172: } else if (log.isWarnEnabled())
173: log.warn("am not member of " + mems
174: + ", will not install view");
175: }
176:
177: /**
178: * Returns immediately. Clients don't handle merge() requests
179: */
180: public View handleMerge(ViewId other_view, Vector other_members) {
181: wrongMethod("handleMerge");
182: return null;
183: }
184:
185: /**
186: * Returns immediately. Clients don't handle suspect() requests
187: */
188: public void handleSuspect(Address mbr) {
189: wrongMethod("handleSuspect");
190: }
191:
192: public boolean handleUpEvent(Event evt) {
193: Vector tmp;
194:
195: switch (evt.getType()) {
196:
197: case Event.FIND_INITIAL_MBRS_OK:
198: tmp = (Vector) evt.getArg();
199: synchronized (initial_mbrs) {
200: if (tmp != null && tmp.size() > 0)
201: for (int i = 0; i < tmp.size(); i++)
202: initial_mbrs.addElement(tmp.elementAt(i));
203: initial_mbrs.notifyAll();
204: }
205: return false; // don't pass up the stack
206: }
207:
208: return true;
209: }
210:
211: /* --------------------------- Private Methods ------------------------------------ */
212:
213: /**
214: * Pings initial members. Removes self before returning vector of initial members.
215: * Uses IP multicast or gossiping, depending on parameters.
216: */
217: void findInitialMembers() {
218: PingRsp ping_rsp;
219:
220: synchronized (initial_mbrs) {
221: initial_mbrs.removeAllElements();
222: gms.passDown(Event.FIND_INITIAL_MBRS_EVT);
223: if (initial_mbrs.size() == 0) {
224: try {
225: initial_mbrs.wait();
226: } catch (Exception e) {
227: }
228: }
229:
230: for (int i = 0; i < initial_mbrs.size(); i++) {
231: ping_rsp = (PingRsp) initial_mbrs.elementAt(i);
232: if (ping_rsp.own_addr != null && gms.local_addr != null
233: && ping_rsp.own_addr.equals(gms.local_addr)) {
234: initial_mbrs.removeElementAt(i);
235: break;
236: }
237: }
238: }
239: }
240:
241: /**
242: * The coordinator is determined by a majority vote. If there are an equal number of votes for
243: * more than 1 candidate, we determine the winner randomly.
244: */
245: Address determineCoord(Vector mbrs) {
246: PingRsp mbr;
247: Hashtable votes;
248: int count, most_votes;
249: Address winner = null, tmp;
250:
251: if (mbrs == null || mbrs.size() < 1)
252: return null;
253:
254: votes = new Hashtable(5);
255:
256: // count *all* the votes (unlike the 2000 election)
257: for (int i = 0; i < mbrs.size(); i++) {
258: mbr = (PingRsp) mbrs.elementAt(i);
259: if (mbr.coord_addr != null) {
260: if (!votes.containsKey(mbr.coord_addr))
261: votes.put(mbr.coord_addr, new Integer(1));
262: else {
263: count = ((Integer) votes.get(mbr.coord_addr))
264: .intValue();
265: votes.put(mbr.coord_addr, new Integer(count + 1));
266: }
267: }
268: }
269:
270: if (votes.size() > 1)
271: if (log.isWarnEnabled())
272: log
273: .warn("there was more than 1 candidate for coordinator: "
274: + votes);
275: else if (log.isInfoEnabled())
276: log.info("election results: " + votes);
277:
278: // determine who got the most votes
279: most_votes = 0;
280: for (Enumeration e = votes.keys(); e.hasMoreElements();) {
281: tmp = (Address) e.nextElement();
282: count = ((Integer) votes.get(tmp)).intValue();
283: if (count > most_votes) {
284: winner = tmp;
285: // fixed July 15 2003 (patch submitted by Darren Hobbs, patch-id=771418)
286: most_votes = count;
287: }
288: }
289: votes.clear();
290: return winner;
291: }
292:
293: }
|