001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.catalina.tribes.io;
018:
019: import java.util.Arrays;
020:
021: import org.apache.catalina.tribes.ChannelMessage;
022: import org.apache.catalina.tribes.Member;
023: import org.apache.catalina.tribes.membership.MemberImpl;
024: import org.apache.catalina.tribes.util.UUIDGenerator;
025: import org.apache.catalina.tribes.Channel;
026: import java.sql.Timestamp;
027:
028: /**
029: * The <code>ChannelData</code> object is used to transfer a message through the
030: * channel interceptor stack and eventually out on a transport to be sent
031: * to another node. While the message is being processed by the different
032: * interceptors, the message data can be manipulated as each interceptor seems appropriate.
033: * @author Peter Rossbach
034: * @author Filip Hanik
035: * @version $Revision: 538977 $ $Date: 2007-05-17 17:43:49 +0200 (jeu., 17 mai 2007) $
036: *
037: */
038: public class ChannelData implements ChannelMessage {
039: public static ChannelData[] EMPTY_DATA_ARRAY = new ChannelData[0];
040:
041: public static boolean USE_SECURE_RANDOM_FOR_UUID = false;
042:
043: /**
044: * The options this message was sent with
045: */
046: private int options = 0;
047: /**
048: * The message data, stored in a dynamic buffer
049: */
050: private XByteBuffer message;
051: /**
052: * The timestamp that goes with this message
053: */
054: private long timestamp;
055: /**
056: * A unique message id
057: */
058: private byte[] uniqueId;
059: /**
060: * The source or reply-to address for this message
061: */
062: private Member address;
063:
064: /**
065: * Creates an empty channel data with a new unique Id
066: * @see #ChannelData(boolean)
067: */
068: public ChannelData() {
069: this (true);
070: }
071:
072: /**
073: * Create an empty channel data object
074: * @param generateUUID boolean - if true, a unique Id will be generated
075: */
076: public ChannelData(boolean generateUUID) {
077: if (generateUUID)
078: generateUUID();
079: }
080:
081: /**
082: * Creates a new channel data object with data
083: * @param uniqueId - unique message id
084: * @param message - message data
085: * @param timestamp - message timestamp
086: */
087: public ChannelData(byte[] uniqueId, XByteBuffer message,
088: long timestamp) {
089: this .uniqueId = uniqueId;
090: this .message = message;
091: this .timestamp = timestamp;
092: }
093:
094: /**
095: * @return Returns the message byte buffer
096: */
097: public XByteBuffer getMessage() {
098: return message;
099: }
100:
101: /**
102: * @param message The message to send.
103: */
104: public void setMessage(XByteBuffer message) {
105: this .message = message;
106: }
107:
108: /**
109: * @return Returns the timestamp.
110: */
111: public long getTimestamp() {
112: return timestamp;
113: }
114:
115: /**
116: * @param timestamp The timestamp to send
117: */
118: public void setTimestamp(long timestamp) {
119: this .timestamp = timestamp;
120: }
121:
122: /**
123: * @return Returns the uniqueId.
124: */
125: public byte[] getUniqueId() {
126: return uniqueId;
127: }
128:
129: /**
130: * @param uniqueId The uniqueId to send.
131: */
132: public void setUniqueId(byte[] uniqueId) {
133: this .uniqueId = uniqueId;
134: }
135:
136: /**
137: * @return returns the message options
138: * see org.apache.catalina.tribes.Channel#sendMessage(org.apache.catalina.tribes.Member[], java.io.Serializable, int)
139: *
140: */
141: public int getOptions() {
142: return options;
143: }
144:
145: /**
146: * @param sets the message options
147: */
148: public void setOptions(int options) {
149: this .options = options;
150: }
151:
152: /**
153: * Returns the source or reply-to address
154: * @return Member
155: */
156: public Member getAddress() {
157: return address;
158: }
159:
160: /**
161: * Sets the source or reply-to address
162: * @param address Member
163: */
164: public void setAddress(Member address) {
165: this .address = address;
166: }
167:
168: /**
169: * Generates a UUID and invokes setUniqueId
170: */
171: public void generateUUID() {
172: byte[] data = new byte[16];
173: UUIDGenerator.randomUUID(USE_SECURE_RANDOM_FOR_UUID, data, 0);
174: setUniqueId(data);
175: }
176:
177: public int getDataPackageLength() {
178: int length = 4 + //options
179: 8 + //timestamp off=4
180: 4 + //unique id length off=12
181: uniqueId.length + //id data off=12+uniqueId.length
182: 4 + //addr length off=12+uniqueId.length+4
183: ((MemberImpl) address).getDataLength() + //member data off=12+uniqueId.length+4+add.length
184: 4 + //message length off=12+uniqueId.length+4+add.length+4
185: message.getLength();
186: return length;
187:
188: }
189:
190: /**
191: * Serializes the ChannelData object into a byte[] array
192: * @return byte[]
193: */
194: public byte[] getDataPackage() {
195: int length = getDataPackageLength();
196: byte[] data = new byte[length];
197: int offset = 0;
198: return getDataPackage(data, offset);
199: }
200:
201: public byte[] getDataPackage(byte[] data, int offset) {
202: byte[] addr = ((MemberImpl) address).getData(false);
203: XByteBuffer.toBytes(options, data, offset);
204: offset += 4; //options
205: XByteBuffer.toBytes(timestamp, data, offset);
206: offset += 8; //timestamp
207: XByteBuffer.toBytes(uniqueId.length, data, offset);
208: offset += 4; //uniqueId.length
209: System.arraycopy(uniqueId, 0, data, offset, uniqueId.length);
210: offset += uniqueId.length; //uniqueId data
211: XByteBuffer.toBytes(addr.length, data, offset);
212: offset += 4; //addr.length
213: System.arraycopy(addr, 0, data, offset, addr.length);
214: offset += addr.length; //addr data
215: XByteBuffer.toBytes(message.getLength(), data, offset);
216: offset += 4; //message.length
217: System.arraycopy(message.getBytesDirect(), 0, data, offset,
218: message.getLength());
219: offset += message.getLength(); //message data
220: return data;
221: }
222:
223: /**
224: * Deserializes a ChannelData object from a byte array
225: * @param b byte[]
226: * @return ChannelData
227: */
228: public static ChannelData getDataFromPackage(XByteBuffer xbuf) {
229: ChannelData data = new ChannelData(false);
230: int offset = 0;
231: data.setOptions(XByteBuffer
232: .toInt(xbuf.getBytesDirect(), offset));
233: offset += 4; //options
234: data.setTimestamp(XByteBuffer.toLong(xbuf.getBytesDirect(),
235: offset));
236: offset += 8; //timestamp
237: data.uniqueId = new byte[XByteBuffer.toInt(xbuf
238: .getBytesDirect(), offset)];
239: offset += 4; //uniqueId length
240: System.arraycopy(xbuf.getBytesDirect(), offset, data.uniqueId,
241: 0, data.uniqueId.length);
242: offset += data.uniqueId.length; //uniqueId data
243: //byte[] addr = new byte[XByteBuffer.toInt(xbuf.getBytesDirect(),offset)];
244: int addrlen = XByteBuffer.toInt(xbuf.getBytesDirect(), offset);
245: offset += 4; //addr length
246: //System.arraycopy(xbuf.getBytesDirect(),offset,addr,0,addr.length);
247: data.setAddress(MemberImpl.getMember(xbuf.getBytesDirect(),
248: offset, addrlen));
249: //offset += addr.length; //addr data
250: offset += addrlen;
251: int xsize = XByteBuffer.toInt(xbuf.getBytesDirect(), offset);
252: offset += 4; //xsize length
253: System.arraycopy(xbuf.getBytesDirect(), offset, xbuf
254: .getBytesDirect(), 0, xsize);
255: xbuf.setLength(xsize);
256: data.message = xbuf;
257: return data;
258:
259: }
260:
261: public static ChannelData getDataFromPackage(byte[] b) {
262: ChannelData data = new ChannelData(false);
263: int offset = 0;
264: data.setOptions(XByteBuffer.toInt(b, offset));
265: offset += 4; //options
266: data.setTimestamp(XByteBuffer.toLong(b, offset));
267: offset += 8; //timestamp
268: data.uniqueId = new byte[XByteBuffer.toInt(b, offset)];
269: offset += 4; //uniqueId length
270: System.arraycopy(b, offset, data.uniqueId, 0,
271: data.uniqueId.length);
272: offset += data.uniqueId.length; //uniqueId data
273: byte[] addr = new byte[XByteBuffer.toInt(b, offset)];
274: offset += 4; //addr length
275: System.arraycopy(b, offset, addr, 0, addr.length);
276: data.setAddress(MemberImpl.getMember(addr));
277: offset += addr.length; //addr data
278: int xsize = XByteBuffer.toInt(b, offset);
279: //data.message = new XByteBuffer(new byte[xsize],false);
280: data.message = BufferPool.getBufferPool().getBuffer(xsize,
281: false);
282: offset += 4; //message length
283: System.arraycopy(b, offset, data.message.getBytesDirect(), 0,
284: xsize);
285: data.message.append(b, offset, xsize);
286: offset += xsize; //message data
287: return data;
288: }
289:
290: public int hashCode() {
291: return XByteBuffer.toInt(getUniqueId(), 0);
292: }
293:
294: /**
295: * Compares to ChannelData objects, only compares on getUniqueId().equals(o.getUniqueId())
296: * @param o Object
297: * @return boolean
298: */
299: public boolean equals(Object o) {
300: if (o instanceof ChannelData) {
301: return Arrays.equals(getUniqueId(), ((ChannelData) o)
302: .getUniqueId());
303: } else
304: return false;
305: }
306:
307: /**
308: * Create a shallow clone, only the data gets recreated
309: * @return ClusterData
310: */
311: public Object clone() {
312: // byte[] d = this.getDataPackage();
313: // return ClusterData.getDataFromPackage(d);
314: ChannelData clone = new ChannelData(false);
315: clone.options = this .options;
316: clone.message = new XByteBuffer(this .message.getBytesDirect(),
317: false);
318: clone.timestamp = this .timestamp;
319: clone.uniqueId = this .uniqueId;
320: clone.address = this .address;
321: return clone;
322: }
323:
324: /**
325: * Complete clone
326: * @return ClusterData
327: */
328: public Object deepclone() {
329: byte[] d = this .getDataPackage();
330: return ChannelData.getDataFromPackage(d);
331: }
332:
333: /**
334: * Utility method, returns true if the options flag indicates that an ack
335: * is to be sent after the message has been received and processed
336: * @param options int - the options for the message
337: * @return boolean
338: * @see org.apache.catalina.tribes.Channel#SEND_OPTIONS_USE_ACK
339: * @see org.apache.catalina.tribes.Channel#SEND_OPTIONS_SYNCHRONIZED_ACK
340: */
341: public static boolean sendAckSync(int options) {
342: return ((Channel.SEND_OPTIONS_USE_ACK & options) == Channel.SEND_OPTIONS_USE_ACK)
343: && ((Channel.SEND_OPTIONS_SYNCHRONIZED_ACK & options) == Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
344: }
345:
346: /**
347: * Utility method, returns true if the options flag indicates that an ack
348: * is to be sent after the message has been received but not yet processed
349: * @param options int - the options for the message
350: * @return boolean
351: * @see org.apache.catalina.tribes.Channel#SEND_OPTIONS_USE_ACK
352: * @see org.apache.catalina.tribes.Channel#SEND_OPTIONS_SYNCHRONIZED_ACK
353: */
354: public static boolean sendAckAsync(int options) {
355: return ((Channel.SEND_OPTIONS_USE_ACK & options) == Channel.SEND_OPTIONS_USE_ACK)
356: && ((Channel.SEND_OPTIONS_SYNCHRONIZED_ACK & options) != Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
357: }
358:
359: public String toString() {
360: StringBuffer buf = new StringBuffer();
361: buf.append("ClusterData[src=");
362: buf.append(getAddress()).append("; id=");
363: buf.append(bToS(getUniqueId())).append("; sent=");
364: buf.append(new Timestamp(this .getTimestamp()).toString())
365: .append("]");
366: return buf.toString();
367: }
368:
369: public static String bToS(byte[] data) {
370: StringBuffer buf = new StringBuffer(4 * 16);
371: buf.append("{");
372: for (int i = 0; data != null && i < data.length; i++)
373: buf.append(String.valueOf(data[i])).append(" ");
374: buf.append("}");
375: return buf.toString();
376: }
377:
378: }
|