0001: /*
0002: * JBoss, Home of Professional Open Source.
0003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
0004: * as indicated by the @author tags. See the copyright.txt file in the
0005: * distribution for a full listing of individual contributors.
0006: *
0007: * This is free software; you can redistribute it and/or modify it
0008: * under the terms of the GNU Lesser General Public License as
0009: * published by the Free Software Foundation; either version 2.1 of
0010: * the License, or (at your option) any later version.
0011: *
0012: * This software is distributed in the hope that it will be useful,
0013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
0014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
0015: * Lesser General Public License for more details.
0016: *
0017: * You should have received a copy of the GNU Lesser General Public
0018: * License along with this software; if not, write to the Free
0019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
0020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
0021: */
0022: package org.jboss.ha.framework.server;
0024: import java.io.ByteArrayInputStream;
0025: import java.io.ByteArrayOutputStream;
0026: import java.io.Serializable;
0027: import java.text.SimpleDateFormat;
0028: import java.util.ArrayList;
0029: import java.util.Date;
0030: import java.util.HashMap;
0031: import java.util.Iterator;
0032: import java.util.Vector;
0034: import javax.naming.Context;
0035: import javax.naming.InitialContext;
0036: import javax.naming.Name;
0037: import javax.naming.NameNotFoundException;
0038: import javax.naming.Reference;
0039: import javax.naming.StringRefAddr;
0040: import javax.management.MBeanServer;
0042: import org.jgroups.JChannel;
0043: import org.jgroups.MergeView;
0044: import org.jgroups.View;
0045: import org.jgroups.Message;
0046: import org.jgroups.blocks.GroupRequest;
0047: import org.jgroups.blocks.MethodCall;
0048: import org.jgroups.stack.IpAddress;
0049: import org.jgroups.util.Rsp;
0050: import org.jgroups.util.RspList;
0052: import org.jboss.invocation.MarshalledValueInputStream;
0053: import org.jboss.invocation.MarshalledValueOutputStream;
0054: import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
0055: import org.jboss.ha.framework.interfaces.DistributedState;
0056: import org.jboss.ha.framework.interfaces.HAPartition;
0057: import org.jboss.ha.framework.interfaces.ClusterNode;
0059: import org.jboss.naming.NonSerializableFactory;
0060: import org.jboss.logging.Logger;
0062: /**
0063: * This class is an abstraction class for a JGroups RPCDispatch and JChannel.
0064: * It is a default implementation of HAPartition for the
0065: * <a href="http://www.jgroups.com/">JGroups</A> framework
0066: *
0067: * @author <a href="mailto:sacha.labourey@cogito-info.ch">Sacha Labourey</a>.
0068: * @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>.
0069: * @author Scott.Stark@jboss.org
0070: * @version $Revision: 62255 $
0071: */
0072: public class HAPartitionImpl extends org.jgroups.blocks.RpcDispatcher
0073: implements org.jgroups.MessageListener,
0074: org.jgroups.MembershipListener, HAPartition,
0075: AsynchEventHandler.AsynchEventProcessor {
0076: private static class NoHandlerForRPC implements Serializable {
0077: static final long serialVersionUID = -1263095408483622838L;
0078: }
0080: // Constants -----------------------------------------------------
0082: // final MethodLookup method_lookup_clos = new MethodLookupClos();
0084: // Attributes ----------------------------------------------------
0086: protected HashMap rpcHandlers = new HashMap();
0087: protected HashMap stateHandlers = new HashMap();
0088: /** Do we send any membership change notifications synchronously? */
0089: protected boolean allowSyncListeners = false;
0090: /** The synch HAMembershipListener and HAMembershipExtendedListeners */
0091: protected ArrayList synchListeners = new ArrayList();
0092: /** The asynch HAMembershipListener and HAMembershipExtendedListeners */
0093: protected ArrayList asynchListeners = new ArrayList();
0094: /** The handler used to send membership change notifications asynchronously */
0095: protected AsynchEventHandler asynchHandler;
0096: /** The current cluster partition members */
0097: protected Vector members = null;
0098: protected Vector jgmembers = null;
0100: public Vector history = null;
0102: /** The partition members other than this node */
0103: protected Vector otherMembers = null;
0104: protected Vector jgotherMembers = null;
0105: /** The JChannel name */
0106: protected String partitionName;
0107: /** the local JG IP Address */
0108: protected org.jgroups.stack.IpAddress localJGAddress = null;
0109: /** The cluster transport protocol address string */
0110: protected String nodeName;
0111: /** me as a ClusterNode */
0112: protected ClusterNode me = null;
0113: /** The timeout for cluster RPC calls */
0114: protected long timeout = 60000;
0115: /** The JGroups partition channel */
0116: protected JChannel channel;
0117: /** The cluster replicant manager */
0118: protected DistributedReplicantManagerImpl replicantManager;
0119: /** The cluster state manager */
0120: protected DistributedStateImpl dsManager;
0121: /** The cluster instance log category */
0122: protected Logger log;
0123: protected Logger clusterLifeCycleLog;
0124: /** The current cluster view id */
0125: protected long currentViewId = -1;
0126: /** The JMX MBeanServer to use for registrations */
0127: protected MBeanServer server;
0128: /** Number of ms to wait for state */
0129: protected long state_transfer_timeout = 60000;
0130: /** Whether to bind the partition into JNDI */
0131: protected boolean bindIntoJndi = true;
0133: /**
0134: * True if state was initialized during start-up.
0135: */
0136: protected boolean isStateSet = false;
0138: /**
0139: * An exception occuring upon fetch state.
0140: */
0141: protected Exception setStateException;
0142: private final Object stateLock = new Object();
0144: // Static --------------------------------------------------------
0146: /**
0147: * Creates an object from a byte buffer
0148: */
0149: public static Object objectFromByteBuffer(byte[] buffer)
0150: throws Exception {
0151: if (buffer == null)
0152: return null;
0154: ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
0155: MarshalledValueInputStream mvis = new MarshalledValueInputStream(
0156: bais);
0157: return mvis.readObject();
0158: }
0160: /**
0161: * Serializes an object into a byte buffer.
0162: * The object has to implement interface Serializable or Externalizable
0163: */
0164: public static byte[] objectToByteBuffer(Object obj)
0165: throws Exception {
0166: ByteArrayOutputStream baos = new ByteArrayOutputStream();
0167: MarshalledValueOutputStream mvos = new MarshalledValueOutputStream(
0168: baos);
0169: mvos.writeObject(obj);
0170: mvos.flush();
0171: return baos.toByteArray();
0172: }
0174: public long getStateTransferTimeout() {
0175: return state_transfer_timeout;
0176: }
0178: public void setStateTransferTimeout(long state_transfer_timeout) {
0179: this .state_transfer_timeout = state_transfer_timeout;
0180: }
0182: public long getMethodCallTimeout() {
0183: return timeout;
0184: }
0186: public void setMethodCallTimeout(long timeout) {
0187: this .timeout = timeout;
0188: }
0190: // Constructors --------------------------------------------------
/**
 * Creates the partition exactly as the three-argument constructor does
 * and additionally records the MBeanServer to use.
 *
 * @param partitionName the partition name
 * @param channel the JGroups channel backing this partition
 * @param deadlock_detection passed through to the RpcDispatcher constructor
 * @param server the MBeanServer handed to the services created in init()
 * @throws Exception if the delegated constructor fails
 */
public HAPartitionImpl(String partitionName, JChannel channel,
        boolean deadlock_detection, MBeanServer server) throws Exception {
    this(partitionName, channel, deadlock_detection);
    this.server = server;
}
0199: public HAPartitionImpl(String partitionName,
0200: org.jgroups.JChannel channel, boolean deadlock_detection)
0201: throws Exception {
0202: super (channel, null, null, new Object(), deadlock_detection); // init RpcDispatcher with a fake target object
0203: this .log = Logger.getLogger(HAPartition.class.getName() + "."
0204: + partitionName);
0205: this .clusterLifeCycleLog = Logger.getLogger(HAPartition.class
0206: .getName()
0207: + ".lifecycle." + partitionName);
0208: this .channel = channel;
0209: this .partitionName = partitionName;
0210: this .history = new Vector();
0211: this .setMarshaller(new MarshallerImpl());
0212: logHistory("Partition object created");
0213: }
0215: // Public --------------------------------------------------------
0217: public void init() throws Exception {
0218: log.info("Initializing");
0219: logHistory("Initializing partition");
0221: // Subscribe to dHA events comming generated by the org.jgroups. protocol stack
0222: //
0223: log.debug("setMembershipListener");
0224: setMembershipListener(this );
0225: log.debug("setMessageListener");
0226: setMessageListener(this );
0228: // Create the DRM and link it to this HAPartition
0229: //
0230: log.debug("create replicant manager");
0231: this .replicantManager = new DistributedReplicantManagerImpl(
0232: this , this .server);
0233: log.debug("init replicant manager");
0234: this .replicantManager.init();
0235: log.debug("bind replicant manager");
0237: // Create the DS and link it to this HAPartition
0238: //
0239: log.debug("create distributed state");
0240: this .dsManager = new DistributedStateImpl(this , this .server);
0241: log.debug("init distributed state service");
0242: this .dsManager.init();
0243: log.debug("bind distributed state service");
0245: // Create the asynchronous handler for view changes
0246: asynchHandler = new AsynchEventHandler(this ,
0247: "AsynchViewChangeHandler");
0249: log.debug("done initing.");
0250: }
0252: public void startPartition() throws Exception {
0253: // get current JG group properties
0254: //
0255: logHistory("Starting partition");
0256: log.debug("get nodeName");
0257: this .localJGAddress = (IpAddress) channel.getLocalAddress();
0258: this .me = new ClusterNode(this .localJGAddress);
0259: this .nodeName = this .me.getName();
0261: log.debug("Get current members");
0262: View view = channel.getView();
0263: this .jgmembers = (Vector) view.getMembers().clone();
0264: this .members = translateAddresses(this .jgmembers); // TRANSLATE
0265: log.info("Number of cluster members: " + members.size());
0266: for (int m = 0; m > members.size(); m++) {
0267: Object node = members.get(m);
0268: log.debug(node);
0269: }
0270: // Keep a list of other members only for "exclude-self" RPC calls
0271: //
0272: this .jgotherMembers = (Vector) view.getMembers().clone();
0273: this .jgotherMembers.remove(channel.getLocalAddress());
0274: this .otherMembers = translateAddresses(this .jgotherMembers); // TRANSLATE
0275: log.info("Other members: " + this .otherMembers.size());
0277: verifyNodeIsUnique(view.getMembers());
0279: // Update the initial view id
0280: //
0281: this .currentViewId = view.getVid().getId();
0283: // We must now synchronize new state transfer subscriber
0284: //
0285: fetchState();
0287: // We start now able to start our DRM and DS
0288: //
0289: this .replicantManager.start();
0290: this .dsManager.start();
0292: // Start the asynch listener handler thread
0293: asynchHandler.start();
0295: // Bind ourself in the public JNDI space if configured to do so
0296: if (!bindIntoJndi)
0297: return;
0299: Context ctx = new InitialContext();
0300: this .bind("/HAPartition/" + partitionName, this ,
0301: HAPartitionImpl.class, ctx);
0302: }
0304: protected void fetchState() throws Exception {
0305: log.info("Fetching state (will wait for "
0306: + this .state_transfer_timeout + " milliseconds):");
0307: long start, stop;
0308: isStateSet = false;
0309: start = System.currentTimeMillis();
0310: boolean rc = channel
0311: .getState(null, this .state_transfer_timeout);
0312: if (rc) {
0313: synchronized (stateLock) {
0314: while (!isStateSet) {
0315: if (setStateException != null)
0316: throw setStateException;
0318: try {
0319: stateLock.wait();
0320: } catch (InterruptedException iex) {
0321: }
0322: }
0323: }
0324: stop = System.currentTimeMillis();
0325: log.info("state was retrieved successfully (in "
0326: + (stop - start) + " milliseconds)");
0327: } else {
0328: // No one provided us with state.
0329: // We need to find out if we are the coordinator, so we must
0330: // block until viewAccepted() is called at least once
0332: synchronized (members) {
0333: while (members.size() == 0) {
0334: log.debug("waiting on viewAccepted()");
0335: try {
0336: members.wait();
0337: } catch (InterruptedException iex) {
0338: }
0339: }
0340: }
0342: if (isCurrentNodeCoordinator()) {
0343: log
0344: .info("State could not be retrieved (we are the first member in group)");
0345: } else {
0346: throw new IllegalStateException(
0347: "Initial state transfer failed: "
0348: + "Channel.getState() returned false");
0349: }
0350: }
0351: }
0353: public void closePartition() throws Exception {
0354: logHistory("Closing partition");
0355: log.info("Closing partition " + partitionName);
0357: try {
0358: asynchHandler.stop();
0359: } catch (Exception e) {
0360: log.warn("Failed to stop asynchHandler", e);
0361: }
0363: // Stop the DRM and DS services
0364: //
0365: try {
0366: this .replicantManager.stop();
0367: } catch (Exception e) {
0368: log.error("operation failed", e);
0369: }
0371: try {
0372: this .dsManager.stop();
0373: } catch (Exception e) {
0374: log.error("operation failed", e);
0375: }
0377: // NR 200505 : [JBCLUSTER-38] replace channel.close() by a disconnect and
0378: // add the destroyPartition() step
0379: try {
0380: // channel.close();
0381: channel.disconnect();
0382: } catch (Exception e) {
0383: log.error("operation failed", e);
0384: }
0386: if (bindIntoJndi) {
0387: String boundName = "/HAPartition/" + partitionName;
0388: InitialContext ctx = new InitialContext();
0389: try {
0390: ctx.unbind(boundName);
0391: } finally {
0392: ctx.close();
0393: }
0394: NonSerializableFactory.unbind(boundName);
0395: }
0397: log.info("Partition " + partitionName + " closed.");
0398: }
0400: // NR 200505 : [JBCLUSTER-38] destroy partition close the channel
0401: public void destroyPartition() throws Exception {
0403: try {
0404: this .replicantManager.destroy();
0405: } catch (Exception e) {
0406: log.error("operation failed", e);
0407: }
0409: try {
0410: this .dsManager.destroy();
0411: } catch (Exception e) {
0412: log.error("operation failed", e);
0413: }
0414: try {
0415: channel.close();
0416: } catch (Exception e) {
0417: log.error("operation failed", e);
0418: }
0420: log.info("Partition " + partitionName + " destroyed.");
0421: }
0423: // org.jgroups.MessageListener implementation ----------------------------------------------
0425: // MessageListener methods
0426: //
0427: public byte[] getState() {
0428: logHistory("getState called on partition");
0429: boolean debug = log.isDebugEnabled();
0431: log.debug("getState called.");
0432: try {
0433: // we now get the sub-state of each HAPartitionStateTransfer subscribers and
0434: // build a "macro" state
0435: //
0436: HashMap state = new HashMap();
0437: Iterator keys = stateHandlers.keySet().iterator();
0438: while (keys.hasNext()) {
0439: String key = (String) keys.next();
0440: HAPartition.HAPartitionStateTransfer subscriber = (HAPartition.HAPartitionStateTransfer) stateHandlers
0441: .get(key);
0442: if (debug)
0443: log.debug("getState for " + key);
0444: state.put(key, subscriber.getCurrentState());
0445: }
0446: return objectToByteBuffer(state);
0447: } catch (Exception ex) {
0448: log.error("getState failed", ex);
0449: }
0450: return null;
0451: }
0453: public void setState(byte[] obj) {
0454: logHistory("setState called on partition");
0455: try {
0456: log.debug("setState called");
0457: if (obj == null) {
0458: log.debug("state is null");
0459: return;
0460: }
0462: long used_mem_before, used_mem_after;
0463: int state_size = obj != null ? obj.length : 0;
0464: Runtime rt = Runtime.getRuntime();
0465: used_mem_before = rt.totalMemory() - rt.freeMemory();
0467: HashMap state = (HashMap) objectFromByteBuffer(obj);
0468: java.util.Iterator keys = state.keySet().iterator();
0469: while (keys.hasNext()) {
0470: String key = (String) keys.next();
0471: log.debug("setState for " + key);
0472: Object someState = state.get(key);
0473: HAPartition.HAPartitionStateTransfer subscriber = (HAPartition.HAPartitionStateTransfer) stateHandlers
0474: .get(key);
0475: if (subscriber != null) {
0476: try {
0477: subscriber
0478: .setCurrentState((java.io.Serializable) someState);
0479: } catch (Exception e) {
0480: // Don't let issues with one subscriber affect others
0481: // unless it is DRM or DS, which are really internal
0482: // functions of the HAPartition
0483: if (DistributedReplicantManagerImpl.SERVICE_NAME
0484: .equals(key)
0485: || DistributedStateImpl.SERVICE_NAME
0486: .equals(key)) {
0487: if (e instanceof RuntimeException)
0488: throw (RuntimeException) e;
0489: else
0490: throw new RuntimeException(e);
0491: } else {
0492: log.error(
0493: "Caught exception setting state to "
0494: + subscriber, e);
0495: }
0496: }
0497: } else {
0498: log.debug("There is no stateHandler for: " + key);
0499: }
0500: }
0502: used_mem_after = rt.totalMemory() - rt.freeMemory();
0503: log.debug("received a state of " + state_size
0504: + " bytes; expanded memory by "
0505: + (used_mem_after - used_mem_before)
0506: + " bytes (used memory before: " + used_mem_before
0507: + ", used memory after: " + used_mem_after + ")");
0509: isStateSet = true;
0510: } catch (Throwable t) {
0511: log.error("failed setting state", t);
0512: if (t instanceof Exception)
0513: setStateException = (Exception) t;
0514: else
0515: setStateException = new Exception(t);
0516: } finally {
0517: synchronized (stateLock) {
0518: // Notify wait that state has been set.
0519: stateLock.notifyAll();
0520: }
0521: }
0522: }
0524: public void receive(org.jgroups.Message msg) { /* complete */
0525: }
0527: // org.jgroups.MembershipListener implementation ----------------------------------------------
0529: public void suspect(org.jgroups.Address suspected_mbr) {
0530: logHistory("Node suspected: "
0531: + (suspected_mbr == null ? "null" : suspected_mbr
0532: .toString()));
0533: if (isCurrentNodeCoordinator())
0534: clusterLifeCycleLog.info("Suspected member: "
0535: + suspected_mbr);
0536: else
0537: log.info("Suspected member: " + suspected_mbr);
0538: }
0540: public void block() {
0541: }
0543: /** Notification of a cluster view change. This is done from the JG protocol
0544: * handlder thread and we must be careful to not unduly block this thread.
0545: * Because of this there are two types of listeners, synchronous and
0546: * asynchronous. The synchronous listeners are messaged with the view change
0547: * event using the calling thread while the asynchronous listeners are
0548: * messaged using a seperate thread.
0549: *
0550: * @param newView
0551: */
0552: public void viewAccepted(View newView) {
0553: try {
0554: // we update the view id
0555: //
0556: this .currentViewId = newView.getVid().getId();
0558: // Keep a list of other members only for "exclude-self" RPC calls
0559: //
0560: this .jgotherMembers = (Vector) newView.getMembers().clone();
0561: this .jgotherMembers.remove(channel.getLocalAddress());
0562: this .otherMembers = translateAddresses(this .jgotherMembers); // TRANSLATE!
0563: Vector translatedNewView = translateAddresses((Vector) newView
0564: .getMembers().clone());
0565: logHistory("New view: " + translatedNewView
0566: + " with viewId: " + this .currentViewId
0567: + " (old view: " + this .members + " )");
0569: // Save the previous view and make a copy of the new view
0570: Vector oldMembers = this .members;
0572: Vector newjgMembers = (Vector) newView.getMembers().clone();
0573: Vector newMembers = translateAddresses(newjgMembers); // TRANSLATE
0574: if (this .members == null) {
0575: // Initial viewAccepted
0576: this .members = newMembers;
0577: this .jgmembers = newjgMembers;
0578: log.debug("ViewAccepted: initial members set");
0579: return;
0580: }
0581: this .members = newMembers;
0582: this .jgmembers = newjgMembers;
0584: int difference = 0;
0585: if (oldMembers == null)
0586: difference = newMembers.size() - 1;
0587: else
0588: difference = newMembers.size() - oldMembers.size();
0590: if (isCurrentNodeCoordinator())
0591: clusterLifeCycleLog
0592: .info("New cluster view for partition "
0593: + this .partitionName + " (id: "
0594: + this .currentViewId + ", delta: "
0595: + difference + ") : " + this .members);
0596: else
0597: log.info("New cluster view for partition "
0598: + this .partitionName + ": "
0599: + this .currentViewId + " (" + this .members
0600: + " delta: " + difference + ")");
0602: // Build a ViewChangeEvent for the asynch listeners
0603: ViewChangeEvent event = new ViewChangeEvent();
0604: event.viewId = currentViewId;
0605: event.allMembers = translatedNewView;
0606: event.deadMembers = getDeadMembers(oldMembers,
0607: event.allMembers);
0608: event.newMembers = getNewMembers(oldMembers,
0609: event.allMembers);
0610: event.originatingGroups = null;
0611: // if the new view occurs because of a merge, we first inform listeners of the merge
0612: if (newView instanceof MergeView) {
0613: MergeView mergeView = (MergeView) newView;
0614: event.originatingGroups = mergeView.getSubgroups();
0615: }
0617: log.debug("membership changed from "
0618: + (oldMembers == null ? 0 : oldMembers.size())
0619: + " to " + event.allMembers.size());
0620: // Put the view change to the asynch queue
0621: this .asynchHandler.queueEvent(event);
0623: // Broadcast the new view to the synchronous view change listeners
0624: if (this .allowSyncListeners) {
0625: this .notifyListeners(synchListeners, event.viewId,
0626: event.allMembers, event.deadMembers,
0627: event.newMembers, event.originatingGroups);
0628: }
0629: } catch (Exception ex) {
0630: log.error("ViewAccepted failed", ex);
0631: }
0632: }
0634: // HAPartition implementation ----------------------------------------------
0636: public String getNodeName() {
0637: return nodeName;
0638: }
0640: public String getPartitionName() {
0641: return partitionName;
0642: }
0644: public DistributedReplicantManager getDistributedReplicantManager() {
0645: return replicantManager;
0646: }
0648: public DistributedState getDistributedStateService() {
0649: return this .dsManager;
0650: }
0652: public long getCurrentViewId() {
0653: return this .currentViewId;
0654: }
0656: public Vector getCurrentView() {
0657: Vector result = new Vector(this .members.size());
0658: for (int i = 0; i < members.size(); i++) {
0659: result.add(((ClusterNode) members.elementAt(i)).getName());
0660: }
0661: return result;
0662: }
0664: public ClusterNode[] getClusterNodes() {
0665: ClusterNode[] nodes = new ClusterNode[this .members.size()];
0666: this .members.toArray(nodes);
0667: return nodes;
0668: }
0670: public ClusterNode getClusterNode() {
0671: return me;
0672: }
0674: public boolean isCurrentNodeCoordinator() {
0675: if (this .members == null || this .members.size() == 0
0676: || this .me == null)
0677: return false;
0678: return this .members.elementAt(0).equals(this .me);
0679: }
0681: // ***************************
0682: // ***************************
0683: // RPC multicast communication
0684: // ***************************
0685: // ***************************
0686: //
0687: public void registerRPCHandler(String objName, Object subscriber) {
0688: rpcHandlers.put(objName, subscriber);
0689: }
0691: public void unregisterRPCHandler(String objName, Object subscriber) {
0692: rpcHandlers.remove(objName);
0693: }
0695: /**
0696: *
0697: * @param objName
0698: * @param methodName
0699: * @param args
0700: * @param excludeSelf
0701: * @return
0702: * @throws Exception
0703: * @deprecated Use {@link #callMethodOnCluster(String,String,Object[],Class[], boolean)} instead
0704: */
0705: public ArrayList callMethodOnCluster(String objName,
0706: String methodName, Object[] args, boolean excludeSelf)
0707: throws Exception {
0708: return callMethodOnCluster(objName, methodName, args, null,
0709: excludeSelf);
0710: }
0712: /**
0713: * This function is an abstraction of RpcDispatcher.
0714: */
0715: public ArrayList callMethodOnCluster(String objName,
0716: String methodName, Object[] args, Class[] types,
0717: boolean excludeSelf) throws Exception {
0718: return callMethodOnCluster(objName, methodName, args, types,
0719: excludeSelf, this .timeout);
0720: }
0722: public ArrayList callMethodOnCluster(String objName,
0723: String methodName, Object[] args, Class[] types,
0724: boolean excludeSelf, long methodTimeout) throws Exception {
0725: ArrayList rtn = new ArrayList();
0726: MethodCall m = null;
0727: RspList rsp = null;
0728: boolean trace = log.isTraceEnabled();
0730: if (types != null)
0731: m = new MethodCall(objName + "." + methodName, args, types);
0732: else
0733: m = new MethodCall(objName + "." + methodName, args);
0735: if (excludeSelf) {
0736: if (trace) {
0737: log.trace("callMethodOnCluster(true), objName="
0738: + objName + ", methodName=" + methodName
0739: + ", members=" + jgotherMembers);
0740: }
0741: rsp = this .callRemoteMethods(this .jgotherMembers, m,
0742: GroupRequest.GET_ALL, methodTimeout);
0743: } else {
0744: if (trace) {
0745: log.trace("callMethodOnCluster(false), objName="
0746: + objName + ", methodName=" + methodName
0747: + ", members=" + members);
0748: }
0749: rsp = this .callRemoteMethods(null, m, GroupRequest.GET_ALL,
0750: methodTimeout);
0751: }
0753: if (rsp != null) {
0754: for (int i = 0; i < rsp.size(); i++) {
0755: Object item = rsp.elementAt(i);
0756: if (item instanceof Rsp) {
0757: Rsp response = (Rsp) item;
0758: // Only include received responses
0759: boolean wasReceived = response.wasReceived();
0760: if (wasReceived == true) {
0761: item = response.getValue();
0762: if (!(item instanceof NoHandlerForRPC))
0763: rtn.add(item);
0764: } else if (trace)
0765: log.trace("Ignoring non-received response: "
0766: + response);
0767: } else {
0768: if (!(item instanceof NoHandlerForRPC))
0769: rtn.add(item);
0770: else if (trace)
0771: log.trace("Ignoring NoHandlerForRPC");
0772: }
0773: }
0774: }
0776: return rtn;
0777: }
0779: /**
0780: * Calls method on Cluster coordinator node only. The cluster coordinator node is the first node to join the
0781: * cluster.
0782: * and is replaced
0783: * @param objName
0784: * @param methodName
0785: * @param args
0786: * @param types
0787: * @param excludeSelf
0788: * @return
0789: * @throws Exception
0790: */
0791: public ArrayList callMethodOnCoordinatorNode(String objName,
0792: String methodName, Object[] args, Class[] types,
0793: boolean excludeSelf) throws Exception {
0794: return callMethodOnCoordinatorNode(objName, methodName, args,
0795: types, excludeSelf, this .timeout);
0796: }
0798: /**
0799: * Calls method on Cluster coordinator node only. The cluster coordinator node is the first node to join the
0800: * cluster.
0801: * and is replaced
0802: * @param objName
0803: * @param methodName
0804: * @param args
0805: * @param types
0806: * @param excludeSelf
0807: * @param methodTimeout
0808: * @return
0809: * @throws Exception
0810: */
0811: public ArrayList callMethodOnCoordinatorNode(String objName,
0812: String methodName, Object[] args, Class[] types,
0813: boolean excludeSelf, long methodTimeout) throws Exception {
0814: ArrayList rtn = new ArrayList();
0815: MethodCall m = null;
0816: RspList rsp = null;
0817: boolean trace = log.isTraceEnabled();
0819: if (types != null)
0820: m = new MethodCall(objName + "." + methodName, args, types);
0821: else
0822: m = new MethodCall(objName + "." + methodName, args);
0824: if (trace) {
0825: log.trace("callMethodOnCoordinatorNode(false), objName="
0826: + objName + ", methodName=" + methodName);
0827: }
0829: // the first cluster view member is the coordinator
0830: Vector coordinatorOnly = new Vector();
0831: // If we are the coordinator, only call ourself if 'excludeSelf' is false
0832: if (false == isCurrentNodeCoordinator() || false == excludeSelf)
0833: coordinatorOnly.addElement(this .jgmembers.elementAt(0));
0835: rsp = this .callRemoteMethods(coordinatorOnly, m,
0836: GroupRequest.GET_ALL, methodTimeout);
0838: if (rsp != null) {
0839: for (int i = 0; i < rsp.size(); i++) {
0840: Object item = rsp.elementAt(i);
0841: if (item instanceof Rsp) {
0842: Rsp response = (Rsp) item;
0843: // Only include received responses
0844: boolean wasReceived = response.wasReceived();
0845: if (wasReceived == true) {
0846: item = response.getValue();
0847: if (!(item instanceof NoHandlerForRPC))
0848: rtn.add(item);
0849: } else if (trace)
0850: log.trace("Ignoring non-received response: "
0851: + response);
0852: } else {
0853: if (!(item instanceof NoHandlerForRPC))
0854: rtn.add(item);
0855: else if (trace)
0856: log.trace("Ignoring NoHandlerForRPC");
0857: }
0858: }
0859: }
0861: return rtn;
0862: }
0864: /**
0865: *
0866: * @param objName
0867: * @param methodName
0868: * @param args
0869: * @param excludeSelf
0870: * @throws Exception
0871: * @deprecated Use {@link #callAsynchMethodOnCluster(String, String, Object[], Class[], boolean)} instead
0872: */
0873: public void callAsynchMethodOnCluster(String objName,
0874: String methodName, Object[] args, boolean excludeSelf)
0875: throws Exception {
0876: callAsynchMethodOnCluster(objName, methodName, args, null,
0877: excludeSelf);
0878: }
0880: /**
0881: * This function is an abstraction of RpcDispatcher for asynchronous messages
0882: */
0883: public void callAsynchMethodOnCluster(String objName,
0884: String methodName, Object[] args, Class[] types,
0885: boolean excludeSelf) throws Exception {
0886: MethodCall m = null;
0887: boolean trace = log.isTraceEnabled();
0889: if (types != null)
0890: m = new MethodCall(objName + "." + methodName, args, types);
0891: else
0892: m = new MethodCall(objName + "." + methodName, args);
0894: if (excludeSelf) {
0895: if (trace) {
0896: log.trace("callAsynchMethodOnCluster(true), objName="
0897: + objName + ", methodName=" + methodName
0898: + ", members=" + jgotherMembers);
0899: }
0900: this .callRemoteMethods(this .jgotherMembers, m,
0901: GroupRequest.GET_NONE, timeout);
0902: } else {
0903: if (trace) {
0904: log.trace("callAsynchMethodOnCluster(false), objName="
0905: + objName + ", methodName=" + methodName
0906: + ", members=" + members);
0907: }
0908: this .callRemoteMethods(null, m, GroupRequest.GET_NONE,
0909: timeout);
0910: }
0911: }
0913: // *************************
0914: // *************************
0915: // State transfer management
0916: // *************************
0917: // *************************
0918: //
0919: public void subscribeToStateTransferEvents(String objectName,
0920: HAPartitionStateTransfer subscriber) {
0921: stateHandlers.put(objectName, subscriber);
0922: }
0924: public void unsubscribeFromStateTransferEvents(String objectName,
0925: HAPartitionStateTransfer subscriber) {
0926: stateHandlers.remove(objectName);
0927: }
0929: // *************************
0930: // *************************
0931: // Group Membership listeners
0932: // *************************
0933: // *************************
0934: //
0935: public void registerMembershipListener(HAMembershipListener listener) {
0936: boolean isAsynch = (this .allowSyncListeners == false)
0937: || (listener instanceof AsynchHAMembershipListener)
0938: || (listener instanceof AsynchHAMembershipExtendedListener);
0939: if (isAsynch) {
0940: synchronized (this .asynchListeners) {
0941: this .asynchListeners.add(listener);
0942: }
0943: } else {
0944: synchronized (this .synchListeners) {
0945: this .synchListeners.add(listener);
0946: }
0947: }
0948: }
0950: public void unregisterMembershipListener(
0951: HAMembershipListener listener) {
0952: boolean isAsynch = (this .allowSyncListeners == false)
0953: || (listener instanceof AsynchHAMembershipListener)
0954: || (listener instanceof AsynchHAMembershipExtendedListener);
0955: if (isAsynch) {
0956: synchronized (this .asynchListeners) {
0957: this .asynchListeners.remove(listener);
0958: }
0959: } else {
0960: synchronized (this .synchListeners) {
0961: this .synchListeners.remove(listener);
0962: }
0963: }
0964: }
0966: public boolean getAllowSynchronousMembershipNotifications() {
0967: return allowSyncListeners;
0968: }
0970: public void setAllowSynchronousMembershipNotifications(
0971: boolean allowSync) {
0972: this .allowSyncListeners = allowSync;
0973: }
0975: // org.jgroups.RpcDispatcher overrides ---------------------------------------------------
0977: /**
0978: * Message contains MethodCall. Execute it against *this* object and return result.
0979: * Use MethodCall.Invoke() to do this. Return result.
0980: *
0981: * This overrides RpcDispatcher.Handle so that we can dispatch to many different objects.
0982: * @param req The org.jgroups. representation of the method invocation
0983: * @return The serializable return value from the invocation
0984: */
0985: public Object handle(Message req) {
0986: Object body = null;
0987: Object retval = null;
0988: MethodCall method_call = null;
0989: boolean trace = log.isTraceEnabled();
0991: if (trace)
0992: log.trace("Partition " + partitionName + " received msg");
0993: if (req == null || req.getBuffer() == null) {
0994: log.warn("message or message buffer is null !");
0995: return null;
0996: }
0998: try {
0999: body = objectFromByteBuffer(req.getBuffer());
1000: } catch (Exception e) {
1001: log.warn("failed unserializing message buffer (msg=" + req
1002: + ")", e);
1003: return null;
1004: }
1006: if (body == null || !(body instanceof MethodCall)) {
1007: log.warn("message does not contain a MethodCall object !");
1008: return null;
1009: }
1011: // get method call informations
1012: //
1013: method_call = (MethodCall) body;
1014: String methodName = method_call.getName();
1016: if (trace)
1017: log.trace("pre methodName: " + methodName);
1019: int idx = methodName.lastIndexOf('.');
1020: String handlerName = methodName.substring(0, idx);
1021: String newMethodName = methodName.substring(idx + 1);
1023: if (trace) {
1024: log.trace("handlerName: " + handlerName + " methodName: "
1025: + newMethodName);
1026: log.trace("Handle: " + methodName);
1027: }
1029: // prepare method call
1030: method_call.setName(newMethodName);
1031: Object handler = rpcHandlers.get(handlerName);
1032: if (handler == null) {
1033: if (trace)
1034: log.debug("No rpc handler registered under: "
1035: + handlerName);
1036: return new NoHandlerForRPC();
1037: }
1039: /* Invoke it and just return any exception with trace level logging of
1040: the exception. The exception semantics of a group rpc call are weak as
1041: the return value may be a normal return value or the exception thrown.
1042: */
1043: try {
1044: retval = method_call.invoke(handler);
1045: if (trace)
1046: log.trace("rpc call return value: " + retval);
1047: } catch (Throwable t) {
1048: if (trace)
1049: log.trace("rpc call threw exception", t);
1050: retval = t;
1051: }
1053: return retval;
1054: }
1056: // AsynchEventHandler.AsynchEventProcessor -----------------------
1058: public void processEvent(Object event) {
1059: ViewChangeEvent vce = (ViewChangeEvent) event;
1060: notifyListeners(asynchListeners, vce.viewId, vce.allMembers,
1061: vce.deadMembers, vce.newMembers, vce.originatingGroups);
1063: }
1065: // Package protected ---------------------------------------------
1067: // Protected -----------------------------------------------------
1069: protected void verifyNodeIsUnique(Vector javaGroupIpAddresses)
1070: throws Exception {
1071: byte[] localUniqueName = this .localJGAddress
1072: .getAdditionalData();
1073: if (localUniqueName == null) {
1074: log
1075: .warn("No additional information has been found in the JavaGroup address: "
1076: + "make sure you are running with a correct version of JGroups and that the protocol "
1077: + " you are using supports the 'additionalData' behaviour");
1078: return;
1079: }
1081: for (int i = 0; i < javaGroupIpAddresses.size(); i++) {
1082: IpAddress address = (IpAddress) javaGroupIpAddresses
1083: .elementAt(i);
1084: if (!address.equals(this .localJGAddress)) {
1085: if (localUniqueName.equals(address.getAdditionalData()))
1086: throw new Exception(
1087: "Local node removed from cluster ("
1088: + this .localJGAddress
1089: + "): another node ("
1090: + address
1091: + ") publicizing the same name was already there");
1092: }
1093: }
1094: }
1096: /**
1097: * Helper method that binds the partition in the JNDI tree.
1098: * @param jndiName Name under which the object must be bound
1099: * @param who Object to bind in JNDI
1100: * @param classType Class type under which should appear the bound object
1101: * @param ctx Naming context under which we bind the object
1102: * @throws Exception Thrown if a naming exception occurs during binding
1103: */
1104: protected void bind(String jndiName, Object who, Class classType,
1105: Context ctx) throws Exception {
1106: // Ah ! This service isn't serializable, so we use a helper class
1107: //
1108: NonSerializableFactory.bind(jndiName, who);
1109: Name n = ctx.getNameParser("").parse(jndiName);
1110: while (n.size() > 1) {
1111: String ctxName = n.get(0);
1112: try {
1113: ctx = (Context) ctx.lookup(ctxName);
1114: } catch (NameNotFoundException e) {
1115: log.debug("creating Subcontext" + ctxName);
1116: ctx = ctx.createSubcontext(ctxName);
1117: }
1118: n = n.getSuffix(1);
1119: }
1121: // The helper class NonSerializableFactory uses address type nns, we go on to
1122: // use the helper class to bind the service object in JNDI
1123: //
1124: StringRefAddr addr = new StringRefAddr("nns", jndiName);
1125: Reference ref = new Reference(classType.getName(), addr,
1126: NonSerializableFactory.class.getName(), null);
1127: ctx.rebind(n.get(0), ref);
1128: }
1130: /**
1131: * Helper method that returns a vector of dead members from two input vectors: new and old vectors of two views.
1132: * Dead members are old - new members.
1133: * @param oldMembers Vector of old members
1134: * @param newMembers Vector of new members
1135: * @return Vector of members that have died between the two views, can be empty.
1136: */
1137: protected Vector getDeadMembers(Vector oldMembers, Vector newMembers) {
1138: boolean debug = log.isDebugEnabled();
1139: if (oldMembers == null)
1140: oldMembers = new Vector();
1141: if (newMembers == null)
1142: newMembers = new Vector();
1143: Vector dead = (Vector) oldMembers.clone();
1144: dead.removeAll(newMembers);
1145: if (dead.size() > 0 && debug)
1146: log.debug("dead members: " + dead);
1147: return dead;
1148: }
1150: /**
1151: * Helper method that returns a vector of new members from two input vectors: new and old vectors of two views.
1152: * @param oldMembers Vector of old members
1153: * @param allMembers Vector of new members
1154: * @return Vector of members that have joined the partition between the two views
1155: */
1156: protected Vector getNewMembers(Vector oldMembers, Vector allMembers) {
1157: if (oldMembers == null)
1158: oldMembers = new Vector();
1159: if (allMembers == null)
1160: allMembers = new Vector();
1161: Vector newMembers = (Vector) allMembers.clone();
1162: newMembers.removeAll(oldMembers);
1163: return newMembers;
1164: }
1166: protected void notifyListeners(ArrayList theListeners, long viewID,
1167: Vector allMembers, Vector deadMembers, Vector newMembers,
1168: Vector originatingGroups) {
1169: log.debug("Begin notifyListeners, viewID: " + viewID);
1171: synchronized (theListeners) {
1172: // JBAS-3619 -- don't hold synch lock while notifying
1173: theListeners = (ArrayList) theListeners.clone();
1174: }
1176: for (int i = 0; i < theListeners.size(); i++) {
1177: HAMembershipListener aListener = null;
1178: try {
1179: aListener = (HAMembershipListener) theListeners.get(i);
1180: if (originatingGroups != null
1181: && (aListener instanceof HAMembershipExtendedListener)) {
1182: HAMembershipExtendedListener exListener = (HAMembershipExtendedListener) aListener;
1183: exListener.membershipChangedDuringMerge(
1184: deadMembers, newMembers, allMembers,
1185: originatingGroups);
1186: } else {
1187: aListener.membershipChanged(deadMembers,
1188: newMembers, allMembers);
1189: }
1190: } catch (Throwable e) {
1191: // a problem in a listener should not prevent other members to receive the new view
1192: log.warn("HAMembershipListener callback failure: "
1193: + aListener, e);
1194: }
1195: }
1196: log.debug("End notifyListeners, viewID: " + viewID);
1197: }
1199: /*
1200: * Allows caller to specify whether the partition instance should be bound into JNDI. Default value is true.
1201: * This method must be called before the partition is started as the binding occurs during startup.
1202: *
1203: * @param bind Whether to bind the partition into JNDI.
1204: */
1205: public void setBindIntoJndi(boolean bind) {
1206: bindIntoJndi = bind;
1207: }
1209: /*
1210: * Allows caller to determine whether the partition instance has been bound into JNDI.
1211: *
1212: * @return true if the partition has been bound into JNDI.
1213: */
1214: public boolean getBindIntoJndi() {
1215: return bindIntoJndi;
1216: }
1218: protected Vector translateAddresses(Vector jgAddresses) {
1219: if (jgAddresses == null)
1220: return null;
1222: Vector result = new Vector(jgAddresses.size());
1223: for (int i = 0; i < jgAddresses.size(); i++) {
1224: IpAddress addr = (IpAddress) jgAddresses.elementAt(i);
1225: result.add(new ClusterNode(addr));
1226: }
1228: return result;
1229: }
1231: public void logHistory(String message) {
1232: try {
1233: history.add(new SimpleDateFormat().format(new Date())
1234: + " : " + message);
1235: } catch (Exception ignored) {
1236: }
1237: }
1239: /** A simple data class containing the view change event needed to
1240: * message the HAMembershipListeners
1241: */
1242: private static class ViewChangeEvent {
1243: long viewId;
1244: Vector deadMembers;
1245: Vector newMembers;
1246: Vector allMembers;
1247: Vector originatingGroups;
1248: }
1250: private class MarshallerImpl implements
1251: org.jgroups.blocks.RpcDispatcher.Marshaller {
1253: public Object objectFromByteBuffer(byte[] buf) throws Exception {
1254: return HAPartitionImpl.objectFromByteBuffer(buf);
1255: }
1257: public byte[] objectToByteBuffer(Object obj) throws Exception {
1258: return HAPartitionImpl.objectToByteBuffer(obj);
1259: }
1261: }
1263: // Private -------------------------------------------------------
1265: // Inner classes -------------------------------------------------
1267: }