001: /*
002: * Copyright (c) xsocket.org, 2006-2008. All rights reserved.
003: *
004: * This library is free software; you can redistribute it and/or
005: * modify it under the terms of the GNU Lesser General Public
006: * License as published by the Free Software Foundation; either
007: * version 2.1 of the License, or (at your option) any later version.
008: *
009: * This library is distributed in the hope that it will be useful,
010: * but WITHOUT ANY WARRANTY; without even the implied warranty of
011: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012: * Lesser General Public License for more details.
013: *
014: * You should have received a copy of the GNU Lesser General Public
015: * License along with this library; if not, write to the Free Software
016: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
017: *
018: * Please refer to the LGPL license at: http://www.gnu.org/copyleft/lesser.txt
019: * The latest copy of this software may be found on http://www.xsocket.org/
020: */
021: package org.xsocket.connection;
022:
023: import java.io.IOException;
024: import java.io.InputStreamReader;
025: import java.io.LineNumberReader;
026: import java.io.OutputStreamWriter;
027: import java.io.PrintWriter;
028: import java.net.InetAddress;
029: import java.net.ServerSocket;
030: import java.net.Socket;
031: import java.nio.BufferUnderflowException;
032: import java.util.ArrayList;
033: import java.util.List;
034: import java.util.concurrent.ExecutorService;
035: import java.util.concurrent.Executors;
036:
037: import org.junit.Assert;
038: import org.junit.Test;
039: import org.xsocket.DataConverter;
040: import org.xsocket.MaxReadSizeExceededException;
041: import org.xsocket.QAUtil;
042: import org.xsocket.WaitTimeoutException;
043: import org.xsocket.connection.BlockingConnection;
044: import org.xsocket.connection.BlockingConnectionPool;
045: import org.xsocket.connection.IBlockingConnection;
046: import org.xsocket.connection.IDataHandler;
047: import org.xsocket.connection.INonBlockingConnection;
048: import org.xsocket.connection.IServer;
049: import org.xsocket.connection.ConnectionUtils;
050: import org.xsocket.connection.IConnection.FlushMode;
051:
052: /**
053: *
054: * @author grro@xsocket.org
055: */
056: public final class BlockingConnectionPoolTest {
057:
058: private static final String DELIMITER = System
059: .getProperty("line.separator");
060: private static final int LOOPS = 10;
061:
062: private int running = 0;
063:
064: private final List<String> errors = new ArrayList<String>();
065:
066: public static void main(String[] args) throws Exception {
067: IServer server = new org.xsocket.connection.Server(
068: new EchoHandler());
069: ConnectionUtils.start(server);
070:
071: new BlockingConnectionPoolTest().callPooled("localhost", server
072: .getLocalPort(), 40000000);
073: }
074:
075: @Test
076: public void testSimple() throws Exception {
077:
078: BlockingConnectionPool pool = new BlockingConnectionPool();
079: Server server = new Server(50);
080: new Thread(server).start();
081:
082: ConnectionUtils.registerMBean(pool);
083:
084: IBlockingConnection con = null;
085: for (int i = 0; i < 50; i++) {
086: try {
087: // retrieve a connection (if no connection is in pool, a new one will be created)
088: con = pool.getBlockingConnection("localhost", server
089: .getLocalPort());
090: con.write("Hello" + DELIMITER);
091: Assert.assertEquals("OK", con
092: .readStringByDelimiter(DELIMITER));
093:
094: con.close();
095: } catch (IOException e) {
096: if (con != null) {
097: try {
098: // if the connection is invalid -> destroy it (it will not return into the pool)
099: pool.destroy(con);
100: } catch (Exception ignore) {
101: }
102: }
103: }
104: }
105:
106: pool.close();
107: server.close();
108: }
109:
110: @Test
111: public void testSimplePerformanceCompare() throws Exception {
112:
113: //QAUtil.setLogLevel(NonBlockingConnectionPool.class.getName(), Level.FINE);
114:
115: IServer server = new org.xsocket.connection.Server(
116: new BlackHoleHandler());
117: server.setFlushMode(FlushMode.ASYNC);
118: ConnectionUtils.start(server);
119:
120: // warm up
121: callPooled("localhost", server.getLocalPort(), 20);
122: callUnPooled("localhost", server.getLocalPort(), 20);
123:
124: long elapsedPooled = callPooled("localhost", server
125: .getLocalPort(), 1000);
126: long elapsedUnpooled = callUnPooled("localhost", server
127: .getLocalPort(), 1000);
128:
129: System.out.println("\r\npooled "
130: + DataConverter.toFormatedDuration(elapsedPooled) + " "
131: + " unpooled "
132: + DataConverter.toFormatedDuration(elapsedUnpooled));
133:
134: server.close();
135:
136: Assert.assertTrue(elapsedPooled < elapsedUnpooled);
137: }
138:
139: private long callPooled(String hostname, int port, int loops)
140: throws IOException {
141: long elapsed = 0;
142:
143: BlockingConnectionPool pool = new BlockingConnectionPool();
144:
145: for (int i = 0; i < loops; i++) {
146: long start = System.nanoTime();
147: IBlockingConnection con = pool.getBlockingConnection(
148: hostname, port);
149: con.setFlushmode(FlushMode.ASYNC);
150:
151: con.write("Hello" + EchoHandler.DELIMITER);
152: elapsed += System.nanoTime() - start;
153: con.close();
154: }
155:
156: pool.close();
157:
158: return (elapsed / 1000000);
159: }
160:
161: private long callUnPooled(String hostname, int port, int loops)
162: throws IOException {
163: long elapsed = 0;
164:
165: for (int i = 0; i < loops; i++) {
166: long start = System.nanoTime();
167: IBlockingConnection con = new BlockingConnection(hostname,
168: port);
169: con.setFlushmode(FlushMode.ASYNC);
170:
171: con.write("Hello" + EchoHandler.DELIMITER);
172: elapsed += System.nanoTime() - start;
173: con.close();
174: }
175:
176: return (elapsed / 1000000);
177: }
178:
179: @Test
180: public void testUnlimitedPool() throws Exception {
181: // QAUtil.setLogLevel("org.xsocket.stream.NonBlockingConnectionPool", Level.FINE);
182: BlockingConnectionPool pool = new BlockingConnectionPool();
183: pool.setPooledIdleTimeoutMillis(5 * 1000);
184: ConnectionUtils.registerMBean(pool);
185:
186: Server server = new Server(50);
187: new Thread(server).start();
188:
189: // startWorkers("localhost", server.getLocalPort(), pool, 100);
190: startWorkers("localhost", server.getLocalPort(), pool, 10);
191:
192: QAUtil.sleep(300);
193: Assert.assertTrue(pool.getNumPooledActive() > 3);
194:
195: do {
196: QAUtil.sleep(200);
197: } while (running > 0);
198:
199: for (String error : errors) {
200: System.out.println("error: " + error);
201: }
202:
203: Assert.assertTrue(errors.size() == 0);
204: Assert.assertTrue(pool.getNumPooledActive() == 0);
205:
206: pool.close();
207: server.close();
208: }
209:
210: @Test
211: public void testLimitedPool() throws Exception {
212: //QAUtil.setLogLevel("org.xsocket.ResourcePool", Level.FINE);
213:
214: int maxActive = 2;
215:
216: BlockingConnectionPool pool = new BlockingConnectionPool();
217: pool.setMaxActivePooled(maxActive);
218:
219: ConnectionUtils.registerMBean(pool);
220:
221: Server server = new Server(50);
222: new Thread(server).start();
223:
224: startWorkers("localhost", server.getLocalPort(), pool, 5);
225:
226: do {
227: QAUtil.sleep(500);
228: Assert.assertTrue(pool.getNumPooledActive() <= maxActive);
229: } while (running > 0);
230:
231: for (String error : errors) {
232: System.out.println("error: " + error);
233: }
234:
235: Assert.assertTrue(errors.size() == 0);
236:
237: pool.close();
238: server.close();
239: }
240:
241: @Test
242: public void testLimitedPoolWaitTimeout() throws Exception {
243: int maxActive = 2;
244:
245: BlockingConnectionPool pool = new BlockingConnectionPool();
246: pool.setMaxActivePooled(maxActive);
247: pool.setCreationMaxWaitMillis(500);
248:
249: Server server = new Server(10L * 60L * 1000L);
250: new Thread(server).start();
251:
252: startWorkers("localhost", server.getLocalPort(), pool,
253: maxActive); // all connections will be taken
254:
255: QAUtil.sleep(300);
256:
257: try {
258: pool
259: .getBlockingConnection("0.0.0.0", server
260: .getLocalPort());
261: Assert.fail("WaitTimeoutEception should have been thrown");
262: } catch (WaitTimeoutException expected) {
263: }
264:
265: pool.close();
266: server.close();
267: }
268:
269: @Test
270: public void testCloseAndDestroyConnection() throws Exception {
271: //QAUtil.setLogLevel(NonBlockingConnectionPool.class.getName(), Level.FINE);
272:
273: BlockingConnectionPool pool = new BlockingConnectionPool();
274:
275: Server server = new Server(50);
276: new Thread(server).start();
277:
278: IBlockingConnection connection = pool.getBlockingConnection(
279: "localhost", server.getLocalPort());
280: connection.setAutoflush(false);
281:
282: Assert.assertTrue(pool.getNumPooledActive() == 1);
283: Assert.assertTrue(pool.getNumPooledIdle() == 0);
284:
285: connection.close();
286: Assert.assertTrue(pool.getNumPooledActive() == 0);
287: Assert.assertTrue(pool.getNumPooledIdle() == 1);
288:
289: connection = pool.getBlockingConnection("localhost", server
290: .getLocalPort());
291: Assert.assertTrue(pool.getNumPooledActive() == 1);
292: Assert.assertTrue(pool.getNumPooledIdle() == 0);
293:
294: pool.destroy(connection);
295: Assert.assertTrue(pool.getNumPooledActive() == 0);
296: Assert.assertTrue(pool.getNumPooledIdle() == 0);
297:
298: pool.close();
299: server.close();
300: }
301:
302: @Test
303: public void testIdleTimeout() throws Exception {
304: errors.clear();
305: BlockingConnectionPool pool = new BlockingConnectionPool();
306:
307: Server server = new Server(50);
308: new Thread(server).start();
309:
310: IBlockingConnection con = pool.getBlockingConnection(
311: "localhost", server.getLocalPort());
312: con.setIdleTimeoutMillis(1 * 1000);
313:
314: Assert.assertTrue(con.isOpen());
315:
316: QAUtil.sleep(1500);
317: Assert.assertFalse(con.isOpen());
318: Assert.assertTrue(pool.getNumPooledActive() == 0);
319:
320: pool.close();
321: server.close();
322: }
323:
324: @Test
325: public void testConnectionTimeout() throws Exception {
326: errors.clear();
327: final BlockingConnectionPool pool = new BlockingConnectionPool();
328:
329: Server server = new Server(50);
330: new Thread(server).start();
331:
332: IBlockingConnection con = pool.getBlockingConnection(
333: "localhost", server.getLocalPort());
334: con.setConnectionTimeoutMillis(1 * 1000);
335:
336: Assert.assertTrue(con.isOpen());
337:
338: QAUtil.sleep(1500);
339: Assert.assertFalse(con.isOpen());
340: Assert.assertTrue(pool.getNumPooledActive() == 0);
341:
342: pool.close();
343: server.close();
344: }
345:
346: private void startWorkers(final String host, final int port,
347: final BlockingConnectionPool pool, int count) {
348: for (int i = 0; i < count; i++) {
349: Thread t = new Thread() {
350: @Override
351: public void run() {
352: running++;
353:
354: IBlockingConnection con = null;
355:
356: for (int i = 0; i < LOOPS; i++) {
357: try {
358: con = pool
359: .getBlockingConnection(host, port);
360: con.setAutoflush(false);
361:
362: con.write("test1" + DELIMITER);
363: con.write("test2" + DELIMITER);
364: con.flush();
365:
366: Assert.assertEquals("OK", con
367: .readStringByDelimiter(DELIMITER,
368: Integer.MAX_VALUE));
369: Assert.assertEquals("OK", con
370: .readStringByDelimiter(DELIMITER,
371: Integer.MAX_VALUE));
372: } catch (Exception ignore) {
373:
374: } finally {
375: if (con != null) {
376: try {
377: con.close();
378:
379: } catch (Exception ignore) {
380: }
381: }
382: }
383: }
384:
385: running--;
386: }
387: };
388: t.start();
389: }
390: }
391:
392: private static final class BlackHoleHandler implements IDataHandler {
393:
394: public boolean onData(INonBlockingConnection connection)
395: throws IOException, BufferUnderflowException,
396: MaxReadSizeExceededException {
397: connection.readByteBufferByLength(connection.available());
398: return true;
399: }
400: }
401:
402: private static class Server implements Runnable {
403: private ExecutorService executorService = Executors
404: .newCachedThreadPool();
405: private volatile boolean isRunning = true;
406:
407: private ServerSocket sso = null;
408: private long pause = 0;
409:
410: Server(long pause) throws IOException {
411: this .pause = pause;
412: sso = new ServerSocket(0);
413: }
414:
415: public void run() {
416: while (isRunning) {
417: try {
418: Socket s = sso.accept();
419: executorService.execute(new Worker(s, pause));
420: } catch (Exception e) {
421: if (isRunning) {
422: e.printStackTrace();
423: }
424: }
425: }
426: }
427:
428: public InetAddress getLocalAddress() {
429: return sso.getInetAddress();
430: }
431:
432: public int getLocalPort() {
433: return sso.getLocalPort();
434: }
435:
436: public void close() throws IOException {
437: isRunning = false;
438: sso.close();
439: }
440: }
441:
442: private static class Worker implements Runnable {
443: private volatile boolean isRunning = true;
444:
445: private LineNumberReader in = null;
446: private PrintWriter out = null;
447: private Socket s = null;
448: private long pause = 0;
449:
450: Worker(Socket s, long pause) throws IOException {
451: this .s = s;
452: this .pause = pause;
453: in = new LineNumberReader(new InputStreamReader(s
454: .getInputStream()));
455: out = new PrintWriter(new OutputStreamWriter(s
456: .getOutputStream()));
457: }
458:
459: public void run() {
460: while (isRunning) {
461: try {
462: String request = in.readLine();
463: if (request != null) {
464: try {
465: Thread.sleep(pause);
466: } catch (InterruptedException ignore) {
467: }
468:
469: out.write("OK" + DELIMITER);
470: out.flush();
471: //System.out.print(".");
472:
473: //LOG.info("Server sending..");
474:
475: } else {
476: isRunning = false;
477: }
478: } catch (Exception e) {
479: e.printStackTrace();
480: }
481: }
482: try {
483: in.close();
484: out.close();
485: s.close();
486: } catch (Exception e) {
487: e.printStackTrace();
488: }
489: }
490: }
491: }
|