001: package gnu.text;
002:
003: import java.io.*;
004:
005: /** An InPort that reads from a queue.
006: * The method append can be used to write chars to the end of the queue.
007: * @author Per Bothner <bothner@cygnus.com>
008: */
009:
010: public class QueueReader extends Reader {
011: char[] buffer;
012: int readAheadLimit;
013: int mark; // Mark position.
014: int pos; // Read position.
015: int limit; // Write position.
016: boolean EOFseen;
017:
018: public QueueReader() {
019: }
020:
021: public boolean markSupported() {
022: return true;
023: }
024:
025: public synchronized void mark(int readAheadLimit) {
026: this .readAheadLimit = readAheadLimit;
027: mark = pos;
028: }
029:
030: public synchronized void reset() {
031: if (readAheadLimit > 0)
032: pos = mark;
033: }
034:
035: void resize(int len) {
036: int cur_size = limit - pos;
037: if (readAheadLimit > 0 && pos - mark <= readAheadLimit)
038: cur_size = limit - mark;
039: else
040: mark = pos;
041: char[] new_buffer = (buffer.length < cur_size + len ? new char[2
042: * cur_size + len]
043: : buffer);
044: System.arraycopy(buffer, mark, new_buffer, 0, cur_size);
045: buffer = new_buffer;
046: pos -= mark;
047: mark = 0;
048: limit = cur_size;
049: }
050:
051: public void append(String str) {
052: append(str.toCharArray());
053: }
054:
055: public void append(char[] chars) {
056: append(chars, 0, chars.length);
057: }
058:
059: public synchronized void append(char[] chars, int off, int len) {
060: if (buffer == null)
061: buffer = new char[100 + len];
062: else if (buffer.length < limit + len)
063: resize(len);
064: System.arraycopy(chars, off, buffer, limit, len);
065: limit += len;
066: notifyAll();
067: }
068:
069: public synchronized void append(char ch) {
070: if (buffer == null)
071: buffer = new char[100];
072: else if (buffer.length <= limit)
073: resize(1);
074: buffer[limit++] = ch;
075: notifyAll();
076: }
077:
078: /** For the writer to signal that there is no more data to append. */
079: public synchronized void appendEOF() {
080: EOFseen = true;
081: }
082:
083: public synchronized boolean ready() {
084: return pos < limit || EOFseen;
085: }
086:
087: public synchronized int read() {
088: while (pos >= limit) {
089: if (EOFseen)
090: return -1;
091: try {
092: wait();
093: } catch (java.lang.InterruptedException ex) {
094: }
095: }
096: char ch = buffer[pos++];
097: return ch;
098: }
099:
100: public synchronized int read(char[] cbuf, int off, int len) {
101: if (len == 0)
102: return 0;
103: while (pos >= limit) {
104: if (EOFseen)
105: return -1;
106: try {
107: wait();
108: } catch (java.lang.InterruptedException ex) {
109: }
110: }
111: int avail = limit - pos;
112: if (len > avail)
113: len = avail;
114: System.arraycopy(buffer, pos, cbuf, off, len);
115: pos += len;
116: return len;
117: }
118:
119: public synchronized void close() {
120: pos = 0;
121: limit = 0;
122: mark = 0;
123: EOFseen = true;
124: buffer = null;
125: }
126: }
|