001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */package org.apache.cxf.endpoint;
019:
020: import java.net.URI;
021: import java.net.URISyntaxException;
022: import java.net.URL;
023: import java.util.List;
024: import java.util.Map;
025: import java.util.logging.Level;
026: import java.util.logging.Logger;
027:
028: import javax.xml.namespace.QName;
029:
030: import com.ibm.wsdl.extensions.soap.SOAPBindingImpl;
031:
032: import org.apache.cxf.Bus;
033: import org.apache.cxf.BusFactory;
034: import org.apache.cxf.binding.Binding;
035: import org.apache.cxf.common.i18n.UncheckedException;
036: import org.apache.cxf.common.logging.LogUtils;
037: import org.apache.cxf.helpers.CastUtils;
038: import org.apache.cxf.interceptor.AbstractBasicInterceptorProvider;
039: import org.apache.cxf.interceptor.ClientOutFaultObserver;
040: import org.apache.cxf.interceptor.Interceptor;
041: import org.apache.cxf.interceptor.InterceptorChain;
042: import org.apache.cxf.message.Exchange;
043: import org.apache.cxf.message.ExchangeImpl;
044: import org.apache.cxf.message.Message;
045: import org.apache.cxf.message.MessageContentsList;
046: import org.apache.cxf.message.MessageImpl;
047: import org.apache.cxf.phase.PhaseChainCache;
048: import org.apache.cxf.phase.PhaseInterceptorChain;
049: import org.apache.cxf.phase.PhaseManager;
050: import org.apache.cxf.service.Service;
051: import org.apache.cxf.service.model.BindingInfo;
052: import org.apache.cxf.service.model.BindingMessageInfo;
053: import org.apache.cxf.service.model.BindingOperationInfo;
054: import org.apache.cxf.service.model.EndpointInfo;
055: import org.apache.cxf.service.model.InterfaceInfo;
056: import org.apache.cxf.service.model.MessageInfo;
057: import org.apache.cxf.service.model.OperationInfo;
058: import org.apache.cxf.service.model.ServiceInfo;
059: import org.apache.cxf.transport.Conduit;
060: import org.apache.cxf.transport.MessageObserver;
061: import org.apache.cxf.wsdl11.WSDLServiceFactory;
062:
063: public class ClientImpl extends AbstractBasicInterceptorProvider
064: implements Client, Retryable, MessageObserver {
065:
066: public static final String FINISHED = "exchange.finished";
067:
068: private static final Logger LOG = LogUtils
069: .getL7dLogger(ClientImpl.class);
070:
071: protected Bus bus;
072: protected ConduitSelector conduitSelector;
073: protected ClientOutFaultObserver outFaultObserver;
074: protected int synchronousTimeout = 10000; // default 10 second timeout
075:
076: protected PhaseChainCache outboundChainCache = new PhaseChainCache();
077: protected PhaseChainCache inboundChainCache = new PhaseChainCache();
078:
/**
 * Creates a client for the given endpoint without an explicit conduit
 * selector; a default selector is created on first use.
 *
 * @param b the bus the client operates on
 * @param e the endpoint to invoke
 */
public ClientImpl(Bus b, Endpoint e) {
    this(b, e, (ConduitSelector) null);
}
082:
/**
 * Creates a client bound to an already existing conduit by wrapping it in
 * a {@code PreexistingConduitSelector}.
 *
 * @param b the bus the client operates on
 * @param e the endpoint to invoke
 * @param c the conduit to use
 */
public ClientImpl(Bus b, Endpoint e, Conduit c) {
    this(b, e, new PreexistingConduitSelector(c));
}
086:
087: public ClientImpl(Bus b, Endpoint e, ConduitSelector sc) {
088: bus = b;
089: outFaultObserver = new ClientOutFaultObserver(bus);
090: getConduitSelector(sc).setEndpoint(e);
091: notifyLifecycleManager();
092: }
093:
/**
 * Creates a client from a WSDL document using the thread's default bus,
 * with neither service nor port specified.
 *
 * @param wsdlUrl location of the WSDL
 */
public ClientImpl(URL wsdlUrl) {
    this(BusFactory.getThreadDefaultBus(), wsdlUrl, null, null);
}
097:
/**
 * Creates a client from a WSDL document using the thread's default bus,
 * targeting the named port and leaving the service unspecified.
 *
 * @param wsdlUrl location of the WSDL
 * @param port    name of the port (endpoint) to use
 */
public ClientImpl(URL wsdlUrl, QName port) {
    this(BusFactory.getThreadDefaultBus(), wsdlUrl, null, port);
}
101:
102: public ClientImpl(Bus bus, URL wsdlUrl, QName service, QName port) {
103: this .bus = bus;
104:
105: WSDLServiceFactory sf = (service == null) ? (new WSDLServiceFactory(
106: bus, wsdlUrl))
107: : (new WSDLServiceFactory(bus, wsdlUrl, service));
108: Service svc = sf.create();
109:
110: EndpointInfo epfo = findEndpoint(svc, port);
111:
112: try {
113: getConduitSelector().setEndpoint(
114: new EndpointImpl(bus, svc, epfo));
115: } catch (EndpointException epex) {
116: throw new IllegalStateException(
117: "Unable to create endpoint: " + epex.getMessage(),
118: epex);
119: }
120: notifyLifecycleManager();
121: }
122:
123: public void destroy() {
124:
125: // TODO: also inform the conduit so it can shutdown any response listeners
126:
127: ClientLifeCycleManager mgr = bus
128: .getExtension(ClientLifeCycleManager.class);
129: if (null != mgr) {
130: mgr.clientDestroyed(this );
131: }
132: }
133:
134: private void notifyLifecycleManager() {
135: ClientLifeCycleManager mgr = bus
136: .getExtension(ClientLifeCycleManager.class);
137: if (null != mgr) {
138: mgr.clientCreated(this );
139: }
140: }
141:
142: private EndpointInfo findEndpoint(Service svc, QName port) {
143: EndpointInfo epfo;
144: if (port != null) {
145: epfo = svc.getEndpointInfo(port);
146: if (epfo == null) {
147: throw new IllegalArgumentException("The service "
148: + svc.getName() + " does not have an endpoint "
149: + port + ".");
150: }
151: } else {
152: epfo = null;
153: for (ServiceInfo svcfo : svc.getServiceInfos()) {
154: for (EndpointInfo e : svcfo.getEndpoints()) {
155: BindingInfo bfo = e.getBinding();
156:
157: if (bfo.getBindingId().equals(
158: "http://schemas.xmlsoap.org/wsdl/soap/")) {
159: for (Object o : bfo.getExtensors().get()) {
160: if (o instanceof SOAPBindingImpl) {
161: SOAPBindingImpl soapB = (SOAPBindingImpl) o;
162: if (soapB
163: .getTransportURI()
164: .equals(
165: "http://schemas.xmlsoap.org/soap/http")) {
166: epfo = e;
167: break;
168: }
169: }
170: }
171:
172: }
173: }
174: }
175: if (epfo == null) {
176: throw new UnsupportedOperationException(
177: "Only document-style SOAP 1.1 http are supported "
178: + "for auto-selection of endpoint; none were found.");
179: }
180: }
181: return epfo;
182: }
183:
184: public Endpoint getEndpoint() {
185: return getConduitSelector().getEndpoint();
186: }
187:
188: public Object[] invoke(BindingOperationInfo oi, Object... params)
189: throws Exception {
190: return invoke(oi, params, null);
191: }
192:
193: public Object[] invoke(String operationName, Object... params)
194: throws Exception {
195: QName q = new QName(getEndpoint().getService().getName()
196: .getNamespaceURI(), operationName);
197:
198: return invoke(q, params);
199: }
200:
201: public Object[] invoke(QName operationName, Object... params)
202: throws Exception {
203: BindingOperationInfo op = getEndpoint().getEndpointInfo()
204: .getBinding().getOperation(operationName);
205: if (op == null) {
206: throw new UncheckedException(
207: new org.apache.cxf.common.i18n.Message(
208: "NO_OPERATION", LOG, operationName));
209: }
210:
211: if (op.isUnwrappedCapable()) {
212: op = op.getUnwrappedOperation();
213: }
214:
215: return invoke(op, params);
216: }
217:
218: public Object[] invoke(BindingOperationInfo oi, Object[] params,
219: Map<String, Object> context) throws Exception {
220: return invoke(oi, params, context, null);
221: }
222:
223: public Object[] invoke(BindingOperationInfo oi, Object[] params,
224: Map<String, Object> context, Exchange exchange)
225: throws Exception {
226: if (exchange == null) {
227: exchange = new ExchangeImpl();
228: }
229: Endpoint endpoint = getEndpoint();
230:
231: Map<String, Object> requestContext = null;
232: Map<String, Object> responseContext = null;
233: if (LOG.isLoggable(Level.FINE)) {
234: LOG.fine("Invoke, operation info: " + oi + ", params: "
235: + params);
236: }
237: Message message = endpoint.getBinding().createMessage();
238: if (null != context) {
239: requestContext = CastUtils.cast((Map) context
240: .get(REQUEST_CONTEXT));
241: responseContext = CastUtils.cast((Map) context
242: .get(RESPONSE_CONTEXT));
243: message.put(Message.INVOCATION_CONTEXT, context);
244: }
245: //setup the message context
246: setContext(requestContext, message);
247: setParameters(params, message);
248:
249: if (null != requestContext) {
250: exchange.putAll(requestContext);
251: }
252: exchange.setOneWay(oi.getOutput() == null);
253:
254: exchange.setOutMessage(message);
255:
256: setOutMessageProperties(message, oi);
257: setExchangeProperties(exchange, endpoint, oi);
258:
259: // setup chain
260:
261: PhaseInterceptorChain chain = setupInterceptorChain(endpoint);
262: message.setInterceptorChain(chain);
263:
264: modifyChain(chain, requestContext);
265: chain.setFaultObserver(outFaultObserver);
266:
267: // setup conduit selector
268: prepareConduitSelector(message);
269:
270: // execute chain
271: chain.doIntercept(message);
272:
273: getConduitSelector().complete(exchange);
274:
275: // Check to see if there is a Fault from the outgoing chain
276: Exception ex = message.getContent(Exception.class);
277:
278: if (ex != null) {
279: throw ex;
280: }
281: ex = message.getExchange().get(Exception.class);
282: if (ex != null) {
283: throw ex;
284: }
285:
286: // Wait for a response if we need to
287: if (!oi.getOperationInfo().isOneWay()) {
288: synchronized (exchange) {
289: waitResponse(exchange);
290: }
291: }
292:
293: // Grab the response objects if there are any
294: List resList = null;
295: Message inMsg = exchange.getInMessage();
296: if (inMsg != null) {
297: if (null != responseContext) {
298: responseContext.putAll(inMsg);
299: if (LOG.isLoggable(Level.FINE)) {
300: LOG.fine("set responseContext to be"
301: + responseContext);
302: }
303: }
304: resList = inMsg.getContent(List.class);
305: }
306:
307: // check for an incoming fault
308: ex = getException(exchange);
309:
310: if (ex != null) {
311: throw ex;
312: }
313:
314: if (resList != null) {
315: return resList.toArray();
316: }
317: return null;
318: }
319:
320: protected Exception getException(Exchange exchange) {
321: if (exchange.getInFaultMessage() != null) {
322: return exchange.getInFaultMessage().getContent(
323: Exception.class);
324: } else if (exchange.getOutFaultMessage() != null) {
325: return exchange.getOutFaultMessage().getContent(
326: Exception.class);
327: }
328: return null;
329: }
330:
331: private void setContext(Map<String, Object> ctx, Message message) {
332: if (ctx != null) {
333: message.putAll(ctx);
334: if (LOG.isLoggable(Level.FINE)) {
335: LOG.fine("set requestContext to message be" + ctx);
336: }
337: }
338: }
339:
340: private void waitResponse(Exchange exchange) {
341: int remaining = synchronousTimeout;
342: while (!Boolean.TRUE.equals(exchange.get(FINISHED))
343: && remaining > 0) {
344: long start = System.currentTimeMillis();
345: try {
346: exchange.wait(remaining);
347: } catch (InterruptedException ex) {
348: // ignore
349: }
350: long end = System.currentTimeMillis();
351: remaining -= (int) (end - start);
352: }
353: if (!Boolean.TRUE.equals(exchange.get(FINISHED))) {
354: LogUtils.log(LOG, Level.WARNING, "RESPONSE_TIMEOUT",
355: exchange.get(OperationInfo.class).getName()
356: .toString());
357: }
358: }
359:
360: private void setParameters(Object[] params, Message message) {
361: MessageContentsList contents = new MessageContentsList(params);
362: message.setContent(List.class, contents);
363: }
364:
365: public void onMessage(Message message) {
366: Endpoint endpoint = message.getExchange().get(Endpoint.class);
367: if (endpoint == null) {
368: // in this case correlation will occur outside the transport,
369: // however there's a possibility that the endpoint may have been
370: // rebased in the meantime, so that the response will be mediated
371: // via a set of in interceptors provided by a *different* endpoint
372: //
373: endpoint = getConduitSelector().getEndpoint();
374: message.getExchange().put(Endpoint.class, endpoint);
375: }
376: message = endpoint.getBinding().createMessage(message);
377: message.put(Message.REQUESTOR_ROLE, Boolean.TRUE);
378: message.put(Message.INBOUND_MESSAGE, Boolean.TRUE);
379: PhaseManager pm = bus.getExtension(PhaseManager.class);
380:
381: List<Interceptor> i1 = bus.getInInterceptors();
382: if (LOG.isLoggable(Level.FINE)) {
383: LOG.fine("Interceptors contributed by bus: " + i1);
384: }
385: List<Interceptor> i2 = endpoint.getInInterceptors();
386: if (LOG.isLoggable(Level.FINE)) {
387: LOG.fine("Interceptors contributed by endpoint: " + i2);
388: }
389: List<Interceptor> i3 = getInInterceptors();
390: if (LOG.isLoggable(Level.FINE)) {
391: LOG.fine("Interceptors contributed by client: " + i3);
392: }
393: List<Interceptor> i4 = endpoint.getBinding()
394: .getInInterceptors();
395: if (LOG.isLoggable(Level.FINE)) {
396: LOG.fine("Interceptors contributed by binding: " + i4);
397: }
398:
399: PhaseInterceptorChain chain = inboundChainCache.get(pm
400: .getInPhases(), i1, i2, i3, i4);
401: message.setInterceptorChain(chain);
402:
403: chain.setFaultObserver(outFaultObserver);
404:
405: // execute chain
406: try {
407: String startingAfterInterceptorID = (String) message
408: .get(PhaseInterceptorChain.STARTING_AFTER_INTERCEPTOR_ID);
409: String startingInterceptorID = (String) message
410: .get(PhaseInterceptorChain.STARTING_AT_INTERCEPTOR_ID);
411: if (startingAfterInterceptorID != null) {
412: chain.doInterceptStartingAfter(message,
413: startingAfterInterceptorID);
414: } else if (startingInterceptorID != null) {
415: chain.doInterceptStartingAt(message,
416: startingInterceptorID);
417: } else {
418: chain.doIntercept(message);
419: }
420: } finally {
421: synchronized (message.getExchange()) {
422: if (!isPartialResponse(message)) {
423: message.getExchange().put(FINISHED, Boolean.TRUE);
424: message.getExchange().setInMessage(message);
425: message.getExchange().notifyAll();
426: }
427: }
428: }
429: }
430:
431: public Conduit getConduit() {
432: Message message = new MessageImpl();
433: Exchange exchange = new ExchangeImpl();
434: message.setExchange(exchange);
435: setExchangeProperties(exchange, null, null);
436: return getConduitSelector().selectConduit(message);
437: }
438:
439: protected void prepareConduitSelector(Message message) {
440: getConduitSelector().prepare(message);
441: message.getExchange().put(ConduitSelector.class,
442: getConduitSelector());
443: }
444:
445: protected void setOutMessageProperties(Message message,
446: BindingOperationInfo boi) {
447: message.put(Message.REQUESTOR_ROLE, Boolean.TRUE);
448: message.put(Message.INBOUND_MESSAGE, Boolean.FALSE);
449: message.put(BindingMessageInfo.class, boi.getInput());
450: message.put(MessageInfo.class, boi.getOperationInfo()
451: .getInput());
452: }
453:
454: protected void setExchangeProperties(Exchange exchange,
455: Endpoint endpoint, BindingOperationInfo boi) {
456: if (endpoint != null) {
457: exchange.put(Endpoint.class, endpoint);
458: exchange.put(Service.class, endpoint.getService());
459: if (endpoint.getEndpointInfo().getService() != null) {
460: exchange.put(ServiceInfo.class, endpoint
461: .getEndpointInfo().getService());
462: exchange.put(InterfaceInfo.class, endpoint
463: .getEndpointInfo().getService().getInterface());
464: }
465: exchange.put(Binding.class, endpoint.getBinding());
466: exchange.put(BindingInfo.class, endpoint.getEndpointInfo()
467: .getBinding());
468: }
469: if (boi != null) {
470: exchange.put(BindingOperationInfo.class, boi);
471: exchange.put(OperationInfo.class, boi.getOperationInfo());
472: }
473:
474: exchange.put(MessageObserver.class, this );
475: exchange.put(Retryable.class, this );
476: exchange.put(Bus.class, bus);
477:
478: if (endpoint != null && boi != null) {
479:
480: EndpointInfo endpointInfo = endpoint.getEndpointInfo();
481: exchange.put(Message.WSDL_OPERATION, boi.getName());
482:
483: QName serviceQName = endpointInfo.getService().getName();
484: exchange.put(Message.WSDL_SERVICE, serviceQName);
485:
486: QName interfaceQName = endpointInfo.getService()
487: .getInterface().getName();
488: exchange.put(Message.WSDL_INTERFACE, interfaceQName);
489:
490: QName portQName = endpointInfo.getName();
491: exchange.put(Message.WSDL_PORT, portQName);
492: URI wsdlDescription = endpointInfo.getProperty("URI",
493: URI.class);
494: if (wsdlDescription == null) {
495: String address = endpointInfo.getAddress();
496: try {
497: wsdlDescription = new URI(address + "?wsdl");
498: } catch (URISyntaxException e) {
499: // do nothing
500: }
501: endpointInfo.setProperty("URI", wsdlDescription);
502: }
503: exchange.put(Message.WSDL_DESCRIPTION, wsdlDescription);
504: }
505: }
506:
507: protected PhaseInterceptorChain setupInterceptorChain(
508: Endpoint endpoint) {
509:
510: PhaseManager pm = bus.getExtension(PhaseManager.class);
511:
512: List<Interceptor> i1 = bus.getOutInterceptors();
513: if (LOG.isLoggable(Level.FINE)) {
514: LOG.fine("Interceptors contributed by bus: " + i1);
515: }
516: List<Interceptor> i2 = endpoint.getOutInterceptors();
517: if (LOG.isLoggable(Level.FINE)) {
518: LOG.fine("Interceptors contributed by endpoint: " + i2);
519: }
520: List<Interceptor> i3 = getOutInterceptors();
521: if (LOG.isLoggable(Level.FINE)) {
522: LOG.fine("Interceptors contributed by client: " + i3);
523: }
524: List<Interceptor> i4 = endpoint.getBinding()
525: .getOutInterceptors();
526: if (LOG.isLoggable(Level.FINE)) {
527: LOG.fine("Interceptors contributed by binding: " + i4);
528: }
529: return outboundChainCache
530: .get(pm.getOutPhases(), i1, i2, i3, i4);
531: }
532:
533: protected void modifyChain(InterceptorChain chain,
534: Map<String, Object> ctx) {
535: // no-op
536: }
537:
538: protected void setEndpoint(Endpoint e) {
539: getConduitSelector().setEndpoint(e);
540: }
541:
542: public int getSynchronousTimeout() {
543: return synchronousTimeout;
544: }
545:
546: public void setSynchronousTimeout(int synchronousTimeout) {
547: this .synchronousTimeout = synchronousTimeout;
548: }
549:
550: public final ConduitSelector getConduitSelector() {
551: return getConduitSelector(null);
552: }
553:
554: protected final synchronized ConduitSelector getConduitSelector(
555: ConduitSelector override) {
556: if (null == conduitSelector) {
557: setConduitSelector(override != null ? override
558: : new UpfrontConduitSelector());
559: }
560: return conduitSelector;
561: }
562:
563: public final void setConduitSelector(ConduitSelector selector) {
564: conduitSelector = selector;
565: }
566:
567: private boolean isPartialResponse(Message in) {
568: return Boolean.TRUE.equals(in
569: .get(Message.PARTIAL_RESPONSE_MESSAGE));
570: }
571: }
|