001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *
019: */
020: package org.apache.mina.handler.stream;
021:
022: import java.io.IOException;
023: import java.io.InputStream;
024:
025: import org.apache.mina.common.IoBuffer;
026: import org.apache.mina.common.IoHandler;
027: import org.apache.mina.common.IoSession;
028:
029: /**
030: * An {@link InputStream} that buffers data read from
031: * {@link IoHandler#messageReceived(IoSession, Object)} events.
032: *
033: * @author The Apache MINA Project (dev@mina.apache.org)
034: * @version $Rev: 581234 $, $Date: 2007-10-02 07:39:48 -0600 (Tue, 02 Oct 2007) $
035: */
036: class IoSessionInputStream extends InputStream {
037: private final Object mutex = new Object();
038:
039: private final IoBuffer buf;
040:
041: private volatile boolean closed;
042:
043: private volatile boolean released;
044:
045: private IOException exception;
046:
047: public IoSessionInputStream() {
048: buf = IoBuffer.allocate(16);
049: buf.setAutoExpand(true);
050: buf.limit(0);
051: }
052:
053: @Override
054: public int available() {
055: if (released) {
056: return 0;
057: } else {
058: synchronized (mutex) {
059: return buf.remaining();
060: }
061: }
062: }
063:
064: @Override
065: public void close() {
066: if (closed) {
067: return;
068: }
069:
070: synchronized (mutex) {
071: closed = true;
072: releaseBuffer();
073:
074: mutex.notifyAll();
075: }
076: }
077:
078: @Override
079: public int read() throws IOException {
080: synchronized (mutex) {
081: if (!waitForData()) {
082: return -1;
083: }
084:
085: return buf.get() & 0xff;
086: }
087: }
088:
089: @Override
090: public int read(byte[] b, int off, int len) throws IOException {
091: synchronized (mutex) {
092: if (!waitForData()) {
093: return -1;
094: }
095:
096: int readBytes;
097:
098: if (len > buf.remaining()) {
099: readBytes = buf.remaining();
100: } else {
101: readBytes = len;
102: }
103:
104: buf.get(b, off, readBytes);
105:
106: return readBytes;
107: }
108: }
109:
110: private boolean waitForData() throws IOException {
111: if (released) {
112: return false;
113: }
114:
115: synchronized (mutex) {
116: while (!released && buf.remaining() == 0
117: && exception == null) {
118: try {
119: mutex.wait();
120: } catch (InterruptedException e) {
121: IOException ioe = new IOException(
122: "Interrupted while waiting for more data");
123: ioe.initCause(e);
124: throw ioe;
125: }
126: }
127: }
128:
129: if (exception != null) {
130: releaseBuffer();
131: throw exception;
132: }
133:
134: if (closed && buf.remaining() == 0) {
135: releaseBuffer();
136:
137: return false;
138: }
139:
140: return true;
141: }
142:
143: private void releaseBuffer() {
144: if (released) {
145: return;
146: }
147:
148: released = true;
149: }
150:
151: public void write(IoBuffer src) {
152: synchronized (mutex) {
153: if (closed) {
154: return;
155: }
156:
157: if (buf.hasRemaining()) {
158: this .buf.compact();
159: this .buf.put(src);
160: this .buf.flip();
161: } else {
162: this .buf.clear();
163: this .buf.put(src);
164: this .buf.flip();
165: mutex.notifyAll();
166: }
167: }
168: }
169:
170: public void throwException(IOException e) {
171: synchronized (mutex) {
172: if (exception == null) {
173: exception = e;
174:
175: mutex.notifyAll();
176: }
177: }
178: }
179: }
|