001: /*
002: * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved
003: *
004: * This file is part of Resin(R) Open Source
005: *
006: * Each copy or derived work must preserve the copyright notice and this
007: * notice unmodified.
008: *
009: * Resin Open Source is free software; you can redistribute it and/or modify
010: * it under the terms of the GNU General Public License as published by
011: * the Free Software Foundation; either version 2 of the License, or
012: * (at your option) any later version.
013: *
014: * Resin Open Source is distributed in the hope that it will be useful,
015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
017: * of NON-INFRINGEMENT. See the GNU General Public License for more
018: * details.
019: *
020: * You should have received a copy of the GNU General Public License
021: * along with Resin Open Source; if not, write to the
022: *
023: * Free Software Foundation, Inc.
024: * 59 Temple Place, Suite 330
025: * Boston, MA 02111-1307 USA
026: *
027: * @author Scott Ferguson
028: */
029:
030: package com.caucho.jms.connection;
031:
032: import com.caucho.jms.message.*;
033: import com.caucho.jms.queue.*;
034: import com.caucho.jms.selector.Selector;
035: import com.caucho.jms.selector.SelectorParser;
036: import com.caucho.log.Log;
037: import com.caucho.util.Alarm;
038: import com.caucho.util.L10N;
039: import com.caucho.util.AlarmListener;
040:
041: import javax.jms.JMSException;
042: import javax.jms.Message;
043: import javax.jms.MessageConsumer;
044: import javax.jms.MessageListener;
045: import javax.jms.Session;
046: import java.util.logging.Logger;
047: import java.util.logging.Level;
048:
049: /**
050: * A basic message consumer.
051: */
052: public class MessageConsumerImpl implements MessageConsumer {
053: static final Logger log = Logger
054: .getLogger(MessageConsumerImpl.class.getName());
055: static final L10N L = new L10N(MessageConsumerImpl.class);
056:
057: private final Object _consumerLock = new Object();
058:
059: protected final JmsSession _session;
060:
061: private AbstractQueue _queue;
062:
063: private MessageListener _messageListener;
064: private ClassLoader _listenerClassLoader;
065:
066: private String _messageSelector;
067: protected Selector _selector;
068: private boolean _noLocal;
069: private boolean _isAutoAcknowledge;
070:
071: private volatile boolean _isClosed;
072: private Alarm _pollAlarm;
073:
074: MessageConsumerImpl(JmsSession session, AbstractQueue queue,
075: String messageSelector, boolean noLocal)
076: throws JMSException {
077: _session = session;
078: _queue = queue;
079: _messageSelector = messageSelector;
080:
081: if (_messageSelector != null) {
082: SelectorParser parser = new SelectorParser();
083: _selector = parser.parse(messageSelector);
084: }
085: _noLocal = noLocal;
086:
087: _queue.addConsumer(this );
088:
089: switch (_session.getAcknowledgeMode()) {
090: case Session.AUTO_ACKNOWLEDGE:
091: case Session.DUPS_OK_ACKNOWLEDGE:
092: _isAutoAcknowledge = true;
093: break;
094:
095: default:
096: _isAutoAcknowledge = false;
097: break;
098: }
099: }
100:
101: /**
102: * Returns the destination
103: */
104: protected AbstractDestination getDestination() throws JMSException {
105: if (_isClosed || _session.isClosed())
106: throw new javax.jms.IllegalStateException(L
107: .l("getDestination(): MessageConsumer is closed."));
108:
109: return _queue;
110: }
111:
112: /**
113: * Returns true if local messages are not sent.
114: */
115: public boolean getNoLocal() throws JMSException {
116: if (_isClosed || _session.isClosed())
117: throw new javax.jms.IllegalStateException(L
118: .l("getNoLocal(): MessageConsumer is closed."));
119:
120: return _noLocal;
121: }
122:
123: /**
124: * Returns the message listener
125: */
126: public MessageListener getMessageListener() throws JMSException {
127: if (_isClosed || _session.isClosed())
128: throw new javax.jms.IllegalStateException(L
129: .l("getNoLocal(): MessageConsumer is closed."));
130:
131: return _messageListener;
132: }
133:
134: /**
135: * Sets the message listener
136: */
137: public void setMessageListener(MessageListener listener)
138: throws JMSException {
139: setMessageListener(listener, -1);
140: }
141:
142: /**
143: * Sets the message listener with a poll interval
144: */
145: public void setMessageListener(MessageListener listener,
146: long pollInterval) throws JMSException {
147: if (_isClosed || _session.isClosed())
148: throw new javax.jms.IllegalStateException(
149: L
150: .l("setMessageListener(): MessageConsumer is closed."));
151:
152: _messageListener = listener;
153: _listenerClassLoader = Thread.currentThread()
154: .getContextClassLoader();
155: _session.setAsynchronous();
156: }
157:
158: /**
159: * Returns the message consumer's selector.
160: */
161: public String getMessageSelector() throws JMSException {
162: if (_isClosed || _session.isClosed())
163: throw new javax.jms.IllegalStateException(
164: L
165: .l("getMessageSelector(): MessageConsumer is closed."));
166:
167: return _messageSelector;
168: }
169:
170: /**
171: * Returns the parsed selector.
172: */
173: public Selector getSelector() {
174: return _selector;
175: }
176:
177: /**
178: * Returns true if active
179: */
180: public boolean isActive() throws JMSException {
181: if (_isClosed || _session.isClosed())
182: throw new javax.jms.IllegalStateException(L
183: .l("isActive(): MessageConsumer is closed."));
184:
185: return _session.isActive() && !_isClosed;
186: }
187:
188: /**
189: * Returns true if closed
190: */
191: public boolean isClosed() {
192: return _isClosed || _session.isClosed();
193: }
194:
195: /**
196: * Receives the next message, blocking until a message is available.
197: */
198: public Message receive() throws JMSException {
199: return receiveImpl(Long.MAX_VALUE / 2);
200: }
201:
202: /**
203: * Receives a message from the queue.
204: */
205: public Message receiveNoWait() throws JMSException {
206: return receiveImpl(0);
207: }
208:
209: /**
210: * Receives a message from the queue.
211: */
212: public Message receive(long timeout) throws JMSException {
213: Message msg = receiveImpl(timeout);
214:
215: if (msg != null && log.isLoggable(Level.FINE))
216: log.fine(_queue + " receive message " + msg);
217:
218: return msg;
219: }
220:
221: /**
222: * Receives a message from the queue.
223: */
224: private Message receiveImpl(long timeout) throws JMSException {
225: if (_isClosed || _session.isClosed())
226: throw new javax.jms.IllegalStateException(L
227: .l("receiveNoWait(): MessageConsumer is closed."));
228:
229: if (Long.MAX_VALUE / 2 < timeout || timeout < 0)
230: timeout = Long.MAX_VALUE / 2;
231:
232: long now = Alarm.getCurrentTime();
233: long expireTime = timeout > 0 ? now + timeout : 0;
234:
235: while (_session.isActive()) {
236: MessageImpl msg = _queue.receive(_isAutoAcknowledge);
237:
238: if (msg == null) {
239: synchronized (_consumerLock) {
240: if (expireTime <= Alarm.getCurrentTime()
241: || Alarm.isTest())
242: return null;
243:
244: try {
245: _consumerLock.wait(expireTime
246: - Alarm.getCurrentTime());
247: } catch (Exception e) {
248: }
249: }
250: }
251:
252: else if (_selector != null && !_selector.isMatch(msg)) {
253: msg.acknowledge();
254: continue;
255: }
256:
257: else {
258: if (log.isLoggable(Level.FINE))
259: log.fine(_queue + " receiving message " + msg);
260:
261: if (!_isAutoAcknowledge)
262: _session.addTransactedReceive(_queue, msg);
263:
264: return msg;
265: }
266: }
267:
268: return null;
269: }
270:
271: /**
272: * Notifies that a message is available.
273: */
274: public boolean notifyMessageAvailable() {
275: synchronized (_consumerLock) {
276: _consumerLock.notifyAll();
277: }
278:
279: return _session.notifyMessageAvailable();
280: }
281:
282: /**
283: * Called with the session's thread to handle any messages
284: */
285: boolean handleMessage(MessageListener listener) {
286: if (_messageListener != null)
287: listener = _messageListener;
288:
289: if (listener == null)
290: return false;
291:
292: MessageImpl msg = null;
293: try {
294: msg = _queue.receive(false);
295:
296: if (msg != null) {
297: if (log.isLoggable(Level.FINE)) {
298: log.fine(_queue + " deliver " + msg
299: + " to listener " + listener);
300: }
301:
302: msg.setSession(_session);
303:
304: // XXX: ejb30/bb/mdb/activationconfig/queue/selectorauto/annotated/negativeTest1
305: if (_selector == null || _selector.isMatch(msg)) {
306: _session.addTransactedReceive(_queue, msg);
307:
308: Thread thread = Thread.currentThread();
309: ClassLoader oldLoader = thread
310: .getContextClassLoader();
311: try {
312: thread
313: .setContextClassLoader(_listenerClassLoader);
314:
315: listener.onMessage(msg);
316: } finally {
317: thread.setContextClassLoader(oldLoader);
318: }
319: }
320:
321: if (_session.getTransacted())
322: _session.commit();
323: else
324: msg.acknowledge();
325:
326: return true;
327: }
328: } catch (Exception e) {
329: log
330: .log(
331: Level.WARNING,
332: L
333: .l(
334: "{0} message listener '{1}' failed for message '{2}' with exception\n{3}",
335: _queue, listener, msg, e
336: .toString()), e);
337:
338: _queue.addListenerException(e);
339: }
340:
341: return false;
342: }
343:
344: /**
345: * Stops the consumer.
346: */
347: public void stop() throws JMSException {
348: synchronized (_consumerLock) {
349: _consumerLock.notifyAll();
350: }
351: }
352:
353: /**
354: * Closes the consumer.
355: */
356: public void close() throws JMSException {
357: synchronized (this ) {
358: if (_isClosed)
359: return;
360:
361: _isClosed = true;
362: }
363:
364: _queue.removeConsumer(this );
365: _session.removeConsumer(this );
366: }
367:
368: public String toString() {
369: return "MessageConsumerImpl[" + _queue + "]";
370: }
371: }
|