Source Code Cross Referenced for Lock.java in  » Net » Terracotta » com » tc » objectserver » lockmanager » impl » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » Terracotta » com.tc.objectserver.lockmanager.impl 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /*
0002:         * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
0003:         * notice. All rights reserved.
0004:         */
0005:        package com.tc.objectserver.lockmanager.impl;
0006:
0007:        import org.apache.commons.collections.map.ListOrderedMap;
0008:
0009:        import com.tc.async.api.Sink;
0010:        import com.tc.exception.TCInternalError;
0011:        import com.tc.exception.TCLockUpgradeNotSupportedError;
0012:        import com.tc.logging.TCLogger;
0013:        import com.tc.logging.TCLogging;
0014:        import com.tc.management.L2LockStatsManager;
0015:        import com.tc.net.groups.NodeID;
0016:        import com.tc.object.lockmanager.api.LockContext;
0017:        import com.tc.object.lockmanager.api.LockID;
0018:        import com.tc.object.lockmanager.api.LockLevel;
0019:        import com.tc.object.lockmanager.api.ServerThreadID;
0020:        import com.tc.object.lockmanager.api.ThreadID;
0021:        import com.tc.object.lockmanager.api.WaitTimer;
0022:        import com.tc.object.lockmanager.api.WaitTimerCallback;
0023:        import com.tc.object.net.DSOChannelManager;
0024:        import com.tc.object.tx.WaitInvocation;
0025:        import com.tc.objectserver.context.LockResponseContext;
0026:        import com.tc.objectserver.lockmanager.api.LockEventListener;
0027:        import com.tc.objectserver.lockmanager.api.LockHolder;
0028:        import com.tc.objectserver.lockmanager.api.LockMBean;
0029:        import com.tc.objectserver.lockmanager.api.LockWaitContext;
0030:        import com.tc.objectserver.lockmanager.api.NotifiedWaiters;
0031:        import com.tc.objectserver.lockmanager.api.ServerLockRequest;
0032:        import com.tc.objectserver.lockmanager.api.TCIllegalMonitorStateException;
0033:        import com.tc.objectserver.lockmanager.api.Waiter;
0034:        import com.tc.util.Assert;
0035:
0036:        import java.util.ArrayList;
0037:        import java.util.Collection;
0038:        import java.util.Collections;
0039:        import java.util.HashMap;
0040:        import java.util.Iterator;
0041:        import java.util.List;
0042:        import java.util.Map;
0043:        import java.util.Set;
0044:        import java.util.TimerTask;
0045:
0046:        public class Lock {
0047:            private static final TCLogger logger = TCLogging
0048:                    .getLogger(Lock.class);
0049:            public final static Lock NULL_LOCK = new Lock(LockID.NULL_ID, 0,
0050:                    new LockEventListener[] {}, true,
0051:                    LockManagerImpl.ALTRUISTIC_LOCK_POLICY,
0052:                    ServerThreadContextFactory.DEFAULT_FACTORY,
0053:                    L2LockStatsManager.NULL_LOCK_STATS_MANAGER);
0054:
0055:            private final LockEventListener[] listeners;
0056:            private final Map greedyHolders = new HashMap();
0057:            private final Map holders = new HashMap();
0058:            private final Map tryLockTimers = new HashMap();
0059:            private final ListOrderedMap pendingLockRequests = new ListOrderedMap();
0060:
0061:            private final ListOrderedMap waiters = new ListOrderedMap();
0062:            private final Map waitTimers = new HashMap();
0063:            private final LockID lockID;
0064:            private final long timeout;
0065:            private final boolean isNull;
0066:            private int level;
0067:            private boolean recalled = false;
0068:
0069:            private int lockPolicy;
0070:            private final ServerThreadContextFactory threadContextFactory;
0071:            private final L2LockStatsManager lockStatsManager;
0072:
0073:            // real constructor used by lock manager
0074:            Lock(LockID lockID, ServerThreadContext txn, int lockLevel,
0075:                    Sink lockResponseSink, long timeout,
0076:                    LockEventListener[] listeners, int lockPolicy,
0077:                    ServerThreadContextFactory threadContextFactory,
0078:                    L2LockStatsManager lockStatsManager) {
0079:                this (lockID, timeout, listeners, false, lockPolicy,
0080:                        threadContextFactory, lockStatsManager);
0081:                requestLock(txn, lockLevel, lockResponseSink);
0082:            }
0083:
0084:            // real constructor used by lock manager when re-establishing waits and lock holds on
0085:            // restart.
0086:            Lock(LockID lockID, ServerThreadContext txn, long timeout,
0087:                    LockEventListener[] listeners, int lockPolicy,
0088:                    ServerThreadContextFactory threadContextFactory,
0089:                    L2LockStatsManager lockStatsManager) {
0090:                this (lockID, timeout, listeners, false, lockPolicy,
0091:                        threadContextFactory, lockStatsManager);
0092:            }
0093:
0094:            // for tests
0095:            Lock(LockID lockID, long timeout, LockEventListener[] listeners) {
0096:                this (lockID, timeout, listeners, false,
0097:                        LockManagerImpl.ALTRUISTIC_LOCK_POLICY,
0098:                        ServerThreadContextFactory.DEFAULT_FACTORY,
0099:                        L2LockStatsManager.NULL_LOCK_STATS_MANAGER);
0100:            }
0101:
0102:            private Lock(LockID lockID, long timeout,
0103:                    LockEventListener[] listeners, boolean isNull,
0104:                    int lockPolicy,
0105:                    ServerThreadContextFactory threadContextFactory,
0106:                    L2LockStatsManager lockStatsManager) {
0107:                this .lockID = lockID;
0108:                this .listeners = listeners;
0109:                this .timeout = timeout;
0110:                this .isNull = isNull;
0111:                this .lockPolicy = lockPolicy;
0112:                this .threadContextFactory = threadContextFactory;
0113:                this .lockStatsManager = lockStatsManager;
0114:            }
0115:
0116:            static LockResponseContext createLockRejectedResponseContext(
0117:                    LockID lockID, ServerThreadID threadID, int level) {
0118:                return new LockResponseContext(lockID, threadID.getNodeID(),
0119:                        threadID.getClientThreadID(), level,
0120:                        LockResponseContext.LOCK_NOT_AWARDED);
0121:            }
0122:
0123:            static LockResponseContext createLockAwardResponseContext(
0124:                    LockID lockID, ServerThreadID threadID, int level) {
0125:                LockResponseContext lrc = new LockResponseContext(lockID,
0126:                        threadID.getNodeID(), threadID.getClientThreadID(),
0127:                        level, LockResponseContext.LOCK_AWARD);
0128:                return lrc;
0129:            }
0130:
0131:            static LockResponseContext createLockRecallResponseContext(
0132:                    LockID lockID, ServerThreadID threadID, int level) {
0133:                return new LockResponseContext(lockID, threadID.getNodeID(),
0134:                        threadID.getClientThreadID(), level,
0135:                        LockResponseContext.LOCK_RECALL);
0136:            }
0137:
0138:            static LockResponseContext createLockWaitTimeoutResponseContext(
0139:                    LockID lockID, ServerThreadID threadID, int level) {
0140:                return new LockResponseContext(lockID, threadID.getNodeID(),
0141:                        threadID.getClientThreadID(), level,
0142:                        LockResponseContext.LOCK_WAIT_TIMEOUT);
0143:            }
0144:
0145:            static LockResponseContext createLockQueriedResponseContext(
0146:                    LockID lockID, ServerThreadID threadID, int level,
0147:                    int lockRequestQueueLength, Collection greedyHolders,
0148:                    Collection holders, Collection waiters) {
0149:                return new LockResponseContext(lockID, threadID.getNodeID(),
0150:                        threadID.getClientThreadID(), level,
0151:                        lockRequestQueueLength, greedyHolders, holders,
0152:                        waiters, LockResponseContext.LOCK_INFO);
0153:            }
0154:
0155:            static LockResponseContext createLockStatEnableResponseContext(
0156:                    LockID lockID, NodeID nodeID, ThreadID threadID, int level,
0157:                    int stackTraceDepth, int statCollectFrequency) {
0158:                return new LockResponseContext(lockID, nodeID, threadID, level,
0159:                        LockResponseContext.LOCK_STAT_ENABLED, stackTraceDepth,
0160:                        statCollectFrequency);
0161:            }
0162:
0163:            static LockResponseContext createLockStatDisableResponseContext(
0164:                    LockID lockID, NodeID nodeID, ThreadID threadID, int level) {
0165:                return new LockResponseContext(lockID, nodeID, threadID, level,
0166:                        LockResponseContext.LOCK_STAT_DISABLED);
0167:            }
0168:
0169:            private static Request createRequest(ServerThreadContext txn,
0170:                    int lockLevel, Sink lockResponseSink,
0171:                    WaitInvocation timeout, boolean isBlock) {
0172:                Request request = null;
0173:                if (isBlock) {
0174:                    request = new TryLockRequest(txn, lockLevel,
0175:                            lockResponseSink, timeout);
0176:                } else {
0177:                    request = new Request(txn, lockLevel, lockResponseSink);
0178:                }
0179:                return request;
0180:            }
0181:
0182:            synchronized LockMBean getMBean(DSOChannelManager channelManager) {
0183:                int count;
0184:                LockHolder[] holds = new LockHolder[this .holders.size()];
0185:                ServerLockRequest[] reqs = new ServerLockRequest[this .pendingLockRequests
0186:                        .size()];
0187:                Waiter[] waits = new Waiter[this .waiters.size()];
0188:
0189:                count = 0;
0190:                for (Iterator i = this .holders.values().iterator(); i.hasNext();) {
0191:                    Holder h = (Holder) i.next();
0192:                    NodeID cid = h.getNodeID();
0193:                    holds[count] = new LockHolder(h.getLockID(), cid,
0194:                            channelManager.getChannelAddress(cid), h
0195:                                    .getThreadID(), h.getLockLevel(), h
0196:                                    .getTimestamp());
0197:                    holds[count++].lockAcquired(h.getTimestamp());
0198:                }
0199:
0200:                count = 0;
0201:                for (Iterator i = this .pendingLockRequests.values().iterator(); i
0202:                        .hasNext();) {
0203:                    Request r = (Request) i.next();
0204:                    NodeID cid = r.getRequesterID();
0205:                    reqs[count++] = new ServerLockRequest(cid, channelManager
0206:                            .getChannelAddress(cid), r.getSourceID(), r
0207:                            .getLockLevel(), r.getTimestamp());
0208:                }
0209:
0210:                count = 0;
0211:                for (Iterator i = this .waiters.values().iterator(); i.hasNext();) {
0212:                    LockWaitContext wc = (LockWaitContext) i.next();
0213:                    NodeID cid = wc.getNodeID();
0214:                    waits[count++] = new Waiter(cid, channelManager
0215:                            .getChannelAddress(cid), wc.getThreadID(), wc
0216:                            .getWaitInvocation(), wc.getTimestamp());
0217:                }
0218:
0219:                return new LockMBeanImpl(lockID, holds, reqs, waits);
0220:            }
0221:
0222:            synchronized void enableClientStat(Sink lockResponseSink,
0223:                    int stackTraceDepth, int statCollectFrequency) {
0224:                enableClientStatInHolders(lockResponseSink, stackTraceDepth,
0225:                        statCollectFrequency);
0226:                enableClientStatInPendings(lockResponseSink, stackTraceDepth,
0227:                        statCollectFrequency);
0228:                enableClientStatInWaiters(lockResponseSink, stackTraceDepth,
0229:                        statCollectFrequency);
0230:            }
0231:
0232:            synchronized void disableClientStat(Set statEnabledClients,
0233:                    Sink lockResponseSink) {
0234:                for (Iterator i = statEnabledClients.iterator(); i.hasNext();) {
0235:                    NodeID nodeID = (NodeID) i.next();
0236:                    lockResponseSink.add(createLockStatDisableResponseContext(
0237:                            lockID, nodeID, ThreadID.VM_ID, level));
0238:                }
0239:            }
0240:
0241:            private void enableClientStatInHolders(Sink lockResponseSink,
0242:                    int stackTraceDepth, int statCollectFrequency) {
0243:                for (Iterator i = holders.values().iterator(); i.hasNext();) {
0244:                    Holder holder = (Holder) i.next();
0245:                    remoteEnableClientStat(lockResponseSink,
0246:                            holder.getNodeID(), stackTraceDepth,
0247:                            statCollectFrequency);
0248:                }
0249:            }
0250:
0251:            private void enableClientStatInPendings(Sink lockResponseSink,
0252:                    int stackTraceDepth, int statCollectFrequency) {
0253:                for (Iterator i = pendingLockRequests.values().iterator(); i
0254:                        .hasNext();) {
0255:                    Request request = (Request) i.next();
0256:                    remoteEnableClientStat(lockResponseSink, request
0257:                            .getThreadContext().getId().getNodeID(),
0258:                            stackTraceDepth, statCollectFrequency);
0259:                }
0260:            }
0261:
0262:            private void enableClientStatInWaiters(Sink lockResponseSink,
0263:                    int stackTraceDepth, int statCollectFrequency) {
0264:                for (Iterator i = waiters.values().iterator(); i.hasNext();) {
0265:                    LockWaitContext ctxt = (LockWaitContext) i.next();
0266:                    remoteEnableClientStat(lockResponseSink, ctxt.getNodeID(),
0267:                            stackTraceDepth, statCollectFrequency);
0268:                }
0269:            }
0270:
0271:            private void remoteEnableClientStat(Sink lockResponseSink,
0272:                    NodeID nodeID, int stackTraceDepth, int statCollectFrequency) {
0273:                if (!lockStatsManager.isLockStackTraceEnabledInClient(lockID,
0274:                        nodeID)) {
0275:                    lockResponseSink.add(createLockStatEnableResponseContext(
0276:                            lockID, nodeID, ThreadID.VM_ID, level,
0277:                            stackTraceDepth, statCollectFrequency));
0278:                    lockStatsManager.recordClientStackTraceEnabled(lockID,
0279:                            nodeID);
0280:                }
0281:            }
0282:
0283:            private void enableClientStatIfNeeded(Sink lockResponseSink,
0284:                    ServerThreadContext txn) {
0285:                if (lockStatsManager.isClientLockStackTraceEnable(lockID)) {
0286:                    remoteEnableClientStat(lockResponseSink, txn.getId()
0287:                            .getNodeID(), lockStatsManager
0288:                            .getLockStackTraceDepth(lockID), lockStatsManager
0289:                            .getLockStatCollectFrequency(lockID));
0290:                }
0291:            }
0292:
0293:            synchronized void queryLock(ServerThreadContext txn,
0294:                    Sink lockResponseSink) {
0295:
0296:                // TODO:
0297:                // The Remote Lock Manager needs to ask the client for lock information when greedy lock is awarded.
0298:                // Currently, the Remote Lock Manager responds to queryLock by looking at the server only.
0299:                lockResponseSink.add(createLockQueriedResponseContext(
0300:                        this .lockID, txn.getId(), this .level,
0301:                        this .pendingLockRequests.size(), this .greedyHolders
0302:                                .values(), this .holders.values(), this .waiters
0303:                                .values()));
0304:            }
0305:
0306:            boolean tryRequestLock(ServerThreadContext txn,
0307:                    int requestedLockLevel, WaitInvocation timeout,
0308:                    WaitTimer waitTimer, WaitTimerCallback callback,
0309:                    Sink lockResponseSink) {
0310:                return requestLock(txn, requestedLockLevel, lockResponseSink,
0311:                        true, timeout, waitTimer, callback);
0312:            }
0313:
0314:            boolean requestLock(ServerThreadContext txn,
0315:                    int requestedLockLevel, Sink lockResponseSink) {
0316:                return requestLock(txn, requestedLockLevel, lockResponseSink,
0317:                        false, null, null, null);
0318:            }
0319:
0320:            // XXX:: UPGRADE Requests can come in with requestLockLevel == UPGRADE on a notified wait during server crash
0321:            synchronized boolean requestLock(ServerThreadContext txn,
0322:                    int requestedLockLevel, Sink lockResponseSink,
0323:                    boolean noBlock, WaitInvocation timeout,
0324:                    WaitTimer waitTimer, WaitTimerCallback callback) {
0325:
0326:                if (holdsReadLock(txn) && LockLevel.isWrite(requestedLockLevel)) {
0327:                    // lock upgrade is not supported; it should have been rejected by the client.
0328:                    throw new TCLockUpgradeNotSupportedError(
0329:                            "Lock upgrade is not supported. The request should have been rejected by the client. Your client may be using an older version of tc.jar");
0330:                }
0331:
0332:                if (waiters.containsKey(txn))
0333:                    throw new AssertionError(
0334:                            "Attempt to request a lock in a Thread "
0335:                                    + "that is already part of the wait set. lock = "
0336:                                    + this );
0337:
0338:                enableClientStatIfNeeded(lockResponseSink, txn);
0339:                recordLockRequestStat(txn.getId().getNodeID(), txn.getId()
0340:                        .getClientThreadID(), requestedLockLevel);
0341:
0342:                // debug("requestLock - BEGIN -", txn, ",", LockLevel.toString(requestedLockLevel));
0343:                // it is an error (probably originating from the client side) to
0344:                // request a lock you already hold
0345:                Holder holder = getHolder(txn);
0346:                if (noBlock
0347:                        && !timeout.needsToWait()
0348:                        && holder == null
0349:                        && (requestedLockLevel != LockLevel.READ || !this 
0350:                                .isRead())
0351:                        && (getHoldersCount() > 0 || hasGreedyHolders())) {
0352:                    cannotAwardAndRespond(txn, requestedLockLevel,
0353:                            lockResponseSink);
0354:                    return false;
0355:                }
0356:
0357:                if (holder != null) {
0358:                    if (LockLevel.NIL_LOCK_LEVEL != (holder.getLockLevel() & requestedLockLevel)) {
0359:                        // formatting
0360:                        throw new AssertionError(
0361:                                "Client requesting already held lock! holder="
0362:                                        + holder + ", lock=" + this );
0363:                    }
0364:                }
0365:
0366:                if (isPolicyGreedy()) {
0367:                    if (canAwardGreedilyOnTheClient(txn, requestedLockLevel)) {
0368:                        // These requests are the ones in the wire when the greedy lock was given out to the client.
0369:                        // We can safely ignore it as the clients will be able to award it locally.
0370:                        logger
0371:                                .debug(lockID
0372:                                        + " : Lock.requestLock() : Ignoring the Lock request("
0373:                                        + txn
0374:                                        + ","
0375:                                        + LockLevel
0376:                                                .toString(requestedLockLevel)
0377:                                        + ") message from the a client that has the lock greedily.");
0378:                        return false;
0379:                    } else if (recalled) {
0380:                        // add to pending until recall process is complete, those who hold the lock greedily will send the
0381:                        // pending state during recall commit.
0382:                        if (!holdsGreedyLock(txn)) {
0383:                            queueRequest(txn, requestedLockLevel,
0384:                                    lockResponseSink, noBlock, timeout,
0385:                                    waitTimer, callback);
0386:                        }
0387:                        return false;
0388:                    }
0389:                }
0390:
0391:                // Lock granting logic:
0392:                // 0. If no one is holding this lock, go ahead and award it
0393:                // 1. If only a read lock is held and no write locks are pending, and another read
0394:                // (and only read) lock is requested, award it. If Write locks are pending, we dont want to
0395:                // starve the WRITES by keeping on awarding READ Locks.
0396:                // 2. Else the request must be queued (ie. added to pending list)
0397:
0398:                if ((getHoldersCount() == 0)
0399:                        || ((!hasPending()) && ((requestedLockLevel == LockLevel.READ) && this 
0400:                                .isRead()))) {
0401:                    // (0, 1) uncontended or additional read lock
0402:                    if (isPolicyGreedy()
0403:                            && ((requestedLockLevel == LockLevel.READ) || (getWaiterCount() == 0))) {
0404:                        awardGreedyAndRespond(txn, requestedLockLevel,
0405:                                lockResponseSink);
0406:                    } else {
0407:                        awardAndRespond(txn, txn.getId().getClientThreadID(),
0408:                                requestedLockLevel, lockResponseSink);
0409:                    }
0410:                } else {
0411:                    // (2) queue request
0412:                    if (isPolicyGreedy() && hasGreedyHolders()) {
0413:                        recall(requestedLockLevel);
0414:                    }
0415:                    if (!holdsGreedyLock(txn)) {
0416:                        queueRequest(txn, requestedLockLevel, lockResponseSink,
0417:                                noBlock, timeout, waitTimer, callback);
0418:                    }
0419:                    return false;
0420:                }
0421:
0422:                return true;
0423:            }
0424:
0425:            private void queueRequest(ServerThreadContext txn,
0426:                    int requestedLockLevel, Sink lockResponseSink,
0427:                    boolean noBlock, WaitInvocation timeout,
0428:                    WaitTimer waitTimer, WaitTimerCallback callback) {
0429:                if (noBlock) {
0430:                    // By the time it reaches here, timeout.needsToWait() must be true
0431:                    Assert.assertTrue(timeout.needsToWait());
0432:                    addPendingTryLockRequest(txn, requestedLockLevel, timeout,
0433:                            lockResponseSink, waitTimer, callback);
0434:                } else {
0435:                    addPendingLockRequest(txn, requestedLockLevel,
0436:                            lockResponseSink);
0437:                }
0438:            }
0439:
0440:            synchronized void addRecalledHolder(ServerThreadContext txn,
0441:                    int lockLevel) {
0442:                // debug("addRecalledHolder - BEGIN -", txn, ",", LockLevel.toString(lockLevel));
0443:                if (!LockLevel.isWrite(level) && LockLevel.isWrite(lockLevel)) {
0444:                    // Client issued a WRITE lock without holding a GREEDY WRITE. Bug in the client.
0445:                    throw new AssertionError(
0446:                            "Client issued a WRITE lock without holding a GREEDY WRITE !");
0447:                }
0448:                recordLockRequestStat(txn.getId().getNodeID(), txn.getId()
0449:                        .getClientThreadID(), lockLevel);
0450:                awardLock(txn, txn.getId().getClientThreadID(), lockLevel);
0451:            }
0452:
0453:            synchronized void addRecalledPendingRequest(
0454:                    ServerThreadContext txn, int lockLevel,
0455:                    Sink lockResponseSink) {
0456:                // debug("addRecalledPendingRequest - BEGIN -", txn, ",", LockLevel.toString(lockLevel));
0457:                recordLockRequestStat(txn.getId().getNodeID(), txn.getId()
0458:                        .getClientThreadID(), lockLevel);
0459:                addPendingLockRequest(txn, lockLevel, lockResponseSink);
0460:            }
0461:
0462:            synchronized void addRecalledTryLockPendingRequest(
0463:                    ServerThreadContext txn, int lockLevel,
0464:                    WaitInvocation timeout, Sink lockResponseSink,
0465:                    WaitTimer waitTimer, WaitTimerCallback callback) {
0466:                recordLockRequestStat(txn.getId().getNodeID(), txn.getId()
0467:                        .getClientThreadID(), lockLevel);
0468:
0469:                if (!timeout.needsToWait()) {
0470:                    cannotAwardAndRespond(txn, lockLevel, lockResponseSink);
0471:                    return;
0472:                }
0473:
0474:                addPendingTryLockRequest(txn, lockLevel, timeout,
0475:                        lockResponseSink, waitTimer, callback);
0476:            }
0477:
0478:            private void addPendingTryLockRequest(ServerThreadContext txn,
0479:                    int lockLevel, WaitInvocation timeout,
0480:                    Sink lockResponseSink, WaitTimer waitTimer,
0481:                    WaitTimerCallback callback) {
0482:                Request request = addPending(txn, lockLevel, lockResponseSink,
0483:                        timeout, true);
0484:
0485:                TryLockContextImpl tryLockWaitRequestContext = new TryLockContextImpl(
0486:                        txn, this , timeout, lockLevel, lockResponseSink);
0487:
0488:                scheduleWaitForTryLock(callback, waitTimer, request,
0489:                        tryLockWaitRequestContext);
0490:            }
0491:
0492:            private void addPendingLockRequest(
0493:                    ServerThreadContext threadContext, int lockLevel,
0494:                    Sink awardLockSink) {
0495:                addPending(threadContext, lockLevel, awardLockSink, null, false);
0496:            }
0497:
0498:            private Request addPending(ServerThreadContext threadContext,
0499:                    int lockLevel, Sink awardLockSink, WaitInvocation timeout,
0500:                    boolean noBlock) {
0501:                Assert.assertFalse(isNull());
0502:                // debug("addPending() - BEGIN -", threadContext, ", ", LockLevel.toString(lockLevel));
0503:
0504:                Request request = createRequest(threadContext, lockLevel,
0505:                        awardLockSink, timeout, noBlock);
0506:
0507:                if (pendingLockRequests.containsValue(request)) {
0508:                    logger.debug("Ignoring existing Request " + request
0509:                            + " in Lock " + lockID);
0510:                    return request;
0511:                }
0512:
0513:                this .pendingLockRequests.put(threadContext, request);
0514:                for (Iterator currentHolders = holders.values().iterator(); currentHolders
0515:                        .hasNext();) {
0516:                    Holder holder = (Holder) currentHolders.next();
0517:                    notifyAddPending(holder);
0518:                }
0519:                threadContext.setWaitingOn(this );
0520:                return request;
0521:            }
0522:
0523:            private synchronized void recall(int recallLevel) {
0524:                if (recalled) {
0525:                    return;
0526:                }
0527:                recordLockHoppedStat();
0528:                for (Iterator i = greedyHolders.values().iterator(); i
0529:                        .hasNext();) {
0530:                    Holder holder = (Holder) i.next();
0531:                    // debug("recall() - issued for -", holder);
0532:                    holder.getSink().add(
0533:                            createLockRecallResponseContext(holder.getLockID(),
0534:                                    holder.getThreadContext().getId(),
0535:                                    recallLevel));
0536:                    recalled = true;
0537:                }
0538:            }
0539:
0540:            private boolean isGreedyRequest(ServerThreadContext txn) {
0541:                return (txn.getId().getClientThreadID().equals(ThreadID.VM_ID));
0542:            }
0543:
0544:            private boolean isPolicyGreedy() {
0545:                return lockPolicy == LockManagerImpl.GREEDY_LOCK_POLICY;
0546:            }
0547:
0548:            int getLockPolicy() {
0549:                return lockPolicy;
0550:            }
0551:
0552:            int getLockLevel() {
0553:                return level;
0554:            }
0555:
0556:            void setLockPolicy(int newPolicy) {
0557:                if (!isNull() && newPolicy != lockPolicy) {
0558:                    this .lockPolicy = newPolicy;
0559:                    if (!isPolicyGreedy()) {
0560:                        recall(LockLevel.WRITE);
0561:                    }
0562:                }
0563:            }
0564:
0565:            private void awardGreedyAndRespond(ServerThreadContext txn,
0566:                    int requestedLockLevel, Sink lockResponseSink) {
0567:                // debug("awardGreedyAndRespond() - BEGIN - ", txn, ",", LockLevel.toString(requestedLockLevel));
0568:                final ServerThreadContext clientTx = getClientVMContext(txn);
0569:                final int greedyLevel = LockLevel
0570:                        .makeGreedy(requestedLockLevel);
0571:
0572:                NodeID ch = txn.getId().getNodeID();
0573:                checkAndClearStateOnGreedyAward(
0574:                        txn.getId().getClientThreadID(), ch, requestedLockLevel);
0575:                Holder holder = awardAndRespond(clientTx, txn.getId()
0576:                        .getClientThreadID(), greedyLevel, lockResponseSink);
0577:                holder.setSink(lockResponseSink);
0578:                greedyHolders.put(ch, holder);
0579:                clearWaitingOn(txn);
0580:            }
0581:
0582:            private void cannotAwardAndRespond(ServerThreadContext txn,
0583:                    int requestedLockLevel, Sink lockResponseSink) {
0584:                lockResponseSink.add(createLockRejectedResponseContext(
0585:                        this .lockID, txn.getId(), requestedLockLevel));
0586:                recordLockRejectStat(txn.getId().getNodeID(), txn.getId()
0587:                        .getClientThreadID());
0588:            }
0589:
0590:            private Holder awardAndRespond(ServerThreadContext txn,
0591:                    ThreadID requestThreadID, int requestedLockLevel,
0592:                    Sink lockResponseSink) {
0593:                // debug("awardRespond() - BEGIN - ", txn, ",", LockLevel.toString(requestedLockLevel));
0594:                Holder holder = awardLock(txn, requestThreadID,
0595:                        requestedLockLevel);
0596:                lockResponseSink.add(createLockAwardResponseContext(
0597:                        this .lockID, txn.getId(), requestedLockLevel));
0598:                return holder;
0599:            }
0600:
0601:            synchronized void notify(ServerThreadContext txn, boolean all,
0602:                    NotifiedWaiters addNotifiedWaitersTo)
0603:                    throws TCIllegalMonitorStateException {
0604:                // debug("notify() - BEGIN - ", txn, ", all = " + all);
0605:                if (waiters.containsKey(txn)) {
0606:                    throw Assert.failure("Can't notify self: " + txn);
0607:                }
0608:                checkLegalWaitNotifyState(txn);
0609:
0610:                if (waiters.size() > 0) {
0611:                    final int numToNotify = all ? waiters.size() : 1;
0612:                    for (int i = 0; i < numToNotify; i++) {
0613:                        LockWaitContext wait = (LockWaitContext) waiters
0614:                                .remove(0);
0615:                        removeAndCancelWaitTimer(wait);
0616:                        recordLockRequestStat(wait.getNodeID(), wait
0617:                                .getThreadID(), wait.lockLevel());
0618:                        createPendingFromWaiter(wait);
0619:                        addNotifiedWaitersTo.addNotification(new LockContext(
0620:                                lockID, wait.getNodeID(), wait.getThreadID(),
0621:                                wait.lockLevel()));
0622:                    }
0623:                    recordLockNotifyStat(numToNotify);
0624:                }
0625:            }
0626:
0627:            synchronized void interrupt(ServerThreadContext txn) {
0628:                if (waiters.size() == 0 || !waiters.containsKey(txn)) {
0629:                    logger
0630:                            .warn("Cannot interrupt: " + txn
0631:                                    + " is not waiting.");
0632:                    return;
0633:                }
0634:                LockWaitContext wait = (LockWaitContext) waiters.remove(txn);
0635:                recordLockRequestStat(wait.getNodeID(), wait.getThreadID(),
0636:                        wait.lockLevel());
0637:                removeAndCancelWaitTimer(wait);
0638:                createPendingFromWaiter(wait);
0639:            }
0640:
0641:            private void removeAndCancelWaitTimer(LockWaitContext wait) {
0642:                TimerTask task = (TimerTask) waitTimers.remove(wait);
0643:                if (task != null)
0644:                    task.cancel();
0645:            }
0646:
0647:            private Request createPendingFromWaiter(LockWaitContext wait) {
0648:                // XXX: This cast to WaitContextImpl is lame. I'm not sure how to refactor it right now.
0649:                Request request = createRequest(((LockWaitContextImpl) wait)
0650:                        .getThreadContext(), wait.lockLevel(), wait
0651:                        .getLockResponseSink(), null, false);
0652:                createPending(wait, request);
0653:                return request;
0654:            }
0655:
0656:            private void createPending(LockWaitContext wait, Request request) {
0657:                ServerThreadContext txn = ((LockWaitContextImpl) wait)
0658:                        .getThreadContext();
0659:                pendingLockRequests.put(txn, request);
0660:
0661:                if (isPolicyGreedy() && hasGreedyHolders()) {
0662:                    recall(request.getLockLevel());
0663:                }
0664:            }
0665:
0666:            synchronized void tryRequestLockTimeout(LockWaitContext context) {
0667:                TryLockContextImpl tryLockContext = (TryLockContextImpl) context;
0668:                ServerThreadContext txn = tryLockContext.getThreadContext();
0669:                Object removed = tryLockTimers.remove(txn);
0670:                if (removed != null) {
0671:                    pendingLockRequests.remove(txn);
0672:                    Sink lockResponseSink = context.getLockResponseSink();
0673:                    int lockLevel = context.lockLevel();
0674:                    cannotAwardAndRespond(txn, lockLevel, lockResponseSink);
0675:                }
0676:            }
0677:
0678:            synchronized void waitTimeout(LockWaitContext context) {
0679:
0680:                // debug("waitTimeout() - BEGIN -", context);
0681:                // XXX: This cast is gross, too.
0682:                ServerThreadContext txn = ((LockWaitContextImpl) context)
0683:                        .getThreadContext();
0684:                Object removed = waiters.remove(txn);
0685:
0686:                if (removed != null) {
0687:                    waitTimers.remove(context);
0688:                    Sink lockResponseSink = context.getLockResponseSink();
0689:                    int lockLevel = context.lockLevel();
0690:
0691:                    // Add a wait Timeout message
0692:                    lockResponseSink.add(createLockWaitTimeoutResponseContext(
0693:                            this .lockID, txn.getId(), lockLevel));
0694:
0695:                    recordLockRequestStat(context.getNodeID(), context
0696:                            .getThreadID(), lockLevel);
0697:                    if (holders.size() == 0) {
0698:                        if (isPolicyGreedy() && (getWaiterCount() == 0)) {
0699:                            awardGreedyAndRespond(txn, lockLevel,
0700:                                    lockResponseSink);
0701:                        } else {
0702:                            awardAndRespond(txn, txn.getId()
0703:                                    .getClientThreadID(), lockLevel,
0704:                                    lockResponseSink);
0705:                        }
0706:                    } else {
0707:                        createPendingFromWaiter(context);
0708:                    }
0709:                }
0710:            }
0711:
0712:            synchronized void wait(ServerThreadContext txn,
0713:                    WaitTimer waitTimer, WaitInvocation call,
0714:                    WaitTimerCallback callback, Sink lockResponseSink)
0715:                    throws TCIllegalMonitorStateException {
0716:                // debug("wait() - BEGIN -", txn, ", ", call);
0717:                if (waiters.containsKey(txn))
0718:                    throw Assert.failure("Already in wait set: " + txn);
0719:                checkLegalWaitNotifyState(txn);
0720:
0721:                Holder current = getHolder(txn);
0722:                Assert.assertNotNull(current);
0723:
0724:                LockWaitContext waitContext = new LockWaitContextImpl(txn,
0725:                        this , call, current.getLockLevel(), lockResponseSink);
0726:                waiters.put(txn, waitContext);
0727:
0728:                scheduleWait(callback, waitTimer, waitContext);
0729:                txn.setWaitingOn(this );
0730:                removeCurrentHold(txn);
0731:
0732:                recordLockWaitStat();
0733:                nextPending();
0734:            }
0735:
0736:            // This method reestablished Wait State and schedules wait timeouts too. There are cases where we may need to ignore a
0737:            // wait, if we already know about it. Note that it could be either in waiting or pending state.
0738:            synchronized void addRecalledWaiter(ServerThreadContext txn,
0739:                    WaitInvocation call, int lockLevel, Sink lockResponseSink,
0740:                    WaitTimer waitTimer, WaitTimerCallback callback) {
0741:                // debug("addRecalledWaiter() - BEGIN -", txn, ", ", call);
0742:
0743:                LockWaitContext waitContext = new LockWaitContextImpl(txn,
0744:                        this , call, lockLevel, lockResponseSink);
0745:                if (waiters.containsKey(txn)) {
0746:                    logger.debug("addRecalledWaiter(): Ignoring " + waitContext
0747:                            + " as it is already in waiters list.");
0748:                    return;
0749:                }
0750:                Request request = createRequest(txn, lockLevel,
0751:                        lockResponseSink, null, false);
0752:                if (pendingLockRequests.containsValue(request)) {
0753:                    logger.debug("addRecalledWaiter(): Ignoring " + waitContext
0754:                            + " as it is already in pending list.");
0755:                    return;
0756:                }
0757:                waiters.put(txn, waitContext);
0758:                scheduleWait(callback, waitTimer, waitContext);
0759:                recordLockWaitStat();
0760:            }
0761:
0762:            // This method reestablished Wait State and does not schedules wait timeouts too. This is
0763:            // called when LockManager is starting and wait timers are started when the lock Manager is started.
0764:            synchronized void reestablishWait(ServerThreadContext txn,
0765:                    WaitInvocation call, int lockLevel, Sink lockResponseSink) {
0766:                LockWaitContext waitContext = new LockWaitContextImpl(txn,
0767:                        this , call, lockLevel, lockResponseSink);
0768:                Object old = waiters.put(txn, waitContext);
0769:                if (old != null)
0770:                    throw Assert.failure("Already in wait set: " + txn);
0771:            }
0772:
0773:            synchronized void reestablishLock(
0774:                    ServerThreadContext threadContext, int requestedLevel,
0775:                    Sink lockResponseSink) {
0776:                if ((LockLevel.isWrite(requestedLevel) && holders.size() != 0)
0777:                        || (LockLevel.isRead(requestedLevel) && LockLevel
0778:                                .isWrite(this .level))) {
0779:                    throw new AssertionError("Lock " + this 
0780:                            + " already held by other Holder. Can't grant to "
0781:                            + threadContext
0782:                            + LockLevel.toString(requestedLevel));
0783:
0784:                }
0785:                if (waiters.get(threadContext) != null) {
0786:                    throw new AssertionError("Thread " + threadContext
0787:                            + "is already in Wait state for Lock " + this 
0788:                            + ". Can't grant Lock Hold !");
0789:                }
0790:                recordLockRequestStat(threadContext.getId().getNodeID(),
0791:                        threadContext.getId().getClientThreadID(),
0792:                        requestedLevel);
0793:                if (isGreedyRequest(threadContext)) {
0794:                    int greedyLevel = LockLevel.makeGreedy(requestedLevel);
0795:                    NodeID nid = threadContext.getId().getNodeID();
0796:                    Holder holder = awardLock(threadContext, threadContext
0797:                            .getId().getClientThreadID(), greedyLevel);
0798:                    holder.setSink(lockResponseSink);
0799:                    greedyHolders.put(nid, holder);
0800:                } else {
0801:                    awardLock(threadContext, threadContext.getId()
0802:                            .getClientThreadID(), requestedLevel);
0803:                }
0804:            }
0805:
0806:            private void scheduleWait(WaitTimerCallback callback,
0807:                    WaitTimer waitTimer, LockWaitContext waitContext) {
0808:                final TimerTask timer = waitTimer.scheduleTimer(callback,
0809:                        waitContext.getWaitInvocation(), waitContext);
0810:                if (timer != null) {
0811:                    waitTimers.put(waitContext, timer);
0812:                }
0813:            }
0814:
0815:            private TimerTask scheduleWaitForTryLock(
0816:                    WaitTimerCallback callback, WaitTimer waitTimer,
0817:                    Request pendingRequest,
0818:                    TryLockContextImpl tryLockWaitRequestContext) {
0819:                final TimerTask timer = waitTimer.scheduleTimer(callback,
0820:                        tryLockWaitRequestContext.getWaitInvocation(),
0821:                        tryLockWaitRequestContext);
0822:                if (timer != null) {
0823:                    tryLockTimers.put(tryLockWaitRequestContext
0824:                            .getThreadContext(), timer);
0825:                }
0826:                return timer;
0827:            }
0828:
0829:            private void checkLegalWaitNotifyState(
0830:                    ServerThreadContext threadContext)
0831:                    throws TCIllegalMonitorStateException {
0832:                Assert.assertFalse(isNull());
0833:
0834:                final int holdersSize = holders.size();
0835:                if (holdersSize != 1) {
0836:                    throw new TCIllegalMonitorStateException(
0837:                            "Invalid holder set size: " + holdersSize);
0838:                }
0839:
0840:                final int currentLevel = this .level;
0841:                if (!LockLevel.isWrite(currentLevel)) {
0842:                    throw new TCIllegalMonitorStateException(
0843:                            "Incorrect lock level: "
0844:                                    + LockLevel.toString(currentLevel));
0845:                }
0846:
0847:                Holder holder = getHolder(threadContext);
0848:                if (holder == null) {
0849:                    holder = getHolder(getClientVMContext(threadContext));
0850:                }
0851:
0852:                if (holder == null) {
0853:                    // make formatter sane
0854:                    throw new TCIllegalMonitorStateException(threadContext
0855:                            + " is not the current lock holder for: "
0856:                            + threadContext);
0857:                }
0858:            }
0859:
0860:            private ServerThreadContext getClientVMContext(
0861:                    ServerThreadContext threadContext) {
0862:                return threadContextFactory.getOrCreate(threadContext.getId()
0863:                        .getNodeID(), ThreadID.VM_ID);
0864:            }
0865:
0866:            public synchronized int getHoldersCount() {
0867:                return holders.size();
0868:            }
0869:
0870:            public synchronized int getPendingCount() {
0871:                return pendingLockRequests.size();
0872:            }
0873:
0874:            Collection getHoldersCollection() {
0875:                return Collections
0876:                        .unmodifiableCollection(this .holders.values());
0877:            }
0878:
0879:            public synchronized String toString() {
0880:                try {
0881:                    StringBuffer rv = new StringBuffer();
0882:
0883:                    rv.append(lockID).append(", ").append("Level: ").append(
0884:                            LockLevel.toString(this .level)).append("\r\n");
0885:
0886:                    rv.append("Holders (").append(holders.size()).append(
0887:                            ")\r\n");
0888:                    for (Iterator iter = holders.values().iterator(); iter
0889:                            .hasNext();) {
0890:                        rv.append('\t').append(iter.next().toString()).append(
0891:                                "\r\n");
0892:                    }
0893:
0894:                    rv.append("Wait Set (").append(waiters.size()).append(
0895:                            ")\r\n");
0896:                    for (Iterator iter = waiters.values().iterator(); iter
0897:                            .hasNext();) {
0898:                        rv.append('\t').append(iter.next().toString()).append(
0899:                                "\r\n");
0900:                    }
0901:
0902:                    rv.append("Pending lock requests (").append(
0903:                            pendingLockRequests.size()).append(")\r\n");
0904:                    for (Iterator iter = pendingLockRequests.values()
0905:                            .iterator(); iter.hasNext();) {
0906:                        rv.append('\t').append(iter.next().toString()).append(
0907:                                "\r\n");
0908:                    }
0909:
0910:                    return rv.toString();
0911:                } catch (Throwable t) {
0912:                    t.printStackTrace();
0913:                    return "Exception in toString(): " + t.getMessage();
0914:                }
0915:            }
0916:
0917:            private Holder awardLock(ServerThreadContext threadContext,
0918:                    ThreadID requestThreadID, int lockLevel) {
0919:                Assert.assertFalse(isNull());
0920:
0921:                Holder holder = getHolder(threadContext);
0922:
0923:                Assert.assertNull(holder);
0924:                threadContext.addLock(this );
0925:                holder = new Holder(this .lockID, threadContext, this .timeout);
0926:                holder.addLockLevel(lockLevel);
0927:                Object prev = this .holders.put(threadContext, holder);
0928:                Assert.assertNull(prev);
0929:                this .level = holder.getLockLevel();
0930:                notifyAwardLock(holder);
0931:                recordLockAwardStat(holder.getNodeID(), requestThreadID,
0932:                        isGreedyRequest(threadContext), holder.getTimestamp());
0933:                return holder;
0934:            }
0935:
0936:            private void notifyAwardLock(Holder holder) {
0937:                final int waitingCount = this .pendingLockRequests.size();
0938:
0939:                for (int i = 0; i < listeners.length; i++) {
0940:                    listeners[i].notifyAward(waitingCount, holder);
0941:                }
0942:            }
0943:
0944:            public synchronized boolean isRead() {
0945:                return LockLevel.READ == this .level;
0946:            }
0947:
0948:            public synchronized boolean isWrite() {
0949:                return LockLevel.WRITE == this .level;
0950:            }
0951:
0952:            private boolean holdsReadLock(ServerThreadContext threadContext) {
0953:                Holder holder = getHolder(threadContext);
0954:                if (holder != null) {
0955:                    return holder.getLockLevel() == LockLevel.READ;
0956:                }
0957:                return false;
0958:            }
0959:
0960:            private Holder getHolder(ServerThreadContext threadContext) {
0961:                return (Holder) this .holders.get(threadContext);
0962:            }
0963:
0964:            public Holder getLockHolder(ServerThreadContext threadContext) {
0965:                Holder lockHolder = (Holder) this .holders.get(threadContext);
0966:                if (lockHolder == null) {
0967:                    lockHolder = (Holder) this .holders
0968:                            .get(getClientVMContext(threadContext));
0969:                }
0970:                return lockHolder;
0971:            }
0972:
0973:            private void notifyAddPending(Holder holder) {
0974:                final int waitingCount = this .pendingLockRequests.size();
0975:
0976:                for (int i = 0; i < this .listeners.length; i++) {
0977:                    this .listeners[i].notifyAddPending(waitingCount, holder);
0978:                }
0979:            }
0980:
0981:            synchronized int getWaiterCount() {
0982:                return this .waiters.size();
0983:            }
0984:
0985:            synchronized boolean hasPending() {
0986:                return pendingLockRequests.size() > 0;
0987:            }
0988:
0989:            synchronized boolean hasWaiting() {
0990:                return this .waiters.size() > 0;
0991:            }
0992:
0993:            boolean hasGreedyHolders() {
0994:                return this .greedyHolders.size() > 0;
0995:            }
0996:
0997:            synchronized boolean hasWaiting(ServerThreadContext threadContext) {
0998:                return (this .waiters.get(threadContext) != null);
0999:            }
1000:
1001:            public LockID getLockID() {
1002:                return lockID;
1003:            }
1004:
1005:            public boolean isNull() {
1006:                return this .isNull;
1007:            }
1008:
1009:            public int hashCode() {
1010:                return this .lockID.hashCode();
1011:            }
1012:
1013:            public boolean equals(Object obj) {
1014:                if (obj instanceof  Lock) {
1015:                    Lock other = (Lock) obj;
1016:                    return this .lockID.equals(other.lockID);
1017:                }
1018:                return false;
1019:            }
1020:
1021:            private boolean readHolder() {
1022:                // We only need to check the first holder as we cannot have 2 holder, one holding a READ and another holding a
1023:                // WRITE.
1024:                Holder holder = (Holder) holders.values().iterator().next();
1025:                return holder != null
1026:                        && LockLevel.isRead(holder.getLockLevel());
1027:            }
1028:
1029:            synchronized boolean nextPending() {
1030:                Assert.eval(!isNull());
1031:                // debug("nextPending() - BEGIN -");
1032:
1033:                boolean clear;
1034:                try {
1035:                    // Lock upgrade is not supported.
1036:                    if (!pendingLockRequests.isEmpty()) {
1037:                        Request request = (Request) pendingLockRequests
1038:                                .get(pendingLockRequests.get(0));
1039:                        int reqLockLevel = request.getLockLevel();
1040:
1041:                        boolean canGrantRequest = (reqLockLevel == LockLevel.READ) ? (holders
1042:                                .isEmpty() || readHolder())
1043:                                : holders.isEmpty();
1044:                        if (canGrantRequest) {
1045:
1046:                            switch (reqLockLevel) {
1047:                            case LockLevel.WRITE: {
1048:                                pendingLockRequests.remove(0);
1049:                                cancelTryLockTimer(request);
1050:                                // Give locks greedily only if there is no one waiting or pending for this lock
1051:                                if (isPolicyGreedy()
1052:                                        && isAllPendingLockRequestsFromNode(request
1053:                                                .getRequesterID())
1054:                                        && (getWaiterCount() == 0)) {
1055:                                    // debug("nextPending() - Giving GREEDY WRITE request -", request);
1056:                                    grantGreedyRequest(request);
1057:                                } else {
1058:                                    grantRequest(request);
1059:                                }
1060:                                break;
1061:                            }
1062:                            case LockLevel.READ: {
1063:                                // debug("nextPending() - granting READ request -", request);
1064:                                awardAllReads();
1065:                                break;
1066:                            }
1067:                            default: {
1068:                                throw new TCInternalError(
1069:                                        "Unknown lock level in request: "
1070:                                                + reqLockLevel);
1071:                            }
1072:                            }
1073:                        }
1074:                    }
1075:                } finally {
1076:                    clear = holders.size() == 0 && this .waiters.size() == 0
1077:                            && this .pendingLockRequests.size() == 0;
1078:                }
1079:
1080:                return clear;
1081:            }
1082:
1083:            private Request cancelTryLockTimer(Request request) {
1084:                if (!(request instanceof  TryLockRequest)) {
1085:                    return null;
1086:                }
1087:
1088:                ServerThreadContext requestThreadContext = request
1089:                        .getThreadContext();
1090:                TimerTask recallTimer = (TimerTask) tryLockTimers
1091:                        .remove(requestThreadContext);
1092:                if (recallTimer != null) {
1093:                    recallTimer.cancel();
1094:                    return request;
1095:                }
1096:                return null;
1097:            }
1098:
1099:            private void grantGreedyRequest(Request request) {
1100:                // debug("grantGreedyRequest() - BEGIN -", request);
1101:                ServerThreadContext threadContext = request.getThreadContext();
1102:                awardGreedyAndRespond(threadContext, request.getLockLevel(),
1103:                        request.getLockResponseSink());
1104:                clearWaitingOn(threadContext);
1105:            }
1106:
1107:            private void grantRequest(Request request) {
1108:                // debug("grantRequest() - BEGIN -", request);
1109:                ServerThreadContext threadContext = request.getThreadContext();
1110:                awardLock(threadContext, threadContext.getId()
1111:                        .getClientThreadID(), request.getLockLevel());
1112:                clearWaitingOn(threadContext);
1113:                request.execute(lockID);
1114:            }
1115:
1116:            /**
1117:             * Remove the specified lock hold.
1118:             * 
1119:             * @return true if the current hold was an upgrade
1120:             */
1121:            synchronized boolean removeCurrentHold(
1122:                    ServerThreadContext threadContext) {
1123:                // debug("removeCurrentHold() - BEGIN -", threadContext);
1124:                Holder holder = getHolder(threadContext);
1125:                if (holder != null) {
1126:                    this .holders.remove(threadContext);
1127:                    threadContext.removeLock(this );
1128:                    threadContextFactory.removeIfClear(threadContext);
1129:                    if (isGreedyRequest(threadContext)) {
1130:                        removeGreedyHolder(threadContext.getId().getNodeID());
1131:                    }
1132:                    this .level = (holders.size() == 0 ? LockLevel.NIL_LOCK_LEVEL
1133:                            : LockLevel.READ);
1134:                    notifyRevoke(holder);
1135:                    recordLockReleaseStat(holder.getNodeID(), holder
1136:                            .getThreadID());
1137:                }
1138:                return false;
1139:            }
1140:
1141:            synchronized boolean recallCommit(ServerThreadContext threadContext) {
1142:                // debug("recallCommit() - BEGIN -", threadContext);
1143:                Assert.assertTrue(isGreedyRequest(threadContext));
1144:                boolean issueRecall = !recalled;
1145:                removeCurrentHold(threadContext);
1146:                if (issueRecall) {
1147:                    recall(LockLevel.WRITE);
1148:                }
1149:                if (recalled == false) {
1150:                    return nextPending();
1151:                }
1152:                return false;
1153:            }
1154:
1155:            private synchronized void removeGreedyHolder(NodeID nodeID) {
1156:                // debug("removeGreedyHolder() - BEGIN -", channelID);
1157:                greedyHolders.remove(nodeID);
1158:                if (!hasGreedyHolders()) {
1159:                    recalled = false;
1160:                }
1161:            }
1162:
1163:            private void clearWaitingOn(ServerThreadContext threadContext) {
1164:                threadContext.clearWaitingOn();
1165:                threadContextFactory.removeIfClear(threadContext);
1166:            }
1167:
1168:            synchronized void awardAllReads() {
1169:                // debug("awardAllReads() - BEGIN -");
1170:                List pendingReadLockRequests = new ArrayList(
1171:                        pendingLockRequests.size());
1172:                boolean hasPendingWrites = false;
1173:
1174:                for (Iterator i = pendingLockRequests.values().iterator(); i
1175:                        .hasNext();) {
1176:                    Request request = (Request) i.next();
1177:                    if (request.getLockLevel() == LockLevel.READ) {
1178:                        pendingReadLockRequests.add(request);
1179:                        i.remove();
1180:                    } else if (!hasPendingWrites
1181:                            && request.getLockLevel() == LockLevel.WRITE) {
1182:                        hasPendingWrites = true;
1183:                    }
1184:                }
1185:
1186:                for (Iterator i = pendingReadLockRequests.iterator(); i
1187:                        .hasNext();) {
1188:                    Request request = (Request) i.next();
1189:                    cancelTryLockTimer(request);
1190:                    if (isPolicyGreedy() && !hasPendingWrites) {
1191:                        ServerThreadContext tid = request.getThreadContext();
1192:                        if (!holdsGreedyLock(tid)) {
1193:                            grantGreedyRequest(request);
1194:                        } else {
1195:                            // These can be awarded locally in the client ...
1196:                            clearWaitingOn(tid);
1197:                        }
1198:                    } else {
1199:                        grantRequest(request);
1200:                    }
1201:                }
1202:            }
1203:
1204:            synchronized boolean holdsSomeLock(NodeID nodeID) {
1205:                for (Iterator iter = holders.values().iterator(); iter
1206:                        .hasNext();) {
1207:                    Holder holder = (Holder) iter.next();
1208:                    if (holder.getNodeID().equals(nodeID)) {
1209:                        return true;
1210:                    }
1211:                }
1212:                return false;
1213:            }
1214:
1215:            synchronized boolean holdsGreedyLock(
1216:                    ServerThreadContext threadContext) {
1217:                return (greedyHolders.get(threadContext.getId().getNodeID()) != null);
1218:            }
1219:
1220:            synchronized boolean canAwardGreedilyOnTheClient(
1221:                    ServerThreadContext threadContext, int lockLevel) {
1222:                Holder holder = (Holder) greedyHolders.get(threadContext
1223:                        .getId().getNodeID());
1224:                if (holder != null) {
1225:                    return (LockLevel.isWrite(holder.getLockLevel()) || holder
1226:                            .getLockLevel() == lockLevel);
1227:                }
1228:                return false;
1229:            }
1230:
1231:            private void notifyRevoke(Holder holder) {
1232:                for (int i = 0; i < this .listeners.length; i++) {
1233:                    this .listeners[i].notifyRevoke(holder);
1234:                }
1235:            }
1236:
1237:            void notifyStarted(WaitTimerCallback callback, WaitTimer timer) {
1238:                for (Iterator i = waiters.values().iterator(); i.hasNext();) {
1239:                    LockWaitContext ctxt = (LockWaitContext) i.next();
1240:                    scheduleWait(callback, timer, ctxt);
1241:                }
1242:            }
1243:
1244:            synchronized boolean isAllPendingLockRequestsFromNode(NodeID nodeID) {
1245:                for (Iterator i = pendingLockRequests.values().iterator(); i
1246:                        .hasNext();) {
1247:                    Request r = (Request) i.next();
1248:                    if (!r.getRequesterID().equals(nodeID)) {
1249:                        return false;
1250:                    }
1251:                }
1252:                return true;
1253:            }
1254:
1255:            /**
1256:             * This clears out stuff from the pending and wait lists that belonged to a dead session. It occurs to me that this is
1257:             * a race condition because a request could come in on the connection, then the cleanup could happen, and then the
1258:             * request could be processed. We need to drop requests that are processed after the cleanup
1259:             * 
1260:             * @param nid
1261:             */
1262:            synchronized void clearStateForNode(NodeID nid) {
1263:                // debug("clearStateForChannel() - BEGIN -", channelId);
1264:                for (Iterator i = holders.values().iterator(); i.hasNext();) {
1265:                    Holder holder = (Holder) i.next();
1266:                    if (holder.getNodeID().equals(nid)) {
1267:                        i.remove();
1268:                    }
1269:                }
1270:                for (Iterator i = pendingLockRequests.values().iterator(); i
1271:                        .hasNext();) {
1272:                    Request r = (Request) i.next();
1273:                    if (r.getRequesterID().equals(nid)) {
1274:                        i.remove();
1275:                    }
1276:                }
1277:
1278:                for (Iterator i = waiters.values().iterator(); i.hasNext();) {
1279:                    LockWaitContext wc = (LockWaitContext) i.next();
1280:                    if (wc.getNodeID().equals(nid)) {
1281:                        i.remove();
1282:                    }
1283:                }
1284:
1285:                for (Iterator i = waitTimers.keySet().iterator(); i.hasNext();) {
1286:                    LockWaitContext wc = (LockWaitContext) i.next();
1287:                    if (wc.getNodeID().equals(nid)) {
1288:                        try {
1289:                            TimerTask task = (TimerTask) waitTimers.get(wc);
1290:                            task.cancel();
1291:                        } finally {
1292:                            i.remove();
1293:                        }
1294:                    }
1295:                }
1296:                removeGreedyHolder(nid);
1297:            }
1298:
1299:            synchronized void checkAndClearStateOnGreedyAward(
1300:                    ThreadID clientThreadID, NodeID ch, int requestedLevel) {
1301:                // We dont want to award a greedy lock if there are waiters. Lock upgrade is not a problem as it is no longer
1302:                // supported.
1303:                // debug("checkAndClearStateOnGreedyAward For ", ch, ", ", LockLevel.toString(requestedLevel));
1304:                // debug("checkAndClear... BEFORE Lock = ", this);
1305:                // Assert.assertTrue(pendingLockUpgrades.size() == 0);
1306:                Assert.assertTrue((requestedLevel == LockLevel.READ)
1307:                        || (waiters.size() == 0));
1308:
1309:                for (Iterator i = holders.values().iterator(); i.hasNext();) {
1310:                    Holder holder = (Holder) i.next();
1311:                    if (holder.getNodeID().equals(ch)) {
1312:                        ServerThreadContext txn = holder.getThreadContext();
1313:                        txn.removeLock(this );
1314:                        threadContextFactory.removeIfClear(txn);
1315:
1316:                        i.remove();
1317:                    }
1318:                }
1319:                for (Iterator i = pendingLockRequests.values().iterator(); i
1320:                        .hasNext();) {
1321:                    Request r = (Request) i.next();
1322:                    if (r.getRequesterID().equals(ch)) {
1323:                        if ((requestedLevel == LockLevel.WRITE)
1324:                                || (r.getLockLevel() == requestedLevel)) {
1325:                            // debug("checkAndClear... removing request = ", r);
1326:                            i.remove();
1327:                            ServerThreadContext tid = r.getThreadContext();
1328:                            // debug("checkAndClear... clearing threadContext = ", tid);
1329:                            clearWaitingOn(tid);
1330:                            cancelTryLockTimer(r);
1331:                        } else {
1332:                            throw new AssertionError(
1333:                                    "Issuing READ lock greedily when WRITE pending !");
1334:                        }
1335:                    }
1336:                }
1337:                // debug("checkAndClear... AFTER Lock = ", this);
1338:            }
1339:
1340:            private void recordLockRequestStat(NodeID nodeID,
1341:                    ThreadID threadID, int lockLevel) {
1342:                lockStatsManager.lockRequested(lockID, nodeID, threadID,
1343:                        lockLevel);
1344:            }
1345:
1346:            private void recordLockAwardStat(NodeID nodeID, ThreadID threadID,
1347:                    boolean isGreedyRequest, long awardTimestamp) {
1348:                lockStatsManager.lockAwarded(lockID, nodeID, threadID,
1349:                        isGreedyRequest, awardTimestamp);
1350:            }
1351:
1352:            private void recordLockReleaseStat(NodeID nodeID, ThreadID threadID) {
1353:                lockStatsManager.lockReleased(lockID, nodeID, threadID);
1354:            }
1355:
1356:            private void recordLockHoppedStat() {
1357:                lockStatsManager.lockHopped(lockID);
1358:            }
1359:
1360:            private void recordLockRejectStat(NodeID nodeID, ThreadID threadID) {
1361:                lockStatsManager.lockRejected(lockID, nodeID, threadID);
1362:            }
1363:
1364:            private void recordLockWaitStat() {
1365:                lockStatsManager.lockWait(lockID);
1366:            }
1367:
1368:            private void recordLockNotifyStat(int numToNotify) {
1369:                lockStatsManager.lockNotified(lockID, numToNotify);
1370:            }
1371:
1372:            // I wish we were using 1.5 !!!
1373:            // private void debug(Object o1, Object o2) {
1374:            // logger.warn(lockID + String.valueOf(o1) + String.valueOf(o2));
1375:            // }
1376:            //
1377:            // private void debug(Object o1, Object o2, Object o3) {
1378:            // logger.warn(lockID + String.valueOf(o1) + String.valueOf(o2) + String.valueOf(o3));
1379:            // }
1380:            //
1381:            // private void debug(Object o1, Object o2, Object o3, Object o4) {
1382:            // logger.warn(lockID + String.valueOf(o1) + String.valueOf(o2) + String.valueOf(o3) + String.valueOf(o4));
1383:            // }
1384:            //
1385:            // private void debug(Object o) {
1386:            // logger.warn(lockID + String.valueOf(o));
1387:            // }
1388:
1389:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.