001: // $Id: DistributedQueueTest.java,v 1.9 2006/08/13 09:05:10 mimbert Exp $
002:
003: package org.jgroups.blocks;
004:
005: import junit.framework.Test;
006: import junit.framework.TestCase;
007: import junit.framework.TestSuite;
008: import org.apache.commons.logging.Log;
009: import org.apache.commons.logging.LogFactory;
010:
011: import java.util.Vector;
012:
013: public class DistributedQueueTest extends TestCase {
014:
015: final int NUM_ITEMS = 10;
016: static Log logger = LogFactory.getLog(DistributedQueueTest.class);
017: String props;
018:
019: public DistributedQueueTest(String testName) {
020: super (testName);
021: }
022:
023: public static Test suite() {
024: return new TestSuite(DistributedQueueTest.class);
025: }
026:
027: protected DistributedQueue queue1;
028: protected DistributedQueue queue2;
029: protected DistributedQueue queue3;
030:
031: public void setUp() throws Exception {
032:
033: super .setUp();
034: props = "UDP(mcast_recv_buf_size=80000;mcast_send_buf_size=150000;mcast_port=45566;"
035: + "mcast_addr=228.8.8.8;ip_ttl=32):"
036: + "PING(timeout=2000;num_initial_members=3):"
037: + "FD_SOCK:"
038: + "VERIFY_SUSPECT(timeout=1500):"
039: + "UNICAST(timeout=600,1200,2000,2500):"
040: + "FRAG(frag_size=8096;down_thread=false;up_thread=false):"
041: + "TOTAL_TOKEN(unblock_sending=10;block_sending=50):"
042: + "pbcast.GMS(print_local_addr=true;join_timeout=3000;join_retry_timeout=2000;shun=true):"
043: + "STATE_TRANSFER:" + "QUEUE";
044:
045: queue1 = new DistributedQueue("testing", null, props, 5000);
046: log("created queue1");
047:
048: // give some time for the channel to become a coordinator
049: try {
050: Thread.sleep(1000);
051: } catch (Exception ex) {
052: }
053:
054: queue2 = new DistributedQueue("testing", null, props, 5000);
055: log("created queue2");
056:
057: try {
058: Thread.sleep(1000);
059: } catch (InterruptedException ex) {
060: }
061:
062: queue3 = new DistributedQueue("testing", null, props, 5000);
063: log("created queue3");
064:
065: try {
066: Thread.sleep(1000);
067: } catch (InterruptedException ex) {
068: }
069: }
070:
071: public void tearDown() throws Exception {
072: super .tearDown();
073: log("stopping queue1");
074: queue1.stop();
075: log("stopped queue1");
076:
077: log("stopping queue2");
078: queue2.stop();
079: log("stopped queue2");
080:
081: log("stopping queue3");
082: queue3.stop();
083: log("stopped queue3");
084: }
085:
086: void log(String msg) {
087: System.out.println("-- [" + Thread.currentThread().getName()
088: + "]: " + msg);
089: }
090:
091: class PutTask implements Runnable {
092: protected DistributedQueue queue;
093: protected String name;
094: protected boolean finished;
095:
096: public PutTask(String name, DistributedQueue q) {
097: queue = q;
098: this .name = name;
099: finished = false;
100: }
101:
102: public void run() {
103: for (int i = 0; i < NUM_ITEMS; i++) {
104: queue.add(name + '_' + i);
105: }
106: finished = true;
107: log("added " + NUM_ITEMS + " elements - done");
108: }
109:
110: public boolean finished() {
111: return finished;
112: }
113: }
114:
115: public void testMultipleWriter() throws Exception {
116: PutTask t1 = new PutTask("Queue1", queue1);
117: PutTask t2 = new PutTask("Queue2", queue2);
118: PutTask t3 = new PutTask("Queue3", queue3);
119: Thread rTask1 = new Thread(t1);
120: Thread rTask2 = new Thread(t2);
121: Thread rTask3 = new Thread(t3);
122:
123: rTask1.start();
124: rTask2.start();
125: rTask3.start();
126:
127: while (!t1.finished() || !t2.finished() || !t3.finished()) {
128: try {
129: Thread.sleep(1000);
130: } catch (InterruptedException ex) {
131: }
132: }
133:
134: assertEquals(queue1.size(), queue2.size());
135: assertEquals(queue1.size(), queue3.size());
136:
137: checkContents(queue1.getContents(), queue2.getContents());
138: checkContents(queue1.getContents(), queue3.getContents());
139: }
140:
141: protected void checkContents(Vector q1, Vector q2) {
142: for (int i = 0; i < q1.size(); i++) {
143: Object e1 = q1.elementAt(i);
144: Object e2 = q2.elementAt(i);
145: boolean t = e1.equals(e2);
146: if (!t) {
147: logger.error("Data order differs :" + e1 + "!=" + e2);
148: } else
149: logger.debug("Data order ok :" + e1 + "==" + e2);
150: assertTrue(e1.equals(e2));
151: }
152: }
153:
154: public static void main(String[] args) {
155: junit.textui.TestRunner.run(suite());
156: }
157: }
|