001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.objectserver.tx;
006:
007: import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
008:
009: import com.tc.exception.ImplementMe;
010: import com.tc.exception.TCRuntimeException;
011: import com.tc.net.groups.ClientID;
012: import com.tc.net.groups.NodeID;
013: import com.tc.net.protocol.tcm.ChannelID;
014: import com.tc.net.protocol.tcm.MessageChannel;
015: import com.tc.object.ObjectID;
016: import com.tc.object.dmi.DmiDescriptor;
017: import com.tc.object.dna.impl.ObjectStringSerializer;
018: import com.tc.object.lockmanager.api.LockID;
019: import com.tc.object.net.ChannelStats;
020: import com.tc.object.tx.ServerTransactionID;
021: import com.tc.object.tx.TransactionID;
022: import com.tc.object.tx.TxnBatchID;
023: import com.tc.object.tx.TxnType;
024: import com.tc.objectserver.api.ObjectInstanceMonitor;
025: import com.tc.objectserver.gtx.TestGlobalTransactionManager;
026: import com.tc.objectserver.impl.ObjectInstanceMonitorImpl;
027: import com.tc.objectserver.impl.TestObjectManager;
028: import com.tc.objectserver.l1.api.TestClientStateManager;
029: import com.tc.objectserver.l1.impl.TransactionAcknowledgeAction;
030: import com.tc.objectserver.lockmanager.api.TestLockManager;
031: import com.tc.objectserver.managedobject.BackReferences;
032: import com.tc.objectserver.persistence.impl.NullPersistenceTransactionProvider;
033: import com.tc.objectserver.persistence.impl.TestTransactionStore;
034: import com.tc.stats.counter.Counter;
035: import com.tc.stats.counter.CounterImpl;
036: import com.tc.util.SequenceID;
037: import com.tc.util.concurrent.NoExceptionLinkedQueue;
038:
039: import java.util.ArrayList;
040: import java.util.Collection;
041: import java.util.Collections;
042: import java.util.HashMap;
043: import java.util.HashSet;
044: import java.util.Iterator;
045: import java.util.LinkedList;
046: import java.util.List;
047: import java.util.Map;
048: import java.util.Set;
049:
050: import junit.framework.TestCase;
051:
052: public class ServerTransactionManagerImplTest extends TestCase {
053:
054: private ServerTransactionManagerImpl transactionManager;
055: private TestTransactionAcknowledgeAction action;
056: private TestClientStateManager clientStateManager;
057: private TestLockManager lockManager;
058: private TestObjectManager objectManager;
059: private TestTransactionStore transactionStore;
060: private Counter transactionRateCounter;
061: private TestChannelStats channelStats;
062: private TestGlobalTransactionManager gtxm;
063: private ObjectInstanceMonitor imo;
064: private NullPersistenceTransactionProvider ptxp;
065:
066: protected void setUp() throws Exception {
067: super .setUp();
068: this .action = new TestTransactionAcknowledgeAction();
069: this .clientStateManager = new TestClientStateManager();
070: this .lockManager = new TestLockManager();
071: this .objectManager = new TestObjectManager();
072: this .transactionStore = new TestTransactionStore();
073: this .transactionRateCounter = new CounterImpl();
074: this .channelStats = new TestChannelStats();
075: this .gtxm = new TestGlobalTransactionManager();
076: this .imo = new ObjectInstanceMonitorImpl();
077: newTransactionManager();
078: this .ptxp = new NullPersistenceTransactionProvider();
079: }
080:
081: protected void tearDown() throws Exception {
082: super .tearDown();
083: }
084:
085: private void newTransactionManager() {
086: this .transactionManager = new ServerTransactionManagerImpl(
087: this .gtxm, this .transactionStore, this .lockManager,
088: this .clientStateManager, this .objectManager,
089: new NullTransactionalObjectManager(), this .action,
090: this .transactionRateCounter, this .channelStats,
091: new ServerTransactionManagerConfig());
092: this .transactionManager.goToActiveMode();
093: this .transactionManager.start(Collections.EMPTY_SET);
094: }
095:
096: public void testRootCreatedEvent() {
097: Map roots = new HashMap();
098: roots.put("root", new ObjectID(1));
099:
100: // first test w/o any listeners attached
101: this .transactionManager.commit(ptxp, Collections.EMPTY_SET,
102: roots, Collections.EMPTY_LIST);
103:
104: // add a listener
105: Listener listener = new Listener();
106: this .transactionManager.addRootListener(listener);
107: roots.clear();
108: roots.put("root2", new ObjectID(2));
109:
110: this .transactionManager.commit(ptxp, Collections.EMPTY_SET,
111: roots, Collections.EMPTY_LIST);
112: assertEquals(1, listener.rootsCreated.size());
113: Root root = (Root) listener.rootsCreated.remove(0);
114: assertEquals("root2", root.name);
115: assertEquals(new ObjectID(2), root.id);
116:
117: // add another listener
118: Listener listener2 = new Listener();
119: this .transactionManager.addRootListener(listener2);
120: roots.clear();
121: roots.put("root3", new ObjectID(3));
122:
123: this .transactionManager.commit(ptxp, Collections.EMPTY_SET,
124: roots, Collections.EMPTY_LIST);
125: assertEquals(1, listener.rootsCreated.size());
126: root = (Root) listener.rootsCreated.remove(0);
127: assertEquals("root3", root.name);
128: assertEquals(new ObjectID(3), root.id);
129: root = (Root) listener2.rootsCreated.remove(0);
130: assertEquals("root3", root.name);
131: assertEquals(new ObjectID(3), root.id);
132:
133: // add a listener that throws an exception
134: this .transactionManager
135: .addRootListener(new ServerTransactionManagerEventListener() {
136: public void rootCreated(String name, ObjectID id) {
137: throw new RuntimeException(
138: "This exception is supposed to be here");
139: }
140: });
141: this .transactionManager.commit(ptxp, Collections.EMPTY_SET,
142: roots, Collections.EMPTY_LIST);
143: }
144:
145: public void testAddAndRemoveTransactionListeners() throws Exception {
146: TestServerTransactionListener l1 = new TestServerTransactionListener();
147: TestServerTransactionListener l2 = new TestServerTransactionListener();
148: transactionManager.addTransactionListener(l1);
149: transactionManager.addTransactionListener(l2);
150:
151: Set txns = new HashSet();
152: ClientID cid1 = new ClientID(new ChannelID(1));
153: List dnas = Collections.unmodifiableList(new LinkedList());
154: ObjectStringSerializer serializer = null;
155: Map newRoots = Collections.unmodifiableMap(new HashMap());
156: TxnType txnType = TxnType.NORMAL;
157:
158: HashSet tids = new HashSet();
159: for (int i = 0; i < 10; i++) {
160: TransactionID tid1 = new TransactionID(i);
161: SequenceID sequenceID = new SequenceID(i);
162: LockID[] lockIDs = new LockID[0];
163: ServerTransaction tx = new ServerTransactionImpl(gtxm,
164: new TxnBatchID(1), tid1, sequenceID, lockIDs, cid1,
165: dnas, serializer, newRoots, txnType,
166: new LinkedList(), DmiDescriptor.EMPTY_ARRAY);
167: txns.add(tx);
168: tids.add(tx.getServerTransactionID());
169: }
170: doStages(cid1, txns, false);
171:
172: // check for events
173: Object o[] = (Object[]) l1.incomingContext.take();
174: assertNotNull(o);
175: o = (Object[]) l2.incomingContext.take();
176: assertNotNull(o);
177:
178: for (int i = 0; i < 10; i++) {
179: ServerTransactionID tid1 = (ServerTransactionID) l1.appliedContext
180: .take();
181: ServerTransactionID tid2 = (ServerTransactionID) l2.appliedContext
182: .take();
183: assertEquals(tid1, tid2);
184: // System.err.println("tid1 = " + tid1 + " tid2 = " + tid2 + " tids = " + tids);
185: assertTrue(tids.contains(tid1));
186: tid1 = (ServerTransactionID) l1.completedContext.take();
187: tid2 = (ServerTransactionID) l2.completedContext.take();
188: assertEquals(tid1, tid2);
189: assertTrue(tids.contains(tid1));
190: }
191:
192: // No more events
193: o = (Object[]) l1.incomingContext.poll(2000);
194: assertNull(o);
195: o = (Object[]) l2.incomingContext.poll(2000);
196: assertNull(o);
197: ServerTransactionID tid = (ServerTransactionID) l1.appliedContext
198: .poll(2000);
199: assertNull(tid);
200: tid = (ServerTransactionID) l2.appliedContext.poll(2000);
201: assertNull(tid);
202: tid = (ServerTransactionID) l1.completedContext.poll(2000);
203: assertNull(tid);
204: tid = (ServerTransactionID) l2.completedContext.poll(2000);
205: assertNull(tid);
206:
207: // unregister one
208: transactionManager.removeTransactionListener(l2);
209:
210: // more txn
211: tids.clear();
212: txns.clear();
213: for (int i = 10; i < 20; i++) {
214: TransactionID tid1 = new TransactionID(i);
215: SequenceID sequenceID = new SequenceID(i);
216: LockID[] lockIDs = new LockID[0];
217: ServerTransaction tx = new ServerTransactionImpl(gtxm,
218: new TxnBatchID(2), tid1, sequenceID, lockIDs, cid1,
219: dnas, serializer, newRoots, txnType,
220: new LinkedList(), DmiDescriptor.EMPTY_ARRAY);
221: txns.add(tx);
222: tids.add(tx.getServerTransactionID());
223: }
224: doStages(cid1, txns, false);
225:
226: // Events to only l1
227: o = (Object[]) l1.incomingContext.take();
228: assertNotNull(o);
229: o = (Object[]) l2.incomingContext.poll(2000);
230: assertNull(o);
231:
232: for (int i = 0; i < 10; i++) {
233: ServerTransactionID tid1 = (ServerTransactionID) l1.appliedContext
234: .take();
235: ServerTransactionID tid2 = (ServerTransactionID) l2.appliedContext
236: .poll(1000);
237: assertNotNull(tid1);
238: assertNull(tid2);
239: assertTrue(tids.contains(tid1));
240: tid1 = (ServerTransactionID) l1.completedContext.take();
241: tid2 = (ServerTransactionID) l2.completedContext.poll(1000);
242: assertNotNull(tid1);
243: assertNull(tid2);
244: assertTrue(tids.contains(tid1));
245: }
246: }
247:
248: /**
249: * A transaction is broadcasted to another client, the orginating client disconnects and then the broadcasted client
250: * disconnects. This test was written to illustrate a scenario where when multiple clients were disconnecting, were
251: * acks are being waited for, a concurrent modification exception was thrown.
252: */
253: public void test2ClientsDisconnectAtTheSameTime() throws Exception {
254: ClientID cid1 = new ClientID(new ChannelID(1));
255: TransactionID tid1 = new TransactionID(1);
256: TransactionID tid2 = new TransactionID(2);
257: TransactionID tid3 = new TransactionID(3);
258: ClientID cid2 = new ClientID(new ChannelID(2));
259: ClientID cid3 = new ClientID(new ChannelID(3));
260: ClientID cid4 = new ClientID(new ChannelID(4));
261: ClientID cid5 = new ClientID(new ChannelID(5));
262:
263: LockID[] lockIDs = new LockID[0];
264: List dnas = Collections.unmodifiableList(new LinkedList());
265: ObjectStringSerializer serializer = null;
266: Map newRoots = Collections.unmodifiableMap(new HashMap());
267: TxnType txnType = TxnType.NORMAL;
268: SequenceID sequenceID = new SequenceID(1);
269: ServerTransaction tx1 = new ServerTransactionImpl(gtxm,
270: new TxnBatchID(1), tid1, sequenceID, lockIDs, cid1,
271: dnas, serializer, newRoots, txnType, new LinkedList(),
272: DmiDescriptor.EMPTY_ARRAY);
273:
274: Set txns = new HashSet();
275: txns.add(tx1);
276: Set txnIDs = new HashSet();
277: txnIDs.add(new ServerTransactionID(cid1, tid1));
278: transactionManager.incomingTransactions(cid1, txnIDs, txns,
279: false);
280: transactionManager.addWaitingForAcknowledgement(cid1, tid1,
281: cid2);
282: transactionManager.addWaitingForAcknowledgement(cid1, tid1,
283: cid3);
284: transactionManager.addWaitingForAcknowledgement(cid1, tid1,
285: cid4);
286: transactionManager.addWaitingForAcknowledgement(cid1, tid1,
287: cid5);
288: doStages(cid1, txns, true);
289:
290: // Adding a few more transactions to that Transaction Records are created for everybody
291: txns.clear();
292: txnIDs.clear();
293: ServerTransaction tx2 = new ServerTransactionImpl(gtxm,
294: new TxnBatchID(2), tid2, sequenceID, lockIDs, cid2,
295: dnas, serializer, newRoots, txnType, new LinkedList(),
296: DmiDescriptor.EMPTY_ARRAY);
297: txns.add(tx2);
298: txnIDs.add(new ServerTransactionID(cid2, tid2));
299: transactionManager.incomingTransactions(cid2, txnIDs, txns,
300: false);
301:
302: transactionManager.acknowledgement(cid2, tid2, cid3);
303: doStages(cid2, txns, true);
304:
305: txns.clear();
306: txnIDs.clear();
307: ServerTransaction tx3 = new ServerTransactionImpl(gtxm,
308: new TxnBatchID(2), tid3, sequenceID, lockIDs, cid3,
309: dnas, serializer, newRoots, txnType, new LinkedList(),
310: DmiDescriptor.EMPTY_ARRAY);
311: txns.add(tx3);
312: txnIDs.add(new ServerTransactionID(cid3, tid3));
313: transactionManager.incomingTransactions(cid3, txnIDs, txns,
314: false);
315:
316: transactionManager.acknowledgement(cid3, tid3, cid4);
317: transactionManager.acknowledgement(cid3, tid3, cid2);
318: doStages(cid2, txns, true);
319:
320: assertTrue(transactionManager.isWaiting(cid1, tid1));
321:
322: transactionManager.acknowledgement(cid1, tid1, cid3);
323: assertTrue(transactionManager.isWaiting(cid1, tid1));
324: transactionManager.acknowledgement(cid1, tid1, cid4);
325: assertTrue(transactionManager.isWaiting(cid1, tid1));
326: transactionManager.acknowledgement(cid1, tid1, cid5);
327: assertTrue(transactionManager.isWaiting(cid1, tid1));
328:
329: // Client 1 disconnects
330: transactionManager.shutdownNode(cid1);
331:
332: // Still waiting for tx1
333: assertTrue(transactionManager.isWaiting(cid1, tid1));
334:
335: // Client 2 disconnects now
336: // Concurrent Modification exception used to be thrown here.
337: transactionManager.shutdownNode(cid2);
338:
339: // Not waiting for tx1 anymore
340: assertFalse(transactionManager.isWaiting(cid1, tid1));
341:
342: // Client 3 disconnects now
343: // Concurrent Modification exception used to be thrown here.
344: transactionManager.shutdownNode(cid2);
345:
346: }
347:
348: public void tests() throws Exception {
349: ClientID cid1 = new ClientID(new ChannelID(1));
350: TransactionID tid1 = new TransactionID(1);
351: TransactionID tid2 = new TransactionID(2);
352: TransactionID tid3 = new TransactionID(3);
353: TransactionID tid4 = new TransactionID(4);
354: TransactionID tid5 = new TransactionID(5);
355: TransactionID tid6 = new TransactionID(6);
356:
357: ClientID cid2 = new ClientID(new ChannelID(2));
358: ClientID cid3 = new ClientID(new ChannelID(3));
359:
360: LockID[] lockIDs = new LockID[0];
361: List dnas = Collections.unmodifiableList(new LinkedList());
362: ObjectStringSerializer serializer = null;
363: Map newRoots = Collections.unmodifiableMap(new HashMap());
364: TxnType txnType = TxnType.NORMAL;
365: SequenceID sequenceID = new SequenceID(1);
366: ServerTransaction tx1 = new ServerTransactionImpl(gtxm,
367: new TxnBatchID(1), tid1, sequenceID, lockIDs, cid1,
368: dnas, serializer, newRoots, txnType, new LinkedList(),
369: DmiDescriptor.EMPTY_ARRAY);
370:
371: // Test with one waiter
372: Set txns = new HashSet();
373: txns.add(tx1);
374: Set txnIDs = new HashSet();
375: txnIDs.add(new ServerTransactionID(cid1, tid1));
376: transactionManager.incomingTransactions(cid1, txnIDs, txns,
377: false);
378: transactionManager.addWaitingForAcknowledgement(cid1, tid1,
379: cid2);
380: assertTrue(transactionManager.isWaiting(cid1, tid1));
381: assertTrue(action.clientID == null && action.txID == null);
382: transactionManager.acknowledgement(cid1, tid1, cid2);
383: assertTrue(action.clientID == null && action.txID == null);
384: doStages(cid1, txns);
385: assertTrue(action.clientID == cid1 && action.txID == tid1);
386: assertFalse(transactionManager.isWaiting(cid1, tid1));
387:
388: // Test with 2 waiters
389: action.clear();
390: gtxm.clear();
391: txns.clear();
392: txnIDs.clear();
393: sequenceID = new SequenceID(2);
394: ServerTransaction tx2 = new ServerTransactionImpl(gtxm,
395: new TxnBatchID(2), tid2, sequenceID, lockIDs, cid1,
396: dnas, serializer, newRoots, txnType, new LinkedList(),
397: DmiDescriptor.EMPTY_ARRAY);
398: txns.add(tx2);
399: txnIDs.add(new ServerTransactionID(cid1, tid2));
400: transactionManager.incomingTransactions(cid1, txnIDs, txns,
401: false);
402:
403: transactionManager.addWaitingForAcknowledgement(cid1, tid2,
404: cid2);
405: transactionManager.addWaitingForAcknowledgement(cid1, tid2,
406: cid3);
407: assertTrue(action.clientID == null && action.txID == null);
408: assertTrue(transactionManager.isWaiting(cid1, tid2));
409: transactionManager.acknowledgement(cid1, tid2, cid2);
410: assertTrue(action.clientID == null && action.txID == null);
411: assertTrue(transactionManager.isWaiting(cid1, tid2));
412: transactionManager.acknowledgement(cid1, tid2, cid3);
413: assertTrue(action.clientID == null && action.txID == null);
414: doStages(cid1, txns);
415: assertTrue(action.clientID == cid1 && action.txID == tid2);
416: assertFalse(transactionManager.isWaiting(cid1, tid2));
417:
418: // Test shutdown client with 2 waiters
419: action.clear();
420: gtxm.clear();
421: txns.clear();
422: txnIDs.clear();
423: sequenceID = new SequenceID(3);
424: ServerTransaction tx3 = new ServerTransactionImpl(gtxm,
425: new TxnBatchID(3), tid3, sequenceID, lockIDs, cid1,
426: dnas, serializer, newRoots, txnType, new LinkedList(),
427: DmiDescriptor.EMPTY_ARRAY);
428: txns.add(tx3);
429: txnIDs.add(new ServerTransactionID(cid1, tid3));
430: transactionManager.incomingTransactions(cid1, txnIDs, txns,
431: false);
432: transactionManager.addWaitingForAcknowledgement(cid1, tid3,
433: cid2);
434: transactionManager.addWaitingForAcknowledgement(cid1, tid3,
435: cid3);
436: assertTrue(action.clientID == null && action.txID == null);
437: assertTrue(transactionManager.isWaiting(cid1, tid3));
438: transactionManager.shutdownNode(cid3);
439: assertEquals(cid3, this .clientStateManager.shutdownClient);
440: assertTrue(transactionManager.isWaiting(cid1, tid3));
441: transactionManager.acknowledgement(cid1, tid3, cid2);
442: doStages(cid1, txns);
443: assertTrue(action.clientID == cid1 && action.txID == tid3);
444: assertFalse(transactionManager.isWaiting(cid1, tid3));
445:
446: // Test shutdown client that no one is waiting for
447: action.clear();
448: gtxm.clear();
449: txns.clear();
450: txnIDs.clear();
451: clientStateManager.shutdownClient = null;
452:
453: sequenceID = new SequenceID(4);
454: ServerTransaction tx4 = new ServerTransactionImpl(gtxm,
455: new TxnBatchID(4), tid4, sequenceID, lockIDs, cid1,
456: dnas, serializer, newRoots, txnType, new LinkedList(),
457: DmiDescriptor.EMPTY_ARRAY);
458: txns.add(tx4);
459: txnIDs.add(new ServerTransactionID(cid1, tid4));
460: transactionManager.incomingTransactions(cid1, txnIDs, txns,
461: false);
462: transactionManager.addWaitingForAcknowledgement(cid1, tid4,
463: cid2);
464: transactionManager.addWaitingForAcknowledgement(cid1, tid4,
465: cid3);
466: transactionManager.shutdownNode(cid1);
467: assertTrue(action.clientID == null && action.txID == null);
468: // It should still be waiting, since we only do cleans ups on completion of all transactions.
469: assertNull(clientStateManager.shutdownClient);
470: assertTrue(transactionManager.isWaiting(cid1, tid4));
471:
472: // adding new transactions should throw an error
473: boolean failed = false;
474: try {
475: transactionManager.incomingTransactions(cid1, txnIDs, txns,
476: false);
477: failed = true;
478: } catch (Throwable t) {
479: // failed as expected.
480: }
481: if (failed) {
482: //
483: throw new Exception(
484: "Calling incomingTransaction after client shutdown didnt throw an error as excepted!!! ;(");
485: }
486: transactionManager.acknowledgement(cid1, tid4, cid2);
487: assertTrue(transactionManager.isWaiting(cid1, tid4));
488: transactionManager.acknowledgement(cid1, tid4, cid3);
489: assertFalse(transactionManager.isWaiting(cid1, tid4));
490:
491: // shutdown is not called yet since apply commit and broadcast need to complete.
492: assertNull(clientStateManager.shutdownClient);
493: List serverTids = new ArrayList();
494: serverTids.add(new ServerTransactionID(cid1, tid4));
495: transactionManager.commit(ptxp, Collections.EMPTY_SET,
496: Collections.EMPTY_MAP, serverTids);
497: assertNull(clientStateManager.shutdownClient);
498: transactionManager.broadcasted(cid1, tid4);
499: assertEquals(cid1, clientStateManager.shutdownClient);
500:
501: // Test with 2 waiters on different tx's
502: action.clear();
503: gtxm.clear();
504: txns.clear();
505: txnIDs.clear();
506: sequenceID = new SequenceID(5);
507: ServerTransaction tx5 = new ServerTransactionImpl(gtxm,
508: new TxnBatchID(5), tid5, sequenceID, lockIDs, cid1,
509: dnas, serializer, newRoots, txnType, new LinkedList(),
510: DmiDescriptor.EMPTY_ARRAY);
511: sequenceID = new SequenceID(6);
512: ServerTransaction tx6 = new ServerTransactionImpl(gtxm,
513: new TxnBatchID(5), tid6, sequenceID, lockIDs, cid1,
514: dnas, serializer, newRoots, txnType, new LinkedList(),
515: DmiDescriptor.EMPTY_ARRAY);
516: txns.add(tx5);
517: txns.add(tx6);
518: txnIDs.add(new ServerTransactionID(cid1, tid5));
519: txnIDs.add(new ServerTransactionID(cid1, tid6));
520:
521: transactionManager.incomingTransactions(cid1, txnIDs, txns,
522: false);
523: transactionManager.addWaitingForAcknowledgement(cid1, tid5,
524: cid2);
525: transactionManager.addWaitingForAcknowledgement(cid1, tid6,
526: cid2);
527:
528: assertTrue(action.clientID == null && action.txID == null);
529: assertTrue(transactionManager.isWaiting(cid1, tid5));
530: assertTrue(transactionManager.isWaiting(cid1, tid6));
531:
532: transactionManager.acknowledgement(cid1, tid5, cid2);
533: assertFalse(transactionManager.isWaiting(cid1, tid5));
534: assertTrue(transactionManager.isWaiting(cid1, tid6));
535: doStages(cid1, txns);
536: assertTrue(action.clientID == cid1 && action.txID == tid5);
537:
538: }
539:
540: private void doStages(ClientID cid1, Set txns) {
541: doStages(cid1, txns, true);
542: }
543:
544: private void doStages(ClientID cid1, Set txns, boolean skipIncoming) {
545:
546: // process stage
547: if (!skipIncoming)
548: transactionManager.incomingTransactions(cid1,
549: getServerTransactionIDs(txns), txns, false);
550:
551: for (Iterator iter = txns.iterator(); iter.hasNext();) {
552: ServerTransaction tx = (ServerTransaction) iter.next();
553:
554: // apply stage
555: transactionManager.apply(tx, Collections.EMPTY_MAP,
556: new BackReferences(), imo);
557:
558: // commit stage
559: Set committedIDs = new HashSet();
560: committedIDs.add(tx.getServerTransactionID());
561: this .transactionManager.commit(ptxp, Collections.EMPTY_SET,
562: Collections.EMPTY_MAP, committedIDs);
563:
564: // broadcast stage
565: transactionManager.broadcasted(tx.getSourceID(), tx
566: .getTransactionID());
567: }
568: }
569:
570: private Set getServerTransactionIDs(Set txns) {
571: Set s = new HashSet(txns.size());
572: for (Iterator iter = txns.iterator(); iter.hasNext();) {
573: ServerTransaction st = (ServerTransaction) iter.next();
574: s.add(st.getServerTransactionID());
575: }
576: return s;
577: }
578:
579: private static final class TestChannelStats implements ChannelStats {
580:
581: public LinkedQueue notifyTransactionContexts = new LinkedQueue();
582:
583: public Counter getCounter(MessageChannel channel, String name) {
584: throw new ImplementMe();
585: }
586:
587: public void notifyTransaction(NodeID nodeID) {
588: try {
589: notifyTransactionContexts.put(nodeID);
590: } catch (InterruptedException e) {
591: throw new TCRuntimeException(e);
592: }
593: }
594:
595: }
596:
597: private static class Root {
598: final String name;
599: final ObjectID id;
600:
601: Root(String name, ObjectID id) {
602: this .name = name;
603: this .id = id;
604: }
605: }
606:
607: private static class Listener implements
608: ServerTransactionManagerEventListener {
609: final List rootsCreated = new ArrayList();
610:
611: public void rootCreated(String name, ObjectID id) {
612: rootsCreated.add(new Root(name, id));
613: }
614: }
615:
616: private static class TestServerTransactionListener implements
617: ServerTransactionListener {
618:
619: NoExceptionLinkedQueue incomingContext = new NoExceptionLinkedQueue();
620: NoExceptionLinkedQueue appliedContext = new NoExceptionLinkedQueue();
621: NoExceptionLinkedQueue completedContext = new NoExceptionLinkedQueue();
622:
623: public void incomingTransactions(NodeID source, Set serverTxnIDs) {
624: incomingContext.put(new Object[] { source, serverTxnIDs });
625: }
626:
627: public void transactionApplied(ServerTransactionID stxID) {
628: appliedContext.put(stxID);
629: }
630:
631: public void transactionCompleted(ServerTransactionID stxID) {
632: completedContext.put(stxID);
633: }
634:
635: public void addResentServerTransactionIDs(Collection stxIDs) {
636: throw new ImplementMe();
637: }
638:
639: public void clearAllTransactionsFor(NodeID deadNode) {
640: throw new ImplementMe();
641: }
642:
643: public void transactionManagerStarted(Set cids) {
644: throw new ImplementMe();
645: }
646:
647: }
648:
649: public class TestTransactionAcknowledgeAction implements
650: TransactionAcknowledgeAction {
651: public NodeID clientID;
652: public TransactionID txID;
653:
654: public void acknowledgeTransaction(ServerTransactionID stxID) {
655: this .txID = stxID.getClientTransactionID();
656: this .clientID = stxID.getSourceID();
657: }
658:
659: public void clear() {
660: txID = null;
661: clientID = null;
662: }
663:
664: }
665: }
|