001: package com.tc.net.groups;
002:
003: import org.apache.catalina.tribes.Channel;
004: import org.apache.catalina.tribes.ChannelException;
005: import org.apache.catalina.tribes.ChannelListener;
006: import org.apache.catalina.tribes.Member;
007: import org.apache.catalina.tribes.MembershipListener;
008: import org.apache.catalina.tribes.ChannelException.FaultyMember;
009: import org.apache.catalina.tribes.group.ChannelCoordinator;
010: import org.apache.catalina.tribes.group.GroupChannel;
011: import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;
012: import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
013: import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
014: import org.apache.catalina.tribes.group.interceptors.TcpPingInterceptor;
015: import org.apache.catalina.tribes.membership.StaticMember;
016: import org.apache.catalina.tribes.transport.DataSender;
017: import org.apache.catalina.tribes.transport.ReceiverBase;
018: import org.apache.catalina.tribes.transport.ReplicationTransmitter;
019:
020: import com.tc.async.api.EventContext;
021: import com.tc.async.api.Sink;
022: import com.tc.logging.TCLogger;
023: import com.tc.logging.TCLogging;
024: import com.tc.properties.TCPropertiesImpl;
025: import com.tc.util.Assert;
026: import com.tc.util.Conversion;
027: import com.tc.util.concurrent.CopyOnWriteArrayMap;
028:
029: import java.io.IOException;
030: import java.io.Serializable;
031: import java.lang.reflect.Constructor;
032: import java.lang.reflect.Modifier;
033: import java.util.ArrayList;
034: import java.util.Arrays;
035: import java.util.HashSet;
036: import java.util.Hashtable;
037: import java.util.Iterator;
038: import java.util.List;
039: import java.util.Map;
040: import java.util.Properties;
041: import java.util.concurrent.ConcurrentHashMap;
042: import java.util.concurrent.CopyOnWriteArrayList;
043:
044: public class TribesGroupManager implements GroupManager,
045: ChannelListener, MembershipListener {
046:
047: private static final String L2_NHA = "l2.nha";
048: private static final String SEND_TIMEOUT_PROP = "send.timeout.millis";
049: private static final String USE_MCAST = "mcast.enabled";
050: private static final String USE_ORDER_INTERCEPTOR = "tribes.orderinterceptor.enabled";
051: private static final int SEND_OPTIONS_NO_ACK = 0x00;
052: private static final String TRIBES_FAILURE_TIMEOUT = "tribes.failuredetector.millis";
053:
054: private static final TCLogger logger = TCLogging
055: .getLogger(TribesGroupManager.class);
056:
057: private static final boolean useMcast = TCPropertiesImpl
058: .getProperties().getPropertiesFor(L2_NHA).getBoolean(
059: USE_MCAST);
060: // TODO::FIXME:: Its disabled since it causes issues (exposed using TIMS test)
061: private static final boolean useOrderInterceptor = TCPropertiesImpl
062: .getProperties().getPropertiesFor(L2_NHA).getBoolean(
063: USE_ORDER_INTERCEPTOR);
064:
065: private final GroupChannel group;
066: private TcpFailureDetector failuredetector;
067: private Member this Member;
068: private NodeID this NodeID;
069:
070: private final CopyOnWriteArrayList<GroupEventsListener> groupListeners = new CopyOnWriteArrayList<GroupEventsListener>();
071: // private final Map<NodeID, Member> nodes = new CopyOnWriteArrayMap<NodeID, Member>();
072: private final CopyOnWriteArrayMap nodes = new CopyOnWriteArrayMap(
073: new CopyOnWriteArrayMap.TypedArrayFactory() {
074: public Object[] createTypedArray(int size) {
075: return new MemberNode[size];
076: }
077: });
078: private final Map<String, GroupMessageListener> messageListeners = new ConcurrentHashMap<String, GroupMessageListener>();
079: private final Map<MessageID, GroupResponse> pendingRequests = new Hashtable<MessageID, GroupResponse>();
080:
081: private boolean stopped = false;
082: private boolean debug = false;
083: private ZapNodeRequestProcessor zapNodeRequestProcessor = new DefaultZapNodeRequestProcessor(
084: logger);
085:
086: public TribesGroupManager() {
087: group = new GroupChannel();
088: registerForMessages(GroupZapNodeMessage.class,
089: new ZapNodeRequestRouter());
090: }
091:
092: public NodeID join(final Node this Node, final Node[] allNodes)
093: throws GroupException {
094: if (useMcast)
095: return joinMcast();
096: else
097: return joinStatic(this Node, allNodes);
098:
099: }
100:
101: public synchronized void stop() throws GroupException {
102: try {
103: group.stop(Channel.DEFAULT);
104: } catch (ChannelException e) {
105: logger.error(e);
106: throw new GroupException(e);
107: } finally {
108: stopped = true;
109: }
110: }
111:
112: private void commonGroupChanelConfig() {
113: // Configure send timeout
114: ReplicationTransmitter transmitter = (ReplicationTransmitter) group
115: .getChannelSender();
116: DataSender sender = transmitter.getTransport();
117: final long l = TCPropertiesImpl.getProperties()
118: .getPropertiesFor(L2_NHA).getLong(SEND_TIMEOUT_PROP);
119: sender.setTimeout(l);
120: ChannelCoordinator cc = (ChannelCoordinator) group.getNext();
121: final Properties mcastProps = new Properties();
122: TCPropertiesImpl.getProperties().getPropertiesFor(
123: "l2.nha.tribes.mcast").addAllPropertiesTo(mcastProps);
124: cc.getMembershipService().setProperties(mcastProps);
125: // add listeners
126: group.addMembershipListener(this );
127: group.addChannelListener(this );
128: }
129:
130: protected NodeID joinStatic(final Node this Node,
131: final Node[] allNodes) throws GroupException {
132: try {
133: // set up static nodes
134: StaticMembershipInterceptor smi = setupStaticMembers(
135: this Node, allNodes);
136:
137: // set up receiver
138: ReceiverBase receiver = (ReceiverBase) group
139: .getChannelReceiver();
140: receiver.setAddress(this Node.getHost());
141: receiver.setPort(this Node.getPort());
142: receiver.setAutoBind(0);
143: receiver.setDirect(false);
144:
145: commonGroupChanelConfig();
146: TcpPingInterceptor tcp = new TcpPingInterceptor();
147: tcp.setUseThread(true);
148: tcp.setInterval(1000);
149:
150: // set up failure detector
151: final long ms = TCPropertiesImpl.getProperties()
152: .getPropertiesFor(L2_NHA).getLong(
153: TRIBES_FAILURE_TIMEOUT);
154: failuredetector = new TcpFailureDetector();
155: failuredetector.setConnectTimeout(ms);
156:
157: if (useOrderInterceptor) {
158: OrderInterceptor oi = new OrderInterceptor();
159: oi.setExpire(60000);
160: group.addInterceptor(oi);
161: } else {
162: // XXX::FIXME::TODO:: These settings are added since OrderInterceptor has issues and we want to maintain message
163: // ordering
164: receiver.setMaxThreads(1);
165: receiver.setMinThreads(1);
166: }
167:
168: // start services
169: group.addInterceptor(tcp);
170: group.addInterceptor(failuredetector);
171: group.addInterceptor(smi);
172: group.start(Channel.SND_RX_SEQ | Channel.SND_TX_SEQ);
173: return this .this NodeID;
174: } catch (ChannelException e) {
175: logger.error(e);
176: throw new GroupException(e);
177: }
178: }
179:
180: protected NodeID joinMcast() throws GroupException {
181: try {
182: commonGroupChanelConfig();
183:
184: ReceiverBase receiver = (ReceiverBase) group
185: .getChannelReceiver();
186: receiver.setDirect(false);
187:
188: if (useOrderInterceptor) {
189: OrderInterceptor oi = new OrderInterceptor();
190: oi.setExpire(60000);
191: group.addInterceptor(oi);
192: } else {
193: // XXX::FIXME::TODO:: These settings are added since OrderInterceptor has issues and we want to maintain message
194: // ordering
195: receiver.setMaxThreads(1);
196: receiver.setMinThreads(1);
197: }
198:
199: group.start(Channel.DEFAULT);
200: this .this Member = group.getLocalMember(false);
201: this .this NodeID = makeNodeIDFrom(this .this Member);
202: return this .this NodeID;
203: } catch (ChannelException e) {
204: logger.error(e);
205: throw new GroupException(e);
206: }
207: }
208:
209: /**
210: * XXX:: This method is a temporary hack to make TribesGroupManager work. Without this for static members, we get
211: * different UniqueID for the same members, one in nodeJoined event and one in the messages. Until that is fixed, the
212: * NodeID is going to be based on the host and port for static members.
213: */
214: private static NodeID makeNodeIDFrom(Member member) {
215: if (useMcast) {
216: return new NodeIDImpl(member.getName(), member
217: .getUniqueId());
218: } else {
219: byte[] host = member.getHost();
220: int port = member.getPort();
221: if (port < 0) {
222: port = member.getSecurePort();
223: if (port < 0) {
224: // Ports shouldn't be 0 either, but in our test framework when there is only one in-process active, it could
225: // be.
226: throw new AssertionError("Invalid port number : "
227: + port + " for host "
228: + Conversion.bytesToHex(host));
229: }
230: }
231: int length = host.length;
232: byte uid[] = new byte[length + 4];
233: System.arraycopy(host, 0, uid, 0, length);
234: Conversion.writeInt(port, uid, length);
235: return new NodeIDImpl(member.getName(), uid);
236: }
237: }
238:
239: private StaticMembershipInterceptor setupStaticMembers(
240: final Node this Node, final Node[] allNodes)
241: throws AssertionError {
242: StaticMembershipInterceptor smi = new StaticMembershipInterceptor();
243: for (int i = 0; i < allNodes.length; i++) {
244: final Node node = allNodes[i];
245: if (this Node.equals(node))
246: continue;
247: StaticMember sm = makeMember(node);
248: if (sm == null)
249: continue;
250: smi.addStaticMember(sm);
251: }
252: // set up this node
253: this Member = makeMember(this Node);
254: if (this Member == null) {
255: throw new AssertionError(
256: "Error setting up this group member: " + this Node);
257: }
258: this .this NodeID = makeNodeIDFrom(this Member);
259: smi.setLocalMember(this Member);
260: return smi;
261: }
262:
263: public NodeID getLocalNodeID() throws GroupException {
264: if (this .this NodeID == null) {
265: throw new GroupException(
266: "Node hasnt joined the group yet !");
267: }
268: return this .this NodeID;
269: }
270:
271: private static void validateExternalizableClass(
272: Class<AbstractGroupMessage> clazz) {
273: String name = clazz.getName();
274: try {
275: Constructor<AbstractGroupMessage> cons = clazz
276: .getDeclaredConstructor(new Class[0]);
277: if ((cons.getModifiers() & Modifier.PUBLIC) == 0) {
278: //
279: throw new AssertionError(name
280: + " : public no arg constructor not found");
281: }
282: } catch (NoSuchMethodException ex) {
283: throw new AssertionError(name
284: + " : public no arg constructor not found");
285: }
286: }
287:
288: private static void validateEventClass(Class<?> clazz) {
289: if (!EventContext.class.isAssignableFrom(clazz)) {
290: throw new AssertionError(clazz
291: + " does not implement interface "
292: + EventContext.class.getName());
293: }
294: }
295:
296: @SuppressWarnings("unchecked")
297: public void registerForMessages(Class msgClass,
298: GroupMessageListener listener) {
299: validateExternalizableClass(msgClass);
300: GroupMessageListener prev = messageListeners.put(msgClass
301: .getName(), listener);
302: if (prev != null) {
303: logger.warn("Previous listener removed : " + prev);
304: }
305: }
306:
307: @SuppressWarnings("unchecked")
308: public void routeMessages(Class msgClass, Sink sink) {
309: validateEventClass(msgClass);
310: registerForMessages(msgClass, new RouteGroupMessagesToSink(
311: msgClass.getName(), sink));
312: }
313:
314: public boolean accept(Serializable msg, Member sender) {
315: if (stopped || !(msg instanceof GroupMessage)) {
316: logger
317: .warn("Rejecting message : "
318: + msg
319: + " from "
320: + sender.getName()
321: + " since its not Group Message or TribesGroupManager is stopped : "
322: + stopped);
323: return false;
324: }
325: return true;
326: }
327:
328: public void messageReceived(Serializable msg, Member sender) {
329: GroupMessage gmsg = (GroupMessage) msg;
330: if (debug) {
331: logger.info(this .this NodeID + " recd msg "
332: + gmsg.getMessageID() + " From " + sender.getName()
333: + " Msg : " + msg);
334: }
335: MessageID requestID = gmsg.inResponseTo();
336: NodeID from = makeNodeIDFrom(sender);
337: MemberNode inode = (MemberNode) nodes.get(from);
338: if (inode == null) {
339: String warn = "Message from non-existing member " + sender
340: + " . Adding this node to nodes = " + nodes;
341: logger.warn(warn);
342: // XXX:: Sometimes messages arrive before memberAdded event. So we are faking it. Also @see comment below
343: from = basicMemberAdded(from, sender);
344: } else {
345: // We always maintain reference equality to all NodeIDs that is exposed to Application layer for a particular
346: // instance of the server. This is done so that when Zap node request comes in, we can identify if it is for the
347: // current instance of the remote node or not. When send fails with timeout (like in Solaris boxes) and the active
348: // tries to Zap node, we don't want to zap the wrong instance (i.e. the server might have crashed and come back
349: // already)
350: from = inode.getNodeID();
351: }
352: gmsg.setMessageOrginator(from);
353: if (requestID.isNull()
354: || !notifyPendingRequests(requestID, gmsg, sender)) {
355: fireMessageReceivedEvent(from, gmsg);
356: }
357: }
358:
359: private static StaticMember makeMember(final Node node) {
360: try {
361: StaticMember rv = new StaticMember(node.getHost(), node
362: .getPort(), 0);
363: // rv.setUniqueId(UUIDGenerator.randomUUID(true));
364: return rv;
365: } catch (IOException e) {
366: logger.error("Error creating group member", e);
367: return null;
368: }
369: }
370:
371: private boolean notifyPendingRequests(MessageID requestID,
372: GroupMessage gmsg, Member sender) {
373: GroupResponseImpl response = (GroupResponseImpl) pendingRequests
374: .get(requestID);
375: if (response != null) {
376: response.addResponseFrom(sender, gmsg);
377: return true;
378: }
379: return false;
380: }
381:
382: private void fireMessageReceivedEvent(NodeID from, GroupMessage msg) {
383: GroupMessageListener listener = messageListeners.get(msg
384: .getClass().getName());
385: if (listener != null) {
386: listener.messageReceived(from, msg);
387: } else {
388: String errorMsg = "No Route for " + msg + " from " + from;
389: logger.error(errorMsg);
390: throw new AssertionError(errorMsg);
391: }
392:
393: }
394:
395: public void registerForGroupEvents(GroupEventsListener listener) {
396: groupListeners.add(listener);
397: }
398:
399: public void memberAdded(Member member) {
400: if (debug) {
401: logger.info("memberAdded -> name=" + member.getName()
402: + ", uid="
403: + Conversion.bytesToHex(member.getUniqueId()));
404: }
405: NodeID newNode = makeNodeIDFrom(member);
406: basicMemberAdded(newNode, member);
407: }
408:
409: private NodeID basicMemberAdded(NodeID newNode, Member member) {
410: MemberNode inode;
411: synchronized (nodes) {
412: inode = (MemberNode) nodes.get(newNode);
413: if (inode == null) {
414: nodes.put(newNode, new MemberNode(newNode, member));
415: } else {
416: logger
417: .warn("Member Added Event called for : "
418: + newNode
419: + " while it is still present in the list of nodes : "
420: + inode.getMember() + " : " + nodes);
421: if (!inode.getMember().equals(member)) {
422: logger.error("Old Member : " + inode.getMember()
423: + " NOT Equal to New one " + member);
424: }
425: return inode.getNodeID();
426: }
427: }
428: fireNodeEvent(newNode, true);
429: return newNode;
430: }
431:
432: private void fireNodeEvent(NodeID newNode, boolean joined) {
433: if (debug) {
434: logger.info("fireNodeEvent: joined = " + joined
435: + ", node = " + newNode);
436: }
437: Iterator<GroupEventsListener> i = groupListeners.iterator();
438: while (i.hasNext()) {
439: GroupEventsListener listener = i.next();
440: if (joined) {
441: listener.nodeJoined(newNode);
442: } else {
443: listener.nodeLeft(newNode);
444: }
445: }
446: }
447:
448: public void memberDisappeared(Member member) {
449: if (debug) {
450: logger.info("memberDisappeared -> name=" + member.getName()
451: + ", uid="
452: + Conversion.bytesToHex(member.getUniqueId()));
453: }
454: NodeID node = makeNodeIDFrom(member);
455: MemberNode inode = (MemberNode) nodes.remove(node);
456: if (inode != null) {
457: // Make sure that all external application layer sees the same NodeID instance always
458: fireNodeEvent(inode.getNodeID(), false);
459: } else {
460: logger
461: .warn("Member Disappered Event called for : "
462: + node
463: + " while it is not present in the list of nodes : "
464: + nodes);
465: }
466: notifyAnyPendingRequests(member);
467: }
468:
469: private void notifyAnyPendingRequests(Member member) {
470: synchronized (pendingRequests) {
471: for (Iterator<GroupResponse> i = pendingRequests.values()
472: .iterator(); i.hasNext();) {
473: GroupResponseImpl response = (GroupResponseImpl) i
474: .next();
475: response.notifyMemberDead(member);
476: }
477: }
478: }
479:
480: public void sendAll(GroupMessage msg) throws GroupException {
481: if (debug) {
482: logger.info(this .this NodeID + " : Sending to ALL : "
483: + msg.getMessageID());
484: }
485: try {
486: Member m[] = getCurrentMembers();
487: if (m.length > 0) {
488: group.send(m, msg, SEND_OPTIONS_NO_ACK);
489: }
490: } catch (ChannelException e) {
491: throw new GroupException(e);
492: }
493: }
494:
495: // TODO:: This method can be optimized by caching members;
496: private Member[] getCurrentMembers() {
497: // return group.getMembers();
498: MemberNode[] inodes = (MemberNode[]) nodes.valuesToArray();
499: Member[] members = new Member[inodes.length];
500: for (int i = 0; i < members.length; i++) {
501: members[i] = inodes[i].getMember();
502: }
503: return members;
504: }
505:
506: public GroupResponse sendAllAndWaitForResponse(GroupMessage msg)
507: throws GroupException {
508: if (debug) {
509: logger.info(this .this NodeID
510: + " : Sending to ALL and Waiting for Response : "
511: + msg.getMessageID());
512: }
513: GroupResponseImpl groupResponse = new GroupResponseImpl();
514: MessageID msgID = msg.getMessageID();
515: GroupResponse old = pendingRequests.put(msgID, groupResponse);
516: Assert.assertNull(old);
517: groupResponse.sendTo(group, msg, getCurrentMembers());
518: groupResponse.waitForAllResponses();
519: pendingRequests.remove(msgID);
520: return groupResponse;
521: }
522:
523: public void sendTo(NodeID node, GroupMessage msg)
524: throws GroupException {
525: if (debug) {
526: logger.info(this .this NodeID + " : Sending to : " + node
527: + " msg " + msg.getMessageID());
528: }
529: MemberNode inode = (MemberNode) nodes.get(node);
530: if (inode != null) {
531: try {
532: group.send(new Member[] { inode.getMember() }, msg,
533: SEND_OPTIONS_NO_ACK);
534: } catch (ChannelException e) {
535: throw new GroupException(e);
536: }
537: } else {
538: String error = "Msg sent to non-exisitent Node : Node "
539: + node + ". Msg : " + msg;
540: logger.error(error);
541: throw new GroupException(error);
542: }
543: }
544:
545: public GroupMessage sendToAndWaitForResponse(NodeID nodeID,
546: GroupMessage msg) throws GroupException {
547: if (debug) {
548: logger.info(this .this NodeID + " : Sending to " + nodeID
549: + " and Waiting for Response : "
550: + msg.getMessageID());
551: }
552: GroupResponseImpl groupResponse = new GroupResponseImpl();
553: MessageID msgID = msg.getMessageID();
554: MemberNode inode = (MemberNode) nodes.get(nodeID);
555: if (inode != null) {
556: Member to[] = new Member[1];
557: to[0] = inode.getMember();
558: GroupResponse old = pendingRequests.put(msgID,
559: groupResponse);
560: Assert.assertNull(old);
561: groupResponse.sendTo(group, msg, to);
562: groupResponse.waitForAllResponses();
563: pendingRequests.remove(msgID);
564: } else {
565: String errorMsg = "Node " + nodeID
566: + " not present in the group. Ignoring Message : "
567: + msg;
568: logger.error(errorMsg);
569: throw new GroupException(errorMsg);
570: }
571: return groupResponse.getResponse(nodeID);
572: }
573:
574: public void setZapNodeRequestProcessor(
575: ZapNodeRequestProcessor processor) {
576: this .zapNodeRequestProcessor = processor;
577: }
578:
579: public void zapNode(NodeID nodeID, int type, String reason) {
580: MemberNode inode = (MemberNode) nodes.get(nodeID);
581: if (inode == null) {
582: logger
583: .warn("Ignoring Zap node request since Member is null");
584: } else if (inode.getNodeID() != nodeID) {
585: logger
586: .warn("Ignoring Zap node request since the Node ID for zapNode request is not reference equal to the one in the internal list. "
587: + " This probably means that zap node request is meant for the previous instance of the server. NodeID "
588: + nodeID + " INode = " + inode);
589: } else if (!zapNodeRequestProcessor
590: .acceptOutgoingZapNodeRequest(nodeID, type, reason)) {
591: logger.warn("Ignoreing Zap node request since "
592: + zapNodeRequestProcessor + " asked us to : "
593: + nodeID + " type = " + type + " reason = "
594: + reason);
595: } else {
596: long weights[] = zapNodeRequestProcessor
597: .getCurrentNodeWeights();
598: logger.warn("Zapping node : " + nodeID + " type = " + type
599: + " reason = " + reason + " my weight = "
600: + Arrays.toString(weights));
601: GroupMessage msg = GroupZapNodeMessageFactory
602: .createGroupZapNodeMessage(type, reason, weights);
603: try {
604: sendTo(nodeID, msg);
605: } catch (GroupException e) {
606: logger.error("Error sending ZapNode Request to "
607: + nodeID + " msg = " + msg);
608: }
609: logger.warn("Removing member " + inode + " from group");
610: memberDisappeared(inode.getMember());
611: }
612: }
613:
614: private static class GroupResponseImpl implements GroupResponse {
615:
616: HashSet<NodeID> waitFor = new HashSet<NodeID>();
617: List<GroupMessage> responses = new ArrayList<GroupMessage>();
618:
619: public synchronized List<GroupMessage> getResponses() {
620: Assert.assertTrue(waitFor.isEmpty());
621: return responses;
622: }
623:
624: public synchronized GroupMessage getResponse(NodeID nodeID) {
625: Assert.assertTrue(waitFor.isEmpty());
626: for (Iterator<GroupMessage> i = responses.iterator(); i
627: .hasNext();) {
628: GroupMessage msg = i.next();
629: if (nodeID.equals(msg.messageFrom()))
630: return msg;
631: }
632: return null;
633: }
634:
635: public void sendTo(GroupChannel group, GroupMessage msg,
636: Member[] m) {
637: try {
638: if (m.length > 0) {
639: setUpWaitFor(m);
640: group.send(m, msg, SEND_OPTIONS_NO_ACK);
641: }
642: } catch (ChannelException e) {
643: logger.error("Error sending msg : " + msg, e);
644: reconsileWaitFor(e);
645: }
646: }
647:
648: private synchronized void setUpWaitFor(Member[] m) {
649: for (int i = 0; i < m.length; i++) {
650: waitFor.add(makeNodeIDFrom(m[i]));
651: }
652: }
653:
654: public synchronized void addResponseFrom(Member sender,
655: GroupMessage gmsg) {
656: if (!waitFor.remove(makeNodeIDFrom(sender))) {
657: String message = "Recd response from a member not in list : "
658: + sender
659: + " : waiting For : "
660: + waitFor
661: + " msg : " + gmsg;
662: logger.error(message);
663: throw new AssertionError(message);
664: }
665: responses.add(gmsg);
666: notifyAll();
667: }
668:
669: public synchronized void notifyMemberDead(Member member) {
670: waitFor.remove(makeNodeIDFrom(member));
671: notifyAll();
672: }
673:
674: public synchronized void waitForAllResponses()
675: throws GroupException {
676: int count = 0;
677: while (!waitFor.isEmpty()) {
678: try {
679: this .wait(5000);
680: if (++count > 1) {
681: logger.warn("Still waiting for response from "
682: + waitFor + ". Count = " + count);
683: }
684: } catch (InterruptedException e) {
685: throw new GroupException(e);
686: }
687: }
688: }
689:
690: private synchronized void reconsileWaitFor(ChannelException e) {
691: FaultyMember fm[] = e.getFaultyMembers();
692: for (int i = 0; i < fm.length; i++) {
693: logger.warn("Removing faulty Member " + fm[i]
694: + " from list");
695: waitFor.remove(makeNodeIDFrom(fm[i].getMember()));
696: }
697: logger.info("Current waiting members = " + waitFor);
698: }
699: }
700:
701: private static final class MemberNode {
702:
703: private final NodeID nodeID;
704: private final Member member;
705:
706: public MemberNode(NodeID nodeID, Member member) {
707: this .nodeID = nodeID;
708: this .member = member;
709: }
710:
711: public NodeID getNodeID() {
712: return nodeID;
713: }
714:
715: public Member getMember() {
716: return member;
717: }
718:
719: public String toString() {
720: return "[ " + nodeID + " => " + member + " ]";
721: }
722:
723: }
724:
725: private final class ZapNodeRequestRouter implements
726: GroupMessageListener {
727:
728: public void messageReceived(NodeID fromNode, GroupMessage msg) {
729: GroupZapNodeMessage zapMsg = (GroupZapNodeMessage) msg;
730: zapNodeRequestProcessor.incomingZapNodeRequest(msg
731: .messageFrom(), zapMsg.getZapNodeType(), zapMsg
732: .getReason(), zapMsg.getWeights());
733: }
734: }
735: }
|