0001: /*
0002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
0003: * notice. All rights reserved.
0004: */
0005: package com.tc.objectserver.lockmanager.impl;
0006:
0007: import org.apache.commons.collections.map.ListOrderedMap;
0008:
0009: import com.tc.async.api.Sink;
0010: import com.tc.exception.TCInternalError;
0011: import com.tc.exception.TCLockUpgradeNotSupportedError;
0012: import com.tc.logging.TCLogger;
0013: import com.tc.logging.TCLogging;
0014: import com.tc.management.L2LockStatsManager;
0015: import com.tc.net.groups.NodeID;
0016: import com.tc.object.lockmanager.api.LockContext;
0017: import com.tc.object.lockmanager.api.LockID;
0018: import com.tc.object.lockmanager.api.LockLevel;
0019: import com.tc.object.lockmanager.api.ServerThreadID;
0020: import com.tc.object.lockmanager.api.ThreadID;
0021: import com.tc.object.lockmanager.api.WaitTimer;
0022: import com.tc.object.lockmanager.api.WaitTimerCallback;
0023: import com.tc.object.net.DSOChannelManager;
0024: import com.tc.object.tx.WaitInvocation;
0025: import com.tc.objectserver.context.LockResponseContext;
0026: import com.tc.objectserver.lockmanager.api.LockEventListener;
0027: import com.tc.objectserver.lockmanager.api.LockHolder;
0028: import com.tc.objectserver.lockmanager.api.LockMBean;
0029: import com.tc.objectserver.lockmanager.api.LockWaitContext;
0030: import com.tc.objectserver.lockmanager.api.NotifiedWaiters;
0031: import com.tc.objectserver.lockmanager.api.ServerLockRequest;
0032: import com.tc.objectserver.lockmanager.api.TCIllegalMonitorStateException;
0033: import com.tc.objectserver.lockmanager.api.Waiter;
0034: import com.tc.util.Assert;
0035:
0036: import java.util.ArrayList;
0037: import java.util.Collection;
0038: import java.util.Collections;
0039: import java.util.HashMap;
0040: import java.util.Iterator;
0041: import java.util.List;
0042: import java.util.Map;
0043: import java.util.Set;
0044: import java.util.TimerTask;
0045:
0046: public class Lock {
0047: private static final TCLogger logger = TCLogging
0048: .getLogger(Lock.class);
0049: public final static Lock NULL_LOCK = new Lock(LockID.NULL_ID, 0,
0050: new LockEventListener[] {}, true,
0051: LockManagerImpl.ALTRUISTIC_LOCK_POLICY,
0052: ServerThreadContextFactory.DEFAULT_FACTORY,
0053: L2LockStatsManager.NULL_LOCK_STATS_MANAGER);
0054:
0055: private final LockEventListener[] listeners;
0056: private final Map greedyHolders = new HashMap();
0057: private final Map holders = new HashMap();
0058: private final Map tryLockTimers = new HashMap();
0059: private final ListOrderedMap pendingLockRequests = new ListOrderedMap();
0060:
0061: private final ListOrderedMap waiters = new ListOrderedMap();
0062: private final Map waitTimers = new HashMap();
0063: private final LockID lockID;
0064: private final long timeout;
0065: private final boolean isNull;
0066: private int level;
0067: private boolean recalled = false;
0068:
0069: private int lockPolicy;
0070: private final ServerThreadContextFactory threadContextFactory;
0071: private final L2LockStatsManager lockStatsManager;
0072:
0073: // real constructor used by lock manager
0074: Lock(LockID lockID, ServerThreadContext txn, int lockLevel,
0075: Sink lockResponseSink, long timeout,
0076: LockEventListener[] listeners, int lockPolicy,
0077: ServerThreadContextFactory threadContextFactory,
0078: L2LockStatsManager lockStatsManager) {
0079: this (lockID, timeout, listeners, false, lockPolicy,
0080: threadContextFactory, lockStatsManager);
0081: requestLock(txn, lockLevel, lockResponseSink);
0082: }
0083:
0084: // real constructor used by lock manager when re-establishing waits and lock holds on
0085: // restart.
0086: Lock(LockID lockID, ServerThreadContext txn, long timeout,
0087: LockEventListener[] listeners, int lockPolicy,
0088: ServerThreadContextFactory threadContextFactory,
0089: L2LockStatsManager lockStatsManager) {
0090: this (lockID, timeout, listeners, false, lockPolicy,
0091: threadContextFactory, lockStatsManager);
0092: }
0093:
0094: // for tests
0095: Lock(LockID lockID, long timeout, LockEventListener[] listeners) {
0096: this (lockID, timeout, listeners, false,
0097: LockManagerImpl.ALTRUISTIC_LOCK_POLICY,
0098: ServerThreadContextFactory.DEFAULT_FACTORY,
0099: L2LockStatsManager.NULL_LOCK_STATS_MANAGER);
0100: }
0101:
0102: private Lock(LockID lockID, long timeout,
0103: LockEventListener[] listeners, boolean isNull,
0104: int lockPolicy,
0105: ServerThreadContextFactory threadContextFactory,
0106: L2LockStatsManager lockStatsManager) {
0107: this .lockID = lockID;
0108: this .listeners = listeners;
0109: this .timeout = timeout;
0110: this .isNull = isNull;
0111: this .lockPolicy = lockPolicy;
0112: this .threadContextFactory = threadContextFactory;
0113: this .lockStatsManager = lockStatsManager;
0114: }
0115:
0116: static LockResponseContext createLockRejectedResponseContext(
0117: LockID lockID, ServerThreadID threadID, int level) {
0118: return new LockResponseContext(lockID, threadID.getNodeID(),
0119: threadID.getClientThreadID(), level,
0120: LockResponseContext.LOCK_NOT_AWARDED);
0121: }
0122:
0123: static LockResponseContext createLockAwardResponseContext(
0124: LockID lockID, ServerThreadID threadID, int level) {
0125: LockResponseContext lrc = new LockResponseContext(lockID,
0126: threadID.getNodeID(), threadID.getClientThreadID(),
0127: level, LockResponseContext.LOCK_AWARD);
0128: return lrc;
0129: }
0130:
0131: static LockResponseContext createLockRecallResponseContext(
0132: LockID lockID, ServerThreadID threadID, int level) {
0133: return new LockResponseContext(lockID, threadID.getNodeID(),
0134: threadID.getClientThreadID(), level,
0135: LockResponseContext.LOCK_RECALL);
0136: }
0137:
0138: static LockResponseContext createLockWaitTimeoutResponseContext(
0139: LockID lockID, ServerThreadID threadID, int level) {
0140: return new LockResponseContext(lockID, threadID.getNodeID(),
0141: threadID.getClientThreadID(), level,
0142: LockResponseContext.LOCK_WAIT_TIMEOUT);
0143: }
0144:
0145: static LockResponseContext createLockQueriedResponseContext(
0146: LockID lockID, ServerThreadID threadID, int level,
0147: int lockRequestQueueLength, Collection greedyHolders,
0148: Collection holders, Collection waiters) {
0149: return new LockResponseContext(lockID, threadID.getNodeID(),
0150: threadID.getClientThreadID(), level,
0151: lockRequestQueueLength, greedyHolders, holders,
0152: waiters, LockResponseContext.LOCK_INFO);
0153: }
0154:
0155: static LockResponseContext createLockStatEnableResponseContext(
0156: LockID lockID, NodeID nodeID, ThreadID threadID, int level,
0157: int stackTraceDepth, int statCollectFrequency) {
0158: return new LockResponseContext(lockID, nodeID, threadID, level,
0159: LockResponseContext.LOCK_STAT_ENABLED, stackTraceDepth,
0160: statCollectFrequency);
0161: }
0162:
0163: static LockResponseContext createLockStatDisableResponseContext(
0164: LockID lockID, NodeID nodeID, ThreadID threadID, int level) {
0165: return new LockResponseContext(lockID, nodeID, threadID, level,
0166: LockResponseContext.LOCK_STAT_DISABLED);
0167: }
0168:
0169: private static Request createRequest(ServerThreadContext txn,
0170: int lockLevel, Sink lockResponseSink,
0171: WaitInvocation timeout, boolean isBlock) {
0172: Request request = null;
0173: if (isBlock) {
0174: request = new TryLockRequest(txn, lockLevel,
0175: lockResponseSink, timeout);
0176: } else {
0177: request = new Request(txn, lockLevel, lockResponseSink);
0178: }
0179: return request;
0180: }
0181:
0182: synchronized LockMBean getMBean(DSOChannelManager channelManager) {
0183: int count;
0184: LockHolder[] holds = new LockHolder[this .holders.size()];
0185: ServerLockRequest[] reqs = new ServerLockRequest[this .pendingLockRequests
0186: .size()];
0187: Waiter[] waits = new Waiter[this .waiters.size()];
0188:
0189: count = 0;
0190: for (Iterator i = this .holders.values().iterator(); i.hasNext();) {
0191: Holder h = (Holder) i.next();
0192: NodeID cid = h.getNodeID();
0193: holds[count] = new LockHolder(h.getLockID(), cid,
0194: channelManager.getChannelAddress(cid), h
0195: .getThreadID(), h.getLockLevel(), h
0196: .getTimestamp());
0197: holds[count++].lockAcquired(h.getTimestamp());
0198: }
0199:
0200: count = 0;
0201: for (Iterator i = this .pendingLockRequests.values().iterator(); i
0202: .hasNext();) {
0203: Request r = (Request) i.next();
0204: NodeID cid = r.getRequesterID();
0205: reqs[count++] = new ServerLockRequest(cid, channelManager
0206: .getChannelAddress(cid), r.getSourceID(), r
0207: .getLockLevel(), r.getTimestamp());
0208: }
0209:
0210: count = 0;
0211: for (Iterator i = this .waiters.values().iterator(); i.hasNext();) {
0212: LockWaitContext wc = (LockWaitContext) i.next();
0213: NodeID cid = wc.getNodeID();
0214: waits[count++] = new Waiter(cid, channelManager
0215: .getChannelAddress(cid), wc.getThreadID(), wc
0216: .getWaitInvocation(), wc.getTimestamp());
0217: }
0218:
0219: return new LockMBeanImpl(lockID, holds, reqs, waits);
0220: }
0221:
0222: synchronized void enableClientStat(Sink lockResponseSink,
0223: int stackTraceDepth, int statCollectFrequency) {
0224: enableClientStatInHolders(lockResponseSink, stackTraceDepth,
0225: statCollectFrequency);
0226: enableClientStatInPendings(lockResponseSink, stackTraceDepth,
0227: statCollectFrequency);
0228: enableClientStatInWaiters(lockResponseSink, stackTraceDepth,
0229: statCollectFrequency);
0230: }
0231:
0232: synchronized void disableClientStat(Set statEnabledClients,
0233: Sink lockResponseSink) {
0234: for (Iterator i = statEnabledClients.iterator(); i.hasNext();) {
0235: NodeID nodeID = (NodeID) i.next();
0236: lockResponseSink.add(createLockStatDisableResponseContext(
0237: lockID, nodeID, ThreadID.VM_ID, level));
0238: }
0239: }
0240:
0241: private void enableClientStatInHolders(Sink lockResponseSink,
0242: int stackTraceDepth, int statCollectFrequency) {
0243: for (Iterator i = holders.values().iterator(); i.hasNext();) {
0244: Holder holder = (Holder) i.next();
0245: remoteEnableClientStat(lockResponseSink,
0246: holder.getNodeID(), stackTraceDepth,
0247: statCollectFrequency);
0248: }
0249: }
0250:
0251: private void enableClientStatInPendings(Sink lockResponseSink,
0252: int stackTraceDepth, int statCollectFrequency) {
0253: for (Iterator i = pendingLockRequests.values().iterator(); i
0254: .hasNext();) {
0255: Request request = (Request) i.next();
0256: remoteEnableClientStat(lockResponseSink, request
0257: .getThreadContext().getId().getNodeID(),
0258: stackTraceDepth, statCollectFrequency);
0259: }
0260: }
0261:
0262: private void enableClientStatInWaiters(Sink lockResponseSink,
0263: int stackTraceDepth, int statCollectFrequency) {
0264: for (Iterator i = waiters.values().iterator(); i.hasNext();) {
0265: LockWaitContext ctxt = (LockWaitContext) i.next();
0266: remoteEnableClientStat(lockResponseSink, ctxt.getNodeID(),
0267: stackTraceDepth, statCollectFrequency);
0268: }
0269: }
0270:
0271: private void remoteEnableClientStat(Sink lockResponseSink,
0272: NodeID nodeID, int stackTraceDepth, int statCollectFrequency) {
0273: if (!lockStatsManager.isLockStackTraceEnabledInClient(lockID,
0274: nodeID)) {
0275: lockResponseSink.add(createLockStatEnableResponseContext(
0276: lockID, nodeID, ThreadID.VM_ID, level,
0277: stackTraceDepth, statCollectFrequency));
0278: lockStatsManager.recordClientStackTraceEnabled(lockID,
0279: nodeID);
0280: }
0281: }
0282:
0283: private void enableClientStatIfNeeded(Sink lockResponseSink,
0284: ServerThreadContext txn) {
0285: if (lockStatsManager.isClientLockStackTraceEnable(lockID)) {
0286: remoteEnableClientStat(lockResponseSink, txn.getId()
0287: .getNodeID(), lockStatsManager
0288: .getLockStackTraceDepth(lockID), lockStatsManager
0289: .getLockStatCollectFrequency(lockID));
0290: }
0291: }
0292:
0293: synchronized void queryLock(ServerThreadContext txn,
0294: Sink lockResponseSink) {
0295:
0296: // TODO:
0297: // The Remote Lock Manager needs to ask the client for lock information when greedy lock is awarded.
0298: // Currently, the Remote Lock Manager responds to queryLock by looking at the server only.
0299: lockResponseSink.add(createLockQueriedResponseContext(
0300: this .lockID, txn.getId(), this .level,
0301: this .pendingLockRequests.size(), this .greedyHolders
0302: .values(), this .holders.values(), this .waiters
0303: .values()));
0304: }
0305:
0306: boolean tryRequestLock(ServerThreadContext txn,
0307: int requestedLockLevel, WaitInvocation timeout,
0308: WaitTimer waitTimer, WaitTimerCallback callback,
0309: Sink lockResponseSink) {
0310: return requestLock(txn, requestedLockLevel, lockResponseSink,
0311: true, timeout, waitTimer, callback);
0312: }
0313:
0314: boolean requestLock(ServerThreadContext txn,
0315: int requestedLockLevel, Sink lockResponseSink) {
0316: return requestLock(txn, requestedLockLevel, lockResponseSink,
0317: false, null, null, null);
0318: }
0319:
0320: // XXX:: UPGRADE Requests can come in with requestLockLevel == UPGRADE on a notified wait during server crash
0321: synchronized boolean requestLock(ServerThreadContext txn,
0322: int requestedLockLevel, Sink lockResponseSink,
0323: boolean noBlock, WaitInvocation timeout,
0324: WaitTimer waitTimer, WaitTimerCallback callback) {
0325:
0326: if (holdsReadLock(txn) && LockLevel.isWrite(requestedLockLevel)) {
0327: // lock upgrade is not supported; it should have been rejected by the client.
0328: throw new TCLockUpgradeNotSupportedError(
0329: "Lock upgrade is not supported. The request should have been rejected by the client. Your client may be using an older version of tc.jar");
0330: }
0331:
0332: if (waiters.containsKey(txn))
0333: throw new AssertionError(
0334: "Attempt to request a lock in a Thread "
0335: + "that is already part of the wait set. lock = "
0336: + this );
0337:
0338: enableClientStatIfNeeded(lockResponseSink, txn);
0339: recordLockRequestStat(txn.getId().getNodeID(), txn.getId()
0340: .getClientThreadID(), requestedLockLevel);
0341:
0342: // debug("requestLock - BEGIN -", txn, ",", LockLevel.toString(requestedLockLevel));
0343: // it is an error (probably originating from the client side) to
0344: // request a lock you already hold
0345: Holder holder = getHolder(txn);
0346: if (noBlock
0347: && !timeout.needsToWait()
0348: && holder == null
0349: && (requestedLockLevel != LockLevel.READ || !this
0350: .isRead())
0351: && (getHoldersCount() > 0 || hasGreedyHolders())) {
0352: cannotAwardAndRespond(txn, requestedLockLevel,
0353: lockResponseSink);
0354: return false;
0355: }
0356:
0357: if (holder != null) {
0358: if (LockLevel.NIL_LOCK_LEVEL != (holder.getLockLevel() & requestedLockLevel)) {
0359: // formatting
0360: throw new AssertionError(
0361: "Client requesting already held lock! holder="
0362: + holder + ", lock=" + this );
0363: }
0364: }
0365:
0366: if (isPolicyGreedy()) {
0367: if (canAwardGreedilyOnTheClient(txn, requestedLockLevel)) {
0368: // These requests are the ones in the wire when the greedy lock was given out to the client.
0369: // We can safely ignore it as the clients will be able to award it locally.
0370: logger
0371: .debug(lockID
0372: + " : Lock.requestLock() : Ignoring the Lock request("
0373: + txn
0374: + ","
0375: + LockLevel
0376: .toString(requestedLockLevel)
0377: + ") message from the a client that has the lock greedily.");
0378: return false;
0379: } else if (recalled) {
0380: // add to pending until recall process is complete, those who hold the lock greedily will send the
0381: // pending state during recall commit.
0382: if (!holdsGreedyLock(txn)) {
0383: queueRequest(txn, requestedLockLevel,
0384: lockResponseSink, noBlock, timeout,
0385: waitTimer, callback);
0386: }
0387: return false;
0388: }
0389: }
0390:
0391: // Lock granting logic:
0392: // 0. If no one is holding this lock, go ahead and award it
0393: // 1. If only a read lock is held and no write locks are pending, and another read
0394: // (and only read) lock is requested, award it. If Write locks are pending, we dont want to
0395: // starve the WRITES by keeping on awarding READ Locks.
0396: // 2. Else the request must be queued (ie. added to pending list)
0397:
0398: if ((getHoldersCount() == 0)
0399: || ((!hasPending()) && ((requestedLockLevel == LockLevel.READ) && this
0400: .isRead()))) {
0401: // (0, 1) uncontended or additional read lock
0402: if (isPolicyGreedy()
0403: && ((requestedLockLevel == LockLevel.READ) || (getWaiterCount() == 0))) {
0404: awardGreedyAndRespond(txn, requestedLockLevel,
0405: lockResponseSink);
0406: } else {
0407: awardAndRespond(txn, txn.getId().getClientThreadID(),
0408: requestedLockLevel, lockResponseSink);
0409: }
0410: } else {
0411: // (2) queue request
0412: if (isPolicyGreedy() && hasGreedyHolders()) {
0413: recall(requestedLockLevel);
0414: }
0415: if (!holdsGreedyLock(txn)) {
0416: queueRequest(txn, requestedLockLevel, lockResponseSink,
0417: noBlock, timeout, waitTimer, callback);
0418: }
0419: return false;
0420: }
0421:
0422: return true;
0423: }
0424:
0425: private void queueRequest(ServerThreadContext txn,
0426: int requestedLockLevel, Sink lockResponseSink,
0427: boolean noBlock, WaitInvocation timeout,
0428: WaitTimer waitTimer, WaitTimerCallback callback) {
0429: if (noBlock) {
0430: // By the time it reaches here, timeout.needsToWait() must be true
0431: Assert.assertTrue(timeout.needsToWait());
0432: addPendingTryLockRequest(txn, requestedLockLevel, timeout,
0433: lockResponseSink, waitTimer, callback);
0434: } else {
0435: addPendingLockRequest(txn, requestedLockLevel,
0436: lockResponseSink);
0437: }
0438: }
0439:
0440: synchronized void addRecalledHolder(ServerThreadContext txn,
0441: int lockLevel) {
0442: // debug("addRecalledHolder - BEGIN -", txn, ",", LockLevel.toString(lockLevel));
0443: if (!LockLevel.isWrite(level) && LockLevel.isWrite(lockLevel)) {
0444: // Client issued a WRITE lock without holding a GREEDY WRITE. Bug in the client.
0445: throw new AssertionError(
0446: "Client issued a WRITE lock without holding a GREEDY WRITE !");
0447: }
0448: recordLockRequestStat(txn.getId().getNodeID(), txn.getId()
0449: .getClientThreadID(), lockLevel);
0450: awardLock(txn, txn.getId().getClientThreadID(), lockLevel);
0451: }
0452:
0453: synchronized void addRecalledPendingRequest(
0454: ServerThreadContext txn, int lockLevel,
0455: Sink lockResponseSink) {
0456: // debug("addRecalledPendingRequest - BEGIN -", txn, ",", LockLevel.toString(lockLevel));
0457: recordLockRequestStat(txn.getId().getNodeID(), txn.getId()
0458: .getClientThreadID(), lockLevel);
0459: addPendingLockRequest(txn, lockLevel, lockResponseSink);
0460: }
0461:
0462: synchronized void addRecalledTryLockPendingRequest(
0463: ServerThreadContext txn, int lockLevel,
0464: WaitInvocation timeout, Sink lockResponseSink,
0465: WaitTimer waitTimer, WaitTimerCallback callback) {
0466: recordLockRequestStat(txn.getId().getNodeID(), txn.getId()
0467: .getClientThreadID(), lockLevel);
0468:
0469: if (!timeout.needsToWait()) {
0470: cannotAwardAndRespond(txn, lockLevel, lockResponseSink);
0471: return;
0472: }
0473:
0474: addPendingTryLockRequest(txn, lockLevel, timeout,
0475: lockResponseSink, waitTimer, callback);
0476: }
0477:
0478: private void addPendingTryLockRequest(ServerThreadContext txn,
0479: int lockLevel, WaitInvocation timeout,
0480: Sink lockResponseSink, WaitTimer waitTimer,
0481: WaitTimerCallback callback) {
0482: Request request = addPending(txn, lockLevel, lockResponseSink,
0483: timeout, true);
0484:
0485: TryLockContextImpl tryLockWaitRequestContext = new TryLockContextImpl(
0486: txn, this , timeout, lockLevel, lockResponseSink);
0487:
0488: scheduleWaitForTryLock(callback, waitTimer, request,
0489: tryLockWaitRequestContext);
0490: }
0491:
0492: private void addPendingLockRequest(
0493: ServerThreadContext threadContext, int lockLevel,
0494: Sink awardLockSink) {
0495: addPending(threadContext, lockLevel, awardLockSink, null, false);
0496: }
0497:
0498: private Request addPending(ServerThreadContext threadContext,
0499: int lockLevel, Sink awardLockSink, WaitInvocation timeout,
0500: boolean noBlock) {
0501: Assert.assertFalse(isNull());
0502: // debug("addPending() - BEGIN -", threadContext, ", ", LockLevel.toString(lockLevel));
0503:
0504: Request request = createRequest(threadContext, lockLevel,
0505: awardLockSink, timeout, noBlock);
0506:
0507: if (pendingLockRequests.containsValue(request)) {
0508: logger.debug("Ignoring existing Request " + request
0509: + " in Lock " + lockID);
0510: return request;
0511: }
0512:
0513: this .pendingLockRequests.put(threadContext, request);
0514: for (Iterator currentHolders = holders.values().iterator(); currentHolders
0515: .hasNext();) {
0516: Holder holder = (Holder) currentHolders.next();
0517: notifyAddPending(holder);
0518: }
0519: threadContext.setWaitingOn(this );
0520: return request;
0521: }
0522:
0523: private synchronized void recall(int recallLevel) {
0524: if (recalled) {
0525: return;
0526: }
0527: recordLockHoppedStat();
0528: for (Iterator i = greedyHolders.values().iterator(); i
0529: .hasNext();) {
0530: Holder holder = (Holder) i.next();
0531: // debug("recall() - issued for -", holder);
0532: holder.getSink().add(
0533: createLockRecallResponseContext(holder.getLockID(),
0534: holder.getThreadContext().getId(),
0535: recallLevel));
0536: recalled = true;
0537: }
0538: }
0539:
0540: private boolean isGreedyRequest(ServerThreadContext txn) {
0541: return (txn.getId().getClientThreadID().equals(ThreadID.VM_ID));
0542: }
0543:
0544: private boolean isPolicyGreedy() {
0545: return lockPolicy == LockManagerImpl.GREEDY_LOCK_POLICY;
0546: }
0547:
0548: int getLockPolicy() {
0549: return lockPolicy;
0550: }
0551:
0552: int getLockLevel() {
0553: return level;
0554: }
0555:
0556: void setLockPolicy(int newPolicy) {
0557: if (!isNull() && newPolicy != lockPolicy) {
0558: this .lockPolicy = newPolicy;
0559: if (!isPolicyGreedy()) {
0560: recall(LockLevel.WRITE);
0561: }
0562: }
0563: }
0564:
0565: private void awardGreedyAndRespond(ServerThreadContext txn,
0566: int requestedLockLevel, Sink lockResponseSink) {
0567: // debug("awardGreedyAndRespond() - BEGIN - ", txn, ",", LockLevel.toString(requestedLockLevel));
0568: final ServerThreadContext clientTx = getClientVMContext(txn);
0569: final int greedyLevel = LockLevel
0570: .makeGreedy(requestedLockLevel);
0571:
0572: NodeID ch = txn.getId().getNodeID();
0573: checkAndClearStateOnGreedyAward(
0574: txn.getId().getClientThreadID(), ch, requestedLockLevel);
0575: Holder holder = awardAndRespond(clientTx, txn.getId()
0576: .getClientThreadID(), greedyLevel, lockResponseSink);
0577: holder.setSink(lockResponseSink);
0578: greedyHolders.put(ch, holder);
0579: clearWaitingOn(txn);
0580: }
0581:
0582: private void cannotAwardAndRespond(ServerThreadContext txn,
0583: int requestedLockLevel, Sink lockResponseSink) {
0584: lockResponseSink.add(createLockRejectedResponseContext(
0585: this .lockID, txn.getId(), requestedLockLevel));
0586: recordLockRejectStat(txn.getId().getNodeID(), txn.getId()
0587: .getClientThreadID());
0588: }
0589:
0590: private Holder awardAndRespond(ServerThreadContext txn,
0591: ThreadID requestThreadID, int requestedLockLevel,
0592: Sink lockResponseSink) {
0593: // debug("awardRespond() - BEGIN - ", txn, ",", LockLevel.toString(requestedLockLevel));
0594: Holder holder = awardLock(txn, requestThreadID,
0595: requestedLockLevel);
0596: lockResponseSink.add(createLockAwardResponseContext(
0597: this .lockID, txn.getId(), requestedLockLevel));
0598: return holder;
0599: }
0600:
0601: synchronized void notify(ServerThreadContext txn, boolean all,
0602: NotifiedWaiters addNotifiedWaitersTo)
0603: throws TCIllegalMonitorStateException {
0604: // debug("notify() - BEGIN - ", txn, ", all = " + all);
0605: if (waiters.containsKey(txn)) {
0606: throw Assert.failure("Can't notify self: " + txn);
0607: }
0608: checkLegalWaitNotifyState(txn);
0609:
0610: if (waiters.size() > 0) {
0611: final int numToNotify = all ? waiters.size() : 1;
0612: for (int i = 0; i < numToNotify; i++) {
0613: LockWaitContext wait = (LockWaitContext) waiters
0614: .remove(0);
0615: removeAndCancelWaitTimer(wait);
0616: recordLockRequestStat(wait.getNodeID(), wait
0617: .getThreadID(), wait.lockLevel());
0618: createPendingFromWaiter(wait);
0619: addNotifiedWaitersTo.addNotification(new LockContext(
0620: lockID, wait.getNodeID(), wait.getThreadID(),
0621: wait.lockLevel()));
0622: }
0623: recordLockNotifyStat(numToNotify);
0624: }
0625: }
0626:
0627: synchronized void interrupt(ServerThreadContext txn) {
0628: if (waiters.size() == 0 || !waiters.containsKey(txn)) {
0629: logger
0630: .warn("Cannot interrupt: " + txn
0631: + " is not waiting.");
0632: return;
0633: }
0634: LockWaitContext wait = (LockWaitContext) waiters.remove(txn);
0635: recordLockRequestStat(wait.getNodeID(), wait.getThreadID(),
0636: wait.lockLevel());
0637: removeAndCancelWaitTimer(wait);
0638: createPendingFromWaiter(wait);
0639: }
0640:
0641: private void removeAndCancelWaitTimer(LockWaitContext wait) {
0642: TimerTask task = (TimerTask) waitTimers.remove(wait);
0643: if (task != null)
0644: task.cancel();
0645: }
0646:
0647: private Request createPendingFromWaiter(LockWaitContext wait) {
0648: // XXX: This cast to WaitContextImpl is lame. I'm not sure how to refactor it right now.
0649: Request request = createRequest(((LockWaitContextImpl) wait)
0650: .getThreadContext(), wait.lockLevel(), wait
0651: .getLockResponseSink(), null, false);
0652: createPending(wait, request);
0653: return request;
0654: }
0655:
0656: private void createPending(LockWaitContext wait, Request request) {
0657: ServerThreadContext txn = ((LockWaitContextImpl) wait)
0658: .getThreadContext();
0659: pendingLockRequests.put(txn, request);
0660:
0661: if (isPolicyGreedy() && hasGreedyHolders()) {
0662: recall(request.getLockLevel());
0663: }
0664: }
0665:
0666: synchronized void tryRequestLockTimeout(LockWaitContext context) {
0667: TryLockContextImpl tryLockContext = (TryLockContextImpl) context;
0668: ServerThreadContext txn = tryLockContext.getThreadContext();
0669: Object removed = tryLockTimers.remove(txn);
0670: if (removed != null) {
0671: pendingLockRequests.remove(txn);
0672: Sink lockResponseSink = context.getLockResponseSink();
0673: int lockLevel = context.lockLevel();
0674: cannotAwardAndRespond(txn, lockLevel, lockResponseSink);
0675: }
0676: }
0677:
0678: synchronized void waitTimeout(LockWaitContext context) {
0679:
0680: // debug("waitTimeout() - BEGIN -", context);
0681: // XXX: This cast is gross, too.
0682: ServerThreadContext txn = ((LockWaitContextImpl) context)
0683: .getThreadContext();
0684: Object removed = waiters.remove(txn);
0685:
0686: if (removed != null) {
0687: waitTimers.remove(context);
0688: Sink lockResponseSink = context.getLockResponseSink();
0689: int lockLevel = context.lockLevel();
0690:
0691: // Add a wait Timeout message
0692: lockResponseSink.add(createLockWaitTimeoutResponseContext(
0693: this .lockID, txn.getId(), lockLevel));
0694:
0695: recordLockRequestStat(context.getNodeID(), context
0696: .getThreadID(), lockLevel);
0697: if (holders.size() == 0) {
0698: if (isPolicyGreedy() && (getWaiterCount() == 0)) {
0699: awardGreedyAndRespond(txn, lockLevel,
0700: lockResponseSink);
0701: } else {
0702: awardAndRespond(txn, txn.getId()
0703: .getClientThreadID(), lockLevel,
0704: lockResponseSink);
0705: }
0706: } else {
0707: createPendingFromWaiter(context);
0708: }
0709: }
0710: }
0711:
0712: synchronized void wait(ServerThreadContext txn,
0713: WaitTimer waitTimer, WaitInvocation call,
0714: WaitTimerCallback callback, Sink lockResponseSink)
0715: throws TCIllegalMonitorStateException {
0716: // debug("wait() - BEGIN -", txn, ", ", call);
0717: if (waiters.containsKey(txn))
0718: throw Assert.failure("Already in wait set: " + txn);
0719: checkLegalWaitNotifyState(txn);
0720:
0721: Holder current = getHolder(txn);
0722: Assert.assertNotNull(current);
0723:
0724: LockWaitContext waitContext = new LockWaitContextImpl(txn,
0725: this , call, current.getLockLevel(), lockResponseSink);
0726: waiters.put(txn, waitContext);
0727:
0728: scheduleWait(callback, waitTimer, waitContext);
0729: txn.setWaitingOn(this );
0730: removeCurrentHold(txn);
0731:
0732: recordLockWaitStat();
0733: nextPending();
0734: }
0735:
0736: // This method reestablished Wait State and schedules wait timeouts too. There are cases where we may need to ignore a
0737: // wait, if we already know about it. Note that it could be either in waiting or pending state.
0738: synchronized void addRecalledWaiter(ServerThreadContext txn,
0739: WaitInvocation call, int lockLevel, Sink lockResponseSink,
0740: WaitTimer waitTimer, WaitTimerCallback callback) {
0741: // debug("addRecalledWaiter() - BEGIN -", txn, ", ", call);
0742:
0743: LockWaitContext waitContext = new LockWaitContextImpl(txn,
0744: this , call, lockLevel, lockResponseSink);
0745: if (waiters.containsKey(txn)) {
0746: logger.debug("addRecalledWaiter(): Ignoring " + waitContext
0747: + " as it is already in waiters list.");
0748: return;
0749: }
0750: Request request = createRequest(txn, lockLevel,
0751: lockResponseSink, null, false);
0752: if (pendingLockRequests.containsValue(request)) {
0753: logger.debug("addRecalledWaiter(): Ignoring " + waitContext
0754: + " as it is already in pending list.");
0755: return;
0756: }
0757: waiters.put(txn, waitContext);
0758: scheduleWait(callback, waitTimer, waitContext);
0759: recordLockWaitStat();
0760: }
0761:
0762: // This method reestablished Wait State and does not schedules wait timeouts too. This is
0763: // called when LockManager is starting and wait timers are started when the lock Manager is started.
0764: synchronized void reestablishWait(ServerThreadContext txn,
0765: WaitInvocation call, int lockLevel, Sink lockResponseSink) {
0766: LockWaitContext waitContext = new LockWaitContextImpl(txn,
0767: this , call, lockLevel, lockResponseSink);
0768: Object old = waiters.put(txn, waitContext);
0769: if (old != null)
0770: throw Assert.failure("Already in wait set: " + txn);
0771: }
0772:
0773: synchronized void reestablishLock(
0774: ServerThreadContext threadContext, int requestedLevel,
0775: Sink lockResponseSink) {
0776: if ((LockLevel.isWrite(requestedLevel) && holders.size() != 0)
0777: || (LockLevel.isRead(requestedLevel) && LockLevel
0778: .isWrite(this .level))) {
0779: throw new AssertionError("Lock " + this
0780: + " already held by other Holder. Can't grant to "
0781: + threadContext
0782: + LockLevel.toString(requestedLevel));
0783:
0784: }
0785: if (waiters.get(threadContext) != null) {
0786: throw new AssertionError("Thread " + threadContext
0787: + "is already in Wait state for Lock " + this
0788: + ". Can't grant Lock Hold !");
0789: }
0790: recordLockRequestStat(threadContext.getId().getNodeID(),
0791: threadContext.getId().getClientThreadID(),
0792: requestedLevel);
0793: if (isGreedyRequest(threadContext)) {
0794: int greedyLevel = LockLevel.makeGreedy(requestedLevel);
0795: NodeID nid = threadContext.getId().getNodeID();
0796: Holder holder = awardLock(threadContext, threadContext
0797: .getId().getClientThreadID(), greedyLevel);
0798: holder.setSink(lockResponseSink);
0799: greedyHolders.put(nid, holder);
0800: } else {
0801: awardLock(threadContext, threadContext.getId()
0802: .getClientThreadID(), requestedLevel);
0803: }
0804: }
0805:
0806: private void scheduleWait(WaitTimerCallback callback,
0807: WaitTimer waitTimer, LockWaitContext waitContext) {
0808: final TimerTask timer = waitTimer.scheduleTimer(callback,
0809: waitContext.getWaitInvocation(), waitContext);
0810: if (timer != null) {
0811: waitTimers.put(waitContext, timer);
0812: }
0813: }
0814:
0815: private TimerTask scheduleWaitForTryLock(
0816: WaitTimerCallback callback, WaitTimer waitTimer,
0817: Request pendingRequest,
0818: TryLockContextImpl tryLockWaitRequestContext) {
0819: final TimerTask timer = waitTimer.scheduleTimer(callback,
0820: tryLockWaitRequestContext.getWaitInvocation(),
0821: tryLockWaitRequestContext);
0822: if (timer != null) {
0823: tryLockTimers.put(tryLockWaitRequestContext
0824: .getThreadContext(), timer);
0825: }
0826: return timer;
0827: }
0828:
0829: private void checkLegalWaitNotifyState(
0830: ServerThreadContext threadContext)
0831: throws TCIllegalMonitorStateException {
0832: Assert.assertFalse(isNull());
0833:
0834: final int holdersSize = holders.size();
0835: if (holdersSize != 1) {
0836: throw new TCIllegalMonitorStateException(
0837: "Invalid holder set size: " + holdersSize);
0838: }
0839:
0840: final int currentLevel = this .level;
0841: if (!LockLevel.isWrite(currentLevel)) {
0842: throw new TCIllegalMonitorStateException(
0843: "Incorrect lock level: "
0844: + LockLevel.toString(currentLevel));
0845: }
0846:
0847: Holder holder = getHolder(threadContext);
0848: if (holder == null) {
0849: holder = getHolder(getClientVMContext(threadContext));
0850: }
0851:
0852: if (holder == null) {
0853: // make formatter sane
0854: throw new TCIllegalMonitorStateException(threadContext
0855: + " is not the current lock holder for: "
0856: + threadContext);
0857: }
0858: }
0859:
0860: private ServerThreadContext getClientVMContext(
0861: ServerThreadContext threadContext) {
0862: return threadContextFactory.getOrCreate(threadContext.getId()
0863: .getNodeID(), ThreadID.VM_ID);
0864: }
0865:
0866: public synchronized int getHoldersCount() {
0867: return holders.size();
0868: }
0869:
0870: public synchronized int getPendingCount() {
0871: return pendingLockRequests.size();
0872: }
0873:
0874: Collection getHoldersCollection() {
0875: return Collections
0876: .unmodifiableCollection(this .holders.values());
0877: }
0878:
0879: public synchronized String toString() {
0880: try {
0881: StringBuffer rv = new StringBuffer();
0882:
0883: rv.append(lockID).append(", ").append("Level: ").append(
0884: LockLevel.toString(this .level)).append("\r\n");
0885:
0886: rv.append("Holders (").append(holders.size()).append(
0887: ")\r\n");
0888: for (Iterator iter = holders.values().iterator(); iter
0889: .hasNext();) {
0890: rv.append('\t').append(iter.next().toString()).append(
0891: "\r\n");
0892: }
0893:
0894: rv.append("Wait Set (").append(waiters.size()).append(
0895: ")\r\n");
0896: for (Iterator iter = waiters.values().iterator(); iter
0897: .hasNext();) {
0898: rv.append('\t').append(iter.next().toString()).append(
0899: "\r\n");
0900: }
0901:
0902: rv.append("Pending lock requests (").append(
0903: pendingLockRequests.size()).append(")\r\n");
0904: for (Iterator iter = pendingLockRequests.values()
0905: .iterator(); iter.hasNext();) {
0906: rv.append('\t').append(iter.next().toString()).append(
0907: "\r\n");
0908: }
0909:
0910: return rv.toString();
0911: } catch (Throwable t) {
0912: t.printStackTrace();
0913: return "Exception in toString(): " + t.getMessage();
0914: }
0915: }
0916:
0917: private Holder awardLock(ServerThreadContext threadContext,
0918: ThreadID requestThreadID, int lockLevel) {
0919: Assert.assertFalse(isNull());
0920:
0921: Holder holder = getHolder(threadContext);
0922:
0923: Assert.assertNull(holder);
0924: threadContext.addLock(this );
0925: holder = new Holder(this .lockID, threadContext, this .timeout);
0926: holder.addLockLevel(lockLevel);
0927: Object prev = this .holders.put(threadContext, holder);
0928: Assert.assertNull(prev);
0929: this .level = holder.getLockLevel();
0930: notifyAwardLock(holder);
0931: recordLockAwardStat(holder.getNodeID(), requestThreadID,
0932: isGreedyRequest(threadContext), holder.getTimestamp());
0933: return holder;
0934: }
0935:
0936: private void notifyAwardLock(Holder holder) {
0937: final int waitingCount = this .pendingLockRequests.size();
0938:
0939: for (int i = 0; i < listeners.length; i++) {
0940: listeners[i].notifyAward(waitingCount, holder);
0941: }
0942: }
0943:
0944: public synchronized boolean isRead() {
0945: return LockLevel.READ == this .level;
0946: }
0947:
0948: public synchronized boolean isWrite() {
0949: return LockLevel.WRITE == this .level;
0950: }
0951:
0952: private boolean holdsReadLock(ServerThreadContext threadContext) {
0953: Holder holder = getHolder(threadContext);
0954: if (holder != null) {
0955: return holder.getLockLevel() == LockLevel.READ;
0956: }
0957: return false;
0958: }
0959:
0960: private Holder getHolder(ServerThreadContext threadContext) {
0961: return (Holder) this .holders.get(threadContext);
0962: }
0963:
0964: public Holder getLockHolder(ServerThreadContext threadContext) {
0965: Holder lockHolder = (Holder) this .holders.get(threadContext);
0966: if (lockHolder == null) {
0967: lockHolder = (Holder) this .holders
0968: .get(getClientVMContext(threadContext));
0969: }
0970: return lockHolder;
0971: }
0972:
0973: private void notifyAddPending(Holder holder) {
0974: final int waitingCount = this .pendingLockRequests.size();
0975:
0976: for (int i = 0; i < this .listeners.length; i++) {
0977: this .listeners[i].notifyAddPending(waitingCount, holder);
0978: }
0979: }
0980:
0981: synchronized int getWaiterCount() {
0982: return this .waiters.size();
0983: }
0984:
0985: synchronized boolean hasPending() {
0986: return pendingLockRequests.size() > 0;
0987: }
0988:
0989: synchronized boolean hasWaiting() {
0990: return this .waiters.size() > 0;
0991: }
0992:
0993: boolean hasGreedyHolders() {
0994: return this .greedyHolders.size() > 0;
0995: }
0996:
0997: synchronized boolean hasWaiting(ServerThreadContext threadContext) {
0998: return (this .waiters.get(threadContext) != null);
0999: }
1000:
1001: public LockID getLockID() {
1002: return lockID;
1003: }
1004:
1005: public boolean isNull() {
1006: return this .isNull;
1007: }
1008:
1009: public int hashCode() {
1010: return this .lockID.hashCode();
1011: }
1012:
1013: public boolean equals(Object obj) {
1014: if (obj instanceof Lock) {
1015: Lock other = (Lock) obj;
1016: return this .lockID.equals(other.lockID);
1017: }
1018: return false;
1019: }
1020:
1021: private boolean readHolder() {
1022: // We only need to check the first holder as we cannot have 2 holder, one holding a READ and another holding a
1023: // WRITE.
1024: Holder holder = (Holder) holders.values().iterator().next();
1025: return holder != null
1026: && LockLevel.isRead(holder.getLockLevel());
1027: }
1028:
1029: synchronized boolean nextPending() {
1030: Assert.eval(!isNull());
1031: // debug("nextPending() - BEGIN -");
1032:
1033: boolean clear;
1034: try {
1035: // Lock upgrade is not supported.
1036: if (!pendingLockRequests.isEmpty()) {
1037: Request request = (Request) pendingLockRequests
1038: .get(pendingLockRequests.get(0));
1039: int reqLockLevel = request.getLockLevel();
1040:
1041: boolean canGrantRequest = (reqLockLevel == LockLevel.READ) ? (holders
1042: .isEmpty() || readHolder())
1043: : holders.isEmpty();
1044: if (canGrantRequest) {
1045:
1046: switch (reqLockLevel) {
1047: case LockLevel.WRITE: {
1048: pendingLockRequests.remove(0);
1049: cancelTryLockTimer(request);
1050: // Give locks greedily only if there is no one waiting or pending for this lock
1051: if (isPolicyGreedy()
1052: && isAllPendingLockRequestsFromNode(request
1053: .getRequesterID())
1054: && (getWaiterCount() == 0)) {
1055: // debug("nextPending() - Giving GREEDY WRITE request -", request);
1056: grantGreedyRequest(request);
1057: } else {
1058: grantRequest(request);
1059: }
1060: break;
1061: }
1062: case LockLevel.READ: {
1063: // debug("nextPending() - granting READ request -", request);
1064: awardAllReads();
1065: break;
1066: }
1067: default: {
1068: throw new TCInternalError(
1069: "Unknown lock level in request: "
1070: + reqLockLevel);
1071: }
1072: }
1073: }
1074: }
1075: } finally {
1076: clear = holders.size() == 0 && this .waiters.size() == 0
1077: && this .pendingLockRequests.size() == 0;
1078: }
1079:
1080: return clear;
1081: }
1082:
1083: private Request cancelTryLockTimer(Request request) {
1084: if (!(request instanceof TryLockRequest)) {
1085: return null;
1086: }
1087:
1088: ServerThreadContext requestThreadContext = request
1089: .getThreadContext();
1090: TimerTask recallTimer = (TimerTask) tryLockTimers
1091: .remove(requestThreadContext);
1092: if (recallTimer != null) {
1093: recallTimer.cancel();
1094: return request;
1095: }
1096: return null;
1097: }
1098:
1099: private void grantGreedyRequest(Request request) {
1100: // debug("grantGreedyRequest() - BEGIN -", request);
1101: ServerThreadContext threadContext = request.getThreadContext();
1102: awardGreedyAndRespond(threadContext, request.getLockLevel(),
1103: request.getLockResponseSink());
1104: clearWaitingOn(threadContext);
1105: }
1106:
1107: private void grantRequest(Request request) {
1108: // debug("grantRequest() - BEGIN -", request);
1109: ServerThreadContext threadContext = request.getThreadContext();
1110: awardLock(threadContext, threadContext.getId()
1111: .getClientThreadID(), request.getLockLevel());
1112: clearWaitingOn(threadContext);
1113: request.execute(lockID);
1114: }
1115:
1116: /**
1117: * Remove the specified lock hold.
1118: *
1119: * @return true if the current hold was an upgrade
1120: */
1121: synchronized boolean removeCurrentHold(
1122: ServerThreadContext threadContext) {
1123: // debug("removeCurrentHold() - BEGIN -", threadContext);
1124: Holder holder = getHolder(threadContext);
1125: if (holder != null) {
1126: this .holders.remove(threadContext);
1127: threadContext.removeLock(this );
1128: threadContextFactory.removeIfClear(threadContext);
1129: if (isGreedyRequest(threadContext)) {
1130: removeGreedyHolder(threadContext.getId().getNodeID());
1131: }
1132: this .level = (holders.size() == 0 ? LockLevel.NIL_LOCK_LEVEL
1133: : LockLevel.READ);
1134: notifyRevoke(holder);
1135: recordLockReleaseStat(holder.getNodeID(), holder
1136: .getThreadID());
1137: }
1138: return false;
1139: }
1140:
1141: synchronized boolean recallCommit(ServerThreadContext threadContext) {
1142: // debug("recallCommit() - BEGIN -", threadContext);
1143: Assert.assertTrue(isGreedyRequest(threadContext));
1144: boolean issueRecall = !recalled;
1145: removeCurrentHold(threadContext);
1146: if (issueRecall) {
1147: recall(LockLevel.WRITE);
1148: }
1149: if (recalled == false) {
1150: return nextPending();
1151: }
1152: return false;
1153: }
1154:
1155: private synchronized void removeGreedyHolder(NodeID nodeID) {
1156: // debug("removeGreedyHolder() - BEGIN -", channelID);
1157: greedyHolders.remove(nodeID);
1158: if (!hasGreedyHolders()) {
1159: recalled = false;
1160: }
1161: }
1162:
1163: private void clearWaitingOn(ServerThreadContext threadContext) {
1164: threadContext.clearWaitingOn();
1165: threadContextFactory.removeIfClear(threadContext);
1166: }
1167:
1168: synchronized void awardAllReads() {
1169: // debug("awardAllReads() - BEGIN -");
1170: List pendingReadLockRequests = new ArrayList(
1171: pendingLockRequests.size());
1172: boolean hasPendingWrites = false;
1173:
1174: for (Iterator i = pendingLockRequests.values().iterator(); i
1175: .hasNext();) {
1176: Request request = (Request) i.next();
1177: if (request.getLockLevel() == LockLevel.READ) {
1178: pendingReadLockRequests.add(request);
1179: i.remove();
1180: } else if (!hasPendingWrites
1181: && request.getLockLevel() == LockLevel.WRITE) {
1182: hasPendingWrites = true;
1183: }
1184: }
1185:
1186: for (Iterator i = pendingReadLockRequests.iterator(); i
1187: .hasNext();) {
1188: Request request = (Request) i.next();
1189: cancelTryLockTimer(request);
1190: if (isPolicyGreedy() && !hasPendingWrites) {
1191: ServerThreadContext tid = request.getThreadContext();
1192: if (!holdsGreedyLock(tid)) {
1193: grantGreedyRequest(request);
1194: } else {
1195: // These can be awarded locally in the client ...
1196: clearWaitingOn(tid);
1197: }
1198: } else {
1199: grantRequest(request);
1200: }
1201: }
1202: }
1203:
1204: synchronized boolean holdsSomeLock(NodeID nodeID) {
1205: for (Iterator iter = holders.values().iterator(); iter
1206: .hasNext();) {
1207: Holder holder = (Holder) iter.next();
1208: if (holder.getNodeID().equals(nodeID)) {
1209: return true;
1210: }
1211: }
1212: return false;
1213: }
1214:
1215: synchronized boolean holdsGreedyLock(
1216: ServerThreadContext threadContext) {
1217: return (greedyHolders.get(threadContext.getId().getNodeID()) != null);
1218: }
1219:
1220: synchronized boolean canAwardGreedilyOnTheClient(
1221: ServerThreadContext threadContext, int lockLevel) {
1222: Holder holder = (Holder) greedyHolders.get(threadContext
1223: .getId().getNodeID());
1224: if (holder != null) {
1225: return (LockLevel.isWrite(holder.getLockLevel()) || holder
1226: .getLockLevel() == lockLevel);
1227: }
1228: return false;
1229: }
1230:
1231: private void notifyRevoke(Holder holder) {
1232: for (int i = 0; i < this .listeners.length; i++) {
1233: this .listeners[i].notifyRevoke(holder);
1234: }
1235: }
1236:
1237: void notifyStarted(WaitTimerCallback callback, WaitTimer timer) {
1238: for (Iterator i = waiters.values().iterator(); i.hasNext();) {
1239: LockWaitContext ctxt = (LockWaitContext) i.next();
1240: scheduleWait(callback, timer, ctxt);
1241: }
1242: }
1243:
1244: synchronized boolean isAllPendingLockRequestsFromNode(NodeID nodeID) {
1245: for (Iterator i = pendingLockRequests.values().iterator(); i
1246: .hasNext();) {
1247: Request r = (Request) i.next();
1248: if (!r.getRequesterID().equals(nodeID)) {
1249: return false;
1250: }
1251: }
1252: return true;
1253: }
1254:
1255: /**
1256: * This clears out stuff from the pending and wait lists that belonged to a dead session. It occurs to me that this is
1257: * a race condition because a request could come in on the connection, then the cleanup could happen, and then the
1258: * request could be processed. We need to drop requests that are processed after the cleanup
1259: *
1260: * @param nid
1261: */
1262: synchronized void clearStateForNode(NodeID nid) {
1263: // debug("clearStateForChannel() - BEGIN -", channelId);
1264: for (Iterator i = holders.values().iterator(); i.hasNext();) {
1265: Holder holder = (Holder) i.next();
1266: if (holder.getNodeID().equals(nid)) {
1267: i.remove();
1268: }
1269: }
1270: for (Iterator i = pendingLockRequests.values().iterator(); i
1271: .hasNext();) {
1272: Request r = (Request) i.next();
1273: if (r.getRequesterID().equals(nid)) {
1274: i.remove();
1275: }
1276: }
1277:
1278: for (Iterator i = waiters.values().iterator(); i.hasNext();) {
1279: LockWaitContext wc = (LockWaitContext) i.next();
1280: if (wc.getNodeID().equals(nid)) {
1281: i.remove();
1282: }
1283: }
1284:
1285: for (Iterator i = waitTimers.keySet().iterator(); i.hasNext();) {
1286: LockWaitContext wc = (LockWaitContext) i.next();
1287: if (wc.getNodeID().equals(nid)) {
1288: try {
1289: TimerTask task = (TimerTask) waitTimers.get(wc);
1290: task.cancel();
1291: } finally {
1292: i.remove();
1293: }
1294: }
1295: }
1296: removeGreedyHolder(nid);
1297: }
1298:
1299: synchronized void checkAndClearStateOnGreedyAward(
1300: ThreadID clientThreadID, NodeID ch, int requestedLevel) {
1301: // We dont want to award a greedy lock if there are waiters. Lock upgrade is not a problem as it is no longer
1302: // supported.
1303: // debug("checkAndClearStateOnGreedyAward For ", ch, ", ", LockLevel.toString(requestedLevel));
1304: // debug("checkAndClear... BEFORE Lock = ", this);
1305: // Assert.assertTrue(pendingLockUpgrades.size() == 0);
1306: Assert.assertTrue((requestedLevel == LockLevel.READ)
1307: || (waiters.size() == 0));
1308:
1309: for (Iterator i = holders.values().iterator(); i.hasNext();) {
1310: Holder holder = (Holder) i.next();
1311: if (holder.getNodeID().equals(ch)) {
1312: ServerThreadContext txn = holder.getThreadContext();
1313: txn.removeLock(this );
1314: threadContextFactory.removeIfClear(txn);
1315:
1316: i.remove();
1317: }
1318: }
1319: for (Iterator i = pendingLockRequests.values().iterator(); i
1320: .hasNext();) {
1321: Request r = (Request) i.next();
1322: if (r.getRequesterID().equals(ch)) {
1323: if ((requestedLevel == LockLevel.WRITE)
1324: || (r.getLockLevel() == requestedLevel)) {
1325: // debug("checkAndClear... removing request = ", r);
1326: i.remove();
1327: ServerThreadContext tid = r.getThreadContext();
1328: // debug("checkAndClear... clearing threadContext = ", tid);
1329: clearWaitingOn(tid);
1330: cancelTryLockTimer(r);
1331: } else {
1332: throw new AssertionError(
1333: "Issuing READ lock greedily when WRITE pending !");
1334: }
1335: }
1336: }
1337: // debug("checkAndClear... AFTER Lock = ", this);
1338: }
1339:
1340: private void recordLockRequestStat(NodeID nodeID,
1341: ThreadID threadID, int lockLevel) {
1342: lockStatsManager.lockRequested(lockID, nodeID, threadID,
1343: lockLevel);
1344: }
1345:
1346: private void recordLockAwardStat(NodeID nodeID, ThreadID threadID,
1347: boolean isGreedyRequest, long awardTimestamp) {
1348: lockStatsManager.lockAwarded(lockID, nodeID, threadID,
1349: isGreedyRequest, awardTimestamp);
1350: }
1351:
1352: private void recordLockReleaseStat(NodeID nodeID, ThreadID threadID) {
1353: lockStatsManager.lockReleased(lockID, nodeID, threadID);
1354: }
1355:
1356: private void recordLockHoppedStat() {
1357: lockStatsManager.lockHopped(lockID);
1358: }
1359:
1360: private void recordLockRejectStat(NodeID nodeID, ThreadID threadID) {
1361: lockStatsManager.lockRejected(lockID, nodeID, threadID);
1362: }
1363:
1364: private void recordLockWaitStat() {
1365: lockStatsManager.lockWait(lockID);
1366: }
1367:
1368: private void recordLockNotifyStat(int numToNotify) {
1369: lockStatsManager.lockNotified(lockID, numToNotify);
1370: }
1371:
1372: // I wish we were using 1.5 !!!
1373: // private void debug(Object o1, Object o2) {
1374: // logger.warn(lockID + String.valueOf(o1) + String.valueOf(o2));
1375: // }
1376: //
1377: // private void debug(Object o1, Object o2, Object o3) {
1378: // logger.warn(lockID + String.valueOf(o1) + String.valueOf(o2) + String.valueOf(o3));
1379: // }
1380: //
1381: // private void debug(Object o1, Object o2, Object o3, Object o4) {
1382: // logger.warn(lockID + String.valueOf(o1) + String.valueOf(o2) + String.valueOf(o3) + String.valueOf(o4));
1383: // }
1384: //
1385: // private void debug(Object o) {
1386: // logger.warn(lockID + String.valueOf(o));
1387: // }
1388:
1389: }
|