001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *
019: */
020: package org.apache.mina.transport;
021:
022: import java.net.SocketAddress;
023:
024: import junit.framework.TestCase;
025:
026: import org.apache.mina.common.ConnectFuture;
027: import org.apache.mina.common.IoAcceptor;
028: import org.apache.mina.common.IoBuffer;
029: import org.apache.mina.common.IoHandler;
030: import org.apache.mina.common.IoHandlerAdapter;
031: import org.apache.mina.common.IoSession;
032: import org.apache.mina.common.TransportMetadata;
033: import org.apache.mina.util.AvailablePortFinder;
034:
035: /**
036: * Abstract base class for testing suspending and resuming reads and
037: * writes.
038: *
039: * @author The Apache MINA Project (dev@mina.apache.org)
040: * @version $Rev$, $Date$
041: */
042: public abstract class AbstractTrafficControlTest extends TestCase {
043: protected int port = 0;
044:
045: protected IoAcceptor acceptor;
046:
047: protected TransportMetadata transportType;
048:
049: public AbstractTrafficControlTest(IoAcceptor acceptor) {
050: this .acceptor = acceptor;
051: }
052:
053: @Override
054: protected void setUp() throws Exception {
055: super .setUp();
056:
057: port = AvailablePortFinder.getNextAvailable();
058: acceptor.setHandler(new ServerIoHandler());
059: acceptor.bind(createServerSocketAddress(port));
060: }
061:
062: @Override
063: protected void tearDown() throws Exception {
064: super .tearDown();
065:
066: acceptor.dispose();
067: }
068:
069: protected abstract ConnectFuture connect(int port, IoHandler handler)
070: throws Exception;
071:
072: protected abstract SocketAddress createServerSocketAddress(int port);
073:
074: public void testSuspendResumeReadWrite() throws Exception {
075: ConnectFuture future = connect(port, new ClientIoHandler());
076: future.awaitUninterruptibly();
077: IoSession session = future.getSession();
078:
079: // We wait for the sessionCreated() event is fired because we
080: // cannot guarantee that it is invoked already.
081: while (session.getAttribute("lock") == null) {
082: Thread.yield();
083: }
084:
085: Object lock = session.getAttribute("lock");
086: synchronized (lock) {
087:
088: write(session, "1");
089: assertEquals('1', read(session));
090: assertEquals("1", getReceived(session));
091: assertEquals("1", getSent(session));
092:
093: session.suspendRead();
094:
095: Thread.sleep(100);
096:
097: write(session, "2");
098: assertFalse(canRead(session));
099: assertEquals("1", getReceived(session));
100: assertEquals("12", getSent(session));
101:
102: session.suspendWrite();
103:
104: Thread.sleep(100);
105:
106: write(session, "3");
107: assertFalse(canRead(session));
108: assertEquals("1", getReceived(session));
109: assertEquals("12", getSent(session));
110:
111: session.resumeRead();
112:
113: Thread.sleep(100);
114:
115: write(session, "4");
116: assertEquals('2', read(session));
117: assertEquals("12", getReceived(session));
118: assertEquals("12", getSent(session));
119:
120: session.resumeWrite();
121:
122: Thread.sleep(100);
123:
124: assertEquals('3', read(session));
125: assertEquals('4', read(session));
126:
127: write(session, "5");
128: assertEquals('5', read(session));
129: assertEquals("12345", getReceived(session));
130: assertEquals("12345", getSent(session));
131:
132: session.suspendWrite();
133:
134: Thread.sleep(100);
135:
136: write(session, "6");
137: assertFalse(canRead(session));
138: assertEquals("12345", getReceived(session));
139: assertEquals("12345", getSent(session));
140:
141: session.suspendRead();
142: session.resumeWrite();
143:
144: Thread.sleep(100);
145:
146: write(session, "7");
147: assertFalse(canRead(session));
148: assertEquals("12345", getReceived(session));
149: assertEquals("1234567", getSent(session));
150:
151: session.resumeRead();
152:
153: Thread.sleep(100);
154:
155: assertEquals('6', read(session));
156: assertEquals('7', read(session));
157:
158: assertEquals("1234567", getReceived(session));
159: assertEquals("1234567", getSent(session));
160:
161: }
162:
163: session.close().awaitUninterruptibly();
164: }
165:
166: private void write(IoSession session, String s) throws Exception {
167: session.write(IoBuffer.wrap(s.getBytes("ASCII")));
168: }
169:
170: private int read(IoSession session) throws Exception {
171: int pos = ((Integer) session.getAttribute("pos")).intValue();
172: for (int i = 0; i < 10 && pos == getReceived(session).length(); i++) {
173: Object lock = session.getAttribute("lock");
174: lock.wait(200);
175: }
176: session.setAttribute("pos", new Integer(pos + 1));
177: String received = getReceived(session);
178: assertTrue(received.length() > pos);
179: return getReceived(session).charAt(pos);
180: }
181:
182: private boolean canRead(IoSession session) throws Exception {
183: int pos = ((Integer) session.getAttribute("pos")).intValue();
184: Object lock = session.getAttribute("lock");
185: lock.wait(250);
186: String received = getReceived(session);
187: return pos < received.length();
188: }
189:
190: private String getReceived(IoSession session) throws Exception {
191: return session.getAttribute("received").toString();
192: }
193:
194: private String getSent(IoSession session) throws Exception {
195: return session.getAttribute("sent").toString();
196: }
197:
198: public static class ClientIoHandler extends IoHandlerAdapter {
199: @Override
200: public void sessionCreated(IoSession session) throws Exception {
201: super .sessionCreated(session);
202: session.setAttribute("pos", new Integer(0));
203: session.setAttribute("received", new StringBuffer());
204: session.setAttribute("sent", new StringBuffer());
205: session.setAttribute("lock", new Object());
206: }
207:
208: @Override
209: public void messageReceived(IoSession session, Object message)
210: throws Exception {
211: IoBuffer buffer = (IoBuffer) message;
212: byte[] data = new byte[buffer.remaining()];
213: buffer.get(data);
214: Object lock = session.getAttribute("lock");
215: synchronized (lock) {
216: StringBuffer sb = (StringBuffer) session
217: .getAttribute("received");
218: sb.append(new String(data, "ASCII"));
219: lock.notifyAll();
220: }
221: }
222:
223: @Override
224: public void messageSent(IoSession session, Object message)
225: throws Exception {
226: IoBuffer buffer = (IoBuffer) message;
227: buffer.rewind();
228: byte[] data = new byte[buffer.remaining()];
229: buffer.get(data);
230: StringBuffer sb = (StringBuffer) session
231: .getAttribute("sent");
232: sb.append(new String(data, "ASCII"));
233: }
234:
235: }
236:
237: private static class ServerIoHandler extends IoHandlerAdapter {
238: @Override
239: public void messageReceived(IoSession session, Object message)
240: throws Exception {
241: // Just echo the received bytes.
242: IoBuffer rb = (IoBuffer) message;
243: IoBuffer wb = IoBuffer.allocate(rb.remaining());
244: wb.put(rb);
245: wb.flip();
246: session.write(wb);
247: }
248: }
249: }
|