001: package org.objectweb.celtix.jbi.transport;
002:
003: import java.io.ByteArrayInputStream;
004: import java.io.ByteArrayOutputStream;
005: import java.io.IOException;
006: import java.io.InputStream;
007: import java.lang.reflect.Method;
008: import java.util.concurrent.Executor;
009: import java.util.concurrent.Future;
010: import java.util.logging.Logger;
011:
012: import javax.jbi.messaging.DeliveryChannel;
013: import javax.jbi.messaging.InOut;
014: import javax.jbi.messaging.MessageExchangeFactory;
015: import javax.jbi.messaging.NormalizedMessage;
016: import javax.jws.WebService;
017: import javax.wsdl.Port;
018: import javax.xml.namespace.QName;
019: import javax.xml.transform.Source;
020: import javax.xml.transform.stream.StreamSource;
021: import javax.xml.ws.handler.MessageContext;
022:
023: import org.objectweb.celtix.bindings.ClientBinding;
024: import org.objectweb.celtix.bindings.ResponseCallback;
025: import org.objectweb.celtix.context.InputStreamMessageContext;
026: import org.objectweb.celtix.context.ObjectMessageContext;
027: import org.objectweb.celtix.context.OutputStreamMessageContext;
028: import org.objectweb.celtix.transports.ClientTransport;
029: import org.objectweb.celtix.ws.addressing.EndpointReferenceType;
030: import org.objectweb.celtix.wsdl.EndpointReferenceUtils;
031:
032: /**
033: * Connects Celtix clients to the NormalizedMessageRouter. Celtix
034: * messages are wrapped in a NormalizedMessage before being sent to
035: * the NMR and are unwrapped when being received from it.
036: */
037: public class JBIClientTransport implements ClientTransport {
038:
039: private static final Logger LOG = Logger
040: .getLogger(JBIClientTransport.class.getName());
041: private final DeliveryChannel channel;
042: private final EndpointReferenceType endpointRef;
043: private final QName serviceName;
044: private final ResponseCallback responseCallback;
045:
046: public JBIClientTransport(DeliveryChannel dc,
047: EndpointReferenceType epr, ClientBinding binding) {
048: channel = dc;
049: endpointRef = epr;
050: serviceName = EndpointReferenceUtils
051: .getServiceName(endpointRef);
052: responseCallback = binding.createResponseCallback();
053: }
054:
055: public void invokeOneway(OutputStreamMessageContext context)
056: throws IOException {
057: throw new RuntimeException("not yet implemented");
058: }
059:
060: public InputStreamMessageContext invoke(
061: OutputStreamMessageContext context) throws IOException {
062:
063: try {
064: Method targetMethod = (Method) context
065: .get(ObjectMessageContext.METHOD_OBJ);
066: Class<?> clz = targetMethod.getDeclaringClass();
067:
068: LOG.fine("invoking service " + clz);
069:
070: WebService ws = clz.getAnnotation(WebService.class);
071: assert ws != null;
072: QName interfaceName = new QName(ws.targetNamespace(), ws
073: .name());
074:
075: MessageExchangeFactory factory = channel
076: .createExchangeFactoryForService(serviceName);
077: LOG.fine("create message exchange svc: " + serviceName);
078: InOut xchng = factory.createInOutExchange();
079:
080: NormalizedMessage inMsg = xchng.createMessage();
081: LOG.fine("exchange endpoint: " + xchng.getEndpoint());
082:
083: InputStream ins = null;
084:
085: if (inMsg != null) {
086: LOG.fine("setup message contents on " + inMsg);
087: inMsg.setContent(getMessageContent(context));
088: xchng.setService(serviceName);
089: LOG.fine("service for exchange " + serviceName);
090:
091: xchng.setInterfaceName(interfaceName);
092:
093: xchng.setOperation(new QName(targetMethod.getName()));
094: xchng.setInMessage(inMsg);
095: LOG.fine("sending message");
096: channel.sendSync(xchng);
097:
098: NormalizedMessage outMsg = xchng.getOutMessage();
099: ins = JBIMessageHelper
100: .convertMessageToInputStream(outMsg
101: .getContent());
102:
103: } else {
104: System.out.println("no message yet");
105: }
106:
107: if (ins == null) {
108: throw new IOException("unable to retrieve message");
109: }
110: return new JBIInputStreamMessageContext(context, ins);
111:
112: } catch (Exception ex) {
113: ex.printStackTrace();
114: throw new IOException(ex.toString());
115: }
116: }
117:
118: Source getMessageContent(OutputStreamMessageContext context) {
119: assert context instanceof JBIOutputStreamMessageContext : "context must be of type JBIOutputStreamMessageContext";
120:
121: JBIOutputStreamMessageContext ctx = (JBIOutputStreamMessageContext) context;
122: ByteArrayOutputStream bos = (ByteArrayOutputStream) ctx
123: .getOutputStream();
124: return new StreamSource(new ByteArrayInputStream(bos
125: .toByteArray()));
126: }
127:
128: public Future<InputStreamMessageContext> invokeAsync(
129: OutputStreamMessageContext context, Executor executor)
130: throws IOException {
131: throw new RuntimeException("not yet implemented");
132: }
133:
134: public void finalPrepareOutputStreamContext(
135: OutputStreamMessageContext context) throws IOException {
136: }
137:
138: public ResponseCallback getResponseCallback() {
139: return responseCallback;
140: }
141:
142: public void shutdown() {
143: }
144:
145: public OutputStreamMessageContext createOutputStreamContext(
146: MessageContext context) throws IOException {
147: return new JBIOutputStreamMessageContext(context);
148: }
149:
150: public EndpointReferenceType getTargetEndpoint() {
151: // TODO Auto-generated method stub
152: return null;
153: }
154:
155: public EndpointReferenceType getDecoupledEndpoint()
156: throws IOException {
157: // TODO Auto-generated method stub
158: return null;
159: }
160:
161: public Port getPort() {
162: // TODO Auto-generated method stub
163: return null;
164: }
165: }
|