001: /*
002: * Copyright (C) 2004 Stephen Ostermiller
003: * http://ostermiller.org/contact.pl?regarding=Java+Utilities
004: *
005: * This program is free software; you can redistribute it and/or modify
006: * it under the terms of the GNU General Public License as published by
007: * the Free Software Foundation; either version 2 of the License, or
008: * (at your option) any later version.
009: *
010: * This program is distributed in the hope that it will be useful,
011: * but WITHOUT ANY WARRANTY; without even the implied warranty of
012: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013: * GNU General Public License for more details.
014: *
015: * See COPYING.TXT for details.
016: */
017: package com.Ostermiller.util;
018:
019: import java.io.*;
020: import java.util.ArrayList;
021:
/**
 * A reader which reads sequentially from multiple sources.
 * More information about this class is available from <a target="_top" href=
 * "http://ostermiller.org/utils/">ostermiller.org</a>.
 * <p>
 * Thread-safety: {@link #addReader(Reader)}, {@link #addReaders(Reader[])},
 * {@link #lastReaderAdded()} and {@link #close()} may be called from threads
 * other than the one that is reading. The read-side bookkeeping (the current
 * reader and the queue index) is not synchronized, so the read, skip and
 * ready methods are meant to be driven by one reading thread at a time.
 *
 * @author Stephen Ostermiller http://ostermiller.org/contact.pl?regarding=Java+Utilities
 * @since ostermillerutils 1.04.00
 */
public class ConcatReader extends Reader {

    /**
     * How long, in milliseconds, a read waits before re-checking the queue
     * when it is empty but more readers may still be added.
     */
    private static final long POLL_INTERVAL_MILLIS = 100;

    /**
     * Current index to readerQueue. Only touched by the reading thread.
     *
     * @since ostermillerutils 1.04.01
     */
    private int readerQueueIndex = 0;

    /**
     * Queue of readers, in the order in which they are read. Readers that
     * have been exhausted stay in the list so that close() can close them.
     * Every access must hold this object's monitor.
     *
     * @since ostermillerutils 1.04.01
     */
    private final ArrayList<Reader> readerQueue = new ArrayList<Reader>();

    /**
     * A cache of the current reader from the readerQueue
     * to avoid unneeded access to the queue which must
     * be synchronized.
     *
     * @since ostermillerutils 1.04.01
     */
    private Reader currentReader = null;

    /**
     * true iff the client may NOT add any more readers.
     * Volatile because it is written by the adding thread and read by the
     * reading thread.
     *
     * @since ostermillerutils 1.04.01
     */
    private volatile boolean doneAddingReaders = false;

    /**
     * True iff the close() method has been called on this stream.
     * Volatile because close() may be called from a thread other than the
     * reading thread.
     *
     * @since ostermillerutils 1.04.00
     */
    private volatile boolean closed = false;

    /**
     * Causes the addReader method to throw IllegalStateException
     * and read() methods to return -1 (end of stream)
     * when there is no more available data.
     * <p>
     * Calling this method when this class is no longer accepting
     * more readers has no effect.
     *
     * @since ostermillerutils 1.04.01
     */
    public void lastReaderAdded() {
        // Same lock as addReader() so that a reader can never slip into the
        // queue after the flag has become visible to the reading thread.
        synchronized (readerQueue) {
            doneAddingReaders = true;
        }
    }

    /**
     * Add the given reader to the queue of readers from which to
     * concatenate data.
     *
     * @param in Reader to add to the concatenation.
     * @throws IllegalStateException if more readers can't be added because lastReaderAdded() has been called, close() has been called, or a constructor with reader parameters was used.
     * @throws NullPointerException if in is null.
     *
     * @since ostermillerutils 1.04.01
     */
    public void addReader(Reader in) {
        if (in == null) {
            throw new NullPointerException();
        }
        synchronized (readerQueue) {
            if (closed) {
                throw new IllegalStateException(
                        "ConcatReader has been closed");
            }
            if (doneAddingReaders) {
                throw new IllegalStateException(
                        "Cannot add more readers - the last reader has already been added.");
            }
            readerQueue.add(in);
        }
    }

