001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.net.protocol.tcm;
006:
007: import com.tc.bytes.TCByteBuffer;
008: import com.tc.io.TCByteBufferInput;
009: import com.tc.io.TCByteBufferInputStream;
010: import com.tc.io.TCByteBufferOutput;
011: import com.tc.io.TCSerializable;
012: import com.tc.net.groups.ClientID;
013: import com.tc.net.protocol.AbstractTCNetworkMessage;
014: import com.tc.util.Assert;
015: import com.tc.util.concurrent.SetOnceFlag;
016:
017: import java.io.IOException;
018:
019: /**
020: * @author teck
021: */
022: public abstract class TCMessageImpl extends AbstractTCNetworkMessage
023: implements TCMessage {
024:
025: private final MessageMonitor monitor;
026: private final SetOnceFlag processed = new SetOnceFlag();
027: private final SetOnceFlag isSent = new SetOnceFlag();
028: private final TCMessageType type;
029: private final MessageChannel channel;
030: private int nvCount;
031:
032: private TCByteBufferOutput out;
033: private TCByteBufferInputStream bbis;
034: private int messageVersion;
035:
036: /**
037: * Creates a new TCMessage to write data into (ie. to send to the network)
038: */
039: protected TCMessageImpl(MessageMonitor monitor,
040: TCByteBufferOutput output, MessageChannel channel,
041: TCMessageType type) {
042: super (new TCMessageHeaderImpl(type));
043: this .monitor = monitor;
044: this .type = type;
045: this .channel = channel;
046:
047: // this.bbos = new TCByteBufferOutputStream(4, 4096, false);
048: this .out = output;
049:
050: // write out a zero. When dehydrated, this space will be replaced with the NV count
051: this .out.writeInt(0);
052: }
053:
054: /**
055: * Creates a new TCMessage object backed by the given data array (used when messages are read from the network)
056: *
057: * @param header
058: * @param data
059: */
060: protected TCMessageImpl(MessageMonitor monitor,
061: MessageChannel channel, TCMessageHeader header,
062: TCByteBuffer[] data) {
063: super (header, data);
064: this .monitor = monitor;
065: this .type = TCMessageType.getInstance(header.getMessageType());
066: this .messageVersion = header.getMessageTypeVersion();
067: this .bbis = new TCByteBufferInputStream(data);
068: this .channel = channel;
069: }
070:
071: public TCMessageType getMessageType() {
072: return type;
073: }
074:
075: protected int getMessageVersion() {
076: return this .messageVersion;
077: }
078:
079: protected void setMessageVersion(int version) {
080: this .messageVersion = version;
081: }
082:
083: // use me to read directly from the message data (as opposed to using the name-value mechanism)
084: protected TCByteBufferInput getInputStream() {
085: return this .bbis;
086: }
087:
088: // use me to write directly to the message data (as opposed to using the name-value mechanism)
089: // protected TCByteBufferOutputStream getOutputStream() {
090: // return this.bbos;
091: // }
092:
093: protected void dehydrateValues() {
094: // override me to add NV data to your message
095: }
096:
097: /**
098: * Prepares all instance data into the payload byte buffer array in preparation for sending it.
099: */
100: public void dehydrate() {
101: if (processed.attemptSet()) {
102: try {
103: dehydrateValues();
104:
105: final TCByteBuffer[] nvData = out.toArray();
106:
107: Assert.eval(nvData.length > 0);
108: nvData[0].putInt(0, nvCount);
109: setPayload(nvData);
110:
111: TCMessageHeader hdr = (TCMessageHeader) getHeader();
112: hdr.setMessageType(getMessageType().getType());
113: hdr.setMessageTypeVersion(getMessageVersion());
114:
115: seal();
116: } catch (Throwable t) {
117: t.printStackTrace();
118: throw new RuntimeException(t);
119: } finally {
120: this .out.close();
121: if (!isOutputStreamRecycled())
122: this .out = null;
123: }
124: }
125: }
126:
127: /**
128: * Reads the payload byte buffer data and sets instance data. This should be called after the message is read from the
129: * network before it is released to the client for use. XXX:: This synchronization is there to create proper memory
130: * boundary.
131: */
132: public synchronized void hydrate() throws IOException,
133: UnknownNameException {
134: if (processed.attemptSet()) {
135: try {
136: final int count = bbis.readInt();
137: if (count < 0) {
138: throw new IOException("negative NV count: " + count);
139: }
140:
141: for (int i = 0; i < count; i++) {
142: final byte name = bbis.readByte();
143: if (!hydrateValue(name)) {
144: logger.error(" Hydrate Error - " + toString());
145: throw new UnknownNameException(getClass(), name);
146: }
147: }
148: } finally {
149: this .bbis.close();
150: this .bbis = null;
151: doRecycleOnRead();
152: }
153: monitor.newIncomingMessage(this );
154: }
155: }
156:
157: // Can be overloaded by sub classes to decide when to recycle differently.
158: public void doRecycleOnRead() {
159: recycle();
160: }
161:
162: // if a subclass calls recycleOutputStream, then they need to override this method to return true
163: protected boolean isOutputStreamRecycled() {
164: return false;
165: }
166:
167: protected void recycleOutputStream() {
168: if (out != null) {
169: out.recycle();
170: }
171: }
172:
173: /**
174: * Subclasses *really* must implement this to set appropriate instance variables with the value of the given name.
175: * Return false if the given name is unknown to your message class
176: *
177: * @param name
178: */
179: protected boolean hydrateValue(byte name) throws IOException {
180: if (false) {
181: throw new IOException("silence compiler warning");
182: }
183: return false;
184: }
185:
186: protected boolean getBooleanValue() throws IOException {
187: return bbis.readBoolean();
188: }
189:
190: protected byte getByteValue() throws IOException {
191: return bbis.readByte();
192: }
193:
194: protected char getCharValue() throws IOException {
195: return bbis.readChar();
196: }
197:
198: protected double getDoubleValue() throws IOException {
199: return bbis.readDouble();
200: }
201:
202: protected float getFloatValue() throws IOException {
203: return bbis.readFloat();
204: }
205:
206: protected int getIntValue() throws IOException {
207: return bbis.readInt();
208: }
209:
210: protected long getLongValue() throws IOException {
211: return bbis.readLong();
212: }
213:
214: protected short getShortValue() throws IOException {
215: return bbis.readShort();
216: }
217:
218: protected Object getObject(TCSerializable target)
219: throws IOException {
220: return target.deserializeFrom(bbis);
221: }
222:
223: protected String getStringValue() throws IOException {
224: return bbis.readString();
225: }
226:
227: protected byte[] getBytesArray() throws IOException {
228: int length = bbis.readInt();
229: byte bytes[] = new byte[length];
230: int off = 0;
231: while (length > 0) {
232: int read = bbis.read(bytes, off, length);
233: length -= read;
234: off += read;
235: }
236: return bytes;
237: }
238:
239: protected void putNVPair(byte name, boolean value) {
240: nvCount++;
241: out.write(name);
242: out.writeBoolean(value);
243: }
244:
245: protected void putNVPair(byte name, byte value) {
246: nvCount++;
247: out.write(name);
248: out.writeByte(value);
249: }
250:
251: protected void putNVPair(byte name, char value) {
252: nvCount++;
253: out.write(name);
254: out.writeChar(value);
255: }
256:
257: protected void putNVPair(byte name, double value) {
258: nvCount++;
259: out.write(name);
260: out.writeDouble(value);
261: }
262:
263: protected void putNVPair(byte name, float value) {
264: nvCount++;
265: out.write(name);
266: out.writeFloat(value);
267: }
268:
269: protected void putNVPair(byte name, int value) {
270: nvCount++;
271: out.write(name);
272: out.writeInt(value);
273: }
274:
275: protected void putNVPair(byte name, long value) {
276: nvCount++;
277: out.write(name);
278: out.writeLong(value);
279: }
280:
281: protected void putNVPair(byte name, short value) {
282: nvCount++;
283: out.write(name);
284: out.writeShort(value);
285: }
286:
287: protected void putNVPair(byte name, String value) {
288: nvCount++;
289: out.write(name);
290: out.writeString(value);
291: }
292:
293: protected void putNVPair(byte name, TCSerializable object) {
294: nvCount++;
295: out.write(name);
296: object.serializeTo(out);
297: }
298:
299: protected void putNVPair(byte name, TCByteBuffer[] data) {
300: nvCount++;
301: out.write(name);
302: out.write(data);
303: }
304:
305: protected void putNVPair(byte name, byte[] bytes) {
306: nvCount++;
307: out.writeInt(bytes.length);
308: out.write(bytes);
309: }
310:
311: public ChannelID getChannelID() {
312: return channel.getChannelID();
313: }
314:
315: public MessageChannel getChannel() {
316: return channel;
317: }
318:
319: /*
320: * (non-Javadoc)
321: *
322: * @see com.tc.net.protocol.tcm.ApplicationMessage#send()
323: */
324: public void send() {
325: if (isSent.attemptSet()) {
326: dehydrate();
327: basicSend();
328: }
329: }
330:
331: private void basicSend() {
332: channel.send(this );
333: monitor.newOutgoingMessage(this );
334: }
335:
336: // FIXME:: This is here till them tc-comms merge.
337: // TODO:: Remove this method once getSourceID and getDestinationID is merged into truck. You can use those methods
338: // instead.
339: public ClientID getClientID() {
340: return new ClientID(getChannelID());
341: }
342: }
|