001: package org.objectweb.celtix.bus.transports.http;
002:
003: import java.io.BufferedOutputStream;
004: import java.io.IOException;
005: import java.io.OutputStream;
006: import java.net.HttpURLConnection;
007: import java.net.Socket;
008: import java.net.SocketException;
009: import java.net.URL;
010: import java.net.URLConnection;
011: import java.util.ArrayList;
012: import java.util.Enumeration;
013: import java.util.Iterator;
014: import java.util.List;
015: import java.util.Map;
016: import java.util.logging.Level;
017:
018: import javax.wsdl.Definition;
019: import javax.wsdl.WSDLException;
020: import javax.xml.ws.handler.MessageContext;
021:
022: import org.mortbay.http.HttpRequest;
023: import org.mortbay.http.HttpResponse;
024: import org.mortbay.http.handler.AbstractHttpHandler;
025: import org.objectweb.celtix.Bus;
026: import org.objectweb.celtix.BusEvent;
027: import org.objectweb.celtix.BusEventListener;
028: import org.objectweb.celtix.BusException;
029: import org.objectweb.celtix.bindings.BindingContextUtils;
030: import org.objectweb.celtix.bus.busimpl.ComponentCreatedEvent;
031: import org.objectweb.celtix.bus.busimpl.ComponentRemovedEvent;
032: import org.objectweb.celtix.bus.configuration.ConfigurationEvent;
033: import org.objectweb.celtix.bus.configuration.ConfigurationEventFilter;
034: import org.objectweb.celtix.bus.management.counters.TransportServerCounters;
035: import org.objectweb.celtix.bus.ws.addressing.ContextUtils;
036: import org.objectweb.celtix.context.OutputStreamMessageContext;
037: import org.objectweb.celtix.transports.ServerTransportCallback;
038: import org.objectweb.celtix.ws.addressing.EndpointReferenceType;
039: import org.objectweb.celtix.wsdl.EndpointReferenceUtils;
040:
041: public class JettyHTTPServerTransport extends
042: AbstractHTTPServerTransport implements BusEventListener {
043:
044: private static final long serialVersionUID = 1L;
045: JettyHTTPServerEngine engine;
046: TransportServerCounters counters;
047:
048: public JettyHTTPServerTransport(Bus b, EndpointReferenceType ref)
049: throws WSDLException, IOException {
050: super (b, ref);
051: counters = new TransportServerCounters(
052: "JettyHTTPServerTransport");
053: engine = JettyHTTPServerEngine.getForPort(bus, nurl
054: .getProtocol(), nurl.getPort());
055: //register the configuration event
056: ConfigurationEventFilter configurationEventFilter = new ConfigurationEventFilter();
057: try {
058: bus.addListener((BusEventListener) this ,
059: configurationEventFilter);
060: } catch (BusException ex) {
061: LOG.log(Level.SEVERE, "REMOVE_LISTENER_FAILURE_MSG", ex);
062: }
063:
064: bus.sendEvent(new ComponentCreatedEvent(this ));
065: }
066:
067: public void shutdown() {
068: try {
069: bus.removeListener((BusEventListener) this );
070: } catch (BusException ex) {
071: LOG.log(Level.SEVERE, "REMOVE_LISTENER_FAILURE_MSG", ex);
072: }
073: bus.sendEvent(new ComponentRemovedEvent(this ));
074: }
075:
076: public synchronized void activate(ServerTransportCallback cb)
077: throws IOException {
078: callback = cb;
079: engine.addServant(url, new AbstractHttpHandler() {
080: public void handle(String pathInContext, String pathParams,
081: HttpRequest req, HttpResponse resp)
082: throws IOException {
083: if (pathInContext.equals(getName())) {
084: doService(req, resp);
085: }
086: }
087: });
088: }
089:
090: public void deactivate() throws IOException {
091: engine.removeServant(url);
092: }
093:
094: public OutputStreamMessageContext rebase(MessageContext context,
095: EndpointReferenceType decoupledResponseEndpoint)
096: throws IOException {
097: OutputStreamMessageContext outputContext = null;
098: HttpRequest request = (HttpRequest) context
099: .get(HTTPServerInputStreamContext.HTTP_REQUEST);
100: HttpResponse response = (HttpResponse) context
101: .get(HTTPServerInputStreamContext.HTTP_RESPONSE);
102: if (response != null) {
103: outputContext = new HTTPServerRebasedOutputStreamContext(
104: context, request, response);
105: context.put(HTTPServerInputStreamContext.HTTP_RESPONSE,
106: decoupledResponseEndpoint);
107: }
108: return outputContext;
109: }
110:
111: public void postDispatch(MessageContext bindingContext,
112: OutputStreamMessageContext context) {
113: Object responseObj = bindingContext
114: .get(HTTPServerInputStreamContext.HTTP_RESPONSE);
115:
116: if (context.isOneWay()) {
117: counters.getRequestOneWay().increase();
118: }
119: counters.getRequestTotal().increase();
120:
121: if (responseObj instanceof HttpResponse) {
122: HttpResponse response = (HttpResponse) responseObj;
123:
124: if (response.getStatus() == 500) {
125: counters.getTotalError().increase();
126: }
127:
128: try {
129: response.commit();
130: } catch (IOException e) {
131: e.printStackTrace();
132: }
133:
134: } else if (responseObj instanceof URLConnection) {
135: try {
136: URLConnection connection = (URLConnection) responseObj;
137: connection.getOutputStream().close();
138: connection.getInputStream().close();
139: } catch (IOException ioe) {
140: LOG.log(Level.WARNING, "DECOUPLED_RESPONSE_FAILED_MSG",
141: ioe);
142: }
143: }
144:
145: }
146:
147: protected void copyRequestHeaders(MessageContext ctx,
148: Map<String, List<String>> headers) {
149: HttpRequest req = (HttpRequest) ctx
150: .get(HTTPServerInputStreamContext.HTTP_REQUEST);
151: for (Enumeration e = req.getFieldNames(); e.hasMoreElements();) {
152: String fname = (String) e.nextElement();
153: List<String> values;
154: if (headers.containsKey(fname)) {
155: values = headers.get(fname);
156: } else {
157: values = new ArrayList<String>();
158: headers.put(fname, values);
159: }
160: for (Enumeration e2 = req.getFieldValues(fname); e2
161: .hasMoreElements();) {
162: String val = (String) e2.nextElement();
163: values.add(val);
164: }
165: }
166: }
167:
168: // REVISIT factor out to common shared with HTTPClientTransport
169: protected URLConnection getConnection(URL url) throws IOException {
170: URLConnection connection = url.openConnection();
171: connection.setDoOutput(true);
172: connection.setUseCaches(false);
173: if (connection instanceof HttpURLConnection) {
174: HttpURLConnection hc = (HttpURLConnection) connection;
175: hc.setRequestMethod("POST");
176: }
177: connection.setRequestProperty("Content-Type", "text/xml");
178: return connection;
179: }
180:
181: protected void setPolicies(MessageContext ctx,
182: Map<String, List<String>> headers) {
183: super .setPolicies(ctx, headers);
184: if (policy.isSetReceiveTimeout()) {
185: HttpRequest req = (HttpRequest) ctx
186: .get(HTTPServerInputStreamContext.HTTP_REQUEST);
187: Object connection = req.getHttpConnection().getConnection();
188: if (connection instanceof Socket) {
189: Socket sock = (Socket) connection;
190: try {
191: sock.setSoTimeout((int) policy.getReceiveTimeout());
192: } catch (SocketException ex) {
193: LOG.log(Level.INFO, "Could not set SoTimeout", ex);
194: }
195: }
196: }
197: }
198:
199: protected void copyHeaders(MessageContext context,
200: HttpResponse response) {
201: Map<?, ?> headers = (Map<?, ?>) context
202: .get(MessageContext.HTTP_RESPONSE_HEADERS);
203: if (null != headers) {
204: for (Iterator<?> iter = headers.keySet().iterator(); iter
205: .hasNext();) {
206: String header = (String) iter.next();
207: List<?> headerList = (List<?>) headers.get(header);
208: for (Object string : headerList) {
209: response.addField(header, (String) string);
210: }
211: }
212: }
213:
214: }
215:
216: /**
217: * @param context The associated MessageContext.
218: * @return the context that will be used to obtain the OutputStream
219: */
220: public OutputStreamMessageContext createOutputStreamContext(
221: MessageContext context) throws IOException {
222: OutputStreamMessageContext ret = null;
223: // REVISIT: move isRequestor to BindingContextUtils
224: if (ContextUtils.isRequestor(context)) {
225: // create client output stream context
226: ret = new AbstractHTTPRequestorOutputStreamContext(context) {
227: protected URLConnection getConnection(URL url)
228: throws IOException {
229: return url.openConnection();
230: }
231: };
232: } else {
233: ret = new HTTPServerOutputStreamContext(context);
234: }
235: return ret;
236: }
237:
238: void doService(HttpRequest req, HttpResponse resp)
239: throws IOException {
240:
241: if (policy.isSetRedirectURL()) {
242: resp.sendRedirect(policy.getRedirectURL());
243: resp.commit();
244: req.setHandled(true);
245: return;
246: }
247:
248: if ("GET".equals(req.getMethod())
249: && req.getURI().toString().toLowerCase().endsWith(
250: "?wsdl")) {
251: try {
252:
253: Definition def = EndpointReferenceUtils
254: .getWSDLDefinition(bus.getWSDLManager(),
255: reference);
256: resp.addField("Content-Type", "text/xml");
257: bus.getWSDLManager().getWSDLFactory().newWSDLWriter()
258: .writeWSDL(def, resp.getOutputStream());
259: resp.getOutputStream().flush();
260: resp.commit();
261: req.setHandled(true);
262: return;
263: } catch (WSDLException ex) {
264: // TODO Auto-generated catch block
265: ex.printStackTrace();
266: }
267: }
268:
269: final class Servicer implements Runnable {
270: private boolean complete;
271: private final HttpRequest request;
272: private final HttpResponse response;
273:
274: Servicer(HttpRequest reqs, HttpResponse resps) {
275: request = reqs;
276: response = resps;
277: }
278:
279: public void run() {
280: try {
281: serviceRequest(request, response);
282: } catch (IOException ex) {
283: // TODO handle exception
284: LOG.log(Level.SEVERE, "DISPATCH_FAILURE_MSG", ex);
285: } finally {
286: complete = true;
287: synchronized (this ) {
288: notifyAll();
289: }
290: }
291: }
292:
293: public synchronized void waitForCompletion() {
294: while (!complete) {
295: try {
296: wait();
297: } catch (InterruptedException ex) {
298: // ignore
299: }
300: }
301: }
302: }
303:
304: if (null == callback.getExecutor()) {
305: serviceRequest(req, resp);
306: } else {
307: Servicer servicer = new Servicer(req, resp);
308: callback.getExecutor().execute(servicer);
309: servicer.waitForCompletion();
310: }
311: }
312:
313: void serviceRequest(final HttpRequest req, final HttpResponse resp)
314: throws IOException {
315: try {
316: if (LOG.isLoggable(Level.INFO)) {
317: LOG.info("Service http request on thread: "
318: + Thread.currentThread());
319: }
320:
321: HTTPServerInputStreamContext ctx = new HTTPServerInputStreamContext(
322: this ) {
323: public void initContext() throws IOException {
324: super .initContext();
325: inStream = req.getInputStream();
326: origInputStream = inStream;
327: }
328: };
329: BindingContextUtils.storeAsyncOnewayDispatch(ctx, true);
330: ctx.put(HTTPServerInputStreamContext.HTTP_REQUEST, req);
331: ctx.put(HTTPServerInputStreamContext.HTTP_RESPONSE, resp);
332: ctx.initContext();
333:
334: callback.dispatch(ctx, this );
335: resp.commit();
336: req.setHandled(true);
337: } finally {
338: if (LOG.isLoggable(Level.INFO)) {
339: LOG.info("Finished servicing http request on thread: "
340: + Thread.currentThread());
341: }
342: }
343: }
344:
345: private class HTTPServerOutputStreamContext extends
346: AbstractHTTPServerOutputStreamContext {
347:
348: HTTPServerOutputStreamContext(MessageContext ctx)
349: throws IOException {
350: super (JettyHTTPServerTransport.this , ctx);
351: }
352:
353: protected void flushHeaders() throws IOException {
354: Object responseObj = get(HTTPServerInputStreamContext.HTTP_RESPONSE);
355: OutputStream responseStream = null;
356: if (responseObj instanceof HttpResponse) {
357: // non-decoupled response
358: HttpResponse response = (HttpResponse) responseObj;
359:
360: Integer i = (Integer) context.get(HTTP_RESPONSE_CODE);
361: if (i != null) {
362: if (i.intValue() == 500) {
363: response.setStatus(i.intValue(),
364: "Fault Occurred");
365: } else {
366: response.setStatus(i.intValue());
367: }
368: } else {
369: response.setStatus(200);
370: }
371:
372: copyHeaders(context, response);
373: responseStream = response.getOutputStream();
374:
375: if (isOneWay()) {
376: response.commit();
377: }
378: } else if (responseObj instanceof EndpointReferenceType) {
379: // decoupled response
380: EndpointReferenceType decoupledResponseEndpoint = (EndpointReferenceType) responseObj;
381:
382: if (!isOneWay()) {
383: // REVISIT: use policy logic from HTTPClientTransport
384: // REVISIT: handle connection closure
385: URL url = new URL(decoupledResponseEndpoint
386: .getAddress().getValue());
387: URLConnection connection = getConnection(url);
388: responseStream = connection.getOutputStream();
389: put(HTTPServerInputStreamContext.HTTP_RESPONSE,
390: connection);
391: }
392: } else if (responseObj instanceof URLConnection) {
393: // resent decoupled response
394: URL url = ((URLConnection) responseObj).getURL();
395: URLConnection connection = getConnection(url);
396: responseStream = connection.getOutputStream();
397: put(HTTPServerInputStreamContext.HTTP_RESPONSE,
398: connection);
399: } else {
400: LOG.log(Level.WARNING, "UNEXPECTED_RESPONSE_TYPE_MSG",
401: responseObj.getClass());
402: throw new IOException("UNEXPECTED_RESPONSE_TYPE_MSG"
403: + responseObj.getClass());
404: }
405:
406: if (isOneWay()) {
407: context
408: .remove(HTTPServerInputStreamContext.HTTP_RESPONSE);
409: } else {
410: origOut.resetOut(new BufferedOutputStream(
411: responseStream, 1024));
412: }
413: }
414: }
415:
416: private class HTTPServerRebasedOutputStreamContext extends
417: AbstractHTTPServerOutputStreamContext {
418:
419: private HttpRequest request;
420: private HttpResponse response;
421:
422: HTTPServerRebasedOutputStreamContext(MessageContext ctx,
423: HttpRequest req, HttpResponse resp) throws IOException {
424: super (JettyHTTPServerTransport.this , ctx);
425: request = req;
426: response = resp;
427: }
428:
429: protected void flushHeaders() throws IOException {
430: if (response != null) {
431: copyHeaders(context, response);
432: response.setStatus(HttpURLConnection.HTTP_ACCEPTED,
433: "Accepted");
434: response.commit();
435: request.setHandled(true);
436: origOut.resetOut(new BufferedOutputStream(response
437: .getOutputStream(), 1024));
438: }
439: }
440: }
441:
442: public void processEvent(BusEvent e) throws BusException {
443: if (e.getID().equals(ConfigurationEvent.RECONFIGURED)) {
444: String configName = (String) e.getSource();
445: reConfigure(configName);
446: }
447: }
448:
449: private void reConfigure(String configName) {
450: if ("servicesMonitoring".equals(configName)) {
451: if (bus.getConfiguration().getBoolean("servicesMonitoring")) {
452: counters.resetCounters();
453: } else {
454: counters.stopCounters();
455: }
456: }
457: }
458: }
|