001: package org.objectweb.celtix.bus.transports.http;
002:
003: import java.io.BufferedOutputStream;
004: import java.io.ByteArrayInputStream;
005: import java.io.ByteArrayOutputStream;
006: import java.io.FilterOutputStream;
007: import java.io.IOException;
008: import java.io.InputStream;
009: import java.io.OutputStream;
010: import java.net.HttpURLConnection;
011: import java.net.InetSocketAddress;
012: import java.net.Proxy;
013: import java.net.URL;
014: import java.net.URLConnection;
015: import java.util.Arrays;
016: import java.util.HashMap;
017: import java.util.List;
018: import java.util.Map;
019: import java.util.concurrent.Callable;
020: import java.util.concurrent.Executor;
021: import java.util.concurrent.Future;
022: import java.util.concurrent.FutureTask;
023: import java.util.logging.Level;
024: import java.util.logging.Logger;
025:
026: import javax.net.ssl.HttpsURLConnection;
027: import javax.wsdl.Port;
028: import javax.wsdl.WSDLException;
029: import javax.xml.ws.BindingProvider;
030: import javax.xml.ws.WebServiceException;
031: import javax.xml.ws.handler.MessageContext;
032:
033: import static javax.xml.ws.handler.MessageContext.HTTP_RESPONSE_CODE;
034:
035: import org.mortbay.http.HttpRequest;
036: import org.mortbay.http.HttpResponse;
037: import org.mortbay.http.handler.AbstractHttpHandler;
038: import org.objectweb.celtix.Bus;
039: import org.objectweb.celtix.bindings.BindingContextUtils;
040: import org.objectweb.celtix.bindings.ClientBinding;
041: import org.objectweb.celtix.bindings.ResponseCallback;
042: import org.objectweb.celtix.bus.busimpl.ComponentCreatedEvent;
043: import org.objectweb.celtix.bus.busimpl.ComponentRemovedEvent;
044: import org.objectweb.celtix.bus.configuration.security.AuthorizationPolicy;
045: import org.objectweb.celtix.bus.configuration.security.SSLClientPolicy;
046: import org.objectweb.celtix.bus.configuration.wsdl.WsdlHttpConfigurationProvider;
047: import org.objectweb.celtix.bus.configuration.wsdl.WsdlPortProvider;
048: import org.objectweb.celtix.bus.management.counters.TransportClientCounters;
049: import org.objectweb.celtix.bus.transports.https.JettySslClientConfigurer;
050: import org.objectweb.celtix.common.logging.LogUtils;
051: import org.objectweb.celtix.common.util.Base64Utility;
052: import org.objectweb.celtix.configuration.Configuration;
053: import org.objectweb.celtix.configuration.ConfigurationBuilder;
054: import org.objectweb.celtix.configuration.ConfigurationBuilderFactory;
055: import org.objectweb.celtix.context.GenericMessageContext;
056: import org.objectweb.celtix.context.InputStreamMessageContext;
057: import org.objectweb.celtix.context.MessageContextWrapper;
058: import org.objectweb.celtix.context.ObjectMessageContext;
059: import org.objectweb.celtix.context.OutputStreamMessageContext;
060: import org.objectweb.celtix.transports.ClientTransport;
061: import org.objectweb.celtix.transports.http.configuration.HTTPClientPolicy;
062: import org.objectweb.celtix.ws.addressing.EndpointReferenceType;
063: import org.objectweb.celtix.wsdl.EndpointReferenceUtils;
064:
065: public class HTTPClientTransport implements ClientTransport {
066:
067: private static final Logger LOG = LogUtils
068: .getL7dLogger(HTTPClientTransport.class);
069:
070: private static final String PORT_CONFIGURATION_URI = "http://celtix.objectweb.org/bus/jaxws/port-config";
071: private static final String HTTP_CLIENT_CONFIGURATION_URI = "http://celtix.objectweb.org/bus/transports/http/http-client-config";
072: private static final String HTTP_CLIENT_CONFIGURATION_ID = "http-client";
073:
074: final HTTPClientPolicy policy;
075: final SSLClientPolicy sslClientPolicy;
076: final AuthorizationPolicy authPolicy;
077: final AuthorizationPolicy proxyAuthPolicy;
078: final Configuration configuration;
079: final Configuration portConfiguration;
080: final EndpointReferenceType targetEndpoint;
081: final Bus bus;
082: final Port port;
083: final HTTPTransportFactory factory;
084:
085: URL url;
086: TransportClientCounters counters;
087:
088: private JettyHTTPServerEngine decoupledEngine;
089: private EndpointReferenceType decoupledEndpoint;
090: private String decoupledAddress;
091: private URL decoupledURL;
092: private ClientBinding clientBinding;
093: private ResponseCallback responseCallback;
094:
095: public HTTPClientTransport(Bus b, EndpointReferenceType ref,
096: ClientBinding binding, HTTPTransportFactory f)
097: throws WSDLException, IOException {
098:
099: bus = b;
100: portConfiguration = getPortConfiguration(bus, ref);
101: String address = portConfiguration.getString("address");
102: EndpointReferenceUtils.setAddress(ref, address);
103: targetEndpoint = ref;
104: clientBinding = binding;
105: factory = f;
106: url = new URL(address);
107: counters = new TransportClientCounters("HTTPClientTransport");
108:
109: port = EndpointReferenceUtils
110: .getPort(bus.getWSDLManager(), ref);
111: configuration = createConfiguration(portConfiguration);
112: policy = getClientPolicy(configuration);
113: authPolicy = getAuthPolicy("authorization", configuration);
114: proxyAuthPolicy = getAuthPolicy("proxyAuthorization",
115: configuration);
116: sslClientPolicy = getSSLClientPolicy(configuration);
117: bus.sendEvent(new ComponentCreatedEvent(this ));
118:
119: }
120:
121: private HTTPClientPolicy getClientPolicy(Configuration conf) {
122: HTTPClientPolicy pol = conf.getObject(HTTPClientPolicy.class,
123: "httpClient");
124: if (pol == null) {
125: pol = new HTTPClientPolicy();
126: }
127: return pol;
128: }
129:
130: private AuthorizationPolicy getAuthPolicy(String type,
131: Configuration conf) {
132: AuthorizationPolicy pol = conf.getObject(
133: AuthorizationPolicy.class, type);
134: if (pol == null) {
135: pol = new AuthorizationPolicy();
136: }
137: return pol;
138: }
139:
140: private SSLClientPolicy getSSLClientPolicy(Configuration conf) {
141: SSLClientPolicy pol = conf.getObject(SSLClientPolicy.class,
142: "sslClient");
143: if (pol == null) {
144: pol = new SSLClientPolicy();
145: }
146: return pol;
147: }
148:
149: public EndpointReferenceType getTargetEndpoint() {
150: return targetEndpoint;
151: }
152:
153: public synchronized EndpointReferenceType getDecoupledEndpoint()
154: throws IOException {
155: if (decoupledEndpoint == null
156: && policy.getDecoupledEndpoint() != null) {
157: decoupledEndpoint = setUpDecoupledEndpoint();
158: }
159: return decoupledEndpoint;
160: }
161:
162: public Port getPort() {
163: return port;
164: }
165:
166: public OutputStreamMessageContext createOutputStreamContext(
167: MessageContext context) throws IOException {
168: return new HTTPClientOutputStreamContext(url, policy,
169: authPolicy, proxyAuthPolicy, sslClientPolicy, context,
170: portConfiguration);
171: }
172:
173: public void finalPrepareOutputStreamContext(
174: OutputStreamMessageContext context) throws IOException {
175: HTTPClientOutputStreamContext ctx = (HTTPClientOutputStreamContext) context;
176: ctx.flushHeaders();
177: }
178:
179: public void invokeOneway(OutputStreamMessageContext context)
180: throws IOException {
181: try {
182: HTTPClientOutputStreamContext ctx = (HTTPClientOutputStreamContext) context;
183: context.getOutputStream().close();
184: ctx.getCorrespondingInputStreamContext().getInputStream()
185: .close();
186: counters.getInvokeOneWay().increase();
187: } catch (Exception ex) {
188: counters.getInvokeError().increase();
189: throw new IOException(ex.getMessage());
190: }
191: }
192:
193: public InputStreamMessageContext invoke(
194: OutputStreamMessageContext context) throws IOException {
195: try {
196: context.getOutputStream().close();
197: HTTPClientOutputStreamContext requestContext = (HTTPClientOutputStreamContext) context;
198: counters.getInvoke().increase();
199: return getResponseContext(requestContext);
200: } catch (Exception ex) {
201: counters.getInvokeError().increase();
202: throw new IOException(ex.getMessage());
203: }
204: }
205:
206: public Future<InputStreamMessageContext> invokeAsync(
207: OutputStreamMessageContext context, Executor executor)
208: throws IOException {
209: try {
210: context.getOutputStream().close();
211: HTTPClientOutputStreamContext ctx = (HTTPClientOutputStreamContext) context;
212: FutureTask<InputStreamMessageContext> f = new FutureTask<InputStreamMessageContext>(
213: getInputStreamMessageContextCallable(ctx));
214: // client (service) must always have an executor associated with it
215: executor.execute(f);
216: counters.getInvokeAsync().increase();
217: return f;
218: } catch (Exception ex) {
219: counters.getInvokeError().increase();
220: throw new IOException(ex.getMessage());
221: }
222: }
223:
224: public ResponseCallback getResponseCallback() {
225: return responseCallback;
226: }
227:
228: public void shutdown() {
229: if (url != null) {
230: try {
231: URLConnection connect = url.openConnection();
232: if (connect instanceof HttpURLConnection) {
233: ((HttpURLConnection) connect).disconnect();
234: }
235: } catch (IOException ex) {
236: //ignore
237: }
238: url = null;
239: }
240:
241: if (decoupledURL != null && decoupledEngine != null) {
242: try {
243: DecoupledHandler decoupledHandler = (DecoupledHandler) decoupledEngine
244: .getServant(decoupledAddress);
245: if (decoupledHandler != null) {
246: decoupledHandler.release();
247: }
248: } catch (IOException ioe) {
249: // ignore
250: }
251: }
252:
253: bus.sendEvent(new ComponentRemovedEvent(this ));
254: }
255:
256: protected InputStreamMessageContext getResponseContext(
257: HTTPClientOutputStreamContext requestContext)
258: throws IOException {
259: InputStreamMessageContext responseContext = null;
260: if (hasDecoupledEndpoint()) {
261: int responseCode = getResponseCode(requestContext.connection);
262: if (responseCode == HttpURLConnection.HTTP_ACCEPTED) {
263: // server transport was rebased on decoupled response endpoint,
264: // dispatch this partial response immediately as it may include
265: // piggybacked content
266: responseContext = requestContext
267: .getCorrespondingInputStreamContext();
268: BindingContextUtils.storeDecoupledResponse(
269: responseContext, true);
270: } else {
271: // request failed *before* server transport was rebased on
272: // decoupled response endpoint
273: responseContext = requestContext
274: .getCorrespondingInputStreamContext();
275: }
276: } else {
277: responseContext = requestContext
278: .getCorrespondingInputStreamContext();
279: }
280: return responseContext;
281: }
282:
283: private EndpointReferenceType setUpDecoupledEndpoint() {
284: EndpointReferenceType reference = EndpointReferenceUtils
285: .getEndpointReference(policy.getDecoupledEndpoint());
286: if (reference != null) {
287: decoupledAddress = reference.getAddress().getValue();
288: LOG
289: .info("creating decoupled endpoint: "
290: + decoupledAddress);
291: try {
292: decoupledURL = new URL(decoupledAddress);
293: decoupledEngine = JettyHTTPServerEngine.getForPort(bus,
294: decoupledURL.getProtocol(), decoupledURL
295: .getPort());
296: DecoupledHandler decoupledHandler = (DecoupledHandler) decoupledEngine
297: .getServant(decoupledAddress);
298: if (decoupledHandler == null) {
299: responseCallback = clientBinding
300: .createResponseCallback();
301: decoupledEngine.addServant(decoupledAddress,
302: new DecoupledHandler(responseCallback));
303: } else {
304: responseCallback = decoupledHandler.duplicate();
305: }
306:
307: } catch (Exception e) {
308: // REVISIT move message to localizable Messages.properties
309: LOG.log(Level.WARNING,
310: "decoupled endpoint creation failed: ", e);
311: }
312: }
313: return reference;
314: }
315:
316: protected synchronized boolean hasDecoupledEndpoint() {
317: return decoupledEndpoint != null;
318: }
319:
320: protected static Configuration getPortConfiguration(Bus bus,
321: EndpointReferenceType ref) {
322: Configuration busConfiguration = bus.getConfiguration();
323: String id = EndpointReferenceUtils.getServiceName(ref)
324: .toString()
325: + "/" + EndpointReferenceUtils.getPortName(ref);
326: Configuration portConfiguration = busConfiguration.getChild(
327: PORT_CONFIGURATION_URI, id);
328:
329: if (portConfiguration == null) {
330: ConfigurationBuilder cb = ConfigurationBuilderFactory
331: .getBuilder(null);
332: portConfiguration = cb.getConfiguration(
333: PORT_CONFIGURATION_URI, id, bus.getConfiguration());
334: if (null == portConfiguration) {
335: portConfiguration = cb.buildConfiguration(
336: PORT_CONFIGURATION_URI, id, bus
337: .getConfiguration());
338: }
339:
340: // add the additional provider
341: Port port = null;
342: try {
343: port = EndpointReferenceUtils.getPort(bus
344: .getWSDLManager(), ref);
345: } catch (WSDLException ex) {
346: throw new WebServiceException(
347: "Could not get port from wsdl", ex);
348: }
349: portConfiguration.getProviders().add(
350: new WsdlPortProvider(port));
351: }
352: return portConfiguration;
353: }
354:
355: private Configuration createConfiguration(Configuration portCfg) {
356: ConfigurationBuilder cb = ConfigurationBuilderFactory
357: .getBuilder(null);
358: Configuration cfg = cb.getConfiguration(
359: HTTP_CLIENT_CONFIGURATION_URI,
360: HTTP_CLIENT_CONFIGURATION_ID, portCfg);
361: if (null == cfg) {
362: cfg = cb.buildConfiguration(HTTP_CLIENT_CONFIGURATION_URI,
363: HTTP_CLIENT_CONFIGURATION_ID, portCfg);
364: }
365: // register the additional provider
366: if (null != port) {
367: cfg.getProviders().add(
368: new WsdlHttpConfigurationProvider(port, false));
369: }
370: return cfg;
371: }
372:
373: protected static int getResponseCode(URLConnection connection)
374: throws IOException {
375: int responseCode = HttpURLConnection.HTTP_OK;
376: if (connection instanceof HttpURLConnection) {
377: HttpURLConnection hc = (HttpURLConnection) connection;
378: responseCode = hc.getResponseCode();
379: } else {
380: if (connection.getHeaderField(HTTP_RESPONSE_CODE) != null) {
381: responseCode = Integer.parseInt(connection
382: .getHeaderField(HTTP_RESPONSE_CODE));
383: }
384: }
385: return responseCode;
386: }
387:
388: protected InputStreamMessageContextCallable getInputStreamMessageContextCallable(
389: HTTPClientOutputStreamContext context) {
390: return new InputStreamMessageContextCallable(context);
391: }
392:
393: protected static class HTTPClientOutputStreamContext extends
394: MessageContextWrapper implements OutputStreamMessageContext {
395:
396: URLConnection connection;
397: WrappedOutputStream origOut;
398: OutputStream out;
399: HTTPClientInputStreamContext inputStreamContext;
400: HTTPClientPolicy policy;
401: AuthorizationPolicy authPolicy;
402: AuthorizationPolicy proxyAuthPolicy;
403: SSLClientPolicy sslClientPolicy;
404: Configuration portConfiguration;
405:
406: @SuppressWarnings("unchecked")
407: public HTTPClientOutputStreamContext(URL url,
408: HTTPClientPolicy p, AuthorizationPolicy ap,
409: AuthorizationPolicy pap, SSLClientPolicy sslcp,
410: MessageContext ctx, Configuration configParam)
411: throws IOException {
412: super (ctx);
413:
414: Map<String, List<String>> headers = (Map<String, List<String>>) super
415: .get(HTTP_REQUEST_HEADERS);
416: if (null == headers) {
417: headers = new HashMap<String, List<String>>();
418: super .put(HTTP_REQUEST_HEADERS, headers);
419: }
420:
421: policy = p;
422: authPolicy = ap;
423: proxyAuthPolicy = pap;
424: sslClientPolicy = sslcp;
425: portConfiguration = configParam;
426: String value = (String) ctx
427: .get(BindingProvider.ENDPOINT_ADDRESS_PROPERTY);
428: if (value != null) {
429: url = new URL(value);
430: }
431:
432: if (policy.isSetProxyServer()) {
433: Proxy proxy = new Proxy(Proxy.Type.valueOf(policy
434: .getProxyServerType().toString()),
435: new InetSocketAddress(policy.getProxyServer(),
436: policy.getProxyServerPort()));
437: connection = url.openConnection(proxy);
438: } else {
439: connection = url.openConnection();
440: }
441: connection.setDoOutput(true);
442:
443: if (connection instanceof HttpURLConnection) {
444: HttpURLConnection hc = (HttpURLConnection) connection;
445: hc.setRequestMethod("POST");
446: }
447:
448: connection.setConnectTimeout((int) policy
449: .getConnectionTimeout());
450: connection.setReadTimeout((int) policy.getReceiveTimeout());
451:
452: connection.setUseCaches(false);
453: if (connection instanceof HttpURLConnection) {
454: HttpURLConnection hc = (HttpURLConnection) connection;
455: if (policy.isAutoRedirect()) {
456: //cannot use chunking if autoredirect as the request will need to be
457: //completely cached locally and resent to the redirect target
458: hc.setInstanceFollowRedirects(true);
459: } else {
460: hc.setInstanceFollowRedirects(false);
461: if (policy.isAllowChunking()) {
462: hc.setChunkedStreamingMode(2048);
463: }
464: }
465: }
466: setPolicies(headers);
467: if (connection instanceof HttpsURLConnection) {
468: setSSLPolicies();
469: }
470:
471: origOut = new WrappedOutputStream();
472: out = origOut;
473: }
474:
475: private void setSSLPolicies() {
476: JettySslClientConfigurer sslClientConfigurer = new JettySslClientConfigurer(
477: sslClientPolicy, connection, portConfiguration);
478: sslClientConfigurer.configure();
479: }
480:
481: private void setPolicies(Map<String, List<String>> headers) {
482: String userName = (String) get(BindingProvider.USERNAME_PROPERTY);
483: if (userName == null && authPolicy.isSetUserName()) {
484: userName = authPolicy.getUserName();
485: }
486: if (userName != null) {
487: String passwd = (String) get(BindingProvider.PASSWORD_PROPERTY);
488: if (passwd == null && authPolicy.isSetPassword()) {
489: passwd = authPolicy.getPassword();
490: }
491: userName += ":";
492: if (passwd != null) {
493: userName += passwd;
494: }
495: userName = Base64Utility.encode(userName.getBytes());
496: headers.put("Authorization", Arrays
497: .asList(new String[] { "Basic " + userName }));
498: } else if (authPolicy.isSetAuthorizationType()
499: && authPolicy.isSetAuthorization()) {
500: String type = authPolicy.getAuthorizationType();
501: type += " ";
502: type += authPolicy.getAuthorization();
503: headers.put("Authorization", Arrays
504: .asList(new String[] { type }));
505: }
506: if (proxyAuthPolicy.isSetUserName()) {
507: userName = proxyAuthPolicy.getUserName();
508: if (userName != null) {
509: String passwd = "";
510: if (proxyAuthPolicy.isSetPassword()) {
511: passwd = proxyAuthPolicy.getPassword();
512: }
513: userName += ":";
514: if (passwd != null) {
515: userName += passwd;
516: }
517: userName = Base64Utility
518: .encode(userName.getBytes());
519: headers.put("Proxy-Authorization",
520: Arrays.asList(new String[] { "Basic "
521: + userName }));
522: } else if (proxyAuthPolicy.isSetAuthorizationType()
523: && proxyAuthPolicy.isSetAuthorization()) {
524: String type = proxyAuthPolicy
525: .getAuthorizationType();
526: type += " ";
527: type += proxyAuthPolicy.getAuthorization();
528: headers.put("Proxy-Authorization", Arrays
529: .asList(new String[] { type }));
530: }
531: }
532: if (policy.isSetCacheControl()) {
533: headers.put("Cache-Control", Arrays
534: .asList(new String[] { policy.getCacheControl()
535: .value() }));
536: }
537: if (policy.isSetHost()) {
538: headers.put("Host", Arrays.asList(new String[] { policy
539: .getHost() }));
540: }
541: if (policy.isSetConnection()) {
542: headers.put("Connection", Arrays
543: .asList(new String[] { policy.getConnection()
544: .value() }));
545: }
546: if (policy.isSetAccept()) {
547: headers.put("Accept", Arrays
548: .asList(new String[] { policy.getAccept() }));
549: }
550: if (policy.isSetAcceptEncoding()) {
551: headers.put("Accept-Encoding", Arrays
552: .asList(new String[] { policy
553: .getAcceptEncoding() }));
554: }
555: if (policy.isSetAcceptLanguage()) {
556: headers.put("Accept-Language", Arrays
557: .asList(new String[] { policy
558: .getAcceptLanguage() }));
559: }
560: if (policy.isSetContentType()) {
561: headers.put("Content-Type",
562: Arrays.asList(new String[] { policy
563: .getContentType() }));
564: }
565: if (policy.isSetCookie()) {
566: headers.put("Cookie", Arrays
567: .asList(new String[] { policy.getCookie() }));
568: }
569: if (policy.isSetBrowserType()) {
570: headers.put("BrowserType",
571: Arrays.asList(new String[] { policy
572: .getBrowserType() }));
573: }
574: if (policy.isSetReferer()) {
575: headers.put("Referer", Arrays
576: .asList(new String[] { policy.getReferer() }));
577: }
578: }
579:
580: @SuppressWarnings("unchecked")
581: void flushHeaders() throws IOException {
582: Map<String, List<String>> headers = (Map<String, List<String>>) super
583: .get(HTTP_REQUEST_HEADERS);
584: if (null != headers) {
585: for (String header : headers.keySet()) {
586: List<String> headerList = headers.get(header);
587: for (String string : headerList) {
588: connection.addRequestProperty(header, string);
589: }
590: }
591: }
592:
593: origOut.resetOut(new BufferedOutputStream(connection
594: .getOutputStream(), 1024));
595: }
596:
597: public void setFault(boolean isFault) {
598: //nothing to do
599: }
600:
601: public boolean isFault() {
602: return false;
603: }
604:
605: public void setOneWay(boolean isOneWay) {
606: put(ONEWAY_MESSAGE_TF, isOneWay);
607: }
608:
609: public boolean isOneWay() {
610: return ((Boolean) get(ONEWAY_MESSAGE_TF)).booleanValue();
611: }
612:
613: public OutputStream getOutputStream() {
614: return out;
615: }
616:
617: public void setOutputStream(OutputStream o) {
618: out = o;
619: }
620:
621: public InputStreamMessageContext getCorrespondingInputStreamContext()
622: throws IOException {
623: if (inputStreamContext == null) {
624: inputStreamContext = new HTTPClientInputStreamContext(
625: connection);
626: }
627: return inputStreamContext;
628: }
629:
630: private class WrappedOutputStream extends FilterOutputStream {
631: WrappedOutputStream() {
632: super (new ByteArrayOutputStream());
633: }
634:
635: void resetOut(OutputStream newOut) throws IOException {
636: ByteArrayOutputStream bout = (ByteArrayOutputStream) out;
637: if (bout.size() > 0) {
638: bout.writeTo(newOut);
639: }
640: out = newOut;
641: }
642:
643: public void close() throws IOException {
644: out.flush();
645: if (inputStreamContext != null) {
646: inputStreamContext.initialise();
647: }
648: }
649: }
650: }
651:
652: static class HTTPClientInputStreamContext extends
653: GenericMessageContext implements InputStreamMessageContext {
654:
655: private static final long serialVersionUID = 1L;
656:
657: final URLConnection connection;
658: InputStream origInputStream;
659: InputStream inStream;
660: private boolean initialised;
661:
662: public HTTPClientInputStreamContext(URLConnection con)
663: throws IOException {
664: connection = con;
665: initialise();
666: }
667:
668: /**
669: * Calling getHeaderFields on the connection implicitly gets
670: * the InputStream from the connection. Getting the
671: * InputStream implicitly closes the output stream which
672: * renders it unwritable. The InputStream context is created
673: * before the binding is finished with it. For this reason it
674: * is necessary to initialise the InputStreamContext lazily.
675: * When the OutputStream associated with this connection is
676: * closed, it will invoke on this initialise method.
677: */
678: void initialise() throws IOException {
679: if (!initialised) {
680: put(ObjectMessageContext.MESSAGE_INPUT, false);
681: put(HTTP_RESPONSE_HEADERS, connection.getHeaderFields());
682: put(HTTP_RESPONSE_CODE, getResponseCode(connection));
683: if (connection instanceof HttpURLConnection) {
684: HttpURLConnection hc = (HttpURLConnection) connection;
685: origInputStream = hc.getErrorStream();
686: if (null == origInputStream) {
687: origInputStream = connection.getInputStream();
688: }
689: } else {
690: origInputStream = connection.getInputStream();
691: }
692:
693: inStream = origInputStream;
694: initialised = true;
695: }
696: }
697:
698: public InputStream getInputStream() {
699: try {
700: initialise();
701: } catch (IOException ex) {
702: throw new RuntimeException(ex);
703: }
704: return inStream;
705: }
706:
707: public void setInputStream(InputStream ins) {
708: inStream = ins;
709: }
710:
711: public void setFault(boolean isFault) {
712: //nothing to do
713: }
714:
715: public boolean isFault() {
716: assert get(HTTP_RESPONSE_CODE) != null;
717: return ((Integer) get(HTTP_RESPONSE_CODE)).intValue() == 500;
718: }
719: }
720:
721: static class HTTPDecoupledClientInputStreamContext extends
722: GenericMessageContext implements InputStreamMessageContext {
723:
724: InputStream inStream;
725:
726: public HTTPDecoupledClientInputStreamContext(
727: HttpRequest decoupledResponse) throws IOException {
728: put(ObjectMessageContext.MESSAGE_INPUT, false);
729: put(HTTP_RESPONSE_HEADERS, decoupledResponse
730: .getParameters());
731: put(HTTP_RESPONSE_CODE, HttpURLConnection.HTTP_ACCEPTED);
732: inStream = drain(decoupledResponse.getInputStream());
733: }
734:
735: public InputStream getInputStream() {
736: return inStream;
737: }
738:
739: public void setInputStream(InputStream ins) {
740: inStream = ins;
741: }
742:
743: public void setFault(boolean isFault) {
744: //nothing to do
745: }
746:
747: public boolean isFault() {
748: return false;
749: }
750:
751: private static InputStream drain(InputStream r)
752: throws IOException {
753: byte[] bytes = new byte[4096];
754: ByteArrayOutputStream w = new ByteArrayOutputStream();
755: try {
756: int offset = 0;
757: int length = r.read(bytes, offset, bytes.length
758: - offset);
759: while (length != -1) {
760: offset += length;
761:
762: if (offset == bytes.length) {
763: w.write(bytes, 0, bytes.length);
764: offset = 0;
765: }
766:
767: length = r.read(bytes, offset, bytes.length
768: - offset);
769: }
770: if (offset != 0) {
771: w.write(bytes, 0, offset);
772: }
773: } finally {
774: bytes = null;
775: }
776: return new ByteArrayInputStream(w.toByteArray());
777: }
778: }
779:
780: private class InputStreamMessageContextCallable implements
781: Callable<InputStreamMessageContext> {
782: private final HTTPClientOutputStreamContext httpClientOutputStreamContext;
783:
784: InputStreamMessageContextCallable(
785: HTTPClientOutputStreamContext ctx) {
786: httpClientOutputStreamContext = ctx;
787: }
788:
789: public InputStreamMessageContext call() throws Exception {
790: return getResponseContext(httpClientOutputStreamContext);
791: }
792: }
793:
794: private class DecoupledHandler extends AbstractHttpHandler {
795: private ResponseCallback responseCallback;
796: private int refCount;
797:
798: DecoupledHandler(ResponseCallback callback) {
799: responseCallback = callback;
800: }
801:
802: synchronized ResponseCallback duplicate() {
803: refCount++;
804: return responseCallback;
805: }
806:
807: synchronized void release() {
808: if (--refCount == 0) {
809: try {
810: decoupledEngine.removeServant(decoupledAddress);
811: JettyHTTPServerEngine.destroyForPort(decoupledURL
812: .getPort());
813: } catch (IOException ex) {
814: //ignore
815: }
816: }
817: }
818:
819: public void handle(String pathInContext, String pathParams,
820: HttpRequest req, HttpResponse resp) throws IOException {
821: HTTPDecoupledClientInputStreamContext ctx = new HTTPDecoupledClientInputStreamContext(
822: req);
823: responseCallback.dispatch(ctx);
824: resp.commit();
825: req.setHandled(true);
826: }
827: }
828: }
|