001: /**
002: *
003: * Licensed to the Apache Software Foundation (ASF) under one or more
004: * contributor license agreements. See the NOTICE file distributed with
005: * this work for additional information regarding copyright ownership.
006: * The ASF licenses this file to You under the Apache License, Version 2.0
007: * (the "License"); you may not use this file except in compliance with
008: * the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing, software
013: * distributed under the License is distributed on an "AS IS" BASIS,
014: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015: * See the License for the specific language governing permissions and
016: * limitations under the License.
017: */package org.apache.openejb.core.mdb;
018:
019: import java.io.Serializable;
020: import java.util.Map;
021: import java.util.TreeMap;
022: import java.util.concurrent.Executor;
023: import java.util.concurrent.Executors;
024: import javax.jms.Connection;
025: import javax.jms.ConnectionFactory;
026: import javax.jms.Destination;
027: import javax.jms.JMSException;
028: import javax.jms.Message;
029: import javax.jms.MessageConsumer;
030: import javax.jms.MessageListener;
031: import javax.jms.MessageProducer;
032: import javax.jms.ObjectMessage;
033: import javax.jms.Session;
034: import javax.resource.spi.BootstrapContext;
035: import javax.resource.spi.ResourceAdapterInternalException;
036: import javax.resource.spi.work.WorkManager;
037:
038: import junit.framework.TestCase;
039: import org.apache.activemq.ActiveMQConnectionFactory;
040: import org.apache.geronimo.connector.GeronimoBootstrapContext;
041: import org.apache.geronimo.connector.work.GeronimoWorkManager;
042: import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
043: import org.apache.openejb.OpenEJBException;
044: import org.apache.openejb.resource.activemq.ActiveMQResourceAdapter;
045:
046: public class JmsTest extends TestCase {
047: private ConnectionFactory connectionFactory;
048: private static final String REQUEST_QUEUE_NAME = "request";
049: private ActiveMQResourceAdapter ra;
050:
051: protected void setUp() throws Exception {
052: super .setUp();
053:
054: // create a transaction manager
055: GeronimoTransactionManager transactionManager = new GeronimoTransactionManager();
056:
057: // create the ActiveMQ resource adapter instance
058: ra = new ActiveMQResourceAdapter();
059:
060: // initialize properties
061: ra.setServerUrl("tcp://localhost:61616");
062: ra
063: .setBrokerXmlConfig("broker:(tcp://localhost:61616)?useJmx=false");
064:
065: // create a thead pool for ActiveMQ
066: Executor threadPool = Executors.newFixedThreadPool(30);
067:
068: // create a work manager which ActiveMQ uses to dispatch message delivery jobs
069: WorkManager workManager = new GeronimoWorkManager(threadPool,
070: threadPool, threadPool, transactionManager);
071:
072: // wrap the work mananger and transaction manager in a bootstrap context (connector spec thing)
073: BootstrapContext bootstrapContext = new GeronimoBootstrapContext(
074: workManager, transactionManager);
075:
076: // start the resource adapter
077: try {
078: ra.start(bootstrapContext);
079: } catch (ResourceAdapterInternalException e) {
080: throw new OpenEJBException(e);
081: }
082: // Create a ConnectionFactory
083: connectionFactory = new ActiveMQConnectionFactory(
084: "tcp://localhost:61616");
085: }
086:
087: protected void tearDown() throws Exception {
088: connectionFactory = null;
089: if (ra != null) {
090: ra.stop();
091: ra = null;
092: }
093: super .tearDown();
094: }
095:
096: public void testProxy() throws Exception {
097: // Create a Session
098: Connection connection = connectionFactory.createConnection();
099: try {
100: connection.start();
101:
102: Destination requestQueue = createListener(connection);
103:
104: createSender(connection, requestQueue);
105: } finally {
106: MdbUtil.close(connection);
107: }
108: }
109:
110: @SuppressWarnings("unchecked")
111: private void createSender(Connection connection,
112: Destination requestQueue) throws JMSException {
113: Session session = null;
114: MessageProducer producer = null;
115: MessageConsumer consumer = null;
116: try {
117: // create request
118: Map<String, Object> request = new TreeMap<String, Object>();
119: request.put("args", new Object[] { "cheese" });
120:
121: // create a new temp response queue
122: session = connection.createSession(false,
123: Session.AUTO_ACKNOWLEDGE);
124: Destination responseQueue = session.createTemporaryQueue();
125:
126: // Create a request messages
127: ObjectMessage requestMessage = session
128: .createObjectMessage();
129: requestMessage.setJMSReplyTo(responseQueue);
130: requestMessage.setObject((Serializable) request);
131:
132: // Send the request message
133: producer = session.createProducer(requestQueue);
134: producer.send(requestMessage);
135:
136: // wait for the response message
137: consumer = session.createConsumer(responseQueue);
138: Message message = consumer.receive(1000);
139:
140: // verify message
141: assertNotNull("Did not get a response message", message);
142: assertTrue("Response message is not an ObjectMessage",
143: message instanceof ObjectMessage);
144: ObjectMessage responseMessage = (ObjectMessage) message;
145: Serializable object = responseMessage.getObject();
146: assertNotNull("Response ObjectMessage contains a null object");
147: assertTrue(
148: "Response ObjectMessage does not contain an instance of Map",
149: object instanceof Map);
150: Map<String, String> response = (Map<String, String>) object;
151:
152: // process results
153: String returnValue = (String) response.get("return");
154: assertEquals("test-cheese", returnValue);
155: } finally {
156: MdbUtil.close(consumer);
157: MdbUtil.close(producer);
158: MdbUtil.close(session);
159: }
160: }
161:
162: private Destination createListener(Connection connection)
163: throws JMSException {
164: final Session session = connection.createSession(false,
165: Session.AUTO_ACKNOWLEDGE);
166:
167: // Create the request Queue
168: Destination requestQueue = session
169: .createQueue(REQUEST_QUEUE_NAME);
170: MessageConsumer consumer = session.createConsumer(requestQueue);
171: consumer.setMessageListener(new MessageListener() {
172: @SuppressWarnings("unchecked")
173: public void onMessage(Message message) {
174: // if we got a dummy (non ObjectMessage) return
175: if (!(message instanceof ObjectMessage))
176: return;
177:
178: MessageProducer producer = null;
179: try {
180: // process request
181: ObjectMessage requestMessage = (ObjectMessage) message;
182: Map<String, Object[]> request = (Map<String, Object[]>) requestMessage
183: .getObject();
184: Object[] args = (Object[]) request.get("args");
185: String returnValue = "test-" + args[0];
186:
187: // create response map
188: Map<String, Object> response = new TreeMap<String, Object>();
189: response.put("return", returnValue);
190:
191: // create response message
192: ObjectMessage responseMessage = session
193: .createObjectMessage();
194: responseMessage.setJMSCorrelationID(requestMessage
195: .getJMSCorrelationID());
196: responseMessage.setObject((Serializable) response);
197:
198: // send response message
199: producer = session.createProducer(requestMessage
200: .getJMSReplyTo());
201: producer.send(responseMessage);
202:
203: } catch (Throwable e) {
204: e.printStackTrace();
205: } finally {
206: MdbUtil.close(producer);
207: }
208: }
209: });
210: return requestQueue;
211: }
212: }
|