001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.invocation.pooled.server;
023:
024: import org.jboss.system.Registry;
025: import java.rmi.MarshalledObject;
026: import javax.management.ObjectName;
027: import org.jboss.invocation.Invocation;
028: import org.jboss.invocation.MarshalledInvocation;
029: import org.jboss.invocation.pooled.interfaces.PooledInvokerProxy;
030: import org.jboss.invocation.pooled.interfaces.ServerAddress;
031: import java.util.HashMap;
032: import org.jboss.invocation.Invoker;
033: import org.jboss.invocation.InvokerHA;
034: import org.jboss.invocation.jrmp.interfaces.JRMPInvokerProxyHA;
035: import org.jboss.ha.framework.interfaces.HARMIResponse;
036: import org.jboss.ha.framework.server.HATarget;
037: import org.jboss.ha.framework.interfaces.LoadBalancePolicy;
038: import org.jboss.ha.framework.interfaces.GenericClusteringException;
039: import javax.management.InstanceNotFoundException;
040: import javax.management.ReflectionException;
041:
042: import java.util.ArrayList;
043:
044: /**
045: * This invoker pools Threads and client connections to one server socket.
046: * The purpose is to avoid a bunch of failings of RMI.
047: *
048: * 1. Avoid making a client socket connection with every invocation call.
049: * This is very expensive. Also on windows if too many clients try
050: * to connect at the same time, you get connection refused exceptions.
051: * This invoker/proxy combo alleviates this.
052: *
053: * 2. Avoid creating a thread per invocation. The client/server connection
054: * is preserved and attached to the same thread.
055: *
056: * So we have connection pooling on the server and client side, and thread pooling
057: * on the server side. The pool is an LRU pool, so resources should be cleaned up.
058: *
059: *
060: * @author <a href="mailto:bill@jboss.org">Bill Burke</a>
061: * @version $Revision: 57188 $
062: *
063: * @jmx:mbean extends="org.jboss.system.ServiceMBean"
064: */
065: public final class PooledInvokerHA extends PooledInvoker implements
066: InvokerHA {
067: protected HashMap beanMap = new HashMap();
068:
069: protected void jmxBind() {
070: Registry.bind(getServiceName(), this );
071: }
072:
073: // JRMPInvoker.destroyService() does the right thing
074:
075: public java.io.Serializable getStub() {
076: ServerAddress sa = new ServerAddress(clientConnectAddress,
077: clientConnectPort, enableTcpNoDelay, timeout,
078: clientSocketFactory);
079: return new PooledInvokerProxy(sa, clientMaxPoolSize);
080: }
081:
082: public void registerBean(ObjectName beanName, HATarget target)
083: throws Exception {
084: Integer hash = new Integer(beanName.hashCode());
085: log.debug("registerBean: " + beanName);
086:
087: if (beanMap.containsKey(hash)) {
088: // FIXME [oleg] In theory this is possible!
089: throw new IllegalStateException(
090: "Trying to register bean with the existing hashCode");
091: }
092: beanMap.put(hash, target);
093: }
094:
095: public Invoker createProxy(ObjectName beanName,
096: LoadBalancePolicy policy, String proxyFamilyName)
097: throws Exception {
098: Integer hash = new Integer(beanName.hashCode());
099: HATarget target = (HATarget) beanMap.get(hash);
100: if (target == null) {
101: throw new IllegalStateException(
102: "The bean hashCode not found");
103: }
104:
105: String familyName = proxyFamilyName;
106: if (familyName == null)
107: familyName = target.getAssociatedPartition()
108: .getPartitionName()
109: + "/" + beanName;
110:
111: JRMPInvokerProxyHA proxy = new JRMPInvokerProxyHA(target
112: .getReplicants(), policy, familyName, target
113: .getCurrentViewId());
114: return proxy;
115: }
116:
117: public void unregisterBean(ObjectName beanName) throws Exception {
118: Integer hash = new Integer(beanName.hashCode());
119: beanMap.remove(hash);
120: }
121:
122: /**
123: * Invoke a Remote interface method.
124: */
125: public Object invoke(Invocation invocation) throws Exception {
126: ClassLoader oldCl = Thread.currentThread()
127: .getContextClassLoader();
128: try {
129: // Deserialize the transaction if it is there
130: invocation
131: .setTransaction(importTPC(((MarshalledInvocation) invocation)
132: .getTransactionPropagationContext()));
133:
134: // Extract the ObjectName, the rest is still marshalled
135: ObjectName mbean = (ObjectName) Registry.lookup(invocation
136: .getObjectName());
137: long clientViewId = ((Long) invocation
138: .getValue("CLUSTER_VIEW_ID")).longValue();
139:
140: HATarget target = (HATarget) beanMap.get(invocation
141: .getObjectName());
142: if (target == null) {
143: // We could throw IllegalStateException but we have a race condition that could occur:
144: // when we undeploy a bean, the cluster takes some time to converge
145: // and to recalculate a new viewId and list of replicant for each HATarget.
146: // Consequently, a client could own an up-to-date list of the replicants
147: // (before the cluster has converged) and try to perform an invocation
148: // on this node where the HATarget no more exist, thus receiving a
149: // wrong exception and no failover is performed with an IllegalStateException
150: //
151: throw new GenericClusteringException(
152: GenericClusteringException.COMPLETED_NO,
153: "target is not/no more registered on this node");
154: }
155:
156: if (!target.invocationsAllowed())
157: throw new GenericClusteringException(
158: GenericClusteringException.COMPLETED_NO,
159: "invocations are currently not allowed on this target");
160:
161: // The cl on the thread should be set in another interceptor
162: Object rtn = getServer().invoke(mbean, "invoke",
163: new Object[] { invocation },
164: Invocation.INVOKE_SIGNATURE);
165:
166: HARMIResponse rsp = new HARMIResponse();
167:
168: if (clientViewId != target.getCurrentViewId()) {
169: rsp.newReplicants = new ArrayList(target
170: .getReplicants());
171: rsp.currentViewId = target.getCurrentViewId();
172: }
173: rsp.response = rtn;
174: return rsp;
175: } catch (InstanceNotFoundException e) {
176: throw new GenericClusteringException(
177: GenericClusteringException.COMPLETED_NO, e);
178: } catch (ReflectionException e) {
179: throw new GenericClusteringException(
180: GenericClusteringException.COMPLETED_NO, e);
181: } catch (Exception e) {
182: org.jboss.mx.util.JMXExceptionDecoder.rethrow(e);
183:
184: // the compiler does not know an exception is thrown by the above
185: throw new org.jboss.util.UnreachableStatementException();
186: } finally {
187: Thread.currentThread().setContextClassLoader(oldCl);
188: }
189: }
190: }
191: // vim:expandtab:tabstop=3:shiftwidth=3
|