001: /*
002: * Copyright 2003-2005 The Apache Software Foundation
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016: package org.apache.commons.collections.buffer;
017:
018: import java.io.PrintWriter;
019: import java.io.StringWriter;
020: import java.util.Collection;
021:
022: import org.apache.commons.collections.Buffer;
023: import org.apache.commons.collections.BufferUnderflowException;
024:
025: /**
026: * Decorates another <code>Buffer</code> to make {@link #get()} and
027: * {@link #remove()} block when the <code>Buffer</code> is empty.
028: * <p>
029: * If either <code>get</code> or <code>remove</code> is called on an empty
030: * <code>Buffer</code>, the calling thread waits for notification that
031: * an <code>add</code> or <code>addAll</code> operation has completed.
032: * <p>
033: * When one or more entries are added to an empty <code>Buffer</code>,
034: * all threads blocked in <code>get</code> or <code>remove</code> are notified.
035: * There is no guarantee that concurrent blocked <code>get</code> or
036: * <code>remove</code> requests will be "unblocked" and receive data in the
037: * order that they arrive.
038: * <p>
039: * This class is Serializable from Commons Collections 3.1.
040: * This class contains an extra field in 3.2, however the serialization
041: * specification will handle this gracefully.
042: *
043: * @author Stephen Colebourne
044: * @author Janek Bogucki
045: * @author Phil Steitz
046: * @author James Carman
047: * @version $Revision: 348808 $ $Date: 2005-11-24 21:35:09 +0000 (Thu, 24 Nov 2005) $
048: * @since Commons Collections 3.0
049: */
050: public class BlockingBuffer extends SynchronizedBuffer {
051:
052: /** Serialization version. */
053: private static final long serialVersionUID = 1719328905017860541L;
054: /** The timeout value in milliseconds. */
055: private final long timeout;
056:
057: /**
058: * Factory method to create a blocking buffer.
059: *
060: * @param buffer the buffer to decorate, must not be null
061: * @return a new blocking Buffer
062: * @throws IllegalArgumentException if buffer is null
063: */
064: public static Buffer decorate(Buffer buffer) {
065: return new BlockingBuffer(buffer);
066: }
067:
068: /**
069: * Factory method to create a blocking buffer with a timeout value.
070: *
071: * @param buffer the buffer to decorate, must not be null
072: * @param timeoutMillis the timeout value in milliseconds, zero or less for no timeout
073: * @return a new blocking buffer
074: * @throws IllegalArgumentException if the buffer is null
075: * @since Commons Collections 3.2
076: */
077: public static Buffer decorate(Buffer buffer, long timeoutMillis) {
078: return new BlockingBuffer(buffer, timeoutMillis);
079: }
080:
081: //-----------------------------------------------------------------------
082: /**
083: * Constructor that wraps (not copies).
084: *
085: * @param buffer the buffer to decorate, must not be null
086: * @throws IllegalArgumentException if the buffer is null
087: */
088: protected BlockingBuffer(Buffer buffer) {
089: super (buffer);
090: this .timeout = 0;
091: }
092:
093: /**
094: * Constructor that wraps (not copies).
095: *
096: * @param buffer the buffer to decorate, must not be null
097: * @param timeoutMillis the timeout value in milliseconds, zero or less for no timeout
098: * @throws IllegalArgumentException if the buffer is null
099: * @since Commons Collections 3.2
100: */
101: protected BlockingBuffer(Buffer buffer, long timeoutMillis) {
102: super (buffer);
103: this .timeout = (timeoutMillis < 0 ? 0 : timeoutMillis);
104: }
105:
106: //-----------------------------------------------------------------------
107: public boolean add(Object o) {
108: synchronized (lock) {
109: boolean result = collection.add(o);
110: lock.notifyAll();
111: return result;
112: }
113: }
114:
115: public boolean addAll(Collection c) {
116: synchronized (lock) {
117: boolean result = collection.addAll(c);
118: lock.notifyAll();
119: return result;
120: }
121: }
122:
123: /**
124: * Gets the next value from the buffer, waiting until an object is
125: * added if the buffer is empty. This method uses the default timeout
126: * set in the constructor.
127: *
128: * @throws BufferUnderflowException if an interrupt is received
129: */
130: public Object get() {
131: synchronized (lock) {
132: while (collection.isEmpty()) {
133: try {
134: if (timeout <= 0) {
135: lock.wait();
136: } else {
137: return get(timeout);
138: }
139: } catch (InterruptedException e) {
140: PrintWriter out = new PrintWriter(
141: new StringWriter());
142: e.printStackTrace(out);
143: throw new BufferUnderflowException(
144: "Caused by InterruptedException: "
145: + out.toString());
146: }
147: }
148: return getBuffer().get();
149: }
150: }
151:
152: /**
153: * Gets the next value from the buffer, waiting until an object is
154: * added for up to the specified timeout value if the buffer is empty.
155: *
156: * @param timeout the timeout value in milliseconds
157: * @throws BufferUnderflowException if an interrupt is received
158: * @throws BufferUnderflowException if the timeout expires
159: * @since Commons Collections 3.2
160: */
161: public Object get(final long timeout) {
162: synchronized (lock) {
163: final long expiration = System.currentTimeMillis()
164: + timeout;
165: long timeLeft = expiration - System.currentTimeMillis();
166: while (timeLeft > 0 && collection.isEmpty()) {
167: try {
168: lock.wait(timeLeft);
169: timeLeft = expiration - System.currentTimeMillis();
170: } catch (InterruptedException e) {
171: PrintWriter out = new PrintWriter(
172: new StringWriter());
173: e.printStackTrace(out);
174: throw new BufferUnderflowException(
175: "Caused by InterruptedException: "
176: + out.toString());
177: }
178: }
179: if (collection.isEmpty()) {
180: throw new BufferUnderflowException("Timeout expired");
181: }
182: return getBuffer().get();
183: }
184: }
185:
186: /**
187: * Removes the next value from the buffer, waiting until an object is
188: * added if the buffer is empty. This method uses the default timeout
189: * set in the constructor.
190: *
191: * @throws BufferUnderflowException if an interrupt is received
192: */
193: public Object remove() {
194: synchronized (lock) {
195: while (collection.isEmpty()) {
196: try {
197: if (timeout <= 0) {
198: lock.wait();
199: } else {
200: return remove(timeout);
201: }
202: } catch (InterruptedException e) {
203: PrintWriter out = new PrintWriter(
204: new StringWriter());
205: e.printStackTrace(out);
206: throw new BufferUnderflowException(
207: "Caused by InterruptedException: "
208: + out.toString());
209: }
210: }
211: return getBuffer().remove();
212: }
213: }
214:
215: /**
216: * Removes the next value from the buffer, waiting until an object is
217: * added for up to the specified timeout value if the buffer is empty.
218: *
219: * @param timeout the timeout value in milliseconds
220: * @throws BufferUnderflowException if an interrupt is received
221: * @throws BufferUnderflowException if the timeout expires
222: * @since Commons Collections 3.2
223: */
224: public Object remove(final long timeout) {
225: synchronized (lock) {
226: final long expiration = System.currentTimeMillis()
227: + timeout;
228: long timeLeft = expiration - System.currentTimeMillis();
229: while (timeLeft > 0 && collection.isEmpty()) {
230: try {
231: lock.wait(timeLeft);
232: timeLeft = expiration - System.currentTimeMillis();
233: } catch (InterruptedException e) {
234: PrintWriter out = new PrintWriter(
235: new StringWriter());
236: e.printStackTrace(out);
237: throw new BufferUnderflowException(
238: "Caused by InterruptedException: "
239: + out.toString());
240: }
241: }
242: if (collection.isEmpty()) {
243: throw new BufferUnderflowException("Timeout expired");
244: }
245: return getBuffer().remove();
246: }
247: }
248:
249: }
|