Source Code Cross Referenced for Message.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        // $Id: Message.java,v 1.53 2006/08/13 15:38:52 belaban Exp $
002:
003:        package org.jgroups;
004:
005:        import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
006:        import org.apache.commons.logging.Log;
007:        import org.apache.commons.logging.LogFactory;
008:        import org.jgroups.conf.ClassConfigurator;
009:        import org.jgroups.stack.IpAddress;
010:        import org.jgroups.util.Marshaller;
011:        import org.jgroups.util.Streamable;
012:        import org.jgroups.util.Util;
013:
014:        import java.io.*;
015:        import java.util.HashSet;
016:        import java.util.Iterator;
017:        import java.util.Map;
018:
019:        /**
020:         * A Message encapsulates data sent to members of a group. It contains among other things the
021:         * address of the sender, the destination address, a payload (byte buffer) and a list of
022:         * headers. Headers are added by protocols on the sender side and removed by protocols
023:         * on the receiver's side.
024:         * <p>
025:         * The byte buffer can point to a reference, and we can subset it using index and length. However,
026:         * when the message is serialized, we only write the bytes between index and length.
027:         * @author Bela Ban
028:         */
029:        public class Message implements  Externalizable, Streamable {
030:            protected Address dest_addr = null;
031:            protected Address src_addr = null;
032:
033:            /** The payload */
034:            private byte[] buf = null;
035:
036:            /** The index into the payload (usually 0) */
037:            protected transient int offset = 0;
038:
039:            /** The number of bytes in the buffer (usually buf.length if buf is not equal to null). */
040:            protected transient int length = 0;
041:
042:            /** Map<String,Header> */
043:            protected Map headers;
044:
045:            protected static final Log log = LogFactory.getLog(Message.class);
046:
047:            private static final long serialVersionUID = 7966206671974139740L;
048:
049:            static final byte DEST_SET = 1;
050:            static final byte SRC_SET = 2;
051:            static final byte BUF_SET = 4;
052:            // static final byte HDRS_SET=8; // bela July 15 2005: not needed, we always create headers
053:            static final byte IPADDR_DEST = 16;
054:            static final byte IPADDR_SRC = 32;
055:            static final byte SRC_HOST_NULL = 64;
056:
057:            static final HashSet nonStreamableHeaders = new HashSet(); // todo: remove when all headers are streamable
058:
059:            /** Map<Address,Address>. Maintains mappings to canonical addresses */
060:            private static final Map canonicalAddresses = new ConcurrentReaderHashMap();
061:            private static final boolean DISABLE_CANONICALIZATION;
062:
static {
    // Canonicalization stays enabled unless the system property explicitly turns it off.
    boolean disabled = false;
    try {
        disabled = Boolean.getBoolean("disable_canonicalization");
    } catch (java.security.AccessControlException e) {
        // this will happen in an applet context: keep the default (false)
    }
    DISABLE_CANONICALIZATION = disabled;
}
073:
074:            /** Public constructor
075:             *  @param dest Address of receiver. If it is <em>null</em> or a <em>string</em>, then
076:             *              it is sent to the group (either to current group or to the group as given
077:             *              in the string). If it is a Vector, then it contains a number of addresses
078:             *              to which it must be sent. Otherwise, it contains a single destination.<p>
079:             *              Addresses are generally untyped (all are of type <em>Object</em>. A channel
080:             *              instance must know what types of addresses it expects and downcast
081:             *              accordingly.
082:             */
public Message(Address dest) {
    // The destination is stored as passed in; unlike setDest(), no canonicalization is applied here.
    this.dest_addr = dest;
    this.headers = createHeaders(7);
}
087:
088:            /** Public constructor
089:             *  @param dest Address of receiver. If it is <em>null</em> or a <em>string</em>, then
090:             *              it is sent to the group (either to current group or to the group as given
091:             *              in the string). If it is a Vector, then it contains a number of addresses
092:             *              to which it must be sent. Otherwise, it contains a single destination.<p>
093:             *              Addresses are generally untyped (all are of type <em>Object</em>. A channel
094:             *              instance must know what types of addresses it expects and downcast
095:             *              accordingly.
096:             *  @param src  Address of sender
097:             *  @param buf  Message to be sent. Note that this buffer must not be modified (e.g. buf[0]=0 is
098:             *              not allowed), since we don't copy the contents on copy() or clone().
099:             */
public Message(Address dest, Address src, byte[] buf) {
    this(dest);
    this.src_addr = src;
    // Keeps a reference to buf (no copy); offset becomes 0 and length buf.length, or both 0 for null.
    setBuffer(buf);
}
105:
106:            /**
107:             * Constructs a message. The index and length parameters allow to provide a <em>reference</em> to
108:             * a byte buffer, rather than a copy, and refer to a subset of the buffer. This is important when
109:             * we want to avoid copying. When the message is serialized, only the subset is serialized.
110:             * @param dest Address of receiver. If it is <em>null</em> or a <em>string</em>, then
111:             *              it is sent to the group (either to current group or to the group as given
112:             *              in the string). If it is a Vector, then it contains a number of addresses
113:             *              to which it must be sent. Otherwise, it contains a single destination.<p>
114:             *              Addresses are generally untyped (all are of type <em>Object</em>. A channel
115:             *              instance must know what types of addresses it expects and downcast
116:             *              accordingly.
117:             * @param src    Address of sender
118:             * @param buf    A reference to a byte buffer
119:             * @param offset The index into the byte buffer
120:             * @param length The number of bytes to be used from <tt>buf</tt>. Both index and length are checked for
121:             *               array index violations and an ArrayIndexOutOfBoundsException will be thrown if invalid
122:             */
public Message(Address dest, Address src, byte[] buf, int offset, int length) {
    this(dest);
    this.src_addr = src;
    // Bounds checking of offset/length is delegated to setBuffer(byte[], int, int).
    setBuffer(buf, offset, length);
}
129:
130:            /** Public constructor
131:             *  @param dest Address of receiver. If it is <em>null</em> or a <em>string</em>, then
132:             *              it is sent to the group (either to current group or to the group as given
133:             *              in the string). If it is a Vector, then it contains a number of addresses
134:             *              to which it must be sent. Otherwise, it contains a single destination.<p>
135:             *              Addresses are generally untyped (all are of type <em>Object</em>. A channel
136:             *              instance must know what types of addresses it expects and downcast
137:             *              accordingly.
138:             *  @param src  Address of sender
139:             *  @param obj  The object will be serialized into the byte buffer. <em>Object
140:             *              has to be serializable </em>! Note that the resulting buffer must not be modified
141:             *              (e.g. buf[0]=0 is not allowed), since we don't copy the contents on copy() or clone().
142:             */
public Message(Address dest, Address src, Serializable obj) {
    this(dest);
    this.src_addr = src;
    // setObject() serializes obj into the payload; a null obj leaves the buffer untouched.
    setObject(obj);
}
148:
/** Creates a message with no addresses and no payload, but with an (empty) header map. */
public Message() {
    // Message(true) allocates the header map with the same initial size (7).
    this(true);
}
152:
/**
 * Creates an empty message, optionally without a header map.
 * @param create_headers If true, a header map is created; if false, <tt>headers</tt> stays null
 *                       (copy(boolean) uses this and installs its own map afterwards)
 */
public Message(boolean create_headers) {
    if (!create_headers) {
        return;
    }
    headers = createHeaders(7);
}
157:
158:            public Address getDest() {
159:                return dest_addr;
160:            }
161:
162:            public void setDest(Address new_dest) {
163:                if (DISABLE_CANONICALIZATION)
164:                    dest_addr = new_dest;
165:                else
166:                    dest_addr = canonicalAddress(new_dest);
167:            }
168:
169:            public Address getSrc() {
170:                return src_addr;
171:            }
172:
173:            public void setSrc(Address new_src) {
174:                if (DISABLE_CANONICALIZATION)
175:                    src_addr = new_src;
176:                else
177:                    src_addr = canonicalAddress(new_src);
178:            }
179:
180:            /**
181:             * Returns a <em>reference</em> to the payload (byte buffer). Note that this buffer should not be modified as
182:             * we do not copy the buffer on copy() or clone(): the buffer of the copied message is simply a reference to
183:             * the old buffer.<br/>
184:             * Even if offset and length are used: we return the <em>entire</em> buffer, not a subset.
185:             */
186:            public byte[] getRawBuffer() {
187:                return buf;
188:            }
189:
190:            /**
191:             * Returns a copy of the buffer if offset and length are used, otherwise a reference.
192:             * @return byte array with a copy of the buffer.
193:             */
194:            final public byte[] getBuffer() {
195:                if (buf == null)
196:                    return null;
197:                if (offset == 0 && length == buf.length)
198:                    return buf;
199:                else {
200:                    byte[] retval = new byte[length];
201:                    System.arraycopy(buf, offset, retval, 0, length);
202:                    return retval;
203:                }
204:            }
205:
206:            final public void setBuffer(byte[] b) {
207:                buf = b;
208:                if (buf != null) {
209:                    offset = 0;
210:                    length = buf.length;
211:                } else {
212:                    offset = length = 0;
213:                }
214:            }
215:
216:            /**
217:             * Set the internal buffer to point to a subset of a given buffer
218:             * @param b The reference to a given buffer. If null, we'll reset the buffer to null
219:             * @param offset The initial position
220:             * @param length The number of bytes
221:             */
222:            final public void setBuffer(byte[] b, int offset, int length) {
223:                buf = b;
224:                if (buf != null) {
225:                    if (offset < 0 || offset > buf.length)
226:                        throw new ArrayIndexOutOfBoundsException(offset);
227:                    if ((offset + length) > buf.length)
228:                        throw new ArrayIndexOutOfBoundsException(
229:                                (offset + length));
230:                    this .offset = offset;
231:                    this .length = length;
232:                } else {
233:                    offset = length = 0;
234:                }
235:            }
236:
237:            /** Returns the offset into the buffer at which the data starts */
238:            public int getOffset() {
239:                return offset;
240:            }
241:
242:            /** Returns the number of bytes in the buffer */
243:            public int getLength() {
244:                return length;
245:            }
246:
247:            public Map getHeaders() {
248:                return headers;
249:            }
250:
251:            final public void setObject(Serializable obj) {
252:                if (obj == null)
253:                    return;
254:                try {
255:                    byte[] tmp = Util.objectToByteBuffer(obj);
256:                    setBuffer(tmp);
257:                } catch (Exception ex) {
258:                    throw new IllegalArgumentException(ex.toString());
259:                }
260:            }
261:
262:            final public Object getObject() {
263:                // if(buf == null) return null;
264:                try {
265:                    return Util.objectFromByteBuffer(buf, offset, length);
266:                } catch (Exception ex) {
267:                    throw new IllegalArgumentException(ex.toString());
268:                }
269:            }
270:
271:            /**
272:             * Nulls all fields of this message so that the message can be reused. Removes all headers from the
273:             * hashmap, but keeps the hashmap
274:             */
275:            public void reset() {
276:                dest_addr = src_addr = null;
277:                setBuffer(null);
278:                headers.clear();
279:            }
280:
281:            /*---------------------- Used by protocol layers ----------------------*/
282:
283:            /** Puts a header given a key into the hashmap. Overwrites potential existing entry. */
284:            public void putHeader(String key, Header hdr) {
285:                headers.put(key, hdr);
286:            }
287:
288:            public Header removeHeader(String key) {
289:                return (Header) headers.remove(key);
290:            }
291:
292:            public void removeHeaders() {
293:                headers.clear();
294:            }
295:
296:            public Header getHeader(String key) {
297:                return (Header) headers.get(key);
298:            }
299:
300:            /*---------------------------------------------------------------------*/
301:
302:            public Message copy() {
303:                return copy(true);
304:            }
305:
306:            /**
307:             * Create a copy of the message. If offset and length are used (to refer to another buffer), the copy will
308:             * refer to the same subset: the buffer is shared by reference (not copied), with the same offset and length.
309:             * @param copy_buffer
310:             * @return Message with specified data
311:             */
312:            public Message copy(boolean copy_buffer) {
313:                Message retval = new Message(false);
314:                retval.dest_addr = dest_addr;
315:                retval.src_addr = src_addr;
316:
317:                if (copy_buffer && buf != null) {
318:
319:                    // change bela Feb 26 2004: we don't resolve the reference
320:                    retval.setBuffer(buf, offset, length);
321:                }
322:
323:                retval.headers = createHeaders(headers);
324:                return retval;
325:            }
326:
327:            protected Object clone() throws CloneNotSupportedException {
328:                return copy();
329:            }
330:
331:            public Message makeReply() {
332:                return new Message(src_addr);
333:            }
334:
335:            public String toString() {
336:                StringBuffer ret = new StringBuffer(64);
337:                ret.append("[dst: ");
338:                if (dest_addr == null)
339:                    ret.append("<null>");
340:                else
341:                    ret.append(dest_addr);
342:                ret.append(", src: ");
343:                if (src_addr == null)
344:                    ret.append("<null>");
345:                else
346:                    ret.append(src_addr);
347:
348:                int size;
349:                if (headers != null && (size = headers.size()) > 0)
350:                    ret.append(" (").append(size).append(" headers)");
351:
352:                ret.append(", size = ");
353:                if (buf != null && length > 0)
354:                    ret.append(length);
355:                else
356:                    ret.append('0');
357:                ret.append(" bytes");
358:                ret.append(']');
359:                return ret.toString();
360:            }
361:
362:            /** Tries to read an object from the message's buffer and prints it */
363:            public String toStringAsObject() {
364:
365:                if (buf == null)
366:                    return null;
367:                try {
368:                    Object obj = getObject();
369:                    return obj != null ? obj.toString() : "";
370:                } catch (Exception e) { // it is not an object
371:                    return "";
372:                }
373:            }
374:
375:            /**
376:             * Returns size of buffer, plus some constant overhead for src and dest, plus number of headers time
377:             * some estimated size/header. The latter is needed because we don't want to marshal all headers just
378:             * to find out their size requirements. If a header implements Sizeable, the we can get the correct
379:             * size.<p> Size estimations don't have to be very accurate since this is mainly used by FRAG to
380:             * determine whether to fragment a message or not. Fragmentation will then serialize the message,
381:             * therefore getting the correct value.
382:             */
383:
384:            /**
385:             * Returns the exact size of the marshalled message. Uses method size() of each header to compute the size, so if
386:             * a Header subclass doesn't implement size() we will use an approximation. However, most relevant header subclasses
387:             * have size() implemented correctly. (See org.jgroups.tests.SizeTest).
388:             * @return The number of bytes for the marshalled message
389:             */
390:            public long size() {
391:                long retval = Global.BYTE_SIZE // leading byte
392:                        + length // buffer
393:                        + (buf != null ? Global.INT_SIZE : 0); // if buf != null 4 bytes for length
394:
395:                // if(dest_addr != null)
396:                // retval+=dest_addr.size();
397:                if (src_addr != null)
398:                    retval += (src_addr).size();
399:
400:                Map.Entry entry;
401:                String key;
402:                Header hdr;
403:                retval += Global.SHORT_SIZE; // size (short)
404:                for (Iterator it = headers.entrySet().iterator(); it.hasNext();) {
405:                    entry = (Map.Entry) it.next();
406:                    key = (String) entry.getKey();
407:                    retval += key.length() + 2; // not the same as writeUTF(), but almost
408:                    hdr = (Header) entry.getValue();
409:                    retval += 5; // 1 for presence of magic number, 4 for magic number
410:                    retval += hdr.size();
411:                }
412:                return retval;
413:            }
414:
415:            public String printObjectHeaders() {
416:                StringBuffer sb = new StringBuffer();
417:                Map.Entry entry;
418:
419:                if (headers != null) {
420:                    for (Iterator it = headers.entrySet().iterator(); it
421:                            .hasNext();) {
422:                        entry = (Map.Entry) it.next();
423:                        sb.append(entry.getKey()).append(": ").append(
424:                                entry.getValue()).append('\n');
425:                    }
426:                }
427:                return sb.toString();
428:            }
429:
430:            /* ----------------------------------- Interface Externalizable ------------------------------- */
431:
432:            public void writeExternal(ObjectOutput out) throws IOException {
433:                int len;
434:                Externalizable hdr;
435:                Map.Entry entry;
436:
437:                if (dest_addr != null) {
438:                    out.writeBoolean(true);
439:                    Marshaller.write(dest_addr, out);
440:                } else {
441:                    out.writeBoolean(false);
442:                }
443:
444:                if (src_addr != null) {
445:                    out.writeBoolean(true);
446:                    Marshaller.write(src_addr, out);
447:                } else {
448:                    out.writeBoolean(false);
449:                }
450:
451:                if (buf == null)
452:                    out.writeInt(0);
453:                else {
454:                    out.writeInt(length);
455:                    out.write(buf, offset, length);
456:                }
457:
458:                len = headers.size();
459:                out.writeInt(len);
460:                for (Iterator it = headers.entrySet().iterator(); it.hasNext();) {
461:                    entry = (Map.Entry) it.next();
462:                    out.writeUTF((String) entry.getKey());
463:                    hdr = (Externalizable) entry.getValue();
464:                    Marshaller.write(hdr, out);
465:                }
466:            }
467:
468:            public void readExternal(ObjectInput in) throws IOException,
469:                    ClassNotFoundException {
470:                boolean destAddressExist = in.readBoolean();
471:
472:                if (destAddressExist) {
473:                    dest_addr = (Address) Marshaller.read(in);
474:                    if (!DISABLE_CANONICALIZATION)
475:                        dest_addr = canonicalAddress(dest_addr);
476:                }
477:
478:                boolean srcAddressExist = in.readBoolean();
479:                if (srcAddressExist) {
480:                    src_addr = (Address) Marshaller.read(in);
481:                    if (!DISABLE_CANONICALIZATION)
482:                        src_addr = canonicalAddress(src_addr);
483:                }
484:
485:                int i = in.readInt();
486:                if (i != 0) {
487:                    buf = new byte[i];
488:                    in.readFully(buf);
489:                    offset = 0;
490:                    length = buf.length;
491:                }
492:
493:                int len = in.readInt();
494:                while (len-- > 0) {
495:                    Object key = in.readUTF();
496:                    Object value = Marshaller.read(in);
497:                    headers.put(key, value);
498:                }
499:            }
500:
501:            /* --------------------------------- End of Interface Externalizable ----------------------------- */
502:
503:            /* ----------------------------------- Interface Streamable  ------------------------------- */
504:
505:            /**
506:             * Streams all members (dest and src addresses, buffer and headers) to the output stream.
507:             * @param out
508:             * @throws IOException
509:             */
510:            public void writeTo(DataOutputStream out) throws IOException {
511:                byte leading = 0;
512:
513:                //        if(dest_addr != null) {
514:                //            leading+=DEST_SET;
515:                //            if(dest_addr instanceof IpAddress)
516:                //                leading+=IPADDR_DEST;
517:                //        }
518:
519:                if (src_addr != null) {
520:                    leading += SRC_SET;
521:                    if (src_addr instanceof  IpAddress) {
522:                        leading += IPADDR_SRC;
523:                        if (((IpAddress) src_addr).getIpAddress() == null) {
524:                            leading += SRC_HOST_NULL;
525:                        }
526:                    }
527:                }
528:                if (buf != null)
529:                    leading += BUF_SET;
530:
531:                // 1. write the leading byte first
532:                out.write(leading);
533:
534:                // 2. dest_addr
535:                //        if(dest_addr != null) {
536:                //            if(dest_addr instanceof IpAddress)
537:                //                dest_addr.writeTo(out);
538:                //            else
539:                //                Util.writeAddress(dest_addr, out);
540:                //        }
541:
542:                // 3. src_addr
543:                if (src_addr != null) {
544:                    if (src_addr instanceof  IpAddress) {
545:                        src_addr.writeTo(out);
546:                    } else {
547:                        Util.writeAddress(src_addr, out);
548:                    }
549:                }
550:
551:                // 4. buf
552:                if (buf != null) {
553:                    out.writeInt(length);
554:                    out.write(buf, offset, length);
555:                }
556:
557:                // 5. headers
558:                int size = headers.size();
559:                out.writeShort(size);
560:                Map.Entry entry;
561:                for (Iterator it = headers.entrySet().iterator(); it.hasNext();) {
562:                    entry = (Map.Entry) it.next();
563:                    out.writeUTF((String) entry.getKey());
564:                    writeHeader((Header) entry.getValue(), out);
565:                }
566:            }
567:
568:            public void readFrom(DataInputStream in) throws IOException,
569:                    IllegalAccessException, InstantiationException {
570:                int len, leading;
571:                String hdr_name;
572:                Header hdr;
573:
574:                // 1. read the leading byte first
575:                leading = in.readByte();
576:
577:                // 1. dest_addr
578:                //        if((leading & DEST_SET) == DEST_SET) {
579:                //            if((leading & IPADDR_DEST) == IPADDR_DEST) {
580:                //                dest_addr=new IpAddress();
581:                //                dest_addr.readFrom(in);
582:                //            }
583:                //            else {
584:                //                dest_addr=Util.readAddress(in);
585:                //            }
586:                //        }
587:
588:                // 2. src_addr
589:                if ((leading & SRC_SET) == SRC_SET) {
590:                    if ((leading & IPADDR_SRC) == IPADDR_SRC) {
591:                        src_addr = new IpAddress();
592:                        src_addr.readFrom(in);
593:                    } else {
594:                        src_addr = Util.readAddress(in);
595:                    }
596:                    if (!DISABLE_CANONICALIZATION)
597:                        src_addr = canonicalAddress(src_addr);
598:                }
599:
600:                // 3. buf
601:                if ((leading & BUF_SET) == BUF_SET) {
602:                    len = in.readInt();
603:                    buf = new byte[len];
604:                    in.read(buf, 0, len);
605:                    length = len;
606:                }
607:
608:                // 4. headers
609:                len = in.readShort();
610:                headers = createHeaders(len);
611:                for (int i = 0; i < len; i++) {
612:                    hdr_name = in.readUTF();
613:                    hdr = readHeader(in);
614:                    headers.put(hdr_name, hdr);
615:                }
616:            }
617:
618:            /* --------------------------------- End of Interface Streamable ----------------------------- */
619:
620:            /* ----------------------------------- Private methods ------------------------------- */
621:
622:            private static void writeHeader(Header value, DataOutputStream out)
623:                    throws IOException {
624:                int magic_number;
625:                String classname;
626:                ObjectOutputStream oos = null;
627:                try {
628:                    magic_number = ClassConfigurator.getInstance(false)
629:                            .getMagicNumber(value.getClass());
630:                    // write the magic number or the class name
631:                    if (magic_number == -1) {
632:                        out.writeBoolean(false);
633:                        classname = value.getClass().getName();
634:                        out.writeUTF(classname);
635:                    } else {
636:                        out.writeBoolean(true);
637:                        out.writeInt(magic_number);
638:                    }
639:
640:                    // write the contents
641:                    if (value instanceof  Streamable) {
642:                        ((Streamable) value).writeTo(out);
643:                    } else {
644:                        oos = new ObjectOutputStream(out);
645:                        value.writeExternal(oos);
646:                        if (!nonStreamableHeaders.contains(value.getClass())) {
647:                            nonStreamableHeaders.add(value.getClass());
648:                            if (log.isTraceEnabled())
649:                                log.trace("encountered non-Streamable header: "
650:                                        + value.getClass());
651:                        }
652:                    }
653:                } catch (ChannelException e) {
654:                    IOException io_ex = new IOException("failed writing header");
655:                    io_ex.initCause(e);
656:                    throw io_ex;
657:                } finally {
658:                    if (oos != null)
659:                        oos.close(); // this is a no-op on ByteArrayOutputStream
660:                }
661:            }
662:
663:            private static Header readHeader(DataInputStream in)
664:                    throws IOException {
665:                Header hdr;
666:                boolean use_magic_number = in.readBoolean();
667:                int magic_number;
668:                String classname;
669:                Class clazz;
670:                ObjectInputStream ois = null;
671:
672:                try {
673:                    if (use_magic_number) {
674:                        magic_number = in.readInt();
675:                        clazz = ClassConfigurator.getInstance(false).get(
676:                                magic_number);
677:                        if (clazz == null)
678:                            log.error("magic number " + magic_number
679:                                    + " is not available in magic map");
680:                    } else {
681:                        classname = in.readUTF();
682:                        clazz = ClassConfigurator.getInstance(false).get(
683:                                classname);
684:                    }
685:                    hdr = (Header) clazz.newInstance();
686:                    if (hdr instanceof  Streamable) {
687:                        ((Streamable) hdr).readFrom(in);
688:                    } else {
689:                        ois = new ObjectInputStream(in);
690:                        hdr.readExternal(ois);
691:                    }
692:                } catch (Exception ex) {
693:                    IOException io_ex = new IOException("failed reading header");
694:                    io_ex.initCause(ex);
695:                    throw io_ex;
696:                }
697:                return hdr;
698:            }
699:
700:            private static Map createHeaders(int size) {
701:                return size > 0 ? new ConcurrentReaderHashMap(size)
702:                        : new ConcurrentReaderHashMap();
703:            }
704:
705:            private static Map createHeaders(Map m) {
706:                return new ConcurrentReaderHashMap(m);
707:            }
708:
709:            /** canonicalize addresses to some extent.  There are race conditions
710:             * allowed in this method, so it may not fully canonicalize an address
711:             * @param nonCanonicalAddress
712:             * @return canonical representation of the address
713:             */
714:            private static Address canonicalAddress(Address nonCanonicalAddress) {
715:                Address result = null;
716:                if (nonCanonicalAddress == null) {
717:                    return null;
718:                }
719:                // do not synchronize between get/put on the canonical map to avoid cost of contention
720:                // this can allow multiple equivalent addresses to leak out, but it's worth the cost savings
721:                try {
722:                    result = (Address) canonicalAddresses
723:                            .get(nonCanonicalAddress);
724:                } catch (NullPointerException npe) {
725:                    // no action needed
726:                }
727:                if (result == null) {
728:                    result = nonCanonicalAddress;
729:                    canonicalAddresses.put(nonCanonicalAddress, result);
730:                }
731:                return result;
732:            }
733:
734:            /* ------------------------------- End of Private methods ---------------------------- */
735:
736:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.