001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *
019: */
020: package org.safehaus.asyncweb.service.transport.mina;
021:
022: import java.io.IOException;
023: import java.net.InetSocketAddress;
024:
025: import org.apache.mina.common.DefaultWriteRequest;
026: import org.apache.mina.common.IdleStatus;
027: import org.apache.mina.common.IoFilterAdapter;
028: import org.apache.mina.common.IoFutureListener;
029: import org.apache.mina.common.IoSession;
030: import org.apache.mina.common.WriteFuture;
031: import org.apache.mina.common.WriteRequest;
032: import org.apache.mina.filter.codec.ProtocolCodecFilter;
033: import org.apache.mina.filter.codec.ProtocolDecoderException;
034: import org.apache.mina.handler.multiton.SingleSessionIoHandler;
035: import org.safehaus.asyncweb.codec.HttpServerCodecFactory;
036: import org.safehaus.asyncweb.codec.decoder.HttpDecoderException;
037: import org.safehaus.asyncweb.common.DefaultHttpRequest;
038: import org.safehaus.asyncweb.common.DefaultHttpResponse;
039: import org.safehaus.asyncweb.common.HttpRequest;
040: import org.safehaus.asyncweb.common.HttpResponseStatus;
041: import org.safehaus.asyncweb.common.HttpVersion;
042: import org.safehaus.asyncweb.common.MutableHttpResponse;
043: import org.safehaus.asyncweb.service.HttpServiceContext;
044: import org.safehaus.asyncweb.service.HttpServiceFilter;
045: import org.safehaus.asyncweb.service.ServiceContainer;
046: import org.safehaus.asyncweb.service.context.AbstractHttpServiceContext;
047: import org.safehaus.asyncweb.service.pipeline.RequestPipeline;
048: import org.safehaus.asyncweb.service.pipeline.RequestPipelineListener;
049: import org.safehaus.asyncweb.service.pipeline.StandardRequestPipeline;
050: import org.slf4j.Logger;
051: import org.slf4j.LoggerFactory;
052:
053: class SingleHttpSessionIoHandler implements SingleSessionIoHandler {
054:
055: private static final Logger LOG = LoggerFactory
056: .getLogger(SingleHttpSessionIoHandler.class);
057:
058: /**
059: * The number of parsers we pre-allocate
060: */
061: // private static final int DEFAULT_PARSERS = 5;
062: /**
063: * The default idle time
064: */
065: private static final int DEFAULT_IDLE_TIME = 30000;
066:
067: /**
068: * Out default pipeline
069: */
070: private static final int DEFAULT_PIPELINE = 100;
071:
072: private final ServiceContainer container;
073: private final IoSession session;
074: private final RequestPipeline pipeline;
075:
076: private HttpServiceContext currentContext;
077: private int readIdleTime = DEFAULT_IDLE_TIME;
078:
079: SingleHttpSessionIoHandler(ServiceContainer container,
080: IoSession session) {
081: this .container = container;
082: this .session = session;
083: this .pipeline = new StandardRequestPipeline(DEFAULT_PIPELINE);
084:
085: session.getConfig().setIdleTime(IdleStatus.READER_IDLE,
086: readIdleTime);
087:
088: session.getFilterChain().addLast("codec",
089: new ProtocolCodecFilter(new HttpServerCodecFactory()));
090:
091: session.getFilterChain().addLast("converter",
092: new ContextConverter());
093:
094: session.getFilterChain().addLast("pipeline",
095: new RequestPipelineAdapter(pipeline));
096:
097: int i = 0;
098: for (HttpServiceFilter serviceFilter : container
099: .getServiceFilters()) {
100: session.getFilterChain().addLast("serviceFilter." + (i++),
101: new ServiceFilterAdapter(serviceFilter));
102: }
103: }
104:
105: public void sessionCreated() {
106: }
107:
108: public void sessionOpened() {
109: LOG.info("Connection opened");
110: }
111:
112: public void sessionClosed() {
113: LOG.info("Connection closed");
114: }
115:
116: /**
117: * Invoked when this connection idles out.
118: * If we are in the process of parsing a request, the current request
119: * is rejected with a {@link HttpResponseStatus#REQUEST_TIMEOUT} response status.
120: *
121: */
122: public void sessionIdle(IdleStatus idleType) {
123: if (session.getIdleCount(idleType) == 1) {
124: // // FIXME currentRequest is always null now; we need to cooperate with a decoder.
125: // if (currentContext != null) {
126: // LOG.info("Read idled out while parsing request. Scheduling timeout response");
127: // handleReadFailure(currentContext, HttpResponseStatus.REQUEST_TIMEOUT, "Timeout while reading request");
128: // } else {
129: LOG
130: .info("Idled with no current request. Scheduling closure when pipeline empties");
131: pipeline.runWhenEmpty(new Runnable() {
132: public void run() {
133: LOG
134: .info("Pipeline empty after idle. Closing session");
135: session.close();
136: }
137: });
138: // }
139: }
140: }
141:
142: public void exceptionCaught(Throwable cause) {
143: MutableHttpResponse response = null;
144: if (cause instanceof ProtocolDecoderException) {
145: HttpResponseStatus status;
146: if (cause instanceof HttpDecoderException) {
147: status = ((HttpDecoderException) cause)
148: .getResponseStatus();
149: } else {
150: status = HttpResponseStatus.BAD_REQUEST;
151: }
152:
153: LOG.warn("Bad request:", cause);
154:
155: response = new DefaultHttpResponse();
156: response.setProtocolVersion(HttpVersion.HTTP_1_1);
157: response.setStatus(status);
158: } else if (cause instanceof IOException) {
159: LOG.warn("IOException on HTTP connection", cause);
160: session.close();
161: } else {
162: response = new DefaultHttpResponse();
163: response.setProtocolVersion(HttpVersion.HTTP_1_1);
164: response
165: .setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
166: LOG.warn("Unexpected exception from a service.", cause);
167: }
168: if (response != null) {
169: HttpServiceContext context = this .currentContext;
170: if (context == null) {
171: context = createContext(new DefaultHttpRequest());
172: }
173: context.commitResponse(response);
174: }
175: }
176:
177: public void messageReceived(Object message) {
178: // FIXME messageReceived invoked only when whole message is built.
179:
180: // When headers were built
181: //sendContinuationIfRequested(request);
182:
183: // When body has been built
184: }
185:
186: /**
187: * Sends a continuation response for the specified request, if
188: * the client has requested that a continuation response should
189: * be provided.</br>
190: * The continuation response is enqueued with our pipe-line.
191: * Note that we do <i>not</i> commit the request - a final response
192: * must still be provided.
193: *
194: * TODO: We're not currently adding value here: If we cant route
195: * to a service, we'll still accept the request. We should
196: * add the ability to inject the request in to the container
197: * and let it decide whether the request can be handled.
198: */
199: private void sendContinuationIfRequested(HttpServiceContext context) {
200: if (context.getRequest().requiresContinuationResponse()) {
201: MutableHttpResponse continuationResponse = new DefaultHttpResponse();
202: continuationResponse.setStatus(HttpResponseStatus.CONTINUE);
203: context.commitResponse(continuationResponse);
204: LOG.info("Scheduled continuation response");
205: }
206: }
207:
208: /**
209: * Invoked when we fail to parse an incoming request.
210: * We configure our parser to discard any further data received from the client,
211: * and schedule a response with the appropriate failure code for the
212: * current request
213: *
214: * @param status The status
215: * @param message Failure message
216: */
217: private void handleReadFailure(HttpServiceContext context,
218: HttpResponseStatus status, String message) {
219: if (LOG.isInfoEnabled()) {
220: LOG.info("Failed to handle client request. Reason: "
221: + status);
222: }
223: MutableHttpResponse response = new DefaultHttpResponse();
224: response.setStatusReasonPhrase(message);
225: response.setStatus(status);
226: context.commitResponse(response);
227: }
228:
229: /**
230: * Invoked when data wrote has been fully written.
231: * If we have scheduled closure after sending a final response, we will
232: * be provided with the <code>CLOSE_MARKER</code> as our marker object.<br/>
233: * This signals us to schedule closure of the connection
234: *
235: * @param message The marker provided when writing data. If this is
236: * our closure marker, we schedule closure of the connection
237: */
238: public void messageSent(Object message) {
239: }
240:
241: /**
242: * Sets the read idle time for all connections
243: *
244: * @param readIdleTime The read idle time (seconds)
245: */
246: public void setReadIdleTime(int readIdleTime) {
247: this .readIdleTime = readIdleTime;
248: }
249:
250: private HttpServiceContext createContext(HttpRequest request) {
251: return new DefaultHttpServiceContext(request);
252: }
253:
254: private class ContextConverter extends IoFilterAdapter {
255:
256: @Override
257: public void filterWrite(NextFilter nextFilter,
258: IoSession session, WriteRequest writeRequest)
259: throws Exception {
260: nextFilter.filterWrite(session, new DefaultWriteRequest(
261: ((HttpServiceContext) writeRequest.getMessage())
262: .getCommittedResponse(), writeRequest
263: .getFuture()));
264: }
265:
266: @Override
267: public void messageReceived(NextFilter nextFilter,
268: IoSession session, Object message) throws Exception {
269: HttpRequest request = (HttpRequest) message;
270: HttpServiceContext context = createContext(request);
271: currentContext = context;
272: nextFilter.messageReceived(session, context);
273: }
274: }
275:
276: private class ServiceFilterAdapter extends IoFilterAdapter {
277: private final HttpServiceFilter filter;
278:
279: public ServiceFilterAdapter(HttpServiceFilter filter) {
280: this .filter = filter;
281: }
282:
283: @Override
284: public void messageReceived(final NextFilter nextFilter,
285: final IoSession session, final Object message)
286: throws Exception {
287: org.safehaus.asyncweb.service.HttpServiceFilter.NextFilter nextFilterAdapter = new org.safehaus.asyncweb.service.HttpServiceFilter.NextFilter() {
288: public void invoke() {
289: nextFilter.messageReceived(session, message);
290: }
291: };
292: filter.handleRequest(nextFilterAdapter,
293: (HttpServiceContext) message);
294: }
295:
296: @Override
297: public void filterWrite(final NextFilter nextFilter,
298: final IoSession session, final WriteRequest writeRequest)
299: throws Exception {
300: org.safehaus.asyncweb.service.HttpServiceFilter.NextFilter nextFilterAdapter = new org.safehaus.asyncweb.service.HttpServiceFilter.NextFilter() {
301: public void invoke() {
302: nextFilter.filterWrite(session, writeRequest);
303: }
304: };
305:
306: HttpServiceContext context = (HttpServiceContext) writeRequest
307: .getMessage();
308:
309: filter.handleResponse(nextFilterAdapter, context);
310: }
311: }
312:
313: private class RequestPipelineAdapter extends IoFilterAdapter {
314:
315: private final RequestPipeline pipeline;
316:
317: public RequestPipelineAdapter(final RequestPipeline pipeline) {
318: this .pipeline = pipeline;
319: }
320:
321: public void sessionOpened(final NextFilter nextFilter,
322: final IoSession session) {
323: pipeline.setPipelineListener(new RequestPipelineListener() {
324: public void responseReleased(HttpServiceContext context) {
325: nextFilter
326: .filterWrite(
327: session,
328: new DefaultWriteRequest(
329: context,
330: ((DefaultHttpServiceContext) context)
331: .getWriteFuture()));
332: }
333: });
334:
335: nextFilter.sessionOpened(session);
336: }
337:
338: @Override
339: public void messageReceived(NextFilter nextFilter,
340: IoSession session, Object message) throws Exception {
341: HttpServiceContext context = (HttpServiceContext) message;
342: if (pipeline.addRequest(context)) {
343: LOG.debug("Allocated slot in request pipeline");
344: nextFilter.messageReceived(session, message);
345: } else {
346: // The client has filled their pipeline. Currently, this
347: // triggers closure. Another option would be to drop read interest
348: // until we drain.
349: LOG
350: .warn("Could not allocate room in the pipeline for request");
351: handleReadFailure(context,
352: HttpResponseStatus.SERVICE_UNAVAILABLE,
353: "Pipeline full");
354: }
355: }
356:
357: @Override
358: public void filterWrite(NextFilter nextFilter,
359: IoSession session, WriteRequest writeRequest)
360: throws Exception {
361: DefaultHttpServiceContext context = (DefaultHttpServiceContext) writeRequest
362: .getMessage();
363: context.setWriteFuture(writeRequest.getFuture());
364: pipeline.releaseResponse(context);
365: // nextFilter will be invoked when pipeline listener is notified.
366: }
367: }
368:
369: private class DefaultHttpServiceContext extends
370: AbstractHttpServiceContext {
371: private WriteFuture writeFuture;
372:
373: private DefaultHttpServiceContext(HttpRequest request) {
374: super ((InetSocketAddress) session.getRemoteAddress(),
375: request, container);
376: }
377:
378: private WriteFuture getWriteFuture() {
379: return writeFuture;
380: }
381:
382: private void setWriteFuture(WriteFuture writeFuture) {
383: if (!isResponseCommitted()) {
384: throw new IllegalStateException();
385: }
386: this .writeFuture = writeFuture;
387: }
388:
389: @Override
390: protected void doWrite(boolean requiresClosure) {
391: currentContext = null;
392: WriteFuture future = session.write(this );
393: if (requiresClosure) {
394: LOG.debug("Added CLOSE future listener.");
395: future.addListener(IoFutureListener.CLOSE);
396: }
397: }
398: }
399: }
|