001: // $Id: CoordGmsImpl.java,v 1.9.10.1 2007/04/27 08:03:51 belaban Exp $
002:
003: package org.jgroups.protocols;
004:
005: import org.jgroups.Address;
006: import org.jgroups.Event;
007: import org.jgroups.View;
008: import org.jgroups.ViewId;
009: import org.jgroups.blocks.GroupRequest;
010: import org.jgroups.blocks.MethodCall;
011:
012: import java.util.Collections;
013: import java.util.HashSet;
014: import java.util.Iterator;
015: import java.util.Vector;
016:
017: public class CoordGmsImpl extends GmsImpl {
018: boolean leaving = false;
019: boolean received_last_view = false;
020: final Object leave_mutex = new Object();
021:
022: public CoordGmsImpl(GMS g) {
023: gms = g;
024: }
025:
026: public void init() {
027: leaving = false;
028: received_last_view = false;
029: }
030:
031: public void join(Address mbr) {
032: wrongMethod("join");
033: }
034:
035: /**
036: * The coordinator itself wants to leave the group
037: */
038: public void leave(Address mbr) {
039: if (mbr.equals(gms.local_addr))
040: leaving = true;
041:
042: handleLeave(mbr, false); // regular leave
043: synchronized (leave_mutex) {
044: if (leaving && received_last_view) // handleViewChange() has acquired leave_mutex before us...
045: return;
046: try {
047: leave_mutex.wait(gms.leave_timeout); // will be notified by handleViewChange()
048: } catch (Exception e) {
049: }
050: }
051: }
052:
053: public void suspect(Address mbr) {
054: handleSuspect(mbr);
055: }
056:
057: /**
058: * Invoked upon receiving a MERGE event from the MERGE layer. We have found a partition and
059: * should merge with them, then I will become a Participant.
060: *
061: * @param other_coords A list of other coordinators found. In the current implementation the list
062: * only has a single element
063: */
064: public void merge(Vector other_coords) {
065: View new_view;
066: Address other_coord = other_coords != null ? (Address) other_coords
067: .elementAt(0)
068: : null;
069:
070: if (log.isInfoEnabled())
071: log.info("other_coord = " + other_coord);
072: try {
073: MethodCall call = new MethodCall(
074: "handleMerge",
075: new Object[] { gms.view_id, gms.mbrs.getMembers() },
076: new String[] { ViewId.class.getName(),
077: Vector.class.getName() });
078: new_view = (View) gms.callRemoteMethod(other_coord, call,
079: GroupRequest.GET_ALL, 0);
080: } catch (Exception ex) {
081: if (log.isErrorEnabled())
082: log.error("timed out or was suspected");
083: return;
084: }
085: if (new_view == null) {
086: if (log.isWarnEnabled())
087: log.warn("received a Merge Denied");
088: gms.passDown(new Event(Event.MERGE_DENIED));
089: return; //Merge denied
090: }
091:
092: //Flushing my old view
093: gms.flush(gms.mbrs.getMembers(), null);
094: MethodCall call = new MethodCall(
095: "handleViewChange",
096: new Object[] { new_view.getVid(), new_view.getMembers() },
097: new String[] { ViewId.class.getName(),
098: Vector.class.getName() });
099: gms.callRemoteMethods(gms.mbrs.getMembers(), call,
100: GroupRequest.GET_ALL, 0);
101: gms.becomeParticipant();
102: if (log.isInfoEnabled())
103: log.info("merge done");
104: }
105:
106: public synchronized boolean handleJoin(Address mbr) {
107: Vector new_mbrs = new Vector(1);
108:
109: if (log.isInfoEnabled())
110: log.info("received JOIN request from " + mbr);
111: if (gms.local_addr.equals(mbr)) {
112: if (log.isErrorEnabled())
113: log.error("cannot join myself !");
114: return false;
115: }
116: if (gms.mbrs.contains(mbr)) {
117: if (log.isWarnEnabled())
118: log.warn("member " + mbr + " already present !");
119: return true; // already joined
120: }
121:
122: new_mbrs.addElement(mbr);
123: gms.castViewChange(new_mbrs, null, null);
124: return true;
125: }
126:
127: /**
128: * Exclude <code>mbr</code> from the membership. If <code>suspected</code> is true, then
129: * this member crashed and therefore is forced to leave, otherwise it is leaving voluntarily.
130: */
131: public synchronized void handleLeave(Address mbr, boolean suspected) {
132: Vector v = new Vector(1); // contains either leaving mbrs or suspected mbrs
133: if (!gms.mbrs.contains(mbr)) {
134: if (log.isErrorEnabled())
135: log.error("mbr " + mbr + " is not a member !");
136: return;
137: }
138: v.addElement(mbr);
139: if (suspected)
140: gms.castViewChange(null, null, v);
141: else
142: gms.castViewChange(null, v, null);
143: }
144:
145: public void handleViewChange(ViewId new_view, Vector mbrs) {
146: if (leaving) {
147: if (mbrs.contains(gms.local_addr)) {
148: if (log.isWarnEnabled())
149: log
150: .warn("received view in which I'm still a member, cannot quit yet");
151: gms.installView(new_view, mbrs); // +++ modify
152: } else {
153: synchronized (leave_mutex) {
154: received_last_view = true;
155: leave_mutex.notifyAll();
156: }
157: }
158: return;
159: }
160: gms.installView(new_view, mbrs); // +++ modify
161: }
162:
163: /**
164: * Invoked by another coordinator that asks to merge its view with mine.
165: * I 'll be the new coordinator.
166: * We should flush our view, install a new view with all the members and
167: * return the new view that will be installed by the other coordinator before
168: * becoming a participant.
169: */
170: public synchronized View handleMerge(ViewId other_vid,
171: Vector other_mbrs) {
172:
173: if (log.isInfoEnabled())
174: log.info("other_vid=" + other_vid + " , other_mbrs="
175: + other_mbrs);
176:
177: //Check that the views are disjoint otherwire return null (means MERGE_DENIED)
178: for (Iterator i = other_mbrs.iterator(); i.hasNext();) {
179: if (gms.mbrs.contains((Address) i.next())) {
180: gms.passDown(new Event(Event.MERGE_DENIED));
181: return null;
182: }
183: }
184:
185: //Compute new View
186: ViewId vid = new ViewId(gms.local_addr, Math.max(other_vid
187: .getId() + 1, gms.ltime + 1));
188: HashSet members = new HashSet(gms.mbrs.getMembers());
189: members.addAll(other_mbrs);
190: Vector new_mbrs = new Vector(members);
191: Collections.sort(new_mbrs);
192: View new_view = new View(vid, new_mbrs);
193:
194: //Flush my view
195: gms.flush(gms.mbrs.getMembers(), null);
196:
197: //Install new view
198: MethodCall call = new MethodCall("handleViewChange",
199: new Object[] { vid, new_mbrs },
200: new String[] { ViewId.class.getName(),
201: Vector.class.getName() });
202: gms.callRemoteMethods(gms.mbrs.getMembers(), call,
203: GroupRequest.GET_ALL, 0);
204: return new_view;
205: }
206:
207: public void handleSuspect(Address mbr) {
208: if (mbr.equals(gms.local_addr)) {
209: if (log.isErrorEnabled())
210: log
211: .error("I am the coord and am suspected: am not quitting !");
212: return;
213: }
214: handleLeave(mbr, true); // irregular leave - forced
215: }
216:
217: }
|