0001: /*
0002: * Licensed to the Apache Software Foundation (ASF) under one or more
0003: * contributor license agreements. See the NOTICE file distributed with
0004: * this work for additional information regarding copyright ownership.
0005: * The ASF licenses this file to You under the Apache License, Version 2.0
0006: * (the "License"); you may not use this file except in compliance with
0007: * the License. You may obtain a copy of the License at
0008: *
0009: * http://www.apache.org/licenses/LICENSE-2.0
0010: *
0011: * Unless required by applicable law or agreed to in writing, software
0012: * distributed under the License is distributed on an "AS IS" BASIS,
0013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014: * See the License for the specific language governing permissions and
0015: * limitations under the License.
0016: */
0017:
0018: package org.apache.catalina.ha.tcp;
0019:
0020: import java.beans.PropertyChangeSupport;
0021: import java.io.Serializable;
0022: import java.util.ArrayList;
0023: import java.util.HashMap;
0024: import java.util.Iterator;
0025: import java.util.List;
0026: import java.util.Map;
0027:
0028: import org.apache.catalina.Container;
0029: import org.apache.catalina.Context;
0030: import org.apache.catalina.Engine;
0031: import org.apache.catalina.Host;
0032: import org.apache.catalina.Lifecycle;
0033: import org.apache.catalina.LifecycleEvent;
0034: import org.apache.catalina.LifecycleException;
0035: import org.apache.catalina.LifecycleListener;
0036: import org.apache.catalina.Manager;
0037: import org.apache.catalina.Valve;
0038: import org.apache.catalina.ha.CatalinaCluster;
0039: import org.apache.catalina.ha.ClusterListener;
0040: import org.apache.catalina.ha.ClusterManager;
0041: import org.apache.catalina.ha.ClusterMessage;
0042: import org.apache.catalina.ha.ClusterValve;
0043: import org.apache.catalina.ha.session.DeltaManager;
0044: import org.apache.catalina.ha.util.IDynamicProperty;
0045: import org.apache.catalina.tribes.Channel;
0046: import org.apache.catalina.tribes.ChannelListener;
0047: import org.apache.catalina.tribes.Member;
0048: import org.apache.catalina.tribes.MembershipListener;
0049: import org.apache.catalina.tribes.group.GroupChannel;
0050: import org.apache.catalina.util.LifecycleSupport;
0051: import org.apache.catalina.util.StringManager;
0052: import org.apache.juli.logging.Log;
0053: import org.apache.juli.logging.LogFactory;
0054: import org.apache.tomcat.util.IntrospectionUtils;
0055: import org.apache.catalina.ha.session.ClusterSessionListener;
0056: import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
0057: import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
0058: import org.apache.catalina.ha.session.JvmRouteBinderValve;
0059: import org.apache.catalina.ha.session.JvmRouteSessionIDBinderListener;
0060:
0061: /**
0062: * A <b>Cluster </b> implementation using simple multicast. Responsible for
0063: * setting up a cluster and provides callers with a valid multicast
0064: * receiver/sender.
0065: *
0066: * FIXME remove install/remove/start/stop context dummys
0067: * FIXME wrote testcases
0068: *
0069: * @author Filip Hanik
0070: * @author Remy Maucherat
0071: * @author Peter Rossbach
0072: * @version $Revision: 532865 $, $Date: 2007-04-26 23:09:41 +0200 (jeu., 26 avr. 2007) $
0073: */
0074: public class SimpleTcpCluster implements CatalinaCluster, Lifecycle,
0075: LifecycleListener, IDynamicProperty, MembershipListener,
0076: ChannelListener {
0077:
0078: public static Log log = LogFactory.getLog(SimpleTcpCluster.class);
0079:
0080: // ----------------------------------------------------- Instance Variables
0081:
0082: /**
0083: * Descriptive information about this component implementation.
0084: */
0085: protected static final String info = "SimpleTcpCluster/2.2";
0086:
0087: public static final String BEFORE_MEMBERREGISTER_EVENT = "before_member_register";
0088:
0089: public static final String AFTER_MEMBERREGISTER_EVENT = "after_member_register";
0090:
0091: public static final String BEFORE_MANAGERREGISTER_EVENT = "before_manager_register";
0092:
0093: public static final String AFTER_MANAGERREGISTER_EVENT = "after_manager_register";
0094:
0095: public static final String BEFORE_MANAGERUNREGISTER_EVENT = "before_manager_unregister";
0096:
0097: public static final String AFTER_MANAGERUNREGISTER_EVENT = "after_manager_unregister";
0098:
0099: public static final String BEFORE_MEMBERUNREGISTER_EVENT = "before_member_unregister";
0100:
0101: public static final String AFTER_MEMBERUNREGISTER_EVENT = "after_member_unregister";
0102:
0103: public static final String SEND_MESSAGE_FAILURE_EVENT = "send_message_failure";
0104:
0105: public static final String RECEIVE_MESSAGE_FAILURE_EVENT = "receive_message_failure";
0106:
0107: /**
0108: * Group channel.
0109: */
0110: protected Channel channel = new GroupChannel();
0111:
0112: /**
0113: * Name for logging purpose
0114: */
0115: protected String clusterImpName = "SimpleTcpCluster";
0116:
0117: /**
0118: * The string manager for this package.
0119: */
0120: protected StringManager sm = StringManager
0121: .getManager(Constants.Package);
0122:
0123: /**
0124: * The cluster name to join
0125: */
0126: protected String clusterName;
0127:
0128: /**
0129: * call Channel.heartbeat() at container background thread
0130: * @see org.apache.catalina.tribes.group.GroupChannel#heartbeat()
0131: */
0132: protected boolean heartbeatBackgroundEnabled = false;
0133:
0134: /**
0135: * The Container associated with this Cluster.
0136: */
0137: protected Container container = null;
0138:
0139: /**
0140: * The lifecycle event support for this component.
0141: */
0142: protected LifecycleSupport lifecycle = new LifecycleSupport(this );
0143:
0144: /**
0145: * Has this component been started?
0146: */
0147: protected boolean started = false;
0148:
0149: /**
0150: * The property change support for this component.
0151: */
0152: protected PropertyChangeSupport support = new PropertyChangeSupport(
0153: this );
0154:
0155: /**
0156: * The context name <->manager association for distributed contexts.
0157: */
0158: protected Map managers = new HashMap();
0159:
0160: protected ClusterManager managerTemplate = new DeltaManager();
0161:
0162: private List valves = new ArrayList();
0163:
0164: private org.apache.catalina.ha.ClusterDeployer clusterDeployer;
0165:
0166: /**
0167: * Listeners of messages
0168: */
0169: protected List clusterListeners = new ArrayList();
0170:
0171: /**
0172: * Comment for <code>notifyLifecycleListenerOnFailure</code>
0173: */
0174: private boolean notifyLifecycleListenerOnFailure = false;
0175:
0176: /**
0177: * dynamic sender <code>properties</code>
0178: */
0179: private Map properties = new HashMap();
0180:
0181: private int channelSendOptions = Channel.SEND_OPTIONS_ASYNCHRONOUS;
0182:
0183: // ------------------------------------------------------------- Properties
0184:
0185: public SimpleTcpCluster() {
0186: }
0187:
0188: /**
0189: * Return descriptive information about this Cluster implementation and the
0190: * corresponding version number, in the format
0191: * <code><description>/<version></code>.
0192: */
0193: public String getInfo() {
0194: return (info);
0195: }
0196:
0197: /**
0198: * Return heartbeat enable flag (default false)
0199: * @return the heartbeatBackgroundEnabled
0200: */
0201: public boolean isHeartbeatBackgroundEnabled() {
0202: return heartbeatBackgroundEnabled;
0203: }
0204:
0205: /**
0206: * enabled that container backgroundThread call heartbeat at channel
0207: * @param heartbeatBackgroundEnabled the heartbeatBackgroundEnabled to set
0208: */
0209: public void setHeartbeatBackgroundEnabled(
0210: boolean heartbeatBackgroundEnabled) {
0211: this .heartbeatBackgroundEnabled = heartbeatBackgroundEnabled;
0212: }
0213:
0214: /**
0215: * Set the name of the cluster to join, if no cluster with this name is
0216: * present create one.
0217: *
0218: * @param clusterName
0219: * The clustername to join
0220: */
0221: public void setClusterName(String clusterName) {
0222: this .clusterName = clusterName;
0223: }
0224:
0225: /**
0226: * Return the name of the cluster that this Server is currently configured
0227: * to operate within.
0228: *
0229: * @return The name of the cluster associated with this server
0230: */
0231: public String getClusterName() {
0232: if (clusterName == null && container != null)
0233: return container.getName();
0234: return clusterName;
0235: }
0236:
0237: /**
0238: * Set the Container associated with our Cluster
0239: *
0240: * @param container
0241: * The Container to use
0242: */
0243: public void setContainer(Container container) {
0244: Container oldContainer = this .container;
0245: this .container = container;
0246: support.firePropertyChange("container", oldContainer,
0247: this .container);
0248: }
0249:
0250: /**
0251: * Get the Container associated with our Cluster
0252: *
0253: * @return The Container associated with our Cluster
0254: */
0255: public Container getContainer() {
0256: return (this .container);
0257: }
0258:
0259: /**
0260: * @return Returns the notifyLifecycleListenerOnFailure.
0261: */
0262: public boolean isNotifyLifecycleListenerOnFailure() {
0263: return notifyLifecycleListenerOnFailure;
0264: }
0265:
0266: /**
0267: * @param notifyListenerOnFailure
0268: * The notifyLifecycleListenerOnFailure to set.
0269: */
0270: public void setNotifyLifecycleListenerOnFailure(
0271: boolean notifyListenerOnFailure) {
0272: boolean oldNotifyListenerOnFailure = this .notifyLifecycleListenerOnFailure;
0273: this .notifyLifecycleListenerOnFailure = notifyListenerOnFailure;
0274: support.firePropertyChange("notifyLifecycleListenerOnFailure",
0275: oldNotifyListenerOnFailure,
0276: this .notifyLifecycleListenerOnFailure);
0277: }
0278:
0279: /**
0280: * @deprecated use getManagerTemplate().getClass().getName() instead.
0281: * @return String
0282: */
0283: public String getManagerClassName() {
0284: return managerTemplate.getClass().getName();
0285: }
0286:
0287: /**
0288: * @deprecated use nested <Manager> element inside the cluster config instead.
0289: * @param managerClassName String
0290: */
0291: public void setManagerClassName(String managerClassName) {
0292: log
0293: .warn("setManagerClassName is deprecated, use nested <Manager> element inside the <Cluster> element instead, this request will be ignored.");
0294: }
0295:
0296: /**
0297: * Add cluster valve
0298: * Cluster Valves are only add to container when cluster is started!
0299: * @param valve The new cluster Valve.
0300: */
0301: public void addValve(Valve valve) {
0302: if (valve instanceof ClusterValve && (!valves.contains(valve)))
0303: valves.add(valve);
0304: }
0305:
0306: /**
0307: * get all cluster valves
0308: * @return current cluster valves
0309: */
0310: public Valve[] getValves() {
0311: return (Valve[]) valves.toArray(new Valve[valves.size()]);
0312: }
0313:
0314: /**
0315: * Get the cluster listeners associated with this cluster. If this Array has
0316: * no listeners registered, a zero-length array is returned.
0317: */
0318: public ClusterListener[] findClusterListeners() {
0319: if (clusterListeners.size() > 0) {
0320: ClusterListener[] listener = new ClusterListener[clusterListeners
0321: .size()];
0322: clusterListeners.toArray(listener);
0323: return listener;
0324: } else
0325: return new ClusterListener[0];
0326:
0327: }
0328:
0329: /**
0330: * add cluster message listener and register cluster to this listener
0331: *
0332: * @see org.apache.catalina.ha.CatalinaCluster#addClusterListener(org.apache.catalina.ha.MessageListener)
0333: */
0334: public void addClusterListener(ClusterListener listener) {
0335: if (listener != null && !clusterListeners.contains(listener)) {
0336: clusterListeners.add(listener);
0337: listener.setCluster(this );
0338: }
0339: }
0340:
0341: /**
0342: * remove message listener and deregister Cluster from listener
0343: *
0344: * @see org.apache.catalina.ha.CatalinaCluster#removeClusterListener(org.apache.catalina.ha.MessageListener)
0345: */
0346: public void removeClusterListener(ClusterListener listener) {
0347: if (listener != null) {
0348: clusterListeners.remove(listener);
0349: listener.setCluster(null);
0350: }
0351: }
0352:
0353: /**
0354: * get current Deployer
0355: */
0356: public org.apache.catalina.ha.ClusterDeployer getClusterDeployer() {
0357: return clusterDeployer;
0358: }
0359:
0360: /**
0361: * set a new Deployer, must be set before cluster started!
0362: */
0363: public void setClusterDeployer(
0364: org.apache.catalina.ha.ClusterDeployer clusterDeployer) {
0365: this .clusterDeployer = clusterDeployer;
0366: }
0367:
0368: public void setChannel(Channel channel) {
0369: this .channel = channel;
0370: }
0371:
0372: public void setManagerTemplate(ClusterManager managerTemplate) {
0373: this .managerTemplate = managerTemplate;
0374: }
0375:
0376: public void setChannelSendOptions(int channelSendOptions) {
0377: this .channelSendOptions = channelSendOptions;
0378: }
0379:
0380: /**
0381: * has members
0382: */
0383: protected boolean hasMembers = false;
0384:
0385: public boolean hasMembers() {
0386: return hasMembers;
0387: }
0388:
0389: /**
0390: * Get all current cluster members
0391: * @return all members or empty array
0392: */
0393: public Member[] getMembers() {
0394: return channel.getMembers();
0395: }
0396:
0397: /**
0398: * Return the member that represents this node.
0399: *
0400: * @return Member
0401: */
0402: public Member getLocalMember() {
0403: return channel.getLocalMember(true);
0404: }
0405:
0406: // ------------------------------------------------------------- dynamic
0407: // manager property handling
0408:
0409: /**
0410: * JMX hack to direct use at jconsole
0411: *
0412: * @param name
0413: * @param value
0414: */
0415: public void setProperty(String name, String value) {
0416: setProperty(name, (Object) value);
0417: }
0418:
0419: /**
0420: * set config attributes with reflect and propagate to all managers
0421: *
0422: * @param name
0423: * @param value
0424: */
0425: public void setProperty(String name, Object value) {
0426: if (log.isTraceEnabled())
0427: log.trace(sm.getString("SimpleTcpCluster.setProperty",
0428: name, value, properties.get(name)));
0429: properties.put(name, value);
0430: //using a dynamic way of setting properties is nice, but a security risk
0431: //if exposed through JMX. This way you can sit and try to guess property names,
0432: //we will only allow explicit property names
0433: log
0434: .warn("Dynamic setProperty("
0435: + name
0436: + ",value) has been disabled, please use explicit properties for the element you are trying to identify");
0437: if (started) {
0438: // FIXME Hmm, is that correct when some DeltaManagers are direct configured inside Context?
0439: // Why we not support it for other elements, like sender, receiver or membership?
0440: // Must we restart element after change?
0441: // if (name.startsWith("manager")) {
0442: // String key = name.substring("manager".length() + 1);
0443: // String pvalue = value.toString();
0444: // for (Iterator iter = managers.values().iterator(); iter.hasNext();) {
0445: // Manager manager = (Manager) iter.next();
0446: // if(manager instanceof DeltaManager && ((ClusterManager) manager).isDefaultMode()) {
0447: // IntrospectionUtils.setProperty(manager, key, pvalue );
0448: // }
0449: // }
0450: // }
0451: }
0452: }
0453:
0454: /**
0455: * get current config
0456: *
0457: * @param key
0458: * @return The property
0459: */
0460: public Object getProperty(String key) {
0461: if (log.isTraceEnabled())
0462: log
0463: .trace(sm.getString("SimpleTcpCluster.getProperty",
0464: key));
0465: return properties.get(key);
0466: }
0467:
0468: /**
0469: * Get all properties keys
0470: *
0471: * @return An iterator over the property names.
0472: */
0473: public Iterator getPropertyNames() {
0474: return properties.keySet().iterator();
0475: }
0476:
0477: /**
0478: * remove a configured property.
0479: *
0480: * @param key
0481: */
0482: public void removeProperty(String key) {
0483: properties.remove(key);
0484: }
0485:
0486: /**
0487: * transfer properties from cluster configuration to subelement bean.
0488: * @param prefix
0489: * @param bean
0490: */
0491: protected void transferProperty(String prefix, Object bean) {
0492: if (prefix != null) {
0493: for (Iterator iter = getPropertyNames(); iter.hasNext();) {
0494: String pkey = (String) iter.next();
0495: if (pkey.startsWith(prefix)) {
0496: String key = pkey.substring(prefix.length() + 1);
0497: Object value = getProperty(pkey);
0498: IntrospectionUtils.setProperty(bean, key, value
0499: .toString());
0500: }
0501: }
0502: }
0503: }
0504:
0505: // --------------------------------------------------------- Public Methods
0506:
0507: /**
0508: * @return Returns the managers.
0509: */
0510: public Map getManagers() {
0511: return managers;
0512: }
0513:
0514: public Channel getChannel() {
0515: return channel;
0516: }
0517:
0518: public ClusterManager getManagerTemplate() {
0519: return managerTemplate;
0520: }
0521:
0522: public int getChannelSendOptions() {
0523: return channelSendOptions;
0524: }
0525:
0526: /**
0527: * Create new Manager without add to cluster (comes with start the manager)
0528: *
0529: * @param name
0530: * Context Name of this manager
0531: * @see org.apache.catalina.Cluster#createManager(java.lang.String)
0532: * @see #addManager(String, Manager)
0533: * @see DeltaManager#start()
0534: */
0535: public synchronized Manager createManager(String name) {
0536: if (log.isDebugEnabled())
0537: log.debug("Creating ClusterManager for context " + name
0538: + " using class " + getManagerClassName());
0539: Manager manager = null;
0540: try {
0541: manager = managerTemplate.cloneFromTemplate();
0542: ((ClusterManager) manager).setName(name);
0543: } catch (Exception x) {
0544: log
0545: .error(
0546: "Unable to clone cluster manager, defaulting to org.apache.catalina.ha.session.DeltaManager",
0547: x);
0548: manager = new org.apache.catalina.ha.session.DeltaManager();
0549: } finally {
0550: if (manager != null && (manager instanceof ClusterManager))
0551: ((ClusterManager) manager).setCluster(this );
0552: }
0553: return manager;
0554: }
0555:
0556: public void registerManager(Manager manager) {
0557:
0558: if (!(manager instanceof ClusterManager)) {
0559: log
0560: .warn("Manager [ "
0561: + manager
0562: + "] does not implement ClusterManager, addition to cluster has been aborted.");
0563: return;
0564: }
0565: ClusterManager cmanager = (ClusterManager) manager;
0566: cmanager.setDistributable(true);
0567: // Notify our interested LifecycleListeners
0568: lifecycle.fireLifecycleEvent(BEFORE_MANAGERREGISTER_EVENT,
0569: manager);
0570: String clusterName = getManagerName(cmanager.getName(), manager);
0571: cmanager.setName(clusterName);
0572: cmanager.setCluster(this );
0573: cmanager.setDefaultMode(false);
0574:
0575: managers.put(clusterName, manager);
0576: // Notify our interested LifecycleListeners
0577: lifecycle.fireLifecycleEvent(AFTER_MANAGERREGISTER_EVENT,
0578: manager);
0579: }
0580:
0581: /**
0582: * remove an application form cluster replication bus
0583: *
0584: * @see org.apache.catalina.ha.CatalinaCluster#removeManager(java.lang.String,Manager)
0585: */
0586: public void removeManager(Manager manager) {
0587: if (manager != null && manager instanceof ClusterManager) {
0588: ClusterManager cmgr = (ClusterManager) manager;
0589: // Notify our interested LifecycleListeners
0590: lifecycle.fireLifecycleEvent(
0591: BEFORE_MANAGERUNREGISTER_EVENT, manager);
0592: managers.remove(getManagerName(cmgr.getName(), manager));
0593: cmgr.setCluster(null);
0594: // Notify our interested LifecycleListeners
0595: lifecycle.fireLifecycleEvent(AFTER_MANAGERUNREGISTER_EVENT,
0596: manager);
0597: }
0598: }
0599:
0600: /**
0601: * @param name
0602: * @param manager
0603: * @return
0604: */
0605: public String getManagerName(String name, Manager manager) {
0606: String clusterName = name;
0607: if (clusterName == null)
0608: clusterName = manager.getContainer().getName();
0609: if (getContainer() instanceof Engine) {
0610: Container context = manager.getContainer();
0611: if (context != null && context instanceof Context) {
0612: Container host = ((Context) context).getParent();
0613: if (host != null && host instanceof Host
0614: && clusterName != null
0615: && !(clusterName.indexOf("#") >= 0))
0616: clusterName = host.getName() + "#" + clusterName;
0617: }
0618: }
0619: return clusterName;
0620: }
0621:
0622: /*
0623: * Get Manager
0624: *
0625: * @see org.apache.catalina.ha.CatalinaCluster#getManager(java.lang.String)
0626: */
0627: public Manager getManager(String name) {
0628: return (Manager) managers.get(name);
0629: }
0630:
0631: // ------------------------------------------------------ Lifecycle Methods
0632:
0633: /**
0634: * Execute a periodic task, such as reloading, etc. This method will be
0635: * invoked inside the classloading context of this container. Unexpected
0636: * throwables will be caught and logged.
0637: * @see org.apache.catalina.ha.deploy.FarmWarDeployer#backgroundProcess()
0638: * @see org.apache.catalina.tribes.group.GroupChannel#heartbeat()
0639: * @see org.apache.catalina.tribes.group.GroupChannel.HeartbeatThread#run()
0640: *
0641: */
0642: public void backgroundProcess() {
0643: if (clusterDeployer != null)
0644: clusterDeployer.backgroundProcess();
0645:
0646: //send a heartbeat through the channel
0647: if (isHeartbeatBackgroundEnabled() && channel != null)
0648: channel.heartbeat();
0649: }
0650:
0651: /**
0652: * Add a lifecycle event listener to this component.
0653: *
0654: * @param listener
0655: * The listener to add
0656: */
0657: public void addLifecycleListener(LifecycleListener listener) {
0658: lifecycle.addLifecycleListener(listener);
0659: }
0660:
0661: /**
0662: * Get the lifecycle listeners associated with this lifecycle. If this
0663: * Lifecycle has no listeners registered, a zero-length array is returned.
0664: */
0665: public LifecycleListener[] findLifecycleListeners() {
0666:
0667: return lifecycle.findLifecycleListeners();
0668:
0669: }
0670:
0671: /**
0672: * Remove a lifecycle event listener from this component.
0673: *
0674: * @param listener
0675: * The listener to remove
0676: */
0677: public void removeLifecycleListener(LifecycleListener listener) {
0678: lifecycle.removeLifecycleListener(listener);
0679: }
0680:
0681: /**
0682: * Use as base to handle start/stop/periodic Events from host. Currently
0683: * only log the messages as trace level.
0684: *
0685: * @see org.apache.catalina.LifecycleListener#lifecycleEvent(org.apache.catalina.LifecycleEvent)
0686: */
0687: public void lifecycleEvent(LifecycleEvent lifecycleEvent) {
0688: if (log.isTraceEnabled())
0689: log
0690: .trace(sm.getString("SimpleTcpCluster.event.log",
0691: lifecycleEvent.getType(), lifecycleEvent
0692: .getData()));
0693: }
0694:
0695: // ------------------------------------------------------ public
0696:
0697: /**
0698: * Prepare for the beginning of active use of the public methods of this
0699: * component. This method should be called after <code>configure()</code>,
0700: * and before any of the public methods of the component are utilized. <BR>
0701: * Starts the cluster communication channel, this will connect with the
0702: * other nodes in the cluster, and request the current session state to be
0703: * transferred to this node.
0704: *
0705: * @exception IllegalStateException
0706: * if this component has already been started
0707: * @exception LifecycleException
0708: * if this component detects a fatal error that prevents this
0709: * component from being used
0710: */
0711: public void start() throws LifecycleException {
0712: if (started)
0713: throw new LifecycleException(sm
0714: .getString("cluster.alreadyStarted"));
0715: if (log.isInfoEnabled())
0716: log.info("Cluster is about to start");
0717:
0718: // Notify our interested LifecycleListeners
0719: lifecycle.fireLifecycleEvent(BEFORE_START_EVENT, this );
0720: try {
0721: checkDefaults();
0722: registerClusterValve();
0723: channel.addMembershipListener(this );
0724: channel.addChannelListener(this );
0725: channel.start(channel.DEFAULT);
0726: if (clusterDeployer != null)
0727: clusterDeployer.start();
0728: this .started = true;
0729: // Notify our interested LifecycleListeners
0730: lifecycle.fireLifecycleEvent(AFTER_START_EVENT, this );
0731: } catch (Exception x) {
0732: log.error("Unable to start cluster.", x);
0733: throw new LifecycleException(x);
0734: }
0735: }
0736:
0737: protected void checkDefaults() {
0738: if (clusterListeners.size() == 0) {
0739: addClusterListener(new JvmRouteSessionIDBinderListener());
0740: addClusterListener(new ClusterSessionListener());
0741: }
0742: if (valves.size() == 0) {
0743: addValve(new JvmRouteBinderValve());
0744: addValve(new ReplicationValve());
0745: }
0746: if (clusterDeployer != null)
0747: clusterDeployer.setCluster(this );
0748: if (channel == null)
0749: channel = new GroupChannel();
0750: if (channel instanceof GroupChannel
0751: && !((GroupChannel) channel).getInterceptors()
0752: .hasNext()) {
0753: channel.addInterceptor(new MessageDispatch15Interceptor());
0754: channel.addInterceptor(new TcpFailureDetector());
0755: }
0756: }
0757:
0758: /**
0759: * register all cluster valve to host or engine
0760: * @throws Exception
0761: * @throws ClassNotFoundException
0762: */
0763: protected void registerClusterValve() throws Exception {
0764: if (container != null) {
0765: for (Iterator iter = valves.iterator(); iter.hasNext();) {
0766: ClusterValve valve = (ClusterValve) iter.next();
0767: if (log.isDebugEnabled())
0768: log.debug("Invoking addValve on " + getContainer()
0769: + " with class="
0770: + valve.getClass().getName());
0771: if (valve != null) {
0772: IntrospectionUtils
0773: .callMethodN(
0774: getContainer(),
0775: "addValve",
0776: new Object[] { valve },
0777: new Class[] { org.apache.catalina.Valve.class });
0778:
0779: }
0780: valve.setCluster(this );
0781: }
0782: }
0783: }
0784:
0785: /**
0786: * unregister all cluster valve to host or engine
0787: * @throws Exception
0788: * @throws ClassNotFoundException
0789: */
0790: protected void unregisterClusterValve() throws Exception {
0791: for (Iterator iter = valves.iterator(); iter.hasNext();) {
0792: ClusterValve valve = (ClusterValve) iter.next();
0793: if (log.isDebugEnabled())
0794: log.debug("Invoking removeValve on " + getContainer()
0795: + " with class=" + valve.getClass().getName());
0796: if (valve != null) {
0797: IntrospectionUtils
0798: .callMethodN(
0799: getContainer(),
0800: "removeValve",
0801: new Object[] { valve },
0802: new Class[] { org.apache.catalina.Valve.class });
0803: }
0804: valve.setCluster(this );
0805: }
0806: }
0807:
0808: /**
0809: * Gracefully terminate the active cluster component.<br/>
0810: * This will disconnect the cluster communication channel, stop the
0811: * listener and deregister the valves from host or engine.<br/><br/>
0812: * <b>Note:</b><br/>The sub elements receiver, sender, membership,
0813: * listener or valves are not removed. You can easily start the cluster again.
0814: *
0815: * @exception IllegalStateException
0816: * if this component has not been started
0817: * @exception LifecycleException
0818: * if this component detects a fatal error that needs to be
0819: * reported
0820: */
0821: public void stop() throws LifecycleException {
0822:
0823: if (!started)
0824: throw new IllegalStateException(sm
0825: .getString("cluster.notStarted"));
0826: // Notify our interested LifecycleListeners
0827: lifecycle.fireLifecycleEvent(BEFORE_STOP_EVENT, this );
0828:
0829: if (clusterDeployer != null)
0830: clusterDeployer.stop();
0831: this .managers.clear();
0832: try {
0833: if (clusterDeployer != null)
0834: clusterDeployer.setCluster(null);
0835: channel.stop(Channel.DEFAULT);
0836: channel.removeChannelListener(this );
0837: channel.removeMembershipListener(this );
0838: this .unregisterClusterValve();
0839: } catch (Exception x) {
0840: log.error("Unable to stop cluster valve.", x);
0841: }
0842: started = false;
0843: // Notify our interested LifecycleListeners
0844: lifecycle.fireLifecycleEvent(AFTER_STOP_EVENT, this );
0845: }
0846:
0847: /**
0848: * send message to all cluster members
0849: * @param msg message to transfer
0850: *
0851: * @see org.apache.catalina.ha.CatalinaCluster#send(org.apache.catalina.ha.ClusterMessage)
0852: */
0853: public void send(ClusterMessage msg) {
0854: send(msg, null);
0855: }
0856:
0857: /**
0858: * send message to all cluster members same cluster domain
0859: *
0860: * @see org.apache.catalina.ha.CatalinaCluster#send(org.apache.catalina.ha.ClusterMessage)
0861: */
0862: public void sendClusterDomain(ClusterMessage msg) {
0863: send(msg, null);
0864: }
0865:
0866: /**
0867: * send a cluster message to one member
0868: *
0869: * @param msg message to transfer
0870: * @param dest Receiver member
0871: * @see org.apache.catalina.ha.CatalinaCluster#send(org.apache.catalina.ha.ClusterMessage,
0872: * org.apache.catalina.ha.Member)
0873: */
0874: public void send(ClusterMessage msg, Member dest) {
0875: try {
0876: msg.setAddress(getLocalMember());
0877: if (dest != null) {
0878: if (!getLocalMember().equals(dest)) {
0879: channel.send(new Member[] { dest }, msg,
0880: channelSendOptions);
0881: } else
0882: log.error("Unable to send message to local member "
0883: + msg);
0884: } else {
0885: channel.send(channel.getMembers(), msg,
0886: channelSendOptions);
0887: }
0888: } catch (Exception x) {
0889: log.error("Unable to send message through cluster sender.",
0890: x);
0891: }
0892: }
0893:
0894: /**
0895: * New cluster member is registered
0896: *
0897: * @see org.apache.catalina.ha.MembershipListener#memberAdded(org.apache.catalina.ha.Member)
0898: */
0899: public void memberAdded(Member member) {
0900: try {
0901: hasMembers = channel.hasMembers();
0902: if (log.isInfoEnabled())
0903: log.info("Replication member added:" + member);
0904: // Notify our interested LifecycleListeners
0905: lifecycle.fireLifecycleEvent(BEFORE_MEMBERREGISTER_EVENT,
0906: member);
0907: // Notify our interested LifecycleListeners
0908: lifecycle.fireLifecycleEvent(AFTER_MEMBERREGISTER_EVENT,
0909: member);
0910: } catch (Exception x) {
0911: log.error("Unable to connect to replication system.", x);
0912: }
0913:
0914: }
0915:
0916: /**
0917: * Cluster member is gone
0918: *
0919: * @see org.apache.catalina.ha.MembershipListener#memberDisappeared(org.apache.catalina.ha.Member)
0920: */
0921: public void memberDisappeared(Member member) {
0922: try {
0923: hasMembers = channel.hasMembers();
0924: if (log.isInfoEnabled())
0925: log.info("Received member disappeared:" + member);
0926: // Notify our interested LifecycleListeners
0927: lifecycle.fireLifecycleEvent(BEFORE_MEMBERUNREGISTER_EVENT,
0928: member);
0929: // Notify our interested LifecycleListeners
0930: lifecycle.fireLifecycleEvent(AFTER_MEMBERUNREGISTER_EVENT,
0931: member);
0932: } catch (Exception x) {
0933: log
0934: .error(
0935: "Unable remove cluster node from replication system.",
0936: x);
0937: }
0938: }
0939:
0940: // --------------------------------------------------------- receiver
0941: // messages
0942:
0943: /**
0944: * notify all listeners from receiving a new message is not ClusterMessage
0945: * emitt Failure Event to LifecylceListener
0946: *
0947: * @param message
0948: * receveived Message
0949: */
0950: public boolean accept(Serializable msg, Member sender) {
0951: return (msg instanceof ClusterMessage);
0952: }
0953:
0954: public void messageReceived(Serializable message, Member sender) {
0955: ClusterMessage fwd = (ClusterMessage) message;
0956: fwd.setAddress(sender);
0957: messageReceived(fwd);
0958: }
0959:
0960: public void messageReceived(ClusterMessage message) {
0961:
0962: long start = 0;
0963: if (log.isDebugEnabled() && message != null)
0964: log.debug("Assuming clocks are synched: Replication for "
0965: + message.getUniqueId()
0966: + " took="
0967: + (System.currentTimeMillis() - (message)
0968: .getTimestamp()) + " ms.");
0969:
0970: //invoke all the listeners
0971: boolean accepted = false;
0972: if (message != null) {
0973: for (Iterator iter = clusterListeners.iterator(); iter
0974: .hasNext();) {
0975: ClusterListener listener = (ClusterListener) iter
0976: .next();
0977: if (listener.accept(message)) {
0978: accepted = true;
0979: listener.messageReceived(message);
0980: }
0981: }
0982: }
0983: if (!accepted && log.isDebugEnabled()) {
0984: if (notifyLifecycleListenerOnFailure) {
0985: Member dest = message.getAddress();
0986: // Notify our interested LifecycleListeners
0987: lifecycle.fireLifecycleEvent(
0988: RECEIVE_MESSAGE_FAILURE_EVENT,
0989: new SendMessageData(message, dest, null));
0990: }
0991: log.debug("Message " + message.toString() + " from type "
0992: + message.getClass().getName()
0993: + " transfered but no listener registered");
0994: }
0995: return;
0996: }
0997:
0998: // --------------------------------------------------------- Logger
0999:
1000: public Log getLogger() {
1001: return log;
1002: }
1003:
1004: // ------------------------------------------------------------- deprecated
1005:
1006: /**
1007: *
1008: * @see org.apache.catalina.Cluster#setProtocol(java.lang.String)
1009: */
1010: public void setProtocol(String protocol) {
1011: }
1012:
1013: /**
1014: * @see org.apache.catalina.Cluster#getProtocol()
1015: */
1016: public String getProtocol() {
1017: return null;
1018: }
1019: }
|