001: /*
002: * Copyright (c) xsocket.org, 2006-2008. All rights reserved.
003: *
004: * This library is free software; you can redistribute it and/or
005: * modify it under the terms of the GNU Lesser General Public
006: * License as published by the Free Software Foundation; either
007: * version 2.1 of the License, or (at your option) any later version.
008: *
009: * This library is distributed in the hope that it will be useful,
010: * but WITHOUT ANY WARRANTY; without even the implied warranty of
011: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012: * Lesser General Public License for more details.
013: *
014: * You should have received a copy of the GNU Lesser General Public
015: * License along with this library; if not, write to the Free Software
016: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
017: *
018: * Please refer to the LGPL license at: http://www.gnu.org/copyleft/lesser.txt
019: * The latest copy of this software may be found on http://www.xsocket.org/
020: */
021: package org.xsocket.connection;
022:
023: import java.io.IOException;
024: import java.nio.BufferUnderflowException;
025: import java.util.concurrent.atomic.AtomicInteger;
026:
027: import org.junit.Assert;
028: import org.junit.Test;
029: import org.xsocket.Execution;
030: import org.xsocket.MaxReadSizeExceededException;
031: import org.xsocket.QAUtil;
032: import org.xsocket.Resource;
033: import org.xsocket.connection.HandlerProxy;
034: import org.xsocket.connection.IConnectHandler;
035: import org.xsocket.connection.IConnectionScoped;
036: import org.xsocket.connection.IDataHandler;
037: import org.xsocket.connection.IDisconnectHandler;
038: import org.xsocket.connection.IHandler;
039: import org.xsocket.connection.INonBlockingConnection;
040: import org.xsocket.connection.IServer;
041: import org.xsocket.connection.NonBlockingConnection;
042: import org.xsocket.connection.Server;
043: import org.xsocket.connection.ConnectionUtils;
044:
045: public class HandlerProxyTest {
046:
047: @Test
048: public void testNullHandler() throws Exception {
049: IServer server = new Server(0, new EchoHandler());
050: ConnectionUtils.start(server);
051: INonBlockingConnection con = new NonBlockingConnection(
052: "localhost", server.getLocalPort());
053:
054: IHandler proxy = HandlerProxy.newPrototype(null, null)
055: .newProxy(con);
056:
057: ((IConnectHandler) proxy).onConnect(con);
058: ((IDataHandler) proxy).onData(con);
059: ((IIdleTimeoutHandler) proxy).onIdleTimeout(con);
060:
061: Assert.assertFalse(con.isOpen());
062: server.close();
063: }
064:
065: @Test
066: public void testInjectServer() throws Exception {
067: IServer server = new Server(0, new EchoHandler());
068: ConnectionUtils.start(server);
069: INonBlockingConnection con = new NonBlockingConnection(
070: "localhost", server.getLocalPort());
071:
072: MultiThreadedDisconnectHandler hdl = new MultiThreadedDisconnectHandler();
073: HandlerProxy prototype = HandlerProxy.newPrototype(hdl, server);
074: prototype.onInit();
075:
076: Assert.assertEquals(server, hdl.server);
077:
078: server.close();
079: con.close();
080: }
081:
082: @Test
083: public void testNonConnectionScopedInstance() throws Exception {
084: IServer server = new Server(0, new EchoHandler());
085: ConnectionUtils.start(server);
086: INonBlockingConnection con = new NonBlockingConnection(
087: "localhost", server.getLocalPort());
088:
089: MultiThreadedDisconnectHandler hdl = new MultiThreadedDisconnectHandler();
090: HandlerProxy prototype = HandlerProxy.newPrototype(hdl, null);
091:
092: IHandler proxy1 = prototype.newProxy(con);
093: ((IDisconnectHandler) proxy1).onDisconnect(con);
094:
095: QAUtil.sleep(200);
096: Assert.assertEquals(1, hdl.countOnDisconnectCalled.intValue());
097:
098: IHandler proxy2 = prototype.newProxy(con);
099: ((IDisconnectHandler) proxy2).onDisconnect(con);
100:
101: QAUtil.sleep(200);
102: Assert.assertEquals(2, hdl.countOnDisconnectCalled.intValue());
103:
104: con.close();
105: server.close();
106: }
107:
108: @Test
109: public void testConnectionScopedInstance() throws Exception {
110: IServer server = new Server(0, new EchoHandler());
111: ConnectionUtils.start(server);
112: INonBlockingConnection con = new NonBlockingConnection(
113: "localhost", server.getLocalPort());
114:
115: ConnectionScopedMultiThreadedDisconnectHandler hdl = new ConnectionScopedMultiThreadedDisconnectHandler();
116: HandlerProxy prototype = HandlerProxy.newPrototype(hdl, null);
117:
118: IHandler proxy1 = prototype.newProxy(con);
119: ((IDisconnectHandler) proxy1).onDisconnect(con);
120:
121: QAUtil.sleep(200);
122: Assert.assertEquals(0, hdl.countOnDisconnectCalled.intValue());
123:
124: IHandler proxy2 = prototype.newProxy(con);
125: ((IDisconnectHandler) proxy2).onDisconnect(con);
126:
127: QAUtil.sleep(200);
128: Assert.assertEquals(0, hdl.countOnDisconnectCalled.intValue());
129:
130: con.close();
131: server.close();
132: }
133:
134: @Test
135: public void testNonThreaded() throws Exception {
136: NonThreadedDisconnectHandler hdl = new NonThreadedDisconnectHandler();
137: IHandler proxy = HandlerProxy.newPrototype(hdl, null).newProxy(
138: null);
139:
140: ((IConnectHandler) proxy).onConnect(null);
141: ((IDataHandler) proxy).onData(null);
142: ((IDisconnectHandler) proxy).onDisconnect(null);
143:
144: Assert.assertEquals(1, hdl.countOnDisconnectCalled.get());
145: Assert.assertTrue(Thread.currentThread().getName().equals(
146: hdl.threadName));
147: }
148:
149: @Test
150: public void testMultiThreaded() throws Exception {
151: IServer server = new Server(0, new EchoHandler());
152: ConnectionUtils.start(server);
153: INonBlockingConnection con = new NonBlockingConnection(
154: "localhost", server.getLocalPort());
155:
156: MultiThreadedDisconnectHandler hdl = new MultiThreadedDisconnectHandler();
157: IHandler proxy = HandlerProxy.newPrototype(hdl, null).newProxy(
158: con);
159:
160: ((IConnectHandler) proxy).onConnect(con);
161: ((IDataHandler) proxy).onData(con);
162: ((IDisconnectHandler) proxy).onDisconnect(con);
163:
164: QAUtil.sleep(200);
165:
166: Assert.assertEquals(1, hdl.countOnDisconnectCalled.get());
167: Assert.assertFalse(Thread.currentThread().getName().equals(
168: hdl.threadName));
169:
170: server.close();
171: con.close();
172: }
173:
174: @Test
175: public void testConcurrent() throws Exception {
176: //QAUtil.setLogLevel(HandlerProxy.class.getName(), Level.FINE);
177:
178: Handler hdl = new Handler();
179: IServer server = new Server(0, hdl);
180: ConnectionUtils.start(server);
181:
182: INonBlockingConnection con = new NonBlockingConnection(
183: "localhost", server.getLocalPort());
184: for (int i = 0; i < 100; i++) {
185: con.write("test");
186: QAUtil.sleep(i);
187: }
188:
189: Assert.assertEquals(1, hdl.maxConnecurrent);
190:
191: con.close();
192: server.close();
193: con.close();
194: }
195:
196: @Execution(Execution.NONTHREADED)
197: private static final class NonThreadedDisconnectHandler implements
198: IDisconnectHandler {
199:
200: @Resource
201: private IServer server = null;
202:
203: private AtomicInteger countOnDisconnectCalled = new AtomicInteger();
204: private String threadName = null;
205:
206: public boolean onDisconnect(INonBlockingConnection connection)
207: throws IOException {
208: threadName = Thread.currentThread().getName();
209: countOnDisconnectCalled.incrementAndGet();
210: return true;
211: }
212: }
213:
214: @Execution(Execution.MULTITHREADED)
215: private static final class MultiThreadedDisconnectHandler implements
216: IDisconnectHandler {
217:
218: @Resource
219: private IServer server = null;
220:
221: private AtomicInteger countOnDisconnectCalled = new AtomicInteger();
222: private String threadName = null;
223:
224: public boolean onDisconnect(INonBlockingConnection connection)
225: throws IOException {
226: threadName = Thread.currentThread().getName();
227: countOnDisconnectCalled.incrementAndGet();
228: return true;
229: }
230: }
231:
232: private static final class ConnectionScopedMultiThreadedDisconnectHandler
233: implements IDisconnectHandler, IConnectionScoped {
234:
235: @Resource
236: private IServer server = null;
237:
238: private AtomicInteger countOnDisconnectCalled = new AtomicInteger();
239: private String threadName = null;
240:
241: public boolean onDisconnect(INonBlockingConnection connection)
242: throws IOException {
243: threadName = Thread.currentThread().getName();
244: countOnDisconnectCalled.incrementAndGet();
245: return true;
246: }
247:
248: @Override
249: public Object clone() throws CloneNotSupportedException {
250: ConnectionScopedMultiThreadedDisconnectHandler copy = (ConnectionScopedMultiThreadedDisconnectHandler) super
251: .clone();
252: copy.countOnDisconnectCalled = new AtomicInteger();
253: copy.threadName = null;
254: return copy;
255: }
256: }
257:
258: private static final class Handler implements IDataHandler {
259:
260: private AtomicInteger countConcurrent = new AtomicInteger();
261: private int maxConnecurrent = 0;
262:
263: public boolean onData(INonBlockingConnection connection)
264: throws IOException, BufferUnderflowException,
265: MaxReadSizeExceededException {
266: if (countConcurrent.incrementAndGet() > maxConnecurrent) {
267: maxConnecurrent = countConcurrent.get();
268: }
269:
270: QAUtil.sleep(5);
271:
272: countConcurrent.decrementAndGet();
273: return true;
274: }
275: }
276:
277: }
|