001: /*
002: File: SemaphoreControlledChannel.java
003:
004: Originally written by Doug Lea and released into the public domain.
005: This may be used for any purposes whatsoever without acknowledgment.
006: Thanks for the assistance and support of Sun Microsystems Labs,
007: and everyone contributing, testing, and using this code.
008:
009: History:
010: Date Who What
011: 16Jun1998 dl Create public version
012: 5Aug1998 dl replaced int counters with longs
013: 08dec2001 dl reflective constructor now uses longs too.
014: */
015:
016: package org.dbunit.util.concurrent;
017:
018: import org.slf4j.Logger;
019: import org.slf4j.LoggerFactory;
020:
021: import java.lang.reflect.Constructor;
022: import java.lang.reflect.InvocationTargetException;
023:
024: /**
025: * Abstract class for channels that use Semaphores to
026: * control puts and takes.
027: * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
028: **/
029:
030: public abstract class SemaphoreControlledChannel implements
031: BoundedChannel {
032:
033: /**
034: * Logger for this class
035: */
036: private static final Logger logger = LoggerFactory
037: .getLogger(SemaphoreControlledChannel.class);
038:
039: protected final Semaphore putGuard_;
040: protected final Semaphore takeGuard_;
041: protected int capacity_;
042:
043: /**
044: * Create a channel with the given capacity and default
045: * semaphore implementation
046: * @exception IllegalArgumentException if capacity less or equal to zero
047: **/
048:
049: public SemaphoreControlledChannel(int capacity)
050: throws IllegalArgumentException {
051: if (capacity <= 0)
052: throw new IllegalArgumentException();
053: capacity_ = capacity;
054: putGuard_ = new Semaphore(capacity);
055: takeGuard_ = new Semaphore(0);
056: }
057:
058: /**
059: * Create a channel with the given capacity and
060: * semaphore implementations instantiated from the supplied class
061: * @exception IllegalArgumentException if capacity less or equal to zero.
062: * @exception NoSuchMethodException If class does not have constructor
063: * that intializes permits
064: * @exception SecurityException if constructor information
065: * not accessible
066: * @exception InstantiationException if semaphore class is abstract
067: * @exception IllegalAccessException if constructor cannot be called
068: * @exception InvocationTargetException if semaphore constructor throws an
069: * exception
070: **/
071: public SemaphoreControlledChannel(int capacity, Class semaphoreClass)
072: throws IllegalArgumentException, NoSuchMethodException,
073: SecurityException, InstantiationException,
074: IllegalAccessException, InvocationTargetException {
075: if (capacity <= 0)
076: throw new IllegalArgumentException();
077: capacity_ = capacity;
078: Class[] longarg = { Long.TYPE };
079: Constructor ctor = semaphoreClass
080: .getDeclaredConstructor(longarg);
081: Long[] cap = { new Long(capacity) };
082: putGuard_ = (Semaphore) (ctor.newInstance(cap));
083: Long[] zero = { new Long(0) };
084: takeGuard_ = (Semaphore) (ctor.newInstance(zero));
085: }
086:
087: public int capacity() {
088: logger.debug("capacity() - start");
089: return capacity_;
090: }
091:
092: /**
093: * Return the number of elements in the buffer.
094: * This is only a snapshot value, that may change
095: * immediately after returning.
096: **/
097:
098: public int size() {
099: logger.debug("size() - start");
100: return (int) (takeGuard_.permits());
101: }
102:
103: /**
104: * Internal mechanics of put.
105: **/
106: protected abstract void insert(Object x);
107:
108: /**
109: * Internal mechanics of take.
110: **/
111: protected abstract Object extract();
112:
113: public void put(Object x) throws InterruptedException {
114: logger.debug("put(x=" + x + ") - start");
115:
116: if (x == null)
117: throw new IllegalArgumentException();
118: if (Thread.interrupted())
119: throw new InterruptedException();
120: putGuard_.acquire();
121: try {
122: insert(x);
123: takeGuard_.release();
124: } catch (ClassCastException ex) {
125: logger.error("put()", ex);
126:
127: putGuard_.release();
128: throw ex;
129: }
130: }
131:
132: public boolean offer(Object x, long msecs)
133: throws InterruptedException {
134: logger.debug("offer(x=" + x + ", msecs=" + msecs + ") - start");
135:
136: if (x == null)
137: throw new IllegalArgumentException();
138: if (Thread.interrupted())
139: throw new InterruptedException();
140: if (!putGuard_.attempt(msecs))
141: return false;
142: else {
143: try {
144: insert(x);
145: takeGuard_.release();
146: return true;
147: } catch (ClassCastException ex) {
148: logger.error("offer()", ex);
149:
150: putGuard_.release();
151: throw ex;
152: }
153: }
154: }
155:
156: public Object take() throws InterruptedException {
157: logger.debug("take() - start");
158:
159: if (Thread.interrupted())
160: throw new InterruptedException();
161: takeGuard_.acquire();
162: try {
163: Object x = extract();
164: putGuard_.release();
165: return x;
166: } catch (ClassCastException ex) {
167: logger.error("take()", ex);
168:
169: takeGuard_.release();
170: throw ex;
171: }
172: }
173:
174: public Object poll(long msecs) throws InterruptedException {
175: logger.debug("poll(msecs=" + msecs + ") - start");
176:
177: if (Thread.interrupted())
178: throw new InterruptedException();
179: if (!takeGuard_.attempt(msecs))
180: return null;
181: else {
182: try {
183: Object x = extract();
184: putGuard_.release();
185: return x;
186: } catch (ClassCastException ex) {
187: logger.error("poll()", ex);
188:
189: takeGuard_.release();
190: throw ex;
191: }
192: }
193: }
194:
195: }
|