001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.invocation.unified.server;
023:
024: import java.util.ArrayList;
025: import java.util.HashMap;
026: import javax.management.ObjectName;
027: import org.jboss.ha.framework.interfaces.GenericClusteringException;
028: import org.jboss.ha.framework.interfaces.HARMIResponse;
029: import org.jboss.ha.framework.interfaces.LoadBalancePolicy;
030: import org.jboss.ha.framework.server.HATarget;
031: import org.jboss.invocation.Invocation;
032: import org.jboss.invocation.Invoker;
033: import org.jboss.invocation.InvokerHA;
034: import org.jboss.invocation.unified.interfaces.UnifiedInvokerHAProxy;
035: import org.jboss.mx.util.JMXExceptionDecoder;
036: import org.jboss.remoting.InvocationRequest;
037: import org.jboss.remoting.serialization.SerializationStreamFactory;
038: import org.jboss.system.Registry;
039:
040: /**
041: * @author <a href="mailto:tom.elrod@jboss.com">Tom Elrod</a>
042: */
043: public class UnifiedInvokerHA extends UnifiedInvoker implements
044: InvokerHA {
045: private HashMap beanMap = new HashMap();
046:
047: public UnifiedInvokerHA() {
048: super ();
049: setSubSystem("invokerha");
050: }
051:
052: protected void jmxBind() {
053: Registry.bind(getServiceName(), this );
054: }
055:
056: public java.io.Serializable getStub() {
057: return getInvoker().getLocator();
058: }
059:
060: public void registerBean(ObjectName beanName, HATarget target)
061: throws Exception {
062: Integer hash = new Integer(beanName.hashCode());
063:
064: if (beanMap.containsKey(hash)) {
065: throw new IllegalStateException("Trying to register bean ("
066: + beanName
067: + ") with an hash code that already exists");
068: }
069: beanMap.put(hash, target);
070: }
071:
072: public Invoker createProxy(ObjectName beanName,
073: LoadBalancePolicy policy, String proxyFamilyName)
074: throws Exception {
075: Integer hash = new Integer(beanName.hashCode());
076: HATarget target = (HATarget) beanMap.get(hash);
077: if (target == null) {
078: throw new IllegalStateException(
079: "The bean hashCode not found");
080: }
081:
082: String familyName = proxyFamilyName;
083: if (familyName == null) {
084: familyName = target.getAssociatedPartition()
085: .getPartitionName()
086: + "/" + beanName;
087: }
088:
089: UnifiedInvokerHAProxy proxy = new UnifiedInvokerHAProxy(
090: getInvoker().getLocator(), getStrictRMIException(),
091: target.getReplicants(), policy, proxyFamilyName, target
092: .getCurrentViewId());
093: return proxy;
094:
095: }
096:
097: public void unregisterBean(ObjectName beanName) throws Exception {
098: Integer hash = new Integer(beanName.hashCode());
099: beanMap.remove(hash);
100: }
101:
102: /**
103: * Implementation of the server invoker handler interface. Will take the invocation request
104: * and invoke down the interceptor chain.
105: *
106: * @param invocationReq
107: * @return
108: * @throws Throwable
109: */
110: public Object invoke(InvocationRequest invocationReq)
111: throws Throwable {
112: Invocation invocation = (Invocation) invocationReq
113: .getParameter();
114: Thread currentThread = Thread.currentThread();
115: ClassLoader oldCl = currentThread.getContextClassLoader();
116: ObjectName mbean = null;
117: try {
118: mbean = (ObjectName) Registry.lookup(invocation
119: .getObjectName());
120:
121: /** Clustering **/
122: long clientViewId = ((Long) invocation
123: .getValue("CLUSTER_VIEW_ID")).longValue();
124: HATarget target = (HATarget) beanMap.get(invocation
125: .getObjectName());
126: if (target == null) {
127: // We could throw IllegalStateException but we have a race condition that could occur:
128: // when we undeploy a bean, the cluster takes some time to converge
129: // and to recalculate a new viewId and list of replicant for each HATarget.
130: // Consequently, a client could own an up-to-date list of the replicants
131: // (before the cluster has converged) and try to perform an invocation
132: // on this node where the HATarget no more exist, thus receiving a
133: // wrong exception and no failover is performed with an IllegalStateException
134: //
135: throw new GenericClusteringException(
136: GenericClusteringException.COMPLETED_NO,
137: "target is not/no more registered on this node");
138: }
139:
140: if (!target.invocationsAllowed()) {
141: throw new GenericClusteringException(
142: GenericClusteringException.COMPLETED_NO,
143: "invocations are currently not allowed on this target");
144: }
145: /** End Clustering **/
146:
147: // The cl on the thread should be set in another interceptor
148: Object obj = getServer().invoke(mbean, "invoke",
149: new Object[] { invocation },
150: Invocation.INVOKE_SIGNATURE);
151:
152: /** Clustering **/
153:
154: HARMIResponse haResponse = new HARMIResponse();
155:
156: if (clientViewId != target.getCurrentViewId()) {
157: haResponse.newReplicants = new ArrayList(target
158: .getReplicants());
159: haResponse.currentViewId = target.getCurrentViewId();
160: }
161: haResponse.response = obj;
162:
163: /** End Clustering **/
164:
165: return SerializationStreamFactory.getManagerInstance(
166: getInvoker().getLocator().findSerializationType())
167: .createdMarshalledValue(haResponse);
168: } catch (Exception e) {
169: Throwable th = JMXExceptionDecoder.decode(e);
170: if (log.isTraceEnabled()) {
171: log.trace("Failed to invoke on mbean: " + mbean, th);
172: }
173:
174: if (th instanceof Exception) {
175: e = (Exception) th;
176: }
177:
178: throw e;
179: } finally {
180: currentThread.setContextClassLoader(oldCl);
181: Thread.interrupted(); // clear interruption because this thread may be pooled.
182: }
183:
184: }
185:
186: }
|