001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.invocation.unified.interfaces;
023:
024: import java.io.IOException;
025: import java.io.ObjectInput;
026: import java.io.ObjectOutput;
027: import java.io.StreamCorruptedException;
028: import java.net.MalformedURLException;
029: import java.rmi.MarshalledObject;
030: import java.rmi.RemoteException;
031: import java.rmi.ServerException;
032: import java.util.ArrayList;
033: import java.util.List;
034: import java.util.WeakHashMap;
035: import org.jboss.ha.framework.interfaces.ClusteringTargetsRepository;
036: import org.jboss.ha.framework.interfaces.FamilyClusterInfo;
037: import org.jboss.ha.framework.interfaces.GenericClusteringException;
038: import org.jboss.ha.framework.interfaces.HARMIResponse;
039: import org.jboss.ha.framework.interfaces.LoadBalancePolicy;
040: import org.jboss.invocation.Invocation;
041: import org.jboss.invocation.PayloadKey;
042: import org.jboss.invocation.ServiceUnavailableException;
043: import org.jboss.remoting.CannotConnectException;
044: import org.jboss.remoting.Client;
045: import org.jboss.remoting.InvokerLocator;
046: import org.jboss.remoting.serialization.IMarshalledValue;
047:
048: /**
049: * @author <a href="mailto:tom.elrod@jboss.com">Tom Elrod</a>
050: */
051: public class UnifiedInvokerHAProxy extends UnifiedInvokerProxy {
052:
053: static final long serialVersionUID = -4813929243402349966L;
054:
055: private LoadBalancePolicy loadBalancePolicy;
056: private String proxyFamilyName = null;
057:
058: private FamilyClusterInfo familyClusterInfo = null;
059:
060: public static final WeakHashMap txFailoverAuthorizations = new WeakHashMap();
061:
062: public UnifiedInvokerHAProxy() {
063: super ();
064: log
065: .debug("UnifiedInvokerHAProxy constructor called with no arguments.");
066: setSubSystem("invokerha");
067: }
068:
069: public UnifiedInvokerHAProxy(InvokerLocator locator,
070: boolean isStrictRMIException, ArrayList targets,
071: LoadBalancePolicy policy, String proxyFamilyName,
072: long viewId) {
073: super (locator, isStrictRMIException);
074:
075: this .familyClusterInfo = ClusteringTargetsRepository
076: .initTarget(proxyFamilyName, targets, viewId);
077: this .loadBalancePolicy = policy;
078: this .proxyFamilyName = proxyFamilyName;
079:
080: setSubSystem("invokerha");
081: }
082:
083: public boolean txContextAllowsFailover(Invocation invocation) {
084: javax.transaction.Transaction tx = invocation.getTransaction();
085: if (tx != null) {
086: synchronized (tx) {
087: return !txFailoverAuthorizations.containsKey(tx);
088: }
089: } else {
090: return true;
091: }
092: }
093:
094: public void invocationHasReachedAServer(Invocation invocation) {
095: javax.transaction.Transaction tx = invocation.getTransaction();
096: if (tx != null) {
097: synchronized (tx) {
098: txFailoverAuthorizations.put(tx, null);
099: }
100: }
101: }
102:
103: protected int totalNumberOfTargets() {
104: if (this .familyClusterInfo != null) {
105: return this .familyClusterInfo.getTargets().size();
106: } else {
107: return 0;
108: }
109: }
110:
111: protected void resetView() {
112: this .familyClusterInfo.resetView();
113: }
114:
115: /**
116: * Gets the remoting client to call on which is selected by the load balancing policy.
117: * If the target InvokerLocator selected is not for the current remoting client, a new one
118: * will be initialized.
119: *
120: * @param invocationBasedRouting
121: * @return
122: * @throws MalformedURLException
123: */
124: protected Client getClient(Invocation invocationBasedRouting)
125: throws MalformedURLException {
126: Object target = loadBalancePolicy.chooseTarget(
127: familyClusterInfo, invocationBasedRouting);
128: InvokerLocator targetLocator = (InvokerLocator) target;
129:
130: // check if load balancer pick the client invoker we already have
131: if (!getLocator().equals(targetLocator)) {
132: init(targetLocator);
133: }
134: return getClient();
135: }
136:
137: /**
138: * @param invocation A pointer to the invocation object
139: * @return Return value of method invocation.
140: * @throws Exception Failed to invoke method.
141: */
142: public Object invoke(Invocation invocation) throws Exception {
143: // we give the opportunity, to any server interceptor, to know if this a
144: // first invocation to a node or if it is a failovered call
145: //
146: int failoverCounter = 0;
147: invocation.setValue("FAILOVER_COUNTER", new Integer(
148: failoverCounter), PayloadKey.AS_IS);
149:
150: Object response = null;
151: Exception lastException = null;
152:
153: boolean failoverAuthorized = true;
154: while (familyClusterInfo.getTargets() != null
155: && familyClusterInfo.getTargets().size() > 0
156: && failoverAuthorized) {
157: boolean definitivlyRemoveNodeOnFailure = true;
158:
159: try {
160: invocation.setValue("CLUSTER_VIEW_ID", new Long(
161: this .familyClusterInfo.getCurrentViewId()));
162:
163: log.debug("Client cluster view id: "
164: + familyClusterInfo.getCurrentViewId());
165: log.debug(printPossibleTargets());
166:
167: Client clientInstance = getClient(invocation);
168:
169: log.debug("Making invocation on "
170: + clientInstance.getInvoker().getLocator());
171:
172: response = clientInstance.invoke(invocation, null);
173:
174: HARMIResponse haResponse = null;
175:
176: if (response instanceof Exception) {
177: log.debug("Invocation returened exception: "
178: + response);
179: if (response instanceof GenericClusteringException) {
180: GenericClusteringException gcex = (GenericClusteringException) response;
181: lastException = gcex;
182: // this is a generic clustering exception that contain the
183: // completion status: usefull to determine if we are authorized
184: // to re-issue a query to another node
185: //
186: if (gcex.getCompletionStatus() == GenericClusteringException.COMPLETED_NO) {
187: // we don't want to remove the node from the list of failed
188: // node UNLESS there is a risk to indefinitively loop
189: //
190: if (totalNumberOfTargets() >= failoverCounter) {
191: if (!gcex.isDefinitive()) {
192: definitivlyRemoveNodeOnFailure = false;
193: }
194: }
195: removeDeadTarget(getLocator());
196: if (!definitivlyRemoveNodeOnFailure) {
197: resetView();
198: }
199: failoverAuthorized = txContextAllowsFailover(invocation);
200:
201: failoverCounter++;
202: invocation.setValue("FAILOVER_COUNTER",
203: new Integer(failoverCounter),
204: PayloadKey.AS_IS);
205:
206: log
207: .debug("Received GenericClusteringException where request was not completed. Will retry.");
208:
209: continue;
210: } else {
211: invocationHasReachedAServer(invocation);
212: throw new ServerException(
213: "Clustering error", gcex);
214: }
215: } else {
216: throw ((Exception) response);
217: }
218: }
219: if (response instanceof IMarshalledValue) {
220: haResponse = (HARMIResponse) ((IMarshalledValue) response)
221: .get();
222: } else if (response instanceof MarshalledObject) {
223: haResponse = (HARMIResponse) ((MarshalledObject) response)
224: .get();
225: } else {
226: haResponse = (HARMIResponse) response;
227: }
228:
229: // check for clustered targets
230: if (haResponse.newReplicants != null) {
231: updateClusterInfo(haResponse.newReplicants,
232: haResponse.currentViewId);
233: }
234:
235: response = haResponse.response;
236: return response;
237:
238: } catch (CannotConnectException cncEx) {
239: log.debug(
240: "Invocation failed: CannotConnectException - "
241: + cncEx, cncEx);
242: removeDeadTarget(getLocator());
243: resetView();
244: failoverAuthorized = txContextAllowsFailover(invocation);
245:
246: failoverCounter++;
247: invocation.setValue("FAILOVER_COUNTER", new Integer(
248: failoverCounter), PayloadKey.AS_IS);
249: } catch (GenericClusteringException gcex) {
250: lastException = gcex;
251: // this is a generic clustering exception that contain the
252: // completion status: usefull to determine if we are authorized
253: // to re-issue a query to another node
254: //
255: if (gcex.getCompletionStatus() == GenericClusteringException.COMPLETED_NO) {
256: // we don't want to remove the node from the list of failed
257: // node UNLESS there is a risk to indefinitively loop
258: //
259: if (totalNumberOfTargets() >= failoverCounter) {
260: if (!gcex.isDefinitive()) {
261: definitivlyRemoveNodeOnFailure = false;
262: }
263: }
264: removeDeadTarget(getLocator());
265: if (!definitivlyRemoveNodeOnFailure) {
266: resetView();
267: }
268: failoverAuthorized = txContextAllowsFailover(invocation);
269:
270: failoverCounter++;
271: invocation.setValue("FAILOVER_COUNTER",
272: new Integer(failoverCounter),
273: PayloadKey.AS_IS);
274:
275: log
276: .debug("Received GenericClusteringException where request was not completed. Will retry.");
277: } else {
278: invocationHasReachedAServer(invocation);
279: throw new ServerException("Clustering error", gcex);
280: }
281: } catch (RemoteException aex) {
282: log.debug(
283: "Invocation failed: RemoteException - " + aex,
284: aex);
285:
286: // per Jira issue JBREM-61
287: if (isStrictRMIException()) {
288: throw new ServerException(aex.getMessage(), aex);
289: } else {
290: throw aex;
291: }
292: } catch (Throwable throwable) {
293: log.debug("Invocation failed: " + throwable, throwable);
294:
295: // this is somewhat of a hack as remoting throws throwable,
296: // so will let Exception types bubble up, but if Throwable type,
297: // then have to wrap in new Exception, as this is the signature
298: // of this invoke method.
299: if (throwable instanceof Exception) {
300: throw (Exception) throwable;
301: }
302: throw new Exception(throwable);
303: }
304: }
305:
306: if (failoverAuthorized == false) {
307: throw new ServiceUnavailableException(
308: "Service unavailable (failover not possible inside a user transaction) for "
309: + invocation.getObjectName()
310: + " calling method "
311: + invocation.getMethod(), lastException);
312: } else {
313: throw new ServiceUnavailableException(
314: "Service unavailable for "
315: + invocation.getObjectName()
316: + " calling method "
317: + invocation.getMethod(), lastException);
318: }
319: }
320:
321: private Object printPossibleTargets() {
322: StringBuffer buffer = new StringBuffer();
323: if (familyClusterInfo != null) {
324: List possibleTargets = familyClusterInfo.getTargets();
325: if (possibleTargets != null && possibleTargets.size() > 0) {
326: for (int x = 0; x < possibleTargets.size(); x++) {
327: buffer.append("\nPossible target " + (x + 1) + ": "
328: + possibleTargets.get(x));
329: }
330: }
331: }
332: return buffer.toString();
333: }
334:
335: private void removeDeadTarget(InvokerLocator locator) {
336: if (locator != null) {
337: if (this .familyClusterInfo != null) {
338: familyClusterInfo.removeDeadTarget(locator);
339: log.debug("Removed " + locator + " from target list.");
340: }
341: }
342: }
343:
344: private void updateClusterInfo(ArrayList newReplicants,
345: long currentViewId) {
346: if (familyClusterInfo != null) {
347: familyClusterInfo.updateClusterInfo(newReplicants,
348: currentViewId);
349: log.debug("Updating cluster info. New view id: "
350: + currentViewId);
351: log.debug("New cluster target list is:");
352: for (int x = 0; x < newReplicants.size(); x++) {
353: log.debug(newReplicants.get(x));
354: }
355: }
356: }
357:
358: /**
359: * Externalize this instance and handle obtaining the remoteInvoker stub
360: */
361: public void writeExternal(final ObjectOutput out)
362: throws IOException {
363: out.writeInt(CURRENT_VERSION);
364:
365: out.writeUTF(getLocator().getOriginalURI());
366: out.writeBoolean(isStrictRMIException());
367: // JBAS-2071 - sync on FCI to ensure targets and vid are consistent
368: ArrayList targets = null;
369: long vid = 0;
370: synchronized (this .familyClusterInfo) {
371: targets = this .familyClusterInfo.getTargets();
372: vid = this .familyClusterInfo.getCurrentViewId();
373: }
374: out.writeObject(targets);
375: out.writeObject(this .loadBalancePolicy);
376: out.writeObject(this .proxyFamilyName);
377: out.writeLong(vid);
378: }
379:
380: /**
381: * Un-externalize this instance.
382: */
383: public void readExternal(final ObjectInput in) throws IOException,
384: ClassNotFoundException {
385: int version = in.readInt();
386: // Read in and map the version of the serialized data seen
387: switch (version) {
388: case VERSION_5_0:
389: setLocator(new InvokerLocator(in.readUTF()));
390: setStrictRMIException(in.readBoolean());
391: init(getLocator());
392:
393: ArrayList targets = (ArrayList) in.readObject();
394: this .loadBalancePolicy = (LoadBalancePolicy) in
395: .readObject();
396: this .proxyFamilyName = (String) in.readObject();
397: long vid = in.readLong();
398:
399: // keep a reference on our family object
400: //
401: this .familyClusterInfo = ClusteringTargetsRepository
402: .initTarget(this .proxyFamilyName, targets, vid);
403:
404: break;
405: default:
406: throw new StreamCorruptedException("Unknown version seen: "
407: + version);
408: }
409: }
410:
411: }
|