001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.catalina.tribes.test.interceptors;
018:
019: import org.apache.catalina.tribes.Channel;
020: import org.apache.catalina.tribes.Member;
021: import org.apache.catalina.tribes.group.GroupChannel;
022: import org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator;
023: import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
024: import junit.framework.TestCase;
025: import junit.framework.TestResult;
026: import junit.framework.TestSuite;
027: import org.apache.catalina.tribes.ChannelListener;
028: import java.io.Serializable;
029: import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;
030: import org.apache.catalina.tribes.group.ChannelInterceptorBase;
031: import org.apache.catalina.tribes.ChannelMessage;
032: import org.apache.catalina.tribes.group.InterceptorPayload;
033: import org.apache.catalina.tribes.ChannelException;
034: import java.util.concurrent.atomic.AtomicInteger;
035:
036: public class TestOrderInterceptor extends TestCase {
037:
038: GroupChannel[] channels = null;
039: OrderInterceptor[] orderitcs = null;
040: MangleOrderInterceptor[] mangleitcs = null;
041: TestListener[] test = null;
042: int channelCount = 2;
043: Thread[] threads = null;
044:
045: protected void setUp() throws Exception {
046: System.out.println("Setup");
047: super .setUp();
048: channels = new GroupChannel[channelCount];
049: orderitcs = new OrderInterceptor[channelCount];
050: mangleitcs = new MangleOrderInterceptor[channelCount];
051: test = new TestListener[channelCount];
052: threads = new Thread[channelCount];
053: for (int i = 0; i < channelCount; i++) {
054: channels[i] = new GroupChannel();
055:
056: orderitcs[i] = new OrderInterceptor();
057: mangleitcs[i] = new MangleOrderInterceptor();
058: orderitcs[i].setExpire(Long.MAX_VALUE);
059: channels[i].addInterceptor(orderitcs[i]);
060: channels[i].addInterceptor(mangleitcs[i]);
061: test[i] = new TestListener(i);
062: channels[i].addChannelListener(test[i]);
063: final int j = i;
064: threads[i] = new Thread() {
065: public void run() {
066: try {
067: channels[j].start(Channel.DEFAULT);
068: Thread.sleep(50);
069: } catch (Exception x) {
070: x.printStackTrace();
071: }
072: }
073: };
074: }
075: for (int i = 0; i < channelCount; i++)
076: threads[i].start();
077: for (int i = 0; i < channelCount; i++)
078: threads[i].join();
079: Thread.sleep(1000);
080: }
081:
082: public void testOrder1() throws Exception {
083: Member[] dest = channels[0].getMembers();
084: final AtomicInteger value = new AtomicInteger(0);
085: for (int i = 0; i < 100; i++) {
086: channels[0].send(dest, new Integer(value.getAndAdd(1)), 0);
087: }
088: Thread.sleep(5000);
089: for (int i = 0; i < test.length; i++) {
090: super .assertEquals(false, test[i].fail);
091: }
092: }
093:
094: public void testOrder2() throws Exception {
095: final Member[] dest = channels[0].getMembers();
096: final AtomicInteger value = new AtomicInteger(0);
097: Runnable run = new Runnable() {
098: public void run() {
099: for (int i = 0; i < 100; i++) {
100: try {
101: synchronized (channels[0]) {
102: channels[0].send(dest, new Integer(value
103: .getAndAdd(1)), 0);
104: }
105: } catch (Exception x) {
106: x.printStackTrace();
107: assertEquals(true, false);
108: }
109: }
110: }
111: };
112: Thread[] threads = new Thread[5];
113: for (int i = 0; i < threads.length; i++) {
114: threads[i] = new Thread(run);
115: }
116: for (int i = 0; i < threads.length; i++) {
117: threads[i].start();
118: }
119: for (int i = 0; i < threads.length; i++) {
120: threads[i].join();
121: }
122: Thread.sleep(5000);
123: for (int i = 0; i < test.length; i++) {
124: super .assertEquals(false, test[i].fail);
125: }
126: }
127:
128: protected void tearDown() throws Exception {
129: System.out.println("tearDown");
130: super .tearDown();
131: for (int i = 0; i < channelCount; i++) {
132: channels[i].stop(Channel.DEFAULT);
133: }
134: }
135:
136: public static void main(String[] args) throws Exception {
137: TestSuite suite = new TestSuite();
138: suite.addTestSuite(TestOrderInterceptor.class);
139: suite.run(new TestResult());
140: }
141:
142: public static class TestListener implements ChannelListener {
143: int id = -1;
144:
145: public TestListener(int id) {
146: this .id = id;
147: }
148:
149: int cnt = 0;
150: int total = 0;
151: boolean fail = false;
152:
153: public synchronized void messageReceived(Serializable msg,
154: Member sender) {
155: total++;
156: Integer i = (Integer) msg;
157: if (i.intValue() != cnt)
158: fail = true;
159: else
160: cnt++;
161: System.out.println("Listener[" + id + "] Message received:"
162: + i + " Count:" + total + " Fail:" + fail);
163:
164: }
165:
166: public boolean accept(Serializable msg, Member sender) {
167: return (msg instanceof Integer);
168: }
169: }
170:
171: public static class MangleOrderInterceptor extends
172: ChannelInterceptorBase {
173: int cnt = 1;
174: ChannelMessage hold = null;
175: Member[] dest = null;
176:
177: public synchronized void sendMessage(Member[] destination,
178: ChannelMessage msg, InterceptorPayload payload)
179: throws ChannelException {
180: if (hold == null) {
181: //System.out.println("Skipping message:"+msg);
182: hold = (ChannelMessage) msg.deepclone();
183: dest = new Member[destination.length];
184: System.arraycopy(destination, 0, dest, 0, dest.length);
185: } else {
186: //System.out.println("Sending message:"+msg);
187: super .sendMessage(destination, msg, payload);
188: //System.out.println("Sending message:"+hold);
189: super.sendMessage(dest, hold, null);
190: hold = null;
191: dest = null;
192: }
193: }
194: }
195:
196: }
|