001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.catalina.tribes.test.transport;
018:
019: import java.net.ServerSocket;
020: import java.net.Socket;
021: import java.io.InputStream;
022: import java.text.DecimalFormat;
023: import java.math.BigDecimal;
024:
/**
 * Stand-alone receiver used to measure raw socket throughput while validating
 * the content of the received stream.
 *
 * <p>The program listens on port 9999, accepts a single connection and reads
 * from it until end of stream. Every received byte is checked by a
 * {@link MyDataReader}: the stream is expected to consist of fixed-length
 * messages in which every byte of a message equals the message's sequence
 * number (a {@code byte} counter starting at 0 and wrapping on overflow).
 * Throughput statistics are printed once per second by a background daemon
 * thread, every {@value #REPORT_INTERVAL} completed messages, and once more
 * at end of stream.
 *
 * <p>Usage: {@code SocketValidateReceive [receiveBufferSize]}
 */
public class SocketValidateReceive {
    /** Number of completed messages between two progress reports. */
    private static final int REPORT_INTERVAL = 10000;

    // The counters below are written by the main thread only and read by the
    // statistics thread, hence volatile (also guarantees atomic access to the
    // 64-bit long/double values).
    static volatile long start = 0;
    static volatile double mb = 0;
    static byte[] buf = new byte[8192 * 4];
    static boolean first = true;
    static volatile int count = 0;
    // DecimalFormat is not thread-safe; it is only used inside the
    // synchronized printStats method.
    static DecimalFormat df = new DecimalFormat("##.00");
    static volatile BigDecimal total = new BigDecimal(0);
    // Cached BigDecimal of the most recent read length, reused while the read
    // length stays the same to avoid allocating on every read.
    static BigDecimal bytes = new BigDecimal(32871);

    /**
     * Accepts one connection on port 9999 and validates everything received
     * on it until the peer closes the stream.
     *
     * @param args optional; {@code args[0]} is the socket receive buffer size
     *             in bytes (default 43800)
     * @throws Exception on I/O failure or when the received data does not
     *                   match the expected sequence pattern
     */
    public static void main(String[] args) throws Exception {
        int size = parseReceiveBufferSize(args, 43800);

        ServerSocket srvSocket = new ServerSocket(9999);
        try {
            System.out.println("Listening on 9999");
            Socket socket = srvSocket.accept();
            try {
                // NOTE(review): the buffer size is applied to the accepted
                // socket; per the java.net documentation, windows larger than
                // 64K must be configured on the ServerSocket before binding.
                socket.setReceiveBufferSize(size);
                InputStream in = socket.getInputStream();
                MyDataReader reader = new MyDataReader(50000);
                startStatsThread();
                receive(in, reader);
            } finally {
                socket.close();
            }
        } finally {
            srvSocket.close();
        }
        // End of stream reached: terminate with the same exit status as before.
        System.exit(1);
    }

    /**
     * Reads the receive buffer size from the first command line argument.
     * Falls back to {@code defaultSize} (with a message on stderr) when the
     * argument is missing, not a number, or not positive.
     */
    private static int parseReceiveBufferSize(String[] args, int defaultSize) {
        if (args.length == 0) {
            return defaultSize;
        }
        try {
            int parsed = Integer.parseInt(args[0]);
            if (parsed > 0) {
                return parsed;
            }
        } catch (NumberFormatException x) {
            // reported below together with the non-positive case
        }
        System.err.println("Invalid receive buffer size '" + args[0]
                + "', using default " + defaultSize);
        return defaultSize;
    }

    /** Starts the daemon thread that prints the statistics once per second. */
    private static void startStatsThread() {
        Thread t = new Thread() {
            @Override
            public void run() {
                while (true) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException x) {
                        // Stop reporting when asked to, preserving the flag.
                        Thread.currentThread().interrupt();
                        return;
                    }
                    printStats(start, mb, count, df, total);
                }
            }
        };
        t.setDaemon(true);
        t.start();
    }

    /**
     * Reads from {@code in} until end of stream, validating the data with
     * {@code reader} and updating the static counters. Prints the final
     * statistics when the end of the stream is reached.
     */
    private static void receive(InputStream in, MyDataReader reader)
            throws Exception {
        while (true) {
            if (first) {
                first = false;
                start = System.currentTimeMillis();
            }
            int len = in.read(buf);
            if (len == -1) {
                printStats(start, mb, count, df, total);
                return;
            }
            int completed = reader.append(buf, 0, len);
            int newCount = count + completed;
            count = newCount;

            if (bytes.intValue() != len) {
                bytes = BigDecimal.valueOf(len);
            }
            total = total.add(bytes);
            mb = mb + ((double) len) / 1024 / 1024;

            // Report only when this read crossed a REPORT_INTERVAL boundary,
            // so nothing is printed while count is still 0 and a boundary is
            // reported exactly once.
            if (completed > 0
                    && newCount / REPORT_INTERVAL
                            != (newCount - completed) / REPORT_INTERVAL) {
                printStats(start, mb, count, df, total);
            }
        }
    }

    /**
     * Prints throughput and totals to standard out. Synchronized because it
     * is called from both the main and the statistics thread and the passed
     * {@link DecimalFormat} must not be used concurrently.
     */
    private static synchronized void printStats(long start, double mb,
            int count, DecimalFormat df, BigDecimal total) {
        long time = System.currentTimeMillis();
        double seconds = ((double) (time - start)) / 1000;
        // Avoid NaN/Infinity when no measurable time has elapsed yet.
        double throughput = seconds > 0 ? mb / seconds : 0;
        System.out.println("Throughput " + df.format(throughput)
                + " MB/seconds messages " + count + ", total " + mb
                + " MB, total " + total + " bytes.");
    }

    /**
     * Validates a byte stream made of fixed-length messages in which every
     * byte of a message equals the current sequence number. The sequence
     * number starts at 0 and is incremented (as a {@code byte}) each time a
     * new message begins. State is kept between calls, so a message may be
     * spread over several {@link #append(byte[], int, int)} calls.
     */
    public static class MyDataReader {
        // Not used by this class; retained so the declared fields stay unchanged.
        byte[] data = new byte[43800];
        // Message length in bytes.
        int length = 10;
        // Number of bytes already consumed of the current message.
        int cur = 0;
        // Expected value of every byte in the current message.
        byte seq = 0;

        /**
         * @param len length in bytes of each message in the stream
         */
        public MyDataReader(int len) {
            length = len;
        }

        /**
         * Validates {@code len} bytes of {@code b} starting at {@code off}.
         *
         * <p>A message is counted when the first byte of the following
         * message is seen, not when its own last byte arrives.
         *
         * @param b   buffer holding the received data
         * @param off index of the first byte to validate
         * @param len number of bytes to validate
         * @return number of message boundaries crossed during this call
         * @throws Exception if a byte differs from the expected sequence
         *                   number
         */
        public int append(byte[] b, int off, int len) throws Exception {
            int packages = 0;
            // len is a byte count, so the exclusive end index is off + len.
            int end = off + len;
            for (int i = off; i < end; i++) {
                if (cur == length) {
                    cur = 0;
                    seq++;
                    packages++;
                }
                if (b[i] != seq) {
                    throw new Exception("mismatch on seq:" + seq
                            + " and byte nr:" + cur + " count:" + count
                            + " packages:" + packages);
                }
                cur++;
            }
            return packages;
        }
    }
}
|