001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.resource.adapter.jms;
023:
024: import java.io.Serializable;
025: import java.util.HashSet;
026: import java.util.Iterator;
027:
028: import javax.jms.BytesMessage;
029: import javax.jms.Destination;
030: import javax.jms.IllegalStateException;
031: import javax.jms.InvalidDestinationException;
032: import javax.jms.JMSException;
033: import javax.jms.MessageListener;
034: import javax.jms.MapMessage;
035: import javax.jms.Message;
036: import javax.jms.MessageConsumer;
037: import javax.jms.MessageProducer;
038: import javax.jms.ObjectMessage;
039: import javax.jms.Queue;
040: import javax.jms.QueueBrowser;
041: import javax.jms.QueueReceiver;
042: import javax.jms.QueueSender;
043: import javax.jms.QueueSession;
044: import javax.jms.Session;
045: import javax.jms.StreamMessage;
046: import javax.jms.TemporaryQueue;
047: import javax.jms.TemporaryTopic;
048: import javax.jms.TextMessage;
049: import javax.jms.Topic;
050: import javax.jms.TopicPublisher;
051: import javax.jms.TopicSession;
052: import javax.jms.TopicSubscriber;
053: import javax.resource.spi.ConnectionEvent;
054:
055: import org.jboss.logging.Logger;
056:
057: /**
058: * Adapts the JMS QueueSession and TopicSession API to a JmsManagedConnection.
059: *
060: * @author <a href="mailto:peter.antman@tim.se">Peter Antman </a>.
061: * @author <a href="mailto:jason@planet57.com">Jason Dillon </a>.
062: * @author <a href="mailto:adrian@jboss.com">Adrian Brock</a>
063: * @version $Revision: 63350 $
064: */
065: public class JmsSession implements Session, QueueSession, TopicSession {
066: private static final Logger log = Logger
067: .getLogger(JmsSession.class);
068:
069: /** The managed connection for this session. */
070: private JmsManagedConnection mc; // = null;
071:
072: /** The connection request info */
073: private JmsConnectionRequestInfo info;
074:
075: /** The session factory for this session */
076: private JmsSessionFactory sf;
077:
078: /** The message consumers */
079: private HashSet consumers = new HashSet();
080:
081: /** The message producers */
082: private HashSet producers = new HashSet();
083:
084: /** Whether trace is enabled */
085: private boolean trace = log.isTraceEnabled();
086:
087: /**
088: * Construct a <tt>JmsSession</tt>.
089: *
090: * @param mc The managed connection for this session.
091: */
092: public JmsSession(final JmsManagedConnection mc,
093: JmsConnectionRequestInfo info) {
094: this .mc = mc;
095: this .info = info;
096: if (trace)
097: log.trace("new JmsSession " + this + " mc=" + mc + " cri="
098: + info);
099: }
100:
101: public void setJmsSessionFactory(JmsSessionFactory sf) {
102: this .sf = sf;
103: }
104:
105: /**
106: * Ensure that the session is opened.
107: *
108: * @return The session
109: *
110: * @throws IllegalStateException The session is closed
111: */
112: Session getSession() throws JMSException {
113: // ensure that the connection is opened
114: if (mc == null)
115: throw new IllegalStateException("The session is closed");
116:
117: Session session = mc.getSession();
118: if (trace)
119: log.trace("getSession " + session + " for " + this );
120: return session;
121: }
122:
123: // ---- Session API
124:
125: public BytesMessage createBytesMessage() throws JMSException {
126: Session session = getSession();
127: if (trace)
128: log.trace("createBytesMessage" + session);
129: return session.createBytesMessage();
130: }
131:
132: public MapMessage createMapMessage() throws JMSException {
133: Session session = getSession();
134: if (trace)
135: log.trace("createMapMessage" + session);
136: return session.createMapMessage();
137: }
138:
139: public Message createMessage() throws JMSException {
140: Session session = getSession();
141: if (trace)
142: log.trace("createMessage" + session);
143: return session.createMessage();
144: }
145:
146: public ObjectMessage createObjectMessage() throws JMSException {
147: Session session = getSession();
148: if (trace)
149: log.trace("createObjectMessage" + session);
150: return session.createObjectMessage();
151: }
152:
153: public ObjectMessage createObjectMessage(Serializable object)
154: throws JMSException {
155: Session session = getSession();
156: if (trace)
157: log.trace("createObjectMessage(Object)" + session);
158: return session.createObjectMessage(object);
159: }
160:
161: public StreamMessage createStreamMessage() throws JMSException {
162: Session session = getSession();
163: if (trace)
164: log.trace("createStreamMessage" + session);
165: return session.createStreamMessage();
166: }
167:
168: public TextMessage createTextMessage() throws JMSException {
169: Session session = getSession();
170: if (trace)
171: log.trace("createTextMessage" + session);
172: return session.createTextMessage();
173: }
174:
175: public TextMessage createTextMessage(String string)
176: throws JMSException {
177: Session session = getSession();
178: if (trace)
179: log.trace("createTextMessage(String)" + session);
180: return session.createTextMessage(string);
181: }
182:
183: public boolean getTransacted() throws JMSException {
184: getSession(); // check closed
185: return info.isTransacted();
186: }
187:
188: /**
189: * Always throws an Exception.
190: *
191: * @throws IllegalStateException Method not allowed.
192: */
193: public MessageListener getMessageListener() throws JMSException {
194: throw new IllegalStateException("Method not allowed");
195: }
196:
197: /**
198: * Always throws an Exception.
199: *
200: * @throws IllegalStateException Method not allowed.
201: */
202: public void setMessageListener(MessageListener listener)
203: throws JMSException {
204: throw new IllegalStateException("Method not allowed");
205: }
206:
207: /**
208: * Always throws an Error.
209: *
210: * @throws Error Method not allowed.
211: */
212: public void run() {
213: // should this really throw an Error?
214: throw new Error("Method not allowed");
215: }
216:
217: /**
218: * Closes the session. Sends a ConnectionEvent.CONNECTION_CLOSED to the
219: * managed connection.
220: *
221: * @throws JMSException Failed to close session.
222: */
223: public void close() throws JMSException {
224: sf.closeSession(this );
225: closeSession();
226: }
227:
228: // FIXME - is this really OK, probably not
229: public void commit() throws JMSException {
230: Session session = getSession();
231: if (info.isTransacted() == false)
232: throw new IllegalStateException("Session is not transacted");
233: if (trace)
234: log.trace("Commit session " + this );
235: session.commit();
236: }
237:
238: public void rollback() throws JMSException {
239: Session session = getSession();
240: if (info.isTransacted() == false)
241: throw new IllegalStateException("Session is not transacted");
242: if (trace)
243: log.trace("Rollback session " + this );
244: session.rollback();
245: }
246:
247: public void recover() throws JMSException {
248: Session session = getSession();
249: if (info.isTransacted())
250: throw new IllegalStateException("Session is transacted");
251: if (trace)
252: log.trace("Recover session " + this );
253: session.recover();
254: }
255:
256: // --- TopicSession API
257:
258: public Topic createTopic(String topicName) throws JMSException {
259: if (info.getType() == JmsConnectionFactory.QUEUE) {
260: throw new IllegalStateException(
261: "Cannot create topic for javax.jms.QueueSession");
262: }
263:
264: Session session = getSession();
265: if (trace)
266: log.trace("createTopic " + session + " topicName="
267: + topicName);
268: Topic result = session.createTopic(topicName);
269: if (trace)
270: log.trace("createdTopic " + session + " topic=" + result);
271: return result;
272: }
273:
274: public TopicSubscriber createSubscriber(Topic topic)
275: throws JMSException {
276: TopicSession session = getTopicSession();
277: if (trace)
278: log
279: .trace("createSubscriber " + session + " topic="
280: + topic);
281: TopicSubscriber result = session.createSubscriber(topic);
282: result = new JmsTopicSubscriber(result, this );
283: if (trace)
284: log.trace("createdSubscriber " + session
285: + " JmsTopicSubscriber=" + result);
286: addConsumer(result);
287: return result;
288: }
289:
290: public TopicSubscriber createSubscriber(Topic topic,
291: String messageSelector, boolean noLocal)
292: throws JMSException {
293: TopicSession session = getTopicSession();
294: if (trace)
295: log.trace("createSubscriber " + session + " topic=" + topic
296: + " selector=" + messageSelector + " noLocal="
297: + noLocal);
298: TopicSubscriber result = session.createSubscriber(topic,
299: messageSelector, noLocal);
300: result = new JmsTopicSubscriber(result, this );
301: if (trace)
302: log.trace("createdSubscriber " + session
303: + " JmsTopicSubscriber=" + result);
304: addConsumer(result);
305: return result;
306: }
307:
308: public TopicSubscriber createDurableSubscriber(Topic topic,
309: String name) throws JMSException {
310: if (info.getType() == JmsConnectionFactory.QUEUE) {
311: throw new IllegalStateException(
312: "Cannot create durable subscriber from javax.jms.QueueSession");
313: }
314:
315: Session session = getSession();
316: if (trace)
317: log.trace("createDurableSubscriber " + session + " topic="
318: + topic + " name=" + name);
319: TopicSubscriber result = session.createDurableSubscriber(topic,
320: name);
321: result = new JmsTopicSubscriber(result, this );
322: if (trace)
323: log.trace("createdDurableSubscriber " + session
324: + " JmsTopicSubscriber=" + result);
325: addConsumer(result);
326: return result;
327: }
328:
329: public TopicSubscriber createDurableSubscriber(Topic topic,
330: String name, String messageSelector, boolean noLocal)
331: throws JMSException {
332: Session session = getSession();
333: if (trace)
334: log.trace("createDurableSubscriber " + session + " topic="
335: + topic + " name=" + name + " selector="
336: + messageSelector + " noLocal=" + noLocal);
337: TopicSubscriber result = session.createDurableSubscriber(topic,
338: name, messageSelector, noLocal);
339: result = new JmsTopicSubscriber(result, this );
340: if (trace)
341: log.trace("createdDurableSubscriber " + session
342: + " JmsTopicSubscriber=" + result);
343: addConsumer(result);
344: return result;
345: }
346:
347: public TopicPublisher createPublisher(Topic topic)
348: throws JMSException {
349: TopicSession session = getTopicSession();
350: if (trace)
351: log.trace("createPublisher " + session + " topic=" + topic);
352: TopicPublisher result = session.createPublisher(topic);
353: if (trace)
354: log.trace("createdPublisher " + session + " publisher="
355: + result);
356: addProducer(result);
357: return result;
358: }
359:
360: public TemporaryTopic createTemporaryTopic() throws JMSException {
361: if (info.getType() == JmsConnectionFactory.QUEUE) {
362: throw new IllegalStateException(
363: "Cannot create temporary topic for javax.jms.QueueSession");
364: }
365:
366: Session session = getSession();
367: if (trace)
368: log.trace("createTemporaryTopic " + session);
369: TemporaryTopic temp = session.createTemporaryTopic();
370: if (trace)
371: log.trace("createdTemporaryTopic " + session + " temp="
372: + temp);
373: sf.addTemporaryTopic(temp);
374: return temp;
375: }
376:
377: public void unsubscribe(String name) throws JMSException {
378: if (info.getType() == JmsConnectionFactory.QUEUE) {
379: throw new IllegalStateException(
380: "Cannot unsubscribe for javax.jms.QueueSession");
381: }
382:
383: Session session = getSession();
384: if (trace)
385: log.trace("unsubscribe " + session + " name=" + name);
386: session.unsubscribe(name);
387: }
388:
389: //--- QueueSession API
390:
391: public QueueBrowser createBrowser(Queue queue) throws JMSException {
392:
393: if (info.getType() == JmsConnectionFactory.TOPIC) {
394: throw new IllegalStateException(
395: "Cannot create browser for javax.jms.TopicSession");
396:
397: }
398:
399: Session session = getSession();
400: if (trace)
401: log.trace("createBrowser " + session + " queue=" + queue);
402: QueueBrowser result = session.createBrowser(queue);
403: if (trace)
404: log.trace("createdBrowser " + session + " browser="
405: + result);
406: return result;
407: }
408:
409: public QueueBrowser createBrowser(Queue queue,
410: String messageSelector) throws JMSException {
411: Session session = getSession();
412: if (trace)
413: log.trace("createBrowser " + session + " queue=" + queue
414: + " selector=" + messageSelector);
415: QueueBrowser result = session.createBrowser(queue,
416: messageSelector);
417: if (trace)
418: log.trace("createdBrowser " + session + " browser="
419: + result);
420: return result;
421: }
422:
423: public Queue createQueue(String queueName) throws JMSException {
424: if (info.getType() == JmsConnectionFactory.TOPIC) {
425: throw new IllegalStateException(
426: "Cannot create browser or javax.jms.TopicSession");
427:
428: }
429:
430: Session session = getSession();
431: if (trace)
432: log.trace("createQueue " + session + " queueName="
433: + queueName);
434: Queue result = session.createQueue(queueName);
435: if (trace)
436: log.trace("createdQueue " + session + " queue=" + result);
437: return result;
438: }
439:
440: public QueueReceiver createReceiver(Queue queue)
441: throws JMSException {
442: QueueSession session = getQueueSession();
443: if (trace)
444: log.trace("createReceiver " + session + " queue=" + queue);
445: QueueReceiver result = session.createReceiver(queue);
446: result = new JmsQueueReceiver(result, this );
447: if (trace)
448: log.trace("createdReceiver " + session + " receiver="
449: + result);
450: addConsumer(result);
451: return result;
452: }
453:
454: public QueueReceiver createReceiver(Queue queue,
455: String messageSelector) throws JMSException {
456: QueueSession session = getQueueSession();
457: if (trace)
458: log.trace("createReceiver " + session + " queue=" + queue
459: + " selector=" + messageSelector);
460: QueueReceiver result = session.createReceiver(queue,
461: messageSelector);
462: result = new JmsQueueReceiver(result, this );
463: if (trace)
464: log.trace("createdReceiver " + session + " receiver="
465: + result);
466: addConsumer(result);
467: return result;
468: }
469:
470: public QueueSender createSender(Queue queue) throws JMSException {
471: QueueSession session = getQueueSession();
472: if (trace)
473: log.trace("createSender " + session + " queue=" + queue);
474: QueueSender result = session.createSender(queue);
475: if (trace)
476: log.trace("createdSender " + session + " sender=" + result);
477: addProducer(result);
478: return result;
479: }
480:
481: public TemporaryQueue createTemporaryQueue() throws JMSException {
482: if (info.getType() == JmsConnectionFactory.TOPIC) {
483: throw new IllegalStateException(
484: "Cannot create temporary queue for javax.jms.TopicSession");
485:
486: }
487: Session session = getSession();
488: if (trace)
489: log.trace("createTemporaryQueue " + session);
490: TemporaryQueue temp = session.createTemporaryQueue();
491: if (trace)
492: log.trace("createdTemporaryQueue " + session + " temp="
493: + temp);
494: sf.addTemporaryQueue(temp);
495: return temp;
496: }
497:
498: // -- JMS 1.1
499:
500: public MessageConsumer createConsumer(Destination destination)
501: throws JMSException {
502: Session session = getSession();
503: if (trace)
504: log.trace("createConsumer " + session + " dest="
505: + destination);
506: MessageConsumer result = session.createConsumer(destination);
507: result = new JmsMessageConsumer(result, this );
508: if (trace)
509: log.trace("createdConsumer " + session + " consumer="
510: + result);
511: addConsumer(result);
512: return result;
513: }
514:
515: public MessageConsumer createConsumer(Destination destination,
516: String messageSelector) throws JMSException {
517: Session session = getSession();
518: if (trace)
519: log.trace("createConsumer " + session + " dest="
520: + destination + " messageSelector="
521: + messageSelector);
522: MessageConsumer result = session.createConsumer(destination,
523: messageSelector);
524: result = new JmsMessageConsumer(result, this );
525: if (trace)
526: log.trace("createdConsumer " + session + " consumer="
527: + result);
528: addConsumer(result);
529: return result;
530: }
531:
532: public MessageConsumer createConsumer(Destination destination,
533: String messageSelector, boolean noLocal)
534: throws JMSException {
535: Session session = getSession();
536: if (trace)
537: log.trace("createConsumer " + session + " dest="
538: + destination + " messageSelector="
539: + messageSelector + " noLocal=" + noLocal);
540: MessageConsumer result = session.createConsumer(destination,
541: messageSelector, noLocal);
542: result = new JmsMessageConsumer(result, this );
543: if (trace)
544: log.trace("createdConsumer " + session + " consumer="
545: + result);
546: addConsumer(result);
547: return result;
548: }
549:
550: public MessageProducer createProducer(Destination destination)
551: throws JMSException {
552: Session session = getSession();
553: if (trace)
554: log.trace("createProducer " + session + " dest="
555: + destination);
556: MessageProducer result = getSession().createProducer(
557: destination);
558: if (trace)
559: log.trace("createdProducer " + session + " producer="
560: + result);
561: addProducer(result);
562: return result;
563: }
564:
565: public int getAcknowledgeMode() throws JMSException {
566: getSession(); // check closed
567: return info.getAcknowledgeMode();
568: }
569:
570: // --- JmsManagedConnection api
571:
572: void setManagedConnection(final JmsManagedConnection mc) {
573: if (this .mc != null)
574: this .mc.removeHandle(this );
575: this .mc = mc;
576: }
577:
578: void destroy() {
579: mc = null;
580: }
581:
582: void start() throws JMSException {
583: if (mc != null)
584: mc.start();
585: }
586:
587: void stop() throws JMSException {
588: if (mc != null)
589: mc.stop();
590: }
591:
592: void checkStrict() throws JMSException {
593: if (mc != null && mc.getManagedConnectionFactory().isStrict())
594: throw new IllegalStateException(JmsSessionFactory.ISE);
595: }
596:
597: void closeSession() throws JMSException {
598: if (mc != null) {
599: log.trace("Closing session");
600:
601: try {
602: mc.stop();
603: } catch (Throwable t) {
604: log.trace("Error stopping managed connection", t);
605: }
606:
607: synchronized (consumers) {
608: for (Iterator i = consumers.iterator(); i.hasNext();) {
609: JmsMessageConsumer consumer = (JmsMessageConsumer) i
610: .next();
611: try {
612: consumer.closeConsumer();
613: } catch (Throwable t) {
614: log.trace("Error closing consumer", t);
615: }
616: i.remove();
617: }
618: }
619:
620: synchronized (producers) {
621: for (Iterator i = producers.iterator(); i.hasNext();) {
622: MessageProducer producer = (MessageProducer) i
623: .next();
624: try {
625: producer.close();
626: } catch (Throwable t) {
627: log.trace("Error closing producer", t);
628: }
629: i.remove();
630: }
631: }
632:
633: mc.removeHandle(this );
634: ConnectionEvent ev = new ConnectionEvent(mc,
635: ConnectionEvent.CONNECTION_CLOSED);
636: ev.setConnectionHandle(this );
637: mc.sendEvent(ev);
638: mc = null;
639: }
640: }
641:
642: void addConsumer(MessageConsumer consumer) {
643: synchronized (consumers) {
644: consumers.add(consumer);
645: }
646: }
647:
648: void removeConsumer(MessageConsumer consumer) {
649: synchronized (consumers) {
650: consumers.remove(consumer);
651: }
652: }
653:
654: void addProducer(MessageProducer producer) {
655: synchronized (producers) {
656: producers.add(producer);
657: }
658: }
659:
660: void removeProducer(MessageProducer producer) {
661: synchronized (producers) {
662: producers.remove(producer);
663: }
664: }
665:
666: QueueSession getQueueSession() throws JMSException {
667: Session s = getSession();
668: if (!(s instanceof QueueSession))
669: throw new InvalidDestinationException(
670: "Attempting to use QueueSession methods on: "
671: + this );
672: return (QueueSession) s;
673: }
674:
675: TopicSession getTopicSession() throws JMSException {
676: Session s = getSession();
677: if (!(s instanceof TopicSession))
678: throw new InvalidDestinationException(
679: "Attempting to use TopicSession methods on: "
680: + this );
681: return (TopicSession) s;
682: }
683: }
|