001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.resource.adapter.jms;
023:
024: import java.util.HashSet;
025: import java.util.Iterator;
026:
027: import javax.jms.ConnectionConsumer;
028: import javax.jms.ConnectionMetaData;
029: import javax.jms.Destination;
030: import javax.jms.ExceptionListener;
031: import javax.jms.IllegalStateException;
032: import javax.jms.JMSException;
033: import javax.jms.Queue;
034: import javax.jms.QueueSession;
035: import javax.jms.ServerSessionPool;
036: import javax.jms.Session;
037: import javax.jms.TemporaryQueue;
038: import javax.jms.TemporaryTopic;
039: import javax.jms.Topic;
040: import javax.jms.TopicSession;
041: import javax.naming.Reference;
042: import javax.resource.Referenceable;
043: import javax.resource.ResourceException;
044: import javax.resource.spi.ConnectionManager;
045: import javax.resource.spi.ManagedConnectionFactory;
046:
047: import org.jboss.logging.Logger;
048:
049: /**
050: * Implements the JMS Connection API and produces {@link JmsSession} objects.
051: *
052: * @author <a href="mailto:peter.antman@tim.se">Peter Antman</a>.
053: * @author <a href="mailto:jason@planet57.com">Jason Dillon</a>
054: * @author <a href="mailto:adrian@jboss.com">Adrian Brock</a>
055: * @version <tt>$Revision: 57189 $</tt>
056: */
057: public class JmsSessionFactoryImpl implements JmsSessionFactory,
058: Referenceable {
059: private static final Logger log = Logger
060: .getLogger(JmsSessionFactoryImpl.class);
061:
062: /** Are we closed? */
063: private boolean closed = false;
064:
065: /** Whether trace is enabled */
066: private boolean trace = log.isTraceEnabled();
067:
068: private Reference reference;
069:
070: // Used from JmsConnectionFactory
071: private String userName;
072: private String password;
073: private String clientID;
074: private int type;
075:
076: /* Whether we are started */
077: private boolean started = false;
078:
079: /** JmsRa own factory */
080: private JmsManagedConnectionFactory mcf;
081:
082: /** Hook to the appserver */
083: private ConnectionManager cm;
084:
085: /** The sessions */
086: private HashSet sessions = new HashSet();
087:
088: /** The temporary queues */
089: private HashSet tempQueues = new HashSet();
090:
091: /** The temporary topics */
092: private HashSet tempTopics = new HashSet();
093:
094: public JmsSessionFactoryImpl(final ManagedConnectionFactory mcf,
095: final ConnectionManager cm, final int type) {
096: this .mcf = (JmsManagedConnectionFactory) mcf;
097: this .cm = cm;
098:
099: if (cm == null)
100: // This is standalone usage, no appserver
101: this .cm = new JmsConnectionManager();
102: else
103: this .cm = cm;
104:
105: this .type = type;
106:
107: if (trace)
108: log.trace("mcf=" + mcf + ", cm=" + cm + ", type=" + type);
109: }
110:
111: public void setReference(final Reference reference) {
112: this .reference = reference;
113: }
114:
115: public Reference getReference() {
116: return reference;
117: }
118:
119: // --- API for JmsConnectionFactoryImpl
120:
121: public void setUserName(final String name) {
122: userName = name;
123: }
124:
125: public void setPassword(final String password) {
126: this .password = password;
127: }
128:
129: //---- QueueConnection ---
130:
131: public QueueSession createQueueSession(final boolean transacted,
132: final int acknowledgeMode) throws JMSException {
133: checkClosed();
134: if (type == JmsConnectionFactory.TOPIC)
135: throw new IllegalStateException(
136: "Can not get a queue session from a topic connection");
137: return allocateConnection(transacted, acknowledgeMode, type);
138: }
139:
140: public ConnectionConsumer createConnectionConsumer(Queue queue,
141: String messageSelector, ServerSessionPool sessionPool,
142: int maxMessages) throws JMSException {
143: throw new IllegalStateException(ISE);
144: }
145:
146: //--- TopicConnection ---
147:
148: public TopicSession createTopicSession(final boolean transacted,
149: final int acknowledgeMode) throws JMSException {
150: checkClosed();
151: if (type == JmsConnectionFactory.QUEUE)
152: throw new IllegalStateException(
153: "Can not get a topic session from a queue connection");
154: return allocateConnection(transacted, acknowledgeMode, type);
155: }
156:
157: public ConnectionConsumer createConnectionConsumer(Topic topic,
158: String messageSelector, ServerSessionPool sessionPool,
159: int maxMessages) throws JMSException {
160: throw new IllegalStateException(ISE);
161: }
162:
163: public ConnectionConsumer createDurableConnectionConsumer(
164: Topic topic, String subscriptionName,
165: String messageSelector, ServerSessionPool sessionPool,
166: int maxMessages) throws JMSException {
167: throw new IllegalStateException(ISE);
168: }
169:
170: //--- All the Connection methods
171:
172: public String getClientID() throws JMSException {
173: checkClosed();
174: return clientID;
175: }
176:
177: public void setClientID(String cID) throws JMSException {
178: if (mcf.isStrict())
179: throw new IllegalStateException(ISE);
180:
181: checkClosed();
182: if (clientID != null)
183: throw new IllegalStateException("Cannot change client id");
184: clientID = cID;
185: }
186:
187: public ConnectionMetaData getMetaData() throws JMSException {
188: checkClosed();
189: return mcf.getMetaData();
190: }
191:
192: public ExceptionListener getExceptionListener() throws JMSException {
193: throw new IllegalStateException(ISE);
194: }
195:
196: public void setExceptionListener(ExceptionListener listener)
197: throws JMSException {
198: throw new IllegalStateException(ISE);
199: }
200:
201: public void start() throws JMSException {
202: checkClosed();
203: if (trace)
204: log.trace("start() " + this );
205: synchronized (sessions) {
206: if (started)
207: return;
208: started = true;
209: for (Iterator i = sessions.iterator(); i.hasNext();) {
210: JmsSession session = (JmsSession) i.next();
211: session.start();
212: }
213: }
214: }
215:
216: public void stop() throws JMSException {
217: if (mcf.isStrict())
218: throw new IllegalStateException(ISE);
219: checkClosed();
220: if (trace)
221: log.trace("stop() " + this );
222: synchronized (sessions) {
223: if (started == false)
224: return;
225: started = true;
226: for (Iterator i = sessions.iterator(); i.hasNext();) {
227: JmsSession session = (JmsSession) i.next();
228: session.stop();
229: }
230: }
231: }
232:
233: public void close() throws JMSException {
234: if (closed)
235: return;
236: closed = true;
237:
238: if (trace)
239: log.trace("close() " + this );
240:
241: synchronized (sessions) {
242: for (Iterator i = sessions.iterator(); i.hasNext();) {
243: JmsSession session = (JmsSession) i.next();
244: try {
245: session.closeSession();
246: } catch (Throwable t) {
247: log.trace("Error closing session", t);
248: }
249: i.remove();
250: }
251: }
252:
253: synchronized (tempQueues) {
254: for (Iterator i = tempQueues.iterator(); i.hasNext();) {
255: TemporaryQueue temp = (TemporaryQueue) i.next();
256: try {
257: if (trace)
258: log.trace("Closing temporary queue " + temp
259: + " for " + this );
260: temp.delete();
261: } catch (Throwable t) {
262: log.trace("Error deleting temporary queue", t);
263: }
264: i.remove();
265: }
266: }
267:
268: synchronized (tempTopics) {
269: for (Iterator i = tempTopics.iterator(); i.hasNext();) {
270: TemporaryTopic temp = (TemporaryTopic) i.next();
271: try {
272: if (trace)
273: log.trace("Closing temporary topic " + temp
274: + " for " + this );
275: temp.delete();
276: } catch (Throwable t) {
277: log.trace("Error deleting temporary queue", t);
278: }
279: i.remove();
280: }
281: }
282: }
283:
284: public void closeSession(JmsSession session) throws JMSException {
285: synchronized (sessions) {
286: sessions.remove(session);
287: }
288: }
289:
290: public void addTemporaryQueue(TemporaryQueue temp) {
291: synchronized (tempQueues) {
292: tempQueues.add(temp);
293: }
294: }
295:
296: public void addTemporaryTopic(TemporaryTopic temp) {
297: synchronized (tempTopics) {
298: tempTopics.add(temp);
299: }
300: }
301:
302: // -- JMS 1.1
303:
304: public ConnectionConsumer createConnectionConsumer(
305: Destination destination, ServerSessionPool pool,
306: int maxMessages) throws JMSException {
307: throw new IllegalStateException(ISE);
308: }
309:
310: public ConnectionConsumer createConnectionConsumer(
311: Destination destination, String name,
312: ServerSessionPool pool, int maxMessages)
313: throws JMSException {
314: throw new IllegalStateException(ISE);
315: }
316:
317: public Session createSession(boolean transacted, int acknowledgeMode)
318: throws JMSException {
319: checkClosed();
320: return allocateConnection(transacted, acknowledgeMode, type);
321: }
322:
323: protected JmsSession allocateConnection(boolean transacted,
324: int acknowledgeMode, int sessionType) throws JMSException {
325: try {
326: synchronized (sessions) {
327: if (mcf.isStrict() && sessions.isEmpty() == false)
328: throw new IllegalStateException(
329: "Only allowed one session per connection. See the J2EE spec, e.g. J2EE1.4 Section 6.6");
330: if (transacted)
331: acknowledgeMode = Session.SESSION_TRANSACTED;
332: JmsConnectionRequestInfo info = new JmsConnectionRequestInfo(
333: transacted, acknowledgeMode, sessionType);
334: info.setUserName(userName);
335: info.setPassword(password);
336: info.setClientID(clientID);
337:
338: if (trace)
339: log.trace("Allocating session for " + this
340: + " with request info=" + info);
341: JmsSession session = (JmsSession) cm
342: .allocateConnection(mcf, info);
343: if (trace)
344: log.trace("Allocated " + this + " session="
345: + session);
346: session.setJmsSessionFactory(this );
347: if (started)
348: session.start();
349: sessions.add(session);
350: return session;
351: }
352: } catch (ResourceException e) {
353: log.error("could not create session", e);
354:
355: JMSException je = new JMSException(
356: "Could not create a session: " + e);
357: je.setLinkedException(e);
358: throw je;
359: }
360: }
361:
362: protected void checkClosed() throws IllegalStateException {
363: if (closed)
364: throw new IllegalStateException("The connection is closed");
365: }
366: }
|