001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
016:
017: package org.apache.catalina.tribes.group.interceptors;
018:
019: import org.apache.catalina.tribes.ChannelException;
020: import org.apache.catalina.tribes.ChannelMessage;
021: import org.apache.catalina.tribes.Member;
022: import org.apache.catalina.tribes.group.ChannelInterceptorBase;
023: import org.apache.catalina.tribes.group.InterceptorPayload;
024: import org.apache.catalina.tribes.io.ChannelData;
025: import org.apache.catalina.tribes.io.XByteBuffer;
026: import java.text.DecimalFormat;
027: import org.apache.catalina.tribes.membership.MemberImpl;
028: import java.util.concurrent.atomic.AtomicInteger;
029: import java.util.concurrent.atomic.AtomicLong;
030:
031: /**
032: *
033: *
034: * @author Filip Hanik
035: * @version 1.0
036: */
037: public class ThroughputInterceptor extends ChannelInterceptorBase {
038: protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
039: .getLog(ThroughputInterceptor.class);
040:
041: double mbTx = 0;
042: double mbAppTx = 0;
043: double mbRx = 0;
044: double timeTx = 0;
045: double lastCnt = 0;
046: AtomicLong msgTxCnt = new AtomicLong(1);
047: AtomicLong msgRxCnt = new AtomicLong(0);
048: AtomicLong msgTxErr = new AtomicLong(0);
049: int interval = 10000;
050: AtomicInteger access = new AtomicInteger(0);
051: long txStart = 0;
052: long rxStart = 0;
053: DecimalFormat df = new DecimalFormat("#0.00");
054:
055: public void sendMessage(Member[] destination, ChannelMessage msg,
056: InterceptorPayload payload) throws ChannelException {
057: if (access.addAndGet(1) == 1)
058: txStart = System.currentTimeMillis();
059: long bytes = XByteBuffer
060: .getDataPackageLength(((ChannelData) msg)
061: .getDataPackageLength());
062: try {
063: super .sendMessage(destination, msg, payload);
064: } catch (ChannelException x) {
065: msgTxErr.addAndGet(1);
066: access.addAndGet(-1);
067: throw x;
068: }
069: mbTx += ((double) (bytes * destination.length))
070: / (1024d * 1024d);
071: mbAppTx += ((double) (bytes)) / (1024d * 1024d);
072: if (access.addAndGet(-1) == 0) {
073: long stop = System.currentTimeMillis();
074: timeTx += ((double) (stop - txStart)) / 1000d;
075: if ((msgTxCnt.get() / interval) >= lastCnt) {
076: lastCnt++;
077: report(timeTx);
078: }
079: }
080: msgTxCnt.addAndGet(1);
081: }
082:
083: public void messageReceived(ChannelMessage msg) {
084: if (rxStart == 0)
085: rxStart = System.currentTimeMillis();
086: long bytes = XByteBuffer
087: .getDataPackageLength(((ChannelData) msg)
088: .getDataPackageLength());
089: mbRx += ((double) bytes) / (1024d * 1024d);
090: msgRxCnt.addAndGet(1);
091: if (msgRxCnt.get() % interval == 0)
092: report(timeTx);
093: super .messageReceived(msg);
094:
095: }
096:
097: public void report(double timeTx) {
098: StringBuffer buf = new StringBuffer(
099: "ThroughputInterceptor Report[\n\tTx Msg:");
100: buf.append(msgTxCnt).append(" messages\n\tSent:");
101: buf.append(df.format(mbTx));
102: buf.append(" MB (total)\n\tSent:");
103: buf.append(df.format(mbAppTx));
104: buf.append(" MB (application)\n\tTime:");
105: buf.append(df.format(timeTx));
106: buf.append(" seconds\n\tTx Speed:");
107: buf.append(df.format(mbTx / timeTx));
108: buf.append(" MB/sec (total)\n\tTxSpeed:");
109: buf.append(df.format(mbAppTx / timeTx));
110: buf.append(" MB/sec (application)\n\tError Msg:");
111: buf.append(msgTxErr).append("\n\tRx Msg:");
112: buf.append(msgRxCnt);
113: buf.append(" messages\n\tRx Speed:");
114: buf
115: .append(df
116: .format(mbRx
117: / ((double) ((System
118: .currentTimeMillis() - rxStart) / 1000))));
119: buf.append(" MB/sec (since 1st msg)\n\tReceived:");
120: buf.append(df.format(mbRx)).append(" MB]\n");
121: if (log.isInfoEnabled())
122: log.info(buf);
123: }
124:
125: public void setInterval(int interval) {
126: this .interval = interval;
127: }
128:
129: public int getInterval() {
130: return interval;
131: }
132:
133: }
|