001: /**
002: *
003: * Licensed to the Apache Software Foundation (ASF) under one or more
004: * contributor license agreements. See the NOTICE file distributed with
005: * this work for additional information regarding copyright ownership.
006: * The ASF licenses this file to You under the Apache License, Version 2.0
007: * (the "License"); you may not use this file except in compliance with
008: * the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing, software
013: * distributed under the License is distributed on an "AS IS" BASIS,
014: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015: * See the License for the specific language governing permissions and
016: * limitations under the License.
017: */package org.apache.openejb.core.mdb;
018:
019: import javax.jms.Connection;
020: import javax.jms.ConnectionFactory;
021: import javax.jms.DeliveryMode;
022: import javax.jms.Destination;
023: import javax.jms.JMSException;
024: import javax.jms.Message;
025: import javax.jms.MessageConsumer;
026: import javax.jms.MessageProducer;
027: import javax.jms.Session;
028: import javax.jms.ObjectMessage;
029: import java.lang.reflect.InvocationHandler;
030: import java.lang.reflect.Method;
031: import java.lang.reflect.Proxy;
032: import java.util.UUID;
033: import java.util.Map;
034: import java.util.TreeMap;
035: import java.io.Serializable;
036:
037: public class MdbProxy {
038: @SuppressWarnings({"unchecked"})
039: public static <T> T newProxyInstance(Class<T> type,
040: ConnectionFactory connectionFactory, String requestQueueName)
041: throws JMSException {
042: ClassLoader classLoader = Thread.currentThread()
043: .getContextClassLoader();
044: if (classLoader == null)
045: classLoader = type.getClassLoader();
046: if (classLoader == null)
047: classLoader = ClassLoader.getSystemClassLoader();
048:
049: InvocationHandler invocationHandler = new MdbInvocationHandler(
050: connectionFactory, requestQueueName);
051: Object proxy = Proxy.newProxyInstance(classLoader,
052: new Class[] { type }, invocationHandler);
053: return (T) proxy;
054: }
055:
056: public static void destroyProxy(Object proxy) {
057: InvocationHandler handler = Proxy.getInvocationHandler(proxy);
058: if (handler instanceof MdbProxy) {
059: MdbInvocationHandler mdbInvocationHandler = (MdbInvocationHandler) handler;
060: mdbInvocationHandler.destroy();
061: }
062: }
063:
064: private static class MdbInvocationHandler implements
065: InvocationHandler {
066: private static final int MAX_RESPONSE_WAIT = 1000;
067: private Connection connection;
068: private Session session;
069: private MessageProducer producer;
070:
071: public MdbInvocationHandler(
072: ConnectionFactory connectionFactory,
073: String requestQueueName) throws JMSException {
074: // open a connection
075: connection = connectionFactory.createConnection();
076: connection.start();
077:
078: // create a session
079: session = connection.createSession(false,
080: Session.AUTO_ACKNOWLEDGE);
081:
082: // create the request queue
083: Destination requestQueue = session
084: .createQueue(requestQueueName);
085:
086: // create a producer which is used to send requests
087: producer = session.createProducer(requestQueue);
088: producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
089: }
090:
091: public synchronized void destroy() {
092: MdbUtil.close(producer);
093: producer = null;
094: MdbUtil.close(session);
095: session = null;
096: MdbUtil.close(connection);
097: connection = null;
098: }
099:
100: private synchronized Session getSession() {
101: return session;
102: }
103:
104: public synchronized MessageProducer getProducer() {
105: return producer;
106: }
107:
108: public Object invoke(Object proxy, Method method, Object[] args)
109: throws Throwable {
110: Session session = getSession();
111: if (session == null)
112: throw new IllegalStateException(
113: "Proxy has been destroyed");
114:
115: // create request
116: Map<String, Object> request = new TreeMap<String, Object>();
117: String signature = MdbUtil.getSignature(method);
118: request.put("method", signature);
119: request.put("args", args);
120:
121: // create a new temp response queue and consumer
122: // this is very inefficient, but eliminates a whole class of errors
123: Destination responseQueue = session.createTemporaryQueue();
124:
125: // Create a messages
126: ObjectMessage reqMessage = session.createObjectMessage();
127: reqMessage.setJMSReplyTo(responseQueue);
128: String correlationId = UUID.randomUUID().toString();
129: reqMessage.setJMSCorrelationID(correlationId);
130: reqMessage.setObject((Serializable) request);
131:
132: // Send the request
133: getProducer().send(reqMessage);
134:
135: // Wait for a message
136: // Again this is quite inefficient
137: MessageConsumer consumer = session
138: .createConsumer(responseQueue);
139: try {
140: // wait for the message
141: Message message = consumer.receive(MAX_RESPONSE_WAIT);
142:
143: // verify message
144: if (message == null)
145: throw new NullPointerException("message is null");
146: if (!correlationId
147: .equals(message.getJMSCorrelationID())) {
148: throw new IllegalStateException(
149: "Recieved a response message with the wrong correlation id");
150: }
151: if (!(message instanceof ObjectMessage))
152: throw new IllegalArgumentException(
153: "Expected a ObjectMessage response but got a "
154: + message.getClass().getName());
155: ObjectMessage resMessage = (ObjectMessage) message;
156: Serializable object = resMessage.getObject();
157: if (object == null)
158: throw new NullPointerException(
159: "object in ObjectMessage is null");
160: if (!(object instanceof Map)) {
161: if (message instanceof ObjectMessage)
162: throw new IllegalArgumentException(
163: "Expected a Map contained in the ObjectMessage response but got a "
164: + object.getClass().getName());
165: }
166: Map response = (Map) object;
167:
168: // process results
169: boolean exception = response.containsKey("exception");
170: Object returnValue = response.get("return");
171: if (exception) {
172: throw (Throwable) returnValue;
173: }
174: return returnValue;
175: } finally {
176: MdbUtil.close(consumer);
177: }
178: }
179: }
180:
181: private MdbProxy() {
182: }
183: }
|