    /**
     * Add the given readers to the queue of readers from which to
     * concatenate data, in array order.
     * <p>
     * The array is checked for null elements before anything is queued, so
     * a null element does not leave the preceding elements half-added.
     *
     * @param in Readers to add to the concatenation.
     * @throws IllegalStateException if more readers can't be added because lastReaderAdded() has been called, close() has been called, or a constructor with reader parameters was used.
     * @throws NullPointerException the array of readers, or any of the contents is null.
     *
     * @since ostermillerutils 1.04.01
     */
    public void addReaders(Reader[] in) {
        for (Reader element : in) {
            if (element == null) {
                throw new NullPointerException();
            }
        }
        for (Reader element : in) {
            addReader(element);
        }
    }

    /**
     * Gets the current reader, looking at the next
     * one in the list if the current one is null.
     *
     * @return the reader to read from, or null if the queue currently holds
     *         no unread reader.
     *
     * @since ostermillerutils 1.04.01
     */
    private Reader getCurrentReader() {
        if (currentReader == null) {
            // Both the size check and the lookup happen under the lock:
            // ArrayList is not safe to inspect while another thread adds.
            synchronized (readerQueue) {
                // reader queue index is advanced only by the
                // advanceToNextReader() method. Don't do it here.
                if (readerQueueIndex < readerQueue.size()) {
                    currentReader = readerQueue.get(readerQueueIndex);
                }
            }
        }
        return currentReader;
    }

    /**
     * Indicate that we are done with the current reader and we should
     * advance to the next reader.
     *
     * @since ostermillerutils 1.04.01
     */
    private void advanceToNextReader() {
        currentReader = null;
        readerQueueIndex++;
    }

    /**
     * Throws if this stream has been closed.
     *
     * @throws IOException if close() has been called.
     */
    private void ensureOpen() throws IOException {
        if (closed) {
            throw new IOException("Reader closed");
        }
    }

