001: /*
002: * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.sandStorm.lib.util;
026:
027: import seda.nbio.*;
028: import java.util.*;
029: import java.io.*;
030: import java.net.*;
031:
032: /**
033: * This class allows you to treat a list of byte arrays as a single
034: * NonblockingInputStream. This is helpful for parsing data contained
035: * within network packets, where the payload for one ADU might be
036: * spread across multiple packets. This is a *nonblocking* interface;
037: * if you attempt to read data from it, and none is available, it will
038: * return immediately.
039: *
040: * @author Matt Welsh
041: */
042: public class MultiByteArrayInputStream extends NonblockingInputStream {
043:
044: private static final boolean DEBUG = false;
045: private static final int NUMARRAYS = 2;
046:
047: private boolean closed;
048: private int cur_offset;
049: private int cur_array;
050: private byte[] arrays[];
051: private int push_array;
052: private int mark_array, mark_offset;
053:
054: /**
055: * Create a MultiByteArrayInputStream with the given array of
056: * byte arrays.
057: */
058: public MultiByteArrayInputStream(byte barr[][]) {
059: arrays = new byte[barr.length + NUMARRAYS][];
060: System.arraycopy(barr, 0, arrays, 0, barr.length);
061: push_array = barr.length;
062: cur_offset = 0;
063: cur_array = 0;
064: mark_array = -1;
065: mark_offset = -1;
066: closed = false;
067: }
068:
069: /**
070: * Create an empty MultiByteArrayInputStream.
071: */
072: public MultiByteArrayInputStream() {
073: arrays = new byte[NUMARRAYS][];
074: push_array = 0;
075: cur_offset = 0;
076: cur_array = 0;
077: mark_array = -1;
078: mark_offset = -1;
079: closed = false;
080: }
081:
082: /**
083: * Add an array to this MultiByteArrayInputStream.
084: */
085: public synchronized void addArray(byte barr[]) {
086: arrays[push_array] = barr;
087: push_array++;
088: if (push_array == arrays.length)
089: expandArrays();
090: }
091:
092: // Expand arrays if too long
093: private void expandArrays() {
094: byte[] oldarr[] = arrays;
095: arrays = new byte[oldarr.length + NUMARRAYS][];
096: System.arraycopy(oldarr, 0, arrays, 0, oldarr.length);
097: }
098:
099: /**
100: * Read the next byte from this stream.
101: * Returns -1 if no data is available.
102: */
103: public synchronized int read() throws IOException {
104: if (DEBUG)
105: System.err.println("MBS: read() called");
106: if (closed)
107: throw new EOFException(
108: "MultiByteArrayInputStream is closed!");
109: if (cur_array == push_array) {
110: return -1;
111: } else {
112: if (DEBUG)
113: System.err.println("read: cur_array " + cur_array
114: + " num " + arrays.length + " cur_offset "
115: + cur_offset + " len "
116: + arrays[cur_array].length);
117:
118: int c = (int) (arrays[cur_array][cur_offset] & 0xff);
119: cur_offset++;
120: if (cur_offset == arrays[cur_array].length) {
121: cur_offset = 0;
122: cur_array++;
123: }
124: return c;
125: }
126: }
127:
128: /**
129: * Read the next byte from this stream.
130: * Returns -1 if no data is available.
131: */
132: public synchronized int nbRead() throws IOException {
133: if (DEBUG)
134: System.err.println("MBS: nbRead() called");
135: if (closed)
136: throw new EOFException(
137: "MultiByteArrayInputStream is closed!");
138: int c;
139: if (cur_array == push_array) {
140: return -1;
141: } else {
142: c = (int) (arrays[cur_array][cur_offset] & 0xff);
143: cur_offset++;
144: if (cur_offset == arrays[cur_array].length) {
145: cur_offset = 0;
146: cur_array++;
147: }
148: return c;
149: }
150: }
151:
152: /**
153: * Read data from this input stream into the given byte array starting
154: * at offset 0 for b.length bytes. Returns the actual number of bytes
155: * read; returns -1 if no data is available.
156: */
157: public synchronized int read(byte b[]) throws IOException {
158: if (DEBUG)
159: System.err.println("MBS: read(byte[]) called, size "
160: + b.length);
161: if (closed)
162: throw new EOFException(
163: "MultiByteArrayInputStream is closed!");
164: return read(b, 0, b.length);
165: }
166:
167: /**
168: * Read data from this input stream into the given byte array starting
169: * at offset 'off' for 'len' bytes. Returns the actual number of bytes
170: * read; returns -1 if no data is available.
171: */
172: public synchronized int read(byte b[], int off, int len)
173: throws IOException {
174: if (DEBUG)
175: System.err
176: .println("MBS: read(byte[], int, int) called, size "
177: + b.length
178: + ", off "
179: + off
180: + ", len "
181: + len);
182: if (closed)
183: throw new EOFException(
184: "MultiByteArrayInputStream is closed!");
185: int n = off;
186: int total = 0;
187: int last = Math.min(off + len, b.length);
188:
189: if (DEBUG)
190: System.err
191: .println("MBS: read(byte[], int, int): cur_array "
192: + cur_array + ", push_array " + push_array
193: + ", arrays.length " + arrays.length
194: + ", n " + n + ", last " + last);
195:
196: if (cur_array == push_array)
197: return -1;
198:
199: while ((cur_array < arrays.length) && (cur_array != push_array)
200: && (n < last)) {
201: if (DEBUG)
202: System.err
203: .println("MBS: read(byte[], int, int): cur_array "
204: + cur_array
205: + ", push_array "
206: + push_array
207: + ", arrays.length "
208: + arrays.length
209: + ", n "
210: + n
211: + ", last " + last);
212:
213: int num_left = arrays[cur_array].length - cur_offset;
214: int tocopy = Math.min(num_left, last - n);
215: System.arraycopy(arrays[cur_array], cur_offset, b, n,
216: tocopy);
217: total += tocopy;
218: n += tocopy;
219: cur_offset += tocopy;
220: if (cur_offset == arrays[cur_array].length) {
221: cur_offset = 0;
222: cur_array++;
223: }
224: }
225: return total;
226: }
227:
228: /**
229: * Skip n bytes in this stream; returns the number of bytes
230: * actually skipped (which may be less than the number requested).
231: */
232: public synchronized long skip(long n) throws IOException {
233: if (DEBUG)
234: System.err.println("MBS: skip() called, n=" + n);
235: if (closed)
236: throw new EOFException(
237: "MultiByteArrayInputStream is closed!");
238: int requested = Math.min((int) n, Integer.MAX_VALUE);
239: int totalskipped = 0;
240:
241: if (cur_array == push_array)
242: return 0;
243:
244: while ((cur_array < arrays.length) && (requested > 0)) {
245: int num_left = arrays[cur_array].length - cur_offset;
246: int toskip = Math.min(num_left, requested);
247: totalskipped += toskip;
248: requested -= toskip;
249: cur_offset = 0;
250: cur_array++;
251: }
252: return totalskipped;
253: }
254:
255: /**
256: * Return the number of bytes available for reading.
257: */
258: public synchronized int available() throws IOException {
259: if (closed)
260: throw new EOFException(
261: "MultiByteArrayInputStream is closed!");
262: if (cur_array == push_array)
263: return 0;
264: int num_left = arrays[cur_array].length - cur_offset;
265: for (int i = cur_array + 1; i < arrays.length; i++) {
266: if (arrays[i] == null)
267: break;
268: num_left += arrays[i].length;
269: }
270: if (DEBUG)
271: System.err.println("MBS: available() called, num_left="
272: + num_left);
273: return num_left;
274: }
275:
276: /**
277: * Close this stream.
278: */
279: public synchronized void close() throws IOException {
280: if (DEBUG)
281: System.err.println("MBS: close() called");
282: if (closed)
283: throw new EOFException(
284: "MultiByteArrayInputStream is closed!");
285: arrays = null; // Facilitate GC
286: closed = true;
287: }
288:
289: /**
290: * Returns true, since mark() and reset() are supported.
291: */
292: public boolean markSupported() {
293: return true;
294: }
295:
296: /**
297: * Returns the stream to the position of the previous mark().
298: */
299: public synchronized void reset() throws IOException {
300: if (DEBUG)
301: System.err.println("MBS: reset() called");
302: if (mark_array == -1)
303: throw new IOException(
304: "MultiByteArrayInputStream not marked!");
305: cur_array = mark_array;
306: cur_offset = mark_offset;
307: }
308:
309: /**
310: * Set the stream's mark to the current position.
311: * 'readlimit' is ignored, since there is no limit to how many
312: * bytes can be read before the mark is invalidated.
313: */
314: public synchronized void mark(int readlimit) {
315: if (DEBUG)
316: System.err.println("MBS: mark() called, readlimit="
317: + readlimit);
318: mark_array = cur_array;
319: mark_offset = cur_offset;
320: }
321:
322: /**
323: * Return the number of bytes registered.
324: */
325: public synchronized int numArrays() {
326: return push_array;
327: }
328:
329: /**
330: * Reset this input stream - clear all internal data and pointers to
331: * a fresh initialized state.
332: */
333: public synchronized void clear() {
334: arrays = new byte[NUMARRAYS][];
335: push_array = 0;
336: cur_offset = 0;
337: cur_array = 0;
338: mark_array = -1;
339: mark_offset = -1;
340: closed = false;
341: }
342:
343: }
|