001: /*
002: * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved
003: *
004: * This file is part of Resin(R) Open Source
005: *
006: * Each copy or derived work must preserve the copyright notice and this
007: * notice unmodified.
008: *
009: * Resin Open Source is free software; you can redistribute it and/or modify
010: * it under the terms of the GNU General Public License as published by
011: * the Free Software Foundation; either version 2 of the License, or
012: * (at your option) any later version.
013: *
014: * Resin Open Source is distributed in the hope that it will be useful,
015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
017: * of NON-INFRINGEMENT. See the GNU General Public License for more
018: * details.
019: *
020: * You should have received a copy of the GNU General Public License
021: * along with Resin Open Source; if not, write to the
022: *
023: * Free Software Foundation, Inc.
024: * 59 Temple Place, Suite 330
025: * Boston, MA 02111-1307 USA
026: *
027: * @author Scott Ferguson
028: */
029:
030: package com.caucho.ejb.message;
031:
032: import java.util.*;
033: import java.util.logging.*;
034: import java.lang.reflect.*;
035:
036: import javax.jms.*;
037: import javax.resource.*;
038: import javax.resource.spi.*;
039: import javax.resource.spi.endpoint.*;
040: import javax.resource.spi.work.*;
041: import javax.transaction.xa.*;
042:
043: import com.caucho.config.*;
044:
045: public class JmsResourceAdapter implements ResourceAdapter {
046: private static final Logger log = Logger
047: .getLogger(JmsResourceAdapter.class.getName());
048:
049: private static final Method _onMessage;
050:
051: private final String _ejbName;
052: private final ConnectionFactory _connectionFactory;
053: private final Destination _destination;
054:
055: private int _consumerMax = 5;
056: private int _acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
057:
058: private String _subscriptionName;
059: private String _selector;
060:
061: private Connection _connection;
062: private MessageEndpointFactory _endpointFactory;
063:
064: private ArrayList<Consumer> _consumers;
065:
066: public JmsResourceAdapter(String ejbName,
067: ConnectionFactory factory, Destination destination) {
068: assert (factory != null);
069: assert (destination != null);
070:
071: _ejbName = ejbName;
072: _connectionFactory = factory;
073:
074: _destination = destination;
075: }
076:
077: public void setMessageSelector(String selector) {
078: _selector = selector;
079: }
080:
081: public void setSubscriptionName(String subscriptionName) {
082: _subscriptionName = subscriptionName;
083: }
084:
085: public void setConsumerMax(int consumerMax) {
086: _consumerMax = consumerMax;
087: }
088:
089: public void setAcknowledgeMode(int acknowledgeMode) {
090: _acknowledgeMode = acknowledgeMode;
091: }
092:
093: public void start(BootstrapContext ctx)
094: throws ResourceAdapterInternalException {
095: }
096:
097: /**
098: * Called when the resource adapter is stopped.
099: */
100: public void stop() throws ResourceAdapterInternalException {
101: }
102:
103: /**
104: * Called during activation of a message endpoint.
105: */
106: public void endpointActivation(
107: MessageEndpointFactory endpointFactory, ActivationSpec spec)
108: throws NotSupportedException {
109: synchronized (this ) {
110: if (_consumers != null)
111: throw new java.lang.IllegalStateException();
112: _consumers = new ArrayList<Consumer>();
113: }
114:
115: try {
116: assert (_connectionFactory != null);
117: assert (_destination != null);
118: assert (_consumerMax > 0);
119:
120: _endpointFactory = endpointFactory;
121:
122: Connection connection = _connectionFactory
123: .createConnection();
124: _connection = connection;
125:
126: if (_destination instanceof Topic)
127: _consumerMax = 1;
128:
129: for (int i = 0; i < _consumerMax; i++) {
130: Consumer consumer = new Consumer(_connection,
131: _destination);
132:
133: _consumers.add(consumer);
134:
135: consumer.start();
136: }
137:
138: _connection.start();
139: } catch (RuntimeException e) {
140: throw e;
141: } catch (Exception e) {
142: throw new RuntimeException(e);
143: }
144: }
145:
146: /**
147: * Called during deactivation of a message endpoint.
148: */
149: public void endpointDeactivation(
150: MessageEndpointFactory endpointFactory, ActivationSpec spec) {
151: try {
152: ArrayList<Consumer> consumers = new ArrayList<Consumer>(
153: _consumers);
154: _consumers = null;
155:
156: if (consumers != null) {
157: for (Consumer consumer : consumers) {
158: consumer.destroy();
159: }
160: }
161:
162: if (_connection != null)
163: _connection.close();
164: } catch (Exception e) {
165: log.log(Level.WARNING, e.toString(), e);
166: }
167: }
168:
169: /**
170: * Called during crash recovery.
171: */
172: public XAResource[] getXAResources(ActivationSpec[] specs)
173: throws ResourceException {
174: return new XAResource[0];
175: }
176:
177: public String toString() {
178: return getClass().getName() + "[" + _ejbName + ","
179: + _destination + "]";
180: }
181:
182: class Consumer {
183: private Session _session;
184:
185: private XAResource _xaResource;
186: private MessageConsumer _consumer;
187:
188: private MessageEndpoint _endpoint;
189: private MessageListener _listener;
190:
191: Consumer(Connection conn, Destination destination)
192: throws Exception {
193: boolean transacted = true;
194:
195: _session = conn.createSession(transacted, _acknowledgeMode);
196:
197: if (_session instanceof XASession)
198: _xaResource = ((XASession) _session).getXAResource();
199:
200: _endpoint = _endpointFactory.createEndpoint(_xaResource);
201:
202: _listener = (MessageListener) _endpoint;
203: }
204:
205: /**
206: * Creates the session.
207: */
208: void start() throws Exception {
209: if (_subscriptionName != null) {
210: Topic topic = (Topic) _destination;
211:
212: _consumer = _session.createDurableSubscriber(topic,
213: _subscriptionName, _selector, true);
214: } else {
215: _consumer = _session.createConsumer(_destination,
216: _selector);
217: }
218:
219: _consumer.setMessageListener(_listener);
220: }
221:
222: /**
223: * Returns the session.
224: */
225: public Session getSession() throws JMSException {
226: return _session;
227: }
228:
229: /**
230: * Destroys the listener.
231: */
232: private void destroy() throws JMSException {
233: _endpoint.release();
234: }
235: }
236:
237: static {
238: try {
239: _onMessage = MessageListener.class.getMethod("onMessage",
240: new Class[] { Message.class });
241: } catch (Exception e) {
242: throw ConfigException.create(e);
243: }
244: }
245: }
|