001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.bpmscript.jbi.tasklist;
018:
019: import java.util.Enumeration;
020: import java.util.Properties;
021:
022: import javax.jms.Connection;
023: import javax.jms.ConnectionFactory;
024: import javax.jms.JMSException;
025: import javax.jms.Message;
026: import javax.jms.MessageConsumer;
027: import javax.jms.MessageProducer;
028: import javax.jms.ObjectMessage;
029: import javax.jms.Queue;
030: import javax.jms.QueueBrowser;
031: import javax.jms.Session;
032:
033: import junit.framework.TestCase;
034:
035: import org.apache.activemq.ActiveMQConnectionFactory;
036: import org.apache.activemq.xbean.BrokerFactoryBean;
037: import org.apache.log4j.PropertyConfigurator;
038: import org.springframework.core.io.ClassPathResource;
039:
040: public class JmsTest extends TestCase {
041:
042: protected final transient org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
043: .getLog(getClass());
044:
045: protected void setUp() throws Exception {
046: super .setUp();
047: }
048:
049: protected void tearDown() throws Exception {
050: super .tearDown();
051: }
052:
053: protected Message getMessage(Session session, Queue queue,
054: String selector) throws JMSException {
055: log.info("browsing for message");
056: QueueBrowser browser = session.createBrowser(queue,
057: "id = 'something'");
058: Enumeration enumeration = browser.getEnumeration();
059: if (enumeration.hasMoreElements()) {
060: return (Message) enumeration.nextElement();
061: } else {
062: return null;
063: }
064: }
065:
066: public void testSomething() throws Exception {
067: Properties properties = new Properties();
068: properties.put("log4j.rootCategory", "INFO, stdout");
069: properties.put("log4j.appender.stdout",
070: "org.apache.log4j.ConsoleAppender");
071: properties.put("log4j.appender.stdout.layout",
072: "org.apache.log4j.PatternLayout");
073: properties.put(
074: "log4j.appender.stdout.layout.ConversionPattern",
075: "%p %c{1}.%M(%L) | [%t] %m%n");
076: PropertyConfigurator.configure(properties);
077: BrokerFactoryBean bean = new BrokerFactoryBean();
078: bean.setConfig(new ClassPathResource("inmemoryactivemq.xml"));
079: bean.afterPropertiesSet();
080:
081: // final ConnectionFactory connectionFactory = new PooledConnectionFactory(new ActiveMQConnectionFactory());
082: final ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
083: Connection connection = connectionFactory.createConnection();
084: Session session = connection.createSession(false,
085: Session.AUTO_ACKNOWLEDGE);
086:
087: final Queue queue = session.createQueue(this .getClass()
088: .getName());
089:
090: MessageProducer producer = session.createProducer(queue);
091: ObjectMessage message = session.createObjectMessage("One");
092: message.setStringProperty("id", "something");
093: producer.send(message);
094: log.info("sent first message");
095:
096: Thread threadOne = new Thread(new Runnable() {
097:
098: public void run() {
099: try {
100: Connection connectionOne = connectionFactory
101: .createConnection();
102: connectionOne.start();
103: log.info("started sessionOne transaction");
104: Session sessionOne = connectionOne.createSession(
105: true, Session.AUTO_ACKNOWLEDGE);
106: assertNotNull(getMessage(sessionOne, queue,
107: "id = 'something'"));
108: // Thread.sleep(500);
109: MessageConsumer consumerOne = sessionOne
110: .createConsumer(queue, "id = 'something'");
111: log.info("receiving message");
112: ObjectMessage messageOne = (ObjectMessage) consumerOne
113: .receive();
114: log.info("consumed first message "
115: + messageOne.getObject());
116: MessageProducer producer = sessionOne
117: .createProducer(queue);
118: ObjectMessage message = sessionOne
119: .createObjectMessage("Two");
120: message.setStringProperty("id", "something");
121: Thread.sleep(2000);
122: producer.send(message);
123: log.info("sent second message");
124: sessionOne.commit();
125: log.info("committed sessionOne transaction");
126: connectionOne.close();
127: } catch (Exception e) {
128: e.printStackTrace();
129: }
130: }
131:
132: });
133:
134: Thread threadTwo = new Thread(new Runnable() {
135:
136: public void run() {
137: try {
138: Connection connectionTwo = connectionFactory
139: .createConnection();
140: connectionTwo.start();
141: Thread.sleep(1000);
142: log.info("started sessionTwo transaction");
143: Session sessionTwo = connectionTwo.createSession(
144: true, Session.AUTO_ACKNOWLEDGE);
145: assertNotNull(getMessage(sessionTwo, queue,
146: "id = 'something'"));
147: Thread.sleep(100);
148: MessageConsumer consumerTwo = sessionTwo
149: .createConsumer(queue, "id = 'something'");
150: log.info("receiving message");
151: ObjectMessage messageTwo = (ObjectMessage) consumerTwo
152: .receive();
153: log.info("consumed second message "
154: + messageTwo.getObject());
155: MessageProducer producer = sessionTwo
156: .createProducer(queue);
157: ObjectMessage message = sessionTwo
158: .createObjectMessage("Three");
159: message.setStringProperty("id", "something");
160: producer.send(message);
161: log.info("sent third message");
162: sessionTwo.commit();
163: log.info("committed sessionTwo transaction");
164: connectionTwo.close();
165: } catch (Exception e) {
166: e.printStackTrace();
167: }
168: }
169:
170: });
171:
172: threadOne.setName("1");
173: threadTwo.setName("2");
174:
175: threadOne.start();
176: threadTwo.start();
177:
178: Thread.sleep(10000);
179:
180: connection.stop();
181: connection.close();
182:
183: bean.destroy();
184: }
185:
186: }
|