001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.catalina.tribes.test.transport;
018:
019: import java.text.DecimalFormat;
020:
021: import org.apache.catalina.tribes.ChannelMessage;
022: import org.apache.catalina.tribes.Member;
023: import org.apache.catalina.tribes.MessageListener;
024: import org.apache.catalina.tribes.io.ChannelData;
025: import org.apache.catalina.tribes.io.XByteBuffer;
026: import org.apache.catalina.tribes.membership.MemberImpl;
027: import org.apache.catalina.tribes.transport.nio.NioReceiver;
028:
029: public class SocketNioReceive {
030: static int count = 0;
031: static int accept = 0;
032: static long start = 0;
033: static double mb = 0;
034: static int len = 0;
035: static DecimalFormat df = new DecimalFormat("##.00");
036: static double seconds = 0;
037:
038: protected static Object mutex = new Object();
039:
040: public static void main(String[] args) throws Exception {
041: Member mbr = new MemberImpl("localhost", 9999, 0);
042: ChannelData data = new ChannelData();
043: data.setAddress(mbr);
044: byte[] buf = new byte[8192 * 4];
045: data.setMessage(new XByteBuffer(buf, false));
046: buf = XByteBuffer.createDataPackage(data);
047: len = buf.length;
048: NioReceiver receiver = new NioReceiver();
049: receiver.setPort(9999);
050: receiver.setHost("localhost");
051: MyList list = new MyList();
052: receiver.setMessageListener(list);
053: receiver.start();
054: System.out.println("Listening on 9999");
055: while (true) {
056: try {
057: synchronized (mutex) {
058: mutex.wait(5000);
059: if (start != 0) {
060: System.out.println("Throughput "
061: + df.format(mb / seconds)
062: + " MB/seconds, messages " + count
063: + " accepts " + accept + ", total "
064: + mb + " MB.");
065: }
066: }
067: } catch (Throwable x) {
068: x.printStackTrace();
069: }
070: }
071: }
072:
073: public static class MyList implements MessageListener {
074: boolean first = true;
075:
076: public void messageReceived(ChannelMessage msg) {
077: if (first) {
078: first = false;
079: start = System.currentTimeMillis();
080: }
081: mb += ((double) len) / 1024 / 1024;
082: synchronized (this ) {
083: count++;
084: }
085: if (((count) % 10000) == 0) {
086: long time = System.currentTimeMillis();
087: seconds = ((double) (time - start)) / 1000;
088: System.out.println("Throughput "
089: + df.format(mb / seconds)
090: + " MB/seconds, messages " + count + ", total "
091: + mb + " MB.");
092: }
093: }
094:
095: public boolean accept(ChannelMessage msg) {
096: synchronized (this ) {
097: accept++;
098: }
099: return true;
100: }
101:
102: }
103: }
|