001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.object.tx;
006:
007: import EDU.oswego.cs.dl.util.concurrent.CyclicBarrier;
008: import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
009: import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
010: import EDU.oswego.cs.dl.util.concurrent.SynchronizedRef;
011:
012: import com.tc.bytes.TCByteBuffer;
013: import com.tc.exception.ImplementMe;
014: import com.tc.exception.TCRuntimeException;
015: import com.tc.logging.TCLogger;
016: import com.tc.logging.TCLogging;
017: import com.tc.object.MockTCObject;
018: import com.tc.object.ObjectID;
019: import com.tc.object.lockmanager.api.LockID;
020: import com.tc.object.logging.NullRuntimeLogger;
021: import com.tc.object.session.NullSessionManager;
022: import com.tc.object.session.SessionID;
023: import com.tc.util.SequenceID;
024: import com.tc.util.concurrent.NoExceptionLinkedQueue;
025: import com.tc.util.concurrent.ThreadUtil;
026:
027: import java.util.ArrayList;
028: import java.util.Collection;
029: import java.util.HashMap;
030: import java.util.HashSet;
031: import java.util.Iterator;
032: import java.util.LinkedList;
033: import java.util.List;
034: import java.util.Map;
035: import java.util.Set;
036:
037: import junit.framework.TestCase;
038:
039: public class RemoteTransactionManagerTest extends TestCase {
040:
041: private static final TCLogger logger = TCLogging
042: .getLogger(RemoteTransactionManagerTest.class);
043:
044: private RemoteTransactionManagerImpl manager;
045: private TestTransactionBatchFactory batchFactory;
046: private SynchronizedInt number;
047: private SynchronizedRef error;
048: private Map threads;
049: private LinkedQueue batchSendQueue;
050: private TransactionBatchAccounting batchAccounting;
051: private LockAccounting lockAccounting;
052:
053: public void setUp() throws Exception {
054: batchFactory = new TestTransactionBatchFactory();
055: batchAccounting = new TransactionBatchAccounting();
056: lockAccounting = new LockAccounting();
057: manager = new RemoteTransactionManagerImpl(logger,
058: batchFactory, batchAccounting, lockAccounting,
059: new NullSessionManager(), new MockChannel());
060: number = new SynchronizedInt(0);
061: error = new SynchronizedRef(null);
062: threads = new HashMap();
063: batchSendQueue = new LinkedQueue();
064: }
065:
066: public void tearDown() throws Exception {
067: if (error.get() != null) {
068: Throwable t = (Throwable) error.get();
069: fail(t.getMessage());
070: }
071: for (Iterator i = batchAccounting
072: .addIncompleteTransactionIDsTo(new LinkedList())
073: .iterator(); i.hasNext();) {
074: TransactionID txID = (TransactionID) i.next();
075: manager.receivedAcknowledgement(SessionID.NULL_ID, txID);
076: }
077: batchAccounting.clear();
078: batchAccounting.stop();
079: manager.clear();
080: }
081:
082: public void testFlush() throws Exception {
083: final LockID lockID1 = new LockID("lock1");
084: manager.flush(lockID1);
085: TestClientTransaction tx1 = new TestClientTransaction();
086: tx1.txID = new TransactionID(1);
087: tx1.lockID = lockID1;
088: tx1.allLockIDs.add(lockID1);
089: tx1.txnType = TxnType.NORMAL;
090: manager.commit(tx1);
091: final NoExceptionLinkedQueue flushCalls = new NoExceptionLinkedQueue();
092: Runnable flusher = new Runnable() {
093: public void run() {
094: manager.flush(lockID1);
095: flushCalls.put(lockID1);
096: }
097: };
098:
099: new Thread(flusher).start();
100: // XXX: Figure out how to do this without a timeout.
101: int timeout = 5 * 1000;
102: System.err.println("About too wait for " + timeout + " ms.");
103: assertNull(flushCalls.poll(timeout));
104:
105: manager.receivedAcknowledgement(SessionID.NULL_ID, tx1
106: .getTransactionID());
107: assertEquals(lockID1, flushCalls.take());
108:
109: TestClientTransaction tx2 = tx1;
110: tx2.txID = new TransactionID(2);
111:
112: // make sure flush falls through if the acknowledgement is received before the flush is called.
113: manager.commit(tx1);
114: manager.receivedAcknowledgement(SessionID.NULL_ID, tx2
115: .getTransactionID());
116: new Thread(flusher).start();
117: assertEquals(lockID1, flushCalls.take());
118: }
119:
120: public void testSendAckedGlobalTransactionIDs() throws Exception {
121: assertTrue(batchSendQueue.isEmpty());
122: ClientTransaction ctx = makeTransaction();
123: CyclicBarrier barrier = new CyclicBarrier(2);
124:
125: callCommitOnThread(ctx, barrier);
126: barrier.barrier();
127: ThreadUtil.reallySleep(500);
128: TestTransactionBatch batch = (TestTransactionBatch) batchFactory.newBatchQueue
129: .poll(1);
130: assertNotNull(batch);
131: assertSame(batch, batchSendQueue.poll(1));
132: assertTrue(batchSendQueue.isEmpty());
133:
134: // fill the current batch with a bunch of transactions
135: int count = 50;
136: for (int i = 0; i < count; i++) {
137: ClientTransaction ctx1 = makeTransaction();
138: barrier = new CyclicBarrier(2);
139: callCommitOnThread(ctx1, barrier);
140: barrier.barrier();
141: ThreadUtil.reallySleep(500);
142: }
143:
144: List batches = new ArrayList();
145: TestTransactionBatch batch1;
146: while ((batch1 = (TestTransactionBatch) batchSendQueue
147: .poll(3000)) != null) {
148: System.err.println("Recd batch " + batch1);
149: batches.add(batch1);
150: }
151:
152: // acknowledge the first transaction
153: manager.receivedAcknowledgement(SessionID.NULL_ID, ctx
154: .getTransactionID());
155:
156: manager.receivedBatchAcknowledgement(batch.batchID);
157:
158: // the batch ack should have sent another batch
159: batch = (TestTransactionBatch) batchSendQueue.poll(1);
160: assertNotNull(batch);
161: assertTrue(batchSendQueue.isEmpty());
162:
163: ctx = makeTransaction();
164: barrier = new CyclicBarrier(2);
165: callCommitOnThread(ctx, barrier);
166: barrier.barrier();
167: ThreadUtil.reallySleep(500);
168:
169: // acknowledge the remaining batches so the current batch will get sent.
170: for (Iterator i = batches.iterator(); i.hasNext();) {
171: batch1 = (TestTransactionBatch) i.next();
172: manager.receivedBatchAcknowledgement(batch1.batchID);
173: }
174: manager.receivedBatchAcknowledgement(batch.batchID);
175:
176: batch = (TestTransactionBatch) batchSendQueue.poll(1);
177: assertNotNull(batch);
178: assertTrue(batchSendQueue.isEmpty());
179: }
180:
181: public void testResendOutstandingBasics() throws Exception {
182: System.err.println("Testing testResendOutstandingBasics ...");
183: final Set batchTxs = new HashSet();
184:
185: final int maxBatchesOutstanding = manager
186: .getMaxOutStandingBatches();
187: final List batches = new ArrayList();
188:
189: TestTransactionBatch batchN;
190: for (int i = 0; i < maxBatchesOutstanding; i++) {
191: makeAndCommitTransactions(batchTxs, 1);
192: batchN = (TestTransactionBatch) batchSendQueue.take();
193: System.err.println("* Recd " + batchN);
194: assertEquals(batchN, getNextNewBatch());
195: assertTrue(batchSendQueue.isEmpty());
196: batches.add(batchN);
197: assertEquals(1, batchN.transactions.size());
198: }
199:
200: final int num = 5;
201: // These txns are not gonna be sent as we already reached the max Batches outstanding count
202: makeAndCommitTransactions(batchTxs, num);
203: ThreadUtil.reallySleep(2000);
204: assertTrue(batchSendQueue.isEmpty());
205:
206: // Resend outstanding batches
207: restart(manager);
208:
209: // Make sure the batches get resent
210: for (int i = batches.size(); i > 0; i--) {
211: assertTrue(batches.contains(batchSendQueue.take()));
212: }
213: assertTrue(batchSendQueue.isEmpty());
214:
215: // ACK batch 1; next batch (batch 3) will be sent.
216: manager
217: .receivedBatchAcknowledgement(((TestTransactionBatch) batches
218: .get(0)).batchID);
219: while ((batchN = (TestTransactionBatch) batchSendQueue
220: .poll(3000)) != null) {
221: System.err.println("** Recd " + batchN);
222: batches.add(batchN);
223: getNextNewBatch();
224: }
225:
226: // Resend outstanding batches
227: restart(manager);
228:
229: // This time, all batches + batch 3 should get resent
230: List sent = (List) drainQueueInto(batchSendQueue,
231: new LinkedList());
232: assertEquals(batches.size(), sent.size());
233: assertTrue(sent.containsAll(batches));
234:
235: // make some new transactions that should go into next batch
236: makeAndCommitTransactions(batchTxs, num);
237:
238: // ACK all the transactions in batch1
239: Collection batch1Txs = ((TestTransactionBatch) batches.get(0))
240: .addTransactionIDsTo(new HashSet());
241: for (Iterator i = batch1Txs.iterator(); i.hasNext();) {
242: TransactionID txnId = (TransactionID) i.next();
243: batchTxs.remove(txnId);
244: manager.receivedAcknowledgement(SessionID.NULL_ID, txnId);
245: }
246: batches.remove(0);
247:
248: // resend
249: restart(manager);
250:
251: // This time, batches except batch 1 should get resent
252: sent = (List) drainQueueInto(batchSendQueue, new LinkedList());
253: assertEquals(batches.size(), sent.size());
254: assertTrue(sent.containsAll(batches));
255:
256: // ACK all other batches
257: for (Iterator i = batches.iterator(); i.hasNext();) {
258: batchN = (TestTransactionBatch) i.next();
259: manager.receivedBatchAcknowledgement(batchN.batchID);
260: }
261:
262: while ((batchN = (TestTransactionBatch) batchSendQueue
263: .poll(3000)) != null) {
264: System.err.println("*** Recd " + batchN);
265: batches.add(batchN);
266: getNextNewBatch();
267: }
268:
269: // resend
270: restart(manager);
271:
272: // This time, batches except batch 1 should get resent
273: sent = (List) drainQueueInto(batchSendQueue, new LinkedList());
274: assertEquals(batches.size(), sent.size());
275: assertTrue(sent.containsAll(batches));
276:
277: // now make sure that the manager re-sends an outstanding batch until all of
278: // its transactions have been acked.
279: while (batches.size() > 0) {
280: Collection batchNTxs = ((TestTransactionBatch) batches
281: .get(0)).addTransactionIDsTo(new HashSet());
282: for (Iterator i = batchNTxs.iterator(); i.hasNext();) {
283: TransactionID txnId = (TransactionID) i.next();
284: batchTxs.remove(txnId);
285: manager.receivedAcknowledgement(SessionID.NULL_ID,
286: txnId);
287: restart(manager);
288: sent = (List) drainQueueInto(batchSendQueue,
289: new LinkedList());
290: if (i.hasNext()) {
291: // There are still un-ACKed transactions in this batch.
292: assertEquals(batches.size(), sent.size());
293: assertTrue(batches.containsAll(sent));
294: } else {
295: // all the transactions have been ACKed, so current batch (batch 4 should be sent)
296: batches.remove(0);
297: assertEquals(batches.size(), sent.size());
298: assertTrue(batches.containsAll(sent));
299: }
300: }
301: }
302: }
303:
304: private void restart(RemoteTransactionManager manager2) {
305: manager2.pause();
306: manager2.starting();
307: manager2.resendOutstandingAndUnpause();
308:
309: }
310:
311: private void makeAndCommitTransactions(final Set created,
312: final int count) throws InterruptedException {
313: CyclicBarrier commitBarrier = new CyclicBarrier(count + 1);
314: for (int i = 0; i < count; i++) {
315: ClientTransaction tx = makeTransaction();
316: created.add(tx);
317: callCommitOnThread(tx, commitBarrier);
318: }
319: // make sure all the threads have at least started...
320: commitBarrier.barrier();
321: // sleep a little bit to make sure they get to the commit() call.
322: ThreadUtil.reallySleep(1000);
323: }
324:
325: public void testBatching() throws InterruptedException {
326:
327: System.err.println("Testing testBatching ...");
328:
329: final int maxBatchesOutstanding = manager
330: .getMaxOutStandingBatches();
331: TestTransactionBatch batchN;
332: final Set batchTxs = new HashSet();
333: final List batches = new ArrayList();
334:
335: for (int i = 0; i < maxBatchesOutstanding; i++) {
336: makeAndCommitTransactions(batchTxs, 1);
337: batchN = (TestTransactionBatch) batchSendQueue.take();
338: System.err.println("* Recd " + batchN);
339: assertEquals(batchN, getNextNewBatch());
340: assertTrue(batchSendQueue.isEmpty());
341: batches.add(batchN);
342: assertEquals(1, batchN.transactions.size());
343: assertTrue(batchTxs.containsAll(batchN.transactions));
344: }
345:
346: final int num = 10;
347:
348: // create more transactions on the client side (they should all get batched
349: // locally)
350: List batch2Txs = new ArrayList();
351: CyclicBarrier barrier = new CyclicBarrier(num + 1);
352: for (int i = 1; i <= num; i++) {
353: ClientTransaction txn = makeTransaction();
354: batch2Txs.add(txn);
355: callCommitOnThread(txn, barrier);
356: }
357: batchTxs.addAll(batch2Txs);
358:
359: barrier.barrier();
360: assertFalse(barrier.broken());
361: ThreadUtil.reallySleep(2000);
362: assertTrue(batchSendQueue.isEmpty());
363:
364: // Make sure the rest transactions get into the second batch
365: TestTransactionBatch batch2 = getNextNewBatch();
366: Collection txnsInBatch = drainQueueInto(batch2.addTxQueue,
367: new HashSet());
368: assertTrue(batch2Txs.size() == txnsInBatch.size());
369: txnsInBatch.removeAll(batch2Txs);
370: assertTrue(txnsInBatch.size() == 0);
371: assertTrue(batch2.addTxQueue.isEmpty());
372:
373: TestTransactionBatch batch1 = ((TestTransactionBatch) batches
374: .remove(0));
375:
376: // ACK one of the batch (triggers send of next batch)
377: manager.receivedBatchAcknowledgement(batch1.batchID);
378: // make sure that the batch sent is what we expected.
379: assertSame(batch2, batchSendQueue.take());
380:
381: TestTransactionBatch batch3 = getNextNewBatch();
382:
383: // ACK another batch (no more TXNs to send this time)
384: assertTrue(batchSendQueue.isEmpty());
385: manager.receivedBatchAcknowledgement(batch2.batchID);
386: assertTrue(batchSendQueue.isEmpty());
387: for (Iterator i = batches.iterator(); i.hasNext();) {
388: TestTransactionBatch b = (TestTransactionBatch) i.next();
389: manager.receivedBatchAcknowledgement(b.batchID);
390: assertTrue(batchSendQueue.isEmpty());
391: }
392:
393: for (Iterator i = batchTxs.iterator(); i.hasNext();) {
394: ClientTransaction txn = (ClientTransaction) i.next();
395: manager.receivedAcknowledgement(SessionID.NULL_ID, txn
396: .getTransactionID());
397: assertTrue(batchSendQueue.isEmpty());
398: }
399:
400: // There should still be no batch to send.
401: assertTrue(batchSendQueue.isEmpty());
402: assertTrue(drainQueueInto(batch3.addTxQueue, new LinkedList())
403: .isEmpty());
404: }
405:
406: private Collection drainQueueInto(LinkedQueue queue, Collection dest)
407: throws InterruptedException {
408: while (!queue.isEmpty()) {
409: dest.add(queue.take());
410: }
411: return dest;
412: }
413:
414: private TestTransactionBatch getNextNewBatch()
415: throws InterruptedException {
416: TestTransactionBatch rv = (TestTransactionBatch) batchFactory.newBatchQueue
417: .take();
418: return rv;
419: }
420:
421: private synchronized void callCommitOnThread(
422: final ClientTransaction txn, final CyclicBarrier barrier) {
423: TransactionID txnID = txn.getTransactionID();
424:
425: Thread t = new Thread("Commit for txn #" + txnID.toLong()) {
426: public void run() {
427: try {
428: barrier.barrier();
429: manager.commit(txn);
430: } catch (Throwable th) {
431: th.printStackTrace();
432: error.set(th);
433: }
434: }
435: };
436:
437: threads.put(txnID, t);
438: t.start();
439: }
440:
441: private ClientTransaction makeTransaction() {
442: int num = number.increment();
443: LockID lid = new LockID("lock" + num);
444: TransactionContext tc = new TransactionContextImpl(lid,
445: TxnType.NORMAL, new LockID[] { lid });
446: ClientTransaction txn = new ClientTransactionImpl(
447: new TransactionID(num), new NullRuntimeLogger());
448: txn.setTransactionContext(tc);
449: txn.fieldChanged(new MockTCObject(new ObjectID(num), this ),
450: "class", "class.field", new ObjectID(num), -1);
451: return txn;
452: }
453:
454: private final class TestTransactionBatch implements
455: ClientTransactionBatch {
456:
457: public final TxnBatchID batchID;
458:
459: public final LinkedQueue addTxQueue = new LinkedQueue();
460: private final LinkedList transactions = new LinkedList();
461:
462: public TestTransactionBatch(TxnBatchID batchID) {
463: this .batchID = batchID;
464: }
465:
466: public String toString() {
467: return "TestTransactionBatch[" + batchID + "] = Txn [ "
468: + transactions + " ]";
469: }
470:
471: public synchronized boolean isEmpty() {
472: return transactions.isEmpty();
473: }
474:
475: public int numberOfTxns() {
476: return transactions.size();
477: }
478:
479: public boolean isNull() {
480: return false;
481: }
482:
483: public synchronized void addTransaction(ClientTransaction txn) {
484: try {
485: addTxQueue.put(txn);
486: transactions.add(txn);
487: } catch (InterruptedException e) {
488: throw new TCRuntimeException(e);
489: }
490: }
491:
492: public void removeTransaction(TransactionID txID) {
493: return;
494: }
495:
496: public Collection addTransactionIDsTo(Collection c) {
497: for (Iterator i = transactions.iterator(); i.hasNext();) {
498: ClientTransaction txn = (ClientTransaction) i.next();
499: c.add(txn.getTransactionID());
500: }
501: return c;
502: }
503:
504: public void send() {
505: try {
506: batchSendQueue.put(this );
507: } catch (InterruptedException e) {
508: throw new TCRuntimeException(e);
509: }
510: }
511:
512: public TCByteBuffer[] getData() {
513: return null;
514: }
515:
516: public TxnBatchID getTransactionBatchID() {
517: return this .batchID;
518: }
519:
520: public SequenceID getMinTransactionSequence() {
521: throw new ImplementMe();
522: }
523:
524: public void recycle() {
525: return;
526: }
527:
528: public Collection addTransactionSequenceIDsTo(
529: Collection sequenceIDs) {
530: for (Iterator i = transactions.iterator(); i.hasNext();) {
531: ClientTransaction txn = (ClientTransaction) i.next();
532: sequenceIDs.add(txn.getSequenceID());
533: }
534: return sequenceIDs;
535: }
536:
537: public String dump() {
538: return "TestTransactionBatch";
539: }
540:
541: public int byteSize() {
542: return 64000;
543: }
544:
545: }
546:
547: private final class TestTransactionBatchFactory implements
548: TransactionBatchFactory {
549: private long idSequence;
550: public final LinkedQueue newBatchQueue = new LinkedQueue();
551:
552: public synchronized ClientTransactionBatch nextBatch() {
553: ClientTransactionBatch rv = new TestTransactionBatch(
554: new TxnBatchID(++idSequence));
555: try {
556: newBatchQueue.put(rv);
557: } catch (InterruptedException e) {
558: throw new TCRuntimeException(e);
559: }
560: return rv;
561: }
562: }
563:
564: }
|