0001: /*
0002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
0003: * notice. All rights reserved.
0004: */
0005: package com.tc.objectserver.lockmanager.impl;
0006:
0007: import org.apache.commons.io.output.NullOutputStream;
0008:
0009: import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
0010:
0011: import com.tc.exception.ImplementMe;
0012: import com.tc.management.L2LockStatsManager;
0013: import com.tc.net.TCSocketAddress;
0014: import com.tc.net.groups.ClientID;
0015: import com.tc.net.groups.NodeID;
0016: import com.tc.net.protocol.NetworkStackID;
0017: import com.tc.net.protocol.TCNetworkMessage;
0018: import com.tc.net.protocol.tcm.ChannelEventListener;
0019: import com.tc.net.protocol.tcm.ChannelID;
0020: import com.tc.net.protocol.tcm.MessageChannel;
0021: import com.tc.net.protocol.tcm.TCMessage;
0022: import com.tc.net.protocol.tcm.TCMessageType;
0023: import com.tc.object.lockmanager.api.LockID;
0024: import com.tc.object.lockmanager.api.LockLevel;
0025: import com.tc.object.lockmanager.api.ServerThreadID;
0026: import com.tc.object.lockmanager.api.ThreadID;
0027: import com.tc.object.tx.WaitInvocation;
0028: import com.tc.objectserver.api.TestSink;
0029: import com.tc.objectserver.context.LockResponseContext;
0030: import com.tc.objectserver.lockmanager.api.DeadlockChain;
0031: import com.tc.objectserver.lockmanager.api.DeadlockResults;
0032: import com.tc.objectserver.lockmanager.api.LockHolder;
0033: import com.tc.objectserver.lockmanager.api.LockMBean;
0034: import com.tc.objectserver.lockmanager.api.NullChannelManager;
0035: import com.tc.objectserver.lockmanager.api.ServerLockRequest;
0036: import com.tc.objectserver.lockmanager.api.Waiter;
0037: import com.tc.util.concurrent.ThreadUtil;
0038:
0039: import java.io.IOException;
0040: import java.io.ObjectOutputStream;
0041: import java.net.UnknownHostException;
0042: import java.util.ArrayList;
0043: import java.util.Arrays;
0044: import java.util.Comparator;
0045: import java.util.HashMap;
0046: import java.util.Iterator;
0047: import java.util.List;
0048: import java.util.Map;
0049: import java.util.Random;
0050:
0051: import junit.framework.TestCase;
0052:
0053: /**
0054: * @author steve
0055: */
0056: public class GreedyLockManagerTest extends TestCase {
0057: private TestSink sink;
0058: private LockManagerImpl lockManager;
0059: private Random random = new Random();
0060:
0061: final int numLocks = 100;
0062: final int numThreads = 30;
0063: private LockID[] locks = makeUniqueLocks(numLocks);
0064: private ServerThreadID[] txns = makeUniqueTxns(numThreads);
0065:
0066: protected void setUp() throws Exception {
0067: super .setUp();
0068: resetLockManager();
0069: sink = new TestSink();
0070: }
0071:
0072: private void resetLockManager() {
0073: resetLockManager(false);
0074: }
0075:
0076: private void resetLockManager(boolean start) {
0077: if (lockManager != null) {
0078: try {
0079: lockManager.stop();
0080: } catch (InterruptedException e) {
0081: fail();
0082: }
0083: }
0084:
0085: lockManager = new LockManagerImpl(new NullChannelManager(),
0086: L2LockStatsManager.NULL_LOCK_STATS_MANAGER);
0087: if (start) {
0088: lockManager.start();
0089: }
0090: }
0091:
0092: protected void tearDown() throws Exception {
0093: assertEquals(0, lockManager.getLockCount());
0094: assertEquals(0, lockManager.getThreadContextCount());
0095: super .tearDown();
0096: }
0097:
0098: public void testLockMBean() throws IOException {
0099:
0100: final MessageChannel channel = new TestMessageChannel();
0101:
0102: final long start = System.currentTimeMillis();
0103: final ClientID cid1 = new ClientID(new ChannelID(1));
0104: ClientID cid2 = new ClientID(new ChannelID(2));
0105: ClientID cid3 = new ClientID(new ChannelID(3));
0106: LockID lid1 = new LockID("1");
0107: LockID lid2 = new LockID("2");
0108: LockID lid3 = new LockID("3");
0109: ThreadID tid1 = new ThreadID(1);
0110: WaitInvocation wait = new WaitInvocation(Integer.MAX_VALUE);
0111:
0112: lockManager = new LockManagerImpl(new NullChannelManager() {
0113: public MessageChannel getChannel(ChannelID id) {
0114: if (cid1.equals(id)) {
0115: return channel;
0116: }
0117: return null;
0118: }
0119:
0120: public String getChannelAddress(NodeID nid) {
0121: if (cid1.equals(nid)) {
0122: return "127.0.0.1:6969";
0123: }
0124: return "no longer connected";
0125: }
0126: }, L2LockStatsManager.NULL_LOCK_STATS_MANAGER);
0127:
0128: lockManager.start();
0129:
0130: lockManager
0131: .requestLock(lid1, cid1, tid1, LockLevel.WRITE, sink); // hold greedy
0132: lockManager
0133: .requestLock(lid1, cid2, tid1, LockLevel.WRITE, sink); // pending
0134:
0135: lockManager.requestLock(lid2, cid1, tid1, LockLevel.READ, sink); // hold greedy
0136: lockManager.requestLock(lid2, cid2, tid1, LockLevel.READ, sink); // hold greedy
0137: lockManager
0138: .requestLock(lid2, cid3, tid1, LockLevel.WRITE, sink); // pending
0139:
0140: lockManager
0141: .requestLock(lid3, cid1, tid1, LockLevel.WRITE, sink); // hold greedy
0142:
0143: LockMBean[] lockBeans = lockManager.getAllLocks();
0144: assertEquals(3, lockBeans.length);
0145: sortLocksByID(lockBeans);
0146:
0147: LockMBean bean1 = lockBeans[0];
0148: LockMBean bean2 = lockBeans[1];
0149: LockMBean bean3 = lockBeans[2];
0150: testSerialize(bean1);
0151: testSerialize(bean2);
0152: testSerialize(bean3);
0153:
0154: validateBean1(bean1, start);
0155: validateBean2(bean2, start);
0156: validateBean3(bean3, start, wait);
0157:
0158: lockManager.clearAllLocksFor(cid1);
0159: lockManager.clearAllLocksFor(cid2);
0160: lockManager.clearAllLocksFor(cid3);
0161: }
0162:
0163: private void validateBean3(LockMBean bean3, long time,
0164: WaitInvocation wait) {
0165: LockHolder[] holders = bean3.getHolders();
0166: ServerLockRequest[] reqs = bean3.getPendingRequests();
0167: Waiter[] waiters = bean3.getWaiters();
0168: assertEquals(1, holders.length);
0169: assertEquals(0, reqs.length);
0170: assertEquals(0, waiters.length);
0171:
0172: LockHolder holder = holders[0];
0173: assertEquals(LockLevel.toString(LockLevel.WRITE), holder
0174: .getLockLevel());
0175: assertTrue(holder.getTimeAcquired() >= time);
0176: assertEquals(new ClientID(new ChannelID(1)), holder.getNodeID());
0177: assertEquals("127.0.0.1:6969", holder.getChannelAddr());
0178: assertEquals(ThreadID.VM_ID, holder.getThreadID());
0179:
0180: }
0181:
0182: private void validateBean2(LockMBean bean2, long time) {
0183: LockHolder[] holders = bean2.getHolders();
0184: ServerLockRequest[] reqs = bean2.getPendingRequests();
0185: Waiter[] waiters = bean2.getWaiters();
0186: assertEquals(2, holders.length);
0187: assertEquals(1, reqs.length);
0188: assertEquals(0, waiters.length);
0189:
0190: LockHolder holder = holders[0];
0191: assertEquals(LockLevel.toString(LockLevel.READ), holder
0192: .getLockLevel());
0193: assertTrue(holder.getTimeAcquired() >= time);
0194: if ((new ClientID(new ChannelID(1))).equals(holder.getNodeID())) {
0195: assertEquals("127.0.0.1:6969", holder.getChannelAddr());
0196: } else if ((new ClientID(new ChannelID(2))).equals(holder
0197: .getNodeID())) {
0198: assertEquals("no longer connected", holder.getChannelAddr());
0199: } else {
0200: fail("Invalid Channel number ! " + holder.getNodeID());
0201: }
0202: assertEquals(ThreadID.VM_ID, holder.getThreadID());
0203:
0204: holder = holders[1];
0205: assertEquals(LockLevel.toString(LockLevel.READ), holder
0206: .getLockLevel());
0207: assertTrue(holder.getTimeAcquired() >= time);
0208: if ((new ClientID(new ChannelID(1))).equals(holder.getNodeID())) {
0209: assertEquals("127.0.0.1:6969", holder.getChannelAddr());
0210: } else if ((new ClientID(new ChannelID(2))).equals(holder
0211: .getNodeID())) {
0212: assertEquals("no longer connected", holder.getChannelAddr());
0213: } else {
0214: fail("Invalid Channel number ! " + holder.getNodeID());
0215: }
0216: assertEquals(ThreadID.VM_ID, holder.getThreadID());
0217:
0218: ServerLockRequest req = reqs[0];
0219: assertEquals(LockLevel.toString(LockLevel.WRITE), req
0220: .getLockLevel());
0221: assertTrue(req.getRequestTime() >= time);
0222: assertEquals(new ClientID(new ChannelID(3)), req.getNodeID());
0223: assertEquals("no longer connected", req.getChannelAddr());
0224: assertEquals(new ThreadID(1), req.getThreadID());
0225: }
0226:
0227: private void validateBean1(LockMBean bean1, long time) {
0228: LockHolder[] holders = bean1.getHolders();
0229: ServerLockRequest[] reqs = bean1.getPendingRequests();
0230: Waiter[] waiters = bean1.getWaiters();
0231: assertEquals(1, holders.length);
0232: assertEquals(1, reqs.length);
0233: assertEquals(0, waiters.length);
0234:
0235: LockHolder holder = holders[0];
0236: assertEquals(LockLevel.toString(LockLevel.WRITE), holder
0237: .getLockLevel());
0238: assertTrue(holder.getTimeAcquired() >= time);
0239: assertEquals(new ClientID(new ChannelID(1)), holder.getNodeID());
0240: assertEquals(ThreadID.VM_ID, holder.getThreadID());
0241: assertEquals("127.0.0.1:6969", holder.getChannelAddr());
0242:
0243: ServerLockRequest req = reqs[0];
0244: assertEquals(LockLevel.toString(LockLevel.WRITE), req
0245: .getLockLevel());
0246: assertTrue(req.getRequestTime() >= time);
0247: assertEquals(new ClientID(new ChannelID(2)), req.getNodeID());
0248: assertEquals("no longer connected", req.getChannelAddr());
0249: assertEquals(new ThreadID(1), req.getThreadID());
0250: }
0251:
0252: private void testSerialize(Object o) throws IOException {
0253: ObjectOutputStream oos = new ObjectOutputStream(
0254: new NullOutputStream());
0255: oos.writeObject(o);
0256: oos.close();
0257: }
0258:
0259: private void sortLocksByID(LockMBean[] lockBeans) {
0260: Arrays.sort(lockBeans, new Comparator() {
0261: public int compare(Object o1, Object o2) {
0262: LockMBean l1 = (LockMBean) o1;
0263: LockMBean l2 = (LockMBean) o2;
0264:
0265: String id1 = l1.getLockName();
0266: String id2 = l2.getLockName();
0267:
0268: return id1.compareTo(id2);
0269: }
0270: });
0271: }
0272:
0273: public void testReestablishWait() throws Exception {
0274: LockID lockID1 = new LockID("my lock");
0275: ClientID cid1 = new ClientID(new ChannelID(1));
0276: ThreadID tx1 = new ThreadID(1);
0277: ThreadID tx2 = new ThreadID(2);
0278:
0279: try {
0280: assertEquals(0, lockManager.getLockCount());
0281: long waitTime = 1000;
0282: WaitInvocation waitCall1 = new WaitInvocation(waitTime);
0283: WaitInvocation waitCall2 = new WaitInvocation(waitTime * 2);
0284: TestSink responseSink = new TestSink();
0285: long t0 = System.currentTimeMillis();
0286: lockManager.reestablishWait(lockID1, cid1, tx1,
0287: LockLevel.WRITE, waitCall1, responseSink);
0288: lockManager.reestablishWait(lockID1, cid1, tx2,
0289: LockLevel.WRITE, waitCall2, responseSink);
0290: lockManager.start();
0291:
0292: // Wait timeout
0293: LockResponseContext ctxt = (LockResponseContext) responseSink
0294: .take();
0295: assertTrue(System.currentTimeMillis() - t0 >= waitTime);
0296: assertTrue(ctxt.isLockWaitTimeout());
0297: assertResponseContext(lockID1, cid1, tx1, LockLevel.WRITE,
0298: ctxt);
0299:
0300: // Award - but should not give it as Greedy
0301: LockResponseContext ctxt1 = (LockResponseContext) responseSink
0302: .take();
0303: LockResponseContext ctxt2 = (LockResponseContext) responseSink
0304: .take();
0305: assertTrue(System.currentTimeMillis() - t0 >= waitTime);
0306: assertTrue((ctxt1.isLockAward() && ctxt2
0307: .isLockWaitTimeout())
0308: || (ctxt2.isLockAward() && ctxt1
0309: .isLockWaitTimeout()));
0310:
0311: if (ctxt1.isLockAward()) {
0312: assertAwardNotGreedy(ctxt1, lockID1, tx1);
0313: } else if (ctxt2.isLockAward()) {
0314: assertAwardNotGreedy(ctxt2, lockID1, tx1);
0315: }
0316:
0317: lockManager.unlock(lockID1, cid1, tx1);
0318:
0319: // Award - Greedy
0320: ctxt = (LockResponseContext) responseSink.take();
0321: assertAwardGreedy(ctxt, lockID1);
0322:
0323: assertTrue(responseSink.waitForAdd(waitTime * 3) == null);
0324:
0325: } finally {
0326: lockManager = null;
0327: resetLockManager();
0328: }
0329: }
0330:
0331: private void assertAwardNotGreedy(LockResponseContext ctxt,
0332: LockID lockID1, ThreadID tx1) {
0333: assertTrue(ctxt != null);
0334: assertTrue(ctxt.isLockAward());
0335: assertTrue(ctxt.getThreadID().equals(tx1));
0336: assertTrue(ctxt.getLockID().equals(lockID1));
0337: assertTrue(!LockLevel.isGreedy(ctxt.getLockLevel()));
0338: }
0339:
0340: private void assertAwardGreedy(LockResponseContext ctxt,
0341: LockID lockID1) {
0342: assertTrue(ctxt != null);
0343: assertTrue(ctxt.isLockAward());
0344: assertTrue(ctxt.getThreadID().equals(ThreadID.VM_ID));
0345: assertTrue(ctxt.getLockID().equals(lockID1));
0346: assertTrue(LockLevel.isGreedy(ctxt.getLockLevel()));
0347:
0348: }
0349:
0350: public void testReestablishLockAfterReestablishWait()
0351: throws Exception {
0352: LockID lockID1 = new LockID("my lock");
0353: ClientID cid1 = new ClientID(new ChannelID(1));
0354: ThreadID tx1 = new ThreadID(1);
0355: ThreadID tx2 = new ThreadID(2);
0356: int requestedLevel = LockLevel.WRITE;
0357: WaitInvocation waitCall = new WaitInvocation();
0358: try {
0359: TestSink responseSink = new TestSink();
0360: assertEquals(0, lockManager.getLockCount());
0361: lockManager.reestablishWait(lockID1, cid1, tx1,
0362: LockLevel.WRITE, waitCall, responseSink);
0363: assertEquals(1, lockManager.getLockCount());
0364: assertEquals(0, responseSink.getInternalQueue().size());
0365:
0366: // now try to award the lock to the same client-transaction
0367: try {
0368: lockManager.reestablishLock(lockID1, cid1, tx1,
0369: requestedLevel, responseSink);
0370: fail("Should have thrown an AssertionError.");
0371: } catch (AssertionError e) {
0372: // expected
0373: }
0374: // now try to reestablish the same lock from a different transaction. It
0375: // sould succeed
0376: assertEquals(1, lockManager.getLockCount());
0377: lockManager.reestablishLock(lockID1, cid1, tx2,
0378: requestedLevel, responseSink);
0379: } finally {
0380: lockManager = null;
0381: resetLockManager();
0382: }
0383: }
0384:
0385: public void testReestablishReadLock() throws Exception {
0386: LockID lockID1 = new LockID("my lock");
0387: ClientID cid1 = new ClientID(new ChannelID(1));
0388: ThreadID tx1 = new ThreadID(1);
0389: ThreadID tx2 = new ThreadID(2);
0390: ThreadID tx3 = new ThreadID(3);
0391: int requestedLevel = LockLevel.READ;
0392:
0393: try {
0394: TestSink responseSink = new TestSink();
0395: assertEquals(0, lockManager.getLockCount());
0396:
0397: lockManager.reestablishLock(lockID1, cid1, tx1,
0398: requestedLevel, responseSink);
0399: assertEquals(1, lockManager.getLockCount());
0400:
0401: // now reestablish the same read lock in another transaction. It should
0402: // succeed.
0403: responseSink = new TestSink();
0404: lockManager.reestablishLock(lockID1, cid1, tx2,
0405: requestedLevel, responseSink);
0406: assertEquals(1, lockManager.getLockCount());
0407:
0408: // now reestablish the the same write lock. It should fail.
0409: responseSink = new TestSink();
0410: try {
0411: lockManager.reestablishLock(lockID1, cid1, tx3,
0412: LockLevel.WRITE, responseSink);
0413: fail("Should have thrown a LockManagerError.");
0414: } catch (AssertionError e) {
0415: //
0416: }
0417:
0418: } finally {
0419: // this needs to be done for tearDown() to pass.
0420: lockManager = null;
0421: resetLockManager();
0422: }
0423:
0424: try {
0425: TestSink responseSink = new TestSink();
0426: assertEquals(0, lockManager.getLockCount());
0427: lockManager.reestablishLock(lockID1, cid1, tx1,
0428: LockLevel.WRITE, responseSink);
0429: assertEquals(1, lockManager.getLockCount());
0430:
0431: // now reestablish a read lock. This should fail.
0432: responseSink = new TestSink();
0433: try {
0434: lockManager.reestablishLock(lockID1, cid1, tx2,
0435: LockLevel.READ, responseSink);
0436: fail("Should have thrown a LockManagerError");
0437: } catch (Error e) {
0438: //
0439: }
0440:
0441: } finally {
0442: lockManager = null;
0443: resetLockManager();
0444: }
0445: }
0446:
0447: public void testReestablishWriteLock() throws Exception {
0448:
0449: LockID lockID1 = new LockID("my lock");
0450: LockID lockID2 = new LockID("my other lock");
0451: ClientID cid1 = new ClientID(new ChannelID(1));
0452: ClientID cid2 = new ClientID(new ChannelID(2));
0453: ThreadID tx1 = new ThreadID(1);
0454: ThreadID tx2 = new ThreadID(2);
0455: int requestedLevel = LockLevel.WRITE;
0456:
0457: try {
0458: TestSink responseSink = new TestSink();
0459: assertEquals(0, lockManager.getLockCount());
0460: lockManager.reestablishLock(lockID1, cid1, tx1,
0461: requestedLevel, responseSink);
0462: assertEquals(1, lockManager.getLockCount());
0463:
0464: try {
0465: lockManager.reestablishLock(lockID1, cid2, tx2,
0466: requestedLevel, responseSink);
0467: fail("Expected a LockManagerError!");
0468: } catch (AssertionError e) {
0469: //
0470: }
0471:
0472: // try to reestablish another lock. It should succeed.
0473: lockManager.reestablishLock(lockID2, cid1, tx1,
0474: requestedLevel, responseSink);
0475:
0476: lockManager.start();
0477: // you shouldn't be able to call reestablishLock after the lock manager
0478: // has started.
0479: try {
0480: lockManager.reestablishLock(lockID1, cid1, tx1,
0481: requestedLevel, null);
0482: fail("Should have thrown a LockManagerError");
0483: } catch (Error e) {
0484: //
0485: }
0486:
0487: } finally {
0488: // this needs to be done for tearDown() to pass.
0489: lockManager = null;
0490: resetLockManager();
0491: }
0492: }
0493:
0494: // private void assertResponseSink(LockID lockID, ChannelID channel, TransactionID tx, int requestedLevel,
0495: // TestSink responseSink) {
0496: // assertEquals(1, responseSink.getInternalQueue().size());
0497: // LockResponseContext ctxt = (LockResponseContext) responseSink.getInternalQueue().get(0);
0498: // assertResponseContext(lockID, channel, tx, requestedLevel, ctxt);
0499: // }
0500:
0501: private void assertResponseContext(LockID lockID, NodeID nid,
0502: ThreadID tx1, int requestedLevel, LockResponseContext ctxt) {
0503: assertEquals(lockID, ctxt.getLockID());
0504: assertEquals(nid, ctxt.getNodeID());
0505: assertEquals(tx1, ctxt.getThreadID());
0506: assertEquals(requestedLevel, ctxt.getLockLevel());
0507: }
0508:
0509: public void testWaitTimeoutsIgnoredDuringStartup() throws Exception {
0510: LockID lockID = new LockID("my lcok");
0511: ClientID cid1 = new ClientID(new ChannelID(1));
0512: ThreadID tx1 = new ThreadID(1);
0513: try {
0514: long waitTime = 1000;
0515: WaitInvocation waitInvocation = new WaitInvocation(waitTime);
0516: TestSink responseSink = new TestSink();
0517: lockManager.reestablishWait(lockID, cid1, tx1,
0518: LockLevel.WRITE, waitInvocation, responseSink);
0519:
0520: LockResponseContext ctxt = (LockResponseContext) responseSink
0521: .waitForAdd(waitTime * 2);
0522: assertNull(ctxt);
0523:
0524: lockManager.start();
0525: ctxt = (LockResponseContext) responseSink.waitForAdd(0);
0526: assertNotNull(ctxt);
0527: } finally {
0528: lockManager = null;
0529: resetLockManager();
0530: }
0531: }
0532:
0533: public void testOffDoesNotBlockUntilNoOutstandingLocksViaUnlock()
0534: throws Exception {
0535: List queue = sink.getInternalQueue();
0536: ClientID cid1 = new ClientID(new ChannelID(1));
0537: LockID lock1 = new LockID("1");
0538: ThreadID tx1 = new ThreadID(1);
0539:
0540: final LinkedQueue shutdownSteps = new LinkedQueue();
0541: ShutdownThread shutdown = new ShutdownThread(shutdownSteps);
0542: try {
0543: lockManager.start();
0544: lockManager.requestLock(lock1, cid1, tx1, LockLevel.WRITE,
0545: sink);
0546: assertEquals(1, queue.size());
0547:
0548: shutdown.start();
0549: shutdownSteps.take();
0550: ThreadUtil.reallySleep(1000);
0551: shutdownSteps.take();
0552: } finally {
0553: lockManager = null;
0554: resetLockManager();
0555: }
0556: }
0557:
0558: public void testOffStopsGrantingNewLocks() throws Exception {
0559: List queue = sink.getInternalQueue();
0560: ClientID cid = new ClientID(new ChannelID(1));
0561: LockID lockID = new LockID("1");
0562: ThreadID txID = new ThreadID(1);
0563: try {
0564: // Test that the normal case works as expected...
0565: lockManager.start();
0566: lockManager.requestLock(lockID, cid, txID, LockLevel.WRITE,
0567: sink);
0568: assertEquals(1, queue.size());
0569: assertAwardGreedy((LockResponseContext) queue.get(0),
0570: lockID);
0571: queue.clear();
0572: lockManager.unlock(lockID, cid, ThreadID.VM_ID);
0573:
0574: lockManager.requestLock(lockID, cid, txID, LockLevel.WRITE,
0575: sink);
0576: assertEquals(1, queue.size());
0577: assertAwardGreedy((LockResponseContext) queue.get(0),
0578: lockID);
0579: queue.clear();
0580: lockManager.unlock(lockID, cid, ThreadID.VM_ID);
0581:
0582: // Call shutdown and make sure that the lock isn't granted via the
0583: // "requestLock" method
0584: queue.clear();
0585: lockManager.stop();
0586: lockManager.requestLock(lockID, cid, txID, LockLevel.WRITE,
0587: sink);
0588: assertEquals(0, queue.size());
0589: } finally {
0590: lockManager.clearAllLocksFor(cid);
0591: }
0592: }
0593:
0594: public void testRequestDoesntGrantPendingLocks() throws Exception {
0595: List queue = sink.getInternalQueue();
0596: ClientID cid = new ClientID(new ChannelID(1));
0597: LockID lockID = new LockID("1");
0598: ThreadID txID = new ThreadID(1);
0599:
0600: try {
0601: lockManager.start();
0602: // now try stacking locks and make sure that calling unlock doesn't grant
0603: // the pending locks but instead a recall is issued
0604: lockManager.requestLock(lockID, cid, txID, LockLevel.WRITE,
0605: sink);
0606: queue.clear();
0607: lockManager.requestLock(lockID, new ClientID(new ChannelID(
0608: 2)), new ThreadID(2), LockLevel.WRITE, sink);
0609: // the second lock should be pending but a recall should be issued.
0610: assertEquals(1, queue.size());
0611: LockResponseContext lrc = (LockResponseContext) sink.take();
0612: assertTrue(lrc.isLockRecall());
0613: assertEquals(lockID, lrc.getLockID());
0614: assertEquals(cid, lrc.getNodeID());
0615: assertEquals(ThreadID.VM_ID, lrc.getThreadID());
0616: assertEquals(LockLevel.WRITE, lrc.getLockLevel());
0617: } finally {
0618: lockManager = null;
0619: resetLockManager();
0620: }
0621: }
0622:
0623: public void testUnlockIgnoredDuringShutdown() throws Exception {
0624: List queue = sink.getInternalQueue();
0625: ClientID cid = new ClientID(new ChannelID(1));
0626: LockID lockID = new LockID("1");
0627: ThreadID txID = new ThreadID(1);
0628: try {
0629: lockManager.start();
0630: // now try stacking locks and make sure that calling unlock doesn't grant
0631: // the pending locks but instead a recall is issued
0632: lockManager.requestLock(lockID, cid, txID, LockLevel.WRITE,
0633: sink);
0634: queue.clear();
0635: lockManager.requestLock(lockID, new ClientID(new ChannelID(
0636: 2)), new ThreadID(2), LockLevel.WRITE, sink);
0637: // the second lock should be pending but a recall should be issued.
0638: assertEquals(1, queue.size());
0639: LockResponseContext lrc = (LockResponseContext) sink.take();
0640: assertTrue(lrc.isLockRecall());
0641: assertEquals(lockID, lrc.getLockID());
0642: assertEquals(cid, lrc.getNodeID());
0643: assertEquals(ThreadID.VM_ID, lrc.getThreadID());
0644: assertEquals(LockLevel.WRITE, lrc.getLockLevel());
0645:
0646: assertEquals(0, queue.size());
0647:
0648: lockManager.stop();
0649:
0650: // unlock the first lock
0651: lockManager.unlock(lockID, cid, txID);
0652: // the second lock should still be pending
0653: assertEquals(0, queue.size());
0654:
0655: } finally {
0656: lockManager = null;
0657: resetLockManager();
0658: }
0659: }
0660:
0661: public void testDeadLock1() {
0662: // behavior changed ...
0663: if (true)
0664: return;
0665:
0666: // A simple deadlock. Thread 1 holds lock1, wants lock2. Thread2 holds
0667: // lock2, wants lock1
0668:
0669: LockID l1 = new LockID("1");
0670: LockID l2 = new LockID("2");
0671: ClientID c1 = new ClientID(new ChannelID(1));
0672:
0673: ThreadID s1 = new ThreadID(1);
0674: ThreadID s2 = new ThreadID(2);
0675:
0676: ServerThreadID thread1 = new ServerThreadID(c1, s1);
0677: ServerThreadID thread2 = new ServerThreadID(c1, s2);
0678:
0679: lockManager.start();
0680: // thread1 gets lock1
0681: lockManager.requestLock(l1, c1, s1, LockLevel.WRITE, sink);
0682: // thread2 gets lock2
0683: lockManager.requestLock(l2, c1, s2, LockLevel.WRITE, sink);
0684: // thread1 trys to get lock2 (blocks)
0685: lockManager.requestLock(l2, c1, s1, LockLevel.WRITE, sink);
0686: // thread2 trys to get lock1 (blocks)
0687: lockManager.requestLock(l1, c1, s2, LockLevel.WRITE, sink);
0688:
0689: TestDeadlockResults deadlocks = new TestDeadlockResults();
0690: lockManager.scanForDeadlocks(deadlocks);
0691:
0692: assertEquals(1, deadlocks.chains.size());
0693: Map check = new HashMap();
0694: check.put(thread1, l2);
0695: check.put(thread2, l1);
0696: assertSpecificDeadlock((DeadlockChain) deadlocks.chains.get(0),
0697: check);
0698:
0699: // test the mgmt interface too
0700: DeadlockChain[] results = lockManager.scanForDeadlocks();
0701: assertEquals(1, results.length);
0702: check = new HashMap();
0703: check.put(thread1, l2);
0704: check.put(thread2, l1);
0705: assertSpecificDeadlock(results[0], check);
0706:
0707: lockManager.clearAllLocksFor(c1);
0708: }
0709:
0710: public void testDeadLock3() {
0711: // behavior changed ...
0712: if (true)
0713: return;
0714:
0715: // test that includes locks with more than 1 holder
0716:
0717: // contended locks
0718: LockID l1 = new LockID("1");
0719: LockID l2 = new LockID("2");
0720:
0721: // uncontended read locks
0722: LockID l3 = new LockID("3");
0723: LockID l4 = new LockID("4");
0724: LockID l5 = new LockID("5");
0725:
0726: ClientID c1 = new ClientID(new ChannelID(1));
0727: ThreadID s1 = new ThreadID(1);
0728: ThreadID s2 = new ThreadID(2);
0729:
0730: ServerThreadID thread1 = new ServerThreadID(c1, s1);
0731: ServerThreadID thread2 = new ServerThreadID(c1, s2);
0732:
0733: lockManager.start();
0734:
0735: // thread1 holds all three read locks, thread2 has 2 of them
0736: lockManager.requestLock(l3, c1, s1, LockLevel.READ, sink);
0737: lockManager.requestLock(l4, c1, s1, LockLevel.READ, sink);
0738: lockManager.requestLock(l5, c1, s1, LockLevel.READ, sink);
0739: lockManager.requestLock(l3, c1, s2, LockLevel.READ, sink);
0740: lockManager.requestLock(l4, c1, s2, LockLevel.READ, sink);
0741:
0742: // thread1 gets lock1
0743: lockManager.requestLock(l1, c1, s1, LockLevel.WRITE, sink);
0744: // thread2 gets lock2
0745: lockManager.requestLock(l2, c1, s2, LockLevel.WRITE, sink);
0746: // thread1 trys to get lock2 (blocks)
0747: lockManager.requestLock(l2, c1, s1, LockLevel.WRITE, sink);
0748: // thread2 trys to get lock1 (blocks)
0749: lockManager.requestLock(l1, c1, s2, LockLevel.WRITE, sink);
0750:
0751: TestDeadlockResults deadlocks = new TestDeadlockResults();
0752: lockManager.scanForDeadlocks(deadlocks);
0753:
0754: assertEquals(1, deadlocks.chains.size());
0755: Map check = new HashMap();
0756: check.put(thread1, l2);
0757: check.put(thread2, l1);
0758: assertSpecificDeadlock((DeadlockChain) deadlocks.chains.get(0),
0759: check);
0760:
0761: lockManager.clearAllLocksFor(c1);
0762: }
0763:
0764: public void testLackOfDeadlock() throws InterruptedException {
0765: lockManager.start();
0766: for (int i = 0; i < 50; i++) {
0767: internalTestLackofDeadlock(false);
0768: resetLockManager(true);
0769: internalTestLackofDeadlock(true);
0770: resetLockManager(true);
0771: }
0772: }
0773:
0774: private void internalTestLackofDeadlock(boolean useRealThreads)
0775: throws InterruptedException {
0776: List threads = new ArrayList();
0777:
0778: for (int t = 0; t < numThreads; t++) {
0779: NodeID cid = txns[t].getNodeID();
0780: ThreadID tid = txns[t].getClientThreadID();
0781:
0782: RandomRequest req = new RandomRequest(cid, tid);
0783: if (useRealThreads) {
0784: Thread thread = new Thread(req);
0785: thread.start();
0786: threads.add(thread);
0787: } else {
0788: req.run();
0789: }
0790: }
0791:
0792: if (useRealThreads) {
0793: for (Iterator iter = threads.iterator(); iter.hasNext();) {
0794: Thread t = (Thread) iter.next();
0795: t.join();
0796: }
0797: }
0798:
0799: TestDeadlockResults results = new TestDeadlockResults();
0800: lockManager.scanForDeadlocks(results);
0801:
0802: assertEquals(0, results.chains.size());
0803:
0804: for (int i = 0; i < txns.length; i++) {
0805: lockManager.clearAllLocksFor(txns[i].getNodeID());
0806: }
0807: }
0808:
0809: private class RandomRequest implements Runnable {
0810: private final NodeID cid;
0811: private final ThreadID tid;
0812:
0813: public RandomRequest(NodeID cid, ThreadID tid) {
0814: this .cid = cid;
0815: this .tid = tid;
0816: }
0817:
0818: public void run() {
0819: final int start = random.nextInt(numLocks);
0820: final int howMany = random.nextInt(numLocks - start);
0821:
0822: for (int i = 0; i < howMany; i++) {
0823: LockID lock = locks[start + i];
0824: boolean read = random.nextInt(10) < 8; // 80% reads
0825: int level = read ? LockLevel.READ : LockLevel.WRITE;
0826: boolean granted = lockManager.requestLock(lock, cid,
0827: tid, level, sink);
0828: if (!granted) {
0829: break;
0830: }
0831: }
0832: }
0833: }
0834:
0835: private ServerThreadID[] makeUniqueTxns(int num) {
0836: ServerThreadID[] rv = new ServerThreadID[num];
0837: for (int i = 0; i < num; i++) {
0838: rv[i] = new ServerThreadID(new ClientID(new ChannelID(i)),
0839: new ThreadID(i));
0840: }
0841: return rv;
0842: }
0843:
0844: private LockID[] makeUniqueLocks(int num) {
0845: LockID[] rv = new LockID[num];
0846: for (int i = 0; i < num; i++) {
0847: rv[i] = new LockID("lock-" + i);
0848: }
0849:
0850: return rv;
0851: }
0852:
0853: private void assertSpecificDeadlock(DeadlockChain chain, Map check) {
0854: DeadlockChain start = chain;
0855: do {
0856: LockID lock = (LockID) check.remove(chain.getWaiter());
0857: assertEquals(lock, chain.getWaitingOn());
0858: chain = chain.getNextLink();
0859: } while (chain != start);
0860:
0861: assertEquals(0, check.size());
0862: }
0863:
0864: public void testDeadLock2() {
0865: // behavior changed ...
0866: if (true)
0867: return;
0868:
0869: // A slightly more complicated deadlock:
0870: // -- Thread1 holds lock1, wants lock2
0871: // -- Thread2 holds lock2, wants lock3
0872: // -- Thread3 holds lock3, wants lock1
0873:
0874: LockID l1 = new LockID("L1");
0875: LockID l2 = new LockID("L2");
0876: LockID l3 = new LockID("L3");
0877: ClientID c0 = new ClientID(new ChannelID(0));
0878: ThreadID s1 = new ThreadID(1);
0879: ThreadID s2 = new ThreadID(2);
0880: ThreadID s3 = new ThreadID(3);
0881:
0882: ServerThreadID thread1 = new ServerThreadID(c0, s1);
0883: ServerThreadID thread2 = new ServerThreadID(c0, s2);
0884: ServerThreadID thread3 = new ServerThreadID(c0, s3);
0885:
0886: lockManager.start();
0887:
0888: // thread1 gets lock1
0889: lockManager.requestLock(l1, c0, s1, LockLevel.WRITE, sink);
0890: // thread2 gets lock2
0891: lockManager.requestLock(l2, c0, s2, LockLevel.WRITE, sink);
0892: // thread3 gets lock3
0893: lockManager.requestLock(l3, c0, s3, LockLevel.WRITE, sink);
0894:
0895: // thread1 trys to get lock2 (blocks)
0896: lockManager.requestLock(l2, c0, s1, LockLevel.WRITE, sink);
0897: // thread2 trys to get lock3 (blocks)
0898: lockManager.requestLock(l3, c0, s2, LockLevel.WRITE, sink);
0899: // thread3 trys to get lock1 (blocks)
0900: lockManager.requestLock(l1, c0, s3, LockLevel.WRITE, sink);
0901:
0902: TestDeadlockResults deadlocks = new TestDeadlockResults();
0903: lockManager.scanForDeadlocks(deadlocks);
0904:
0905: assertEquals(1, deadlocks.chains.size());
0906:
0907: Map check = new HashMap();
0908: check.put(thread1, l2);
0909: check.put(thread2, l3);
0910: check.put(thread3, l1);
0911: assertSpecificDeadlock((DeadlockChain) deadlocks.chains.get(0),
0912: check);
0913:
0914: lockManager.clearAllLocksFor(c0);
0915: }
0916:
0917: private class ShutdownThread extends Thread {
0918: private final LinkedQueue shutdownSteps;
0919:
0920: private ShutdownThread(LinkedQueue shutdownSteps) {
0921: this .shutdownSteps = shutdownSteps;
0922: }
0923:
0924: public void run() {
0925: try {
0926: shutdownSteps.put(new Object());
0927: lockManager.stop();
0928: shutdownSteps.put(new Object());
0929: } catch (Exception e) {
0930: e.printStackTrace();
0931: fail();
0932: }
0933: }
0934: }
0935:
0936: private static class TestMessageChannel implements MessageChannel {
0937:
0938: public TCSocketAddress getLocalAddress() {
0939: throw new ImplementMe();
0940: }
0941:
0942: public TCSocketAddress getRemoteAddress() {
0943: try {
0944: return new TCSocketAddress("127.0.0.1", 6969);
0945: } catch (UnknownHostException e) {
0946: throw new RuntimeException(e);
0947: }
0948: }
0949:
0950: public void addListener(ChannelEventListener listener) {
0951: throw new ImplementMe();
0952: }
0953:
0954: public ChannelID getChannelID() {
0955: throw new ImplementMe();
0956: }
0957:
0958: public boolean isOpen() {
0959: throw new ImplementMe();
0960: }
0961:
0962: public boolean isClosed() {
0963: throw new ImplementMe();
0964: }
0965:
0966: public TCMessage createMessage(TCMessageType type) {
0967: throw new ImplementMe();
0968: }
0969:
0970: public Object getAttachment(String key) {
0971: throw new ImplementMe();
0972: }
0973:
0974: public void addAttachment(String key, Object value,
0975: boolean replace) {
0976: throw new ImplementMe();
0977: }
0978:
0979: public Object removeAttachment(String key) {
0980: throw new ImplementMe();
0981: }
0982:
0983: public boolean isConnected() {
0984: throw new ImplementMe();
0985: }
0986:
0987: public void send(TCNetworkMessage message) {
0988: throw new ImplementMe();
0989: }
0990:
0991: public NetworkStackID open() {
0992: throw new ImplementMe();
0993: }
0994:
0995: public void close() {
0996: throw new ImplementMe();
0997: }
0998:
0999: }
1000:
1001: private static class TestDeadlockResults implements DeadlockResults {
1002: final List chains = new ArrayList();
1003:
1004: public void foundDeadlock(DeadlockChain chain) {
1005: chains.add(chain);
1006: }
1007: }
1008:
1009: }
|