001: package org.jgroups.tests;
002:
003: import java.io.ByteArrayOutputStream;
004: import java.io.InputStream;
005: import java.io.IOException;
006: import java.io.OutputStream;
007:
008: import junit.framework.TestCase;
009: import org.jgroups.Channel;
010: import org.jgroups.ExtendedMessageListener;
011: import org.jgroups.JChannelFactory;
012: import org.jgroups.Message;
013: import org.jgroups.View;
014: import org.jgroups.blocks.RpcDispatcher;
015: import org.jgroups.stack.GossipRouter;
016: import org.jgroups.util.Util;
017:
018: /**
019: * Tests merging with a multiplexer channel
020: * @author Jerry Gauthier
021: * @version $Id: MultiplexerMergeTest.java,v 1.1.2.2 2007/03/12 14:52:18 vlada Exp $
022: */
023: public class MultiplexerMergeTest extends TestCase {
024: // stack file must be on classpath
025: private static final String STACK_FILE = "stacks.xml";
026: private static final String STACK_NAME = "tunnel";
027: // router address and port must match definition in stack
028: private static final int ROUTER_PORT = 12001;
029: private static final String BIND_ADDR = "127.0.0.1";
030:
031: private JChannelFactory factory;
032: private JChannelFactory factory2;
033: private Channel ch1;
034: private Channel ch2;
035: private GossipRouter router;
036: private RpcDispatcher dispatcher1;
037: private RpcDispatcher dispatcher2;
038:
039: public MultiplexerMergeTest(String name) {
040: super (name);
041: }
042:
043: protected void setUp() throws Exception {
044: super .setUp();
045:
046: factory = new JChannelFactory();
047: factory.setMultiplexerConfig(STACK_FILE);
048:
049: factory2 = new JChannelFactory();
050: factory2.setMultiplexerConfig(STACK_FILE);
051:
052: startRouter();
053:
054: ch1 = factory.createMultiplexerChannel(STACK_NAME, "foo");
055: dispatcher1 = new RpcDispatcher(ch1, null, null, new Object(),
056: false);
057: dispatcher1.setMessageListener(new MessageListenerAdaptor(
058: "listener1", "client1 initial state"));
059: ch1.connect("bla");
060: ch1.getState(null, 10000);
061:
062: ch2 = factory2.createMultiplexerChannel(STACK_NAME, "foo");
063: dispatcher2 = new RpcDispatcher(ch2, null, null, new Object(),
064: false);
065: dispatcher2.setMessageListener(new MessageListenerAdaptor(
066: "listener2", "client2 initial state"));
067: ch2.connect("bla");
068: boolean rc = ch2.getState(null, 10000);
069: //assertTrue("channel2 failed to obtain state successfully", rc);
070:
071: System.out.println("sleeping for 5 seconds");
072: Util.sleep(5000);
073: }
074:
075: public void tearDown() throws Exception {
076: super .tearDown();
077: ch2.close();
078: ch1.close();
079: stopRouter();
080: }
081:
082: public void testPartitionAndSubsequentMerge() throws Exception {
083: partitionAndMerge();
084: }
085:
086: private void partitionAndMerge() throws Exception {
087: View v = ch2.getView();
088: System.out.println("ch2 view is " + v);
089: assertEquals("channel2 should have 2 members", 2, ch2.getView()
090: .size());
091:
092: System.out
093: .println("++ simulating network partition by stopping the GossipRouter");
094: stopRouter();
095:
096: System.out.println("sleeping for 20 seconds");
097: Util.sleep(20000);
098:
099: v = ch1.getView();
100: System.out.println("-- ch1.view: " + v);
101: v = ch2.getView();
102: System.out.println("-- ch2.view: " + v);
103:
104: assertEquals(
105: "channel2 should have 1 member (channels should have excluded each other)",
106: 1, v.size());
107:
108: System.out
109: .println("++ simulating merge by starting the GossipRouter again");
110: router.start();
111:
112: System.out.println("sleeping for 30 seconds");
113: Util.sleep(30000);
114:
115: v = ch1.getView();
116: System.out.println("-- ch1.view: " + v);
117: v = ch2.getView();
118: System.out.println("-- ch2.view: " + v);
119:
120: assertEquals(
121: "channel2 is supposed to have 2 members again after merge",
122: 2, ch2.getView().size());
123: }
124:
125: private void startRouter() throws Exception {
126: router = new GossipRouter(ROUTER_PORT, BIND_ADDR);
127: router.start();
128: }
129:
130: private void stopRouter() {
131: router.stop();
132: }
133:
134: private final class MessageListenerAdaptor implements
135: ExtendedMessageListener {
136: private String m_name;
137: private byte[] m_state = null;
138:
139: MessageListenerAdaptor(String name, String state) {
140: m_name = name;
141: if (state != null)
142: m_state = state.getBytes();
143: }
144:
145: public void receive(Message msg) {
146: System.out
147: .println(m_name
148: + " MultiplexerMergeTest.receive() - not implemented");
149: }
150:
151: public byte[] getState() {
152: System.out
153: .println(m_name
154: + " MultiplexerMergeTest.getState() - returning byte[] state = "
155: + new String(m_state));
156: return m_state;
157: }
158:
159: public void setState(byte[] state) {
160: System.out
161: .println(m_name
162: + " MultiplexerMergeTest.setState(byte[]) - setting state = "
163: + new String(state));
164: m_state = state;
165: }
166:
167: public void setState(InputStream is) {
168: m_state = getInputStreamBytes(is);
169: try {
170: is.close();
171: } catch (IOException e) {
172: System.out
173: .println(m_name
174: + " MultiplexerMergeTest.setState(InputStream): "
175: + e.toString());
176: }
177: System.out
178: .println(m_name
179: + " MultiplexerMergeTest.setState(InputStream) - setting stream state = "
180: + new String(m_state));
181: }
182:
183: public void getState(OutputStream os) {
184: System.out
185: .println(m_name
186: + " MultiplexerMergeTest.getState(OutputStream) returning stream state = "
187: + new String(m_state));
188: try {
189: os.write(m_state);
190: os.flush();
191: os.close();
192: } catch (IOException e) {
193: System.out
194: .println(m_name
195: + " MultiplexerMergeTest.getState(OutputStream) failed: "
196: + e.toString());
197: }
198: }
199:
200: public byte[] getState(String state_id) {
201: System.out
202: .println(m_name
203: + " MultiplexerMergeTest.getState(String) - not implemented");
204: return null;
205: }
206:
207: public void getState(String state_id, OutputStream os) {
208: System.out
209: .println(m_name
210: + " MultiplexerMergeTest.getState(String, InputStream) - not implemented");
211: }
212:
213: public void setState(String state_id, byte[] state) {
214: System.out
215: .println(m_name
216: + " MultiplexerMergeTest.setState(String, byte[]) - not implemented");
217: }
218:
219: public void setState(String state_id, InputStream is) {
220: System.out
221: .println(m_name
222: + " MultiplexerMergeTest.setState(String, InputStream) - not implemented");
223: }
224: }
225:
226: private static byte[] getInputStreamBytes(InputStream is) {
227: byte[] b = null;
228: if (is != null) {
229: b = new byte[1024];
230: ByteArrayOutputStream baos = new ByteArrayOutputStream();
231: try {
232: while (true) {
233: int bytes = is.read(b);
234: if (bytes == -1) {
235: break;
236: }
237: baos.write(b, 0, bytes);
238: }
239: } catch (Exception e) {
240: e.printStackTrace();
241: } finally {
242: try {
243: if (baos != null) {
244: b = baos.toByteArray();
245: baos.close();
246: }
247: } catch (Exception e) {
248: e.printStackTrace();
249: }
250: }
251: }
252: return b;
253: }
254:
255: public static void main(String[] args) {
256: String[] testCaseName = { MultiplexerMergeTest.class.getName() };
257: junit.textui.TestRunner.main(testCaseName);
258: }
259:
260: }
|