0001: /*
0002: * JBoss, Home of Professional Open Source.
0003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
0004: * as indicated by the @author tags. See the copyright.txt file in the
0005: * distribution for a full listing of individual contributors.
0006: *
0007: * This is free software; you can redistribute it and/or modify it
0008: * under the terms of the GNU Lesser General Public License as
0009: * published by the Free Software Foundation; either version 2.1 of
0010: * the License, or (at your option) any later version.
0011: *
0012: * This software is distributed in the hope that it will be useful,
0013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
0014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
0015: * Lesser General Public License for more details.
0016: *
0017: * You should have received a copy of the GNU Lesser General Public
0018: * License along with this software; if not, write to the Free
0019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
0020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
0021: */
0022: package org.jboss.ha.framework.server;
0023:
0024: import java.util.Set;
0025: import java.util.Vector;
0026: import java.util.ArrayList;
0027: import java.util.HashMap;
0028: import java.util.Iterator;
0029: import java.util.Collection;
0030: import java.util.HashSet;
0031: import java.util.List;
0032: import java.util.Map;
0033:
0034: import java.io.Serializable;
0035:
0036: import javax.management.MBeanServer;
0037: import javax.management.ObjectName;
0038:
0039: import EDU.oswego.cs.dl.util.concurrent.Latch;
0040: import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
0041:
0042: import org.jboss.logging.Logger;
0043:
0044: import org.jboss.ha.framework.interfaces.ClusterMergeStatus;
0045: import org.jboss.ha.framework.interfaces.ClusterNode;
0046: import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
0047: import org.jboss.ha.framework.interfaces.HAPartition;
0048:
0049: /**
0050: * This class manages replicated objects.
0051: *
0052: * @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>.
0053: * @author <a href="mailto:sacha.labourey@cogito-info.ch">Sacha Labourey</a>.
0054: * @author Scott.stark@jboss.org
0055: * @version $Revision: 61770 $
0056: */
0057: public class DistributedReplicantManagerImpl implements
0058: DistributedReplicantManagerImplMBean,
0059: HAPartition.HAMembershipExtendedListener,
0060: HAPartition.HAPartitionStateTransfer,
0061: AsynchEventHandler.AsynchEventProcessor {
0062: // Constants -----------------------------------------------------
0063:
0064: protected final static String SERVICE_NAME = "DistributedReplicantManager";
0065:
0066: // Attributes ----------------------------------------------------
0067: protected static int threadID;
0068:
0069: protected ConcurrentReaderHashMap localReplicants = new ConcurrentReaderHashMap();
0070: protected ConcurrentReaderHashMap replicants = new ConcurrentReaderHashMap();
0071: protected ConcurrentReaderHashMap keyListeners = new ConcurrentReaderHashMap();
0072: protected HashMap intraviewIdCache = new HashMap();
0073: protected HAPartition partition;
0074: /** The handler used to send replicant change notifications asynchronously */
0075: protected AsynchEventHandler asynchHandler;
0076:
0077: protected Logger log;
0078:
0079: protected MBeanServer mbeanserver;
0080: protected ObjectName jmxName;
0081:
0082: protected String nodeName = null;
0083:
0084: protected Latch partitionNameKnown = new Latch();
0085: protected boolean trace;
0086:
0087: protected Class[] add_types = new Class[] { String.class,
0088: String.class, Serializable.class };
0089: protected Class[] remove_types = new Class[] { String.class,
0090: String.class };
0091:
0092: // Static --------------------------------------------------------
0093:
0094: // Constructors --------------------------------------------------
0095:
0096: /**
0097: * This class manages replicated objects through the given partition
0098: *
0099: * @param partition {@link HAPartition} through which replicated objects will be exchanged
0100: */
0101: public DistributedReplicantManagerImpl(HAPartition partition,
0102: MBeanServer server) {
0103: this .partition = partition;
0104: this .mbeanserver = server;
0105: this .log = Logger
0106: .getLogger(DistributedReplicantManagerImpl.class
0107: .getName()
0108: + "." + partition.getPartitionName());
0109: this .trace = log.isTraceEnabled();
0110: }
0111:
0112: // Public --------------------------------------------------------
0113:
0114: public void init() throws Exception {
0115: log.debug("registerRPCHandler");
0116: partition.registerRPCHandler(SERVICE_NAME, this );
0117: log.debug("subscribeToStateTransferEvents");
0118: partition.subscribeToStateTransferEvents(SERVICE_NAME, this );
0119: log.debug("registerMembershipListener");
0120: partition.registerMembershipListener(this );
0121:
0122: // subscribed this "sub-service" of HAPartition with JMX
0123: // TODO: In the future (when state transfer issues will be completed),
0124: // we will need to redesign the way HAPartitions and its sub-protocols are
0125: // registered with JMX. They will most probably be independant JMX services.
0126: //
0127: String name = "jboss:service=" + SERVICE_NAME
0128: + ",partitionName=" + this .partition.getPartitionName();
0129: this .jmxName = new javax.management.ObjectName(name);
0130: this .mbeanserver.registerMBean(this , jmxName);
0131: }
0132:
0133: public void start() throws Exception {
0134: this .nodeName = this .partition.getNodeName();
0135:
0136: // Create the asynch listener handler thread
0137: asynchHandler = new AsynchEventHandler(this ,
0138: "AsynchKeyChangeHandler");
0139: asynchHandler.start();
0140:
0141: partitionNameKnown.release(); // partition name is now known!
0142:
0143: //log.info("mergemembers");
0144: //mergeMembers();
0145: }
0146:
0147: public void stop() throws Exception {
0148: // BES 200604 -- implication of NR's JBLCUSTER-38 change. Moving to
0149: // destroy allows restart of HAPartition while local registrations
0150: // survive -- stopping partition does not stop all registered services
0151: // e.g. ejbs; if we maintain their registrations we can pass them to
0152: // the cluster when we restart. However, we are leaving all the remote
0153: // replicants we have registered around, so they will still be included
0154: // as targets if anyone contacts our EJB while partition is stopped.
0155: // Probably OK; if they aren't valid the client will find this out.
0156:
0157: // NR 200505 : [JBCLUSTER-38] move to destroy
0158: // if (localReplicants != null)
0159: // {
0160: // synchronized(localReplicants)
0161: // {
0162: // while (! localReplicants.isEmpty ())
0163: // {
0164: // this.remove ((String)localReplicants.keySet().iterator().next ());
0165: // }
0166: // }
0167: // }
0168:
0169: // Stop the asynch handler thread
0170: try {
0171: asynchHandler.stop();
0172: } catch (Exception e) {
0173: log.warn("Failed to stop asynchHandler", e);
0174: }
0175:
0176: // NR 200505 : [JBCLUSTER-38] move to destroy
0177: // this.mbeanserver.unregisterMBean (this.jmxName);
0178: }
0179:
0180: // NR 200505 : [JBCLUSTER-38] unbind at destroy
0181: public void destroy() throws Exception {
0182: // now partition can't be resuscitated, so remove local replicants
0183: if (localReplicants != null) {
0184: synchronized (localReplicants) {
0185: String[] keys = new String[localReplicants.size()];
0186: localReplicants.keySet().toArray(keys);
0187: for (int n = 0; n < keys.length; n++) {
0188: this .removeLocal(keys[n]); // channel is disconnected, so
0189: // don't try to notify cluster
0190: }
0191: }
0192: }
0193:
0194: this .mbeanserver.unregisterMBean(this .jmxName);
0195:
0196: partition.unregisterRPCHandler(SERVICE_NAME, this );
0197: partition
0198: .unsubscribeFromStateTransferEvents(SERVICE_NAME, this );
0199: partition.unregisterMembershipListener(this );
0200: }
0201:
0202: public String listContent() throws Exception {
0203: // we merge all replicants services: local only or not
0204: //
0205: java.util.Collection services = this .getAllServices();
0206:
0207: StringBuffer result = new StringBuffer();
0208: java.util.Iterator catsIter = services.iterator();
0209:
0210: result.append("<pre>");
0211:
0212: while (catsIter.hasNext()) {
0213: String category = (String) catsIter.next();
0214: HashMap content = (HashMap) this .replicants.get(category);
0215: if (content == null)
0216: content = new HashMap();
0217: java.util.Iterator keysIter = content.keySet().iterator();
0218:
0219: result
0220: .append("-----------------------------------------------\n");
0221: result.append("Service : ").append(category).append("\n\n");
0222:
0223: Serializable local = lookupLocalReplicant(category);
0224: if (local == null)
0225: result
0226: .append("\t- Service is *not* available locally\n");
0227: else
0228: result
0229: .append("\t- Service *is* also available locally\n");
0230:
0231: while (keysIter.hasNext()) {
0232: String location = (String) keysIter.next();
0233: result.append("\t- ").append(location).append("\n");
0234: }
0235:
0236: result.append("\n");
0237:
0238: }
0239:
0240: result.append("</pre>");
0241:
0242: return result.toString();
0243: }
0244:
0245: public String listXmlContent() throws Exception {
0246: // we merge all replicants services: local only or not
0247: //
0248: java.util.Collection services = this .getAllServices();
0249: StringBuffer result = new StringBuffer();
0250:
0251: result.append("<ReplicantManager>\n");
0252:
0253: java.util.Iterator catsIter = services.iterator();
0254: while (catsIter.hasNext()) {
0255: String category = (String) catsIter.next();
0256: HashMap content = (HashMap) this .replicants.get(category);
0257: if (content == null)
0258: content = new HashMap();
0259: java.util.Iterator keysIter = content.keySet().iterator();
0260:
0261: result.append("\t<Service>\n");
0262: result.append("\t\t<ServiceName>").append(category).append(
0263: "</ServiceName>\n");
0264:
0265: Serializable local = lookupLocalReplicant(category);
0266: if (local != null) {
0267: result.append("\t\t<Location>\n");
0268: result.append("\t\t\t<Name local=\"True\">").append(
0269: this .nodeName).append("</Name>\n");
0270: result.append("\t\t</Location>\n");
0271: }
0272:
0273: while (keysIter.hasNext()) {
0274: String location = (String) keysIter.next();
0275: result.append("\t\t<Location>\n");
0276: result.append("\t\t\t<Name local=\"False\">").append(
0277: location).append("</Name>\n");
0278: result.append("\t\t</Location>\n");
0279: }
0280:
0281: result.append("\t<Service>\n");
0282:
0283: }
0284:
0285: result.append("<ReplicantManager>\n");
0286:
0287: return result.toString();
0288: }
0289:
0290: // HAPartition.HAPartitionStateTransfer implementation ----------------------------------------------
0291:
0292: public Serializable getCurrentState() {
0293: java.util.Collection services = this .getAllServices();
0294: HashMap result = new HashMap();
0295:
0296: java.util.Iterator catsIter = services.iterator();
0297: while (catsIter.hasNext()) {
0298: String category = (String) catsIter.next();
0299: HashMap content = (HashMap) this .replicants.get(category);
0300: if (content == null)
0301: content = new HashMap();
0302: else
0303: content = (HashMap) content.clone();
0304:
0305: Serializable local = lookupLocalReplicant(category);
0306: if (local != null)
0307: content.put(this .nodeName, local);
0308:
0309: result.put(category, content);
0310: }
0311:
0312: // we add the intraviewid cache to the global result
0313: //
0314: Object[] globalResult = new Object[] { result, intraviewIdCache };
0315: return globalResult;
0316: }
0317:
0318: public void setCurrentState(Serializable newState) {
0319: Object[] globalState = (Object[]) newState;
0320:
0321: HashMap map = (HashMap) globalState[0];
0322: this .replicants.putAll(map);
0323: this .intraviewIdCache = (HashMap) globalState[1];
0324:
0325: if (trace) {
0326: log
0327: .trace(nodeName
0328: + ": received new state, will republish local replicants");
0329: }
0330: MembersPublisher publisher = new MembersPublisher();
0331: publisher.start();
0332: }
0333:
0334: public Collection getAllServices() {
0335: HashSet services = new HashSet();
0336: services.addAll(localReplicants.keySet());
0337: services.addAll(replicants.keySet());
0338: return services;
0339: }
0340:
0341: // HAPartition.HAMembershipListener implementation ----------------------------------------------
0342:
0343: public void membershipChangedDuringMerge(Vector deadMembers,
0344: Vector newMembers, Vector allMembers,
0345: Vector originatingGroups) {
0346: // Here we only care about deadMembers. Purge all replicant lists of deadMembers
0347: // and then notify all listening nodes.
0348: //
0349: log.info("Merging partitions...");
0350: log.info("Dead members: " + deadMembers.size());
0351: log.info("Originating groups: " + originatingGroups);
0352: purgeDeadMembers(deadMembers);
0353: if (newMembers.size() > 0) {
0354: new MergeMembers().start();
0355: }
0356: }
0357:
0358: public void membershipChanged(Vector deadMembers,
0359: Vector newMembers, Vector allMembers) {
0360: // Here we only care about deadMembers. Purge all replicant lists of deadMembers
0361: // and then notify all listening nodes.
0362: //
0363: log.info("I am (" + nodeName
0364: + ") received membershipChanged event:");
0365: log.info("Dead members: " + deadMembers.size() + " ("
0366: + deadMembers + ")");
0367: log.info("New Members : " + newMembers.size() + " ("
0368: + newMembers + ")");
0369: log.info("All Members : " + allMembers.size() + " ("
0370: + allMembers + ")");
0371: purgeDeadMembers(deadMembers);
0372:
0373: // we don't need to merge members anymore
0374: }
0375:
0376: // AsynchEventHandler.AsynchEventProcessor implementation -----------------
0377:
0378: public void processEvent(Object event) {
0379: KeyChangeEvent kce = (KeyChangeEvent) event;
0380: notifyKeyListeners(kce.key, kce.replicants);
0381: }
0382:
0383: static class KeyChangeEvent {
0384: String key;
0385: List replicants;
0386: }
0387:
0388: // DistributedReplicantManager implementation ----------------------------------------------
0389:
0390: public void add(String key, Serializable replicant)
0391: throws Exception {
0392: if (trace)
0393: log.trace("add, key=" + key + ", value=" + replicant);
0394: partitionNameKnown.acquire(); // we don't propagate until our name is known
0395:
0396: Object[] args = { key, this .nodeName, replicant };
0397: partition.callMethodOnCluster(SERVICE_NAME, "_add", args,
0398: add_types, true);
0399: synchronized (localReplicants) {
0400: localReplicants.put(key, replicant);
0401: notifyKeyListeners(key, lookupReplicants(key));
0402: }
0403: }
0404:
0405: public void remove(String key) throws Exception {
0406: partitionNameKnown.acquire(); // we don't propagate until our name is known
0407:
0408: // optimisation: we don't make a costly network call
0409: // if there is nothing to remove
0410: if (localReplicants.containsKey(key)) {
0411: Object[] args = { key, this .nodeName };
0412: partition.callAsynchMethodOnCluster(SERVICE_NAME,
0413: "_remove", args, remove_types, true);
0414: removeLocal(key);
0415: }
0416: }
0417:
0418: protected void removeLocal(String key) {
0419: synchronized (localReplicants) {
0420: localReplicants.remove(key);
0421: List result = lookupReplicants(key);
0422: if (result == null)
0423: result = new ArrayList(); // don't pass null but an empty list
0424: notifyKeyListeners(key, result);
0425: }
0426: }
0427:
0428: public Serializable lookupLocalReplicant(String key) {
0429: return (Serializable) localReplicants.get(key);
0430: }
0431:
0432: public List lookupReplicants(String key) {
0433: Serializable local = lookupLocalReplicant(key);
0434: HashMap replicant = (HashMap) replicants.get(key);
0435: if (replicant == null && local == null)
0436: return null;
0437:
0438: ArrayList rtn = new ArrayList();
0439:
0440: if (replicant == null) {
0441: if (local != null)
0442: rtn.add(local);
0443: } else {
0444: // JBAS-2677. Put the replicants in view order.
0445: ClusterNode[] nodes = partition.getClusterNodes();
0446: String replNode;
0447: Object replVal;
0448: for (int i = 0; i < nodes.length; i++) {
0449: replNode = nodes[i].getName();
0450: if (local != null && nodeName.equals(replNode)) {
0451: rtn.add(local);
0452: continue;
0453: }
0454:
0455: replVal = replicant.get(replNode);
0456: if (replVal != null)
0457: rtn.add(replVal);
0458: }
0459: }
0460:
0461: return rtn;
0462: }
0463:
0464: public List lookupReplicantsNodeNames(String key) {
0465: boolean locallyReplicated = localReplicants.containsKey(key);
0466: HashMap replicant = (HashMap) replicants.get(key);
0467: if (replicant == null && !locallyReplicated)
0468: return null;
0469:
0470: ArrayList rtn = new ArrayList();
0471:
0472: if (replicant == null) {
0473: if (locallyReplicated)
0474: rtn.add(this .nodeName);
0475: } else {
0476: // JBAS-2677. Put the replicants in view order.
0477: Set keys = replicant.keySet();
0478: ClusterNode[] nodes = partition.getClusterNodes();
0479: String keyOwner;
0480: for (int i = 0; i < nodes.length; i++) {
0481: keyOwner = nodes[i].getName();
0482: if (locallyReplicated && nodeName.equals(keyOwner)) {
0483: rtn.add(this .nodeName);
0484: continue;
0485: }
0486:
0487: if (keys.contains(keyOwner))
0488: rtn.add(keyOwner);
0489: }
0490: }
0491:
0492: return rtn;
0493: }
0494:
0495: public void registerListener(String key,
0496: DistributedReplicantManager.ReplicantListener subscriber) {
0497: synchronized (keyListeners) {
0498: ArrayList listeners = (ArrayList) keyListeners.get(key);
0499: if (listeners == null) {
0500: listeners = new ArrayList();
0501: keyListeners.put(key, listeners);
0502: }
0503: listeners.add(subscriber);
0504: }
0505: }
0506:
0507: public void unregisterListener(String key,
0508: DistributedReplicantManager.ReplicantListener subscriber) {
0509: synchronized (keyListeners) {
0510: ArrayList listeners = (ArrayList) keyListeners.get(key);
0511: if (listeners == null)
0512: return;
0513:
0514: listeners.remove(subscriber);
0515: if (listeners.size() == 0)
0516: keyListeners.remove(key);
0517:
0518: }
0519: }
0520:
0521: public int getReplicantsViewId(String key) {
0522: Integer result = (Integer) this .intraviewIdCache.get(key);
0523:
0524: if (result == null)
0525: return 0;
0526: else
0527: return result.intValue();
0528: }
0529:
0530: public boolean isMasterReplica(String key) {
0531: if (trace)
0532: log.trace("isMasterReplica, key=" + key);
0533: // if I am not a replicat, I cannot be the master...
0534: //
0535: if (!localReplicants.containsKey(key)) {
0536: if (trace)
0537: log.trace("no localReplicants, key=" + key
0538: + ", isMasterReplica=false");
0539: return false;
0540: }
0541:
0542: Vector allNodes = this .partition.getCurrentView();
0543: HashMap repForKey = (HashMap) replicants.get(key);
0544: if (repForKey == null) {
0545: if (trace)
0546: log.trace("no replicants, key=" + key
0547: + ", isMasterReplica=true");
0548: return true;
0549: }
0550: Vector replicaNodes = new Vector((repForKey).keySet());
0551: boolean isMasterReplica = false;
0552: for (int i = 0; i < allNodes.size(); i++) {
0553: String aMember = (String) allNodes.elementAt(i);
0554: if (trace)
0555: log.trace("Testing member: " + aMember);
0556: if (replicaNodes.contains(aMember)) {
0557: if (trace)
0558: log
0559: .trace("Member found in replicaNodes, isMasterReplica=false");
0560: break;
0561: } else if (aMember.equals(this .nodeName)) {
0562: if (trace)
0563: log
0564: .trace("Member == nodeName, isMasterReplica=true");
0565: isMasterReplica = true;
0566: break;
0567: }
0568: }
0569: return isMasterReplica;
0570: }
0571:
0572: // DistributedReplicantManager cluster callbacks ----------------------------------------------
0573:
0574: /**
0575: * Cluster callback called when a new replicant is added on another node
0576: * @param key Replicant key
0577: * @param nodeName Node that add the current replicant
0578: * @param replicant Serialized representation of the replicant
0579: */
0580: public void _add(String key, String nodeName, Serializable replicant) {
0581: if (trace)
0582: log.trace("_add(" + key + ", " + nodeName);
0583:
0584: try {
0585: addReplicant(key, nodeName, replicant);
0586: // Notify listeners asynchronously
0587: KeyChangeEvent kce = new KeyChangeEvent();
0588: kce.key = key;
0589: kce.replicants = lookupReplicants(key);
0590: asynchHandler.queueEvent(kce);
0591: } catch (Exception ex) {
0592: log.error("_add failed", ex);
0593: }
0594: }
0595:
0596: /**
0597: * Cluster callback called when a replicant is removed by another node
0598: * @param key Name of the replicant key
0599: * @param nodeName Node that wants to remove its replicant for the give key
0600: */
0601: public void _remove(String key, String nodeName) {
0602: try {
0603: if (removeReplicant(key, nodeName)) {
0604: // Notify listeners asynchronously
0605: KeyChangeEvent kce = new KeyChangeEvent();
0606: kce.key = key;
0607: kce.replicants = lookupReplicants(key);
0608: asynchHandler.queueEvent(kce);
0609: }
0610: } catch (Exception ex) {
0611: log.error("_remove failed", ex);
0612: }
0613: }
0614:
0615: protected boolean removeReplicant(String key, String nodeName)
0616: throws Exception {
0617: synchronized (replicants) {
0618: HashMap replicant = (HashMap) replicants.get(key);
0619: if (replicant == null)
0620: return false;
0621: Object removed = replicant.remove(nodeName);
0622: if (removed != null) {
0623: Collection values = replicant.values();
0624: if (values.size() == 0) {
0625: replicants.remove(key);
0626: }
0627: return true;
0628: }
0629: }
0630: return false;
0631: }
0632:
0633: /**
0634: * Cluster callback called when a node wants to know our complete list of local replicants
0635: * @throws Exception Thrown if a cluster communication exception occurs
0636: * @return A java array of size 2 containing the name of our node in this cluster and the serialized representation of our state
0637: */
0638: public Object[] lookupLocalReplicants() throws Exception {
0639: partitionNameKnown.acquire(); // we don't answer until our name is known
0640:
0641: Object[] rtn = { this .nodeName, localReplicants };
0642: if (trace)
0643: log.trace("lookupLocalReplicants called (" + rtn[0]
0644: + "). Return: " + localReplicants.size());
0645: return rtn;
0646: }
0647:
0648: // Package protected ---------------------------------------------
0649:
0650: // Protected -----------------------------------------------------
0651:
0652: protected int calculateReplicantsHash(List members) {
0653: int result = 0;
0654: Object obj = null;
0655:
0656: for (int i = 0; i < members.size(); i++) {
0657: obj = members.get(i);
0658: if (obj != null)
0659: result += obj.hashCode(); // no explicit overflow with int addition
0660: }
0661:
0662: return result;
0663: }
0664:
0665: protected int updateReplicantsHashId(String key) {
0666: // we first get a list of all nodes names that replicate this key
0667: //
0668: List nodes = this .lookupReplicantsNodeNames(key);
0669: int result = 0;
0670:
0671: if ((nodes == null) || (nodes.size() == 0)) {
0672: // no nore replicants for this key: we uncache our view id
0673: //
0674: this .intraviewIdCache.remove(key);
0675: } else {
0676: result = this .calculateReplicantsHash(nodes);
0677: this .intraviewIdCache.put(key, new Integer(result));
0678: }
0679:
0680: return result;
0681:
0682: }
0683:
0684: ///////////////
0685: // DistributedReplicantManager API
0686: ///////////////
0687:
0688: /**
0689: * Add a replicant to the replicants map.
0690: * @param key replicant key name
0691: * @param nodeName name of the node that adds this replicant
0692: * @param replicant Serialized representation of the replica
0693: */
0694: protected void addReplicant(String key, String nodeName,
0695: Serializable replicant) {
0696: addReplicant(replicants, key, nodeName, replicant);
0697: }
0698:
0699: /**
0700: * Logic for adding replicant to any map.
0701: * @param map structure in which adding the new replicant
0702: * @param key name of the replicant key
0703: * @param nodeName name of the node adding the replicant
0704: * @param replicant serialized representation of the replicant that is added
0705: */
0706: protected void addReplicant(Map map, String key, String nodeName,
0707: Serializable replicant) {
0708: synchronized (map) {
0709: HashMap rep = (HashMap) map.get(key);
0710: if (rep == null) {
0711: if (trace)
0712: log.trace("_adding new HashMap");
0713: rep = new HashMap();
0714: map.put(key, rep);
0715: }
0716: rep.put(nodeName, replicant);
0717: }
0718: }
0719:
0720: protected Vector getKeysReplicatedByNode(String nodeName) {
0721: Vector result = new Vector();
0722: synchronized (replicants) {
0723: Iterator keysIter = replicants.keySet().iterator();
0724: while (keysIter.hasNext()) {
0725: String key = (String) keysIter.next();
0726: HashMap values = (HashMap) replicants.get(key);
0727: if ((values != null) && values.containsKey(nodeName)) {
0728: result.add(key);
0729: }
0730: }
0731: }
0732: return result;
0733: }
0734:
0735: /**
0736: * Indicates if the a replicant already exists for a given key/node pair
0737: * @param key replicant key name
0738: * @param nodeName name of the node
0739: * @return a boolean indicating if a replicant for the given node exists for the given key
0740: */
0741: protected boolean replicantEntryAlreadyExists(String key,
0742: String nodeName) {
0743: return replicantEntryAlreadyExists(replicants, key, nodeName);
0744: }
0745:
0746: /**
0747: * Indicates if the a replicant already exists for a given key/node pair in the give data structure
0748: */
0749: protected boolean replicantEntryAlreadyExists(Map map, String key,
0750: String nodeName) {
0751: HashMap rep = (HashMap) map.get(key);
0752: if (rep == null)
0753: return false;
0754: else
0755: return rep.containsKey(nodeName);
0756: }
0757:
0758: /**
0759: * Notifies, through a callback, the listeners for a given replicant that the set of replicants has changed
0760: * @param key The replicant key name
0761: * @param newReplicants The new list of replicants
0762: *
0763: */
0764: protected void notifyKeyListeners(String key, List newReplicants) {
0765: if (trace)
0766: log.trace("notifyKeyListeners");
0767:
0768: // we first update the intra-view id for this particular key
0769: //
0770: int newId = updateReplicantsHashId(key);
0771:
0772: ArrayList listeners = (ArrayList) keyListeners.get(key);
0773: if (listeners == null) {
0774: if (trace)
0775: log.trace("listeners is null");
0776: return;
0777: }
0778:
0779: // ArrayList's iterator is not thread safe
0780: DistributedReplicantManager.ReplicantListener[] toNotify = null;
0781: synchronized (listeners) {
0782: toNotify = new DistributedReplicantManager.ReplicantListener[listeners
0783: .size()];
0784: toNotify = (DistributedReplicantManager.ReplicantListener[]) listeners
0785: .toArray(toNotify);
0786: }
0787:
0788: if (trace)
0789: log.trace("notifying " + toNotify.length
0790: + " listeners for key change: " + key);
0791: for (int i = 0; i < toNotify.length; i++) {
0792: if (toNotify[i] != null)
0793: toNotify[i]
0794: .replicantsChanged(key, newReplicants, newId);
0795: }
0796: }
0797:
0798: protected void republishLocalReplicants() {
0799: try {
0800: if (trace)
0801: log.trace("Start Re-Publish local replicants in DRM");
0802:
0803: HashMap localReplicants;
0804: synchronized (this .localReplicants) {
0805: localReplicants = new HashMap(this .localReplicants);
0806: }
0807:
0808: Iterator entries = localReplicants.entrySet().iterator();
0809: while (entries.hasNext()) {
0810: Map.Entry entry = (Map.Entry) entries.next();
0811: String key = (String) entry.getKey();
0812: Object replicant = entry.getValue();
0813: if (replicant != null) {
0814: if (trace)
0815: log.trace("publishing, key=" + key + ", value="
0816: + replicant);
0817:
0818: Object[] args = { key, this .nodeName, replicant };
0819:
0820: partition.callAsynchMethodOnCluster(SERVICE_NAME,
0821: "_add", args, add_types, true);
0822: notifyKeyListeners(key, lookupReplicants(key));
0823: }
0824: }
0825: if (trace)
0826: log.trace("End Re-Publish local replicants");
0827: } catch (Exception e) {
0828: log.error("Re-Publish failed", e);
0829: }
0830: }
0831:
0832: ////////////////////
0833: // Group membership API
0834: ////////////////////
0835:
0836: protected void mergeMembers() {
0837: boolean isAlreadyMerging = ClusterMergeStatus
0838: .isMergeInProcess();
0839: try {
0840: ClusterMergeStatus.startMergeProcess();
0841:
0842: log.debug("Start merging members in DRM service...");
0843: java.util.HashSet notifies = new java.util.HashSet();
0844: ArrayList rsp = partition.callMethodOnCluster(SERVICE_NAME,
0845: "lookupLocalReplicants", new Object[] {},
0846: new Class[] {}, true);
0847: if (rsp.size() == 0)
0848: log
0849: .debug("No responses from other nodes during the DRM merge process.");
0850: else {
0851: log.debug("The DRM merge process has received "
0852: + rsp.size() + " answers");
0853: }
0854: for (int i = 0; i < rsp.size(); i++) {
0855: Object o = rsp.get(i);
0856: if (o == null) {
0857: log
0858: .warn("As part of the answers received during the DRM merge process, a NULL message was received!");
0859: continue;
0860: } else if (o instanceof Throwable) {
0861: log
0862: .warn(
0863: "As part of the answers received during the DRM merge process, a Throwable was received!",
0864: (Throwable) o);
0865: continue;
0866: }
0867:
0868: Object[] objs = (Object[]) o;
0869: String node = (String) objs[0];
0870: Map replicants = (Map) objs[1];
0871: Iterator keys = replicants.keySet().iterator();
0872:
0873: //FIXME: We don't remove keys in the merge process but only add new keys!
0874: while (keys.hasNext()) {
0875: String key = (String) keys.next();
0876: // done to reduce duplicate notifications
0877: if (!replicantEntryAlreadyExists(key, node)) {
0878: addReplicant(key, node,
0879: (Serializable) replicants.get(key));
0880: notifies.add(key);
0881: }
0882: }
0883:
0884: Vector currentStatus = getKeysReplicatedByNode(node);
0885: if (currentStatus.size() > replicants.size()) {
0886: // The merge process needs to remove some (now)
0887: // unexisting keys
0888: //
0889: for (int currentKeysId = 0, currentKeysMax = currentStatus
0890: .size(); currentKeysId < currentKeysMax; currentKeysId++) {
0891: String theKey = (String) currentStatus
0892: .elementAt(currentKeysId);
0893: if (!replicants.containsKey(theKey)) {
0894: removeReplicant(theKey, node);
0895: notifies.add(theKey);
0896: }
0897: }
0898: }
0899: }
0900:
0901: Iterator notifIter = notifies.iterator();
0902: while (notifIter.hasNext()) {
0903: String key = (String) notifIter.next();
0904: notifyKeyListeners(key, lookupReplicants(key));
0905: }
0906: log.debug("..Finished merging members in DRM service");
0907:
0908: } catch (Exception ex) {
0909: log.error("merge failed", ex);
0910: } finally {
0911: if (!isAlreadyMerging)
0912: ClusterMergeStatus.endMergeProcess();
0913: }
0914: }
0915:
0916: /**
0917: * get rid of dead members from replicant list
0918: * return true if anything was purged.
0919: */
0920: protected void purgeDeadMembers(Vector deadMembers) {
0921: if (deadMembers.size() <= 0)
0922: return;
0923:
0924: log.debug("purgeDeadMembers, " + deadMembers);
0925: try {
0926: synchronized (replicants) {
0927: Iterator keys = replicants.keySet().iterator();
0928: while (keys.hasNext()) {
0929: String key = (String) keys.next();
0930: HashMap replicant = (HashMap) replicants.get(key);
0931: boolean modified = false;
0932: for (int i = 0; i < deadMembers.size(); i++) {
0933: String node = deadMembers.elementAt(i)
0934: .toString();
0935: log.debug("trying to remove deadMember " + node
0936: + " for key " + key);
0937: Object removed = replicant.remove(node);
0938: if (removed != null) {
0939: log.debug(node + " was removed");
0940: modified = true;
0941: } else {
0942: log.debug(node + " was NOT removed!!!");
0943: }
0944: }
0945: if (modified) {
0946: notifyKeyListeners(key, lookupReplicants(key));
0947: }
0948: }
0949: }
0950: } catch (Exception ex) {
0951: log.error("purgeDeadMembers failed", ex);
0952: }
0953: }
0954:
0955: /**
0956: */
0957: protected void cleanupKeyListeners() {
0958: // NOT IMPLEMENTED YET
0959: }
0960:
0961: protected synchronized static int nextThreadID() {
0962: return threadID++;
0963: }
0964:
0965: // Private -------------------------------------------------------
0966:
0967: // Inner classes -------------------------------------------------
0968:
0969: protected class MergeMembers extends Thread {
0970: public MergeMembers() {
0971: super ("DRM Async Merger#" + nextThreadID());
0972: }
0973:
0974: /**
0975: * Called when the service needs to merge with another partition. This
0976: * process is performed asynchronously
0977: */
0978: public void run() {
0979: log.debug("Sleeping for 50ms before mergeMembers");
0980: try {
0981: // if this thread invokes a cluster method call before
0982: // membershipChanged event completes, it could timeout/hang
0983: // we need to discuss this with Bela.
0984: Thread.sleep(50);
0985: } catch (Exception ignored) {
0986: }
0987: mergeMembers();
0988: }
0989: }
0990:
0991: protected class MembersPublisher extends Thread {
0992: public MembersPublisher() {
0993: super ("DRM Async Publisher#" + nextThreadID());
0994: }
0995:
0996: /**
0997: * Called when service needs to re-publish its local replicants to other
0998: * cluster members after this node has joined the cluster.
0999: */
1000: public void run() {
1001: log
1002: .debug("DRM: Sleeping before re-publishing for 50ms just in case");
1003: try {
1004: // if this thread invokes a cluster method call before
1005: // membershipChanged event completes, it could timeout/hang
1006: // we need to discuss this with Bela.
1007: Thread.sleep(50);
1008: } catch (Exception ignored) {
1009: }
1010: republishLocalReplicants();
1011: }
1012: }
1013: }
|