0001: /*
0002: * JBoss, Home of Professional Open Source.
0003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
0004: * as indicated by the @author tags. See the copyright.txt file in the
0005: * distribution for a full listing of individual contributors.
0006: *
0007: * This is free software; you can redistribute it and/or modify it
0008: * under the terms of the GNU Lesser General Public License as
0009: * published by the Free Software Foundation; either version 2.1 of
0010: * the License, or (at your option) any later version.
0011: *
0012: * This software is distributed in the hope that it will be useful,
0013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
0014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
0015: * Lesser General Public License for more details.
0016: *
0017: * You should have received a copy of the GNU Lesser General Public
0018: * License along with this software; if not, write to the Free
0019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
0020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
0021: */
0022: package org.jboss.mq;
0023:
0024: import java.io.Serializable;
0025: import java.util.ArrayList;
0026: import java.util.HashSet;
0027: import java.util.Iterator;
0028: import java.util.LinkedList;
0029:
0030: import javax.jms.BytesMessage;
0031: import javax.jms.Destination;
0032: import javax.jms.IllegalStateException;
0033: import javax.jms.InvalidDestinationException;
0034: import javax.jms.JMSException;
0035: import javax.jms.JMSSecurityException;
0036: import javax.jms.MapMessage;
0037: import javax.jms.Message;
0038: import javax.jms.MessageConsumer;
0039: import javax.jms.MessageListener;
0040: import javax.jms.MessageProducer;
0041: import javax.jms.ObjectMessage;
0042: import javax.jms.Queue;
0043: import javax.jms.QueueBrowser;
0044: import javax.jms.QueueReceiver;
0045: import javax.jms.QueueSender;
0046: import javax.jms.Session;
0047: import javax.jms.StreamMessage;
0048: import javax.jms.TemporaryQueue;
0049: import javax.jms.TemporaryTopic;
0050: import javax.jms.TextMessage;
0051: import javax.jms.Topic;
0052: import javax.jms.TopicPublisher;
0053: import javax.jms.TopicSubscriber;
0054: import javax.jms.XASession;
0055: import javax.transaction.xa.XAResource;
0056:
0057: import org.jboss.logging.Logger;
0058:
0059: import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
0060:
0061: /**
0062: * This class implements javax.jms.Session and javax.jms.XASession
0063: *
0064: * @author Norbert Lataille (Norbert.Lataille@m4x.org)
0065: * @author Hiram Chirino (Cojonudo14@hotmail.com) @created August 16, 2001
0066: * @version $Revision: 57198 $
0067: */
0068: public class SpySession implements Session, XASession {
/** The log */
static Logger log = Logger.getLogger(SpySession.class);

/** Whether trace is enabled (refreshed by the constructor, commit() and rollback()) */
static boolean trace = log.isTraceEnabled();

/** The connection object to which this session is linked */
public Connection connection;

/** Is this session running right now? */
public boolean running;
/** Is this session transacted? */
protected boolean transacted;
/** The acknowledgement mode of this session */
protected int acknowledgeMode;
/** MessageConsumers created by this session; guarded by synchronizing on the set itself */
protected HashSet consumers;
/** MessageProducers created by this session; guarded by synchronizing on the set itself */
protected HashSet producers;
/** The delivery lock */
protected Object deliveryLock = new Object();
/** Whether we are doing asynchronous delivery */
protected boolean inDelivery = false;

/**
 * The consumer that receives messages for the MessageListener assigned
 * to the session. run() hands the messages queued through addMessage()
 * to this consumer.
 */
SpyMessageConsumer sessionConsumer;

/** Is the session closed? */
SynchronizedBoolean closed = new SynchronizedBoolean(false);

/**
 * Lock taken by commit(), rollback(), recover() and internalRollback().
 * NOTE(review): historically described as locking run(), but run() in this
 * class synchronizes on {@link #messages} instead - confirm other users.
 */
Object runLock = new Object();

/**
 * The transaction id of the current transaction (registered with the
 * SpyXAResourceManager).
 */
private Object currentTransactionId;

/** If this is an XASession, we have an associated XAResource */
SpyXAResource spyXAResource;

/**
 * Messages queued by addMessage() and drained by run() (optional
 * connection consumer support).
 */
LinkedList messages = new LinkedList();

/** Keeps track of unacknowledged messages */
ArrayList unacknowledgedMessages = new ArrayList();
0120:
/**
 * Builds a session attached to the given connection.
 *
 * A non-XA transacted session immediately opens a transaction with the
 * connection's resource manager so that one is always available.
 *
 * @param conn the connection that owns this session
 * @param trans whether the session is transacted
 * @param acknowledge the acknowledgement mode
 * @param xaSession whether the session is an XA session
 */
SpySession(Connection conn, boolean trans, int acknowledge, boolean xaSession) {
    trace = log.isTraceEnabled();

    this.connection = conn;
    this.transacted = trans;
    this.acknowledgeMode = acknowledge;
    if (xaSession) {
        this.spyXAResource = new SpyXAResource(this);
    }

    this.running = true;
    this.consumers = new HashSet();
    this.producers = new HashSet();

    // Local (non-XA) transacted sessions always keep a transaction open.
    if (!xaSession && trans) {
        this.currentTransactionId = conn.spyXAResourceManager.startTx();
    }

    if (trace) {
        log.trace("New session " + this);
    }
}
0151:
0152: /**
0153: * JMS 11.2.21.2 Note that the acknowledge method of Message acknowledges
0154: * all messages received on that messages session.
0155: *
0156: * JMS 11.3.2.2.3 Message.acknowledge method: Clarify that the method
0157: * applies to all consumed messages of the session. Rationale for this
0158: * change: A possible misinterpretation of the existing Java API
0159: * documentation for Message.acknowledge assumed that only messages received
0160: * prior to this message should be acknowledged. The updated Java API
0161: * documentation statement emphasizes that message acknowledgement is really
0162: * a session-level activity and that this message is only being used to
0163: * identify the session in order to acknowledge all messages consumed by the
0164: * session. The acknowledge method was placed in the message object only to
0165: * enable easy access to acknowledgement capability within a message
0166: * listeners onMessage method. This change aligns the specification and Java
0167: * API documentation to define Message.acknowledge in the same manner.
0168: *
0169: * @param message the message to acknowledge
0170: * @param ack the acknowledgement request
0171: * @throws JMSException for any error
0172: */
0173: public void doAcknowledge(Message message,
0174: AcknowledgementRequest ack) throws JMSException {
0175: checkClosed();
0176: //if we are acking, ack all messages consumed by this session
0177: if (ack.isAck()) {
0178: synchronized (unacknowledgedMessages) {
0179: if (trace)
0180: log.trace("Acknowledging message " + ack);
0181:
0182: //ack the current message
0183: connection.send(((SpyMessage) message)
0184: .getAcknowledgementRequest(true));
0185: unacknowledgedMessages.remove(message);
0186:
0187: //ack the other messages consumed in this session
0188: Iterator i = unacknowledgedMessages.iterator();
0189: while (i.hasNext()) {
0190: Message mess = (Message) i.next();
0191: i.remove();
0192: connection.send(((SpyMessage) mess)
0193: .getAcknowledgementRequest(true));
0194: }
0195: }
0196: }
0197: //if we are nacking, only nack the one message
0198: else {
0199: if (trace)
0200: log.trace("Nacking message "
0201: + message.getJMSMessageID());
0202:
0203: //nack the current message
0204: unacknowledgedMessages.remove(message);
0205: connection.send(ack);
0206: }
0207: }
0208:
0209: /**
0210: * Retrieve the XA resource manager
0211: *
0212: * @return the resource manager
0213: */
0214: public SpyXAResourceManager getXAResourceManager() {
0215: return connection.spyXAResourceManager;
0216: }
0217:
0218: public void setMessageListener(MessageListener listener)
0219: throws JMSException {
0220: checkClosed();
0221:
0222: if (trace)
0223: log.trace("Set message listener " + listener + " " + this );
0224:
0225: sessionConsumer = new SpyMessageConsumer(this , true);
0226: sessionConsumer.setMessageListener(listener);
0227: }
0228:
0229: public boolean getTransacted() throws JMSException {
0230: checkClosed();
0231: return transacted;
0232: }
0233:
0234: public MessageListener getMessageListener() throws JMSException {
0235: checkClosed();
0236: if (sessionConsumer == null)
0237: return null;
0238:
0239: return sessionConsumer.getMessageListener();
0240: }
0241:
0242: public BytesMessage createBytesMessage() throws JMSException {
0243: checkClosed();
0244: SpyBytesMessage message = MessagePool.getBytesMessage();
0245: message.header.producerClientId = connection.getClientID();
0246: return message;
0247: }
0248:
0249: public MapMessage createMapMessage() throws JMSException {
0250: checkClosed();
0251: SpyMapMessage message = MessagePool.getMapMessage();
0252: message.header.producerClientId = connection.getClientID();
0253: return message;
0254: }
0255:
0256: public Message createMessage() throws JMSException {
0257: checkClosed();
0258: SpyMessage message = MessagePool.getMessage();
0259: message.header.producerClientId = connection.getClientID();
0260: return message;
0261: }
0262:
0263: public ObjectMessage createObjectMessage() throws JMSException {
0264: checkClosed();
0265: SpyObjectMessage message = MessagePool.getObjectMessage();
0266: message.header.producerClientId = connection.getClientID();
0267: return message;
0268: }
0269:
0270: public ObjectMessage createObjectMessage(Serializable object)
0271: throws JMSException {
0272: checkClosed();
0273: SpyObjectMessage message = MessagePool.getObjectMessage();
0274: message.setObject(object);
0275: message.header.producerClientId = connection.getClientID();
0276: return message;
0277: }
0278:
0279: public StreamMessage createStreamMessage() throws JMSException {
0280: checkClosed();
0281: SpyStreamMessage message = MessagePool.getStreamMessage();
0282: message.header.producerClientId = connection.getClientID();
0283: return message;
0284: }
0285:
0286: public TextMessage createTextMessage() throws JMSException {
0287: checkClosed();
0288: SpyTextMessage message = MessagePool.getTextMessage();
0289: message.header.producerClientId = connection.getClientID();
0290: return message;
0291: }
0292:
0293: // Delivers messages queued by ConnectionConsumer to the message listener
0294: public void run() {
0295: synchronized (messages) {
0296: if (trace)
0297: log.trace("Run messages=" + messages.size() + " "
0298: + this );
0299: while (messages.size() > 0) {
0300: SpyMessage message = (SpyMessage) messages
0301: .removeFirst();
0302: try {
0303: if (sessionConsumer == null) {
0304: log
0305: .warn("Session has no message listener set, cannot process message. "
0306: + this );
0307: //Nack message
0308: connection.send(message
0309: .getAcknowledgementRequest(false));
0310: } else {
0311: sessionConsumer.addMessage(message);
0312: }
0313: } catch (Throwable ignore) {
0314: if (trace)
0315: log.trace(
0316: "Ignored error from session consumer",
0317: ignore);
0318: }
0319: }
0320: }
0321: }
0322:
0323: public void close() throws JMSException {
0324: if (closed.set(true))
0325: return;
0326:
0327: if (trace)
0328: log.trace("Session closing " + this );
0329:
0330: JMSException exception = null;
0331:
0332: if (trace)
0333: log.trace("Closing consumers " + this );
0334:
0335: Iterator i;
0336: synchronized (consumers) {
0337: //notify the sleeping synchronous listeners
0338: if (sessionConsumer != null) {
0339: try {
0340: sessionConsumer.close();
0341: } catch (Throwable t) {
0342: log.trace("Error closing session consumer", t);
0343: }
0344: }
0345:
0346: i = new ArrayList(consumers).iterator();
0347: }
0348:
0349: while (i.hasNext()) {
0350: SpyMessageConsumer messageConsumer = (SpyMessageConsumer) i
0351: .next();
0352: try {
0353: messageConsumer.close();
0354: } catch (Throwable t) {
0355: log.trace("Error closing message consumer", t);
0356: }
0357: }
0358:
0359: synchronized (producers) {
0360: i = new ArrayList(producers).iterator();
0361: }
0362:
0363: while (i.hasNext()) {
0364: SpyMessageProducer messageProducer = (SpyMessageProducer) i
0365: .next();
0366: try {
0367: messageProducer.close();
0368: } catch (InvalidDestinationException ignored) {
0369: log.warn(ignored.getMessage(), ignored);
0370: } catch (Throwable t) {
0371: log.trace("Error closing message producer", t);
0372: }
0373: }
0374:
0375: if (trace)
0376: log.trace("Close handling unacknowledged messages " + this );
0377: try {
0378: if (spyXAResource == null) {
0379: if (transacted)
0380: internalRollback();
0381: else {
0382: i = unacknowledgedMessages.iterator();
0383: while (i.hasNext()) {
0384: SpyMessage message = (SpyMessage) i.next();
0385: connection.send(message
0386: .getAcknowledgementRequest(false));
0387: i.remove();
0388: }
0389: }
0390: }
0391: } catch (Throwable t) {
0392: if (exception == null)
0393: exception = SpyJMSException.getAsJMSException(
0394: "Error nacking message", t);
0395: }
0396:
0397: if (trace)
0398: log.trace("Informing connection of close " + this );
0399: connection.sessionClosing(this );
0400:
0401: // Throw the first exception
0402: if (exception != null)
0403: throw exception;
0404: }
0405:
0406: //Commit a transacted session
0407: public void commit() throws JMSException {
0408: checkClosed();
0409: trace = log.isTraceEnabled();
0410:
0411: //Don't deliver any more messages while commiting
0412: synchronized (runLock) {
0413: if (spyXAResource != null)
0414: throw new javax.jms.TransactionInProgressException(
0415: "Should not be call from a XASession");
0416: if (!transacted)
0417: throw new IllegalStateException(
0418: "The session is not transacted");
0419:
0420: if (trace)
0421: log.trace("Committing transaction " + this );
0422: try {
0423: connection.spyXAResourceManager.endTx(
0424: currentTransactionId, true);
0425: connection.spyXAResourceManager.commit(
0426: currentTransactionId, true);
0427: } catch (Throwable t) {
0428: SpyJMSException.rethrowAsJMSException(
0429: "Could not commit", t);
0430: } finally {
0431: unacknowledgedMessages.clear();
0432: try {
0433: currentTransactionId = connection.spyXAResourceManager
0434: .startTx();
0435:
0436: if (trace)
0437: log.trace("Current transaction id: "
0438: + currentTransactionId + " " + this );
0439: } catch (Throwable ignore) {
0440: if (trace)
0441: log.trace("Failed to start tx " + this , ignore);
0442: }
0443: }
0444: }
0445: }
0446:
0447: public void rollback() throws JMSException {
0448: checkClosed();
0449: trace = log.isTraceEnabled();
0450:
0451: synchronized (runLock) {
0452: internalRollback();
0453: }
0454: }
0455:
0456: public void recover() throws JMSException {
0457: checkClosed();
0458: boolean stopped = connection.modeStop;
0459:
0460: synchronized (runLock) {
0461: if (currentTransactionId != null)
0462: throw new IllegalStateException(
0463: "The session is transacted");
0464:
0465: if (trace)
0466: log.trace("Session recovery stopping delivery " + this );
0467: try {
0468: connection.stop();
0469: running = false;
0470: } catch (Throwable t) {
0471: SpyJMSException.rethrowAsJMSException(
0472: "Could not stop message delivery", t);
0473: }
0474:
0475: // Loop over all consumers, check their unacknowledged messages, set
0476: // then as redelivered and add back to the list of messages
0477: try {
0478: synchronized (messages) {
0479: if (stopped == false) {
0480: if (trace)
0481: log
0482: .trace("Recovering: unacknowledged messages="
0483: + unacknowledgedMessages
0484: + " " + this );
0485: Iterator i = consumers.iterator();
0486: while (i.hasNext()) {
0487: SpyMessageConsumer consumer = (SpyMessageConsumer) i
0488: .next();
0489:
0490: Iterator ii = unacknowledgedMessages
0491: .iterator();
0492: while (ii.hasNext()) {
0493: SpyMessage message = (SpyMessage) ii
0494: .next();
0495:
0496: if (consumer.getSubscription().accepts(
0497: message.header)) {
0498: message.setJMSRedelivered(true);
0499: consumer.messages.addLast(message);
0500: ii.remove();
0501: if (trace)
0502: log.trace("Recovered: message="
0503: + message
0504: + " consumer="
0505: + consumer);
0506: }
0507: }
0508: }
0509: }
0510:
0511: // We no longer have consumers for the remaining messages
0512: Iterator i = unacknowledgedMessages.iterator();
0513: while (i.hasNext()) {
0514: SpyMessage message = (SpyMessage) i.next();
0515: connection.send(message
0516: .getAcknowledgementRequest(false));
0517: i.remove();
0518: if (trace)
0519: log
0520: .trace("Recovered: nacked with no consumer message="
0521: + message + " " + this );
0522: }
0523: }
0524: } catch (Throwable t) {
0525: SpyJMSException.rethrowAsJMSException(
0526: "Unable to recover session ", t);
0527: }
0528: // Restart the delivery sequence including all unacknowledged messages
0529: // that had
0530: // been previously delivered. Redelivered messages do not have to be
0531: // delivered
0532: // in exactly their original delivery order.
0533:
0534: if (stopped == false) {
0535: if (trace)
0536: log.trace("Recovery restarting message delivery "
0537: + this );
0538: try {
0539: running = true;
0540: connection.start();
0541:
0542: Iterator i = consumers.iterator();
0543: while (i.hasNext())
0544: ((SpyMessageConsumer) i.next())
0545: .restartProcessing();
0546: } catch (Throwable t) {
0547: SpyJMSException.rethrowAsJMSException(
0548: "Could not resume message delivery", t);
0549: }
0550: }
0551: }
0552: }
0553:
0554: public TextMessage createTextMessage(String string)
0555: throws JMSException {
0556: checkClosed();
0557: SpyTextMessage message = new SpyTextMessage();
0558: message.setText(string);
0559: message.header.producerClientId = connection.getClientID();
0560: return message;
0561: }
0562:
0563: public int getAcknowledgeMode() throws JMSException {
0564: return acknowledgeMode;
0565: }
0566:
0567: public MessageConsumer createConsumer(Destination destination)
0568: throws JMSException {
0569: return createConsumer(destination, null, false);
0570: }
0571:
0572: public MessageConsumer createConsumer(Destination destination,
0573: String messageSelector) throws JMSException {
0574: return createConsumer(destination, messageSelector, false);
0575: }
0576:
0577: public MessageConsumer createConsumer(Destination destination,
0578: String messageSelector, boolean noLocal)
0579: throws JMSException {
0580: if (destination instanceof Topic)
0581: return createSubscriber((Topic) destination,
0582: messageSelector, noLocal);
0583: else
0584: return createReceiver((Queue) destination, messageSelector);
0585: }
0586:
0587: public MessageProducer createProducer(Destination destination)
0588: throws JMSException {
0589: if (destination instanceof Topic)
0590: return createPublisher((Topic) destination);
0591: else
0592: return createSender((Queue) destination);
0593: }
0594:
0595: public QueueBrowser createBrowser(Queue queue) throws JMSException {
0596: return createBrowser(queue, null);
0597: }
0598:
0599: public QueueBrowser createBrowser(Queue queue,
0600: String messageSelector) throws JMSException {
0601: checkClosed();
0602: if (this instanceof SpyTopicSession)
0603: throw new IllegalStateException(
0604: "Not allowed for a TopicSession");
0605: if (queue == null)
0606: throw new InvalidDestinationException(
0607: "Cannot browse a null queue.");
0608: return new SpyQueueBrowser(this , queue, messageSelector);
0609: }
0610:
0611: public QueueReceiver createReceiver(Queue queue)
0612: throws JMSException {
0613: return createReceiver(queue, null);
0614: }
0615:
0616: public QueueReceiver createReceiver(Queue queue,
0617: String messageSelector) throws JMSException {
0618: checkClosed();
0619: if (queue == null)
0620: throw new InvalidDestinationException(
0621: "Queue cannot be null.");
0622:
0623: connection.checkTemporary(queue);
0624: SpyQueueReceiver receiver = new SpyQueueReceiver(this , queue,
0625: messageSelector);
0626: addConsumer(receiver);
0627:
0628: return receiver;
0629: }
0630:
0631: public QueueSender createSender(Queue queue) throws JMSException {
0632: checkClosed();
0633: SpyQueueSender producer = new SpyQueueSender(this , queue);
0634: addProducer(producer);
0635: return producer;
0636: }
0637:
0638: public TopicSubscriber createDurableSubscriber(Topic topic,
0639: String name) throws JMSException {
0640: return createDurableSubscriber(topic, name, null, false);
0641: }
0642:
0643: public TopicSubscriber createDurableSubscriber(Topic topic,
0644: String name, String messageSelector, boolean noLocal)
0645: throws JMSException {
0646: checkClosed();
0647: if (this instanceof SpyQueueSession)
0648: throw new IllegalStateException(
0649: "Not allowed for a QueueSession");
0650: if (topic == null)
0651: throw new InvalidDestinationException(
0652: "Topic cannot be null");
0653: if (topic instanceof TemporaryTopic)
0654: throw new InvalidDestinationException(
0655: "Attempt to create a durable subscription for a temporary topic");
0656:
0657: if (name == null || name.trim().length() == 0)
0658: throw new JMSException("Null or empty subscription");
0659:
0660: SpyTopic t = new SpyTopic((SpyTopic) topic, connection
0661: .getClientID(), name, messageSelector);
0662: SpyTopicSubscriber sub = new SpyTopicSubscriber(this , t,
0663: noLocal, messageSelector);
0664: addConsumer(sub);
0665:
0666: return sub;
0667: }
0668:
0669: public TopicSubscriber createSubscriber(Topic topic)
0670: throws JMSException {
0671: return createSubscriber(topic, null, false);
0672: }
0673:
0674: public TopicSubscriber createSubscriber(Topic topic,
0675: String messageSelector, boolean noLocal)
0676: throws JMSException {
0677: checkClosed();
0678: if (topic == null)
0679: throw new InvalidDestinationException(
0680: "Topic cannot be null");
0681:
0682: connection.checkTemporary(topic);
0683: SpyTopicSubscriber sub = new SpyTopicSubscriber(this ,
0684: (SpyTopic) topic, noLocal, messageSelector);
0685: addConsumer(sub);
0686:
0687: return sub;
0688: }
0689:
0690: public TopicPublisher createPublisher(Topic topic)
0691: throws JMSException {
0692: checkClosed();
0693: SpyTopicPublisher producer = new SpyTopicPublisher(this , topic);
0694: addProducer(producer);
0695: return producer;
0696: }
0697:
0698: public Queue createQueue(String queueName) throws JMSException {
0699: checkClosed();
0700: if (this instanceof SpyTopicSession)
0701: throw new IllegalStateException(
0702: "Not allowed for a TopicSession");
0703: if (queueName == null)
0704: throw new InvalidDestinationException(
0705: "Queue name cannot be null.");
0706: return ((SpyConnection) connection).createQueue(queueName);
0707: }
0708:
0709: public Topic createTopic(String topicName) throws JMSException {
0710: checkClosed();
0711: if (this instanceof SpyQueueSession)
0712: throw new IllegalStateException(
0713: "Not allowed for a QueueSession");
0714: if (topicName == null)
0715: throw new InvalidDestinationException(
0716: "The topic name cannot be null");
0717:
0718: return ((SpyConnection) connection).createTopic(topicName);
0719: }
0720:
0721: public TemporaryQueue createTemporaryQueue() throws JMSException {
0722: checkClosed();
0723: if (this instanceof SpyTopicSession)
0724: throw new IllegalStateException(
0725: "Not allowed for a TopicSession");
0726:
0727: return ((SpyConnection) connection).getTemporaryQueue();
0728: }
0729:
0730: public TemporaryTopic createTemporaryTopic() throws JMSException {
0731: checkClosed();
0732: if (this instanceof SpyQueueSession)
0733: throw new IllegalStateException(
0734: "Not allowed for a QueueSession");
0735: return ((SpyConnection) connection).getTemporaryTopic();
0736: }
0737:
0738: public void unsubscribe(String name) throws JMSException {
0739: checkClosed();
0740: if (this instanceof SpyQueueSession)
0741: throw new IllegalStateException(
0742: "Not allowed for a QueueSession");
0743:
0744: // @todo Not yet implemented
0745: DurableSubscriptionID id = new DurableSubscriptionID(connection
0746: .getClientID(), name, null);
0747: connection.unsubscribe(id);
0748: }
0749:
0750: public XAResource getXAResource() {
0751: return spyXAResource;
0752: }
0753:
0754: public Session getSession() throws JMSException {
0755: checkClosed();
0756: return this ;
0757: }
0758:
0759: public String toString() {
0760: StringBuffer buffer = new StringBuffer(100);
0761: buffer.append("SpySession@").append(
0762: System.identityHashCode(this ));
0763: buffer.append('[');
0764: buffer.append("tx=").append(transacted);
0765: if (transacted == false) {
0766: if (acknowledgeMode == AUTO_ACKNOWLEDGE)
0767: buffer.append(" ack=").append("AUTO");
0768: else if (acknowledgeMode == CLIENT_ACKNOWLEDGE)
0769: buffer.append(" ack=").append("CLIENT");
0770: else if (acknowledgeMode == DUPS_OK_ACKNOWLEDGE)
0771: buffer.append(" ack=").append("DUPSOK");
0772: }
0773: buffer.append(" txid=" + currentTransactionId);
0774: if (spyXAResource != null)
0775: buffer.append(" XA");
0776: if (running)
0777: buffer.append(" RUNNING");
0778: if (closed.get())
0779: buffer.append(" CLOSED");
0780: buffer.append(" connection=").append(connection);
0781: buffer.append(']');
0782: return buffer.toString();
0783: }
0784:
0785: /**
0786: * Set the session's transaction id
0787: *
0788: * @param xid the transaction id
0789: */
0790: void setCurrentTransactionId(final Object xid) {
0791: if (xid == null)
0792: throw new org.jboss.util.NullArgumentException("xid");
0793:
0794: if (trace)
0795: log.trace("Setting current tx xid=" + xid + " previous: "
0796: + currentTransactionId + " " + this );
0797:
0798: this .currentTransactionId = xid;
0799: }
0800:
0801: /**
0802: * Remove the session's transaction id
0803: *
0804: * @param xid the transaction id
0805: */
0806: void unsetCurrentTransactionId(final Object xid) {
0807: if (xid == null)
0808: throw new org.jboss.util.NullArgumentException("xid");
0809:
0810: if (trace)
0811: log
0812: .trace("Unsetting current tx xid=" + xid
0813: + " previous: " + currentTransactionId
0814: + " " + this );
0815:
0816: // Don't unset the xid if it has previously been suspended
0817: // The session could have been recycled
0818: if (xid.equals(currentTransactionId))
0819: this .currentTransactionId = null;
0820: }
0821:
0822: /**
0823: * Get the session's transaction id
0824: *
0825: * @param xid the transaction id
0826: */
0827: Object getCurrentTransactionId() {
0828: return currentTransactionId;
0829: }
0830:
0831: /**
0832: * Get a new message
0833: *
0834: * @return the new message id
0835: * @throws JMSException for any error
0836: */
0837: String getNewMessageID() throws JMSException {
0838: checkClosed();
0839: return connection.getNewMessageID();
0840: }
0841:
0842: /**
0843: * Add a message tot the session
0844: *
0845: * @param message the message
0846: */
0847: void addMessage(SpyMessage message) {
0848: synchronized (messages) {
0849: if (trace)
0850: log.trace("Add message msgid="
0851: + message.header.jmsMessageID + " " + this );
0852: messages.addLast(message);
0853: }
0854: }
0855:
0856: /**
0857: * Add an unacknowledged message
0858: *
0859: * @param message the message
0860: */
0861: void addUnacknowlegedMessage(SpyMessage message) {
0862: if (!transacted) {
0863: synchronized (unacknowledgedMessages) {
0864: if (trace)
0865: log.trace("Add unacked message msgid="
0866: + message.header.jmsMessageID + " " + this );
0867:
0868: unacknowledgedMessages.add(message);
0869: }
0870: }
0871: }
0872:
0873: /**
0874: * Send a message
0875: *
0876: * @param m the message
0877: * @throws JMSException for any error
0878: */
0879: void sendMessage(SpyMessage m) throws JMSException {
0880: checkClosed();
0881:
0882: // Make sure the message has the correct client id
0883: m.header.producerClientId = connection.getClientID();
0884:
0885: if (transacted) {
0886: if (trace)
0887: log.trace("Adding message to transaction "
0888: + m.header.jmsMessageID + " " + this );
0889: connection.spyXAResourceManager.addMessage(
0890: currentTransactionId, m.myClone());
0891: } else {
0892: if (trace)
0893: log.trace("Sending message to server "
0894: + m.header.jmsMessageID + " " + this );
0895: connection.sendToServer(m);
0896: }
0897: }
0898:
0899: /**
0900: * Add a consumer
0901: *
0902: * @param who the consumer
0903: * @throws JMSException for any error
0904: */
0905: void addConsumer(SpyMessageConsumer who) throws JMSException {
0906: checkClosed();
0907:
0908: synchronized (consumers) {
0909: if (trace)
0910: log.trace("Adding consumer " + who);
0911:
0912: consumers.add(who);
0913: }
0914: try {
0915: connection.addConsumer(who);
0916: } catch (JMSSecurityException ex) {
0917: removeConsumerInternal(who);
0918: throw ex;
0919: } catch (Throwable t) {
0920: SpyJMSException.rethrowAsJMSException(
0921: "Error adding consumer", t);
0922: }
0923: }
0924:
0925: /**
0926: * Remove a consumer
0927: *
0928: * @param who the consumer
0929: * @throws JMSException for any error
0930: */
0931: void removeConsumer(SpyMessageConsumer who) throws JMSException {
0932: connection.removeConsumer(who);
0933: removeConsumerInternal(who);
0934: }
0935:
0936: /**
0937: * Add a producer
0938: *
0939: * @param who the producer
0940: * @throws JMSException for any error
0941: */
0942: void addProducer(SpyMessageProducer who) throws JMSException {
0943: checkClosed();
0944:
0945: synchronized (producers) {
0946: if (trace)
0947: log.trace("Adding producer " + who);
0948:
0949: producers.add(who);
0950: }
0951: }
0952:
0953: /**
0954: * Remove a producer
0955: *
0956: * @param who the producer
0957: * @throws JMSException for any error
0958: */
0959: void removeProducer(SpyMessageProducer who) throws JMSException {
0960: removeProducerInternal(who);
0961: }
0962:
0963: /**
0964: * Try to lock the session for asynchronous delivery
0965: *
0966: * @return true when the lock was obtained
0967: */
0968: boolean tryDeliveryLock() {
0969: synchronized (deliveryLock) {
0970: if (inDelivery) {
0971: try {
0972: deliveryLock.wait();
0973: } catch (InterruptedException e) {
0974: log
0975: .trace("Ignored interruption waiting for delivery lock");
0976: }
0977: }
0978: // We got the lock
0979: if (inDelivery == false) {
0980: inDelivery = true;
0981: return true;
0982: }
0983: }
0984: return false;
0985: }
0986:
0987: /**
0988: * Release the delivery lock
0989: */
0990: void releaseDeliveryLock() {
0991: synchronized (deliveryLock) {
0992: inDelivery = false;
0993: deliveryLock.notifyAll();
0994: }
0995: }
0996:
0997: /**
0998: * Interrupt threads waiting for the delivery lock
0999: */
1000: void interruptDeliveryLockWaiters() {
1001: synchronized (deliveryLock) {
1002: deliveryLock.notifyAll();
1003: }
1004: }
1005:
1006: /**
1007: * Invoked to notify of asynchronous failure
1008: *
1009: * @param message the message
1010: * @param t the throwable
1011: */
1012: void asynchFailure(String message, Throwable t) {
1013: connection.asynchFailure(message, t);
1014: }
1015:
1016: /**
1017: * Rollback a transaction
1018: *
1019: * @throws JMSException for any error
1020: */
1021: private void internalRollback() throws JMSException {
1022: synchronized (runLock) {
1023: if (spyXAResource != null)
1024: throw new javax.jms.TransactionInProgressException(
1025: "Should not be call from a XASession");
1026: if (!transacted)
1027: throw new IllegalStateException(
1028: "The session is not transacted");
1029:
1030: if (trace)
1031: log.trace("Rollback transaction " + this );
1032: try {
1033: connection.spyXAResourceManager.endTx(
1034: currentTransactionId, true);
1035: connection.spyXAResourceManager
1036: .rollback(currentTransactionId);
1037: } catch (Throwable t) {
1038: SpyJMSException.rethrowAsJMSException(
1039: "Could not rollback", t);
1040: } finally {
1041: unacknowledgedMessages.clear();
1042: try {
1043: currentTransactionId = connection.spyXAResourceManager
1044: .startTx();
1045: if (trace)
1046: log.trace("Current transaction id: "
1047: + currentTransactionId + " " + this );
1048: } catch (Throwable ignore) {
1049: if (trace)
1050: log.trace("Failed to start tx " + this , ignore);
1051: }
1052: }
1053: }
1054: }
1055:
1056: /**
1057: * Remove a consumer
1058: *
1059: * @param who the consumer
1060: */
1061: private void removeConsumerInternal(SpyMessageConsumer who) {
1062: synchronized (consumers) {
1063: if (trace)
1064: log.trace("Remove consumer " + who);
1065:
1066: consumers.remove(who);
1067: }
1068: }
1069:
1070: /**
1071: * Remove a producer
1072: *
1073: * @param who the producer
1074: */
1075: private void removeProducerInternal(SpyMessageProducer who) {
1076: synchronized (producers) {
1077: if (trace)
1078: log.trace("Remove producer " + who);
1079:
1080: producers.remove(who);
1081: }
1082: }
1083:
1084: /**
1085: * Check whether we are closed
1086: *
1087: * @throws IllegalStateException when the session is closed
1088: */
1089: private void checkClosed() throws IllegalStateException {
1090: if (closed.get())
1091: throw new IllegalStateException("The session is closed");
1092: }
1093: }
|