Source Code Cross Referenced for DistributedReplicantManagerImpl.java in  » EJB-Server-JBoss-4.2.1 » cluster » org » jboss » ha » framework » server » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » EJB Server JBoss 4.2.1 » cluster » org.jboss.ha.framework.server 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /*
0002:         * JBoss, Home of Professional Open Source.
0003:         * Copyright 2006, Red Hat Middleware LLC, and individual contributors
0004:         * as indicated by the @author tags. See the copyright.txt file in the
0005:         * distribution for a full listing of individual contributors.
0006:         *
0007:         * This is free software; you can redistribute it and/or modify it
0008:         * under the terms of the GNU Lesser General Public License as
0009:         * published by the Free Software Foundation; either version 2.1 of
0010:         * the License, or (at your option) any later version.
0011:         *
0012:         * This software is distributed in the hope that it will be useful,
0013:         * but WITHOUT ANY WARRANTY; without even the implied warranty of
0014:         * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
0015:         * Lesser General Public License for more details.
0016:         *
0017:         * You should have received a copy of the GNU Lesser General Public
0018:         * License along with this software; if not, write to the Free
0019:         * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
0020:         * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
0021:         */
0022:        package org.jboss.ha.framework.server;
0023:
0024:        import java.util.Set;
0025:        import java.util.Vector;
0026:        import java.util.ArrayList;
0027:        import java.util.HashMap;
0028:        import java.util.Iterator;
0029:        import java.util.Collection;
0030:        import java.util.HashSet;
0031:        import java.util.List;
0032:        import java.util.Map;
0033:
0034:        import java.io.Serializable;
0035:
0036:        import javax.management.MBeanServer;
0037:        import javax.management.ObjectName;
0038:
0039:        import EDU.oswego.cs.dl.util.concurrent.Latch;
0040:        import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
0041:
0042:        import org.jboss.logging.Logger;
0043:
0044:        import org.jboss.ha.framework.interfaces.ClusterMergeStatus;
0045:        import org.jboss.ha.framework.interfaces.ClusterNode;
0046:        import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
0047:        import org.jboss.ha.framework.interfaces.HAPartition;
0048:
0049:        /** 
0050:         * This class manages replicated objects.
0051:         * 
0052:         * @author  <a href="mailto:bill@burkecentral.com">Bill Burke</a>.
0053:         * @author  <a href="mailto:sacha.labourey@cogito-info.ch">Sacha Labourey</a>.
0054:         * @author  Scott.stark@jboss.org
0055:         * @version $Revision: 61770 $
0056:         */
0057:        public class DistributedReplicantManagerImpl implements 
0058:                DistributedReplicantManagerImplMBean,
0059:                HAPartition.HAMembershipExtendedListener,
0060:                HAPartition.HAPartitionStateTransfer,
0061:                AsynchEventHandler.AsynchEventProcessor {
0062:            // Constants -----------------------------------------------------
0063:
0064:            protected final static String SERVICE_NAME = "DistributedReplicantManager";
0065:
0066:            // Attributes ----------------------------------------------------
0067:            protected static int threadID;
0068:
0069:            protected ConcurrentReaderHashMap localReplicants = new ConcurrentReaderHashMap();
0070:            protected ConcurrentReaderHashMap replicants = new ConcurrentReaderHashMap();
0071:            protected ConcurrentReaderHashMap keyListeners = new ConcurrentReaderHashMap();
0072:            protected HashMap intraviewIdCache = new HashMap();
0073:            protected HAPartition partition;
0074:            /** The handler used to send replicant change notifications asynchronously */
0075:            protected AsynchEventHandler asynchHandler;
0076:
0077:            protected Logger log;
0078:
0079:            protected MBeanServer mbeanserver;
0080:            protected ObjectName jmxName;
0081:
0082:            protected String nodeName = null;
0083:
0084:            protected Latch partitionNameKnown = new Latch();
0085:            protected boolean trace;
0086:
0087:            protected Class[] add_types = new Class[] { String.class,
0088:                    String.class, Serializable.class };
0089:            protected Class[] remove_types = new Class[] { String.class,
0090:                    String.class };
0091:
0092:            // Static --------------------------------------------------------
0093:
0094:            // Constructors --------------------------------------------------       
0095:
0096:            /**
0097:             * This class manages replicated objects through the given partition
0098:             *
0099:             * @param partition {@link HAPartition} through which replicated objects will be exchanged
0100:             */
0101:            public DistributedReplicantManagerImpl(HAPartition partition,
0102:                    MBeanServer server) {
0103:                this .partition = partition;
0104:                this .mbeanserver = server;
0105:                this .log = Logger
0106:                        .getLogger(DistributedReplicantManagerImpl.class
0107:                                .getName()
0108:                                + "." + partition.getPartitionName());
0109:                this .trace = log.isTraceEnabled();
0110:            }
0111:
0112:            // Public --------------------------------------------------------
0113:
0114:            public void init() throws Exception {
0115:                log.debug("registerRPCHandler");
0116:                partition.registerRPCHandler(SERVICE_NAME, this );
0117:                log.debug("subscribeToStateTransferEvents");
0118:                partition.subscribeToStateTransferEvents(SERVICE_NAME, this );
0119:                log.debug("registerMembershipListener");
0120:                partition.registerMembershipListener(this );
0121:
0122:                // subscribed this "sub-service" of HAPartition with JMX
0123:                // TODO: In the future (when state transfer issues will be completed), 
0124:                // we will need to redesign the way HAPartitions and its sub-protocols are
0125:                // registered with JMX. They will most probably be independant JMX services.
0126:                //
0127:                String name = "jboss:service=" + SERVICE_NAME
0128:                        + ",partitionName=" + this .partition.getPartitionName();
0129:                this .jmxName = new javax.management.ObjectName(name);
0130:                this .mbeanserver.registerMBean(this , jmxName);
0131:            }
0132:
0133:            public void start() throws Exception {
0134:                this .nodeName = this .partition.getNodeName();
0135:
0136:                // Create the asynch listener handler thread
0137:                asynchHandler = new AsynchEventHandler(this ,
0138:                        "AsynchKeyChangeHandler");
0139:                asynchHandler.start();
0140:
0141:                partitionNameKnown.release(); // partition name is now known!
0142:
0143:                //log.info("mergemembers");
0144:                //mergeMembers();
0145:            }
0146:
0147:            public void stop() throws Exception {
0148:                // BES 200604 -- implication of NR's JBLCUSTER-38 change.  Moving to
0149:                // destroy allows restart of HAPartition while local registrations 
0150:                // survive -- stopping partition does not stop all registered services
0151:                // e.g. ejbs; if we maintain their registrations we can pass them to
0152:                // the cluster when we restart.  However, we are leaving all the remote
0153:                // replicants we have registered around, so they will still be included 
0154:                // as targets if anyone contacts our EJB while partition is stopped.
0155:                // Probably OK; if they aren't valid the client will find this out.
0156:
0157:                //    NR 200505 : [JBCLUSTER-38] move to destroy
0158:                //      if (localReplicants != null)
0159:                //      {
0160:                //         synchronized(localReplicants)
0161:                //         {
0162:                //            while (! localReplicants.isEmpty ())
0163:                //            {               
0164:                //               this.remove ((String)localReplicants.keySet().iterator().next ());
0165:                //            }
0166:                //         }
0167:                //      }
0168:
0169:                // Stop the asynch handler thread
0170:                try {
0171:                    asynchHandler.stop();
0172:                } catch (Exception e) {
0173:                    log.warn("Failed to stop asynchHandler", e);
0174:                }
0175:
0176:                //    NR 200505 : [JBCLUSTER-38] move to destroy
0177:                //      this.mbeanserver.unregisterMBean (this.jmxName);
0178:            }
0179:
0180:            // NR 200505 : [JBCLUSTER-38] unbind at destroy
0181:            public void destroy() throws Exception {
0182:                // now partition can't be resuscitated, so remove local replicants
0183:                if (localReplicants != null) {
0184:                    synchronized (localReplicants) {
0185:                        String[] keys = new String[localReplicants.size()];
0186:                        localReplicants.keySet().toArray(keys);
0187:                        for (int n = 0; n < keys.length; n++) {
0188:                            this .removeLocal(keys[n]); // channel is disconnected, so
0189:                            // don't try to notify cluster
0190:                        }
0191:                    }
0192:                }
0193:
0194:                this .mbeanserver.unregisterMBean(this .jmxName);
0195:
0196:                partition.unregisterRPCHandler(SERVICE_NAME, this );
0197:                partition
0198:                        .unsubscribeFromStateTransferEvents(SERVICE_NAME, this );
0199:                partition.unregisterMembershipListener(this );
0200:            }
0201:
0202:            public String listContent() throws Exception {
0203:                // we merge all replicants services: local only or not
0204:                //
0205:                java.util.Collection services = this .getAllServices();
0206:
0207:                StringBuffer result = new StringBuffer();
0208:                java.util.Iterator catsIter = services.iterator();
0209:
0210:                result.append("<pre>");
0211:
0212:                while (catsIter.hasNext()) {
0213:                    String category = (String) catsIter.next();
0214:                    HashMap content = (HashMap) this .replicants.get(category);
0215:                    if (content == null)
0216:                        content = new HashMap();
0217:                    java.util.Iterator keysIter = content.keySet().iterator();
0218:
0219:                    result
0220:                            .append("-----------------------------------------------\n");
0221:                    result.append("Service : ").append(category).append("\n\n");
0222:
0223:                    Serializable local = lookupLocalReplicant(category);
0224:                    if (local == null)
0225:                        result
0226:                                .append("\t- Service is *not* available locally\n");
0227:                    else
0228:                        result
0229:                                .append("\t- Service *is* also available locally\n");
0230:
0231:                    while (keysIter.hasNext()) {
0232:                        String location = (String) keysIter.next();
0233:                        result.append("\t- ").append(location).append("\n");
0234:                    }
0235:
0236:                    result.append("\n");
0237:
0238:                }
0239:
0240:                result.append("</pre>");
0241:
0242:                return result.toString();
0243:            }
0244:
0245:            public String listXmlContent() throws Exception {
0246:                // we merge all replicants services: local only or not
0247:                //
0248:                java.util.Collection services = this .getAllServices();
0249:                StringBuffer result = new StringBuffer();
0250:
0251:                result.append("<ReplicantManager>\n");
0252:
0253:                java.util.Iterator catsIter = services.iterator();
0254:                while (catsIter.hasNext()) {
0255:                    String category = (String) catsIter.next();
0256:                    HashMap content = (HashMap) this .replicants.get(category);
0257:                    if (content == null)
0258:                        content = new HashMap();
0259:                    java.util.Iterator keysIter = content.keySet().iterator();
0260:
0261:                    result.append("\t<Service>\n");
0262:                    result.append("\t\t<ServiceName>").append(category).append(
0263:                            "</ServiceName>\n");
0264:
0265:                    Serializable local = lookupLocalReplicant(category);
0266:                    if (local != null) {
0267:                        result.append("\t\t<Location>\n");
0268:                        result.append("\t\t\t<Name local=\"True\">").append(
0269:                                this .nodeName).append("</Name>\n");
0270:                        result.append("\t\t</Location>\n");
0271:                    }
0272:
0273:                    while (keysIter.hasNext()) {
0274:                        String location = (String) keysIter.next();
0275:                        result.append("\t\t<Location>\n");
0276:                        result.append("\t\t\t<Name local=\"False\">").append(
0277:                                location).append("</Name>\n");
0278:                        result.append("\t\t</Location>\n");
0279:                    }
0280:
0281:                    result.append("\t<Service>\n");
0282:
0283:                }
0284:
0285:                result.append("<ReplicantManager>\n");
0286:
0287:                return result.toString();
0288:            }
0289:
0290:            // HAPartition.HAPartitionStateTransfer implementation ----------------------------------------------
0291:
0292:            public Serializable getCurrentState() {
0293:                java.util.Collection services = this .getAllServices();
0294:                HashMap result = new HashMap();
0295:
0296:                java.util.Iterator catsIter = services.iterator();
0297:                while (catsIter.hasNext()) {
0298:                    String category = (String) catsIter.next();
0299:                    HashMap content = (HashMap) this .replicants.get(category);
0300:                    if (content == null)
0301:                        content = new HashMap();
0302:                    else
0303:                        content = (HashMap) content.clone();
0304:
0305:                    Serializable local = lookupLocalReplicant(category);
0306:                    if (local != null)
0307:                        content.put(this .nodeName, local);
0308:
0309:                    result.put(category, content);
0310:                }
0311:
0312:                // we add the intraviewid cache to the global result
0313:                //
0314:                Object[] globalResult = new Object[] { result, intraviewIdCache };
0315:                return globalResult;
0316:            }
0317:
0318:            public void setCurrentState(Serializable newState) {
0319:                Object[] globalState = (Object[]) newState;
0320:
0321:                HashMap map = (HashMap) globalState[0];
0322:                this .replicants.putAll(map);
0323:                this .intraviewIdCache = (HashMap) globalState[1];
0324:
0325:                if (trace) {
0326:                    log
0327:                            .trace(nodeName
0328:                                    + ": received new state, will republish local replicants");
0329:                }
0330:                MembersPublisher publisher = new MembersPublisher();
0331:                publisher.start();
0332:            }
0333:
0334:            public Collection getAllServices() {
0335:                HashSet services = new HashSet();
0336:                services.addAll(localReplicants.keySet());
0337:                services.addAll(replicants.keySet());
0338:                return services;
0339:            }
0340:
0341:            // HAPartition.HAMembershipListener implementation ----------------------------------------------
0342:
0343:            public void membershipChangedDuringMerge(Vector deadMembers,
0344:                    Vector newMembers, Vector allMembers,
0345:                    Vector originatingGroups) {
0346:                // Here we only care about deadMembers.  Purge all replicant lists of deadMembers
0347:                // and then notify all listening nodes.
0348:                //
0349:                log.info("Merging partitions...");
0350:                log.info("Dead members: " + deadMembers.size());
0351:                log.info("Originating groups: " + originatingGroups);
0352:                purgeDeadMembers(deadMembers);
0353:                if (newMembers.size() > 0) {
0354:                    new MergeMembers().start();
0355:                }
0356:            }
0357:
0358:            public void membershipChanged(Vector deadMembers,
0359:                    Vector newMembers, Vector allMembers) {
0360:                // Here we only care about deadMembers.  Purge all replicant lists of deadMembers
0361:                // and then notify all listening nodes.
0362:                //
0363:                log.info("I am (" + nodeName
0364:                        + ") received membershipChanged event:");
0365:                log.info("Dead members: " + deadMembers.size() + " ("
0366:                        + deadMembers + ")");
0367:                log.info("New Members : " + newMembers.size() + " ("
0368:                        + newMembers + ")");
0369:                log.info("All Members : " + allMembers.size() + " ("
0370:                        + allMembers + ")");
0371:                purgeDeadMembers(deadMembers);
0372:
0373:                // we don't need to merge members anymore
0374:            }
0375:
0376:            // AsynchEventHandler.AsynchEventProcessor implementation -----------------
0377:
0378:            public void processEvent(Object event) {
0379:                KeyChangeEvent kce = (KeyChangeEvent) event;
0380:                notifyKeyListeners(kce.key, kce.replicants);
0381:            }
0382:
0383:            static class KeyChangeEvent {
0384:                String key;
0385:                List replicants;
0386:            }
0387:
0388:            // DistributedReplicantManager implementation ----------------------------------------------              
0389:
0390:            public void add(String key, Serializable replicant)
0391:                    throws Exception {
0392:                if (trace)
0393:                    log.trace("add, key=" + key + ", value=" + replicant);
0394:                partitionNameKnown.acquire(); // we don't propagate until our name is known
0395:
0396:                Object[] args = { key, this .nodeName, replicant };
0397:                partition.callMethodOnCluster(SERVICE_NAME, "_add", args,
0398:                        add_types, true);
0399:                synchronized (localReplicants) {
0400:                    localReplicants.put(key, replicant);
0401:                    notifyKeyListeners(key, lookupReplicants(key));
0402:                }
0403:            }
0404:
0405:            public void remove(String key) throws Exception {
0406:                partitionNameKnown.acquire(); // we don't propagate until our name is known
0407:
0408:                // optimisation: we don't make a costly network call
0409:                // if there is nothing to remove
0410:                if (localReplicants.containsKey(key)) {
0411:                    Object[] args = { key, this .nodeName };
0412:                    partition.callAsynchMethodOnCluster(SERVICE_NAME,
0413:                            "_remove", args, remove_types, true);
0414:                    removeLocal(key);
0415:                }
0416:            }
0417:
0418:            protected void removeLocal(String key) {
0419:                synchronized (localReplicants) {
0420:                    localReplicants.remove(key);
0421:                    List result = lookupReplicants(key);
0422:                    if (result == null)
0423:                        result = new ArrayList(); // don't pass null but an empty list
0424:                    notifyKeyListeners(key, result);
0425:                }
0426:            }
0427:
0428:            public Serializable lookupLocalReplicant(String key) {
0429:                return (Serializable) localReplicants.get(key);
0430:            }
0431:
0432:            public List lookupReplicants(String key) {
0433:                Serializable local = lookupLocalReplicant(key);
0434:                HashMap replicant = (HashMap) replicants.get(key);
0435:                if (replicant == null && local == null)
0436:                    return null;
0437:
0438:                ArrayList rtn = new ArrayList();
0439:
0440:                if (replicant == null) {
0441:                    if (local != null)
0442:                        rtn.add(local);
0443:                } else {
0444:                    // JBAS-2677. Put the replicants in view order.
0445:                    ClusterNode[] nodes = partition.getClusterNodes();
0446:                    String replNode;
0447:                    Object replVal;
0448:                    for (int i = 0; i < nodes.length; i++) {
0449:                        replNode = nodes[i].getName();
0450:                        if (local != null && nodeName.equals(replNode)) {
0451:                            rtn.add(local);
0452:                            continue;
0453:                        }
0454:
0455:                        replVal = replicant.get(replNode);
0456:                        if (replVal != null)
0457:                            rtn.add(replVal);
0458:                    }
0459:                }
0460:
0461:                return rtn;
0462:            }
0463:
0464:            public List lookupReplicantsNodeNames(String key) {
0465:                boolean locallyReplicated = localReplicants.containsKey(key);
0466:                HashMap replicant = (HashMap) replicants.get(key);
0467:                if (replicant == null && !locallyReplicated)
0468:                    return null;
0469:
0470:                ArrayList rtn = new ArrayList();
0471:
0472:                if (replicant == null) {
0473:                    if (locallyReplicated)
0474:                        rtn.add(this .nodeName);
0475:                } else {
0476:                    // JBAS-2677. Put the replicants in view order.
0477:                    Set keys = replicant.keySet();
0478:                    ClusterNode[] nodes = partition.getClusterNodes();
0479:                    String keyOwner;
0480:                    for (int i = 0; i < nodes.length; i++) {
0481:                        keyOwner = nodes[i].getName();
0482:                        if (locallyReplicated && nodeName.equals(keyOwner)) {
0483:                            rtn.add(this .nodeName);
0484:                            continue;
0485:                        }
0486:
0487:                        if (keys.contains(keyOwner))
0488:                            rtn.add(keyOwner);
0489:                    }
0490:                }
0491:
0492:                return rtn;
0493:            }
0494:
0495:            public void registerListener(String key,
0496:                    DistributedReplicantManager.ReplicantListener subscriber) {
0497:                synchronized (keyListeners) {
0498:                    ArrayList listeners = (ArrayList) keyListeners.get(key);
0499:                    if (listeners == null) {
0500:                        listeners = new ArrayList();
0501:                        keyListeners.put(key, listeners);
0502:                    }
0503:                    listeners.add(subscriber);
0504:                }
0505:            }
0506:
0507:            public void unregisterListener(String key,
0508:                    DistributedReplicantManager.ReplicantListener subscriber) {
0509:                synchronized (keyListeners) {
0510:                    ArrayList listeners = (ArrayList) keyListeners.get(key);
0511:                    if (listeners == null)
0512:                        return;
0513:
0514:                    listeners.remove(subscriber);
0515:                    if (listeners.size() == 0)
0516:                        keyListeners.remove(key);
0517:
0518:                }
0519:            }
0520:
0521:            public int getReplicantsViewId(String key) {
0522:                Integer result = (Integer) this .intraviewIdCache.get(key);
0523:
0524:                if (result == null)
0525:                    return 0;
0526:                else
0527:                    return result.intValue();
0528:            }
0529:
0530:            public boolean isMasterReplica(String key) {
0531:                if (trace)
0532:                    log.trace("isMasterReplica, key=" + key);
0533:                // if I am not a replicat, I cannot be the master...
0534:                //
0535:                if (!localReplicants.containsKey(key)) {
0536:                    if (trace)
0537:                        log.trace("no localReplicants, key=" + key
0538:                                + ", isMasterReplica=false");
0539:                    return false;
0540:                }
0541:
0542:                Vector allNodes = this .partition.getCurrentView();
0543:                HashMap repForKey = (HashMap) replicants.get(key);
0544:                if (repForKey == null) {
0545:                    if (trace)
0546:                        log.trace("no replicants, key=" + key
0547:                                + ", isMasterReplica=true");
0548:                    return true;
0549:                }
0550:                Vector replicaNodes = new Vector((repForKey).keySet());
0551:                boolean isMasterReplica = false;
0552:                for (int i = 0; i < allNodes.size(); i++) {
0553:                    String aMember = (String) allNodes.elementAt(i);
0554:                    if (trace)
0555:                        log.trace("Testing member: " + aMember);
0556:                    if (replicaNodes.contains(aMember)) {
0557:                        if (trace)
0558:                            log
0559:                                    .trace("Member found in replicaNodes, isMasterReplica=false");
0560:                        break;
0561:                    } else if (aMember.equals(this .nodeName)) {
0562:                        if (trace)
0563:                            log
0564:                                    .trace("Member == nodeName, isMasterReplica=true");
0565:                        isMasterReplica = true;
0566:                        break;
0567:                    }
0568:                }
0569:                return isMasterReplica;
0570:            }
0571:
0572:            // DistributedReplicantManager cluster callbacks ----------------------------------------------              
0573:
0574:            /**
0575:             * Cluster callback called when a new replicant is added on another node
0576:             * @param key Replicant key
0577:             * @param nodeName Node that add the current replicant
0578:             * @param replicant Serialized representation of the replicant
0579:             */
0580:            public void _add(String key, String nodeName, Serializable replicant) {
0581:                if (trace)
0582:                    log.trace("_add(" + key + ", " + nodeName);
0583:
0584:                try {
0585:                    addReplicant(key, nodeName, replicant);
0586:                    // Notify listeners asynchronously
0587:                    KeyChangeEvent kce = new KeyChangeEvent();
0588:                    kce.key = key;
0589:                    kce.replicants = lookupReplicants(key);
0590:                    asynchHandler.queueEvent(kce);
0591:                } catch (Exception ex) {
0592:                    log.error("_add failed", ex);
0593:                }
0594:            }
0595:
0596:            /**
0597:             * Cluster callback called when a replicant is removed by another node
0598:             * @param key Name of the replicant key
0599:             * @param nodeName Node that wants to remove its replicant for the give key
0600:             */
0601:            public void _remove(String key, String nodeName) {
0602:                try {
0603:                    if (removeReplicant(key, nodeName)) {
0604:                        // Notify listeners asynchronously
0605:                        KeyChangeEvent kce = new KeyChangeEvent();
0606:                        kce.key = key;
0607:                        kce.replicants = lookupReplicants(key);
0608:                        asynchHandler.queueEvent(kce);
0609:                    }
0610:                } catch (Exception ex) {
0611:                    log.error("_remove failed", ex);
0612:                }
0613:            }
0614:
0615:            protected boolean removeReplicant(String key, String nodeName)
0616:                    throws Exception {
0617:                synchronized (replicants) {
0618:                    HashMap replicant = (HashMap) replicants.get(key);
0619:                    if (replicant == null)
0620:                        return false;
0621:                    Object removed = replicant.remove(nodeName);
0622:                    if (removed != null) {
0623:                        Collection values = replicant.values();
0624:                        if (values.size() == 0) {
0625:                            replicants.remove(key);
0626:                        }
0627:                        return true;
0628:                    }
0629:                }
0630:                return false;
0631:            }
0632:
0633:            /**
0634:             * Cluster callback called when a node wants to know our complete list of local replicants
0635:             * @throws Exception Thrown if a cluster communication exception occurs
0636:             * @return A java array of size 2 containing the name of our node in this cluster and the serialized representation of our state
0637:             */
0638:            public Object[] lookupLocalReplicants() throws Exception {
0639:                partitionNameKnown.acquire(); // we don't answer until our name is known
0640:
0641:                Object[] rtn = { this .nodeName, localReplicants };
0642:                if (trace)
0643:                    log.trace("lookupLocalReplicants called (" + rtn[0]
0644:                            + "). Return: " + localReplicants.size());
0645:                return rtn;
0646:            }
0647:
0648:            // Package protected ---------------------------------------------
0649:
0650:            // Protected -----------------------------------------------------
0651:
0652:            protected int calculateReplicantsHash(List members) {
0653:                int result = 0;
0654:                Object obj = null;
0655:
0656:                for (int i = 0; i < members.size(); i++) {
0657:                    obj = members.get(i);
0658:                    if (obj != null)
0659:                        result += obj.hashCode(); // no explicit overflow with int addition
0660:                }
0661:
0662:                return result;
0663:            }
0664:
0665:            protected int updateReplicantsHashId(String key) {
0666:                // we first get a list of all nodes names that replicate this key
0667:                //
0668:                List nodes = this .lookupReplicantsNodeNames(key);
0669:                int result = 0;
0670:
0671:                if ((nodes == null) || (nodes.size() == 0)) {
0672:                    // no nore replicants for this key: we uncache our view id
0673:                    //
0674:                    this .intraviewIdCache.remove(key);
0675:                } else {
0676:                    result = this .calculateReplicantsHash(nodes);
0677:                    this .intraviewIdCache.put(key, new Integer(result));
0678:                }
0679:
0680:                return result;
0681:
0682:            }
0683:
0684:            ///////////////
0685:            // DistributedReplicantManager API
0686:            ///////////////
0687:
0688:            /**
0689:             * Add a replicant to the replicants map.
0690:             * @param key replicant key name
0691:             * @param nodeName name of the node that adds this replicant
0692:             * @param replicant Serialized representation of the replica
0693:             */
0694:            protected void addReplicant(String key, String nodeName,
0695:                    Serializable replicant) {
0696:                addReplicant(replicants, key, nodeName, replicant);
0697:            }
0698:
0699:            /**
0700:             * Logic for adding replicant to any map.
0701:             * @param map structure in which adding the new replicant
0702:             * @param key name of the replicant key
0703:             * @param nodeName name of the node adding the replicant
0704:             * @param replicant serialized representation of the replicant that is added
0705:             */
0706:            protected void addReplicant(Map map, String key, String nodeName,
0707:                    Serializable replicant) {
0708:                synchronized (map) {
0709:                    HashMap rep = (HashMap) map.get(key);
0710:                    if (rep == null) {
0711:                        if (trace)
0712:                            log.trace("_adding new HashMap");
0713:                        rep = new HashMap();
0714:                        map.put(key, rep);
0715:                    }
0716:                    rep.put(nodeName, replicant);
0717:                }
0718:            }
0719:
0720:            protected Vector getKeysReplicatedByNode(String nodeName) {
0721:                Vector result = new Vector();
0722:                synchronized (replicants) {
0723:                    Iterator keysIter = replicants.keySet().iterator();
0724:                    while (keysIter.hasNext()) {
0725:                        String key = (String) keysIter.next();
0726:                        HashMap values = (HashMap) replicants.get(key);
0727:                        if ((values != null) && values.containsKey(nodeName)) {
0728:                            result.add(key);
0729:                        }
0730:                    }
0731:                }
0732:                return result;
0733:            }
0734:
0735:            /**
0736:             * Indicates if the a replicant already exists for a given key/node pair
0737:             * @param key replicant key name
0738:             * @param nodeName name of the node
0739:             * @return a boolean indicating if a replicant for the given node exists for the given key
0740:             */
0741:            protected boolean replicantEntryAlreadyExists(String key,
0742:                    String nodeName) {
0743:                return replicantEntryAlreadyExists(replicants, key, nodeName);
0744:            }
0745:
0746:            /**
0747:             * Indicates if the a replicant already exists for a given key/node pair in the give data structure
0748:             */
0749:            protected boolean replicantEntryAlreadyExists(Map map, String key,
0750:                    String nodeName) {
0751:                HashMap rep = (HashMap) map.get(key);
0752:                if (rep == null)
0753:                    return false;
0754:                else
0755:                    return rep.containsKey(nodeName);
0756:            }
0757:
0758:            /**
0759:             * Notifies, through a callback, the listeners for a given replicant that the set of replicants has changed
0760:             * @param key The replicant key name
0761:             * @param newReplicants The new list of replicants
0762:             * 
0763:             */
0764:            protected void notifyKeyListeners(String key, List newReplicants) {
0765:                if (trace)
0766:                    log.trace("notifyKeyListeners");
0767:
0768:                // we first update the intra-view id for this particular key
0769:                //
0770:                int newId = updateReplicantsHashId(key);
0771:
0772:                ArrayList listeners = (ArrayList) keyListeners.get(key);
0773:                if (listeners == null) {
0774:                    if (trace)
0775:                        log.trace("listeners is null");
0776:                    return;
0777:                }
0778:
0779:                // ArrayList's iterator is not thread safe
0780:                DistributedReplicantManager.ReplicantListener[] toNotify = null;
0781:                synchronized (listeners) {
0782:                    toNotify = new DistributedReplicantManager.ReplicantListener[listeners
0783:                            .size()];
0784:                    toNotify = (DistributedReplicantManager.ReplicantListener[]) listeners
0785:                            .toArray(toNotify);
0786:                }
0787:
0788:                if (trace)
0789:                    log.trace("notifying " + toNotify.length
0790:                            + " listeners for key change: " + key);
0791:                for (int i = 0; i < toNotify.length; i++) {
0792:                    if (toNotify[i] != null)
0793:                        toNotify[i]
0794:                                .replicantsChanged(key, newReplicants, newId);
0795:                }
0796:            }
0797:
0798:            protected void republishLocalReplicants() {
0799:                try {
0800:                    if (trace)
0801:                        log.trace("Start Re-Publish local replicants in DRM");
0802:
0803:                    HashMap localReplicants;
0804:                    synchronized (this .localReplicants) {
0805:                        localReplicants = new HashMap(this .localReplicants);
0806:                    }
0807:
0808:                    Iterator entries = localReplicants.entrySet().iterator();
0809:                    while (entries.hasNext()) {
0810:                        Map.Entry entry = (Map.Entry) entries.next();
0811:                        String key = (String) entry.getKey();
0812:                        Object replicant = entry.getValue();
0813:                        if (replicant != null) {
0814:                            if (trace)
0815:                                log.trace("publishing, key=" + key + ", value="
0816:                                        + replicant);
0817:
0818:                            Object[] args = { key, this .nodeName, replicant };
0819:
0820:                            partition.callAsynchMethodOnCluster(SERVICE_NAME,
0821:                                    "_add", args, add_types, true);
0822:                            notifyKeyListeners(key, lookupReplicants(key));
0823:                        }
0824:                    }
0825:                    if (trace)
0826:                        log.trace("End Re-Publish local replicants");
0827:                } catch (Exception e) {
0828:                    log.error("Re-Publish failed", e);
0829:                }
0830:            }
0831:
0832:            ////////////////////
0833:            // Group membership API
0834:            ////////////////////
0835:
0836:            protected void mergeMembers() {
0837:                boolean isAlreadyMerging = ClusterMergeStatus
0838:                        .isMergeInProcess();
0839:                try {
0840:                    ClusterMergeStatus.startMergeProcess();
0841:
0842:                    log.debug("Start merging members in DRM service...");
0843:                    java.util.HashSet notifies = new java.util.HashSet();
0844:                    ArrayList rsp = partition.callMethodOnCluster(SERVICE_NAME,
0845:                            "lookupLocalReplicants", new Object[] {},
0846:                            new Class[] {}, true);
0847:                    if (rsp.size() == 0)
0848:                        log
0849:                                .debug("No responses from other nodes during the DRM merge process.");
0850:                    else {
0851:                        log.debug("The DRM merge process has received "
0852:                                + rsp.size() + " answers");
0853:                    }
0854:                    for (int i = 0; i < rsp.size(); i++) {
0855:                        Object o = rsp.get(i);
0856:                        if (o == null) {
0857:                            log
0858:                                    .warn("As part of the answers received during the DRM merge process, a NULL message was received!");
0859:                            continue;
0860:                        } else if (o instanceof  Throwable) {
0861:                            log
0862:                                    .warn(
0863:                                            "As part of the answers received during the DRM merge process, a Throwable was received!",
0864:                                            (Throwable) o);
0865:                            continue;
0866:                        }
0867:
0868:                        Object[] objs = (Object[]) o;
0869:                        String node = (String) objs[0];
0870:                        Map replicants = (Map) objs[1];
0871:                        Iterator keys = replicants.keySet().iterator();
0872:
0873:                        //FIXME: We don't remove keys in the merge process but only add new keys!
0874:                        while (keys.hasNext()) {
0875:                            String key = (String) keys.next();
0876:                            // done to reduce duplicate notifications
0877:                            if (!replicantEntryAlreadyExists(key, node)) {
0878:                                addReplicant(key, node,
0879:                                        (Serializable) replicants.get(key));
0880:                                notifies.add(key);
0881:                            }
0882:                        }
0883:
0884:                        Vector currentStatus = getKeysReplicatedByNode(node);
0885:                        if (currentStatus.size() > replicants.size()) {
0886:                            // The merge process needs to remove some (now)
0887:                            // unexisting keys
0888:                            //
0889:                            for (int currentKeysId = 0, currentKeysMax = currentStatus
0890:                                    .size(); currentKeysId < currentKeysMax; currentKeysId++) {
0891:                                String theKey = (String) currentStatus
0892:                                        .elementAt(currentKeysId);
0893:                                if (!replicants.containsKey(theKey)) {
0894:                                    removeReplicant(theKey, node);
0895:                                    notifies.add(theKey);
0896:                                }
0897:                            }
0898:                        }
0899:                    }
0900:
0901:                    Iterator notifIter = notifies.iterator();
0902:                    while (notifIter.hasNext()) {
0903:                        String key = (String) notifIter.next();
0904:                        notifyKeyListeners(key, lookupReplicants(key));
0905:                    }
0906:                    log.debug("..Finished merging members in DRM service");
0907:
0908:                } catch (Exception ex) {
0909:                    log.error("merge failed", ex);
0910:                } finally {
0911:                    if (!isAlreadyMerging)
0912:                        ClusterMergeStatus.endMergeProcess();
0913:                }
0914:            }
0915:
0916:            /**
0917:             * get rid of dead members from replicant list
0918:             * return true if anything was purged.
0919:             */
0920:            protected void purgeDeadMembers(Vector deadMembers) {
0921:                if (deadMembers.size() <= 0)
0922:                    return;
0923:
0924:                log.debug("purgeDeadMembers, " + deadMembers);
0925:                try {
0926:                    synchronized (replicants) {
0927:                        Iterator keys = replicants.keySet().iterator();
0928:                        while (keys.hasNext()) {
0929:                            String key = (String) keys.next();
0930:                            HashMap replicant = (HashMap) replicants.get(key);
0931:                            boolean modified = false;
0932:                            for (int i = 0; i < deadMembers.size(); i++) {
0933:                                String node = deadMembers.elementAt(i)
0934:                                        .toString();
0935:                                log.debug("trying to remove deadMember " + node
0936:                                        + " for key " + key);
0937:                                Object removed = replicant.remove(node);
0938:                                if (removed != null) {
0939:                                    log.debug(node + " was removed");
0940:                                    modified = true;
0941:                                } else {
0942:                                    log.debug(node + " was NOT removed!!!");
0943:                                }
0944:                            }
0945:                            if (modified) {
0946:                                notifyKeyListeners(key, lookupReplicants(key));
0947:                            }
0948:                        }
0949:                    }
0950:                } catch (Exception ex) {
0951:                    log.error("purgeDeadMembers failed", ex);
0952:                }
0953:            }
0954:
0955:            /**
0956:             */
0957:            protected void cleanupKeyListeners() {
0958:                // NOT IMPLEMENTED YET
0959:            }
0960:
0961:            protected synchronized static int nextThreadID() {
0962:                return threadID++;
0963:            }
0964:
0965:            // Private -------------------------------------------------------
0966:
0967:            // Inner classes -------------------------------------------------
0968:
0969:            protected class MergeMembers extends Thread {
0970:                public MergeMembers() {
0971:                    super ("DRM Async Merger#" + nextThreadID());
0972:                }
0973:
0974:                /**
0975:                 * Called when the service needs to merge with another partition. This
0976:                 * process is performed asynchronously
0977:                 */
0978:                public void run() {
0979:                    log.debug("Sleeping for 50ms before mergeMembers");
0980:                    try {
0981:                        // if this thread invokes a cluster method call before
0982:                        // membershipChanged event completes, it could timeout/hang
0983:                        // we need to discuss this with Bela.
0984:                        Thread.sleep(50);
0985:                    } catch (Exception ignored) {
0986:                    }
0987:                    mergeMembers();
0988:                }
0989:            }
0990:
0991:            protected class MembersPublisher extends Thread {
0992:                public MembersPublisher() {
0993:                    super ("DRM Async Publisher#" + nextThreadID());
0994:                }
0995:
0996:                /**
0997:                 * Called when service needs to re-publish its local replicants to other
0998:                 * cluster members after this node has joined the cluster.
0999:                 */
1000:                public void run() {
1001:                    log
1002:                            .debug("DRM: Sleeping before re-publishing for 50ms just in case");
1003:                    try {
1004:                        // if this thread invokes a cluster method call before
1005:                        // membershipChanged event completes, it could timeout/hang
1006:                        // we need to discuss this with Bela.
1007:                        Thread.sleep(50);
1008:                    } catch (Exception ignored) {
1009:                    }
1010:                    republishLocalReplicants();
1011:                }
1012:            }
1013:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.