Source Code Cross Referenced for MQBase.java in  » EJB-Server-JBoss-4.2.1 » testsuite » org » jboss » test » jbossmq » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » EJB Server JBoss 4.2.1 » testsuite » org.jboss.test.jbossmq 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * JBoss, Home of Professional Open Source.
003:         * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004:         * as indicated by the @author tags. See the copyright.txt file in the
005:         * distribution for a full listing of individual contributors.
006:         *
007:         * This is free software; you can redistribute it and/or modify it
008:         * under the terms of the GNU Lesser General Public License as
009:         * published by the Free Software Foundation; either version 2.1 of
010:         * the License, or (at your option) any later version.
011:         *
012:         * This software is distributed in the hope that it will be useful,
013:         * but WITHOUT ANY WARRANTY; without even the implied warranty of
014:         * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015:         * Lesser General Public License for more details.
016:         *
017:         * You should have received a copy of the GNU Lesser General Public
018:         * License along with this software; if not, write to the Free
019:         * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020:         * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021:         */
022:        package org.jboss.test.jbossmq;
023:
024:        import java.util.Enumeration;
025:
026:        import javax.jms.Connection;
027:        import javax.jms.DeliveryMode;
028:        import javax.jms.Destination;
029:        import javax.jms.ExceptionListener;
030:        import javax.jms.JMSException;
031:        import javax.jms.Message;
032:        import javax.jms.MessageConsumer;
033:        import javax.jms.MessageListener;
034:        import javax.jms.MessageProducer;
035:        import javax.jms.Queue;
036:        import javax.jms.QueueBrowser;
037:        import javax.jms.QueueConnection;
038:        import javax.jms.QueueConnectionFactory;
039:        import javax.jms.QueueSender;
040:        import javax.jms.QueueSession;
041:        import javax.jms.Session;
042:        import javax.jms.Topic;
043:        import javax.jms.TopicConnection;
044:        import javax.jms.TopicConnectionFactory;
045:        import javax.jms.TopicPublisher;
046:        import javax.jms.TopicSession;
047:        import javax.naming.Context;
048:        import javax.naming.NamingException;
049:
050:        import org.apache.log4j.Category;
051:        import org.jboss.test.JBossTestCase;
052:
053:        /**
054:         * JMS tests base class.
055:         *
056:         * Your test extends this class, and can then use common methods. To do
057:         * the tests you use TopicWorker or QueueWorker and the MessageCreator,
058:         * MessageFilter and perhaps MessageQos classes, directly or by extending
059:         * them.
060:         *
061:         * You can change the connection factories and destinations used by the
062:         * properties:   jbosstest.queuefactory, jbosstest.topicfactory, 
063:         * jbosstest.queue or jbosstest.topic.
064:         *
065:         *
066:         * @author    <a href="pra@tim.se">Peter Antman</a>
067:         * @version $Revision: 57211 $
068:         */
069:        public class MQBase extends JBossTestCase {
070:            public static final int PUBLISHER = 0;
071:            public static final int SUBSCRIBER = 1;
072:            public static final int GETTER = 2;
073:            public static final int CONNECTOR = 3;
074:            public static final int FAILSAFE_SUBSCRIBER = 4;
075:            public static final int TRANS_NONE = 0;
076:            public static final int TRANS_INDIVIDUAL = 1;
077:            public static final int TRANS_TOTAL = 2;
078:            public static final String[] TRANS_DESC = { "NOT", "individually",
079:                    "totally" };
080:            public static final int DEFAULT_RUNSLEEP = 50;
081:            public final Category log = getLog();
082:
083:            // Provider specific
084:            public String TOPIC_FACTORY = "ConnectionFactory";
085:            public String QUEUE_FACTORY = "ConnectionFactory";
086:
087:            public String TEST_QUEUE = "queue/testQueue";
088:            public String TEST_TOPIC = "topic/testTopic";
089:
090:            public Context context;
091:            public QueueConnectionFactory queueFactory;
092:            public TopicConnectionFactory topicFactory;
093:
094:            public MQBase(String name) {
095:                super (name);
096:            }
097:
098:            public long getRunSleep() {
099:                log.info("run.sleep: " + System.getProperty("run.sleep"));
100:                return 1000L * Integer
101:                        .getInteger("run.sleep", DEFAULT_RUNSLEEP).intValue();
102:            }
103:
104:            public void sleep(long sleep) {
105:                try {
106:                    Thread.sleep(sleep);
107:                } catch (InterruptedException e) {
108:                }
109:            }
110:
111:            public void drainTopic() throws JMSException {
112:                TopicWorker sub1 = new TopicWorker(GETTER, TRANS_NONE, null);
113:                sub1.connect();
114:                sub1.get();
115:                sub1.close();
116:            }
117:
118:            public void drainQueue() throws JMSException {
119:                QueueWorker sub1 = new QueueWorker(GETTER, TRANS_NONE, null);
120:                sub1.connect();
121:                sub1.get();
122:                sub1.close();
123:            }
124:
125:            /**
126:             * The JUnit setup method
127:             *
128:             * @exception Exception  Description of Exception
129:             */
130:            protected void setUp() throws Exception {
131:                // Reconfigure acording to props
132:                QUEUE_FACTORY = System.getProperty("jbosstest.queuefactory",
133:                        QUEUE_FACTORY);
134:                TOPIC_FACTORY = System.getProperty("jbosstest.topicfactory",
135:                        TOPIC_FACTORY);
136:                TEST_QUEUE = System.getProperty("jbosstest.queue", TEST_QUEUE);
137:                TEST_TOPIC = System.getProperty("jbosstest.topic", TEST_TOPIC);
138:
139:                if (context == null) {
140:
141:                    context = getInitialContext();
142:
143:                    queueFactory = (QueueConnectionFactory) context
144:                            .lookup(QUEUE_FACTORY);
145:                    topicFactory = (TopicConnectionFactory) context
146:                            .lookup(TOPIC_FACTORY);
147:
148:                    getLog().debug("Connection to JBossMQ established.");
149:                }
150:
151:            }
152:
153:            public static void main(String[] args) {
154:
155:            }
156:
157:            public abstract class JMSWorker implements  Runnable,
158:                    MessageListener, ExceptionListener {
159:
160:                protected boolean stopRequested = false;
161:                protected int messageHandled = 0;
162:                protected Exception runEx = null;
163:                protected MessageFilter filter;
164:                protected MessageCreator creator;
165:                protected int number = 1;
166:                protected int type = -1;
167:                protected int transacted;
168:                protected QosConfig qosConfig = new QosConfig();
169:                protected String userName;
170:                protected String password;
171:                protected String clientID;
172:
173:                // Generic ones, should be set by sublcasses
174:                public Connection connection;
175:                public Destination destination;
176:                public Session session;
177:                public MessageProducer producer;
178:                public MessageConsumer consumer;
179:
180:                /**
181:                 * Create one without any settings, use mutators instead. Makes it easier to owerride.
182:                 */
183:                public JMSWorker() {
184:                }
185:
186:                public JMSWorker(int type, int transacted, MessageFilter filter) {
187:                    this .type = type;
188:                    this .transacted = transacted;
189:                    this .filter = filter;
190:                }
191:
192:                public JMSWorker(int type, int transacted,
193:                        MessageCreator creator, int number) {
194:                    this .type = type;
195:                    this .transacted = transacted;
196:                    this .creator = creator;
197:                    this .number = number;
198:                }
199:
200:                public void setSubscriberAttrs(int type, int transacted,
201:                        MessageFilter filter) {
202:                    this .type = type;
203:                    this .transacted = transacted;
204:                    this .filter = filter;
205:                }
206:
207:                public void setPublisherAttrs(int type, int transacted,
208:                        MessageCreator creator, int number) {
209:                    this .type = type;
210:                    this .transacted = transacted;
211:                    this .creator = creator;
212:                    this .number = number;
213:                }
214:
215:                public void setUser(String userName, String password) {
216:                    this .userName = userName;
217:                    this .password = password;
218:                }
219:
220:                public void setClientID(String ID) {
221:                    this .clientID = ID;
222:                }
223:
224:                abstract public void publish() throws JMSException;
225:
226:                abstract public void publish(int nr) throws JMSException;
227:
228:                /**
229:                 * Subsribes, collects, checking any set filters. A messageComsumer must be created before calling this.
230:                 */
231:                public void subscribe() throws JMSException {
232:                    subscribe(false);
233:                }
234:
235:                /**
236:                 * Subsribes, collects, checking any set filters. A messageComsumer must be created before calling this. If arg set to true, do a failsafe sub
237:                 */
238:                public void subscribe(boolean failsafe) throws JMSException {
239:                    if (consumer == null)
240:                        throw new JMSException("No messageConsumer created");
241:
242:                    if (failsafe)
243:                        connection.setExceptionListener(this );
244:
245:                    consumer.setMessageListener(this );
246:
247:                }
248:
249:                public void get() throws JMSException {
250:                    Message msg = consumer.receive(2000);
251:                    while (msg != null) {
252:                        if (filter != null) {
253:                            if (filter.ok(msg))
254:                                messageHandled++;
255:                        } else {
256:                            messageHandled++;
257:                        }
258:                        msg = consumer.receive(2000);
259:                    }
260:                }
261:
262:                abstract public void connect() throws JMSException;
263:
264:                public void setQosConfig(QosConfig qosConfig) {
265:                    this .qosConfig = qosConfig;
266:                }
267:
268:                public void setStoped() throws JMSException {
269:                    stopRequested = true;
270:                }
271:
272:                public int getMessageHandled() {
273:                    return messageHandled;
274:                }
275:
276:                public Exception getException() {
277:                    return runEx;
278:                }
279:
280:                public void reset() {
281:                    messageHandled = 0;
282:                    stopRequested = false;
283:                    runEx = null;
284:                }
285:
286:                public void close() {
287:                    try {
288:                        if (consumer != null)
289:                            consumer.close();
290:                        if (producer != null)
291:                            producer.close();
292:                        if (session != null)
293:                            session.close();
294:                    } catch (JMSException ex) {
295:                    } finally {
296:                        if (connection != null) {
297:                            try {
298:                                connection.close();
299:                            } catch (JMSException ex) {
300:                            }
301:                        }
302:                    }
303:                }
304:
305:                public void onMessage(Message msg) {
306:                    try {
307:                        if (filter != null) {
308:                            if (filter.ok(msg))
309:                                messageHandled++;
310:                        } else {
311:                            messageHandled++;
312:                        }
313:                        if (session.getTransacted())
314:                            session.commit();
315:                    } catch (Exception ex) {
316:                        log.warn("Exception in on message: " + ex, ex);
317:                        runEx = ex;
318:                    }
319:                }
320:
321:                /**
322:                 * onException handling is only for subscriber. Will try to to
323:                 * a connect followed by a subscribe
324:                 */
325:                public void onException(JMSException ex) {
326:                    log.error("Ex in connection: " + ex);
327:
328:                    try {
329:                        connection.setExceptionListener(null);
330:                        close();
331:                    } catch (JMSException c) {
332:                    }
333:
334:                    // Try reconnect, loops until success or shut down
335:                    try {
336:                        boolean tryIt = true;
337:                        while (tryIt && !stopRequested) {
338:                            log.info("Trying reconnect...");
339:                            try {
340:                                Thread.sleep(10000);
341:                            } catch (InterruptedException ie) {
342:                            }
343:                            try {
344:                                connect();
345:                                subscribe(true);
346:                                tryIt = false;
347:                                log.info("Reconnect OK");
348:                                //return;
349:                            } catch (JMSException e) {
350:                                log.error("Error in reconnect: " + e);
351:                            }
352:                        }
353:
354:                    } catch (Exception je) {
355:                        log
356:                                .error("Strange error in failsafe handling"
357:                                        + je, je);
358:                    }
359:                }
360:
361:                public void run() {
362:                    try {
363:                        switch (type) {
364:                        case -1:
365:                            log.info("Nothing to do for type " + type);
366:                            break;
367:                        case PUBLISHER:
368:                            connect();
369:                            publish();
370:                            break;
371:                        case SUBSCRIBER:
372:                            connect();
373:                            subscribe();
374:                            break;
375:                        case GETTER:
376:                            connect();
377:                            get();
378:                            break;
379:                        case CONNECTOR:
380:                            connect();
381:                            break;
382:                        case FAILSAFE_SUBSCRIBER:
383:                            connect();
384:                            subscribe(true);
385:                            break;
386:                        }
387:
388:                        //if the method does not hold an own thread, we do it here
389:                        while (!stopRequested) {
390:                            try {
391:                                Thread.sleep(1000);
392:                            } catch (InterruptedException ex) {
393:
394:                            }
395:                        }
396:                    } catch (JMSException ex) {
397:                        runEx = ex;
398:                        log.error("Could not run: " + ex, ex);
399:                    }
400:                }
401:            }
402:
403:            public interface MessageCreator {
404:                public void setSession(Session session);
405:
406:                public Message createMessage(int nr) throws JMSException;
407:            }
408:
409:            public abstract class BaseMessageCreator implements  MessageCreator {
410:                protected Session session;
411:                protected String property;
412:
413:                public BaseMessageCreator(String property) {
414:                    this .property = property;
415:                }
416:
417:                public void setSession(Session session) {
418:                    this .session = session;
419:                }
420:
421:                abstract public Message createMessage(int nr)
422:                        throws JMSException;
423:            }
424:
425:            public class IntRangeMessageCreator extends BaseMessageCreator {
426:                int start = 0;
427:
428:                public IntRangeMessageCreator(String property) {
429:                    super (property);
430:                }
431:
432:                public IntRangeMessageCreator(String property, int start) {
433:                    super (property);
434:                    this .start = start;
435:                }
436:
437:                public Message createMessage(int nr) throws JMSException {
438:                    if (session == null)
439:                        throw new JMSException("Session not allowed to be null");
440:
441:                    Message msg = session.createMessage();
442:                    msg.setStringProperty(property, String.valueOf(start + nr));
443:                    return msg;
444:                }
445:            }
446:
447:            public interface MessageFilter {
448:                public boolean ok(Message msg) throws JMSException;
449:            }
450:
451:            public class IntRangeMessageFilter implements  MessageFilter {
452:                Class messageClass;
453:                String className;
454:                String property;
455:                int low;
456:                int max;
457:                int counter = 0;
458:                int report = 1000;
459:
460:                public IntRangeMessageFilter(Class messageClass,
461:                        String property, int low, int max) {
462:                    this .messageClass = messageClass;
463:                    this .property = property;
464:                    className = messageClass.getName();
465:                    this .low = low;
466:                    this .max = max;
467:                }
468:
469:                private boolean validateClass(Message msg) {
470:                    Class clazz = null;
471:                    if (msg instanceof  javax.jms.TextMessage)
472:                        clazz = javax.jms.TextMessage.class;
473:                    else if (msg instanceof  javax.jms.BytesMessage)
474:                        clazz = javax.jms.BytesMessage.class;
475:                    else if (msg instanceof  javax.jms.MapMessage)
476:                        clazz = javax.jms.MapMessage.class;
477:                    else if (msg instanceof  javax.jms.ObjectMessage)
478:                        clazz = javax.jms.ObjectMessage.class;
479:                    else if (msg instanceof  javax.jms.StreamMessage)
480:                        clazz = javax.jms.StreamMessage.class;
481:                    else
482:                        clazz = javax.jms.Message.class;
483:
484:                    return clazz.equals(messageClass);
485:                }
486:
487:                public boolean ok(Message msg) throws JMSException {
488:                    boolean res = false;
489:                    if (validateClass(msg)) {
490:                        if (msg.propertyExists(property)) {
491:                            String p = msg.getStringProperty(property);
492:                            try {
493:                                int i = Integer.parseInt(p);
494:                                //log.debug("Received message " + property +"=" +i);
495:                                if (i >= low && i < max)
496:                                    res = true;
497:                            } catch (NumberFormatException ex) {
498:                                throw new JMSException("Property " + property
499:                                        + " was not int: " + p);
500:                            }
501:                        }
502:                    }
503:                    counter++;
504:                    int mod = counter % report;
505:                    if (mod == 0)
506:                        log.debug("Have received " + counter + " messages");
507:                    return res;
508:                }
509:
510:            }
511:
512:            /*  
513:            public class REMessageFilter implements MessageFilter {
514:               Class messageClass;
515:               String className;
516:               String property;
517:               RE re = null;
518:               public REMessageFilter(Class messageClass, String property, String regexp) throws REException{
519:                  this.messageClass = messageClass;
520:                  this.property = property;
521:                  re = new RE(regexp);
522:                  className = messageClass.getName();
523:               }
524:               
525:               public boolean ok(Message msg) throws JMSException{
526:                  boolean res = false;
527:                  if (className.equals(msg.getClass().getName())) {
528:                     if (msg.propertyExists(property)) {
529:                        String p = msg.getStringProperty(property);
530:                        if (re.getMatch(p)!=null)
531:                           res = true;
532:                     } 
533:                  }
534:                  return true;
535:               }
536:            }
537:             */
538:            /**
539:             * Defines quality of service for message publishing. Defaults are the same
540:             * ase defined in SpyMessage.
541:             */
542:            public class QosConfig {
543:                int deliveryMode = DeliveryMode.PERSISTENT;
544:                int priority = 4;
545:                long ttl = 0;
546:            }
547:
548:            public class TopicWorker extends JMSWorker {
549:                String durableHandle;
550:
551:                /**
552:                 * If using this, use mutators to add attrs.
553:                 */
554:                public TopicWorker() {
555:                    super ();
556:                }
557:
558:                public TopicWorker(int type, int transacted,
559:                        MessageFilter filter) {
560:                    super (type, transacted, filter);
561:                }
562:
563:                public TopicWorker(int type, int transacted,
564:                        MessageCreator creator, int number) {
565:                    super (type, transacted, creator, number);
566:                }
567:
568:                public void publish() throws JMSException {
569:                    publish(number);
570:                }
571:
572:                public void publish(int nr) throws JMSException {
573:                    if (producer == null)
574:                        producer = ((TopicSession) session)
575:                                .createPublisher((Topic) destination);
576:                    if (creator == null)
577:                        throw new JMSException(
578:                                "Publish must have a MessageCreator set");
579:
580:                    creator.setSession(session);
581:                    log.debug("Publishing " + nr + " messages");
582:                    for (int i = 0; i < nr; i++) {
583:                        if (qosConfig != null) {
584:                            ((TopicPublisher) producer).publish(creator
585:                                    .createMessage(i), qosConfig.deliveryMode,
586:                                    qosConfig.priority, qosConfig.ttl);
587:                        } else {
588:                            ((TopicPublisher) producer).publish(creator
589:                                    .createMessage(i));
590:                        }
591:
592:                        messageHandled++;
593:                    }
594:                    if (session.getTransacted())
595:                        session.commit();
596:                    log.debug("Finished publishing");
597:                }
598:
599:                public void subscribe() throws JMSException {
600:                    subscribe(false);
601:                }
602:
603:                public void subscribe(boolean failsafe) throws JMSException {
604:                    if (durableHandle != null)
605:                        consumer = ((TopicSession) session)
606:                                .createDurableSubscriber((Topic) destination,
607:                                        durableHandle);
608:                    else
609:                        consumer = ((TopicSession) session)
610:                                .createSubscriber((Topic) destination);
611:                    super .subscribe(failsafe);
612:                    connection.start();
613:                }
614:
615:                public void get() throws JMSException {
616:                    consumer = ((TopicSession) session)
617:                            .createSubscriber((Topic) destination);
618:                    super .subscribe();
619:                    connection.start();
620:                }
621:
622:                public void connect() throws JMSException {
623:                    log.debug("Connecting: " + this .toString());
624:                    if (userName != null)
625:                        connection = topicFactory.createTopicConnection(
626:                                userName, password);
627:                    else
628:                        connection = topicFactory.createTopicConnection();
629:
630:                    if (clientID != null) {
631:                        log.debug("Setting clientID" + clientID);
632:                        connection.setClientID(clientID);
633:                    }
634:
635:                    session = ((TopicConnection) connection)
636:                            .createTopicSession(transacted != TRANS_NONE,
637:                                    Session.AUTO_ACKNOWLEDGE);
638:                    try {
639:                        destination = (Destination) context.lookup(TEST_TOPIC);
640:                    } catch (NamingException ex) {
641:                        throw new JMSException("Could not lookup topic " + ex);
642:                    }
643:                }
644:
645:                // Topic specific stuff
646:                public void setDurable(String userId, String pwd, String handle) {
647:                    this .userName = userId;
648:                    this .password = pwd;
649:                    this .durableHandle = handle;
650:                }
651:
652:                public void setDurable(String handle) {
653:                    this .durableHandle = handle;
654:                }
655:
656:                public void unsubscribe() throws JMSException {
657:                    if (durableHandle != null)
658:                        ((TopicSession) session).unsubscribe(durableHandle);
659:                }
660:
661:                public String toString() {
662:                    return "(userId=" + userName + " pwd=" + password
663:                            + " handle=" + durableHandle + ")";
664:                }
665:
666:            }
667:
668:            public class QueueWorker extends JMSWorker {
669:                String userId;
670:                String pwd;
671:                String handle;
672:
673:                /**
674:                 * If using this, use mutators to add attrs.
675:                 */
676:                public QueueWorker() {
677:                    super ();
678:                }
679:
680:                public QueueWorker(int type, int transacted,
681:                        MessageFilter filter) {
682:                    super (type, transacted, filter);
683:                }
684:
685:                public QueueWorker(int type, int transacted,
686:                        MessageCreator creator, int number) {
687:                    super (type, transacted, creator, number);
688:                }
689:
690:                public void publish() throws JMSException {
691:                    publish(number);
692:                }
693:
694:                public void publish(int nr) throws JMSException {
695:                    if (producer == null)
696:                        producer = ((QueueSession) session)
697:                                .createSender((Queue) destination);
698:                    if (creator == null)
699:                        throw new JMSException(
700:                                "Publish must have a MessageCreator set");
701:
702:                    creator.setSession(session);
703:                    log.debug("Publishing " + nr + " messages");
704:                    for (int i = 0; i < nr; i++) {
705:                        if (qosConfig != null) {
706:                            ((QueueSender) producer).send(creator
707:                                    .createMessage(i), qosConfig.deliveryMode,
708:                                    qosConfig.priority, qosConfig.ttl);
709:                        } else {
710:                            ((QueueSender) producer).send(creator
711:                                    .createMessage(i));
712:                        }
713:
714:                        messageHandled++;
715:                    }
716:                    if (session.getTransacted())
717:                        session.commit();
718:                    log.debug("Finished publishing");
719:                }
720:
721:                public void subscribe() throws JMSException {
722:                    subscribe(false);
723:                }
724:
725:                public void subscribe(boolean failsafe) throws JMSException {
726:
727:                    consumer = ((QueueSession) session)
728:                            .createReceiver((Queue) destination);
729:                    super .subscribe(failsafe);
730:                    connection.start();
731:                }
732:
733:                public void get() throws JMSException {
734:                    consumer = ((QueueSession) session)
735:                            .createReceiver((Queue) destination);
736:                    super .subscribe();
737:                    connection.start();
738:                }
739:
740:                public void connect() throws JMSException {
741:                    log.debug("Connecting: " + this .toString());
742:                    if (userName != null)
743:                        connection = queueFactory.createQueueConnection(
744:                                userName, password);
745:                    else
746:                        connection = queueFactory.createQueueConnection();
747:
748:                    if (clientID != null)
749:                        connection.setClientID(clientID);
750:
751:                    session = ((QueueConnection) connection)
752:                            .createQueueSession(transacted != TRANS_NONE,
753:                                    Session.AUTO_ACKNOWLEDGE);
754:                    try {
755:                        destination = (Destination) context.lookup(TEST_QUEUE);
756:                    } catch (NamingException ex) {
757:                        throw new JMSException("Could not lookup topic " + ex);
758:                    }
759:                }
760:
761:                // Queue specific
762:                public Enumeration browse() throws JMSException {
763:                    QueueBrowser b = ((QueueSession) session)
764:                            .createBrowser((Queue) destination);
765:                    return b.getEnumeration();
766:                }
767:            }
768:        } // MQBase
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.