001: /*
002: * All content copyright (c) 2003-2007 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tcclient.util.concurrent.locks;
006:
007: import com.tc.exception.TCRuntimeException;
008: import com.tc.object.bytecode.ManagerUtil;
009: import com.tc.object.lockmanager.api.LockLevel;
010: import com.tc.util.UnsafeUtil;
011: import com.tc.util.concurrent.locks.TCLock;
012:
013: import java.util.ArrayList;
014: import java.util.Collection;
015: import java.util.Date;
016: import java.util.HashMap;
017: import java.util.List;
018: import java.util.Map;
019: import java.util.concurrent.TimeUnit;
020: import java.util.concurrent.locks.Condition;
021: import java.util.concurrent.locks.Lock;
022:
023: public class ConditionObject implements Condition, java.io.Serializable {
024: private transient List waitingThreads;
025: private transient int numOfWaitingThreards;
026: private transient Map waitOnUnshared;
027:
028: private final Lock originalLock;
029: private final SyncCondition realCondition;
030:
031: private static long getSystemNanos() {
032: return System.nanoTime();
033: }
034:
035: public ConditionObject(TCLock originalLock) {
036: this .originalLock = originalLock;
037: this .realCondition = new SyncCondition();
038: this .waitingThreads = new ArrayList();
039: this .numOfWaitingThreards = 0;
040: this .waitOnUnshared = new HashMap();
041: }
042:
043: public ConditionObject() {
044: this .originalLock = null;
045: this .realCondition = null;
046: this .waitingThreads = new ArrayList();
047: this .numOfWaitingThreards = 0;
048: this .waitOnUnshared = new HashMap();
049: }
050:
051: private void fullRelease() {
052: while (((TCLock) originalLock).localHeldCount() > 0) {
053: originalLock.unlock();
054: }
055: }
056:
057: private void reacquireLock(int numOfHolds) {
058: if (((TCLock) originalLock).localHeldCount() >= numOfHolds) {
059: return;
060: }
061: while (((TCLock) originalLock).localHeldCount() < numOfHolds) {
062: originalLock.lock();
063: }
064: }
065:
066: private void checkCauseAndThrowInterruptedExceptionIfNecessary(
067: TCRuntimeException e) throws InterruptedException {
068: if (e.getCause() instanceof InterruptedException) {
069: throw (InterruptedException) e.getCause();
070: } else {
071: throw e;
072: }
073: }
074:
075: private void addWaitOnUnshared() {
076: waitOnUnshared.put(Thread.currentThread(), ManagerUtil
077: .isManaged(realCondition) ? Boolean.FALSE
078: : Boolean.TRUE);
079: }
080:
081: private boolean isLockRealConditionInUnshared() {
082: if (!ManagerUtil.isManaged(realCondition)
083: || !ManagerUtil.isHeldByCurrentThread(realCondition,
084: LockLevel.WRITE)) {
085: return true;
086: }
087: return false;
088: }
089:
090: public void await() throws InterruptedException {
091: Thread currentThread = Thread.currentThread();
092:
093: if (!((TCLock) originalLock).isHeldByCurrentThread()) {
094: throw new IllegalMonitorStateException();
095: }
096: if (Thread.interrupted()) {
097: throw new InterruptedException();
098: }
099:
100: int numOfHolds = ((TCLock) originalLock).localHeldCount();
101:
102: realCondition.incrementVersionIfSignalled();
103: int version = realCondition.getVersion();
104: fullRelease();
105: try {
106: ManagerUtil.monitorEnter(realCondition, LockLevel.WRITE);
107: UnsafeUtil.monitorEnter(realCondition);
108: boolean isLockInUnshared = isLockRealConditionInUnshared();
109: try {
110: if (realCondition.hasNotSignalledOnVersion(version)) {
111: waitingThreads.add(currentThread);
112: numOfWaitingThreards++;
113:
114: addWaitOnUnshared();
115: try {
116: ManagerUtil.objectWait0(realCondition);
117: } finally {
118: waitOnUnshared.remove(currentThread);
119: waitingThreads.remove(currentThread);
120: numOfWaitingThreards--;
121: }
122: }
123: } finally {
124: UnsafeUtil.monitorExit(realCondition);
125: if (!isLockInUnshared) {
126: ManagerUtil.monitorExit(realCondition);
127: }
128: }
129: } catch (TCRuntimeException e) {
130: checkCauseAndThrowInterruptedExceptionIfNecessary(e);
131: } finally {
132: reacquireLock(numOfHolds);
133: }
134: }
135:
136: public void awaitUninterruptibly() {
137: Thread currentThread = Thread.currentThread();
138:
139: if (!((TCLock) originalLock).isHeldByCurrentThread()) {
140: throw new IllegalMonitorStateException();
141: }
142:
143: int numOfHolds = ((TCLock) originalLock).localHeldCount();
144: boolean isInterrupted = false;
145: realCondition.incrementVersionIfSignalled();
146: int version = realCondition.getVersion();
147: fullRelease();
148: try {
149: ManagerUtil.monitorEnter(realCondition, LockLevel.WRITE);
150: UnsafeUtil.monitorEnter(realCondition);
151: boolean isLockInUnshared = isLockRealConditionInUnshared();
152: try {
153: if (realCondition.hasNotSignalledOnVersion(version)) {
154: while (true) {
155: waitingThreads.add(currentThread);
156: numOfWaitingThreards++;
157:
158: addWaitOnUnshared();
159: try {
160: ManagerUtil.objectWait0(realCondition);
161: break;
162: } catch (InterruptedException e) {
163: isInterrupted = true;
164: } finally {
165: waitOnUnshared.remove(currentThread);
166: waitingThreads.remove(currentThread);
167: numOfWaitingThreards--;
168: }
169: }
170: }
171: } finally {
172: UnsafeUtil.monitorExit(realCondition);
173: if (!isLockInUnshared) {
174: ManagerUtil.monitorExit(realCondition);
175: }
176: }
177: } finally {
178: reacquireLock(numOfHolds);
179: }
180:
181: if (isInterrupted) {
182: currentThread.interrupt();
183: }
184: }
185:
186: public long awaitNanos(long nanosTimeout)
187: throws InterruptedException {
188: Thread currentThread = Thread.currentThread();
189:
190: if (!((TCLock) originalLock).isHeldByCurrentThread()) {
191: throw new IllegalMonitorStateException();
192: }
193: if (Thread.interrupted()) {
194: throw new InterruptedException();
195: }
196:
197: int numOfHolds = ((TCLock) originalLock).localHeldCount();
198: realCondition.incrementVersionIfSignalled();
199: int version = realCondition.getVersion();
200: fullRelease();
201: try {
202: ManagerUtil.monitorEnter(realCondition, LockLevel.WRITE);
203: UnsafeUtil.monitorEnter(realCondition);
204: boolean isLockInUnshared = isLockRealConditionInUnshared();
205: try {
206: if (realCondition.hasNotSignalledOnVersion(version)) {
207: waitingThreads.add(currentThread);
208: numOfWaitingThreards++;
209:
210: addWaitOnUnshared();
211: try {
212: long startTime = getSystemNanos();
213: TimeUnit.NANOSECONDS.timedWait(realCondition,
214: nanosTimeout);
215: long remainingTime = nanosTimeout
216: - (getSystemNanos() - startTime);
217: return remainingTime;
218: } finally {
219: waitOnUnshared.remove(currentThread);
220: waitingThreads.remove(currentThread);
221: numOfWaitingThreards--;
222: }
223: } else {
224: return nanosTimeout;
225: }
226: } finally {
227: UnsafeUtil.monitorExit(realCondition);
228: if (!isLockInUnshared) {
229: ManagerUtil.monitorExit(realCondition);
230: }
231: }
232: } catch (TCRuntimeException e) {
233: checkCauseAndThrowInterruptedExceptionIfNecessary(e);
234: return 0;
235: } finally {
236: reacquireLock(numOfHolds);
237: }
238: }
239:
240: public boolean await(long time, TimeUnit unit)
241: throws InterruptedException {
242: if (unit == null) {
243: throw new NullPointerException();
244: }
245: return awaitNanos(unit.toNanos(time)) > 0;
246: }
247:
248: public boolean awaitUntil(Date deadline)
249: throws InterruptedException {
250: if (deadline == null) {
251: throw new NullPointerException();
252: }
253:
254: long abstime = deadline.getTime();
255: if (System.currentTimeMillis() > abstime) {
256: return true;
257: }
258: return !await(abstime - System.currentTimeMillis(),
259: TimeUnit.MILLISECONDS);
260: }
261:
262: private boolean hasWaitOnUnshared() {
263: return waitOnUnshared.values().contains(Boolean.TRUE);
264: }
265:
266: public void signal() {
267: if (!((TCLock) originalLock).isHeldByCurrentThread()) {
268: throw new IllegalMonitorStateException();
269: }
270:
271: ManagerUtil.monitorEnter(realCondition, LockLevel.WRITE);
272: UnsafeUtil.monitorEnter(realCondition);
273: boolean isLockInUnshared = isLockRealConditionInUnshared();
274: try {
275: ManagerUtil.objectNotify(realCondition);
276: if (hasWaitOnUnshared()) {
277: realCondition.notify();
278: }
279: realCondition.setSignalled();
280: } finally {
281: UnsafeUtil.monitorExit(realCondition);
282: if (!isLockInUnshared) {
283: ManagerUtil.monitorExit(realCondition);
284: }
285: }
286: }
287:
288: public void signalAll() {
289: if (!((TCLock) originalLock).isHeldByCurrentThread()) {
290: throw new IllegalMonitorStateException();
291: }
292:
293: ManagerUtil.monitorEnter(realCondition, LockLevel.WRITE);
294: UnsafeUtil.monitorEnter(realCondition);
295: boolean isLockInUnshared = isLockRealConditionInUnshared();
296: try {
297: ManagerUtil.objectNotifyAll(realCondition);
298: if (hasWaitOnUnshared()) {
299: realCondition.notifyAll();
300: }
301: realCondition.setSignalled();
302: } finally {
303: UnsafeUtil.monitorExit(realCondition);
304: if (!isLockInUnshared) {
305: ManagerUtil.monitorExit(realCondition);
306: }
307: }
308: }
309:
310: public int getWaitQueueLength(Lock lock) {
311: if (originalLock != lock)
312: throw new IllegalArgumentException("not owner");
313: if (!ManagerUtil.isManaged(originalLock)) {
314: return numOfWaitingThreards;
315: } else {
316: return ManagerUtil.waitLength(realCondition);
317: }
318: }
319:
320: public Collection getWaitingThreads(Lock lock) {
321: if (originalLock != lock)
322: throw new IllegalArgumentException("not owner");
323: return waitingThreads;
324: }
325:
326: private void readObject(java.io.ObjectInputStream s)
327: throws java.io.IOException, ClassNotFoundException {
328: s.defaultReadObject();
329: this .waitingThreads = new ArrayList();
330: this .numOfWaitingThreards = 0;
331: this .waitOnUnshared = new HashMap();
332: }
333:
334: private void writeObject(java.io.ObjectOutputStream s)
335: throws java.io.IOException {
336: s.defaultWriteObject();
337: }
338:
339: public static class SyncCondition implements java.io.Serializable {
340: private final static byte SIGNALLED = 0;
341: private final static byte NOT_SIGNALLED = 1;
342:
343: private int version;
344: private byte signalled;
345:
346: public SyncCondition() {
347: super ();
348: this .version = 0;
349: this .signalled = NOT_SIGNALLED;
350: }
351:
352: public boolean isSignalled() {
353: return signalled == SIGNALLED;
354: }
355:
356: public void setSignalled() {
357: signalled = SIGNALLED;
358: }
359:
360: public void incrementVersionIfSignalled() {
361: if (isSignalled()) {
362: this .version++;
363: resetSignalled();
364: }
365: }
366:
367: public int getVersion() {
368: return this .version;
369: }
370:
371: public boolean hasNotSignalledOnVersion(int targetVersion) {
372: return !isSignalled() && (this .version == targetVersion);
373: }
374:
375: private void resetSignalled() {
376: this.signalled = NOT_SIGNALLED;
377: }
378: }
379:
380: }
|