001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
003: */
004: package com.tc.net.protocol;
005:
006: import com.tc.bytes.TCByteBuffer;
007: import com.tc.bytes.TCByteBufferFactory;
008: import com.tc.logging.TCLogger;
009: import com.tc.net.core.TCConnection;
010: import com.tc.util.Assert;
011:
012: import java.util.LinkedList;
013:
014: /**
015: * Base class for protocol adaptors
016: *
017: * @author teck
018: */
019: public abstract class AbstractTCProtocolAdaptor implements
020: TCProtocolAdaptor {
021: protected static final int MODE_HEADER = 1;
022: protected static final int MODE_DATA = 2;
023:
024: private final TCLogger logger;
025: private final LinkedList collectedMessages = new LinkedList();
026: private int dataBytesNeeded;
027: private AbstractTCNetworkHeader header;
028: private TCByteBuffer[] dataBuffers;
029: private int bufferIndex = -1;
030: private int mode;
031:
032: public AbstractTCProtocolAdaptor(TCLogger logger) {
033: this .logger = logger;
034: init();
035: }
036:
037: public void addReadData(TCConnection source, TCByteBuffer[] data,
038: int length) throws TCProtocolException {
039: processIncomingData(source, data, length);
040: }
041:
042: public final TCByteBuffer[] getReadBuffers() {
043: if (mode == MODE_HEADER) {
044: return new TCByteBuffer[] { header.getDataBuffer() };
045: }
046:
047: Assert.eval(mode == MODE_DATA);
048:
049: if (dataBuffers == null) {
050: dataBuffers = createDataBuffers(dataBytesNeeded);
051: Assert.eval(dataBuffers.length > 0);
052: bufferIndex = 0;
053: }
054:
055: // only return the subset of buffers that can actually receive more bytes
056: final TCByteBuffer[] rv = new TCByteBuffer[dataBuffers.length
057: - bufferIndex];
058: System.arraycopy(dataBuffers, bufferIndex, rv, 0, rv.length);
059:
060: // Make sure we're not passing back a set of arrays with no space left in them
061: boolean spaceAvail = false;
062: for (int i = 0, n = rv.length; i < n; i++) {
063: if (rv[i].hasRemaining()) {
064: spaceAvail = true;
065: break;
066: }
067: }
068:
069: Assert.assertTrue("No space in buffers to read more data",
070: spaceAvail);
071: return rv;
072: }
073:
074: abstract protected AbstractTCNetworkHeader getNewProtocolHeader();
075:
076: // subclasses override this method to return specific message types
077: abstract protected TCNetworkMessage createMessage(
078: TCConnection source, TCNetworkHeader hdr,
079: TCByteBuffer[] data) throws TCProtocolException;
080:
081: abstract protected int computeDataLength(TCNetworkHeader hdr);
082:
083: protected final TCNetworkMessage collectMessage() {
084: synchronized (this .collectedMessages) {
085: return (TCNetworkMessage) collectedMessages.removeFirst();
086: }
087: }
088:
089: protected final void init() {
090: mode = MODE_HEADER;
091: dataBuffers = null;
092: header = getNewProtocolHeader();
093: }
094:
095: protected final boolean processIncomingData(TCConnection source,
096: TCByteBuffer[] data, final int length)
097: throws TCProtocolException {
098: if (mode == MODE_HEADER) {
099: return processHeaderData(source, data);
100: }
101:
102: Assert.eval(mode == MODE_DATA);
103: if (length > dataBytesNeeded) {
104: throw new TCProtocolException(
105: "More data read then expected: (" + length + " > "
106: + dataBytesNeeded + ")");
107: }
108: return processPayloadData(source, data);
109: }
110:
111: protected final TCLogger getLogger() {
112: return logger;
113: }
114:
115: private TCByteBuffer[] createDataBuffers(int length) {
116: Assert.eval(mode == MODE_DATA);
117: return TCByteBufferFactory.getFixedSizedInstancesForLength(
118: false, length);
119: }
120:
121: private boolean processHeaderData(TCConnection source,
122: final TCByteBuffer[] data) throws TCProtocolException {
123: Assert.eval(data.length == 1);
124: Assert.eval(data[0] == this .header.getDataBuffer());
125:
126: if (!this .header.isHeaderLengthAvail()) {
127: return false;
128: }
129:
130: final TCByteBuffer buf = data[0];
131: final int headerLength = this .header.getHeaderByteLength();
132: final int bufferLength = buf.limit();
133:
134: if (headerLength == AbstractTCNetworkHeader.LENGTH_NOT_AVAIL) {
135: return false;
136: }
137:
138: if ((headerLength < header.minLength)
139: || (headerLength > header.maxLength)
140: || (headerLength < bufferLength)) {
141: // header data is screwed
142: throw new TCProtocolException("Invalid Header Length: "
143: + headerLength + ", min: " + header.minLength
144: + ", max: " + header.maxLength + ", bufLen: "
145: + bufferLength);
146: }
147:
148: if (bufferLength != headerLength) {
149: // maybe we should support a way to swap out the header buffer for a larger sized one
150: // instead of always manadating that the backing buffer behind a header have
151: // enough capacity for the largest possible header for the given protocol. Just a thought
152:
153: // protocol header is bigger than min length, adjust buffer limit and continue
154: buf.limit(headerLength);
155: return false;
156: } else {
157: Assert.eval(bufferLength == headerLength);
158:
159: if (buf.position() == headerLength) {
160: this .header.validate();
161:
162: this .mode = MODE_DATA;
163: this .dataBytesNeeded = computeDataLength(this .header);
164: this .dataBuffers = null;
165:
166: if (this .dataBytesNeeded < 0) {
167: throw new TCProtocolException(
168: "Negative data size detected: "
169: + this .dataBytesNeeded);
170: }
171:
172: // allow for message types with zero length data payloads
173: if (0 == this .dataBytesNeeded) {
174: synchronized (this .collectedMessages) {
175: this .collectedMessages.addLast(createMessage(
176: source, this .header, null));
177: }
178: return true;
179: }
180:
181: return false;
182: } else {
183: // protocol header not completely read yet, do nothing
184: return false;
185: }
186: }
187: }
188:
189: private boolean processPayloadData(TCConnection source,
190: final TCByteBuffer[] data) throws TCProtocolException {
191: for (int i = 0; i < data.length; i++) {
192: final TCByteBuffer buffer = data[i];
193:
194: if (!buffer.hasRemaining()) {
195: buffer.flip();
196: dataBytesNeeded -= buffer.limit();
197: bufferIndex++;
198:
199: if (dataBytesNeeded < 0) {
200: throw new TCProtocolException(
201: "More data in buffers than expected");
202: }
203: } else {
204: break;
205: }
206: }
207:
208: if (0 == dataBytesNeeded) {
209: if (bufferIndex != dataBuffers.length) {
210: throw new TCProtocolException(
211: "Not all buffers consumed");
212: }
213:
214: // message is complete!
215: TCNetworkMessage msg = createMessage(source, header,
216: dataBuffers);
217:
218: if (logger.isDebugEnabled()) {
219: logger.debug("Message complete on connection " + source
220: + ": " + msg.toString());
221: }
222:
223: synchronized (collectedMessages) {
224: collectedMessages.addLast(msg);
225: }
226:
227: return true;
228: }
229:
230: Assert.eval(dataBytesNeeded > 0);
231:
232: // data portion not done, try again later
233: return false;
234: }
235:
236: }
|