001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.servicemix.cxfbc;
018:
019: import java.util.ArrayList;
020: import java.util.Iterator;
021: import java.util.List;
022: import java.util.Map;
023: import java.util.ResourceBundle;
024: import java.util.concurrent.ConcurrentHashMap;
025: import java.util.concurrent.CopyOnWriteArrayList;
026:
027: import javax.activation.DataHandler;
028: import javax.jbi.component.ComponentContext;
029: import javax.jbi.management.DeploymentException;
030: import javax.jbi.messaging.ExchangeStatus;
031: import javax.jbi.messaging.MessageExchange;
032: import javax.wsdl.WSDLException;
033: import javax.wsdl.factory.WSDLFactory;
034: import javax.wsdl.xml.WSDLReader;
035: import javax.xml.namespace.QName;
036: import javax.xml.transform.Source;
037:
038: import org.w3c.dom.Element;
039: import org.w3c.dom.Node;
040: import org.w3c.dom.NodeList;
041:
042: import com.ibm.wsdl.Constants;
043: import org.apache.cxf.Bus;
044: import org.apache.cxf.attachment.AttachmentImpl;
045: import org.apache.cxf.binding.AbstractBindingFactory;
046: import org.apache.cxf.binding.jbi.JBIFault;
047: import org.apache.cxf.binding.soap.SoapFault;
048: import org.apache.cxf.binding.soap.SoapMessage;
049: import org.apache.cxf.binding.soap.interceptor.MustUnderstandInterceptor;
050: import org.apache.cxf.binding.soap.interceptor.ReadHeadersInterceptor;
051: import org.apache.cxf.binding.soap.interceptor.SoapActionOutInterceptor;
052: import org.apache.cxf.binding.soap.interceptor.SoapOutInterceptor;
053: import org.apache.cxf.binding.soap.interceptor.SoapPreProtocolOutInterceptor;
054: import org.apache.cxf.bus.spring.SpringBusFactory;
055: import org.apache.cxf.endpoint.Endpoint;
056: import org.apache.cxf.endpoint.EndpointImpl;
057: import org.apache.cxf.endpoint.Server;
058: import org.apache.cxf.endpoint.ServerImpl;
059: import org.apache.cxf.helpers.DOMUtils;
060: import org.apache.cxf.interceptor.AttachmentInInterceptor;
061: import org.apache.cxf.interceptor.AttachmentOutInterceptor;
062: import org.apache.cxf.interceptor.Fault;
063: import org.apache.cxf.interceptor.Interceptor;
064: import org.apache.cxf.interceptor.OutgoingChainInterceptor;
065: import org.apache.cxf.interceptor.StaxInInterceptor;
066: import org.apache.cxf.interceptor.StaxOutInterceptor;
067: import org.apache.cxf.message.Attachment;
068: import org.apache.cxf.message.Exchange;
069: import org.apache.cxf.message.Message;
070: import org.apache.cxf.message.MessageContentsList;
071: import org.apache.cxf.phase.AbstractPhaseInterceptor;
072: import org.apache.cxf.phase.Phase;
073: import org.apache.cxf.service.Service;
074: import org.apache.cxf.service.invoker.Invoker;
075: import org.apache.cxf.service.model.BindingFaultInfo;
076: import org.apache.cxf.service.model.BindingOperationInfo;
077: import org.apache.cxf.service.model.EndpointInfo;
078: import org.apache.cxf.service.model.MessagePartInfo;
079: import org.apache.cxf.service.model.ServiceInfo;
080: import org.apache.cxf.transport.ChainInitiationObserver;
081: import org.apache.cxf.ws.addressing.AddressingProperties;
082: import org.apache.cxf.ws.rm.Servant;
083: import org.apache.cxf.wsdl.WSDLManager;
084: import org.apache.cxf.wsdl11.WSDLServiceFactory;
085: import org.apache.servicemix.common.endpoints.ConsumerEndpoint;
086: import org.apache.servicemix.cxfbc.interceptors.JbiInInterceptor;
087: import org.apache.servicemix.cxfbc.interceptors.JbiInWsdl1Interceptor;
088: import org.apache.servicemix.cxfbc.interceptors.JbiOperationInterceptor;
089: import org.apache.servicemix.cxfbc.interceptors.JbiOutWsdl1Interceptor;
090: import org.apache.servicemix.cxfbc.interceptors.MtomCheckInterceptor;
091: import org.apache.servicemix.jbi.jaxp.SourceTransformer;
092: import org.apache.servicemix.jbi.messaging.NormalizedMessageImpl;
093: import org.apache.servicemix.soap.util.DomUtil;
094: import org.springframework.core.io.Resource;
095:
096: /**
097: *
098: * @author gnodet
099: * @org.apache.xbean.XBean element="consumer"
100: */
101: public class CxfBcConsumer extends ConsumerEndpoint implements
102: CxfBcEndpointWithInterceptor {
103:
104: List<Interceptor> in = new CopyOnWriteArrayList<Interceptor>();
105:
106: List<Interceptor> out = new CopyOnWriteArrayList<Interceptor>();
107:
108: List<Interceptor> outFault = new CopyOnWriteArrayList<Interceptor>();
109:
110: List<Interceptor> inFault = new CopyOnWriteArrayList<Interceptor>();
111:
112: private Resource wsdl;
113:
114: private Endpoint ep;
115:
116: private ChainInitiationObserver chain;
117:
118: private Server server;
119:
120: private Map<String, Message> messages = new ConcurrentHashMap<String, Message>();
121:
122: private boolean synchronous = true;
123:
124: private boolean isOneway;
125:
126: private String busCfg;
127:
128: private BindingFaultInfo faultWanted;
129:
130: private Bus bus;
131:
132: private boolean mtomEnabled;
133:
134: private String locationURI;
135:
136: private int timeout = 10;
137:
138: private boolean useJBIWrapper = true;
139:
140: /**
141: * @return the wsdl
142: */
143: public Resource getWsdl() {
144: return wsdl;
145: }
146:
147: /**
148: * @param wsdl
149: * the wsdl to set
150: */
151: public void setWsdl(Resource wsdl) {
152: this .wsdl = wsdl;
153: }
154:
155: public List<Interceptor> getOutFaultInterceptors() {
156: return outFault;
157: }
158:
159: public List<Interceptor> getInFaultInterceptors() {
160: return inFault;
161: }
162:
163: public List<Interceptor> getInInterceptors() {
164: return in;
165: }
166:
167: public List<Interceptor> getOutInterceptors() {
168: return out;
169: }
170:
171: public void setInInterceptors(List<Interceptor> interceptors) {
172: in = interceptors;
173: }
174:
175: public void setInFaultInterceptors(List<Interceptor> interceptors) {
176: inFault = interceptors;
177: }
178:
179: public void setOutInterceptors(List<Interceptor> interceptors) {
180: out = interceptors;
181: }
182:
183: public void setOutFaultInterceptors(List<Interceptor> interceptors) {
184: outFault = interceptors;
185: }
186:
187: public void process(MessageExchange exchange) throws Exception {
188: Message message = messages.remove(exchange.getExchangeId());
189: message.getInterceptorChain().resume();
190: if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
191: exchange.setStatus(ExchangeStatus.DONE);
192: message.getExchange().get(ComponentContext.class)
193: .getDeliveryChannel().send(exchange);
194: }
195: }
196:
197: @Override
198: public void start() throws Exception {
199: super .start();
200: server.start();
201: }
202:
203: @Override
204: public void stop() throws Exception {
205: server.stop();
206: super .stop();
207: }
208:
209: @Override
210: public void validate() throws DeploymentException {
211: try {
212: if (definition == null) {
213: if (wsdl == null) {
214: throw new DeploymentException(
215: "wsdl property must be set");
216: }
217: description = DomUtil.parse(wsdl.getInputStream());
218: WSDLFactory wsdlFactory = WSDLFactory.newInstance();
219: WSDLReader reader = wsdlFactory.newWSDLReader();
220: reader.setFeature(Constants.FEATURE_VERBOSE, false);
221: // definition = reader.readWSDL(wsdl.getURL().toString(),
222: // description);
223: try {
224: // use wsdl manager to parse wsdl or get cached definition
225: definition = getBus().getExtension(
226: WSDLManager.class).getDefinition(
227: wsdl.getURL());
228: } catch (WSDLException ex) {
229: // throw new ServiceConstructionException(new
230: // Message("SERVICE_CREATION_MSG", LOG), ex);
231: }
232: }
233: if (service == null) {
234: // looking for the servicename according to targetServiceName
235: // first
236: if (definition.getServices().containsKey(
237: getTargetService())) {
238: service = getTargetService();
239: } else {
240: service = (QName) definition.getServices().keySet()
241: .iterator().next();
242: }
243: }
244: WSDLServiceFactory factory = new WSDLServiceFactory(
245: getBus(), definition, service);
246: Service cxfService = factory.create();
247:
248: EndpointInfo ei = cxfService.getServiceInfos().iterator()
249: .next().getEndpoints().iterator().next();
250: for (ServiceInfo serviceInfo : cxfService.getServiceInfos()) {
251: if (serviceInfo.getName().equals(service)
252: && getEndpoint() != null
253: && serviceInfo
254: .getEndpoint(new QName(serviceInfo
255: .getName().getNamespaceURI(),
256: getEndpoint())) != null) {
257: ei = serviceInfo
258: .getEndpoint(new QName(serviceInfo
259: .getName().getNamespaceURI(),
260: getEndpoint()));
261:
262: }
263: }
264:
265: if (endpoint == null) {
266: endpoint = ei.getName().getLocalPart();
267: }
268:
269: if (locationURI != null) {
270: ei.setAddress(locationURI);
271: }
272:
273: ei.getBinding().setProperty(
274: AbstractBindingFactory.DATABINDING_DISABLED,
275: Boolean.TRUE);
276:
277: cxfService.getInInterceptors().add(
278: new MustUnderstandInterceptor());
279: cxfService.getInInterceptors().add(
280: new AttachmentInInterceptor());
281: cxfService.getInInterceptors().add(new StaxInInterceptor());
282: cxfService.getInInterceptors().add(
283: new ReadHeadersInterceptor(getBus()));
284: cxfService.getInInterceptors().add(
285: new JbiOperationInterceptor());
286: cxfService.getInInterceptors().add(
287: new JbiInWsdl1Interceptor(isUseJBIWrapper()));
288: cxfService.getInInterceptors().add(new JbiInInterceptor());
289: cxfService.getInInterceptors().add(
290: new JbiInvokerInterceptor());
291: cxfService.getInInterceptors().add(
292: new JbiPostInvokerInterceptor());
293:
294: cxfService.getInInterceptors().add(
295: new OutgoingChainInterceptor());
296:
297: cxfService.getOutInterceptors().add(
298: new JbiOutWsdl1Interceptor(isUseJBIWrapper()));
299: cxfService.getOutInterceptors().add(
300: new SoapActionOutInterceptor());
301: cxfService.getOutInterceptors().add(
302: new AttachmentOutInterceptor());
303: cxfService.getOutInterceptors().add(
304: new MtomCheckInterceptor(isMtomEnabled()));
305: cxfService.getOutInterceptors().add(
306: new StaxOutInterceptor());
307: cxfService.getOutInterceptors().add(
308: new SoapPreProtocolOutInterceptor());
309: cxfService.getOutInterceptors().add(
310: new SoapOutInterceptor(getBus()));
311: cxfService.getOutFaultInterceptors().add(
312: new SoapOutInterceptor(getBus()));
313:
314: ep = new EndpointImpl(getBus(), cxfService, ei);
315: getInInterceptors().addAll(getBus().getInInterceptors());
316: getInFaultInterceptors().addAll(
317: getBus().getInFaultInterceptors());
318: getOutInterceptors().addAll(getBus().getOutInterceptors());
319: getOutFaultInterceptors().addAll(
320: getBus().getOutFaultInterceptors());
321:
322: cxfService.getInInterceptors().addAll(getInInterceptors());
323: cxfService.getInFaultInterceptors().addAll(
324: getInFaultInterceptors());
325: cxfService.getOutInterceptors()
326: .addAll(getOutInterceptors());
327: cxfService.getOutFaultInterceptors().addAll(
328: getOutFaultInterceptors());
329:
330: ep.getInInterceptors().addAll(getInInterceptors());
331: ep.getInFaultInterceptors()
332: .addAll(getInFaultInterceptors());
333: ep.getOutInterceptors().addAll(getOutInterceptors());
334: ep.getOutFaultInterceptors().addAll(
335: getOutFaultInterceptors());
336:
337: ep.getOutInterceptors().add(new SoapActionOutInterceptor());
338: ep.getOutInterceptors().add(new AttachmentOutInterceptor());
339: ep.getOutInterceptors().add(new StaxOutInterceptor());
340: ep.getOutInterceptors().add(
341: new SoapOutInterceptor(getBus()));
342:
343: cxfService.getInInterceptors().addAll(
344: getBus().getInInterceptors());
345: cxfService.getInFaultInterceptors().addAll(
346: getBus().getInFaultInterceptors());
347: cxfService.getOutInterceptors().addAll(
348: getBus().getOutInterceptors());
349: cxfService.getOutFaultInterceptors().addAll(
350: getBus().getOutFaultInterceptors());
351:
352: chain = new JbiChainInitiationObserver(ep, getBus());
353: server = new ServerImpl(getBus(), ep, null, chain);
354:
355: super .validate();
356: } catch (DeploymentException e) {
357: throw e;
358: } catch (Exception e) {
359: throw new DeploymentException(e);
360: }
361: }
362:
363: protected Bus getBus() {
364: if (getBusCfg() != null) {
365: if (bus == null) {
366: SpringBusFactory bf = new SpringBusFactory();
367: bus = bf.createBus(getBusCfg());
368: }
369: return bus;
370: } else {
371: return ((CxfBcComponent) getServiceUnit().getComponent())
372: .getBus();
373: }
374: }
375:
376: public void setLocationURI(String locationURI) {
377: this .locationURI = locationURI;
378: }
379:
380: public String getLocationURI() {
381: return locationURI;
382: }
383:
384: protected class JbiChainInitiationObserver extends
385: ChainInitiationObserver {
386:
387: public JbiChainInitiationObserver(Endpoint endpoint, Bus bus) {
388: super (endpoint, bus);
389: }
390:
391: protected void setExchangeProperties(Exchange exchange,
392: Message m) {
393: super .setExchangeProperties(exchange, m);
394: exchange.put(ComponentContext.class, CxfBcConsumer.this
395: .getContext());
396: exchange.put(CxfBcConsumer.class, CxfBcConsumer.this );
397: }
398:
399: }
400:
401: public class JbiInvokerInterceptor extends
402: AbstractPhaseInterceptor<Message> {
403:
404: public JbiInvokerInterceptor() {
405: super (Phase.INVOKE);
406: }
407:
408: private Object getInvokee(Message message) {
409: Object invokee = message.getContent(List.class);
410: if (invokee == null) {
411: invokee = message.getContent(Object.class);
412: }
413: return invokee;
414: }
415:
416: private void copyJaxwsProperties(Message inMsg, Message outMsg) {
417: outMsg.put(Message.WSDL_OPERATION, inMsg
418: .get(Message.WSDL_OPERATION));
419: outMsg.put(Message.WSDL_SERVICE, inMsg
420: .get(Message.WSDL_SERVICE));
421: outMsg.put(Message.WSDL_INTERFACE, inMsg
422: .get(Message.WSDL_INTERFACE));
423: outMsg.put(Message.WSDL_PORT, inMsg.get(Message.WSDL_PORT));
424: outMsg.put(Message.WSDL_DESCRIPTION, inMsg
425: .get(Message.WSDL_DESCRIPTION));
426: }
427:
428: public void handleMessage(final Message message) throws Fault {
429:
430: final Exchange cxfExchange = message.getExchange();
431: final Endpoint endpoint = cxfExchange.get(Endpoint.class);
432: final Service service = endpoint.getService();
433: final Invoker invoker = service.getInvoker();
434:
435: if (invoker instanceof Servant) {
436: // it's rm request, run the invocation directly in bc, not send
437: // to se.
438:
439: Exchange runableEx = message.getExchange();
440:
441: Object result = invoker.invoke(runableEx,
442: getInvokee(message));
443: if (!cxfExchange.isOneWay()) {
444: Endpoint end = cxfExchange.get(Endpoint.class);
445:
446: Message outMessage = runableEx.getOutMessage();
447: if (outMessage == null) {
448: outMessage = end.getBinding().createMessage();
449: cxfExchange.setOutMessage(outMessage);
450: }
451: copyJaxwsProperties(message, outMessage);
452: if (result != null) {
453: MessageContentsList resList = null;
454: if (result instanceof MessageContentsList) {
455: resList = (MessageContentsList) result;
456: } else if (result instanceof List) {
457: resList = new MessageContentsList(
458: (List) result);
459: } else if (result.getClass().isArray()) {
460: resList = new MessageContentsList(
461: (Object[]) result);
462: } else {
463: outMessage.setContent(Object.class, result);
464: }
465: if (resList != null) {
466: outMessage.setContent(List.class, resList);
467: }
468: }
469: }
470:
471: return;
472: }
473:
474: MessageExchange exchange = message
475: .getContent(MessageExchange.class);
476: ComponentContext context = message.getExchange().get(
477: ComponentContext.class);
478: CxfBcConsumer.this .configureExchangeTarget(exchange);
479: CxfBcConsumer.this .messages.put(exchange.getExchangeId(),
480: message);
481: CxfBcConsumer.this .isOneway = message.getExchange().get(
482: BindingOperationInfo.class).getOperationInfo()
483: .isOneWay();
484: message.getExchange()
485: .setOneWay(CxfBcConsumer.this .isOneway);
486:
487: try {
488: if (CxfBcConsumer.this .synchronous
489: && !CxfBcConsumer.this .isOneway) {
490: message.getInterceptorChain().pause();
491: context.getDeliveryChannel().sendSync(exchange,
492: timeout * 1000);
493: process(exchange);
494: } else {
495: context.getDeliveryChannel().send(exchange);
496:
497: }
498: } catch (Exception e) {
499: throw new Fault(e);
500: }
501: }
502:
503: }
504:
505: protected class JbiPostInvokerInterceptor extends
506: AbstractPhaseInterceptor<Message> {
507: public JbiPostInvokerInterceptor() {
508: super (Phase.POST_INVOKE);
509: addBefore(OutgoingChainInterceptor.class.getName());
510: }
511:
512: public void handleMessage(final Message message) throws Fault {
513: MessageExchange exchange = message
514: .getContent(MessageExchange.class);
515: Exchange ex = message.getExchange();
516: if (exchange.getStatus() == ExchangeStatus.ERROR) {
517: throw new Fault(exchange.getError());
518: }
519: if (!ex.isOneWay()) {
520: if (exchange.getFault() != null) {
521: Fault f = null;
522: if (isUseJBIWrapper()) {
523: f = new JBIFault(
524: new org.apache.cxf.common.i18n.Message(
525: "Fault occured",
526: (ResourceBundle) null));
527: Element details = toElement(exchange.getFault()
528: .getContent());
529: f.setDetail(details);
530:
531: } else {
532: Element details = toElement(exchange.getFault()
533: .getContent());
534:
535: details = (Element) details
536: .getElementsByTagNameNS(
537: details.getNamespaceURI(),
538: "Body").item(0);
539: assert details != null;
540: details = (Element) details
541: .getElementsByTagNameNS(
542: details.getNamespaceURI(),
543: "Fault").item(0);
544: assert details != null;
545: details = (Element) details
546: .getElementsByTagName("detail").item(0);
547: assert details != null;
548: f = new SoapFault(
549: new org.apache.cxf.common.i18n.Message(
550: "Fault occured",
551: (ResourceBundle) null),
552: new QName(details.getNamespaceURI(),
553: "detail"));
554: f.setDetail(details);
555:
556: }
557: processFaultDetail(f, message);
558: message.put(BindingFaultInfo.class, faultWanted);
559:
560: throw f;
561: } else if (exchange.getMessage("out") != null) {
562: Endpoint endpoint = ex.get(Endpoint.class);
563: Message outMessage = ex.getOutMessage();
564: if (outMessage == null) {
565: outMessage = endpoint.getBinding()
566: .createMessage();
567: ex.setOutMessage(outMessage);
568: }
569: NormalizedMessageImpl norMessage = (NormalizedMessageImpl) exchange
570: .getMessage("out");
571:
572: if (outMessage instanceof SoapMessage) {
573: AddressingProperties addressingProperties = WSAUtils
574: .getCXFAddressingPropertiesFromMap((Map<String, String>) norMessage
575: .getProperty(WSAUtils.WSA_HEADERS_OUTBOUND));
576: outMessage.put(WSAUtils.WSA_HEADERS_OUTBOUND,
577: addressingProperties);
578: }
579: List<Attachment> attachmentList = new ArrayList<Attachment>();
580: outMessage.setContent(Source.class, exchange
581: .getMessage("out").getContent());
582: Iterator<String> iter = norMessage
583: .listAttachments();
584: while (iter.hasNext()) {
585: String id = iter.next();
586: DataHandler dh = norMessage.getAttachment(id);
587: attachmentList.add(new AttachmentImpl(id, dh));
588: }
589:
590: outMessage.setAttachments(attachmentList);
591: }
592: }
593:
594: }
595:
596: // this method is used for ws-policy to set BindingFaultInfo
597: protected void processFaultDetail(Fault fault, Message msg) {
598: Element exDetail = (Element) DOMUtils.getChild(fault
599: .getDetail(), Node.ELEMENT_NODE);
600: QName qname = new QName(exDetail.getNamespaceURI(),
601: exDetail.getLocalName());
602:
603: faultWanted = null;
604: BindingOperationInfo boi = msg.getExchange().get(
605: BindingOperationInfo.class);
606: if (boi.isUnwrapped()) {
607: boi = boi.getWrappedOperation();
608: }
609: for (BindingFaultInfo bfi : boi.getFaults()) {
610: for (MessagePartInfo mpi : bfi.getFaultInfo()
611: .getMessageParts()) {
612: if (qname.equals(mpi.getConcreteName())) {
613: faultWanted = bfi;
614: msg.put(BindingFaultInfo.class, faultWanted);
615: break;
616: }
617: }
618: if (faultWanted != null) {
619: break;
620: }
621: }
622:
623: }
624:
625: }
626:
627: private static Element toElement(Source src) throws Fault {
628: try {
629: SourceTransformer transformer = new SourceTransformer();
630: Element ret = transformer.toDOMElement(src);
631: ret = removeEmptyDefaultTns(ret);
632: return ret;
633: } catch (Exception e) {
634: throw new Fault(e);
635: }
636: }
637:
638: private static Element removeEmptyDefaultTns(Element ret) {
639: // to make unquailied fault work
640: if (ret.hasAttribute("xmlns")
641: && ret.getAttribute("xmlns").length() == 0) {
642: ret.removeAttribute("xmlns");
643: }
644: NodeList nodes = ret.getChildNodes();
645: for (int i = 0; i < nodes.getLength(); i++) {
646: if (nodes.item(i) instanceof Element) {
647: Element ele = (Element) nodes.item(i);
648: ele = removeEmptyDefaultTns(ele);
649:
650: }
651: }
652: return ret;
653: }
654:
655: public void setBusCfg(String busCfg) {
656: this .busCfg = busCfg;
657: }
658:
659: public String getBusCfg() {
660: return busCfg;
661: }
662:
663: public void setMtomEnabled(boolean mtomEnabled) {
664: this .mtomEnabled = mtomEnabled;
665: }
666:
667: public boolean isMtomEnabled() {
668: return mtomEnabled;
669: }
670:
671: public void setTimeout(int timeout) {
672: this .timeout = timeout;
673: }
674:
675: public int getTimeout() {
676: return timeout;
677: }
678:
679: public void setUseJBIWrapper(boolean useJBIWrapper) {
680: this .useJBIWrapper = useJBIWrapper;
681: }
682:
683: public boolean isUseJBIWrapper() {
684: return useJBIWrapper;
685: }
686:
687: }
|