// $Id: CoordGmsImpl.java,v 1.52.2.2 2007/04/27 08:03:55 belaban Exp $

package org.jgroups.protocols.pbcast;

import org.jgroups.*;
import org.jgroups.util.TimeScheduler;

import java.util.*;

/**
 * Coordinator role of the Group MemberShip (GMS) protocol. Accepts JOIN and LEAVE requests and emits view changes
 * accordingly.
 * @author Bela Ban
 */
public class CoordGmsImpl extends GmsImpl {
    private boolean merging = false;
    private final MergeTask merge_task = new MergeTask();
    private final Vector merge_rsps = new Vector(11);
    // for MERGE_REQ/MERGE_RSP correlation, contains MergeData elements
    private ViewId merge_id = null;

    private Address merge_leader = null;

    private MergeCanceller merge_canceller = null;

    private final Object merge_canceller_mutex = new Object();

    /** the max time in ms to suspend message garbage collection */
    private final Long MAX_SUSPEND_TIMEOUT = new Long(30000);

    public CoordGmsImpl(GMS g) {
        super(g);
    }

    private void setMergeId(ViewId merge_id) {
        this.merge_id = merge_id;
        synchronized (merge_canceller_mutex) {
            if (this.merge_id != null) {
                stopMergeCanceller();
                merge_canceller = new MergeCanceller(this.merge_id, gms.merge_timeout);
                gms.timer.add(merge_canceller);
            } else { // merge completed
                stopMergeCanceller();
            }
        }
    }

    private void stopMergeCanceller() {
        synchronized (merge_canceller_mutex) {
            if (merge_canceller != null) {
                merge_canceller.cancel();
                merge_canceller = null;
            }
        }
    }
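
    /* Illustrative sketch (not part of the original code): setMergeId() pairs each non-null merge_id with a
       MergeCanceller scheduled on the shared timer, so a merge that never completes is cancelled after
       gms.merge_timeout ms. Roughly:

           setMergeId(generateMergeId());   // schedules a MergeCanceller for this merge_id
           ...                              // merge runs; on completion or cancellation
           setMergeId(null);                // stops the canceller via stopMergeCanceller()

       The exact scheduling semantics depend on TimeScheduler; this only summarizes the pattern used here. */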

    public void init() throws Exception {
        super.init();
        cancelMerge();
    }

    public void join(Address mbr) {
        wrongMethod("join");
    }

    /** The coordinator itself wants to leave the group */
    public void leave(Address mbr) {
        if (mbr == null) {
            if (log.isErrorEnabled())
                log.error("member's address is null !");
            return;
        }
        if (mbr.equals(gms.local_addr))
            leaving = true;
        gms.getViewHandler().add(new GMS.Request(GMS.Request.LEAVE, mbr, false, null));
        gms.getViewHandler().stop(true); // wait until all requests have been processed, then close the queue and leave
        gms.getViewHandler().waitUntilCompleted(gms.leave_timeout);
    }

    public void handleJoinResponse(JoinRsp join_rsp) {
    }

    public void handleLeaveResponse() {
    }

    public void suspect(Address mbr) {
        if (mbr.equals(gms.local_addr)) {
            if (log.isWarnEnabled())
                log.warn("I am the coord and I'm being suspected -- will probably leave shortly");
            return;
        }
        Collection empty_set = new LinkedHashSet(0);
        Collection suspected = new LinkedHashSet(1);
        suspected.add(mbr);
        handleMembershipChange(empty_set, empty_set, suspected);
    }

    public void unsuspect(Address mbr) {
    }

    /**
     * Invoked upon receiving a MERGE event from the MERGE layer. Starts the merge protocol.
     * See description of protocol in DESIGN.
     * @param other_coords A list of coordinators (including myself) found by the MERGE protocol
     */
    public void merge(Vector other_coords) {
        Membership tmp;

        if (merging) {
            if (log.isWarnEnabled())
                log.warn("merge already in progress, discarded MERGE event (I am " + gms.local_addr + ")");
            return;
        }
        merge_leader = null;
        if (other_coords == null) {
            if (log.isWarnEnabled())
                log.warn("list of other coordinators is null. Will not start merge.");
            return;
        }

        if (other_coords.size() <= 1) {
            if (log.isErrorEnabled())
                log.error("number of coordinators found is " + other_coords.size() + "; will not perform merge");
            return;
        }

        /* Establish deterministic order, so that coords can elect leader */
        tmp = new Membership(other_coords);
        tmp.sort();
        merge_leader = (Address) tmp.elementAt(0);
        // if(log.isDebugEnabled()) log.debug("coordinators in merge protocol are: " + tmp);
        if (merge_leader.equals(gms.local_addr) || gms.merge_leader) {
            if (log.isTraceEnabled())
                log.trace("I (" + gms.local_addr + ") will be the leader. Starting the merge task");
            startMergeTask(other_coords);
        } else {
            if (log.isTraceEnabled())
                log.trace("I (" + gms.local_addr + ") am not the merge leader, "
                        + "waiting for merge leader (" + merge_leader + ") to initiate merge");
        }
    }
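
    /* Overview (summary only, not part of the original source): once a merge leader has been elected above,
       the message flow between subgroup coordinators roughly looks like this:

           leader                            other subgroup coordinators
           ------                            ----------------------------
           MERGE_REQ(merge_id)       --->    handleMergeRequest(): suspend view handler, reply
                                     <---    MERGE_RSP(view, digest) or MERGE_RSP(merge_rejected=true)
           consolidate views/digests
           INSTALL_MERGE_VIEW        --->    handleMergeView(): cast the MergeView to the own subgroup
           (or CANCEL_MERGE          --->    handleMergeCancelled(): resume view handler)

       The authoritative description is in the DESIGN document referenced in the javadoc above. */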

    /**
     * Get the view and digest and send back both (MergeData) in the form of a MERGE_RSP to the sender.
     * If a merge is already in progress, send back a MergeData with the merge_rejected field set to true.
     */
    public void handleMergeRequest(Address sender, ViewId merge_id) {
        Digest digest;
        View view;

        if (sender == null) {
            if (log.isErrorEnabled())
                log.error("sender == null; cannot send back a response");
            return;
        }
        if (merging) {
            if (log.isErrorEnabled())
                log.error("merge already in progress");
            sendMergeRejectedResponse(sender, merge_id);
            return;
        }
        merging = true;

        /* Clears the view handler queue and discards all JOIN/LEAVE/MERGE requests until after the MERGE */
        gms.getViewHandler().suspend(merge_id);

        setMergeId(merge_id);
        if (log.isDebugEnabled())
            log.debug("sender=" + sender + ", merge_id=" + merge_id);
        digest = gms.getDigest();
        view = new View(gms.view_id.copy(), gms.members.getMembers());
        gms.passDown(new Event(Event.ENABLE_UNICASTS_TO, sender));
        sendMergeResponse(sender, view, digest);
    }

    private MergeData getMergeResponse(Address sender, ViewId merge_id) {
        Digest digest;
        View view;
        MergeData retval;

        if (sender == null) {
            if (log.isErrorEnabled())
                log.error("sender == null; cannot send back a response");
            return null;
        }
        if (merging) {
            if (log.isErrorEnabled())
                log.error("merge already in progress");
            retval = new MergeData(sender, null, null);
            retval.merge_rejected = true;
            return retval;
        }
        merging = true;
        setMergeId(merge_id);
        if (log.isDebugEnabled())
            log.debug("sender=" + sender + ", merge_id=" + merge_id);

        try {
            digest = gms.getDigest();
            view = new View(gms.view_id.copy(), gms.members.getMembers());
            retval = new MergeData(sender, view, digest);
            retval.view = view;
            retval.digest = digest;
        } catch (NullPointerException null_ex) {
            return null;
        }
        return retval;
    }

    public void handleMergeResponse(MergeData data, ViewId merge_id) {
        if (data == null) {
            if (log.isErrorEnabled())
                log.error("merge data is null");
            return;
        }
        if (merge_id == null || this.merge_id == null) {
            if (log.isErrorEnabled())
                log.error("merge_id (" + merge_id + ") or this.merge_id (" + this.merge_id
                        + ") is null (sender=" + data.getSender() + ").");
            return;
        }

        if (!this.merge_id.equals(merge_id)) {
            if (log.isErrorEnabled())
                log.error("this.merge_id (" + this.merge_id + ") is different from merge_id (" + merge_id + ')');
            return;
        }

        synchronized (merge_rsps) {
            if (!merge_rsps.contains(data)) {
                merge_rsps.addElement(data);
                merge_rsps.notifyAll();
            }
        }
    }

    /**
     * If merge_id is not equal to this.merge_id then discard.
     * Else cast the view/digest to all members of this group.
     */
    public void handleMergeView(MergeData data, ViewId merge_id) {
        if (merge_id == null || this.merge_id == null || !this.merge_id.equals(merge_id)) {
            if (log.isErrorEnabled())
                log.error("merge_ids don't match (or are null); merge view discarded");
            return;
        }
        java.util.List my_members = gms.view != null ? gms.view.getMembers() : null;

        // only send to our *current* members, if we have A and B being merged (we are B), then we would *not*
        // receive a VIEW_ACK from A because A doesn't see us in the pre-merge view yet and discards the view

        GMS.Request req = new GMS.Request(GMS.Request.VIEW);
        req.view = data.view;
        req.digest = data.digest;
        req.target_members = my_members;
        gms.getViewHandler().add(req, true, // at head so it is processed next
                true); // un-suspend the queue
        merging = false;
    }

    public void handleMergeCancelled(ViewId merge_id) {
        if (merge_id != null && this.merge_id != null && this.merge_id.equals(merge_id)) {
            if (log.isDebugEnabled())
                log.debug("merge was cancelled (merge_id=" + merge_id + ", local_addr=" + gms.local_addr + ")");
            setMergeId(null);
            this.merge_leader = null;
            merging = false;
            gms.getViewHandler().resume(merge_id);
        }
    }

    private void cancelMerge() {
        Object tmp = merge_id;
        if (merge_id != null && log.isDebugEnabled())
            log.debug("cancelling merge (merge_id=" + merge_id + ')');
        setMergeId(null);
        this.merge_leader = null;
        stopMergeTask();
        merging = false;
        synchronized (merge_rsps) {
            merge_rsps.clear();
        }
        gms.getViewHandler().resume(tmp);
    }

    /**
     * Computes the new view (including the newly joined member) and gets the digest from PBCAST.
     * Returns both in the form of a JoinRsp
     */
    /*private synchronized void handleJoin(Address mbr) {
        View v;
        Digest d, tmp;
        JoinRsp join_rsp;

        if(mbr == null) {
            if(log.isErrorEnabled()) log.error("mbr is null");
            return;
        }
        if(gms.local_addr.equals(mbr)) {
            if(log.isErrorEnabled()) log.error("cannot join myself !");
            return;
        }
        if(log.isDebugEnabled()) log.debug("mbr=" + mbr);
        if(gms.members.contains(mbr)) { // already joined: return current digest and membership
            if(log.isWarnEnabled())
                log.warn(mbr + " already present; returning existing view " + gms.view);
            join_rsp=new JoinRsp(new View(gms.view_id, gms.members.getMembers()), gms.getDigest());
            sendJoinResponse(join_rsp, mbr);
            return;
        }

        try {
            // we cannot garbage collect during joining a new member *if* we're the only member
            // Example: {A}, B joins, after returning JoinRsp to B, A garbage collects messages higher than those in the
            // digest returned to the client, so the client will *not* be able to ask for retransmission of those
            // messages if he misses them
            gms.passDown(new Event(Event.SUSPEND_STABLE, MAX_SUSPEND_TIMEOUT));
            Vector new_mbrs=new Vector(1);
            new_mbrs.addElement(mbr);
            tmp=gms.getDigest(); // get existing digest
            if(tmp == null) {
                if(log.isErrorEnabled()) log.error("received null digest from GET_DIGEST: will cause JOIN to fail");
                return;
            }

            d=new Digest(tmp.size() + 1); // create a new digest, which contains 1 more member
            d.add(tmp); // add the existing digest to the new one
            d.add(mbr, 0, 0); // ... and add the new member. its first seqno will be 1
            v=gms.getNextView(new_mbrs, null, null);
            if(log.isDebugEnabled()) log.debug("joined member " + mbr + ", view is " + v);
            join_rsp=new JoinRsp(v, d);

            // 2. Send down a local TMP_VIEW event. This is needed by certain layers (e.g. NAKACK) to compute correct digest
            // in case client's next request (e.g. getState()) reaches us *before* our own view change multicast.
            // Check NAKACK's TMP_VIEW handling for details
            if(join_rsp.getView() != null)
                gms.passDown(new Event(Event.TMP_VIEW, join_rsp.getView()));

            Vector tmp_mbrs=join_rsp.getView() != null? new Vector(join_rsp.getView().getMembers()) : null;

            if(gms.use_flush) {

                // 3a. FLUSH protocol is in use. First we FLUSH current members. Then we send a
                // view to the joining member and we will wait for his ACK together with view
                // ACKs from current members (castViewChangeWithDest). After all ACKs have been
                // collected, FLUSH is stopped (below in finally clause) and thus members are
                // allowed to send messages again.
                gms.startFlush(join_rsp.getView());
                sendJoinResponse(join_rsp, mbr);
                gms.castViewChangeWithDest(join_rsp.getView(), null, tmp_mbrs);
            }
            else {
                // 3b. Broadcast the new view
                // we'll multicast the new view first and only, when everyone has replied with a VIEW_ACK (or timeout),
                // send the JOIN_RSP back to the client. This prevents the client from sending multicast messages in
                // view V2 which may get dropped by existing members because they're still in view V1.
                // (http://jira.jboss.com/jira/browse/JGRP-235)

                if(tmp_mbrs != null)
                    tmp_mbrs.remove(mbr); // exclude the newly joined member from VIEW_ACKs

                gms.castViewChangeWithDest(join_rsp.getView(), null, tmp_mbrs);

                // 4. Return result to client
                sendJoinResponse(join_rsp, mbr);
            }

        }
        finally {
            if(gms.use_flush)
                gms.stopFlush();
            gms.passDown(new Event(Event.RESUME_STABLE));
        }
    }
    */

    /**
     * Exclude <code>mbr</code> from the membership. If <code>suspected</code> is true, then
     * this member crashed and therefore is forced to leave, otherwise it is leaving voluntarily.
     */
    /*private void handleLeave(Address mbr, boolean suspected) {
        View new_view=null;
        Vector v=new Vector(1);
        v.addElement(mbr);

        // contains either leaving mbrs or suspected mbrs
        if(log.isDebugEnabled()) log.debug("mbr=" + mbr);
        if(!gms.members.contains(mbr)) {
            if(log.isTraceEnabled()) log.trace("mbr " + mbr + " is not a member !");
            return;
        }

        if(gms.view_id == null) {
            // we're probably not the coord anymore (we just left ourselves), let someone else do it
            // (client will retry when it doesn't get a response)
            if(log.isDebugEnabled())
                log.debug("gms.view_id is null, I'm not the coordinator anymore (leaving=" + leaving +
                        "); the new coordinator will handle the leave request");
            return;
        }
        try {
            sendLeaveResponse(mbr); // send an ack to the leaving member
            if(suspected)
                new_view=gms.getNextView(null, null, v);
            else
                new_view=gms.getNextView(null, v, null);

            if(gms.use_flush) {
                gms.startFlush(new_view);
            }
            gms.castViewChange(new_view, null);
        }
        finally {
            if(gms.use_flush) {
                gms.stopFlush();
            }
        }
        if(leaving) {
            gms.passUp(new Event(Event.DISCONNECT_OK));
            gms.initState(); // in case connect() is called again
        }
    }*/

    public void handleMembershipChange(Collection new_mbrs, Collection leaving_mbrs, Collection suspected_mbrs) {
        if (new_mbrs == null)
            new_mbrs = new LinkedHashSet(0);
        if (suspected_mbrs == null)
            suspected_mbrs = new LinkedHashSet(0);
        if (leaving_mbrs == null)
            leaving_mbrs = new LinkedHashSet(0);
        boolean joining_mbrs = !new_mbrs.isEmpty();

        new_mbrs.remove(gms.local_addr); // remove myself - cannot join myself (already joined)

        if (gms.view_id == null) {
            // we're probably not the coord anymore (we just left ourselves), let someone else do it
            // (client will retry when it doesn't get a response)
            if (log.isDebugEnabled())
                log.debug("gms.view_id is null, I'm not the coordinator anymore (leaving=" + leaving
                        + "); the new coordinator will handle the leave request");
            return;
        }

        Vector current_members = gms.members.getMembers();
        leaving_mbrs.retainAll(current_members); // remove all elements of leaving_mbrs which are not current members
        if (suspected_mbrs.remove(gms.local_addr)) {
            if (log.isWarnEnabled())
                log.warn("I am the coord and I'm being suspected -- will probably leave shortly");
        }
        suspected_mbrs.retainAll(current_members); // remove all elements of suspected_mbrs which are not current members

        // for the members that have already joined, return the current digest and membership
        for (Iterator it = new_mbrs.iterator(); it.hasNext();) {
            Address mbr = (Address) it.next();
            if (gms.members.contains(mbr)) { // already joined: return current digest and membership
                if (log.isWarnEnabled())
                    log.warn(mbr + " already present; returning existing view " + gms.view);
                JoinRsp join_rsp = new JoinRsp(new View(gms.view_id, gms.members.getMembers()), gms.getDigest());
                sendJoinResponse(join_rsp, mbr);
                it.remove();
            }
        }

        if (new_mbrs.isEmpty() && leaving_mbrs.isEmpty() && suspected_mbrs.isEmpty()) {
            if (log.isTraceEnabled())
                log.trace("found no members to add or remove, will not create new view");
            return;
        }

        JoinRsp join_rsp = null;
        View new_view = gms.getNextView(new_mbrs, leaving_mbrs, suspected_mbrs);
        if (log.isDebugEnabled())
            log.debug("new=" + new_mbrs + ", suspected=" + suspected_mbrs + ", leaving=" + leaving_mbrs
                    + ", new view: " + new_view);
        try {

            // we cannot garbage collect during joining a new member *if* we're the only member
            // Example: {A}, B joins, after returning JoinRsp to B, A garbage collects messages higher than those
            // in the digest returned to the client, so the client will *not* be able to ask for retransmission
            // of those messages if he misses them
            if (joining_mbrs) {
                gms.passDown(new Event(Event.SUSPEND_STABLE, MAX_SUSPEND_TIMEOUT));
                Digest d = null, tmp = gms.getDigest(); // get existing digest
                if (tmp == null)
                    log.error("received null digest from GET_DIGEST: will cause JOIN to fail");
                else {
                    // create a new digest, which contains the new members
                    d = new Digest(tmp.size() + new_mbrs.size());
                    d.add(tmp); // add the existing digest to the new one
                    for (Iterator i = new_mbrs.iterator(); i.hasNext();)
                        d.add((Address) i.next(), 0, 0); // ... and add the new members. their first seqno will be 1
                }
                join_rsp = new JoinRsp(new_view, d);
            }

            sendLeaveResponses(leaving_mbrs); // no-op if no leaving members

            // Send down a local TMP_VIEW event. This is needed by certain layers (e.g. NAKACK) to compute correct digest
            // in case client's next request (e.g. getState()) reaches us *before* our own view change multicast.
            // Check NAKACK's TMP_VIEW handling for details
            if (new_view != null)
                gms.passDown(new Event(Event.TMP_VIEW, new_view));

            Vector tmp_mbrs = new_view != null ? new Vector(new_view.getMembers()) : null;
            if (gms.use_flush) {
                // First we flush current members. Then we send a view to all joining members and we wait for their ACKs
                // together with ACKs from current members. After all ACKs have been collected, FLUSH is stopped
                // (below in finally clause) and members are allowed to send messages again
                boolean successfulFlush = gms.startFlush(new_view, 3);
                if (successfulFlush) {
                    log.info("Successful GMS flush by coordinator at " + gms.getLocalAddress());
                }
                sendJoinResponses(join_rsp, new_mbrs); // might be a no-op if no joining members
                gms.castViewChangeWithDest(new_view, null, tmp_mbrs);
            } else {
                if (tmp_mbrs != null) // exclude the newly joined members from VIEW_ACKs
                    tmp_mbrs.removeAll(new_mbrs);
                // Broadcast the new view
                // we'll multicast the new view first and only, when everyone has replied with a VIEW_ACK (or timeout),
                // send the JOIN_RSP back to the client. This prevents the client from sending multicast messages in
                // view V2 which may get dropped by existing members because they're still in view V1.
                // (http://jira.jboss.com/jira/browse/JGRP-235)
                gms.castViewChangeWithDest(new_view, null, tmp_mbrs);
                sendJoinResponses(join_rsp, new_mbrs); // Return result to newly joined clients (if there are any)
            }
        } finally {
            if (joining_mbrs)
                gms.passDown(new Event(Event.RESUME_STABLE));
            if (gms.use_flush)
                gms.stopFlush(new_view);
            if (leaving) {
                gms.passUp(new Event(Event.DISCONNECT_OK));
                gms.initState(); // in case connect() is called again
            }
        }
    }

    /**
     * Called by the GMS when a VIEW is received.
     * @param new_view The view to be installed
     * @param digest   If view is a MergeView, digest contains the seqno digest of all members and has to
     *                 be set by GMS
     */
    public void handleViewChange(View new_view, Digest digest) {
        Vector mbrs = new_view.getMembers();
        if (log.isDebugEnabled()) {
            if (digest != null)
                log.debug("view=" + new_view + ", digest=" + digest);
            else
                log.debug("view=" + new_view);
        }

        if (leaving && !mbrs.contains(gms.local_addr))
            return;
        gms.installView(new_view, digest);
    }

    public void handleExit() {
        cancelMerge();
    }

    public void stop() {
        super.stop(); // sets leaving=false
        stopMergeTask();
    }

    /* ------------------------------------------ Private methods ----------------------------------------- */

    void startMergeTask(Vector coords) {
        synchronized (merge_task) {
            merge_task.start(coords);
        }
    }

    void stopMergeTask() {
        synchronized (merge_task) {
            merge_task.stop();
        }
    }

    private void sendJoinResponses(JoinRsp rsp, Collection c) {
        if (c != null && rsp != null) {
            for (Iterator it = c.iterator(); it.hasNext();) {
                sendJoinResponse(rsp, (Address) it.next());
            }
        }
    }

    private void sendJoinResponse(JoinRsp rsp, Address dest) {
        Message m = new Message(dest, null, null);
        GMS.GmsHeader hdr = new GMS.GmsHeader(GMS.GmsHeader.JOIN_RSP, rsp);
        m.putHeader(gms.getName(), hdr);
        gms.passDown(new Event(Event.MSG, m));
    }

    private void sendLeaveResponses(Collection c) {
        for (Iterator i = c.iterator(); i.hasNext();) {
            Message msg = new Message((Address) i.next(), null, null); // send an ack to the leaving member
            GMS.GmsHeader hdr = new GMS.GmsHeader(GMS.GmsHeader.LEAVE_RSP);
            msg.putHeader(gms.getName(), hdr);
            gms.passDown(new Event(Event.MSG, msg));
        }
    }

    /**
     * Sends a MERGE_REQ to all coords and populates a list of MergeData (in merge_rsps). Returns after coords.size()
     * responses have been received, or timeout msecs have elapsed (whichever comes first).<p>
     * If a subgroup coordinator rejects the MERGE_REQ (e.g. because of participation in a different merge),
     * <em>that member will be removed from coords !</em>
     * @param coords A list of Addresses of subgroup coordinators (including myself)
     * @param timeout Max number of msecs to wait for the merge responses from the subgroup coords
     */
    private void getMergeDataFromSubgroupCoordinators(Vector coords, long timeout) {
        Message msg;
        GMS.GmsHeader hdr;

        long curr_time, time_to_wait, end_time, start, stop;
        int num_rsps_expected;

        if (coords == null || coords.size() <= 1) {
            if (log.isErrorEnabled())
                log.error("coords == null or size <= 1");
            return;
        }

        start = System.currentTimeMillis();
        MergeData tmp;
        synchronized (merge_rsps) {
            merge_rsps.removeAllElements();
            if (log.isDebugEnabled())
                log.debug("sending MERGE_REQ to " + coords);
            Address coord;
            for (int i = 0; i < coords.size(); i++) {
                coord = (Address) coords.elementAt(i);
                if (gms.local_addr != null && gms.local_addr.equals(coord)) {
                    tmp = getMergeResponse(gms.local_addr, merge_id);
                    if (tmp != null)
                        merge_rsps.add(tmp);
                    continue;
                }

                // this allows UNICAST to remove coord from previous_members in case of a merge
                gms.passDown(new Event(Event.ENABLE_UNICASTS_TO, coord));

                msg = new Message(coord, null, null);
                hdr = new GMS.GmsHeader(GMS.GmsHeader.MERGE_REQ);
                hdr.mbr = gms.local_addr;
                hdr.merge_id = merge_id;
                msg.putHeader(gms.getName(), hdr);
                gms.passDown(new Event(Event.MSG, msg));
            }

            // wait until we have received num_rsps_expected responses or the timeout has elapsed
            num_rsps_expected = coords.size();
            curr_time = System.currentTimeMillis();
            end_time = curr_time + timeout;
            while (end_time > curr_time) {
                time_to_wait = end_time - curr_time;
                if (log.isDebugEnabled())
                    log.debug("waiting " + time_to_wait + " msecs for merge responses");
                if (merge_rsps.size() < num_rsps_expected) {
                    try {
                        merge_rsps.wait(time_to_wait);
                    } catch (Exception ex) {
                    }
                }
                if (log.isDebugEnabled())
                    log.debug("num_rsps_expected=" + num_rsps_expected + ", actual responses=" + merge_rsps.size());

                if (merge_rsps.size() >= num_rsps_expected)
                    break;
                curr_time = System.currentTimeMillis();
            }
            stop = System.currentTimeMillis();
            if (log.isTraceEnabled())
                log.trace("collected " + merge_rsps.size() + " merge response(s) in " + (stop - start) + "ms");
        }
    }

    /**
     * Generates a unique merge id by taking the local address and the current time
     */
    private ViewId generateMergeId() {
        return new ViewId(gms.local_addr, System.currentTimeMillis()); // we're (ab)using ViewId as a merge id
    }

    /**
     * Merge all MergeData. All MergeData elements should be disjoint (both views and digests). However,
     * this method is prepared to resolve duplicate entries (for the same member). The resolution strategy for
     * views is to include each duplicate member only once; the resolution strategy for digests is to take the
     * higher seqnos for duplicate digests.<p>
     * After merging all members into a Membership and subsequent sorting, the first member of the sorted membership
     * will be the new coordinator.
     * @param v A list of MergeData items. Elements with merge_rejected=true were removed before. Is guaranteed
     *          not to be null and to contain at least 1 member.
     */
    private MergeData consolidateMergeData(Vector v) {
        MergeData ret;
        MergeData tmp_data;
        long logical_time = 0; // for new_vid
        ViewId new_vid, tmp_vid;
        MergeView new_view;
        View tmp_view;
        Membership new_mbrs = new Membership();
        int num_mbrs;
        Digest new_digest;
        Address new_coord;
        Vector subgroups = new Vector(11);
        // contains a list of Views, each View is a subgroup

        for (int i = 0; i < v.size(); i++) {
            tmp_data = (MergeData) v.elementAt(i);
            if (log.isDebugEnabled())
                log.debug("merge data is " + tmp_data);
            tmp_view = tmp_data.getView();
            if (tmp_view != null) {
                tmp_vid = tmp_view.getVid();
                if (tmp_vid != null) {
                    // compute the new view id (max of all vids + 1)
                    logical_time = Math.max(logical_time, tmp_vid.getId());
                }
                // merge all membership lists into one (prevent duplicates)
                new_mbrs.add(tmp_view.getMembers());
                subgroups.addElement(tmp_view.clone());
            }
        }

        // the new coordinator is the first member of the consolidated & sorted membership list
        new_mbrs.sort();
        num_mbrs = new_mbrs.size();
        new_coord = num_mbrs > 0 ? (Address) new_mbrs.elementAt(0) : null;
        if (new_coord == null) {
            if (log.isErrorEnabled())
                log.error("new_coord == null");
            return null;
        }
        // should be the highest view ID seen up to now plus 1
        new_vid = new ViewId(new_coord, logical_time + 1);

        // determine the new view
        new_view = new MergeView(new_vid, new_mbrs.getMembers(), subgroups);
        if (log.isDebugEnabled())
            log.debug("new merged view will be " + new_view);

        // determine the new digest
        new_digest = consolidateDigests(v, num_mbrs);
        if (new_digest == null) {
            if (log.isErrorEnabled())
                log.error("digest could not be consolidated");
            return null;
        }
        if (log.isDebugEnabled())
            log.debug("consolidated digest=" + new_digest);
        ret = new MergeData(gms.local_addr, new_view, new_digest);
        return ret;
    }
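
    /* Worked example (hypothetical addresses and view ids, for illustration only): given merge_rsps containing
       two subgroup views
           V5(A)={A,B}   and   V7(C)={C,D}
       consolidateMergeData() collects and sorts the combined membership {A,B,C,D}, picks its first member (say A)
       as the new coordinator, takes max(5,7)+1 = 8 as the new logical time, and produces
           MergeView V8(A)={A,B,C,D}, subgroups=[V5(A)={A,B}, V7(C)={C,D}]
       together with the consolidated digest computed by consolidateDigests() below. */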

    /**
     * Merge all digests into one. For each sender, the new value is min(low_seqno), max(high_seqno),
     * max(high_seqno_seen)
     */
    private Digest consolidateDigests(Vector v, int num_mbrs) {
        MergeData data;
        Digest tmp_digest, retval = new Digest(num_mbrs);

        for (int i = 0; i < v.size(); i++) {
            data = (MergeData) v.elementAt(i);
            tmp_digest = data.getDigest();
            if (tmp_digest == null) {
                if (log.isErrorEnabled())
                    log.error("tmp_digest == null; skipping");
                continue;
            }
            retval.merge(tmp_digest);
        }
        return retval;
    }
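
    /* Worked example (hypothetical seqnos, for illustration only), assuming Digest.merge() applies the rule
       documented above - min(low_seqno), max(high_seqno), max(high_seqno_seen) per sender:
           digest 1: A[0,10,10], B[0,7,7]
           digest 2: A[2,15,14], C[0,3,3]
           merged  : A[0,15,14], B[0,7,7], C[0,3,3] */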

    /**
     * Sends the new view and digest to all subgroup coordinators in coords. Each coord will in turn
     * <ol>
     * <li>cast the new view and digest to all the members of its subgroup (MergeView)
     * <li>on reception of the view, if it is a MergeView, each member will set the digest and install
     * the new view
     * </ol>
     */
    private void sendMergeView(Vector coords, MergeData combined_merge_data) {
        Message msg;
        GMS.GmsHeader hdr;
        Address coord;
        View v;
        Digest d;

        if (coords == null || combined_merge_data == null)
            return;

        v = combined_merge_data.view;
        d = combined_merge_data.digest;
        if (v == null || d == null) {
            if (log.isErrorEnabled())
                log.error("view or digest is null, cannot send consolidated merge view/digest");
            return;
        }

        if (log.isTraceEnabled())
            log.trace("sending merge view " + v.getVid() + " to coordinators " + coords);

        for (int i = 0; i < coords.size(); i++) {
            coord = (Address) coords.elementAt(i);
            msg = new Message(coord, null, null);
            hdr = new GMS.GmsHeader(GMS.GmsHeader.INSTALL_MERGE_VIEW);
            hdr.view = v;
            hdr.my_digest = d;
            hdr.merge_id = merge_id;
            msg.putHeader(gms.getName(), hdr);
            gms.passDown(new Event(Event.MSG, msg));
        }
    }

    /**
     * Send back a response containing view and digest to sender
     */
    private void sendMergeResponse(Address sender, View view, Digest digest) {
        Message msg = new Message(sender, null, null);
        GMS.GmsHeader hdr = new GMS.GmsHeader(GMS.GmsHeader.MERGE_RSP);
        hdr.merge_id = merge_id;
        hdr.view = view;
        hdr.my_digest = digest;
        msg.putHeader(gms.getName(), hdr);
        if (log.isDebugEnabled())
            log.debug("response=" + hdr);
        gms.passDown(new Event(Event.MSG, msg));
    }

    private void sendMergeCancelledMessage(Vector coords, ViewId merge_id) {
        Message msg;
        GMS.GmsHeader hdr;
        Address coord;

        if (coords == null || merge_id == null) {
            if (log.isErrorEnabled())
                log.error("coords or merge_id == null");
            return;
        }
        for (int i = 0; i < coords.size(); i++) {
            coord = (Address) coords.elementAt(i);
            msg = new Message(coord, null, null);
            hdr = new GMS.GmsHeader(GMS.GmsHeader.CANCEL_MERGE);
            hdr.merge_id = merge_id;
            msg.putHeader(gms.getName(), hdr);
            gms.passDown(new Event(Event.MSG, msg));
        }
    }

    /** Removes rejected merge requests from merge_rsps and coords */
    private void removeRejectedMergeRequests(Vector coords) {
        MergeData data;
        for (Iterator it = merge_rsps.iterator(); it.hasNext();) {
            data = (MergeData) it.next();
            if (data.merge_rejected) {
                if (data.getSender() != null && coords != null)
                    coords.removeElement(data.getSender());
                it.remove();
                if (log.isDebugEnabled())
                    log.debug("removed element " + data);
            }
        }
    }

    /* --------------------------------------- End of Private methods ------------------------------------- */

    /**
     * Starts the merge protocol (only run by the merge leader). Essentially sends a MERGE_REQ to all
     * coordinators of all subgroups found. Each coord receives its digest and view and returns it.
     * The leader then computes the digest and view for the new group from the return values. Finally, it
     * sends this merged view/digest to all subgroup coordinators; each coordinator will install it in its
     * subgroup.
     */
    private class MergeTask implements Runnable {
        Thread t = null;
        Vector coords = null; // list of subgroup coordinators to be contacted

        public void start(Vector coords) {
            if (t == null || !t.isAlive()) {
                this.coords = (Vector) (coords != null ? coords.clone() : null);
                t = new Thread(this, "MergeTask");
                t.setDaemon(true);
                t.start();
            }
        }

        public void stop() {
            Thread tmp = t;
            if (isRunning()) {
                t = null;
                tmp.interrupt();
            }
            t = null;
            coords = null;
        }

        public boolean isRunning() {
            return t != null && t.isAlive();
        }

        /**
         * Runs the merge protocol as a leader
         */
        public void run() {
            MergeData combined_merge_data;

            if (merging) {
                if (log.isWarnEnabled())
                    log.warn("merge is already in progress, terminating");
                return;
            }

            if (log.isDebugEnabled())
                log.debug("merge task started, coordinators are " + this.coords);
            try {

                /* 1. Generate a merge_id that uniquely identifies the merge in progress */
                setMergeId(generateMergeId());

                /* 2. Fetch the current Views/Digests from all subgroup coordinators */
                getMergeDataFromSubgroupCoordinators(coords, gms.merge_timeout);

                /* 3. Remove rejected MergeData elements from merge_rsps and coords (so we'll send the new view only
                      to members who accepted the merge request) */
                removeRejectedMergeRequests(coords);

                if (merge_rsps.size() <= 1) {
                    if (log.isWarnEnabled())
                        log.warn("merge responses from subgroup coordinators <= 1 (" + merge_rsps + "). Cancelling merge");
                    sendMergeCancelledMessage(coords, merge_id);
                    return;
                }

                /* 4. Combine all views and digests into 1 View/1 Digest */
                combined_merge_data = consolidateMergeData(merge_rsps);
                if (combined_merge_data == null) {
                    if (log.isErrorEnabled())
                        log.error("combined_merge_data == null");
                    sendMergeCancelledMessage(coords, merge_id);
                    return;
                }

                /* 5. Don't allow JOINs or LEAVEs until we are done with the merge. Suspend() will clear the
                      view handler queue, so no requests beyond this current MERGE request will be processed */
                gms.getViewHandler().suspend(merge_id);

                /* 6. Send the new View/Digest to all coordinators (including myself). On reception, they will
                      install the digest and view in all of their subgroup members */
                sendMergeView(coords, combined_merge_data);
            } catch (Throwable ex) {
                if (log.isErrorEnabled())
                    log.error("exception while merging", ex);
            } finally {
                sendMergeCancelledMessage(coords, merge_id);
                stopMergeCanceller(); // this is probably not necessary
                merging = false;
                merge_leader = null;
                if (log.isDebugEnabled())
                    log.debug("merge task terminated");
                t = null;
            }
        }
    }

    private class MergeCanceller implements TimeScheduler.Task {
        private Object my_merge_id = null;
        private long timeout;
        private boolean cancelled = false;

        MergeCanceller(Object my_merge_id, long timeout) {
            this.my_merge_id = my_merge_id;
            this.timeout = timeout;
        }

        public boolean cancelled() {
            return cancelled;
        }

        public void cancel() {
            cancelled = true;
        }

        public long nextInterval() {
            return timeout;
        }

        public void run() {
            if (merge_id != null && my_merge_id.equals(merge_id)) {
                if (log.isTraceEnabled())
                    log.trace("cancelling merge due to timer timeout (" + timeout + " ms)");
                cancelMerge();
                cancelled = true;
            } else {
                if (log.isTraceEnabled())
                    log.trace("timer kicked in after " + timeout + " ms, but no (or different) merge was in progress: "
                            + "merge_id=" + merge_id + ", my_merge_id=" + my_merge_id);
            }
        }
    }

}