01: package com.mockrunner.mock.jms;
02:
03: import java.util.Iterator;
04: import java.util.List;
05: import java.util.Map;
06:
07: import javax.jms.JMSException;
08: import javax.jms.Message;
09: import javax.jms.MessageListener;
10: import javax.jms.Topic;
11:
12: /**
13: * Mock implementation of JMS <code>Topic</code>.
14: */
15: public class MockTopic extends MockDestination implements Topic {
16: private String name;
17:
18: public MockTopic(String name) {
19: this .name = name;
20: }
21:
22: public String getTopicName() throws JMSException {
23: return name;
24: }
25:
26: /**
27: * Adds a message to this <code>Topic</code> that will
28: * be propagated to the corresponding receivers.
29: * @param message the message
30: */
31: public void addMessage(Message message) throws JMSException {
32: addReceivedMessage(message);
33: boolean isConsumed = false;
34: Iterator sessionsIterator = sessionSet().iterator();
35: while (sessionsIterator.hasNext()) {
36: MockSession session = (MockSession) sessionsIterator.next();
37: MessageListener globalListener = session
38: .getMessageListener();
39: if (null != globalListener) {
40: globalListener.onMessage(message);
41: isConsumed = true;
42: acknowledgeMessage(message, session);
43: } else {
44: List subscribers = session
45: .getTopicTransmissionManager()
46: .getTopicSubscriberList(name);
47: for (int ii = 0; ii < subscribers.size(); ii++) {
48: MockTopicSubscriber subscriber = (MockTopicSubscriber) subscribers
49: .get(ii);
50: if (subscriber.canConsume(message)) {
51: subscriber.receiveMessage(message);
52: isConsumed = true;
53: acknowledgeMessage(message, session);
54: }
55: }
56: Map durableSubscribers = session
57: .getTopicTransmissionManager()
58: .getDurableTopicSubscriberMap(name);
59: Iterator keys = durableSubscribers.keySet().iterator();
60: while (keys.hasNext()) {
61: MockTopicSubscriber subscriber = (MockTopicSubscriber) durableSubscribers
62: .get(keys.next());
63: if (subscriber.canConsume(message)) {
64: subscriber.receiveMessage(message);
65: isConsumed = true;
66: acknowledgeMessage(message, session);
67: }
68: }
69: }
70: }
71: if (!isConsumed) {
72: addCurrentMessage(message);
73: }
74: }
75: }
|