001: /**
002: *
003: * Licensed to the Apache Software Foundation (ASF) under one or more
004: * contributor license agreements. See the NOTICE file distributed with
005: * this work for additional information regarding copyright ownership.
006: * The ASF licenses this file to You under the Apache License, Version 2.0
007: * (the "License"); you may not use this file except in compliance with
008: * the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing, software
013: * distributed under the License is distributed on an "AS IS" BASIS,
014: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015: * See the License for the specific language governing permissions and
016: * limitations under the License.
017: */package org.apache.openejb.core.mdb;
018:
import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.resource.ResourceException;
import javax.resource.spi.BootstrapContext;
import javax.resource.spi.ResourceAdapterInternalException;
import javax.resource.spi.UnavailableException;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.WorkManager;
import javax.transaction.xa.XAResource;

import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ra.ActiveMQActivationSpec;
import org.apache.geronimo.connector.GeronimoBootstrapContext;
import org.apache.geronimo.connector.work.GeronimoWorkManager;
import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
import org.apache.openejb.OpenEJBException;
import org.apache.openejb.resource.activemq.ActiveMQResourceAdapter;
052:
053: public class MdbTest extends TestCase {
054: private static final String REQUEST_QUEUE_NAME = "request";
055: private ConnectionFactory connectionFactory;
056: private ActiveMQResourceAdapter ra;
057:
058: protected void setUp() throws Exception {
059: super .setUp();
060:
061: // create a transaction manager
062: GeronimoTransactionManager transactionManager = new GeronimoTransactionManager();
063:
064: // create the ActiveMQ resource adapter instance
065: ra = new ActiveMQResourceAdapter();
066:
067: // initialize properties
068: ra.setServerUrl("tcp://localhost:61616");
069: ra
070: .setBrokerXmlConfig("broker:(tcp://localhost:61616)?useJmx=false");
071:
072: // create a thead pool for ActiveMQ
073: Executor threadPool = Executors.newFixedThreadPool(30);
074:
075: // create a work manager which ActiveMQ uses to dispatch message delivery jobs
076: WorkManager workManager = new GeronimoWorkManager(threadPool,
077: threadPool, threadPool, transactionManager);
078:
079: // wrap the work mananger and transaction manager in a bootstrap context (connector spec thing)
080: BootstrapContext bootstrapContext = new GeronimoBootstrapContext(
081: workManager, transactionManager);
082:
083: // start the resource adapter
084: try {
085: ra.start(bootstrapContext);
086: } catch (ResourceAdapterInternalException e) {
087: throw new OpenEJBException(e);
088: }
089: // Create a ConnectionFactory
090: connectionFactory = new ActiveMQConnectionFactory(
091: "tcp://localhost:61616");
092: }
093:
094: protected void tearDown() throws Exception {
095: connectionFactory = null;
096: if (ra != null) {
097: ra.stop();
098: ra = null;
099: }
100: super .tearDown();
101: }
102:
103: public void testProxy() throws Exception {
104: createListener();
105: createSender();
106: }
107:
108: private void createSender() throws JMSException {
109: Connection connection = null;
110: Session session = null;
111: MessageProducer producer = null;
112: MessageConsumer consumer = null;
113: try {
114: connection = connectionFactory.createConnection();
115: connection.start();
116:
117: // create request
118: Map<String, Object> request = new TreeMap<String, Object>();
119: request.put("args", new Object[] { "cheese" });
120:
121: // create a new temp response queue
122: session = connection.createSession(false,
123: Session.AUTO_ACKNOWLEDGE);
124: Destination responseQueue = session.createTemporaryQueue();
125:
126: // Create a request messages
127: ObjectMessage requestMessage = session
128: .createObjectMessage();
129: requestMessage.setJMSReplyTo(responseQueue);
130: requestMessage.setObject((Serializable) request);
131:
132: // Send the request message
133: producer = session.createProducer(session
134: .createQueue(REQUEST_QUEUE_NAME));
135: producer.send(requestMessage);
136:
137: // wait for the response message
138: consumer = session.createConsumer(responseQueue);
139: Message message = consumer.receive(1000);
140:
141: // verify message
142: assertNotNull("Did not get a response message", message);
143: assertTrue("Response message is not an ObjectMessage",
144: message instanceof ObjectMessage);
145: ObjectMessage responseMessage = (ObjectMessage) message;
146: Serializable object = responseMessage.getObject();
147: assertNotNull("Response ObjectMessage contains a null object");
148: assertTrue(
149: "Response ObjectMessage does not contain an instance of Map",
150: object instanceof Map);
151: Map response = (Map) object;
152:
153: // process results
154: String returnValue = (String) response.get("return");
155: assertEquals("test-cheese", returnValue);
156: } finally {
157: MdbUtil.close(consumer);
158: MdbUtil.close(producer);
159: MdbUtil.close(session);
160: MdbUtil.close(connection);
161: }
162: }
163:
164: private void createListener() throws Exception {
165: // create the activation spec
166: ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec();
167: activationSpec.setDestinationType("javax.jms.Queue");
168: activationSpec.setDestination(REQUEST_QUEUE_NAME);
169:
170: // validate the activation spec
171: activationSpec.validate();
172:
173: // set the resource adapter into the activation spec
174: activationSpec.setResourceAdapter(ra);
175:
176: // create the message endpoint
177: MessageEndpointFactory endpointFactory = new JmsEndpointFactory();
178:
179: // activate the endpoint
180: ra.endpointActivation(endpointFactory, activationSpec);
181: }
182:
183: public static class JmsEndpointFactory implements
184: MessageEndpointFactory {
185: private final ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
186: "tcp://localhost:61616");
187:
188: public MessageEndpoint createEndpoint(XAResource xaResource)
189: throws UnavailableException {
190: try {
191: return new JmsEndpoint(connectionFactory);
192: } catch (JMSException e) {
193: e.printStackTrace();
194: throw new UnavailableException(e);
195: }
196: }
197:
198: public boolean isDeliveryTransacted(Method method)
199: throws NoSuchMethodException {
200: return false;
201: }
202: }
203:
204: public static class JmsEndpoint implements MessageEndpoint,
205: MessageListener {
206: private final Session session;
207:
208: public JmsEndpoint(ConnectionFactory connectionFactory)
209: throws JMSException {
210: Connection connection = connectionFactory
211: .createConnection();
212: connection.start();
213:
214: session = connection.createSession(false,
215: Session.AUTO_ACKNOWLEDGE);
216: }
217:
218: public void onMessage(Message message) {
219: // if we got a dummy (non ObjectMessage) return
220: if (!(message instanceof ObjectMessage))
221: return;
222:
223: MessageProducer producer = null;
224: try {
225: // process request
226: ObjectMessage requestMessage = (ObjectMessage) message;
227: Map request = (Map) requestMessage.getObject();
228: Object[] args = (Object[]) request.get("args");
229: String returnValue = "test-" + args[0];
230:
231: // create response map
232: Map<String, Object> response = new TreeMap<String, Object>();
233: response.put("return", returnValue);
234:
235: // create response message
236: ObjectMessage responseMessage = session
237: .createObjectMessage();
238: responseMessage.setJMSCorrelationID(requestMessage
239: .getJMSCorrelationID());
240: responseMessage.setObject((Serializable) response);
241:
242: // send response message
243: producer = session.createProducer(requestMessage
244: .getJMSReplyTo());
245: producer.send(responseMessage);
246:
247: } catch (Throwable e) {
248: e.printStackTrace();
249: } finally {
250: MdbUtil.close(producer);
251: }
252: }
253:
254: public void beforeDelivery(Method method)
255: throws NoSuchMethodException, ResourceException {
256: }
257:
258: public void afterDelivery() throws ResourceException {
259: }
260:
261: public void release() {
262: }
263: }
264: }
|