001: // $Id: GMS.java,v 1.16.10.1 2007/04/27 08:03:51 belaban Exp $
002:
003: package org.jgroups.protocols;
004:
005: import org.jgroups.*;
006: import org.jgroups.blocks.GroupRequest;
007: import org.jgroups.blocks.MethodCall;
008: import org.jgroups.stack.Protocol;
009: import org.jgroups.stack.RpcProtocol;
010: import org.jgroups.util.Queue;
011: import org.jgroups.util.QueueClosedException;
012:
013: import java.util.Hashtable;
014: import java.util.Properties;
015: import java.util.Vector;
016:
017: /**
018: * Group membership protocol. Handles joins/leaves/crashes (suspicions) and emits new views
019: * accordingly. Use VIEW_ENFORCER on top of this layer to make sure new members don't receive
020: * any messages until they are members.
021: *
022: * @author Bela Ban
023: */
024: public class GMS extends RpcProtocol implements Runnable {
025: private GmsImpl impl = null;
026: public Address local_addr = null;
027: public String group_addr = null;
028: public final Membership mbrs = new Membership();
029: public ViewId view_id = null;
030: public long ltime = 0;
031: public long join_timeout = 5000;
032: public long join_retry_timeout = 2000;
033: private long flush_timeout = 0; // 0=wait forever until FLUSH completes
034: private long rebroadcast_timeout = 0; // 0=wait forever until REBROADCAST completes
035: private long view_change_timeout = 10000; // until all handleViewChange() RPCs have returned
036: public long leave_timeout = 5000;
037: public final Object impl_mutex = new Object(); // synchronizes event entry into impl
038: public final Object view_mutex = new Object(); // synchronizes view installations
039: private Queue event_queue = new Queue(); // stores SUSPECT, MERGE events
040: private Thread evt_thread = null;
041: private final Object flush_mutex = new Object();
042: private FlushRsp flush_rsp = null;
043: private final Object rebroadcast_mutex = new Object();
044: private boolean rebroadcast_unstable_msgs = true;
045: private boolean print_local_addr = true;
046: boolean disable_initial_coord = false; // can the member become a coord on startup or not ?
047: private final Hashtable impls = new Hashtable();
048: static final String CLIENT = "Client";
049: static final String COORD = "Coordinator";
050: static final String PART = "Participant";
051:
052: public static final String name = "GMS";
053:
054: public GMS() {
055: initState();
056: }
057:
058: public String getName() {
059: return name;
060: }
061:
062: public Vector requiredDownServices() {
063: Vector retval = new Vector();
064: retval.addElement(new Integer(Event.FLUSH));
065: retval.addElement(new Integer(Event.FIND_INITIAL_MBRS));
066: return retval;
067: }
068:
069: public void setImpl(GmsImpl new_impl) {
070: synchronized (impl_mutex) {
071: impl = new_impl;
072: if (log.isInfoEnabled())
073: log.info("changed role to "
074: + new_impl.getClass().getName());
075: }
076: }
077:
078: public void start() throws Exception {
079: super .start();
080: if (checkForViewEnforcer(up_prot) == false) {
081: if (log.isWarnEnabled())
082: log
083: .warn("I need protocol layer "
084: + "VIEW_ENFORCER above me to discard messages sent to me while I'm "
085: + "not yet a group member ! Otherwise, these messages will be delivered "
086: + "to the application without checking...\n");
087: }
088:
089: if (_corr != null)
090: _corr.setDeadlockDetection(true);
091: else
092: throw new Exception(
093: "GMS.start(): cannot set deadlock detection in corr, as it is null !");
094: }
095:
096: public void becomeCoordinator() {
097: CoordGmsImpl tmp = (CoordGmsImpl) impls.get(COORD);
098:
099: if (tmp == null) {
100: tmp = new CoordGmsImpl(this );
101: tmp.leaving = false;
102: tmp.received_last_view = false; // +++ ?
103: impls.put(COORD, tmp);
104: }
105:
106: setImpl(tmp);
107: }
108:
109: public void becomeParticipant() {
110: ParticipantGmsImpl tmp = (ParticipantGmsImpl) impls.get(PART);
111:
112: if (tmp == null) {
113: tmp = new ParticipantGmsImpl(this );
114: tmp.leaving = false;
115: tmp.received_final_view = false;
116: impls.put(PART, tmp);
117: }
118: setImpl(tmp);
119: }
120:
121: public void becomeClient() {
122: ClientGmsImpl tmp = (ClientGmsImpl) impls.get(CLIENT);
123:
124: if (tmp == null) {
125: tmp = new ClientGmsImpl(this );
126: impls.put(CLIENT, tmp);
127: } else
128: tmp.init();
129:
130: setImpl(tmp);
131: }
132:
133: boolean haveCoordinatorRole() {
134: return impl != null && impl instanceof CoordGmsImpl;
135: }
136:
137: /**
138: * Computes the next view. Returns a copy that has <code>old_mbrs</code> and
139: * <code>suspected_mbrs</code> removed and <code>new_mbrs</code> added.
140: */
141: public View getNextView(Vector new_mbrs, Vector old_mbrs,
142: Vector suspected_mbrs) {
143: Vector members;
144: long vid;
145: View v;
146: Membership tmp_mbrs;
147: Vector mbrs_to_remove = new Vector();
148:
149: if (old_mbrs != null && old_mbrs.size() > 0)
150: for (int i = 0; i < old_mbrs.size(); i++)
151: mbrs_to_remove.addElement(old_mbrs.elementAt(i));
152: if (suspected_mbrs != null && suspected_mbrs.size() > 0)
153: for (int i = 0; i < suspected_mbrs.size(); i++)
154: if (!mbrs_to_remove.contains(suspected_mbrs
155: .elementAt(i)))
156: mbrs_to_remove.addElement(suspected_mbrs
157: .elementAt(i));
158:
159: synchronized (view_mutex) {
160: vid = Math.max(view_id.getId(), ltime) + 1;
161: ltime = vid;
162: tmp_mbrs = this .mbrs.copy();
163: tmp_mbrs.merge(new_mbrs, mbrs_to_remove);
164: members = (Vector) tmp_mbrs.getMembers().clone();
165: v = new View(local_addr, vid, members);
166: return v;
167: }
168: }
169:
170: /**
171: * Return a copy of the current membership minus the suspected members: FLUSH request is not sent
172: * to suspected members (because they won't respond, and not to joining members either.
173: * It IS sent to leaving members (before they are allowed to leave).
174: */
175: Vector computeFlushDestination(Vector suspected_mbrs) {
176: Vector ret = mbrs.getMembers(); // *copy* of current membership
177: if (suspected_mbrs != null && suspected_mbrs.size() > 0)
178: for (int i = 0; i < suspected_mbrs.size(); i++)
179: ret.removeElement(suspected_mbrs.elementAt(i));
180: return ret;
181: }
182:
183: /**
184: * Compute the destination set to which to send a VIEW_CHANGE message. This is the current
185: * members + the leaving members (old_mbrs) + the joining members (new_mbrs) - the suspected
186: * members.
187: */
188: private Vector computeViewDestination(Vector new_mbrs,
189: Vector suspected_mbrs) {
190: Vector ret = mbrs.getMembers(); // **copy* of current membership
191: Address mbr;
192:
193: // add new members
194: if (new_mbrs != null) {
195: for (int i = 0; i < new_mbrs.size(); i++) {
196: mbr = (Address) new_mbrs.elementAt(i);
197: if (!ret.contains(mbr))
198: ret.addElement(new_mbrs.elementAt(i));
199: }
200: }
201:
202: // old members are still in existing membership, don't need to add them explicitely
203:
204: // remove suspected members
205: if (suspected_mbrs != null) {
206: for (int i = 0; i < suspected_mbrs.size(); i++) {
207: mbr = (Address) suspected_mbrs.elementAt(i);
208: ret.removeElement(mbr);
209: }
210: }
211: return ret;
212: }
213:
214: /**
215: * FLUSH protocol.
216: * Send to current mbrs - suspected_mbrs (not including new_mbrs, but including old_mbr)
217: * Send TMP_VIEW event down,
218: * this allows FLUSH/NAKACK to set membership correctly
219: */
220:
221: public void flush(Vector flush_dest, Vector suspected_mbrs) {
222: Vector rebroadcast_msgs = new Vector();
223:
224: if (suspected_mbrs == null)
225: suspected_mbrs = new Vector();
226:
227: while (flush_dest.size() > 0) {
228: flush_rsp = null;
229: synchronized (flush_mutex) {
230: passDown(new Event(Event.FLUSH, flush_dest)); // send FLUSH to members in flush_dest
231: if (flush_rsp == null) {
232: try {
233: flush_mutex.wait(flush_timeout);
234: } catch (Exception e) {
235: }
236: }
237: }
238: if (flush_rsp == null) {
239: break;
240: }
241:
242: if (rebroadcast_unstable_msgs
243: && flush_rsp.unstable_msgs != null
244: && flush_rsp.unstable_msgs.size() > 0) {
245: Message m;
246: for (int i = 0; i < flush_rsp.unstable_msgs.size(); i++) {
247: m = (Message) flush_rsp.unstable_msgs.elementAt(i);
248:
249: // just add msg, NAKACK.RESEND will weed out duplicates based on
250: // <sender:id> before re-broadcasting msgs
251: rebroadcast_msgs.addElement(m);
252: }
253: }
254:
255: if (flush_rsp.result == true)
256: break;
257: else {
258: if (flush_rsp.failed_mbrs != null) {
259: for (int i = 0; i < flush_rsp.failed_mbrs.size(); i++) {
260: flush_dest.removeElement(flush_rsp.failed_mbrs
261: .elementAt(i));
262: suspected_mbrs.addElement(flush_rsp.failed_mbrs
263: .elementAt(i));
264: }
265: }
266: }
267: } // while
268: if (log.isInfoEnabled())
269: log.info("flushing completed.");
270:
271: // Rebroadcast unstable messages
272: if (rebroadcast_unstable_msgs && rebroadcast_msgs.size() > 0) {
273:
274: if (log.isInfoEnabled())
275: log.info("re-broadcasting unstable messages ("
276: + rebroadcast_msgs.size() + ')');
277: // NAKACK layer will rebroadcast the msgs (using the same seqnos assigned earlier)
278: synchronized (rebroadcast_mutex) {
279: passDown(new Event(Event.REBROADCAST_MSGS,
280: rebroadcast_msgs));
281: try {
282: rebroadcast_mutex.wait(rebroadcast_timeout);
283: } catch (Exception e) {
284: }
285: }
286: if (log.isInfoEnabled())
287: log.info("re-broadcasting messages completed");
288: }
289: }
290:
291: /**
292: * Compute a new view, given the current view, the new members and the suspected/left
293: * members. Run view update protocol to install a new view in all members (this involves
294: * casting the new view to all members). The targets for FLUSH and VIEW mcasts are
295: * computed as follows:<p>
296: * <pre>
297: * existing leaving suspected joining
298: * <p/>
299: * 1. FLUSH y y n n
300: * 2. new_view y n n y
301: * 3. tmp_view y y n y
302: * (view_dest)
303: * </pre>
304: * <p/>
305: * <ol>
306: * <li>
307: * The FLUSH is only sent to the existing and leaving members (they are the only ones that might have
308: * old messages not yet seen by the group. The suspected members would not answer anyway (because they
309: * have failed) and the joining members have certainly no old messages.
310: * <li>
311: * The new view to be installed includes the existing members plus the joining ones and
312: * excludes the leaving and suspected members.
313: * <li>
314: * A temporary view is sent down the stack as an <em>event</em>. This allows the bottom layer
315: * (e.g. UDP or TCP) to determine the members to which to send a multicast message. Compared
316: * to the new view, leaving members are <em>included</em> since they have are waiting for a
317: * view in which they are not members any longer before they leave. So, if we did not set a
318: * temporary view, joining members would not receive the view (signalling that they have been
319: * joined successfully). The temporary view is essentially the current view plus the joining
320: * members (old members are still part of the current view).
321: * </ol>
322: */
323: public void castViewChange(Vector new_mbrs, Vector old_mbrs,
324: Vector suspected_mbrs) {
325: View new_view, tmp_view;
326: ViewId new_vid;
327: Vector flush_dest = computeFlushDestination(suspected_mbrs); // members to which FLUSH/VIEW is sent
328: Vector view_dest = computeViewDestination(new_mbrs,
329: suspected_mbrs); // dest for view change
330:
331: // next view: current mbrs + new_mbrs - old_mbrs - suspected_mbrs
332: new_view = getNextView(new_mbrs, old_mbrs, suspected_mbrs);
333: new_vid = new_view.getVid();
334:
335: if (log.isInfoEnabled())
336: log.info("FLUSH phase, flush_dest: " + flush_dest
337: + "\n\tview_dest: " + view_dest + "\n\tnew_view: "
338: + new_view + '\n');
339: flush(flush_dest, suspected_mbrs);
340: if (log.isInfoEnabled())
341: log.info("FLUSH phase done");
342:
343: /* VIEW protocol. Send to current mbrs + new_mbrs + old_mbrs - suspected_mbrs. Since
344: suspected members were removed from view_dest during the previous FLUSH round(s), we
345: only need to add the new members. Send TMP_VIEW event down, this allows
346: FLUSH/NAKACK to set membership correctly */
347: view_dest = computeViewDestination(new_mbrs, suspected_mbrs);
348: tmp_view = new View(null, view_dest);
349:
350: Event view_event = new Event(Event.TMP_VIEW, tmp_view); // so the VIEW msg is sent to the correct mbrs
351: passDown(view_event); // needed e.g. by failure detector or UDP
352:
353: if (log.isInfoEnabled())
354: log.info("mcasting view {" + new_vid + ", " + view_dest
355: + '}');
356: passDown(new Event(Event.SWITCH_NAK_ACK)); // use ACK scheme for view bcast
357: Object[] args = new Object[] { new_vid, new_view.getMembers() /* these are the mbrs in the new view */};
358: MethodCall call = new MethodCall("handleViewChange", args,
359: new String[] { ViewId.class.getName(),
360: Vector.class.getName() });
361: callRemoteMethods(view_dest, // send to all members in 'view_dest'
362: call, GroupRequest.GET_ALL, view_change_timeout);
363: if (log.isInfoEnabled())
364: log.info("mcasting view completed");
365: passDown(new Event(Event.SWITCH_NAK)); // back to normal NAKs ...
366: }
367:
368: /**
369: * Assigns the new ltime. Installs view and view_id. Changes role to coordinator if necessary.
370: * Sends VIEW_CHANGE event up and down the stack.
371: */
372: public void installView(ViewId new_view, Vector mbrs) {
373: Object coord;
374: int rc;
375:
376: synchronized (view_mutex) { // serialize access to views
377: ltime = Math.max(new_view.getId(), ltime); // compute Lamport logical time
378: if (log.isInfoEnabled())
379: log.info("received view change, vid=" + new_view);
380:
381: /* Check for self-inclusion: if I'm not part of the new membership, I just discard it.
382: This ensures that messages sent in view V1 are only received by members of V1 */
383: if (checkSelfInclusion(mbrs) == false) {
384: if (log.isWarnEnabled())
385: log.warn("I'm not member of " + mbrs
386: + ", discarding");
387: return;
388: }
389:
390: if (view_id == null) {
391: view_id = (ViewId) new_view.clone();
392: } else {
393: rc = new_view.compareTo(view_id); // rc should always be a positive number
394: if (rc <= 0) { // don't accept view id lower than our own
395: if (log.isWarnEnabled())
396: log
397: .warn("received view <= current view; discarding it ! "
398: + "(view_id: "
399: + view_id
400: + ", new_view: "
401: + new_view
402: + ')');
403: return;
404: } else { // the check for vid equality was okay, assign new_view to view_id
405: if (new_view.getCoordAddress() != null) {
406: view_id = new ViewId(
407: new_view.getCoordAddress(), new_view
408: .getId());
409: } else {
410: view_id = new ViewId(view_id.getCoordAddress(),
411: new_view.getId());
412: }
413: }
414: }
415:
416: if (mbrs != null && mbrs.size() > 0)
417: this .mbrs.set(mbrs);
418:
419: // Send VIEW_CHANGE event up and down the stack:
420: Event view_event = new Event(Event.VIEW_CHANGE,
421: makeView(this .mbrs.getMembers()));
422: passDown(view_event); // needed e.g. by failure detector or UDP
423: passUp(view_event);
424:
425: coord = determineCoordinator();
426: if (coord != null && coord.equals(local_addr)) {
427: if (!haveCoordinatorRole()) // this avoids deadlock on coordinator - when suspect/join occurs simultaneously
428: becomeCoordinator();
429: } else {
430: if (haveCoordinatorRole() && !local_addr.equals(coord))
431: becomeParticipant();
432: }
433: }
434: }
435:
436: protected Address determineCoordinator() {
437: synchronized (mbrs) {
438: return mbrs != null && mbrs.size() > 0 ? (Address) mbrs
439: .elementAt(0) : null;
440: }
441: }
442:
443: /**
444: * Returns true if local_addr is member of mbrs, else false
445: */
446: protected boolean checkSelfInclusion(Vector mbrs) {
447: Object mbr;
448: if (mbrs == null)
449: return false;
450: for (int i = 0; i < mbrs.size(); i++) {
451: mbr = mbrs.elementAt(i);
452: if (mbr != null && local_addr.equals(mbr))
453: return true;
454: }
455: return false;
456: }
457:
458: public View makeView(Vector mbrs) {
459: Address coord = null;
460: long id = 0;
461:
462: if (view_id != null) {
463: coord = view_id.getCoordAddress();
464: id = view_id.getId();
465: }
466: return new View(coord, id, mbrs);
467: }
468:
469: public static View makeView(Vector mbrs, ViewId vid) {
470: Address coord = null;
471: long id = 0;
472:
473: if (vid != null) {
474: coord = vid.getCoordAddress();
475: id = vid.getId();
476: }
477: return new View(coord, id, mbrs);
478: }
479:
480: /* ------------------------- Request handler methods ----------------------------- */
481:
482: public void join(Address mbr) {
483: synchronized (impl_mutex) {
484: impl.join(mbr);
485: }
486: }
487:
488: public void leave(Address mbr) {
489: synchronized (impl_mutex) {
490: impl.leave(mbr);
491: }
492: }
493:
494: public void suspect(Address mbr) {
495: synchronized (impl_mutex) {
496: impl.suspect(mbr);
497: }
498: }
499:
500: public void merge(Vector other_coords) {
501: synchronized (impl_mutex) {
502: impl.merge(other_coords);
503: }
504: }
505:
506: public boolean handleJoin(Address mbr) {
507: synchronized (impl_mutex) {
508: return impl.handleJoin(mbr);
509: }
510: }
511:
512: public void handleLeave(Address mbr, boolean suspected) {
513: synchronized (impl_mutex) {
514: impl.handleLeave(mbr, suspected);
515: }
516: }
517:
518: public void handleViewChange(ViewId new_view, Vector mbrs) {
519: // synchronized (impl_mutex ) {
520: impl.handleViewChange(new_view, mbrs);
521: // }
522: }
523:
524: public View handleMerge(ViewId other_vid, Vector other_members) {
525: synchronized (impl_mutex) {
526: if (log.isTraceEnabled()) {
527: View v = impl.handleMerge(other_vid, other_members);
528: if (log.isInfoEnabled())
529: log.info("returning view: " + v);
530: return v;
531: }
532: return impl.handleMerge(other_vid, other_members);
533: }
534: }
535:
536: public void handleSuspect(Address mbr) {
537: synchronized (impl_mutex) {
538: impl.handleSuspect(mbr);
539: }
540: }
541:
542: /* --------------------- End of Request handler methods -------------------------- */
543:
544: boolean checkForViewEnforcer(Protocol up_protocol) {
545: String prot_name;
546:
547: if (up_protocol == null)
548: return false;
549: prot_name = up_protocol.getName();
550: if (prot_name != null && "VIEW_ENFORCER".equals(prot_name))
551: return true;
552: return checkForViewEnforcer(up_protocol.getUpProtocol());
553: }
554:
555: /**
556: * <b>Callback</b>. Called by superclass when event may be handled.<p>
557: * <b>Do not use <code>PassUp</code> in this method as the event is passed up
558: * by default by the superclass after this method returns !</b>
559: *
560: * @return boolean Defaults to true. If false, event will not be passed up the stack.
561: */
562: public boolean handleUpEvent(Event evt) {
563: switch (evt.getType()) {
564:
565: case Event.CONNECT_OK: // sent by someone else, but WE are responsible for sending this !
566: case Event.DISCONNECT_OK: // dito (e.g. sent by UDP layer)
567: return false;
568:
569: case Event.SET_LOCAL_ADDRESS:
570: local_addr = (Address) evt.getArg();
571:
572: if (print_local_addr) {
573: System.out
574: .println("\n-------------------------------------------------------\n"
575: + "GMS: address is "
576: + local_addr
577: + "\n-------------------------------------------------------");
578: }
579: return true; // pass up
580:
581: case Event.SUSPECT:
582: try {
583: event_queue.add(evt);
584: } catch (Exception e) {
585: }
586: return true; // pass up
587:
588: case Event.MERGE:
589: try {
590: event_queue.add(evt);
591: } catch (Exception e) {
592: }
593: return false; // don't pass up
594:
595: case Event.FLUSH_OK:
596: synchronized (flush_mutex) {
597: flush_rsp = (FlushRsp) evt.getArg();
598: flush_mutex.notifyAll();
599: }
600: return false; // don't pass up
601:
602: case Event.REBROADCAST_MSGS_OK:
603: synchronized (rebroadcast_mutex) {
604: rebroadcast_mutex.notifyAll();
605: }
606: return false; // don't pass up
607: }
608:
609: return impl.handleUpEvent(evt);
610: }
611:
612: /**
613: * <b>Callback</b>. Called by superclass when event may be handled.<p>
614: * <b>Do not use <code>PassDown</code> in this method as the event is passed down
615: * by default by the superclass after this method returns !</b>
616: *
617: * @return boolean Defaults to true. If false, event will not be passed down the stack.
618: */
619: public boolean handleDownEvent(Event evt) {
620: switch (evt.getType()) {
621:
622: case Event.CONNECT:
623: passDown(evt);
624: try {
625: group_addr = (String) evt.getArg();
626: } catch (ClassCastException cce) {
627: if (log.isErrorEnabled())
628: log.error("group address must "
629: + "be a string (group name) to make sense");
630: }
631: impl.join(local_addr);
632: passUp(new Event(Event.CONNECT_OK));
633: startEventHandlerThread();
634: return false; // don't pass down: was already passed down
635:
636: case Event.DISCONNECT:
637: impl.leave((Address) evt.getArg());
638: passUp(new Event(Event.DISCONNECT_OK));
639: stopEventHandlerThread();
640: initState();
641: return true; // pass down
642: }
643:
644: return impl.handleDownEvent(evt);
645: }
646:
647: // Priority handling, otherwise GMS.down(DISCONNECT) would block !
648: // Similar to FLUSH protocol
649: public void receiveDownEvent(Event evt) {
650: if (evt.getType() == Event.BLOCK_OK) {
651: passDown(evt);
652: return;
653: }
654: super .receiveDownEvent(evt);
655: }
656:
657: /**
658: * Setup the Protocol instance acording to the configuration string
659: */
660: public boolean setProperties(Properties props) {
661: String str;
662:
663: super .setProperties(props);
664: str = props.getProperty("join_timeout"); // time to wait for JOIN
665: if (str != null) {
666: join_timeout = Long.parseLong(str);
667: props.remove("join_timeout");
668: }
669:
670: str = props.getProperty("print_local_addr");
671: if (str != null) {
672: print_local_addr = Boolean.valueOf(str).booleanValue();
673: props.remove("print_local_addr");
674: }
675:
676: str = props.getProperty("view_change_timeout"); // time to wait for VIEW_CHANGE
677: if (str != null) {
678: view_change_timeout = Long.parseLong(str);
679: props.remove("view_change_timeout");
680: }
681:
682: str = props.getProperty("join_retry_timeout"); // time to wait between JOINs
683: if (str != null) {
684: join_retry_timeout = Long.parseLong(str);
685: props.remove("join_retry_timeout");
686: }
687:
688: str = props.getProperty("leave_timeout"); // time to wait until coord responds to LEAVE req.
689: if (str != null) {
690: leave_timeout = Long.parseLong(str);
691: props.remove("leave_timeout");
692: }
693:
694: str = props.getProperty("flush_timeout"); // time to wait until FLUSH completes (0=forever)
695: if (str != null) {
696: flush_timeout = Long.parseLong(str);
697: props.remove("flush_timeout");
698: }
699:
700: str = props.getProperty("rebroadcast_unstable_msgs"); // bcast unstable msgs (recvd from FLUSH)
701: if (str != null) {
702: rebroadcast_unstable_msgs = Boolean.valueOf(str)
703: .booleanValue();
704: props.remove("rebroadcast_unstable_msgs");
705: }
706:
707: str = props.getProperty("rebroadcast_timeout"); // time to wait until REBROADCAST_MSGS completes
708: if (str != null) {
709: rebroadcast_timeout = Long.parseLong(str);
710: props.remove("rebroadcast_timeout");
711: }
712:
713: str = props.getProperty("disable_initial_coord"); // allow initial mbr to become coord or not
714: if (str != null) {
715: disable_initial_coord = Boolean.valueOf(str).booleanValue();
716: props.remove("disable_initial_coord");
717: }
718:
719: if (props.size() > 0) {
720: log
721: .error("GMS.setProperties(): the following properties are not recognized: "
722: + props);
723:
724: return false;
725: }
726: return true;
727: }
728:
729: public void run() {
730: Event evt;
731:
732: while (evt_thread != null && event_queue != null) {
733: try {
734: evt = (Event) event_queue.remove();
735: switch (evt.getType()) {
736: case Event.SUSPECT:
737: impl.suspect((Address) evt.getArg());
738: break;
739: case Event.MERGE:
740: impl.merge((Vector) evt.getArg());
741: break;
742: default:
743: if (log.isErrorEnabled())
744: log
745: .error("event handler thread encountered event of type "
746: + Event.type2String(evt
747: .getType())
748: + ": not handled by me !");
749: break;
750: }
751: } catch (QueueClosedException closed) {
752: break;
753: } catch (Exception ex) {
754: if (log.isWarnEnabled())
755: log.warn("exception=" + ex);
756: }
757: }
758: }
759:
760: /* ------------------------------- Private Methods --------------------------------- */
761:
762: private void initState() {
763: becomeClient();
764: impl.init();
765: view_id = null;
766: if (mbrs != null)
767: mbrs.clear();
768: }
769:
770: private void startEventHandlerThread() {
771: if (event_queue == null)
772: event_queue = new Queue();
773: if (evt_thread == null) {
774: evt_thread = new Thread(this , "GMS.EventHandlerThread");
775: evt_thread.setDaemon(true);
776: evt_thread.start();
777: }
778: }
779:
780: private void stopEventHandlerThread() {
781: if (evt_thread != null) {
782: event_queue.close(false);
783: event_queue = null;
784: evt_thread = null;
785: return;
786: }
787:
788: if (event_queue != null) {
789: event_queue.close(false);
790: event_queue = null;
791: }
792: }
793:
794: }
|