001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
003: */
004: package java.util.concurrent;
005:
006: import java.util.concurrent.locks.Condition;
007: import java.util.concurrent.locks.ReentrantLock;
008:
009: public class FutureTaskTC implements Future, Runnable {
010: private final Sync sync;
011:
012: public FutureTaskTC(Callable callable) {
013: if (callable == null)
014: throw new NullPointerException();
015: sync = new Sync(callable);
016: }
017:
018: public FutureTaskTC(Runnable runnable, Object result) {
019: sync = new Sync(Executors.callable(runnable, result));
020: }
021:
022: public boolean isCancelled() {
023: return sync.innerIsCancelled();
024: }
025:
026: public boolean isDone() {
027: return sync.innerIsDone();
028: }
029:
030: public boolean cancel(boolean mayInterruptIfRunning) {
031: return sync.innerCancel(mayInterruptIfRunning);
032: }
033:
034: public Object get() throws InterruptedException, ExecutionException {
035: return sync.innerGet();
036: }
037:
038: public Object get(long timeout, TimeUnit unit)
039: throws InterruptedException, ExecutionException,
040: TimeoutException {
041: return sync.innerGet(unit.toNanos(timeout));
042: }
043:
044: protected void done() {
045: //
046: }
047:
048: protected void set(Object v) {
049: sync.innerSet(v);
050: }
051:
052: protected void setException(Throwable t) {
053: sync.innerSetException(t);
054: }
055:
056: public void run() {
057: sync.innerRun();
058: }
059:
060: protected boolean runAndReset() {
061: return sync.innerRunAndReset();
062: }
063:
064: private final class Sync {
065: private static final int RUNNING = 1;
066: private static final int RAN = 2;
067: private static final int CANCELLED = 4;
068:
069: private final Callable callable;
070: private Object result;
071: private Throwable exception;
072:
073: private int state;
074: private final ReentrantLock lock;
075: private final Condition ran;
076:
077: private transient volatile Thread runner;
078: private Object proxyRunner;
079:
080: Sync(Callable callable) {
081: this .callable = callable;
082: lock = new ReentrantLock();
083: ran = lock.newCondition();
084: }
085:
086: Sync() {
087: callable = null;
088: lock = null;
089: ran = null;
090: }
091:
092: private boolean ranOrCancelled(int state) {
093: return (state & (RAN | CANCELLED)) != 0;
094: }
095:
096: private int tryAcquireShared() {
097: return innerIsDone() ? 1 : -1;
098: }
099:
100: protected boolean tryReleaseShared(int ignore) {
101: managedTryReleaseShared();
102: return true;
103: }
104:
105: private boolean managedTryReleaseShared() {
106: runner = null;
107: proxyRunner = null;
108: ran.signalAll();
109: return true;
110: }
111:
112: private final void setState(int state) {
113: this .state = state;
114: }
115:
116: private final int getSynchronizedState() {
117: lock.lock();
118: try {
119: return state;
120: } finally {
121: lock.unlock();
122: }
123: }
124:
125: private final boolean compareAndSetState(int expected,
126: int newValue) {
127: lock.lock();
128: try {
129: int s = state;
130: if (s == expected) {
131: setState(newValue);
132: return true;
133: }
134: } finally {
135: lock.unlock();
136: }
137: return false;
138: }
139:
140: boolean innerIsCancelled() {
141: return getSynchronizedState() == CANCELLED;
142: }
143:
144: boolean innerIsDone() {
145: lock.lock();
146: try {
147: return ranOrCancelled(state) && proxyRunner == null;
148: } finally {
149: lock.unlock();
150: }
151: }
152:
153: Object innerGet() throws InterruptedException,
154: ExecutionException {
155: lock.lock();
156: try {
157: while (tryAcquireShared() < 0) {
158: ran.await();
159: }
160: } finally {
161: lock.unlock();
162: }
163:
164: if (getSynchronizedState() == CANCELLED)
165: throw new CancellationException();
166: if (exception != null)
167: throw new ExecutionException(exception);
168: return result;
169: }
170:
171: Object innerGet(long nanosTimeout) throws InterruptedException,
172: ExecutionException, TimeoutException {
173: lock.lock();
174: try {
175: long startTime = System.nanoTime();
176: while ((tryAcquireShared() < 0) && (nanosTimeout > 0)) {
177: ran.await(nanosTimeout, TimeUnit.NANOSECONDS);
178: long endTime = System.nanoTime();
179: nanosTimeout -= (endTime - startTime);
180: }
181: if (tryAcquireShared() < 0) {
182: throw new TimeoutException();
183: }
184: } finally {
185: lock.unlock();
186: }
187:
188: if (getSynchronizedState() == CANCELLED)
189: throw new CancellationException();
190: if (exception != null)
191: throw new ExecutionException(exception);
192: return result;
193: }
194:
195: void innerSet(Object v) {
196: lock.lock();
197: try {
198: managedInnerSet(v);
199: } finally {
200: lock.unlock();
201: }
202: }
203:
204: private void managedInnerSet(Object v) {
205: int s = state;
206: if (ranOrCancelled(s))
207: return;
208: setState(RAN);
209: result = v;
210: managedTryReleaseShared();
211: done();
212: }
213:
214: void innerSetException(Throwable t) {
215: lock.lock();
216: try {
217: int s = state;
218: if (ranOrCancelled(s))
219: return;
220: setState(RAN);
221: exception = t;
222: result = null;
223: managedTryReleaseShared();
224: done();
225: } finally {
226: lock.unlock();
227: }
228: }
229:
230: private void managedInnerCancel() {
231: Thread r = null;
232: lock.lock();
233: try {
234: r = runner;
235: } finally {
236: lock.unlock();
237: }
238: if (r != null) {
239: r.interrupt();
240: }
241: }
242:
243: boolean innerCancel(boolean mayInterruptIfRunning) {
244: lock.lock();
245: try {
246: int s = state;
247: if (ranOrCancelled(s))
248: return false;
249: setState(CANCELLED);
250: } finally {
251: lock.unlock();
252: }
253: lock.lock();
254: try {
255: if (mayInterruptIfRunning) {
256: managedInnerCancel();
257: }
258: tryReleaseShared(0);
259: } finally {
260: lock.unlock();
261: }
262: done();
263: return true;
264: }
265:
266: void innerRun() {
267: if (!compareAndSetState(0, RUNNING))
268: return;
269:
270: try {
271: boolean isRunning = false;
272: lock.lock();
273: try {
274: isRunning = state == RUNNING;
275: if (isRunning) {
276: runner = Thread.currentThread();
277: proxyRunner = runner.toString();
278: }
279: } finally {
280: lock.unlock();
281: }
282: if (isRunning) {
283: Object o = callable.call();
284: lock.lock();
285: try {
286: managedInnerSet(o);
287: } finally {
288: lock.unlock();
289: }
290: } else {
291: lock.lock();
292: try {
293: managedTryReleaseShared();
294: } finally {
295: lock.unlock();
296: }
297: }
298: } catch (Throwable ex) {
299: innerSetException(ex);
300: }
301: }
302:
303: boolean innerRunAndReset() {
304: if (!compareAndSetState(0, RUNNING))
305: return false;
306: try {
307: runner = Thread.currentThread();
308: callable.call();
309: runner = null;
310: return compareAndSetState(RUNNING, 0);
311: } catch (Throwable ex) {
312: innerSetException(ex);
313: return false;
314: }
315: }
316: }
317: }
|