001: /*
002: * All content copyright (c) 2003-2007 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.l2.msg;
006:
007: import com.tc.l2.ha.ClusterState;
008: import com.tc.net.protocol.transport.ConnectionID;
009: import com.tc.net.protocol.transport.ConnectionIDFactory;
010: import com.tc.objectserver.gtx.GlobalTransactionIDSequenceProvider;
011: import com.tc.objectserver.handler.GlobalTransactionIDBatchRequestHandler;
012: import com.tc.objectserver.impl.PersistentManagedObjectStore;
013: import com.tc.objectserver.impl.TestManagedObjectPersistor;
014: import com.tc.objectserver.persistence.api.PersistentMapStore;
015: import com.tc.objectserver.persistence.api.Persistor;
016: import com.tc.objectserver.persistence.impl.InMemoryPersistor;
017: import com.tc.objectserver.persistence.impl.TestMutableSequence;
018: import com.tc.objectserver.persistence.sleepycat.ConnectionIDFactoryImpl;
019: import com.tc.util.State;
020: import com.tc.util.sequence.ObjectIDSequence;
021:
022: import java.io.ByteArrayInputStream;
023: import java.io.ByteArrayOutputStream;
024: import java.io.ObjectInput;
025: import java.io.ObjectInputStream;
026: import java.io.ObjectOutput;
027: import java.io.ObjectOutputStream;
028: import java.util.HashMap;
029: import java.util.Iterator;
030:
031: import junit.framework.TestCase;
032:
033: public class ClusterStateMessageTest extends TestCase {
034: private static final int CLUSTER_STATE_1 = 1;
035: private static final int CLUSTER_STATE_2 = 2;
036:
037: private ClusterState clusterState_1;
038: private ClusterState clusterState_2;
039:
040: public void setUp() {
041: resetClusterState(CLUSTER_STATE_1);
042: resetClusterState(CLUSTER_STATE_2);
043: }
044:
045: public void tearDown() {
046: clusterState_1 = null;
047: clusterState_2 = null;
048: }
049:
050: private void resetClusterState(int clusterState) {
051: Persistor persistor = new InMemoryPersistor();
052: PersistentMapStore clusterStateStore = persistor
053: .getClusterStateStore();
054: ObjectIDSequence oidSequence = new PersistentManagedObjectStore(
055: new TestManagedObjectPersistor(new HashMap()));
056: ConnectionIDFactory connectionIdFactory = new ConnectionIDFactoryImpl(
057: persistor.getClientStatePersistor());
058: GlobalTransactionIDSequenceProvider gidSequenceProvider = new GlobalTransactionIDBatchRequestHandler(
059: new TestMutableSequence());
060: if (clusterState == CLUSTER_STATE_1) {
061: clusterState_1 = new ClusterState(clusterStateStore,
062: oidSequence, connectionIdFactory,
063: gidSequenceProvider);
064: clusterState_1.setClusterID("foobar");
065: } else {
066: clusterState_2 = new ClusterState(clusterStateStore,
067: oidSequence, connectionIdFactory,
068: gidSequenceProvider);
069: clusterState_2.setClusterID("foobar");
070: }
071: }
072:
073: private void validate(ClusterStateMessage csm,
074: ClusterStateMessage csm1) {
075: assertEquals(csm.getMessageID(), csm1.getMessageID());
076: assertEquals(csm.getType(), csm1.getType());
077: assertEquals(csm.inResponseTo(), csm1.inResponseTo());
078: assertEquals(csm.messageFrom(), csm1.messageFrom());
079:
080: assertEquals(csm.getNextAvailableObjectID(), csm1
081: .getNextAvailableObjectID());
082: assertEquals(csm.getNextAvailableGlobalTxnID(), csm1
083: .getNextAvailableGlobalTxnID());
084: assertEquals(csm.getClusterID(), csm1.getClusterID());
085: assertEquals(csm.getConnectionID(), csm1.getConnectionID());
086: }
087:
088: private void validate(ClusterState cs, ClusterState cs1) {
089: assertEquals(cs.getNextAvailableChannelID(), cs1
090: .getNextAvailableChannelID());
091: assertEquals(cs.getNextAvailableGlobalTxnID(), cs1
092: .getNextAvailableGlobalTxnID());
093: assertEquals(cs.getNextAvailableObjectID(), cs1
094: .getNextAvailableObjectID());
095: assertEquals(cs.getAllConnections().size(), cs1
096: .getAllConnections().size());
097:
098: Iterator iter1 = cs1.getAllConnections().iterator();
099: for (Iterator iter = cs.getAllConnections().iterator(); iter
100: .hasNext();) {
101: assertEquals(((ConnectionID) iter.next()).getID(),
102: ((ConnectionID) iter1.next()).getID());
103: }
104:
105: assertEquals(cs.getClusterID(), cs1.getClusterID());
106: }
107:
108: private ClusterStateMessage writeAndRead(ClusterStateMessage csm)
109: throws Exception {
110: ByteArrayOutputStream bo = new ByteArrayOutputStream();
111: ObjectOutput oo = new ObjectOutputStream(bo);
112: oo.writeObject(csm);
113: System.err.println("Written : " + csm);
114: ByteArrayInputStream bi = new ByteArrayInputStream(bo
115: .toByteArray());
116: ObjectInput oi = new ObjectInputStream(bi);
117: ClusterStateMessage csm1 = (ClusterStateMessage) oi
118: .readObject();
119: System.err.println("Read : " + csm1);
120: return csm1;
121: }
122:
123: private void modifyClusterState(int clusterState) {
124: ConnectionID connectionID_1 = new ConnectionID(10, "foo");
125: ConnectionID connectionID_2 = new ConnectionID(11, "foo");
126: ConnectionID connectionID_3 = new ConnectionID(12, "goo");
127:
128: if (clusterState == CLUSTER_STATE_1) {
129: clusterState_1.addNewConnection(connectionID_1);
130: clusterState_1.addNewConnection(connectionID_2);
131: clusterState_1.addNewConnection(connectionID_3);
132: clusterState_1.removeConnection(connectionID_2);
133: clusterState_1.setNextAvailableChannelID(13);
134: clusterState_1.setNextAvailableObjectID(222);
135: clusterState_1.setNextAvailableGlobalTransactionID(333);
136: clusterState_1.setCurrentState(new State("testing"));
137: } else {
138: clusterState_2.addNewConnection(connectionID_1);
139: clusterState_2.addNewConnection(connectionID_2);
140: clusterState_2.addNewConnection(connectionID_3);
141: clusterState_2.removeConnection(connectionID_2);
142: clusterState_2.setNextAvailableChannelID(13);
143: clusterState_2.setNextAvailableObjectID(222);
144: clusterState_2.setNextAvailableGlobalTransactionID(333);
145: clusterState_2.setCurrentState(new State("testing"));
146:
147: }
148: }
149:
150: public void testBasicSerialization() throws Exception {
151: modifyClusterState(CLUSTER_STATE_1);
152:
153: ClusterStateMessage csm = (ClusterStateMessage) ClusterStateMessageFactory
154: .createClusterStateMessage(clusterState_1);
155: ClusterStateMessage csm1 = writeAndRead(csm);
156: validate(csm, csm1);
157:
158: csm = (ClusterStateMessage) ClusterStateMessageFactory
159: .createOKResponse(new ClusterStateMessage());
160: csm1 = writeAndRead(csm);
161: validate(csm, csm1);
162: }
163:
164: public void testInitState() throws Exception {
165: ConnectionID connectionID = new ConnectionID(10, "foo");
166:
167: // COMPLETE_STATE
168: modifyClusterState(CLUSTER_STATE_1);
169: ClusterStateMessage csm = (ClusterStateMessage) ClusterStateMessageFactory
170: .createClusterStateMessage(clusterState_1);
171: ClusterStateMessage csm1 = writeAndRead(csm);
172: validate(csm, csm1);
173: csm1.initState(clusterState_2);
174: validate(clusterState_1, clusterState_2);
175:
176: // GLOBAL_TRANSACTION_ID
177: resetClusterState(CLUSTER_STATE_1);
178: resetClusterState(CLUSTER_STATE_2);
179: clusterState_1.setNextAvailableGlobalTransactionID(3423);
180: csm = (ClusterStateMessage) ClusterStateMessageFactory
181: .createNextAvailableGlobalTransactionIDMessage(clusterState_1);
182: csm1 = writeAndRead(csm);
183: validate(csm, csm1);
184: csm1.initState(clusterState_2);
185: validate(clusterState_1, clusterState_2);
186:
187: // OBJECT_ID
188: resetClusterState(CLUSTER_STATE_1);
189: resetClusterState(CLUSTER_STATE_2);
190: clusterState_1.setNextAvailableObjectID(6868);
191: csm = (ClusterStateMessage) ClusterStateMessageFactory
192: .createNextAvailableObjectIDMessage(clusterState_1);
193: csm1 = writeAndRead(csm);
194: validate(csm, csm1);
195: csm1.initState(clusterState_2);
196: validate(clusterState_1, clusterState_2);
197:
198: // NEW_CONNECTION_CREATED
199: csm = (ClusterStateMessage) ClusterStateMessageFactory
200: .createNewConnectionCreatedMessage(connectionID);
201: csm1 = writeAndRead(csm);
202: validate(csm, csm1);
203: resetClusterState(CLUSTER_STATE_2);
204: csm1.initState(clusterState_2);
205: assertEquals(connectionID, clusterState_2.getAllConnections()
206: .iterator().next());
207:
208: // CONNECTION_DESTROYED
209: csm = (ClusterStateMessage) ClusterStateMessageFactory
210: .createConnectionDestroyedMessage(connectionID);
211: csm1 = writeAndRead(csm);
212: validate(csm, csm1);
213: resetClusterState(CLUSTER_STATE_2);
214: clusterState_2.addNewConnection(connectionID);
215: csm1.initState(clusterState_2);
216: assertEquals(0, clusterState_2.getAllConnections().size());
217: }
218:
219: }
|