001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.test.jbossmq.test;
023:
024: import javax.jms.BytesMessage;
025: import javax.jms.DeliveryMode;
026: import javax.jms.JMSException;
027: import javax.jms.Message;
028: import javax.jms.MessageListener;
029: import javax.jms.QueueConnection;
030: import javax.jms.QueueConnectionFactory;
031: import javax.jms.QueueReceiver;
032: import javax.jms.QueueSender;
033: import javax.jms.QueueSession;
034: import javax.jms.Session;
035: import javax.jms.Topic;
036: import javax.jms.TopicConnection;
037: import javax.jms.TopicConnectionFactory;
038: import javax.jms.TopicPublisher;
039: import javax.jms.TopicSession;
040: import javax.jms.TopicSubscriber;
041: import javax.jms.Queue;
042: import javax.naming.Context;
043:
044: import org.apache.log4j.Category;
045: import org.jboss.test.JBossTestCase;
046:
047: /**
048: * Rollback tests
049: *
050: * @author
051: * @version
052: */
053: public class RollBackUnitTestCase extends JBossTestCase {
054:
055: // Provider specific
056: static String TOPIC_FACTORY = "ConnectionFactory";
057:
058: static String QUEUE_FACTORY = "ConnectionFactory";
059:
060: static String TEST_QUEUE = "queue/testQueue";
061:
062: static String TEST_TOPIC = "topic/testTopic";
063:
064: static String TEST_DURABLE_TOPIC = "topic/testDurableTopic";
065:
066: static byte[] PAYLOAD = new byte[10];
067:
068: static Context context;
069:
070: static QueueConnection queueConnection;
071:
072: static TopicConnection topicConnection;
073:
074: static TopicConnection topicDurableConnection;
075:
076: /**
077: * Constructor the test
078: *
079: * @param name Description of Parameter
080: * @exception Exception Description of Exception
081: */
082: public RollBackUnitTestCase(String name) throws Exception {
083: super (name);
084: }
085:
086: /**
087: * #Description of the Method
088: *
089: * @param persistence Description of Parameter
090: * @exception Exception Description of Exception
091: */
092: public void runQueueSendRollBack(final int persistence,
093: final boolean explicit) throws Exception {
094: drainQueue();
095: final int iterationCount = getIterationCount();
096: final Category log = getLog();
097:
098: Thread sendThread = new Thread() {
099: public void run() {
100: try {
101: QueueSession session = queueConnection
102: .createQueueSession(true,
103: Session.AUTO_ACKNOWLEDGE);
104: Queue queue = (Queue) context.lookup(TEST_QUEUE);
105:
106: QueueSender sender = session.createSender(queue);
107:
108: BytesMessage message = session.createBytesMessage();
109: message.writeBytes(PAYLOAD);
110: message.setStringProperty("TEST_NAME",
111: "runQueueSendRollback");
112: message.setIntProperty("TEST_PERSISTENCE",
113: persistence);
114: message.setBooleanProperty("TEST_EXPLICIT",
115: explicit);
116:
117: for (int i = 0; i < iterationCount; i++) {
118: sender.send(message, persistence, 4, 0);
119: }
120:
121: if (explicit)
122: session.rollback();
123: session.close();
124: } catch (Exception e) {
125: log.error("error", e);
126: }
127: }
128: };
129:
130: sendThread.start();
131: sendThread.join();
132: assertTrue("Queue should be empty", drainQueue() == 0);
133: }
134:
135: /**
136: * #Description of the Method
137: *
138: * @param persistence Description of Parameter
139: * @exception Exception Description of Exception
140: */
141: public void runTopicSendRollBack(final int persistence,
142: final boolean explicit) throws Exception {
143: drainQueue();
144: drainTopic();
145:
146: final int iterationCount = getIterationCount();
147: final Category log = getLog();
148:
149: Thread sendThread = new Thread() {
150: public void run() {
151: try {
152:
153: TopicSession session = topicConnection
154: .createTopicSession(true,
155: Session.AUTO_ACKNOWLEDGE);
156: Topic topic = (Topic) context.lookup(TEST_TOPIC);
157:
158: TopicPublisher publisher = session
159: .createPublisher(topic);
160:
161: BytesMessage message = session.createBytesMessage();
162: message.writeBytes(PAYLOAD);
163: message.setStringProperty("TEST_NAME",
164: "runTopicSendRollback");
165: message.setIntProperty("TEST_PERSISTENCE",
166: persistence);
167: message.setBooleanProperty("TEST_EXPLICIT",
168: explicit);
169:
170: for (int i = 0; i < iterationCount; i++) {
171: publisher.publish(message, persistence, 4, 0);
172: }
173:
174: session.close();
175: } catch (Exception e) {
176: log.error("error", e);
177: }
178: }
179: };
180:
181: sendThread.start();
182: sendThread.join();
183: assertTrue("Topic should be empty", drainTopic() == 0);
184: }
185:
186: /**
187: * #Description of the Method
188: *
189: * @param persistence Description of Parameter
190: * @exception Exception Description of Exception
191: */
192: public void runAsynchQueueReceiveRollBack(final int persistence,
193: final boolean explicit) throws Exception {
194: drainQueue();
195:
196: final int iterationCount = getIterationCount();
197: final Category log = getLog();
198:
199: Thread sendThread = new Thread() {
200: public void run() {
201: try {
202: QueueSession session = queueConnection
203: .createQueueSession(false,
204: Session.AUTO_ACKNOWLEDGE);
205: Queue queue = (Queue) context.lookup(TEST_QUEUE);
206:
207: QueueSender sender = session.createSender(queue);
208:
209: BytesMessage message = session.createBytesMessage();
210: message.writeBytes(PAYLOAD);
211: message.setStringProperty("TEST_NAME",
212: "runAsynchQueueReceiveRollback");
213: message.setIntProperty("TEST_PERSISTENCE",
214: persistence);
215: message.setBooleanProperty("TEST_EXPLICIT",
216: explicit);
217:
218: for (int i = 0; i < iterationCount; i++) {
219: sender.send(message, persistence, 4, 0);
220: }
221:
222: session.close();
223: } catch (Exception e) {
224: log.error("error", e);
225: }
226: }
227: };
228:
229: QueueSession session = queueConnection.createQueueSession(true,
230: Session.AUTO_ACKNOWLEDGE);
231: Queue queue = (Queue) context.lookup(TEST_QUEUE);
232: QueueReceiver receiver = session.createReceiver(queue);
233:
234: MyMessageListener listener = new MyMessageListener(
235: iterationCount, log);
236:
237: sendThread.start();
238: receiver.setMessageListener(listener);
239: queueConnection.start();
240: synchronized (listener) {
241: if (listener.i < iterationCount)
242: listener.wait();
243: }
244: receiver.setMessageListener(null);
245:
246: if (explicit)
247: session.rollback();
248: session.close();
249:
250: queueConnection.stop();
251:
252: sendThread.join();
253:
254: assertTrue("Queue should be full",
255: drainQueue() == iterationCount);
256:
257: }
258:
259: /**
260: * #Description of the Method
261: *
262: * @param persistence Description of Parameter
263: * @exception Exception Description of Exception
264: */
265: public void runAsynchTopicReceiveRollBack(final int persistence,
266: final boolean explicit) throws Exception {
267: drainQueue();
268: drainTopic();
269:
270: final int iterationCount = getIterationCount();
271: final Category log = getLog();
272:
273: Thread sendThread = new Thread() {
274: public void run() {
275: try {
276:
277: TopicSession session = topicConnection
278: .createTopicSession(false,
279: Session.AUTO_ACKNOWLEDGE);
280: Topic topic = (Topic) context.lookup(TEST_TOPIC);
281:
282: TopicPublisher publisher = session
283: .createPublisher(topic);
284:
285: waitForSynchMessage();
286:
287: BytesMessage message = session.createBytesMessage();
288: message.writeBytes(PAYLOAD);
289: message.setStringProperty("TEST_NAME",
290: "runAsynchTopicReceiveRollback");
291: message.setIntProperty("TEST_PERSISTENCE",
292: persistence);
293: message.setBooleanProperty("TEST_EXPLICIT",
294: explicit);
295:
296: for (int i = 0; i < iterationCount; i++) {
297: publisher.publish(message, persistence, 4, 0);
298: log.debug("Published message " + i);
299: }
300:
301: session.close();
302: } catch (Exception e) {
303: log.error("error", e);
304: }
305: }
306: };
307:
308: TopicSession session = topicConnection.createTopicSession(true,
309: Session.AUTO_ACKNOWLEDGE);
310: Topic topic = (Topic) context.lookup(TEST_TOPIC);
311: TopicSubscriber subscriber = session.createSubscriber(topic);
312:
313: MyMessageListener listener = new MyMessageListener(
314: iterationCount, log);
315:
316: queueConnection.start();
317: sendThread.start();
318: subscriber.setMessageListener(listener);
319: topicConnection.start();
320: sendSynchMessage();
321: getLog().debug("Waiting for all messages");
322: synchronized (listener) {
323: if (listener.i < iterationCount)
324: listener.wait();
325: }
326: getLog().debug("Got all messages");
327: subscriber.setMessageListener(null);
328:
329: if (explicit)
330: session.rollback();
331: session.close();
332:
333: sendThread.join();
334: topicConnection.stop();
335: queueConnection.stop();
336: assertTrue("Topic should be empty", drainTopic() == 0);
337: }
338:
339: /**
340: * #Description of the Method
341: *
342: * @param persistence Description of Parameter
343: * @exception Exception Description of Exception
344: */
345: public void runAsynchDurableTopicReceiveRollBack(
346: final int persistence, final boolean explicit)
347: throws Exception {
348: getLog().debug(
349: "====> runAsynchDurableTopicReceiveRollBack persistence="
350: + persistence + " explicit=" + explicit);
351: drainQueue();
352: drainDurableTopic();
353:
354: final int iterationCount = getIterationCount();
355: final Category log = getLog();
356:
357: Thread sendThread = new Thread() {
358: public void run() {
359: try {
360:
361: TopicSession session = topicConnection
362: .createTopicSession(false,
363: Session.AUTO_ACKNOWLEDGE);
364: Topic topic = (Topic) context
365: .lookup(TEST_DURABLE_TOPIC);
366:
367: TopicPublisher publisher = session
368: .createPublisher(topic);
369:
370: waitForSynchMessage();
371:
372: BytesMessage message = session.createBytesMessage();
373: message.writeBytes(PAYLOAD);
374: message.setStringProperty("TEST_NAME",
375: "runAsynchDurableTopicReceiveRollback");
376: message.setIntProperty("TEST_PERSISTENCE",
377: persistence);
378: message.setBooleanProperty("TEST_EXPLICIT",
379: explicit);
380:
381: for (int i = 0; i < iterationCount; i++) {
382: publisher.publish(message, persistence, 4, 0);
383: log.debug("Published message " + i);
384: }
385:
386: session.close();
387: } catch (Exception e) {
388: log.error("error", e);
389: }
390: }
391: };
392:
393: TopicSession session = topicDurableConnection
394: .createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
395: Topic topic = (Topic) context.lookup(TEST_DURABLE_TOPIC);
396: TopicSubscriber subscriber = session.createDurableSubscriber(
397: topic, "test");
398: try {
399: MyMessageListener listener = new MyMessageListener(
400: iterationCount, log);
401:
402: queueConnection.start();
403: sendThread.start();
404: subscriber.setMessageListener(listener);
405: topicDurableConnection.start();
406: sendSynchMessage();
407: getLog().debug("Waiting for all messages");
408: synchronized (listener) {
409: if (listener.i < iterationCount)
410: listener.wait();
411: }
412: getLog().debug("Got all messages");
413: subscriber.setMessageListener(null);
414: subscriber.close();
415:
416: if (explicit)
417: session.rollback();
418: session.close();
419:
420: sendThread.join();
421: topicDurableConnection.stop();
422: queueConnection.stop();
423: assertTrue("Topic should be full",
424: drainDurableTopic() == iterationCount);
425: } finally {
426: removeDurableSubscription();
427: }
428: }
429:
430: /**
431: * A unit test for JUnit
432: *
433: * @exception Exception Description of Exception
434: */
435: public void testQueueSendRollBack() throws Exception {
436:
437: getLog().debug("Starting AsynchQueueSendRollBack test");
438:
439: runQueueSendRollBack(DeliveryMode.NON_PERSISTENT, false);
440: runQueueSendRollBack(DeliveryMode.PERSISTENT, false);
441: runQueueSendRollBack(DeliveryMode.NON_PERSISTENT, true);
442: runQueueSendRollBack(DeliveryMode.PERSISTENT, true);
443:
444: getLog().debug("AsynchQueueSendRollBack passed");
445: }
446:
447: /**
448: * A unit test for JUnit
449: *
450: * @exception Exception Description of Exception
451: */
452: public void testAsynchQueueReceiveBack() throws Exception {
453:
454: getLog().debug("Starting AsynchQueueReceiveRollBack test");
455:
456: runAsynchQueueReceiveRollBack(DeliveryMode.NON_PERSISTENT,
457: false);
458: runAsynchQueueReceiveRollBack(DeliveryMode.PERSISTENT, false);
459: runQueueSendRollBack(DeliveryMode.NON_PERSISTENT, true);
460: runQueueSendRollBack(DeliveryMode.PERSISTENT, true);
461:
462: getLog().debug("AsynchQueueReceiveRollBack passed");
463: }
464:
465: /**
466: * A unit test for JUnit
467: *
468: * @exception Exception Description of Exception
469: */
470: public void testTopicSendRollBack() throws Exception {
471:
472: getLog().debug("Starting AsynchTopicSendRollBack test");
473:
474: runTopicSendRollBack(DeliveryMode.NON_PERSISTENT, false);
475: runTopicSendRollBack(DeliveryMode.PERSISTENT, false);
476: runTopicSendRollBack(DeliveryMode.NON_PERSISTENT, true);
477: runTopicSendRollBack(DeliveryMode.PERSISTENT, true);
478:
479: getLog().debug("AsynchTopicSendRollBack passed");
480: }
481:
482: /**
483: * A unit test for JUnit
484: *
485: * @exception Exception Description of Exception
486: */
487: public void testAsynchTopicReceiveRollBack() throws Exception {
488:
489: getLog().debug("Starting AsynchTopicReceiveRollBack test");
490:
491: runAsynchTopicReceiveRollBack(DeliveryMode.NON_PERSISTENT,
492: false);
493: runAsynchTopicReceiveRollBack(DeliveryMode.PERSISTENT, false);
494: runAsynchTopicReceiveRollBack(DeliveryMode.NON_PERSISTENT, true);
495: runAsynchTopicReceiveRollBack(DeliveryMode.PERSISTENT, true);
496:
497: getLog().debug("AsynchTopicReceiveRollBack passed");
498: }
499:
500: /**
501: * A unit test for JUnit
502: *
503: * @exception Exception Description of Exception
504: */
505: public void testAsynchDurableTopicReceiveRollBack()
506: throws Exception {
507:
508: getLog().debug(
509: "Starting AsynchDurableTopicReceiveRollBack test");
510:
511: runAsynchDurableTopicReceiveRollBack(
512: DeliveryMode.NON_PERSISTENT, false);
513: runAsynchDurableTopicReceiveRollBack(DeliveryMode.PERSISTENT,
514: false);
515: runAsynchDurableTopicReceiveRollBack(
516: DeliveryMode.NON_PERSISTENT, true);
517: runAsynchDurableTopicReceiveRollBack(DeliveryMode.PERSISTENT,
518: true);
519:
520: getLog().debug("AsynchDurableTopicReceiveRollBack passed");
521: }
522:
523: /**
524: * A unit test for JUnit
525: *
526: * @exception Exception Description of Exception
527: */
528: public void removeDurableSubscription() throws Exception {
529:
530: TopicSession session = topicDurableConnection
531: .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
532: session.unsubscribe("test");
533: }
534:
535: /**
536: * The JUnit setup method
537: *
538: * @exception Exception Description of Exception
539: */
540: protected void setUp() throws Exception {
541: super .setUp();
542:
543: getLog().debug("START TEST " + getName());
544: context = getInitialContext();
545:
546: QueueConnectionFactory queueFactory = (QueueConnectionFactory) context
547: .lookup(QUEUE_FACTORY);
548: queueConnection = queueFactory.createQueueConnection();
549:
550: TopicConnectionFactory topicFactory = (TopicConnectionFactory) context
551: .lookup(TOPIC_FACTORY);
552: topicConnection = topicFactory.createTopicConnection();
553: topicDurableConnection = topicFactory.createTopicConnection(
554: "john", "needle");
555:
556: getLog().debug("Connection to JBossMQ established.");
557: }
558:
559: protected void tearDown() throws Exception {
560: try {
561: if (topicDurableConnection != null) {
562: topicDurableConnection.close();
563: topicDurableConnection = null;
564: }
565: } catch (JMSException ignored) {
566: }
567: try {
568: if (topicConnection != null) {
569: topicConnection.close();
570: topicConnection = null;
571: }
572: } catch (JMSException ignored) {
573: }
574: try {
575: if (queueConnection != null) {
576: queueConnection.close();
577: queueConnection = null;
578: }
579: } catch (JMSException ignored) {
580: }
581: super .tearDown();
582: }
583:
584: // Emptys out all the messages in a queue
585: private int drainQueue() throws Exception {
586: getLog().debug("Draining Queue");
587: queueConnection.start();
588:
589: QueueSession session = queueConnection.createQueueSession(
590: false, Session.AUTO_ACKNOWLEDGE);
591: Queue queue = (Queue) context.lookup(TEST_QUEUE);
592:
593: QueueReceiver receiver = session.createReceiver(queue);
594: Message message = receiver.receive(50);
595: int c = 0;
596: while (message != null) {
597: c++;
598: message = receiver.receive(50);
599: }
600:
601: getLog().debug(" Drained " + c + " messages from the queue");
602:
603: session.close();
604:
605: queueConnection.stop();
606:
607: return c;
608: }
609:
610: // Emptys out all the messages in a topic
611: private int drainTopic() throws Exception {
612: getLog().debug("Draining Topic");
613: topicConnection.start();
614:
615: final TopicSession session = topicConnection
616: .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
617: Topic topic = (Topic) context.lookup(TEST_TOPIC);
618: TopicSubscriber subscriber = session.createSubscriber(topic);
619:
620: Message message = subscriber.receive(50);
621: int c = 0;
622: while (message != null) {
623: c++;
624: message = subscriber.receive(50);
625: }
626:
627: getLog().debug(" Drained " + c + " messages from the topic");
628:
629: session.close();
630:
631: topicConnection.stop();
632:
633: return c;
634: }
635:
636: // Emptys out all the messages in a durable topic
637: private int drainDurableTopic() throws Exception {
638: getLog().debug("Draining Durable Topic");
639: topicDurableConnection.start();
640:
641: final TopicSession session = topicDurableConnection
642: .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
643: Topic topic = (Topic) context.lookup(TEST_DURABLE_TOPIC);
644: TopicSubscriber subscriber = session.createDurableSubscriber(
645: topic, "test");
646:
647: Message message = subscriber.receive(50);
648: int c = 0;
649: while (message != null) {
650: c++;
651: message = subscriber.receive(50);
652: }
653:
654: getLog().debug(
655: " Drained " + c + " messages from the durable topic");
656:
657: session.close();
658:
659: topicDurableConnection.stop();
660:
661: return c;
662: }
663:
664: private void waitForSynchMessage() throws Exception {
665: getLog().debug("Waiting for Synch Message");
666: QueueSession session = queueConnection.createQueueSession(
667: false, Session.AUTO_ACKNOWLEDGE);
668: Queue queue = (Queue) context.lookup(TEST_QUEUE);
669:
670: QueueReceiver receiver = session.createReceiver(queue);
671: receiver.receive();
672: session.close();
673: getLog().debug("Got Synch Message");
674: }
675:
676: private void sendSynchMessage() throws Exception {
677: getLog().debug("Sending Synch Message");
678: QueueSession session = queueConnection.createQueueSession(
679: false, Session.AUTO_ACKNOWLEDGE);
680: Queue queue = (Queue) context.lookup(TEST_QUEUE);
681:
682: QueueSender sender = session.createSender(queue);
683:
684: Message message = session.createMessage();
685: sender.send(message);
686:
687: session.close();
688: getLog().debug("Sent Synch Message");
689: }
690:
691: public class MyMessageListener implements MessageListener {
692: public int i = 0;
693:
694: public int iterationCount;
695:
696: public Category log;
697:
698: public MyMessageListener(int iterationCount, Category log) {
699: this .iterationCount = iterationCount;
700: this .log = log;
701: }
702:
703: public void onMessage(Message message) {
704: synchronized (this ) {
705: i++;
706: log.debug("Got message " + i);
707: if (i >= iterationCount)
708: this .notify();
709: }
710: }
711: }
712:
713: public int getIterationCount() {
714: return 5;
715: }
716: }
|