001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.object.lockmanager.impl;
006:
007: import org.apache.commons.collections.map.ListOrderedMap;
008:
009: import com.tc.logging.TCLogger;
010: import com.tc.management.ClientLockStatManager;
011: import com.tc.object.bytecode.ManagerUtil;
012: import com.tc.object.lockmanager.api.ClientLockManager;
013: import com.tc.object.lockmanager.api.LockFlushCallback;
014: import com.tc.object.lockmanager.api.LockID;
015: import com.tc.object.lockmanager.api.LockLevel;
016: import com.tc.object.lockmanager.api.Notify;
017: import com.tc.object.lockmanager.api.QueryLockRequest;
018: import com.tc.object.lockmanager.api.RemoteLockManager;
019: import com.tc.object.lockmanager.api.ThreadID;
020: import com.tc.object.lockmanager.api.WaitListener;
021: import com.tc.object.lockmanager.api.WaitTimer;
022: import com.tc.object.session.SessionID;
023: import com.tc.object.session.SessionManager;
024: import com.tc.object.tx.WaitInvocation;
025: import com.tc.text.ConsoleParagraphFormatter;
026: import com.tc.text.ParagraphFormatter;
027: import com.tc.text.StringFormatter;
028: import com.tc.util.Assert;
029: import com.tc.util.State;
030: import com.tc.util.Util;
031:
032: import java.util.ArrayList;
033: import java.util.Collection;
034: import java.util.HashMap;
035: import java.util.HashSet;
036: import java.util.Iterator;
037: import java.util.Map;
038: import java.util.TimerTask;
039:
040: /**
041: * @author steve
042: */
043: public class ClientLockManagerImpl implements ClientLockManager,
044: LockFlushCallback {
045:
046: public static final long TIMEOUT = 60 * 1000;
047:
048: private static final State RUNNING = new State("RUNNING");
049: private static final State STARTING = new State("STARTING");
050: private static final State PAUSED = new State("PAUSED");
051:
052: private static final String MISSING_LOCK_TEXT = makeMissingLockText();
053:
054: private State state = RUNNING;
055: private final Map locksByID = new HashMap();
056: private final Map pendingQueryLockRequestsByID = new ListOrderedMap();
057: private final Map lockInfoByID = new HashMap();
058: private final RemoteLockManager remoteLockManager;
059: private final WaitTimer waitTimer = new WaitTimerImpl();
060: private final TCLogger logger;
061: private final SessionManager sessionManager;
062: private final ClientLockStatManager lockStatManager;
063:
064: public ClientLockManagerImpl(TCLogger logger,
065: RemoteLockManager remoteLockManager,
066: SessionManager sessionManager,
067: ClientLockStatManager lockStatManager) {
068: this .logger = logger;
069: this .remoteLockManager = remoteLockManager;
070: this .sessionManager = sessionManager;
071: this .lockStatManager = lockStatManager;
072: waitTimer.getTimer().schedule(new LockGCTask(this ), TIMEOUT,
073: TIMEOUT);
074: }
075:
076: public synchronized void pause() {
077: if (state == PAUSED)
078: throw new AssertionError(
079: "Attempt to pause while already paused : " + state);
080: this .state = PAUSED;
081: for (Iterator iter = new HashSet(locksByID.values()).iterator(); iter
082: .hasNext();) {
083: ClientLock lock = (ClientLock) iter.next();
084: lock.pause();
085: }
086: }
087:
088: public synchronized void starting() {
089: if (state != PAUSED)
090: throw new AssertionError(
091: "Attempt to start when not paused: " + state);
092: this .state = STARTING;
093: }
094:
095: public synchronized void unpause() {
096: if (state != STARTING)
097: throw new AssertionError(
098: "Attempt to unpause when not starting: " + state);
099: this .state = RUNNING;
100: notifyAll();
101: for (Iterator iter = locksByID.values().iterator(); iter
102: .hasNext();) {
103: ClientLock lock = (ClientLock) iter.next();
104: lock.unpause();
105: }
106: resubmitQueryLockRequests();
107: }
108:
109: public synchronized boolean isStarting() {
110: return state == STARTING;
111: }
112:
113: public synchronized void runGC() {
114: waitUntilRunning();
115: logger.info("Running Lock GC...");
116: ArrayList toGC = new ArrayList(locksByID.size());
117: for (Iterator iter = locksByID.values().iterator(); iter
118: .hasNext();) {
119: ClientLock lock = (ClientLock) iter.next();
120: if (lock.timedout()) {
121: toGC.add(lock.getLockID());
122: }
123: }
124: if (toGC.size() > 0) {
125: logger.debug("GCing "
126: + (toGC.size() < 11 ? toGC.toString() : toGC.size()
127: + " Locks ..."));
128: for (Iterator iter = toGC.iterator(); iter.hasNext();) {
129: LockID lockID = (LockID) iter.next();
130: recall(lockID, ThreadID.VM_ID, LockLevel.WRITE);
131: }
132: }
133: }
134:
135: public void enableStat(LockID lockID, int lockStackTraceDepth,
136: int lockStatCollectFrequency) {
137: synchronized (this ) {
138: waitUntilRunning();
139: lockStatManager.enableStackTrace(lockID,
140: lockStackTraceDepth, lockStatCollectFrequency);
141: }
142: }
143:
144: public void disableStat(LockID lockID) {
145: final ClientLock lock;
146:
147: synchronized (this ) {
148: waitUntilRunning();
149: lockStatManager.disableStackTrace(lockID);
150: }
151: }
152:
153: private GlobalLockInfo getLockInfo(LockID lockID, ThreadID threadID) {
154: Object waitLock = addToPendingQueryLockRequest(lockID, threadID);
155: remoteLockManager.queryLock(lockID, threadID);
156: waitForQueryReply(threadID, waitLock);
157: GlobalLockInfo lockInfo;
158: synchronized (lockInfoByID) {
159: lockInfo = (GlobalLockInfo) lockInfoByID.remove(threadID);
160: }
161: return lockInfo;
162: }
163:
164: // TODO:
165: // Needs to take care of the greedy lock case.
166: public int queueLength(LockID lockID, ThreadID threadID) {
167: ClientLock lock;
168: synchronized (this ) {
169: waitUntilRunning();
170: lock = getLock(lockID);
171: }
172: GlobalLockInfo lockInfo = getLockInfo(lockID, threadID);
173:
174: int queueLength = lockInfo.getLockRequestQueueLength();
175: if (lock != null) {
176: queueLength += lock.queueLength();
177: }
178: return queueLength;
179: }
180:
181: // TODO:
182: // Needs to take care of the greedy lock case.
183: public int waitLength(LockID lockID, ThreadID threadID) {
184: ClientLock lock;
185: synchronized (this ) {
186: waitUntilRunning();
187: lock = getLock(lockID);
188: }
189:
190: GlobalLockInfo lockInfo = getLockInfo(lockID, threadID);
191: int waitLength = lockInfo.getWaitersInfo().size();
192:
193: if (lock != null) {
194: return waitLength + lock.waitLength();
195: }
196:
197: return waitLength;
198: }
199:
200: // This methods return the number of times a lock is being locked by threadID.
201: public int localHeldCount(LockID lockID, int lockLevel,
202: ThreadID threadID) {
203: ClientLock lock;
204: synchronized (this ) {
205: waitUntilRunning();
206: lock = (ClientLock) locksByID.get(lockID);
207: }
208: if (lock == null) {
209: return 0;
210: } else {
211: return lock.localHeldCount(threadID, lockLevel);
212: }
213: }
214:
215: // TODO:
216: // Needs to take care of the greedy lock case.
217: public boolean isLocked(LockID lockID, ThreadID threadID,
218: int lockLevel) {
219: ClientLock lock;
220: synchronized (this ) {
221: waitUntilRunning();
222: lock = (ClientLock) locksByID.get(lockID);
223: }
224: if (lock != null) {
225: return lock.isHeldBy(threadID, lockLevel);
226: } else {
227: GlobalLockInfo lockInfo = getLockInfo(lockID, threadID);
228: return lockInfo.isLocked(lockLevel);
229: }
230: }
231:
232: private void waitForQueryReply(ThreadID threadID, Object waitLock) {
233: boolean isInterrupted = false;
234:
235: synchronized (waitLock) {
236: while (!hasLockInfo(threadID)) {
237: try {
238: waitLock.wait();
239: } catch (InterruptedException ioe) {
240: isInterrupted = true;
241: }
242: }
243: }
244: Util.selfInterruptIfNeeded(isInterrupted);
245: }
246:
247: private boolean hasLockInfo(ThreadID threadID) {
248: synchronized (lockInfoByID) {
249: return lockInfoByID.containsKey(threadID);
250: }
251: }
252:
253: public void lock(LockID lockID, ThreadID threadID, int type) {
254: Assert.assertNotNull("threadID", threadID);
255: final ClientLock lock;
256:
257: synchronized (this ) {
258: waitUntilRunning();
259: lock = getOrCreateLock(lockID);
260: lock.incUseCount();
261: }
262: lock.lock(threadID, type);
263: }
264:
265: public boolean tryLock(LockID lockID, ThreadID threadID,
266: WaitInvocation timeout, int type) {
267: Assert.assertNotNull("threadID", threadID);
268: final ClientLock lock;
269:
270: synchronized (this ) {
271: waitUntilRunning();
272: lock = getOrCreateLock(lockID);
273: lock.incUseCount();
274: }
275: boolean isLocked = lock.tryLock(threadID, timeout, type);
276: if (!isLocked) {
277: synchronized (this ) {
278: lock.decUseCount();
279: }
280: cleanUp(lock);
281: }
282: return isLocked;
283: }
284:
285: public void unlock(LockID lockID, ThreadID threadID) {
286: final ClientLock myLock;
287:
288: synchronized (this ) {
289: waitUntilRunning();
290: myLock = (ClientLock) locksByID.get(lockID);
291: if (myLock == null) {
292: throw missingLockException(lockID);
293: }
294: myLock.decUseCount();
295: }
296:
297: myLock.unlock(threadID);
298: cleanUp(myLock);
299: }
300:
301: private AssertionError missingLockException(LockID lockID) {
302: return new AssertionError(MISSING_LOCK_TEXT
303: + " Missing lock ID is " + lockID);
304: }
305:
306: public void wait(LockID lockID, ThreadID threadID,
307: WaitInvocation call, Object waitLock, WaitListener listener)
308: throws InterruptedException {
309: final ClientLock myLock;
310: synchronized (this ) {
311: waitUntilRunning();
312: myLock = (ClientLock) locksByID.get(lockID);
313: }
314: if (myLock == null) {
315: throw missingLockException(lockID);
316: }
317: myLock.wait(threadID, call, waitLock, listener);
318: }
319:
320: public Notify notify(LockID lockID, ThreadID threadID, boolean all) {
321: final ClientLock myLock;
322: synchronized (this ) {
323: waitUntilRunning();
324: myLock = (ClientLock) locksByID.get(lockID);
325: }
326: if (myLock == null) {
327: throw missingLockException(lockID);
328: }
329: return myLock.notify(threadID, all);
330: }
331:
332: /*
333: * The level represents the reason why server wanted a recall and will determite when a recall commit will happen.
334: */
335: public synchronized void recall(LockID lockID, ThreadID threadID,
336: int interestedLevel) {
337: Assert.assertEquals(ThreadID.VM_ID, threadID);
338: if (isPaused()) {
339: logger.warn("Ignoring recall request from dead server : "
340: + lockID + ", " + threadID + " interestedLevel : "
341: + LockLevel.toString(interestedLevel));
342: return;
343: }
344: final ClientLock myLock = (ClientLock) locksByID.get(lockID);
345: if (myLock != null) {
346: myLock.recall(interestedLevel, this );
347: cleanUp(myLock);
348: }
349: }
350:
351: public void transactionsForLockFlushed(LockID lockID) {
352: final ClientLock myLock;
353: synchronized (this ) {
354: waitUntilRunning();
355: myLock = (ClientLock) locksByID.get(lockID);
356: }
357: if (myLock != null) {
358: myLock.transactionsForLockFlushed(lockID);
359: cleanUp(myLock);
360: }
361: }
362:
363: /*
364: * Called from a stage thread and should never be blocked XXX:: I am currently not ignoring reponses from dead server
365: * because of a bug during server restart case. check out https://jira.terracotta.org/jira/browse/DEV-448 . After
366: * fixing that, one can ignore responses while in paused state.
367: */
368: public synchronized void queryLockCommit(ThreadID threadID,
369: GlobalLockInfo globalLockInfo) {
370: synchronized (lockInfoByID) {
371: lockInfoByID.put(threadID, globalLockInfo);
372: }
373: QueryLockRequest qRequest = (QueryLockRequest) pendingQueryLockRequestsByID
374: .remove(threadID);
375: if (qRequest == null) {
376: throw new AssertionError(
377: "Query Lock request does not exist.");
378: }
379: Object waitLock = qRequest.getWaitLock();
380: synchronized (waitLock) {
381: waitLock.notifyAll();
382: }
383: }
384:
385: public synchronized void waitTimedOut(LockID lockID,
386: ThreadID threadID) {
387: notified(lockID, threadID);
388: }
389:
390: private synchronized void cleanUp(ClientLock lock) {
391: if (lock.isClear()) {
392: Object o = locksByID.get(lock.getLockID());
393: if (o == lock) {
394: // Sometimes when called from recall, the unlock would have already removed this lock
395: // from the map and a new lock could be in the map from a new lock request. We dont want to
396: // remove that
397: locksByID.remove(lock.getLockID());
398: }
399: }
400: }
401:
402: /*
403: * Called from a stage thread and should never be blocked
404: */
405: public synchronized void notified(LockID lockID, ThreadID threadID) {
406: if (isPaused()) {
407: logger.warn("Ignoring notified call from dead server : "
408: + lockID + ", " + threadID);
409: return;
410: }
411: final ClientLock myLock = (ClientLock) locksByID.get(lockID);
412: if (myLock == null) {
413: throw new AssertionError(lockID.toString());
414: }
415: myLock.notified(threadID);
416: }
417:
418: /*
419: * XXX::This method is called from a stage thread. It operate on the lock inside the scope of the synchronization
420: * unlike other methods because, we want to decide whether to process this award or not and go with it atomically
421: */
422: public synchronized void awardLock(SessionID sessionID,
423: LockID lockID, ThreadID threadID, int level) {
424: if (isPaused() || !sessionManager.isCurrentSession(sessionID)) {
425: logger.warn("Ignoring lock award from a dead server :"
426: + sessionID + ", " + sessionManager + " : "
427: + lockID + " " + threadID + " "
428: + LockLevel.toString(level) + " state = " + state);
429: return;
430: }
431: final ClientLock lock = (ClientLock) locksByID.get(lockID);
432: if (lock == null) {
433: throw new AssertionError("awardLock(): Lock not found"
434: + lockID.toString() + " :: " + threadID + " :: "
435: + LockLevel.toString(level));
436: }
437: lock.awardLock(threadID, level);
438: }
439:
440: /*
441: * XXX:: @read comment for awardLock();
442: */
443: public synchronized void cannotAwardLock(SessionID sessionID,
444: LockID lockID, ThreadID threadID, int level) {
445: if (isPaused() || !sessionManager.isCurrentSession(sessionID)) {
446: logger.warn("Ignoring lock award from a dead server :"
447: + sessionID + ", " + sessionManager + " : "
448: + lockID + " " + threadID + " level = " + level
449: + " state = " + state);
450: return;
451: }
452: final ClientLock lock = (ClientLock) locksByID.get(lockID);
453: if (lock == null) {
454: throw new AssertionError("Client id: "
455: + ManagerUtil.getClientID()
456: + ", cannotAwardLock(): Lock not found"
457: + lockID.toString() + " :: " + threadID + " :: "
458: + LockLevel.toString(level));
459: }
460: lock.cannotAwardLock(threadID, level);
461: }
462:
463: // This method should be called within a synchronized(this) block.
464: private ClientLock getLock(LockID id) {
465: return (ClientLock) locksByID.get(id);
466: }
467:
468: private synchronized ClientLock getOrCreateLock(LockID id) {
469: ClientLock lock = (ClientLock) locksByID.get(id);
470: if (lock == null) {
471: lock = new ClientLock(id, remoteLockManager, waitTimer,
472: lockStatManager);
473: locksByID.put(id, lock);
474: }
475: return lock;
476: }
477:
478: public LockID lockIDFor(String id) {
479: if (id == null)
480: return LockID.NULL_ID;
481: return new LockID(id);
482: }
483:
484: public synchronized Collection addAllWaitersTo(Collection c) {
485: assertStarting();
486: for (Iterator i = locksByID.values().iterator(); i.hasNext();) {
487: ClientLock lock = (ClientLock) i.next();
488: lock.addAllWaitersTo(c);
489: }
490: return c;
491: }
492:
493: public synchronized Collection addAllHeldLocksTo(Collection c) {
494: assertStarting();
495: for (Iterator i = locksByID.values().iterator(); i.hasNext();) {
496: ClientLock lock = (ClientLock) i.next();
497: lock.addHoldersToAsLockRequests(c);
498: }
499: return c;
500: }
501:
502: public synchronized Collection addAllPendingLockRequestsTo(
503: Collection c) {
504: assertStarting();
505: for (Iterator i = locksByID.values().iterator(); i.hasNext();) {
506: ClientLock lock = (ClientLock) i.next();
507: lock.addAllPendingLockRequestsTo(c);
508: }
509: return c;
510: }
511:
512: public synchronized Collection addAllPendingTryLockRequestsTo(
513: Collection c) {
514: assertStarting();
515: for (Iterator i = locksByID.values().iterator(); i.hasNext();) {
516: ClientLock lock = (ClientLock) i.next();
517: lock.addAllPendingTryLockRequestsTo(c);
518: }
519: return c;
520: }
521:
522: synchronized boolean haveLock(LockID lockID, ThreadID threadID,
523: int lockType) {
524: ClientLock l = (ClientLock) locksByID.get(lockID);
525: if (l == null) {
526: return false;
527: }
528: return l.isHeldBy(threadID, lockType);
529: }
530:
531: private void waitUntilRunning() {
532: boolean isInterrupted = false;
533: while (!isRunning()) {
534: try {
535: wait();
536: } catch (InterruptedException e) {
537: isInterrupted = true;
538: }
539: }
540: Util.selfInterruptIfNeeded(isInterrupted);
541: }
542:
543: public synchronized boolean isRunning() {
544: return (state == RUNNING);
545: }
546:
547: public synchronized boolean isPaused() {
548: return (state == PAUSED);
549: }
550:
551: private void assertStarting() {
552: if (state != STARTING)
553: throw new AssertionError(
554: "ClientLockManager is not STARTING : " + state);
555: }
556:
557: /*
558: * @returns the wait object for lock request
559: */
560: private synchronized Object addToPendingQueryLockRequest(
561: LockID lockID, ThreadID threadID) {
562: // Add Lock Request
563: Object o = new Object();
564: QueryLockRequest qRequest = new QueryLockRequest(lockID,
565: threadID, o);
566: Object old = pendingQueryLockRequestsByID.put(threadID,
567: qRequest);
568: if (old != null) {
569: // formatting
570: throw new AssertionError(
571: "Query Lock request already outstanding - " + old);
572: }
573:
574: return o;
575: }
576:
577: private synchronized void resubmitQueryLockRequests() {
578: for (Iterator i = pendingQueryLockRequestsByID.values()
579: .iterator(); i.hasNext();) {
580: QueryLockRequest qRequest = (QueryLockRequest) i.next();
581: remoteLockManager.queryLock(qRequest.lockID(), qRequest
582: .threadID());
583: }
584: }
585:
586: private static String makeMissingLockText() {
587: ParagraphFormatter formatter = new ConsoleParagraphFormatter(
588: 72, new StringFormatter());
589:
590: String message = "An operation to a DSO lock was attempted for a lock that does not yet exist. This is usually the result ";
591: message += "of an object becoming shared in the middle of synchronized block on that object (in which case the monitorExit ";
592: message += "call will produce this exception). Additionally, attempts to wait()/notify()/notifyAll() on an object in such a block will ";
593: message += "also fail. To workaround this problem, the object/lock need to become shared in the scope of a different (earlier) ";
594: message += "synchronization block.";
595:
596: return formatter.format(message);
597: }
598:
599: static class LockGCTask extends TimerTask {
600:
601: final ClientLockManager lockManager;
602:
603: LockGCTask(ClientLockManager mgr) {
604: lockManager = mgr;
605: }
606:
607: public void run() {
608: lockManager.runGC();
609: }
610: }
611: }
|