Source Code Cross Referenced for TribesGroupManager.java in » Net » Terracotta » com » tc » net » groups » Java Source Code / Java Documentation | Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » Terracotta » com.tc.net.groups 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        package com.tc.net.groups;
002:
003:        import org.apache.catalina.tribes.Channel;
004:        import org.apache.catalina.tribes.ChannelException;
005:        import org.apache.catalina.tribes.ChannelListener;
006:        import org.apache.catalina.tribes.Member;
007:        import org.apache.catalina.tribes.MembershipListener;
008:        import org.apache.catalina.tribes.ChannelException.FaultyMember;
009:        import org.apache.catalina.tribes.group.ChannelCoordinator;
010:        import org.apache.catalina.tribes.group.GroupChannel;
011:        import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;
012:        import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
013:        import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
014:        import org.apache.catalina.tribes.group.interceptors.TcpPingInterceptor;
015:        import org.apache.catalina.tribes.membership.StaticMember;
016:        import org.apache.catalina.tribes.transport.DataSender;
017:        import org.apache.catalina.tribes.transport.ReceiverBase;
018:        import org.apache.catalina.tribes.transport.ReplicationTransmitter;
019:
020:        import com.tc.async.api.EventContext;
021:        import com.tc.async.api.Sink;
022:        import com.tc.logging.TCLogger;
023:        import com.tc.logging.TCLogging;
024:        import com.tc.properties.TCPropertiesImpl;
025:        import com.tc.util.Assert;
026:        import com.tc.util.Conversion;
027:        import com.tc.util.concurrent.CopyOnWriteArrayMap;
028:
029:        import java.io.IOException;
030:        import java.io.Serializable;
031:        import java.lang.reflect.Constructor;
032:        import java.lang.reflect.Modifier;
033:        import java.util.ArrayList;
034:        import java.util.Arrays;
035:        import java.util.HashSet;
036:        import java.util.Hashtable;
037:        import java.util.Iterator;
038:        import java.util.List;
039:        import java.util.Map;
040:        import java.util.Properties;
041:        import java.util.concurrent.ConcurrentHashMap;
042:        import java.util.concurrent.CopyOnWriteArrayList;
043:
044:        public class TribesGroupManager implements  GroupManager,
045:                ChannelListener, MembershipListener {
046:
047:            private static final String L2_NHA = "l2.nha";
048:            private static final String SEND_TIMEOUT_PROP = "send.timeout.millis";
049:            private static final String USE_MCAST = "mcast.enabled";
050:            private static final String USE_ORDER_INTERCEPTOR = "tribes.orderinterceptor.enabled";
051:            private static final int SEND_OPTIONS_NO_ACK = 0x00;
052:            private static final String TRIBES_FAILURE_TIMEOUT = "tribes.failuredetector.millis";
053:
054:            private static final TCLogger logger = TCLogging
055:                    .getLogger(TribesGroupManager.class);
056:
057:            private static final boolean useMcast = TCPropertiesImpl
058:                    .getProperties().getPropertiesFor(L2_NHA).getBoolean(
059:                            USE_MCAST);
060:            // TODO::FIXME:: Its disabled since it causes issues (exposed using TIMS test)
061:            private static final boolean useOrderInterceptor = TCPropertiesImpl
062:                    .getProperties().getPropertiesFor(L2_NHA).getBoolean(
063:                            USE_ORDER_INTERCEPTOR);
064:
065:            private final GroupChannel group;
066:            private TcpFailureDetector failuredetector;
067:            private Member this Member;
068:            private NodeID this NodeID;
069:
070:            private final CopyOnWriteArrayList<GroupEventsListener> groupListeners = new CopyOnWriteArrayList<GroupEventsListener>();
071:            // private final Map<NodeID, Member> nodes = new CopyOnWriteArrayMap<NodeID, Member>();
072:            private final CopyOnWriteArrayMap nodes = new CopyOnWriteArrayMap(
073:                    new CopyOnWriteArrayMap.TypedArrayFactory() {
074:                        public Object[] createTypedArray(int size) {
075:                            return new MemberNode[size];
076:                        }
077:                    });
078:            private final Map<String, GroupMessageListener> messageListeners = new ConcurrentHashMap<String, GroupMessageListener>();
079:            private final Map<MessageID, GroupResponse> pendingRequests = new Hashtable<MessageID, GroupResponse>();
080:
081:            private boolean stopped = false;
082:            private boolean debug = false;
083:            private ZapNodeRequestProcessor zapNodeRequestProcessor = new DefaultZapNodeRequestProcessor(
084:                    logger);
085:
086:            public TribesGroupManager() {
087:                group = new GroupChannel();
088:                registerForMessages(GroupZapNodeMessage.class,
089:                        new ZapNodeRequestRouter());
090:            }
091:
092:            public NodeID join(final Node this Node, final Node[] allNodes)
093:                    throws GroupException {
094:                if (useMcast)
095:                    return joinMcast();
096:                else
097:                    return joinStatic(this Node, allNodes);
098:
099:            }
100:
101:            public synchronized void stop() throws GroupException {
102:                try {
103:                    group.stop(Channel.DEFAULT);
104:                } catch (ChannelException e) {
105:                    logger.error(e);
106:                    throw new GroupException(e);
107:                } finally {
108:                    stopped = true;
109:                }
110:            }
111:
112:            private void commonGroupChanelConfig() {
113:                // Configure send timeout
114:                ReplicationTransmitter transmitter = (ReplicationTransmitter) group
115:                        .getChannelSender();
116:                DataSender sender = transmitter.getTransport();
117:                final long l = TCPropertiesImpl.getProperties()
118:                        .getPropertiesFor(L2_NHA).getLong(SEND_TIMEOUT_PROP);
119:                sender.setTimeout(l);
120:                ChannelCoordinator cc = (ChannelCoordinator) group.getNext();
121:                final Properties mcastProps = new Properties();
122:                TCPropertiesImpl.getProperties().getPropertiesFor(
123:                        "l2.nha.tribes.mcast").addAllPropertiesTo(mcastProps);
124:                cc.getMembershipService().setProperties(mcastProps);
125:                // add listeners
126:                group.addMembershipListener(this );
127:                group.addChannelListener(this );
128:            }
129:
130:            protected NodeID joinStatic(final Node this Node,
131:                    final Node[] allNodes) throws GroupException {
132:                try {
133:                    // set up static nodes
134:                    StaticMembershipInterceptor smi = setupStaticMembers(
135:                            this Node, allNodes);
136:
137:                    // set up receiver
138:                    ReceiverBase receiver = (ReceiverBase) group
139:                            .getChannelReceiver();
140:                    receiver.setAddress(this Node.getHost());
141:                    receiver.setPort(this Node.getPort());
142:                    receiver.setAutoBind(0);
143:                    receiver.setDirect(false);
144:
145:                    commonGroupChanelConfig();
146:                    TcpPingInterceptor tcp = new TcpPingInterceptor();
147:                    tcp.setUseThread(true);
148:                    tcp.setInterval(1000);
149:
150:                    // set up failure detector
151:                    final long ms = TCPropertiesImpl.getProperties()
152:                            .getPropertiesFor(L2_NHA).getLong(
153:                                    TRIBES_FAILURE_TIMEOUT);
154:                    failuredetector = new TcpFailureDetector();
155:                    failuredetector.setConnectTimeout(ms);
156:
157:                    if (useOrderInterceptor) {
158:                        OrderInterceptor oi = new OrderInterceptor();
159:                        oi.setExpire(60000);
160:                        group.addInterceptor(oi);
161:                    } else {
162:                        // XXX::FIXME::TODO:: These settings are added since OrderInterceptor has issues and we want to maintain message
163:                        // ordering
164:                        receiver.setMaxThreads(1);
165:                        receiver.setMinThreads(1);
166:                    }
167:
168:                    // start services
169:                    group.addInterceptor(tcp);
170:                    group.addInterceptor(failuredetector);
171:                    group.addInterceptor(smi);
172:                    group.start(Channel.SND_RX_SEQ | Channel.SND_TX_SEQ);
173:                    return this .this NodeID;
174:                } catch (ChannelException e) {
175:                    logger.error(e);
176:                    throw new GroupException(e);
177:                }
178:            }
179:
180:            protected NodeID joinMcast() throws GroupException {
181:                try {
182:                    commonGroupChanelConfig();
183:
184:                    ReceiverBase receiver = (ReceiverBase) group
185:                            .getChannelReceiver();
186:                    receiver.setDirect(false);
187:
188:                    if (useOrderInterceptor) {
189:                        OrderInterceptor oi = new OrderInterceptor();
190:                        oi.setExpire(60000);
191:                        group.addInterceptor(oi);
192:                    } else {
193:                        // XXX::FIXME::TODO:: These settings are added since OrderInterceptor has issues and we want to maintain message
194:                        // ordering
195:                        receiver.setMaxThreads(1);
196:                        receiver.setMinThreads(1);
197:                    }
198:
199:                    group.start(Channel.DEFAULT);
200:                    this .this Member = group.getLocalMember(false);
201:                    this .this NodeID = makeNodeIDFrom(this .this Member);
202:                    return this .this NodeID;
203:                } catch (ChannelException e) {
204:                    logger.error(e);
205:                    throw new GroupException(e);
206:                }
207:            }
208:
209:            /**
210:             * XXX:: This method is a temporary hack to make TribesGroupManager work. Without this for static members, we get
211:             * different UniqueID for the same members, one in nodeJoined event and one in the messages. Until that is fixed, the
212:             * NodeID is going to be based on the host and port for static members.
213:             */
214:            private static NodeID makeNodeIDFrom(Member member) {
215:                if (useMcast) {
216:                    return new NodeIDImpl(member.getName(), member
217:                            .getUniqueId());
218:                } else {
219:                    byte[] host = member.getHost();
220:                    int port = member.getPort();
221:                    if (port < 0) {
222:                        port = member.getSecurePort();
223:                        if (port < 0) {
224:                            // Ports shouldn't be 0 either, but in our test framework when there is only one in-process active, it could
225:                            // be.
226:                            throw new AssertionError("Invalid port number : "
227:                                    + port + " for host "
228:                                    + Conversion.bytesToHex(host));
229:                        }
230:                    }
231:                    int length = host.length;
232:                    byte uid[] = new byte[length + 4];
233:                    System.arraycopy(host, 0, uid, 0, length);
234:                    Conversion.writeInt(port, uid, length);
235:                    return new NodeIDImpl(member.getName(), uid);
236:                }
237:            }
238:
239:            private StaticMembershipInterceptor setupStaticMembers(
240:                    final Node this Node, final Node[] allNodes)
241:                    throws AssertionError {
242:                StaticMembershipInterceptor smi = new StaticMembershipInterceptor();
243:                for (int i = 0; i < allNodes.length; i++) {
244:                    final Node node = allNodes[i];
245:                    if (this Node.equals(node))
246:                        continue;
247:                    StaticMember sm = makeMember(node);
248:                    if (sm == null)
249:                        continue;
250:                    smi.addStaticMember(sm);
251:                }
252:                // set up this node
253:                this Member = makeMember(this Node);
254:                if (this Member == null) {
255:                    throw new AssertionError(
256:                            "Error setting up this group member: " + this Node);
257:                }
258:                this .this NodeID = makeNodeIDFrom(this Member);
259:                smi.setLocalMember(this Member);
260:                return smi;
261:            }
262:
263:            public NodeID getLocalNodeID() throws GroupException {
264:                if (this .this NodeID == null) {
265:                    throw new GroupException(
266:                            "Node hasnt joined the group yet !");
267:                }
268:                return this .this NodeID;
269:            }
270:
271:            private static void validateExternalizableClass(
272:                    Class<AbstractGroupMessage> clazz) {
273:                String name = clazz.getName();
274:                try {
275:                    Constructor<AbstractGroupMessage> cons = clazz
276:                            .getDeclaredConstructor(new Class[0]);
277:                    if ((cons.getModifiers() & Modifier.PUBLIC) == 0) {
278:                        //
279:                        throw new AssertionError(name
280:                                + " : public no arg constructor not found");
281:                    }
282:                } catch (NoSuchMethodException ex) {
283:                    throw new AssertionError(name
284:                            + " : public no arg constructor not found");
285:                }
286:            }
287:
288:            private static void validateEventClass(Class<?> clazz) {
289:                if (!EventContext.class.isAssignableFrom(clazz)) {
290:                    throw new AssertionError(clazz
291:                            + " does not implement interface "
292:                            + EventContext.class.getName());
293:                }
294:            }
295:
296:            @SuppressWarnings("unchecked")
297:            public void registerForMessages(Class msgClass,
298:                    GroupMessageListener listener) {
299:                validateExternalizableClass(msgClass);
300:                GroupMessageListener prev = messageListeners.put(msgClass
301:                        .getName(), listener);
302:                if (prev != null) {
303:                    logger.warn("Previous listener removed : " + prev);
304:                }
305:            }
306:
307:            @SuppressWarnings("unchecked")
308:            public void routeMessages(Class msgClass, Sink sink) {
309:                validateEventClass(msgClass);
310:                registerForMessages(msgClass, new RouteGroupMessagesToSink(
311:                        msgClass.getName(), sink));
312:            }
313:
314:            public boolean accept(Serializable msg, Member sender) {
315:                if (stopped || !(msg instanceof  GroupMessage)) {
316:                    logger
317:                            .warn("Rejecting message : "
318:                                    + msg
319:                                    + " from "
320:                                    + sender.getName()
321:                                    + " since its not Group Message or TribesGroupManager is stopped : "
322:                                    + stopped);
323:                    return false;
324:                }
325:                return true;
326:            }
327:
328:            public void messageReceived(Serializable msg, Member sender) {
329:                GroupMessage gmsg = (GroupMessage) msg;
330:                if (debug) {
331:                    logger.info(this .this NodeID + " recd msg "
332:                            + gmsg.getMessageID() + " From " + sender.getName()
333:                            + " Msg : " + msg);
334:                }
335:                MessageID requestID = gmsg.inResponseTo();
336:                NodeID from = makeNodeIDFrom(sender);
337:                MemberNode inode = (MemberNode) nodes.get(from);
338:                if (inode == null) {
339:                    String warn = "Message from non-existing member " + sender
340:                            + " . Adding this node to nodes = " + nodes;
341:                    logger.warn(warn);
342:                    // XXX:: Sometimes messages arrive before memberAdded event. So we are faking it. Also @see comment below
343:                    from = basicMemberAdded(from, sender);
344:                } else {
345:                    // We always maintain reference equality to all NodeIDs that is exposed to Application layer for a particular
346:                    // instance of the server. This is done so that when Zap node request comes in, we can identify if it is for the
347:                    // current instance of the remote node or not. When send fails with timeout (like in Solaris boxes) and the active
348:                    // tries to Zap node, we don't want to zap the wrong instance (i.e. the server might have crashed and come back
349:                    // already)
350:                    from = inode.getNodeID();
351:                }
352:                gmsg.setMessageOrginator(from);
353:                if (requestID.isNull()
354:                        || !notifyPendingRequests(requestID, gmsg, sender)) {
355:                    fireMessageReceivedEvent(from, gmsg);
356:                }
357:            }
358:
359:            private static StaticMember makeMember(final Node node) {
360:                try {
361:                    StaticMember rv = new StaticMember(node.getHost(), node
362:                            .getPort(), 0);
363:                    // rv.setUniqueId(UUIDGenerator.randomUUID(true));
364:                    return rv;
365:                } catch (IOException e) {
366:                    logger.error("Error creating group member", e);
367:                    return null;
368:                }
369:            }
370:
371:            private boolean notifyPendingRequests(MessageID requestID,
372:                    GroupMessage gmsg, Member sender) {
373:                GroupResponseImpl response = (GroupResponseImpl) pendingRequests
374:                        .get(requestID);
375:                if (response != null) {
376:                    response.addResponseFrom(sender, gmsg);
377:                    return true;
378:                }
379:                return false;
380:            }
381:
382:            private void fireMessageReceivedEvent(NodeID from, GroupMessage msg) {
383:                GroupMessageListener listener = messageListeners.get(msg
384:                        .getClass().getName());
385:                if (listener != null) {
386:                    listener.messageReceived(from, msg);
387:                } else {
388:                    String errorMsg = "No Route for " + msg + " from " + from;
389:                    logger.error(errorMsg);
390:                    throw new AssertionError(errorMsg);
391:                }
392:
393:            }
394:
395:            public void registerForGroupEvents(GroupEventsListener listener) {
396:                groupListeners.add(listener);
397:            }
398:
399:            public void memberAdded(Member member) {
400:                if (debug) {
401:                    logger.info("memberAdded -> name=" + member.getName()
402:                            + ", uid="
403:                            + Conversion.bytesToHex(member.getUniqueId()));
404:                }
405:                NodeID newNode = makeNodeIDFrom(member);
406:                basicMemberAdded(newNode, member);
407:            }
408:
409:            private NodeID basicMemberAdded(NodeID newNode, Member member) {
410:                MemberNode inode;
411:                synchronized (nodes) {
412:                    inode = (MemberNode) nodes.get(newNode);
413:                    if (inode == null) {
414:                        nodes.put(newNode, new MemberNode(newNode, member));
415:                    } else {
416:                        logger
417:                                .warn("Member Added Event called for : "
418:                                        + newNode
419:                                        + " while it is still present in the list of nodes : "
420:                                        + inode.getMember() + " : " + nodes);
421:                        if (!inode.getMember().equals(member)) {
422:                            logger.error("Old Member : " + inode.getMember()
423:                                    + " NOT Equal to  New one " + member);
424:                        }
425:                        return inode.getNodeID();
426:                    }
427:                }
428:                fireNodeEvent(newNode, true);
429:                return newNode;
430:            }
431:
432:            private void fireNodeEvent(NodeID newNode, boolean joined) {
433:                if (debug) {
434:                    logger.info("fireNodeEvent: joined = " + joined
435:                            + ", node = " + newNode);
436:                }
437:                Iterator<GroupEventsListener> i = groupListeners.iterator();
438:                while (i.hasNext()) {
439:                    GroupEventsListener listener = i.next();
440:                    if (joined) {
441:                        listener.nodeJoined(newNode);
442:                    } else {
443:                        listener.nodeLeft(newNode);
444:                    }
445:                }
446:            }
447:
448:            public void memberDisappeared(Member member) {
449:                if (debug) {
450:                    logger.info("memberDisappeared -> name=" + member.getName()
451:                            + ", uid="
452:                            + Conversion.bytesToHex(member.getUniqueId()));
453:                }
454:                NodeID node = makeNodeIDFrom(member);
455:                MemberNode inode = (MemberNode) nodes.remove(node);
456:                if (inode != null) {
457:                    // Make sure that all external application layer sees the same NodeID instance always
458:                    fireNodeEvent(inode.getNodeID(), false);
459:                } else {
460:                    logger
461:                            .warn("Member Disappered Event called for : "
462:                                    + node
463:                                    + " while it is not present in the list of nodes : "
464:                                    + nodes);
465:                }
466:                notifyAnyPendingRequests(member);
467:            }
468:
469:            private void notifyAnyPendingRequests(Member member) {
470:                synchronized (pendingRequests) {
471:                    for (Iterator<GroupResponse> i = pendingRequests.values()
472:                            .iterator(); i.hasNext();) {
473:                        GroupResponseImpl response = (GroupResponseImpl) i
474:                                .next();
475:                        response.notifyMemberDead(member);
476:                    }
477:                }
478:            }
479:
480:            public void sendAll(GroupMessage msg) throws GroupException {
481:                if (debug) {
482:                    logger.info(this .this NodeID + " : Sending to ALL : "
483:                            + msg.getMessageID());
484:                }
485:                try {
486:                    Member m[] = getCurrentMembers();
487:                    if (m.length > 0) {
488:                        group.send(m, msg, SEND_OPTIONS_NO_ACK);
489:                    }
490:                } catch (ChannelException e) {
491:                    throw new GroupException(e);
492:                }
493:            }
494:
495:            // TODO:: This method can be optimized by caching members;
496:            private Member[] getCurrentMembers() {
497:                // return group.getMembers();
498:                MemberNode[] inodes = (MemberNode[]) nodes.valuesToArray();
499:                Member[] members = new Member[inodes.length];
500:                for (int i = 0; i < members.length; i++) {
501:                    members[i] = inodes[i].getMember();
502:                }
503:                return members;
504:            }
505:
506:            public GroupResponse sendAllAndWaitForResponse(GroupMessage msg)
507:                    throws GroupException {
508:                if (debug) {
509:                    logger.info(this .this NodeID
510:                            + " : Sending to ALL and Waiting for Response : "
511:                            + msg.getMessageID());
512:                }
513:                GroupResponseImpl groupResponse = new GroupResponseImpl();
514:                MessageID msgID = msg.getMessageID();
515:                GroupResponse old = pendingRequests.put(msgID, groupResponse);
516:                Assert.assertNull(old);
517:                groupResponse.sendTo(group, msg, getCurrentMembers());
518:                groupResponse.waitForAllResponses();
519:                pendingRequests.remove(msgID);
520:                return groupResponse;
521:            }
522:
523:            public void sendTo(NodeID node, GroupMessage msg)
524:                    throws GroupException {
525:                if (debug) {
526:                    logger.info(this .this NodeID + " : Sending to : " + node
527:                            + " msg " + msg.getMessageID());
528:                }
529:                MemberNode inode = (MemberNode) nodes.get(node);
530:                if (inode != null) {
531:                    try {
532:                        group.send(new Member[] { inode.getMember() }, msg,
533:                                SEND_OPTIONS_NO_ACK);
534:                    } catch (ChannelException e) {
535:                        throw new GroupException(e);
536:                    }
537:                } else {
538:                    String error = "Msg sent to non-exisitent Node : Node "
539:                            + node + ". Msg : " + msg;
540:                    logger.error(error);
541:                    throw new GroupException(error);
542:                }
543:            }
544:
545:            public GroupMessage sendToAndWaitForResponse(NodeID nodeID,
546:                    GroupMessage msg) throws GroupException {
547:                if (debug) {
548:                    logger.info(this .this NodeID + " : Sending to " + nodeID
549:                            + " and Waiting for Response : "
550:                            + msg.getMessageID());
551:                }
552:                GroupResponseImpl groupResponse = new GroupResponseImpl();
553:                MessageID msgID = msg.getMessageID();
554:                MemberNode inode = (MemberNode) nodes.get(nodeID);
555:                if (inode != null) {
556:                    Member to[] = new Member[1];
557:                    to[0] = inode.getMember();
558:                    GroupResponse old = pendingRequests.put(msgID,
559:                            groupResponse);
560:                    Assert.assertNull(old);
561:                    groupResponse.sendTo(group, msg, to);
562:                    groupResponse.waitForAllResponses();
563:                    pendingRequests.remove(msgID);
564:                } else {
565:                    String errorMsg = "Node " + nodeID
566:                            + " not present in the group. Ignoring Message : "
567:                            + msg;
568:                    logger.error(errorMsg);
569:                    throw new GroupException(errorMsg);
570:                }
571:                return groupResponse.getResponse(nodeID);
572:            }
573:
574:            public void setZapNodeRequestProcessor(
575:                    ZapNodeRequestProcessor processor) {
576:                this .zapNodeRequestProcessor = processor;
577:            }
578:
579:            public void zapNode(NodeID nodeID, int type, String reason) {
580:                MemberNode inode = (MemberNode) nodes.get(nodeID);
581:                if (inode == null) {
582:                    logger
583:                            .warn("Ignoring Zap node request since Member is null");
584:                } else if (inode.getNodeID() != nodeID) {
585:                    logger
586:                            .warn("Ignoring Zap node request since the Node ID for zapNode request is not reference equal to the one in the internal list. "
587:                                    + " This probably means that zap node request is meant for the previous instance of the server. NodeID "
588:                                    + nodeID + " INode = " + inode);
589:                } else if (!zapNodeRequestProcessor
590:                        .acceptOutgoingZapNodeRequest(nodeID, type, reason)) {
591:                    logger.warn("Ignoreing Zap node request since "
592:                            + zapNodeRequestProcessor + " asked us to : "
593:                            + nodeID + " type = " + type + " reason = "
594:                            + reason);
595:                } else {
596:                    long weights[] = zapNodeRequestProcessor
597:                            .getCurrentNodeWeights();
598:                    logger.warn("Zapping node : " + nodeID + " type = " + type
599:                            + " reason = " + reason + " my weight = "
600:                            + Arrays.toString(weights));
601:                    GroupMessage msg = GroupZapNodeMessageFactory
602:                            .createGroupZapNodeMessage(type, reason, weights);
603:                    try {
604:                        sendTo(nodeID, msg);
605:                    } catch (GroupException e) {
606:                        logger.error("Error sending ZapNode Request to "
607:                                + nodeID + " msg = " + msg);
608:                    }
609:                    logger.warn("Removing member " + inode + " from group");
610:                    memberDisappeared(inode.getMember());
611:                }
612:            }
613:
614:            private static class GroupResponseImpl implements  GroupResponse {
615:
616:                HashSet<NodeID> waitFor = new HashSet<NodeID>();
617:                List<GroupMessage> responses = new ArrayList<GroupMessage>();
618:
619:                public synchronized List<GroupMessage> getResponses() {
620:                    Assert.assertTrue(waitFor.isEmpty());
621:                    return responses;
622:                }
623:
624:                public synchronized GroupMessage getResponse(NodeID nodeID) {
625:                    Assert.assertTrue(waitFor.isEmpty());
626:                    for (Iterator<GroupMessage> i = responses.iterator(); i
627:                            .hasNext();) {
628:                        GroupMessage msg = i.next();
629:                        if (nodeID.equals(msg.messageFrom()))
630:                            return msg;
631:                    }
632:                    return null;
633:                }
634:
635:                public void sendTo(GroupChannel group, GroupMessage msg,
636:                        Member[] m) {
637:                    try {
638:                        if (m.length > 0) {
639:                            setUpWaitFor(m);
640:                            group.send(m, msg, SEND_OPTIONS_NO_ACK);
641:                        }
642:                    } catch (ChannelException e) {
643:                        logger.error("Error sending msg : " + msg, e);
644:                        reconsileWaitFor(e);
645:                    }
646:                }
647:
648:                private synchronized void setUpWaitFor(Member[] m) {
649:                    for (int i = 0; i < m.length; i++) {
650:                        waitFor.add(makeNodeIDFrom(m[i]));
651:                    }
652:                }
653:
654:                public synchronized void addResponseFrom(Member sender,
655:                        GroupMessage gmsg) {
656:                    if (!waitFor.remove(makeNodeIDFrom(sender))) {
657:                        String message = "Recd response from a member not in list : "
658:                                + sender
659:                                + " : waiting For : "
660:                                + waitFor
661:                                + " msg : " + gmsg;
662:                        logger.error(message);
663:                        throw new AssertionError(message);
664:                    }
665:                    responses.add(gmsg);
666:                    notifyAll();
667:                }
668:
669:                public synchronized void notifyMemberDead(Member member) {
670:                    waitFor.remove(makeNodeIDFrom(member));
671:                    notifyAll();
672:                }
673:
674:                public synchronized void waitForAllResponses()
675:                        throws GroupException {
676:                    int count = 0;
677:                    while (!waitFor.isEmpty()) {
678:                        try {
679:                            this .wait(5000);
680:                            if (++count > 1) {
681:                                logger.warn("Still waiting for response from "
682:                                        + waitFor + ". Count = " + count);
683:                            }
684:                        } catch (InterruptedException e) {
685:                            throw new GroupException(e);
686:                        }
687:                    }
688:                }
689:
690:                private synchronized void reconsileWaitFor(ChannelException e) {
691:                    FaultyMember fm[] = e.getFaultyMembers();
692:                    for (int i = 0; i < fm.length; i++) {
693:                        logger.warn("Removing faulty Member " + fm[i]
694:                                + " from list");
695:                        waitFor.remove(makeNodeIDFrom(fm[i].getMember()));
696:                    }
697:                    logger.info("Current waiting members = " + waitFor);
698:                }
699:            }
700:
701:            private static final class MemberNode {
702:
703:                private final NodeID nodeID;
704:                private final Member member;
705:
706:                public MemberNode(NodeID nodeID, Member member) {
707:                    this .nodeID = nodeID;
708:                    this .member = member;
709:                }
710:
711:                public NodeID getNodeID() {
712:                    return nodeID;
713:                }
714:
715:                public Member getMember() {
716:                    return member;
717:                }
718:
719:                public String toString() {
720:                    return "[ " + nodeID + " => " + member + " ]";
721:                }
722:
723:            }
724:
725:            private final class ZapNodeRequestRouter implements 
726:                    GroupMessageListener {
727:
728:                public void messageReceived(NodeID fromNode, GroupMessage msg) {
729:                    GroupZapNodeMessage zapMsg = (GroupZapNodeMessage) msg;
730:                    zapNodeRequestProcessor.incomingZapNodeRequest(msg
731:                            .messageFrom(), zapMsg.getZapNodeType(), zapMsg
732:                            .getReason(), zapMsg.getWeights());
733:                }
734:            }
735:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.