001: /*
002: * $Id: CxfConnector.java 11405 2008-03-18 00:13:00Z dirk.olmes $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.transport.cxf;
012:
import org.mule.api.MuleException;
import org.mule.api.context.notification.MuleContextNotificationListener;
import org.mule.api.context.notification.ServerNotification;
import org.mule.api.endpoint.EndpointBuilder;
import org.mule.api.endpoint.EndpointURI;
import org.mule.api.endpoint.InboundEndpoint;
import org.mule.api.lifecycle.InitialisationException;
import org.mule.api.service.Service;
import org.mule.api.transport.MessageReceiver;
import org.mule.component.DefaultJavaComponent;
import org.mule.context.notification.MuleContextNotification;
import org.mule.endpoint.EndpointURIEndpointBuilder;
import org.mule.model.seda.SedaService;
import org.mule.object.SingletonObjectFactory;
import org.mule.routing.inbound.DefaultInboundRouterCollection;
import org.mule.transport.AbstractConnector;
import org.mule.transport.cxf.transport.MuleUniversalTransport;
import org.mule.transport.http.HttpConnector;
import org.mule.transport.http.HttpConstants;

import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

import javax.xml.namespace.QName;

import org.apache.cxf.Bus;
import org.apache.cxf.bus.spring.SpringBusFactory;
import org.apache.cxf.endpoint.Server;
import org.apache.cxf.transport.ConduitInitiatorManager;
import org.apache.cxf.transport.DestinationFactoryManager;
043:
044: /**
045: * Connects Mule to a CXF bus instance.
046: */
047: public class CxfConnector extends AbstractConnector implements
048: MuleContextNotificationListener {
049:
050: public static final String CXF = "cxf";
051: public static final String CXF_SERVICE_COMPONENT_NAME = "_cxfServiceComponent";
052: public static final String CONFIGURATION_LOCATION = "configurationLocation";
053: public static final String DEFAULT_MULE_NAMESPACE_URI = "http://www.muleumo.org";
054: public static final String BUS_PROPERTY = CXF;
055:
056: // The CXF Bus object
057: private Bus bus;
058: private String configurationLocation;
059: private String defaultFrontend = CxfConstants.JAX_WS_FRONTEND;
060: private List<SedaService> services = new ArrayList<SedaService>();
061:
062: public CxfConnector() {
063: super ();
064: registerProtocols();
065: }
066:
067: protected void registerProtocols() {
068: registerSupportedProtocol("http");
069: registerSupportedProtocol("https");
070: registerSupportedProtocol("jms");
071: registerSupportedProtocol("vm");
072: registerSupportedProtocol("servlet");
073: }
074:
075: public String getProtocol() {
076: return CXF;
077: }
078:
079: protected void doInitialise() throws InitialisationException {
080: if (configurationLocation != null) {
081: bus = new SpringBusFactory()
082: .createBus(configurationLocation);
083: } else {
084: bus = new SpringBusFactory().createBus();
085: }
086:
087: MuleUniversalTransport transport = new MuleUniversalTransport(
088: this );
089: DestinationFactoryManager dfm = bus
090: .getExtension(DestinationFactoryManager.class);
091: dfm.registerDestinationFactory(
092: "http://schemas.xmlsoap.org/soap/http", transport);
093: dfm.registerDestinationFactory(
094: "http://schemas.xmlsoap.org/wsdl/soap/http", transport);
095: dfm.registerDestinationFactory(
096: MuleUniversalTransport.TRANSPORT_ID, transport);
097:
098: ConduitInitiatorManager extension = bus
099: .getExtension(ConduitInitiatorManager.class);
100: extension.registerConduitInitiator(
101: "http://schemas.xmlsoap.org/wsdl/soap/", transport);
102: extension.registerConduitInitiator(
103: "http://schemas.xmlsoap.org/soap/http", transport);
104: extension.registerConduitInitiator(
105: MuleUniversalTransport.TRANSPORT_ID, transport);
106:
107: // Registers the listener
108: try {
109: muleContext.registerListener(this );
110: } catch (Exception e) {
111: throw new InitialisationException(e, this );
112: }
113: }
114:
115: protected void doDispose() {
116: // template method
117: }
118:
119: protected void doConnect() throws Exception {
120: // template method
121: }
122:
123: protected void doDisconnect() throws Exception {
124: // template method
125: }
126:
127: protected void doStart() throws MuleException {
128:
129: }
130:
131: protected void doStop() throws MuleException {
132: bus.shutdown(true);
133: }
134:
135: public Bus getCxfBus() {
136: return bus;
137: }
138:
139: public void setCxfBus(Bus bus) {
140: this .bus = bus;
141: }
142:
143: public String getConfigurationLocation() {
144: return configurationLocation;
145: }
146:
147: public void setConfigurationLocation(String configurationLocation) {
148: this .configurationLocation = configurationLocation;
149: }
150:
151: public String getDefaultFrontend() {
152: return defaultFrontend;
153: }
154:
155: public void setDefaultFrontend(String defaultFrontend) {
156: this .defaultFrontend = defaultFrontend;
157: }
158:
159: @SuppressWarnings("unchecked")
160: protected void registerReceiverWithMuleService(
161: MessageReceiver receiver, EndpointURI ep)
162: throws MuleException {
163: CxfMessageReceiver cxfReceiver = (CxfMessageReceiver) receiver;
164: Server server = cxfReceiver.getServer();
165:
166: // TODO MULE-2228 Simplify this API
167: SedaService c = new SedaService();
168: c.setName(CXF_SERVICE_COMPONENT_NAME
169: + server.getEndpoint().getService().getName()
170: + c.hashCode());
171: c.setModel(muleContext.getRegistry().lookupSystemModel());
172:
173: CxfServiceComponent svcComponent = new CxfServiceComponent(
174: (CxfMessageReceiver) receiver);
175: svcComponent.setBus(bus);
176:
177: c.setComponent(new DefaultJavaComponent(
178: new SingletonObjectFactory(svcComponent)));
179:
180: // No determine if the endpointUri requires a new connector to be
181: // registed in the case of http we only need to register the new
182: // endpointUri if the port is different
183: String endpoint = receiver.getEndpointURI().getAddress();
184: String scheme = ep.getScheme().toLowerCase();
185:
186: boolean sync = receiver.getEndpoint().isSynchronous();
187:
188: // If we are using sockets then we need to set the endpoint name appropiately
189: // and if using http/https
190: // we need to default to POST and set the Content-Type
191: if (scheme.equals("http") || scheme.equals("https")
192: || scheme.equals("ssl") || scheme.equals("tcp")
193: || scheme.equals("servlet")) {
194: receiver.getEndpoint().getProperties().put(
195: HttpConnector.HTTP_METHOD_PROPERTY, "POST");
196: receiver.getEndpoint().getProperties().put(
197: HttpConstants.HEADER_CONTENT_TYPE, "text/xml");
198: }
199:
200: QName serviceName = server.getEndpoint().getEndpointInfo()
201: .getName();
202:
203: EndpointBuilder serviceEndpointbuilder = new EndpointURIEndpointBuilder(
204: endpoint, muleContext);
205: serviceEndpointbuilder.setSynchronous(sync);
206: serviceEndpointbuilder.setName(ep.getScheme() + ":"
207: + serviceName.getLocalPart());
208: // Set the transformers on the endpoint too
209: serviceEndpointbuilder.setTransformers(receiver.getEndpoint()
210: .getTransformers().isEmpty() ? null : receiver
211: .getEndpoint().getTransformers());
212: serviceEndpointbuilder
213: .setResponseTransformers(receiver.getEndpoint()
214: .getResponseTransformers().isEmpty() ? null
215: : receiver.getEndpoint()
216: .getResponseTransformers());
217: // set the filter on the axis endpoint on the real receiver endpoint
218: serviceEndpointbuilder.setFilter(receiver.getEndpoint()
219: .getFilter());
220: // set the Security filter on the axis endpoint on the real receiver
221: // endpoint
222: serviceEndpointbuilder.setSecurityFilter(receiver.getEndpoint()
223: .getSecurityFilter());
224:
225: // TODO Do we really need to modify the existing receiver endpoint? What happnes if we don't security,
226: // filters and transformers will get invoked twice?
227: EndpointBuilder receiverEndpointBuilder = new EndpointURIEndpointBuilder(
228: receiver.getEndpoint(), muleContext);
229: // Remove the Axis filter now
230: receiverEndpointBuilder.setFilter(null);
231: // Remove the Axis Receiver Security filter now
232: receiverEndpointBuilder.setSecurityFilter(null);
233:
234: InboundEndpoint serviceEndpoint = muleContext.getRegistry()
235: .lookupEndpointFactory().getInboundEndpoint(
236: serviceEndpointbuilder);
237:
238: InboundEndpoint receiverEndpoint = muleContext.getRegistry()
239: .lookupEndpointFactory().getInboundEndpoint(
240: receiverEndpointBuilder);
241:
242: receiver.setEndpoint(receiverEndpoint);
243:
244: c.setInboundRouter(new DefaultInboundRouterCollection());
245: c.getInboundRouter().addEndpoint(serviceEndpoint);
246:
247: services.add(c);
248: }
249:
250: /**
251: * The method determines the key used to store the receiver against.
252: *
253: * @param service the service for which the endpoint is being registered
254: * @param endpoint the endpoint being registered for the service
255: * @return the key to store the newly created receiver against. In this case it
256: * is the service name, which is equivilent to the Axis service name.
257: */
258: @Override
259: protected Object getReceiverKey(Service service,
260: InboundEndpoint endpoint) {
261: if (endpoint.getEndpointURI().getPort() == -1) {
262: return service.getName();
263: } else {
264: return endpoint.getEndpointURI().getAddress();
265: }
266: }
267:
268: public void onNotification(ServerNotification event) {
269: // We need to register the CXF service service once the model
270: // starts because
271: // when the model starts listeners on components are started, thus
272: // all listener
273: // need to be registered for this connector before the CXF service
274: // service is registered. The implication of this is that to add a
275: // new service and a
276: // different http port the model needs to be restarted before the
277: // listener is available
278: if (event.getAction() == MuleContextNotification.CONTEXT_STARTED) {
279: for (Service c : services) {
280: try {
281: muleContext.getRegistry().registerService(c);
282: } catch (MuleException e) {
283: handleException(e);
284: }
285: }
286: }
287: }
288:
289: public boolean isSyncEnabled(String protocol) {
290: protocol = protocol.toLowerCase();
291: if (protocol.equals("http") || protocol.equals("https")
292: || protocol.equals("ssl") || protocol.equals("tcp")
293: || protocol.equals("servlet")) {
294: return true;
295: } else {
296: return super.isSyncEnabled(protocol);
297: }
298: }
299: }
|