001: package com.mockrunner.mock.jms;
002:
003: import javax.jms.InvalidSelectorException;
004: import javax.jms.JMSException;
005: import javax.jms.Message;
006: import javax.jms.MessageConsumer;
007: import javax.jms.MessageListener;
008:
009: import org.activemq.filter.mockrunner.Filter;
010: import org.activemq.selector.mockrunner.SelectorParser;
011:
012: /**
013: * Mock implementation of JMS <code>MessageConsumer</code>.
014: */
015: public abstract class MockMessageConsumer implements MessageConsumer {
016: private MockConnection connection;
017: private String messageSelector;
018: private Filter messageSelectorFilter;
019: private boolean closed;
020: private MessageListener messageListener;
021:
022: public MockMessageConsumer(MockConnection connection,
023: String messageSelector) {
024: this .connection = connection;
025: this .messageSelector = messageSelector;
026: parseMessageSelector();
027: closed = false;
028: messageListener = null;
029: }
030:
031: private void parseMessageSelector() {
032: if (null == messageSelector || messageSelector.length() == 0) {
033: this .messageSelectorFilter = null;
034: } else {
035: try {
036: this .messageSelectorFilter = new SelectorParser()
037: .parse(messageSelector);
038: } catch (InvalidSelectorException exc) {
039: throw new RuntimeException(
040: "Error parsing message selector: "
041: + exc.getMessage());
042: }
043: }
044: }
045:
046: /**
047: * Returns if this consumer was closed.
048: * @return <code>true</code> if this consumer is closed
049: */
050: public boolean isClosed() {
051: return closed;
052: }
053:
054: /**
055: * Returns if this consumer can consume an incoming message,
056: * i.e. if a <code>MessageListener</code> is registered,
057: * the receiver isn't closed and has an approriate selector.
058: * @return <code>true</code> if this receiver can consume the message
059: */
060: public boolean canConsume(Message message) {
061: if (messageListener == null)
062: return false;
063: if (isClosed())
064: return false;
065: return matchesMessageSelector(message);
066: }
067:
068: /**
069: * Adds a message that is immediately propagated to the
070: * message listener. If there's no message listener,
071: * nothing happens.
072: * @param message the message
073: */
074: public void receiveMessage(Message message) {
075: if (null == messageListener)
076: return;
077: messageListener.onMessage(message);
078: }
079:
080: public String getMessageSelector() throws JMSException {
081: connection.throwJMSException();
082: return messageSelector;
083: }
084:
085: public MessageListener getMessageListener() throws JMSException {
086: connection.throwJMSException();
087: return messageListener;
088: }
089:
090: public void setMessageListener(MessageListener messageListener)
091: throws JMSException {
092: connection.throwJMSException();
093: this .messageListener = messageListener;
094: }
095:
096: public Message receive(long timeout) throws JMSException {
097: connection.throwJMSException();
098: return receive();
099: }
100:
101: public Message receiveNoWait() throws JMSException {
102: connection.throwJMSException();
103: return receive();
104: }
105:
106: public void close() throws JMSException {
107: connection.throwJMSException();
108: closed = true;
109: }
110:
111: private boolean matchesMessageSelector(Message message) {
112: if (!connection.getConfigurationManager()
113: .getUseMessageSelectors())
114: return true;
115: if (messageSelectorFilter == null)
116: return true;
117: try {
118: return messageSelectorFilter.matches(message);
119: } catch (JMSException exc) {
120: throw new RuntimeException(exc.getMessage());
121: }
122: }
123:
124: protected Filter getMessageFilter() {
125: return messageSelectorFilter;
126: }
127:
128: protected MockConnection getConnection() {
129: return connection;
130: }
131: }
|