Source Code Cross Referenced for SingleHttpSessionIoHandler.java in  » Web-Server » asyncweb » org » safehaus » asyncweb » service » transport » mina » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Web Server » asyncweb » org.safehaus.asyncweb.service.transport.mina 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         *  Licensed to the Apache Software Foundation (ASF) under one
003:         *  or more contributor license agreements.  See the NOTICE file
004:         *  distributed with this work for additional information
005:         *  regarding copyright ownership.  The ASF licenses this file
006:         *  to you under the Apache License, Version 2.0 (the
007:         *  "License"); you may not use this file except in compliance
008:         *  with the License.  You may obtain a copy of the License at
009:         *
010:         *    http://www.apache.org/licenses/LICENSE-2.0
011:         *
012:         *  Unless required by applicable law or agreed to in writing,
013:         *  software distributed under the License is distributed on an
014:         *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015:         *  KIND, either express or implied.  See the License for the
016:         *  specific language governing permissions and limitations
017:         *  under the License.
018:         *
019:         */
020:        package org.safehaus.asyncweb.service.transport.mina;
021:
022:        import java.io.IOException;
023:        import java.net.InetSocketAddress;
024:
025:        import org.apache.mina.common.DefaultWriteRequest;
026:        import org.apache.mina.common.IdleStatus;
027:        import org.apache.mina.common.IoFilterAdapter;
028:        import org.apache.mina.common.IoFutureListener;
029:        import org.apache.mina.common.IoSession;
030:        import org.apache.mina.common.WriteFuture;
031:        import org.apache.mina.common.WriteRequest;
032:        import org.apache.mina.filter.codec.ProtocolCodecFilter;
033:        import org.apache.mina.filter.codec.ProtocolDecoderException;
034:        import org.apache.mina.handler.multiton.SingleSessionIoHandler;
035:        import org.safehaus.asyncweb.codec.HttpServerCodecFactory;
036:        import org.safehaus.asyncweb.codec.decoder.HttpDecoderException;
037:        import org.safehaus.asyncweb.common.DefaultHttpRequest;
038:        import org.safehaus.asyncweb.common.DefaultHttpResponse;
039:        import org.safehaus.asyncweb.common.HttpRequest;
040:        import org.safehaus.asyncweb.common.HttpResponseStatus;
041:        import org.safehaus.asyncweb.common.HttpVersion;
042:        import org.safehaus.asyncweb.common.MutableHttpResponse;
043:        import org.safehaus.asyncweb.service.HttpServiceContext;
044:        import org.safehaus.asyncweb.service.HttpServiceFilter;
045:        import org.safehaus.asyncweb.service.ServiceContainer;
046:        import org.safehaus.asyncweb.service.context.AbstractHttpServiceContext;
047:        import org.safehaus.asyncweb.service.pipeline.RequestPipeline;
048:        import org.safehaus.asyncweb.service.pipeline.RequestPipelineListener;
049:        import org.safehaus.asyncweb.service.pipeline.StandardRequestPipeline;
050:        import org.slf4j.Logger;
051:        import org.slf4j.LoggerFactory;
052:
053:        class SingleHttpSessionIoHandler implements  SingleSessionIoHandler {
054:
055:            private static final Logger LOG = LoggerFactory
056:                    .getLogger(SingleHttpSessionIoHandler.class);
057:
058:            /**
059:             * The number of parsers we pre-allocate
060:             */
061:            //  private static final int DEFAULT_PARSERS = 5;
062:            /**
063:             * The default idle time
064:             */
065:            private static final int DEFAULT_IDLE_TIME = 30000;
066:
067:            /**
068:             * Out default pipeline
069:             */
070:            private static final int DEFAULT_PIPELINE = 100;
071:
072:            private final ServiceContainer container;
073:            private final IoSession session;
074:            private final RequestPipeline pipeline;
075:
076:            private HttpServiceContext currentContext;
077:            private int readIdleTime = DEFAULT_IDLE_TIME;
078:
079:            SingleHttpSessionIoHandler(ServiceContainer container,
080:                    IoSession session) {
081:                this .container = container;
082:                this .session = session;
083:                this .pipeline = new StandardRequestPipeline(DEFAULT_PIPELINE);
084:
085:                session.getConfig().setIdleTime(IdleStatus.READER_IDLE,
086:                        readIdleTime);
087:
088:                session.getFilterChain().addLast("codec",
089:                        new ProtocolCodecFilter(new HttpServerCodecFactory()));
090:
091:                session.getFilterChain().addLast("converter",
092:                        new ContextConverter());
093:
094:                session.getFilterChain().addLast("pipeline",
095:                        new RequestPipelineAdapter(pipeline));
096:
097:                int i = 0;
098:                for (HttpServiceFilter serviceFilter : container
099:                        .getServiceFilters()) {
100:                    session.getFilterChain().addLast("serviceFilter." + (i++),
101:                            new ServiceFilterAdapter(serviceFilter));
102:                }
103:            }
104:
105:            public void sessionCreated() {
106:            }
107:
108:            public void sessionOpened() {
109:                LOG.info("Connection opened");
110:            }
111:
112:            public void sessionClosed() {
113:                LOG.info("Connection closed");
114:            }
115:
116:            /**
117:             * Invoked when this connection idles out.
118:             * If we are in the process of parsing a request, the current request
119:             * is rejected with a {@link HttpResponseStatus#REQUEST_TIMEOUT} response status.
120:             * 
121:             */
122:            public void sessionIdle(IdleStatus idleType) {
123:                if (session.getIdleCount(idleType) == 1) {
124:                    //      // FIXME currentRequest is always null now; we need to cooperate with a decoder.
125:                    //      if (currentContext != null) {
126:                    //        LOG.info("Read idled out while parsing request. Scheduling timeout response");
127:                    //        handleReadFailure(currentContext, HttpResponseStatus.REQUEST_TIMEOUT, "Timeout while reading request");
128:                    //      } else {
129:                    LOG
130:                            .info("Idled with no current request. Scheduling closure when pipeline empties");
131:                    pipeline.runWhenEmpty(new Runnable() {
132:                        public void run() {
133:                            LOG
134:                                    .info("Pipeline empty after idle. Closing session");
135:                            session.close();
136:                        }
137:                    });
138:                    //      }
139:                }
140:            }
141:
142:            public void exceptionCaught(Throwable cause) {
143:                MutableHttpResponse response = null;
144:                if (cause instanceof  ProtocolDecoderException) {
145:                    HttpResponseStatus status;
146:                    if (cause instanceof  HttpDecoderException) {
147:                        status = ((HttpDecoderException) cause)
148:                                .getResponseStatus();
149:                    } else {
150:                        status = HttpResponseStatus.BAD_REQUEST;
151:                    }
152:
153:                    LOG.warn("Bad request:", cause);
154:
155:                    response = new DefaultHttpResponse();
156:                    response.setProtocolVersion(HttpVersion.HTTP_1_1);
157:                    response.setStatus(status);
158:                } else if (cause instanceof  IOException) {
159:                    LOG.warn("IOException on HTTP connection", cause);
160:                    session.close();
161:                } else {
162:                    response = new DefaultHttpResponse();
163:                    response.setProtocolVersion(HttpVersion.HTTP_1_1);
164:                    response
165:                            .setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
166:                    LOG.warn("Unexpected exception from a service.", cause);
167:                }
168:                if (response != null) {
169:                    HttpServiceContext context = this .currentContext;
170:                    if (context == null) {
171:                        context = createContext(new DefaultHttpRequest());
172:                    }
173:                    context.commitResponse(response);
174:                }
175:            }
176:
177:            public void messageReceived(Object message) {
178:                // FIXME messageReceived invoked only when whole message is built.
179:
180:                // When headers were built
181:                //sendContinuationIfRequested(request);
182:
183:                // When body has been built
184:            }
185:
186:            /**
187:             * Sends a continuation response for the specified request, if
188:             * the client has requested that a continuation response should
189:             * be provided.</br>
190:             * The continuation response is enqueued with our pipe-line.
191:             * Note that we do <i>not</i> commit the request - a final response
192:             * must still be provided.
193:             * 
194:             * TODO: We're not currently adding value here: If we cant route
195:             *       to a service, we'll still accept the request. We should
196:             *       add the ability to inject the request in to the container
197:             *       and let it decide whether the request can be handled.
198:             */
199:            private void sendContinuationIfRequested(HttpServiceContext context) {
200:                if (context.getRequest().requiresContinuationResponse()) {
201:                    MutableHttpResponse continuationResponse = new DefaultHttpResponse();
202:                    continuationResponse.setStatus(HttpResponseStatus.CONTINUE);
203:                    context.commitResponse(continuationResponse);
204:                    LOG.info("Scheduled continuation response");
205:                }
206:            }
207:
208:            /**
209:             * Invoked when we fail to parse an incoming request.
210:             * We configure our parser to discard any further data received from the client,
211:             * and schedule a response with the appropriate failure code for the
212:             * current request
213:             * 
214:             * @param status  The status
215:             * @param message Failure message
216:             */
217:            private void handleReadFailure(HttpServiceContext context,
218:                    HttpResponseStatus status, String message) {
219:                if (LOG.isInfoEnabled()) {
220:                    LOG.info("Failed to handle client request. Reason: "
221:                            + status);
222:                }
223:                MutableHttpResponse response = new DefaultHttpResponse();
224:                response.setStatusReasonPhrase(message);
225:                response.setStatus(status);
226:                context.commitResponse(response);
227:            }
228:
229:            /**
230:             * Invoked when data wrote has been fully written.
231:             * If we have scheduled closure after sending a final response, we will
232:             * be provided with the <code>CLOSE_MARKER</code> as our marker object.<br/>
233:             * This signals us to schedule closure of the connection
234:             * 
235:             * @param message   The marker provided when writing data. If this is
236:             *                 our closure marker, we schedule closure of the connection
237:             */
238:            public void messageSent(Object message) {
239:            }
240:
241:            /**
242:             * Sets the read idle time for all connections
243:             * 
244:             * @param readIdleTime  The read idle time (seconds)
245:             */
246:            public void setReadIdleTime(int readIdleTime) {
247:                this .readIdleTime = readIdleTime;
248:            }
249:
250:            private HttpServiceContext createContext(HttpRequest request) {
251:                return new DefaultHttpServiceContext(request);
252:            }
253:
254:            private class ContextConverter extends IoFilterAdapter {
255:
256:                @Override
257:                public void filterWrite(NextFilter nextFilter,
258:                        IoSession session, WriteRequest writeRequest)
259:                        throws Exception {
260:                    nextFilter.filterWrite(session, new DefaultWriteRequest(
261:                            ((HttpServiceContext) writeRequest.getMessage())
262:                                    .getCommittedResponse(), writeRequest
263:                                    .getFuture()));
264:                }
265:
266:                @Override
267:                public void messageReceived(NextFilter nextFilter,
268:                        IoSession session, Object message) throws Exception {
269:                    HttpRequest request = (HttpRequest) message;
270:                    HttpServiceContext context = createContext(request);
271:                    currentContext = context;
272:                    nextFilter.messageReceived(session, context);
273:                }
274:            }
275:
276:            private class ServiceFilterAdapter extends IoFilterAdapter {
277:                private final HttpServiceFilter filter;
278:
279:                public ServiceFilterAdapter(HttpServiceFilter filter) {
280:                    this .filter = filter;
281:                }
282:
283:                @Override
284:                public void messageReceived(final NextFilter nextFilter,
285:                        final IoSession session, final Object message)
286:                        throws Exception {
287:                    org.safehaus.asyncweb.service.HttpServiceFilter.NextFilter nextFilterAdapter = new org.safehaus.asyncweb.service.HttpServiceFilter.NextFilter() {
288:                        public void invoke() {
289:                            nextFilter.messageReceived(session, message);
290:                        }
291:                    };
292:                    filter.handleRequest(nextFilterAdapter,
293:                            (HttpServiceContext) message);
294:                }
295:
296:                @Override
297:                public void filterWrite(final NextFilter nextFilter,
298:                        final IoSession session, final WriteRequest writeRequest)
299:                        throws Exception {
300:                    org.safehaus.asyncweb.service.HttpServiceFilter.NextFilter nextFilterAdapter = new org.safehaus.asyncweb.service.HttpServiceFilter.NextFilter() {
301:                        public void invoke() {
302:                            nextFilter.filterWrite(session, writeRequest);
303:                        }
304:                    };
305:
306:                    HttpServiceContext context = (HttpServiceContext) writeRequest
307:                            .getMessage();
308:
309:                    filter.handleResponse(nextFilterAdapter, context);
310:                }
311:            }
312:
313:            private class RequestPipelineAdapter extends IoFilterAdapter {
314:
315:                private final RequestPipeline pipeline;
316:
317:                public RequestPipelineAdapter(final RequestPipeline pipeline) {
318:                    this .pipeline = pipeline;
319:                }
320:
321:                public void sessionOpened(final NextFilter nextFilter,
322:                        final IoSession session) {
323:                    pipeline.setPipelineListener(new RequestPipelineListener() {
324:                        public void responseReleased(HttpServiceContext context) {
325:                            nextFilter
326:                                    .filterWrite(
327:                                            session,
328:                                            new DefaultWriteRequest(
329:                                                    context,
330:                                                    ((DefaultHttpServiceContext) context)
331:                                                            .getWriteFuture()));
332:                        }
333:                    });
334:
335:                    nextFilter.sessionOpened(session);
336:                }
337:
338:                @Override
339:                public void messageReceived(NextFilter nextFilter,
340:                        IoSession session, Object message) throws Exception {
341:                    HttpServiceContext context = (HttpServiceContext) message;
342:                    if (pipeline.addRequest(context)) {
343:                        LOG.debug("Allocated slot in request pipeline");
344:                        nextFilter.messageReceived(session, message);
345:                    } else {
346:                        // The client has filled their pipeline. Currently, this
347:                        // triggers closure. Another option would be to drop read interest
348:                        // until we drain. 
349:                        LOG
350:                                .warn("Could not allocate room in the pipeline for request");
351:                        handleReadFailure(context,
352:                                HttpResponseStatus.SERVICE_UNAVAILABLE,
353:                                "Pipeline full");
354:                    }
355:                }
356:
357:                @Override
358:                public void filterWrite(NextFilter nextFilter,
359:                        IoSession session, WriteRequest writeRequest)
360:                        throws Exception {
361:                    DefaultHttpServiceContext context = (DefaultHttpServiceContext) writeRequest
362:                            .getMessage();
363:                    context.setWriteFuture(writeRequest.getFuture());
364:                    pipeline.releaseResponse(context);
365:                    // nextFilter will be invoked when pipeline listener is notified.
366:                }
367:            }
368:
369:            private class DefaultHttpServiceContext extends
370:                    AbstractHttpServiceContext {
371:                private WriteFuture writeFuture;
372:
373:                private DefaultHttpServiceContext(HttpRequest request) {
374:                    super ((InetSocketAddress) session.getRemoteAddress(),
375:                            request, container);
376:                }
377:
378:                private WriteFuture getWriteFuture() {
379:                    return writeFuture;
380:                }
381:
382:                private void setWriteFuture(WriteFuture writeFuture) {
383:                    if (!isResponseCommitted()) {
384:                        throw new IllegalStateException();
385:                    }
386:                    this .writeFuture = writeFuture;
387:                }
388:
389:                @Override
390:                protected void doWrite(boolean requiresClosure) {
391:                    currentContext = null;
392:                    WriteFuture future = session.write(this );
393:                    if (requiresClosure) {
394:                        LOG.debug("Added CLOSE future listener.");
395:                        future.addListener(IoFutureListener.CLOSE);
396:                    }
397:                }
398:            }
399:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.