001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.catalina.tribes.test.io;
018:
019: import java.util.ArrayList;
020:
021: import org.apache.catalina.tribes.Channel;
022: import org.apache.catalina.tribes.ManagedChannel;
023: import org.apache.catalina.tribes.Member;
024: import org.apache.catalina.tribes.MembershipListener;
025: import org.apache.catalina.tribes.group.GroupChannel;
026: import junit.framework.TestCase;
027: import org.apache.catalina.tribes.ChannelListener;
028: import java.io.Serializable;
029: import java.util.Random;
030: import java.util.HashMap;
031: import org.apache.catalina.tribes.transport.ReplicationTransmitter;
032:
033: public class TestSenderConnections extends TestCase {
034: private static int count = 2;
035: private ManagedChannel[] channels = new ManagedChannel[count];
036: private TestMsgListener[] listeners = new TestMsgListener[count];
037:
038: protected void setUp() throws Exception {
039: super .setUp();
040: for (int i = 0; i < channels.length; i++) {
041: channels[i] = new GroupChannel();
042: channels[i].getMembershipService().setPayload(
043: ("Channel-" + (i + 1)).getBytes("ASCII"));
044: listeners[i] = new TestMsgListener(("Listener-" + (i + 1)));
045: channels[i].addChannelListener(listeners[i]);
046: channels[i].start(Channel.SND_RX_SEQ | Channel.SND_TX_SEQ);
047:
048: }
049: }
050:
051: public void clear() {
052: }
053:
054: public void sendMessages(long delay, long sleep) throws Exception {
055: Member local = channels[0].getLocalMember(true);
056: Member dest = channels[1].getLocalMember(true);
057: int n = 3;
058: System.out.println("Sending " + n + " messages from ["
059: + local.getName() + "] to [" + dest.getName() + "]");
060: for (int i = 0; i < n; i++) {
061: channels[0].send(new Member[] { dest }, new TestMsg(), 0);
062: if (delay > 0)
063: Thread.sleep(delay);
064: }
065: System.out.println("Messages sent. Sleeping for "
066: + (sleep / 1000) + " seconds to inspect connections");
067: if (sleep > 0)
068: Thread.sleep(sleep);
069:
070: }
071:
072: public void testConnectionLinger() throws Exception {
073: sendMessages(0, 15000);
074: }
075:
076: public void testKeepAliveCount() throws Exception {
077: System.out.println("Setting keep alive count to 0");
078: for (int i = 0; i < channels.length; i++) {
079: ReplicationTransmitter t = (ReplicationTransmitter) channels[0]
080: .getChannelSender();
081: t.getTransport().setKeepAliveCount(0);
082: }
083: sendMessages(1000, 15000);
084: }
085:
086: public void testKeepAliveTime() throws Exception {
087: System.out.println("Setting keep alive count to 1 second");
088: for (int i = 0; i < channels.length; i++) {
089: ReplicationTransmitter t = (ReplicationTransmitter) channels[0]
090: .getChannelSender();
091: t.getTransport().setKeepAliveTime(1000);
092: }
093: sendMessages(2000, 15000);
094: }
095:
096: protected void tearDown() throws Exception {
097: for (int i = 0; i < channels.length; i++) {
098: channels[i].stop(Channel.DEFAULT);
099: }
100:
101: }
102:
103: public static class TestMsg implements Serializable {
104: static Random r = new Random(System.currentTimeMillis());
105: HashMap map = new HashMap();
106:
107: public TestMsg() {
108: int size = Math.abs(r.nextInt() % 200);
109: for (int i = 0; i < size; i++) {
110: int length = Math.abs(r.nextInt() % 65000);
111: ArrayList list = new ArrayList(length);
112: map.put(new Integer(i), list);
113: }
114: }
115: }
116:
117: public class TestMsgListener implements ChannelListener {
118: public String name = null;
119:
120: public TestMsgListener(String name) {
121: this .name = name;
122: }
123:
124: public void messageReceived(Serializable msg, Member sender) {
125: System.out.println("[" + name + "] Received message:" + msg
126: + " from " + sender.getName());
127: }
128:
129: public boolean accept(Serializable msg, Member sender) {
130: return true;
131: }
132:
133: }
134:
135: }
|