001: package org.jgroups.tests;
002:
003: import java.io.Serializable;
004: import java.util.ArrayList;
005: import java.util.Iterator;
006: import java.util.List;
007: import java.util.Random;
008:
009: import junit.framework.Test;
010: import junit.framework.TestCase;
011: import junit.framework.TestSuite;
012:
013: import org.jgroups.Address;
014: import org.jgroups.BlockEvent;
015: import org.jgroups.ChannelClosedException;
016: import org.jgroups.ChannelNotConnectedException;
017: import org.jgroups.JChannel;
018: import org.jgroups.Message;
019: import org.jgroups.TimeoutException;
020: import org.jgroups.View;
021: import org.jgroups.ViewId;
022: import org.jgroups.util.Util;
023:
024: /**
025: * Virtual Synchrony guarantees that views in a group are observed
026: * in the same order by all group members and that views are totally ordered
027: * with respect to all regular messages in a group.
028: *
029: * Therefore, in virtually synchronous group communication model, all members
030: * that observe the same two consecutive views, receive the same set of regular
031: * multicast messages between those two views.
032: *
033: * VirtualSynchronyTest verifies virtual synchrony as follows. Each surviving member P
034: * from a view Vm that receives a new view Vn sends a message M to group coordinator Q
035: * containing number of messages received in view Vm. Each member P upon receiving
036: * a new view sends a random number of messages to everyone in the group.
037: *
038: * Group coordinator Q upon receiving each message M from a member P verifies
039: * if virtual synchrony is satisifed.
040: *
041: *
042: * @author Vladimir Blagojevic
043: * @version $Id$
044: *
045: */
046: public class VirtualSynchronyTest extends TestCase {
047:
048: private final static String CHANNEL_PROPS = "flush-udp.xml";
049: private final static int INITIAL_NUMBER_OF_MEMBERS = 5;
050: private int runningTime = 1000 * 50; // 50 secs
051: private Random r = new Random();
052:
053: public VirtualSynchronyTest(String arg0) {
054: super (arg0);
055: }
056:
057: public void testVSynch() throws Exception {
058: long start = System.currentTimeMillis();
059: boolean running = true;
060: List members = new ArrayList();
061:
062: //first spawn and join
063: for (int i = 0; i < INITIAL_NUMBER_OF_MEMBERS; i++) {
064: GroupMemberThread member = new GroupMemberThread("Member");
065: member.start();
066: members.add(member);
067: Util.sleep(getRandomDelayInSeconds(4, 6) * 1000);
068: }
069:
070: for (; running;) {
071:
072: //and then flip a coin
073: if (r.nextBoolean()) {
074: Util.sleep(getRandomDelayInSeconds(3, 8) * 1000);
075: GroupMemberThread member = new GroupMemberThread(
076: "Member");
077: member.start();
078: members.add(member);
079: } else if (members.size() > 1) {
080: Util.sleep(getRandomDelayInSeconds(3, 8) * 1000);
081: GroupMemberThread unluckyBastard = (GroupMemberThread) members
082: .get(r.nextInt(members.size()));
083: members.remove(unluckyBastard);
084: unluckyBastard.setRunning(false);
085: }
086: running = System.currentTimeMillis() - start <= runningTime;
087: System.out.println("Running time "
088: + ((System.currentTimeMillis() - start) / 1000)
089: + " secs");
090: }
091: System.out
092: .println("Done, Virtual Synchrony satisfied in all tests ");
093: }
094:
095: protected int getRandomDelayInSeconds(int from, int to) {
096: return from + r.nextInt(to - from);
097: }
098:
099: protected void setUp() throws Exception {
100: super .setUp();
101: }
102:
103: protected void tearDown() throws Exception {
104: super .tearDown();
105: }
106:
107: public static Test suite() {
108: return new TestSuite(VirtualSynchronyTest.class);
109: }
110:
111: public static void main(String[] args) {
112: String[] testCaseName = { VirtualSynchronyTest.class.getName() };
113: junit.textui.TestRunner.main(testCaseName);
114: }
115:
116: private static class GroupMemberThread extends Thread {
117: JChannel ch = null;
118: int numberOfMessagesInView = 0;
119: View currentView;
120: View prevView;
121: List payloads;
122: VSynchPayload payload;
123: volatile boolean running = true;
124: Random r;
125: int messagesSentPerView = 0;
126:
127: public GroupMemberThread(String name) {
128: super (name);
129: payloads = new ArrayList();
130: r = new Random();
131: messagesSentPerView = r.nextInt(25);
132: }
133:
134: public String getAddress() {
135: if (ch != null && ch.isConnected()) {
136: return ch.getLocalAddress().toString();
137: } else {
138: return "disconnected " + getName();
139: }
140: }
141:
142: public void setRunning(boolean b) {
143: running = false;
144: System.out.println("Disconnect " + getAddress());
145: if (ch != null)
146: ch.close();
147: }
148:
149: public void run() {
150: try {
151: ch = new JChannel(CHANNEL_PROPS);
152: ch.connect("vsynchtest");
153: } catch (Exception e) {
154: e.printStackTrace();
155: }
156:
157: while (running) {
158: Object msgReceived = null;
159: try {
160: msgReceived = ch.receive(0);
161: if (!running) {
162: // I am not a group member anymore so
163: // I will discard any transient message I
164: // receive
165: } else {
166: if (msgReceived instanceof View) {
167: gotView(msgReceived);
168: }
169:
170: if (msgReceived instanceof Message) {
171: gotMessage(msgReceived);
172: }
173:
174: if (msgReceived instanceof BlockEvent) {
175: ch.blockOk();
176: }
177: }
178:
179: } catch (TimeoutException e) {
180: } catch (Exception e) {
181: ch.close();
182: running = false;
183: }
184: }
185: }
186:
187: private void gotMessage(Object msgReceived) {
188: Message msg = (Message) msgReceived;
189: Object m = msg.getObject();
190:
191: if (m instanceof VSynchPayload) {
192: VSynchPayload pay = (VSynchPayload) m;
193: if (prevView != null
194: && prevView.getVid().equals(pay.viewId)) {
195: payloads.add(pay);
196: boolean receivedAllPayloads = ((payloads.size() == prevView
197: .getMembers().size()) || (payloads.size() == currentView
198: .getMembers().size()));
199: if (receivedAllPayloads) {
200: VSynchPayload first = (VSynchPayload) payloads
201: .get(0);
202: for (Iterator i = payloads.listIterator(1); i
203: .hasNext();) {
204: VSynchPayload p = (VSynchPayload) i.next();
205: assertEquals("Member " + p + " and "
206: + first + " failed VS",
207: first.msgViewCount, p.msgViewCount);
208: }
209: System.out.println("VS ok, all "
210: + payloads.size() + " members in "
211: + prevView.getVid()
212: + " view have received "
213: + first.msgViewCount
214: + " messages.\nAll messages sent in "
215: + prevView.getVid()
216: + " were delivered in "
217: + prevView.getVid());
218: }
219: }
220: } else if (m instanceof String) {
221: assertEquals(
222: "Member "
223: + ch.getLocalAddress()
224: + " received message from the wrong view. Message sender was "
225: + msg.getSrc(), currentView.getVid()
226: .getId(), Long.parseLong((String) m));
227: numberOfMessagesInView++;
228: }
229: }
230:
231: private void gotView(Object msg)
232: throws ChannelNotConnectedException,
233: ChannelClosedException {
234: View tmpView = (View) msg;
235: if (currentView != null) {
236: payload = new VSynchPayload(currentView.getVid(),
237: numberOfMessagesInView, ch.getLocalAddress());
238: ch.send((Address) tmpView.getMembers().get(0), null,
239: payload);
240: }
241: numberOfMessagesInView = 0;
242: payloads.clear();
243: prevView = currentView;
244: currentView = tmpView;
245: // send our allotment of messages
246: for (int i = 0; i < messagesSentPerView; i++) {
247: ch.send(null, null, Long.toString(currentView.getVid()
248: .getId()));
249: }
250: }
251: }
252:
253: private static class VSynchPayload implements Serializable {
254: public ViewId viewId;
255:
256: public int msgViewCount;
257:
258: public Address member;
259:
260: public VSynchPayload(ViewId viewId, int numbreOfMessagesInView,
261: Address a) {
262: super ();
263: this .viewId = viewId;
264: this .msgViewCount = numbreOfMessagesInView;
265: this .member = a;
266: }
267:
268: public String toString() {
269: return "[member=" + member + ",viewId=" + viewId.getId()
270: + ",msgCount=" + msgViewCount + "]";
271: }
272:
273: }
274: }
|