001: package org.objectweb.celtix.routing;
002:
003: import java.net.MalformedURLException;
004: import java.net.URL;
005: import java.util.ArrayList;
006: import java.util.List;
007: import java.util.Map;
008: import java.util.logging.Level;
009: import java.util.logging.Logger;
010:
011: import javax.annotation.Resource;
012: import javax.wsdl.Definition;
013: import javax.wsdl.Port;
014: import javax.xml.namespace.QName;
015: import javax.xml.transform.stream.StreamSource;
016: import javax.xml.ws.BindingProvider;
017: import javax.xml.ws.Dispatch;
018: import javax.xml.ws.Provider;
019: import javax.xml.ws.Service;
020: import javax.xml.ws.ServiceMode;
021: import javax.xml.ws.WebServiceContext;
022: import javax.xml.ws.WebServiceException;
023: import javax.xml.ws.WebServiceProvider;
024: import javax.xml.ws.handler.MessageContext;
025:
026: import org.objectweb.celtix.common.i18n.Message;
027: import org.objectweb.celtix.common.logging.LogUtils;
028: import org.objectweb.celtix.routing.configuration.DestinationType;
029: import org.objectweb.celtix.routing.configuration.RouteType;
030:
031: @WebServiceProvider
032: @ServiceMode(value=Service.Mode.MESSAGE)
033: public class StreamSourceMessageProvider implements
034: Provider<StreamSource> {
035: private static final Logger LOG = LogUtils
036: .getL7dLogger(StreamSourceMessageProvider.class);
037: protected List<Dispatch<StreamSource>> dList;
038: private final Definition wsdlModel;
039: private final RouteType route;
040: private final URL wsdlLocation;
041: private boolean doInit;
042:
043: /**
044: * Injectable context.
045: */
046: @Resource
047: private WebServiceContext wsCtx;
048:
049: public StreamSourceMessageProvider(Definition model, RouteType rt) {
050: wsdlModel = model;
051: route = rt;
052: doInit = true;
053:
054: try {
055: wsdlLocation = new URL(wsdlModel.getDocumentBaseURI());
056: } catch (MalformedURLException mue) {
057: throw new WebServiceException("Invalid wsdl url", mue);
058: }
059: }
060:
061: @Resource
062: public void setContext(WebServiceContext ctx) {
063: wsCtx = ctx;
064: }
065:
066: public WebServiceContext getContext() {
067: return wsCtx;
068: }
069:
070: public StreamSource invoke(StreamSource request) {
071: if (doInit) {
072: init();
073: }
074:
075: Dispatch<StreamSource> dispatch = dList.get(0);
076:
077: //TODO Set Request/Response Context like Transport Attributes.
078: updateRequestContext(dispatch.getRequestContext());
079:
080: //TODO Use Async API
081: StreamSource resp = dispatch.invoke(request);
082:
083: updateWebServiceContext(dispatch.getResponseContext());
084: return resp;
085: }
086:
087: protected synchronized void init() {
088: if (doInit) {
089: List<DestinationType> dtList = route.getDestination();
090: if (null == dList) {
091: dList = new ArrayList<Dispatch<StreamSource>>(dtList
092: .size());
093: }
094:
095: for (DestinationType dt : dtList) {
096: Service dtService = createService(wsdlLocation, dt
097: .getService());
098: String portName;
099:
100: if (dt.isSetPort()) {
101: portName = dt.getPort();
102: } else {
103: javax.wsdl.Service destService = wsdlModel
104: .getService(dt.getService());
105: portName = ((Port) destService.getPorts().values()
106: .iterator()).getName();
107: }
108:
109: Dispatch<StreamSource> streamDispatch = createDispatch(
110: dtService, dt.getPort());
111: if (null == streamDispatch) {
112: LOG.log(Level.SEVERE, "CREATEDISPATCH_FAILURE",
113: new Object[] { dt.getService(), portName });
114: throw new WebServiceException(new Message(
115: "CREATEDISPATCH_FAILURE", LOG, dt
116: .getService(), portName).toString());
117: }
118: dList.add(streamDispatch);
119: }
120: doInit = false;
121: }
122: }
123:
124: protected Service createService(URL wsdlUrl, QName serviceName) {
125: //TODO Set Executor used by the Source Endpoint onto Service
126: //Currently destination service uses bus workqueue.
127: return Service.create(wsdlUrl, serviceName);
128: }
129:
130: protected Dispatch<StreamSource> createDispatch(
131: Service destService, String portName) {
132: QName port = new QName(destService.getServiceName()
133: .getNamespaceURI(), portName);
134:
135: return destService.createDispatch(port, StreamSource.class,
136: Service.Mode.MESSAGE);
137: }
138:
139: private void updateRequestContext(Map<String, Object> reqCtx) {
140: MessageContext sourceMsgCtx = getContext().getMessageContext();
141: reqCtx.put(BindingProvider.USERNAME_PROPERTY, sourceMsgCtx
142: .get(BindingProvider.USERNAME_PROPERTY));
143: reqCtx.put(BindingProvider.PASSWORD_PROPERTY, sourceMsgCtx
144: .get(BindingProvider.PASSWORD_PROPERTY));
145: }
146:
147: private void updateWebServiceContext(Map<String, Object> respCtx) {
148: //TODO
149: }
150: }
|