001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.mq;
023:
024: import java.util.LinkedList;
025:
026: import javax.jms.Destination;
027: import javax.jms.IllegalStateException;
028: import javax.jms.InvalidSelectorException;
029: import javax.jms.JMSException;
030: import javax.jms.Message;
031: import javax.jms.MessageConsumer;
032: import javax.jms.MessageListener;
033: import javax.jms.Session;
034:
035: import org.jboss.logging.Logger;
036: import org.jboss.util.UnreachableStatementException;
037:
038: import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
039:
040: /**
041: * This class implements <tt>javax.jms.MessageConsumer</tt>.
042: *
043: * @author Norbert Lataille (Norbert.Lataille@m4x.org)
044: * @author Hiram Chirino (Cojonudo14@hotmail.com)
045: * @author David Maplesden (David.Maplesden@orion.co.nz)
046: * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
047: * @version $Revision: 58406 $
048: */
049: public class SpyMessageConsumer implements MessageConsumer,
050: SpyConsumer, Runnable {
051: /** The log */
052: static Logger log = Logger.getLogger(SpyMessageConsumer.class);
053:
054: /** Is trace enabled */
055: static boolean trace = log.isTraceEnabled();
056:
057: /** Delivered once */
058: static final Integer ONCE = new Integer(1);
059:
060: /** Link to my session */
061: public SpySession session;
/** The subscription structure should be filled out by the descendant */
063: public Subscription subscription = new Subscription();
064: /** Are we closed ? */
065: private SynchronizedBoolean closed = new SynchronizedBoolean(false);
066: /** The state lock */
067: protected Object stateLock = new Object();
068: /** Are we receiving a message */
069: protected boolean receiving = false;
070: /** Are we waiting for a message */
071: protected boolean waitingForMessage = false;
072: /** Are we listening */
073: protected boolean listening = false;
074: /** The listener thread */
075: protected Thread listenerThread = null;
076: /** My message listener (null if none) */
077: MessageListener messageListener;
078: /** List of Pending messages (not yet delivered) */
079: LinkedList messages;
080: /** Is this a session consumer? */
081: boolean sessionConsumer;
082:
/**
 * Builds a consumer attached to the given session. The subscription is
 * left exactly as created by the field initializer; nothing is set on it
 * here.
 *
 * @param s the session
 * @param sessionConsumer true for a session consumer, false otherwise
 */
SpyMessageConsumer(SpySession s, boolean sessionConsumer) {
    // Refresh the class-wide trace flag each time a consumer is created.
    trace = log.isTraceEnabled();

    this.messages = new LinkedList();
    this.messageListener = null;
    this.sessionConsumer = sessionConsumer;
    this.session = s;

    if (trace) {
        log.trace("New message consumer " + this);
    }
}
100:
/**
 * Builds a consumer attached to the given session and fills in the
 * destination, selector and noLocal flag of its subscription.
 *
 * @param s the session
 * @param sessionConsumer true for a session consumer, false otherwise
 * @param destination the destination
 * @param selector the selector, may be null
 * @param noLocal true for noLocal, false otherwise
 * @throws InvalidSelectorException when a non-null selector cannot be built
 */
SpyMessageConsumer(SpySession s, boolean sessionConsumer,
        SpyDestination destination, String selector, boolean noLocal)
        throws InvalidSelectorException {
    // Refresh the class-wide trace flag each time a consumer is created.
    trace = log.isTraceEnabled();

    this.session = s;
    this.sessionConsumer = sessionConsumer;

    subscription.destination = destination;
    subscription.noLocal = noLocal;
    subscription.messageSelector = selector;

    // Building the selector up front surfaces an invalid expression as an
    // InvalidSelectorException at construction time.
    if (selector != null) {
        subscription.getSelector();
    }

    this.messageListener = null;
    this.messages = new LinkedList();

    if (trace) {
        log.trace("New message consumer " + this);
    }
}
133:
134: /**
135: * Get the subscription
136: *
137: * @return the subscription
138: */
139: public Subscription getSubscription() {
140: return subscription;
141: }
142:
143: /**
144: * Add a message
145: *
146: * @param message the message to add
147: * @throws JMSException for any error
148: */
149: public void addMessage(SpyMessage message) throws JMSException {
150: if (isClosed()) {
151: if (trace)
152: log.trace("WARNING: NACK issued. message="
153: + message.header.jmsMessageID
154: + " The message consumer was closed. " + this );
155: session.connection.send(message
156: .getAcknowledgementRequest(false));
157: return;
158: }
159:
160: //Add a message to the queue
161:
162: // Consider removing this test (subscription.accepts). I don't think it
163: // will ever fail
164: // because the test is also done by the server before message is even
165: // sent.
166: if (subscription.accepts(message.header)) {
167: if (sessionConsumer)
168: sessionConsumerProcessMessage(message);
169: else {
170: synchronized (messages) {
171: if (waitingForMessage) {
172: if (trace)
173: log.trace("Adding message="
174: + message.header.jmsMessageID + " "
175: + this );
176: messages.addLast(message);
177: messages.notifyAll();
178: } else {
179: //unwanted message (due to consumer receive timing out) Nack
180: // it.
181: if (trace)
182: log
183: .trace("WARNING: NACK issued. message="
184: + message.header.jmsMessageID
185: + " The message consumer was not waiting for a message. "
186: + this );
187: session.connection.send(message
188: .getAcknowledgementRequest(false));
189: }
190: }
191: }
192: } else {
193: if (trace)
194: log
195: .trace("WARNING: NACK issued. message="
196: + message.header.jmsMessageID
197: + " The subscription did not accept the message. "
198: + this );
199: session.connection.send(message
200: .getAcknowledgementRequest(false));
201: }
202: }
203:
204: /**
205: * Restarts the processing of the messages in case of a recovery
206: */
207: public void restartProcessing() {
208: synchronized (messages) {
209: if (trace)
210: log.trace("Restarting processing " + this );
211: messages.notifyAll();
212: }
213: }
214:
215: public void setMessageListener(MessageListener listener)
216: throws JMSException {
217: checkClosed();
218:
219: synchronized (stateLock) {
220: if (receiving)
221: throw new JMSException(
222: "Another thread is already in receive.");
223:
224: if (trace)
225: log.trace("Set message listener=" + listener
226: + " old listener=" + messageListener + " "
227: + this );
228:
229: boolean oldListening = listening;
230: listening = (listener != null);
231: messageListener = listener;
232:
233: if (!sessionConsumer && listening && !oldListening) {
234: //Start listener thread (if one is not already running)
235: if (listenerThread == null) {
236: listenerThread = new Thread(this ,
237: "MessageListenerThread - "
238: + subscription.destination
239: .getName());
240: listenerThread.start();
241: }
242: }
243: }
244: }
245:
246: public String getMessageSelector() throws JMSException {
247: checkClosed();
248: return subscription.messageSelector;
249: }
250:
251: public MessageListener getMessageListener() throws JMSException {
252: checkClosed();
253: return messageListener;
254: }
255:
256: public Message receive() throws JMSException {
257: checkClosed();
258: synchronized (stateLock) {
259: if (receiving)
260: throw new JMSException(
261: "Another thread is already in receive.");
262: if (listening)
263: throw new JMSException(
264: "A message listener is already registered");
265: receiving = true;
266:
267: if (trace)
268: log.trace("receive() " + this );
269: }
270:
271: try {
272: synchronized (messages) {
273: //see if we have any undelivered messages before we go to the JMS
274: //server to look.
275: Message message = getMessage();
276: if (message != null) {
277: if (trace)
278: log.trace("receive() message in list "
279: + message.getJMSMessageID() + " "
280: + this );
281: return message;
282: }
283:
284: // Loop through expired messages
285: while (true) {
286: SpyMessage msg = session.connection.receive(
287: subscription, 0);
288: if (msg != null) {
289: Message mes = preProcessMessage(msg);
290: if (mes != null) {
291: if (trace)
292: log
293: .trace("receive() message from server "
294: + mes.getJMSMessageID()
295: + " " + this );
296: return mes;
297: }
298: } else
299: break;
300: }
301:
302: if (trace)
303: log.trace("No message in receive(), waiting "
304: + this );
305:
306: try {
307: waitingForMessage = true;
308: while (true) {
309: if (isClosed()) {
310: if (trace)
311: log
312: .trace("Consumer closed in receive() "
313: + this );
314: return null;
315: }
316: Message mes = getMessage();
317: if (mes != null) {
318: if (trace)
319: log
320: .trace("receive() message from list after wait "
321: + this );
322: return mes;
323: }
324: messages.wait();
325: }
326: } catch (Throwable t) {
327: SpyJMSException.rethrowAsJMSException(
328: "Receive interupted", t);
329: throw new UnreachableStatementException();
330: } finally {
331: waitingForMessage = false;
332: }
333: }
334: } finally {
335: synchronized (stateLock) {
336: receiving = false;
337: }
338: }
339: }
340:
341: public Message receive(long timeOut) throws JMSException {
342: if (timeOut == 0) {
343: if (trace)
344: log
345: .trace("Timeout is zero in receive(long) using receive() "
346: + this );
347: return receive();
348: }
349:
350: checkClosed();
351: synchronized (stateLock) {
352: if (receiving)
353: throw new JMSException(
354: "Another thread is already in receive.");
355: if (listening)
356: throw new JMSException(
357: "A message listener is already registered");
358: receiving = true;
359:
360: if (trace)
361: log.trace("receive(long) " + this );
362: }
363:
364: long endTime = System.currentTimeMillis() + timeOut;
365:
366: if (trace)
367: log.trace("receive(long) endTime=" + endTime + " " + this );
368:
369: try {
370: synchronized (messages) {
371: //see if we have any undelivered messages before we go to the JMS
372: //server to look.
373: Message message = getMessage();
374: if (message != null) {
375: if (trace)
376: log.trace("receive(long) message in list "
377: + message.getJMSMessageID() + " "
378: + this );
379: return message;
380: }
381: // Loop through expired messages
382: while (true) {
383: SpyMessage msg = session.connection.receive(
384: subscription, timeOut);
385: if (msg != null) {
386: Message mes = preProcessMessage(msg);
387: if (mes != null) {
388: if (trace)
389: log
390: .trace("receive(long) message from server "
391: + mes.getJMSMessageID()
392: + " " + this );
393: return mes;
394: }
395: } else
396: break;
397: }
398:
399: if (trace)
400: log.trace("No message in receive(), waiting "
401: + this );
402:
403: try {
404: waitingForMessage = true;
405: while (true) {
406: if (isClosed()) {
407: if (trace)
408: log
409: .trace("Consumer closed in receive(long) "
410: + this );
411: return null;
412: }
413:
414: Message mes = getMessage();
415: if (mes != null) {
416: if (trace)
417: log
418: .trace("receive(long) message from list after wait "
419: + this );
420: return mes;
421: }
422:
423: long att = endTime - System.currentTimeMillis();
424: if (att <= 0) {
425: if (trace)
426: log
427: .trace("receive(long) timed out endTime="
428: + endTime + " " + this );
429: return null;
430: }
431:
432: messages.wait(att);
433: }
434: } catch (Throwable t) {
435: SpyJMSException.rethrowAsJMSException(
436: "Receive interupted", t);
437: throw new UnreachableStatementException();
438: } finally {
439: waitingForMessage = false;
440: }
441: }
442: } finally {
443: synchronized (stateLock) {
444: receiving = false;
445: }
446: }
447: }
448:
449: public Message receiveNoWait() throws JMSException {
450: checkClosed();
451: synchronized (stateLock) {
452: if (receiving)
453: throw new JMSException(
454: "Another thread is already in receive.");
455: if (listening)
456: throw new JMSException(
457: "A message listener is already registered");
458: receiving = true;
459:
460: if (trace)
461: log.trace("receiveNoWait() " + this );
462: }
463:
464: try {
465: //see if we have any undelivered messages before we go to the JMS
466: //server to look.
467: synchronized (messages) {
468: Message mes = getMessage();
469: if (mes != null) {
470: if (trace)
471: log.trace("receiveNoWait() message in list "
472: + mes.getJMSMessageID() + " " + this );
473: return mes;
474: }
475: }
476: // Loop through expired messages
477: while (true) {
478: SpyMessage msg = session.connection.receive(
479: subscription, -1);
480: if (msg != null) {
481: Message mes = preProcessMessage(msg);
482: if (mes != null) {
483: if (trace)
484: log
485: .trace("receiveNoWait() message from server "
486: + mes.getJMSMessageID()
487: + " " + this );
488: return mes;
489: }
490: } else {
491: if (trace)
492: log.trace("receiveNoWait() no message " + this );
493: return null;
494: }
495: }
496: } finally {
497: synchronized (stateLock) {
498: receiving = false;
499: }
500: }
501: }
502:
503: public void close() throws JMSException {
504: synchronized (messages) {
505: if (closed.set(true))
506: return;
507:
508: if (trace)
509: log.trace("Message consumer closing. " + this );
510: messages.notifyAll();
511: }
512:
513: // Notification to break out of delivery lock loop
514: session.interruptDeliveryLockWaiters();
515:
516: if (listenerThread != null
517: && !Thread.currentThread().equals(listenerThread)) {
518: try {
519: if (trace)
520: log.trace("Joining listener thread. " + this );
521: listenerThread.join();
522: } catch (InterruptedException e) {
523: }
524: }
525:
526: if (!sessionConsumer) {
527: session.removeConsumer(this );
528: }
529:
530: if (trace)
531: log.trace("Closed. " + this );
532: }
533:
534: public void run() {
535: SpyMessage mes = null;
536: try {
537: outer: while (true) {
538: //get Message
539: while (mes == null) {
540: synchronized (messages) {
541: if (isClosed()) {
542: waitingForMessage = false;
543: if (trace)
544: log.trace("Consumer closed in run() "
545: + this );
546: break outer;
547: }
548: if (messages.isEmpty())
549: mes = session.connection.receive(
550: subscription, 0);
551: if (mes == null) {
552: waitingForMessage = true;
553: if (trace)
554: log.trace("waiting in run() " + this );
555: while ((messages.isEmpty() && isClosed() == false)
556: || (!session.running)) {
557: try {
558: messages.wait();
559: } catch (InterruptedException e) {
560: log
561: .trace("Ignored interruption waiting for messages");
562: }
563: }
564: if (isClosed()) {
565: waitingForMessage = false;
566: if (trace)
567: log
568: .trace("Consumer closed while waiting in run() "
569: + this );
570: break outer;
571: }
572: mes = (SpyMessage) messages.removeFirst();
573: waitingForMessage = false;
574: } else {
575: if (trace)
576: log
577: .trace("run() message from server mes="
578: + mes.getJMSMessageID()
579: + " " + this );
580: }
581: }
582: mes.session = session;
583: }
584:
585: MessageListener this Listener;
586: synchronized (stateLock) {
587: if (!isListening()) {
588: //send NACK cause we have closed listener
589: if (mes != null) {
590: if (trace)
591: log
592: .trace("run() nacking not listening message mes="
593: + mes.getJMSMessageID()
594: + " " + this );
595: session.connection.send(mes
596: .getAcknowledgementRequest(false));
597: }
598: //this thread is about to die, so we will need a new one if
599: // a new listener is added
600: listenerThread = null;
601: mes = null;
602: break;
603: }
604: this Listener = messageListener;
605: }
606: Message message = mes;
607: if (mes instanceof SpyEncapsulatedMessage)
608: message = ((SpyEncapsulatedMessage) mes)
609: .getMessage();
610:
611: // Try to obtain the session delivery lock
612: // This avoids concurrent delivery to message listeners in the same session as per spec
613: boolean gotDeliveryLock = false;
614: while (gotDeliveryLock == false) {
615: gotDeliveryLock = session.tryDeliveryLock();
616: // We didn't get the lock, check whether we are closing
617: if (gotDeliveryLock == false) {
618: synchronized (messages) {
619: if (isClosed())
620: break;
621: }
622: }
623: }
624: if (gotDeliveryLock == false) {
625: if (trace)
626: log
627: .trace("run() nacking didn't get delivery lock mes="
628: + mes.getJMSMessageID()
629: + " "
630: + this );
631: session.connection.send(mes
632: .getAcknowledgementRequest(false));
633: } else {
634: //Handle runtime exceptions. These are handled as per the spec if
635: // you assume
636: //the number of times erroneous messages are redelivered in
637: // auto_acknowledge mode
638: //is 0. :)
639: try {
640: if (session.transacted) {
641: // REVIEW: for an XASession without a transaction this will ack the message
642: // before it has been processed. Plain message listeners
643: // are not supported in a j2ee environment, but what if somebody is trying
644: // to be clever?
645: if (trace)
646: log
647: .trace("run() acknowledging message in tx mes="
648: + mes.getJMSMessageID()
649: + " " + this );
650: session.connection.spyXAResourceManager
651: .ackMessage(session
652: .getCurrentTransactionId(),
653: mes);
654: }
655:
656: try {
657: prepareDelivery((SpyMessage) message);
658: session
659: .addUnacknowlegedMessage((SpyMessage) message);
660: this Listener.onMessage(message);
661: } catch (Throwable t) {
662: log.warn("Message listener " + this Listener
663: + " threw a throwable.", t);
664: }
665: } finally {
666: session.releaseDeliveryLock();
667: }
668:
669: if (!session.transacted
670: && (session.acknowledgeMode == Session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE)) {
671: // Only acknowledge the message if the message wasn't recovered
672: boolean recovered;
673: synchronized (messages) {
674: recovered = messages.contains(message);
675: }
676: if (recovered == false)
677: mes.doAcknowledge();
678: }
679: mes = null;
680: }
681: }
682: } catch (Throwable t) {
683: log
684: .warn(
685: "Message consumer closing due to error in listening thread.",
686: t);
687: try {
688: close();
689: } catch (Throwable ignore) {
690: }
691: session
692: .asynchFailure(
693: "Message consumer closing due to error in listening thread.",
694: t);
695: }
696: }
697:
698: public String toString() {
699: StringBuffer buffer = new StringBuffer(100);
700: buffer.append("SpyMessageConsumer@").append(
701: System.identityHashCode(this ));
702: buffer.append("[sub=").append(subscription);
703: if (isClosed())
704: buffer.append(" CLOSED");
705: buffer.append(" listening=").append(listening);
706: buffer.append(" receiving=").append(receiving);
707: buffer.append(" sessionConsumer=").append(sessionConsumer);
708: buffer.append(" waitingForMessage=").append(waitingForMessage);
709: buffer.append(" messages=").append(messages.size());
710: if (listenerThread != null)
711: buffer.append(" thread=").append(listenerThread);
712: if (messageListener != null)
713: buffer.append(" listener=").append(messageListener);
714: buffer.append(" session=").append(session);
715: buffer.append(']');
716: return buffer.toString();
717: }
718:
719: Message getMessage() {
720: synchronized (messages) {
721: if (trace)
722: log.trace("Getting message from list " + this );
723: while (true) {
724: try {
725: if (messages.size() == 0)
726: return null;
727:
728: SpyMessage mes = (SpyMessage) messages
729: .removeFirst();
730:
731: Message rc = preProcessMessage(mes);
732: // could happen if the message has expired.
733: if (rc == null)
734: continue;
735:
736: return rc;
737: } catch (Throwable t) {
738: log.error("Ignoring error", t);
739: }
740: }
741: }
742: }
743:
744: Message preProcessMessage(SpyMessage message) throws JMSException {
745: message.session = session;
746: session.addUnacknowlegedMessage(message);
747:
748: prepareDelivery(message);
749:
750: // Should we try to ack before the message is processed?
751: if (!isListening()) {
752: if (session.transacted) {
753: if (trace)
754: log
755: .trace("preprocess() acking message in tx message="
756: + message.getJMSMessageID()
757: + " "
758: + this );
759: session.connection.spyXAResourceManager.ackMessage(
760: session.getCurrentTransactionId(), message);
761: } else if (session.acknowledgeMode == Session.AUTO_ACKNOWLEDGE
762: || session.acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE) {
763: message.doAcknowledge();
764: }
765:
766: if (message instanceof SpyEncapsulatedMessage) {
767: return ((SpyEncapsulatedMessage) message).getMessage();
768: }
769: return message;
770: } else {
771: return message;
772: }
773: }
774:
775: /**
776: * Prepare the message for delivery
777: *
778: * @param message the message
779: * @throws JMSException for any error
780: */
781: void prepareDelivery(SpyMessage message) throws JMSException {
782: Integer delivery = ONCE;
783: Integer redelivery = (Integer) message.header.jmsProperties
784: .get(SpyMessage.PROPERTY_REDELIVERY_COUNT);
785: if (redelivery != null) {
786: int value = redelivery.intValue();
787: if (value != 0)
788: delivery = new Integer(value + 1);
789: }
790: message.header.jmsProperties.put(
791: SpyMessage.PROPERTY_DELIVERY_COUNT, delivery);
792: }
793:
794: protected Destination getDestination() throws JMSException {
795: checkClosed();
796: return subscription.destination;
797: }
798:
799: protected boolean getNoLocal() throws JMSException {
800: checkClosed();
801: return subscription.noLocal;
802: }
803:
804: /**
805: * Are we listening
806: *
807: * @return true when listening, false otherwise
808: */
809: protected boolean isListening() {
810: synchronized (stateLock) {
811: return listening;
812: }
813: }
814:
815: protected void sessionConsumerProcessMessage(SpyMessage message)
816: throws JMSException {
817: message.session = session;
818: //simply pass on to messageListener (if there is one)
819: MessageListener this Listener;
820: synchronized (stateLock) {
821: this Listener = messageListener;
822: }
823:
824: // Add the message to XAResource manager before we call onMessages since
825: // the
826: // resource may get elisted IN the onMessage method.
827: // This gives onMessage a chance to roll the message back.
828: Object anonymousTXID = null;
829: if (session.transacted) {
830: // Only happens with XA transactions
831: if (session.getCurrentTransactionId() == null) {
832: anonymousTXID = session.connection.spyXAResourceManager
833: .startTx();
834: session.setCurrentTransactionId(anonymousTXID);
835: }
836: if (trace)
837: log.trace("consumer() acking message in tx message="
838: + message.getJMSMessageID() + " " + this );
839: session.connection.spyXAResourceManager.ackMessage(session
840: .getCurrentTransactionId(), message);
841: }
842:
843: if (this Listener != null) {
844: Message mes = message;
845: if (message instanceof SpyEncapsulatedMessage) {
846: mes = ((SpyEncapsulatedMessage) message).getMessage();
847: }
848: session.addUnacknowlegedMessage((SpyMessage) mes);
849: if (trace)
850: log.trace("consumer() before onMessage="
851: + message.getJMSMessageID() + " " + this );
852: this Listener.onMessage(mes);
853: if (trace)
854: log.trace("consumer() after onMessage="
855: + message.getJMSMessageID() + " " + this );
856: }
857:
858: if (session.transacted) {
859: // If we started an anonymous tx
860: if (anonymousTXID != null) {
861: if (session.getCurrentTransactionId() == anonymousTXID) {
862: // We never got enlisted, so just commit the transaction
863: try {
864: if (trace)
865: log
866: .trace("XASession was not enlisted - Committing work using anonymous xid: "
867: + anonymousTXID);
868: session.connection.spyXAResourceManager.endTx(
869: anonymousTXID, true);
870: session.connection.spyXAResourceManager.commit(
871: anonymousTXID, true);
872: } catch (Throwable t) {
873: log.error("Could not commit", t);
874: } finally {
875: session
876: .unsetCurrentTransactionId(anonymousTXID);
877: }
878: }
879: }
880: } else {
881: // Should we Auto-ack the message since the message has now been
882: // processesed
883: if (session.acknowledgeMode == Session.AUTO_ACKNOWLEDGE
884: || session.acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE) {
885: message.doAcknowledge();
886: }
887: }
888: }
889:
890: /**
891: * Check whether we are closed
892: *
893: * @return true when closed
894: */
895: private boolean isClosed() {
896: return closed.get();
897: }
898:
899: /**
900: * Check whether we are closed
901: *
902: * @throws IllegalStateException when the session is closed
903: */
904: private void checkClosed() throws IllegalStateException {
905: if (closed.get())
906: throw new IllegalStateException("The consumer is closed");
907: }
908: }
|