001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
003: */
004: package com.tc.net.protocol;
005:
006: import com.tc.bytes.TCByteBuffer;
007: import com.tc.exception.TCInternalError;
008: import com.tc.logging.TCLogger;
009: import com.tc.logging.TCLogging;
010: import com.tc.util.Assert;
011: import com.tc.util.StringUtil;
012: import com.tc.util.concurrent.SetOnceFlag;
013:
014: /**
015: * Base class for network messages
016: *
017: * @author teck
018: */
019: public class AbstractTCNetworkMessage implements TCNetworkMessage {
020: protected static final TCLogger logger = TCLogging
021: .getLogger(TCNetworkMessage.class);
022:
023: protected AbstractTCNetworkMessage(TCNetworkHeader header) {
024: this (header, null, null, false);
025: }
026:
027: protected AbstractTCNetworkMessage(TCNetworkHeader header,
028: TCNetworkMessage msgPayload) {
029: this (header, msgPayload, null, true);
030: }
031:
032: protected AbstractTCNetworkMessage(TCNetworkHeader header,
033: TCByteBuffer[] payload) {
034: this (header, null, payload, true);
035: }
036:
037: private AbstractTCNetworkMessage(TCNetworkHeader header,
038: TCNetworkMessage msgPayload, TCByteBuffer[] payload,
039: boolean seal) {
040: Assert.eval(header != null);
041:
042: this .header = header;
043: this .messagePayload = msgPayload;
044: this .payloadData = (payload == null) ? EMPTY_BUFFER_ARRAY
045: : payload;
046:
047: if (msgPayload != null) {
048: payloadData = msgPayload.getEntireMessageData();
049: }
050:
051: if (seal) {
052: seal();
053: }
054: }
055:
056: public final int getDataLength() {
057: checkSealed();
058: return dataLength;
059: }
060:
061: public final int getHeaderLength() {
062: checkSealed();
063: return headerLength;
064: }
065:
066: public final int getTotalLength() {
067: checkSealed();
068: return totalLength;
069: }
070:
071: public final TCNetworkHeader getHeader() {
072: checkNotRecycled();
073: return header;
074: }
075:
076: public final TCNetworkMessage getMessagePayload() {
077: checkNotRecycled();
078: return messagePayload;
079: }
080:
081: public final TCByteBuffer[] getPayload() {
082: checkNotRecycled();
083: return payloadData;
084: }
085:
086: protected final void setPayload(TCByteBuffer[] newPayload) {
087: checkNotSealed();
088:
089: entireMessageData = null;
090:
091: if (newPayload == null) {
092: payloadData = EMPTY_BUFFER_ARRAY;
093: } else {
094: payloadData = newPayload;
095: }
096: }
097:
098: protected final void setMessagePayload(TCNetworkMessage subMessage) {
099: checkNotSealed();
100:
101: entireMessageData = null;
102:
103: if (subMessage == null) {
104: payloadData = EMPTY_BUFFER_ARRAY;
105: this .messagePayload = null;
106: } else {
107: if (!subMessage.isSealed()) {
108: throw new IllegalStateException(
109: "Message paylaod is not yet sealed");
110: }
111: this .messagePayload = subMessage;
112: this .payloadData = subMessage.getEntireMessageData();
113: }
114: }
115:
116: public final TCByteBuffer[] getEntireMessageData() {
117: checkSealed();
118:
119: // this array should have already been set in seal()
120: Assert.eval(entireMessageData != null);
121:
122: return entireMessageData;
123: }
124:
125: public final String toString() {
126: try {
127: return toString0();
128: } catch (Exception e) {
129: logger.warn("Exception in toString()", e);
130: return "EXCEPTION in toString(): " + e.getMessage();
131: }
132: }
133:
134: protected final String toString0() {
135: StringBuffer buf = new StringBuffer();
136: buf.append("Message Class: ").append(getClass().getName())
137: .append("\n");
138: buf.append("Sealed: ").append(sealed.isSet()).append(", ");
139: buf.append("Header Length: ").append(getHeaderLength()).append(
140: ", ");
141: buf.append("Data Length: ").append(getDataLength())
142: .append(", ");
143: buf.append("Total Length: ").append(getTotalLength()).append(
144: "\n");
145:
146: String extraMsgInfo = describeMessage();
147: if (extraMsgInfo != null) {
148: buf.append(extraMsgInfo);
149: }
150:
151: if (header != null) {
152: buf.append("Header (").append(header.getClass().getName())
153: .append(")\n");
154: buf.append(StringUtil.indentLines(header.toString()));
155: if (buf.charAt(buf.length() - 1) != '\n') {
156: buf.append('\n');
157: }
158: }
159:
160: buf.append("Payload:\n");
161: if (messagePayload != null) {
162: buf.append(
163: StringUtil.indentLines(messagePayload.toString()))
164: .append("\n");
165: } else {
166: if (payloadData != null) {
167: buf.append(StringUtil.indentLines(describePayload()));
168: } else {
169: buf.append(StringUtil
170: .indentLines("*** No payoad data ***\n"));
171: }
172: }
173:
174: return buf.toString();
175: }
176:
177: // override this method to add more information about your message
178: protected String describeMessage() {
179: return null;
180: }
181:
182: // override this method to add more description to your payload data
183: protected String describePayload() {
184: StringBuffer buf = new StringBuffer();
185:
186: if ((payloadData != null) && (payloadData.length != 0)) {
187: for (int i = 0; i < payloadData.length; i++) {
188: if (payloadData[i] != null) {
189: //
190: }
191:
192: buf.append("Buffer ").append(i).append(": ");
193:
194: if (payloadData[i] != null) {
195: buf.append(payloadData[i].toString());
196: } else {
197: buf.append("null");
198: }
199:
200: buf.append("\n");
201: }
202: } else {
203: buf.append("No payload buffers present");
204: }
205:
206: return buf.toString();
207: }
208:
209: protected String dump() {
210: StringBuffer toRet = new StringBuffer(toString());
211: toRet.append("\n\n");
212: if (entireMessageData != null) {
213: for (int i = 0; i < entireMessageData.length; i++) {
214: toRet.append('[').append(i).append(']').append('=')
215: .append(entireMessageData[i].toString());
216: toRet.append(" = { ");
217: byte ba[] = entireMessageData[i].array();
218: for (int j = 0; j < ba.length; j++) {
219: toRet.append(Byte.toString(ba[j])).append(' ');
220: }
221: toRet.append(" } \n\n");
222: }
223: }
224: return toRet.toString();
225: }
226:
227: public final boolean isSealed() {
228: return sealed.isSet();
229: }
230:
231: public final void seal() {
232: if (sealed.attemptSet()) {
233: final int size = 1 + ((payloadData == null) ? 0
234: : payloadData.length);
235: entireMessageData = new TCByteBuffer[size];
236: entireMessageData[0] = header.getDataBuffer();
237: System.arraycopy(payloadData, 0, entireMessageData, 1,
238: payloadData.length);
239:
240: long dataLen = 0;
241: for (int i = 1; i < entireMessageData.length; i++) {
242: dataLen += entireMessageData[i].limit();
243: }
244:
245: if (dataLen > Integer.MAX_VALUE) {
246: throw new TCInternalError("Message too big");
247: }
248:
249: this .dataLength = (int) dataLen;
250: this .headerLength = header.getHeaderByteLength();
251: this .totalLength = this .headerLength + this .dataLength;
252: } else {
253: throw new IllegalStateException("Message is sealed");
254: }
255: }
256:
257: public final void wasSent() {
258: fireSentCallback();
259: doRecycleOnWrite();
260: }
261:
262: // Can be overloaded by sub classes to decide when to recycle differently.
263: public void doRecycleOnWrite() {
264: recycle();
265: }
266:
267: public void recycle() {
268: if (entireMessageData != null) {
269: int i = 0;
270: if (entireMessageData.length > 1
271: && entireMessageData[0].array() == entireMessageData[1]
272: .array()) {
273: // This is done as TCMessageParser creates a dupilcate of the first buffer for the header.
274: // @see TCMessageParser.parseMessage()
275: // Can be done more elegantly, but it is done like this keeping performance in mind.
276: i++;
277: }
278: for (; i < entireMessageData.length; i++) {
279: entireMessageData[i].recycle();
280: }
281: entireMessageData = null;
282: } else {
283: logger
284: .warn("Entire Message is null ! Probably recycle was called twice !");
285: Thread.dumpStack();
286: }
287: }
288:
289: protected boolean isRecycled() {
290: return isSealed() && entireMessageData == null;
291: }
292:
293: private void fireSentCallback() {
294: if (sentCallback != null) {
295: if (sentCallbackFired.attemptSet()) {
296: try {
297: sentCallback.run();
298: } catch (Exception e) {
299: logger
300: .error(
301: "Caught exception running sent callback",
302: e);
303: }
304: }
305: }
306: }
307:
308: public final void setSentCallback(Runnable callback) {
309: this .sentCallback = callback;
310: }
311:
312: public final Runnable getSentCallback() {
313: return this .sentCallback;
314: }
315:
316: private void checkNotRecycled() {
317: if (isRecycled()) {
318: throw new IllegalStateException(
319: "Message is already Recycled");
320: }
321: }
322:
323: private void checkSealed() {
324: if (!isSealed()) {
325: throw new IllegalStateException("Message is not sealed");
326: }
327: }
328:
329: private void checkNotSealed() {
330: if (sealed.isSet()) {
331: throw new IllegalStateException("Message is sealed");
332: }
333: }
334:
335: private final SetOnceFlag sealed = new SetOnceFlag();
336: private final SetOnceFlag sentCallbackFired = new SetOnceFlag();
337: private static final TCByteBuffer[] EMPTY_BUFFER_ARRAY = {};
338: private final TCNetworkHeader header;
339: private TCByteBuffer[] payloadData;
340: private TCNetworkMessage messagePayload;
341: private TCByteBuffer[] entireMessageData;
342: private int totalLength;
343: private int dataLength;
344: private int headerLength;
345: private Runnable sentCallback = null;
346:
347: }
|