01: /*
02: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
03: */
04: package com.tc.net.protocol.transport;
05:
06: import com.tc.bytes.TCByteBuffer;
07: import com.tc.logging.TCLogger;
08: import com.tc.logging.TCLogging;
09: import com.tc.net.core.TCConnection;
10: import com.tc.net.protocol.AbstractTCNetworkHeader;
11: import com.tc.net.protocol.AbstractTCProtocolAdaptor;
12: import com.tc.net.protocol.TCNetworkHeader;
13: import com.tc.net.protocol.TCNetworkMessage;
14: import com.tc.net.protocol.TCProtocolException;
15:
16: /**
17: * Connection adaptor to parse wire protocol messages
18: *
19: * @author teck
20: */
21: public class WireProtocolAdaptorImpl extends AbstractTCProtocolAdaptor
22: implements WireProtocolAdaptor {
23: private static final TCLogger logger = TCLogging
24: .getLogger(WireProtocolAdaptorImpl.class);
25:
26: private final WireProtocolMessageSink sink;
27:
28: protected WireProtocolAdaptorImpl(WireProtocolMessageSink sink) {
29: super (logger);
30: this .sink = sink;
31: }
32:
33: public void addReadData(TCConnection source, TCByteBuffer[] data,
34: int length) throws TCProtocolException {
35: final boolean msgDone = this .processIncomingData(source, data,
36: length);
37:
38: if (msgDone) {
39: try {
40: WireProtocolMessage msg = getWireProtocolMessage();
41: // TODO: validate the src/dest IP and port in header against the connection it came in on
42:
43: if (logger.isDebugEnabled()) {
44: logger.debug("\nRECEIVE\n" + msg.toString());
45: }
46:
47: sink.putMessage(msg);
48: } finally {
49: init();
50: }
51: }
52:
53: return;
54: }
55:
56: protected AbstractTCNetworkHeader getNewProtocolHeader() {
57: return new WireProtocolHeader();
58: }
59:
60: private WireProtocolMessage getWireProtocolMessage() {
61: WireProtocolMessage rv = (WireProtocolMessage) collectMessage();
62: return rv;
63: }
64:
65: protected int computeDataLength(TCNetworkHeader header) {
66: WireProtocolHeader wph = (WireProtocolHeader) header;
67: return wph.getTotalPacketLength() - wph.getHeaderByteLength();
68: }
69:
70: protected TCNetworkMessage createMessage(TCConnection source,
71: TCNetworkHeader hdr, TCByteBuffer[] data)
72: throws TCProtocolException {
73: if (data == null) {
74: throw new TCProtocolException(
75: "Wire protocol messages must have a payload");
76: }
77: WireProtocolHeader wph = (WireProtocolHeader) hdr;
78: final WireProtocolMessage rv;
79:
80: if (wph.isTransportHandshakeMessage()) {
81: rv = new TransportHandshakeMessageImpl(source, wph, data);
82: } else {
83: rv = new WireProtocolMessageImpl(source, wph, data);
84: }
85:
86: return rv;
87: }
88: }
|