001: /*
002: * Circular Object Buffer
003: * Copyright (C) 2002-2004 Stephen Ostermiller
004: * http://ostermiller.org/contact.pl?regarding=Java+Utilities
005: *
006: * This program is free software; you can redistribute it and/or modify
007: * it under the terms of the GNU General Public License as published by
008: * the Free Software Foundation; either version 2 of the License, or
009: * (at your option) any later version.
010: *
011: * This program is distributed in the hope that it will be useful,
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
014: * GNU General Public License for more details.
015: *
016: * See COPYING.TXT for details.
017: */
018: package com.Ostermiller.util;
019:
020: /**
021: * Implements the Circular Buffer producer/consumer model for Objects.
022: * More information about this class is available from <a target="_top" href=
023: * "http://ostermiller.org/utils/CircularObjectBuffer.html">ostermiller.org</a>.
024: * <p>
025: * This class is thread safe.
026: *
027: * @see CircularCharBuffer
028: * @see CircularByteBuffer
029: *
030: * @author Stephen Ostermiller http://ostermiller.org/contact.pl?regarding=Java+Utilities
031: * @param <ElementType> Type of object allowed in this circular buffer
032: * @since ostermillerutils 1.00.00
033: */
034: public class CircularObjectBuffer<ElementType> {
035:
036: /**
037: * The default size for a circular object buffer.
038: *
039: * @since ostermillerutils 1.00.00
040: */
041: private final static int DEFAULT_SIZE = 1024;
042:
043: /**
044: * A buffer that will grow as things are added.
045: *
046: * @since ostermillerutils 1.00.00
047: */
048: public final static int INFINITE_SIZE = -1;
049:
050: /**
051: * The circular buffer.
052: * <p>
053: * The actual capacity of the buffer is one less than the actual length
054: * of the buffer so that an empty and a full buffer can be
055: * distinguished. An empty buffer will have the readPostion and the
056: * writePosition equal to each other. A full buffer will have
057: * the writePosition one less than the readPostion.
058: * <p>
059: * There are two important indexes into the buffer:
060: * The readPosition, and the writePosition. The Objects
061: * available to be read go from the readPosition to the writePosition,
062: * wrapping around the end of the buffer. The space available for writing
063: * goes from the write position to one less than the readPosition,
064: * wrapping around the end of the buffer.
065: *
066: * @since ostermillerutils 1.00.00
067: */
068: protected ElementType[] buffer;
069: /**
070: * Index of the first Object available to be read.
071: *
072: * @since ostermillerutils 1.00.00
073: */
074: protected volatile int readPosition = 0;
075: /**
076: * Index of the first Object available to be written.
077: *
078: * @since ostermillerutils 1.00.00
079: */
080: protected volatile int writePosition = 0;
081: /**
082: * If this buffer is infinite (should resize itself when full)
083: *
084: * @since ostermillerutils 1.00.00
085: */
086: protected volatile boolean infinite = false;
087: /**
088: * True if a write to a full buffer should block until the buffer
089: * has room, false if the write method should throw an IOException
090: *
091: * @since ostermillerutils 1.00.00
092: */
093: protected boolean blockingWrite = true;
094:
095: /**
096: * True when no more input is coming into this buffer. At that
097: * point reading from the buffer may return null if the buffer
098: * is empty, otherwise a read will block until an Object is available.
099: *
100: * @since ostermillerutils 1.00.00
101: */
102: protected boolean inputDone = false;
103:
104: /**
105: * Make this buffer ready for reuse. The contents of the buffer
106: * will be cleared and the streams associated with this buffer
107: * will be reopened if they had been closed.
108: *
109: * @since ostermillerutils 1.00.00
110: */
111: public void clear() {
112: synchronized (this ) {
113: readPosition = 0;
114: writePosition = 0;
115: inputDone = false;
116: }
117: }
118:
119: /**
120: * Get number of Objects that are available to be read.
121: * <p>
122: * Note that the number of Objects available plus
123: * the number of Objects free may not add up to the
124: * capacity of this buffer, as the buffer may reserve some
125: * space for other purposes.
126: *
127: * @return the size in Objects of this buffer
128: *
129: * @since ostermillerutils 1.00.00
130: */
131: public int getAvailable() {
132: synchronized (this ) {
133: return available();
134: }
135: }
136:
137: /**
138: * Get the number of Objects this buffer has free for
139: * writing.
140: * <p>
141: * Note that the number of Objects available plus
142: * the number of Objects free may not add up to the
143: * capacity of this buffer, as the buffer may reserve some
144: * space for other purposes.
145: *
146: * @return the available space in Objects of this buffer
147: *
148: * @since ostermillerutils 1.00.00
149: */
150: public int getSpaceLeft() {
151: synchronized (this ) {
152: return spaceLeft();
153: }
154: }
155:
156: /**
157: * Get the capacity of this buffer.
158: * <p>
159: * Note that the number of Objects available plus
160: * the number of Objects free may not add up to the
161: * capacity of this buffer, as the buffer may reserve some
162: * space for other purposes.
163: *
164: * @return the size in Objects of this buffer
165: *
166: * @since ostermillerutils 1.00.00
167: */
168: public int getSize() {
169: synchronized (this ) {
170: return buffer.length;
171: }
172: }
173:
174: @SuppressWarnings("unchecked")
175: private ElementType[] createArray(int size) {
176: return (ElementType[]) new Object[size];
177: }
178:
179: /**
180: * double the size of the buffer
181: *
182: * @since ostermillerutils 1.00.00
183: */
184: private void resize() {
185: ElementType[] newBuffer = createArray(buffer.length * 2);
186: int available = available();
187: if (readPosition <= writePosition) {
188: // any space between the read and
189: // the first write needs to be saved.
190: // In this case it is all in one piece.
191: int length = writePosition - readPosition;
192: System
193: .arraycopy(buffer, readPosition, newBuffer, 0,
194: length);
195: } else {
196: int length1 = buffer.length - readPosition;
197: System.arraycopy(buffer, readPosition, newBuffer, 0,
198: length1);
199: int length2 = writePosition;
200: System.arraycopy(buffer, 0, newBuffer, length1, length2);
201: }
202: buffer = newBuffer;
203: readPosition = 0;
204: writePosition = available;
205: }
206:
207: /**
208: * Space available in the buffer which can be written.
209: *
210: * @since ostermillerutils 1.00.00
211: */
212: private int spaceLeft() {
213: if (writePosition < readPosition) {
214: // any space between the first write and
215: // the read except one Object is available.
216: // In this case it is all in one piece.
217: return (readPosition - writePosition - 1);
218: }
219: // space at the beginning and end.
220: return ((buffer.length - 1) - (writePosition - readPosition));
221: }
222:
223: /**
224: * Objects available for reading.
225: *
226: * @since ostermillerutils 1.00.00
227: */
228: private int available() {
229: if (readPosition <= writePosition) {
230: // any space between the first read and
231: // the first write is available. In this case i
232: // is all in one piece.
233: return (writePosition - readPosition);
234: }
235: // space at the beginning and end.
236: return (buffer.length - (readPosition - writePosition));
237: }
238:
239: /**
240: * Create a new buffer with a default capacity.
241: * Writing to a full buffer will block until space
242: * is available rather than throw an exception.
243: *
244: * @since ostermillerutils 1.00.00
245: */
246: public CircularObjectBuffer() {
247: this (DEFAULT_SIZE, true);
248: }
249:
250: /**
251: * Create a new buffer with given capacity.
252: * Writing to a full buffer will block until space
253: * is available rather than throw an exception.
254: * <p>
255: * Note that the buffer may reserve some Objects for
256: * special purposes and capacity number of Objects may
257: * not be able to be written to the buffer.
258: * <p>
259: * Note that if the buffer is of INFINITE_SIZE it will
260: * neither block or throw exceptions, but rather grow
261: * without bound.
262: *
263: * @param size desired capacity of the buffer in Objects or CircularObjectBuffer.INFINITE_SIZE.
264: *
265: * @since ostermillerutils 1.00.00
266: */
267: public CircularObjectBuffer(int size) {
268: this (size, true);
269: }
270:
271: /**
272: * Create a new buffer with a default capacity and
273: * given blocking behavior.
274: *
275: * @param blockingWrite true writing to a full buffer should block
276: * until space is available, false if an exception should
277: * be thrown instead.
278: *
279: * @since ostermillerutils 1.00.00
280: */
281: public CircularObjectBuffer(boolean blockingWrite) {
282: this (DEFAULT_SIZE, blockingWrite);
283: }
284:
285: /**
286: * Create a new buffer with the given capacity and
287: * blocking behavior.
288: * <p>
289: * Note that the buffer may reserve some Objects for
290: * special purposes and capacity number of Objects may
291: * not be able to be written to the buffer.
292: * <p>
293: * Note that if the buffer is of INFINITE_SIZE it will
294: * neither block or throw exceptions, but rather grow
295: * without bound.
296: *
297: * @param size desired capacity of the buffer in Objects or CircularObjectBuffer.INFINITE_SIZE.
298: * @param blockingWrite true writing to a full buffer should block
299: * until space is available, false if an exception should
300: * be thrown instead.
301: *
302: * @since ostermillerutils 1.00.00
303: */
304: public CircularObjectBuffer(int size, boolean blockingWrite) {
305: if (size == INFINITE_SIZE) {
306: buffer = createArray(DEFAULT_SIZE);
307: infinite = true;
308: } else {
309: buffer = createArray(size);
310: infinite = false;
311: }
312: this .blockingWrite = blockingWrite;
313: }
314:
315: /**
316: * Get a single Object from this buffer. This method should be called
317: * by the consumer.
318: * This method will block until a Object is available or no more
319: * objects are available.
320: *
321: * @return The Object read, or null if there are no more objects
322: * @throws InterruptedException if the thread is interrupted while waiting.
323: *
324: * @since ostermillerutils 1.00.00
325: */
326: public ElementType read() throws InterruptedException {
327: while (true) {
328: synchronized (this ) {
329: int available = available();
330: if (available > 0) {
331: ElementType result = buffer[readPosition];
332: readPosition++;
333: if (readPosition == buffer.length) {
334: readPosition = 0;
335: }
336: return result;
337: } else if (inputDone) {
338: return null;
339: }
340: }
341: Thread.sleep(100);
342: }
343: }
344:
345: /**
346: * Get Objects into an array from this buffer. This method should
347: * be called by the consumer.
348: * This method will block until some input is available,
349: * or there is no more input.
350: *
351: * @param buf Destination buffer.
352: * @return The number of Objects read, or -1 there will
353: * be no more objects available.
354: * @throws InterruptedException if the thread is interrupted while waiting.
355: *
356: * @since ostermillerutils 1.00.00
357: */
358: public int read(ElementType[] buf) throws InterruptedException {
359: return read(buf, 0, buf.length);
360: }
361:
362: /**
363: * Get Objects into a portion of an array from this buffer. This
364: * method should be called by the consumer.
365: * This method will block until some input is available,
366: * an I/O error occurs, or the end of the stream is reached.
367: *
368: * @param buf Destination buffer.
369: * @param off Offset at which to start storing Objects.
370: * @param len Maximum number of Objects to read.
371: * @return The number of Objects read, or -1 there will
372: * be no more objects available.
373: * @throws InterruptedException if the thread is interrupted while waiting.
374: *
375: * @since ostermillerutils 1.00.00
376: */
377: public int read(ElementType[] buf, int off, int len)
378: throws InterruptedException {
379: while (true) {
380: synchronized (this ) {
381: int available = available();
382: if (available > 0) {
383: int length = Math.min(len, available);
384: int firstLen = Math.min(length, buffer.length
385: - readPosition);
386: int secondLen = length - firstLen;
387: System.arraycopy(buffer, readPosition, buf, off,
388: firstLen);
389: if (secondLen > 0) {
390: System.arraycopy(buffer, 0, buf,
391: off + firstLen, secondLen);
392: readPosition = secondLen;
393: } else {
394: readPosition += length;
395: }
396: if (readPosition == buffer.length) {
397: readPosition = 0;
398: }
399: return length;
400: } else if (inputDone) {
401: return -1;
402: }
403: }
404: Thread.sleep(100);
405: }
406: }
407:
408: /**
409: * Skip Objects. This method should be used by the consumer
410: * when it does not care to examine some number of Objects.
411: * This method will block until some Objects are available,
412: * or there will be no more Objects available.
413: *
414: * @param n The number of Objects to skip
415: * @return The number of Objects actually skipped
416: * @throws IllegalArgumentException if n is negative.
417: * @throws InterruptedException if the thread is interrupted while waiting.
418: *
419: * @since ostermillerutils 1.00.00
420: */
421: public long skip(long n) throws InterruptedException,
422: IllegalArgumentException {
423: while (true) {
424: synchronized (this ) {
425: int available = available();
426: if (available > 0) {
427: int length = Math.min((int) n, available);
428: int firstLen = Math.min(length, buffer.length
429: - readPosition);
430: int secondLen = length - firstLen;
431: if (secondLen > 0) {
432: readPosition = secondLen;
433: } else {
434: readPosition += length;
435: }
436: if (readPosition == buffer.length) {
437: readPosition = 0;
438: }
439: return length;
440: } else if (inputDone) {
441: return 0;
442: }
443: }
444: Thread.sleep(100);
445: }
446: }
447:
448: /**
449: * This method should be used by the producer to signal to the consumer
450: * that the producer is done producing objects and that the consumer
451: * should stop asking for objects once it has used up buffered objects.
452: * <p>
453: * Once the producer has signaled that it is done, further write() invocations
454: * will cause an IllegalStateException to be thrown. Calling done() multiple times,
455: * however, has no effect.
456: *
457: * @since ostermillerutils 1.00.00
458: */
459: public void done() {
460: synchronized (this ) {
461: inputDone = true;
462: }
463: }
464:
465: /**
466: * Fill this buffer with array of Objects. This method should be called
467: * by the producer.
468: * If the buffer allows blocking writes, this method will block until
469: * all the data has been written rather than throw a BufferOverflowException.
470: *
471: * @param buf Array of Objects to be written
472: * @throws BufferOverflowException if buffer does not allow blocking writes
473: * and the buffer is full. If the exception is thrown, no data
474: * will have been written since the buffer was set to be non-blocking.
475: * @throws IllegalStateException if done() has been called.
476: * @throws InterruptedException if the write is interrupted.
477: *
478: * @since ostermillerutils 1.00.00
479: */
480: public void write(ElementType[] buf)
481: throws BufferOverflowException, IllegalStateException,
482: InterruptedException {
483: write(buf, 0, buf.length);
484: }
485:
486: /**
487: * Fill this buffer with a portion of an array of Objects.
488: * This method should be called by the producer.
489: * If the buffer allows blocking writes, this method will block until
490: * all the data has been written rather than throw an IOException.
491: *
492: * @param buf Array of Objects
493: * @param off Offset from which to start writing Objects
494: * @param len - Number of Objects to write
495: * @throws BufferOverflowException if buffer does not allow blocking writes
496: * and the buffer is full. If the exception is thrown, no data
497: * will have been written since the buffer was set to be non-blocking.
498: * @throws IllegalStateException if done() has been called.
499: * @throws InterruptedException if the write is interrupted.
500: *
501: * @since ostermillerutils 1.00.00
502: */
503: public void write(ElementType[] buf, int off, int len)
504: throws BufferOverflowException, IllegalStateException,
505: InterruptedException {
506: while (len > 0) {
507: synchronized (CircularObjectBuffer.this ) {
508: if (inputDone)
509: throw new IllegalStateException(
510: "CircularObjectBuffer.done() has been called, CircularObjectBuffer.write() failed.");
511: int spaceLeft = spaceLeft();
512: while (infinite && spaceLeft < len) {
513: resize();
514: spaceLeft = spaceLeft();
515: }
516: if (!blockingWrite && spaceLeft < len)
517: throw new BufferOverflowException(
518: "CircularObjectBuffer is full; cannot write "
519: + len + " Objects");
520: int realLen = Math.min(len, spaceLeft);
521: int firstLen = Math.min(realLen, buffer.length
522: - writePosition);
523: int secondLen = Math.min(realLen - firstLen,
524: buffer.length - readPosition - 1);
525: int written = firstLen + secondLen;
526: if (firstLen > 0) {
527: System.arraycopy(buf, off, buffer, writePosition,
528: firstLen);
529: }
530: if (secondLen > 0) {
531: System.arraycopy(buf, off + firstLen, buffer, 0,
532: secondLen);
533: writePosition = secondLen;
534: } else {
535: writePosition += written;
536: }
537: if (writePosition == buffer.length) {
538: writePosition = 0;
539: }
540: off += written;
541: len -= written;
542: }
543: if (len > 0) {
544: Thread.sleep(100);
545: }
546: }
547: }
548:
549: /**
550: * Add a single Object to this buffer. This method should be
551: * called by the producer.
552: * If the buffer allows blocking writes, this method will block until
553: * all the data has been written rather than throw an IOException.
554: *
555: * @param o Object to be written.
556: * @throws BufferOverflowException if buffer does not allow blocking writes
557: * and the buffer is full. If the exception is thrown, no data
558: * will have been written since the buffer was set to be non-blocking.
559: * @throws IllegalStateException if done() has been called.
560: * @throws InterruptedException if the write is interrupted.
561: *
562: * @since ostermillerutils 1.00.00
563: */
564: public void write(ElementType o) throws BufferOverflowException,
565: IllegalStateException, InterruptedException {
566: boolean written = false;
567: while (!written) {
568: synchronized (CircularObjectBuffer.this ) {
569: if (inputDone)
570: throw new IllegalStateException(
571: "CircularObjectBuffer.done() has been called, CircularObjectBuffer.write() failed.");
572: int spaceLeft = spaceLeft();
573: while (infinite && spaceLeft < 1) {
574: resize();
575: spaceLeft = spaceLeft();
576: }
577: if (!blockingWrite && spaceLeft < 1)
578: throw new BufferOverflowException(
579: "CircularObjectBuffer is full; cannot write 1 Object");
580: if (spaceLeft > 0) {
581: buffer[writePosition] = o;
582: writePosition++;
583: if (writePosition == buffer.length) {
584: writePosition = 0;
585: }
586: written = true;
587: }
588: }
589: if (!written) {
590: Thread.sleep(100);
591: }
592: }
593: }
594: }
|