001: /*
002: Copyright (C) 2002-2005 MySQL AB
003:
004: This program is free software; you can redistribute it and/or modify
005: it under the terms of version 2 of the GNU General Public License as
006: published by the Free Software Foundation.
007:
008: There are special exceptions to the terms and conditions of the GPL
009: as it is applied to this software. View the full text of the
010: exception in file EXCEPTIONS-CONNECTOR-J in the directory of this
011: software distribution.
012:
013: This program is distributed in the hope that it will be useful,
014: but WITHOUT ANY WARRANTY; without even the implied warranty of
015: MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
016: GNU General Public License for more details.
017:
018: You should have received a copy of the GNU General Public License
019: along with this program; if not, write to the Free Software
020: Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
021:
022:
023: */
024:
025: package com.mysql.jdbc.util;
026:
027: import java.io.IOException;
028: import java.io.InputStream;
029:
030: import com.mysql.jdbc.log.Log;
031:
032: /**
033: * A non-blocking buffered input stream. Reads more if it can, won't block to
034: * fill the buffer, only blocks to satisfy a request of read(byte[])
035: *
036: * @author Mark Matthews
037: *
038: * @version $Id: ReadAheadInputStream.java,v 1.1.2.1 2005/05/13 18:58:39
039: * mmatthews Exp $
040: */
041: public class ReadAheadInputStream extends InputStream {
042:
043: private final static int DEFAULT_BUFFER_SIZE = 4096;
044:
045: private InputStream underlyingStream;
046:
047: private byte buf[];
048:
049: protected int endOfCurrentData;
050:
051: protected int currentPosition;
052:
053: protected boolean doDebug = false;
054:
055: protected Log log;
056:
057: private void fill(int readAtLeastTheseManyBytes) throws IOException {
058: checkClosed();
059:
060: this .currentPosition = 0; /* no mark: throw away the buffer */
061:
062: this .endOfCurrentData = currentPosition;
063:
064: // Read at least as many bytes as the caller wants, but don't
065: // block to fill the whole buffer (like java.io.BufferdInputStream
066: // does)
067:
068: int bytesToRead = Math.min(this .buf.length - currentPosition,
069: readAtLeastTheseManyBytes);
070:
071: int bytesAvailable = this .underlyingStream.available();
072:
073: if (bytesAvailable > bytesToRead) {
074:
075: // Great, there's more available, let's grab those
076: // bytes too! (read-ahead)
077:
078: bytesToRead = Math.min(this .buf.length - currentPosition,
079: bytesAvailable);
080: }
081:
082: if (this .doDebug) {
083: StringBuffer debugBuf = new StringBuffer();
084: debugBuf.append(" ReadAheadInputStream.fill(");
085: debugBuf.append(readAtLeastTheseManyBytes);
086: debugBuf.append("), buffer_size=");
087: debugBuf.append(this .buf.length);
088: debugBuf.append(", current_position=");
089: debugBuf.append(currentPosition);
090: debugBuf.append(", need to read ");
091: debugBuf.append(Math.min(this .buf.length - currentPosition,
092: readAtLeastTheseManyBytes));
093: debugBuf.append(" bytes to fill request,");
094:
095: if (bytesAvailable > 0) {
096: debugBuf.append(" underlying InputStream reports ");
097: debugBuf.append(bytesAvailable);
098:
099: debugBuf.append(" total bytes available,");
100: }
101:
102: debugBuf.append(" attempting to read ");
103: debugBuf.append(bytesToRead);
104: debugBuf.append(" bytes.");
105:
106: if (this .log != null) {
107: this .log.logTrace(debugBuf.toString());
108: } else {
109: System.err.println(debugBuf.toString());
110: }
111: }
112:
113: int n = this .underlyingStream.read(this .buf, currentPosition,
114: bytesToRead);
115:
116: if (n > 0) {
117: endOfCurrentData = n + currentPosition;
118: }
119: }
120:
121: private int readFromUnderlyingStreamIfNecessary(byte[] b, int off,
122: int len) throws IOException {
123: checkClosed();
124:
125: int avail = endOfCurrentData - currentPosition;
126:
127: if (this .doDebug) {
128: StringBuffer debugBuf = new StringBuffer();
129: debugBuf.append("ReadAheadInputStream.readIfNecessary(");
130: debugBuf.append(b);
131: debugBuf.append(",");
132: debugBuf.append(off);
133: debugBuf.append(",");
134: debugBuf.append(len);
135: debugBuf.append(")");
136:
137: if (avail <= 0) {
138: debugBuf
139: .append(" not all data available in buffer, must read from stream");
140:
141: if (len >= this .buf.length) {
142: debugBuf
143: .append(", amount requested > buffer, returning direct read() from stream");
144: }
145: }
146:
147: if (this .log != null) {
148: this .log.logTrace(debugBuf.toString());
149: } else {
150: System.err.println(debugBuf.toString());
151: }
152: }
153:
154: if (avail <= 0) {
155:
156: if (len >= this .buf.length) {
157: return this .underlyingStream.read(b, off, len);
158: }
159:
160: fill(len);
161:
162: avail = endOfCurrentData - currentPosition;
163:
164: if (avail <= 0)
165: return -1;
166: }
167:
168: int bytesActuallyRead = (avail < len) ? avail : len;
169:
170: System.arraycopy(this .buf, currentPosition, b, off,
171: bytesActuallyRead);
172:
173: this .currentPosition += bytesActuallyRead;
174:
175: return bytesActuallyRead;
176: }
177:
178: public synchronized int read(byte b[], int off, int len)
179: throws IOException {
180: checkClosed(); // Check for closed stream
181: if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
182: throw new IndexOutOfBoundsException();
183: } else if (len == 0) {
184: return 0;
185: }
186:
187: int totalBytesRead = 0;
188:
189: while (true) {
190: int bytesReadThisRound = readFromUnderlyingStreamIfNecessary(
191: b, off + totalBytesRead, len - totalBytesRead);
192:
193: // end-of-stream?
194: if (bytesReadThisRound <= 0) {
195: if (totalBytesRead == 0) {
196: totalBytesRead = bytesReadThisRound;
197: }
198:
199: break;
200: }
201:
202: totalBytesRead += bytesReadThisRound;
203:
204: // Read _at_least_ enough bytes
205: if (totalBytesRead >= len) {
206: break;
207: }
208:
209: // Nothing to read?
210: if (this .underlyingStream.available() <= 0) {
211: break;
212: }
213: }
214:
215: return totalBytesRead;
216: }
217:
218: public int read() throws IOException {
219: checkClosed();
220:
221: if (currentPosition >= endOfCurrentData) {
222: fill(1);
223: if (currentPosition >= endOfCurrentData)
224: return -1;
225: }
226:
227: return this .buf[currentPosition++] & 0xff;
228: }
229:
230: public int available() throws IOException {
231: checkClosed();
232:
233: return this .underlyingStream.available()
234: + (this .endOfCurrentData - this .currentPosition);
235: }
236:
237: private void checkClosed() throws IOException {
238:
239: if (this .buf == null) {
240: throw new IOException("Stream closed");
241: }
242: }
243:
244: /**
245: *
246: */
247: public ReadAheadInputStream(InputStream toBuffer, boolean debug,
248: Log logTo) {
249: this (toBuffer, DEFAULT_BUFFER_SIZE, debug, logTo);
250: }
251:
252: public ReadAheadInputStream(InputStream toBuffer, int bufferSize,
253: boolean debug, Log logTo) {
254: this .underlyingStream = toBuffer;
255: this .buf = new byte[bufferSize];
256: this .doDebug = debug;
257: this .log = logTo;
258: }
259:
260: /*
261: * (non-Javadoc)
262: *
263: * @see java.io.Closeable#close()
264: */
265: public void close() throws IOException {
266: if (this .underlyingStream != null) {
267: try {
268: this .underlyingStream.close();
269: } finally {
270: this .underlyingStream = null;
271: this .buf = null;
272: this .log = null;
273: }
274: }
275: }
276:
277: /*
278: * (non-Javadoc)
279: *
280: * @see java.io.InputStream#markSupported()
281: */
282: public boolean markSupported() {
283: return false;
284: }
285:
286: /*
287: * (non-Javadoc)
288: *
289: * @see java.io.InputStream#skip(long)
290: */
291: public long skip(long n) throws IOException {
292: checkClosed();
293: if (n <= 0) {
294: return 0;
295: }
296:
297: long bytesAvailInBuffer = this .endOfCurrentData
298: - this .currentPosition;
299:
300: if (bytesAvailInBuffer <= 0) {
301:
302: fill((int) n);
303: bytesAvailInBuffer = this .endOfCurrentData
304: - this .currentPosition;
305: if (bytesAvailInBuffer <= 0)
306: return 0;
307: }
308:
309: long bytesSkipped = (bytesAvailInBuffer < n) ? bytesAvailInBuffer
310: : n;
311: this.currentPosition += bytesSkipped;
312: return bytesSkipped;
313: }
314: }
|