001: package org.objectweb.celtix.jbi.transport;
002:
003: import java.io.ByteArrayInputStream;
004: import java.io.ByteArrayOutputStream;
005: import java.io.IOException;
006: import java.io.InputStream;
007: import java.util.logging.Level;
008: import java.util.logging.Logger;
009:
010: import javax.jbi.messaging.DeliveryChannel;
011: import javax.jbi.messaging.MessageExchange;
012: import javax.jbi.messaging.NormalizedMessage;
013: import javax.jbi.servicedesc.ServiceEndpoint;
014: import javax.xml.namespace.QName;
015: import javax.xml.parsers.DocumentBuilder;
016: import javax.xml.parsers.DocumentBuilderFactory;
017: import javax.xml.transform.dom.DOMSource;
018: import javax.xml.ws.handler.MessageContext;
019:
020: import org.w3c.dom.Document;
021:
022: import org.objectweb.celtix.context.ObjectMessageContext;
023: import org.objectweb.celtix.context.ObjectMessageContextImpl;
024: import org.objectweb.celtix.context.OutputStreamMessageContext;
025: import org.objectweb.celtix.jbi.se.CeltixServiceUnit;
026: import org.objectweb.celtix.jbi.se.CeltixServiceUnitManager;
027: import org.objectweb.celtix.transports.ServerTransport;
028: import org.objectweb.celtix.transports.ServerTransportCallback;
029: import org.objectweb.celtix.ws.addressing.EndpointReferenceType;
030:
031: /**
032: * Connects Celtix clients to the NormalizedMessageRouter. Celtix
033: * messages are wrapped in a NormalizedMessage before being sent to
034: * the NMR and are unwrapped when being received from it.
035: */
036: public class JBIServerTransport implements ServerTransport {
037:
038: private static final Logger LOG = Logger
039: .getLogger(JBIServerTransport.class.getName());
040:
041: private static final String MESSAGE_EXCHANGE_PROPERTY = "celtix.jbi.message.exchange";
042: private final CeltixServiceUnitManager suManager;
043: private final DeliveryChannel channel;
044: private ServerTransportCallback callback;
045: private volatile boolean running;
046: private JBIDispatcher dispatcher;
047: private final DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory
048: .newInstance();
049:
050: public JBIServerTransport(CeltixServiceUnitManager sum,
051: DeliveryChannel dc) {
052: suManager = sum;
053: channel = dc;
054: docBuilderFactory.setNamespaceAware(true);
055: }
056:
057: public void shutdown() {
058: running = false;
059: }
060:
061: public OutputStreamMessageContext createOutputStreamContext(
062: MessageContext context) throws IOException {
063:
064: return new JBIOutputStreamMessageContext(context);
065: }
066:
067: public void finalPrepareOutputStreamContext(
068: OutputStreamMessageContext context) throws IOException {
069: }
070:
071: public void activate(ServerTransportCallback cb) throws IOException {
072: // activate endpoints here
073: LOG.info("activating JBI server transport");
074: callback = cb;
075: dispatcher = new JBIDispatcher();
076: new Thread(dispatcher).start();
077: }
078:
079: public void deactivate() throws IOException {
080: running = false;
081: }
082:
083: public void postDispatch(MessageContext ctx,
084: OutputStreamMessageContext msgContext) {
085:
086: try {
087: JBIOutputStreamMessageContext jbiCtx = (JBIOutputStreamMessageContext) msgContext;
088: ByteArrayOutputStream baos = (ByteArrayOutputStream) jbiCtx
089: .getOutputStream();
090: ByteArrayInputStream bais = new ByteArrayInputStream(baos
091: .toByteArray());
092: LOG.finest("building document from bytes");
093: DocumentBuilder builder = docBuilderFactory
094: .newDocumentBuilder();
095: Document doc = builder.parse(bais);
096:
097: MessageExchange xchng = (MessageExchange) ctx
098: .get(MESSAGE_EXCHANGE_PROPERTY);
099: LOG.fine("creating NormalizedMessage");
100: NormalizedMessage msg = xchng.createMessage();
101: msg.setContent(new DOMSource(doc));
102: xchng.setMessage(msg, "out");
103: LOG.fine("postDispatch sending out message to NWR");
104: channel.send(xchng);
105: } catch (Exception ex) {
106: LOG.log(Level.SEVERE, "error sending Out message", ex);
107: }
108: }
109:
110: public OutputStreamMessageContext rebase(MessageContext context,
111: EndpointReferenceType decoupledResponseEndpoint)
112: throws IOException {
113: // TODO Auto-generated method stub
114: return null;
115: }
116:
117: private void dispatch(MessageExchange exchange,
118: ServerTransportCallback cb) throws IOException {
119:
120: try {
121: QName opName = exchange.getOperation();
122: LOG.fine("dispatch: " + opName);
123:
124: NormalizedMessage nm = exchange.getMessage("in");
125: final InputStream in = JBIMessageHelper
126: .convertMessageToInputStream(nm.getContent());
127: // dispatch through callback
128:
129: ObjectMessageContext ctx = new ObjectMessageContextImpl();
130: LOG.finest("dispatching message on callback: " + cb);
131: ctx.put(MESSAGE_EXCHANGE_PROPERTY, exchange);
132: cb
133: .dispatch(
134: new JBIInputStreamMessageContext(ctx, in),
135: this );
136: } catch (Exception ex) {
137: LOG.log(Level.SEVERE, "error preparing message", ex);
138: throw new IOException(ex.getMessage());
139: }
140: }
141:
142: private class JBIDispatcher implements Runnable {
143:
144: public final void run() {
145:
146: try {
147: running = true;
148: LOG
149: .fine("JBIServerTransport message receiving thread started");
150: do {
151: MessageExchange exchange = channel.accept();
152: if (exchange != null) {
153: // REVISIT: serialized message handling not such a
154: // good idea.
155: // REVISIT: can there be more than one ep?
156: ServiceEndpoint ep = exchange.getEndpoint();
157: CeltixServiceUnit csu = suManager
158: .getServiceUnitForEndpoint(ep);
159: ClassLoader oldLoader = Thread.currentThread()
160: .getContextClassLoader();
161:
162: try {
163: Thread.currentThread()
164: .setContextClassLoader(
165: csu.getClassLoader());
166: if (csu != null) {
167: LOG
168: .finest("dispatching to Celtix service unit");
169: dispatch(exchange, callback);
170: } else {
171: LOG.info("no CeltixServiceUnit found");
172: }
173: } finally {
174: Thread.currentThread()
175: .setContextClassLoader(oldLoader);
176: }
177: }
178: } while (running);
179: } catch (Exception ex) {
180: LOG.log(Level.SEVERE, "error running dispatch thread",
181: ex);
182: }
183: LOG
184: .fine("JBIServerTransport message processing thread exitting");
185: }
186: }
187: }
|