001: /**
002: *
003: * Licensed to the Apache Software Foundation (ASF) under one or more
004: * contributor license agreements. See the NOTICE file distributed with
005: * this work for additional information regarding copyright ownership.
006: * The ASF licenses this file to You under the Apache License, Version 2.0
007: * (the "License"); you may not use this file except in compliance with
008: * the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing, software
013: * distributed under the License is distributed on an "AS IS" BASIS,
014: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015: * See the License for the specific language governing permissions and
016: * limitations under the License.
017: */package org.apache.openejb.test.mdb;
018:
019: import javax.jms.ConnectionFactory;
020: import javax.jms.JMSException;
021: import javax.jms.Connection;
022: import javax.jms.Session;
023: import javax.jms.MessageProducer;
024: import javax.jms.Destination;
025: import javax.jms.DeliveryMode;
026: import javax.jms.ObjectMessage;
027: import javax.jms.MessageConsumer;
028: import javax.jms.Message;
029: import java.lang.reflect.InvocationHandler;
030: import java.lang.reflect.Proxy;
031: import java.lang.reflect.Method;
032: import java.util.Map;
033: import java.util.TreeMap;
034: import java.util.UUID;
035: import java.io.Serializable;
036:
037: public class MdbProxy {
038: @SuppressWarnings({"unchecked"})
039: public static <T> T newProxyInstance(Class<T> type,
040: ConnectionFactory connectionFactory, String requestQueueName)
041: throws JMSException {
042: ClassLoader classLoader = Thread.currentThread()
043: .getContextClassLoader();
044: if (classLoader == null)
045: classLoader = type.getClassLoader();
046: if (classLoader == null)
047: classLoader = ClassLoader.getSystemClassLoader();
048:
049: InvocationHandler invocationHandler = new MdbInvocationHandler(
050: connectionFactory, requestQueueName);
051: Object proxy = Proxy.newProxyInstance(classLoader,
052: new Class[] { type }, invocationHandler);
053: return (T) proxy;
054: }
055:
056: @SuppressWarnings({"unchecked"})
057: public static <T> T newProxyInstance(Class<T> type,
058: ConnectionFactory connectionFactory,
059: Destination requestQueue) throws JMSException {
060: ClassLoader classLoader = Thread.currentThread()
061: .getContextClassLoader();
062: if (classLoader == null)
063: classLoader = type.getClassLoader();
064: if (classLoader == null)
065: classLoader = ClassLoader.getSystemClassLoader();
066:
067: InvocationHandler invocationHandler = new MdbInvocationHandler(
068: connectionFactory, requestQueue);
069: Object proxy = Proxy.newProxyInstance(classLoader,
070: new Class[] { type }, invocationHandler);
071: return (T) proxy;
072: }
073:
074: public static void destroyProxy(Object proxy) {
075: InvocationHandler handler = Proxy.getInvocationHandler(proxy);
076: if (handler instanceof MdbProxy) {
077: MdbInvocationHandler mdbInvocationHandler = (MdbInvocationHandler) handler;
078: mdbInvocationHandler.destroy();
079: }
080: }
081:
082: private static class MdbInvocationHandler implements
083: InvocationHandler {
084: private static final int MAX_RESPONSE_WAIT = 1000;
085: private Connection connection;
086: private Session session;
087: private MessageProducer producer;
088: private Destination requestQueue;
089:
090: public MdbInvocationHandler(
091: ConnectionFactory connectionFactory,
092: String requestQueueName) throws JMSException {
093: // open a connection
094: connection = connectionFactory.createConnection();
095: connection.start();
096:
097: // create a session
098: session = connection.createSession(false,
099: Session.AUTO_ACKNOWLEDGE);
100:
101: // create the request queue
102: requestQueue = session.createQueue(requestQueueName);
103:
104: // create a producer which is used to send requests
105: producer = session.createProducer(requestQueue);
106: producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
107: }
108:
109: public MdbInvocationHandler(
110: ConnectionFactory connectionFactory,
111: Destination requestQueue) throws JMSException {
112: this .requestQueue = requestQueue;
113:
114: // open a connection
115: connection = connectionFactory.createConnection();
116: connection.start();
117:
118: // create a session
119: session = connection.createSession(false,
120: Session.AUTO_ACKNOWLEDGE);
121:
122: // create a producer which is used to send requests
123: producer = session.createProducer(requestQueue);
124: producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
125: }
126:
127: public synchronized void destroy() {
128: MdbUtil.close(producer);
129: producer = null;
130: MdbUtil.close(session);
131: session = null;
132: MdbUtil.close(connection);
133: connection = null;
134: }
135:
136: private synchronized Session getSession() {
137: return session;
138: }
139:
140: public synchronized MessageProducer getProducer() {
141: return producer;
142: }
143:
144: public Object invoke(Object proxy, Method method, Object[] args)
145: throws Throwable {
146: Session session = getSession();
147: if (session == null)
148: throw new IllegalStateException(
149: "Proxy has been destroyed");
150:
151: // create request
152: Map<String, Object> request = new TreeMap<String, Object>();
153: String signature = MdbUtil.getSignature(method);
154: request.put("method", signature);
155: request.put("args", args);
156:
157: // create a new temp response queue and consumer
158: // this is very inefficient, but eliminates a whole class of errors
159: Destination responseQueue = session.createTemporaryQueue();
160:
161: // Create a messages
162: ObjectMessage requestMessage = session
163: .createObjectMessage();
164: requestMessage.setJMSReplyTo(responseQueue);
165: String correlationId = UUID.randomUUID().toString();
166: requestMessage.setJMSCorrelationID(correlationId);
167: requestMessage.setObject((Serializable) request);
168:
169: // create a producer and consumers used to send requests and receive response
170: MessageConsumer consumer = null;
171: try {
172: // Send the request
173: getProducer().send(requestMessage);
174:
175: // System.out.println("\n" + "***************************************\n" +
176: // "Sent request message: " + requestMessage + "\n" +
177: // " request map: " + request + "\n" +
178: // " to queue: " + requestQueue + "\n" +
179: // "***************************************\n\n");
180: //
181: // Wait for a message
182: // Again this is quite inefficient
183: consumer = session.createConsumer(responseQueue);
184:
185: // wait for the message
186: Message message = consumer
187: .receive(MdbProxy.MdbInvocationHandler.MAX_RESPONSE_WAIT);
188:
189: // verify message
190: if (message == null)
191: throw new NullPointerException(
192: "Did not get a response withing "
193: + MdbProxy.MdbInvocationHandler.MAX_RESPONSE_WAIT
194: + "ms");
195: if (!correlationId
196: .equals(message.getJMSCorrelationID())) {
197: throw new IllegalStateException(
198: "Received a response message with the wrong correlation id");
199: }
200: if (!(message instanceof ObjectMessage))
201: throw new IllegalArgumentException(
202: "Expected a ObjectMessage response but got a "
203: + message.getClass().getName());
204: ObjectMessage resMessage = (ObjectMessage) message;
205: Serializable object = resMessage.getObject();
206: if (object == null)
207: throw new NullPointerException(
208: "object in ObjectMessage is null");
209: if (!(object instanceof Map)) {
210: if (message instanceof ObjectMessage)
211: throw new IllegalArgumentException(
212: "Expected a Map contained in the ObjectMessage response but got a "
213: + object.getClass().getName());
214: }
215: Map response = (Map) object;
216:
217: // process results
218: boolean exception = response.containsKey("exception");
219: Object returnValue = response.get("return");
220: if (exception) {
221: throw (Throwable) returnValue;
222: }
223: return returnValue;
224: } finally {
225: MdbUtil.close(consumer);
226: }
227: }
228: }
229:
230: private MdbProxy() {
231: }
232: }
|