001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.invocation.jrmp.interfaces;
023:
024: import java.io.Externalizable;
025: import java.io.IOException;
026: import java.io.ObjectInput;
027: import java.io.ObjectOutput;
028: import java.rmi.MarshalledObject;
029: import java.rmi.RemoteException;
030: import java.rmi.ServerException;
031: import java.util.ArrayList;
032: import java.util.WeakHashMap;
033: import javax.transaction.TransactionRolledbackException;
034:
035: import org.jboss.ha.framework.interfaces.ClusteringTargetsRepository;
036: import org.jboss.ha.framework.interfaces.FamilyClusterInfo;
037: import org.jboss.ha.framework.interfaces.GenericClusteringException;
038: import org.jboss.ha.framework.interfaces.HARMIResponse;
039: import org.jboss.ha.framework.interfaces.LoadBalancePolicy;
040: import org.jboss.invocation.Invocation;
041: import org.jboss.invocation.Invoker;
042: import org.jboss.invocation.InvokerProxyHA;
043: import org.jboss.invocation.MarshalledInvocation;
044: import org.jboss.invocation.PayloadKey;
045: import org.jboss.invocation.ServiceUnavailableException;
046: import org.jboss.logging.Logger;
047:
048: /**
049: * An extension of the JRMPInvokerProxy that supports failover and load
050: * balancing among a
051: *
052: * @author <a href="mailto:marc.fleury@jboss.org">Marc Fleury</a>
053: * @author Scott.Stark@jboss.org
054: * @version $Revision: 57188 $
055: */
056: public class JRMPInvokerProxyHA extends JRMPInvokerProxy implements
057: InvokerProxyHA, Externalizable {
058: // Public --------------------------------------------------------
059: /** The serialVersionUID
060: * @since 1.7.2.8
061: */
062: private static final long serialVersionUID = -967671822225981666L;
063: private static final Logger log = Logger
064: .getLogger(JRMPInvokerProxyHA.class);
065: public static final WeakHashMap txFailoverAuthorizations = new WeakHashMap();
066:
067: protected LoadBalancePolicy loadBalancePolicy;
068: protected String proxyFamilyName = null;
069:
070: FamilyClusterInfo familyClusterInfo = null;
071: //protected transient long currentViewId = 0;
072: /** Trace level logging flag only set when the proxy is created or read from JNDI */
073: protected transient boolean trace = false;
074:
075: public JRMPInvokerProxyHA() {
076: }
077:
078: public JRMPInvokerProxyHA(ArrayList targets,
079: LoadBalancePolicy policy, String proxyFamilyName,
080: long viewId) {
081: this .familyClusterInfo = ClusteringTargetsRepository
082: .initTarget(proxyFamilyName, targets, viewId);
083: this .loadBalancePolicy = policy;
084: this .proxyFamilyName = proxyFamilyName;
085: this .trace = log.isTraceEnabled();
086: if (trace)
087: log.trace("Init, cluterInfo: " + familyClusterInfo
088: + ", policy=" + loadBalancePolicy);
089: }
090:
091: public void updateClusterInfo(ArrayList targets, long viewId) {
092: if (familyClusterInfo != null)
093: this .familyClusterInfo.updateClusterInfo(targets, viewId);
094: }
095:
096: public Object getRemoteTarget() {
097: return getRemoteTarget(null);
098: }
099:
100: public Object getRemoteTarget(Invocation invocationBasedRouting) {
101: return loadBalancePolicy.chooseTarget(this .familyClusterInfo,
102: invocationBasedRouting);
103: }
104:
105: public void remoteTargetHasFailed(Object target) {
106: removeDeadTarget(target);
107: }
108:
109: protected void removeDeadTarget(Object target) {
110: //System.out.println("Removing a dead target: Size before : " + Integer.toString(this.familyClusterInfo.getTargets ().size()));
111: if (this .familyClusterInfo != null)
112: this .familyClusterInfo.removeDeadTarget(target);
113: }
114:
115: protected int totalNumberOfTargets() {
116: if (this .familyClusterInfo != null)
117: return this .familyClusterInfo.getTargets().size();
118: else
119: return 0;
120: }
121:
122: protected void resetView() {
123: this .familyClusterInfo.resetView();
124: }
125:
126: public boolean txContextAllowsFailover(Invocation invocation) {
127: javax.transaction.Transaction tx = invocation.getTransaction();
128: if (tx != null) {
129: synchronized (tx) {
130: return !txFailoverAuthorizations.containsKey(tx);
131: }
132: } else {
133: return true;
134: }
135: }
136:
137: public void invocationHasReachedAServer(Invocation invocation) {
138: javax.transaction.Transaction tx = invocation.getTransaction();
139: if (tx != null) {
140: synchronized (tx) {
141: txFailoverAuthorizations.put(tx, null);
142: }
143: }
144: }
145:
146: /**
147: * The invocation on the delegate, calls the right invoker. Remote if we are remote, local if we
148: * are local.
149: */
150: public Object invoke(Invocation invocation) throws Exception {
151: // we give the opportunity, to any server interceptor, to know if this a
152: // first invocation to a node or if it is a failovered call
153: //
154: int failoverCounter = 0;
155: invocation.setValue("FAILOVER_COUNTER", new Integer(
156: failoverCounter), PayloadKey.AS_IS);
157:
158: // We are going to go through a Remote invocation, switch to a Marshalled Invocation
159: MarshalledInvocation mi = new MarshalledInvocation(invocation);
160:
161: // Set the transaction propagation context
162: mi
163: .setTransactionPropagationContext(getTransactionPropagationContext());
164: mi.setValue("CLUSTER_VIEW_ID", new Long(this .familyClusterInfo
165: .getCurrentViewId()));
166: Invoker target = (Invoker) getRemoteTarget(invocation);
167:
168: boolean failoverAuthorized = true;
169: Exception lastException = null;
170: while (target != null && failoverAuthorized) {
171: boolean definitivlyRemoveNodeOnFailure = true;
172: try {
173: if (trace)
174: log.trace("Invoking on target=" + target);
175: Object rtnObj = target.invoke(mi);
176: HARMIResponse rsp = null;
177: if (rtnObj instanceof MarshalledObject) {
178: rsp = (HARMIResponse) ((MarshalledObject) rtnObj)
179: .get();
180: } else {
181: rsp = (HARMIResponse) rtnObj;
182: }
183: if (rsp.newReplicants != null) {
184: if (trace) {
185: log
186: .trace("newReplicants: "
187: + rsp.newReplicants);
188: }
189: updateClusterInfo(rsp.newReplicants,
190: rsp.currentViewId);
191: }
192: //else System.out.println("Static set of replicants: " + this.familyClusterInfo.getCurrentViewId () + " (me = " + this + ")");
193:
194: invocationHasReachedAServer(invocation);
195:
196: return rsp.response;
197: } catch (java.net.ConnectException e) {
198: lastException = e;
199: } catch (java.net.UnknownHostException e) {
200: lastException = e;
201: } catch (java.rmi.ConnectException e) {
202: lastException = e;
203: } catch (java.rmi.ConnectIOException e) {
204: lastException = e;
205: } catch (java.rmi.NoSuchObjectException e) {
206: lastException = e;
207: } catch (java.rmi.UnknownHostException e) {
208: lastException = e;
209: } catch (GenericClusteringException e) {
210: lastException = e;
211: // this is a generic clustering exception that contain the
212: // completion status: usefull to determine if we are authorized
213: // to re-issue a query to another node
214: //
215: if (e.getCompletionStatus() == GenericClusteringException.COMPLETED_NO) {
216: // we don't want to remove the node from the list of failed
217: // node UNLESS there is a risk to indefinitively loop
218: //
219: if (totalNumberOfTargets() >= failoverCounter) {
220: if (!e.isDefinitive())
221: definitivlyRemoveNodeOnFailure = false;
222: }
223: } else {
224: invocationHasReachedAServer(invocation);
225: throw new ServerException("Clustering error", e);
226: }
227: } catch (ServerException e) {
228: //Why do NoSuchObjectExceptions get ignored for a retry here
229: //unlike in the non-HA case?
230: invocationHasReachedAServer(invocation);
231: if (e.detail instanceof TransactionRolledbackException) {
232: throw (TransactionRolledbackException) e.detail;
233: }
234: if (e.detail instanceof RemoteException) {
235: throw (RemoteException) e.detail;
236: }
237: throw e;
238: } catch (Exception e) {
239: lastException = e;
240: invocationHasReachedAServer(invocation);
241: throw e;
242: }
243:
244: if (trace)
245: log.trace("Invoke failed, target=" + target,
246: lastException);
247:
248: // If we reach here, this means that we must fail-over
249: remoteTargetHasFailed(target);
250: if (!definitivlyRemoveNodeOnFailure) {
251: resetView();
252: }
253:
254: failoverAuthorized = txContextAllowsFailover(invocation);
255: target = (Invoker) getRemoteTarget(invocation);
256:
257: failoverCounter++;
258: mi.setValue("FAILOVER_COUNTER",
259: new Integer(failoverCounter), PayloadKey.AS_IS);
260: }
261: // if we get here this means list was exhausted
262: String msg = "Service unavailable.";
263: if (failoverAuthorized == false) {
264: msg = "Service unavailable (failover not possible inside a user transaction).";
265: }
266: throw new ServiceUnavailableException(msg, lastException);
267: }
268:
269: /**
270: * Externalize this instance.
271: *
272: * If this instance lives in a different VM than its container
273: * invoker, the remote interface of the container invoker is
274: * not externalized.
275: */
276: public void writeExternal(final ObjectOutput out)
277: throws IOException {
278: // JBAS-2071 - sync on FCI to ensure targets and vid are consistent
279: ArrayList targets = null;
280: long vid = 0;
281: synchronized (this .familyClusterInfo) {
282: targets = this .familyClusterInfo.getTargets();
283: vid = this .familyClusterInfo.getCurrentViewId();
284: }
285: out.writeObject(targets);
286: out.writeObject(this .loadBalancePolicy);
287: out.writeObject(this .proxyFamilyName);
288: out.writeLong(vid);
289: }
290:
291: /**
292: * Un-externalize this instance.
293: *
294: * We check timestamps of the interfaces to see if the instance is in the original VM of creation
295: */
296: public void readExternal(final ObjectInput in) throws IOException,
297: ClassNotFoundException {
298: ArrayList targets = (ArrayList) in.readObject();
299: this .loadBalancePolicy = (LoadBalancePolicy) in.readObject();
300: this .proxyFamilyName = (String) in.readObject();
301: long vid = in.readLong();
302:
303: // keep a reference on our family object
304: //
305: this .familyClusterInfo = ClusteringTargetsRepository
306: .initTarget(this .proxyFamilyName, targets, vid);
307: this .trace = log.isTraceEnabled();
308: if (trace)
309: log.trace("Init, clusterInfo: " + familyClusterInfo
310: + ", policy=" + loadBalancePolicy);
311: }
312:
313: // Private -------------------------------------------------------
314:
315: // Inner classes -------------------------------------------------
316: }
|