001: /*
002: File: SemaphoreControlledChannel.java
003:
004: Originally written by Doug Lea and released into the public domain.
005: This may be used for any purposes whatsoever without acknowledgment.
006: Thanks for the assistance and support of Sun Microsystems Labs,
007: and everyone contributing, testing, and using this code.
008:
009: History:
010: Date Who What
011: 16Jun1998 dl Create public version
012: 5Aug1998 dl replaced int counters with longs
013: 08dec2001 dl reflective constructor now uses longs too.
014: */
015:
016: package EDU.oswego.cs.dl.util.concurrent;
017:
018: import java.lang.reflect.*;
019:
020: /**
021: * Abstract class for channels that use Semaphores to
022: * control puts and takes.
023: * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
024: **/
025:
026: public abstract class SemaphoreControlledChannel implements
027: BoundedChannel {
028: protected final Semaphore putGuard_;
029: protected final Semaphore takeGuard_;
030: protected int capacity_;
031:
032: /**
033: * Create a channel with the given capacity and default
034: * semaphore implementation
035: * @exception IllegalArgumentException if capacity less or equal to zero
036: **/
037:
038: public SemaphoreControlledChannel(int capacity)
039: throws IllegalArgumentException {
040: if (capacity <= 0)
041: throw new IllegalArgumentException();
042: capacity_ = capacity;
043: putGuard_ = new Semaphore(capacity);
044: takeGuard_ = new Semaphore(0);
045: }
046:
047: /**
048: * Create a channel with the given capacity and
049: * semaphore implementations instantiated from the supplied class
050: * @exception IllegalArgumentException if capacity less or equal to zero.
051: * @exception NoSuchMethodException If class does not have constructor
052: * that intializes permits
053: * @exception SecurityException if constructor information
054: * not accessible
055: * @exception InstantiationException if semaphore class is abstract
056: * @exception IllegalAccessException if constructor cannot be called
057: * @exception InvocationTargetException if semaphore constructor throws an
058: * exception
059: **/
060: public SemaphoreControlledChannel(int capacity, Class semaphoreClass)
061: throws IllegalArgumentException, NoSuchMethodException,
062: SecurityException, InstantiationException,
063: IllegalAccessException, InvocationTargetException {
064: if (capacity <= 0)
065: throw new IllegalArgumentException();
066: capacity_ = capacity;
067: Class[] longarg = { Long.TYPE };
068: Constructor ctor = semaphoreClass
069: .getDeclaredConstructor(longarg);
070: Long[] cap = { new Long(capacity) };
071: putGuard_ = (Semaphore) (ctor.newInstance(cap));
072: Long[] zero = { new Long(0) };
073: takeGuard_ = (Semaphore) (ctor.newInstance(zero));
074: }
075:
076: public int capacity() {
077: return capacity_;
078: }
079:
080: /**
081: * Return the number of elements in the buffer.
082: * This is only a snapshot value, that may change
083: * immediately after returning.
084: **/
085:
086: public int size() {
087: return (int) (takeGuard_.permits());
088: }
089:
090: /**
091: * Internal mechanics of put.
092: **/
093: protected abstract void insert(Object x);
094:
095: /**
096: * Internal mechanics of take.
097: **/
098: protected abstract Object extract();
099:
100: public void put(Object x) throws InterruptedException {
101: if (x == null)
102: throw new IllegalArgumentException();
103: if (Thread.interrupted())
104: throw new InterruptedException();
105: putGuard_.acquire();
106: try {
107: insert(x);
108: takeGuard_.release();
109: } catch (ClassCastException ex) {
110: putGuard_.release();
111: throw ex;
112: }
113: }
114:
115: public boolean offer(Object x, long msecs)
116: throws InterruptedException {
117: if (x == null)
118: throw new IllegalArgumentException();
119: if (Thread.interrupted())
120: throw new InterruptedException();
121: if (!putGuard_.attempt(msecs))
122: return false;
123: else {
124: try {
125: insert(x);
126: takeGuard_.release();
127: return true;
128: } catch (ClassCastException ex) {
129: putGuard_.release();
130: throw ex;
131: }
132: }
133: }
134:
135: public Object take() throws InterruptedException {
136: if (Thread.interrupted())
137: throw new InterruptedException();
138: takeGuard_.acquire();
139: try {
140: Object x = extract();
141: putGuard_.release();
142: return x;
143: } catch (ClassCastException ex) {
144: takeGuard_.release();
145: throw ex;
146: }
147: }
148:
149: public Object poll(long msecs) throws InterruptedException {
150: if (Thread.interrupted())
151: throw new InterruptedException();
152: if (!takeGuard_.attempt(msecs))
153: return null;
154: else {
155: try {
156: Object x = extract();
157: putGuard_.release();
158: return x;
159: } catch (ClassCastException ex) {
160: takeGuard_.release();
161: throw ex;
162: }
163: }
164: }
165:
166: }
|