001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.resource.adapter.jms;
023:
024: import javax.jms.BytesMessage;
025: import javax.jms.JMSException;
026: import javax.jms.MapMessage;
027: import javax.jms.Message;
028: import javax.jms.MessageConsumer;
029: import javax.jms.MessageListener;
030: import javax.jms.ObjectMessage;
031: import javax.jms.StreamMessage;
032: import javax.jms.TextMessage;
033:
034: import org.jboss.logging.Logger;
035:
036: /**
037: * A wrapper for a message consumer
038: *
039: * @author <a href="mailto:adrian@jboss.com">Adrian Brock</a>
040: * @version $Revision: 57189 $
041: */
042: public class JmsMessageConsumer implements MessageConsumer {
043: private static final Logger log = Logger
044: .getLogger(JmsMessageConsumer.class);
045:
046: /** The wrapped message consumer */
047: MessageConsumer consumer;
048:
049: /** The session for this consumer */
050: JmsSession session;
051:
052: /** Whether trace is enabled */
053: private boolean trace = log.isTraceEnabled();
054:
055: /**
056: * Create a new wrapper
057: *
058: * @param consumer the consumer
059: * @param session the session
060: */
061: public JmsMessageConsumer(MessageConsumer consumer,
062: JmsSession session) {
063: this .consumer = consumer;
064: this .session = session;
065:
066: if (trace)
067: log.trace("new JmsMessageConsumer " + this + " consumer="
068: + consumer + " session=" + session);
069: }
070:
071: public void close() throws JMSException {
072: if (trace)
073: log.trace("close " + this );
074: try {
075: closeConsumer();
076: } finally {
077: session.removeConsumer(this );
078: }
079: }
080:
081: public MessageListener getMessageListener() throws JMSException {
082: session.checkStrict();
083: return consumer.getMessageListener();
084: }
085:
086: public String getMessageSelector() throws JMSException {
087: return consumer.getMessageSelector();
088: }
089:
090: public Message receive() throws JMSException {
091: if (trace)
092: log.trace("receive " + this );
093: Message message = consumer.receive();
094: if (trace)
095: log.trace("received " + this + " result=" + message);
096: if (message == null)
097: return null;
098: else
099: return wrapMessage(message);
100: }
101:
102: public Message receive(long timeout) throws JMSException {
103: if (trace)
104: log.trace("receive " + this + " timeout=" + timeout);
105: Message message = consumer.receive(timeout);
106: if (trace)
107: log.trace("received " + this + " result=" + message);
108: if (message == null)
109: return null;
110: else
111: return wrapMessage(message);
112: }
113:
114: public Message receiveNoWait() throws JMSException {
115: if (trace)
116: log.trace("receiveNoWait " + this );
117: Message message = consumer.receiveNoWait();
118: if (trace)
119: log.trace("received " + this + " result=" + message);
120: if (message == null)
121: return null;
122: else
123: return wrapMessage(message);
124: }
125:
126: public void setMessageListener(MessageListener listener)
127: throws JMSException {
128: session.checkStrict();
129: if (listener == null)
130: consumer.setMessageListener(null);
131: else
132: consumer.setMessageListener(wrapMessageListener(listener));
133: }
134:
135: void closeConsumer() throws JMSException {
136: consumer.close();
137: }
138:
139: Message wrapMessage(Message message) {
140: if (message instanceof BytesMessage)
141: return new JmsBytesMessage((BytesMessage) message, session);
142: else if (message instanceof MapMessage)
143: return new JmsMapMessage((MapMessage) message, session);
144: else if (message instanceof ObjectMessage)
145: return new JmsObjectMessage((ObjectMessage) message,
146: session);
147: else if (message instanceof StreamMessage)
148: return new JmsStreamMessage((StreamMessage) message,
149: session);
150: else if (message instanceof TextMessage)
151: return new JmsTextMessage((TextMessage) message, session);
152: return new JmsMessage(message, session);
153: }
154:
155: MessageListener wrapMessageListener(MessageListener listener) {
156: return new JmsMessageListener(listener, this);
157: }
158: }
|