001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */package org.apache.cxf.clustering;
019:
020: import java.io.IOException;
021: import java.util.HashMap;
022: import java.util.List;
023: import java.util.Map;
024: import java.util.logging.Level;
025: import java.util.logging.Logger;
026:
027: import org.apache.cxf.common.logging.LogUtils;
028: import org.apache.cxf.endpoint.AbstractConduitSelector;
029: import org.apache.cxf.endpoint.Client;
030: import org.apache.cxf.endpoint.Endpoint;
031: import org.apache.cxf.endpoint.Retryable;
032: import org.apache.cxf.helpers.CastUtils;
033: import org.apache.cxf.message.Exchange;
034: import org.apache.cxf.message.Message;
035: import org.apache.cxf.service.model.BindingOperationInfo;
036: import org.apache.cxf.transport.Conduit;
037:
038: /**
039: * Implements a target selection strategy based on failover to an
040: * alternate target endpoint when a transport level failure is
041: * encountered.
042: */
043: public class FailoverTargetSelector extends AbstractConduitSelector {
044:
045: private static final Logger LOG = LogUtils
046: .getL7dLogger(FailoverTargetSelector.class);
047: private Map<InvocationKey, InvocationContext> inProgress;
048: private FailoverStrategy failoverStrategy;
049:
050: /**
051: * Normal constructor.
052: */
053: public FailoverTargetSelector() {
054: this (null);
055: }
056:
057: /**
058: * Constructor, allowing a specific conduit to override normal selection.
059: *
060: * @param c specific conduit
061: */
062: public FailoverTargetSelector(Conduit c) {
063: super (c);
064: inProgress = new HashMap<InvocationKey, InvocationContext>();
065: }
066:
067: /**
068: * Called prior to the interceptor chain being traversed.
069: *
070: * @param message the current Message
071: */
072: public synchronized void prepare(Message message) {
073: Exchange exchange = message.getExchange();
074: InvocationKey key = new InvocationKey(exchange);
075: if (!inProgress.containsKey(key)) {
076: Endpoint endpoint = exchange.get(Endpoint.class);
077: BindingOperationInfo bindingOperationInfo = exchange
078: .get(BindingOperationInfo.class);
079: Object[] params = message.getContent(List.class).toArray();
080: Map<String, Object> context = CastUtils.cast((Map) message
081: .get(Message.INVOCATION_CONTEXT));
082: InvocationContext invocation = new InvocationContext(
083: endpoint, bindingOperationInfo, params, context);
084: inProgress.put(key, invocation);
085: }
086: }
087:
088: /**
089: * Called when a Conduit is actually required.
090: *
091: * @param message
092: * @return the Conduit to use for mediation of the message
093: */
094: public Conduit selectConduit(Message message) {
095: return getSelectedConduit(message);
096: }
097:
098: /**
099: * Called on completion of the MEP for which the Conduit was required.
100: *
101: * @param exchange represents the completed MEP
102: */
103: public void complete(Exchange exchange) {
104: InvocationKey key = new InvocationKey(exchange);
105: InvocationContext invocation = null;
106: synchronized (this ) {
107: invocation = inProgress.get(key);
108: }
109: boolean failover = false;
110: if (requiresFailover(exchange)) {
111: Endpoint failoverTarget = getFailoverTarget(exchange,
112: invocation);
113: if (failoverTarget != null) {
114: endpoint = failoverTarget;
115: selectedConduit.close();
116: selectedConduit = null;
117: Exception prevExchangeFault = (Exception) exchange
118: .remove(Exception.class.getName());
119: Message outMessage = exchange.getOutMessage();
120: Exception prevMessageFault = outMessage
121: .getContent(Exception.class);
122: outMessage.setContent(Exception.class, null);
123: overrideAddressProperty(invocation.getContext());
124: Retryable retry = exchange.get(Retryable.class);
125: exchange.clear();
126: if (retry != null) {
127: try {
128: failover = true;
129: retry.invoke(invocation
130: .getBindingOperationInfo(), invocation
131: .getParams(), invocation.getContext(),
132: exchange);
133: } catch (Exception e) {
134: if (exchange.get(Exception.class) != null) {
135: exchange.put(Exception.class,
136: prevExchangeFault);
137: }
138: if (outMessage.getContent(Exception.class) != null) {
139: outMessage.setContent(Exception.class,
140: prevMessageFault);
141: }
142: }
143: }
144: } else {
145: if (endpoint != invocation.getOriginalEndpoint()) {
146: endpoint = invocation.getOriginalEndpoint();
147: getLogger().log(Level.INFO,
148: "REVERT_TO_ORIGINAL_TARGET",
149: endpoint.getEndpointInfo().getName());
150: }
151: }
152: }
153: if (!failover) {
154: getLogger().info("FAILOVER_NOT_REQUIRED");
155: synchronized (this ) {
156: inProgress.remove(key);
157: }
158: super .complete(exchange);
159: }
160: }
161:
162: /**
163: * @param strategy the FailoverStrategy to use
164: */
165: public synchronized void setStrategy(FailoverStrategy strategy) {
166: getLogger().log(Level.INFO, "USING_STRATEGY",
167: new Object[] { strategy });
168: failoverStrategy = strategy;
169: }
170:
171: /**
172: * @return strategy the FailoverStrategy to use
173: */
174: public synchronized FailoverStrategy getStrategy() {
175: if (failoverStrategy == null) {
176: failoverStrategy = new SequentialStrategy();
177: getLogger().log(Level.INFO, "USING_STRATEGY",
178: new Object[] { failoverStrategy });
179: }
180: return failoverStrategy;
181: }
182:
183: /**
184: * @return the logger to use
185: */
186: protected Logger getLogger() {
187: return LOG;
188: }
189:
190: /**
191: * Check if the exchange is suitable for a failover.
192: *
193: * @param exchange the current Exchange
194: * @return boolean true iff a failover should be attempted
195: */
196: private boolean requiresFailover(Exchange exchange) {
197: Message outMessage = exchange.getOutMessage();
198: Exception ex = outMessage.get(Exception.class) != null ? outMessage
199: .get(Exception.class)
200: : exchange.get(Exception.class);
201: getLogger().log(Level.INFO, "CHECK_LAST_INVOKE_FAILED",
202: new Object[] { ex != null });
203: Throwable curr = ex;
204: boolean failover = false;
205: while (curr != null) {
206: getLogger().log(Level.WARNING,
207: "CHECK_FAILURE_IN_TRANSPORT",
208: new Object[] { ex, curr instanceof IOException });
209: failover = curr instanceof java.io.IOException;
210: curr = curr.getCause();
211: }
212: return failover;
213: }
214:
215: /**
216: * Get the failover target endpoint, if a suitable one is available.
217: *
218: * @param exchange the current Exchange
219: * @param invocation the current InvocationContext
220: * @return a failover endpoint if one is available
221: */
222: private Endpoint getFailoverTarget(Exchange exchange,
223: InvocationContext invocation) {
224: if (invocation.getAlternateTargets() == null) {
225: // no previous failover attempt on this invocation
226: //
227: invocation.setAlternateTargets(getStrategy()
228: .getAlternateEndpoints(exchange));
229: }
230:
231: return getStrategy().selectAlternateEndpoint(
232: invocation.getAlternateTargets());
233: }
234:
235: /**
236: * Override the ENDPOINT_ADDRESS property in the request context
237: *
238: * @param context the request context
239: */
240: private void overrideAddressProperty(Map<String, Object> context) {
241: Map<String, Object> requestContext = CastUtils
242: .cast((Map) context.get(Client.REQUEST_CONTEXT));
243: if (requestContext != null) {
244: requestContext.put(Message.ENDPOINT_ADDRESS, getEndpoint()
245: .getEndpointInfo().getAddress());
246: requestContext.put("javax.xml.ws.service.endpoint.address",
247: getEndpoint().getEndpointInfo().getAddress());
248: }
249: }
250:
251: /**
252: * Used to wrap an Exchange for usage as a Map key. The raw Exchange
253: * is not a suitable key type, as the hashCode is computed from its
254: * current contents, which may obvioulsy change over the lifetime of
255: * an invocation.
256: */
257: private static class InvocationKey {
258: private Exchange exchange;
259:
260: InvocationKey(Exchange ex) {
261: exchange = ex;
262: }
263:
264: @Override
265: public int hashCode() {
266: return System.identityHashCode(exchange);
267: }
268:
269: @Override
270: public boolean equals(Object o) {
271: return o instanceof InvocationKey
272: && exchange == ((InvocationKey) o).exchange;
273: }
274: }
275:
276: /**
277: * Records the context of an invocation.
278: */
279: private static class InvocationContext {
280: private Endpoint originalEndpoint;
281: private BindingOperationInfo bindingOperationInfo;
282: private Object[] params;
283: private Map<String, Object> context;
284: private List<Endpoint> alternateTargets;
285:
286: InvocationContext(Endpoint endpoint, BindingOperationInfo boi,
287: Object[] prms, Map<String, Object> ctx) {
288: originalEndpoint = endpoint;
289: bindingOperationInfo = boi;
290: params = prms;
291: context = ctx;
292: }
293:
294: Endpoint getOriginalEndpoint() {
295: return originalEndpoint;
296: }
297:
298: BindingOperationInfo getBindingOperationInfo() {
299: return bindingOperationInfo;
300: }
301:
302: Object[] getParams() {
303: return params;
304: }
305:
306: Map<String, Object> getContext() {
307: return context;
308: }
309:
310: List<Endpoint> getAlternateTargets() {
311: return alternateTargets;
312: }
313:
314: void setAlternateTargets(List<Endpoint> alternates) {
315: alternateTargets = alternates;
316: }
317: }
318: }
|