001: package org.xsocket.connection;
002:
003: import java.io.File;
004: import java.io.FileOutputStream;
005: import java.io.IOException;
006: import java.io.RandomAccessFile;
007: import java.nio.BufferUnderflowException;
008: import java.nio.ByteBuffer;
009: import java.nio.channels.FileChannel;
010: import java.util.ArrayList;
011: import java.util.List;
012: import java.util.Timer;
013: import java.util.TimerTask;
014:
015: import org.apache.mina.common.IdleStatus;
016: import org.apache.mina.common.IoHandlerAdapter;
017: import org.apache.mina.common.IoSession;
018: import org.apache.mina.common.TransportType;
019: import org.apache.mina.transport.socket.nio.SocketSessionConfig;
020: import org.xsocket.DataConverter;
021: import org.xsocket.Execution;
022: import org.xsocket.connection.IConnection.FlushMode;
023:
024: public final class EchoHandler extends IoHandlerAdapter implements
025: IDataHandler, IConnectionTimeoutHandler, IIdleTimeoutHandler {
026:
027: private static final int PRINT_PERIOD = 5000;
028:
029: private int count = 0;
030: private long time = System.currentTimeMillis();
031:
032: private Timer timer = new Timer(true);
033:
034: private int delay = 0;
035: private boolean syncFlush = true;
036: private FileChannel fc = null;
037:
038: public EchoHandler(int delay, boolean syncFlush, boolean writeTofile)
039: throws IOException {
040: this .delay = delay;
041: this .syncFlush = syncFlush;
042:
043: if (writeTofile) {
044: File tempFile = File.createTempFile("test", "test");
045: tempFile.deleteOnExit();
046: fc = new RandomAccessFile(File.createTempFile("test",
047: "test"), "rw").getChannel();
048: System.out.println("write received data into "
049: + tempFile.getAbsolutePath());
050: }
051:
052: TimerTask task = new TimerTask() {
053: @Override
054: public void run() {
055: try {
056: String rate = printRate();
057: if (fc != null) {
058: System.out.println(rate + "(file size="
059: + fc.position() + ")");
060: } else {
061: System.out.println(rate);
062: }
063: } catch (Exception ignore) {
064: }
065: }
066: };
067: timer.schedule(task, PRINT_PERIOD, PRINT_PERIOD);
068:
069: System.out
070: .println("Echo handler initialized. Printing each rate (req/sec) each "
071: + DataConverter
072: .toFormatedDuration(PRINT_PERIOD));
073: }
074:
075: private void reset() {
076: time = System.currentTimeMillis();
077: count = 0;
078: }
079:
080: public String printRate() {
081: long current = System.currentTimeMillis();
082:
083: long rate = 0;
084:
085: if (count == 0) {
086: rate = 0;
087: } else {
088: rate = (count * 1000) / (current - time);
089: }
090:
091: reset();
092:
093: return Long.toString(rate);
094: }
095:
096: /////////////////////////////////////
097: // MINA callbacks
098: public void sessionCreated(IoSession session) {
099: if (session.getTransportType() == TransportType.SOCKET) {
100: ((SocketSessionConfig) session.getConfig())
101: .setReceiveBufferSize(2048);
102: }
103:
104: session.setIdleTime(IdleStatus.BOTH_IDLE, 60);
105: }
106:
107: public void sessionIdle(IoSession session, IdleStatus status) {
108:
109: }
110:
111: public void exceptionCaught(IoSession session, Throwable cause) {
112: cause.printStackTrace();
113: session.close();
114: }
115:
116: public void messageReceived(IoSession session, Object message)
117: throws Exception {
118:
119: org.apache.mina.common.ByteBuffer rb = (org.apache.mina.common.ByteBuffer) message;
120:
121: if (fc != null) {
122: fc.write(rb.buf());
123:
124: }
125:
126: rb.flip();
127:
128: // Write the received data back to remote peer
129: org.apache.mina.common.ByteBuffer wb = org.apache.mina.common.ByteBuffer
130: .allocate(rb.remaining());
131: wb.put(rb);
132: wb.flip();
133: session.write(wb);
134:
135: count++;
136: pause();
137: }
138:
139: /////////////////////////////////////
140: // xSocket callbacks
141:
142: public boolean onConnectionTimeout(INonBlockingConnection connection)
143: throws IOException {
144: return false;
145: }
146:
147: public boolean onIdleTimeout(INonBlockingConnection connection)
148: throws IOException {
149: return false;
150: }
151:
152: //@Execution(Execution.Mode.NONTHREADED)
153: public boolean onData(INonBlockingConnection connection)
154: throws IOException, BufferUnderflowException {
155: ByteBuffer[] data = connection
156: .readByteBufferByLength(connection.available());
157:
158: ByteBuffer[] response = new ByteBuffer[data.length];
159:
160: for (int i = 0; i < data.length; i++) {
161: response[i] = ByteBuffer.allocate(data[i].remaining());
162: response[i].put(data[i]);
163: response[i].flip();
164: }
165:
166: for (ByteBuffer buffer : data) {
167: buffer.flip();
168: }
169:
170: if (fc != null) {
171: fc.write(data);
172:
173: }
174:
175: connection.write(response);
176:
177: count++;
178: pause();
179: return true;
180: }
181:
182: private void pause() {
183: if (delay > 0) {
184: try {
185: Thread.sleep(delay);
186: } catch (InterruptedException ignore) {
187: }
188: }
189: }
190: }
|