001: /*
002: * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved
003: *
004: * This file is part of Resin(R) Open Source
005: *
006: * Each copy or derived work must preserve the copyright notice and this
007: * notice unmodified.
008: *
009: * Resin Open Source is free software; you can redistribute it and/or modify
010: * it under the terms of the GNU General Public License as published by
011: * the Free Software Foundation; either version 2 of the License, or
012: * (at your option) any later version.
013: *
014: * Resin Open Source is distributed in the hope that it will be useful,
015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
017: * of NON-INFRINGEMENT. See the GNU General Public License for more
018: * details.
019: *
020: * You should have received a copy of the GNU General Public License
021: * along with Resin Open Source; if not, write to the
022: *
023: * Free Software Foundation, Inc.
024: * 59 Temple Place, Suite 330
025: * Boston, MA 02111-1307 USA
026: *
027: * @author Scott Ferguson
028: */
029:
030: package com.caucho.jms.message;
031:
032: import com.caucho.jms.JMSExceptionWrapper;
033: import com.caucho.util.CharBuffer;
034: import com.caucho.vfs.*;
035:
036: import javax.jms.BytesMessage;
037: import javax.jms.JMSException;
038: import javax.jms.MessageEOFException;
039: import javax.jms.MessageFormatException;
040: import java.io.*;
041: import java.util.logging.Level;
042:
043: /**
044: * A byte-stream message.
045: */
046: public class BytesMessageImpl extends MessageImpl implements
047: BytesMessage {
048: private TempStream _tempStream;
049: private ReadStream _rs;
050: private WriteStream _ws;
051:
052: public BytesMessageImpl() {
053: }
054:
055: BytesMessageImpl(BytesMessage bytes) throws JMSException {
056: super (bytes);
057:
058: bytes.reset();
059:
060: checkBodyWriteable();
061:
062: try {
063: TempBuffer tempBuf = TempBuffer.allocate();
064: byte[] buffer = tempBuf.getBuffer();
065: WriteStream out = getWriteStream();
066:
067: int sublen;
068:
069: while ((sublen = bytes.readBytes(buffer, buffer.length)) > 0) {
070: out.write(buffer, 0, sublen);
071: }
072:
073: TempBuffer.free(tempBuf);
074: tempBuf = null;
075: } catch (IOException e) {
076: log.log(Level.FINE, e.toString(), e);
077:
078: throw new JMSException(e.toString());
079: }
080:
081: reset();
082: }
083:
084: BytesMessageImpl(BytesMessageImpl bytes) throws JMSException {
085: super (bytes);
086:
087: bytes.reset();
088:
089: _tempStream = bytes._tempStream;
090:
091: reset();
092: }
093:
094: /**
095: * Returns the type enumeration.
096: */
097: @Override
098: public MessageType getType() {
099: return MessageType.BYTES;
100: }
101:
102: /**
103: * Sets the body for reading.
104: */
105: public void setReceive() throws JMSException {
106: super .setReceive();
107:
108: reset();
109: }
110:
111: /**
112: * Set the stream for reading.
113: */
114: public void reset() throws JMSException {
115: setBodyReadOnly();
116:
117: try {
118: // XXX: test for null
119: if (_ws != null)
120: _ws.close();
121:
122: if (_tempStream != null) {
123: if (_rs != null)
124: _rs.close();
125:
126: _rs = _tempStream.openReadAndSaveBuffer();
127: }
128: } catch (IOException e) {
129: throw new JMSExceptionWrapper(e);
130: }
131: }
132:
133: /**
134: * Read a boolean from the stream.
135: */
136: public boolean readBoolean() throws JMSException {
137: ReadStream is = getReadStream();
138:
139: try {
140: int value = is.read();
141:
142: if (value >= 0)
143: return value == 1;
144: else
145: throw new MessageEOFException("BytesMessage EOF");
146: } catch (IOException e) {
147: throw new JMSExceptionWrapper(e);
148: }
149: }
150:
151: /**
152: * Read a byte from the stream.
153: */
154: public byte readByte() throws JMSException {
155: ReadStream is = getReadStream();
156:
157: try {
158: int value = is.read();
159:
160: if (value >= 0)
161: return (byte) value;
162: else
163: throw new MessageEOFException("BytesMessage EOF");
164: } catch (IOException e) {
165: throw new JMSExceptionWrapper(e);
166: }
167: }
168:
169: /**
170: * Read an unsigned byte from the stream.
171: */
172: public int readUnsignedByte() throws JMSException {
173: ReadStream is = getReadStream();
174:
175: if (is == null)
176: return -1;
177:
178: try {
179: int value = is.read();
180:
181: if (value >= 0)
182: return value;
183: else
184: throw new MessageEOFException("BytesMessage EOF");
185: } catch (IOException e) {
186: throw new JMSExceptionWrapper(e);
187: }
188: }
189:
190: /**
191: * Read a short from the stream.
192: */
193: public short readShort() throws JMSException {
194: ReadStream is = getReadStream();
195:
196: try {
197: int d1 = is.read();
198: int d2 = is.read();
199:
200: if (d2 >= 0)
201: return (short) ((d1 << 8) + d2);
202: else
203: throw new MessageEOFException("BytesMessage EOF");
204: } catch (IOException e) {
205: throw new JMSExceptionWrapper(e);
206: }
207: }
208:
209: /**
210: * Read an unsigned short from the stream.
211: */
212: public int readUnsignedShort() throws JMSException {
213: ReadStream is = getReadStream();
214:
215: try {
216: int d1 = is.read();
217: int d2 = is.read();
218:
219: if (d2 >= 0)
220: return ((d1 << 8) + d2);
221: else
222: throw new MessageEOFException("BytesMessage EOF");
223: } catch (IOException e) {
224: throw new JMSExceptionWrapper(e);
225: }
226: }
227:
228: /**
229: * Read an integer from the stream.
230: */
231: public int readInt() throws JMSException {
232: ReadStream is = getReadStream();
233:
234: try {
235: int d1 = is.read();
236: int d2 = is.read();
237: int d3 = is.read();
238: int d4 = is.read();
239:
240: if (d4 >= 0)
241: return (d1 << 24) + (d2 << 16) + (d3 << 8) + d4;
242: else
243: throw new MessageEOFException("BytesMessage EOF");
244: } catch (IOException e) {
245: throw new JMSExceptionWrapper(e);
246: }
247: }
248:
249: /**
250: * Read a long from the stream.
251: */
252: public long readLong() throws JMSException {
253: ReadStream is = getReadStream();
254:
255: try {
256: long d1 = is.read();
257: long d2 = is.read();
258: long d3 = is.read();
259: long d4 = is.read();
260: long d5 = is.read();
261: long d6 = is.read();
262: long d7 = is.read();
263: long d8 = is.read();
264:
265: if (d8 >= 0) {
266: return ((d1 << 56) + (d2 << 48) + (d3 << 40)
267: + (d4 << 32) + (d5 << 24) + (d6 << 16)
268: + (d7 << 8) + (d8));
269: } else
270: throw new MessageEOFException("BytesMessage EOF");
271: } catch (IOException e) {
272: throw new JMSExceptionWrapper(e);
273: }
274: }
275:
276: /**
277: * Read a float from the stream.
278: */
279: public float readFloat() throws JMSException {
280: return Float.intBitsToFloat(readInt());
281: }
282:
283: /**
284: * Read a double from the stream.
285: */
286: public double readDouble() throws JMSException {
287: return Double.longBitsToDouble(readLong());
288: }
289:
290: /**
291: * Read a character object from the stream.
292: */
293: public char readChar() throws JMSException {
294: ReadStream is = getReadStream();
295:
296: try {
297: int d1 = is.read();
298: int d2 = is.read();
299:
300: if (d2 >= 0)
301: return (char) ((d1 << 8) + d2);
302: else
303: throw new MessageEOFException("BytesMessage EOF");
304: } catch (IOException e) {
305: throw new JMSExceptionWrapper(e);
306: }
307: }
308:
309: /**
310: * Read a string from the stream.
311: */
312: public String readUTF() throws JMSException {
313: ReadStream is = getReadStream();
314: CharBuffer cb = new CharBuffer();
315:
316: try {
317: int len = readShort();
318:
319: int d1;
320:
321: while (len > 0) {
322: d1 = is.read();
323:
324: if (d1 < 0x80) {
325: cb.append((char) d1);
326: len -= 1;
327: } else if ((d1 & 0xe0) == 0xc0) {
328: int d2 = is.read();
329:
330: cb
331: .append((char) (((d1 & 0x1f) << 6) + (d2 & 0x3f)));
332:
333: len -= 2;
334: } else if ((d1 & 0xf0) == 0xe0) {
335: int d2 = is.read();
336: int d3 = is.read();
337:
338: cb.append((char) (((d1 & 0xf) << 12)
339: + ((d2 & 0x3f) << 6) + (d3 & 0x3f)));
340:
341: len -= 3;
342: } else
343: throw new MessageFormatException(L
344: .l("invalid UTF-8 in bytes message"));
345: }
346: } catch (JMSException e) {
347: throw e;
348: } catch (RuntimeException e) {
349: throw e;
350: } catch (Throwable e) {
351: throw new JMSExceptionWrapper(e);
352: }
353:
354: return cb.toString();
355: }
356:
357: /**
358: * Read a byte array object from the stream.
359: */
360: public int readBytes(byte[] value) throws JMSException {
361: ReadStream is = getReadStream();
362:
363: try {
364: return is.read(value);
365: } catch (IOException e) {
366: throw new JMSExceptionWrapper(e);
367: }
368: }
369:
370: /**
371: * Read a byte array object from the stream.
372: */
373: public int readBytes(byte[] value, int length) throws JMSException {
374: ReadStream is = getReadStream();
375:
376: try {
377: return is.read(value, 0, length);
378: } catch (IOException e) {
379: throw new JMSExceptionWrapper(e);
380: }
381: }
382:
383: protected ReadStream getReadStream() throws JMSException {
384: checkBodyReadable();
385:
386: /* ejb/6a87
387: if (_rs == null)
388: throw new MessageEOFException(L.l("bytes message may not be read"));
389: */
390:
391: return _rs;
392: }
393:
394: /**
395: * Clears the message and puts it into write mode.
396: */
397: public void clearBody() throws JMSException {
398: super .clearBody();
399:
400: _ws = null;
401: _tempStream = null;
402: _rs = null;
403: }
404:
405: /**
406: * Writes a boolean to the stream.
407: */
408: public void writeBoolean(boolean b) throws JMSException {
409: try {
410: getWriteStream().write(b ? 1 : 0);
411: } catch (IOException e) {
412: throw new JMSExceptionWrapper(e);
413: }
414: }
415:
416: /**
417: * Writes a byte to the stream.
418: */
419: public void writeByte(byte b) throws JMSException {
420: try {
421: getWriteStream().write(b);
422: } catch (IOException e) {
423: throw new JMSExceptionWrapper(e);
424: }
425: }
426:
427: /**
428: * Writes a short to the stream.
429: */
430: public void writeShort(short s) throws JMSException {
431: try {
432: WriteStream ws = getWriteStream();
433:
434: ws.write(s >> 8);
435: ws.write(s);
436: } catch (IOException e) {
437: throw new JMSExceptionWrapper(e);
438: }
439: }
440:
441: /**
442: * Writes an integer to the stream.
443: */
444: public void writeInt(int i) throws JMSException {
445: try {
446: WriteStream ws = getWriteStream();
447:
448: ws.write(i >> 24);
449: ws.write(i >> 16);
450: ws.write(i >> 8);
451: ws.write(i);
452: } catch (IOException e) {
453: throw new JMSExceptionWrapper(e);
454: }
455: }
456:
457: /**
458: * Writes a long to the stream.
459: */
460: public void writeLong(long l) throws JMSException {
461: try {
462: WriteStream ws = getWriteStream();
463:
464: ws.write((int) (l >> 56));
465: ws.write((int) (l >> 48));
466: ws.write((int) (l >> 40));
467: ws.write((int) (l >> 32));
468: ws.write((int) (l >> 24));
469: ws.write((int) (l >> 16));
470: ws.write((int) (l >> 8));
471: ws.write((int) l);
472: } catch (IOException e) {
473: throw new JMSExceptionWrapper(e);
474: }
475: }
476:
477: /**
478: * Writes a float to the stream.
479: */
480: public void writeFloat(float f) throws JMSException {
481: int i = Float.floatToIntBits(f);
482: writeInt(i);
483: }
484:
485: /**
486: * Writes a double to the stream.
487: */
488: public void writeDouble(double d) throws JMSException {
489: long l = Double.doubleToLongBits(d);
490: writeLong(l);
491: }
492:
493: /**
494: * Writes a string to the stream.
495: */
496: public void writeUTF(String s) throws JMSException {
497: try {
498: WriteStream out = getWriteStream();
499:
500: int len = s.length();
501:
502: int byteLength = 0;
503:
504: for (int i = 0; i < len; i++) {
505: int ch = s.charAt(0);
506:
507: if (ch == 0)
508: byteLength += 2;
509: else if (ch < 0x80)
510: byteLength += 1;
511: else if (ch < 0x800)
512: byteLength += 2;
513: else
514: byteLength += 3;
515: }
516:
517: out.write(byteLength >> 8);
518: out.write(byteLength);
519:
520: for (int i = 0; i < len; i++) {
521: int ch = s.charAt(i);
522:
523: if (ch == 0) {
524: out.write(0xc0);
525: out.write(0x80);
526: } else if (ch < 0x80)
527: out.write(ch);
528: else if (ch < 0x800) {
529: out.write(0xc0 + ((ch >> 6) & 0x1f));
530: out.write(0x80 + (ch & 0x3f));
531: } else if (ch < 0x8000) {
532: out.write(0xe0 + ((ch >> 12) & 0x0f));
533: out.write(0x80 + ((ch >> 6) & 0x3f));
534: out.write(0x80 + (ch & 0x3f));
535: }
536: }
537: } catch (IOException e) {
538: throw new JMSExceptionWrapper(e);
539: }
540: }
541:
542: /**
543: * Writes a character to the stream.
544: */
545: public void writeChar(char ch) throws JMSException {
546: try {
547: WriteStream ws = getWriteStream();
548:
549: ws.write(ch >> 8);
550: ws.write(ch);
551: } catch (IOException e) {
552: throw new JMSExceptionWrapper(e);
553: }
554: }
555:
556: /**
557: * Writes a byte array to the stream.
558: */
559: public void writeBytes(byte[] buf) throws JMSException {
560: writeBytes(buf, 0, buf.length);
561: }
562:
563: /**
564: * Writes a byte array to the stream.
565: */
566: public void writeBytes(byte[] buf, int offset, int length)
567: throws JMSException {
568: try {
569: WriteStream ws = getWriteStream();
570:
571: ws.write(buf, offset, length);
572: } catch (IOException e) {
573: throw new JMSExceptionWrapper(e);
574: }
575: }
576:
577: /**
578: * Writes the next object.
579: */
580: public void writeObject(Object obj) throws JMSException {
581: if (obj == null)
582: throw new NullPointerException();
583: else if (obj instanceof Boolean)
584: writeBoolean(((Boolean) obj).booleanValue());
585: else if (obj instanceof Byte)
586: writeByte(((Byte) obj).byteValue());
587: else if (obj instanceof Short)
588: writeShort(((Short) obj).shortValue());
589: else if (obj instanceof Character)
590: writeChar(((Character) obj).charValue());
591: else if (obj instanceof Integer)
592: writeInt(((Integer) obj).intValue());
593: else if (obj instanceof Long)
594: writeLong(((Long) obj).longValue());
595: else if (obj instanceof Float)
596: writeFloat(((Float) obj).floatValue());
597: else if (obj instanceof Double)
598: writeDouble(((Double) obj).doubleValue());
599: else if (obj instanceof String)
600: writeUTF((String) obj);
601: else if (obj instanceof byte[])
602: writeBytes((byte[]) obj);
603: else
604: throw new MessageFormatException(obj.getClass().getName()
605: + ": " + String.valueOf(obj));
606: }
607:
608: public long getBodyLength() throws JMSException {
609: checkBodyReadable();
610:
611: if (_tempStream == null)
612: return 0;
613: else
614: return _tempStream.getLength();
615: }
616:
617: protected WriteStream getWriteStream() throws JMSException {
618: checkBodyWriteable();
619:
620: if (_tempStream == null)
621: _tempStream = new TempStream();
622:
623: if (_ws == null)
624: _ws = new WriteStream(_tempStream);
625:
626: return _ws;
627: }
628:
629: public MessageImpl copy() {
630: try {
631: return new BytesMessageImpl(this );
632: } catch (Exception e) {
633: throw new RuntimeException(e);
634: }
635: }
636:
637: protected void copy(BytesMessageImpl newMsg) {
638: super .copy(newMsg);
639:
640: try {
641: if (_ws != null)
642: _ws.flush();
643:
644: if (_tempStream != null)
645: newMsg._tempStream = _tempStream.copy();
646: } catch (Exception e) {
647: log.log(Level.WARNING, e.toString(), e);
648: }
649: }
650:
651: /**
652: * Serialize the body to an input stream.
653: */
654: @Override
655: public InputStream bodyToInputStream() throws IOException {
656: if (_tempStream != null)
657: return _tempStream.openReadAndSaveBuffer();
658: else
659: return null;
660: }
661:
662: /**
663: * Read the body from an input stream.
664: */
665: @Override
666: public void readBody(InputStream is) throws IOException,
667: JMSException {
668: if (is != null) {
669: WriteStream out = getWriteStream();
670:
671: out.writeStream(is);
672:
673: out.close();
674: }
675:
676: reset();
677: }
678:
679: public String toString() {
680: return "BytesMessageImpl[]";
681: }
682: }
|