001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.catalina.tribes.test.channel;
018:
019: import junit.framework.TestCase;
020: import java.io.Serializable;
021: import java.util.Random;
022: import java.util.Arrays;
023: import org.apache.catalina.tribes.ChannelListener;
024: import org.apache.catalina.tribes.Member;
025: import org.apache.catalina.tribes.group.GroupChannel;
026: import org.apache.catalina.tribes.test.channel.TestDataIntegrity.Listener;
027: import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;
028: import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
029:
030: /**
031: * <p>Title: </p>
032: *
033: * <p>Description: </p>
034: *
035: * <p>Company: </p>
036: *
037: * @author not attributable
038: * @version 1.0
039: */
040: public class TestDataIntegrity extends TestCase {
041: int msgCount = 500;
042: int threadCount = 20;
043: GroupChannel channel1;
044: GroupChannel channel2;
045: Listener listener1;
046: int threadCounter = 0;
047:
048: protected void setUp() throws Exception {
049: super .setUp();
050: channel1 = new GroupChannel();
051: channel1.addInterceptor(new MessageDispatch15Interceptor());
052: channel2 = new GroupChannel();
053: channel2.addInterceptor(new MessageDispatch15Interceptor());
054: listener1 = new Listener();
055: channel2.addChannelListener(listener1);
056: channel1.start(GroupChannel.DEFAULT);
057: channel2.start(GroupChannel.DEFAULT);
058: }
059:
060: protected void tearDown() throws Exception {
061: super .tearDown();
062: channel1.stop(GroupChannel.DEFAULT);
063: channel2.stop(GroupChannel.DEFAULT);
064: }
065:
066: public void testDataSendNO_ACK() throws Exception {
067: System.err.println("Starting NO_ACK");
068: Thread[] threads = new Thread[threadCount];
069: for (int x = 0; x < threads.length; x++) {
070: threads[x] = new Thread() {
071: public void run() {
072: try {
073: long start = System.currentTimeMillis();
074: for (int i = 0; i < msgCount; i++)
075: channel1.send(new Member[] { channel2
076: .getLocalMember(false) }, Data
077: .createRandomData(), 0);
078: System.out.println("Thread[" + this .getName()
079: + "] sent " + msgCount
080: + " messages in "
081: + (System.currentTimeMillis() - start)
082: + " ms.");
083: } catch (Exception x) {
084: x.printStackTrace();
085: return;
086: } finally {
087: threadCounter++;
088: }
089: }
090: };
091: }
092: for (int x = 0; x < threads.length; x++) {
093: threads[x].start();
094: }
095: for (int x = 0; x < threads.length; x++) {
096: threads[x].join();
097: }
098: //sleep for 50 sec, let the other messages in
099: long start = System.currentTimeMillis();
100: while ((System.currentTimeMillis() - start) < 15000
101: && msgCount * threadCount != listener1.count)
102: Thread.sleep(500);
103: System.err.println("Finished NO_ACK [" + listener1.count + "]");
104: assertEquals("Checking success messages.", msgCount
105: * threadCount, listener1.count);
106: }
107:
108: public void testDataSendASYNCM() throws Exception {
109: System.err.println("Starting ASYNC MULTI THREAD");
110: Thread[] threads = new Thread[threadCount];
111: for (int x = 0; x < threads.length; x++) {
112: threads[x] = new Thread() {
113: public void run() {
114: try {
115: long start = System.currentTimeMillis();
116: for (int i = 0; i < msgCount; i++)
117: channel1
118: .send(
119: new Member[] { channel2
120: .getLocalMember(false) },
121: Data.createRandomData(),
122: GroupChannel.SEND_OPTIONS_ASYNCHRONOUS);
123: System.out.println("Thread[" + this .getName()
124: + "] sent " + msgCount
125: + " messages in "
126: + (System.currentTimeMillis() - start)
127: + " ms.");
128: } catch (Exception x) {
129: x.printStackTrace();
130: return;
131: } finally {
132: threadCounter++;
133: }
134: }
135: };
136: }
137: for (int x = 0; x < threads.length; x++) {
138: threads[x].start();
139: }
140: for (int x = 0; x < threads.length; x++) {
141: threads[x].join();
142: }
143: //sleep for 50 sec, let the other messages in
144: long start = System.currentTimeMillis();
145: while ((System.currentTimeMillis() - start) < 15000
146: && msgCount * threadCount != listener1.count)
147: Thread.sleep(500);
148: System.err.println("Finished ASYNC MULTI THREAD ["
149: + listener1.count + "]");
150: assertEquals("Checking success messages.", msgCount
151: * threadCount, listener1.count);
152: }
153:
154: public void testDataSendASYNC() throws Exception {
155: System.err.println("Starting ASYNC");
156: for (int i = 0; i < msgCount; i++)
157: channel1.send(
158: new Member[] { channel2.getLocalMember(false) },
159: Data.createRandomData(),
160: GroupChannel.SEND_OPTIONS_ASYNCHRONOUS);
161: //sleep for 50 sec, let the other messages in
162: long start = System.currentTimeMillis();
163: while ((System.currentTimeMillis() - start) < 5000
164: && msgCount != listener1.count)
165: Thread.sleep(500);
166: System.err.println("Finished ASYNC");
167: assertEquals("Checking success messages.", msgCount,
168: listener1.count);
169: }
170:
171: public void testDataSendACK() throws Exception {
172: System.err.println("Starting ACK");
173: for (int i = 0; i < msgCount; i++)
174: channel1.send(
175: new Member[] { channel2.getLocalMember(false) },
176: Data.createRandomData(),
177: GroupChannel.SEND_OPTIONS_USE_ACK);
178: Thread.sleep(250);
179: System.err.println("Finished ACK");
180: assertEquals("Checking success messages.", msgCount,
181: listener1.count);
182: }
183:
184: public void testDataSendSYNCACK() throws Exception {
185: System.err.println("Starting SYNC_ACK");
186: for (int i = 0; i < msgCount; i++)
187: channel1.send(
188: new Member[] { channel2.getLocalMember(false) },
189: Data.createRandomData(),
190: GroupChannel.SEND_OPTIONS_SYNCHRONIZED_ACK
191: | GroupChannel.SEND_OPTIONS_USE_ACK);
192: Thread.sleep(250);
193: System.err.println("Finished SYNC_ACK");
194: assertEquals("Checking success messages.", msgCount,
195: listener1.count);
196: }
197:
198: public static class Listener implements ChannelListener {
199: long count = 0;
200:
201: public boolean accept(Serializable s, Member m) {
202: return (s instanceof Data);
203: }
204:
205: public void messageReceived(Serializable s, Member m) {
206: Data d = (Data) s;
207: if (!Data.verify(d)) {
208: System.err.println("ERROR");
209: } else {
210: count++;
211: if ((count % 1000) == 0) {
212: System.err.println("SUCCESS:" + count);
213: }
214: }
215: }
216: }
217:
218: public static class Data implements Serializable {
219: public int length;
220: public byte[] data;
221: public byte key;
222: public static Random r = new Random(System.currentTimeMillis());
223:
224: public static Data createRandomData() {
225: int i = r.nextInt();
226: i = (i % 127);
227: int length = Math.abs(r.nextInt() % 65555);
228: Data d = new Data();
229: d.length = length;
230: d.key = (byte) i;
231: d.data = new byte[length];
232: Arrays.fill(d.data, d.key);
233: return d;
234: }
235:
236: public static boolean verify(Data d) {
237: boolean result = (d.length == d.data.length);
238: for (int i = 0; result && (i < d.data.length); i++)
239: result = result && d.data[i] == d.key;
240: return result;
241: }
242: }
243:
244: }
|