0001: /*
0002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
0003: * notice. All rights reserved.
0004: */
0005: package com.tc.object.lockmanager.impl;
0006:
0007: import com.tc.exception.TCLockUpgradeNotSupportedError;
0008: import com.tc.exception.TCRuntimeException;
0009: import com.tc.logging.TCLogger;
0010: import com.tc.logging.TCLogging;
0011: import com.tc.management.ClientLockStatManager;
0012: import com.tc.object.lockmanager.api.LockFlushCallback;
0013: import com.tc.object.lockmanager.api.LockID;
0014: import com.tc.object.lockmanager.api.LockLevel;
0015: import com.tc.object.lockmanager.api.LockNotPendingError;
0016: import com.tc.object.lockmanager.api.LockRequest;
0017: import com.tc.object.lockmanager.api.Notify;
0018: import com.tc.object.lockmanager.api.RemoteLockManager;
0019: import com.tc.object.lockmanager.api.ThreadID;
0020: import com.tc.object.lockmanager.api.TryLockRequest;
0021: import com.tc.object.lockmanager.api.WaitListener;
0022: import com.tc.object.lockmanager.api.WaitLockRequest;
0023: import com.tc.object.lockmanager.api.WaitTimer;
0024: import com.tc.object.lockmanager.api.WaitTimerCallback;
0025: import com.tc.object.tx.WaitInvocation;
0026: import com.tc.util.Assert;
0027: import com.tc.util.State;
0028: import com.tc.util.TCAssertionError;
0029: import com.tc.util.Util;
0030:
0031: import gnu.trove.TIntIntHashMap;
0032: import gnu.trove.TIntStack;
0033:
0034: import java.util.ArrayList;
0035: import java.util.Collection;
0036: import java.util.Collections;
0037: import java.util.HashMap;
0038: import java.util.HashSet;
0039: import java.util.Iterator;
0040: import java.util.LinkedHashMap;
0041: import java.util.List;
0042: import java.util.Map;
0043: import java.util.Set;
0044: import java.util.TimerTask;
0045: import java.util.Map.Entry;
0046:
0047: class ClientLock implements WaitTimerCallback, LockFlushCallback {
0048:
0049: private static final TCLogger logger = TCLogging
0050: .getLogger(ClientLock.class);
0051:
0052: private static final State RUNNING = new State("RUNNING");
0053: private static final State PAUSED = new State("PAUSED");
0054:
0055: private final Map holders = Collections
0056: .synchronizedMap(new HashMap());
0057: private final Set rejectedLockRequesterIDs = new HashSet();
0058: private final LockID lockID;
0059: private final Map waitLocksByRequesterID = new HashMap();
0060: private final Map pendingLockRequests = new LinkedHashMap();
0061: private final Map waitTimers = new HashMap();
0062: private final RemoteLockManager remoteLockManager;
0063: private final WaitTimer waitTimer;
0064:
0065: private final Greediness greediness = new Greediness();
0066: private int useCount = 0;
0067: private volatile State state = RUNNING;
0068: private long timeUsed = System.currentTimeMillis();
0069: private final ClientLockStatManager lockStatManager;
0070:
/**
 * Creates the client-side representation of a single lock.
 *
 * @param lockID identity of the lock; must not be null
 * @param remoteLockManager used for all lock traffic towards the server
 * @param waitTimer used to schedule wait / tryLock timeouts
 * @param lockStatManager consulted for lock statistics recording
 */
ClientLock(LockID lockID, RemoteLockManager remoteLockManager, WaitTimer waitTimer,
    ClientLockStatManager lockStatManager) {
  Assert.assertNotNull(lockID);
  this.lockStatManager = lockStatManager;
  this.waitTimer = waitTimer;
  this.remoteLockManager = remoteLockManager;
  this.lockID = lockID;
}
0079:
0080: private void recordStatIfEnabled() {
0081: if (lockStatManager.isStatEnabled(lockID)) {
0082: lockStatManager.recordStackTrace(lockID);
0083: }
0084: }
0085:
0086: boolean tryLock(ThreadID threadID, WaitInvocation timeout, int type) {
0087: lock(threadID, type, timeout, true);
0088: return isHeldBy(threadID, type);
0089: }
0090:
0091: public void lock(ThreadID threadID, int type) {
0092: lock(threadID, type, null, false);
0093: }
0094:
0095: private void lock(ThreadID threadID, int type,
0096: WaitInvocation timeout, boolean noBlock) {
0097: int lockType = type;
0098: if (LockLevel.isSynchronous(type)) {
0099: if (!LockLevel.isSynchronousWrite(type)) {
0100: throw new AssertionError(
0101: "Only Synchronous WRITE lock is supported now");
0102: }
0103: lockType = LockLevel.WRITE;
0104: }
0105: basicLock(threadID, lockType, timeout, noBlock);
0106: if (lockType != type) {
0107: awardSynchronous(threadID, lockType);
0108: }
0109: }
0110:
/**
 * Core acquisition path used for both blocking and tryLock requests.
 *
 * Under this lock's monitor it first tries to satisfy the request locally (a lock already held by
 * the requester, a concurrent level, or a greedy award). Otherwise it registers a pending request
 * and either sends it to the server (when not greedy) or starts a greedy recall. After leaving the
 * monitor it performs a pending recall commit and then blocks on the per-request wait object.
 *
 * @param requesterID the requesting thread
 * @param type the lock level to acquire
 * @param timeout tryLock timeout; only dereferenced when noBlock is true
 * @param noBlock true for tryLock requests
 * @throws AssertionError if the requester already holds a concurrent write lock
 * @throws TCLockUpgradeNotSupportedError if a WRITE is requested while the requester holds the
 *         READ lock exclusively
 */
0111: private void basicLock(ThreadID requesterID, int type,
0112: WaitInvocation timeout, boolean noBlock) {
0113: final Object waitLock;
0114: final Action action = new Action();
0115:
0116: synchronized (this ) {
0117: waitUntillRunning();
0118:
0119: recordStatIfEnabled();
0120: // if it is tryLock and is already being held by other thread of the same node, return
0121: // immediately.
// Nothing is awarded on this path, so tryLock() will report the outcome via isHeldBy().
0122: if (noBlock && isHeld() && !isHeldBy(requesterID)
0123: && !timeout.needsToWait()) {
0124: return;
0125: }
0126: // debug("lock - BEGIN - ", requesterID, LockLevel.toString(type));
0127: if (isHeldBy(requesterID)) {
0128: // deal with upgrades/downgrades on locks already held
0129: if (isConcurrentWriteLock(requesterID)) {
0130: // NOTE: when removing this restriction, there are other places to clean up:
0131: // 1) ClientTransactionManagerImpl.apply()
0132: // 2) DNAFactory.flushDNAFor(LockID)
0133: // 3) RemoteTransactionManagerImpl.commit()
0134: // 4) ClientLock.removeCurrent()
0135: throw new AssertionError(
0136: "Don't currently support nested concurrent write locks");
0137: }
0138:
0139: if (LockLevel.isWrite(type)
0140: && isHoldingReadLockExclusively(requesterID)) {
0141: // do not allow lock upgrade
0142: throw new TCLockUpgradeNotSupportedError();
0143: }
0144:
0145: if (isHeldBy(requesterID, LockLevel.WRITE)) {
0146: // if we already hold a WRITE lock, allow this transaction to have any lock
0147: award(requesterID, type);
0148: return;
0149: }
0150:
0151: if (LockLevel.isRead(type)
0152: && isHeldBy(requesterID, LockLevel.READ)) {
0153: // if re-requesting a read lock, we don't need to ask the server
0154: award(requesterID, type);
0155: return;
0156: }
0157: }
// Concurrent levels are awarded locally without registering a pending request.
0158: if (LockLevel.isConcurrent(type)) {
0159: award(requesterID, type);
0160: return;
0161: }
0162:
0163: if (canAwardGreedilyNow(requesterID, type)) {
0164: award(requesterID, type);
0165: return;
0166: }
0167:
0168: // All other cases have to wait for some reason or the other
0169: waitLock = addToPendingLockRequest(requesterID, type,
0170: timeout, noBlock);
0171: if (greediness.isNotGreedy()) {
0172: // debug("lock - remote requestLock ", requesterID, LockLevel.toString(type));
0173: if (noBlock) {
0174: remoteLockManager.tryRequestLock(lockID,
0175: requesterID, timeout, type);
0176: } else {
0177: remoteLockManager.requestLock(lockID, requesterID,
0178: type);
0179: }
0180: } else {
0181: // If the lock already granted to another thread greedily within the same JVM and if
0182: // it is a tryLock request with a timeout, schedule a local timer.
0183: if (noBlock && timeout.needsToWait()) {
0184: scheduleWaitForTryLock(requesterID, type, timeout);
0185: }
0186:
0187: if (isGreedyRecallNeeded(requesterID, type)) {
0188: // XXX::Greedy upgrades are not done for a reason.
0189: // debug("lock - calling RECALL ", requesterID, LockLevel.toString(type));
0190: greediness.recall(type);
0191: }
// Only remember that a commit is due; the flush and commit run after the monitor is released.
0192: if (canProceedWithRecall()) {
0193: greediness.startRecallCommit();
0194: action.addAction(Action.RECALL_COMMIT);
0195: }
0196: }
0197: }
// Outside the synchronized block: flush first, then commit the recall.
0198: if (action.doRecallCommit()) {
0199: // debug("lock - calling RECALL Commit ", requesterID, LockLevel.toString(type));
0200: flush();
0201: recallCommit();
0202: }
0203:
// Block on the wait object created by addToPendingLockRequest() until the request is answered.
0204: boolean isInterrupted = false;
0205: if (noBlock) {
0206: isInterrupted = waitForTryLock(requesterID, waitLock);
0207: } else {
0208: isInterrupted = waitForLock(requesterID, type, waitLock);
0209: }
0210: Util.selfInterruptIfNeeded(isInterrupted);
0211: // debug("lock - GOT IT - ", requesterID, LockLevel.toString(type));
0212: }
0213:
0214: /*
0215: * @returns true if the greedy lock should be let go.
0216: */
0217: private synchronized boolean isGreedyRecallNeeded(
0218: ThreadID threadID, int level) {
0219: if (greediness.isGreedy()) {
0220: // We let the lock recalled if the request is for WRITE and we hold a Greedy READ
0221: if (LockLevel.isWrite(level) && greediness.isReadOnly()) {
0222: return true;
0223: }
0224: }
0225: return false;
0226: }
0227:
/**
 * Releases the current hold of the given thread.
 *
 * Works in three steps that are repeated until they agree: (1) compute the required Action under
 * the monitor, (2) flush outside the monitor if that action talks to the server or is a
 * synchronous commit, (3) re-compute the Action under the monitor and apply it only if it equals
 * the one computed in step 1; otherwise the state changed in between and the loop retries.
 *
 * @param threadID the thread releasing the lock
 */
0228: public void unlock(ThreadID threadID) {
0229: Action action;
0230: boolean changed;
0231:
0232: do {
0233: changed = false;
0234: // Examine
0235: synchronized (this ) {
0236: waitUntillRunning();
0237: // debug("unlock - BEGIN - ", id);
0238: recordStatIfEnabled();
0239: action = unlockAction(threadID);
0240: }
0241:
0242: // Flush
// Done without holding this lock's monitor.
0243: if (action.doRemoteLockRequest() || action.doRecallCommit()
0244: || action.doSynchronousCommit()) {
0245: // debug("unlock - flush - ", id);
0246: flush();
0247: }
0248:
0249: // modify and send
0250: synchronized (this ) {
0251: // check to see if the lock state has changed in anyway
0252: Action newAction = unlockAction(threadID);
0253: if (action.equals(newAction)) {
// The hold is removed first; exactly one of the follow-up steps below is then executed.
0254: removeCurrent(threadID);
0255: if (action.doAwardGreedyLocks()) {
0256: awardLocksGreedily();
0257: } else if (action.doRecallCommit()) {
0258: greediness.startRecallCommit();
0259: // debug("unlock - calling RECALL Commit ", id);
0260: recallCommit();
0261: } else if (action.doRemoteLockRequest()) {
0262: remoteLockManager.releaseLock(lockID, threadID);
0263: }
0264: } else {
0265: // try again
0266: changed = true;
0267: logger.debug(lockID + " :: unlock() : " + threadID
0268: + " STATE CHANGED - From = " + action
0269: + " To = " + newAction + " - retrying ...");
0270: }
0271: }
0272: } while (changed);
0273: }
0274:
0275: private Action unlockAction(ThreadID threadID) {
0276: final Action action = new Action();
0277: boolean remote = isRemoteUnlockRequired(threadID);
0278: if (greediness.isNotGreedy() && remote) {
0279: action.addAction(Action.REMOTE_LOCK_REQUEST);
0280: } else if (remote && canProceedWithRecall(threadID)) {
0281: // This is the last outstanding unlock, so sync with server.
0282: action.addAction(Action.RECALL_COMMIT);
0283: } else if (greediness.isGreedy()) {
0284: action.addAction(Action.AWARD_GREEDY_LOCKS);
0285: }
0286: if (isLockSynchronouslyHeld(threadID)) {
0287: action.addAction(Action.SYNCHRONOUS_COMMIT);
0288: }
0289: return action;
0290: }
0291:
/**
 * Puts the given thread, which must hold the WRITE lock, into the wait state and blocks until the
 * lock is held by it again.
 *
 * Uses the same examine / flush / re-examine loop as unlock(): the Action is computed under the
 * monitor, the flush happens outside of it, and the state change is applied only if a freshly
 * computed Action equals the first one; otherwise the loop retries.
 *
 * @param threadID the waiting thread
 * @param call the wait invocation, passed on to the WaitLockRequest and to the server
 * @param waitLock the monitor object the thread blocks on
 * @param listener called once after the wait has been registered and before blocking; must not
 *        be null
 * @throws InterruptedException if the thread was interrupted while blocked
 * @throws IllegalMonitorStateException if the thread does not hold the WRITE lock
 * @throws AssertionError if listener is null or a wait request is already pending for the thread
 */
0292: public void wait(ThreadID threadID, WaitInvocation call,
0293: Object waitLock, WaitListener listener)
0294: throws InterruptedException {
0295: Action action;
0296: boolean changed;
0297: int server_level = LockLevel.NIL_LOCK_LEVEL;
0298: if (listener == null) {
0299: throw new AssertionError("Null WaitListener passed.");
0300: }
0301:
0302: do {
0303: changed = false;
0304:
0305: // Examine
0306: synchronized (this ) {
0307: waitUntillRunning();
0308: recordStatIfEnabled();
0309: checkValidWaitNotifyState(threadID);
0310: action = waitAction(threadID);
0311: }
0312:
0313: // Flush
// Done without holding this lock's monitor.
0314: if (action.doRemoteLockRequest() || action.doRecallCommit()
0315: || action.doSynchronousCommit()) {
0316: flush();
0317: }
0318:
0319: // modify and send
0320: synchronized (this ) {
0321: // check to see if the lock state has changed in anyway
0322: Action newAction = waitAction(threadID);
0323: if (action.equals(newAction)) {
0324: LockHold holder = (LockHold) this .holders
0325: .get(threadID);
0326: Assert.assertNotNull(holder);
// The level returned here is what the thread waits to hold again further below.
0327: server_level = holder.goToWaitState();
0328:
0329: Object prev = waitLocksByRequesterID.put(threadID,
0330: waitLock);
0331: Assert.eval(prev == null);
0332:
0333: WaitLockRequest waitLockRequest = new WaitLockRequest(
0334: lockID, threadID, server_level, call);
0335:
0336: if (this .pendingLockRequests.put(threadID,
0337: waitLockRequest) != null) {
0338: // formatting
0339: throw new AssertionError(
0340: "WaitLockRequest already pending: "
0341: + waitLockRequest);
0342: }
0343:
// A local timeout is scheduled only on the greedy path.
0344: if (action.doAwardGreedyLocks()) {
0345: scheduleWaitTimeout(waitLockRequest);
0346: awardLocksGreedily();
0347: } else if (action.doRecallCommit()) {
0348: greediness.startRecallCommit();
0349: recallCommit();
0350: } else if (action.doRemoteLockRequest()) {
0351: remoteLockManager.releaseLockWait(lockID,
0352: threadID, call);
0353: }
0354: } else {
0355: // try again - this could potentially loop forever in a highly contended environment
0356: changed = true;
0357: logger.debug(lockID + " :: wait() : " + threadID
0358: + " : STATE CHANGED - From = " + action
0359: + " To = " + newAction + " - retrying ...");
0360: }
0361: }
0362: } while (changed);
0363:
0364: listener.handleWaitEvent();
// Blocks until the lock is held at server_level again; true means the thread was interrupted.
0365: if (waitForLock(threadID, server_level, waitLock)) {
0366: throw new InterruptedException();
0367: }
0368: }
0369:
0370: private Action waitAction(ThreadID threadID) {
0371: final Action action = new Action();
0372: if (greediness.isNotGreedy()) {
0373: action.addAction(Action.REMOTE_LOCK_REQUEST);
0374: } else if (canProceedWithRecall(threadID)) {
0375: action.addAction(Action.RECALL_COMMIT);
0376: } else if (greediness.isGreedy()) {
0377: action.addAction(Action.AWARD_GREEDY_LOCKS);
0378: }
0379: if (isLockSynchronouslyHeld(threadID)) {
0380: action.addAction(Action.SYNCHRONOUS_COMMIT);
0381: }
0382: return action;
0383: }
0384:
0385: public synchronized Notify notify(ThreadID threadID, boolean all) {
0386: boolean isRemote;
0387: waitUntillRunning();
0388: recordStatIfEnabled();
0389:
0390: checkValidWaitNotifyState(threadID);
0391: if (!greediness.isNotGreedy()) {
0392: isRemote = notifyLocalWaits(threadID, all);
0393: } else {
0394: isRemote = true;
0395: }
0396: return isRemote ? new Notify(lockID, threadID, all)
0397: : Notify.NULL;
0398: }
0399:
0400: private synchronized void handleInterruptIfWait(ThreadID threadID) {
0401: LockRequest lockRequest = (LockRequest) pendingLockRequests
0402: .get(threadID);
0403: if (!isOnlyWaitLockRequest(lockRequest)) {
0404: return;
0405: }
0406: movedToPending(threadID);
0407: if (canAwardGreedilyNow(threadID, lockRequest.lockLevel())) {
0408: awardLock(threadID, lockRequest.lockLevel());
0409: return;
0410: }
0411: if (greediness.isNotGreedy()) {
0412: // If the lock is not greedily awarded, we need to notify the server to move
0413: // the lock to the pending state.
0414: this .remoteLockManager.interrruptWait(lockID, threadID);
0415: }
0416: }
0417:
0418: // This method needs to be called from a synchronized(this) context.
0419: private void movedToPending(ThreadID threadID) {
0420: LockHold holder = (LockHold) this .holders.get(threadID);
0421: Assert.assertNotNull(holder);
0422: int server_level = holder.goToPending();
0423: LockRequest pending = new LockRequest(lockID, threadID,
0424: server_level);
0425: LockRequest waiter = (LockRequest) this .pendingLockRequests
0426: .remove(threadID);
0427: if (waiter == null) {
0428: logger.warn("Pending request " + pending
0429: + " is not present");
0430: return;
0431: }
0432: // if (waiter instanceof WaitLockRequest) {
0433: if (isOnlyWaitLockRequest(waiter)) {
0434: cancelTimer((WaitLockRequest) waiter);
0435: } else {
0436: logger.warn("Pending request " + pending
0437: + " is not a waiter: " + waiter);
0438: }
0439: this .pendingLockRequests.put(threadID, pending);
0440: }
0441:
0442: /**
0443: * This method is called from a stage-thread and should never block
0444: */
0445: public synchronized void notified(ThreadID threadID) {
0446: movedToPending(threadID);
0447: }
0448:
0449: /**
0450: * XXX:: This method is called from a stage-thread and should never block. Server recalls READ or WRITE depending on
0451: * what triggered the recall.
0452: */
0453: public synchronized void recall(int interestedLevel,
0454: LockFlushCallback callback) {
0455: // debug("recall() - BEGIN - ", LockLevel.toString(interestedLevel));
0456: if (greediness.isGreedy()) {
0457: greediness.recall(interestedLevel);
0458: if (canProceedWithRecall()) {
0459: greediness.startRecallCommit();
0460: if (isTransactionsForLockFlushed(callback)) {
0461: // debug("recall() - recall commit - ", LockLevel.toString(interestedLevel));
0462: recallCommit();
0463: }
0464: }
0465: }
0466: }
0467:
0468: /**
0469: * XXX:: This method is called from a stage-thread and should never block.
0470: */
0471: public void cannotAwardLock(ThreadID threadID, int level) {
0472: final Object waitLock;
0473: synchronized (this ) {
0474: waitLock = waitLocksByRequesterID.remove(threadID);
0475: if (waitLock == null && !threadID.equals(ThreadID.VM_ID)) {
0476: // Not waiting for this lock
0477: throw new LockNotPendingError(
0478: "Attempt to reject a lock request that isn't pending: lockID: "
0479: + lockID + ", level: " + level
0480: + ", requesterID: " + threadID
0481: + ", waitLocksByRequesterID: "
0482: + waitLocksByRequesterID);
0483: }
0484: LockRequest lockRequest = (LockRequest) pendingLockRequests
0485: .remove(threadID);
0486: if (lockRequest == null) {
0487: // formatting
0488: throw new AssertionError(
0489: "Attempt to remove a pending lock request that wasn't pending; lockID: "
0490: + lockID + ", level: " + level
0491: + ", requesterID: " + threadID);
0492: }
0493: cancelTryLockWaitTimerIfNeeded(lockRequest);
0494: }
0495: synchronized (waitLock) {
0496: reject(threadID);
0497: waitLock.notifyAll();
0498: }
0499: }
0500:
0501: private void reject(ThreadID threadID) {
0502: synchronized (rejectedLockRequesterIDs) {
0503: rejectedLockRequesterIDs.add(threadID);
0504: }
0505: }
0506:
0507: /**
0508: * XXX:: This method is called from a stage-thread and should never block.
0509: */
0510: public void awardLock(ThreadID threadID, int level) {
0511: final Object waitLock;
0512: synchronized (this ) {
0513: // debug("awardLock() - BEGIN - ", requesterID, LockLevel.toString(level));
0514: waitLock = waitLocksByRequesterID.remove(threadID);
0515: if (waitLock == null && !threadID.equals(ThreadID.VM_ID)) {
0516: // Not waiting for this lock
0517: throw new LockNotPendingError(
0518: "Attempt to award a lock that isn't pending [lockID: "
0519: + lockID + ", level: " + level
0520: + ", requesterID: " + threadID + "]");
0521: }
0522: if (LockLevel.isGreedy(level)) {
0523: Assert.assertEquals(threadID, ThreadID.VM_ID);
0524: // A Greedy lock is issued by the server. From now on client handles all lock awards until
0525: // 1) a recall is issued
0526: // 2) an lock upgrade is needed.
0527: final int nlevel = LockLevel.makeNotGreedy(level);
0528: greediness.add(nlevel);
0529: awardLocksGreedily();
0530: return;
0531: }
0532:
0533: // LockRequest lockRequestKey = new LockRequest(id, requesterID, level);
0534: LockRequest lockRequest = (LockRequest) pendingLockRequests
0535: .remove(threadID);
0536: if (lockRequest == null) {
0537: // formatting
0538: throw new AssertionError(
0539: "Attempt to remove a pending lock request that wasn't pending; lockID: "
0540: + lockID + ", level: " + level
0541: + ", requesterID: " + threadID);
0542: }
0543: cancelTryLockWaitTimerIfNeeded(lockRequest);
0544: }
0545:
0546: synchronized (waitLock) {
0547: award(threadID, level);
0548: waitLock.notifyAll();
0549: }
0550: }
0551:
0552: /*
0553: * @returns the wait object for lock request
0554: */
0555: private synchronized Object addToPendingLockRequest(
0556: ThreadID threadID, int lockLevel, WaitInvocation timeout,
0557: boolean noBlock) {
0558: // Add Lock Request
0559: LockRequest lockRequest = null;
0560: if (noBlock) {
0561: lockRequest = new TryLockRequest(lockID, threadID,
0562: lockLevel, timeout);
0563: } else {
0564: lockRequest = new LockRequest(lockID, threadID, lockLevel);
0565: }
0566: Object old = pendingLockRequests.put(threadID, lockRequest);
0567: if (old != null) {
0568: // formatting
0569: throw new AssertionError(
0570: "Lock request already outstandind - " + old);
0571: }
0572:
0573: // Add wait object for lock request
0574: Object o = new Object();
0575: Object prev = waitLocksByRequesterID.put(threadID, o);
0576: if (prev != null) {
0577: throw new AssertionError(
0578: "Assert Failed : Previous value is not null. Prev = "
0579: + prev + " Thread id = " + threadID);
0580: }
0581: return o;
0582: }
0583:
0584: private boolean waitForTryLock(ThreadID threadID, Object waitLock) {
0585: // debug("waitForTryLock() - BEGIN - ", requesterID, LockLevel.toString(type));
0586: boolean isInterrupted = false;
0587: synchronized (waitLock) {
0588: // We need to check if the respond has returned already before we do a wait
0589: while (!isLockRequestResponded(threadID)) {
0590: try {
0591: waitLock.wait();
0592: } catch (InterruptedException ioe) {
0593: isInterrupted = true;
0594: }
0595: }
0596: }
0597: return isInterrupted;
0598: }
0599:
0600: private void scheduleTryLockTimerIfNeeded(TryLockRequest request) {
0601: if (waitTimers.containsKey(request)) {
0602: return;
0603: }
0604:
0605: ThreadID threadID = request.threadID();
0606: int lockLevel = request.lockLevel();
0607: WaitInvocation timeout = request.getWaitInvocation();
0608: timeout.adjust();
0609: scheduleWaitForTryLock(threadID, lockLevel, timeout);
0610: }
0611:
0612: private void scheduleWaitForTryLock(ThreadID threadID,
0613: int lockLevel, WaitInvocation timeout) {
0614: TryLockRequest tryLockWaitRequest = (TryLockRequest) pendingLockRequests
0615: .get(threadID);
0616: scheduleWaitTimeout(tryLockWaitRequest);
0617: }
0618:
0619: private boolean isLockRequestResponded(ThreadID threadID) {
0620: if (isHeldBy(threadID)) {
0621: return true;
0622: }
0623: synchronized (rejectedLockRequesterIDs) {
0624: return rejectedLockRequesterIDs.remove(threadID);
0625: }
0626: }
0627:
0628: private boolean waitForLock(ThreadID threadID, int type,
0629: Object waitLock) {
0630: // debug("waitForLock() - BEGIN - ", requesterID, LockLevel.toString(type));
0631: boolean isInterrupted = false;
0632: while (!isHeldBy(threadID, type)) {
0633: try {
0634: synchronized (waitLock) {
0635: if (!isHeldBy(threadID, type)) {
0636: waitLock.wait();
0637: }
0638: }
0639: } catch (InterruptedException ioe) {
0640: if (!isInterrupted) {
0641: isInterrupted = true;
0642: handleInterruptIfWait(threadID);
0643: }
0644: } catch (Throwable e) {
0645: throw new TCRuntimeException(e);
0646: }
0647: }
0648: return isInterrupted;
0649: // debug("waitForLock() - WAKEUP - ", requesterID, LockLevel.toString(type));
0650: }
0651:
0652: // Schedule a timer.
0653: private synchronized void scheduleWaitTimeout(
0654: WaitLockRequest waitLockRequest) {
0655: final TimerTask timer = waitTimer.scheduleTimer(this ,
0656: waitLockRequest.getWaitInvocation(), waitLockRequest);
0657: if (timer != null) {
0658: waitTimers.put(waitLockRequest, timer);
0659: }
0660: }
0661:
0662: private synchronized void awardLocksGreedily() {
0663: // debug("awardLocksGreedily() - BEGIN - ", "");
0664: List copy = new ArrayList(pendingLockRequests.values());
0665: for (Iterator i = copy.iterator(); i.hasNext();) {
0666: Object o = i.next();
0667: if (isOnlyWaitLockRequest(o))
0668: continue;
0669: LockRequest lr = (LockRequest) o;
0670: if (canAwardGreedilyNow(lr.threadID(), lr.lockLevel())) {
0671: awardLock(lr.threadID(), lr.lockLevel());
0672: } else if (isTryLockRequest(lr)) {
0673: scheduleTryLockTimerIfNeeded((TryLockRequest) lr);
0674: }
0675: }
0676: }
0677:
0678: /**
0679: * Returns true if a remote lock release is required. The difference between this and the real removeCurrent is that
0680: * this does not change state.
0681: */
0682: private synchronized boolean isRemoteUnlockRequired(
0683: ThreadID threadID) {
0684: // debug("isRemoteUnlockRequired() - BEGIN - ", id);
0685: LockHold holder = (LockHold) this .holders.get(threadID);
0686: Assert.assertNotNull(holder);
0687:
0688: if (LockLevel.isConcurrent(holder.getLevel())) {
0689: return false;
0690: }
0691:
0692: return holder.isRemoteUnlockRequired();
0693: }
0694:
0695: /**
0696: * Returns true if a remote lock release is required
0697: */
0698: private synchronized boolean removeCurrent(ThreadID threadID) {
0699: // debug("removeCurrrent() - BEGIN - ", id);
0700: LockHold holder = (LockHold) this .holders.get(threadID);
0701: Assert.assertNotNull(holder);
0702:
0703: if (LockLevel.isConcurrent(holder.getLevel())) {
0704: holder.removeCurrent();
0705:
0706: if (holder.getLevel() == LockLevel.NIL_LOCK_LEVEL) {
0707: this .holders.remove(threadID);
0708: }
0709:
0710: return false;
0711: }
0712:
0713: boolean rv = holder.removeCurrent();
0714:
0715: if (holder.getLevel() == LockLevel.NIL_LOCK_LEVEL) {
0716: this .holders.remove(threadID);
0717: }
0718:
0719: return rv;
0720: }
0721:
0722: private void checkValidWaitNotifyState(ThreadID threadID) {
0723: if (!isHeldBy(threadID, LockLevel.WRITE)) {
0724: // make the formatter happy
0725: throw new IllegalMonitorStateException(
0726: "The current Thread (" + threadID
0727: + ") does not hold a WRITE lock for "
0728: + lockID);
0729: }
0730: }
0731:
0732: /**
0733: * Returns true if this notification should be send to the server for handling. This notification is not needed to be
0734: * sent to the server if all is false and we have notified 1 waiter locally.
0735: */
0736: private synchronized boolean notifyLocalWaits(ThreadID threadID,
0737: boolean all) {
0738: for (Iterator i = new HashSet(pendingLockRequests.values())
0739: .iterator(); i.hasNext();) {
0740: Object o = i.next();
0741: if (isOnlyWaitLockRequest(o)) {
0742: WaitLockRequest wlr = (WaitLockRequest) o;
0743: notified(wlr.threadID());
0744: if (!all) {
0745: return false;
0746: }
0747: }
0748: }
0749: return true;
0750: }
0751:
0752: private boolean isTransactionsForLockFlushed(
0753: LockFlushCallback callback) {
0754: return remoteLockManager.isTransactionsForLockFlushed(lockID,
0755: callback);
0756: }
0757:
0758: public void transactionsForLockFlushed(LockID id) {
0759: Assert.assertEquals(lockID, id);
0760: recallCommit();
0761: }
0762:
0763: private synchronized void recallCommit() {
0764: // debug("recallCommit() - BEGIN - ", "");
0765: if (greediness.isRecallInProgress()) {
0766: greediness.recallComplete();
0767: cancelTimers();
0768: // Attach the pending lock requests and tryLock requests to the recall commit message.
0769: remoteLockManager.recallCommit(lockID,
0770: addHoldersToAsLockRequests(new ArrayList()),
0771: addAllWaitersTo(new ArrayList()),
0772: addAllPendingLockRequestsTo(new ArrayList()),
0773: addAllPendingTryLockRequestsTo(new ArrayList()));
0774: } else {
0775: logger
0776: .debug(lockID
0777: + " : recallCommit() : skipping as the state is not RECALL_IN_PROGRESS !");
0778: }
0779: }
0780:
0781: private void flush() {
0782: remoteLockManager.flush(lockID);
0783: }
0784:
0785: public synchronized Collection addAllWaitersTo(Collection c) {
0786: if (greediness.isNotGreedy()) {
0787: for (Iterator i = pendingLockRequests.values().iterator(); i
0788: .hasNext();) {
0789: Object o = i.next();
0790: if (isOnlyWaitLockRequest(o)) {
0791: c.add(o);
0792: }
0793: }
0794: }
0795: return c;
0796: }
0797:
0798: public synchronized Collection addHoldersToAsLockRequests(
0799: Collection c) {
0800: if (greediness.isNotGreedy()) {
0801: for (Iterator i = holders.keySet().iterator(); i.hasNext();) {
0802: ThreadID threadID = (ThreadID) i.next();
0803: LockHold hold = (LockHold) holders.get(threadID);
0804: if (hold.isHolding()
0805: && hold.getServerLevel() != LockLevel.NIL_LOCK_LEVEL) {
0806: c.add(new LockRequest(this .lockID, threadID, hold
0807: .getServerLevel()));
0808: }
0809: }
0810: } else {
0811: // All other states -- GREEDY, RECALLED, RECALL-COMMIT-INPROGRESS
0812: c.add(new LockRequest(this .lockID, ThreadID.VM_ID,
0813: greediness.getLevel()));
0814: }
0815: return c;
0816: }
0817:
0818: public synchronized Collection addAllPendingLockRequestsTo(
0819: Collection c) {
0820: if (greediness.isNotGreedy()) {
0821: for (Iterator i = pendingLockRequests.values().iterator(); i
0822: .hasNext();) {
0823: LockRequest request = (LockRequest) i.next();
0824: if (isWaitLockRequest(request))
0825: continue;
0826: c.add(request);
0827: }
0828: }
0829: return c;
0830: }
0831:
0832: public synchronized Collection addAllPendingTryLockRequestsTo(
0833: Collection c) {
0834: if (greediness.isNotGreedy()) {
0835: for (Iterator i = pendingLockRequests.values().iterator(); i
0836: .hasNext();) {
0837: LockRequest request = (LockRequest) i.next();
0838: if (isTryLockRequest(request)) {
0839: c.add(request);
0840: }
0841: }
0842: }
0843: return c;
0844: }
0845:
0846: public synchronized void incUseCount() {
0847: if (useCount == Integer.MAX_VALUE) {
0848: throw new AssertionError(
0849: "Lock use count cannot exceed integer max value");
0850: }
0851: useCount++;
0852: timeUsed = System.currentTimeMillis();
0853: }
0854:
0855: public synchronized void decUseCount() {
0856: if (useCount == 0) {
0857: throw new AssertionError("Lock use count is zero");
0858: }
0859: useCount--;
0860: timeUsed = System.currentTimeMillis();
0861: }
0862:
0863: public synchronized int getUseCount() {
0864: return useCount;
0865: }
0866:
0867: // call from a synchronized(this) context
0868: private void cancelTryLockWaitTimerIfNeeded(LockRequest request) {
0869: if (isTryLockRequest(request)) {
0870: cancelTimer((TryLockRequest) request);
0871: }
0872: }
0873:
0874: private synchronized void cancelTimer(WaitLockRequest request) {
0875: TimerTask timer = (TimerTask) waitTimers.remove(request);
0876: if (timer != null) {
0877: timer.cancel();
0878: }
0879: }
0880:
0881: private synchronized void cancelTimers() {
0882: Collection copy = new ArrayList(waitTimers.keySet());
0883: for (Iterator iter = copy.iterator(); iter.hasNext();) {
0884: WaitLockRequest wlr = (WaitLockRequest) iter.next();
0885: cancelTimer(wlr);
0886: }
0887: }
0888:
0889: /*
0890: * @see ClientLock.addRecalledHolders();
0891: */
0892: private synchronized boolean canProceedWithRecall() {
0893: return canProceedWithRecall(ThreadID.NULL_ID);
0894: }
0895:
0896: private synchronized boolean canProceedWithRecall(ThreadID threadID) {
0897: if (greediness.isRecalled()) {
0898: Map map = addRecalledHoldersTo(new HashMap());
0899: if (threadID != ThreadID.NULL_ID) {
0900: map.remove(threadID);
0901: }
0902: for (Iterator i = pendingLockRequests.values().iterator(); i
0903: .hasNext()
0904: && map.size() != 0;) {
0905: Object o = i.next();
0906: if (isOnlyWaitLockRequest(o)) {
0907: // These are not in the map.
0908: continue;
0909: }
0910: if (o instanceof LockRequest) {
0911: LockRequest lr = (LockRequest) o;
0912: map.remove(lr.threadID());
0913: }
0914: }
0915: return (map.size() == 0);
0916: }
0917: return false;
0918: }
0919:
0920: private void award(ThreadID threadID, int level) {
0921: // debug("award() - BEGIN - ", id, LockLevel.toString(level));
0922: synchronized (this ) {
0923: LockHold holder = (LockHold) this .holders.get(threadID);
0924: if (holder == null) {
0925: holders.put(threadID, new LockHold(this .lockID, level));
0926: } else if (holder.isHolding()) {
0927: holder.add(level);
0928: } else {
0929: // Lock is awarded after wait
0930: try {
0931: holder.goToHolding(level);
0932: } catch (TCAssertionError er) {
0933: logger
0934: .warn("Lock in wrong STATE for holder - ("
0935: + threadID + ", "
0936: + LockLevel.toString(level)
0937: + ") - " + this );
0938: throw er;
0939: }
0940: }
0941: }
0942: }
0943:
0944: private synchronized void awardSynchronous(ThreadID threadID,
0945: int lockLevel) {
0946: LockHold holder = (LockHold) this .holders.get(threadID);
0947: if (holder != null && holder.isHolding()
0948: && ((holder.getLevel() & lockLevel) == lockLevel)) {
0949: holder.makeLastAwardSynchronous(lockLevel);
0950: }
0951: }
0952:
0953: /*
0954: * @returns true if the lock can be awarded greedily now.
0955: */
0956: private synchronized boolean canAwardGreedilyNow(ThreadID threadID,
0957: int level) {
0958: if (greediness.isGreedy()) {
0959: // We can award the lock greedily now if
0960: // 1) The request is for WRITE and we hold a Greedy WRITE and there are no holders.
0961: // 2) The request is for WRITE and we hold a Greedy WRITE and there is only 1 holder, which is the requesterID.
0962: // This is a valid local UPGRADE request.
0963: // 3) The request is for READ and we hold a Greedy WRITE and there are no holders of WRITE.
0964: // 4) The request is for READ and we hold a Greedy READ
0965: if (LockLevel.isWrite(level)
0966: && greediness.isWrite()
0967: && (!isHeld() || ((heldCount() == 1) && isHeldBy(threadID)))) {
0968: // (1) and (2)
0969: return true;
0970: } else if (LockLevel.isRead(level) && greediness.isWrite()
0971: && !isWriteHeld()) {
0972: // (3)
0973: return true;
0974: } else if (LockLevel.isRead(level) && greediness.isRead()) {
0975: // (4)
0976: return true;
0977: }
0978: }
0979: return false;
0980: }
0981:
0982: private boolean isLockSynchronouslyHeld(ThreadID threadID) {
0983: LockHold holder = (LockHold) this .holders.get(threadID);
0984: if (holder != null && holder.isHolding()) {
0985: return holder.isLastLockSynchronouslyHeld();
0986: }
0987: return false;
0988: }
0989:
0990: /*
0991: * @returns true if the greedy lock should be let go.
0992: */
0993: // private synchronized boolean isGreedyRecallNeeded(ThreadID threadID, int level) {
0994: // if (greediness.isGreedy()) {
0995: // // We let the lock recalled if the request is for WRITE and we hold a Greedy READ
0996: // if (LockLevel.isWrite(level) && greediness.isReadOnly()) { return true; }
0997: // }
0998: // return false;
0999: // }
1000: private boolean isWriteHeld() {
1001: synchronized (holders) {
1002: for (Iterator it = holders.values().iterator(); it
1003: .hasNext();) {
1004: LockHold holder = (LockHold) it.next();
1005: if (holder.isHolding()
1006: && LockLevel.isWrite(holder.getLevel())) {
1007: return true;
1008: }
1009: }
1010: return false;
1011: }
1012: }
1013:
1014: /**
1015: * This method adds the ThreadIDs of the Holders of the locks that is been recalled. The server can recall a lock
1016: * because of a READ or a WRITE request. The Holders for whom we need to worry about is explained below. Greedy |
1017: * Recall | Who 1) R | W | No Holders who are not pending lock request/ wait 2) W | W | No Holders who are not pending
1018: * lock request/ wait 3) W | R | No Write Holders who are not pending lock request/ wait
1019: */
1020: private synchronized Map addRecalledHoldersTo(Map map) {
1021: Assert.assertTrue(greediness.isRecalled());
1022: for (Iterator it = holders.entrySet().iterator(); it.hasNext();) {
1023: Entry e = (Entry) it.next();
1024: ThreadID id = (ThreadID) e.getKey();
1025: LockHold holder = (LockHold) e.getValue();
1026: if (!holder.isHolding())
1027: continue;
1028: if ((greediness.getRecalledLevel() == LockLevel.READ)
1029: && LockLevel.isRead(holder.getLevel())) {
1030: // (3)
1031: continue;
1032: }
1033: map.put(id, id);
1034: }
1035: return map;
1036: }
1037:
1038: public synchronized void waitTimeout(Object callbackObject) {
1039: waitUntillRunning();
1040: // debug("waitTimeout() - BEGIN - ", callbackObject);
1041: if (isTryLockRequest(callbackObject)) {
1042: // If the callbackObject is a tryLock request, reject the tryLock request.
1043: TryLockRequest wlr = (TryLockRequest) callbackObject;
1044: LockID timeoutLockID = wlr.lockID();
1045: if (!lockID.equals(timeoutLockID)) {
1046: throw new AssertionError(
1047: "waitTimeout: LockIDs are not the same : "
1048: + lockID + " : " + timeoutLockID);
1049: }
1050: // We need to check if the tryLock is still waiting locally to accommodate the race condition that the
1051: // timer thread just timeouts and before the timer thread invokes this method, the application thread does
1052: // an unlock and recallCommit, which will move the tryLock request to the server.
1053: if (isTryLockWaiting(wlr)) {
1054: int timeoutLockLevel = wlr.lockLevel();
1055: ThreadID timeoutThreadID = wlr.threadID();
1056: cannotAwardLock(timeoutThreadID, timeoutLockLevel);
1057: }
1058: } else if (isOnlyWaitLockRequest(callbackObject)) {
1059: WaitLockRequest wlr = (WaitLockRequest) callbackObject;
1060: LockID timedoutLockID = wlr.lockID();
1061: if (!lockID.equals(timedoutLockID)) {
1062: throw new AssertionError(
1063: "WaitTimeout: LockIDs are not the same : "
1064: + lockID + " : " + timedoutLockID);
1065: }
1066: if (greediness.isWrite() && isWaiting(wlr.threadID())) {
1067: notified(wlr.threadID());
1068: awardLocksGreedily();
1069: return;
1070: }
1071: }
1072: logger.warn("Ignoring wait timeout for : " + callbackObject);
1073: }
1074:
1075: public synchronized boolean isClear() {
1076: return (holders.isEmpty() && greediness.isNotGreedy()
1077: && (pendingLockRequests.size() == 0) && (useCount == 0));
1078: }
1079:
1080: // This method is synchronized such that we can quickly inspect for potential timeouts and only on possible
1081: // timeouts we grab the lock.
1082: public boolean timedout() {
1083: if (useCount != 0) {
1084: return false;
1085: }
1086: synchronized (this ) {
1087: return (holders.isEmpty() && greediness.isGreedy()
1088: && (pendingLockRequests.size() == 0)
1089: && (useCount == 0) && ((System.currentTimeMillis() - timeUsed) > ClientLockManagerImpl.TIMEOUT));
1090: }
1091: }
1092:
1093: private boolean isHeldBy(ThreadID threadID) {
1094: synchronized (holders) {
1095: LockHold holder = (LockHold) holders.get(threadID);
1096: if (holder != null) {
1097: return holder.isHolding();
1098: }
1099: return false;
1100: }
1101: }
1102:
1103: public boolean isHoldingReadLockExclusively(ThreadID threadID) {
1104: return isHeldBy(threadID, LockLevel.READ)
1105: && !isHeldBy(threadID, LockLevel.WRITE);
1106: }
1107:
1108: public boolean isHeldBy(ThreadID threadID, int level) {
1109: synchronized (holders) {
1110: LockHold holder = (LockHold) holders.get(threadID);
1111: if (holder != null) {
1112: return ((holder.isHolding()) && ((holder.getLevel() & level) == level));
1113: }
1114: return false;
1115: }
1116: }
1117:
1118: public boolean isHeld() {
1119: synchronized (holders) {
1120: for (Iterator it = holders.values().iterator(); it
1121: .hasNext();) {
1122: LockHold holder = (LockHold) it.next();
1123: if (holder.isHolding()) {
1124: return true;
1125: }
1126: }
1127: return false;
1128: }
1129: }
1130:
1131: private boolean isTryLockWaiting(TryLockRequest request) {
1132: return waitTimers.containsKey(request);
1133: }
1134:
1135: private boolean isWaiting(ThreadID threadID) {
1136: synchronized (holders) {
1137: LockHold holder = (LockHold) holders.get(threadID);
1138: if (holder != null) {
1139: return holder.isWaiting();
1140: }
1141: return false;
1142: }
1143: }
1144:
1145: private int heldCount() {
1146: int count = 0;
1147: synchronized (holders) {
1148: for (Iterator it = holders.values().iterator(); it
1149: .hasNext();) {
1150: LockHold holder = (LockHold) it.next();
1151: if (holder.isHolding()) {
1152: count++;
1153: }
1154: }
1155: }
1156: return count;
1157: }
1158:
1159: public int localHeldCount(ThreadID threadID, int lockLevel) {
1160: LockHold holder;
1161: synchronized (holders) {
1162: holder = (LockHold) holders.get(threadID);
1163: }
1164: if (holder == null)
1165: return 0;
1166: else
1167: return holder.heldCount(lockLevel);
1168: }
1169:
1170: public synchronized int queueLength() {
1171: int count = 0;
1172: for (Iterator i = pendingLockRequests.values().iterator(); i
1173: .hasNext();) {
1174: Object o = i.next();
1175: if (!isOnlyWaitLockRequest(o))
1176: count++;
1177: }
1178: return count;
1179: }
1180:
1181: public synchronized int waitLength() {
1182: int localCount = 0;
1183: for (Iterator i = pendingLockRequests.values().iterator(); i
1184: .hasNext();) {
1185: Object o = i.next();
1186: if (isOnlyWaitLockRequest(o))
1187: localCount++;
1188: }
1189: return localCount;
1190: }
1191:
1192: public LockID getLockID() {
1193: return lockID;
1194: }
1195:
1196: public int hashCode() {
1197: return this .lockID.hashCode();
1198: }
1199:
1200: public boolean equals(Object obj) {
1201: if (obj instanceof ClientLock) {
1202: ClientLock lock = (ClientLock) obj;
1203: return lock.lockID.equals(lockID);
1204: }
1205: return false;
1206: }
1207:
1208: public String toString() {
1209: return "Lock@" + System.identityHashCode(this ) + " [ " + lockID
1210: + " ] : Holders = " + holders
1211: + " : PendingLockRequest : " + pendingLockRequests
1212: + " : Use count : " + useCount + " : state : " + state
1213: + " : " + greediness;
1214: }
1215:
1216: private boolean isConcurrentWriteLock(ThreadID threadID) {
1217: LockHold holder = (LockHold) holders.get(threadID);
1218: if (holder != null) {
1219: return LockLevel.isConcurrent(holder.getLevel());
1220: }
1221: return false;
1222: }
1223:
1224: public void pause() {
1225: state = PAUSED;
1226: }
1227:
1228: public synchronized void unpause() {
1229: state = RUNNING;
1230: notifyAll();
1231: }
1232:
1233: private synchronized void waitUntillRunning() {
1234: boolean isInterrupted = false;
1235: while (state != RUNNING) {
1236: try {
1237: wait();
1238: } catch (InterruptedException e) {
1239: isInterrupted = true;
1240: }
1241: }
1242: Util.selfInterruptIfNeeded(isInterrupted);
1243: }
1244:
1245: private boolean isWaitLockRequest(Object request) {
1246: return request instanceof WaitLockRequest;
1247: }
1248:
1249: private boolean isTryLockRequest(Object request) {
1250: return request instanceof TryLockRequest;
1251: }
1252:
1253: private boolean isOnlyWaitLockRequest(Object request) {
1254: return isWaitLockRequest(request) && !isTryLockRequest(request);
1255: }
1256:
1257: // I wish we were using 1.5 !!!
1258: // private void debug(Object o1, Object o2) {
1259: // logger.warn(lockID + String.valueOf(o1) + String.valueOf(o2));
1260: // }
1261: //
1262: // private void debug(Object o1, Object o2, Object o3) {
1263: // logger.warn(lockID + String.valueOf(o1) + String.valueOf(o2) + String.valueOf(o3));
1264: // }
1265: //
1266: // private void debug(Object o1, Object o2, Object o3, Object o4) {
1267: // logger.warn(lockID + String.valueOf(o1) + String.valueOf(o2) + String.valueOf(o3) + String.valueOf(o4));
1268: // }
1269: //
1270: // private void debug(Object o) {
1271: // logger.warn(lockID + String.valueOf(o));
1272: // }
1273: //
1274:
1275: private static class LockHold {
1276: private static final State HOLDING = new State("HOLDING");
1277: private static final State WAITING = new State("WAITING");
1278: private static final State PENDING = new State("PENDING");
1279:
1280: private int level;
1281: private int server_level;
1282: private State state;
1283: private final TIntIntHashMap counts = new TIntIntHashMap();
1284: private final TIntStack levels = new TIntStack();
1285: private final LockID lockID;
1286:
1287: LockHold(LockID lockID, int level) {
1288: this .lockID = lockID;
1289: if (!LockLevel.isDiscrete(level)) {
1290: throw new AssertionError("Non-discreet level " + level);
1291: }
1292: Assert.eval(level != LockLevel.NIL_LOCK_LEVEL);
1293: this .level = level;
1294: this .levels.push(level);
1295: this .counts.put(level, 1);
1296: initServerLevel();
1297: this .state = HOLDING;
1298: }
1299:
1300: private void initServerLevel() {
1301: if (level == LockLevel.READ || level == LockLevel.WRITE) {
1302: server_level = level;
1303: } else {
1304: server_level = LockLevel.NIL_LOCK_LEVEL;
1305: }
1306: }
1307:
1308: int getServerLevel() {
1309: return this .server_level;
1310: }
1311:
1312: int getLevel() {
1313: return this .level;
1314: }
1315:
1316: boolean isHolding() {
1317: return (state == HOLDING);
1318: }
1319:
1320: boolean isWaiting() {
1321: return (state == WAITING);
1322: }
1323:
1324: boolean isPending() {
1325: return (state == PENDING);
1326: }
1327:
1328: int heldCount() {
1329: return levels.size();
1330: }
1331:
1332: int heldCount(int lockLevel) {
1333: return this .counts.get(lockLevel);
1334: }
1335:
1336: void makeLastAwardSynchronous(int lockLevel) {
1337: int lastLevel = this .levels.pop();
1338: Assert.assertEquals(lockLevel, lastLevel);
1339: this .levels.push(LockLevel.makeSynchronous(lockLevel));
1340: }
1341:
1342: boolean isLastLockSynchronouslyHeld() {
1343: int lastLevel = this .levels.peek();
1344: return LockLevel.isSynchronous(lastLevel);
1345: }
1346:
1347: void add(int lockLevel) {
1348: Assert.eval("Non-discreet level " + lockLevel, LockLevel
1349: .isDiscrete(lockLevel));
1350:
1351: this .levels.push(lockLevel);
1352: this .level |= lockLevel;
1353: Assert.eval(level != LockLevel.NIL_LOCK_LEVEL);
1354: if ((lockLevel == LockLevel.READ && (!LockLevel
1355: .isWrite(server_level)))
1356: || (lockLevel == LockLevel.WRITE)) {
1357: server_level |= lockLevel;
1358: }
1359: if (!this .counts.increment(lockLevel)) {
1360: this .counts.put(lockLevel, 1);
1361: }
1362: }
1363:
1364: /**
1365: * Returns true if a remote lock release is required. This method does not change the state.
1366: */
1367: boolean isRemoteUnlockRequired() {
1368: Assert.eval(this .levels.size() > 0);
1369: int lastLevel = LockLevel.makeNotSynchronous(levels.peek());
1370:
1371: Assert.eval(this .counts.contains(lastLevel));
1372: int count = this .counts.get(lastLevel);
1373: Assert.eval(count > 0);
1374:
1375: count--;
1376: if (count > 0) {
1377: return false;
1378: }
1379:
1380: return lastLevel == LockLevel.WRITE
1381: || ((this .level ^ lastLevel) == LockLevel.NIL_LOCK_LEVEL);
1382: }
1383:
1384: /**
1385: * Returns true if a remote lock release is required
1386: */
1387: boolean removeCurrent() {
1388: Assert.eval(this .levels.size() > 0);
1389: int lastLevel = LockLevel.makeNotSynchronous(levels.pop());
1390:
1391: Assert.eval(this .counts.contains(lastLevel));
1392: int count = this .counts.remove(lastLevel);
1393: Assert.eval(count > 0);
1394:
1395: count--;
1396: if (count > 0) {
1397: this .counts.put(lastLevel, count);
1398: return false;
1399: }
1400:
1401: this .level ^= lastLevel;
1402:
1403: if ((lastLevel == LockLevel.READ && (!LockLevel
1404: .isWrite(server_level)))
1405: || (lastLevel == LockLevel.WRITE)) {
1406: server_level ^= lastLevel;
1407: }
1408: return lastLevel == LockLevel.WRITE
1409: || this .level == LockLevel.NIL_LOCK_LEVEL;
1410: }
1411:
1412: int goToWaitState() {
1413: Assert.assertTrue(LockLevel.isWrite(this .level));
1414: Assert.assertTrue(state == HOLDING);
1415: this .state = WAITING;
1416: /*
1417: * server_level is not changed to NIL_LOCK_LEVEL even though the server will release the lock as we need to know
1418: * what state we were holding before wait on certain scenarios like server crash etc.
1419: *
1420: * @see ClientLockManager.notified
1421: */
1422: return this .server_level;
1423: }
1424:
1425: int goToPending() {
1426: /*
1427: * The lock might not be in WAITING state if the server has restarted and the client has been notified again
1428: * because of a resent of an already applied transaction. The lock could even be in HOLDING state if the resent
1429: * transaction is already fully ACKED before the server crashed and the originating client processes the ACK after
1430: * resend, since notifies and lock awards happen in different thread. So we ignore such notifies.
1431: */
1432: if (this .state != WAITING) {
1433: logger
1434: .warn(this .lockID
1435: + ": Ignoring Moving to PENDING since not in WAITING state: current state = "
1436: + this .state);
1437: } else {
1438: this .state = PENDING;
1439: }
1440: return this .server_level;
1441: }
1442:
1443: void goToHolding(int slevel) {
1444: Assert.assertTrue(slevel == server_level);
1445: if (state != PENDING)
1446: throw new AssertionError(
1447: "Attempt to to to HOLDING while not PENDING: "
1448: + state);
1449: this .state = HOLDING;
1450: }
1451:
1452: public String toString() {
1453: return "LockHold[" + state + ","
1454: + LockLevel.toString(level) + "]";
1455: }
1456: }
1457:
1458: private static class Greediness {
1459: private static final State NOT_GREEDY = new State("NOT GREEDY");
1460: private static final State GREEDY = new State("GREEDY");
1461: private static final State RECALLED = new State("RECALLED");
1462: private static final State RECALL_IN_PROGRESS = new State(
1463: "RECALL IN PROGRESS");
1464:
1465: private int level = LockLevel.NIL_LOCK_LEVEL;
1466: private int recallLevel = LockLevel.NIL_LOCK_LEVEL;
1467: private State state = NOT_GREEDY;
1468:
1469: void add(int l) {
1470: this .level |= l;
1471: state = GREEDY;
1472: }
1473:
1474: int getLevel() {
1475: return level;
1476: }
1477:
1478: int getRecalledLevel() {
1479: return recallLevel;
1480: }
1481:
1482: void recall(int rlevel) {
1483: Assert.assertTrue(state == GREEDY);
1484: this .recallLevel |= rlevel;
1485: state = RECALLED;
1486: }
1487:
1488: boolean isRead() {
1489: return LockLevel.isRead(level);
1490: }
1491:
1492: boolean isReadOnly() {
1493: return isRead() && !isWrite();
1494: }
1495:
1496: boolean isWrite() {
1497: return LockLevel.isWrite(level);
1498: }
1499:
1500: boolean isUpgrade() {
1501: return isRead() && isWrite();
1502: }
1503:
1504: boolean isGreedy() {
1505: return (state == GREEDY);
1506: }
1507:
1508: // XXX:: Note that isNotGreedy() != (!isGreedy())
1509: boolean isNotGreedy() {
1510: return (state == NOT_GREEDY);
1511: }
1512:
1513: public String toString() {
1514: return "Greedy Token [ Lock Level = "
1515: + LockLevel.toString(level) + ", Recall Level = "
1516: + LockLevel.toString(recallLevel) + ", " + state
1517: + "]";
1518: }
1519:
1520: boolean isRecalled() {
1521: return (state == RECALLED);
1522: }
1523:
1524: boolean isRecallInProgress() {
1525: return (state == RECALL_IN_PROGRESS);
1526: }
1527:
1528: void startRecallCommit() {
1529: Assert.assertTrue(state == RECALLED);
1530: state = RECALL_IN_PROGRESS;
1531: }
1532:
1533: void recallComplete() {
1534: Assert.assertTrue(state == RECALL_IN_PROGRESS);
1535: this .state = NOT_GREEDY;
1536: this .recallLevel = LockLevel.NIL_LOCK_LEVEL;
1537: this .level = LockLevel.NIL_LOCK_LEVEL;
1538: }
1539: }
1540:
1541: private static class Action {
1542: private static final int NIL_ACTION = 0x00;
1543: private static final int REMOTE_LOCK_REQUEST = 0x01;
1544: private static final int RECALL = 0x02;
1545: private static final int RECALL_COMMIT = 0x04;
1546: private static final int AWARD_GREEDY_LOCKS = 0x08;
1547: private static final int SYNCHRONOUS_COMMIT = 0x10;
1548:
1549: private int action = NIL_ACTION;
1550:
1551: void addAction(int a) {
1552: action |= a;
1553: }
1554:
1555: boolean doRemoteLockRequest() {
1556: return ((action & REMOTE_LOCK_REQUEST) == REMOTE_LOCK_REQUEST);
1557: }
1558:
1559: boolean doRecall() {
1560: return ((action & RECALL) == RECALL);
1561: }
1562:
1563: boolean doRecallCommit() {
1564: return ((action & RECALL_COMMIT) == RECALL_COMMIT);
1565: }
1566:
1567: boolean doAwardGreedyLocks() {
1568: return ((action & AWARD_GREEDY_LOCKS) == AWARD_GREEDY_LOCKS);
1569: }
1570:
1571: boolean doSynchronousCommit() {
1572: return ((action & SYNCHRONOUS_COMMIT) == SYNCHRONOUS_COMMIT);
1573: }
1574:
1575: public boolean equals(Object o) {
1576: if (!(o instanceof Action))
1577: return false;
1578: return (((Action) o).action == action);
1579: }
1580:
1581: public int hashCode() {
1582: return action;
1583: }
1584:
1585: public String toString() {
1586: return "Action:[" + getDescription() + "]";
1587: }
1588:
1589: public String getDescription() {
1590: if (action == NIL_ACTION) {
1591: return "NIL_ACTION";
1592: }
1593: StringBuffer sb = new StringBuffer(" ");
1594: if (doAwardGreedyLocks())
1595: sb.append("AWARD_GREEDY_LOCKS,");
1596: if (doRecall())
1597: sb.append("RECALL,");
1598: if (doRecallCommit())
1599: sb.append("RECALL_COMMIT,");
1600: if (doRemoteLockRequest())
1601: sb.append("REMOTE_LOCK_REQUEST,");
1602: if (doSynchronousCommit())
1603: sb.append("SYNCHRONOUS_COMMIT,");
1604: sb.setLength(sb.length() - 1);
1605: return sb.toString();
1606: }
1607: }
1608: }
|