001: /*
002: * Copyright (C) 2004 Stephen Ostermiller
003: * http://ostermiller.org/contact.pl?regarding=Java+Utilities
004: *
005: * This program is free software; you can redistribute it and/or modify
006: * it under the terms of the GNU General Public License as published by
007: * the Free Software Foundation; either version 2 of the License, or
008: * (at your option) any later version.
009: *
010: * This program is distributed in the hope that it will be useful,
011: * but WITHOUT ANY WARRANTY; without even the implied warranty of
012: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013: * GNU General Public License for more details.
014: *
015: * See COPYING.TXT for details.
016: */
017: package com.Ostermiller.util;
018:
019: import java.io.*;
020: import java.util.ArrayList;
021:
022: /**
023: * An input stream which reads sequentially from multiple sources.
024: * More information about this class is available from <a target="_top" href=
025: * "http://ostermiller.org/utils/">ostermiller.org</a>.
026: *
027: * @author Stephen Ostermiller http://ostermiller.org/contact.pl?regarding=Java+Utilities
028: * @since ostermillerutils 1.04.00
029: */
030: public class ConcatInputStream extends InputStream {
031:
032: /**
033: * Current index to inputStreamQueue
034: *
035: * @since ostermillerutils 1.04.01
036: */
037: private int inputStreamQueueIndex = 0;
038:
039: /**
040: * Queue of inputStreams that have yet to be read from.
041: *
042: * @since ostermillerutils 1.04.01
043: */
044: private ArrayList<InputStream> inputStreamQueue = new ArrayList<InputStream>();
045:
046: /**
047: * A cache of the current inputStream from the inputStreamQueue
048: * to avoid unneeded access to the queue which must
049: * be synchronized.
050: *
051: * @since ostermillerutils 1.04.01
052: */
053: private InputStream currentInputStream = null;
054:
055: /**
056: * true iff the client may add more inputStreams.
057: *
058: * @since ostermillerutils 1.04.01
059: */
060: private boolean doneAddingInputStreams = false;
061:
062: /**
063: * Causes the addInputStream method to throw IllegalStateException
064: * and read() methods to return -1 (end of stream)
065: * when there is no more available data.
066: * <p>
067: * Calling this method when this class is no longer accepting
068: * more inputStreams has no effect.
069: *
070: * @since ostermillerutils 1.04.01
071: */
072: public void lastInputStreamAdded() {
073: doneAddingInputStreams = true;
074: }
075:
076: /**
077: * Add the given inputStream to the queue of inputStreams from which to
078: * concatenate data.
079: *
080: * @param in InputStream to add to the concatenation.
081: * @throws IllegalStateException if more inputStreams can't be added because lastInputStreamAdded() has been called, close() has been called, or a constructor with inputStream parameters was used.
082: *
083: * @since ostermillerutils 1.04.01
084: */
085: public void addInputStream(InputStream in) {
086: synchronized (inputStreamQueue) {
087: if (in == null)
088: throw new NullPointerException();
089: if (closed)
090: throw new IllegalStateException(
091: "ConcatInputStream has been closed");
092: if (doneAddingInputStreams)
093: throw new IllegalStateException(
094: "Cannot add more inputStreams - the last inputStream has already been added.");
095: inputStreamQueue.add(in);
096: }
097: }
098:
099: /**
100: * Add the given inputStream to the queue of inputStreams from which to
101: * concatenate data.
102: *
103: * @param in InputStream to add to the concatenation.
104: * @throws IllegalStateException if more inputStreams can't be added because lastInputStreamAdded() has been called, close() has been called, or a constructor with inputStream parameters was used.
105: * @throws NullPointerException the array of inputStreams, or any of the contents is null.
106: *
107: * @since ostermillerutils 1.04.01
108: */
109: public void addInputStreams(InputStream[] in) {
110: for (InputStream element : in) {
111: addInputStream(element);
112: }
113: }
114:
115: /**
116: * Gets the current inputStream, looking at the next
117: * one in the list if the current one is null.
118: *
119: * @since ostermillerutils 1.04.01
120: */
121: private InputStream getCurrentInputStream() {
122: if (currentInputStream == null
123: && inputStreamQueueIndex < inputStreamQueue.size()) {
124: synchronized (inputStreamQueue) {
125: // inputStream queue index is advanced only by the nextInputStream()
126: // method. Don't do it here.
127: currentInputStream = inputStreamQueue
128: .get(inputStreamQueueIndex);
129: }
130: }
131: return currentInputStream;
132: }
133:
134: /**
135: * Indicate that we are done with the current inputStream and we should
136: * advance to the next inputStream.
137: *
138: * @since ostermillerutils 1.04.01
139: */
140: private void advanceToNextInputStream() {
141: currentInputStream = null;
142: inputStreamQueueIndex++;
143: }
144:
145: /**
146: * True iff this the close() method has been called on this stream.
147: *
148: * @since ostermillerutils 1.04.00
149: */
150: private boolean closed = false;
151:
152: /**
153: * Create a new input stream that can dynamically accept new sources.
154: * <p>
155: * New sources should be added using the addInputStream() method.
156: * When all sources have been added the lastInputStreamAdded() should
157: * be called so that read methods can return -1 (end of stream).
158: * <p>
159: * Adding new sources may by interleaved with read calls.
160: *
161: * @since ostermillerutils 1.04.01
162: */
163: public ConcatInputStream() {
164: // Empty constructor
165: }
166:
167: /**
168: * Create a new InputStream with one source.
169: *
170: * @param in InputStream to use as a source.
171: *
172: * @throws NullPointerException if in is null
173: *
174: * @since ostermillerutils 1.04.00
175: */
176: public ConcatInputStream(InputStream in) {
177: addInputStream(in);
178: lastInputStreamAdded();
179: }
180:
181: /**
182: * Create a new InputStream with two sources.
183: *
184: * @param in1 first InputStream to use as a source.
185: * @param in2 second InputStream to use as a source.
186: *
187: * @throws NullPointerException if either source is null.
188: *
189: * @since ostermillerutils 1.04.00
190: */
191: public ConcatInputStream(InputStream in1, InputStream in2) {
192: addInputStream(in1);
193: addInputStream(in2);
194: lastInputStreamAdded();
195: }
196:
197: /**
198: * Create a new InputStream with an arbitrary number of sources.
199: *
200: * @param in InputStreams to use as a sources.
201: *
202: * @throws NullPointerException if the input array on any element is null.
203: *
204: * @since ostermillerutils 1.04.00
205: */
206: public ConcatInputStream(InputStream[] in) {
207: addInputStreams(in);
208: lastInputStreamAdded();
209: }
210:
211: /**
212: * Reads the next byte of data from the underlying streams. The value byte is
213: * returned as an int in the range 0 to 255. If no byte is available because
214: * the end of the stream has been reached, the value -1 is returned. This method
215: * blocks until input data is available, the end of the stream is detected, or
216: * an exception is thrown.
217: * <p>
218: * If this class in not done accepting inputstreams and the end of the last known
219: * stream is reached, this method will block forever unless another thread
220: * adds an inputstream or interrupts.
221: *
222: * @return the next byte of data, or -1 if the end of the stream is reached.
223: *
224: * @throws IOException if an I/O error occurs.
225: */
226: @Override
227: public int read() throws IOException {
228: if (closed)
229: throw new IOException("InputStream closed");
230: int r = -1;
231: while (r == -1) {
232: InputStream in = getCurrentInputStream();
233: if (in == null) {
234: if (doneAddingInputStreams)
235: return -1;
236: try {
237: Thread.sleep(100);
238: } catch (InterruptedException iox) {
239: throw new IOException("Interrupted");
240: }
241: } else {
242: r = in.read();
243: if (r == -1)
244: advanceToNextInputStream();
245: }
246: }
247: return r;
248: }
249:
250: /**
251: * Reads some number of bytes from the underlying streams and stores them into
252: * the buffer array b. The number of bytes actually read is returned as an
253: * integer. This method blocks until input data is available, end of file is
254: * detected, or an exception is thrown.
255: * <p>
256: * If the length of b is zero,
257: * then no bytes are read and 0 is returned; otherwise, there is an attempt
258: * to read at least one byte.
259: * <p>
260: * The read(b) method for class InputStream has the same effect as:<br>
261: * read(b, 0, b.length)
262: * <p>
263: * If this class in not done accepting inputstreams and the end of the last known
264: * stream is reached, this method will block forever unless another thread
265: * adds an inputstream or interrupts.
266: *
267: * @param b - Destination buffer
268: * @return The number of bytes read, or -1 if the end of the stream has been reached
269: *
270: * @throws IOException - If an I/O error occurs
271: * @throws NullPointerException - If b is null.
272: *
273: * @since ostermillerutils 1.04.00
274: */
275: @Override
276: public int read(byte[] b) throws IOException {
277: return read(b, 0, b.length);
278: }
279:
280: /**
281: * Reads up to length bytes of data from the underlying streams into an array of bytes.
282: * An attempt is made to read as many as length bytes, but a smaller number may be read,
283: * possibly zero. The number of bytes actually read is returned as an integer.
284: * <p>
285: * If length is zero,
286: * then no bytes are read and 0 is returned; otherwise, there is an attempt
287: * to read at least one byte.
288: * <p>
289: * This method blocks until input data is available
290: * <p>
291: * If this class in not done accepting inputstreams and the end of the last known
292: * stream is reached, this method will block forever unless another thread
293: * adds an inputstream or interrupts.
294: *
295: * @param b Destination buffer
296: * @param off Offset at which to start storing bytes
297: * @param len Maximum number of bytes to read
298: * @return The number of bytes read, or -1 if the end of the stream has been reached
299: *
300: * @throws IOException - If an I/O error occurs
301: * @throws NullPointerException - If b is null.
302: * @throws IndexOutOfBoundsException - if length or offset are not possible.
303: */
304: @Override
305: public int read(byte[] b, int off, int len) throws IOException {
306: if (off < 0 || len < 0 || off + len > b.length)
307: throw new IllegalArgumentException();
308: if (closed)
309: throw new IOException("InputStream closed");
310: int r = -1;
311: while (r == -1) {
312: InputStream in = getCurrentInputStream();
313: if (in == null) {
314: if (doneAddingInputStreams)
315: return -1;
316: try {
317: Thread.sleep(100);
318: } catch (InterruptedException iox) {
319: throw new IOException("Interrupted");
320: }
321: } else {
322: r = in.read(b, off, len);
323: if (r == -1)
324: advanceToNextInputStream();
325: }
326: }
327: return r;
328: }
329:
330: /**
331: * Skips over and discards n bytes of data from this input stream. The skip method
332: * may, for a variety of reasons, end up skipping over some smaller number of bytes,
333: * possibly 0. This may result from any of a number of conditions; reaching end of
334: * file before n bytes have been skipped is only one possibility. The actual number
335: * of bytes skipped is returned. If n is negative, no bytes are skipped.
336: * <p>
337: * If this class in not done accepting inputstreams and the end of the last known
338: * stream is reached, this method will block forever unless another thread
339: * adds an inputstream or interrupts.
340: *
341: * @param n he number of characters to skip
342: * @return The number of characters actually skipped
343: *
344: * @throws IOException If an I/O error occurs
345: *
346: * @since ostermillerutils 1.04.00
347: */
348: @Override
349: public long skip(long n) throws IOException {
350: if (closed)
351: throw new IOException("InputStream closed");
352: if (n <= 0)
353: return 0;
354: long s = -1;
355: while (s <= 0) {
356: InputStream in = getCurrentInputStream();
357: if (in == null) {
358: if (doneAddingInputStreams)
359: return 0;
360: try {
361: Thread.sleep(100);
362: } catch (InterruptedException iox) {
363: throw new IOException("Interrupted");
364: }
365: } else {
366: s = in.skip(n);
367: // When nothing was skipped it is a bit of a puzzle.
368: // The most common cause is that the end of the underlying
369: // stream was reached. In which case calling skip on it
370: // will always return zero. If somebody were calling skip
371: // until it skipped everything they needed, there would
372: // be an infinite loop if we were to return zero here.
373: // If we get zero, let us try to read one character so
374: // we can see if we are at the end of the stream. If so,
375: // we will move to the next.
376: if (s <= 0) {
377: // read() will advance to the next stream for us, so don't do it again
378: s = ((read() == -1) ? -1 : 1);
379: }
380: }
381:
382: }
383: return s;
384: }
385:
386: /**
387: * Returns the number of bytes that can be read (or skipped over) from this input
388: * stream without blocking by the next caller of a method for this input stream.
389: * The next caller might be the same thread or or another thread.
390: *
391: * @throws IOException If an I/O error occurs
392: *
393: * @since ostermillerutils 1.04.00
394: */
395: @Override
396: public int available() throws IOException {
397: if (closed)
398: throw new IOException("InputStream closed");
399: InputStream in = getCurrentInputStream();
400: if (in == null)
401: return 0;
402: return in.available();
403: }
404:
405: /**
406: * Closes this input stream and releases any system resources associated with the stream.
407: *
408: * @since ostermillerutils 1.04.00
409: */
410: @Override
411: public void close() throws IOException {
412: if (closed)
413: return;
414: for (Object element : inputStreamQueue) {
415: ((InputStream) element).close();
416: }
417: closed = true;
418: }
419:
420: /**
421: * Mark not supported
422: *
423: * @since ostermillerutils 1.04.00
424: */
425: @Override
426: public void mark(int readlimit) {
427: // Mark not supported -- do nothing
428: }
429:
430: /**
431: * Reset not supported.
432: *
433: * @throws IOException because reset is not supported.
434: *
435: * @since ostermillerutils 1.04.00
436: */
437: @Override
438: public void reset() throws IOException {
439: throw new IOException("Reset not supported");
440: }
441:
442: /**
443: * Does not support mark.
444: *
445: * @return false
446: *
447: * @since ostermillerutils 1.04.00
448: */
449: @Override
450: public boolean markSupported() {
451: return false;
452: }
453: }
|