001: package org.mockejb.jms;
002:
003: import javax.jms.*;
004: import java.util.*;
005:
006: import org.mockejb.interceptor.InterceptableProxy;
007:
008: /**
009: * @author Dimitar Gospodinov
010: *
011: */
012: public class MockConsumer implements MessageConsumer {
013:
014: private boolean closed = false;
015:
016: private MockSession sess;
017: private MockDestination destination;
018: private final List messages = new ArrayList();
019: private MessageListener listener = null;
020:
021: MockConsumer(MockSession sess, MockDestination destination) {
022: this .sess = sess;
023: this .destination = destination;
024: }
025:
026: /**
027: * @see javax.jms.MessageConsumer#getMessageSelector()
028: */
029: public String getMessageSelector() throws JMSException {
030: return null;
031: }
032:
033: /**
034: * @see javax.jms.MessageConsumer#getMessageListener()
035: */
036: public MessageListener getMessageListener() throws JMSException {
037: return listener;
038: }
039:
040: /**
041: * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
042: */
043: public void setMessageListener(MessageListener listener)
044: throws JMSException {
045:
046: this .listener = (MessageListener) InterceptableProxy.create(
047: javax.jms.MessageListener.class, listener);
048: }
049:
050: /**
051: * @see javax.jms.MessageConsumer#receive()
052: */
053: public Message receive() throws JMSException {
054: if (isClosed()) {
055: return null;
056: }
057: if (messages.size() == 0) {
058: throw new RuntimeException("No messages received");
059: }
060: return (Message) messages.remove(0);
061: }
062:
063: /**
064: * @see javax.jms.MessageConsumer#receive(long)
065: */
066: public Message receive(long timeout) throws JMSException {
067: if (isClosed() || (messages.size() == 0 && timeout != 0)) {
068: return null;
069: }
070: return receive();
071: }
072:
073: /**
074: * @see javax.jms.MessageConsumer#receiveNoWait()
075: */
076: public Message receiveNoWait() throws JMSException {
077: return receive(1);
078: }
079:
080: /**
081: * @see javax.jms.MessageConsumer#close()
082: */
083: public void close() throws JMSException {
084: closed = true;
085: destination.removeConsumer(this );
086: messages.clear();
087: listener = null;
088: destination = null;
089: }
090:
091: // Non-standard methods
092:
093: /**
094: * Asynchronously consume all received messages.
095: */
096: void consume() throws JMSException {
097: if (listener == null || messages.size() == 0) {
098: return;
099: }
100: ListIterator it = messages.listIterator();
101: System.out.println("Consuming...");
102: while (it.hasNext()) {
103: try {
104: listener.onMessage((Message) it.next());
105: } catch (Throwable t) {
106: throw new RuntimeException(
107: "Escaped exception from MessageListener (faulty listener):\n"
108: + t.getMessage());
109: }
110: it.remove();
111: }
112: }
113:
114: /**
115: * Consume message sent from destination.
116: * If MessageListener is available and the connection is started, message is consumed immediatelly.
117: * In all other cases message is stored and will be consumed later.
118: * @param msg
119: * @throws JMSException
120: */
121: void consume(Message msg) throws JMSException {
122:
123: MessageImpl receivedMsg = MessageUtility.copyMessage(msg, true);
124: MessageListener l = getMessageListener();
125:
126: if (l != null && isStarted()) {
127: try {
128: l.onMessage(receivedMsg);
129: } catch (Throwable t) {
130: throw new RuntimeException(
131: "Escaped exception from MessageListener (faulty listener):\n"
132: + t.getMessage());
133: }
134: } else {
135: messages.add(receivedMsg);
136: }
137: }
138:
139: /**
140: * Consumes all specified messages. <code>messages</code> is collection of
141: * <code>Message</code>.
142: * Each message is copied and <code>consume(Message)</code> is called with the copy.
143: * @param messages
144: */
145: void consume(Collection messages) throws JMSException {
146: Iterator it = messages.iterator();
147: while (it.hasNext()) {
148: consume((Message) it.next());
149: }
150: }
151:
152: boolean isClosed() {
153: return closed;
154: }
155:
156: MockDestination getDestination() {
157: return destination;
158: }
159:
160: private boolean isStarted() {
161: return sess.getConnection().isStarted();
162: }
163:
164: }
|