001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.objectserver.handler;
006:
007: import EDU.oswego.cs.dl.util.concurrent.SynchronizedRef;
008:
009: import com.tc.async.api.Stage;
010: import com.tc.async.impl.MockStage;
011: import com.tc.exception.ImplementMe;
012: import com.tc.l2.api.L2Coordinator;
013: import com.tc.l2.ha.L2HADisabledCooridinator;
014: import com.tc.l2.msg.RelayedCommitTransactionMessage;
015: import com.tc.logging.TCLogger;
016: import com.tc.net.groups.ClientID;
017: import com.tc.net.protocol.tcm.ChannelID;
018: import com.tc.object.dmi.DmiDescriptor;
019: import com.tc.object.dna.impl.ObjectStringSerializer;
020: import com.tc.object.gtx.GlobalTransactionID;
021: import com.tc.object.lockmanager.api.LockID;
022: import com.tc.object.msg.CommitTransactionMessage;
023: import com.tc.object.msg.NullMessageRecycler;
024: import com.tc.object.net.ChannelStats;
025: import com.tc.object.net.DSOChannelManager;
026: import com.tc.object.tx.TransactionID;
027: import com.tc.object.tx.TxnBatchID;
028: import com.tc.object.tx.TxnType;
029: import com.tc.objectserver.api.ObjectManager;
030: import com.tc.objectserver.core.api.ServerConfigurationContext;
031: import com.tc.objectserver.gtx.TestGlobalTransactionManager;
032: import com.tc.objectserver.handshakemanager.ServerClientHandshakeManager;
033: import com.tc.objectserver.impl.TestObjectManager;
034: import com.tc.objectserver.l1.api.ClientStateManager;
035: import com.tc.objectserver.lockmanager.api.LockManager;
036: import com.tc.objectserver.persistence.api.ManagedObjectStore;
037: import com.tc.objectserver.tx.ServerTransaction;
038: import com.tc.objectserver.tx.ServerTransactionImpl;
039: import com.tc.objectserver.tx.ServerTransactionManager;
040: import com.tc.objectserver.tx.TestServerTransactionManager;
041: import com.tc.objectserver.tx.TestTransactionBatchManager;
042: import com.tc.objectserver.tx.TransactionBatchReader;
043: import com.tc.objectserver.tx.TransactionBatchReaderFactory;
044: import com.tc.objectserver.tx.TransactionalObjectManager;
045: import com.tc.test.TCTestCase;
046: import com.tc.util.SequenceID;
047: import com.tc.util.SequenceValidator;
048:
049: import java.util.Collection;
050: import java.util.Collections;
051: import java.util.HashMap;
052: import java.util.HashSet;
053: import java.util.LinkedList;
054: import java.util.List;
055: import java.util.Map;
056:
057: public class ProcessTransactionHandlerTest extends TCTestCase {
058:
059: private TestServerConfigurationContext cctxt;
060: private TestObjectManager objectManager;
061: private ProcessTransactionHandler handler;
062: private TestTransactionBatchReaderFactory transactionBatchReaderFactory;
063: private SynchronizedRef batchReader;
064: private TestTransactionBatchManager transactionBatchManager;
065: private TestGlobalTransactionManager gtxm;
066: private SequenceValidator sequenceValidator;
067: public L2Coordinator l2Coordinator;
068: public TestServerTransactionManager transactionMgr;
069:
070: public void setUp() throws Exception {
071: objectManager = new TestObjectManager();
072: transactionBatchManager = new TestTransactionBatchManager();
073: gtxm = new TestGlobalTransactionManager();
074: sequenceValidator = new SequenceValidator(0);
075: handler = new ProcessTransactionHandler(
076: transactionBatchManager, sequenceValidator,
077: new NullMessageRecycler());
078:
079: transactionBatchReaderFactory = new TestTransactionBatchReaderFactory();
080: transactionMgr = new TestServerTransactionManager();
081: l2Coordinator = new L2HADisabledCooridinator();
082: cctxt = new TestServerConfigurationContext();
083: batchReader = new SynchronizedRef(null);
084: handler.initializeContext(cctxt);
085: }
086:
087: public void tests() throws Exception {
088:
089: TestTransactionBatchReader batch = new TestTransactionBatchReader();
090: batch.clientID = new ClientID(new ChannelID(1));
091: batch.batchID = new TxnBatchID(1);
092:
093: final List dnaList = Collections.EMPTY_LIST;
094: final Map newRootsMap = Collections.EMPTY_MAP;
095: ServerTransaction serverTransaction = new ServerTransactionImpl(
096: gtxm, batch.batchID, new TransactionID(1),
097: new SequenceID(1), new LockID[0], batch.clientID,
098: dnaList, new ObjectStringSerializer(), newRootsMap,
099: TxnType.NORMAL, new LinkedList(),
100: DmiDescriptor.EMPTY_ARRAY);
101: Collection completedTransactionIDs = new HashSet();
102: for (int i = 0; i < 10; i++) {
103: completedTransactionIDs.add(new GlobalTransactionID(i));
104: }
105: batch.acknowledged.addAll(completedTransactionIDs);
106: batch.transactions.add(serverTransaction);
107:
108: batchReader.set(batch);
109:
110: // make sure our context queues are empty...
111: assertTrue(transactionBatchManager.defineBatchContexts
112: .isEmpty());
113: assertTrue(objectManager.lookupObjectForCreateIfNecessaryContexts
114: .isEmpty());
115: // HANDLE EVENT
116: objectManager.makePending = true;
117: handler.handleEvent(null);
118: // make sure defineBatch was called on the transaction manager.
119: Object[] args = (Object[]) transactionBatchManager.defineBatchContexts
120: .take();
121: assertEquals(batch.clientID, args[0]);
122: assertEquals(new Integer(1), args[1]);
123: // there shouldn't be any more calls in the queue
124: assertTrue(transactionBatchManager.defineBatchContexts
125: .isEmpty());
126:
127: // check to see if incomingTransactions are called on transactionMgr
128: Object[] incomingCallContext = (Object[]) transactionMgr.incomingTxnContexts
129: .remove(0);
130: assertNotNull(incomingCallContext);
131: assertTrue(transactionMgr.incomingTxnContexts.isEmpty());
132:
133: // Look up shouldnt have happened yet
134: args = (Object[]) objectManager.lookupObjectForCreateIfNecessaryContexts
135: .poll(100);
136: assertNull(args);
137:
138: // send another txn and see that an event context is added again to the stage
139: batch = new TestTransactionBatchReader();
140: batch.clientID = new ClientID(new ChannelID(1));
141: batch.batchID = new TxnBatchID(2);
142:
143: serverTransaction = new ServerTransactionImpl(gtxm,
144: batch.batchID, new TransactionID(2), new SequenceID(2),
145: new LockID[0], batch.clientID, dnaList,
146: new ObjectStringSerializer(), newRootsMap,
147: TxnType.NORMAL, new LinkedList(),
148: DmiDescriptor.EMPTY_ARRAY);
149: completedTransactionIDs = new HashSet();
150: for (int i = 11; i < 20; i++) {
151: completedTransactionIDs.add(new GlobalTransactionID(i));
152: }
153: batch.acknowledged.addAll(completedTransactionIDs);
154: batch.transactions.add(serverTransaction);
155:
156: batchReader.set(batch);
157:
158: // make sure our context queues are empty...
159: assertTrue(transactionBatchManager.defineBatchContexts
160: .isEmpty());
161: assertTrue(objectManager.lookupObjectForCreateIfNecessaryContexts
162: .isEmpty());
163: // HANDLE EVENT
164: objectManager.makePending = true;
165: handler.handleEvent(null);
166:
167: // check to see if incomingTransactions are called on transactionMgr
168: incomingCallContext = (Object[]) transactionMgr.incomingTxnContexts
169: .remove(0);
170: assertNotNull(incomingCallContext);
171: assertTrue(transactionMgr.incomingTxnContexts.isEmpty());
172:
173: }
174:
175: private final class TestTransactionBatchReader implements
176: TransactionBatchReader {
177:
178: public final Collection acknowledged = new HashSet();
179: public TxnBatchID batchID;
180: public ClientID clientID;
181: public final List transactions = new LinkedList();
182: int current = 0;
183:
184: public ServerTransaction getNextTransaction() {
185: return transactions.size() > current ? (ServerTransaction) transactions
186: .get(current++)
187: : null;
188: }
189:
190: public void reset() {
191: current = 0;
192: }
193:
194: public TxnBatchID getBatchID() {
195: return batchID;
196: }
197:
198: public int getNumTxns() {
199: return transactions.size();
200: }
201:
202: public ClientID getNodeID() {
203: return clientID;
204: }
205:
206: }
207:
208: private final class TestTransactionBatchReaderFactory implements
209: TransactionBatchReaderFactory {
210:
211: public TransactionBatchReader newTransactionBatchReader(
212: CommitTransactionMessage ctxt) {
213: return (TransactionBatchReader) batchReader.get();
214: }
215:
216: public TransactionBatchReader newTransactionBatchReader(
217: RelayedCommitTransactionMessage commitMessage) {
218: throw new ImplementMe();
219: }
220:
221: }
222:
223: private final class TestServerConfigurationContext implements
224: ServerConfigurationContext {
225:
226: public Map sinks = new HashMap();
227:
228: public ObjectManager getObjectManager() {
229: return objectManager;
230: }
231:
232: public LockManager getLockManager() {
233: return null;
234: }
235:
236: public DSOChannelManager getChannelManager() {
237: return null;
238: }
239:
240: public ClientStateManager getClientStateManager() {
241: return null;
242: }
243:
244: public ServerTransactionManager getTransactionManager() {
245: return transactionMgr;
246: }
247:
248: public ManagedObjectStore getObjectStore() {
249: return null;
250: }
251:
252: public ServerClientHandshakeManager getClientHandshakeManager() {
253: return null;
254: }
255:
256: public ChannelStats getChannelStats() {
257: return null;
258: }
259:
260: public Stage getStage(String name) {
261: if (!sinks.containsKey(name)) {
262: sinks.put(name, new MockStage(name));
263: }
264: return (Stage) sinks.get(name);
265: }
266:
267: public TCLogger getLogger(Class clazz) {
268: return null;
269: }
270:
271: public TransactionBatchReaderFactory getTransactionBatchReaderFactory() {
272: return transactionBatchReaderFactory;
273: }
274:
275: public TransactionalObjectManager getTransactionalObjectManager() {
276: return null;
277: }
278:
279: public L2Coordinator getL2Coordinator() {
280: return l2Coordinator;
281: }
282: }
283:
284: }
|