001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.test.jbossmq.test;
023:
024: import javax.jms.BytesMessage;
025: import javax.jms.DeliveryMode;
026: import javax.jms.Message;
027: import javax.jms.MessageListener;
028: import javax.jms.Queue;
029: import javax.jms.QueueConnection;
030: import javax.jms.QueueConnectionFactory;
031: import javax.jms.QueueReceiver;
032: import javax.jms.QueueSender;
033: import javax.jms.QueueSession;
034: import javax.jms.Session;
035: import javax.jms.Topic;
036: import javax.jms.TopicConnection;
037: import javax.jms.TopicConnectionFactory;
038: import javax.jms.TopicPublisher;
039: import javax.jms.TopicSession;
040: import javax.jms.TopicSubscriber;
041: import javax.naming.Context;
042:
043: import junit.framework.Test;
044: import junit.framework.TestSuite;
045:
046: import org.apache.log4j.Category;
047: import org.jboss.test.JBossTestCase;
048:
049: /**
050: * Rollback tests
051: *
052: * @author
053: * @version $Revision: 57211 $
054: */
055: public class UnackedUnitTestCase extends JBossTestCase {
056: // Provider specific
057: static String TOPIC_FACTORY = "ConnectionFactory";
058: static String QUEUE_FACTORY = "ConnectionFactory";
059:
060: static String TEST_QUEUE = "queue/testQueue";
061: static String TEST_TOPIC = "topic/testTopic";
062: static String TEST_DURABLE_TOPIC = "topic/testDurableTopic";
063:
064: static byte[] PAYLOAD = new byte[10];
065:
066: static Context context;
067: static QueueConnection queueConnection;
068: static TopicConnection topicConnection;
069: static TopicConnection topicDurableConnection;
070:
071: public static Test suite() throws Exception {
072: // JBAS-3580, the execution order of tests in this test case is important
073: // so it must be defined explicitly when running under some JVMs
074: TestSuite suite = new TestSuite();
075: suite.addTest(new UnackedUnitTestCase("testUnackedQueue"));
076: suite.addTest(new UnackedUnitTestCase(
077: "testUnackedMultipleSession"));
078: suite.addTest(new UnackedUnitTestCase(
079: "testUnackedMultipleConnection"));
080: suite.addTest(new UnackedUnitTestCase("testUnackedTopic"));
081: suite
082: .addTest(new UnackedUnitTestCase(
083: "testUnackedDurableTopic"));
084: suite.addTest(new UnackedUnitTestCase("testDummyLast"));
085:
086: return suite;
087: }
088:
089: /**
090: * Constructor the test
091: *
092: * @param name Description of Parameter
093: * @exception Exception Description of Exception
094: */
095: public UnackedUnitTestCase(String name) throws Exception {
096: super (name);
097: }
098:
099: /**
100: * #Description of the Method
101: *
102: * @param persistence Description of Parameter
103: * @exception Exception Description of Exception
104: */
105: public void runUnackedQueue(final int persistence) throws Exception {
106: drainQueue();
107:
108: final int iterationCount = getIterationCount();
109:
110: QueueSession session = queueConnection.createQueueSession(
111: false, Session.AUTO_ACKNOWLEDGE);
112: Queue queue = (Queue) context.lookup(TEST_QUEUE);
113:
114: QueueSender sender = session.createSender(queue);
115:
116: Message message = session.createBytesMessage();
117: ((BytesMessage) message).writeBytes(PAYLOAD);
118:
119: for (int i = 0; i < iterationCount; i++)
120: sender.send(message, persistence, 4, 0);
121:
122: session.close();
123:
124: session = queueConnection.createQueueSession(false,
125: Session.CLIENT_ACKNOWLEDGE);
126: queue = (Queue) context.lookup(TEST_QUEUE);
127: QueueReceiver receiver = session.createReceiver(queue);
128: queueConnection.start();
129: message = receiver.receive(50);
130: int c = 0;
131: while (message != null) {
132: message = receiver.receive(50);
133: c++;
134: }
135: assertTrue("Should have received all data unacked",
136: c == iterationCount);
137:
138: queueConnection.close();
139: QueueConnectionFactory queueFactory = (QueueConnectionFactory) context
140: .lookup(QUEUE_FACTORY);
141: queueConnection = queueFactory.createQueueConnection();
142:
143: assertTrue("Queue should be full",
144: drainQueue() == iterationCount);
145:
146: }
147:
148: /**
149: * #Description of the Method
150: *
151: * @param persistence Description of Parameter
152: * @exception Exception Description of Exception
153: */
154: public void runUnackedMultipleSession(final int persistence)
155: throws Exception {
156: drainQueue();
157:
158: final int iterationCount = getIterationCount();
159:
160: QueueSession session = queueConnection.createQueueSession(
161: false, Session.AUTO_ACKNOWLEDGE);
162: Queue queue = (Queue) context.lookup(TEST_QUEUE);
163:
164: QueueSender sender = session.createSender(queue);
165:
166: Message message = session.createBytesMessage();
167: ((BytesMessage) message).writeBytes(PAYLOAD);
168:
169: for (int i = 0; i < iterationCount; i++)
170: sender.send(message, persistence, 4, 0);
171:
172: session.close();
173:
174: QueueSession session1 = queueConnection.createQueueSession(
175: false, Session.CLIENT_ACKNOWLEDGE);
176: queue = (Queue) context.lookup(TEST_QUEUE);
177: QueueReceiver receiver1 = session1.createReceiver(queue);
178: QueueSession session2 = queueConnection.createQueueSession(
179: false, Session.CLIENT_ACKNOWLEDGE);
180: QueueReceiver receiver2 = session2.createReceiver(queue);
181: queueConnection.start();
182:
183: // Read half from session1
184: int c = 0;
185: for (int l = 0; l < iterationCount / 2; l++) {
186: message = receiver1.receive(50);
187: if (message != null)
188: c++;
189: }
190: assertTrue("Should have received half data unacked",
191: c == iterationCount / 2);
192:
193: // Read the rest from session2
194: c = 0;
195: Message lastMessage = null;
196: while (message != null) {
197: message = receiver2.receive(50);
198: if (message != null) {
199: c++;
200: lastMessage = message;
201: }
202: }
203: assertTrue("Should have received all data unacked",
204: c == iterationCount - iterationCount / 2);
205:
206: // Close session1, the messages are unacked and should go back in the queue
207: session1.close();
208:
209: // Acknowledge messages on session2 and close it
210: lastMessage.acknowledge();
211: session2.close();
212:
213: queueConnection.stop();
214:
215: assertTrue("Session1 messages should be available",
216: drainQueue() == iterationCount / 2);
217:
218: }
219:
220: /**
221: * #Description of the Method
222: *
223: * @param persistence Description of Parameter
224: * @exception Exception Description of Exception
225: */
226: public void runUnackedMultipleConnection(final int persistence)
227: throws Exception {
228: drainQueue();
229:
230: final int iterationCount = getIterationCount();
231:
232: QueueSession session = queueConnection.createQueueSession(
233: false, Session.AUTO_ACKNOWLEDGE);
234: Queue queue = (Queue) context.lookup(TEST_QUEUE);
235:
236: QueueSender sender = session.createSender(queue);
237:
238: Message message = session.createBytesMessage();
239: ((BytesMessage) message).writeBytes(PAYLOAD);
240:
241: for (int i = 0; i < iterationCount; i++)
242: sender.send(message, persistence, 4, 0);
243:
244: session.close();
245:
246: QueueConnectionFactory queueFactory = (QueueConnectionFactory) context
247: .lookup(QUEUE_FACTORY);
248: QueueConnection queueConnection1 = queueFactory
249: .createQueueConnection();
250: QueueSession session1 = queueConnection1.createQueueSession(
251: false, Session.CLIENT_ACKNOWLEDGE);
252: queue = (Queue) context.lookup(TEST_QUEUE);
253: QueueReceiver receiver1 = session1.createReceiver(queue);
254:
255: QueueConnection queueConnection2 = queueFactory
256: .createQueueConnection();
257: QueueSession session2 = queueConnection2.createQueueSession(
258: false, Session.CLIENT_ACKNOWLEDGE);
259: QueueReceiver receiver2 = session2.createReceiver(queue);
260:
261: queueConnection1.start();
262: queueConnection2.start();
263:
264: // Read half from session1
265: int c = 0;
266: for (int l = 0; l < iterationCount / 2; l++) {
267: message = receiver1.receive(50);
268: if (message != null)
269: c++;
270: }
271: assertTrue("Should have received half data unacked",
272: c == iterationCount / 2);
273:
274: // Read the rest from session2
275: Message lastMessage = null;
276: c = 0;
277: while (message != null) {
278: message = receiver2.receive(50);
279: if (message != null) {
280: c++;
281: lastMessage = message;
282: }
283: }
284: assertTrue("Should have received all data unacked",
285: c == iterationCount - iterationCount / 2);
286:
287: // Close session1, the messages are unacked and should go back in the queue
288: queueConnection1.close();
289:
290: // Acknowledge messages for connection 2 and close it
291: lastMessage.acknowledge();
292: queueConnection2.close();
293:
294: assertTrue("Connection1 messages should be available",
295: drainQueue() == iterationCount / 2);
296:
297: }
298:
299: /**
300: * #Description of the Method
301: *
302: * @param persistence Description of Parameter
303: * @exception Exception Description of Exception
304: */
305: public void runUnackedTopic(final int persistence) throws Exception {
306: drainQueue();
307: drainTopic();
308:
309: final int iterationCount = getIterationCount();
310: final Category log = getLog();
311:
312: Thread sendThread = new Thread() {
313: public void run() {
314: try {
315:
316: TopicSession session = topicConnection
317: .createTopicSession(false,
318: Session.AUTO_ACKNOWLEDGE);
319: Topic topic = (Topic) context.lookup(TEST_TOPIC);
320:
321: TopicPublisher publisher = session
322: .createPublisher(topic);
323:
324: waitForSynchMessage();
325:
326: BytesMessage message = session.createBytesMessage();
327: message.writeBytes(PAYLOAD);
328:
329: for (int i = 0; i < iterationCount; i++) {
330: publisher.publish(message, persistence, 4, 0);
331: }
332:
333: session.close();
334: } catch (Exception e) {
335: log.error("error", e);
336: }
337: }
338: };
339:
340: TopicSession session = topicConnection.createTopicSession(
341: false, Session.CLIENT_ACKNOWLEDGE);
342: Topic topic = (Topic) context.lookup(TEST_TOPIC);
343: TopicSubscriber subscriber = session.createSubscriber(topic);
344:
345: MyMessageListener listener = new MyMessageListener(
346: iterationCount, log);
347:
348: queueConnection.start();
349: sendThread.start();
350: subscriber.setMessageListener(listener);
351: topicConnection.start();
352: sendSynchMessage();
353: synchronized (listener) {
354: if (listener.i < iterationCount)
355: listener.wait();
356: }
357: sendThread.join();
358: topicConnection.close();
359: TopicConnectionFactory topicFactory = (TopicConnectionFactory) context
360: .lookup(TOPIC_FACTORY);
361: topicConnection = topicFactory.createTopicConnection();
362: queueConnection.stop();
363: assertTrue("Topic should be empty", drainTopic() == 0);
364: }
365:
366: /**
367: * #Description of the Method
368: *
369: * @param persistence Description of Parameter
370: * @exception Exception Description of Exception
371: */
372: public void runUnackedDurableTopic(final int persistence)
373: throws Exception {
374: drainQueue();
375: drainDurableTopic();
376:
377: final int iterationCount = getIterationCount();
378: final Category log = getLog();
379:
380: Thread sendThread = new Thread() {
381: public void run() {
382: try {
383:
384: TopicSession session = topicConnection
385: .createTopicSession(false,
386: Session.AUTO_ACKNOWLEDGE);
387: Topic topic = (Topic) context
388: .lookup(TEST_DURABLE_TOPIC);
389:
390: TopicPublisher publisher = session
391: .createPublisher(topic);
392:
393: waitForSynchMessage();
394:
395: BytesMessage message = session.createBytesMessage();
396: message.writeBytes(PAYLOAD);
397:
398: for (int i = 0; i < iterationCount; i++) {
399: publisher.publish(message, persistence, 4, 0);
400: }
401:
402: session.close();
403: } catch (Exception e) {
404: log.error("error", e);
405: }
406: }
407: };
408:
409: TopicSession session = topicDurableConnection
410: .createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
411: Topic topic = (Topic) context.lookup(TEST_DURABLE_TOPIC);
412: TopicSubscriber subscriber = session.createDurableSubscriber(
413: topic, "test");
414:
415: MyMessageListener listener = new MyMessageListener(
416: iterationCount, log);
417:
418: queueConnection.start();
419: sendThread.start();
420: subscriber.setMessageListener(listener);
421: topicDurableConnection.start();
422: sendSynchMessage();
423: synchronized (listener) {
424: if (listener.i < iterationCount)
425: listener.wait();
426: }
427:
428: sendThread.join();
429: topicDurableConnection.close();
430: TopicConnectionFactory topicFactory = (TopicConnectionFactory) context
431: .lookup(TOPIC_FACTORY);
432: topicDurableConnection = topicFactory.createTopicConnection(
433: "john", "needle");
434: queueConnection.stop();
435: assertTrue("Topic should be full",
436: drainDurableTopic() == iterationCount);
437: }
438:
439: /**
440: * A unit test for JUnit
441: *
442: * @exception Exception Description of Exception
443: */
444: public void testUnackedQueue() throws Exception {
445:
446: getLog().debug("Starting UnackedQueue test");
447:
448: runUnackedQueue(DeliveryMode.NON_PERSISTENT);
449: runUnackedQueue(DeliveryMode.PERSISTENT);
450:
451: getLog().debug("UnackedQueue passed");
452: }
453:
454: /**
455: * A unit test for JUnit
456: *
457: * @exception Exception Description of Exception
458: */
459: public void testUnackedMultipleSession() throws Exception {
460:
461: getLog().debug("Starting UnackedMultipleSession test");
462:
463: runUnackedMultipleSession(DeliveryMode.NON_PERSISTENT);
464: runUnackedMultipleSession(DeliveryMode.PERSISTENT);
465:
466: getLog().debug("UnackedMultipleSession passed");
467: }
468:
469: /**
470: * A unit test for JUnit
471: *
472: * @exception Exception Description of Exception
473: */
474: public void testUnackedMultipleConnection() throws Exception {
475:
476: getLog().debug("Starting UnackedMultipleConnection test");
477:
478: runUnackedMultipleConnection(DeliveryMode.NON_PERSISTENT);
479: runUnackedMultipleConnection(DeliveryMode.PERSISTENT);
480:
481: getLog().debug("UnackedMultipleConnection passed");
482: }
483:
484: /**
485: * A unit test for JUnit
486: *
487: * @exception Exception Description of Exception
488: */
489: public void testUnackedTopic() throws Exception {
490:
491: getLog().debug("Starting UnackedTopic test");
492:
493: runUnackedTopic(DeliveryMode.NON_PERSISTENT);
494: runUnackedTopic(DeliveryMode.PERSISTENT);
495:
496: getLog().debug("UnackedTopic passed");
497: }
498:
499: /**
500: * A unit test for JUnit
501: *
502: * @exception Exception Description of Exception
503: */
504: public void testUnackedDurableTopic() throws Exception {
505:
506: getLog().debug("Starting UnackedDurableTopic test");
507:
508: runUnackedDurableTopic(DeliveryMode.NON_PERSISTENT);
509: runUnackedDurableTopic(DeliveryMode.PERSISTENT);
510:
511: getLog().debug("UnackedDurableTopic passed");
512: }
513:
514: /**
515: * A unit test for JUnit
516: *
517: * @exception Exception Description of Exception
518: */
519: public void testDummyLast() throws Exception {
520:
521: TopicSession session = topicDurableConnection
522: .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
523: session.unsubscribe("test");
524:
525: queueConnection.close();
526: topicConnection.close();
527: topicDurableConnection.close();
528: }
529:
530: /**
531: * The JUnit setup method
532: *
533: * @exception Exception Description of Exception
534: */
535: protected void setUp() throws Exception {
536: if (context == null) {
537: context = getInitialContext();
538:
539: QueueConnectionFactory queueFactory = (QueueConnectionFactory) context
540: .lookup(QUEUE_FACTORY);
541: queueConnection = queueFactory.createQueueConnection();
542:
543: TopicConnectionFactory topicFactory = (TopicConnectionFactory) context
544: .lookup(TOPIC_FACTORY);
545: topicConnection = topicFactory.createTopicConnection();
546: topicDurableConnection = topicFactory
547: .createTopicConnection("john", "needle");
548:
549: getLog().debug("Connection to JBossMQ established.");
550: }
551: }
552:
553: // Emptys out all the messages in a queue
554: private int drainQueue() throws Exception {
555: getLog().debug("Draining Queue");
556: queueConnection.start();
557:
558: QueueSession session = queueConnection.createQueueSession(
559: false, Session.AUTO_ACKNOWLEDGE);
560: Queue queue = (Queue) context.lookup(TEST_QUEUE);
561:
562: QueueReceiver receiver = session.createReceiver(queue);
563: Message message = receiver.receive(1000);
564: int c = 0;
565: while (message != null) {
566: message = receiver.receive(1000);
567: c++;
568: }
569:
570: getLog().debug(" Drained " + c + " messages from the queue");
571:
572: session.close();
573:
574: queueConnection.stop();
575:
576: return c;
577: }
578:
579: // Emptys out all the messages in a topic
580: private int drainTopic() throws Exception {
581: getLog().debug("Draining Topic");
582: topicConnection.start();
583:
584: final TopicSession session = topicConnection
585: .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
586: Topic topic = (Topic) context.lookup(TEST_TOPIC);
587: TopicSubscriber subscriber = session.createSubscriber(topic);
588:
589: Message message = subscriber.receive(1000);
590: int c = 0;
591: while (message != null) {
592: message = subscriber.receive(1000);
593: c++;
594: }
595:
596: getLog().debug(" Drained " + c + " messages from the topic");
597:
598: session.close();
599:
600: topicConnection.stop();
601:
602: return c;
603: }
604:
605: // Emptys out all the messages in a durable topic
606: private int drainDurableTopic() throws Exception {
607: getLog().debug("Draining Durable Topic");
608: topicDurableConnection.start();
609:
610: final TopicSession session = topicDurableConnection
611: .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
612: Topic topic = (Topic) context.lookup(TEST_DURABLE_TOPIC);
613: TopicSubscriber subscriber = session.createDurableSubscriber(
614: topic, "test");
615:
616: Message message = subscriber.receive(1000);
617: int c = 0;
618: while (message != null) {
619: message = subscriber.receive(1000);
620: c++;
621: }
622:
623: getLog().debug(
624: " Drained " + c + " messages from the durable topic");
625:
626: session.close();
627:
628: topicDurableConnection.stop();
629:
630: return c;
631: }
632:
633: private void waitForSynchMessage() throws Exception {
634: getLog().debug("Waiting for Synch Message");
635: QueueSession session = queueConnection.createQueueSession(
636: false, Session.AUTO_ACKNOWLEDGE);
637: Queue queue = (Queue) context.lookup(TEST_QUEUE);
638:
639: QueueReceiver receiver = session.createReceiver(queue);
640: receiver.receive();
641: session.close();
642: getLog().debug("Got Synch Message");
643: }
644:
645: private void sendSynchMessage() throws Exception {
646: getLog().debug("Sending Synch Message");
647: QueueSession session = queueConnection.createQueueSession(
648: false, Session.AUTO_ACKNOWLEDGE);
649: Queue queue = (Queue) context.lookup(TEST_QUEUE);
650:
651: QueueSender sender = session.createSender(queue);
652:
653: Message message = session.createMessage();
654: sender.send(message);
655:
656: session.close();
657: getLog().debug("Sent Synch Message");
658: }
659:
660: public class MyMessageListener implements MessageListener {
661: public int i = 0;
662:
663: public int iterationCount;
664:
665: public Category log;
666:
667: public MyMessageListener(int iterationCount, Category log) {
668: this .iterationCount = iterationCount;
669: this .log = log;
670: }
671:
672: public void onMessage(Message message) {
673: synchronized (this ) {
674: i++;
675: log.debug("Got message " + i);
676: if (i >= iterationCount)
677: this .notify();
678: }
679: }
680: }
681:
682: public int getIterationCount() {
683: return 5;
684: }
685: }
|