001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
003: */
004: package com.tc.io;
005:
006: import com.tc.bytes.TCByteBuffer;
007:
008: import java.io.DataInputStream;
009: import java.io.EOFException;
010: import java.io.IOException;
011: import java.io.InputStream;
012: import java.util.ArrayList;
013: import java.util.List;
014:
015: public class TCByteBufferInputStream extends InputStream implements
016: TCDataInput, TCByteBufferInput {
017: private static final int EOF = -1;
018: private static final TCByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = new TCByteBuffer[] {};
019:
020: private TCByteBuffer[] data;
021: private int totalLength;
022: private int numBufs;
023: private boolean closed = false;
024: private int position = 0;
025: private int index = 0;
026: private Mark mark = null;
027:
028: private TCByteBufferInputStream(TCByteBuffer[] sourceData,
029: int dupeLength, int sourceIndex) {
030: this (sourceData, dupeLength, sourceIndex, true);
031: }
032:
033: public TCByteBufferInputStream(TCByteBuffer data) {
034: this (new TCByteBuffer[] { data });
035: }
036:
037: public TCByteBufferInputStream(TCByteBuffer[] data) {
038: if (data == null) {
039: throw new NullPointerException();
040: }
041:
042: long length = 0;
043:
044: this .data = new TCByteBuffer[data.length];
045:
046: for (int i = 0, n = data.length; i < n; i++) {
047: TCByteBuffer buf = data[i];
048: if (buf == null) {
049: throw new NullPointerException("null buffer at index "
050: + i);
051: }
052:
053: this .data[i] = buf.duplicate().rewind();
054: length += buf.limit();
055: }
056:
057: if (length > Integer.MAX_VALUE) {
058: throw new IllegalArgumentException("too much data: "
059: + length);
060: }
061:
062: this .numBufs = this .data.length;
063: this .totalLength = (int) length;
064: }
065:
066: private TCByteBufferInputStream(TCByteBuffer[] sourceData,
067: int dupeLength, int sourceIndex, boolean duplicate) {
068: // skipping checks. Invariants should hold since this is a private cstr()
069:
070: if (duplicate) {
071: this .data = new TCByteBuffer[sourceData.length
072: - sourceIndex];
073: } else {
074: this .data = sourceData;
075: }
076:
077: this .numBufs = this .data.length;
078:
079: if (duplicate) {
080: for (int i = 0, n = this .data.length; i < n; i++) {
081: this .data[i] = sourceData[sourceIndex + i].duplicate();
082: }
083: }
084:
085: this .totalLength = dupeLength;
086: this .position = 0;
087: this .index = 0;
088: }
089:
090: /**
091: * Duplicate this stream. The resulting stream will share data with the source stream (ie. no copying), but the two
092: * streams will have independent read positions. The read position of the result stream will initially be the same as
093: * the source stream
094: */
095: public TCByteBufferInput duplicate() {
096: checkClosed();
097: return new TCByteBufferInputStream(data, available(), index);
098: }
099:
100: /**
101: * Effectively the same thing as calling duplicate().limit(int), but potentially creating far less garbage (depending
102: * on the size difference between the original stream and the slice you want)
103: */
104: public TCByteBufferInput duplicateAndLimit(final int limit) {
105: checkClosed();
106:
107: if (limit > available()) {
108: throw new IllegalArgumentException(
109: "Not enough data left in stream: " + limit + " > "
110: + available());
111: }
112:
113: if (limit == 0) {
114: return new TCByteBufferInputStream(EMPTY_BYTE_BUFFER_ARRAY);
115: }
116:
117: int numBytesNeeded = limit;
118: int dataIndex = this .index;
119: int lastLimit = -1;
120: while (numBytesNeeded > 0) {
121: TCByteBuffer buf = this .data[dataIndex];
122: int numFromThisBuffer = Math.min(numBytesNeeded, buf
123: .remaining());
124: lastLimit = buf.position() + numFromThisBuffer;
125: numBytesNeeded -= numFromThisBuffer;
126: if (numBytesNeeded > 0) {
127: dataIndex++;
128: }
129: }
130:
131: int size = (dataIndex - this .index) + 1;
132: TCByteBuffer[] limitedData = new TCByteBuffer[size];
133: for (int i = 0, n = limitedData.length; i < n; i++) {
134: limitedData[i] = this .data[this .index + i].duplicate();
135: }
136:
137: limitedData[limitedData.length - 1].limit(lastLimit);
138:
139: return new TCByteBufferInputStream(limitedData, limit, 0, false);
140: }
141:
142: public TCByteBuffer[] toArray() {
143: checkClosed();
144:
145: if (available() == 0) {
146: return EMPTY_BYTE_BUFFER_ARRAY;
147: }
148:
149: TCByteBuffer[] rv = new TCByteBuffer[numBufs - index];
150: rv[0] = data[index].slice();
151: for (int i = 1, n = rv.length; i < n; i++) {
152: rv[i] = data[index + i].duplicate();
153: }
154:
155: return rv;
156: }
157:
158: /**
159: * Artificially limit the length of this input stream starting at the current read position. This operation is
160: * destructive to the stream contents (ie. data trimmed off by setting limit can never be read with this stream).
161: */
162: public TCDataInput limit(int limit) {
163: checkClosed();
164:
165: if (available() < limit) {
166: throw new IllegalArgumentException(
167: "Not enough data left in stream: " + limit + " > "
168: + available());
169: }
170:
171: List newData = new ArrayList();
172: int num = limit;
173: while (num > 0) {
174: TCByteBuffer current = data[index];
175: int avail = current.remaining();
176: int take = Math.min(avail, num);
177: if (take > 0) {
178: newData.add(current.slice().limit(take));
179: num -= take;
180: }
181: nextBuffer();
182: }
183:
184: this .data = new TCByteBuffer[newData.size()];
185: this .data = (TCByteBuffer[]) newData.toArray(this .data);
186: this .numBufs = this .data.length;
187: this .totalLength = limit;
188: this .position = 0;
189: this .index = 0;
190:
191: return this ;
192: }
193:
194: public int getTotalLength() {
195: return totalLength;
196: }
197:
198: public int available() {
199: return totalLength - position;
200: }
201:
202: public void close() {
203: if (!closed) {
204: closed = true;
205: this .data = null;
206: }
207: }
208:
209: public void mark(int readlimit) {
210: throw new UnsupportedOperationException();
211: }
212:
213: // XXX: This is a TC special version of mark() to be used in conjunction with tcReset()...We should eventually
214: // implement the general purpose mark(int) method as specified by InputStream. NOTE: It has some unusual semantics
215: // that make it a little trickier to implement (in our case) than you might think (specifially the readLimit field)
216: public void mark() {
217: checkClosed();
218: mark = new Mark(index, data[index].position(), position);
219: }
220:
221: public boolean markSupported() {
222: return false;
223: }
224:
225: public final int read(byte[] b, int off, int len) {
226: checkClosed();
227:
228: if (b == null) {
229: throw new NullPointerException();
230: }
231: if ((off < 0) || (off > b.length) || (len < 0)
232: || ((off + len) > b.length) || ((off + len) < 0)) {
233: throw new IndexOutOfBoundsException();
234: }
235: if (len == 0) {
236: return 0;
237: }
238:
239: if (available() == 0) {
240: return EOF;
241: }
242:
243: int bytesRead = 0;
244: int numToRead = Math.min(available(), len);
245:
246: while (index < numBufs) {
247: TCByteBuffer buf = data[index];
248: if (buf.hasRemaining()) {
249: int read = Math.min(buf.remaining(), numToRead);
250: buf.get(b, off, read);
251: off += read;
252: position += read;
253: bytesRead += read;
254: numToRead -= read;
255: if (numToRead == 0)
256: break;
257: }
258: nextBuffer();
259: }
260:
261: return bytesRead;
262: }
263:
264: public final int read(byte[] b) {
265: return read(b, 0, b.length);
266: }
267:
268: public final int read() {
269: checkClosed();
270:
271: while (index < numBufs) {
272: if (this .data[index].hasRemaining()) {
273: position++;
274: return this .data[index].get() & 0xFF;
275: }
276: nextBuffer();
277: }
278: return EOF;
279: }
280:
281: private void nextBuffer() {
282: if (mark == null) {
283: this .data[index] = null;
284: }
285: index++;
286: }
287:
288: public void reset() {
289: throw new UnsupportedOperationException();
290: }
291:
292: /**
293: * Reset this input stream to the position recorded by the last call to mark(). This method discards the previous
294: * value of the mark
295: *
296: * @throws IOException if mark() has never been called on this stream
297: */
298: public void tcReset() {
299: checkClosed();
300: if (mark == null) {
301: throw new IllegalStateException("no mark set");
302: }
303:
304: int rewindToIndex = mark.getBufferIndex();
305: while (index > rewindToIndex) {
306: data[index].position(0);
307: index--;
308: }
309:
310: index = rewindToIndex;
311: data[rewindToIndex].position(mark.getBufferPosition());
312: position = mark.getStreamPosition();
313: mark = null;
314: }
315:
316: public long skip(long skip) {
317: checkClosed();
318:
319: if (skip > Integer.MAX_VALUE) {
320: throw new IllegalArgumentException("skip value too large: "
321: + skip);
322: }
323:
324: if ((skip <= 0) || (available() == 0)) {
325: return 0;
326: } // per java.io.InputStream.skip() javadoc
327:
328: int numToSkip = Math.min(available(), (int) skip);
329:
330: int bytesSkipped = 0;
331: while (index < numBufs) {
332: TCByteBuffer buf = data[index];
333: int remaining = buf.remaining();
334: if (remaining > 0) {
335: int numToRead = Math.min(remaining, numToSkip);
336: buf.position(buf.position() + numToRead);
337: position += numToRead;
338: bytesSkipped += numToRead;
339: numToSkip -= numToRead;
340: if (numToSkip == 0)
341: break;
342: }
343: nextBuffer();
344: }
345:
346: return bytesSkipped;
347: }
348:
349: private void checkClosed() {
350: if (closed) {
351: throw new IllegalStateException("stream is closed");
352: }
353: }
354:
355: private static class Mark {
356: private final int position;
357: private final int bufferIndex;
358: private final int streamPosition;
359:
360: Mark(int bufferIndex, int bufferPosition, int streamPosition) {
361: this .bufferIndex = bufferIndex;
362: this .position = bufferPosition;
363: this .streamPosition = streamPosition;
364: }
365:
366: int getBufferIndex() {
367: return bufferIndex;
368: }
369:
370: int getBufferPosition() {
371: return position;
372: }
373:
374: int getStreamPosition() {
375: return streamPosition;
376: }
377: }
378:
379: public final int readInt() throws IOException {
380: int byte1 = read();
381: int byte2 = read();
382: int byte3 = read();
383: int byte4 = read();
384: if ((byte1 | byte2 | byte3 | byte4) < 0)
385: throw new EOFException();
386: return ((byte1 << 24) + (byte2 << 16) + (byte3 << 8) + (byte4 << 0));
387: }
388:
389: public final byte readByte() throws IOException {
390: int b = read();
391: if (b < 0)
392: throw new EOFException();
393: return (byte) (b);
394: }
395:
396: public final boolean readBoolean() throws IOException {
397: int b = read();
398: if (b < 0)
399: throw new EOFException();
400: return (b != 0);
401: }
402:
403: public final char readChar() throws IOException {
404: int byte1 = read();
405: int byte2 = read();
406: if ((byte1 | byte2) < 0)
407: throw new EOFException();
408: return (char) ((byte1 << 8) + (byte2 << 0));
409: }
410:
411: public final double readDouble() throws IOException {
412: return Double.longBitsToDouble(readLong());
413: }
414:
415: public final long readLong() throws IOException {
416: int byte1 = read();
417: int byte2 = read();
418: int byte3 = read();
419: int byte4 = read();
420: int byte5 = read();
421: int byte6 = read();
422: int byte7 = read();
423: int byte8 = read();
424:
425: if ((byte1 | byte2 | byte3 | byte4 | byte5 | byte6 | byte7 | byte8) < 0)
426: throw new EOFException();
427:
428: return (((long) byte1 << 56) + ((long) (byte2 & 255) << 48)
429: + ((long) (byte3 & 255) << 40)
430: + ((long) (byte4 & 255) << 32)
431: + ((long) (byte5 & 255) << 24) + ((byte6 & 255) << 16)
432: + ((byte7 & 255) << 8) + ((byte8 & 255) << 0));
433: }
434:
435: public final float readFloat() throws IOException {
436: return Float.intBitsToFloat(readInt());
437: }
438:
439: public final short readShort() throws IOException {
440: int byte1 = read();
441: int byte2 = read();
442: if ((byte1 | byte2) < 0)
443: throw new EOFException();
444: return (short) ((byte1 << 8) + (byte2 << 0));
445: }
446:
447: public final String readString() throws IOException {
448: boolean isNull = readBoolean();
449: if (isNull) {
450: return null;
451: }
452:
453: int utf = read();
454: if (utf < 0) {
455: throw new EOFException();
456: }
457:
458: switch (utf) {
459: case 0: {
460: return readStringFromChars();
461: }
462: case 1: {
463: return DataInputStream.readUTF(this );
464: }
465: default:
466: throw new AssertionError("utf = " + utf);
467: }
468:
469: // unreachable
470: }
471:
472: private String readStringFromChars() throws IOException {
473: int len = readInt();
474: char[] chars = new char[len];
475: for (int i = 0, n = chars.length; i < n; i++) {
476: chars[i] = readChar();
477: }
478: return new String(chars);
479: }
480:
481: public final void readFully(byte[] b) throws IOException {
482: readFully(b, 0, b.length);
483: }
484:
485: public final void readFully(byte[] b, int off, int len)
486: throws IOException {
487: if (len < 0)
488: throw new IndexOutOfBoundsException();
489: int n = 0;
490: while (n < len) {
491: int count = read(b, off + n, len - n);
492: if (count < 0)
493: throw new EOFException();
494: n += count;
495: }
496: }
497:
498: public final int skipBytes(int n) {
499: return (int) skip(n);
500: }
501:
502: public final int readUnsignedByte() throws IOException {
503: int b = read();
504: if (b < 0)
505: throw new EOFException();
506: return b;
507: }
508:
509: public final int readUnsignedShort() throws IOException {
510: int byte1 = read();
511: int byte2 = read();
512: if ((byte1 | byte2) < 0)
513: throw new EOFException();
514: return (byte1 << 8) + (byte2 << 0);
515: }
516:
517: public final String readLine() {
518: // Don't implement this method
519: throw new UnsupportedOperationException();
520: }
521:
522: public final String readUTF() {
523: // Don't implement this method --> use readString() instead
524: throw new UnsupportedOperationException();
525: }
526:
527: }
|