001: package org.objectweb.celtix.bus.transports.http.protocol.pipe;
002:
003: import java.io.BufferedOutputStream;
004: import java.io.IOException;
005: import java.io.OutputStream;
006: import java.util.ArrayList;
007: import java.util.Arrays;
008: import java.util.LinkedHashMap;
009: import java.util.List;
010: import java.util.Map;
011:
012: import javax.wsdl.Definition;
013: import javax.wsdl.WSDLException;
014: import javax.xml.ws.handler.MessageContext;
015:
016: import org.objectweb.celtix.Bus;
017: import org.objectweb.celtix.bus.transports.http.AbstractHTTPServerOutputStreamContext;
018: import org.objectweb.celtix.bus.transports.http.AbstractHTTPServerTransport;
019: import org.objectweb.celtix.bus.transports.http.HTTPServerInputStreamContext;
020: import org.objectweb.celtix.context.OutputStreamMessageContext;
021: import org.objectweb.celtix.transports.ServerTransportCallback;
022: import org.objectweb.celtix.ws.addressing.EndpointReferenceType;
023: import org.objectweb.celtix.wsdl.EndpointReferenceUtils;
024:
025: public class PipeHTTPServerTransport extends
026: AbstractHTTPServerTransport {
027: private static final String PIPE_RESPONSE = "PIPE_RESPONSE";
028:
029: public PipeHTTPServerTransport(Bus b, EndpointReferenceType ref)
030: throws WSDLException, IOException {
031: super (b, ref);
032: }
033:
034: protected void copyRequestHeaders(MessageContext ctx,
035: Map<String, List<String>> headers) {
036: PipeResponse req = (PipeResponse) ctx.get(PIPE_RESPONSE);
037: Map<String, List<String>> reqHeaders = req.getRequestHeaders();
038:
039: for (String fname : reqHeaders.keySet()) {
040: List<String> values;
041: if (headers.containsKey(fname)) {
042: values = headers.get(fname);
043: } else {
044: values = new ArrayList<String>();
045: headers.put(fname, values);
046: }
047: values.addAll(reqHeaders.get(fname));
048: }
049: }
050:
051: public void activate(ServerTransportCallback cb) throws IOException {
052: callback = cb;
053: PipeServer.SERVERS.put(name, this );
054: }
055:
056: public void deactivate() throws IOException {
057: PipeServer.SERVERS.remove(name);
058: }
059:
060: public void postDispatch(MessageContext bindingContext,
061: OutputStreamMessageContext context) {
062: PipeResponse resp = (PipeResponse) context.get(PIPE_RESPONSE);
063: try {
064: resp.getOutputStream().close();
065: } catch (IOException e) {
066: //ignore
067: e.printStackTrace();
068: }
069: }
070:
071: public OutputStreamMessageContext createOutputStreamContext(
072: MessageContext context) throws IOException {
073: return new AbstractHTTPServerOutputStreamContext(this , context) {
074: @SuppressWarnings("unchecked")
075: protected void flushHeaders() throws IOException {
076: PipeResponse resp = (PipeResponse) context
077: .get(PIPE_RESPONSE);
078:
079: Map<String, List<String>> respHeaders = new LinkedHashMap<String, List<String>>();
080:
081: if (containsKey(HTTP_RESPONSE_CODE)) {
082: respHeaders.put(HTTP_RESPONSE_CODE, Arrays
083: .asList(new String[] { get(
084: HTTP_RESPONSE_CODE).toString() }));
085: }
086: Map<String, List<String>> headers = (Map<String, List<String>>) super
087: .get(HTTP_RESPONSE_HEADERS);
088: if (null != headers) {
089: for (String name : headers.keySet()) {
090: List<String> headerList = (List<String>) headers
091: .get(name);
092: respHeaders.put(name, headerList);
093: }
094: }
095: OutputStream out = resp.setResponse(respHeaders);
096: origOut.resetOut(new BufferedOutputStream(out, 1024));
097: }
098: };
099: }
100:
101: void doService(PipeResponse response) throws IOException {
102:
103: final class Servicer implements Runnable {
104: private final PipeResponse response;
105:
106: Servicer(PipeResponse resps) {
107: response = resps;
108: }
109:
110: public void run() {
111: try {
112: serviceRequest(response);
113: } catch (IOException ex) {
114: // TODO handle exception
115: ex.printStackTrace();
116: }
117: }
118: }
119:
120: Servicer servicer = new Servicer(response);
121: if (null == callback.getExecutor()) {
122: bus.getWorkQueueManager().getAutomaticWorkQueue().execute(
123: servicer);
124: } else {
125: callback.getExecutor().execute(servicer);
126: }
127: }
128:
129: void serviceRequest(final PipeResponse response) throws IOException {
130: if (!response.getURLConnection().getDoInput()) {
131: try {
132: Definition def = EndpointReferenceUtils
133: .getWSDLDefinition(bus.getWSDLManager(),
134: reference);
135: Map<String, List<String>> headers = new LinkedHashMap<String, List<String>>();
136: headers.put("Content-Type", Arrays
137: .asList(new String[] { "text/xml" }));
138: OutputStream out = response.setResponse(headers);
139: bus.getWSDLManager().getWSDLFactory().newWSDLWriter()
140: .writeWSDL(def, out);
141: out.flush();
142: out.close();
143: return;
144: } catch (WSDLException ex) {
145: // TODO Auto-generated catch block
146: ex.printStackTrace();
147: }
148: }
149:
150: HTTPServerInputStreamContext ctx = new HTTPServerInputStreamContext(
151: this ) {
152: public void initContext() throws IOException {
153: super.initContext();
154: inStream = response.getRequestInputStream();
155: origInputStream = inStream;
156: }
157: };
158: ctx.put(PIPE_RESPONSE, response);
159: ctx.initContext();
160:
161: callback.dispatch(ctx, this);
162: }
163:
164: }
|