001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.proxy.generic;
023:
024: import java.util.List;
025:
026: import javax.management.AttributeChangeNotificationFilter;
027: import javax.management.NotificationListener;
028: import javax.management.AttributeChangeNotification;
029: import javax.management.Notification;
030: import javax.management.ObjectName;
031:
032: import org.jboss.invocation.Invoker;
033: import org.jboss.invocation.InvokerHA;
034: import org.jboss.invocation.InvokerProxyHA;
035: import org.jboss.invocation.jrmp.server.JRMPProxyFactory;
036: import org.jboss.ha.framework.interfaces.LoadBalancePolicy;
037: import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
038: import org.jboss.ha.framework.interfaces.HAPartition;
039: import org.jboss.ha.framework.interfaces.RoundRobin;
040: import org.jboss.ha.framework.server.HATarget;
041: import org.jboss.logging.Logger;
042: import org.jboss.proxy.GenericProxyFactory;
043: import org.jboss.system.Registry;
044: import org.jboss.system.ServiceMBean;
045:
046: /**
047: * ProxyFactory for Clustering
048: *
049: * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
050: * @version $Revision: 57188 $
051: */
052: public class ProxyFactoryHA extends JRMPProxyFactory implements
053: ProxyFactoryHAMBean,
054: DistributedReplicantManager.ReplicantListener {
055: protected String replicantName = null;
056: protected InvokerHA invokerHA;
057: protected HATarget target;
058: protected Invoker invoker;
059: protected DistributedReplicantManager drm = null;
060: protected ObjectName partitionObjectName;
061: protected String loadBalancePolicy = RoundRobin.class.getName();
062: protected NotificationListener listener;
063: protected int state = 0;
064:
065: public ObjectName getPartitionObjectName() {
066: return partitionObjectName;
067: }
068:
069: public void setPartitionObjectName(ObjectName partitionObjectName) {
070: this .partitionObjectName = partitionObjectName;
071: }
072:
073: public String getLoadBalancePolicy() {
074: return loadBalancePolicy;
075: }
076:
077: public void setLoadBalancePolicy(String loadBalancePolicy) {
078: this .loadBalancePolicy = loadBalancePolicy;
079: }
080:
081: public void createService() throws Exception {
082: super .createService();
083:
084: // we register our inner-class to retrieve STATE notifications from our container
085: AttributeChangeNotificationFilter filter = new AttributeChangeNotificationFilter();
086: filter.enableAttribute("State");
087: listener = new StateChangeListener();
088: getServer().addNotificationListener(getTargetName(), listener,
089: filter, null);
090: }
091:
092: protected void startService() throws Exception {
093: String partitionName = (String) getServer().getAttribute(
094: partitionObjectName, "PartitionName");
095: HAPartition partition = (HAPartition) getServer().getAttribute(
096: partitionObjectName, "HAPartition");
097: if (partition == null)
098: throw new RuntimeException("Partition is not registered: "
099: + partitionObjectName);
100: this .drm = partition.getDistributedReplicantManager();
101:
102: replicantName = getTargetName().toString();
103:
104: invokerHA = (InvokerHA) Registry.lookup(getInvokerName());
105: if (invokerHA == null)
106: throw new RuntimeException("Invoker is not registered: "
107: + getInvokerName());
108:
109: int mode = HATarget.MAKE_INVOCATIONS_WAIT;
110: if (state == ServiceMBean.STARTED)
111: mode = HATarget.ENABLE_INVOCATIONS;
112: target = new HATarget(partition, replicantName, invokerHA
113: .getStub(), mode);
114: invokerHA.registerBean(getTargetName(), target);
115:
116: String clusterFamilyName = partitionName + "/"
117: + getTargetName() + "/";
118:
119: // make ABSOLUTLY sure we do register with the DRM AFTER the HATarget
120: // otherwise we will refresh the *old* home in JNDI (ie before the proxy
121: // is re-generated)
122: drm.registerListener(replicantName, this );
123:
124: ClassLoader cl = Thread.currentThread().getContextClassLoader();
125: Class clazz;
126: LoadBalancePolicy policy;
127:
128: clazz = cl.loadClass(loadBalancePolicy);
129: policy = (LoadBalancePolicy) clazz.newInstance();
130: invoker = invokerHA.createProxy(getTargetName(), policy,
131: clusterFamilyName + "H");
132:
133: // JRMPInvokerProxyHA.colocation.add(new Integer(jmxNameHash));
134:
135: super .startService();
136: }
137:
138: public void stopService() throws Exception {
139: super .stopService();
140:
141: try {
142: // JRMPInvokerProxyHA.colocation.remove(new Integer(jmxNameHash));
143: invokerHA.unregisterBean(getTargetName());
144: target.destroy();
145: } catch (Exception ignored) {
146: // ignore.
147: }
148: if (drm != null)
149: drm.unregisterListener(replicantName, this );
150: }
151:
152: public void destroyService() throws Exception {
153: super .destroyService();
154: getServer().removeNotificationListener(getTargetName(),
155: listener);
156: }
157:
158: protected void containerIsFullyStarted() {
159: if (target != null)
160: target
161: .setInvocationsAuthorization(HATarget.ENABLE_INVOCATIONS);
162: }
163:
164: protected void containerIsAboutToStop() {
165: if (target != null) {
166: target
167: .setInvocationsAuthorization(HATarget.DISABLE_INVOCATIONS);
168: target.disable();
169: }
170: }
171:
172: // synchronized keyword added when it became possible for DRM to issue
173: // concurrent replicantsChanged notifications. JBAS-2169.
174: public synchronized void replicantsChanged(String key,
175: List newReplicants, int newReplicantsViewId) {
176: try {
177: if (invoker instanceof InvokerProxyHA)
178: ((InvokerProxyHA) invoker).updateClusterInfo(target
179: .getReplicants(), target.getCurrentViewId());
180:
181: log.debug("Rebinding in JNDI... " + key);
182: rebind();
183: } catch (Exception none) {
184: log.debug(none);
185: }
186: }
187:
188: protected void createProxy(Object cacheID, String proxyBindingName,
189: ClassLoader loader, Class[] ifaces) {
190: GenericProxyFactory proxyFactory = new GenericProxyFactory();
191: theProxy = proxyFactory.createProxy(cacheID, getTargetName(),
192: invoker, getJndiName(), proxyBindingName,
193: getInterceptorClasses(), loader, ifaces);
194: }
195:
196: // inner-classes
197:
198: class StateChangeListener implements NotificationListener {
199: public void handleNotification(Notification notification,
200: Object handback) {
201: if (notification instanceof AttributeChangeNotification) {
202: AttributeChangeNotification notif = (AttributeChangeNotification) notification;
203: state = ((Integer) notif.getNewValue()).intValue();
204:
205: if (state == ServiceMBean.STARTED) {
206: log
207: .debug("Started: enabling remote access to mbean "
208: + getTargetName());
209: containerIsFullyStarted();
210: } else if (state == ServiceMBean.STOPPING) {
211: log
212: .debug("About to stop: disabling remote access to mbean "
213: + getTargetName());
214: containerIsAboutToStop();
215: }
216: }
217: }
218:
219: }
220: }
|