Source Code Cross Referenced for HTTPClientTransport.java in  » ESB » celtix-1.0 » org » objectweb » celtix » bus » transports » http » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » ESB » celtix 1.0 » org.objectweb.celtix.bus.transports.http 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        package org.objectweb.celtix.bus.transports.http;
002:
003:        import java.io.BufferedOutputStream;
004:        import java.io.ByteArrayInputStream;
005:        import java.io.ByteArrayOutputStream;
006:        import java.io.FilterOutputStream;
007:        import java.io.IOException;
008:        import java.io.InputStream;
009:        import java.io.OutputStream;
010:        import java.net.HttpURLConnection;
011:        import java.net.InetSocketAddress;
012:        import java.net.Proxy;
013:        import java.net.URL;
014:        import java.net.URLConnection;
015:        import java.util.Arrays;
016:        import java.util.HashMap;
017:        import java.util.List;
018:        import java.util.Map;
019:        import java.util.concurrent.Callable;
020:        import java.util.concurrent.Executor;
021:        import java.util.concurrent.Future;
022:        import java.util.concurrent.FutureTask;
023:        import java.util.logging.Level;
024:        import java.util.logging.Logger;
025:
026:        import javax.net.ssl.HttpsURLConnection;
027:        import javax.wsdl.Port;
028:        import javax.wsdl.WSDLException;
029:        import javax.xml.ws.BindingProvider;
030:        import javax.xml.ws.WebServiceException;
031:        import javax.xml.ws.handler.MessageContext;
032:
033:        import static javax.xml.ws.handler.MessageContext.HTTP_RESPONSE_CODE;
034:
035:        import org.mortbay.http.HttpRequest;
036:        import org.mortbay.http.HttpResponse;
037:        import org.mortbay.http.handler.AbstractHttpHandler;
038:        import org.objectweb.celtix.Bus;
039:        import org.objectweb.celtix.bindings.BindingContextUtils;
040:        import org.objectweb.celtix.bindings.ClientBinding;
041:        import org.objectweb.celtix.bindings.ResponseCallback;
042:        import org.objectweb.celtix.bus.busimpl.ComponentCreatedEvent;
043:        import org.objectweb.celtix.bus.busimpl.ComponentRemovedEvent;
044:        import org.objectweb.celtix.bus.configuration.security.AuthorizationPolicy;
045:        import org.objectweb.celtix.bus.configuration.security.SSLClientPolicy;
046:        import org.objectweb.celtix.bus.configuration.wsdl.WsdlHttpConfigurationProvider;
047:        import org.objectweb.celtix.bus.configuration.wsdl.WsdlPortProvider;
048:        import org.objectweb.celtix.bus.management.counters.TransportClientCounters;
049:        import org.objectweb.celtix.bus.transports.https.JettySslClientConfigurer;
050:        import org.objectweb.celtix.common.logging.LogUtils;
051:        import org.objectweb.celtix.common.util.Base64Utility;
052:        import org.objectweb.celtix.configuration.Configuration;
053:        import org.objectweb.celtix.configuration.ConfigurationBuilder;
054:        import org.objectweb.celtix.configuration.ConfigurationBuilderFactory;
055:        import org.objectweb.celtix.context.GenericMessageContext;
056:        import org.objectweb.celtix.context.InputStreamMessageContext;
057:        import org.objectweb.celtix.context.MessageContextWrapper;
058:        import org.objectweb.celtix.context.ObjectMessageContext;
059:        import org.objectweb.celtix.context.OutputStreamMessageContext;
060:        import org.objectweb.celtix.transports.ClientTransport;
061:        import org.objectweb.celtix.transports.http.configuration.HTTPClientPolicy;
062:        import org.objectweb.celtix.ws.addressing.EndpointReferenceType;
063:        import org.objectweb.celtix.wsdl.EndpointReferenceUtils;
064:
065:        public class HTTPClientTransport implements  ClientTransport {
066:
067:            private static final Logger LOG = LogUtils
068:                    .getL7dLogger(HTTPClientTransport.class);
069:
070:            private static final String PORT_CONFIGURATION_URI = "http://celtix.objectweb.org/bus/jaxws/port-config";
071:            private static final String HTTP_CLIENT_CONFIGURATION_URI = "http://celtix.objectweb.org/bus/transports/http/http-client-config";
072:            private static final String HTTP_CLIENT_CONFIGURATION_ID = "http-client";
073:
074:            final HTTPClientPolicy policy;
075:            final SSLClientPolicy sslClientPolicy;
076:            final AuthorizationPolicy authPolicy;
077:            final AuthorizationPolicy proxyAuthPolicy;
078:            final Configuration configuration;
079:            final Configuration portConfiguration;
080:            final EndpointReferenceType targetEndpoint;
081:            final Bus bus;
082:            final Port port;
083:            final HTTPTransportFactory factory;
084:
085:            URL url;
086:            TransportClientCounters counters;
087:
088:            private JettyHTTPServerEngine decoupledEngine;
089:            private EndpointReferenceType decoupledEndpoint;
090:            private String decoupledAddress;
091:            private URL decoupledURL;
092:            private ClientBinding clientBinding;
093:            private ResponseCallback responseCallback;
094:
095:            public HTTPClientTransport(Bus b, EndpointReferenceType ref,
096:                    ClientBinding binding, HTTPTransportFactory f)
097:                    throws WSDLException, IOException {
098:
099:                bus = b;
100:                portConfiguration = getPortConfiguration(bus, ref);
101:                String address = portConfiguration.getString("address");
102:                EndpointReferenceUtils.setAddress(ref, address);
103:                targetEndpoint = ref;
104:                clientBinding = binding;
105:                factory = f;
106:                url = new URL(address);
107:                counters = new TransportClientCounters("HTTPClientTransport");
108:
109:                port = EndpointReferenceUtils
110:                        .getPort(bus.getWSDLManager(), ref);
111:                configuration = createConfiguration(portConfiguration);
112:                policy = getClientPolicy(configuration);
113:                authPolicy = getAuthPolicy("authorization", configuration);
114:                proxyAuthPolicy = getAuthPolicy("proxyAuthorization",
115:                        configuration);
116:                sslClientPolicy = getSSLClientPolicy(configuration);
117:                bus.sendEvent(new ComponentCreatedEvent(this ));
118:
119:            }
120:
121:            private HTTPClientPolicy getClientPolicy(Configuration conf) {
122:                HTTPClientPolicy pol = conf.getObject(HTTPClientPolicy.class,
123:                        "httpClient");
124:                if (pol == null) {
125:                    pol = new HTTPClientPolicy();
126:                }
127:                return pol;
128:            }
129:
130:            private AuthorizationPolicy getAuthPolicy(String type,
131:                    Configuration conf) {
132:                AuthorizationPolicy pol = conf.getObject(
133:                        AuthorizationPolicy.class, type);
134:                if (pol == null) {
135:                    pol = new AuthorizationPolicy();
136:                }
137:                return pol;
138:            }
139:
140:            private SSLClientPolicy getSSLClientPolicy(Configuration conf) {
141:                SSLClientPolicy pol = conf.getObject(SSLClientPolicy.class,
142:                        "sslClient");
143:                if (pol == null) {
144:                    pol = new SSLClientPolicy();
145:                }
146:                return pol;
147:            }
148:
149:            public EndpointReferenceType getTargetEndpoint() {
150:                return targetEndpoint;
151:            }
152:
153:            public synchronized EndpointReferenceType getDecoupledEndpoint()
154:                    throws IOException {
155:                if (decoupledEndpoint == null
156:                        && policy.getDecoupledEndpoint() != null) {
157:                    decoupledEndpoint = setUpDecoupledEndpoint();
158:                }
159:                return decoupledEndpoint;
160:            }
161:
162:            public Port getPort() {
163:                return port;
164:            }
165:
166:            public OutputStreamMessageContext createOutputStreamContext(
167:                    MessageContext context) throws IOException {
168:                return new HTTPClientOutputStreamContext(url, policy,
169:                        authPolicy, proxyAuthPolicy, sslClientPolicy, context,
170:                        portConfiguration);
171:            }
172:
173:            public void finalPrepareOutputStreamContext(
174:                    OutputStreamMessageContext context) throws IOException {
175:                HTTPClientOutputStreamContext ctx = (HTTPClientOutputStreamContext) context;
176:                ctx.flushHeaders();
177:            }
178:
179:            public void invokeOneway(OutputStreamMessageContext context)
180:                    throws IOException {
181:                try {
182:                    HTTPClientOutputStreamContext ctx = (HTTPClientOutputStreamContext) context;
183:                    context.getOutputStream().close();
184:                    ctx.getCorrespondingInputStreamContext().getInputStream()
185:                            .close();
186:                    counters.getInvokeOneWay().increase();
187:                } catch (Exception ex) {
188:                    counters.getInvokeError().increase();
189:                    throw new IOException(ex.getMessage());
190:                }
191:            }
192:
193:            public InputStreamMessageContext invoke(
194:                    OutputStreamMessageContext context) throws IOException {
195:                try {
196:                    context.getOutputStream().close();
197:                    HTTPClientOutputStreamContext requestContext = (HTTPClientOutputStreamContext) context;
198:                    counters.getInvoke().increase();
199:                    return getResponseContext(requestContext);
200:                } catch (Exception ex) {
201:                    counters.getInvokeError().increase();
202:                    throw new IOException(ex.getMessage());
203:                }
204:            }
205:
206:            public Future<InputStreamMessageContext> invokeAsync(
207:                    OutputStreamMessageContext context, Executor executor)
208:                    throws IOException {
209:                try {
210:                    context.getOutputStream().close();
211:                    HTTPClientOutputStreamContext ctx = (HTTPClientOutputStreamContext) context;
212:                    FutureTask<InputStreamMessageContext> f = new FutureTask<InputStreamMessageContext>(
213:                            getInputStreamMessageContextCallable(ctx));
214:                    // client (service) must always have an executor associated with it
215:                    executor.execute(f);
216:                    counters.getInvokeAsync().increase();
217:                    return f;
218:                } catch (Exception ex) {
219:                    counters.getInvokeError().increase();
220:                    throw new IOException(ex.getMessage());
221:                }
222:            }
223:
224:            public ResponseCallback getResponseCallback() {
225:                return responseCallback;
226:            }
227:
228:            public void shutdown() {
229:                if (url != null) {
230:                    try {
231:                        URLConnection connect = url.openConnection();
232:                        if (connect instanceof  HttpURLConnection) {
233:                            ((HttpURLConnection) connect).disconnect();
234:                        }
235:                    } catch (IOException ex) {
236:                        //ignore
237:                    }
238:                    url = null;
239:                }
240:
241:                if (decoupledURL != null && decoupledEngine != null) {
242:                    try {
243:                        DecoupledHandler decoupledHandler = (DecoupledHandler) decoupledEngine
244:                                .getServant(decoupledAddress);
245:                        if (decoupledHandler != null) {
246:                            decoupledHandler.release();
247:                        }
248:                    } catch (IOException ioe) {
249:                        // ignore
250:                    }
251:                }
252:
253:                bus.sendEvent(new ComponentRemovedEvent(this ));
254:            }
255:
256:            protected InputStreamMessageContext getResponseContext(
257:                    HTTPClientOutputStreamContext requestContext)
258:                    throws IOException {
259:                InputStreamMessageContext responseContext = null;
260:                if (hasDecoupledEndpoint()) {
261:                    int responseCode = getResponseCode(requestContext.connection);
262:                    if (responseCode == HttpURLConnection.HTTP_ACCEPTED) {
263:                        // server transport was rebased on decoupled response endpoint,
264:                        // dispatch this partial response immediately as it may include
265:                        // piggybacked content
266:                        responseContext = requestContext
267:                                .getCorrespondingInputStreamContext();
268:                        BindingContextUtils.storeDecoupledResponse(
269:                                responseContext, true);
270:                    } else {
271:                        // request failed *before* server transport was rebased on
272:                        // decoupled response endpoint
273:                        responseContext = requestContext
274:                                .getCorrespondingInputStreamContext();
275:                    }
276:                } else {
277:                    responseContext = requestContext
278:                            .getCorrespondingInputStreamContext();
279:                }
280:                return responseContext;
281:            }
282:
283:            private EndpointReferenceType setUpDecoupledEndpoint() {
284:                EndpointReferenceType reference = EndpointReferenceUtils
285:                        .getEndpointReference(policy.getDecoupledEndpoint());
286:                if (reference != null) {
287:                    decoupledAddress = reference.getAddress().getValue();
288:                    LOG
289:                            .info("creating decoupled endpoint: "
290:                                    + decoupledAddress);
291:                    try {
292:                        decoupledURL = new URL(decoupledAddress);
293:                        decoupledEngine = JettyHTTPServerEngine.getForPort(bus,
294:                                decoupledURL.getProtocol(), decoupledURL
295:                                        .getPort());
296:                        DecoupledHandler decoupledHandler = (DecoupledHandler) decoupledEngine
297:                                .getServant(decoupledAddress);
298:                        if (decoupledHandler == null) {
299:                            responseCallback = clientBinding
300:                                    .createResponseCallback();
301:                            decoupledEngine.addServant(decoupledAddress,
302:                                    new DecoupledHandler(responseCallback));
303:                        } else {
304:                            responseCallback = decoupledHandler.duplicate();
305:                        }
306:
307:                    } catch (Exception e) {
308:                        // REVISIT move message to localizable Messages.properties
309:                        LOG.log(Level.WARNING,
310:                                "decoupled endpoint creation failed: ", e);
311:                    }
312:                }
313:                return reference;
314:            }
315:
316:            protected synchronized boolean hasDecoupledEndpoint() {
317:                return decoupledEndpoint != null;
318:            }
319:
320:            protected static Configuration getPortConfiguration(Bus bus,
321:                    EndpointReferenceType ref) {
322:                Configuration busConfiguration = bus.getConfiguration();
323:                String id = EndpointReferenceUtils.getServiceName(ref)
324:                        .toString()
325:                        + "/" + EndpointReferenceUtils.getPortName(ref);
326:                Configuration portConfiguration = busConfiguration.getChild(
327:                        PORT_CONFIGURATION_URI, id);
328:
329:                if (portConfiguration == null) {
330:                    ConfigurationBuilder cb = ConfigurationBuilderFactory
331:                            .getBuilder(null);
332:                    portConfiguration = cb.getConfiguration(
333:                            PORT_CONFIGURATION_URI, id, bus.getConfiguration());
334:                    if (null == portConfiguration) {
335:                        portConfiguration = cb.buildConfiguration(
336:                                PORT_CONFIGURATION_URI, id, bus
337:                                        .getConfiguration());
338:                    }
339:
340:                    // add the additional provider
341:                    Port port = null;
342:                    try {
343:                        port = EndpointReferenceUtils.getPort(bus
344:                                .getWSDLManager(), ref);
345:                    } catch (WSDLException ex) {
346:                        throw new WebServiceException(
347:                                "Could not get port from wsdl", ex);
348:                    }
349:                    portConfiguration.getProviders().add(
350:                            new WsdlPortProvider(port));
351:                }
352:                return portConfiguration;
353:            }
354:
355:            private Configuration createConfiguration(Configuration portCfg) {
356:                ConfigurationBuilder cb = ConfigurationBuilderFactory
357:                        .getBuilder(null);
358:                Configuration cfg = cb.getConfiguration(
359:                        HTTP_CLIENT_CONFIGURATION_URI,
360:                        HTTP_CLIENT_CONFIGURATION_ID, portCfg);
361:                if (null == cfg) {
362:                    cfg = cb.buildConfiguration(HTTP_CLIENT_CONFIGURATION_URI,
363:                            HTTP_CLIENT_CONFIGURATION_ID, portCfg);
364:                }
365:                // register the additional provider
366:                if (null != port) {
367:                    cfg.getProviders().add(
368:                            new WsdlHttpConfigurationProvider(port, false));
369:                }
370:                return cfg;
371:            }
372:
373:            protected static int getResponseCode(URLConnection connection)
374:                    throws IOException {
375:                int responseCode = HttpURLConnection.HTTP_OK;
376:                if (connection instanceof  HttpURLConnection) {
377:                    HttpURLConnection hc = (HttpURLConnection) connection;
378:                    responseCode = hc.getResponseCode();
379:                } else {
380:                    if (connection.getHeaderField(HTTP_RESPONSE_CODE) != null) {
381:                        responseCode = Integer.parseInt(connection
382:                                .getHeaderField(HTTP_RESPONSE_CODE));
383:                    }
384:                }
385:                return responseCode;
386:            }
387:
388:            protected InputStreamMessageContextCallable getInputStreamMessageContextCallable(
389:                    HTTPClientOutputStreamContext context) {
390:                return new InputStreamMessageContextCallable(context);
391:            }
392:
393:            protected static class HTTPClientOutputStreamContext extends
394:                    MessageContextWrapper implements  OutputStreamMessageContext {
395:
396:                URLConnection connection;
397:                WrappedOutputStream origOut;
398:                OutputStream out;
399:                HTTPClientInputStreamContext inputStreamContext;
400:                HTTPClientPolicy policy;
401:                AuthorizationPolicy authPolicy;
402:                AuthorizationPolicy proxyAuthPolicy;
403:                SSLClientPolicy sslClientPolicy;
404:                Configuration portConfiguration;
405:
406:                @SuppressWarnings("unchecked")
407:                public HTTPClientOutputStreamContext(URL url,
408:                        HTTPClientPolicy p, AuthorizationPolicy ap,
409:                        AuthorizationPolicy pap, SSLClientPolicy sslcp,
410:                        MessageContext ctx, Configuration configParam)
411:                        throws IOException {
412:                    super (ctx);
413:
414:                    Map<String, List<String>> headers = (Map<String, List<String>>) super 
415:                            .get(HTTP_REQUEST_HEADERS);
416:                    if (null == headers) {
417:                        headers = new HashMap<String, List<String>>();
418:                        super .put(HTTP_REQUEST_HEADERS, headers);
419:                    }
420:
421:                    policy = p;
422:                    authPolicy = ap;
423:                    proxyAuthPolicy = pap;
424:                    sslClientPolicy = sslcp;
425:                    portConfiguration = configParam;
426:                    String value = (String) ctx
427:                            .get(BindingProvider.ENDPOINT_ADDRESS_PROPERTY);
428:                    if (value != null) {
429:                        url = new URL(value);
430:                    }
431:
432:                    if (policy.isSetProxyServer()) {
433:                        Proxy proxy = new Proxy(Proxy.Type.valueOf(policy
434:                                .getProxyServerType().toString()),
435:                                new InetSocketAddress(policy.getProxyServer(),
436:                                        policy.getProxyServerPort()));
437:                        connection = url.openConnection(proxy);
438:                    } else {
439:                        connection = url.openConnection();
440:                    }
441:                    connection.setDoOutput(true);
442:
443:                    if (connection instanceof  HttpURLConnection) {
444:                        HttpURLConnection hc = (HttpURLConnection) connection;
445:                        hc.setRequestMethod("POST");
446:                    }
447:
448:                    connection.setConnectTimeout((int) policy
449:                            .getConnectionTimeout());
450:                    connection.setReadTimeout((int) policy.getReceiveTimeout());
451:
452:                    connection.setUseCaches(false);
453:                    if (connection instanceof  HttpURLConnection) {
454:                        HttpURLConnection hc = (HttpURLConnection) connection;
455:                        if (policy.isAutoRedirect()) {
456:                            //cannot use chunking if autoredirect as the request will need to be
457:                            //completely cached locally and resent to the redirect target
458:                            hc.setInstanceFollowRedirects(true);
459:                        } else {
460:                            hc.setInstanceFollowRedirects(false);
461:                            if (policy.isAllowChunking()) {
462:                                hc.setChunkedStreamingMode(2048);
463:                            }
464:                        }
465:                    }
466:                    setPolicies(headers);
467:                    if (connection instanceof  HttpsURLConnection) {
468:                        setSSLPolicies();
469:                    }
470:
471:                    origOut = new WrappedOutputStream();
472:                    out = origOut;
473:                }
474:
475:                private void setSSLPolicies() {
476:                    JettySslClientConfigurer sslClientConfigurer = new JettySslClientConfigurer(
477:                            sslClientPolicy, connection, portConfiguration);
478:                    sslClientConfigurer.configure();
479:                }
480:
481:                private void setPolicies(Map<String, List<String>> headers) {
482:                    String userName = (String) get(BindingProvider.USERNAME_PROPERTY);
483:                    if (userName == null && authPolicy.isSetUserName()) {
484:                        userName = authPolicy.getUserName();
485:                    }
486:                    if (userName != null) {
487:                        String passwd = (String) get(BindingProvider.PASSWORD_PROPERTY);
488:                        if (passwd == null && authPolicy.isSetPassword()) {
489:                            passwd = authPolicy.getPassword();
490:                        }
491:                        userName += ":";
492:                        if (passwd != null) {
493:                            userName += passwd;
494:                        }
495:                        userName = Base64Utility.encode(userName.getBytes());
496:                        headers.put("Authorization", Arrays
497:                                .asList(new String[] { "Basic " + userName }));
498:                    } else if (authPolicy.isSetAuthorizationType()
499:                            && authPolicy.isSetAuthorization()) {
500:                        String type = authPolicy.getAuthorizationType();
501:                        type += " ";
502:                        type += authPolicy.getAuthorization();
503:                        headers.put("Authorization", Arrays
504:                                .asList(new String[] { type }));
505:                    }
506:                    if (proxyAuthPolicy.isSetUserName()) {
507:                        userName = proxyAuthPolicy.getUserName();
508:                        if (userName != null) {
509:                            String passwd = "";
510:                            if (proxyAuthPolicy.isSetPassword()) {
511:                                passwd = proxyAuthPolicy.getPassword();
512:                            }
513:                            userName += ":";
514:                            if (passwd != null) {
515:                                userName += passwd;
516:                            }
517:                            userName = Base64Utility
518:                                    .encode(userName.getBytes());
519:                            headers.put("Proxy-Authorization",
520:                                    Arrays.asList(new String[] { "Basic "
521:                                            + userName }));
522:                        } else if (proxyAuthPolicy.isSetAuthorizationType()
523:                                && proxyAuthPolicy.isSetAuthorization()) {
524:                            String type = proxyAuthPolicy
525:                                    .getAuthorizationType();
526:                            type += " ";
527:                            type += proxyAuthPolicy.getAuthorization();
528:                            headers.put("Proxy-Authorization", Arrays
529:                                    .asList(new String[] { type }));
530:                        }
531:                    }
532:                    if (policy.isSetCacheControl()) {
533:                        headers.put("Cache-Control", Arrays
534:                                .asList(new String[] { policy.getCacheControl()
535:                                        .value() }));
536:                    }
537:                    if (policy.isSetHost()) {
538:                        headers.put("Host", Arrays.asList(new String[] { policy
539:                                .getHost() }));
540:                    }
541:                    if (policy.isSetConnection()) {
542:                        headers.put("Connection", Arrays
543:                                .asList(new String[] { policy.getConnection()
544:                                        .value() }));
545:                    }
546:                    if (policy.isSetAccept()) {
547:                        headers.put("Accept", Arrays
548:                                .asList(new String[] { policy.getAccept() }));
549:                    }
550:                    if (policy.isSetAcceptEncoding()) {
551:                        headers.put("Accept-Encoding", Arrays
552:                                .asList(new String[] { policy
553:                                        .getAcceptEncoding() }));
554:                    }
555:                    if (policy.isSetAcceptLanguage()) {
556:                        headers.put("Accept-Language", Arrays
557:                                .asList(new String[] { policy
558:                                        .getAcceptLanguage() }));
559:                    }
560:                    if (policy.isSetContentType()) {
561:                        headers.put("Content-Type",
562:                                Arrays.asList(new String[] { policy
563:                                        .getContentType() }));
564:                    }
565:                    if (policy.isSetCookie()) {
566:                        headers.put("Cookie", Arrays
567:                                .asList(new String[] { policy.getCookie() }));
568:                    }
569:                    if (policy.isSetBrowserType()) {
570:                        headers.put("BrowserType",
571:                                Arrays.asList(new String[] { policy
572:                                        .getBrowserType() }));
573:                    }
574:                    if (policy.isSetReferer()) {
575:                        headers.put("Referer", Arrays
576:                                .asList(new String[] { policy.getReferer() }));
577:                    }
578:                }
579:
580:                @SuppressWarnings("unchecked")
581:                void flushHeaders() throws IOException {
582:                    Map<String, List<String>> headers = (Map<String, List<String>>) super 
583:                            .get(HTTP_REQUEST_HEADERS);
584:                    if (null != headers) {
585:                        for (String header : headers.keySet()) {
586:                            List<String> headerList = headers.get(header);
587:                            for (String string : headerList) {
588:                                connection.addRequestProperty(header, string);
589:                            }
590:                        }
591:                    }
592:
593:                    origOut.resetOut(new BufferedOutputStream(connection
594:                            .getOutputStream(), 1024));
595:                }
596:
597:                public void setFault(boolean isFault) {
598:                    //nothing to do
599:                }
600:
601:                public boolean isFault() {
602:                    return false;
603:                }
604:
605:                public void setOneWay(boolean isOneWay) {
606:                    put(ONEWAY_MESSAGE_TF, isOneWay);
607:                }
608:
609:                public boolean isOneWay() {
610:                    return ((Boolean) get(ONEWAY_MESSAGE_TF)).booleanValue();
611:                }
612:
613:                public OutputStream getOutputStream() {
614:                    return out;
615:                }
616:
617:                public void setOutputStream(OutputStream o) {
618:                    out = o;
619:                }
620:
621:                public InputStreamMessageContext getCorrespondingInputStreamContext()
622:                        throws IOException {
623:                    if (inputStreamContext == null) {
624:                        inputStreamContext = new HTTPClientInputStreamContext(
625:                                connection);
626:                    }
627:                    return inputStreamContext;
628:                }
629:
630:                private class WrappedOutputStream extends FilterOutputStream {
631:                    WrappedOutputStream() {
632:                        super (new ByteArrayOutputStream());
633:                    }
634:
635:                    void resetOut(OutputStream newOut) throws IOException {
636:                        ByteArrayOutputStream bout = (ByteArrayOutputStream) out;
637:                        if (bout.size() > 0) {
638:                            bout.writeTo(newOut);
639:                        }
640:                        out = newOut;
641:                    }
642:
643:                    public void close() throws IOException {
644:                        out.flush();
645:                        if (inputStreamContext != null) {
646:                            inputStreamContext.initialise();
647:                        }
648:                    }
649:                }
650:            }
651:
652:            static class HTTPClientInputStreamContext extends
653:                    GenericMessageContext implements  InputStreamMessageContext {
654:
655:                private static final long serialVersionUID = 1L;
656:
657:                final URLConnection connection;
658:                InputStream origInputStream;
659:                InputStream inStream;
660:                private boolean initialised;
661:
662:                public HTTPClientInputStreamContext(URLConnection con)
663:                        throws IOException {
664:                    connection = con;
665:                    initialise();
666:                }
667:
668:                /**
669:                 * Calling getHeaderFields on the connection implicitly gets
670:                 * the InputStream from the connection.  Getting the
671:                 * InputStream implicitly closes the output stream which
672:                 * renders it unwritable.  The InputStream context is created
673:                 * before the binding is finished with it.  For this reason it
674:                 * is necessary to initialise the InputStreamContext lazily.
675:                 * When the OutputStream associated with this connection is
676:                 * closed, it will invoke on this initialise method.
677:                 */
678:                void initialise() throws IOException {
679:                    if (!initialised) {
680:                        put(ObjectMessageContext.MESSAGE_INPUT, false);
681:                        put(HTTP_RESPONSE_HEADERS, connection.getHeaderFields());
682:                        put(HTTP_RESPONSE_CODE, getResponseCode(connection));
683:                        if (connection instanceof  HttpURLConnection) {
684:                            HttpURLConnection hc = (HttpURLConnection) connection;
685:                            origInputStream = hc.getErrorStream();
686:                            if (null == origInputStream) {
687:                                origInputStream = connection.getInputStream();
688:                            }
689:                        } else {
690:                            origInputStream = connection.getInputStream();
691:                        }
692:
693:                        inStream = origInputStream;
694:                        initialised = true;
695:                    }
696:                }
697:
698:                public InputStream getInputStream() {
699:                    try {
700:                        initialise();
701:                    } catch (IOException ex) {
702:                        throw new RuntimeException(ex);
703:                    }
704:                    return inStream;
705:                }
706:
707:                public void setInputStream(InputStream ins) {
708:                    inStream = ins;
709:                }
710:
711:                public void setFault(boolean isFault) {
712:                    //nothing to do
713:                }
714:
715:                public boolean isFault() {
716:                    assert get(HTTP_RESPONSE_CODE) != null;
717:                    return ((Integer) get(HTTP_RESPONSE_CODE)).intValue() == 500;
718:                }
719:            }
720:
721:            static class HTTPDecoupledClientInputStreamContext extends
722:                    GenericMessageContext implements  InputStreamMessageContext {
723:
724:                InputStream inStream;
725:
726:                public HTTPDecoupledClientInputStreamContext(
727:                        HttpRequest decoupledResponse) throws IOException {
728:                    put(ObjectMessageContext.MESSAGE_INPUT, false);
729:                    put(HTTP_RESPONSE_HEADERS, decoupledResponse
730:                            .getParameters());
731:                    put(HTTP_RESPONSE_CODE, HttpURLConnection.HTTP_ACCEPTED);
732:                    inStream = drain(decoupledResponse.getInputStream());
733:                }
734:
735:                public InputStream getInputStream() {
736:                    return inStream;
737:                }
738:
739:                public void setInputStream(InputStream ins) {
740:                    inStream = ins;
741:                }
742:
743:                public void setFault(boolean isFault) {
744:                    //nothing to do
745:                }
746:
747:                public boolean isFault() {
748:                    return false;
749:                }
750:
751:                private static InputStream drain(InputStream r)
752:                        throws IOException {
753:                    byte[] bytes = new byte[4096];
754:                    ByteArrayOutputStream w = new ByteArrayOutputStream();
755:                    try {
756:                        int offset = 0;
757:                        int length = r.read(bytes, offset, bytes.length
758:                                - offset);
759:                        while (length != -1) {
760:                            offset += length;
761:
762:                            if (offset == bytes.length) {
763:                                w.write(bytes, 0, bytes.length);
764:                                offset = 0;
765:                            }
766:
767:                            length = r.read(bytes, offset, bytes.length
768:                                    - offset);
769:                        }
770:                        if (offset != 0) {
771:                            w.write(bytes, 0, offset);
772:                        }
773:                    } finally {
774:                        bytes = null;
775:                    }
776:                    return new ByteArrayInputStream(w.toByteArray());
777:                }
778:            }
779:
780:            private class InputStreamMessageContextCallable implements 
781:                    Callable<InputStreamMessageContext> {
782:                private final HTTPClientOutputStreamContext httpClientOutputStreamContext;
783:
784:                InputStreamMessageContextCallable(
785:                        HTTPClientOutputStreamContext ctx) {
786:                    httpClientOutputStreamContext = ctx;
787:                }
788:
789:                public InputStreamMessageContext call() throws Exception {
790:                    return getResponseContext(httpClientOutputStreamContext);
791:                }
792:            }
793:
794:            private class DecoupledHandler extends AbstractHttpHandler {
795:                private ResponseCallback responseCallback;
796:                private int refCount;
797:
798:                DecoupledHandler(ResponseCallback callback) {
799:                    responseCallback = callback;
800:                }
801:
802:                synchronized ResponseCallback duplicate() {
803:                    refCount++;
804:                    return responseCallback;
805:                }
806:
807:                synchronized void release() {
808:                    if (--refCount == 0) {
809:                        try {
810:                            decoupledEngine.removeServant(decoupledAddress);
811:                            JettyHTTPServerEngine.destroyForPort(decoupledURL
812:                                    .getPort());
813:                        } catch (IOException ex) {
814:                            //ignore
815:                        }
816:                    }
817:                }
818:
819:                public void handle(String pathInContext, String pathParams,
820:                        HttpRequest req, HttpResponse resp) throws IOException {
821:                    HTTPDecoupledClientInputStreamContext ctx = new HTTPDecoupledClientInputStreamContext(
822:                            req);
823:                    responseCallback.dispatch(ctx);
824:                    resp.commit();
825:                    req.setHandled(true);
826:                }
827:            }
828:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.