0001: /*
0002: * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved
0003: *
0004: * This file is part of Resin(R) Open Source
0005: *
0006: * Each copy or derived work must preserve the copyright notice and this
0007: * notice unmodified.
0008: *
0009: * Resin Open Source is free software; you can redistribute it and/or modify
0010: * it under the terms of the GNU General Public License as published by
0011: * the Free Software Foundation; either version 2 of the License, or
0012: * (at your option) any later version.
0013: *
0014: * Resin Open Source is distributed in the hope that it will be useful,
0015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
0016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
0017: * of NON-INFRINGEMENT. See the GNU General Public License for more
0018: * details.
0019: *
0020: * You should have received a copy of the GNU General Public License
0021: * along with Resin Open Source; if not, write to the
0022: *
0023: * Free Software Foundation, Inc.
0024: * 59 Temple Place, Suite 330
0025: * Boston, MA 02111-1307 USA
0026: *
0027: * @author Scott Ferguson
0028: */
0029:
0030: package com.caucho.jms.connection;
0031:
0032: import com.caucho.jms2.JMSExceptionWrapper;
0033: import com.caucho.jms.message.*;
0034: import com.caucho.jms.queue.*;
0035: import com.caucho.util.Alarm;
0036: import com.caucho.util.L10N;
0037: import com.caucho.util.ThreadPool;
0038: import com.caucho.util.ThreadTask;
0039:
0040: import javax.jms.*;
0041: import javax.jms.IllegalStateException;
0042: import javax.naming.*;
0043: import javax.transaction.*;
0044: import javax.transaction.xa.*;
0045: import java.io.Serializable;
0046: import java.util.ArrayList;
0047: import java.util.logging.Level;
0048: import java.util.logging.Logger;
0049:
0050: /**
0051: * Manages the JMS session.
0052: */
0053: public class JmsSession implements XASession, ThreadTask, XAResource {
0054: protected static final Logger log = Logger
0055: .getLogger(JmsSession.class.getName());
0056: protected static final L10N L = new L10N(JmsSession.class);
0057:
0058: private static final long SHUTDOWN_WAIT_TIME = 10000;
0059:
0060: private boolean _isXA;
0061: private Xid _xid;
0062: private TransactionManager _tm;
0063:
0064: private boolean _isTransacted;
0065: private int _acknowledgeMode;
0066:
0067: private ClassLoader _classLoader;
0068:
0069: private ConnectionImpl _connection;
0070:
0071: private final ArrayList<MessageConsumerImpl> _consumers = new ArrayList<MessageConsumerImpl>();
0072:
0073: private MessageFactory _messageFactory = new MessageFactory();
0074: private MessageListener _messageListener;
0075: private boolean _isAsynchronous;
0076:
0077: // 4.4.1 - client's responsibility
0078: private Thread _thread;
0079:
0080: // transacted messages
0081: private ArrayList<TransactedMessage> _transactedMessages;
0082:
0083: // true if the listener thread is running
0084: private volatile boolean _isRunning;
0085:
0086: private volatile boolean _isClosed;
0087: private volatile boolean _hasMessage;
0088:
0089: public JmsSession(ConnectionImpl connection, boolean isTransacted,
0090: int ackMode, boolean isXA) throws JMSException {
0091: _classLoader = Thread.currentThread().getContextClassLoader();
0092:
0093: _connection = connection;
0094:
0095: _isXA = isXA;
0096:
0097: _isTransacted = isTransacted;
0098: _acknowledgeMode = ackMode;
0099:
0100: if (isTransacted)
0101: _acknowledgeMode = 0;
0102: else {
0103: switch (ackMode) {
0104: case CLIENT_ACKNOWLEDGE:
0105: case DUPS_OK_ACKNOWLEDGE:
0106: case AUTO_ACKNOWLEDGE:
0107: _acknowledgeMode = ackMode;
0108: break;
0109: default:
0110: try {
0111: log
0112: .warning(L
0113: .l(
0114: "JmsSession {0} is an illegal acknowledge mode",
0115: ackMode));
0116: // XXX: tck
0117: // throw new JMSException(L.l("{0} is an illegal acknowledge mode", ackMode));
0118: log
0119: .warning(L
0120: .l(
0121: "JmsSession {0} is an illegal acknowledge mode",
0122: ackMode));
0123: _acknowledgeMode = AUTO_ACKNOWLEDGE;
0124: } catch (Exception e) {
0125: log.log(Level.FINE, e.toString(), e);
0126: }
0127: break;
0128: }
0129: }
0130:
0131: try {
0132: InitialContext ic = new InitialContext();
0133:
0134: _tm = (TransactionManager) ic
0135: .lookup("java:comp/TransactionManager");
0136: } catch (Exception e) {
0137: log.log(Level.FINER, e.toString(), e);
0138: }
0139:
0140: _connection.addSession(this );
0141: }
0142:
0143: /**
0144: * Returns the connection.
0145: */
0146: ConnectionImpl getConnection() {
0147: return _connection;
0148: }
0149:
0150: /**
0151: * Returns the ClassLoader.
0152: */
0153: ClassLoader getClassLoader() {
0154: return _classLoader;
0155: }
0156:
0157: /**
0158: * Returns the connection's clientID
0159: */
0160: public String getClientID() throws JMSException {
0161: return _connection.getClientID();
0162: }
0163:
0164: /**
0165: * Returns true if the connection is active.
0166: */
0167: public boolean isActive() {
0168: return !_isClosed && _connection.isActive();
0169: }
0170:
0171: /**
0172: * Returns true if the connection is active.
0173: */
0174: boolean isStopping() {
0175: return _connection.isStopping();
0176: }
0177:
0178: /**
0179: * Returns true if the session is in a transaction.
0180: */
0181: public boolean getTransacted() throws JMSException {
0182: checkOpen();
0183:
0184: return _isTransacted;
0185: }
0186:
0187: /**
0188: * Returns the acknowledge mode for the session.
0189: */
0190: public int getAcknowledgeMode() throws JMSException {
0191: checkOpen();
0192:
0193: return _acknowledgeMode;
0194: }
0195:
0196: /**
0197: * Returns the message listener
0198: */
0199: public MessageListener getMessageListener() throws JMSException {
0200: checkOpen();
0201:
0202: return _messageListener;
0203: }
0204:
0205: /**
0206: * Sets the message listener
0207: */
0208: public void setMessageListener(MessageListener listener)
0209: throws JMSException {
0210: checkOpen();
0211:
0212: _messageListener = listener;
0213: setAsynchronous();
0214: }
0215:
0216: /**
0217: * Set true for a synchronous session.
0218: */
0219: void setAsynchronous() {
0220: _isAsynchronous = true;
0221:
0222: notifyMessageAvailable();
0223: }
0224:
0225: /**
0226: * Set true for a synchronous session.
0227: */
0228: boolean isAsynchronous() {
0229: return _isAsynchronous;
0230: }
0231:
0232: /**
0233: * Creates a new byte[] message.
0234: */
0235: public BytesMessage createBytesMessage() throws JMSException {
0236: checkOpen();
0237:
0238: return new BytesMessageImpl();
0239: }
0240:
0241: /**
0242: * Creates a new map message.
0243: */
0244: public MapMessage createMapMessage() throws JMSException {
0245: checkOpen();
0246:
0247: return new MapMessageImpl();
0248: }
0249:
0250: /**
0251: * Creates a message. Used when only header info is important.
0252: */
0253: public Message createMessage() throws JMSException {
0254: checkOpen();
0255:
0256: return new MessageImpl();
0257: }
0258:
0259: /**
0260: * Creates an object message.
0261: */
0262: public ObjectMessage createObjectMessage() throws JMSException {
0263: checkOpen();
0264:
0265: return new ObjectMessageImpl();
0266: }
0267:
0268: /**
0269: * Creates an object message.
0270: *
0271: * @param obj a serializable message.
0272: */
0273: public ObjectMessage createObjectMessage(Serializable obj)
0274: throws JMSException {
0275: checkOpen();
0276:
0277: ObjectMessage msg = createObjectMessage();
0278:
0279: msg.setObject(obj);
0280:
0281: return msg;
0282: }
0283:
0284: /**
0285: * Creates a stream message.
0286: */
0287: public StreamMessage createStreamMessage() throws JMSException {
0288: checkOpen();
0289:
0290: return new StreamMessageImpl();
0291: }
0292:
0293: /**
0294: * Creates a text message.
0295: */
0296: public TextMessage createTextMessage() throws JMSException {
0297: checkOpen();
0298:
0299: return new TextMessageImpl();
0300: }
0301:
0302: /**
0303: * Creates a text message.
0304: */
0305: public TextMessage createTextMessage(String message)
0306: throws JMSException {
0307: checkOpen();
0308:
0309: TextMessage msg = createTextMessage();
0310:
0311: msg.setText(message);
0312:
0313: return msg;
0314: }
0315:
0316: /**
0317: * Creates a consumer to receive messages.
0318: *
0319: * @param destination the destination to receive messages from.
0320: */
0321: public MessageConsumer createConsumer(Destination destination)
0322: throws JMSException {
0323: checkOpen();
0324:
0325: return createConsumer(destination, null, false);
0326: }
0327:
0328: /**
0329: * Creates a consumer to receive messages.
0330: *
0331: * @param destination the destination to receive messages from.
0332: * @param messageSelector query to restrict the messages.
0333: */
0334: public MessageConsumer createConsumer(Destination destination,
0335: String messageSelector) throws JMSException {
0336: checkOpen();
0337:
0338: return createConsumer(destination, messageSelector, false);
0339: }
0340:
0341: /**
0342: * Creates a consumer to receive messages.
0343: *
0344: * @param destination the destination to receive messages from.
0345: * @param messageSelector query to restrict the messages.
0346: */
0347: public MessageConsumer createConsumer(Destination destination,
0348: String messageSelector, boolean noLocal)
0349: throws JMSException {
0350: checkOpen();
0351:
0352: if (destination == null)
0353: throw new InvalidDestinationException(
0354: L
0355: .l("destination is null. Destination may not be null for Session.createConsumer"));
0356:
0357: MessageConsumerImpl consumer;
0358:
0359: if (destination instanceof AbstractQueue) {
0360: AbstractQueue dest = (AbstractQueue) destination;
0361:
0362: consumer = new MessageConsumerImpl(this , dest,
0363: messageSelector, noLocal);
0364: } else if (destination instanceof AbstractTopic) {
0365: AbstractTopic dest = (AbstractTopic) destination;
0366:
0367: consumer = new TopicSubscriberImpl(this , dest,
0368: messageSelector, noLocal);
0369: } else
0370: throw new InvalidDestinationException(
0371: L
0372: .l(
0373: "'{0}' is an unknown destination. The destination must be a Resin JMS Destination.",
0374: destination));
0375:
0376: addConsumer(consumer);
0377:
0378: return consumer;
0379: }
0380:
0381: /**
0382: * Creates a producer to produce messages.
0383: *
0384: * @param destination the destination to send messages from.
0385: */
0386: public MessageProducer createProducer(Destination destination)
0387: throws JMSException {
0388: checkOpen();
0389:
0390: if (destination == null) {
0391: return new MessageProducerImpl(this , null);
0392: }
0393:
0394: if (!(destination instanceof AbstractDestination))
0395: throw new InvalidDestinationException(
0396: L
0397: .l(
0398: "'{0}' is an unknown destination. The destination must be a Resin JMS destination for Session.createProducer.",
0399: destination));
0400:
0401: AbstractDestination dest = (AbstractDestination) destination;
0402:
0403: return new MessageProducerImpl(this , dest);
0404: }
0405:
0406: /**
0407: * Creates a QueueBrowser to browse messages in the queue.
0408: *
0409: * @param queue the queue to send messages to.
0410: */
0411: public QueueBrowser createBrowser(Queue queue) throws JMSException {
0412: checkOpen();
0413:
0414: return createBrowser(queue, null);
0415: }
0416:
0417: /**
0418: * Creates a QueueBrowser to browse messages in the queue.
0419: *
0420: * @param queue the queue to send messages to.
0421: */
0422: public QueueBrowser createBrowser(Queue queue,
0423: String messageSelector) throws JMSException {
0424: checkOpen();
0425:
0426: if (queue == null)
0427: throw new InvalidDestinationException(
0428: L
0429: .l("queue is null. Queue may not be null for Session.createBrowser"));
0430:
0431: if (!(queue instanceof AbstractQueue))
0432: throw new InvalidDestinationException(
0433: L
0434: .l(
0435: "'{0}' is an unknown queue. The queue must be a Resin JMS Queue for Session.createBrowser.",
0436: queue));
0437:
0438: return new MessageBrowserImpl(this , (AbstractQueue) queue,
0439: messageSelector);
0440: }
0441:
0442: /**
0443: * Creates a new queue.
0444: */
0445: public Queue createQueue(String queueName) throws JMSException {
0446: checkOpen();
0447:
0448: return _connection.createQueue(queueName);
0449: }
0450:
0451: /**
0452: * Creates a temporary queue.
0453: */
0454: public TemporaryQueue createTemporaryQueue() throws JMSException {
0455: checkOpen();
0456:
0457: return new TemporaryQueueImpl(this );
0458: }
0459:
0460: /**
0461: * Creates a new topic.
0462: */
0463: public Topic createTopic(String topicName) throws JMSException {
0464: checkOpen();
0465:
0466: return _connection.createTopic(topicName);
0467: }
0468:
0469: /**
0470: * Creates a temporary topic.
0471: */
0472: public TemporaryTopic createTemporaryTopic() throws JMSException {
0473: checkOpen();
0474:
0475: return new TemporaryTopicImpl(this );
0476: }
0477:
0478: /**
0479: * Creates a durable subscriber to receive messages.
0480: *
0481: * @param topic the topic to receive messages from.
0482: */
0483: public TopicSubscriber createDurableSubscriber(Topic topic,
0484: String name) throws JMSException {
0485: checkOpen();
0486:
0487: if (getClientID() == null)
0488: throw new JMSException(
0489: L
0490: .l("connection may not create a durable subscriber because it does not have an assigned ClientID."));
0491:
0492: return createDurableSubscriber(topic, name, null, false);
0493: }
0494:
0495: /**
0496: * Creates a subscriber to receive messages.
0497: *
0498: * @param topic the topic to receive messages from.
0499: * @param messageSelector topic to restrict the messages.
0500: * @param noLocal if true, don't receive messages we've sent
0501: */
0502: public TopicSubscriber createDurableSubscriber(Topic topic,
0503: String name, String messageSelector, boolean noLocal)
0504: throws JMSException {
0505: checkOpen();
0506:
0507: if (topic == null)
0508: throw new InvalidDestinationException(
0509: L
0510: .l("destination is null. Destination may not be null for Session.createDurableSubscriber"));
0511:
0512: if (!(topic instanceof AbstractTopic))
0513: throw new InvalidDestinationException(
0514: L
0515: .l(
0516: "'{0}' is an unknown destination. The destination must be a Resin JMS Destination.",
0517: topic));
0518:
0519: AbstractTopic topicImpl = (AbstractTopic) topic;
0520:
0521: if (_connection.getDurableSubscriber(name) != null) {
0522: // jms/2130
0523: // unsubscribe(name);
0524: /*
0525: throw new JMSException(L.l("'{0}' is already an active durable subscriber",
0526: name));
0527: */
0528: }
0529:
0530: AbstractQueue queue = topicImpl.createSubscriber(this , name,
0531: noLocal);
0532:
0533: TopicSubscriberImpl consumer;
0534: consumer = new TopicSubscriberImpl(this , topicImpl, queue,
0535: messageSelector, noLocal);
0536:
0537: _connection.putDurableSubscriber(name, consumer);
0538:
0539: addConsumer(consumer);
0540:
0541: return consumer;
0542: }
0543:
0544: /**
0545: * Unsubscribe from a durable subscription.
0546: */
0547: public void unsubscribe(String name) throws JMSException {
0548: checkOpen();
0549:
0550: if (name == null)
0551: throw new InvalidDestinationException(
0552: L
0553: .l("destination is null. Destination may not be null for Session.unsubscribe"));
0554:
0555: TopicSubscriber subscriber = _connection
0556: .removeDurableSubscriber(name);
0557:
0558: if (subscriber == null)
0559: throw new InvalidDestinationException(
0560: L
0561: .l(
0562: "'{0}' is an unknown subscriber for Session.unsubscribe",
0563: name));
0564:
0565: subscriber.close();
0566: }
0567:
0568: /**
0569: * Starts the session.
0570: */
0571: void start() {
0572: if (log.isLoggable(Level.FINE))
0573: log.fine(toString() + " active");
0574:
0575: notifyMessageAvailable();
0576: }
0577:
0578: /**
0579: * Stops the session.
0580: */
0581: void stop() {
0582: if (log.isLoggable(Level.FINE))
0583: log.fine(toString() + " stopping");
0584:
0585: synchronized (_consumers) {
0586: long timeout = Alarm.getCurrentTime() + SHUTDOWN_WAIT_TIME;
0587: while (_isRunning && Alarm.getCurrentTime() < timeout) {
0588: try {
0589: _consumers.wait(SHUTDOWN_WAIT_TIME);
0590:
0591: if (Alarm.isTest()) {
0592: return;
0593: }
0594: } catch (Throwable e) {
0595: log.log(Level.FINER, e.toString(), e);
0596: }
0597: }
0598:
0599: ArrayList<MessageConsumerImpl> consumers = new ArrayList<MessageConsumerImpl>(
0600: _consumers);
0601:
0602: for (MessageConsumerImpl consumer : consumers) {
0603: try {
0604: // XXX: should be stop()?
0605:
0606: consumer.stop();
0607: } catch (Throwable e) {
0608: log.log(Level.FINE, e.toString(), e);
0609: }
0610: }
0611: }
0612: }
0613:
0614: /**
0615: * Commits the messages.
0616: */
0617: public void commit() throws JMSException {
0618: checkOpen();
0619:
0620: commit(false);
0621: }
0622:
0623: /**
0624: * Commits the messages.
0625: */
0626: private void commit(boolean isXA) throws JMSException {
0627: _xid = null;
0628:
0629: if (!_isTransacted && !_isXA)
0630: throw new IllegalStateException(
0631: L
0632: .l("commit() can only be called on a transacted session."));
0633:
0634: _isXA = false;
0635:
0636: ArrayList<TransactedMessage> messages = _transactedMessages;
0637: if (messages != null) {
0638: try {
0639: for (int i = 0; i < messages.size(); i++) {
0640: messages.get(i).commit();
0641: }
0642: } finally {
0643: messages.clear();
0644: }
0645: }
0646:
0647: if (!isXA)
0648: acknowledge();
0649: }
0650:
0651: /**
0652: * Acknowledge received
0653: */
0654: public void acknowledge() throws JMSException {
0655: checkOpen();
0656:
0657: if (_transactedMessages != null) {
0658: for (int i = _transactedMessages.size() - 1; i >= 0; i--) {
0659: TransactedMessage msg = _transactedMessages.get(i);
0660:
0661: if (msg instanceof ReceiveMessage) {
0662: _transactedMessages.remove(i);
0663:
0664: msg.commit();
0665: }
0666: }
0667: }
0668: }
0669:
0670: /**
0671: * Recovers the messages.
0672: */
0673: public void recover() throws JMSException {
0674: checkOpen();
0675:
0676: if (_isTransacted)
0677: throw new IllegalStateException(
0678: L
0679: .l("recover() may not be called on a transacted session."));
0680:
0681: if (_transactedMessages != null) {
0682: for (int i = _transactedMessages.size() - 1; i >= 0; i--) {
0683: TransactedMessage msg = _transactedMessages.get(i);
0684:
0685: if (msg instanceof ReceiveMessage) {
0686: _transactedMessages.remove(i);
0687:
0688: msg.rollback();
0689: }
0690: }
0691: }
0692: }
0693:
0694: /**
0695: * Rollsback the messages.
0696: */
0697: public void rollback() throws JMSException {
0698: checkOpen();
0699:
0700: rollbackImpl();
0701: }
0702:
0703: /**
0704: * Rollsback the messages.
0705: */
0706: public void rollbackImpl() throws JMSException {
0707: if (!_isTransacted && !_isXA)
0708: throw new IllegalStateException(
0709: L
0710: .l("rollback() can only be called on a transacted session."));
0711:
0712: if (_transactedMessages != null) {
0713: for (int i = 0; i < _transactedMessages.size(); i++)
0714: _transactedMessages.get(i).rollback();
0715:
0716: _transactedMessages.clear();
0717: }
0718: }
0719:
0720: /**
0721: * Closes the session
0722: */
0723: public void close() throws JMSException {
0724: if (_isClosed)
0725: return;
0726:
0727: try {
0728: stop();
0729: } catch (Exception e) {
0730: log.log(Level.WARNING, e.toString(), e);
0731: }
0732:
0733: ArrayList<TransactedMessage> messages = _transactedMessages;
0734:
0735: if (messages != null && _xid == null) {
0736: _transactedMessages = null;
0737:
0738: try {
0739: for (int i = 0; i < messages.size(); i++) {
0740: messages.get(i).close();
0741: }
0742: } catch (Exception e) {
0743: log.log(Level.WARNING, e.toString(), e);
0744: }
0745: }
0746:
0747: for (int i = 0; i < _consumers.size(); i++) {
0748: MessageConsumerImpl consumer = _consumers.get(i);
0749:
0750: try {
0751: consumer.close();
0752: } catch (Exception e) {
0753: log.log(Level.WARNING, e.toString(), e);
0754: }
0755: }
0756:
0757: try {
0758: _connection.removeSession(this );
0759: } finally {
0760: _isClosed = true;
0761: }
0762:
0763: _classLoader = null;
0764: }
0765:
0766: protected void addConsumer(MessageConsumerImpl consumer) {
0767: _consumers.add(consumer);
0768:
0769: notifyMessageAvailable();
0770: }
0771:
0772: protected void removeConsumer(MessageConsumerImpl consumer) {
0773: if (_consumers != null)
0774: _consumers.remove(consumer);
0775: }
0776:
0777: /**
0778: * Notifies the receiver.
0779: */
0780: boolean notifyMessageAvailable() {
0781: synchronized (_consumers) {
0782: _hasMessage = true;
0783:
0784: if (_isRunning || !_isAsynchronous || !isActive())
0785: return false;
0786:
0787: _isRunning = true;
0788: }
0789:
0790: ThreadPool.getThreadPool().schedule(this );
0791: // the yield is only needed for the regressions
0792: Thread.yield();
0793:
0794: return true;
0795: }
0796:
0797: /**
0798: * Adds a message to the session message queue.
0799: */
0800: public void send(AbstractDestination queue, Message appMessage,
0801: int deliveryMode, int priority, long timeout)
0802: throws JMSException {
0803: checkOpen();
0804:
0805: if (queue == null)
0806: throw new UnsupportedOperationException(L
0807: .l("empty queue is not allowed for this session."));
0808:
0809: MessageImpl message = _messageFactory.copy(appMessage);
0810:
0811: long now = Alarm.getExactTime();
0812: long expiration = now + timeout;
0813:
0814: message.setJMSMessageID(queue.generateMessageID());
0815: if (message.getJMSDestination() == null)
0816: message.setJMSDestination(queue);
0817: message.setJMSDeliveryMode(deliveryMode);
0818: if (message.getJMSTimestamp() == 0)
0819: message.setJMSTimestamp(now);
0820: if (message.getJMSExpiration() == 0)
0821: message.setJMSExpiration(expiration);
0822: message.setJMSPriority(priority);
0823:
0824: // ejb/0970
0825:
0826: boolean isXA = false;
0827: try {
0828: if (_isTransacted && _tm != null
0829: && _tm.getTransaction() != null)
0830: isXA = true;
0831: } catch (Exception e) {
0832: log.log(Level.FINE, e.toString(), e);
0833: }
0834:
0835: if (_isTransacted || isXA) {
0836: if (_transactedMessages == null)
0837: _transactedMessages = new ArrayList<TransactedMessage>();
0838:
0839: TransactedMessage transMsg = new SendMessage(queue, message);
0840:
0841: _transactedMessages.add(transMsg);
0842:
0843: if (_xid == null)
0844: enlist();
0845: } else {
0846: if (log.isLoggable(Level.FINE))
0847: log.fine(queue + " sending " + message);
0848:
0849: queue.send(this , message, expiration);
0850: }
0851: }
0852:
0853: private void enlist() {
0854: if (_tm != null) {
0855: try {
0856: Transaction trans = _tm.getTransaction();
0857:
0858: if (trans != null)
0859: trans.enlistResource(this );
0860: } catch (Exception e) {
0861: throw new RuntimeException(e);
0862: }
0863: }
0864: }
0865:
0866: private void delist() {
0867: if (_tm != null) {
0868: try {
0869: Transaction trans = _tm.getTransaction();
0870:
0871: if (trans != null)
0872: trans.delistResource(this , 0);
0873: } catch (Exception e) {
0874: throw new RuntimeException(e);
0875: }
0876: }
0877: }
0878:
0879: /**
0880: * Adds a message to the session message queue.
0881: */
0882: void addTransactedReceive(AbstractDestination queue,
0883: MessageImpl message) {
0884: message.setSession(this );
0885:
0886: if (_transactedMessages == null)
0887: _transactedMessages = new ArrayList<TransactedMessage>();
0888:
0889: TransactedMessage transMsg = new ReceiveMessage(queue, message);
0890:
0891: _transactedMessages.add(transMsg);
0892:
0893: if (_tm != null && _transactedMessages.size() == 1) {
0894: enlist();
0895: }
0896: }
0897:
0898: /**
0899: * Called to synchronously receive a message.
0900: */
0901: protected Message receive(MessageConsumerImpl consumer, long timeout)
0902: throws JMSException {
0903: throw new UnsupportedOperationException();
0904: /*
0905: checkOpen();
0906:
0907: if (Long.MAX_VALUE / 2 < timeout || timeout < 0)
0908: timeout = Long.MAX_VALUE / 2;
0909:
0910: long now = Alarm.getCurrentTime();
0911: long failTime = Alarm.getCurrentTime() + timeout;
0912:
0913: Selector selector = consumer.getSelector();
0914: AbstractQueue queue;
0915: queue = (AbstractQueue) consumer.getDestination();
0916:
0917: // 4.4.1 user's reponsibility
0918: // checkThread();
0919:
0920: Thread oldThread = Thread.currentThread();
0921: try {
0922: // _thread = Thread.currentThread();
0923:
0924: while (! consumer.isClosed()) {
0925: if (isActive()) {
0926: Message msg = queue.receive(selector);
0927: if (msg != null)
0928: return msg;
0929: _hasMessage = false;
0930: }
0931:
0932: long delta = failTime - Alarm.getCurrentTime();
0933:
0934: if (delta <= 0 || _isClosed || Alarm.isTest())
0935: return null;
0936:
0937: synchronized (_consumers) {
0938: if (! _hasMessage || ! isActive()) {
0939: try {
0940: _consumers.wait(delta);
0941: } catch (Throwable e) {
0942: }
0943: }
0944: }
0945: }
0946: } finally {
0947: // _thread = oldThread;
0948: }
0949:
0950: return null;
0951: */
0952: }
0953:
0954: //
0955: // XA
0956: //
0957:
0958: public Session getSession() {
0959: return this ;
0960: }
0961:
0962: public XAResource getXAResource() {
0963: return this ;
0964: }
0965:
0966: /**
0967: * Returns true if the specified resource has the same RM.
0968: */
0969: public boolean isSameRM(XAResource xa) throws XAException {
0970: return this == xa;
0971: }
0972:
0973: /**
0974: * Sets the transaction timeout in seconds.
0975: */
0976: public boolean setTransactionTimeout(int timeout)
0977: throws XAException {
0978: return true;
0979: }
0980:
0981: /**
0982: * Gets the transaction timeout in seconds.
0983: */
0984: public int getTransactionTimeout() throws XAException {
0985: return 0;
0986: }
0987:
0988: /**
0989: * Called when the resource is associated with a transaction.
0990: */
0991: public void start(Xid xid, int flags) throws XAException {
0992: _xid = xid;
0993: }
0994:
0995: /**
0996: * Called when the resource is is done with a transaction.
0997: */
0998: public void end(Xid xid, int flags) throws XAException {
0999: _xid = null;
1000: }
1001:
1002: /**
1003: * Called to start the first phase of the commit.
1004: */
1005: public int prepare(Xid xid) throws XAException {
1006: return 0;
1007: }
1008:
1009: /**
1010: * Called to commit.
1011: */
1012: public void commit(Xid xid, boolean onePhase) throws XAException {
1013: try {
1014: commit(true);
1015: } catch (Exception e) {
1016: throw new RuntimeException(e);
1017: } finally {
1018: delist();
1019: _isXA = false;
1020: }
1021: }
1022:
1023: /**
1024: * Called to roll back.
1025: */
1026: public void rollback(Xid xid) throws XAException {
1027: try {
1028: rollbackImpl();
1029: } catch (Exception e) {
1030: throw new RuntimeException(e);
1031: } finally {
1032: delist();
1033: _isXA = false;
1034: }
1035: }
1036:
1037: /**
1038: * Called to forget an Xid that had a heuristic commit.
1039: */
1040: public void forget(Xid xid) throws XAException {
1041: }
1042:
1043: /**
1044: * Called to find Xid's that need recovery.
1045: */
1046: public Xid[] recover(int flag) throws XAException {
1047: return null;
1048: }
1049:
1050: /**
1051: * Called to synchronously receive messages
1052: */
1053: public void run() {
1054: boolean isValid = true;
1055:
1056: while (isValid) {
1057: isValid = false;
1058: _hasMessage = false;
1059:
1060: try {
1061: for (int i = 0; i < _consumers.size(); i++) {
1062: MessageConsumerImpl consumer = _consumers.get(i);
1063:
1064: while (isActive()
1065: && consumer.handleMessage(_messageListener)) {
1066: }
1067: }
1068:
1069: isValid = isActive();
1070: } finally {
1071: synchronized (_consumers) {
1072: if (!isValid)
1073: _isRunning = false;
1074: else if (!_hasMessage) {
1075: _isRunning = false;
1076: isValid = false;
1077: }
1078:
1079: // notification, e.g. for shutdown
1080: _consumers.notifyAll();
1081: }
1082: }
1083: }
1084: }
1085:
1086: public boolean isClosed() {
1087: return _isClosed;
1088: }
1089:
1090: /**
1091: * Checks that the session is open.
1092: */
1093: public void checkOpen() throws javax.jms.IllegalStateException {
1094: if (_isClosed)
1095: throw new javax.jms.IllegalStateException(L
1096: .l("session is closed"));
1097: }
1098:
1099: /**
1100: * Verifies that multiple threads aren't using the session.
1101: *
1102: * 4.4.1 the client takes the responsibility. There's no
1103: * validation check.
1104: */
1105: void checkThread() throws JMSException {
1106: Thread thread = _thread;
1107:
1108: if (thread != Thread.currentThread() && thread != null) {
1109: Exception e = new IllegalStateException(L
1110: .l("Can't use session from concurrent threads."));
1111: log.log(Level.WARNING, e.toString(), e);
1112: }
1113: }
1114:
1115: public String toString() {
1116: String className = getClass().getName();
1117: int p = className.lastIndexOf('.');
1118:
1119: return className.substring(p + 1) + "[]";
1120: }
1121:
1122: abstract class TransactedMessage {
1123: abstract void commit() throws JMSException;
1124:
1125: abstract void rollback() throws JMSException;
1126:
1127: void close() throws JMSException {
1128: }
1129: }
1130:
1131: class SendMessage extends TransactedMessage {
1132: private final AbstractDestination _queue;
1133: private final MessageImpl _message;
1134:
1135: SendMessage(AbstractDestination queue, MessageImpl message) {
1136: _queue = queue;
1137: _message = message;
1138: }
1139:
1140: void commit() throws JMSException {
1141: _queue.send(JmsSession.this , _message, 0);
1142: }
1143:
1144: void rollback() throws JMSException {
1145: }
1146:
1147: void close() throws JMSException {
1148: commit();
1149: }
1150: }
1151:
1152: class ReceiveMessage extends TransactedMessage {
1153: private final AbstractDestination _queue;
1154: private final String _msgId;
1155:
1156: ReceiveMessage(AbstractDestination queue, MessageImpl message) {
1157: _queue = queue;
1158: _msgId = message.getJMSMessageID();
1159:
1160: if (_msgId == null)
1161: throw new NullPointerException();
1162: }
1163:
1164: void commit() throws JMSException {
1165: _queue.acknowledge(_msgId);
1166: }
1167:
1168: void rollback() throws JMSException {
1169: _queue.rollback(_msgId);
1170: }
1171:
1172: void close() throws JMSException {
1173: rollback();
1174: }
1175: }
1176: }
|