001: /*
002: * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved
003: *
004: * This file is part of Resin(R) Open Source
005: *
006: * Each copy or derived work must preserve the copyright notice and this
007: * notice unmodified.
008: *
009: * Resin Open Source is free software; you can redistribute it and/or modify
010: * it under the terms of the GNU General Public License as published by
011: * the Free Software Foundation; either version 2 of the License, or
012: * (at your option) any later version.
013: *
014: * Resin Open Source is distributed in the hope that it will be useful,
015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
017: * of NON-INFRINGEMENT. See the GNU General Public License for more
018: * details.
019: *
020: * You should have received a copy of the GNU General Public License
021: * along with Resin Open Source; if not, write to the
022: *
023: * Free Software Foundation, Inc.
024: * 59 Temple Place, Suite 330
025: * Boston, MA 02111-1307 USA
026: *
027: * @author Scott Ferguson
028: */
029:
030: package com.caucho.jms.message;
031:
032: import javax.jms.JMSException;
033: import javax.jms.MessageEOFException;
034: import javax.jms.StreamMessage;
035: import java.util.ArrayList;
036: import java.io.*;
037:
038: import com.caucho.vfs.*;
039: import com.caucho.hessian.io.*;
040:
041: /**
042: * A stream message.
043: */
044: public class StreamMessageImpl extends MessageImpl implements
045: StreamMessage {
046: private ArrayList<Object> _values = new ArrayList<Object>();
047: private int _index;
048: private byte[] _bytes;
049: private int _bytesOffset;
050:
051: public StreamMessageImpl() {
052: }
053:
054: StreamMessageImpl(StreamMessage stream) throws JMSException {
055: super (stream);
056:
057: try {
058: stream.reset();
059:
060: Object value;
061: while (true) {
062: writeObject(stream.readObject());
063: }
064: } catch (MessageEOFException e) {
065: }
066:
067: reset();
068: }
069:
070: StreamMessageImpl(StreamMessageImpl stream) {
071: super (stream);
072:
073: _values.addAll(stream._values);
074:
075: try {
076: reset();
077: } catch (Exception e) {
078: throw new RuntimeException(e);
079: }
080: }
081:
082: /**
083: * Returns the type enumeration.
084: */
085: @Override
086: public MessageType getType() {
087: return MessageType.STREAM;
088: }
089:
090: /**
091: * Sets the body for reading.
092: */
093: public void setReceive() throws JMSException {
094: super .setReceive();
095:
096: reset();
097: }
098:
099: /**
100: * Set the stream for reading.
101: */
102: public void reset() throws JMSException {
103: setBodyReadOnly();
104:
105: _index = 0;
106: _bytes = null;
107: _bytesOffset = 0;
108: }
109:
110: /**
111: * Read a boolean from the stream.
112: */
113: public boolean readBoolean() throws JMSException {
114: boolean value = ObjectConverter.toBoolean(readObjectImpl());
115:
116: _index++;
117:
118: return value;
119: }
120:
121: /**
122: * Read a byte from the stream.
123: */
124: public byte readByte() throws JMSException {
125: byte value = ObjectConverter.toByte(readObjectImpl());
126:
127: _index++;
128:
129: return value;
130: }
131:
132: /**
133: * Read a short from the stream.
134: */
135: public short readShort() throws JMSException {
136: short value = ObjectConverter.toShort(readObjectImpl());
137:
138: _index++;
139:
140: return value;
141: }
142:
143: /**
144: * Read an integer from the stream.
145: */
146: public int readInt() throws JMSException {
147: int value = ObjectConverter.toInt(readObjectImpl());
148:
149: _index++;
150:
151: return value;
152: }
153:
154: /**
155: * Read a long from the stream.
156: */
157: public long readLong() throws JMSException {
158: long value = ObjectConverter.toLong(readObjectImpl());
159:
160: _index++;
161:
162: return value;
163: }
164:
165: /**
166: * Read a float from the stream.
167: */
168: public float readFloat() throws JMSException {
169: float value = ObjectConverter.toFloat(readObjectImpl());
170:
171: _index++;
172:
173: return value;
174: }
175:
176: /**
177: * Read a double from the stream.
178: */
179: public double readDouble() throws JMSException {
180: double value = ObjectConverter.toDouble(readObjectImpl());
181:
182: _index++;
183:
184: return value;
185: }
186:
187: /**
188: * Read a character object from the stream.
189: */
190: public char readChar() throws JMSException {
191: char value = ObjectConverter.toChar(readObjectImpl());
192:
193: _index++;
194:
195: return value;
196: }
197:
198: /**
199: * Read a string from the stream.
200: */
201: public String readString() throws JMSException {
202: String value = ObjectConverter.toString(readObjectImpl());
203:
204: _index++;
205:
206: return value;
207: }
208:
209: /**
210: * Read a byte array object from the stream.
211: */
212: public int readBytes(byte[] value) throws JMSException {
213: byte[] bytes;
214:
215: if (_bytes != null) {
216: if (_bytesOffset == _bytes.length) {
217: _bytes = null;
218: _bytesOffset = 0;
219: return -1;
220: }
221: } else {
222: _bytes = ObjectConverter.toBytes(readObjectImpl());
223: _index++;
224: }
225:
226: if (_bytes == null)
227: return -1;
228:
229: int sublen = _bytes.length - _bytesOffset;
230: if (value.length < sublen)
231: sublen = value.length;
232:
233: for (int i = 0; i < sublen; i++)
234: value[i] = _bytes[_bytesOffset++];
235:
236: return sublen;
237: }
238:
239: /**
240: * Reads the next object.
241: */
242: public Object readObject() throws JMSException {
243: Object value = readObjectImpl();
244:
245: _index++;
246:
247: return value;
248: }
249:
250: /**
251: * Reads the next object.
252: */
253: private Object readObjectImpl() throws JMSException {
254: checkBodyReadable();
255:
256: if (_values.size() <= _index)
257: throw new MessageEOFException(L
258: .l("end of message in stream"));
259:
260: _bytes = null;
261: _bytesOffset = 0;
262:
263: return _values.get(_index);
264: }
265:
266: /**
267: * Clears the message and puts it into write mode.
268: */
269: public void clearBody() throws JMSException {
270: super .clearBody();
271:
272: _values.clear();
273: _index = 0;
274: _bytes = null;
275: _bytesOffset = 0;
276: }
277:
278: /**
279: * Writes a boolean to the stream.
280: */
281: public void writeBoolean(boolean b) throws JMSException {
282: writeObject(new Boolean(b));
283: }
284:
285: /**
286: * Writes a byte to the stream.
287: */
288: public void writeByte(byte b) throws JMSException {
289: writeObject(new Byte(b));
290: }
291:
292: /**
293: * Writes a short to the stream.
294: */
295: public void writeShort(short s) throws JMSException {
296: writeObject(new Short(s));
297: }
298:
299: /**
300: * Writes an integer to the stream.
301: */
302: public void writeInt(int i) throws JMSException {
303: writeObject(new Integer(i));
304: }
305:
306: /**
307: * Writes a long to the stream.
308: */
309: public void writeLong(long l) throws JMSException {
310: writeObject(new Long(l));
311: }
312:
313: /**
314: * Writes a float to the stream.
315: */
316: public void writeFloat(float f) throws JMSException {
317: writeObject(new Float(f));
318: }
319:
320: /**
321: * Writes a double to the stream.
322: */
323: public void writeDouble(double d) throws JMSException {
324: writeObject(new Double(d));
325: }
326:
327: /**
328: * Writes a string to the stream.
329: */
330: public void writeString(String s) throws JMSException {
331: writeObject(s);
332: }
333:
334: /**
335: * Writes a character to the stream.
336: */
337: public void writeChar(char ch) throws JMSException {
338: writeObject(new Character(ch));
339: }
340:
341: /**
342: * Writes a byte array to the stream.
343: */
344: public void writeBytes(byte[] buf) throws JMSException {
345: writeBytes(buf, 0, buf.length);
346: }
347:
348: /**
349: * Writes a byte array to the stream.
350: */
351: public void writeBytes(byte[] buf, int offset, int length)
352: throws JMSException {
353: byte[] newBuf = new byte[length];
354:
355: System.arraycopy(buf, offset, newBuf, 0, length);
356:
357: writeObject(newBuf);
358: }
359:
360: /**
361: * Writes the next object.
362: */
363: public void writeObject(Object obj) throws JMSException {
364: checkBodyWriteable();
365:
366: _values.add(obj);
367: }
368:
369: @Override
370: public MessageImpl copy() {
371: return new StreamMessageImpl(this );
372: }
373:
374: protected void copy(StreamMessageImpl newMsg) {
375: super .copy(newMsg);
376:
377: newMsg._values = new ArrayList(_values);
378: newMsg._index = 0;
379: }
380:
381: /**
382: * Serialize the body to an input stream.
383: */
384: @Override
385: public InputStream bodyToInputStream() throws IOException {
386: if (_values == null || _values.size() == 0)
387: return null;
388:
389: TempStream body = new TempStream();
390: body.openWrite();
391:
392: StreamImplOutputStream ws = new StreamImplOutputStream(body);
393:
394: Hessian2Output out = new Hessian2Output(ws);
395:
396: out.writeObject(_values);
397:
398: out.close();
399:
400: ws.close();
401:
402: return body.openRead();
403: }
404:
405: /**
406: * Read the body from an input stream.
407: */
408: @Override
409: public void readBody(InputStream is) throws IOException,
410: JMSException {
411: if (is != null) {
412: Hessian2Input in = new Hessian2Input(is);
413:
414: _values = (ArrayList<Object>) in.readObject();
415:
416: in.close();
417: }
418:
419: reset();
420: }
421:
422: public String toString() {
423: return "StreamMessageImpl[]";
424: }
425: }
|