001: package com.mockrunner.test.jms;
002:
003: import java.util.ArrayList;
004: import java.util.List;
005:
006: import javax.jms.BytesMessage;
007: import javax.jms.JMSException;
008: import javax.jms.Message;
009: import javax.jms.MessageListener;
010: import javax.jms.Session;
011: import javax.jms.StreamMessage;
012: import javax.jms.TemporaryTopic;
013: import javax.jms.TextMessage;
014: import javax.jms.Topic;
015: import javax.jms.TopicPublisher;
016: import javax.jms.TopicSession;
017: import javax.jms.TopicSubscriber;
018:
019: import junit.framework.TestCase;
020:
021: import com.mockrunner.jms.ConfigurationManager;
022: import com.mockrunner.jms.DestinationManager;
023: import com.mockrunner.jms.MessageManager;
024: import com.mockrunner.jms.TopicTransmissionManager;
025: import com.mockrunner.jms.TransmissionManagerWrapper;
026: import com.mockrunner.mock.jms.MockBytesMessage;
027: import com.mockrunner.mock.jms.MockMapMessage;
028: import com.mockrunner.mock.jms.MockObjectMessage;
029: import com.mockrunner.mock.jms.MockStreamMessage;
030: import com.mockrunner.mock.jms.MockTemporaryTopic;
031: import com.mockrunner.mock.jms.MockTextMessage;
032: import com.mockrunner.mock.jms.MockTopic;
033: import com.mockrunner.mock.jms.MockTopicConnection;
034: import com.mockrunner.mock.jms.MockTopicPublisher;
035: import com.mockrunner.mock.jms.MockTopicSession;
036: import com.mockrunner.mock.jms.MockTopicSubscriber;
037:
038: public class MockTopicSessionTest extends TestCase {
039: private MockTopicConnection connection;
040: private MockTopicSession session;
041: private MockTopicSession anotherSession;
042: private MockTopic topic1;
043: private MockTopic topic2;
044:
045: protected void setUp() throws Exception {
046: super .setUp();
047: DestinationManager destManager = new DestinationManager();
048: ConfigurationManager confManager = new ConfigurationManager();
049: connection = new MockTopicConnection(destManager, confManager);
050: session = (MockTopicSession) connection.createTopicSession(
051: false, TopicSession.CLIENT_ACKNOWLEDGE);
052: anotherSession = (MockTopicSession) connection
053: .createTopicSession(false,
054: TopicSession.CLIENT_ACKNOWLEDGE);
055: }
056:
057: public void testCreateMessages() throws Exception {
058: session.createTextMessage("Text1");
059: session.createObjectMessage("Object1");
060: session.createBytesMessage();
061: session.createStreamMessage();
062: MessageManager manager = session.getMessageManager();
063: assertEquals("Text1", manager.getTextMessage(0).getText());
064: assertEquals("Object1", manager.getObjectMessage(0).getObject());
065: assertNotNull(manager.getBytesMessage(0));
066: assertNotNull(manager.getStreamMessage(0));
067: assertNull(manager.getMapMessage(0));
068: assertNull(manager.getBytesMessage(1));
069: }
070:
071: public void testCreateTopics() throws Exception {
072: try {
073: session.createTopic("Topic1");
074: fail();
075: } catch (JMSException exc) {
076: //should throw exception
077: }
078: DestinationManager manager = connection.getDestinationManager();
079: Topic managerTopic1 = manager.createTopic("Topic1");
080: Topic topic = session.createTopic("Topic1");
081: assertTrue(topic == managerTopic1);
082: assertEquals("Topic1", topic.getTopicName());
083: manager.createTopic("Topic2");
084: assertTrue(manager.getTopic("Topic2") == session
085: .createTopic("Topic2"));
086: manager.removeTopic("Topic2");
087: try {
088: session.createTopic("Topic2");
089: fail();
090: } catch (JMSException exc) {
091: //should throw exception
092: }
093: session.createTemporaryTopic();
094: TemporaryTopic tempTopic = session.createTemporaryTopic();
095: assertNotNull(session.getTemporaryTopic(0));
096: assertNotNull(session.getTemporaryTopic(1));
097: assertNull(session.getTemporaryTopic(3));
098: assertTrue(tempTopic == session.getTemporaryTopic(1));
099: }
100:
101: public void testCreatePublisherAndSubscriber() throws Exception {
102: DestinationManager manager = connection.getDestinationManager();
103: TopicTransmissionManager topicTransManager = session
104: .getTopicTransmissionManager();
105: TransmissionManagerWrapper transManager = session
106: .getTransmissionManagerWrapper();
107: topic1 = manager.createTopic("Topic1");
108: topic2 = manager.createTopic("Topic2");
109: assertEquals(0, topicTransManager.getTopicPublisherList()
110: .size());
111: assertEquals(0, transManager.getMessageProducerList().size());
112: TopicPublisher publisher1 = session.createPublisher(topic1);
113: TopicPublisher publisher2 = session.createPublisher(topic2);
114: assertNotNull(topicTransManager.getTopicPublisher(0));
115: assertNotNull(topicTransManager.getTopicPublisher(1));
116: assertNull(topicTransManager.getTopicPublisher(2));
117: assertEquals(2, topicTransManager.getTopicPublisherList()
118: .size());
119: assertNotNull(transManager.getMessageProducer(0));
120: assertNotNull(transManager.getMessageProducer(1));
121: assertNull(transManager.getMessageProducer(2));
122: assertEquals(2, transManager.getMessageProducerList().size());
123: assertTrue(publisher1 == topicTransManager.getTopicPublisher(0));
124: assertTrue(publisher2 == topicTransManager
125: .getTopicPublisher("Topic2"));
126: assertTrue(topic1 == topicTransManager.getTopicPublisher(
127: "Topic1").getTopic());
128: assertTrue(topic2 == topicTransManager.getTopicPublisher(1)
129: .getTopic());
130: assertEquals(0, topicTransManager.getTopicSubscriberList()
131: .size());
132: assertEquals(0, transManager.getMessageConsumerList().size());
133: TopicSubscriber subscriber = session.createSubscriber(topic1);
134: assertFalse(((MockTopicSubscriber) subscriber).isDurable());
135: session.createSubscriber(topic2);
136: assertNotNull(topicTransManager.getTopicSubscriber(0));
137: assertNotNull(topicTransManager.getTopicSubscriber(1));
138: assertNotNull(transManager.getMessageConsumer(0));
139: assertNotNull(transManager.getMessageConsumer(1));
140: assertTrue(subscriber == topicTransManager
141: .getTopicSubscriber("Topic1"));
142: assertTrue("Topic2" == topicTransManager.getTopicSubscriber(1)
143: .getTopic().getTopicName());
144: assertNull(topicTransManager.getTopicSubscriber(2));
145: assertTrue(subscriber == topicTransManager
146: .getTopicSubscriber(0));
147: assertFalse(subscriber == topicTransManager
148: .getTopicSubscriber(1));
149: assertTrue(topic1 == topicTransManager.getTopicSubscriber(0)
150: .getTopic());
151: assertTrue(topic2 == topicTransManager.getTopicSubscriber(1)
152: .getTopic());
153: assertEquals(2, topicTransManager.getTopicSubscriberList()
154: .size());
155: assertEquals(2, transManager.getMessageConsumerList().size());
156: }
157:
158: public void testCreateDurableSubscriber() throws Exception {
159: DestinationManager manager = connection.getDestinationManager();
160: TopicTransmissionManager transManager = session
161: .getTopicTransmissionManager();
162: topic1 = manager.createTopic("Topic1");
163: assertEquals(0, transManager.getDurableTopicSubscriberMap()
164: .size());
165: TopicSubscriber subscriber1 = session.createDurableSubscriber(
166: topic1, "Durable1");
167: TopicSubscriber subscriber2 = session.createDurableSubscriber(
168: topic1, "Durable2", null, true);
169: assertEquals(2, transManager.getDurableTopicSubscriberMap()
170: .size());
171: assertFalse(((MockTopicSubscriber) subscriber1).getNoLocal());
172: assertTrue(((MockTopicSubscriber) subscriber1).isDurable());
173: assertTrue(((MockTopicSubscriber) subscriber2).getNoLocal());
174: assertTrue(((MockTopicSubscriber) subscriber2).isDurable());
175: assertTrue(subscriber1 == transManager
176: .getDurableTopicSubscriber("Durable1"));
177: assertTrue(subscriber2 == transManager
178: .getDurableTopicSubscriber("Durable2"));
179: assertEquals("Durable1", transManager
180: .getDurableTopicSubscriber("Durable1").getName());
181: assertTrue(topic1 == transManager.getDurableTopicSubscriber(
182: "Durable2").getTopic());
183: session.unsubscribe("Durable2");
184: assertEquals(1, transManager.getDurableTopicSubscriberMap()
185: .size());
186: assertTrue(subscriber1 == transManager
187: .getDurableTopicSubscriberMap().get("Durable1"));
188: assertNull(transManager.getDurableTopicSubscriberMap().get(
189: "Durable2"));
190: assertNull(transManager.getDurableTopicSubscriber("Durable2"));
191: TopicSubscriber subscriber3 = session.createDurableSubscriber(
192: topic1, "Durable1");
193: assertFalse(((MockTopicSubscriber) subscriber3).getNoLocal());
194: assertTrue(((MockTopicSubscriber) subscriber3).isDurable());
195: assertFalse(subscriber1 == transManager
196: .getDurableTopicSubscriber("Durable1"));
197: assertTrue(subscriber3 == transManager
198: .getDurableTopicSubscriber("Durable1"));
199: session.unsubscribe("Durable1");
200: assertEquals(0, transManager.getDurableTopicSubscriberMap()
201: .size());
202: }
203:
204: public void testTransmissionGlobalListener() throws Exception {
205: DestinationManager manager = connection.getDestinationManager();
206: manager.createTopic("Topic1");
207: topic1 = (MockTopic) session.createTopic("Topic1");
208: TopicPublisher publisher = session.createPublisher(topic1);
209: TestMessageListener globalListener = new TestMessageListener();
210: session.setMessageListener(globalListener);
211: publisher.publish(new MockTextMessage("Text1"));
212: assertEquals("Text1", ((TextMessage) globalListener
213: .getMessage()).getText());
214: TopicSubscriber subscriber1 = session.createSubscriber(topic1);
215: TopicSubscriber subscriber2 = session.createDurableSubscriber(
216: topic1, "durable");
217: TestMessageListener listener1 = new TestMessageListener();
218: TestMessageListener listener2 = new TestMessageListener();
219: subscriber1.setMessageListener(listener1);
220: subscriber2.setMessageListener(listener2);
221: publisher.publish(new MockTextMessage("Text2"));
222: assertEquals("Text2", ((TextMessage) globalListener
223: .getMessage()).getText());
224: assertNull(listener1.getMessage());
225: assertNull(listener2.getMessage());
226: }
227:
228: public void testTransmission() throws Exception {
229: DestinationManager manager = connection.getDestinationManager();
230: manager.createTopic("Topic1");
231: manager.createTopic("Topic2");
232: topic1 = (MockTopic) session.createTopic("Topic1");
233: topic2 = (MockTopic) session.createTopic("Topic2");
234: TopicPublisher publisher1 = session.createPublisher(topic1);
235: TopicPublisher publisher2 = session.createPublisher(topic2);
236: MockTopicSubscriber subscriber1 = (MockTopicSubscriber) session
237: .createSubscriber(topic1);
238: MockTopicSubscriber subscriber2 = (MockTopicSubscriber) session
239: .createSubscriber(topic1);
240: MockTopicSubscriber subscriber3 = (MockTopicSubscriber) session
241: .createSubscriber(topic2);
242: TestMessageListener listener1 = new TestMessageListener();
243: TestMessageListener listener2 = new TestMessageListener();
244: TestMessageListener listener3 = new TestMessageListener();
245: subscriber1.setMessageListener(listener1);
246: subscriber2.setMessageListener(listener2);
247: subscriber3.setMessageListener(listener3);
248: publisher1.publish(new MockTextMessage("Text1"));
249: assertEquals("Text1", ((TextMessage) listener1.getMessage())
250: .getText());
251: assertEquals("Text1", ((TextMessage) listener2.getMessage())
252: .getText());
253: assertNull(listener3.getMessage());
254: assertEquals(1, topic1.getReceivedMessageList().size());
255: assertEquals(0, topic1.getCurrentMessageList().size());
256: assertNull(topic1.getMessage());
257: assertTrue(topic1.isEmpty());
258: assertEquals(0, topic2.getReceivedMessageList().size());
259: assertEquals(0, topic2.getCurrentMessageList().size());
260: assertNull(topic2.getMessage());
261: assertTrue(topic2.isEmpty());
262: publisher1.publish(new MockTextMessage("Text2"));
263: assertEquals("Text2", ((TextMessage) listener1.getMessage())
264: .getText());
265: assertEquals("Text2", ((TextMessage) listener2.getMessage())
266: .getText());
267: assertNull(listener3.getMessage());
268: publisher1.publish(new MockTextMessage("Text3"));
269: assertEquals("Text3", ((TextMessage) listener1.getMessage())
270: .getText());
271: assertEquals("Text3", ((TextMessage) listener2.getMessage())
272: .getText());
273: assertNull(listener3.getMessage());
274: assertEquals(3, topic1.getReceivedMessageList().size());
275: assertEquals(0, topic1.getCurrentMessageList().size());
276: assertEquals(0, topic2.getReceivedMessageList().size());
277: assertEquals(0, topic2.getCurrentMessageList().size());
278: publisher2.publish(new MockTextMessage("Text4"));
279: assertEquals("Text3", ((TextMessage) listener1.getMessage())
280: .getText());
281: assertEquals("Text3", ((TextMessage) listener2.getMessage())
282: .getText());
283: assertEquals("Text4", ((TextMessage) listener3.getMessage())
284: .getText());
285: assertEquals(3, topic1.getReceivedMessageList().size());
286: assertEquals(0, topic1.getCurrentMessageList().size());
287: assertEquals(1, topic2.getReceivedMessageList().size());
288: assertEquals(0, topic2.getCurrentMessageList().size());
289: manager.createTopic("NewTopic");
290: topic2 = (MockTopic) session.createTopic("NewTopic");
291: publisher2 = session.createPublisher(topic2);
292: publisher2.publish(new MockTextMessage("Text5"));
293: assertEquals(1, topic2.getReceivedMessageList().size());
294: assertEquals(1, topic2.getCurrentMessageList().size());
295: assertEquals("Text5", ((TextMessage) topic2.getMessage())
296: .getText());
297: assertNull(topic2.getMessage());
298: assertEquals(1, topic2.getReceivedMessageList().size());
299: assertEquals(0, topic2.getCurrentMessageList().size());
300: subscriber2 = (MockTopicSubscriber) session
301: .createSubscriber(topic2);
302: publisher2.publish(new MockTextMessage("Text6"));
303: publisher2.publish(new MockTextMessage("Text7"));
304: publisher2.publish(new MockTextMessage("Text8"));
305: assertEquals("Text6", ((TextMessage) subscriber2.receive())
306: .getText());
307: assertEquals("Text7", ((TextMessage) subscriber2.receive())
308: .getText());
309: assertEquals("Text8", ((TextMessage) topic2.getMessage())
310: .getText());
311: assertNull(topic2.getMessage());
312: assertEquals(4, topic2.getReceivedMessageList().size());
313: assertEquals(0, topic2.getCurrentMessageList().size());
314: subscriber2 = (MockTopicSubscriber) session
315: .createSubscriber(topic2);
316: subscriber2.setMessageListener(listener2);
317: publisher2.publish(new MockTextMessage("Text9"));
318: assertEquals(5, topic2.getReceivedMessageList().size());
319: assertEquals(0, topic2.getCurrentMessageList().size());
320: assertEquals("Text9", ((TextMessage) listener2.getMessage())
321: .getText());
322: assertNull(topic2.getMessage());
323: }
324:
325: public void testTransmissionDurableSubscriber() throws Exception {
326: DestinationManager manager = connection.getDestinationManager();
327: manager.createTopic("Topic1");
328: manager.createTopic("Topic2");
329: topic1 = (MockTopic) session.createTopic("Topic1");
330: topic2 = (MockTopic) session.createTopic("Topic2");
331: TopicPublisher publisher1 = session.createPublisher(topic1);
332: TopicPublisher publisher2 = session.createPublisher(topic2);
333: MockTopicSubscriber subscriber1 = (MockTopicSubscriber) session
334: .createSubscriber(topic1);
335: MockTopicSubscriber subscriber2 = (MockTopicSubscriber) session
336: .createSubscriber(topic2);
337: MockTopicSubscriber durableSubscriber1 = (MockTopicSubscriber) session
338: .createDurableSubscriber(topic1, "durable1");
339: MockTopicSubscriber durableSubscriber2 = (MockTopicSubscriber) session
340: .createDurableSubscriber(topic1, "durable2");
341: TestMessageListener listener1 = new TestMessageListener();
342: TestMessageListener listener2 = new TestMessageListener();
343: TestMessageListener durableListener1 = new TestMessageListener();
344: TestMessageListener durableListener2 = new TestMessageListener();
345: subscriber1.setMessageListener(listener1);
346: subscriber2.setMessageListener(listener2);
347: durableSubscriber1.setMessageListener(durableListener1);
348: durableSubscriber2.setMessageListener(durableListener2);
349: publisher1.publish(new MockTextMessage("Text1"));
350: assertEquals(1, topic1.getReceivedMessageList().size());
351: assertEquals(0, topic1.getCurrentMessageList().size());
352: assertEquals(0, topic2.getReceivedMessageList().size());
353: assertEquals(0, topic2.getCurrentMessageList().size());
354: assertEquals("Text1", ((TextMessage) listener1.getMessage())
355: .getText());
356: assertEquals("Text1", ((TextMessage) durableListener1
357: .getMessage()).getText());
358: assertEquals("Text1", ((TextMessage) durableListener2
359: .getMessage()).getText());
360: assertNull(listener2.getMessage());
361: publisher2.publish(new MockTextMessage("Text2"));
362: assertEquals(1, topic1.getReceivedMessageList().size());
363: assertEquals(0, topic1.getCurrentMessageList().size());
364: assertEquals(1, topic2.getReceivedMessageList().size());
365: assertEquals(0, topic2.getCurrentMessageList().size());
366: assertEquals("Text1", ((TextMessage) listener1.getMessage())
367: .getText());
368: assertEquals("Text1", ((TextMessage) durableListener1
369: .getMessage()).getText());
370: assertEquals("Text1", ((TextMessage) durableListener2
371: .getMessage()).getText());
372: assertEquals("Text2", ((TextMessage) listener2.getMessage())
373: .getText());
374: session.unsubscribe("durable2");
375: publisher1.publish(new MockTextMessage("Text3"));
376: assertEquals(2, topic1.getReceivedMessageList().size());
377: assertEquals(0, topic1.getCurrentMessageList().size());
378: assertEquals(1, topic2.getReceivedMessageList().size());
379: assertEquals(0, topic2.getCurrentMessageList().size());
380: assertEquals("Text3", ((TextMessage) listener1.getMessage())
381: .getText());
382: assertEquals("Text3", ((TextMessage) durableListener1
383: .getMessage()).getText());
384: assertEquals("Text1", ((TextMessage) durableListener2
385: .getMessage()).getText());
386: assertEquals("Text2", ((TextMessage) listener2.getMessage())
387: .getText());
388: }
389:
390: public void testTransmissionResetCalled() throws Exception {
391: DestinationManager manager = connection.getDestinationManager();
392: manager.createTopic("Topic1");
393: topic1 = (MockTopic) session.createTopic("Topic1");
394: TopicPublisher publisher = session.createPublisher(topic1);
395: BytesMessage bytesMessage = new MockBytesMessage();
396: StreamMessage streamMessage = new MockStreamMessage();
397: bytesMessage.writeDouble(123.3);
398: streamMessage.writeLong(234);
399: try {
400: bytesMessage.readDouble();
401: fail();
402: } catch (JMSException exc) {
403: //should throw exception
404: }
405: try {
406: streamMessage.readInt();
407: fail();
408: } catch (JMSException exc) {
409: //should throw exception
410: }
411: publisher.publish(bytesMessage);
412: publisher.publish(streamMessage);
413: bytesMessage = (BytesMessage) topic1.getMessage();
414: streamMessage = (StreamMessage) topic1.getMessage();
415: assertEquals(123.3, bytesMessage.readDouble(), 0);
416: assertEquals(234, streamMessage.readLong());
417: }
418:
419: public void testTransmissionSenderOrReceiverClosed()
420: throws Exception {
421: DestinationManager manager = connection.getDestinationManager();
422: manager.createTopic("Topic1");
423: topic1 = (MockTopic) session.createTopic("Topic1");
424: MockTopicPublisher publisher = (MockTopicPublisher) session
425: .createPublisher(topic1);
426: TopicSubscriber subscriber1 = session.createSubscriber(topic1);
427: TestMessageListener listener1 = new TestMessageListener();
428: subscriber1.setMessageListener(listener1);
429: publisher.publish(new MockTextMessage("Text"));
430: assertNull(subscriber1.receive());
431: assertEquals(new MockTextMessage("Text"), listener1
432: .getMessage());
433: listener1.reset();
434: subscriber1.close();
435: publisher.publish(new MockTextMessage("Text"));
436: assertNull(listener1.getMessage());
437: try {
438: subscriber1.receive();
439: fail();
440: } catch (JMSException exc) {
441: //should throw exception
442: }
443: TopicSubscriber subscriber2 = session.createSubscriber(topic1);
444: assertEquals(new MockTextMessage("Text"), subscriber2.receive());
445: TestMessageListener listener2 = new TestMessageListener();
446: subscriber2.setMessageListener(listener2);
447: publisher.publish(new MockTextMessage("Text"));
448: assertEquals(new MockTextMessage("Text"), listener2
449: .getMessage());
450: publisher.close();
451: try {
452: publisher.publish(new MockTextMessage("Text"));
453: fail();
454: } catch (JMSException exc) {
455: //should throw exception
456: }
457: }
458:
459: public void testTransmissionWithMessageSelector() throws Exception {
460: DestinationManager manager = connection.getDestinationManager();
461: manager.createTopic("Topic");
462: MockTopic topic = (MockTopic) session.createTopic("Topic");
463: MockTopicSubscriber subscriber1 = (MockTopicSubscriber) session
464: .createSubscriber(topic, "number <= 3", false);
465: TestListMessageListener listener1 = new TestListMessageListener();
466: subscriber1.setMessageListener(listener1);
467: TopicPublisher publisher = session.createPublisher(topic);
468: MockStreamMessage message1 = new MockStreamMessage();
469: message1.setIntProperty("number", 3);
470: publisher.publish(message1);
471: MockStreamMessage message2 = new MockStreamMessage();
472: message2.setIntProperty("number", 1);
473: publisher.publish(message2);
474: MockStreamMessage message3 = new MockStreamMessage();
475: message3.setIntProperty("number", 4);
476: publisher.publish(message3);
477: assertEquals(2, listener1.getMessageList().size());
478: assertSame(message1, listener1.getMessageList().get(0));
479: assertSame(message2, listener1.getMessageList().get(1));
480: assertNull(subscriber1.receive());
481: MockTopicSubscriber subscriber2 = (MockTopicSubscriber) session
482: .createSubscriber(topic);
483: assertSame(message3, subscriber2.receiveNoWait());
484: subscriber2.setMessageListener(listener1);
485: listener1.clearMessageList();
486: publisher.publish(message3);
487: publisher.publish(message2);
488: publisher.publish(message1);
489: assertEquals(5, listener1.getMessageList().size());
490: }
491:
492: public void testTransmissionMessageAcknowledged() throws Exception {
493: MockTopicSession session1 = (MockTopicSession) connection
494: .createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
495: MockTopicSession session2 = (MockTopicSession) connection
496: .createTopicSession(false, Session.DUPS_OK_ACKNOWLEDGE);
497: DestinationManager manager = connection.getDestinationManager();
498: manager.createTopic("Topic");
499: MockTopic topic = (MockTopic) session1.createTopic("Topic");
500: MockTopicPublisher publisher = (MockTopicPublisher) session2
501: .createPublisher(topic);
502: MockMapMessage message1 = new MockMapMessage();
503: MockStreamMessage message2 = new MockStreamMessage();
504: topic.reset();
505: publisher.publish(message1);
506: publisher.publish(message2);
507: message1 = (MockMapMessage) topic.getReceivedMessageList().get(
508: 0);
509: message2 = (MockStreamMessage) topic.getReceivedMessageList()
510: .get(1);
511: assertFalse(message1.isAcknowledged());
512: assertFalse(message2.isAcknowledged());
513: MockTopicSubscriber subscriber1 = (MockTopicSubscriber) session1
514: .createSubscriber(topic);
515: MockTopicSubscriber subscriber2 = (MockTopicSubscriber) session2
516: .createDurableSubscriber(topic, "mySubscription");
517: subscriber1.receive(1);
518: assertFalse(message1.isAcknowledged());
519: assertFalse(message2.isAcknowledged());
520: subscriber2.receiveNoWait();
521: assertFalse(message1.isAcknowledged());
522: assertTrue(message2.isAcknowledged());
523: TestMessageListener listener = new TestMessageListener();
524: subscriber1.setMessageListener(listener);
525: message2 = new MockStreamMessage();
526: topic.reset();
527: publisher.publish(message2);
528: message2 = (MockStreamMessage) topic.getReceivedMessageList()
529: .get(0);
530: assertFalse(message2.isAcknowledged());
531: subscriber2.setMessageListener(listener);
532: message2 = new MockStreamMessage();
533: topic.reset();
534: publisher.publish(message2);
535: message2 = (MockStreamMessage) topic.getReceivedMessageList()
536: .get(0);
537: assertTrue(message2.isAcknowledged());
538: subscriber1.setMessageListener(null);
539: subscriber2.setMessageListener(null);
540: session1.setMessageListener(listener);
541: message2 = new MockStreamMessage();
542: topic.reset();
543: publisher.publish(message2);
544: message2 = (MockStreamMessage) topic.getReceivedMessageList()
545: .get(0);
546: assertFalse(message2.isAcknowledged());
547: session2.setMessageListener(listener);
548: message2 = new MockStreamMessage();
549: topic.reset();
550: publisher.publish(message2);
551: message2 = (MockStreamMessage) topic.getReceivedMessageList()
552: .get(0);
553: assertTrue(message2.isAcknowledged());
554: }
555:
556: public void testTransmissionMultipleSessions() throws Exception {
557: DestinationManager manager = connection.getDestinationManager();
558: manager.createTopic("Topic1");
559: topic1 = (MockTopic) session.createTopic("Topic1");
560: topic2 = (MockTopic) anotherSession.createTopic("Topic1");
561: TestListMessageListener listener = new TestListMessageListener();
562: session.setMessageListener(listener);
563: MockTopicSubscriber subscriber1 = (MockTopicSubscriber) anotherSession
564: .createSubscriber(topic1);
565: subscriber1.setMessageListener(listener);
566: TopicPublisher publisher = anotherSession
567: .createPublisher(topic1);
568: publisher.publish(new MockTextMessage("Text1"));
569: assertEquals(1, topic1.getReceivedMessageList().size());
570: assertEquals(0, topic1.getCurrentMessageList().size());
571: assertEquals(2, listener.getMessageList().size());
572: assertEquals(new MockTextMessage("Text1"), listener
573: .getMessageList().get(0));
574: MockTopicSubscriber subscriber2 = (MockTopicSubscriber) session
575: .createSubscriber(topic1);
576: subscriber2.setMessageListener(listener);
577: session.setMessageListener(null);
578: publisher.publish(new MockTextMessage("Text2"));
579: assertEquals(2, topic1.getReceivedMessageList().size());
580: assertEquals(0, topic1.getCurrentMessageList().size());
581: assertEquals(4, listener.getMessageList().size());
582: assertEquals(new MockTextMessage("Text1"), listener
583: .getMessageList().get(0));
584: assertEquals(new MockTextMessage("Text1"), listener
585: .getMessageList().get(1));
586: assertEquals(new MockTextMessage("Text2"), listener
587: .getMessageList().get(2));
588: assertEquals(new MockTextMessage("Text2"), listener
589: .getMessageList().get(3));
590: MockTopicSubscriber subscriber3 = (MockTopicSubscriber) session
591: .createSubscriber(topic1);
592: subscriber3.setMessageListener(listener);
593: publisher = anotherSession.createPublisher(topic2);
594: publisher.publish(new MockObjectMessage(new Integer(1)));
595: assertEquals(3, topic1.getReceivedMessageList().size());
596: assertEquals(3, topic2.getReceivedMessageList().size());
597: assertEquals(0, topic1.getCurrentMessageList().size());
598: assertEquals(0, topic2.getCurrentMessageList().size());
599: assertEquals(7, listener.getMessageList().size());
600: manager.createTopic("Topic2");
601: topic2 = (MockTopic) anotherSession.createTopic("Topic2");
602: publisher = anotherSession.createPublisher(topic2);
603: publisher.publish(new MockTextMessage("Text2"));
604: assertEquals(3, topic1.getReceivedMessageList().size());
605: assertEquals(0, topic1.getCurrentMessageList().size());
606: assertEquals(1, topic2.getReceivedMessageList().size());
607: assertEquals(1, topic2.getCurrentMessageList().size());
608: assertEquals(7, listener.getMessageList().size());
609: }
610:
611: public void testCloseSession() throws Exception {
612: DestinationManager manager = connection.getDestinationManager();
613: manager.createTopic("Topic");
614: MockTopicSession session = (MockTopicSession) connection
615: .createTopicSession(false,
616: TopicSession.CLIENT_ACKNOWLEDGE);
617: MockTopic topic = (MockTopic) session.createTopic("Topic");
618: MockTopicSubscriber subscriber1 = (MockTopicSubscriber) session
619: .createSubscriber(topic);
620: MockTopicPublisher publisher1 = (MockTopicPublisher) session
621: .createPublisher(topic);
622: MockTopicPublisher publisher2 = (MockTopicPublisher) session
623: .createPublisher(topic);
624: session.close();
625: assertTrue(session.isClosed());
626: assertFalse(session.isRolledBack());
627: assertTrue(subscriber1.isClosed());
628: assertTrue(publisher1.isClosed());
629: assertTrue(publisher2.isClosed());
630: session = (MockTopicSession) connection.createTopicSession(
631: true, TopicSession.CLIENT_ACKNOWLEDGE);
632: topic = (MockTopic) session.createTopic("Topic");
633: subscriber1 = (MockTopicSubscriber) session
634: .createSubscriber(topic);
635: publisher1 = (MockTopicPublisher) session
636: .createPublisher(topic);
637: publisher2 = (MockTopicPublisher) session
638: .createPublisher(topic);
639: session.close();
640: assertTrue(session.isClosed());
641: assertTrue(session.isRolledBack());
642: assertTrue(subscriber1.isClosed());
643: assertTrue(publisher1.isClosed());
644: assertTrue(publisher2.isClosed());
645: session = (MockTopicSession) connection.createTopicSession(
646: true, TopicSession.CLIENT_ACKNOWLEDGE);
647: session.commit();
648: session.close();
649: assertTrue(session.isClosed());
650: assertFalse(session.isRolledBack());
651: }
652:
653: public void testCloseSessionRemove() throws Exception {
654: DestinationManager manager = connection.getDestinationManager();
655: MockTopic topic1 = manager.createTopic("Topic1");
656: MockTopic topic2 = manager.createTopic("Topic2");
657: MockTopicSession session = (MockTopicSession) connection
658: .createTopicSession(false,
659: TopicSession.CLIENT_ACKNOWLEDGE);
660: session.createTopic("Topic2");
661: MockTemporaryTopic tempTopic = (MockTemporaryTopic) session
662: .createTemporaryTopic();
663: assertFalse(topic1.sessionSet().contains(session));
664: assertTrue(topic2.sessionSet().contains(session));
665: assertTrue(tempTopic.sessionSet().contains(session));
666: session.close();
667: assertFalse(topic1.sessionSet().contains(session));
668: assertFalse(topic2.sessionSet().contains(session));
669: assertFalse(tempTopic.sessionSet().contains(session));
670: }
671:
672: public void testTransmissionWithNullDestination() throws Exception {
673: MockTopicSession session = (MockTopicSession) connection
674: .createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
675: DestinationManager manager = connection.getDestinationManager();
676: MockTopic topic = (MockTopic) manager.createTopic("Topic");
677: TopicPublisher publisher = session.createPublisher(null);
678: MockTextMessage message = new MockTextMessage("Text");
679: publisher.publish(topic, message);
680: assertEquals(1, topic.getReceivedMessageList().size());
681: assertEquals(1, topic.getCurrentMessageList().size());
682: }
683:
684: public static class TestListMessageListener implements
685: MessageListener {
686: private List messages = new ArrayList();
687:
688: public List getMessageList() {
689: return messages;
690: }
691:
692: public void clearMessageList() {
693: messages.clear();
694: }
695:
696: public void onMessage(Message message) {
697: messages.add(message);
698: }
699: }
700:
701: public static class TestMessageListener implements MessageListener {
702: private Message message;
703:
704: public Message getMessage() {
705: return message;
706: }
707:
708: public void reset() {
709: message = null;
710: }
711:
712: public void onMessage(Message message) {
713: this.message = message;
714: }
715: }
716: }
|