Source Code Cross Referenced for JmsSession.java in  » EJB-Server-resin-3.1.5 » resin » com » caucho » jms » connection » Java Source Code / Java Documentation - Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » EJB Server resin 3.1.5 » resin » com.caucho.jms.connection 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /*
0002:         * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved
0003:         *
0004:         * This file is part of Resin(R) Open Source
0005:         *
0006:         * Each copy or derived work must preserve the copyright notice and this
0007:         * notice unmodified.
0008:         *
0009:         * Resin Open Source is free software; you can redistribute it and/or modify
0010:         * it under the terms of the GNU General Public License as published by
0011:         * the Free Software Foundation; either version 2 of the License, or
0012:         * (at your option) any later version.
0013:         *
0014:         * Resin Open Source is distributed in the hope that it will be useful,
0015:         * but WITHOUT ANY WARRANTY; without even the implied warranty of
0016:         * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
0017:         * of NON-INFRINGEMENT.  See the GNU General Public License for more
0018:         * details.
0019:         *
0020:         * You should have received a copy of the GNU General Public License
0021:         * along with Resin Open Source; if not, write to the
0022:         *
0023:         *   Free Software Foundation, Inc.
0024:         *   59 Temple Place, Suite 330
0025:         *   Boston, MA 02111-1307  USA
0026:         *
0027:         * @author Scott Ferguson
0028:         */
0029:
0030:        package com.caucho.jms.connection;
0031:
0032:        import com.caucho.jms2.JMSExceptionWrapper;
0033:        import com.caucho.jms.message.*;
0034:        import com.caucho.jms.queue.*;
0035:        import com.caucho.util.Alarm;
0036:        import com.caucho.util.L10N;
0037:        import com.caucho.util.ThreadPool;
0038:        import com.caucho.util.ThreadTask;
0039:
0040:        import javax.jms.*;
0041:        import javax.jms.IllegalStateException;
0042:        import javax.naming.*;
0043:        import javax.transaction.*;
0044:        import javax.transaction.xa.*;
0045:        import java.io.Serializable;
0046:        import java.util.ArrayList;
0047:        import java.util.logging.Level;
0048:        import java.util.logging.Logger;
0049:
0050:        /**
0051:         * Manages the JMS session.
0052:         */
0053:        public class JmsSession implements  XASession, ThreadTask, XAResource {
0054:            protected static final Logger log = Logger
0055:                    .getLogger(JmsSession.class.getName());
0056:            protected static final L10N L = new L10N(JmsSession.class);
0057:
0058:            private static final long SHUTDOWN_WAIT_TIME = 10000;
0059:
0060:            private boolean _isXA;
0061:            private Xid _xid;
0062:            private TransactionManager _tm;
0063:
0064:            private boolean _isTransacted;
0065:            private int _acknowledgeMode;
0066:
0067:            private ClassLoader _classLoader;
0068:
0069:            private ConnectionImpl _connection;
0070:
0071:            private final ArrayList<MessageConsumerImpl> _consumers = new ArrayList<MessageConsumerImpl>();
0072:
0073:            private MessageFactory _messageFactory = new MessageFactory();
0074:            private MessageListener _messageListener;
0075:            private boolean _isAsynchronous;
0076:
0077:            // 4.4.1 - client's responsibility
0078:            private Thread _thread;
0079:
0080:            // transacted messages
0081:            private ArrayList<TransactedMessage> _transactedMessages;
0082:
0083:            // true if the listener thread is running
0084:            private volatile boolean _isRunning;
0085:
0086:            private volatile boolean _isClosed;
0087:            private volatile boolean _hasMessage;
0088:
/**
 * Creates a session on the given connection.
 *
 * <p>The context class loader of the constructing thread is captured.
 * A transacted session stores an acknowledge mode of 0 regardless of
 * {@code ackMode}. For a non-transacted session, an unrecognized
 * {@code ackMode} is logged once as a warning and replaced by
 * {@code AUTO_ACKNOWLEDGE} instead of being rejected. The transaction
 * manager is looked up in JNDI on a best-effort basis: a failed lookup is
 * logged at FINER and leaves the manager unset. Finally the session
 * registers itself with the connection.
 *
 * @param connection the connection this session belongs to
 * @param isTransacted true if the session is locally transacted
 * @param ackMode the requested acknowledge mode
 * @param isXA true if the session starts out as an XA session
 */
public JmsSession(ConnectionImpl connection, boolean isTransacted,
        int ackMode, boolean isXA) throws JMSException {
    _classLoader = Thread.currentThread().getContextClassLoader();

    _connection = connection;

    _isXA = isXA;

    _isTransacted = isTransacted;

    if (isTransacted)
        _acknowledgeMode = 0;
    else {
        switch (ackMode) {
        case CLIENT_ACKNOWLEDGE:
        case DUPS_OK_ACKNOWLEDGE:
        case AUTO_ACKNOWLEDGE:
            _acknowledgeMode = ackMode;
            break;
        default:
            // XXX: tck - an illegal mode is tolerated (warning only)
            // instead of throwing a JMSException.
            try {
                log.warning(L.l(
                        "JmsSession {0} is an illegal acknowledge mode",
                        ackMode));
            } catch (Exception e) {
                log.log(Level.FINE, e.toString(), e);
            }

            // Applied outside the try block so the fallback is never
            // skipped, even if building or emitting the warning fails.
            _acknowledgeMode = AUTO_ACKNOWLEDGE;
            break;
        }
    }

    try {
        InitialContext ic = new InitialContext();

        _tm = (TransactionManager) ic
                .lookup("java:comp/TransactionManager");
    } catch (Exception e) {
        // Best effort: the session is still usable without a
        // transaction manager reference.
        log.log(Level.FINER, e.toString(), e);
    }

    _connection.addSession(this);
}
0142:
0143:            /**
0144:             * Returns the connection.
0145:             */
0146:            ConnectionImpl getConnection() {
0147:                return _connection;
0148:            }
0149:
0150:            /**
0151:             * Returns the ClassLoader.
0152:             */
0153:            ClassLoader getClassLoader() {
0154:                return _classLoader;
0155:            }
0156:
0157:            /**
0158:             * Returns the connection's clientID
0159:             */
0160:            public String getClientID() throws JMSException {
0161:                return _connection.getClientID();
0162:            }
0163:
0164:            /**
0165:             * Returns true if the connection is active.
0166:             */
0167:            public boolean isActive() {
0168:                return !_isClosed && _connection.isActive();
0169:            }
0170:
0171:            /**
0172:             * Returns true if the connection is active.
0173:             */
0174:            boolean isStopping() {
0175:                return _connection.isStopping();
0176:            }
0177:
0178:            /**
0179:             * Returns true if the session is in a transaction.
0180:             */
0181:            public boolean getTransacted() throws JMSException {
0182:                checkOpen();
0183:
0184:                return _isTransacted;
0185:            }
0186:
0187:            /**
0188:             * Returns the acknowledge mode for the session.
0189:             */
0190:            public int getAcknowledgeMode() throws JMSException {
0191:                checkOpen();
0192:
0193:                return _acknowledgeMode;
0194:            }
0195:
0196:            /**
0197:             * Returns the message listener
0198:             */
0199:            public MessageListener getMessageListener() throws JMSException {
0200:                checkOpen();
0201:
0202:                return _messageListener;
0203:            }
0204:
0205:            /**
0206:             * Sets the message listener
0207:             */
0208:            public void setMessageListener(MessageListener listener)
0209:                    throws JMSException {
0210:                checkOpen();
0211:
0212:                _messageListener = listener;
0213:                setAsynchronous();
0214:            }
0215:
0216:            /**
0217:             * Set true for a synchronous session.
0218:             */
0219:            void setAsynchronous() {
0220:                _isAsynchronous = true;
0221:
0222:                notifyMessageAvailable();
0223:            }
0224:
0225:            /**
0226:             * Set true for a synchronous session.
0227:             */
0228:            boolean isAsynchronous() {
0229:                return _isAsynchronous;
0230:            }
0231:
0232:            /**
0233:             * Creates a new byte[] message.
0234:             */
0235:            public BytesMessage createBytesMessage() throws JMSException {
0236:                checkOpen();
0237:
0238:                return new BytesMessageImpl();
0239:            }
0240:
0241:            /**
0242:             * Creates a new map message.
0243:             */
0244:            public MapMessage createMapMessage() throws JMSException {
0245:                checkOpen();
0246:
0247:                return new MapMessageImpl();
0248:            }
0249:
0250:            /**
0251:             * Creates a message.  Used when only header info is important.
0252:             */
0253:            public Message createMessage() throws JMSException {
0254:                checkOpen();
0255:
0256:                return new MessageImpl();
0257:            }
0258:
0259:            /**
0260:             * Creates an object message.
0261:             */
0262:            public ObjectMessage createObjectMessage() throws JMSException {
0263:                checkOpen();
0264:
0265:                return new ObjectMessageImpl();
0266:            }
0267:
0268:            /**
0269:             * Creates an object message.
0270:             *
0271:             * @param obj a serializable message.
0272:             */
0273:            public ObjectMessage createObjectMessage(Serializable obj)
0274:                    throws JMSException {
0275:                checkOpen();
0276:
0277:                ObjectMessage msg = createObjectMessage();
0278:
0279:                msg.setObject(obj);
0280:
0281:                return msg;
0282:            }
0283:
0284:            /**
0285:             * Creates a stream message.
0286:             */
0287:            public StreamMessage createStreamMessage() throws JMSException {
0288:                checkOpen();
0289:
0290:                return new StreamMessageImpl();
0291:            }
0292:
0293:            /**
0294:             * Creates a text message.
0295:             */
0296:            public TextMessage createTextMessage() throws JMSException {
0297:                checkOpen();
0298:
0299:                return new TextMessageImpl();
0300:            }
0301:
0302:            /**
0303:             * Creates a text message.
0304:             */
0305:            public TextMessage createTextMessage(String message)
0306:                    throws JMSException {
0307:                checkOpen();
0308:
0309:                TextMessage msg = createTextMessage();
0310:
0311:                msg.setText(message);
0312:
0313:                return msg;
0314:            }
0315:
0316:            /**
0317:             * Creates a consumer to receive messages.
0318:             *
0319:             * @param destination the destination to receive messages from.
0320:             */
0321:            public MessageConsumer createConsumer(Destination destination)
0322:                    throws JMSException {
0323:                checkOpen();
0324:
0325:                return createConsumer(destination, null, false);
0326:            }
0327:
0328:            /**
0329:             * Creates a consumer to receive messages.
0330:             *
0331:             * @param destination the destination to receive messages from.
0332:             * @param messageSelector query to restrict the messages.
0333:             */
0334:            public MessageConsumer createConsumer(Destination destination,
0335:                    String messageSelector) throws JMSException {
0336:                checkOpen();
0337:
0338:                return createConsumer(destination, messageSelector, false);
0339:            }
0340:
0341:            /**
0342:             * Creates a consumer to receive messages.
0343:             *
0344:             * @param destination the destination to receive messages from.
0345:             * @param messageSelector query to restrict the messages.
0346:             */
0347:            public MessageConsumer createConsumer(Destination destination,
0348:                    String messageSelector, boolean noLocal)
0349:                    throws JMSException {
0350:                checkOpen();
0351:
0352:                if (destination == null)
0353:                    throw new InvalidDestinationException(
0354:                            L
0355:                                    .l("destination is null.  Destination may not be null for Session.createConsumer"));
0356:
0357:                MessageConsumerImpl consumer;
0358:
0359:                if (destination instanceof  AbstractQueue) {
0360:                    AbstractQueue dest = (AbstractQueue) destination;
0361:
0362:                    consumer = new MessageConsumerImpl(this , dest,
0363:                            messageSelector, noLocal);
0364:                } else if (destination instanceof  AbstractTopic) {
0365:                    AbstractTopic dest = (AbstractTopic) destination;
0366:
0367:                    consumer = new TopicSubscriberImpl(this , dest,
0368:                            messageSelector, noLocal);
0369:                } else
0370:                    throw new InvalidDestinationException(
0371:                            L
0372:                                    .l(
0373:                                            "'{0}' is an unknown destination.  The destination must be a Resin JMS Destination.",
0374:                                            destination));
0375:
0376:                addConsumer(consumer);
0377:
0378:                return consumer;
0379:            }
0380:
0381:            /**
0382:             * Creates a producer to produce messages.
0383:             *
0384:             * @param destination the destination to send messages from.
0385:             */
0386:            public MessageProducer createProducer(Destination destination)
0387:                    throws JMSException {
0388:                checkOpen();
0389:
0390:                if (destination == null) {
0391:                    return new MessageProducerImpl(this , null);
0392:                }
0393:
0394:                if (!(destination instanceof  AbstractDestination))
0395:                    throw new InvalidDestinationException(
0396:                            L
0397:                                    .l(
0398:                                            "'{0}' is an unknown destination.  The destination must be a Resin JMS destination for Session.createProducer.",
0399:                                            destination));
0400:
0401:                AbstractDestination dest = (AbstractDestination) destination;
0402:
0403:                return new MessageProducerImpl(this , dest);
0404:            }
0405:
0406:            /**
0407:             * Creates a QueueBrowser to browse messages in the queue.
0408:             *
0409:             * @param queue the queue to send messages to.
0410:             */
0411:            public QueueBrowser createBrowser(Queue queue) throws JMSException {
0412:                checkOpen();
0413:
0414:                return createBrowser(queue, null);
0415:            }
0416:
0417:            /**
0418:             * Creates a QueueBrowser to browse messages in the queue.
0419:             *
0420:             * @param queue the queue to send messages to.
0421:             */
0422:            public QueueBrowser createBrowser(Queue queue,
0423:                    String messageSelector) throws JMSException {
0424:                checkOpen();
0425:
0426:                if (queue == null)
0427:                    throw new InvalidDestinationException(
0428:                            L
0429:                                    .l("queue is null.  Queue may not be null for Session.createBrowser"));
0430:
0431:                if (!(queue instanceof  AbstractQueue))
0432:                    throw new InvalidDestinationException(
0433:                            L
0434:                                    .l(
0435:                                            "'{0}' is an unknown queue.  The queue must be a Resin JMS Queue for Session.createBrowser.",
0436:                                            queue));
0437:
0438:                return new MessageBrowserImpl(this , (AbstractQueue) queue,
0439:                        messageSelector);
0440:            }
0441:
0442:            /**
0443:             * Creates a new queue.
0444:             */
0445:            public Queue createQueue(String queueName) throws JMSException {
0446:                checkOpen();
0447:
0448:                return _connection.createQueue(queueName);
0449:            }
0450:
0451:            /**
0452:             * Creates a temporary queue.
0453:             */
0454:            public TemporaryQueue createTemporaryQueue() throws JMSException {
0455:                checkOpen();
0456:
0457:                return new TemporaryQueueImpl(this );
0458:            }
0459:
0460:            /**
0461:             * Creates a new topic.
0462:             */
0463:            public Topic createTopic(String topicName) throws JMSException {
0464:                checkOpen();
0465:
0466:                return _connection.createTopic(topicName);
0467:            }
0468:
0469:            /**
0470:             * Creates a temporary topic.
0471:             */
0472:            public TemporaryTopic createTemporaryTopic() throws JMSException {
0473:                checkOpen();
0474:
0475:                return new TemporaryTopicImpl(this );
0476:            }
0477:
0478:            /**
0479:             * Creates a durable subscriber to receive messages.
0480:             *
0481:             * @param topic the topic to receive messages from.
0482:             */
0483:            public TopicSubscriber createDurableSubscriber(Topic topic,
0484:                    String name) throws JMSException {
0485:                checkOpen();
0486:
0487:                if (getClientID() == null)
0488:                    throw new JMSException(
0489:                            L
0490:                                    .l("connection may not create a durable subscriber because it does not have an assigned ClientID."));
0491:
0492:                return createDurableSubscriber(topic, name, null, false);
0493:            }
0494:
0495:            /**
0496:             * Creates a subscriber to receive messages.
0497:             *
0498:             * @param topic the topic to receive messages from.
0499:             * @param messageSelector topic to restrict the messages.
0500:             * @param noLocal if true, don't receive messages we've sent
0501:             */
0502:            public TopicSubscriber createDurableSubscriber(Topic topic,
0503:                    String name, String messageSelector, boolean noLocal)
0504:                    throws JMSException {
0505:                checkOpen();
0506:
0507:                if (topic == null)
0508:                    throw new InvalidDestinationException(
0509:                            L
0510:                                    .l("destination is null.  Destination may not be null for Session.createDurableSubscriber"));
0511:
0512:                if (!(topic instanceof  AbstractTopic))
0513:                    throw new InvalidDestinationException(
0514:                            L
0515:                                    .l(
0516:                                            "'{0}' is an unknown destination.  The destination must be a Resin JMS Destination.",
0517:                                            topic));
0518:
0519:                AbstractTopic topicImpl = (AbstractTopic) topic;
0520:
0521:                if (_connection.getDurableSubscriber(name) != null) {
0522:                    // jms/2130
0523:                    // unsubscribe(name);
0524:                    /*
0525:                    throw new JMSException(L.l("'{0}' is already an active durable subscriber",
0526:                    		 name));
0527:                     */
0528:                }
0529:
0530:                AbstractQueue queue = topicImpl.createSubscriber(this , name,
0531:                        noLocal);
0532:
0533:                TopicSubscriberImpl consumer;
0534:                consumer = new TopicSubscriberImpl(this , topicImpl, queue,
0535:                        messageSelector, noLocal);
0536:
0537:                _connection.putDurableSubscriber(name, consumer);
0538:
0539:                addConsumer(consumer);
0540:
0541:                return consumer;
0542:            }
0543:
0544:            /**
0545:             * Unsubscribe from a durable subscription.
0546:             */
0547:            public void unsubscribe(String name) throws JMSException {
0548:                checkOpen();
0549:
0550:                if (name == null)
0551:                    throw new InvalidDestinationException(
0552:                            L
0553:                                    .l("destination is null.  Destination may not be null for Session.unsubscribe"));
0554:
0555:                TopicSubscriber subscriber = _connection
0556:                        .removeDurableSubscriber(name);
0557:
0558:                if (subscriber == null)
0559:                    throw new InvalidDestinationException(
0560:                            L
0561:                                    .l(
0562:                                            "'{0}' is an unknown subscriber for Session.unsubscribe",
0563:                                            name));
0564:
0565:                subscriber.close();
0566:            }
0567:
0568:            /**
0569:             * Starts the session.
0570:             */
0571:            void start() {
0572:                if (log.isLoggable(Level.FINE))
0573:                    log.fine(toString() + " active");
0574:
0575:                notifyMessageAvailable();
0576:            }
0577:
0578:            /**
0579:             * Stops the session.
0580:             */
0581:            void stop() {
0582:                if (log.isLoggable(Level.FINE))
0583:                    log.fine(toString() + " stopping");
0584:
0585:                synchronized (_consumers) {
0586:                    long timeout = Alarm.getCurrentTime() + SHUTDOWN_WAIT_TIME;
0587:                    while (_isRunning && Alarm.getCurrentTime() < timeout) {
0588:                        try {
0589:                            _consumers.wait(SHUTDOWN_WAIT_TIME);
0590:
0591:                            if (Alarm.isTest()) {
0592:                                return;
0593:                            }
0594:                        } catch (Throwable e) {
0595:                            log.log(Level.FINER, e.toString(), e);
0596:                        }
0597:                    }
0598:
0599:                    ArrayList<MessageConsumerImpl> consumers = new ArrayList<MessageConsumerImpl>(
0600:                            _consumers);
0601:
0602:                    for (MessageConsumerImpl consumer : consumers) {
0603:                        try {
0604:                            // XXX: should be stop()?
0605:
0606:                            consumer.stop();
0607:                        } catch (Throwable e) {
0608:                            log.log(Level.FINE, e.toString(), e);
0609:                        }
0610:                    }
0611:                }
0612:            }
0613:
0614:            /**
0615:             * Commits the messages.
0616:             */
0617:            public void commit() throws JMSException {
0618:                checkOpen();
0619:
0620:                commit(false);
0621:            }
0622:
0623:            /**
0624:             * Commits the messages.
0625:             */
0626:            private void commit(boolean isXA) throws JMSException {
0627:                _xid = null;
0628:
0629:                if (!_isTransacted && !_isXA)
0630:                    throw new IllegalStateException(
0631:                            L
0632:                                    .l("commit() can only be called on a transacted session."));
0633:
0634:                _isXA = false;
0635:
0636:                ArrayList<TransactedMessage> messages = _transactedMessages;
0637:                if (messages != null) {
0638:                    try {
0639:                        for (int i = 0; i < messages.size(); i++) {
0640:                            messages.get(i).commit();
0641:                        }
0642:                    } finally {
0643:                        messages.clear();
0644:                    }
0645:                }
0646:
0647:                if (!isXA)
0648:                    acknowledge();
0649:            }
0650:
0651:            /**
0652:             * Acknowledge received
0653:             */
0654:            public void acknowledge() throws JMSException {
0655:                checkOpen();
0656:
0657:                if (_transactedMessages != null) {
0658:                    for (int i = _transactedMessages.size() - 1; i >= 0; i--) {
0659:                        TransactedMessage msg = _transactedMessages.get(i);
0660:
0661:                        if (msg instanceof  ReceiveMessage) {
0662:                            _transactedMessages.remove(i);
0663:
0664:                            msg.commit();
0665:                        }
0666:                    }
0667:                }
0668:            }
0669:
0670:            /**
0671:             * Recovers the messages.
0672:             */
0673:            public void recover() throws JMSException {
0674:                checkOpen();
0675:
0676:                if (_isTransacted)
0677:                    throw new IllegalStateException(
0678:                            L
0679:                                    .l("recover() may not be called on a transacted session."));
0680:
0681:                if (_transactedMessages != null) {
0682:                    for (int i = _transactedMessages.size() - 1; i >= 0; i--) {
0683:                        TransactedMessage msg = _transactedMessages.get(i);
0684:
0685:                        if (msg instanceof  ReceiveMessage) {
0686:                            _transactedMessages.remove(i);
0687:
0688:                            msg.rollback();
0689:                        }
0690:                    }
0691:                }
0692:            }
0693:
0694:            /**
0695:             * Rollsback the messages.
0696:             */
0697:            public void rollback() throws JMSException {
0698:                checkOpen();
0699:
0700:                rollbackImpl();
0701:            }
0702:
0703:            /**
0704:             * Rollsback the messages.
0705:             */
0706:            public void rollbackImpl() throws JMSException {
0707:                if (!_isTransacted && !_isXA)
0708:                    throw new IllegalStateException(
0709:                            L
0710:                                    .l("rollback() can only be called on a transacted session."));
0711:
0712:                if (_transactedMessages != null) {
0713:                    for (int i = 0; i < _transactedMessages.size(); i++)
0714:                        _transactedMessages.get(i).rollback();
0715:
0716:                    _transactedMessages.clear();
0717:                }
0718:            }
0719:
0720:            /**
0721:             * Closes the session
0722:             */
0723:            public void close() throws JMSException {
0724:                if (_isClosed)
0725:                    return;
0726:
0727:                try {
0728:                    stop();
0729:                } catch (Exception e) {
0730:                    log.log(Level.WARNING, e.toString(), e);
0731:                }
0732:
0733:                ArrayList<TransactedMessage> messages = _transactedMessages;
0734:
0735:                if (messages != null && _xid == null) {
0736:                    _transactedMessages = null;
0737:
0738:                    try {
0739:                        for (int i = 0; i < messages.size(); i++) {
0740:                            messages.get(i).close();
0741:                        }
0742:                    } catch (Exception e) {
0743:                        log.log(Level.WARNING, e.toString(), e);
0744:                    }
0745:                }
0746:
0747:                for (int i = 0; i < _consumers.size(); i++) {
0748:                    MessageConsumerImpl consumer = _consumers.get(i);
0749:
0750:                    try {
0751:                        consumer.close();
0752:                    } catch (Exception e) {
0753:                        log.log(Level.WARNING, e.toString(), e);
0754:                    }
0755:                }
0756:
0757:                try {
0758:                    _connection.removeSession(this );
0759:                } finally {
0760:                    _isClosed = true;
0761:                }
0762:
0763:                _classLoader = null;
0764:            }
0765:
0766:            protected void addConsumer(MessageConsumerImpl consumer) {
0767:                _consumers.add(consumer);
0768:
0769:                notifyMessageAvailable();
0770:            }
0771:
0772:            protected void removeConsumer(MessageConsumerImpl consumer) {
0773:                if (_consumers != null)
0774:                    _consumers.remove(consumer);
0775:            }
0776:
0777:            /**
0778:             * Notifies the receiver.
0779:             */
0780:            boolean notifyMessageAvailable() {
0781:                synchronized (_consumers) {
0782:                    _hasMessage = true;
0783:
0784:                    if (_isRunning || !_isAsynchronous || !isActive())
0785:                        return false;
0786:
0787:                    _isRunning = true;
0788:                }
0789:
0790:                ThreadPool.getThreadPool().schedule(this );
0791:                // the yield is only needed for the regressions
0792:                Thread.yield();
0793:
0794:                return true;
0795:            }
0796:
0797:            /**
0798:             * Adds a message to the session message queue.
0799:             */
0800:            public void send(AbstractDestination queue, Message appMessage,
0801:                    int deliveryMode, int priority, long timeout)
0802:                    throws JMSException {
0803:                checkOpen();
0804:
0805:                if (queue == null)
0806:                    throw new UnsupportedOperationException(L
0807:                            .l("empty queue is not allowed for this session."));
0808:
0809:                MessageImpl message = _messageFactory.copy(appMessage);
0810:
0811:                long now = Alarm.getExactTime();
0812:                long expiration = now + timeout;
0813:
0814:                message.setJMSMessageID(queue.generateMessageID());
0815:                if (message.getJMSDestination() == null)
0816:                    message.setJMSDestination(queue);
0817:                message.setJMSDeliveryMode(deliveryMode);
0818:                if (message.getJMSTimestamp() == 0)
0819:                    message.setJMSTimestamp(now);
0820:                if (message.getJMSExpiration() == 0)
0821:                    message.setJMSExpiration(expiration);
0822:                message.setJMSPriority(priority);
0823:
0824:                // ejb/0970
0825:
0826:                boolean isXA = false;
0827:                try {
0828:                    if (_isTransacted && _tm != null
0829:                            && _tm.getTransaction() != null)
0830:                        isXA = true;
0831:                } catch (Exception e) {
0832:                    log.log(Level.FINE, e.toString(), e);
0833:                }
0834:
0835:                if (_isTransacted || isXA) {
0836:                    if (_transactedMessages == null)
0837:                        _transactedMessages = new ArrayList<TransactedMessage>();
0838:
0839:                    TransactedMessage transMsg = new SendMessage(queue, message);
0840:
0841:                    _transactedMessages.add(transMsg);
0842:
0843:                    if (_xid == null)
0844:                        enlist();
0845:                } else {
0846:                    if (log.isLoggable(Level.FINE))
0847:                        log.fine(queue + " sending " + message);
0848:
0849:                    queue.send(this , message, expiration);
0850:                }
0851:            }
0852:
0853:            private void enlist() {
0854:                if (_tm != null) {
0855:                    try {
0856:                        Transaction trans = _tm.getTransaction();
0857:
0858:                        if (trans != null)
0859:                            trans.enlistResource(this );
0860:                    } catch (Exception e) {
0861:                        throw new RuntimeException(e);
0862:                    }
0863:                }
0864:            }
0865:
0866:            private void delist() {
0867:                if (_tm != null) {
0868:                    try {
0869:                        Transaction trans = _tm.getTransaction();
0870:
0871:                        if (trans != null)
0872:                            trans.delistResource(this , 0);
0873:                    } catch (Exception e) {
0874:                        throw new RuntimeException(e);
0875:                    }
0876:                }
0877:            }
0878:
0879:            /**
0880:             * Adds a message to the session message queue.
0881:             */
0882:            void addTransactedReceive(AbstractDestination queue,
0883:                    MessageImpl message) {
0884:                message.setSession(this );
0885:
0886:                if (_transactedMessages == null)
0887:                    _transactedMessages = new ArrayList<TransactedMessage>();
0888:
0889:                TransactedMessage transMsg = new ReceiveMessage(queue, message);
0890:
0891:                _transactedMessages.add(transMsg);
0892:
0893:                if (_tm != null && _transactedMessages.size() == 1) {
0894:                    enlist();
0895:                }
0896:            }
0897:
0898:            /**
0899:             * Called to synchronously receive a message.
0900:             */
0901:            protected Message receive(MessageConsumerImpl consumer, long timeout)
0902:                    throws JMSException {
0903:                throw new UnsupportedOperationException();
0904:                /*
0905:                checkOpen();
0906:                
0907:                if (Long.MAX_VALUE / 2 < timeout || timeout < 0)
0908:                  timeout = Long.MAX_VALUE / 2;
0909:                
0910:                long now = Alarm.getCurrentTime();
0911:                long failTime = Alarm.getCurrentTime() + timeout;
0912:                
0913:                Selector selector = consumer.getSelector();
0914:                AbstractQueue queue;
0915:                queue = (AbstractQueue) consumer.getDestination();
0916:
0917:                // 4.4.1 user's reponsibility
0918:                // checkThread();
0919:
0920:                Thread oldThread = Thread.currentThread();
0921:                try {
0922:                  // _thread = Thread.currentThread();
0923:                  
0924:                  while (! consumer.isClosed()) {
0925:                if (isActive()) {
0926:                  Message msg = queue.receive(selector);
0927:                  if (msg != null)
0928:                    return msg;
0929:                  _hasMessage = false;
0930:                }
0931:                  
0932:                long delta = failTime - Alarm.getCurrentTime();
0933:
0934:                if (delta <= 0 || _isClosed || Alarm.isTest())
0935:                  return null;
0936:
0937:                synchronized (_consumers) {
0938:                  if (! _hasMessage || ! isActive()) {
0939:                    try {
0940:                      _consumers.wait(delta);
0941:                    } catch (Throwable e) {
0942:                    }
0943:                  }
0944:                }
0945:                  }
0946:                } finally {
0947:                  // _thread = oldThread;
0948:                }
0949:
0950:                return null;
0951:                 */
0952:            }
0953:
0954:            //
0955:            // XA
0956:            //
0957:
0958:            public Session getSession() {
0959:                return this ;
0960:            }
0961:
0962:            public XAResource getXAResource() {
0963:                return this ;
0964:            }
0965:
0966:            /**
0967:             * Returns true if the specified resource has the same RM.
0968:             */
0969:            public boolean isSameRM(XAResource xa) throws XAException {
0970:                return this  == xa;
0971:            }
0972:
0973:            /**
0974:             * Sets the transaction timeout in seconds.
0975:             */
0976:            public boolean setTransactionTimeout(int timeout)
0977:                    throws XAException {
0978:                return true;
0979:            }
0980:
0981:            /**
0982:             * Gets the transaction timeout in seconds.
0983:             */
0984:            public int getTransactionTimeout() throws XAException {
0985:                return 0;
0986:            }
0987:
0988:            /**
0989:             * Called when the resource is associated with a transaction.
0990:             */
0991:            public void start(Xid xid, int flags) throws XAException {
0992:                _xid = xid;
0993:            }
0994:
0995:            /**
0996:             * Called when the resource is is done with a transaction.
0997:             */
0998:            public void end(Xid xid, int flags) throws XAException {
0999:                _xid = null;
1000:            }
1001:
1002:            /**
1003:             * Called to start the first phase of the commit.
1004:             */
1005:            public int prepare(Xid xid) throws XAException {
1006:                return 0;
1007:            }
1008:
1009:            /**
1010:             * Called to commit.
1011:             */
1012:            public void commit(Xid xid, boolean onePhase) throws XAException {
1013:                try {
1014:                    commit(true);
1015:                } catch (Exception e) {
1016:                    throw new RuntimeException(e);
1017:                } finally {
1018:                    delist();
1019:                    _isXA = false;
1020:                }
1021:            }
1022:
1023:            /**
1024:             * Called to roll back.
1025:             */
1026:            public void rollback(Xid xid) throws XAException {
1027:                try {
1028:                    rollbackImpl();
1029:                } catch (Exception e) {
1030:                    throw new RuntimeException(e);
1031:                } finally {
1032:                    delist();
1033:                    _isXA = false;
1034:                }
1035:            }
1036:
1037:            /**
1038:             * Called to forget an Xid that had a heuristic commit.
1039:             */
1040:            public void forget(Xid xid) throws XAException {
1041:            }
1042:
1043:            /**
1044:             * Called to find Xid's that need recovery.
1045:             */
1046:            public Xid[] recover(int flag) throws XAException {
1047:                return null;
1048:            }
1049:
1050:            /**
1051:             * Called to synchronously receive messages
1052:             */
1053:            public void run() {
1054:                boolean isValid = true;
1055:
1056:                while (isValid) {
1057:                    isValid = false;
1058:                    _hasMessage = false;
1059:
1060:                    try {
1061:                        for (int i = 0; i < _consumers.size(); i++) {
1062:                            MessageConsumerImpl consumer = _consumers.get(i);
1063:
1064:                            while (isActive()
1065:                                    && consumer.handleMessage(_messageListener)) {
1066:                            }
1067:                        }
1068:
1069:                        isValid = isActive();
1070:                    } finally {
1071:                        synchronized (_consumers) {
1072:                            if (!isValid)
1073:                                _isRunning = false;
1074:                            else if (!_hasMessage) {
1075:                                _isRunning = false;
1076:                                isValid = false;
1077:                            }
1078:
1079:                            // notification, e.g. for shutdown
1080:                            _consumers.notifyAll();
1081:                        }
1082:                    }
1083:                }
1084:            }
1085:
1086:            public boolean isClosed() {
1087:                return _isClosed;
1088:            }
1089:
1090:            /**
1091:             * Checks that the session is open.
1092:             */
1093:            public void checkOpen() throws javax.jms.IllegalStateException {
1094:                if (_isClosed)
1095:                    throw new javax.jms.IllegalStateException(L
1096:                            .l("session is closed"));
1097:            }
1098:
1099:            /**
1100:             * Verifies that multiple threads aren't using the session.
1101:             *
1102:             * 4.4.1 the client takes the responsibility.  There's no
1103:             * validation check.
1104:             */
1105:            void checkThread() throws JMSException {
1106:                Thread thread = _thread;
1107:
1108:                if (thread != Thread.currentThread() && thread != null) {
1109:                    Exception e = new IllegalStateException(L
1110:                            .l("Can't use session from concurrent threads."));
1111:                    log.log(Level.WARNING, e.toString(), e);
1112:                }
1113:            }
1114:
1115:            public String toString() {
1116:                String className = getClass().getName();
1117:                int p = className.lastIndexOf('.');
1118:
1119:                return className.substring(p + 1) + "[]";
1120:            }
1121:
1122:            abstract class TransactedMessage {
1123:                abstract void commit() throws JMSException;
1124:
1125:                abstract void rollback() throws JMSException;
1126:
1127:                void close() throws JMSException {
1128:                }
1129:            }
1130:
1131:            class SendMessage extends TransactedMessage {
1132:                private final AbstractDestination _queue;
1133:                private final MessageImpl _message;
1134:
1135:                SendMessage(AbstractDestination queue, MessageImpl message) {
1136:                    _queue = queue;
1137:                    _message = message;
1138:                }
1139:
1140:                void commit() throws JMSException {
1141:                    _queue.send(JmsSession.this , _message, 0);
1142:                }
1143:
1144:                void rollback() throws JMSException {
1145:                }
1146:
1147:                void close() throws JMSException {
1148:                    commit();
1149:                }
1150:            }
1151:
1152:            class ReceiveMessage extends TransactedMessage {
1153:                private final AbstractDestination _queue;
1154:                private final String _msgId;
1155:
1156:                ReceiveMessage(AbstractDestination queue, MessageImpl message) {
1157:                    _queue = queue;
1158:                    _msgId = message.getJMSMessageID();
1159:
1160:                    if (_msgId == null)
1161:                        throw new NullPointerException();
1162:                }
1163:
1164:                void commit() throws JMSException {
1165:                    _queue.acknowledge(_msgId);
1166:                }
1167:
1168:                void rollback() throws JMSException {
1169:                    _queue.rollback(_msgId);
1170:                }
1171:
1172:                void close() throws JMSException {
1173:                    rollback();
1174:                }
1175:            }
1176:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.