001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *
019: */
020: package org.apache.mina.handler.stream;
021:
022: import java.io.IOException;
023: import java.io.InputStream;
024: import java.io.OutputStream;
025: import java.net.SocketTimeoutException;
026:
027: import org.apache.mina.common.AttributeKey;
028: import org.apache.mina.common.IdleStatus;
029: import org.apache.mina.common.IoBuffer;
030: import org.apache.mina.common.IoHandler;
031: import org.apache.mina.common.IoHandlerAdapter;
032: import org.apache.mina.common.IoSession;
033: import org.slf4j.Logger;
034: import org.slf4j.LoggerFactory;
035:
036: /**
037: * A {@link IoHandler} that adapts asynchronous MINA events to stream I/O.
038: * <p>
039: * Please extend this class and implement
040: * {@link #processStreamIo(IoSession, InputStream, OutputStream)} to
041: * execute your stream I/O logic; <b>please note that you must forward
042: * the process request to other thread or thread pool.</b>
043: *
044: * @author The Apache MINA Project (dev@mina.apache.org)
045: * @version $Rev: 616100 $, $Date: 2008-01-28 15:58:32 -0700 (Mon, 28 Jan 2008) $
046: */
047: public abstract class StreamIoHandler extends IoHandlerAdapter {
048: private final Logger logger = LoggerFactory.getLogger(getClass());
049:
050: private static final AttributeKey KEY_IN = new AttributeKey(
051: StreamIoHandler.class, "in");
052: private static final AttributeKey KEY_OUT = new AttributeKey(
053: StreamIoHandler.class, "out");
054:
055: private int readTimeout;
056:
057: private int writeTimeout;
058:
059: protected StreamIoHandler() {
060: }
061:
062: /**
063: * Implement this method to execute your stream I/O logic;
064: * <b>please note that you must forward the process request to other
065: * thread or thread pool.</b>
066: */
067: protected abstract void processStreamIo(IoSession session,
068: InputStream in, OutputStream out);
069:
070: /**
071: * Returns read timeout in seconds.
072: * The default value is <tt>0</tt> (disabled).
073: */
074: public int getReadTimeout() {
075: return readTimeout;
076: }
077:
078: /**
079: * Sets read timeout in seconds.
080: * The default value is <tt>0</tt> (disabled).
081: */
082: public void setReadTimeout(int readTimeout) {
083: this .readTimeout = readTimeout;
084: }
085:
086: /**
087: * Returns write timeout in seconds.
088: * The default value is <tt>0</tt> (disabled).
089: */
090: public int getWriteTimeout() {
091: return writeTimeout;
092: }
093:
094: /**
095: * Sets write timeout in seconds.
096: * The default value is <tt>0</tt> (disabled).
097: */
098: public void setWriteTimeout(int writeTimeout) {
099: this .writeTimeout = writeTimeout;
100: }
101:
102: /**
103: * Initializes streams and timeout settings.
104: */
105: @Override
106: public void sessionOpened(IoSession session) {
107: // Set timeouts
108: session.getConfig().setWriteTimeout(writeTimeout);
109: session.getConfig().setIdleTime(IdleStatus.READER_IDLE,
110: readTimeout);
111:
112: // Create streams
113: InputStream in = new IoSessionInputStream();
114: OutputStream out = new IoSessionOutputStream(session);
115: session.setAttribute(KEY_IN, in);
116: session.setAttribute(KEY_OUT, out);
117: processStreamIo(session, in, out);
118: }
119:
120: /**
121: * Closes streams
122: */
123: @Override
124: public void sessionClosed(IoSession session) throws Exception {
125: final InputStream in = (InputStream) session
126: .getAttribute(KEY_IN);
127: final OutputStream out = (OutputStream) session
128: .getAttribute(KEY_OUT);
129: try {
130: in.close();
131: } finally {
132: out.close();
133: }
134: }
135:
136: /**
137: * Forwards read data to input stream.
138: */
139: @Override
140: public void messageReceived(IoSession session, Object buf) {
141: final IoSessionInputStream in = (IoSessionInputStream) session
142: .getAttribute(KEY_IN);
143: in.write((IoBuffer) buf);
144: }
145:
146: /**
147: * Forwards caught exceptions to input stream.
148: */
149: @Override
150: public void exceptionCaught(IoSession session, Throwable cause) {
151: final IoSessionInputStream in = (IoSessionInputStream) session
152: .getAttribute(KEY_IN);
153:
154: IOException e = null;
155: if (cause instanceof StreamIoException) {
156: e = (IOException) cause.getCause();
157: } else if (cause instanceof IOException) {
158: e = (IOException) cause;
159: }
160:
161: if (e != null && in != null) {
162: in.throwException(e);
163: } else {
164: logger.warn("Unexpected exception.", cause);
165: session.close();
166: }
167: }
168:
169: /**
170: * Handles read timeout.
171: */
172: @Override
173: public void sessionIdle(IoSession session, IdleStatus status) {
174: if (status == IdleStatus.READER_IDLE) {
175: throw new StreamIoException(new SocketTimeoutException(
176: "Read timeout"));
177: }
178: }
179:
180: private static class StreamIoException extends RuntimeException {
181: private static final long serialVersionUID = 3976736960742503222L;
182:
183: public StreamIoException(IOException cause) {
184: super(cause);
185: }
186: }
187: }
|