0001: package org.jgroups.protocols.pbcast;
0002:
0003: import org.jgroups.*;
0004: import org.jgroups.stack.Protocol;
0005: import org.jgroups.util.*;
0006: import org.jgroups.util.Queue;
0007: import org.apache.commons.logging.Log;
0008:
0009: import java.io.*;
0010: import java.util.*;
0011: import java.util.List;
0012:
0013: /**
0014: * Group membership protocol. Handles joins/leaves/crashes (suspicions) and emits new views
0015: * accordingly. Use VIEW_ENFORCER on top of this layer to make sure new members don't receive
0016: * any messages until they are members
0017: * @author Bela Ban
0018: * @version $Id: GMS.java,v 1.68.2.5 2007/04/27 08:03:55 belaban Exp $
0019: */
0020: public class GMS extends Protocol {
/** Current role implementation (client, participant or coordinator); replaced via setImpl(). */
private GmsImpl impl = null;
/** Address of this member, taken from the SET_LOCAL_ADDRESS event in up(). */
Address local_addr = null;
final Membership members = new Membership(); // real membership (also used as the lock for view state)
private final Membership tmp_members = new Membership(); // base for computing next view

/** Members joined but for which no view has been received yet */
private final Vector joining = new Vector(7);

/** Members excluded from group, but for which no view has been received yet */
private final Vector leaving = new Vector(7);

/** Currently installed view; a MergeView is stored as a plain View copy (see installView()). */
View view = null;
/** Id of the currently installed view; null until the first view is installed. */
ViewId view_id = null;
/** Lamport logical time; getNextView() uses max(view_id.getId(), ltime) + 1 as the next view id. */
private long ltime = 0;
long join_timeout = 5000; // property "join_timeout": time to wait for JOIN
long join_retry_timeout = 2000; // property "join_retry_timeout": time to wait between JOINs
long flush_timeout = 4000; // ms startFlush() waits for SUSPEND_OK / SUSPEND_FAILED
long leave_timeout = 5000; // property "leave_timeout": time to wait until coord responds to LEAVE req.
private long digest_timeout = 0; // time to wait for a digest (from PBCAST), passed to digest_promise in getDigest(). should be fast
long merge_timeout = 10000; // time to wait for all MERGE_RSPS
private final Object impl_mutex = new Object(); // guards changes of impl (see setImpl())
private final Promise digest_promise = new Promise(); // holds result of GET_DIGEST event (set in receiveUpEvent())
private final Promise flush_promise = new Promise(); // set to TRUE/FALSE by down() on SUSPEND_OK/SUSPEND_FAILED; awaited by startFlush()
boolean use_flush = false; // set via property "use_flush" or a CONFIG event with "flush_timeout"; cleared in start() if no FLUSH in stack
private final Hashtable impls = new Hashtable(3); // cache of role impls keyed by CLIENT / COORD / PART
private boolean shun = false; // if true, a member dropped from a view it previously belonged to leaves the group (installView())
boolean merge_leader = false; // can I initiate a merge ?
private boolean print_local_addr = true; // print the local address to stdout on SET_LOCAL_ADDRESS
boolean disable_initial_coord = false; // can the member become a coord on startup or not ?
/** Setting this to false disables concurrent startups. This is only used by unit testing code
 * for testing merging. To everybody else: don't change it to false ! */
boolean handle_concurrent_startup = true;
/** Whether view bundling (http://jira.jboss.com/jira/browse/JGRP-144) should be enabled or not. Setting this to
 * false forces each JOIN/LEAVE/SUSPECT request to be handled separately. By default these requests are processed
 * together if they are queued at approximately the same time */
private boolean view_bundling = true;
private long max_bundling_time = 50; // 50ms max to wait for other JOIN, LEAVE or SUSPECT requests
static final String CLIENT = "Client"; // key into impls for the client role
static final String COORD = "Coordinator"; // key into impls for the coordinator role
static final String PART = "Participant"; // key into impls for the participant role
TimeScheduler timer = null; // taken from the protocol stack in init()

/** Max number of old members to keep in history */
protected int num_prev_mbrs = 50;

/** Keeps track of old members (up to num_prev_mbrs); created in init() */
BoundedList prev_members = null;

/** Number of views passed to installView() while stats are enabled (including discarded ones) */
int num_views = 0;

/** Stores the last 20 views (only recorded while stats are enabled) */
BoundedList prev_views = new BoundedList(20);

/** Class to process JOIN, LEAVE and MERGE requests */
private final ViewHandler view_handler = new ViewHandler();

/** To collect VIEW_ACKs from all members */
final AckCollector ack_collector = new AckCollector();

/** Time in ms to wait for all VIEW acks (0 == wait forever) */
long view_ack_collection_timeout = 2000;

/** How long should a Resumer wait until resuming the ViewHandler */
long resume_task_timeout = 20000;

/** Set from a CONFIG event in down(); checked in start() to validate use_flush */
boolean flushProtocolInStack = false;

/** Protocol name; also the key under which GmsHeaders are stored in messages */
public static final String name = "GMS";
0089:
/** Creates the protocol in its initial (client) state. */
public GMS() {
    this.initState();
}

/** @return the protocol name, "GMS" */
public String getName() {
    return GMS.name;
}

/** @return the current view id as a string, or "null" if no view is installed yet */
public String getView() {
    return String.valueOf(view_id);
}

/** @return the number of views counted by installView() while stats were enabled */
public int getNumberOfViews() {
    return this.num_views;
}

/** @return the local address as a string, or "null" if it is not known yet */
public String getLocalAddress() {
    return String.valueOf(local_addr);
}

/** @return the current membership as a string */
public String getMembers() {
    if (members == null)
        return "[]";
    return members.toString();
}

/** @return the number of members in the current membership */
public int getNumMembers() {
    if (members == null)
        return 0;
    return members.size();
}
0117:
0118: public long getJoinTimeout() {
0119: return join_timeout;
0120: }
0121:
0122: public void setJoinTimeout(long t) {
0123: join_timeout = t;
0124: }
0125:
0126: public long getJoinRetryTimeout() {
0127: return join_retry_timeout;
0128: }
0129:
0130: public void setJoinRetryTimeout(long t) {
0131: join_retry_timeout = t;
0132: }
0133:
0134: public boolean isShun() {
0135: return shun;
0136: }
0137:
0138: public void setShun(boolean s) {
0139: shun = s;
0140: }
0141:
0142: public String printPreviousMembers() {
0143: StringBuffer sb = new StringBuffer();
0144: if (prev_members != null) {
0145: for (Enumeration en = prev_members.elements(); en
0146: .hasMoreElements();) {
0147: sb.append(en.nextElement()).append("\n");
0148: }
0149: }
0150: return sb.toString();
0151: }
0152:
0153: public int viewHandlerSize() {
0154: return view_handler.size();
0155: }
0156:
0157: public boolean isViewHandlerSuspended() {
0158: return view_handler.suspended();
0159: }
0160:
0161: public String dumpViewHandlerQueue() {
0162: return view_handler.dumpQueue();
0163: }
0164:
0165: public String dumpViewHandlerHistory() {
0166: return view_handler.dumpHistory();
0167: }
0168:
0169: public void suspendViewHandler() {
0170: view_handler.suspend(null);
0171: }
0172:
0173: public void resumeViewHandler() {
0174: view_handler.resumeForce();
0175: }
0176:
0177: Log getLog() {
0178: return log;
0179: }
0180:
0181: ViewHandler getViewHandler() {
0182: return view_handler;
0183: }
0184:
0185: public String printPreviousViews() {
0186: StringBuffer sb = new StringBuffer();
0187: for (Enumeration en = prev_views.elements(); en
0188: .hasMoreElements();) {
0189: sb.append(en.nextElement()).append("\n");
0190: }
0191: return sb.toString();
0192: }
0193:
0194: public boolean isCoordinator() {
0195: Address coord = determineCoordinator();
0196: return coord != null && local_addr != null
0197: && local_addr.equals(coord);
0198: }
0199:
0200: public void resetStats() {
0201: super .resetStats();
0202: num_views = 0;
0203: prev_views.removeAll();
0204: }
0205:
0206: public Vector requiredDownServices() {
0207: Vector retval = new Vector(3);
0208: retval.addElement(new Integer(Event.GET_DIGEST));
0209: retval.addElement(new Integer(Event.SET_DIGEST));
0210: retval.addElement(new Integer(Event.FIND_INITIAL_MBRS));
0211: return retval;
0212: }
0213:
0214: public void setImpl(GmsImpl new_impl) {
0215: synchronized (impl_mutex) {
0216: if (impl == new_impl) // superfluous
0217: return;
0218: impl = new_impl;
0219: if (log.isDebugEnabled()) {
0220: String msg = (local_addr != null ? local_addr
0221: .toString()
0222: + " " : "")
0223: + "changed role to "
0224: + new_impl.getClass().getName();
0225: log.debug(msg);
0226: }
0227: }
0228: }
0229:
0230: public GmsImpl getImpl() {
0231: return impl;
0232: }
0233:
0234: public void init() throws Exception {
0235: prev_members = new BoundedList(num_prev_mbrs);
0236: timer = stack != null ? stack.timer : null;
0237: if (timer == null)
0238: throw new Exception("GMS.init(): timer is null");
0239: if (impl != null)
0240: impl.init();
0241: }
0242:
0243: public void start() throws Exception {
0244: if (impl != null)
0245: impl.start();
0246: if (!flushProtocolInStack && use_flush) {
0247: log
0248: .warn("use_flush is true, however, FLUSH protocol not found in stack.");
0249: use_flush = false;
0250: }
0251: }
0252:
0253: public void stop() {
0254: view_handler.stop(true);
0255: if (impl != null)
0256: impl.stop();
0257: if (prev_members != null)
0258: prev_members.removeAll();
0259: }
0260:
0261: public void becomeCoordinator() {
0262: CoordGmsImpl tmp = (CoordGmsImpl) impls.get(COORD);
0263: if (tmp == null) {
0264: tmp = new CoordGmsImpl(this );
0265: impls.put(COORD, tmp);
0266: }
0267: try {
0268: tmp.init();
0269: } catch (Exception e) {
0270: log.error("exception switching to coordinator role", e);
0271: }
0272: setImpl(tmp);
0273: }
0274:
0275: public void becomeParticipant() {
0276: ParticipantGmsImpl tmp = (ParticipantGmsImpl) impls.get(PART);
0277:
0278: if (tmp == null) {
0279: tmp = new ParticipantGmsImpl(this );
0280: impls.put(PART, tmp);
0281: }
0282: try {
0283: tmp.init();
0284: } catch (Exception e) {
0285: log.error("exception switching to participant", e);
0286: }
0287: setImpl(tmp);
0288: }
0289:
0290: public void becomeClient() {
0291: ClientGmsImpl tmp = (ClientGmsImpl) impls.get(CLIENT);
0292: if (tmp == null) {
0293: tmp = new ClientGmsImpl(this );
0294: impls.put(CLIENT, tmp);
0295: }
0296: try {
0297: tmp.init();
0298: } catch (Exception e) {
0299: log.error("exception switching to client role", e);
0300: }
0301: setImpl(tmp);
0302: }
0303:
0304: boolean haveCoordinatorRole() {
0305: return impl != null && impl instanceof CoordGmsImpl;
0306: }
0307:
0308: /**
0309: * Computes the next view. Returns a copy that has <code>old_mbrs</code> and
0310: * <code>suspected_mbrs</code> removed and <code>new_mbrs</code> added.
0311: */
0312: public View getNextView(Collection new_mbrs, Collection old_mbrs,
0313: Collection suspected_mbrs) {
0314: Vector mbrs;
0315: long vid;
0316: View v;
0317: Membership tmp_mbrs;
0318: Address tmp_mbr;
0319:
0320: synchronized (members) {
0321: if (view_id == null) {
0322: log.error("view_id is null");
0323: return null; // this should *never* happen !
0324: }
0325: vid = Math.max(view_id.getId(), ltime) + 1;
0326: ltime = vid;
0327: tmp_mbrs = tmp_members.copy(); // always operate on the temporary membership
0328: tmp_mbrs.remove(suspected_mbrs);
0329: tmp_mbrs.remove(old_mbrs);
0330: tmp_mbrs.add(new_mbrs);
0331: mbrs = tmp_mbrs.getMembers();
0332: v = new View(local_addr, vid, mbrs);
0333:
0334: // Update membership (see DESIGN for explanation):
0335: tmp_members.set(mbrs);
0336:
0337: // Update joining list (see DESIGN for explanation)
0338: if (new_mbrs != null) {
0339: for (Iterator it = new_mbrs.iterator(); it.hasNext();) {
0340: tmp_mbr = (Address) it.next();
0341: if (!joining.contains(tmp_mbr))
0342: joining.addElement(tmp_mbr);
0343: }
0344: }
0345:
0346: // Update leaving list (see DESIGN for explanations)
0347: if (old_mbrs != null) {
0348: for (Iterator it = old_mbrs.iterator(); it.hasNext();) {
0349: Address addr = (Address) it.next();
0350: if (!leaving.contains(addr))
0351: leaving.add(addr);
0352: }
0353: }
0354: if (suspected_mbrs != null) {
0355: for (Iterator it = suspected_mbrs.iterator(); it
0356: .hasNext();) {
0357: Address addr = (Address) it.next();
0358: if (!leaving.contains(addr))
0359: leaving.add(addr);
0360: }
0361: }
0362: return v;
0363: }
0364: }
0365:
0366: /**
0367: Compute a new view, given the current view, the new members and the suspected/left
0368: members. Then simply mcast the view to all members. This is different to the VS GMS protocol,
0369: in which we run a FLUSH protocol which tries to achive consensus on the set of messages mcast in
0370: the current view before proceeding to install the next view.
0371:
0372: The members for the new view are computed as follows:
0373: <pre>
0374: existing leaving suspected joining
0375:
0376: 1. new_view y n n y
0377: 2. tmp_view y y n y
0378: (view_dest)
0379: </pre>
0380:
0381: <ol>
0382: <li>
0383: The new view to be installed includes the existing members plus the joining ones and
0384: excludes the leaving and suspected members.
0385: <li>
0386: A temporary view is sent down the stack as an <em>event</em>. This allows the bottom layer
0387: (e.g. UDP or TCP) to determine the members to which to send a multicast message. Compared
0388: to the new view, leaving members are <em>included</em> since they have are waiting for a
0389: view in which they are not members any longer before they leave. So, if we did not set a
0390: temporary view, joining members would not receive the view (signalling that they have been
0391: joined successfully). The temporary view is essentially the current view plus the joining
0392: members (old members are still part of the current view).
0393: </ol>
0394: */
0395: public void castViewChange(Vector new_mbrs, Vector old_mbrs,
0396: Vector suspected_mbrs) {
0397: View new_view;
0398:
0399: // next view: current mbrs + new_mbrs - old_mbrs - suspected_mbrs
0400: new_view = getNextView(new_mbrs, old_mbrs, suspected_mbrs);
0401: castViewChange(new_view, null);
0402: }
0403:
0404: public void castViewChange(View new_view, Digest digest) {
0405: castViewChangeWithDest(new_view, digest, null);
0406: }
0407:
0408: /**
0409: * Broadcasts the new view and digest, and waits for acks from all members in the list given as argument.
0410: * If the list is null, we take the members who are part of new_view
0411: * @param new_view
0412: * @param digest
0413: * @param members
0414: */
0415: public void castViewChangeWithDest(View new_view, Digest digest,
0416: java.util.List members) {
0417: Message view_change_msg;
0418: GmsHeader hdr;
0419: long start, stop;
0420: ViewId vid = new_view.getVid();
0421: int size = -1;
0422:
0423: if (members == null || members.size() == 0)
0424: members = new_view.getMembers();
0425:
0426: if (log.isTraceEnabled())
0427: log.trace("mcasting view {" + new_view + "} ("
0428: + new_view.size() + " mbrs)\n");
0429:
0430: start = System.currentTimeMillis();
0431: view_change_msg = new Message(); // bcast to all members
0432: hdr = new GmsHeader(GmsHeader.VIEW, new_view);
0433: hdr.my_digest = digest;
0434: view_change_msg.putHeader(name, hdr);
0435:
0436: ack_collector.reset(vid, members);
0437: size = ack_collector.size();
0438: passDown(new Event(Event.MSG, view_change_msg));
0439:
0440: try {
0441: ack_collector.waitForAllAcks(view_ack_collection_timeout);
0442: stop = System.currentTimeMillis();
0443: if (log.isTraceEnabled())
0444: log.trace("received all ACKs (" + size + ") for " + vid
0445: + " in " + (stop - start) + "ms");
0446: } catch (TimeoutException e) {
0447: log.warn("failed to collect all ACKs (" + size
0448: + ") for view " + new_view + " after "
0449: + view_ack_collection_timeout
0450: + "ms, missing ACKs from "
0451: + ack_collector.printMissing() + " (received="
0452: + ack_collector.printReceived() + "), local_addr="
0453: + local_addr);
0454: }
0455: }
0456:
0457: /**
0458: * Sets the new view and sends a VIEW_CHANGE event up and down the stack. If the view is a MergeView (subclass
0459: * of View), then digest will be non-null and has to be set before installing the view.
0460: */
0461: public void installView(View new_view, Digest digest) {
0462: if (digest != null)
0463: mergeDigest(digest);
0464: installView(new_view);
0465: }
0466:
/**
 * Sets the new view and sends a VIEW_CHANGE event up and down the stack.
 * <p>
 * Views whose id is not greater than the current view_id are discarded (silently if equal).
 * If this member is not part of the new view, the view is discarded. In addition, when
 * <code>shun</code> is enabled and this member appears in prev_members, impl.handleExit() is
 * called and an EXIT event is passed up. Otherwise, the following happens while holding the
 * <code>members</code> lock:
 * <ul>
 * <li>view, view_id, members, tmp_members, joining, leaving and prev_members are updated;</li>
 * <li>a clone of the view is passed up and then down as VIEW_CHANGE;</li>
 * <li>the role is switched to coordinator or participant, depending on whether this member is
 * now the first member.</li>
 * </ul>
 * <p>
 * Note that stats (num_views, prev_views) and the ack collector are updated before the discard
 * checks, so they also see views that end up being discarded.
 *
 * @param new_view the view to install
 */
public void installView(View new_view) {
    Address coord;
    int rc;
    ViewId vid = new_view.getVid();
    Vector mbrs = new_view.getMembers();

    if (log.isDebugEnabled())
        log.debug("[local_addr=" + local_addr + "] view is "
                + new_view);
    if (stats) {
        num_views++;
        prev_views.add(new_view);
    }

    ack_collector.handleView(new_view);

    // Discards views with an id lower than or equal to our own. Will be installed without check if first view
    if (view_id != null) {
        rc = vid.compareTo(view_id);
        if (rc <= 0) {
            if (log.isTraceEnabled() && rc < 0) // only scream if view is smaller, silently discard same views
                log.trace("[" + local_addr
                        + "] received view < current view;"
                        + " discarding it (current vid: " + view_id
                        + ", new vid: " + vid + ')');
            return;
        }
    }

    ltime = Math.max(vid.getId(), ltime); // compute Lamport logical time

    /* Check for self-inclusion: if I'm not part of the new membership, I just discard it.
       This ensures that messages sent in view V1 are only received by members of V1 */
    if (checkSelfInclusion(mbrs) == false) {
        // only shun if this member was previously part of the group. avoids problem where multiple
        // members (e.g. X,Y,Z) join {A,B} concurrently, X is joined first, and Y and Z get view
        // {A,B,X}, which would cause Y and Z to be shunned as they are not part of the membership
        // bela Nov 20 2003
        if (shun && local_addr != null
                && prev_members.contains(local_addr)) {
            if (log.isWarnEnabled())
                log
                        .warn("I ("
                                + local_addr
                                + ") am not a member of view "
                                + new_view
                                + ", shunning myself and leaving the group (prev_members are "
                                + prev_members
                                + ", current view is " + view + ")");
            if (impl != null)
                impl.handleExit();
            passUp(new Event(Event.EXIT));
        } else {
            if (log.isWarnEnabled())
                log.warn("I (" + local_addr
                        + ") am not a member of view " + new_view
                        + "; discarding view");
        }
        return;
    }

    synchronized (members) { // serialize access to views
        // assign new_view to view_id (a MergeView is stored as a plain View)
        if (new_view instanceof MergeView)
            view = new View(new_view.getVid(), new_view
                    .getMembers());
        else
            view = new_view;
        view_id = vid.copy();

        // Set the membership. Take into account joining members
        if (mbrs != null && mbrs.size() > 0) {
            members.set(mbrs);
            tmp_members.set(members);
            joining.removeAll(mbrs); // remove all members in mbrs from joining
            // remove all elements from 'leaving' that are not in 'mbrs'
            leaving.retainAll(mbrs);

            tmp_members.add(joining); // add members that haven't yet shown up in the membership
            tmp_members.remove(leaving); // remove members that haven't yet been removed from the membership

            // add to prev_members
            for (Iterator it = mbrs.iterator(); it.hasNext();) {
                Address addr = (Address) it.next();
                if (!prev_members.contains(addr))
                    prev_members.add(addr);
            }
        }

        // Send VIEW_CHANGE event up and down the stack (both directions share the same cloned view):
        Event view_event = new Event(Event.VIEW_CHANGE, new_view
                .clone());
        // changed order of passing view up and down (http://jira.jboss.com/jira/browse/JGRP-347)
        passUp(view_event);
        passDown(view_event); // needed e.g. by failure detector or UDP

        // The coordinator is the first member; switch role if that changed for this member
        coord = determineCoordinator();
        // if(coord != null && coord.equals(local_addr) && !(coord.equals(vid.getCoordAddress()))) {
        // changed on suggestion by yaronr and Nicolas Piedeloupe
        if (coord != null && coord.equals(local_addr)
                && !haveCoordinatorRole()) {
            becomeCoordinator();
        } else {
            if (haveCoordinatorRole() && !local_addr.equals(coord))
                becomeParticipant();
        }
    }
}
0578:
0579: protected Address determineCoordinator() {
0580: synchronized (members) {
0581: return members != null && members.size() > 0 ? (Address) members
0582: .elementAt(0)
0583: : null;
0584: }
0585: }
0586:
0587: /** Checks whether the potential_new_coord would be the new coordinator (2nd in line) */
0588: protected boolean wouldBeNewCoordinator(Address potential_new_coord) {
0589: Address new_coord;
0590:
0591: if (potential_new_coord == null)
0592: return false;
0593:
0594: synchronized (members) {
0595: if (members.size() < 2)
0596: return false;
0597: new_coord = (Address) members.elementAt(1); // member at 2nd place
0598: return new_coord != null
0599: && new_coord.equals(potential_new_coord);
0600: }
0601: }
0602:
0603: /** Returns true if local_addr is member of mbrs, else false */
0604: protected boolean checkSelfInclusion(Vector mbrs) {
0605: Object mbr;
0606: if (mbrs == null)
0607: return false;
0608: for (int i = 0; i < mbrs.size(); i++) {
0609: mbr = mbrs.elementAt(i);
0610: if (mbr != null && local_addr.equals(mbr))
0611: return true;
0612: }
0613: return false;
0614: }
0615:
0616: public View makeView(Vector mbrs) {
0617: Address coord = null;
0618: long id = 0;
0619:
0620: if (view_id != null) {
0621: coord = view_id.getCoordAddress();
0622: id = view_id.getId();
0623: }
0624: return new View(coord, id, mbrs);
0625: }
0626:
0627: public View makeView(Vector mbrs, ViewId vid) {
0628: Address coord = null;
0629: long id = 0;
0630:
0631: if (vid != null) {
0632: coord = vid.getCoordAddress();
0633: id = vid.getId();
0634: }
0635: return new View(coord, id, mbrs);
0636: }
0637:
0638: /** Send down a SET_DIGEST event */
0639: public void setDigest(Digest d) {
0640: passDown(new Event(Event.SET_DIGEST, d));
0641: }
0642:
0643: /** Send down a MERGE_DIGEST event */
0644: public void mergeDigest(Digest d) {
0645: passDown(new Event(Event.MERGE_DIGEST, d));
0646: }
0647:
0648: /** Sends down a GET_DIGEST event and waits for the GET_DIGEST_OK response, or
0649: timeout, whichever occurs first */
0650: public Digest getDigest() {
0651: Digest ret = null;
0652:
0653: digest_promise.reset();
0654: passDown(Event.GET_DIGEST_EVT);
0655: try {
0656: ret = (Digest) digest_promise
0657: .getResultWithTimeout(digest_timeout);
0658: } catch (TimeoutException e) {
0659: if (log.isErrorEnabled())
0660: log.error("digest could not be fetched from below");
0661: }
0662: return ret;
0663: }
0664:
0665: boolean startFlush(View new_view, int numberOfAttempts) {
0666: boolean successfulFlush = false;
0667: Vector membersInNewView = new_view.getMembers();
0668: if (membersInNewView == null || membersInNewView.isEmpty()) {
0669: //there are no members to FLUSH
0670: successfulFlush = true;
0671: } else {
0672: flush_promise.reset();
0673: passUp(new Event(Event.SUSPEND, new_view));
0674: try {
0675: Boolean r = (Boolean) flush_promise
0676: .getResultWithTimeout(flush_timeout);
0677: successfulFlush = r.booleanValue();
0678: } catch (TimeoutException e) {
0679: log
0680: .warn("GMS coordinator "
0681: + local_addr
0682: + " timed out waiting for flush responses after "
0683: + flush_timeout + " msec");
0684: }
0685:
0686: if (!successfulFlush && numberOfAttempts > 0) {
0687:
0688: long backOffSleepTime = Util.random(5000);
0689: if (log.isInfoEnabled())
0690: log
0691: .info("Flush in progress detected at GMS coordinator "
0692: + local_addr
0693: + ". Backing off for "
0694: + backOffSleepTime
0695: + " ms. Attempts left "
0696: + numberOfAttempts);
0697:
0698: Util.sleepRandom(backOffSleepTime);
0699: successfulFlush = startFlush(new_view,
0700: --numberOfAttempts);
0701: }
0702: }
0703: return successfulFlush;
0704: }
0705:
0706: void stopFlush(View view) {
0707:
0708: //since we did not call startFlush on
0709: //empty view do not call RESUME either
0710: if (view != null && view.getMembers().isEmpty())
0711: return;
0712:
0713: if (log.isDebugEnabled()) {
0714: log.debug("sending RESUME event");
0715: }
0716: passUp(new Event(Event.RESUME));
0717: }
0718:
/**
 * Handles events travelling up the stack.
 * <ul>
 * <li>MSG with a GmsHeader: the header is removed and dispatched by type, and the message is not
 * passed further up.
 * <ul>
 * <li>JOIN/LEAVE requests are queued on the view handler.</li>
 * <li>Join/leave responses and merge traffic go to the current role impl.</li>
 * <li>A VIEW is acknowledged to its sender and then handed to the impl.</li>
 * <li>A VIEW_ACK is recorded by the ack collector.</li>
 * </ul></li>
 * <li>MSG without a GmsHeader falls through to impl.handleUpEvent().</li>
 * <li>CONNECT_OK / DISCONNECT_OK coming from below are swallowed (GMS emits these itself).</li>
 * <li>SET_LOCAL_ADDRESS records (and optionally prints) the local address, then continues.</li>
 * <li>SUSPECT queues a SUSPECT request and informs the ack collector, then continues.</li>
 * <li>UNSUSPECT and MERGE are handled here and not passed up.</li>
 * </ul>
 * Events that are not returned early are offered to impl.handleUpEvent() and passed up only if
 * it returns true.
 */
public void up(Event evt) {
    Object obj;
    Message msg;
    GmsHeader hdr;
    MergeData merge_data;

    switch (evt.getType()) {

    case Event.MSG:
        msg = (Message) evt.getArg();
        obj = msg.getHeader(name);
        if (obj == null || !(obj instanceof GmsHeader))
            break; // not a GMS message: offer it to impl.handleUpEvent() below
        hdr = (GmsHeader) msg.removeHeader(name);
        switch (hdr.type) {
        case GmsHeader.JOIN_REQ:
            view_handler.add(new Request(Request.JOIN, hdr.mbr,
                    false, null));
            break;
        case GmsHeader.JOIN_RSP:
            impl.handleJoinResponse(hdr.join_rsp);
            break;
        case GmsHeader.LEAVE_REQ:
            if (log.isDebugEnabled())
                log.debug("received LEAVE_REQ for " + hdr.mbr
                        + " from " + msg.getSrc());
            if (hdr.mbr == null) {
                if (log.isErrorEnabled())
                    log.error("LEAVE_REQ's mbr field is null");
                return;
            }
            view_handler.add(new Request(Request.LEAVE, hdr.mbr,
                    false, null));
            break;
        case GmsHeader.LEAVE_RSP:
            impl.handleLeaveResponse();
            break;
        case GmsHeader.VIEW:
            if (hdr.view == null) {
                if (log.isErrorEnabled())
                    log.error("[VIEW]: view == null");
                return;
            }

            // send VIEW_ACK to sender of view (before the view is handed to the impl)
            Address coord = msg.getSrc();
            Message view_ack = new Message(coord, null, null);
            GmsHeader tmphdr = new GmsHeader(GmsHeader.VIEW_ACK,
                    hdr.view);
            view_ack.putHeader(name, tmphdr);
            if (log.isTraceEnabled())
                log.trace("sending VIEW_ACK to " + coord);
            passDown(new Event(Event.MSG, view_ack));
            impl.handleViewChange(hdr.view, hdr.my_digest);
            break;

        case GmsHeader.VIEW_ACK:
            Object sender = msg.getSrc();
            ack_collector.ack(sender); // counted by castViewChangeWithDest()
            return; // don't pass further up

        case GmsHeader.MERGE_REQ:
            impl.handleMergeRequest(msg.getSrc(), hdr.merge_id);
            break;

        case GmsHeader.MERGE_RSP:
            merge_data = new MergeData(msg.getSrc(), hdr.view,
                    hdr.my_digest);
            merge_data.merge_rejected = hdr.merge_rejected;
            impl.handleMergeResponse(merge_data, hdr.merge_id);
            break;

        case GmsHeader.INSTALL_MERGE_VIEW:
            impl.handleMergeView(new MergeData(msg.getSrc(),
                    hdr.view, hdr.my_digest), hdr.merge_id);
            break;

        case GmsHeader.CANCEL_MERGE:
            impl.handleMergeCancelled(hdr.merge_id);
            break;

        default:
            if (log.isErrorEnabled())
                log.error("GmsHeader with type=" + hdr.type
                        + " not known");
        }
        return; // don't pass up

    case Event.CONNECT_OK: // sent by someone else, but WE are responsible for sending this !
    case Event.DISCONNECT_OK: // dito (e.g. sent by TP layer). Don't send up the stack
        return;

    case Event.SET_LOCAL_ADDRESS:
        local_addr = (Address) evt.getArg();
        if (print_local_addr) {
            System.out
                    .println("\n-------------------------------------------------------\n"
                            + "GMS: address is "
                            + local_addr
                            + "\n-------------------------------------------------------");
        }
        break; // pass up

    case Event.SUSPECT:
        Address suspected = (Address) evt.getArg();
        view_handler.add(new Request(Request.SUSPECT, suspected,
                true, null));
        ack_collector.suspect(suspected); // presumably so castViewChangeWithDest() stops waiting for its ACK -- confirm in AckCollector
        break; // pass up

    case Event.UNSUSPECT:
        impl.unsuspect((Address) evt.getArg());
        return; // discard

    case Event.MERGE:
        view_handler.add(new Request(Request.MERGE, null, false,
                (Vector) evt.getArg()));
        return; // don't pass up
    }

    // the current role decides whether the event travels further up
    if (impl.handleUpEvent(evt))
        passUp(evt);
}
0842:
0843: /**
0844: This method is overridden to avoid hanging on getDigest(): when a JOIN is received, the coordinator needs
0845: to retrieve the digest from the NAKACK layer. It therefore sends down a GET_DIGEST event, to which the NAKACK layer
0846: responds with a GET_DIGEST_OK event.<p>
0847: However, the GET_DIGEST_OK event will not be processed because the thread handling the JOIN request won't process
0848: the GET_DIGEST_OK event until the JOIN event returns. The receiveUpEvent() method is executed by the up-handler
0849: thread of the lower protocol and therefore can handle the event. All we do here is unblock the mutex on which
0850: JOIN is waiting, allowing JOIN to return with a valid digest. The GET_DIGEST_OK event is then discarded, because
0851: it won't be processed twice.
0852: */
0853: public void receiveUpEvent(Event evt) {
0854: switch (evt.getType()) {
0855: case Event.GET_DIGEST_OK:
0856: digest_promise.setResult(evt.getArg());
0857: return; // don't pass further up
0858: }
0859: super .receiveUpEvent(evt);
0860: }
0861:
/**
 * Handles events travelling down the stack.
 * <ul>
 * <li>CONNECT: passes the event down first, then asks the current impl to join. A
 * SecurityException from join() is returned as the argument of the CONNECT_OK event passed up.
 * The event is not passed down again.</li>
 * <li>DISCONNECT: asks the impl to leave. Unless the impl is the coordinator, it immediately
 * passes DISCONNECT_OK up and resets to the initial state. The event is then passed down.</li>
 * <li>SUSPEND_OK / SUSPEND_FAILED: completes flush_promise (awaited by startFlush()) with
 * TRUE / FALSE.</li>
 * <li>CONFIG: picks up flush settings (see the note in that branch).</li>
 * </ul>
 * Except for CONNECT, every event is passed down after handling.
 */
public void down(Event evt) {
    switch (evt.getType()) {

    case Event.CONNECT:
        Object arg = null;
        passDown(evt);
        if (local_addr == null)
            if (log.isFatalEnabled())
                log.fatal("[CONNECT] local_addr is null");
        try {
            impl.join(local_addr);
        } catch (SecurityException e) {
            arg = e; // reported to the caller via CONNECT_OK's argument
        }
        passUp(new Event(Event.CONNECT_OK, arg));
        return; // don't pass down: was already passed down

    case Event.DISCONNECT:
        impl.leave((Address) evt.getArg());
        if (!(impl instanceof CoordGmsImpl)) {
            passUp(new Event(Event.DISCONNECT_OK));
            initState(); // in case connect() is called again
        }
        break; // pass down
    case Event.SUSPEND_OK:
        flush_promise.setResult(Boolean.TRUE);
        break;

    case Event.SUSPEND_FAILED:
        flush_promise.setResult(Boolean.FALSE);
        break;

    case Event.CONFIG:
        Map config = (Map) evt.getArg();
        // a "flush_timeout" entry turns flushing on and sets the flush timeout
        if (config != null && config.containsKey("flush_timeout")) {
            Long ftimeout = (Long) config.get("flush_timeout");
            use_flush = true;
            flush_timeout = ftimeout.longValue();
        }
        // NOTE(review): this marks FLUSH as present for ANY non-null CONFIG map that does NOT
        // contain the key "flush_suported" (sic). The test looks inverted and the key misspelled.
        // Confirm which key the FLUSH protocol actually puts into its CONFIG event before changing this.
        if ((config != null && !config
                .containsKey("flush_suported"))) {
            flushProtocolInStack = true;
        }
        break;
    }

    passDown(evt);
}
0910:
0911: /** Setup the Protocol instance according to the configuration string */
0912: public boolean setProperties(Properties props) {
0913: String str;
0914:
0915: super .setProperties(props);
0916: str = props.getProperty("shun");
0917: if (str != null) {
0918: shun = Boolean.valueOf(str).booleanValue();
0919: props.remove("shun");
0920: }
0921:
0922: str = props.getProperty("merge_leader");
0923: if (str != null) {
0924: merge_leader = Boolean.valueOf(str).booleanValue();
0925: props.remove("merge_leader");
0926: }
0927:
0928: str = props.getProperty("print_local_addr");
0929: if (str != null) {
0930: print_local_addr = Boolean.valueOf(str).booleanValue();
0931: props.remove("print_local_addr");
0932: }
0933:
0934: str = props.getProperty("join_timeout"); // time to wait for JOIN
0935: if (str != null) {
0936: join_timeout = Long.parseLong(str);
0937: props.remove("join_timeout");
0938: }
0939:
0940: str = props.getProperty("join_retry_timeout"); // time to wait between JOINs
0941: if (str != null) {
0942: join_retry_timeout = Long.parseLong(str);
0943: props.remove("join_retry_timeout");
0944: }
0945:
0946: str = props.getProperty("leave_timeout"); // time to wait until coord responds to LEAVE req.
0947: if (str != null) {
0948: leave_timeout = Long.parseLong(str);
0949: props.remove("leave_timeout");
0950: }
0951:
0952: str = props.getProperty("merge_timeout"); // time to wait for MERGE_RSPS from subgroup coordinators
0953: if (str != null) {
0954: merge_timeout = Long.parseLong(str);
0955: props.remove("merge_timeout");
0956: }
0957:
0958: str = props.getProperty("digest_timeout"); // time to wait for GET_DIGEST_OK from PBCAST
0959: if (str != null) {
0960: digest_timeout = Long.parseLong(str);
0961: props.remove("digest_timeout");
0962: }
0963:
0964: str = props.getProperty("view_ack_collection_timeout");
0965: if (str != null) {
0966: view_ack_collection_timeout = Long.parseLong(str);
0967: props.remove("view_ack_collection_timeout");
0968: }
0969:
0970: str = props.getProperty("resume_task_timeout");
0971: if (str != null) {
0972: resume_task_timeout = Long.parseLong(str);
0973: props.remove("resume_task_timeout");
0974: }
0975:
0976: str = props.getProperty("disable_initial_coord");
0977: if (str != null) {
0978: disable_initial_coord = Boolean.valueOf(str).booleanValue();
0979: props.remove("disable_initial_coord");
0980: }
0981:
0982: str = props.getProperty("handle_concurrent_startup");
0983: if (str != null) {
0984: handle_concurrent_startup = Boolean.valueOf(str)
0985: .booleanValue();
0986: props.remove("handle_concurrent_startup");
0987: }
0988:
0989: str = props.getProperty("num_prev_mbrs");
0990: if (str != null) {
0991: num_prev_mbrs = Integer.parseInt(str);
0992: props.remove("num_prev_mbrs");
0993: }
0994:
0995: str = props.getProperty("use_flush");
0996: if (str != null) {
0997: use_flush = Boolean.valueOf(str).booleanValue();
0998: props.remove("use_flush");
0999: }
1000:
1001: str = props.getProperty("flush_timeout");
1002: if (str != null) {
1003: flush_timeout = Long.parseLong(str);
1004: props.remove("flush_timeout");
1005: }
1006:
1007: str = props.getProperty("view_bundling");
1008: if (str != null) {
1009: view_bundling = Boolean.valueOf(str).booleanValue();
1010: props.remove("view_bundling");
1011: }
1012:
1013: str = props.getProperty("max_bundling_time");
1014: if (str != null) {
1015: max_bundling_time = Long.parseLong(str);
1016: props.remove("max_bundling_time");
1017: }
1018:
1019: if (props.size() > 0) {
1020: log.error("the following properties are not recognized: "
1021: + props);
1022: return false;
1023: }
1024: return true;
1025: }
1026:
1027: /* ------------------------------- Private Methods --------------------------------- */
1028:
1029: final void initState() {
1030: becomeClient();
1031: view_id = null;
1032: view = null;
1033: }
1034:
1035: /* --------------------------- End of Private Methods ------------------------------- */
1036:
1037: public static class GmsHeader extends Header implements Streamable {
1038: public static final byte JOIN_REQ = 1;
1039: public static final byte JOIN_RSP = 2;
1040: public static final byte LEAVE_REQ = 3;
1041: public static final byte LEAVE_RSP = 4;
1042: public static final byte VIEW = 5;
1043: public static final byte MERGE_REQ = 6;
1044: public static final byte MERGE_RSP = 7;
1045: public static final byte INSTALL_MERGE_VIEW = 8;
1046: public static final byte CANCEL_MERGE = 9;
1047: public static final byte VIEW_ACK = 10;
1048:
1049: byte type = 0;
1050: View view = null; // used when type=VIEW or MERGE_RSP or INSTALL_MERGE_VIEW
1051: Address mbr = null; // used when type=JOIN_REQ or LEAVE_REQ
1052: JoinRsp join_rsp = null; // used when type=JOIN_RSP
1053: Digest my_digest = null; // used when type=MERGE_RSP or INSTALL_MERGE_VIEW
1054: ViewId merge_id = null; // used when type=MERGE_REQ or MERGE_RSP or INSTALL_MERGE_VIEW or CANCEL_MERGE
1055: boolean merge_rejected = false; // used when type=MERGE_RSP
1056:
1057: public GmsHeader() {
1058: } // used for Externalization
1059:
1060: public GmsHeader(byte type) {
1061: this .type = type;
1062: }
1063:
1064: /** Used for VIEW header */
1065: public GmsHeader(byte type, View view) {
1066: this .type = type;
1067: this .view = view;
1068: }
1069:
1070: /** Used for JOIN_REQ or LEAVE_REQ header */
1071: public GmsHeader(byte type, Address mbr) {
1072: this .type = type;
1073: this .mbr = mbr;
1074: }
1075:
1076: /** Used for JOIN_RSP header */
1077: public GmsHeader(byte type, JoinRsp join_rsp) {
1078: this .type = type;
1079: this .join_rsp = join_rsp;
1080: }
1081:
1082: public byte getType() {
1083: return type;
1084: }
1085:
1086: public Address getMemeber() {
1087: return mbr;
1088: }
1089:
1090: public String toString() {
1091: StringBuffer sb = new StringBuffer("GmsHeader");
1092: sb.append('[' + type2String(type) + ']');
1093: switch (type) {
1094: case JOIN_REQ:
1095: sb.append(": mbr=" + mbr);
1096: break;
1097:
1098: case JOIN_RSP:
1099: sb.append(": join_rsp=" + join_rsp);
1100: break;
1101:
1102: case LEAVE_REQ:
1103: sb.append(": mbr=" + mbr);
1104: break;
1105:
1106: case LEAVE_RSP:
1107: break;
1108:
1109: case VIEW:
1110: case VIEW_ACK:
1111: sb.append(": view=" + view);
1112: break;
1113:
1114: case MERGE_REQ:
1115: sb.append(": merge_id=" + merge_id);
1116: break;
1117:
1118: case MERGE_RSP:
1119: sb.append(": view=" + view + ", digest=" + my_digest
1120: + ", merge_rejected=" + merge_rejected
1121: + ", merge_id=" + merge_id);
1122: break;
1123:
1124: case INSTALL_MERGE_VIEW:
1125: sb.append(": view=" + view + ", digest=" + my_digest);
1126: break;
1127:
1128: case CANCEL_MERGE:
1129: sb.append(", <merge cancelled>, merge_id=" + merge_id);
1130: break;
1131: }
1132: return sb.toString();
1133: }
1134:
1135: public static String type2String(int type) {
1136: switch (type) {
1137: case JOIN_REQ:
1138: return "JOIN_REQ";
1139: case JOIN_RSP:
1140: return "JOIN_RSP";
1141: case LEAVE_REQ:
1142: return "LEAVE_REQ";
1143: case LEAVE_RSP:
1144: return "LEAVE_RSP";
1145: case VIEW:
1146: return "VIEW";
1147: case MERGE_REQ:
1148: return "MERGE_REQ";
1149: case MERGE_RSP:
1150: return "MERGE_RSP";
1151: case INSTALL_MERGE_VIEW:
1152: return "INSTALL_MERGE_VIEW";
1153: case CANCEL_MERGE:
1154: return "CANCEL_MERGE";
1155: case VIEW_ACK:
1156: return "VIEW_ACK";
1157: default:
1158: return "<unknown>";
1159: }
1160: }
1161:
1162: public void writeExternal(ObjectOutput out) throws IOException {
1163: out.writeByte(type);
1164: out.writeObject(view);
1165: out.writeObject(mbr);
1166: out.writeObject(join_rsp);
1167: out.writeObject(my_digest);
1168: out.writeObject(merge_id);
1169: out.writeBoolean(merge_rejected);
1170: }
1171:
1172: public void readExternal(ObjectInput in) throws IOException,
1173: ClassNotFoundException {
1174: type = in.readByte();
1175: view = (View) in.readObject();
1176: mbr = (Address) in.readObject();
1177: join_rsp = (JoinRsp) in.readObject();
1178: my_digest = (Digest) in.readObject();
1179: merge_id = (ViewId) in.readObject();
1180: merge_rejected = in.readBoolean();
1181: }
1182:
1183: public void writeTo(DataOutputStream out) throws IOException {
1184: out.writeByte(type);
1185: boolean isMergeView = view != null
1186: && view instanceof MergeView;
1187: out.writeBoolean(isMergeView);
1188: Util.writeStreamable(view, out);
1189: Util.writeAddress(mbr, out);
1190: Util.writeStreamable(join_rsp, out);
1191: Util.writeStreamable(my_digest, out);
1192: Util.writeStreamable(merge_id, out); // kludge: we know merge_id is a ViewId
1193: out.writeBoolean(merge_rejected);
1194: }
1195:
1196: public void readFrom(DataInputStream in) throws IOException,
1197: IllegalAccessException, InstantiationException {
1198: type = in.readByte();
1199: boolean isMergeView = in.readBoolean();
1200: if (isMergeView)
1201: view = (View) Util.readStreamable(MergeView.class, in);
1202: else
1203: view = (View) Util.readStreamable(View.class, in);
1204: mbr = Util.readAddress(in);
1205: join_rsp = (JoinRsp) Util.readStreamable(JoinRsp.class, in);
1206: my_digest = (Digest) Util.readStreamable(Digest.class, in);
1207: merge_id = (ViewId) Util.readStreamable(ViewId.class, in);
1208: merge_rejected = in.readBoolean();
1209: }
1210:
1211: public long size() {
1212: long retval = Global.BYTE_SIZE * 2; // type + merge_rejected
1213:
1214: retval += Global.BYTE_SIZE; // presence view
1215: retval += Global.BYTE_SIZE; // MergeView or View
1216: if (view != null)
1217: retval += view.serializedSize();
1218:
1219: retval += Util.size(mbr);
1220:
1221: retval += Global.BYTE_SIZE; // presence of join_rsp
1222: if (join_rsp != null)
1223: retval += join_rsp.serializedSize();
1224:
1225: retval += Global.BYTE_SIZE; // presence for my_digest
1226: if (my_digest != null)
1227: retval += my_digest.serializedSize();
1228:
1229: retval += Global.BYTE_SIZE; // presence for merge_id
1230: if (merge_id != null)
1231: retval += merge_id.serializedSize();
1232: return retval;
1233: }
1234:
1235: }
1236:
1237: public static class Request {
1238: static final int JOIN = 1;
1239: static final int LEAVE = 2;
1240: static final int SUSPECT = 3;
1241: static final int MERGE = 4;
1242: static final int VIEW = 5;
1243:
1244: int type = -1;
1245: Address mbr;
1246: boolean suspected;
1247: Vector coordinators;
1248: View view;
1249: Digest digest;
1250: List target_members;
1251:
1252: Request(int type) {
1253: this .type = type;
1254: }
1255:
1256: Request(int type, Address mbr, boolean suspected,
1257: Vector coordinators) {
1258: this .type = type;
1259: this .mbr = mbr;
1260: this .suspected = suspected;
1261: this .coordinators = coordinators;
1262: }
1263:
1264: public int getType() {
1265: return type;
1266: }
1267:
1268: public String toString() {
1269: switch (type) {
1270: case JOIN:
1271: return "JOIN(" + mbr + ")";
1272: case LEAVE:
1273: return "LEAVE(" + mbr + ", " + suspected + ")";
1274: case SUSPECT:
1275: return "SUSPECT(" + mbr + ")";
1276: case MERGE:
1277: return "MERGE(" + coordinators + ")";
1278: case VIEW:
1279: return "VIEW (" + view.getVid() + ")";
1280: }
1281: return "<invalid (type=" + type + ")";
1282: }
1283:
1284: /**
1285: * Specifies whether this request can be processed with other request simultaneously
1286: */
1287: public boolean canBeProcessedTogether(Request other) {
1288: if (other == null)
1289: return false;
1290: int other_type = other.getType();
1291: return (type == JOIN || type == LEAVE || type == SUSPECT)
1292: && (other_type == JOIN || other_type == LEAVE || other_type == SUSPECT);
1293: }
1294: }
1295:
1296: /**
1297: * Class which processes JOIN, LEAVE and MERGE requests. Requests are queued and processed in FIFO order
1298: * @author Bela Ban
1299: * @version $Id: GMS.java,v 1.68.2.5 2007/04/27 08:03:55 belaban Exp $
1300: */
1301: class ViewHandler implements Runnable {
1302: volatile Thread thread;
1303: Queue q = new Queue(); // Queue<Request>
1304: boolean suspended = false;
1305: final static long INTERVAL = 5000;
1306: private static final long MAX_COMPLETION_TIME = 10000;
1307: /** Maintains a list of the last 20 requests */
1308: private final BoundedList history = new BoundedList(20);
1309:
1310: /** Map<Object,TimeScheduler.CancellableTask>. Keeps track of Resumer tasks which have not fired yet */
1311: private final Map resume_tasks = new HashMap();
1312: private Object merge_id = null;
1313:
1314: void add(Request req) {
1315: add(req, false, false);
1316: }
1317:
1318: synchronized void add(Request req, boolean at_head,
1319: boolean unsuspend) {
1320: if (suspended && !unsuspend) {
1321: log.warn("queue is suspended; request " + req
1322: + " is discarded");
1323: return;
1324: }
1325: start(unsuspend);
1326: try {
1327: if (at_head)
1328: q.addAtHead(req);
1329: else
1330: q.add(req);
1331: history.add(new Date() + ": " + req.toString());
1332: } catch (QueueClosedException e) {
1333: if (log.isTraceEnabled())
1334: log.trace("queue is closed; request " + req
1335: + " is discarded");
1336: }
1337: }
1338:
1339: void waitUntilCompleted(long timeout) {
1340: waitUntilCompleted(timeout, false);
1341: }
1342:
1343: synchronized void waitUntilCompleted(long timeout,
1344: boolean resume) {
1345: if (thread != null) {
1346: try {
1347: thread.join(timeout);
1348: } catch (InterruptedException e) {
1349: }
1350: }
1351: if (resume)
1352: resumeForce();
1353: }
1354:
1355: /**
1356: * Waits until the current request has been processes, then clears the queue and discards new
1357: * requests from now on
1358: */
1359: public synchronized void suspend(Object merge_id) {
1360: if (suspended)
1361: return;
1362: suspended = true;
1363: this .merge_id = merge_id;
1364: q.clear();
1365: waitUntilCompleted(MAX_COMPLETION_TIME);
1366: q.close(true);
1367: if (log.isTraceEnabled())
1368: log.trace("suspended ViewHandler");
1369: Resumer r = new Resumer(resume_task_timeout, merge_id,
1370: resume_tasks, this );
1371: resume_tasks.put(merge_id, r);
1372: timer.add(r);
1373: }
1374:
1375: public synchronized void resume(Object merge_id) {
1376: if (!suspended)
1377: return;
1378: boolean same_merge_id = this .merge_id != null
1379: && merge_id != null
1380: && this .merge_id.equals(merge_id);
1381: same_merge_id = same_merge_id
1382: || (this .merge_id == null && merge_id == null);
1383:
1384: if (!same_merge_id) {
1385: if (log.isWarnEnabled())
1386: log.warn("resume(" + merge_id + ") does not match "
1387: + this .merge_id + ", ignoring resume()");
1388: return;
1389: }
1390: synchronized (resume_tasks) {
1391: TimeScheduler.CancellableTask task = (TimeScheduler.CancellableTask) resume_tasks
1392: .get(merge_id);
1393: if (task != null) {
1394: task.cancel();
1395: resume_tasks.remove(merge_id);
1396: }
1397: }
1398: resumeForce();
1399: }
1400:
1401: public synchronized void resumeForce() {
1402: if (q.closed())
1403: q.reset();
1404: suspended = false;
1405: if (log.isTraceEnabled())
1406: log.trace("resumed ViewHandler");
1407: }
1408:
1409: public void run() {
1410: long start, stop, wait_time;
1411: List requests = new LinkedList();
1412: while (Thread.currentThread().equals(thread)) {
1413: requests.clear();
1414: try {
1415: boolean keepGoing = false;
1416: start = System.currentTimeMillis();
1417: do {
1418: Request firstRequest = (Request) q
1419: .remove(INTERVAL); // throws a TimeoutException if it runs into timeout
1420: requests.add(firstRequest);
1421: if (q.size() > 0) {
1422: Request nextReq = (Request) q.peek();
1423: keepGoing = view_bundling
1424: && firstRequest
1425: .canBeProcessedTogether(nextReq);
1426: } else {
1427: stop = System.currentTimeMillis();
1428: wait_time = max_bundling_time
1429: - (stop - start);
1430: if (wait_time > 0)
1431: Util.sleep(wait_time);
1432: keepGoing = q.size() > 0;
1433: }
1434: } while (keepGoing);
1435: process(requests);
1436: } catch (QueueClosedException e) {
1437: break;
1438: } catch (TimeoutException e) {
1439: break;
1440: } catch (Throwable catchall) {
1441: Util.sleep(50);
1442: }
1443: }
1444: }
1445:
1446: public int size() {
1447: return q.size();
1448: }
1449:
1450: public boolean suspended() {
1451: return suspended;
1452: }
1453:
1454: public String dumpQueue() {
1455: StringBuffer sb = new StringBuffer();
1456: List v = q.values();
1457: for (Iterator it = v.iterator(); it.hasNext();) {
1458: sb.append(it.next() + "\n");
1459: }
1460: return sb.toString();
1461: }
1462:
1463: public String dumpHistory() {
1464: StringBuffer sb = new StringBuffer();
1465: for (Enumeration en = history.elements(); en
1466: .hasMoreElements();) {
1467: sb.append(en.nextElement() + "\n");
1468: }
1469: return sb.toString();
1470: }
1471:
1472: private void process(List requests) {
1473: if (requests.isEmpty())
1474: return;
1475: if (log.isTraceEnabled())
1476: log.trace("processing " + requests);
1477: Request firstReq = (Request) requests.get(0);
1478: switch (firstReq.type) {
1479: case Request.JOIN:
1480: case Request.LEAVE:
1481: case Request.SUSPECT:
1482: Collection newMembers = new LinkedHashSet(requests
1483: .size());
1484: Collection suspectedMembers = new LinkedHashSet(
1485: requests.size());
1486: Collection oldMembers = new LinkedHashSet(requests
1487: .size());
1488: for (Iterator i = requests.iterator(); i.hasNext();) {
1489: Request req = (Request) i.next();
1490: switch (req.type) {
1491: case Request.JOIN:
1492: newMembers.add(req.mbr);
1493: break;
1494: case Request.LEAVE:
1495: if (req.suspected)
1496: suspectedMembers.add(req.mbr);
1497: else
1498: oldMembers.add(req.mbr);
1499: break;
1500: case Request.SUSPECT:
1501: suspectedMembers.add(req.mbr);
1502: break;
1503: }
1504: }
1505: impl.handleMembershipChange(newMembers, oldMembers,
1506: suspectedMembers);
1507: break;
1508: case Request.MERGE:
1509: if (requests.size() > 1)
1510: log
1511: .error("more than one MERGE request to process, ignoring the others");
1512: impl.merge(firstReq.coordinators);
1513: break;
1514: case Request.VIEW:
1515: if (requests.size() > 1)
1516: log
1517: .error("more than one VIEW request to process, ignoring the others");
1518: try {
1519: if (use_flush) {
1520: boolean successfulFlush = startFlush(
1521: firstReq.view, 3);
1522: if (successfulFlush) {
1523: log
1524: .info("Successful GMS flush by coordinator at "
1525: + getLocalAddress());
1526: }
1527: }
1528: castViewChangeWithDest(firstReq.view,
1529: firstReq.digest, firstReq.target_members);
1530: } finally {
1531: if (use_flush)
1532: stopFlush(firstReq.view);
1533: }
1534: break;
1535: default:
1536: log.error("request " + firstReq.type
1537: + " is unknown; discarded");
1538: }
1539: }
1540:
1541: synchronized void start(boolean unsuspend) {
1542: if (q.closed())
1543: q.reset();
1544: if (unsuspend) {
1545: suspended = false;
1546: synchronized (resume_tasks) {
1547: TimeScheduler.CancellableTask task = (TimeScheduler.CancellableTask) resume_tasks
1548: .get(merge_id);
1549: if (task != null) {
1550: task.cancel();
1551: resume_tasks.remove(merge_id);
1552: }
1553: }
1554: }
1555: merge_id = null;
1556: if (thread == null || !thread.isAlive()) {
1557: thread = new Thread(Util.getGlobalThreadGroup(), this ,
1558: "ViewHandler");
1559: thread.setDaemon(false); // thread cannot terminate if we have tasks left, e.g. when we as coord leave
1560: thread.start();
1561: if (log.isTraceEnabled())
1562: log.trace("ViewHandler started");
1563: }
1564: }
1565:
1566: synchronized void stop(boolean flush) {
1567: q.close(flush);
1568: TimeScheduler.CancellableTask task;
1569: synchronized (resume_tasks) {
1570: for (Iterator it = resume_tasks.values().iterator(); it
1571: .hasNext();) {
1572: task = (TimeScheduler.CancellableTask) it.next();
1573: task.cancel();
1574: }
1575: resume_tasks.clear();
1576: }
1577: merge_id = null;
1578: // resumeForce();
1579: }
1580: }
1581:
1582: /**
1583: * Resumer is a second line of defense: when the ViewHandler is suspended, it will be resumed when the current
1584: * merge is cancelled, or when the merge completes. However, in a case where this never happens (this
1585: * shouldn't be the case !), the Resumer will nevertheless resume the ViewHandler.
1586: * We chose this strategy because ViewHandler is critical: if it is suspended indefinitely, we would
1587: * not be able to process new JOIN requests ! So, this is for peace of mind, although it most likely
1588: * will never be used...
1589: */
1590: static class Resumer implements TimeScheduler.CancellableTask {
1591: boolean cancelled = false;
1592: long interval;
1593: final Object token;
1594: final Map tasks;
1595: final ViewHandler handler;
1596:
1597: public Resumer(long interval, final Object token, final Map t,
1598: final ViewHandler handler) {
1599: this .interval = interval;
1600: this .token = token;
1601: this .tasks = t;
1602: this .handler = handler;
1603: }
1604:
1605: public void cancel() {
1606: cancelled = true;
1607: }
1608:
1609: public boolean cancelled() {
1610: return cancelled;
1611: }
1612:
1613: public long nextInterval() {
1614: return interval;
1615: }
1616:
1617: public void run() {
1618: TimeScheduler.CancellableTask t;
1619: boolean execute = true;
1620: synchronized (tasks) {
1621: t = (TimeScheduler.CancellableTask) tasks.get(token);
1622: if (t != null) {
1623: t.cancel();
1624: execute = true;
1625: } else {
1626: execute = false;
1627: }
1628: tasks.remove(token);
1629: }
1630: if (execute) {
1631: handler.resume(token);
1632: }
1633: }
1634: }
1635:
1636: }
|