001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */package org.apache.cxf.jaxws;
019:
020: import java.net.HttpURLConnection;
021: import java.net.URI;
022: import java.net.URISyntaxException;
023: import java.util.ArrayList;
024: import java.util.HashMap;
025: import java.util.List;
026: import java.util.Map;
027: import java.util.concurrent.Executor;
028: import java.util.concurrent.Executors;
029: import java.util.concurrent.Future;
030: import java.util.concurrent.FutureTask;
031: import java.util.logging.Level;
032: import java.util.logging.Logger;
033:
034: import javax.activation.DataSource;
035: import javax.xml.bind.JAXBContext;
036: import javax.xml.namespace.QName;
037: import javax.xml.soap.SOAPException;
038: import javax.xml.soap.SOAPFactory;
039: import javax.xml.soap.SOAPFault;
040: import javax.xml.soap.SOAPMessage;
041: import javax.xml.transform.Source;
042: import javax.xml.ws.AsyncHandler;
043: import javax.xml.ws.Binding;
044: import javax.xml.ws.BindingProvider;
045: import javax.xml.ws.Dispatch;
046: import javax.xml.ws.Response;
047: import javax.xml.ws.Service;
048: import javax.xml.ws.WebServiceException;
049: import javax.xml.ws.http.HTTPBinding;
050: import javax.xml.ws.http.HTTPException;
051: import javax.xml.ws.soap.SOAPBinding;
052: import javax.xml.ws.soap.SOAPFaultException;
053:
054: import org.apache.cxf.Bus;
055: import org.apache.cxf.binding.soap.SoapBinding;
056: import org.apache.cxf.common.logging.LogUtils;
057: import org.apache.cxf.endpoint.ConduitSelector;
058: import org.apache.cxf.endpoint.Endpoint;
059: import org.apache.cxf.endpoint.UpfrontConduitSelector;
060: import org.apache.cxf.interceptor.Fault;
061: import org.apache.cxf.interceptor.Interceptor;
062: import org.apache.cxf.interceptor.MessageSenderInterceptor;
063: import org.apache.cxf.jaxws.handler.logical.DispatchLogicalHandlerInterceptor;
064: import org.apache.cxf.jaxws.handler.soap.DispatchSOAPHandlerInterceptor;
065: import org.apache.cxf.jaxws.interceptors.DispatchInDatabindingInterceptor;
066: import org.apache.cxf.jaxws.interceptors.DispatchOutDatabindingInterceptor;
067: import org.apache.cxf.jaxws.support.ContextPropertiesMapping;
068: import org.apache.cxf.jaxws.support.JaxWsEndpointImpl;
069: import org.apache.cxf.message.Exchange;
070: import org.apache.cxf.message.ExchangeImpl;
071: import org.apache.cxf.message.Message;
072: import org.apache.cxf.phase.Phase;
073: import org.apache.cxf.phase.PhaseInterceptorChain;
074: import org.apache.cxf.phase.PhaseManager;
075: import org.apache.cxf.service.model.EndpointInfo;
076: import org.apache.cxf.transport.MessageObserver;
077:
078: public class DispatchImpl<T> extends BindingProviderImpl implements
079: Dispatch<T>, MessageObserver {
080: private static final Logger LOG = LogUtils
081: .getL7dLogger(DispatchImpl.class);
082:
083: private Bus bus;
084:
085: private Class<T> cl;
086: private Executor executor;
087: private JAXBContext context;
088: private Service.Mode mode;
089:
090: private ConduitSelector conduitSelector;
091:
092: DispatchImpl(Bus b, Service.Mode m, Class<T> clazz, Executor e,
093: Endpoint ep) {
094: this (b, m, null, clazz, e, ep);
095: }
096:
097: DispatchImpl(Bus b, Service.Mode m, JAXBContext ctx,
098: Class<T> clazz, Executor e, Endpoint ep) {
099: super (((JaxWsEndpointImpl) ep).getJaxwsBinding());
100: bus = b;
101: executor = e;
102: context = ctx;
103: cl = clazz;
104: mode = m;
105: getConduitSelector().setEndpoint(ep);
106: setupEndpointAddressContext(ep);
107: }
108:
109: private void setupEndpointAddressContext(Endpoint endpoint) {
110: //NOTE for jms transport the address would be null
111: if (null != endpoint
112: && null != endpoint.getEndpointInfo().getAddress()) {
113: Map<String, Object> requestContext = this
114: .getRequestContext();
115: requestContext.put(
116: BindingProvider.ENDPOINT_ADDRESS_PROPERTY, endpoint
117: .getEndpointInfo().getAddress());
118: }
119: }
120:
121: public T invoke(T obj) {
122: return invoke(obj, false);
123: }
124:
125: public T invoke(T obj, boolean isOneWay) {
126: if (LOG.isLoggable(Level.INFO)) {
127: LOG.info("Dispatch: invoke called");
128: }
129:
130: Endpoint endpoint = getEndpoint();
131: Message message = endpoint.getBinding().createMessage();
132:
133: if (context != null) {
134: message.setContent(JAXBContext.class, context);
135: }
136:
137: Map<String, Object> reqContext = new HashMap<String, Object>(
138: this .getRequestContext());
139: Map<String, Object> respContext = this .getResponseContext();
140: // clear the response context's hold information
141: // Not call the clear Context is to avoid the error
142: // that getResponseContext() would be called by Client code first
143: respContext.clear();
144:
145: ContextPropertiesMapping.mapRequestfromJaxws2Cxf(reqContext);
146: message.putAll(reqContext);
147: //need to do context mapping from jax-ws to cxf message
148:
149: Exchange exchange = new ExchangeImpl();
150:
151: exchange.setOutMessage(message);
152: setExchangeProperties(exchange, endpoint);
153:
154: message.setContent(Object.class, obj);
155:
156: if (obj instanceof SOAPMessage) {
157: message.setContent(SOAPMessage.class, obj);
158: } else if (obj instanceof Source) {
159: message.setContent(Source.class, obj);
160: } else if (obj instanceof DataSource) {
161: message.setContent(DataSource.class, obj);
162: }
163:
164: message.put(Message.REQUESTOR_ROLE, Boolean.TRUE);
165:
166: PhaseInterceptorChain chain = getDispatchOutChain(endpoint);
167: message.setInterceptorChain(chain);
168:
169: // setup conduit selector
170: prepareConduitSelector(message);
171:
172: // execute chain
173: chain.doIntercept(message);
174:
175: getConduitSelector().complete(exchange);
176:
177: if (message.getContent(Exception.class) != null) {
178: if (getBinding() instanceof SOAPBinding) {
179: try {
180: SOAPFault soapFault = SOAPFactory.newInstance()
181: .createFault();
182: Fault fault = (Fault) message
183: .getContent(Exception.class);
184: soapFault.setFaultCode(fault.getFaultCode());
185: soapFault.setFaultString(fault.getMessage());
186: SOAPFaultException exception = new SOAPFaultException(
187: soapFault);
188: throw exception;
189: } catch (SOAPException e) {
190: throw new WebServiceException(e);
191: }
192: } else if (getBinding() instanceof HTTPBinding) {
193: HTTPException exception = new HTTPException(
194: HttpURLConnection.HTTP_INTERNAL_ERROR);
195: exception
196: .initCause(message.getContent(Exception.class));
197: throw exception;
198: } else {
199: throw new WebServiceException(message
200: .getContent(Exception.class));
201: }
202: }
203:
204: // correlate response
205: if (getConduitSelector().selectConduit(message)
206: .getBackChannel() != null) {
207: // process partial response and wait for decoupled response
208: } else {
209: // process response: send was synchronous so when we get here we can assume that the
210: // Exchange's inbound message is set and had been passed through the inbound interceptor chain.
211: }
212:
213: if (!isOneWay) {
214: synchronized (exchange) {
215: Message inMsg = waitResponse(exchange);
216: respContext.putAll(inMsg);
217: //need to do context mapping from cxf message to jax-ws
218: ContextPropertiesMapping
219: .mapResponsefromCxf2Jaxws(respContext);
220: return cl.cast(inMsg.getContent(Object.class));
221: }
222: }
223: return null;
224:
225: }
226:
227: private Message waitResponse(Exchange exchange) {
228: Message inMsg = exchange.getInMessage();
229: if (inMsg == null) {
230: try {
231: exchange.wait();
232: } catch (InterruptedException e) {
233: //TODO - timeout
234: }
235: inMsg = exchange.getInMessage();
236: }
237: if (inMsg.getContent(Exception.class) != null) {
238: //TODO - exceptions
239: throw new RuntimeException(inMsg
240: .getContent(Exception.class));
241: }
242: return inMsg;
243: }
244:
245: private PhaseInterceptorChain getDispatchOutChain(Endpoint endpoint) {
246: PhaseManager pm = bus.getExtension(PhaseManager.class);
247: PhaseInterceptorChain chain = new PhaseInterceptorChain(pm
248: .getOutPhases());
249:
250: List<Interceptor> il = bus.getOutInterceptors();
251: if (LOG.isLoggable(Level.FINE)) {
252: LOG.fine("Interceptors contributed by bus: " + il);
253: }
254: chain.add(il);
255:
256: if (endpoint instanceof JaxWsEndpointImpl) {
257: Binding jaxwsBinding = ((JaxWsEndpointImpl) endpoint)
258: .getJaxwsBinding();
259: if (endpoint.getBinding() instanceof SoapBinding) {
260: chain.add(new DispatchSOAPHandlerInterceptor(
261: jaxwsBinding));
262: } else {
263: // TODO: what for non soap bindings?
264: }
265: chain.add(new DispatchLogicalHandlerInterceptor(
266: jaxwsBinding));
267: }
268:
269: chain.add(new MessageSenderInterceptor());
270:
271: chain.add(new DispatchOutDatabindingInterceptor(mode));
272: return chain;
273: }
274:
275: public void onMessage(Message message) {
276: Endpoint endpoint = getEndpoint();
277: message = endpoint.getBinding().createMessage(message);
278:
279: message.put(Message.REQUESTOR_ROLE, Boolean.TRUE);
280:
281: PhaseManager pm = bus.getExtension(PhaseManager.class);
282: PhaseInterceptorChain chain = new PhaseInterceptorChain(pm
283: .getInPhases());
284: message.setInterceptorChain(chain);
285:
286: List<Interceptor> il = bus.getInInterceptors();
287: if (LOG.isLoggable(Level.FINE)) {
288: LOG.fine("Interceptors contributed by bus: " + il);
289: }
290: chain.add(il);
291:
292: if (endpoint instanceof JaxWsEndpointImpl) {
293: Binding jaxwsBinding = ((JaxWsEndpointImpl) endpoint)
294: .getJaxwsBinding();
295: if (endpoint.getBinding() instanceof SoapBinding) {
296: chain.add(new DispatchSOAPHandlerInterceptor(
297: jaxwsBinding));
298: }
299: DispatchLogicalHandlerInterceptor slhi = new DispatchLogicalHandlerInterceptor(
300: jaxwsBinding, Phase.USER_LOGICAL);
301: chain.add(slhi);
302: }
303:
304: List<Interceptor> inInterceptors = new ArrayList<Interceptor>();
305: inInterceptors.add(new DispatchInDatabindingInterceptor(cl,
306: mode));
307: chain.add(inInterceptors);
308:
309: // execute chain
310: try {
311: chain.doIntercept(message);
312: } finally {
313: synchronized (message.getExchange()) {
314: message.getExchange().setInMessage(message);
315: message.getExchange().notifyAll();
316: }
317: }
318: }
319:
320: private Executor getExecutor() {
321: if (executor == null) {
322: executor = getEndpoint().getService().getExecutor();
323: }
324: if (executor == null) {
325: executor = Executors.newFixedThreadPool(5);
326: }
327: if (executor == null) {
328: System.err.println("Can't not get executor");
329: }
330: return executor;
331: }
332:
333: private Endpoint getEndpoint() {
334: return getConduitSelector().getEndpoint();
335: }
336:
337: public Future<?> invokeAsync(T obj, AsyncHandler<T> asyncHandler) {
338: Response<?> r = invokeAsync(obj);
339: AsyncCallbackFuture callback = new AsyncCallbackFuture(r,
340: asyncHandler);
341:
342: getExecutor().execute(callback);
343: return callback;
344: }
345:
346: public Response<T> invokeAsync(T obj) {
347: FutureTask<T> f = new FutureTask<T>(
348: new DispatchAsyncCallable<T>(this , obj));
349:
350: getExecutor().execute(f);
351: return new AsyncResponse<T>(f, cl);
352: }
353:
354: public void invokeOneWay(T obj) {
355: invoke(obj, true);
356: }
357:
358: public synchronized ConduitSelector getConduitSelector() {
359: if (null == conduitSelector) {
360: conduitSelector = new UpfrontConduitSelector();
361: }
362: return conduitSelector;
363: }
364:
365: public void setConduitSelector(ConduitSelector selector) {
366: conduitSelector = selector;
367: }
368:
369: protected void prepareConduitSelector(Message message) {
370: message.getExchange().put(ConduitSelector.class,
371: getConduitSelector());
372: }
373:
374: protected void setExchangeProperties(Exchange exchange,
375: Endpoint endpoint) {
376: exchange.put(Service.Mode.class, mode);
377: exchange.put(Class.class, cl);
378: exchange.put(org.apache.cxf.service.Service.class, endpoint
379: .getService());
380: exchange.put(Endpoint.class, endpoint);
381:
382: exchange.put(MessageObserver.class, this );
383: exchange.put(Bus.class, bus);
384:
385: if (endpoint != null) {
386:
387: EndpointInfo endpointInfo = endpoint.getEndpointInfo();
388:
389: QName serviceQName = endpointInfo.getService().getName();
390: exchange.put(Message.WSDL_SERVICE, serviceQName);
391:
392: QName interfaceQName = endpointInfo.getService()
393: .getInterface().getName();
394: exchange.put(Message.WSDL_INTERFACE, interfaceQName);
395:
396: QName portQName = endpointInfo.getName();
397: exchange.put(Message.WSDL_PORT, portQName);
398: URI wsdlDescription = endpointInfo.getProperty("URI",
399: URI.class);
400: if (wsdlDescription == null) {
401: String address = endpointInfo.getAddress();
402: try {
403: wsdlDescription = new URI(address + "?wsdl");
404: } catch (URISyntaxException e) {
405: // do nothing
406: }
407: endpointInfo.setProperty("URI", wsdlDescription);
408: }
409: exchange.put(Message.WSDL_DESCRIPTION, wsdlDescription);
410: }
411: }
412:
413: }
|