001: /*
002: * Copyright 2005 Joe Walker
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016: package org.directwebremoting.jms;
017:
018: import javax.jms.Destination;
019: import javax.jms.IllegalStateException;
020: import javax.jms.JMSException;
021: import javax.jms.Message;
022: import javax.jms.MessageConsumer;
023: import javax.jms.MessageListener;
024: import javax.servlet.ServletContext;
025:
026: import org.apache.commons.logging.Log;
027: import org.apache.commons.logging.LogFactory;
028: import org.directwebremoting.Hub;
029: import org.directwebremoting.HubFactory;
030: import org.directwebremoting.event.MessageEvent;
031:
032: /**
033: * A {@link MessageConsumer} for DWR
034: * @author Joe Walker [joe at getahead dot ltd dot uk]
035: */
036: public class DwrMessageConsumer implements MessageConsumer {
037: /**
038: * @see javax.jms.Session#createConsumer(Destination)
039: */
040: public DwrMessageConsumer(DwrConnection connection,
041: Destination destination) throws JMSException {
042: this (connection, destination, null, false);
043: }
044:
045: /**
046: * @throws JMSException
047: * @see javax.jms.Session#createConsumer(Destination, String)
048: */
049: public DwrMessageConsumer(DwrConnection connection,
050: Destination destination, String messageSelector)
051: throws JMSException {
052: this (connection, destination, messageSelector, false);
053: }
054:
055: /**
056: * @see javax.jms.Session#createConsumer(Destination, String, boolean)
057: */
058: public DwrMessageConsumer(DwrConnection connection,
059: Destination destination, String messageSelector,
060: boolean noLocal) throws JMSException {
061: this .destination = destination;
062: setMessageSelector(messageSelector);
063: this .noLocal = noLocal;
064:
065: ServletContext servletContext = connection.getServletContext();
066:
067: if (servletContext != null) {
068: hub = HubFactory.get(servletContext);
069: } else {
070: hub = HubFactory.get();
071: }
072:
073: if (destination instanceof DwrTopic) {
074: DwrTopic topic = (DwrTopic) destination;
075: String topicName = topic.getTopicName();
076:
077: hub.subscribe(topicName,
078: new MessageListenerMessageListener());
079: } else {
080: throw new IllegalStateException(
081: "Unsuported Destination type ("
082: + destination.getClass().getCanonicalName()
083: + "). Only Topics are currently supported.");
084: }
085: }
086:
087: /**
088: * @author Joe Walker [joe at getahead dot ltd dot uk]
089: */
090: public final class MessageListenerMessageListener implements
091: org.directwebremoting.event.MessageListener {
092: public void onMessage(MessageEvent message) {
093: if (messageListener != null) {
094: messageListener.onMessage(new DwrMessage(message
095: .getHub(), message));
096: }
097: }
098: }
099:
100: /* (non-Javadoc)
101: * @see javax.jms.MessageConsumer#close()
102: */
103: public void close() throws JMSException {
104: }
105:
106: /* (non-Javadoc)
107: * @see javax.jms.MessageConsumer#getMessageListener()
108: */
109: public MessageListener getMessageListener() throws JMSException {
110: return messageListener;
111: }
112:
113: /* (non-Javadoc)
114: * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
115: */
116: public void setMessageListener(MessageListener messageListener)
117: throws JMSException {
118: this .messageListener = messageListener;
119: }
120:
121: /**
122: * @param messageSelector the messageSelector to set
123: */
124: public void setMessageSelector(String messageSelector) {
125: this .messageSelector = messageSelector;
126:
127: if (messageSelector != null && messageSelector.length() != 0) {
128: throw Unsupported.noMessageSelectors();
129: }
130: }
131:
132: /* (non-Javadoc)
133: * @see javax.jms.MessageConsumer#getMessageSelector()
134: */
135: public String getMessageSelector() throws JMSException {
136: return messageSelector;
137: }
138:
139: /* (non-Javadoc)
140: * @see javax.jms.MessageConsumer#receive()
141: */
142: public Message receive() throws JMSException {
143: MessageListener old = getMessageListener();
144:
145: BlockingMessageListener listener = new BlockingMessageListener();
146:
147: setMessageListener(listener);
148: Message message = listener.receive(0);
149: setMessageListener(old);
150:
151: return message;
152: }
153:
154: /* (non-Javadoc)
155: * @see javax.jms.MessageConsumer#receive(long)
156: */
157: public Message receive(long timeout) throws JMSException {
158: MessageListener old = getMessageListener();
159:
160: BlockingMessageListener listener = new BlockingMessageListener();
161:
162: setMessageListener(listener);
163: Message message = listener.receive(timeout);
164: setMessageListener(old);
165:
166: return message;
167: }
168:
169: /* (non-Javadoc)
170: * @see javax.jms.MessageConsumer#receiveNoWait()
171: */
172: public Message receiveNoWait() throws JMSException {
173: log
174: .warn("MessageConsumer.receiveNoWait() probably doesn't do what you need in a queue context");
175:
176: MessageListener old = getMessageListener();
177:
178: BlockingMessageListener listener = new BlockingMessageListener();
179:
180: setMessageListener(listener);
181: Message message = listener.receive(1);
182: setMessageListener(old);
183:
184: return message;
185: }
186:
187: /**
188: * The Hub for this instance of DWR
189: */
190: protected Hub hub;
191:
192: /**
193: * A way to filter messages based on message properties
194: */
195: protected String messageSelector;
196:
197: /**
198: * The queue or topic that this message is destined for
199: */
200: protected Destination destination;
201:
202: /**
203: * Do we get messages that are sent locally?
204: */
205: protected boolean noLocal;
206:
207: /**
208: * What we do with inbound messages
209: */
210: protected MessageListener messageListener;
211:
212: /**
213: * The log stream
214: */
215: private static final Log log = LogFactory
216: .getLog(DwrMessageConsumer.class);
217: }
|