001: package org.mockejb.jms;
002:
003: import java.util.*;
004: import javax.jms.*;
005:
006: /**
007: * <code>StreamMessage</code> implementation.
008: * @author Dimitar Gospodinov
009: */
010: public class StreamMessageImpl extends MessageImpl implements
011: StreamMessage {
012:
013: private final List streamData = new ArrayList();
014:
015: // Shows what should be read next from the stream
016: private int position;
017: // Shows what should be read next from current byte[]
018: private int byteArrayPosition;
019:
020: /**
021: * Creates empty <code>StreamMessageImpl</code>.
022: */
023: public StreamMessageImpl() {
024: super ();
025: }
026:
027: /**
028: * Creates new <code>StreamMessageImpl</code> initialized with
029: * header, properties and body from <code>msg</code>.
030: * The state of <code>msg</code> is not changed.
031: * @param msg message to copy from
032: * @throws JMSException
033: */
034: public StreamMessageImpl(StreamMessage msg) throws JMSException {
035: super (msg);
036: setBody(msg);
037: }
038:
039: /**
040: * Sets the body of this message to the body of <code>msg</code>.
041: * The state of <code>msg</code> is not changed.
042: * @param msg
043: * @throws JMSException
044: */
045: private void setBody(StreamMessage msg) throws JMSException {
046: // Number of elements remaining until the end of the stream
047: int elementsRemaining = 0;
048:
049: while (true) {
050: try {
051: msg.readObject();
052: elementsRemaining++;
053: } catch (MessageEOFException ex) {
054: /*
055: * Reached EOF.
056: * Now <code>bytesRemaining</code> contains number of bytes
057: * remaining in the byte stream of <code>msg</code>
058: * Next - reset <code>msg</code> and "extract" all bytes into
059: * <code>sourceBytes</code>
060: */
061: extractElements(msg);
062: // Reset the message and move to the original position in the stream
063: msg.reset();
064: int moveTo = streamData.size() - elementsRemaining;
065: while (moveTo-- > 0) {
066: msg.readObject();
067: }
068: break;
069: } catch (MessageNotReadableException ex) {
070: // Message is in Write-Only mode
071: extractElements(msg);
072: /*
073: * At this point <code>msg</code> is in Read-Only mode
074: * Switch to Write-Only mode and restore the original stream.
075: */
076: msg.clearBody();
077: for (int i = 0; i < streamData.size(); i++) {
078: msg.writeObject(streamData.get(i));
079: }
080: break;
081: }
082: }
083: }
084:
085: /**
086: * Calls <code>msg.reset</code> and reads all elements from its body.
087: * The elements are added to the body of this message.
088: * @param msg
089: * @throws JMSException
090: */
091: private void extractElements(StreamMessage msg) throws JMSException {
092: msg.reset();
093: while (true) {
094: try {
095: writeObject(msg.readObject());
096: } catch (MessageEOFException ex) {
097: break;
098: }
099: }
100: }
101:
102: /**
103: * @see javax.jms.StreamMessage#clearBody()
104: */
105: public void clearBody() throws JMSException {
106: super .clearBody();
107: streamData.clear();
108: }
109:
110: /**
111: * @see javax.jms.StreamMessage#readBoolean()
112: */
113: public boolean readBoolean() throws JMSException {
114: checkBodyReadable();
115: checkEOF();
116:
117: Object value = streamData.get(position);
118: boolean result;
119:
120: if (value instanceof Boolean) {
121: result = ((Boolean) value).booleanValue();
122: } else if (value == null || value instanceof String) {
123: result = Boolean.valueOf((String) value).booleanValue();
124: } else {
125: throw new MessageFormatException(
126: "Current position does not contain valid Boolean value!");
127: }
128: position++;
129: return result;
130: }
131:
132: /**
133: * @see javax.jms.StreamMessage#readByte()
134: */
135: public byte readByte() throws JMSException {
136: checkBodyReadable();
137: checkEOF();
138:
139: Object value = streamData.get(position);
140: byte result;
141:
142: if (value instanceof Byte) {
143: result = ((Byte) value).byteValue();
144: } else if (value == null || value instanceof String) {
145: result = Byte.valueOf((String) value).byteValue();
146: } else {
147: throw new MessageFormatException(
148: "Current position does not contain valid Byte value!");
149: }
150: position++;
151: return result;
152: }
153:
154: /**
155: * @see javax.jms.StreamMessage#readShort()
156: */
157: public short readShort() throws JMSException {
158: checkBodyReadable();
159: checkEOF();
160:
161: Object value = streamData.get(position);
162: short result;
163:
164: if (value instanceof Byte || value instanceof Short) {
165: result = ((Number) value).shortValue();
166: } else if (value == null || value instanceof String) {
167: result = Short.valueOf((String) value).shortValue();
168: } else {
169: throw new MessageFormatException(
170: "Current position does not contain valid Short value!");
171: }
172: position++;
173: return result;
174: }
175:
176: /**
177: * @see javax.jms.StreamMessage#readChar()
178: */
179: public char readChar() throws JMSException {
180: checkBodyReadable();
181: checkEOF();
182:
183: Object value = streamData.get(position);
184: char result;
185:
186: if (value == null) {
187: throw new NullPointerException();
188: } else if (value instanceof Character) {
189: result = ((Character) value).charValue();
190: } else {
191: throw new MessageFormatException(
192: "Current position does not contain valid Char value!");
193: }
194: position++;
195: return result;
196: }
197:
198: /**
199: * @see javax.jms.StreamMessage#readInt()
200: */
201: public int readInt() throws JMSException {
202: checkBodyReadable();
203: checkEOF();
204:
205: Object value = streamData.get(position);
206: int result;
207:
208: if (value instanceof Byte || value instanceof Short
209: || value instanceof Integer) {
210: result = ((Number) value).intValue();
211: } else if (value == null || value instanceof String) {
212: result = Integer.valueOf((String) value).intValue();
213: } else {
214: throw new MessageFormatException(
215: "Current position does not contain valid Integer value!");
216: }
217: position++;
218: return result;
219: }
220:
221: /**
222: * @see javax.jms.StreamMessage#readLong()
223: */
224: public long readLong() throws JMSException {
225: checkBodyReadable();
226: checkEOF();
227:
228: Object value = streamData.get(position);
229: long result;
230:
231: if (value instanceof Byte || value instanceof Short
232: || value instanceof Integer || value instanceof Long) {
233:
234: result = ((Number) value).longValue();
235: } else if (value == null || value instanceof String) {
236: result = Long.valueOf((String) value).longValue();
237: } else {
238: throw new MessageFormatException(
239: "Current position does not contain valid Long value!");
240: }
241: position++;
242: return result;
243: }
244:
245: /**
246: * @see javax.jms.StreamMessage#readFloat()
247: */
248: public float readFloat() throws JMSException {
249: checkBodyReadable();
250: checkEOF();
251:
252: Object value = streamData.get(position);
253: float result;
254:
255: if (value instanceof Float) {
256: result = ((Float) value).floatValue();
257: } else if (value == null || value instanceof String) {
258: result = Float.valueOf((String) value).floatValue();
259: } else {
260: throw new MessageFormatException(
261: "Current position does not contain valid Float value!");
262: }
263: position++;
264: return result;
265: }
266:
267: /**
268: * @see javax.jms.StreamMessage#readDouble()
269: */
270: public double readDouble() throws JMSException {
271: checkBodyReadable();
272: checkEOF();
273:
274: Object value = streamData.get(position);
275: double result;
276:
277: if (value instanceof Float || value instanceof Double) {
278: result = ((Number) value).doubleValue();
279: } else if (value == null || value instanceof String) {
280: result = Double.valueOf((String) value).doubleValue();
281: } else {
282: throw new MessageFormatException(
283: "Current position does not contain valid Double value!");
284: }
285: position++;
286: return result;
287: }
288:
289: /**
290: * @see javax.jms.StreamMessage#readString()
291: */
292: public String readString() throws JMSException {
293: checkBodyReadable();
294: checkEOF();
295:
296: Object value = streamData.get(position);
297: if (value instanceof byte[]) {
298: throw new MessageFormatException(
299: "Current position does not contain valid UTF value!");
300: }
301: position++;
302: if (value == null) {
303: return null;
304: }
305: return value.toString();
306: }
307:
308: /**
309: * @see javax.jms.StreamMessage#readBytes(byte[])
310: */
311: public int readBytes(byte[] bytes) throws JMSException {
312: checkBodyReadable();
313: checkEOF();
314:
315: if (bytes == null) {
316: throw new NullPointerException();
317: }
318:
319: Object value = streamData.get(position);
320: if (byteArrayPosition == -1 || value == null) {
321: position++;
322: byteArrayPosition = 0;
323: return -1;
324: }
325: if (!(value instanceof byte[])) {
326: throw new MessageFormatException(
327: "Current position does not contain valid byte[] value!");
328: }
329:
330: byte[] byteArray = (byte[]) value;
331: if (byteArray.length == 0) {
332: byteArrayPosition = -1;
333: return 0;
334: }
335:
336: // Determine how many bytes to copy into <code>bytes</code>
337: int numOfBytesToCopy;
338: int remainingBytesToCopy = byteArray.length - byteArrayPosition;
339: int startOffset = byteArrayPosition;
340:
341: if (remainingBytesToCopy <= bytes.length) {
342: numOfBytesToCopy = remainingBytesToCopy;
343: byteArrayPosition = -1;
344: } else {
345: numOfBytesToCopy = bytes.length;
346: byteArrayPosition += numOfBytesToCopy;
347: }
348:
349: int i = 0;
350: int j = numOfBytesToCopy;
351: while (j > 0) {
352: bytes[i++] = byteArray[startOffset++];
353: j--;
354: }
355: return numOfBytesToCopy;
356: }
357:
358: /**
359: * @see javax.jms.StreamMessage#readObject()
360: */
361: public Object readObject() throws JMSException {
362: checkBodyReadable();
363: checkEOF();
364:
365: Object result = streamData.get(position++);
366: if (result instanceof byte[]) {
367: result = ((byte[]) result).clone();
368: }
369: return result;
370: }
371:
372: /**
373: * @see javax.jms.StreamMessage#writeBoolean(boolean)
374: */
375: public void writeBoolean(boolean value) throws JMSException {
376: checkBodyWriteable();
377: streamData.add(new Boolean(value));
378: }
379:
380: /**
381: * @see javax.jms.StreamMessage#writeByte(byte)
382: */
383: public void writeByte(byte value) throws JMSException {
384: checkBodyWriteable();
385: streamData.add(new Byte(value));
386: }
387:
388: /**
389: * @see javax.jms.StreamMessage#writeShort(short)
390: */
391: public void writeShort(short value) throws JMSException {
392: checkBodyWriteable();
393: streamData.add(new Short(value));
394: }
395:
396: /**
397: * @see javax.jms.StreamMessage#writeChar(char)
398: */
399: public void writeChar(char value) throws JMSException {
400: checkBodyWriteable();
401: streamData.add(new Character(value));
402: }
403:
404: /**
405: * @see javax.jms.StreamMessage#writeInt(int)
406: */
407: public void writeInt(int value) throws JMSException {
408: checkBodyWriteable();
409: streamData.add(new Integer(value));
410: }
411:
412: /**
413: * @see javax.jms.StreamMessage#writeLong(long)
414: */
415: public void writeLong(long value) throws JMSException {
416: checkBodyWriteable();
417: streamData.add(new Long(value));
418: }
419:
420: /**
421: * @see javax.jms.StreamMessage#writeFloat(float)
422: */
423: public void writeFloat(float value) throws JMSException {
424: checkBodyWriteable();
425: streamData.add(new Float(value));
426: }
427:
428: /**
429: * @see javax.jms.StreamMessage#writeDouble(double)
430: */
431: public void writeDouble(double value) throws JMSException {
432: checkBodyWriteable();
433: streamData.add(new Double(value));
434: }
435:
436: /**
437: * @see javax.jms.StreamMessage#writeString(java.lang.String)
438: */
439: public void writeString(String value) throws JMSException {
440: checkBodyWriteable();
441: streamData.add(value);
442: }
443:
444: /**
445: * @see javax.jms.StreamMessage#writeBytes(byte[])
446: */
447: public void writeBytes(byte[] bytes) throws JMSException {
448: checkBodyWriteable();
449: if (bytes == null) {
450: streamData.add(null);
451: return;
452: }
453: writeBytes(bytes, 0, bytes.length);
454: }
455:
456: /**
457: * @see javax.jms.StreamMessage#writeBytes(byte[], int, int)
458: */
459: public void writeBytes(byte[] bytes, int offset, int length)
460: throws JMSException {
461:
462: checkBodyWriteable();
463: if (bytes == null) {
464: streamData.add(null);
465: return;
466: }
467: if (offset < 0 || length < 0
468: || (offset + length) > bytes.length) {
469: throw new IllegalArgumentException();
470: }
471:
472: byte[] bytesToAdd = new byte[length];
473: for (int i = 0; i < length; bytesToAdd[i++] = bytes[offset++])
474: ;
475: streamData.add(bytesToAdd);
476: }
477:
478: /**
479: * @see javax.jms.StreamMessage#writeObject(java.lang.Object)
480: */
481: public void writeObject(Object value) throws JMSException {
482: checkBodyWriteable();
483:
484: if (!(value == null || value instanceof Boolean
485: || value instanceof Byte || value instanceof Short
486: || value instanceof Integer || value instanceof Long
487: || value instanceof Float || value instanceof Double
488: || value instanceof String
489: || value instanceof Character || value instanceof byte[])) {
490: throw new MessageFormatException("Incorrect object type!");
491: }
492: if (value instanceof byte[]) {
493: value = ((byte[]) value).clone();
494: }
495: streamData.add(value);
496: }
497:
498: /**
499: * @see javax.jms.StreamMessage#reset()
500: */
501: public void reset() throws JMSException {
502: setBodyReadOnly();
503: position = 0;
504: byteArrayPosition = 0;
505: }
506:
507: // Non standard methods
508:
509: /**
510: * Checks if EOF has been reached and throws
511: * <code>MessageEOFException</code> if yes.
512: * @throws MessageEOFException if EOF has been reached.
513: */
514: private void checkEOF() throws MessageEOFException {
515: if (position >= streamData.size()) {
516: throw new MessageEOFException("EOF reached!");
517: }
518: }
519:
520: /**
521: * Returns array containing all elements from the stream, in
522: * the order they were added.
523: * @return
524: */
525: Object[] getStreamData() {
526: return streamData.toArray();
527: }
528:
529: // Non-standard methods
530:
531: /**
532: * Sets message body in read-only mode.
533: * @throws JMSException
534: */
535: void resetBody() throws JMSException {
536: reset();
537: }
538:
539: }
|