001: // $Id: STATE_TRANSFER_Test.java,v 1.8 2006/03/27 08:34:24 belaban Exp $
002: package org.jgroups.protocols;
003:
004: import junit.framework.Test;
005: import junit.framework.TestCase;
006: import junit.framework.TestSuite;
007: import org.jgroups.Address;
008: import org.jgroups.Channel;
009: import org.jgroups.ChannelClosedException;
010: import org.jgroups.ChannelException;
011: import org.jgroups.ChannelListener;
012: import org.jgroups.ChannelNotConnectedException;
013: import org.jgroups.ExitEvent;
014: import org.jgroups.GetStateEvent;
015: import org.jgroups.JChannel;
016: import org.jgroups.SetStateEvent;
017: import org.jgroups.Message;
018: import org.jgroups.util.Util;
019:
020: /**
021: * It's an attemp to setup Junit test case template for Protocol regression. <p>
022: * Two "processes" are started, and the coord. keeps sending msg of a counter. The 2nd
023: * process joins the grp and get the state from the coordinator. The subsequent msgs
024: * after the setState will be validated to ensure the total ordering of msg delivery. <p>
025: * This should cover the fix introduced by rev. 1.12
026: *
027: * @author Wenbo Zhu
028: * @version 1.0
029: */
030: public class STATE_TRANSFER_Test extends TestCase {
031:
032: public final static String CHANNEL_PROPS = "UDP(mcast_addr=228.8.8.8;mcast_port=45566;ip_ttl=32;"
033: + "mcast_send_buf_size=64000;mcast_recv_buf_size=64000):"
034: + "PING(timeout=2000;num_initial_members=3):"
035: + "MERGE2(min_interval=5000;max_interval=10000):"
036: + "FD_SOCK:"
037: + "VERIFY_SUSPECT(timeout=1500):"
038: + "UNICAST(timeout=600,1200,2400,4800):"
039: + "STABLE():"
040: + "NAKACK(retransmit_timeout=600,1200,2400,4800):"
041: + "FRAG(frag_size=8096;down_thread=false;up_thread=false):"
042: + "FLUSH():"
043: + "GMS(join_timeout=5000;join_retry_timeout=2000;"
044: + "print_local_addr=true):"
045: + "VIEW_ENFORCER:"
046: + "TOTAL:"
047: + "STATE_TRANSFER:" + "QUEUE";
048:
049: public static final String GROUP_NAME = "jgroups.TEST_GROUP";
050:
051: private Coordinator coord;
052:
053: public STATE_TRANSFER_Test(String testName) {
054: super (testName);
055: }
056:
057: protected void setUp() throws Exception {
058: super .setUp();
059:
060: System.setProperty("org.apache.commons.logging.Log",
061: "org.apache.commons.logging.impl.SimpleLog");
062: System.setProperty(
063: "org.apache.commons.logging.simplelog.defaultlog",
064: "error");
065:
066: coord = new Coordinator();
067: coord.recvLoop();
068: coord.sendLoop();
069: }
070:
071: protected void tearDown() throws Exception {
072: super .tearDown();
073:
074: coord.stop();
075: coord = null;
076: }
077:
078: static class Coordinator implements ChannelListener {
079:
080: private JChannel channel = null;
081: private int cnt = 0; // the state
082: private volatile boolean closed = false;
083:
084: protected Coordinator() throws ChannelException {
085:
086: channel = new JChannel(CHANNEL_PROPS);
087: channel.setOpt(Channel.LOCAL, Boolean.FALSE);
088: channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
089: channel.addChannelListener(this );
090: channel.connect(GROUP_NAME);
091: }
092:
093: public void channelConnected(Channel channel) {
094: }
095:
096: public void channelDisconnected(Channel channel) {
097: }
098:
099: public void channelClosed(Channel channel) {
100: }
101:
102: public void channelShunned() {
103: }
104:
105: public void channelReconnected(Address addr) { // n/a. now
106: }
107:
108: public void recvLoop() throws Exception {
109: Thread task = new Thread(new Runnable() {
110: public void run() {
111: Object tmp;
112: while (!closed) {
113: try {
114: tmp = channel.receive(0);
115: if (tmp instanceof ExitEvent) {
116: System.err
117: .println("-- received EXIT, waiting for ChannelReconnected callback");
118: break;
119: }
120: if (tmp instanceof GetStateEvent) {
121: synchronized (Coordinator.this ) {
122: System.err
123: .println("-- GetStateEvent, cnt="
124: + cnt);
125: channel
126: .returnState(Util
127: .objectToByteBuffer(new Integer(
128: cnt)));
129: }
130: }
131: } catch (ChannelNotConnectedException not) {
132: break;
133: } catch (ChannelClosedException closed) {
134: break;
135: } catch (Exception e) {
136: System.err.println(e);
137: }
138: }
139: }
140: });
141: task.start();
142: }
143:
144: public void sendLoop() throws Exception {
145: Thread task = new Thread(new Runnable() {
146:
147: public void run() {
148: while (!closed) {
149: try {
150: synchronized (Coordinator.this ) {
151: channel.send(null, null, new Integer(
152: ++cnt));
153: System.err.println("send cnt=" + cnt);
154: }
155: Thread.sleep(1000);
156: } catch (ChannelNotConnectedException not) {
157: break;
158: } catch (ChannelClosedException closed) {
159: break;
160: } catch (Exception e) {
161: System.err.println(e);
162: }
163: }
164: }
165: });
166: task.start();
167: }
168:
169: public void stop() {
170: closed = true;
171: channel.close();
172: }
173: }
174:
175: public void testBasicStateSync() throws Exception {
176:
177: Channel channel = new JChannel(CHANNEL_PROPS);
178: channel.setOpt(Channel.LOCAL, Boolean.FALSE);
179:
180: channel.connect(GROUP_NAME);
181:
182: Thread.sleep(1000);
183:
184: boolean join = false;
185: join = channel.getState(null, 100000l);
186: assertTrue(join);
187:
188: Object tmp;
189: int cnt = -1;
190: while (true) {
191: try {
192: tmp = channel.receive(0);
193: if (tmp instanceof ExitEvent) {
194: break;
195: }
196: if (tmp instanceof SetStateEvent) {
197: cnt = ((Integer) Util
198: .objectFromByteBuffer(((SetStateEvent) tmp)
199: .getArg())).intValue();
200: System.err.println("-- SetStateEvent, cnt=" + cnt);
201: continue;
202: }
203: if (tmp instanceof Message) {
204: if (cnt != -1) {
205: int msg = ((Integer) ((Message) tmp)
206: .getObject()).intValue();
207: assertEquals(cnt, msg - 1);
208: break; // done
209: }
210: }
211: } catch (ChannelNotConnectedException not) {
212: break;
213: } catch (ChannelClosedException closed) {
214: break;
215: } catch (Exception e) {
216: System.err.println(e);
217: }
218: }
219:
220: channel.close();
221: }
222:
223: public static Test suite() {
224: return new TestSuite(STATE_TRANSFER_Test.class);
225: }
226:
227: public static void main(String[] args) {
228: junit.textui.TestRunner.run(suite());
229: }
230:
231: }
|