001: // $Id: Message.java,v 1.53 2006/08/13 15:38:52 belaban Exp $
002:
003: package org.jgroups;
004:
005: import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
006: import org.apache.commons.logging.Log;
007: import org.apache.commons.logging.LogFactory;
008: import org.jgroups.conf.ClassConfigurator;
009: import org.jgroups.stack.IpAddress;
010: import org.jgroups.util.Marshaller;
011: import org.jgroups.util.Streamable;
012: import org.jgroups.util.Util;
013:
014: import java.io.*;
015: import java.util.HashSet;
016: import java.util.Iterator;
017: import java.util.Map;
018:
019: /**
020: * A Message encapsulates data sent to members of a group. It contains among other things the
021: * address of the sender, the destination address, a payload (byte buffer) and a list of
022: * headers. Headers are added by protocols on the sender side and removed by protocols
023: * on the receiver's side.
024: * <p>
025: * The byte buffer can point to a reference, and we can subset it using index and length. However,
026: * when the message is serialized, we only write the bytes between index and length.
027: * @author Bela Ban
028: */
029: public class Message implements Externalizable, Streamable {
030: protected Address dest_addr = null;
031: protected Address src_addr = null;
032:
033: /** The payload */
034: private byte[] buf = null;
035:
036: /** The index into the payload (usually 0) */
037: protected transient int offset = 0;
038:
039: /** The number of bytes in the buffer (usually buf.length is buf not equal to null). */
040: protected transient int length = 0;
041:
042: /** Map<String,Header> */
043: protected Map headers;
044:
045: protected static final Log log = LogFactory.getLog(Message.class);
046:
047: private static final long serialVersionUID = 7966206671974139740L;
048:
049: static final byte DEST_SET = 1;
050: static final byte SRC_SET = 2;
051: static final byte BUF_SET = 4;
052: // static final byte HDRS_SET=8; // bela July 15 2005: not needed, we always create headers
053: static final byte IPADDR_DEST = 16;
054: static final byte IPADDR_SRC = 32;
055: static final byte SRC_HOST_NULL = 64;
056:
057: static final HashSet nonStreamableHeaders = new HashSet(); // todo: remove when all headers are streamable
058:
059: /** Map<Address,Address>. Maintains mappings to canonical addresses */
060: private static final Map canonicalAddresses = new ConcurrentReaderHashMap();
061: private static final boolean DISABLE_CANONICALIZATION;
062:
063: static {
064: boolean b;
065: try {
066: b = Boolean.getBoolean("disable_canonicalization");
067: } catch (java.security.AccessControlException e) {
068: // this will happen in an applet context
069: b = false;
070: }
071: DISABLE_CANONICALIZATION = b;
072: }
073:
074: /** Public constructor
075: * @param dest Address of receiver. If it is <em>null</em> or a <em>string</em>, then
076: * it is sent to the group (either to current group or to the group as given
077: * in the string). If it is a Vector, then it contains a number of addresses
078: * to which it must be sent. Otherwise, it contains a single destination.<p>
079: * Addresses are generally untyped (all are of type <em>Object</em>. A channel
080: * instance must know what types of addresses it expects and downcast
081: * accordingly.
082: */
083: public Message(Address dest) {
084: dest_addr = dest;
085: headers = createHeaders(7);
086: }
087:
088: /** Public constructor
089: * @param dest Address of receiver. If it is <em>null</em> or a <em>string</em>, then
090: * it is sent to the group (either to current group or to the group as given
091: * in the string). If it is a Vector, then it contains a number of addresses
092: * to which it must be sent. Otherwise, it contains a single destination.<p>
093: * Addresses are generally untyped (all are of type <em>Object</em>. A channel
094: * instance must know what types of addresses it expects and downcast
095: * accordingly.
096: * @param src Address of sender
097: * @param buf Message to be sent. Note that this buffer must not be modified (e.g. buf[0]=0 is
098: * not allowed), since we don't copy the contents on clopy() or clone().
099: */
100: public Message(Address dest, Address src, byte[] buf) {
101: this (dest);
102: src_addr = src;
103: setBuffer(buf);
104: }
105:
106: /**
107: * Constructs a message. The index and length parameters allow to provide a <em>reference</em> to
108: * a byte buffer, rather than a copy, and refer to a subset of the buffer. This is important when
109: * we want to avoid copying. When the message is serialized, only the subset is serialized.
110: * @param dest Address of receiver. If it is <em>null</em> or a <em>string</em>, then
111: * it is sent to the group (either to current group or to the group as given
112: * in the string). If it is a Vector, then it contains a number of addresses
113: * to which it must be sent. Otherwise, it contains a single destination.<p>
114: * Addresses are generally untyped (all are of type <em>Object</em>. A channel
115: * instance must know what types of addresses it expects and downcast
116: * accordingly.
117: * @param src Address of sender
118: * @param buf A reference to a byte buffer
119: * @param offset The index into the byte buffer
120: * @param length The number of bytes to be used from <tt>buf</tt>. Both index and length are checked for
121: * array index violations and an ArrayIndexOutOfBoundsException will be thrown if invalid
122: */
123: public Message(Address dest, Address src, byte[] buf, int offset,
124: int length) {
125: this (dest);
126: src_addr = src;
127: setBuffer(buf, offset, length);
128: }
129:
130: /** Public constructor
131: * @param dest Address of receiver. If it is <em>null</em> or a <em>string</em>, then
132: * it is sent to the group (either to current group or to the group as given
133: * in the string). If it is a Vector, then it contains a number of addresses
134: * to which it must be sent. Otherwise, it contains a single destination.<p>
135: * Addresses are generally untyped (all are of type <em>Object</em>. A channel
136: * instance must know what types of addresses it expects and downcast
137: * accordingly.
138: * @param src Address of sender
139: * @param obj The object will be serialized into the byte buffer. <em>Object
140: * has to be serializable </em>! Note that the resulting buffer must not be modified
141: * (e.g. buf[0]=0 is not allowed), since we don't copy the contents on clopy() or clone().
142: */
143: public Message(Address dest, Address src, Serializable obj) {
144: this (dest);
145: src_addr = src;
146: setObject(obj);
147: }
148:
149: public Message() {
150: headers = createHeaders(7);
151: }
152:
153: public Message(boolean create_headers) {
154: if (create_headers)
155: headers = createHeaders(7);
156: }
157:
158: public Address getDest() {
159: return dest_addr;
160: }
161:
162: public void setDest(Address new_dest) {
163: if (DISABLE_CANONICALIZATION)
164: dest_addr = new_dest;
165: else
166: dest_addr = canonicalAddress(new_dest);
167: }
168:
169: public Address getSrc() {
170: return src_addr;
171: }
172:
173: public void setSrc(Address new_src) {
174: if (DISABLE_CANONICALIZATION)
175: src_addr = new_src;
176: else
177: src_addr = canonicalAddress(new_src);
178: }
179:
180: /**
181: * Returns a <em>reference</em> to the payload (byte buffer). Note that this buffer should not be modified as
182: * we do not copy the buffer on copy() or clone(): the buffer of the copied message is simply a reference to
183: * the old buffer.<br/>
184: * Even if offset and length are used: we return the <em>entire</em> buffer, not a subset.
185: */
186: public byte[] getRawBuffer() {
187: return buf;
188: }
189:
190: /**
191: * Returns a copy of the buffer if offset and length are used, otherwise a reference.
192: * @return byte array with a copy of the buffer.
193: */
194: final public byte[] getBuffer() {
195: if (buf == null)
196: return null;
197: if (offset == 0 && length == buf.length)
198: return buf;
199: else {
200: byte[] retval = new byte[length];
201: System.arraycopy(buf, offset, retval, 0, length);
202: return retval;
203: }
204: }
205:
206: final public void setBuffer(byte[] b) {
207: buf = b;
208: if (buf != null) {
209: offset = 0;
210: length = buf.length;
211: } else {
212: offset = length = 0;
213: }
214: }
215:
216: /**
217: * Set the internal buffer to point to a subset of a given buffer
218: * @param b The reference to a given buffer. If null, we'll reset the buffer to null
219: * @param offset The initial position
220: * @param length The number of bytes
221: */
222: final public void setBuffer(byte[] b, int offset, int length) {
223: buf = b;
224: if (buf != null) {
225: if (offset < 0 || offset > buf.length)
226: throw new ArrayIndexOutOfBoundsException(offset);
227: if ((offset + length) > buf.length)
228: throw new ArrayIndexOutOfBoundsException(
229: (offset + length));
230: this .offset = offset;
231: this .length = length;
232: } else {
233: offset = length = 0;
234: }
235: }
236:
237: /** Returns the offset into the buffer at which the data starts */
238: public int getOffset() {
239: return offset;
240: }
241:
242: /** Returns the number of bytes in the buffer */
243: public int getLength() {
244: return length;
245: }
246:
247: public Map getHeaders() {
248: return headers;
249: }
250:
251: final public void setObject(Serializable obj) {
252: if (obj == null)
253: return;
254: try {
255: byte[] tmp = Util.objectToByteBuffer(obj);
256: setBuffer(tmp);
257: } catch (Exception ex) {
258: throw new IllegalArgumentException(ex.toString());
259: }
260: }
261:
262: final public Object getObject() {
263: // if(buf == null) return null;
264: try {
265: return Util.objectFromByteBuffer(buf, offset, length);
266: } catch (Exception ex) {
267: throw new IllegalArgumentException(ex.toString());
268: }
269: }
270:
271: /**
272: * Nulls all fields of this message so that the message can be reused. Removes all headers from the
273: * hashmap, but keeps the hashmap
274: */
275: public void reset() {
276: dest_addr = src_addr = null;
277: setBuffer(null);
278: headers.clear();
279: }
280:
281: /*---------------------- Used by protocol layers ----------------------*/
282:
283: /** Puts a header given a key into the hashmap. Overwrites potential existing entry. */
284: public void putHeader(String key, Header hdr) {
285: headers.put(key, hdr);
286: }
287:
288: public Header removeHeader(String key) {
289: return (Header) headers.remove(key);
290: }
291:
292: public void removeHeaders() {
293: headers.clear();
294: }
295:
296: public Header getHeader(String key) {
297: return (Header) headers.get(key);
298: }
299:
300: /*---------------------------------------------------------------------*/
301:
302: public Message copy() {
303: return copy(true);
304: }
305:
306: /**
307: * Create a copy of the message. If offset and length are used (to refer to another buffer), the copy will
308: * contain only the subset offset and length point to, copying the subset into the new copy.
309: * @param copy_buffer
310: * @return Message with specified data
311: */
312: public Message copy(boolean copy_buffer) {
313: Message retval = new Message(false);
314: retval.dest_addr = dest_addr;
315: retval.src_addr = src_addr;
316:
317: if (copy_buffer && buf != null) {
318:
319: // change bela Feb 26 2004: we don't resolve the reference
320: retval.setBuffer(buf, offset, length);
321: }
322:
323: retval.headers = createHeaders(headers);
324: return retval;
325: }
326:
327: protected Object clone() throws CloneNotSupportedException {
328: return copy();
329: }
330:
331: public Message makeReply() {
332: return new Message(src_addr);
333: }
334:
335: public String toString() {
336: StringBuffer ret = new StringBuffer(64);
337: ret.append("[dst: ");
338: if (dest_addr == null)
339: ret.append("<null>");
340: else
341: ret.append(dest_addr);
342: ret.append(", src: ");
343: if (src_addr == null)
344: ret.append("<null>");
345: else
346: ret.append(src_addr);
347:
348: int size;
349: if (headers != null && (size = headers.size()) > 0)
350: ret.append(" (").append(size).append(" headers)");
351:
352: ret.append(", size = ");
353: if (buf != null && length > 0)
354: ret.append(length);
355: else
356: ret.append('0');
357: ret.append(" bytes");
358: ret.append(']');
359: return ret.toString();
360: }
361:
362: /** Tries to read an object from the message's buffer and prints it */
363: public String toStringAsObject() {
364:
365: if (buf == null)
366: return null;
367: try {
368: Object obj = getObject();
369: return obj != null ? obj.toString() : "";
370: } catch (Exception e) { // it is not an object
371: return "";
372: }
373: }
374:
375: /**
376: * Returns size of buffer, plus some constant overhead for src and dest, plus number of headers time
377: * some estimated size/header. The latter is needed because we don't want to marshal all headers just
378: * to find out their size requirements. If a header implements Sizeable, the we can get the correct
379: * size.<p> Size estimations don't have to be very accurate since this is mainly used by FRAG to
380: * determine whether to fragment a message or not. Fragmentation will then serialize the message,
381: * therefore getting the correct value.
382: */
383:
384: /**
385: * Returns the exact size of the marshalled message. Uses method size() of each header to compute the size, so if
386: * a Header subclass doesn't implement size() we will use an approximation. However, most relevant header subclasses
387: * have size() implemented correctly. (See org.jgroups.tests.SizeTest).
388: * @return The number of bytes for the marshalled message
389: */
390: public long size() {
391: long retval = Global.BYTE_SIZE // leading byte
392: + length // buffer
393: + (buf != null ? Global.INT_SIZE : 0); // if buf != null 4 bytes for length
394:
395: // if(dest_addr != null)
396: // retval+=dest_addr.size();
397: if (src_addr != null)
398: retval += (src_addr).size();
399:
400: Map.Entry entry;
401: String key;
402: Header hdr;
403: retval += Global.SHORT_SIZE; // size (short)
404: for (Iterator it = headers.entrySet().iterator(); it.hasNext();) {
405: entry = (Map.Entry) it.next();
406: key = (String) entry.getKey();
407: retval += key.length() + 2; // not the same as writeUTF(), but almost
408: hdr = (Header) entry.getValue();
409: retval += 5; // 1 for presence of magic number, 4 for magic number
410: retval += hdr.size();
411: }
412: return retval;
413: }
414:
415: public String printObjectHeaders() {
416: StringBuffer sb = new StringBuffer();
417: Map.Entry entry;
418:
419: if (headers != null) {
420: for (Iterator it = headers.entrySet().iterator(); it
421: .hasNext();) {
422: entry = (Map.Entry) it.next();
423: sb.append(entry.getKey()).append(": ").append(
424: entry.getValue()).append('\n');
425: }
426: }
427: return sb.toString();
428: }
429:
430: /* ----------------------------------- Interface Externalizable ------------------------------- */
431:
432: public void writeExternal(ObjectOutput out) throws IOException {
433: int len;
434: Externalizable hdr;
435: Map.Entry entry;
436:
437: if (dest_addr != null) {
438: out.writeBoolean(true);
439: Marshaller.write(dest_addr, out);
440: } else {
441: out.writeBoolean(false);
442: }
443:
444: if (src_addr != null) {
445: out.writeBoolean(true);
446: Marshaller.write(src_addr, out);
447: } else {
448: out.writeBoolean(false);
449: }
450:
451: if (buf == null)
452: out.writeInt(0);
453: else {
454: out.writeInt(length);
455: out.write(buf, offset, length);
456: }
457:
458: len = headers.size();
459: out.writeInt(len);
460: for (Iterator it = headers.entrySet().iterator(); it.hasNext();) {
461: entry = (Map.Entry) it.next();
462: out.writeUTF((String) entry.getKey());
463: hdr = (Externalizable) entry.getValue();
464: Marshaller.write(hdr, out);
465: }
466: }
467:
468: public void readExternal(ObjectInput in) throws IOException,
469: ClassNotFoundException {
470: boolean destAddressExist = in.readBoolean();
471:
472: if (destAddressExist) {
473: dest_addr = (Address) Marshaller.read(in);
474: if (!DISABLE_CANONICALIZATION)
475: dest_addr = canonicalAddress(dest_addr);
476: }
477:
478: boolean srcAddressExist = in.readBoolean();
479: if (srcAddressExist) {
480: src_addr = (Address) Marshaller.read(in);
481: if (!DISABLE_CANONICALIZATION)
482: src_addr = canonicalAddress(src_addr);
483: }
484:
485: int i = in.readInt();
486: if (i != 0) {
487: buf = new byte[i];
488: in.readFully(buf);
489: offset = 0;
490: length = buf.length;
491: }
492:
493: int len = in.readInt();
494: while (len-- > 0) {
495: Object key = in.readUTF();
496: Object value = Marshaller.read(in);
497: headers.put(key, value);
498: }
499: }
500:
501: /* --------------------------------- End of Interface Externalizable ----------------------------- */
502:
503: /* ----------------------------------- Interface Streamable ------------------------------- */
504:
505: /**
506: * Streams all members (dest and src addresses, buffer and headers) to the output stream.
507: * @param out
508: * @throws IOException
509: */
510: public void writeTo(DataOutputStream out) throws IOException {
511: byte leading = 0;
512:
513: // if(dest_addr != null) {
514: // leading+=DEST_SET;
515: // if(dest_addr instanceof IpAddress)
516: // leading+=IPADDR_DEST;
517: // }
518:
519: if (src_addr != null) {
520: leading += SRC_SET;
521: if (src_addr instanceof IpAddress) {
522: leading += IPADDR_SRC;
523: if (((IpAddress) src_addr).getIpAddress() == null) {
524: leading += SRC_HOST_NULL;
525: }
526: }
527: }
528: if (buf != null)
529: leading += BUF_SET;
530:
531: // 1. write the leading byte first
532: out.write(leading);
533:
534: // 2. dest_addr
535: // if(dest_addr != null) {
536: // if(dest_addr instanceof IpAddress)
537: // dest_addr.writeTo(out);
538: // else
539: // Util.writeAddress(dest_addr, out);
540: // }
541:
542: // 3. src_addr
543: if (src_addr != null) {
544: if (src_addr instanceof IpAddress) {
545: src_addr.writeTo(out);
546: } else {
547: Util.writeAddress(src_addr, out);
548: }
549: }
550:
551: // 4. buf
552: if (buf != null) {
553: out.writeInt(length);
554: out.write(buf, offset, length);
555: }
556:
557: // 5. headers
558: int size = headers.size();
559: out.writeShort(size);
560: Map.Entry entry;
561: for (Iterator it = headers.entrySet().iterator(); it.hasNext();) {
562: entry = (Map.Entry) it.next();
563: out.writeUTF((String) entry.getKey());
564: writeHeader((Header) entry.getValue(), out);
565: }
566: }
567:
568: public void readFrom(DataInputStream in) throws IOException,
569: IllegalAccessException, InstantiationException {
570: int len, leading;
571: String hdr_name;
572: Header hdr;
573:
574: // 1. read the leading byte first
575: leading = in.readByte();
576:
577: // 1. dest_addr
578: // if((leading & DEST_SET) == DEST_SET) {
579: // if((leading & IPADDR_DEST) == IPADDR_DEST) {
580: // dest_addr=new IpAddress();
581: // dest_addr.readFrom(in);
582: // }
583: // else {
584: // dest_addr=Util.readAddress(in);
585: // }
586: // }
587:
588: // 2. src_addr
589: if ((leading & SRC_SET) == SRC_SET) {
590: if ((leading & IPADDR_SRC) == IPADDR_SRC) {
591: src_addr = new IpAddress();
592: src_addr.readFrom(in);
593: } else {
594: src_addr = Util.readAddress(in);
595: }
596: if (!DISABLE_CANONICALIZATION)
597: src_addr = canonicalAddress(src_addr);
598: }
599:
600: // 3. buf
601: if ((leading & BUF_SET) == BUF_SET) {
602: len = in.readInt();
603: buf = new byte[len];
604: in.read(buf, 0, len);
605: length = len;
606: }
607:
608: // 4. headers
609: len = in.readShort();
610: headers = createHeaders(len);
611: for (int i = 0; i < len; i++) {
612: hdr_name = in.readUTF();
613: hdr = readHeader(in);
614: headers.put(hdr_name, hdr);
615: }
616: }
617:
618: /* --------------------------------- End of Interface Streamable ----------------------------- */
619:
620: /* ----------------------------------- Private methods ------------------------------- */
621:
622: private static void writeHeader(Header value, DataOutputStream out)
623: throws IOException {
624: int magic_number;
625: String classname;
626: ObjectOutputStream oos = null;
627: try {
628: magic_number = ClassConfigurator.getInstance(false)
629: .getMagicNumber(value.getClass());
630: // write the magic number or the class name
631: if (magic_number == -1) {
632: out.writeBoolean(false);
633: classname = value.getClass().getName();
634: out.writeUTF(classname);
635: } else {
636: out.writeBoolean(true);
637: out.writeInt(magic_number);
638: }
639:
640: // write the contents
641: if (value instanceof Streamable) {
642: ((Streamable) value).writeTo(out);
643: } else {
644: oos = new ObjectOutputStream(out);
645: value.writeExternal(oos);
646: if (!nonStreamableHeaders.contains(value.getClass())) {
647: nonStreamableHeaders.add(value.getClass());
648: if (log.isTraceEnabled())
649: log.trace("encountered non-Streamable header: "
650: + value.getClass());
651: }
652: }
653: } catch (ChannelException e) {
654: IOException io_ex = new IOException("failed writing header");
655: io_ex.initCause(e);
656: throw io_ex;
657: } finally {
658: if (oos != null)
659: oos.close(); // this is a no-op on ByteArrayOutputStream
660: }
661: }
662:
663: private static Header readHeader(DataInputStream in)
664: throws IOException {
665: Header hdr;
666: boolean use_magic_number = in.readBoolean();
667: int magic_number;
668: String classname;
669: Class clazz;
670: ObjectInputStream ois = null;
671:
672: try {
673: if (use_magic_number) {
674: magic_number = in.readInt();
675: clazz = ClassConfigurator.getInstance(false).get(
676: magic_number);
677: if (clazz == null)
678: log.error("magic number " + magic_number
679: + " is not available in magic map");
680: } else {
681: classname = in.readUTF();
682: clazz = ClassConfigurator.getInstance(false).get(
683: classname);
684: }
685: hdr = (Header) clazz.newInstance();
686: if (hdr instanceof Streamable) {
687: ((Streamable) hdr).readFrom(in);
688: } else {
689: ois = new ObjectInputStream(in);
690: hdr.readExternal(ois);
691: }
692: } catch (Exception ex) {
693: IOException io_ex = new IOException("failed reading header");
694: io_ex.initCause(ex);
695: throw io_ex;
696: }
697: return hdr;
698: }
699:
700: private static Map createHeaders(int size) {
701: return size > 0 ? new ConcurrentReaderHashMap(size)
702: : new ConcurrentReaderHashMap();
703: }
704:
705: private static Map createHeaders(Map m) {
706: return new ConcurrentReaderHashMap(m);
707: }
708:
709: /** canonicalize addresses to some extent. There are race conditions
710: * allowed in this method, so it may not fully canonicalize an address
711: * @param nonCanonicalAddress
712: * @return canonical representation of the address
713: */
714: private static Address canonicalAddress(Address nonCanonicalAddress) {
715: Address result = null;
716: if (nonCanonicalAddress == null) {
717: return null;
718: }
719: // do not synchronize between get/put on the canonical map to avoid cost of contention
720: // this can allow multiple equivalent addresses to leak out, but it's worth the cost savings
721: try {
722: result = (Address) canonicalAddresses
723: .get(nonCanonicalAddress);
724: } catch (NullPointerException npe) {
725: // no action needed
726: }
727: if (result == null) {
728: result = nonCanonicalAddress;
729: canonicalAddresses.put(nonCanonicalAddress, result);
730: }
731: return result;
732: }
733:
734: /* ------------------------------- End of Private methods ---------------------------- */
735:
736: }
|