001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.test.jbossmq;
023:
024: import java.util.Enumeration;
025:
026: import javax.jms.Connection;
027: import javax.jms.DeliveryMode;
028: import javax.jms.Destination;
029: import javax.jms.ExceptionListener;
030: import javax.jms.JMSException;
031: import javax.jms.Message;
032: import javax.jms.MessageConsumer;
033: import javax.jms.MessageListener;
034: import javax.jms.MessageProducer;
035: import javax.jms.Queue;
036: import javax.jms.QueueBrowser;
037: import javax.jms.QueueConnection;
038: import javax.jms.QueueConnectionFactory;
039: import javax.jms.QueueSender;
040: import javax.jms.QueueSession;
041: import javax.jms.Session;
042: import javax.jms.Topic;
043: import javax.jms.TopicConnection;
044: import javax.jms.TopicConnectionFactory;
045: import javax.jms.TopicPublisher;
046: import javax.jms.TopicSession;
047: import javax.naming.Context;
048: import javax.naming.NamingException;
049:
050: import org.apache.log4j.Category;
051: import org.jboss.test.JBossTestCase;
052:
053: /**
054: * JMS tests base class.
055: *
056: * Your test extends this class, and can then use common methods. To do
057: * the tests you use TopicWorker or QueueWorker and the MessageCreator,
058: * MessageFilter and perhaps MessageQos classes, directly or by extending
059: * them.
060: *
061: * You can change the connection factories and destinations used by the
062: * properties: jbosstest.queuefactory, jbosstest.topicfactory,
063: * jbosstest.queue or jbosstest.topic.
064: *
065: *
066: * @author <a href="pra@tim.se">Peter Antman</a>
067: * @version $Revision: 57211 $
068: */
069: public class MQBase extends JBossTestCase {
070: public static final int PUBLISHER = 0;
071: public static final int SUBSCRIBER = 1;
072: public static final int GETTER = 2;
073: public static final int CONNECTOR = 3;
074: public static final int FAILSAFE_SUBSCRIBER = 4;
075: public static final int TRANS_NONE = 0;
076: public static final int TRANS_INDIVIDUAL = 1;
077: public static final int TRANS_TOTAL = 2;
078: public static final String[] TRANS_DESC = { "NOT", "individually",
079: "totally" };
080: public static final int DEFAULT_RUNSLEEP = 50;
081: public final Category log = getLog();
082:
083: // Provider specific
084: public String TOPIC_FACTORY = "ConnectionFactory";
085: public String QUEUE_FACTORY = "ConnectionFactory";
086:
087: public String TEST_QUEUE = "queue/testQueue";
088: public String TEST_TOPIC = "topic/testTopic";
089:
090: public Context context;
091: public QueueConnectionFactory queueFactory;
092: public TopicConnectionFactory topicFactory;
093:
094: public MQBase(String name) {
095: super (name);
096: }
097:
098: public long getRunSleep() {
099: log.info("run.sleep: " + System.getProperty("run.sleep"));
100: return 1000L * Integer
101: .getInteger("run.sleep", DEFAULT_RUNSLEEP).intValue();
102: }
103:
104: public void sleep(long sleep) {
105: try {
106: Thread.sleep(sleep);
107: } catch (InterruptedException e) {
108: }
109: }
110:
111: public void drainTopic() throws JMSException {
112: TopicWorker sub1 = new TopicWorker(GETTER, TRANS_NONE, null);
113: sub1.connect();
114: sub1.get();
115: sub1.close();
116: }
117:
118: public void drainQueue() throws JMSException {
119: QueueWorker sub1 = new QueueWorker(GETTER, TRANS_NONE, null);
120: sub1.connect();
121: sub1.get();
122: sub1.close();
123: }
124:
125: /**
126: * The JUnit setup method
127: *
128: * @exception Exception Description of Exception
129: */
130: protected void setUp() throws Exception {
131: // Reconfigure acording to props
132: QUEUE_FACTORY = System.getProperty("jbosstest.queuefactory",
133: QUEUE_FACTORY);
134: TOPIC_FACTORY = System.getProperty("jbosstest.topicfactory",
135: TOPIC_FACTORY);
136: TEST_QUEUE = System.getProperty("jbosstest.queue", TEST_QUEUE);
137: TEST_TOPIC = System.getProperty("jbosstest.topic", TEST_TOPIC);
138:
139: if (context == null) {
140:
141: context = getInitialContext();
142:
143: queueFactory = (QueueConnectionFactory) context
144: .lookup(QUEUE_FACTORY);
145: topicFactory = (TopicConnectionFactory) context
146: .lookup(TOPIC_FACTORY);
147:
148: getLog().debug("Connection to JBossMQ established.");
149: }
150:
151: }
152:
153: public static void main(String[] args) {
154:
155: }
156:
157: public abstract class JMSWorker implements Runnable,
158: MessageListener, ExceptionListener {
159:
160: protected boolean stopRequested = false;
161: protected int messageHandled = 0;
162: protected Exception runEx = null;
163: protected MessageFilter filter;
164: protected MessageCreator creator;
165: protected int number = 1;
166: protected int type = -1;
167: protected int transacted;
168: protected QosConfig qosConfig = new QosConfig();
169: protected String userName;
170: protected String password;
171: protected String clientID;
172:
173: // Generic ones, should be set by sublcasses
174: public Connection connection;
175: public Destination destination;
176: public Session session;
177: public MessageProducer producer;
178: public MessageConsumer consumer;
179:
180: /**
181: * Create one without any settings, use mutators instead. Makes it easier to owerride.
182: */
183: public JMSWorker() {
184: }
185:
186: public JMSWorker(int type, int transacted, MessageFilter filter) {
187: this .type = type;
188: this .transacted = transacted;
189: this .filter = filter;
190: }
191:
192: public JMSWorker(int type, int transacted,
193: MessageCreator creator, int number) {
194: this .type = type;
195: this .transacted = transacted;
196: this .creator = creator;
197: this .number = number;
198: }
199:
200: public void setSubscriberAttrs(int type, int transacted,
201: MessageFilter filter) {
202: this .type = type;
203: this .transacted = transacted;
204: this .filter = filter;
205: }
206:
207: public void setPublisherAttrs(int type, int transacted,
208: MessageCreator creator, int number) {
209: this .type = type;
210: this .transacted = transacted;
211: this .creator = creator;
212: this .number = number;
213: }
214:
215: public void setUser(String userName, String password) {
216: this .userName = userName;
217: this .password = password;
218: }
219:
220: public void setClientID(String ID) {
221: this .clientID = ID;
222: }
223:
224: abstract public void publish() throws JMSException;
225:
226: abstract public void publish(int nr) throws JMSException;
227:
228: /**
229: * Subsribes, collects, checking any set filters. A messageComsumer must be created before calling this.
230: */
231: public void subscribe() throws JMSException {
232: subscribe(false);
233: }
234:
235: /**
236: * Subsribes, collects, checking any set filters. A messageComsumer must be created before calling this. If arg set to true, do a failsafe sub
237: */
238: public void subscribe(boolean failsafe) throws JMSException {
239: if (consumer == null)
240: throw new JMSException("No messageConsumer created");
241:
242: if (failsafe)
243: connection.setExceptionListener(this );
244:
245: consumer.setMessageListener(this );
246:
247: }
248:
249: public void get() throws JMSException {
250: Message msg = consumer.receive(2000);
251: while (msg != null) {
252: if (filter != null) {
253: if (filter.ok(msg))
254: messageHandled++;
255: } else {
256: messageHandled++;
257: }
258: msg = consumer.receive(2000);
259: }
260: }
261:
262: abstract public void connect() throws JMSException;
263:
264: public void setQosConfig(QosConfig qosConfig) {
265: this .qosConfig = qosConfig;
266: }
267:
268: public void setStoped() throws JMSException {
269: stopRequested = true;
270: }
271:
272: public int getMessageHandled() {
273: return messageHandled;
274: }
275:
276: public Exception getException() {
277: return runEx;
278: }
279:
280: public void reset() {
281: messageHandled = 0;
282: stopRequested = false;
283: runEx = null;
284: }
285:
286: public void close() {
287: try {
288: if (consumer != null)
289: consumer.close();
290: if (producer != null)
291: producer.close();
292: if (session != null)
293: session.close();
294: } catch (JMSException ex) {
295: } finally {
296: if (connection != null) {
297: try {
298: connection.close();
299: } catch (JMSException ex) {
300: }
301: }
302: }
303: }
304:
305: public void onMessage(Message msg) {
306: try {
307: if (filter != null) {
308: if (filter.ok(msg))
309: messageHandled++;
310: } else {
311: messageHandled++;
312: }
313: if (session.getTransacted())
314: session.commit();
315: } catch (Exception ex) {
316: log.warn("Exception in on message: " + ex, ex);
317: runEx = ex;
318: }
319: }
320:
321: /**
322: * onException handling is only for subscriber. Will try to to
323: * a connect followed by a subscribe
324: */
325: public void onException(JMSException ex) {
326: log.error("Ex in connection: " + ex);
327:
328: try {
329: connection.setExceptionListener(null);
330: close();
331: } catch (JMSException c) {
332: }
333:
334: // Try reconnect, loops until success or shut down
335: try {
336: boolean tryIt = true;
337: while (tryIt && !stopRequested) {
338: log.info("Trying reconnect...");
339: try {
340: Thread.sleep(10000);
341: } catch (InterruptedException ie) {
342: }
343: try {
344: connect();
345: subscribe(true);
346: tryIt = false;
347: log.info("Reconnect OK");
348: //return;
349: } catch (JMSException e) {
350: log.error("Error in reconnect: " + e);
351: }
352: }
353:
354: } catch (Exception je) {
355: log
356: .error("Strange error in failsafe handling"
357: + je, je);
358: }
359: }
360:
361: public void run() {
362: try {
363: switch (type) {
364: case -1:
365: log.info("Nothing to do for type " + type);
366: break;
367: case PUBLISHER:
368: connect();
369: publish();
370: break;
371: case SUBSCRIBER:
372: connect();
373: subscribe();
374: break;
375: case GETTER:
376: connect();
377: get();
378: break;
379: case CONNECTOR:
380: connect();
381: break;
382: case FAILSAFE_SUBSCRIBER:
383: connect();
384: subscribe(true);
385: break;
386: }
387:
388: //if the method does not hold an own thread, we do it here
389: while (!stopRequested) {
390: try {
391: Thread.sleep(1000);
392: } catch (InterruptedException ex) {
393:
394: }
395: }
396: } catch (JMSException ex) {
397: runEx = ex;
398: log.error("Could not run: " + ex, ex);
399: }
400: }
401: }
402:
403: public interface MessageCreator {
404: public void setSession(Session session);
405:
406: public Message createMessage(int nr) throws JMSException;
407: }
408:
409: public abstract class BaseMessageCreator implements MessageCreator {
410: protected Session session;
411: protected String property;
412:
413: public BaseMessageCreator(String property) {
414: this .property = property;
415: }
416:
417: public void setSession(Session session) {
418: this .session = session;
419: }
420:
421: abstract public Message createMessage(int nr)
422: throws JMSException;
423: }
424:
425: public class IntRangeMessageCreator extends BaseMessageCreator {
426: int start = 0;
427:
428: public IntRangeMessageCreator(String property) {
429: super (property);
430: }
431:
432: public IntRangeMessageCreator(String property, int start) {
433: super (property);
434: this .start = start;
435: }
436:
437: public Message createMessage(int nr) throws JMSException {
438: if (session == null)
439: throw new JMSException("Session not allowed to be null");
440:
441: Message msg = session.createMessage();
442: msg.setStringProperty(property, String.valueOf(start + nr));
443: return msg;
444: }
445: }
446:
447: public interface MessageFilter {
448: public boolean ok(Message msg) throws JMSException;
449: }
450:
451: public class IntRangeMessageFilter implements MessageFilter {
452: Class messageClass;
453: String className;
454: String property;
455: int low;
456: int max;
457: int counter = 0;
458: int report = 1000;
459:
460: public IntRangeMessageFilter(Class messageClass,
461: String property, int low, int max) {
462: this .messageClass = messageClass;
463: this .property = property;
464: className = messageClass.getName();
465: this .low = low;
466: this .max = max;
467: }
468:
469: private boolean validateClass(Message msg) {
470: Class clazz = null;
471: if (msg instanceof javax.jms.TextMessage)
472: clazz = javax.jms.TextMessage.class;
473: else if (msg instanceof javax.jms.BytesMessage)
474: clazz = javax.jms.BytesMessage.class;
475: else if (msg instanceof javax.jms.MapMessage)
476: clazz = javax.jms.MapMessage.class;
477: else if (msg instanceof javax.jms.ObjectMessage)
478: clazz = javax.jms.ObjectMessage.class;
479: else if (msg instanceof javax.jms.StreamMessage)
480: clazz = javax.jms.StreamMessage.class;
481: else
482: clazz = javax.jms.Message.class;
483:
484: return clazz.equals(messageClass);
485: }
486:
487: public boolean ok(Message msg) throws JMSException {
488: boolean res = false;
489: if (validateClass(msg)) {
490: if (msg.propertyExists(property)) {
491: String p = msg.getStringProperty(property);
492: try {
493: int i = Integer.parseInt(p);
494: //log.debug("Received message " + property +"=" +i);
495: if (i >= low && i < max)
496: res = true;
497: } catch (NumberFormatException ex) {
498: throw new JMSException("Property " + property
499: + " was not int: " + p);
500: }
501: }
502: }
503: counter++;
504: int mod = counter % report;
505: if (mod == 0)
506: log.debug("Have received " + counter + " messages");
507: return res;
508: }
509:
510: }
511:
512: /*
513: public class REMessageFilter implements MessageFilter {
514: Class messageClass;
515: String className;
516: String property;
517: RE re = null;
518: public REMessageFilter(Class messageClass, String property, String regexp) throws REException{
519: this.messageClass = messageClass;
520: this.property = property;
521: re = new RE(regexp);
522: className = messageClass.getName();
523: }
524:
525: public boolean ok(Message msg) throws JMSException{
526: boolean res = false;
527: if (className.equals(msg.getClass().getName())) {
528: if (msg.propertyExists(property)) {
529: String p = msg.getStringProperty(property);
530: if (re.getMatch(p)!=null)
531: res = true;
532: }
533: }
534: return true;
535: }
536: }
537: */
538: /**
539: * Defines quality of service for message publishing. Defaults are the same
540: * ase defined in SpyMessage.
541: */
542: public class QosConfig {
543: int deliveryMode = DeliveryMode.PERSISTENT;
544: int priority = 4;
545: long ttl = 0;
546: }
547:
548: public class TopicWorker extends JMSWorker {
549: String durableHandle;
550:
551: /**
552: * If using this, use mutators to add attrs.
553: */
554: public TopicWorker() {
555: super ();
556: }
557:
558: public TopicWorker(int type, int transacted,
559: MessageFilter filter) {
560: super (type, transacted, filter);
561: }
562:
563: public TopicWorker(int type, int transacted,
564: MessageCreator creator, int number) {
565: super (type, transacted, creator, number);
566: }
567:
568: public void publish() throws JMSException {
569: publish(number);
570: }
571:
572: public void publish(int nr) throws JMSException {
573: if (producer == null)
574: producer = ((TopicSession) session)
575: .createPublisher((Topic) destination);
576: if (creator == null)
577: throw new JMSException(
578: "Publish must have a MessageCreator set");
579:
580: creator.setSession(session);
581: log.debug("Publishing " + nr + " messages");
582: for (int i = 0; i < nr; i++) {
583: if (qosConfig != null) {
584: ((TopicPublisher) producer).publish(creator
585: .createMessage(i), qosConfig.deliveryMode,
586: qosConfig.priority, qosConfig.ttl);
587: } else {
588: ((TopicPublisher) producer).publish(creator
589: .createMessage(i));
590: }
591:
592: messageHandled++;
593: }
594: if (session.getTransacted())
595: session.commit();
596: log.debug("Finished publishing");
597: }
598:
599: public void subscribe() throws JMSException {
600: subscribe(false);
601: }
602:
603: public void subscribe(boolean failsafe) throws JMSException {
604: if (durableHandle != null)
605: consumer = ((TopicSession) session)
606: .createDurableSubscriber((Topic) destination,
607: durableHandle);
608: else
609: consumer = ((TopicSession) session)
610: .createSubscriber((Topic) destination);
611: super .subscribe(failsafe);
612: connection.start();
613: }
614:
615: public void get() throws JMSException {
616: consumer = ((TopicSession) session)
617: .createSubscriber((Topic) destination);
618: super .subscribe();
619: connection.start();
620: }
621:
622: public void connect() throws JMSException {
623: log.debug("Connecting: " + this .toString());
624: if (userName != null)
625: connection = topicFactory.createTopicConnection(
626: userName, password);
627: else
628: connection = topicFactory.createTopicConnection();
629:
630: if (clientID != null) {
631: log.debug("Setting clientID" + clientID);
632: connection.setClientID(clientID);
633: }
634:
635: session = ((TopicConnection) connection)
636: .createTopicSession(transacted != TRANS_NONE,
637: Session.AUTO_ACKNOWLEDGE);
638: try {
639: destination = (Destination) context.lookup(TEST_TOPIC);
640: } catch (NamingException ex) {
641: throw new JMSException("Could not lookup topic " + ex);
642: }
643: }
644:
645: // Topic specific stuff
646: public void setDurable(String userId, String pwd, String handle) {
647: this .userName = userId;
648: this .password = pwd;
649: this .durableHandle = handle;
650: }
651:
652: public void setDurable(String handle) {
653: this .durableHandle = handle;
654: }
655:
656: public void unsubscribe() throws JMSException {
657: if (durableHandle != null)
658: ((TopicSession) session).unsubscribe(durableHandle);
659: }
660:
661: public String toString() {
662: return "(userId=" + userName + " pwd=" + password
663: + " handle=" + durableHandle + ")";
664: }
665:
666: }
667:
668: public class QueueWorker extends JMSWorker {
669: String userId;
670: String pwd;
671: String handle;
672:
673: /**
674: * If using this, use mutators to add attrs.
675: */
676: public QueueWorker() {
677: super ();
678: }
679:
680: public QueueWorker(int type, int transacted,
681: MessageFilter filter) {
682: super (type, transacted, filter);
683: }
684:
685: public QueueWorker(int type, int transacted,
686: MessageCreator creator, int number) {
687: super (type, transacted, creator, number);
688: }
689:
690: public void publish() throws JMSException {
691: publish(number);
692: }
693:
694: public void publish(int nr) throws JMSException {
695: if (producer == null)
696: producer = ((QueueSession) session)
697: .createSender((Queue) destination);
698: if (creator == null)
699: throw new JMSException(
700: "Publish must have a MessageCreator set");
701:
702: creator.setSession(session);
703: log.debug("Publishing " + nr + " messages");
704: for (int i = 0; i < nr; i++) {
705: if (qosConfig != null) {
706: ((QueueSender) producer).send(creator
707: .createMessage(i), qosConfig.deliveryMode,
708: qosConfig.priority, qosConfig.ttl);
709: } else {
710: ((QueueSender) producer).send(creator
711: .createMessage(i));
712: }
713:
714: messageHandled++;
715: }
716: if (session.getTransacted())
717: session.commit();
718: log.debug("Finished publishing");
719: }
720:
721: public void subscribe() throws JMSException {
722: subscribe(false);
723: }
724:
725: public void subscribe(boolean failsafe) throws JMSException {
726:
727: consumer = ((QueueSession) session)
728: .createReceiver((Queue) destination);
729: super .subscribe(failsafe);
730: connection.start();
731: }
732:
733: public void get() throws JMSException {
734: consumer = ((QueueSession) session)
735: .createReceiver((Queue) destination);
736: super .subscribe();
737: connection.start();
738: }
739:
740: public void connect() throws JMSException {
741: log.debug("Connecting: " + this .toString());
742: if (userName != null)
743: connection = queueFactory.createQueueConnection(
744: userName, password);
745: else
746: connection = queueFactory.createQueueConnection();
747:
748: if (clientID != null)
749: connection.setClientID(clientID);
750:
751: session = ((QueueConnection) connection)
752: .createQueueSession(transacted != TRANS_NONE,
753: Session.AUTO_ACKNOWLEDGE);
754: try {
755: destination = (Destination) context.lookup(TEST_QUEUE);
756: } catch (NamingException ex) {
757: throw new JMSException("Could not lookup topic " + ex);
758: }
759: }
760:
761: // Queue specific
762: public Enumeration browse() throws JMSException {
763: QueueBrowser b = ((QueueSession) session)
764: .createBrowser((Queue) destination);
765: return b.getEnumeration();
766: }
767: }
768: } // MQBase
|