001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
003: */
004: package com.tc.net.protocol;
005:
006: import org.apache.commons.io.CopyUtils;
007:
008: import com.tc.bytes.TCByteBuffer;
009: import com.tc.io.TCByteBufferInputStream;
010: import com.tc.io.TCByteBufferOutputStream;
011: import com.tc.net.core.TCConnection;
012: import com.tc.net.core.Verifier;
013: import com.tc.net.core.event.TCConnectionErrorEvent;
014: import com.tc.net.core.event.TCConnectionEvent;
015: import com.tc.net.core.event.TCConnectionEventListener;
016: import com.tc.util.Assert;
017:
018: import java.io.IOException;
019: import java.util.HashMap;
020: import java.util.Map;
021:
022: /**
023: * Silly little sink that just echoes any messages it receives back to the client
024: *
025: * @author teck
026: */
027: public class EchoSink implements GenericNetworkMessageSink,
028: TCConnectionEventListener {
029:
030: private final Map states = new HashMap();
031:
032: public interface ErrorListener {
033: void error(Throwable t);
034: }
035:
036: private final ErrorListener listener;
037: private final boolean verify;
038:
039: private static final ErrorListener defaultListener = new ErrorListener() {
040: public void error(Throwable t) {
041: t.printStackTrace();
042: }
043: };
044:
045: public EchoSink() {
046: this (false);
047: }
048:
049: public EchoSink(boolean verify) {
050: this (verify, defaultListener);
051: }
052:
053: public EchoSink(boolean verify, ErrorListener listener) {
054: this .verify = verify;
055: this .listener = listener;
056: }
057:
058: public void putMessage(GenericNetworkMessage msg) {
059: try {
060: putMessage0(msg);
061: } catch (Throwable t) {
062: listener.error(t);
063: }
064: }
065:
066: public void putMessage0(GenericNetworkMessage msg)
067: throws IOException {
068: final TCConnection source = msg.getSource();
069:
070: if (verify) {
071: verifyIncomingMessage(source, msg);
072: }
073:
074: // copy the message and send it right back to the client
075: TCByteBuffer[] recvData = msg.getPayload();
076: TCByteBufferOutputStream out = new TCByteBufferOutputStream();
077: TCByteBufferInputStream in = new TCByteBufferInputStream(
078: recvData);
079:
080: final int bytesCopied = CopyUtils.copy(in, out);
081: Assert.assertEquals(bytesCopied, msg.getDataLength());
082:
083: GenericNetworkMessage send = new GenericNetworkMessage(source,
084: out.toArray());
085: Assert.assertEquals(msg.getDataLength(), send.getDataLength());
086: send.setSequence(msg.getSequence());
087: send.setClientNum(msg.getClientNum());
088:
089: if (verify) {
090: compareData(msg.getEntireMessageData(), send
091: .getEntireMessageData());
092: }
093:
094: source.putMessage(send);
095: }
096:
097: static void compareData(TCByteBuffer[] in, TCByteBuffer[] out) {
098: TCByteBufferInputStream ins = new TCByteBufferInputStream(in);
099: TCByteBufferInputStream outs = new TCByteBufferInputStream(out);
100:
101: final int numBytes = ins.available();
102: if (numBytes != outs.available()) {
103: throw new RuntimeException("different data lengths: "
104: + numBytes + " vs " + outs.available());
105: }
106:
107: for (int i = 0; i < numBytes; i++) {
108: final int inByte = ins.read();
109: final int outByte = outs.read();
110:
111: if ((inByte == -1) || (outByte == -1)) {
112: throw new RuntimeException("premature EOF in stream");
113: }
114:
115: if (inByte != outByte) {
116: throw new RuntimeException("different byte " + inByte
117: + " != " + outByte);
118: }
119: }
120: }
121:
122: private void verifyIncomingMessage(TCConnection source,
123: GenericNetworkMessage msg) {
124: final Verifier verifier;
125: synchronized (states) {
126: if (!states.containsKey(source)) {
127: states.put(source, new Verifier(msg.getClientNum()));
128: }
129: verifier = (Verifier) states.get(source);
130: source.addListener(this );
131: }
132:
133: verifier.putMessage(msg);
134: }
135:
136: public void connectEvent(TCConnectionEvent event) {
137: //
138: }
139:
140: public void closeEvent(TCConnectionEvent event) {
141: synchronized (states) {
142: states.remove(event.getSource());
143: }
144: }
145:
146: public void errorEvent(TCConnectionErrorEvent errorEvent) {
147: //
148: }
149:
150: public void endOfFileEvent(TCConnectionEvent event) {
151: //
152: }
153: }
|