001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.objectserver.lockmanager.impl;
006:
007: import org.apache.commons.io.output.NullOutputStream;
008:
009: import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
010:
011: import com.tc.exception.TCLockUpgradeNotSupportedError;
012: import com.tc.management.L2LockStatsManager;
013: import com.tc.management.L2LockStatsManagerImpl;
014: import com.tc.net.groups.ClientID;
015: import com.tc.net.groups.NodeID;
016: import com.tc.net.protocol.tcm.ChannelID;
017: import com.tc.object.lockmanager.api.LockID;
018: import com.tc.object.lockmanager.api.LockLevel;
019: import com.tc.object.lockmanager.api.ServerThreadID;
020: import com.tc.object.lockmanager.api.ThreadID;
021: import com.tc.object.tx.WaitInvocation;
022: import com.tc.objectserver.api.TestSink;
023: import com.tc.objectserver.context.LockResponseContext;
024: import com.tc.objectserver.lockmanager.api.DeadlockChain;
025: import com.tc.objectserver.lockmanager.api.DeadlockResults;
026: import com.tc.objectserver.lockmanager.api.LockHolder;
027: import com.tc.objectserver.lockmanager.api.LockMBean;
028: import com.tc.objectserver.lockmanager.api.NullChannelManager;
029: import com.tc.objectserver.lockmanager.api.ServerLockRequest;
030: import com.tc.objectserver.lockmanager.api.Waiter;
031: import com.tc.util.concurrent.ThreadUtil;
032:
033: import java.io.IOException;
034: import java.io.ObjectOutputStream;
035: import java.util.ArrayList;
036: import java.util.Arrays;
037: import java.util.Comparator;
038: import java.util.HashMap;
039: import java.util.Iterator;
040: import java.util.List;
041: import java.util.Map;
042: import java.util.Random;
043:
044: import junit.framework.TestCase;
045:
046: /**
047: * @author steve
048: */
049: public class LockManagerTest extends TestCase {
050: private TestSink sink;
051: private LockManagerImpl lockManager;
052: private Random random = new Random();
053:
054: final int numLocks = 30;
055: final int numThreads = 15;
056: private LockID[] locks = makeUniqueLocks(numLocks);
057: private ServerThreadID[] txns = makeUniqueTxns(numThreads);
058:
059: protected void setUp() throws Exception {
060: super .setUp();
061: resetLockManager();
062: sink = new TestSink();
063: }
064:
065: private void resetLockManager() {
066: resetLockManager(false);
067: }
068:
069: private void resetLockManager(boolean start) {
070: if (lockManager != null) {
071: try {
072: lockManager.stop();
073: } catch (InterruptedException e) {
074: fail();
075: }
076: }
077:
078: lockManager = new LockManagerImpl(new NullChannelManager(),
079: L2LockStatsManager.NULL_LOCK_STATS_MANAGER);
080: lockManager
081: .setLockPolicy(LockManagerImpl.ALTRUISTIC_LOCK_POLICY);
082: if (start) {
083: lockManager.start();
084: }
085: }
086:
087: protected void tearDown() throws Exception {
088: assertEquals(0, lockManager.getLockCount());
089: assertEquals(0, lockManager.getThreadContextCount());
090: super .tearDown();
091: }
092:
093: public void testLockMBean() throws IOException {
094:
095: final long start = System.currentTimeMillis();
096: final ClientID cid1 = new ClientID(new ChannelID(1));
097: final ClientID cid2 = new ClientID(new ChannelID(2));
098:
099: LockID lid1 = new LockID("1");
100: LockID lid2 = new LockID("2");
101: LockID lid3 = new LockID("3");
102: ThreadID tid1 = new ThreadID(1);
103: WaitInvocation wait = new WaitInvocation(Integer.MAX_VALUE);
104:
105: L2LockStatsManager lockStatsManager = new L2LockStatsManagerImpl();
106: lockManager = new LockManagerImpl(new NullChannelManager() {
107: public String getChannelAddress(NodeID nid) {
108: if (cid1.equals(nid)) {
109: return "127.0.0.1:6969";
110: }
111: return "no longer connected";
112: }
113: }, lockStatsManager);
114:
115: lockManager
116: .setLockPolicy(LockManagerImpl.ALTRUISTIC_LOCK_POLICY);
117: lockManager.start();
118: lockStatsManager.start(new NullChannelManager(), lockManager,
119: sink);
120:
121: lockManager
122: .requestLock(lid1, cid1, tid1, LockLevel.WRITE, sink); // hold
123: lockManager
124: .requestLock(lid1, cid2, tid1, LockLevel.WRITE, sink); // pending
125:
126: lockManager.requestLock(lid2, cid1, tid1, LockLevel.READ, sink); // hold
127: lockManager.requestLock(lid2, cid2, tid1, LockLevel.READ, sink); // hold
128: try {
129: lockManager.requestLock(lid2, cid1, tid1, LockLevel.WRITE,
130: sink); // try upgrade and fail
131: throw new AssertionError(
132: "Should have thrown an TCLockUpgradeNotSupportedError.");
133: } catch (TCLockUpgradeNotSupportedError e) {
134: //
135: }
136:
137: lockManager
138: .requestLock(lid3, cid1, tid1, LockLevel.WRITE, sink); // hold
139: lockManager.wait(lid3, cid1, tid1, wait, sink); // wait
140:
141: LockMBean[] lockBeans = lockManager.getAllLocks();
142: assertEquals(3, lockBeans.length);
143: sortLocksByID(lockBeans);
144:
145: LockMBean bean1 = lockBeans[0];
146: LockMBean bean2 = lockBeans[1];
147: LockMBean bean3 = lockBeans[2];
148: testSerialize(bean1);
149: testSerialize(bean2);
150: testSerialize(bean3);
151:
152: validateBean1(bean1, start);
153: validateBean2(bean2, start);
154: validateBean3(bean3, start, wait);
155:
156: lockManager.clearAllLocksFor(cid1);
157: lockManager.clearAllLocksFor(cid2);
158: }
159:
160: private void validateBean3(LockMBean bean3, long time,
161: WaitInvocation wait) {
162: LockHolder[] holders = bean3.getHolders();
163: ServerLockRequest[] reqs = bean3.getPendingRequests();
164: Waiter[] waiters = bean3.getWaiters();
165: assertEquals(0, holders.length);
166: assertEquals(0, reqs.length);
167: assertEquals(1, waiters.length);
168:
169: Waiter waiter = waiters[0];
170: assertEquals(wait.toString(), waiter.getWaitInvocation());
171: assertTrue(waiter.getStartTime() >= time);
172: assertEquals(new ClientID(new ChannelID(1)), waiter.getNodeID());
173: assertEquals("127.0.0.1:6969", waiter.getChannelAddr());
174: assertEquals(new ThreadID(1), waiter.getThreadID());
175: }
176:
177: private void validateBean2(LockMBean bean2, long time) {
178: LockHolder[] holders = bean2.getHolders();
179: ServerLockRequest[] reqs = bean2.getPendingRequests();
180: Waiter[] waiters = bean2.getWaiters();
181: assertEquals(2, holders.length);
182: assertEquals(0, reqs.length);
183: assertEquals(0, waiters.length);
184:
185: LockHolder holder = holders[0];
186: assertEquals(LockLevel.toString(LockLevel.READ), holder
187: .getLockLevel());
188: assertTrue(holder.getTimeAcquired() >= time);
189: assertEquals(new ClientID(new ChannelID(1)), holder.getNodeID());
190: assertEquals("127.0.0.1:6969", holder.getChannelAddr());
191: assertEquals(new ThreadID(1), holder.getThreadID());
192:
193: holder = holders[1];
194: assertEquals(LockLevel.toString(LockLevel.READ), holder
195: .getLockLevel());
196: assertTrue(holder.getTimeAcquired() >= time);
197: assertEquals(new ClientID(new ChannelID(2)), holder.getNodeID());
198: assertEquals("no longer connected", holder.getChannelAddr());
199: assertEquals(new ThreadID(1), holder.getThreadID());
200:
201: // ServerLockRequest up = upgrades[0];
202: // assertEquals(LockLevel.toString(LockLevel.WRITE), up.getLockLevel());
203: // assertTrue(up.getRequestTime() >= time);
204: // assertEquals(new ChannelID(1), up.getChannelID());
205: // assertEquals("127.0.0.1:6969", up.getChannelAddr());
206: // assertEquals(new ThreadID(1), up.getThreadID());
207: }
208:
209: private void validateBean1(LockMBean bean1, long time) {
210: LockHolder[] holders = bean1.getHolders();
211: ServerLockRequest[] reqs = bean1.getPendingRequests();
212: Waiter[] waiters = bean1.getWaiters();
213: assertEquals(1, holders.length);
214: assertEquals(1, reqs.length);
215: assertEquals(0, waiters.length);
216:
217: LockHolder holder = holders[0];
218: assertEquals(LockLevel.toString(LockLevel.WRITE), holder
219: .getLockLevel());
220: assertTrue(holder.getTimeAcquired() >= time);
221: assertEquals(new ClientID(new ChannelID(1)), holder.getNodeID());
222: assertEquals(new ThreadID(1), holder.getThreadID());
223: assertEquals("127.0.0.1:6969", holder.getChannelAddr());
224:
225: ServerLockRequest req = reqs[0];
226: assertEquals(LockLevel.toString(LockLevel.WRITE), req
227: .getLockLevel());
228: assertTrue(req.getRequestTime() >= time);
229: assertEquals(new ClientID(new ChannelID(2)), req.getNodeID());
230: assertEquals("no longer connected", req.getChannelAddr());
231: assertEquals(new ThreadID(1), req.getThreadID());
232: }
233:
234: private void testSerialize(Object o) throws IOException {
235: ObjectOutputStream oos = new ObjectOutputStream(
236: new NullOutputStream());
237: oos.writeObject(o);
238: oos.close();
239: }
240:
241: private void sortLocksByID(LockMBean[] lockBeans) {
242: Arrays.sort(lockBeans, new Comparator() {
243: public int compare(Object o1, Object o2) {
244: LockMBean l1 = (LockMBean) o1;
245: LockMBean l2 = (LockMBean) o2;
246:
247: String id1 = l1.getLockName();
248: String id2 = l2.getLockName();
249:
250: return id1.compareTo(id2);
251: }
252: });
253: }
254:
255: public void testReestablishWait() throws Exception {
256: LockID lockID1 = new LockID("my lock");
257: ClientID nid1 = new ClientID(new ChannelID(1));
258: ThreadID tx1 = new ThreadID(1);
259: ThreadID tx2 = new ThreadID(2);
260:
261: try {
262: assertEquals(0, lockManager.getLockCount());
263: long waitTime = 1000;
264: WaitInvocation waitCall = new WaitInvocation(waitTime);
265: TestSink responseSink = new TestSink();
266: long t0 = System.currentTimeMillis();
267: lockManager.reestablishWait(lockID1, nid1, tx1,
268: LockLevel.WRITE, waitCall, responseSink);
269: lockManager.reestablishWait(lockID1, nid1, tx2,
270: LockLevel.WRITE, waitCall, responseSink);
271: lockManager.start();
272:
273: LockResponseContext ctxt = (LockResponseContext) responseSink
274: .waitForAdd(waitTime * 3);
275: assertTrue(ctxt != null);
276: assertTrue(System.currentTimeMillis() - t0 >= waitTime);
277: assertResponseContext(lockID1, nid1, tx1, LockLevel.WRITE,
278: ctxt);
279: assertTrue(ctxt.isLockWaitTimeout());
280: ThreadUtil.reallySleep(waitTime * 3);
281: assertEquals(3, responseSink.size()); // 2 wait timeouts and 1 award
282: ctxt = (LockResponseContext) responseSink.take();
283: LockResponseContext ctxt1 = (LockResponseContext) responseSink
284: .take();
285: LockResponseContext ctxt2 = (LockResponseContext) responseSink
286: .take();
287: assertTrue((ctxt1.isLockAward() && ctxt2
288: .isLockWaitTimeout())
289: || (ctxt2.isLockAward() && ctxt1
290: .isLockWaitTimeout()));
291:
292: } finally {
293: lockManager = null;
294: resetLockManager();
295: }
296: }
297:
298: public void testReestablishLockAfterReestablishWait()
299: throws Exception {
300: LockID lockID1 = new LockID("my lock");
301: ClientID cid1 = new ClientID(new ChannelID(1));
302: ThreadID tx1 = new ThreadID(1);
303: ThreadID tx2 = new ThreadID(2);
304: int requestedLevel = LockLevel.WRITE;
305: WaitInvocation waitCall = new WaitInvocation();
306: try {
307: TestSink responseSink = new TestSink();
308: assertEquals(0, lockManager.getLockCount());
309: lockManager.reestablishWait(lockID1, cid1, tx1,
310: LockLevel.WRITE, waitCall, responseSink);
311: assertEquals(1, lockManager.getLockCount());
312: assertEquals(0, responseSink.getInternalQueue().size());
313:
314: // now try to award the lock to the same client-transaction
315: try {
316: lockManager.reestablishLock(lockID1, cid1, tx1,
317: requestedLevel, responseSink);
318: fail("Should have thrown an AssertionError.");
319: } catch (AssertionError e) {
320: // expected
321: }
322: // now try to reestablish the same lock from a different transaction. It
323: // sould succeed
324: assertEquals(1, lockManager.getLockCount());
325: lockManager.reestablishLock(lockID1, cid1, tx2,
326: requestedLevel, responseSink);
327: } finally {
328: lockManager = null;
329: resetLockManager();
330: }
331: }
332:
333: public void testReestablishReadLock() throws Exception {
334: LockID lockID1 = new LockID("my lock");
335: ClientID cid1 = new ClientID(new ChannelID(1));
336: ThreadID tx1 = new ThreadID(1);
337: ThreadID tx2 = new ThreadID(2);
338: ThreadID tx3 = new ThreadID(3);
339: int requestedLevel = LockLevel.READ;
340:
341: try {
342: TestSink responseSink = new TestSink();
343: assertEquals(0, lockManager.getLockCount());
344:
345: lockManager.reestablishLock(lockID1, cid1, tx1,
346: requestedLevel, responseSink);
347: assertEquals(1, lockManager.getLockCount());
348:
349: // now reestablish the same read lock in another transaction. It should
350: // succeed.
351: lockManager.reestablishLock(lockID1, cid1, tx2,
352: requestedLevel, responseSink);
353: assertEquals(1, lockManager.getLockCount());
354:
355: // now reestablish the the same write lock. It should fail.
356: try {
357: lockManager.reestablishLock(lockID1, cid1, tx3,
358: LockLevel.WRITE, responseSink);
359: fail("Should have thrown a LockManagerError.");
360: } catch (AssertionError e) {
361: //
362: }
363:
364: } finally {
365: // this needs to be done for tearDown() to pass.
366: lockManager = null;
367: resetLockManager();
368: }
369:
370: try {
371: TestSink responseSink = new TestSink();
372: assertEquals(0, lockManager.getLockCount());
373: lockManager.reestablishLock(lockID1, cid1, tx1,
374: LockLevel.WRITE, responseSink);
375: assertEquals(1, lockManager.getLockCount());
376:
377: // now reestablish a read lock. This should fail.
378: responseSink = new TestSink();
379: try {
380: lockManager.reestablishLock(lockID1, cid1, tx2,
381: LockLevel.READ, responseSink);
382: fail("Should have thrown a LockManagerError");
383: } catch (AssertionError e) {
384: //
385: }
386:
387: } finally {
388: lockManager = null;
389: resetLockManager();
390: }
391: }
392:
393: public void testReestablishWriteLock() throws Exception {
394:
395: LockID lockID1 = new LockID("my lock");
396: LockID lockID2 = new LockID("my other lock");
397: ClientID cid1 = new ClientID(new ChannelID(1));
398: ClientID cid2 = new ClientID(new ChannelID(2));
399: ThreadID tx1 = new ThreadID(1);
400: ThreadID tx2 = new ThreadID(2);
401: int requestedLevel = LockLevel.WRITE;
402:
403: try {
404: TestSink responseSink = new TestSink();
405: assertEquals(0, lockManager.getLockCount());
406: lockManager.reestablishLock(lockID1, cid1, tx1,
407: requestedLevel, responseSink);
408: assertEquals(1, lockManager.getLockCount());
409:
410: try {
411: lockManager.reestablishLock(lockID1, cid2, tx2,
412: requestedLevel, responseSink);
413: fail("Expected a LockManagerError!");
414: } catch (AssertionError e) {
415: //
416: }
417:
418: // try to reestablish another lock. It should succeed.
419: lockManager.reestablishLock(lockID2, cid1, tx1,
420: requestedLevel, responseSink);
421:
422: lockManager.start();
423: // you shouldn't be able to call reestablishLock after the lock manager
424: // has started.
425: try {
426: lockManager.reestablishLock(lockID1, cid1, tx1,
427: requestedLevel, null);
428: fail("Should have thrown a LockManagerError");
429: } catch (Error e) {
430: //
431: }
432:
433: } finally {
434: // this needs to be done for tearDown() to pass.
435: lockManager = null;
436: resetLockManager();
437: }
438: }
439:
440: // private void assertResponseSink(LockID lockID, ChannelID channel, TransactionID tx, int requestedLevel,
441: // TestSink responseSink) {
442: // assertEquals(1, responseSink.getInternalQueue().size());
443: // LockResponseContext ctxt = (LockResponseContext) responseSink.getInternalQueue().get(0);
444: // assertResponseContext(lockID, channel, tx, requestedLevel, ctxt);
445: // }
446:
447: private void assertResponseContext(LockID lockID, NodeID nid,
448: ThreadID tx1, int requestedLevel, LockResponseContext ctxt) {
449: assertEquals(lockID, ctxt.getLockID());
450: assertEquals(nid, ctxt.getNodeID());
451: assertEquals(tx1, ctxt.getThreadID());
452: assertEquals(requestedLevel, ctxt.getLockLevel());
453: }
454:
455: public void testWaitTimeoutsIgnoredDuringStartup() throws Exception {
456: LockID lockID = new LockID("my lcok");
457: ClientID cid1 = new ClientID(new ChannelID(1));
458: ThreadID tx1 = new ThreadID(1);
459: try {
460: long waitTime = 1000;
461: WaitInvocation waitInvocation = new WaitInvocation(waitTime);
462: TestSink responseSink = new TestSink();
463: lockManager.reestablishWait(lockID, cid1, tx1,
464: LockLevel.WRITE, waitInvocation, responseSink);
465:
466: LockResponseContext ctxt = (LockResponseContext) responseSink
467: .waitForAdd(waitTime * 2);
468: assertNull(ctxt);
469:
470: lockManager.start();
471: ctxt = (LockResponseContext) responseSink.waitForAdd(0);
472: assertNotNull(ctxt);
473: } finally {
474: lockManager = null;
475: resetLockManager();
476: }
477: }
478:
479: public void testWaitTimeoutsIgnoredDuringShutdown()
480: throws InterruptedException {
481:
482: ClientID cid = new ClientID(new ChannelID(1));
483: LockID lockID = new LockID("1");
484: ThreadID txID = new ThreadID(1);
485:
486: lockManager.start();
487: boolean granted = lockManager.requestLock(lockID, cid, txID,
488: LockLevel.WRITE, sink);
489: assertTrue(granted);
490:
491: lockManager.wait(lockID, cid, txID, new WaitInvocation(1000),
492: sink);
493: lockManager.stop();
494:
495: assertFalse(lockManager.hasPending(lockID));
496: assertEquals(0, lockManager.getLockCount());
497:
498: ThreadUtil.reallySleep(1500);
499:
500: assertFalse(lockManager.hasPending(lockID));
501: assertEquals(0, lockManager.getLockCount());
502: }
503:
504: public void testOffBlocksUntilNoOutstandingLocksViaWait()
505: throws Exception {
506: // this is no longer expected behavior
507: if (true)
508: return;
509: List queue = sink.getInternalQueue();
510: ClientID cid = new ClientID(new ChannelID(1));
511: LockID lockID = new LockID("1");
512: ThreadID txID = new ThreadID(1);
513:
514: final LinkedQueue shutdownSteps = new LinkedQueue();
515: ShutdownThread shutdown = new ShutdownThread(shutdownSteps);
516:
517: try {
518: lockManager.start();
519: lockManager.requestLock(lockID, cid, txID, LockLevel.WRITE,
520: sink);
521: assertEquals(1, queue.size());
522:
523: shutdown.start();
524: shutdownSteps.take();
525: ThreadUtil.reallySleep(500);
526: // make sure shutdown didn't complete.
527: assertTrue(shutdownSteps.peek() == null);
528:
529: // make sure that waiting on an outstanding lock causes the lock to be
530: // released and allows shutdown to
531: // complete.
532: lockManager.wait(lockID, cid, new ThreadID(1),
533: new WaitInvocation(), sink);
534: shutdownSteps.take();
535:
536: } finally {
537: lockManager.clearAllLocksFor(cid);
538: }
539: }
540:
541: public void testOffDoesNotBlockUntilNoOutstandingLocksViaUnlock()
542: throws Exception {
543: List queue = sink.getInternalQueue();
544: ClientID cid1 = new ClientID(new ChannelID(1));
545: LockID lock1 = new LockID("1");
546: ThreadID tx1 = new ThreadID(1);
547:
548: final LinkedQueue shutdownSteps = new LinkedQueue();
549: ShutdownThread shutdown = new ShutdownThread(shutdownSteps);
550: try {
551: lockManager.start();
552: lockManager.requestLock(lock1, cid1, tx1, LockLevel.WRITE,
553: sink);
554: assertEquals(1, queue.size());
555:
556: shutdown.start();
557: shutdownSteps.take();
558: ThreadUtil.reallySleep(1000);
559: shutdownSteps.take();
560: } finally {
561: lockManager = null;
562: resetLockManager();
563: }
564: }
565:
566: public void testOffCancelsWaits() throws Exception {
567: // implement me.
568: }
569:
570: public void testOffStopsGrantingNewLocks() throws Exception {
571: List queue = sink.getInternalQueue();
572: ClientID cid = new ClientID(new ChannelID(1));
573: LockID lockID = new LockID("1");
574: ThreadID txID = new ThreadID(1);
575: try {
576: // Test that the normal case works as expected...
577: lockManager.start();
578: lockManager.requestLock(lockID, cid, txID, LockLevel.WRITE,
579: sink);
580: assertEquals(1, queue.size());
581: queue.clear();
582: lockManager.unlock(lockID, cid, txID);
583: lockManager.requestLock(lockID, cid, txID, LockLevel.WRITE,
584: sink);
585: assertEquals(1, queue.size());
586: lockManager.unlock(lockID, cid, txID);
587:
588: // Call shutdown and make sure that the lock isn't granted via the
589: // "requestLock" method
590: queue.clear();
591: lockManager.stop();
592: lockManager.requestLock(lockID, cid, txID, LockLevel.WRITE,
593: sink);
594: assertEquals(0, queue.size());
595: } finally {
596: lockManager.clearAllLocksFor(cid);
597: }
598: }
599:
600: public void testRequestDoesntGrantPendingLocks() throws Exception {
601: List queue = sink.getInternalQueue();
602: ClientID cid = new ClientID(new ChannelID(1));
603: LockID lockID = new LockID("1");
604: ThreadID txID = new ThreadID(1);
605:
606: try {
607: lockManager.start();
608: // now try stacking locks and make sure that calling unlock doesn't grant
609: // the pending locks.
610: lockManager.requestLock(lockID, cid, txID, LockLevel.WRITE,
611: sink);
612: queue.clear();
613: lockManager.requestLock(lockID, cid, new ThreadID(2),
614: LockLevel.WRITE, sink);
615: // the second lock should be pending.
616: assertEquals(0, queue.size());
617: } finally {
618: lockManager = null;
619: resetLockManager();
620: }
621: }
622:
623: public void testUnlockIgnoredDuringShutdown() throws Exception {
624: List queue = sink.getInternalQueue();
625: ClientID cid = new ClientID(new ChannelID(1));
626: LockID lockID = new LockID("1");
627: ThreadID txID = new ThreadID(1);
628: try {
629: lockManager.start();
630: // now try stacking locks and make sure that calling unlock doesn't grant
631: // the pending locks.
632: lockManager.requestLock(lockID, cid, txID, LockLevel.WRITE,
633: sink);
634: queue.clear();
635: lockManager.requestLock(lockID, cid, new ThreadID(2),
636: LockLevel.WRITE, sink);
637: // the second lock should be pending.
638: assertEquals(0, queue.size());
639:
640: lockManager.stop();
641:
642: // unlock the first lock
643: lockManager.unlock(lockID, cid, txID);
644: // the second lock should still be pending
645: assertEquals(0, queue.size());
646:
647: } finally {
648: lockManager = null;
649: resetLockManager();
650: }
651: }
652:
653: public void testLockManagerBasics() {
654: LockID l1 = new LockID("1");
655: ClientID c1 = new ClientID(new ChannelID(1));
656: ThreadID s1 = new ThreadID(0);
657:
658: ClientID c2 = new ClientID(new ChannelID(2));
659: ClientID c3 = new ClientID(new ChannelID(3));
660: ClientID c4 = new ClientID(new ChannelID(4));
661: lockManager.start();
662: lockManager.requestLock(l1, c1, s1, LockLevel.WRITE, sink);
663: assertTrue(sink.size() == 1);
664: System.out.println(sink.getInternalQueue().remove(0));
665:
666: lockManager.requestLock(l1, c2, s1, LockLevel.WRITE, sink);
667: assertTrue(sink.size() == 0);
668: lockManager.unlock(l1, c1, s1);
669: assertTrue(sink.size() == 1);
670: System.out.println(sink.getInternalQueue().remove(0));
671:
672: lockManager.requestLock(l1, c3, s1, LockLevel.READ, sink);
673: assertTrue(sink.size() == 0);
674: assertTrue(lockManager.hasPending(l1));
675: lockManager.unlock(l1, c2, s1);
676: assertTrue(sink.size() == 1);
677: assertFalse(lockManager.hasPending(l1));
678:
679: lockManager.requestLock(l1, c4, s1, LockLevel.WRITE, sink);
680: assertTrue(lockManager.hasPending(l1));
681: lockManager.unlock(l1, c3, s1);
682: assertFalse(lockManager.hasPending(l1));
683: lockManager.unlock(l1, c4, s1);
684: }
685:
686: public void testDeadLock1() {
687: // A simple deadlock. Thread 1 holds lock1, wants lock2. Thread2 holds
688: // lock2, wants lock1
689:
690: LockID l1 = new LockID("1");
691: LockID l2 = new LockID("2");
692: ClientID c1 = new ClientID(new ChannelID(1));
693:
694: ThreadID s1 = new ThreadID(1);
695: ThreadID s2 = new ThreadID(2);
696:
697: ServerThreadID thread1 = new ServerThreadID(c1, s1);
698: ServerThreadID thread2 = new ServerThreadID(c1, s2);
699:
700: lockManager.start();
701: // thread1 gets lock1
702: lockManager.requestLock(l1, c1, s1, LockLevel.WRITE, sink);
703: // thread2 gets lock2
704: lockManager.requestLock(l2, c1, s2, LockLevel.WRITE, sink);
705: // thread1 trys to get lock2 (blocks)
706: lockManager.requestLock(l2, c1, s1, LockLevel.WRITE, sink);
707: // thread2 trys to get lock1 (blocks)
708: lockManager.requestLock(l1, c1, s2, LockLevel.WRITE, sink);
709:
710: TestDeadlockResults deadlocks = new TestDeadlockResults();
711: lockManager.scanForDeadlocks(deadlocks);
712:
713: assertEquals(1, deadlocks.chains.size());
714: Map check = new HashMap();
715: check.put(thread1, l2);
716: check.put(thread2, l1);
717: assertSpecificDeadlock((DeadlockChain) deadlocks.chains.get(0),
718: check);
719:
720: // test the mgmt interface too
721: DeadlockChain[] results = lockManager.scanForDeadlocks();
722: assertEquals(1, results.length);
723: check = new HashMap();
724: check.put(thread1, l2);
725: check.put(thread2, l1);
726: assertSpecificDeadlock(results[0], check);
727:
728: lockManager.clearAllLocksFor(c1);
729: }
730:
731: public void testDeadLock3() {
732: // test that includes locks with more than 1 holder
733:
734: // contended locks
735: LockID l1 = new LockID("1");
736: LockID l2 = new LockID("2");
737:
738: // uncontended read locks
739: LockID l3 = new LockID("3");
740: LockID l4 = new LockID("4");
741: LockID l5 = new LockID("5");
742:
743: ClientID c1 = new ClientID(new ChannelID(1));
744: ThreadID s1 = new ThreadID(1);
745: ThreadID s2 = new ThreadID(2);
746:
747: ServerThreadID thread1 = new ServerThreadID(c1, s1);
748: ServerThreadID thread2 = new ServerThreadID(c1, s2);
749:
750: lockManager.start();
751:
752: // thread1 holds all three read locks, thread2 has 2 of them
753: lockManager.requestLock(l3, c1, s1, LockLevel.READ, sink);
754: lockManager.requestLock(l4, c1, s1, LockLevel.READ, sink);
755: lockManager.requestLock(l5, c1, s1, LockLevel.READ, sink);
756: lockManager.requestLock(l3, c1, s2, LockLevel.READ, sink);
757: lockManager.requestLock(l4, c1, s2, LockLevel.READ, sink);
758:
759: // thread1 gets lock1
760: lockManager.requestLock(l1, c1, s1, LockLevel.WRITE, sink);
761: // thread2 gets lock2
762: lockManager.requestLock(l2, c1, s2, LockLevel.WRITE, sink);
763: // thread1 trys to get lock2 (blocks)
764: lockManager.requestLock(l2, c1, s1, LockLevel.WRITE, sink);
765: // thread2 trys to get lock1 (blocks)
766: lockManager.requestLock(l1, c1, s2, LockLevel.WRITE, sink);
767:
768: TestDeadlockResults deadlocks = new TestDeadlockResults();
769: lockManager.scanForDeadlocks(deadlocks);
770:
771: assertEquals(1, deadlocks.chains.size());
772: Map check = new HashMap();
773: check.put(thread1, l2);
774: check.put(thread2, l1);
775: assertSpecificDeadlock((DeadlockChain) deadlocks.chains.get(0),
776: check);
777:
778: lockManager.clearAllLocksFor(c1);
779: }
780:
781: public void disableTestUpgradeDeadLock() {
782: // Detect deadlock in competing upgrades
783: LockID l1 = new LockID("L1");
784:
785: ClientID c0 = new ClientID(new ChannelID(0));
786: ThreadID s1 = new ThreadID(1);
787: ThreadID s2 = new ThreadID(2);
788:
789: ServerThreadID thread1 = new ServerThreadID(c0, s1);
790: ServerThreadID thread2 = new ServerThreadID(c0, s2);
791:
792: lockManager.start();
793:
794: // thread1 gets lock1 (R)
795: lockManager.requestLock(l1, c0, s1, LockLevel.READ, sink);
796: // thread2 gets lock1 (R)
797: lockManager.requestLock(l1, c0, s2, LockLevel.READ, sink);
798:
799: // thread1 requests upgrade
800: lockManager.requestLock(l1, c0, s1, LockLevel.WRITE, sink);
801: // thread2 requests upgrade
802: lockManager.requestLock(l1, c0, s2, LockLevel.WRITE, sink);
803:
804: TestDeadlockResults deadlocks = new TestDeadlockResults();
805: lockManager.scanForDeadlocks(deadlocks);
806:
807: assertEquals(1, deadlocks.chains.size());
808:
809: Map check = new HashMap();
810: check.put(thread1, l1);
811: check.put(thread2, l1);
812: assertSpecificDeadlock((DeadlockChain) deadlocks.chains.get(0),
813: check);
814:
815: lockManager.clearAllLocksFor(c0);
816: }
817:
818: public void testLackOfDeadlock() throws InterruptedException {
819: lockManager.start();
820: for (int i = 0; i < 500; i++) {
821: internalTestLackofDeadlock(false);
822: resetLockManager(true);
823: internalTestLackofDeadlock(true);
824: resetLockManager(true);
825: }
826: }
827:
828: private void internalTestLackofDeadlock(boolean useRealThreads)
829: throws InterruptedException {
830: List threads = new ArrayList();
831:
832: for (int t = 0; t < numThreads; t++) {
833: NodeID cid = txns[t].getNodeID();
834: ThreadID tid = txns[t].getClientThreadID();
835:
836: RandomRequest req = new RandomRequest(cid, tid);
837: if (useRealThreads) {
838: Thread thread = new Thread(req);
839: thread.start();
840: threads.add(thread);
841: } else {
842: req.run();
843: }
844: }
845:
846: if (useRealThreads) {
847: for (Iterator iter = threads.iterator(); iter.hasNext();) {
848: Thread t = (Thread) iter.next();
849: t.join();
850: }
851: }
852:
853: TestDeadlockResults results = new TestDeadlockResults();
854: lockManager.scanForDeadlocks(results);
855:
856: assertEquals(0, results.chains.size());
857:
858: for (int i = 0; i < txns.length; i++) {
859: lockManager.clearAllLocksFor(txns[i].getNodeID());
860: }
861: }
862:
863: private class RandomRequest implements Runnable {
864: private final NodeID cid;
865: private final ThreadID tid;
866:
867: public RandomRequest(NodeID cid, ThreadID tid) {
868: this .cid = cid;
869: this .tid = tid;
870: }
871:
872: public void run() {
873: final int start = random.nextInt(numLocks);
874: final int howMany = random.nextInt(numLocks - start);
875:
876: for (int i = 0; i < howMany; i++) {
877: LockID lock = locks[start + i];
878: boolean read = random.nextInt(10) < 8; // 80% reads
879: int level = read ? LockLevel.READ : LockLevel.WRITE;
880: boolean granted = lockManager.requestLock(lock, cid,
881: tid, level, sink);
882: if (!granted) {
883: break;
884: }
885: }
886: }
887: }
888:
889: private ServerThreadID[] makeUniqueTxns(int num) {
890: ServerThreadID[] rv = new ServerThreadID[num];
891: for (int i = 0; i < num; i++) {
892: rv[i] = new ServerThreadID(new ClientID(new ChannelID(i)),
893: new ThreadID(i));
894: }
895: return rv;
896: }
897:
898: private LockID[] makeUniqueLocks(int num) {
899: LockID[] rv = new LockID[num];
900: for (int i = 0; i < num; i++) {
901: rv[i] = new LockID("lock-" + i);
902: }
903:
904: return rv;
905: }
906:
907: private void assertSpecificDeadlock(DeadlockChain chain, Map check) {
908: DeadlockChain start = chain;
909: do {
910: LockID lock = (LockID) check.remove(chain.getWaiter());
911: assertEquals(lock, chain.getWaitingOn());
912: chain = chain.getNextLink();
913: } while (chain != start);
914:
915: assertEquals(0, check.size());
916: }
917:
918: public void testDeadLock2() {
919: // A slightly more complicated deadlock:
920: // -- Thread1 holds lock1, wants lock2
921: // -- Thread2 holds lock2, wants lock3
922: // -- Thread3 holds lock3, wants lock1
923:
924: LockID l1 = new LockID("L1");
925: LockID l2 = new LockID("L2");
926: LockID l3 = new LockID("L3");
927: ClientID c0 = new ClientID(new ChannelID(0));
928: ThreadID s1 = new ThreadID(1);
929: ThreadID s2 = new ThreadID(2);
930: ThreadID s3 = new ThreadID(3);
931:
932: ServerThreadID thread1 = new ServerThreadID(c0, s1);
933: ServerThreadID thread2 = new ServerThreadID(c0, s2);
934: ServerThreadID thread3 = new ServerThreadID(c0, s3);
935:
936: lockManager.start();
937:
938: // thread1 gets lock1
939: lockManager.requestLock(l1, c0, s1, LockLevel.WRITE, sink);
940: // thread2 gets lock2
941: lockManager.requestLock(l2, c0, s2, LockLevel.WRITE, sink);
942: // thread3 gets lock3
943: lockManager.requestLock(l3, c0, s3, LockLevel.WRITE, sink);
944:
945: // thread1 trys to get lock2 (blocks)
946: lockManager.requestLock(l2, c0, s1, LockLevel.WRITE, sink);
947: // thread2 trys to get lock3 (blocks)
948: lockManager.requestLock(l3, c0, s2, LockLevel.WRITE, sink);
949: // thread3 trys to get lock1 (blocks)
950: lockManager.requestLock(l1, c0, s3, LockLevel.WRITE, sink);
951:
952: TestDeadlockResults deadlocks = new TestDeadlockResults();
953: lockManager.scanForDeadlocks(deadlocks);
954:
955: assertEquals(1, deadlocks.chains.size());
956:
957: Map check = new HashMap();
958: check.put(thread1, l2);
959: check.put(thread2, l3);
960: check.put(thread3, l1);
961: assertSpecificDeadlock((DeadlockChain) deadlocks.chains.get(0),
962: check);
963:
964: lockManager.clearAllLocksFor(c0);
965: }
966:
967: private class ShutdownThread extends Thread {
968: private final LinkedQueue shutdownSteps;
969:
970: private ShutdownThread(LinkedQueue shutdownSteps) {
971: this .shutdownSteps = shutdownSteps;
972: }
973:
974: public void run() {
975: try {
976: shutdownSteps.put(new Object());
977: lockManager.stop();
978: shutdownSteps.put(new Object());
979: } catch (Exception e) {
980: e.printStackTrace();
981: fail();
982: }
983: }
984: }
985:
986: private static class TestDeadlockResults implements DeadlockResults {
987: final List chains = new ArrayList();
988:
989: public void foundDeadlock(DeadlockChain chain) {
990: chains.add(chain);
991: }
992: }
993:
994: }
|