001: /*
002: * JacORB - a free Java ORB
003: *
004: * Copyright (C) 1997-2004 Gerald Brose.
005: *
006: * This library is free software; you can redistribute it and/or
007: * modify it under the terms of the GNU Library General Public
008: * License as published by the Free Software Foundation; either
009: * version 2 of the License, or (at your option) any later version.
010: *
011: * This library is distributed in the hope that it will be useful,
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
014: * Library General Public License for more details.
015: *
016: * You should have received a copy of the GNU Library General Public
017: * License along with this library; if not, write to the Free
018: * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
019: */
020:
021: package org.jacorb.orb.iiop;
022:
023: import java.io.IOException;
024: import java.io.InputStream;
025:
026: /**
027: * @author Kevin Conner (Kevin.Conner@arjuna.com)
028: * @version $Id: IIOPLoopbackInputStream.java,v 1.1 2005/08/04 05:04:50 francisco Exp $
029: */
030: class IIOPLoopbackInputStream extends InputStream {
031: private static final int BUF_SIZE = 2048;
032:
033: private final byte[] buf = new byte[BUF_SIZE];
034:
035: private boolean connected;
036: private boolean closed;
037: private int writerIndex;
038: private int readerIndex;
039:
040: IIOPLoopbackInputStream() {
041: }
042:
043: IIOPLoopbackInputStream(final IIOPLoopbackOutputStream los)
044: throws IOException {
045: connect(los);
046: }
047:
048: synchronized void connect(final IIOPLoopbackOutputStream los)
049: throws IOException {
050: if (connected) {
051: throw new IOException("Alread connected");
052: }
053: connected = true;
054: los.connect(this );
055: }
056:
057: public synchronized int read() throws IOException {
058: checkConnect();
059: return internalRead();
060: }
061:
062: public synchronized int read(final byte[] b, final int off,
063: final int len) throws IOException {
064: checkConnect();
065: checkBuffer(b, off, len);
066:
067: int returnCount = 0;
068:
069: while (returnCount < len) {
070: final int val = internalRead();
071: if (val == -1) {
072: break;
073: }
074: b[off + returnCount] = (byte) val;
075: returnCount++;
076: }
077:
078: return (returnCount > 0 ? returnCount : -1);
079: }
080:
081: public synchronized int available() throws IOException {
082: checkConnect();
083:
084: if (writerIndex > readerIndex) {
085: return (writerIndex - readerIndex);
086: } else if (readerIndex > writerIndex) {
087: return BUF_SIZE + writerIndex - readerIndex;
088: } else {
089: checkClosed();
090: return 0;
091: }
092: }
093:
094: public synchronized void close() throws IOException {
095: checkConnect();
096: closed = true;
097: notifyAll();
098: }
099:
100: synchronized void writerClose() throws IOException {
101: checkConnect();
102: closed = true;
103: notifyAll();
104: }
105:
106: synchronized void writeIntoBuffer(final int b) throws IOException {
107: checkConnect();
108: internalWrite((byte) b);
109: }
110:
111: synchronized void writeIntoBuffer(final byte[] b, final int off,
112: final int len) throws IOException {
113: checkConnect();
114: checkBuffer(b, off, len);
115:
116: for (int count = 0; count < len; count++) {
117: internalWrite(b[off + count]);
118: }
119: }
120:
121: private int internalRead() {
122: while (bufferEmpty()) {
123: if (closed) {
124: return -1;
125: }
126: try {
127: wait();
128: } catch (final InterruptedException ie) {
129: } // Ignore
130: }
131:
132: final boolean shouldNotify = bufferFull();
133:
134: final int val = buf[readerIndex] & 0xff;
135: readerIndex = nextIndex(readerIndex);
136:
137: if (shouldNotify) {
138: notifyAll();
139: }
140:
141: return val;
142: }
143:
144: private void internalWrite(final byte b) throws IOException {
145: while (bufferFull()) {
146: checkClosed();
147: try {
148: wait();
149: } catch (final InterruptedException ie) {
150: } // Ignore
151: }
152:
153: checkClosed();
154:
155: final boolean shouldNotify = bufferEmpty();
156:
157: buf[writerIndex] = b;
158: writerIndex = nextIndex(writerIndex);
159:
160: if (shouldNotify) {
161: notifyAll();
162: }
163: }
164:
165: private void checkConnect() throws IOException {
166: if (!connected) {
167: throw new IOException(
168: "IIOPLoopbackInputStream not connected");
169: }
170: }
171:
172: private void checkClosed() throws IOException {
173: if (closed) {
174: throw new IOException("IIOPLoopbackInputStream closed");
175: }
176: }
177:
178: private void checkBuffer(final byte[] b, final int off,
179: final int len) {
180: if (b == null) {
181: throw new NullPointerException("Null buffer");
182: }
183:
184: final int bufLen = b.length;
185: if ((off < 0) || (off >= bufLen) || (len < 0) || (len > bufLen)
186: || (off + len > bufLen)) {
187: throw new IndexOutOfBoundsException("Invalid offset/length");
188: }
189: }
190:
191: private boolean bufferEmpty() {
192: return (readerIndex == writerIndex);
193: }
194:
195: private boolean bufferFull() {
196: return (readerIndex == nextIndex(writerIndex));
197: }
198:
199: private int nextIndex(final int index) {
200: final int nextIndex = index + 1;
201: return (nextIndex == BUF_SIZE ? 0 : nextIndex);
202: }
203: }
|