    /**
     * Returns the reader to read from next, waiting for one to be added if
     * the queue is exhausted but more readers may still arrive.
     *
     * @return the current reader, or null when every reader has been
     *         consumed and no more can be added (end of stream).
     * @throws IOException if this stream is closed, or if the waiting thread
     *         is interrupted (an InterruptedIOException, with the thread's
     *         interrupt status restored).
     */
    private Reader awaitCurrentReader() throws IOException {
        while (true) {
            ensureOpen();
            // Sample the flag BEFORE inspecting the queue. Checking in the
            // opposite order could miss a reader that was added, together
            // with lastReaderAdded(), between the two checks and report a
            // premature end of stream.
            boolean noMoreReaders = doneAddingReaders;
            Reader in = getCurrentReader();
            if (in != null || noMoreReaders) {
                return in;
            }
            try {
                Thread.sleep(POLL_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to callers further up the stack.
                Thread.currentThread().interrupt();
                InterruptedIOException interrupted = new InterruptedIOException("Interrupted");
                interrupted.initCause(e);
                throw interrupted;
            }
        }
    }

    /**
     * Create a new reader that can dynamically accept new sources.
     * <p>
     * New sources should be added using the addReader() method.
     * When all sources have been added the lastReaderAdded() should
     * be called so that read methods can return -1 (end of stream).
     * <p>
     * Adding new sources may be interleaved with read calls.
     *
     * @since ostermillerutils 1.04.01
     */
    public ConcatReader() {
        // Empty Constructor
    }

    /**
     * Create a new reader with one source.
     * <p>
     * When using this constructor, more readers cannot
     * be added later, and calling addReader() will
     * throw an illegal state Exception.
     *
     * @param in reader to use as a source.
     *
     * @throws NullPointerException if in is null
     *
     * @since ostermillerutils 1.04.00
     */
    public ConcatReader(Reader in) {
        addReader(in);
        lastReaderAdded();
    }

    /**
     * Create a new reader with two sources.
     * <p>
     * When using this constructor, more readers cannot
     * be added later, and calling addReader() will
     * throw an illegal state Exception.
     *
     * @param in1 first reader to use as a source.
     * @param in2 second reader to use as a source.
     *
     * @throws NullPointerException if either source is null.
     *
     * @since ostermillerutils 1.04.00
     */
    public ConcatReader(Reader in1, Reader in2) {
        addReader(in1);
        addReader(in2);
        lastReaderAdded();
    }

    /**
     * Create a new reader with an arbitrary number of sources.
     * <p>
     * When using this constructor, more readers cannot
     * be added later, and calling addReader() will
     * throw an illegal state Exception.
     *
     * @param in readers to use as sources.
     *
     * @throws NullPointerException if the input array or any element is null.
     *
     * @since ostermillerutils 1.04.00
     */
    public ConcatReader(Reader[] in) {
        addReaders(in);
        lastReaderAdded();
    }

    /**
     * Read a single character. This method will block until a
     * character is available, an I/O error occurs, or the end of all underlying
     * streams are reached.
     * <p>
     * If this class is not done accepting readers and the end of the last known
     * stream is reached, this method will block until another thread
     * adds a reader, calls lastReaderAdded(), closes this reader, or
     * interrupts the reading thread.
     *
     * @return The character read, as an integer in the range 0 to 65535 (0x00-0xffff),
     *     or -1 if the end of the stream has been reached
     *
     * @throws IOException - If an I/O error occurs, the stream is closed, or
     *     the thread is interrupted while waiting for more readers.
     *
     * @since ostermillerutils 1.04.00
     */
    @Override
    public int read() throws IOException {
        ensureOpen();
        while (true) {
            Reader in = awaitCurrentReader();
            if (in == null) {
                return -1;
            }
            int r = in.read();
            if (r != -1) {
                return r;
            }
            // This source is exhausted; try the next one.
            advanceToNextReader();
        }
    }

    /**
     * Read characters into an array. This method will block until some input is available, an
     * I/O error occurs, or the end of all underlying
     * streams are reached.
     * <p>
     * If this class is not done accepting readers and the end of the last known
     * stream is reached, this method will block until another thread
     * adds a reader, calls lastReaderAdded(), closes this reader, or
     * interrupts the reading thread.
     *
     * @param cbuf - Destination buffer
     * @return The number of characters read, or -1 if the end of the stream has been reached
     *
     * @throws IOException - If an I/O error occurs
     * @throws NullPointerException - If the buffer is null.
     *
     * @since ostermillerutils 1.04.00
     */
    @Override
    public int read(char[] cbuf) throws IOException {
        return read(cbuf, 0, cbuf.length);
    }

    /**
     * Read characters into a portion of an array. This method will block until
     * some input is available, an I/O error occurs, or the end of all underlying
     * streams are reached.
     * <p>
     * A single call never spans two underlying readers; it returns whatever
     * the current reader delivered.
     * <p>
     * If this class is not done accepting readers and the end of the last known
     * stream is reached, this method will block until another thread
     * adds a reader, calls lastReaderAdded(), closes this reader, or
     * interrupts the reading thread.
     *
     * @param cbuf Destination buffer
     * @param off Offset at which to start storing characters
     * @param len Maximum number of characters to read
     * @return The number of characters read (0 if len is 0), or -1 if the end of the stream has been reached
     *
     * @throws IOException - If an I/O error occurs
     * @throws NullPointerException - If the buffer is null.
     * @throws IndexOutOfBoundsException - if length or offset are not possible.
     *
     * @since ostermillerutils 1.04.00
     */
    @Override
    public int read(char[] cbuf, int off, int len) throws IOException {
        // Written as a subtraction so that a huge len cannot overflow int
        // and sneak past the check.
        if (off < 0 || len < 0 || len > cbuf.length - off) {
            throw new IndexOutOfBoundsException();
        }
        ensureOpen();
        if (len == 0) {
            // Nothing was asked for: do not block waiting for a reader.
            return 0;
        }
        while (true) {
            Reader in = awaitCurrentReader();
            if (in == null) {
                return -1;
            }
            int r = in.read(cbuf, off, len);
            if (r != -1) {
                return r;
            }
            // This source is exhausted; try the next one.
            advanceToNextReader();
        }
    }

    /**
     * Skip characters. This method will block until some characters are
     * available, an I/O error occurs, or the end of the stream is reached.
     * <p>
     * If this class is not done accepting readers and the end of the last known
     * stream is reached, this method will block until another thread
     * adds a reader, calls lastReaderAdded(), closes this reader, or
     * interrupts the reading thread.
     *
     * @param n the number of characters to skip; if zero or negative
     *     nothing is skipped and 0 is returned.
     * @return The number of characters actually skipped, 0 at end of stream.
     *
     * @throws IOException If an I/O error occurs
     *
     * @since ostermillerutils 1.04.00
     */
    @Override
    public long skip(long n) throws IOException {
        ensureOpen();
        if (n <= 0) {
            return 0;
        }
        Reader in = awaitCurrentReader();
        if (in == null) {
            return 0;
        }
        long skipped = in.skip(n);
        if (skipped > 0) {
            return skipped;
        }
        // When nothing was skipped it is a bit of a puzzle.
        // The most common cause is that the end of the underlying
        // stream was reached. In which case calling skip on it
        // will always return zero. If somebody were calling skip
        // until it skipped everything they needed, there would
        // be an infinite loop if we were to return zero here.
        // So consume one character instead: read() moves on to the next
        // stream for us and only reports -1 at the true end of the stream.
        return (read() == -1) ? 0 : 1;
    }

    /**
     * Tell whether this stream is ready to be read.
     *
     * @return True if the next read() is guaranteed not to block for input,
     *     false otherwise. Note that returning false does not guarantee that the next
     *     read will block.
     *
     * @throws IOException If an I/O error occurs
     *
     * @since ostermillerutils 1.04.00
     */
    @Override
    public boolean ready() throws IOException {
        ensureOpen();
        Reader in = getCurrentReader();
        return in != null && in.ready();
    }

    /**
     * Close the stream and any underlying streams.
     * Once a stream has been closed, further read(), ready(), mark(), or reset()
     * invocations will throw an IOException. Closing a previously-closed stream,
     * however, has no effect.
     * <p>
     * Every queued reader is closed even if closing one of them fails; the
     * first failure is rethrown after all of them have been attempted, and
     * this stream is considered closed either way.
     *
     * @throws IOException If an I/O error occurs
     *
     * @since ostermillerutils 1.04.00
     */
    @Override
    public void close() throws IOException {
        Reader[] toClose;
        synchronized (readerQueue) {
            if (closed) {
                return;
            }
            // Mark closed first: addReader() checks the flag under this same
            // lock, so the snapshot below is guaranteed to be complete.
            closed = true;
            toClose = readerQueue.toArray(new Reader[readerQueue.size()]);
        }
        // Close outside the lock; the underlying close() may block.
        IOException firstFailure = null;
        for (Reader reader : toClose) {
            try {
                reader.close();
            } catch (IOException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Mark not supported.
     *
     * @param readlimit ignored.
     * @throws IOException because mark is not supported.
     *
     * @since ostermillerutils 1.04.00
     */
    @Override
    public void mark(int readlimit) throws IOException {
        throw new IOException("Mark not supported");
    }

    /**
     * Reset not supported.
     *
     * @throws IOException because reset is not supported.
     *
     * @since ostermillerutils 1.04.00
     */
    @Override
    public void reset() throws IOException {
        throw new IOException("Reset not supported");
    }

    /**
     * Mark not supported.
     *
     * @return false
     *
     * @since ostermillerutils 1.04.00
     */
    @Override
    public boolean markSupported() {
        return false;
    }
}
|