Source Code Cross Referenced for BasicQueue.java in  » EJB-Server-JBoss-4.2.1 » messaging » org » jboss » mq » server » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » EJB Server JBoss 4.2.1 » messaging » org.jboss.mq.server 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /*
0002:         * JBoss, Home of Professional Open Source.
0003:         * Copyright 2006, Red Hat Middleware LLC, and individual contributors
0004:         * as indicated by the @author tags. See the copyright.txt file in the
0005:         * distribution for a full listing of individual contributors.
0006:         *
0007:         * This is free software; you can redistribute it and/or modify it
0008:         * under the terms of the GNU Lesser General Public License as
0009:         * published by the Free Software Foundation; either version 2.1 of
0010:         * the License, or (at your option) any later version.
0011:         *
0012:         * This software is distributed in the hope that it will be useful,
0013:         * but WITHOUT ANY WARRANTY; without even the implied warranty of
0014:         * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
0015:         * Lesser General Public License for more details.
0016:         *
0017:         * You should have received a copy of the GNU Lesser General Public
0018:         * License along with this software; if not, write to the Free
0019:         * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
0020:         * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
0021:         */
0022:        package org.jboss.mq.server;
0023:
0024:        import java.util.ArrayList;
0025:        import java.util.Date;
0026:        import java.util.HashMap;
0027:        import java.util.HashSet;
0028:        import java.util.Iterator;
0029:        import java.util.LinkedList;
0030:        import java.util.List;
0031:        import java.util.Map;
0032:        import java.util.Set;
0033:        import java.util.SortedSet;
0034:        import java.util.TreeSet;
0035:
0036:        import javax.jms.IllegalStateException;
0037:        import javax.jms.JMSException;
0038:
0039:        import org.jboss.logging.Logger;
0040:        import org.jboss.mq.AcknowledgementRequest;
0041:        import org.jboss.mq.DestinationFullException;
0042:        import org.jboss.mq.SpyDestination;
0043:        import org.jboss.mq.SpyJMSException;
0044:        import org.jboss.mq.SpyMessage;
0045:        import org.jboss.mq.Subscription;
0046:        import org.jboss.mq.pm.Tx;
0047:        import org.jboss.mq.pm.TxManager;
0048:        import org.jboss.mq.selectors.Selector;
0049:        import org.jboss.util.NestedRuntimeException;
0050:        import org.jboss.util.timeout.Timeout;
0051:        import org.jboss.util.timeout.TimeoutTarget;
0052:
0053:        import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
0054:        import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet;
0055:
0056:        /**
0057:         *  This class represents a queue which provides it's messages exclusively to one
0058:         *  consumer at a time.<p>
0059:         *
0060:         * Notes about synchronization: Much of the work is synchronized on
0061:         * the receivers or messages depending on the work performed.
0062:         * However, anything to do with unacknowledged messages and removed
0063:         * subscriptions must be done synchronized on both (receivers first).
0064:         * This is because there are multiple entry points with the possibility
0065:         * that a message acknowledgement (or NACK) is being processed at
0066:         * the same time as a network failure removes the subscription.
0067:         *
0068:         *
0069:         * @author     Hiram Chirino (Cojonudo14@hotmail.com)
0070:         * @author     Norbert Lataille (Norbert.Lataille@m4x.org)
0071:         * @author     David Maplesden (David.Maplesden@orion.co.nz)
0072:         * @author     Adrian Brock (Adrian@jboss.org)
0073:         * @created    August 16, 2001
0074:         * @version    $Revision: 61446 $
0075:         */
0076:        public class BasicQueue {
0077:            static final Logger log = Logger.getLogger(BasicQueue.class);
0078:
0079:            /** List of messages waiting to be dispatched<p>
0080:                synchronized access on itself */
0081:            SortedSet messages = new TreeSet();
0082:
0083:            /** Events by message id */
0084:            ConcurrentHashMap events = new ConcurrentHashMap();
0085:
0086:            /** The scheduled messages */
0087:            CopyOnWriteArraySet scheduledMessages = new CopyOnWriteArraySet();
0088:
0089:            /** The JMSServer object */
0090:            JMSDestinationManager server;
0091:
0092:            /** The subscribers waiting for messages - synchronized access on itself */
0093:            Receivers receivers;
0094:
0095:            /** The description used to seperate persistence for multiple subscriptions to a topic */
0096:            String description;
0097:
0098:            /** Simple Counter for gathering message add statistic data */
0099:            MessageCounter counter;
0100:
0101:            /** Unacknowledged messages AcknowledgementRequest -> UnackedMessageInfo<p>
0102:                synchronized access on receivers and messages */
0103:            HashMap unacknowledgedMessages = new HashMap();
0104:            /** Unacknowledged messages MessageRef -> UnackedMessageInfo <p>
0105:                synchronized access on receivers and messages */
0106:            HashMap unackedByMessageRef = new HashMap();
0107:            /** Unacknowledged messages Subscription -> UnackedMessageInfo <p>
0108:                synchronized access on receivers and messages */
0109:            HashMap unackedBySubscription = new HashMap();
0110:
0111:            /** Subscribers <p>
0112:             synchronized access on receivers */
0113:            HashSet subscribers = new HashSet();
0114:
0115:            /** Removed subscribers <p>
0116:                synchronized access on receivers and messages */
0117:            HashSet removedSubscribers = new HashSet();
0118:
0119:            /** The basic queue parameters */
0120:            BasicQueueParameters parameters;
0121:
0122:            /** Have we been stopped */
0123:            boolean stopped = false;
0124:
0125:            /**
0126:             * Construct a new basic queue
0127:             * 
0128:             * @param server the destination manager
0129:             * @param description a description to uniquely identify the queue
0130:             * @param parameters the basic queue parameters
0131:             * @throws JMSException for any error
0132:             */
0133:            public BasicQueue(JMSDestinationManager server, String description,
0134:                    BasicQueueParameters parameters) throws JMSException {
0135:                this .server = server;
0136:                this .description = description;
0137:                this .parameters = parameters;
0138:
0139:                Class receiversImpl = parameters.receiversImpl;
0140:                if (receiversImpl == null)
0141:                    receiversImpl = ReceiversImpl.class;
0142:
0143:                try {
0144:                    receivers = (Receivers) receiversImpl.newInstance();
0145:                } catch (Throwable t) {
0146:                    throw new SpyJMSException(
0147:                            "Error instantiating receivers implementation: "
0148:                                    + receiversImpl, t);
0149:                }
0150:            }
0151:
0152:            /**
0153:             * Retrieve the unique description for this queue
0154:             *
0155:             * @return the description
0156:             */
0157:            public String getDescription() {
0158:                return description;
0159:            }
0160:
0161:            /**
0162:             * Retrieve the number of receivers waiting for a message
0163:             *
0164:             * @return the number of receivers
0165:             */
0166:            public int getReceiversCount() {
0167:                return receivers.size();
0168:            }
0169:
0170:            /**
0171:             * Retrieve the receivers waiting for a message
0172:             *
0173:             * @return an array of subscriptions
0174:             */
0175:            public ArrayList getReceivers() {
0176:                synchronized (receivers) {
0177:                    return receivers.listReceivers();
0178:                }
0179:            }
0180:
0181:            /**
0182:             * Test whether the queue is in use
0183:             *
0184:             * @return true when there are subscribers
0185:             */
0186:            public boolean isInUse() {
0187:                synchronized (receivers) {
0188:                    // In use if we have subscribers or there are unacknowledged messages
0189:                    return (subscribers.isEmpty() == false || getInProcessMessageCount() > 0);
0190:                }
0191:            }
0192:
0193:            /**
0194:             * Add a receiver to the queue
0195:             *
0196:             * @param sub the subscription to add
0197:             * @throws JMSException for any error
0198:             */
0199:            public void addReceiver(Subscription sub) throws JMSException {
0200:                boolean trace = log.isTraceEnabled();
0201:                if (trace)
0202:                    log.trace("addReceiver " + sub + " " + this );
0203:
0204:                MessageReference found = null;
0205:                synchronized (messages) {
0206:                    if (messages.size() != 0) {
0207:                        for (Iterator it = messages.iterator(); it.hasNext();) {
0208:                            MessageReference message = (MessageReference) it
0209:                                    .next();
0210:                            try {
0211:                                if (message.isExpired()) {
0212:                                    it.remove();
0213:                                    expireMessageAsync(message);
0214:                                } else if (sub.accepts(message.getHeaders())) {
0215:                                    //queue message for sending to this sub
0216:                                    it.remove();
0217:                                    found = message;
0218:                                    break;
0219:                                }
0220:                            } catch (JMSException ignore) {
0221:                                log
0222:                                        .info(
0223:                                                "Caught unusual exception in addToReceivers.",
0224:                                                ignore);
0225:                            }
0226:                        }
0227:                    }
0228:                }
0229:                if (found != null)
0230:                    queueMessageForSending(sub, found);
0231:                else
0232:                    addToReceivers(sub);
0233:            }
0234:
0235:            /**
0236:             * Get the subscribers
0237:             * 
0238:             * @return the subscribers
0239:             */
0240:            public Set getSubscribers() {
0241:                synchronized (receivers) {
0242:                    return (Set) subscribers.clone();
0243:                }
0244:            }
0245:
0246:            /**
0247:             * Add a subscription from the queue
0248:             *
0249:             * @param sub the subscription to add
0250:             * @throws JMSException for any error
0251:             */
0252:            public void addSubscriber(Subscription sub) throws JMSException {
0253:                boolean trace = log.isTraceEnabled();
0254:                if (trace)
0255:                    log.trace("addSubscriber " + sub + " " + this );
0256:                synchronized (receivers) {
0257:                    if (stopped)
0258:                        throw new IllegalStateException(
0259:                                "The destination is stopped "
0260:                                        + getDescription());
0261:                    subscribers.add(sub);
0262:                }
0263:            }
0264:
0265:            /**
0266:             * Removes a subscription from the queue
0267:             *
0268:             * @param sub the subscription to remove
0269:             */
0270:            public void removeSubscriber(Subscription sub) {
0271:                boolean trace = log.isTraceEnabled();
0272:                if (trace)
0273:                    log.trace("removeSubscriber " + sub + " " + this );
0274:                synchronized (receivers) {
0275:                    removeReceiver(sub);
0276:                    synchronized (messages) {
0277:                        if (hasUnackedMessages(sub)) {
0278:                            if (trace)
0279:                                log
0280:                                        .trace("Delaying removal of subscriber is has unacked messages "
0281:                                                + sub);
0282:                            removedSubscribers.add(sub);
0283:                        } else {
0284:                            if (trace)
0285:                                log.trace("Removing subscriber " + sub);
0286:                            subscribers.remove(sub);
0287:                            ((ClientConsumer) sub.clientConsumer)
0288:                                    .removeRemovedSubscription(sub.subscriptionId);
0289:                        }
0290:                    }
0291:                }
0292:            }
0293:
0294:            /**
0295:             * Retrieve the queue depth
0296:             *
0297:             * @return the number of messages in the queue
0298:             */
0299:            public int getQueueDepth() {
0300:                return messages.size();
0301:            }
0302:
0303:            /**
0304:             * Returns the number of scheduled messages in the queue
0305:             * 
0306:             * @return the scheduled message count
0307:             */
0308:            public int getScheduledMessageCount() {
0309:                return scheduledMessages.size();
0310:            }
0311:
0312:            /**
0313:             * Returns the number of in process messages for the queue
0314:             * 
0315:             * @return the in process count
0316:             */
0317:            public int getInProcessMessageCount() {
0318:                synchronized (messages) {
0319:                    return unacknowledgedMessages.size();
0320:                }
0321:            }
0322:
0323:            /**
0324:             * Add a message to the queue
0325:             *
0326:             * @param mes the message reference
0327:             * @param txId the transaction
0328:             * @throws JMSException for any error
0329:             */
0330:            public void addMessage(MessageReference mes, Tx txId)
0331:                    throws JMSException {
0332:                boolean trace = log.isTraceEnabled();
0333:                if (trace)
0334:                    log.trace("addMessage " + mes + " " + txId + " " + this );
0335:
0336:                try {
0337:                    synchronized (receivers) {
0338:                        if (stopped)
0339:                            throw new IllegalStateException(
0340:                                    "The destination is stopped "
0341:                                            + getDescription());
0342:                    }
0343:
0344:                    if (parameters.maxDepth > 0) {
0345:                        synchronized (messages) {
0346:                            if (messages.size() >= parameters.maxDepth) {
0347:                                dropMessage(mes);
0348:                                String message = "Maximum size "
0349:                                        + parameters.maxDepth
0350:                                        + " exceeded for " + description;
0351:                                log.warn(message);
0352:                                throw new DestinationFullException(message);
0353:                            }
0354:                        }
0355:                    }
0356:
0357:                    performOrPrepareAddMessage(mes, txId);
0358:                } catch (Throwable t) {
0359:                    String error = "Error in addMessage " + mes;
0360:                    log.trace(error, t);
0361:                    dropMessage(mes, txId);
0362:                    SpyJMSException.rethrowAsJMSException(error, t);
0363:                }
0364:            }
0365:
0366:            /**
0367:             * Either perform or prepare the add message 
0368:             * 
0369:             * @param mes the message reference
0370:             * @param txId the transaction id
0371:             * @throws Exception for any error
0372:             */
0373:            protected void performOrPrepareAddMessage(MessageReference mes,
0374:                    Tx txId) throws Exception {
0375:                TxManager txManager = server.getPersistenceManager()
0376:                        .getTxManager();
0377:
0378:                // The message is removed from the cache on a rollback
0379:                Runnable task = new AddMessagePostRollBackTask(mes);
0380:                txManager.addPostRollbackTask(txId, task);
0381:
0382:                // The message gets added to the queue after the transaction commits
0383:                task = new AddMessagePostCommitTask(mes);
0384:                txManager.addPostCommitTask(txId, task);
0385:            }
0386:
0387:            /**
0388:             * Restores a message.
0389:             * 
0390:             * @param mes the message reference
0391:             */
0392:            public void restoreMessage(MessageReference mes) {
0393:                restoreMessage(mes, null, Tx.UNKNOWN);
0394:            }
0395:
0396:            /**
0397:             * Restores a message.
0398:             * 
0399:             * @param mes the message reference
0400:             * @param txid the transaction id
0401:             * @param type the type of restoration
0402:             */
0403:            public void restoreMessage(MessageReference mes, Tx txid, int type) {
0404:                boolean trace = log.isTraceEnabled();
0405:                if (trace)
0406:                    log.trace("restoreMessage " + mes + " " + this  + " txid="
0407:                            + txid + " type=" + type);
0408:
0409:                try {
0410:                    if (txid == null) {
0411:                        internalAddMessage(mes);
0412:                    } else if (type == Tx.ADD) {
0413:                        performOrPrepareAddMessage(mes, txid);
0414:                    } else if (type == Tx.REMOVE) {
0415:                        performOrPrepareAcknowledgeMessage(mes, txid);
0416:                    } else {
0417:                        throw new IllegalStateException("Unknown restore type "
0418:                                + type + " for message " + mes + " txid="
0419:                                + txid);
0420:                    }
0421:                } catch (RuntimeException e) {
0422:                    throw e;
0423:                } catch (Exception e) {
0424:                    throw new NestedRuntimeException(
0425:                            "Unable to restore message " + mes, e);
0426:                }
0427:            }
0428:
0429:            /**
0430:             * Nacks a message.
0431:             */
0432:            protected void nackMessage(MessageReference message) {
0433:                if (log.isTraceEnabled())
0434:                    log.trace("Restoring message: " + message);
0435:
0436:                try {
0437:                    message.redelivered();
0438:                    // Set redelivered, vendor-specific flags
0439:                    message.invalidate();
0440:                    // Update the persistent message outside the transaction
0441:                    // We want to know the message might have been delivered regardless
0442:                    if (message.isPersistent())
0443:                        server.getPersistenceManager().update(message, null);
0444:                } catch (JMSException e) {
0445:                    log.error("Caught unusual exception in nackMessage for "
0446:                            + message, e);
0447:                }
0448:
0449:                internalAddMessage(message);
0450:            }
0451:
0452:            /**
0453:             * Browse the queue
0454:             *
0455:             * @param selector the selector to apply, pass null for
0456:             *                 all messages
0457:             * @return the messages
0458:             * @throws JMSException for any error
0459:             */
0460:            public SpyMessage[] browse(String selector) throws JMSException {
0461:                if (selector == null) {
0462:                    SpyMessage list[];
0463:                    synchronized (messages) {
0464:                        list = new SpyMessage[messages.size()];
0465:                        Iterator iter = messages.iterator();
0466:                        for (int i = 0; iter.hasNext(); i++)
0467:                            list[i] = ((MessageReference) iter.next())
0468:                                    .getMessageForDelivery();
0469:                    }
0470:                    return list;
0471:                } else {
0472:                    Selector s = new Selector(selector);
0473:                    LinkedList selection = new LinkedList();
0474:
0475:                    synchronized (messages) {
0476:                        Iterator i = messages.iterator();
0477:                        while (i.hasNext()) {
0478:                            MessageReference m = (MessageReference) i.next();
0479:                            if (s.test(m.getHeaders()))
0480:                                selection.add(m.getMessageForDelivery());
0481:                        }
0482:                    }
0483:
0484:                    SpyMessage list[];
0485:                    list = new SpyMessage[selection.size()];
0486:                    list = (SpyMessage[]) selection.toArray(list);
0487:                    return list;
0488:                }
0489:            }
0490:
0491:            /**
0492:             * Browse the scheduled messages
0493:             *
0494:             * @param selector the selector to apply, pass null for
0495:             *                 all messages
0496:             * @return the messages
0497:             * @throws JMSException for any error
0498:             */
0499:            public List browseScheduled(String selector) throws JMSException {
0500:                if (selector == null) {
0501:                    ArrayList list;
0502:                    synchronized (messages) {
0503:                        list = new ArrayList(scheduledMessages.size());
0504:                        Iterator iter = scheduledMessages.iterator();
0505:                        while (iter.hasNext()) {
0506:                            MessageReference ref = (MessageReference) iter
0507:                                    .next();
0508:                            list.add(ref.getMessageForDelivery());
0509:                        }
0510:                    }
0511:                    return list;
0512:                } else {
0513:                    Selector s = new Selector(selector);
0514:                    LinkedList selection = new LinkedList();
0515:
0516:                    synchronized (messages) {
0517:                        Iterator iter = scheduledMessages.iterator();
0518:                        while (iter.hasNext()) {
0519:                            MessageReference ref = (MessageReference) iter
0520:                                    .next();
0521:                            if (s.test(ref.getHeaders()))
0522:                                selection.add(ref.getMessageForDelivery());
0523:                        }
0524:                    }
0525:
0526:                    return selection;
0527:                }
0528:            }
0529:
0530:            /**
0531:             * Browse the in process messages
0532:             *
0533:             * @param selector the selector to apply, pass null for
0534:             *                 all messages
0535:             * @return the messages
0536:             * @throws JMSException for any error
0537:             */
0538:            public List browseInProcess(String selector) throws JMSException {
0539:                if (selector == null) {
0540:                    ArrayList list;
0541:                    synchronized (messages) {
0542:                        list = new ArrayList(unacknowledgedMessages.size());
0543:                        Iterator iter = unacknowledgedMessages.values()
0544:                                .iterator();
0545:                        while (iter.hasNext()) {
0546:                            UnackedMessageInfo unacked = (UnackedMessageInfo) iter
0547:                                    .next();
0548:                            MessageReference ref = unacked.messageRef;
0549:                            list.add(ref.getMessageForDelivery());
0550:                        }
0551:                    }
0552:                    return list;
0553:                } else {
0554:                    Selector s = new Selector(selector);
0555:                    LinkedList selection = new LinkedList();
0556:
0557:                    synchronized (messages) {
0558:                        Iterator iter = unacknowledgedMessages.values()
0559:                                .iterator();
0560:                        while (iter.hasNext()) {
0561:                            UnackedMessageInfo unacked = (UnackedMessageInfo) iter
0562:                                    .next();
0563:                            MessageReference ref = unacked.messageRef;
0564:                            if (s.test(ref.getHeaders()))
0565:                                selection.add(ref.getMessageForDelivery());
0566:                        }
0567:                    }
0568:
0569:                    return selection;
0570:                }
0571:            }
0572:
0573:            /**
0574:             * Receive a message from the queue
0575:             *
0576:             * @param sub the subscription requiring a message
0577:             * @param wait whether to wait for a message
0578:             * @return the message
0579:             * @throws JMSException for any error
0580:             */
0581:            public SpyMessage receive(Subscription sub, boolean wait)
0582:                    throws JMSException {
0583:                boolean trace = log.isTraceEnabled();
0584:                if (trace)
0585:                    log.trace("receive " + sub + " wait=" + wait + " " + this );
0586:
0587:                MessageReference messageRef = null;
0588:                synchronized (receivers) {
0589:                    if (stopped)
0590:                        throw new IllegalStateException(
0591:                                "The destination is stopped "
0592:                                        + getDescription());
0593:                    // If the subscription is not picky, the first message will be it
0594:                    if (sub.getSelector() == null && sub.noLocal == false) {
0595:                        synchronized (messages) {
0596:                            // find a non-expired message
0597:                            while (messages.size() != 0) {
0598:                                messageRef = (MessageReference) messages
0599:                                        .first();
0600:                                messages.remove(messageRef);
0601:
0602:                                if (messageRef.isExpired()) {
0603:                                    expireMessageAsync(messageRef);
0604:                                    messageRef = null;
0605:                                } else
0606:                                    break;
0607:                            }
0608:                        }
0609:                    } else {
0610:                        // The subscription is picky, so we have to iterate.
0611:                        synchronized (messages) {
0612:                            Iterator i = messages.iterator();
0613:                            while (i.hasNext()) {
0614:                                MessageReference mr = (MessageReference) i
0615:                                        .next();
0616:                                if (mr.isExpired()) {
0617:                                    i.remove();
0618:                                    expireMessageAsync(mr);
0619:                                } else if (sub.accepts(mr.getHeaders())) {
0620:                                    messageRef = mr;
0621:                                    i.remove();
0622:                                    break;
0623:                                }
0624:                            }
0625:                        }
0626:                    }
0627:
0628:                    if (messageRef == null) {
0629:                        if (wait)
0630:                            addToReceivers(sub);
0631:                    } else {
0632:                        setupMessageAcknowledgement(sub, messageRef);
0633:                    }
0634:                }
0635:
0636:                if (messageRef == null)
0637:                    return null;
0638:                return messageRef.getMessageForDelivery();
0639:            }
0640:
0641:            /**
0642:             * Acknowledge a message
0643:             *
0644:             * @param item the acknowledgement request
0645:             * @param txId the transaction
0646:             * @throws JMSException for any error
0647:             */
0648:            public void acknowledge(AcknowledgementRequest item, Tx txId)
0649:                    throws JMSException {
0650:                boolean trace = log.isTraceEnabled();
0651:                if (trace)
0652:                    log.trace("acknowledge " + item + " " + txId + " " + this );
0653:
0654:                UnackedMessageInfo unacked = null;
0655:                synchronized (messages) {
0656:                    unacked = (UnackedMessageInfo) unacknowledgedMessages
0657:                            .remove(item);
0658:                    if (unacked == null)
0659:                        return;
0660:                    unackedByMessageRef.remove(unacked.messageRef);
0661:                    HashMap map = (HashMap) unackedBySubscription
0662:                            .get(unacked.sub);
0663:                    if (map != null)
0664:                        map.remove(unacked.messageRef);
0665:                    if (map == null || map.isEmpty())
0666:                        unackedBySubscription.remove(unacked.sub);
0667:                }
0668:
0669:                MessageReference m = unacked.messageRef;
0670:
0671:                // Was it a negative acknowledge??
0672:                if (!item.isAck) {
0673:                    Runnable task = new RestoreMessageTask(m);
0674:                    server.getPersistenceManager().getTxManager()
0675:                            .addPostCommitTask(txId, task);
0676:                    server.getPersistenceManager().getTxManager()
0677:                            .addPostRollbackTask(txId, task);
0678:                } else {
0679:                    try {
0680:                        if (m.isPersistent())
0681:                            server.getPersistenceManager().remove(m, txId);
0682:                    } catch (Throwable t) {
0683:                        // Something is wrong with the persistence manager,
0684:                        // force a NACK with a rollback/error
0685:                        Runnable task = new RestoreMessageTask(m);
0686:                        server.getPersistenceManager().getTxManager()
0687:                                .addPostCommitTask(txId, task);
0688:                        server.getPersistenceManager().getTxManager()
0689:                                .addPostRollbackTask(txId, task);
0690:                        SpyJMSException.rethrowAsJMSException(
0691:                                "Error during ACK ref=" + m, t);
0692:                    }
0693:
0694:                    performOrPrepareAcknowledgeMessage(m, txId);
0695:                }
0696:
0697:                synchronized (receivers) {
0698:                    synchronized (messages) {
0699:                        checkRemovedSubscribers(unacked.sub);
0700:                    }
0701:                }
0702:            }
0703:
0704:            /**
0705:             * Either perform or prepare the acknowledge message 
0706:             * 
0707:             * @param mes the message reference
0708:             * @param txId the transaction id
0709:             * @throws Exception for any error
0710:             */
0711:            protected void performOrPrepareAcknowledgeMessage(
0712:                    MessageReference mes, Tx txId) throws JMSException {
0713:                TxManager txManager = server.getPersistenceManager()
0714:                        .getTxManager();
0715:
0716:                // The message is restored to the queue on a rollback
0717:                Runnable task = new RestoreMessageTask(mes);
0718:                txManager.addPostRollbackTask(txId, task);
0719:
0720:                // The message is fully removed after the transaction commits
0721:                task = new RemoveMessageTask(mes);
0722:                txManager.addPostCommitTask(txId, task);
0723:            }
0724:
0725:            /**
0726:             * Nack all messages for a subscription
0727:             *
0728:             * @param sub the subscription
0729:             */
0730:            public void nackMessages(Subscription sub) {
0731:                boolean trace = log.isTraceEnabled();
0732:                if (trace)
0733:                    log.trace("nackMessages " + sub + " " + this );
0734:
0735:                // Send nacks for unacknowledged messages
0736:                synchronized (receivers) {
0737:                    synchronized (messages) {
0738:                        int count = 0;
0739:                        HashMap map = (HashMap) unackedBySubscription.get(sub);
0740:                        if (map != null) {
0741:                            Iterator i = ((HashMap) map.clone()).values()
0742:                                    .iterator();
0743:                            while (i.hasNext()) {
0744:                                AcknowledgementRequest item = (AcknowledgementRequest) i
0745:                                        .next();
0746:                                try {
0747:                                    acknowledge(item, null);
0748:                                    count++;
0749:                                } catch (JMSException ignore) {
0750:                                    log.debug(
0751:                                            "Unable to nack message: " + item,
0752:                                            ignore);
0753:                                }
0754:                            }
0755:                            if (log.isDebugEnabled())
0756:                                log.debug("Nacked " + count
0757:                                        + " messages for removed subscription "
0758:                                        + sub);
0759:                        }
0760:                    }
0761:                }
0762:            }
0763:
0764:            public void removeAllMessages() throws JMSException {
0765:                boolean trace = log.isTraceEnabled();
0766:                if (trace)
0767:                    log.trace("removeAllMessages " + this );
0768:
0769:                // Drop scheduled messages
0770:                for (Iterator i = events.entrySet().iterator(); i.hasNext();) {
0771:                    Map.Entry entry = (Map.Entry) i.next();
0772:                    MessageReference message = (MessageReference) entry
0773:                            .getKey();
0774:                    Timeout timeout = (Timeout) entry.getValue();
0775:                    if (timeout != null) {
0776:                        timeout.cancel();
0777:                        i.remove();
0778:                        dropMessage(message);
0779:                    }
0780:                }
0781:                scheduledMessages.clear();
0782:
0783:                synchronized (receivers) {
0784:                    synchronized (messages) {
0785:                        Iterator i = ((HashMap) unacknowledgedMessages.clone())
0786:                                .keySet().iterator();
0787:                        while (i.hasNext()) {
0788:                            AcknowledgementRequest item = (AcknowledgementRequest) i
0789:                                    .next();
0790:                            try {
0791:                                acknowledge(item, null);
0792:                            } catch (JMSException ignore) {
0793:                            }
0794:                        }
0795:
0796:                        // Remove all remaining messages
0797:                        i = messages.iterator();
0798:                        while (i.hasNext()) {
0799:                            MessageReference message = (MessageReference) i
0800:                                    .next();
0801:                            i.remove();
0802:                            dropMessage(message);
0803:                        }
0804:                    }
0805:                }
0806:            }
0807:
0808:            public void stop() {
0809:                HashSet subs;
0810:                synchronized (receivers) {
0811:                    stopped = true;
0812:                    subs = new HashSet(subscribers);
0813:                    if (log.isTraceEnabled())
0814:                        log.trace("Stopping " + this  + " with subscribers "
0815:                                + subs);
0816:                    clearEvents();
0817:                }
0818:
0819:                for (Iterator i = subs.iterator(); i.hasNext();) {
0820:                    Subscription sub = (Subscription) i.next();
0821:                    ClientConsumer consumer = (ClientConsumer) sub.clientConsumer;
0822:                    try {
0823:                        consumer.removeSubscription(sub.subscriptionId);
0824:                    } catch (Throwable t) {
0825:                        log.warn("Error during stop - removing subscriber "
0826:                                + sub, t);
0827:                    }
0828:                    nackMessages(sub);
0829:                }
0830:
0831:                MessageCache cache = server.getMessageCache();
0832:                synchronized (messages) {
0833:                    for (Iterator i = messages.iterator(); i.hasNext();) {
0834:                        MessageReference message = (MessageReference) i.next();
0835:                        try {
0836:                            cache.remove(message);
0837:                        } catch (JMSException ignored) {
0838:                            log
0839:                                    .trace(
0840:                                            "Ignored error removing message from cache",
0841:                                            ignored);
0842:                        }
0843:                    }
0844:                }
0845:
0846:                // Help the garbage collector
0847:                messages.clear();
0848:                unacknowledgedMessages.clear();
0849:                unackedByMessageRef.clear();
0850:                unackedBySubscription.clear();
0851:                subscribers.clear();
0852:                removedSubscribers.clear();
0853:            }
0854:
0855:            /**
0856:             * Create message counter object
0857:             * 
0858:             * @param name             topic/queue name
0859:             * @param subscription     topic subscription
0860:             * @param topic            topic flag
0861:             * @param durable          durable subscription flag
0862:             * @param daycountmax      message history day count limit
0863:             *                           0: disabled,
0864:             *                          >0: max day count,
0865:             *                          <0: unlimited
0866:             */
0867:            public void createMessageCounter(String name, String subscription,
0868:                    boolean topic, boolean durable, int daycountmax) {
0869:                // create message counter object
0870:                counter = new MessageCounter(name, subscription, this , topic,
0871:                        durable, daycountmax);
0872:            }
0873:
0874:            /**
0875:             * Get message counter object
0876:             *
0877:             * @return MessageCounter     message counter object or null 
0878:             */
0879:            public MessageCounter getMessageCounter() {
0880:                return counter;
0881:            }
0882:
0883:            public String toString() {
0884:                return super .toString() + "{id=" + description + '}';
0885:            }
0886:
0887:            /**
0888:             * Clear all the events
0889:             */
0890:            protected void clearEvents() {
0891:                for (Iterator i = events.entrySet().iterator(); i.hasNext();) {
0892:                    Map.Entry entry = (Map.Entry) i.next();
0893:                    Timeout timeout = (Timeout) entry.getValue();
0894:                    if (timeout != null) {
0895:                        timeout.cancel();
0896:                        i.remove();
0897:                    }
0898:                }
0899:                scheduledMessages.clear();
0900:            }
0901:
0902:            /**
0903:             * Clear the event for a message
0904:             * 
0905:             * @param message the message reference
0906:             */
0907:            protected void clearEvent(MessageReference message) {
0908:                Timeout timeout = (Timeout) events.remove(message);
0909:                if (timeout != null)
0910:                    timeout.cancel();
0911:                scheduledMessages.remove(message);
0912:            }
0913:
0914:            /**
0915:             * Add a receiver
0916:             *
0917:             * @param sub the receiver to add
0918:             */
0919:            protected void addToReceivers(Subscription sub) throws JMSException {
0920:                boolean trace = log.isTraceEnabled();
0921:                if (trace)
0922:                    log.trace("addReceiver " + " " + sub + " " + this );
0923:
0924:                synchronized (receivers) {
0925:                    if (stopped)
0926:                        throw new IllegalStateException(
0927:                                "The destination is stopped "
0928:                                        + getDescription());
0929:                    receivers.add(sub);
0930:                }
0931:            }
0932:
0933:            /**
0934:             * Remove a receiver
0935:             *
0936:             * @param sub the receiver to remove
0937:             */
0938:            protected void removeReceiver(Subscription sub) {
0939:                boolean trace = log.isTraceEnabled();
0940:                if (trace)
0941:                    log.trace("removeReceiver " + " " + sub + " " + this );
0942:
0943:                synchronized (receivers) {
0944:                    receivers.remove(sub);
0945:                }
0946:            }
0947:
0948:            private void addTimeout(MessageReference message, TimeoutTarget t,
0949:                    long ts) {
0950:                Timeout timeout = server.getTimeoutFactory().schedule(ts, t);
0951:                events.put(message, timeout);
0952:            }
0953:
0954:            /**
0955:             * Add a message
0956:             *
0957:             * @param message the message to add
0958:             */
0959:            private void internalAddMessage(MessageReference message) {
0960:                boolean trace = log.isTraceEnabled();
0961:                if (trace)
0962:                    log.trace("internalAddMessage " + " " + message + " "
0963:                            + this );
0964:
0965:                // If scheduled, put in timer queue
0966:                long ts = message.messageScheduledDelivery;
0967:                if (ts > 0 && ts > System.currentTimeMillis()) {
0968:                    scheduledMessages.add(message);
0969:                    addTimeout(message, new EnqueueMessageTask(message), ts);
0970:                    if (trace)
0971:                        log.trace("scheduled message at " + new Date(ts) + ": "
0972:                                + message);
0973:                    // Can't deliver now
0974:                    return;
0975:                }
0976:
0977:                // Don't bother with expired messages
0978:                if (message.isExpired()) {
0979:                    expireMessageAsync(message);
0980:                    return;
0981:                }
0982:
0983:                try {
0984:                    Subscription found = null;
0985:                    synchronized (receivers) {
0986:                        if (receivers.size() != 0) {
0987:                            for (Iterator it = receivers.iterator(); it
0988:                                    .hasNext();) {
0989:                                Subscription sub = (Subscription) it.next();
0990:                                if (sub.accepts(message.getHeaders())) {
0991:                                    it.remove();
0992:                                    found = sub;
0993:                                    break;
0994:                                }
0995:                            }
0996:                        }
0997:
0998:                        if (found == null) {
0999:                            synchronized (messages) {
1000:                                messages.add(message);
1001:
1002:                                // If a message is set to expire, and nobody wants it, put its reaper in
1003:                                // the timer queue 
1004:                                if (message.messageExpiration > 0) {
1005:                                    addTimeout(message, new ExpireMessageTask(
1006:                                            message), message.messageExpiration);
1007:                                }
1008:                            }
1009:                        }
1010:                    }
1011:
1012:                    // Queue to the receiver
1013:                    if (found != null)
1014:                        queueMessageForSending(found, message);
1015:                } catch (JMSException e) {
1016:                    // Could happen at the accepts() calls
1017:                    log.error(
1018:                            "Caught unusual exception in internalAddMessage.",
1019:                            e);
1020:                    // And drop the message, otherwise we have a leak in the cache
1021:                    dropMessage(message);
1022:                }
1023:            }
1024:
1025:            /**
1026:             * Queue a message for sending through the client consumer
1027:             *
1028:             * @param sub the subscirption to receive the message
1029:             * @param message the message reference to queue
1030:             */
1031:            protected void queueMessageForSending(Subscription sub,
1032:                    MessageReference message) {
1033:                boolean trace = log.isTraceEnabled();
1034:                if (trace)
1035:                    log.trace("queueMessageForSending " + " " + sub + " "
1036:                            + message + " " + this );
1037:
1038:                try {
1039:                    setupMessageAcknowledgement(sub, message);
1040:                    RoutedMessage r = new RoutedMessage();
1041:                    r.message = message;
1042:                    r.subscriptionId = new Integer(sub.subscriptionId);
1043:                    ((ClientConsumer) sub.clientConsumer)
1044:                            .queueMessageForSending(r);
1045:                } catch (Throwable t) {
1046:                    log
1047:                            .warn(
1048:                                    "Caught unusual exception sending message to receiver.",
1049:                                    t);
1050:                }
1051:            }
1052:
1053:            /**
1054:             * Setup a message acknowledgement
1055:             *
1056:             * @param sub the subscription receiving the message
1057:             * @param messageRef the message to be acknowledged
1058:             * @throws JMSException for any error
1059:             */
1060:            protected void setupMessageAcknowledgement(Subscription sub,
1061:                    MessageReference messageRef) throws JMSException {
1062:                SpyMessage message = messageRef.getMessage();
1063:                AcknowledgementRequest nack = new AcknowledgementRequest(false);
1064:                nack.destination = message.getJMSDestination();
1065:                nack.messageID = message.getJMSMessageID();
1066:                nack.subscriberId = sub.subscriptionId;
1067:
1068:                synchronized (messages) {
1069:                    UnackedMessageInfo unacked = new UnackedMessageInfo(
1070:                            messageRef, sub);
1071:                    unacknowledgedMessages.put(nack, unacked);
1072:                    unackedByMessageRef.put(messageRef, nack);
1073:                    HashMap map = (HashMap) unackedBySubscription.get(sub);
1074:                    if (map == null) {
1075:                        map = new HashMap();
1076:                        unackedBySubscription.put(sub, map);
1077:                    }
1078:                    map.put(messageRef, nack);
1079:                }
1080:            }
1081:
1082:            /**
1083:             * Remove a message
1084:             *
1085:             * @param message the message to remove
1086:             */
1087:            protected void dropMessage(MessageReference message) {
1088:                dropMessage(message, null);
1089:            }
1090:
1091:            /**
1092:             * Remove a message
1093:             *
1094:             * @param message the message to remove
1095:             * @param txid the transaction context for the removal
1096:             */
1097:            protected void dropMessage(MessageReference message, Tx txid) {
1098:                boolean trace = log.isTraceEnabled();
1099:                if (trace)
1100:                    log.trace("dropMessage " + this  + " txid=" + txid);
1101:
1102:                clearEvent(message);
1103:                try {
1104:                    if (message.isPersistent()) {
1105:                        try {
1106:                            server.getPersistenceManager()
1107:                                    .remove(message, txid);
1108:                        } catch (JMSException e) {
1109:                            try {
1110:                                log.warn(
1111:                                        "Message removed from queue, but not from the persistent store: "
1112:                                                + message.getMessage(), e);
1113:                            } catch (JMSException x) {
1114:                                log.warn(
1115:                                        "Message removed from queue, but not from the persistent store: "
1116:                                                + message, e);
1117:                            }
1118:                        }
1119:                    }
1120:                    server.getMessageCache().remove(message);
1121:                } catch (JMSException e) {
1122:                    log.warn("Error dropping message " + message, e);
1123:                }
1124:            }
1125:
1126:            /**
1127:             * Expire a message asynchronously.
1128:             *
1129:             * @param messageRef the message to remove
1130:             */
1131:            protected void expireMessageAsync(MessageReference messageRef) {
1132:                server.getThreadPool().run(new ExpireMessageTask(messageRef));
1133:            }
1134:
1135:            /**
1136:             * Expire a message
1137:             *
1138:             * @param messageRef the message to remove
1139:             */
1140:            protected void expireMessage(MessageReference messageRef) {
1141:                boolean trace = log.isTraceEnabled();
1142:                if (trace)
1143:                    log.trace("message expired: " + messageRef);
1144:
1145:                SpyDestination ed = parameters.expiryDestination;
1146:                if (ed == null) {
1147:                    dropMessage(messageRef);
1148:                    return;
1149:                }
1150:
1151:                if (trace)
1152:                    log.trace("sending to: " + ed);
1153:
1154:                try {
1155:                    SpyMessage orig = messageRef.getMessage();
1156:                    SpyMessage copy = orig.myClone();
1157:                    copy.header.jmsPropertiesReadWrite = true;
1158:                    copy.setJMSExpiration(0);
1159:                    copy.setJMSDestination(ed);
1160:                    copy.setLongProperty(SpyMessage.PROPERTY_ORIG_EXPIRATION,
1161:                            orig.getJMSExpiration());
1162:                    copy.setStringProperty(
1163:                            SpyMessage.PROPERTY_ORIG_DESTINATION, orig
1164:                                    .getJMSDestination().toString());
1165:                    TxManager tm = server.getPersistenceManager()
1166:                            .getTxManager();
1167:                    Tx tx = tm.createTx();
1168:                    try {
1169:                        server.addMessage(null, copy, tx);
1170:                        dropMessage(messageRef, tx);
1171:                        tm.commitTx(tx);
1172:                    } catch (JMSException e) {
1173:                        tm.rollbackTx(tx);
1174:                        throw e;
1175:                    }
1176:                } catch (JMSException e) {
1177:                    log.error("Could not move expired message: " + messageRef,
1178:                            e);
1179:                }
1180:            }
1181:
1182:            /**
1183:             * Check whether a removed subscription can be permenantly removed.
1184:             * This method is private because it assumes external synchronization
1185:             *
1186:             * @param the subscription to check
1187:             */
1188:            private void checkRemovedSubscribers(Subscription sub) {
1189:                boolean trace = log.isTraceEnabled();
1190:                if (removedSubscribers.contains(sub)
1191:                        && hasUnackedMessages(sub) == false) {
1192:                    if (trace)
1193:                        log.trace("Removing subscriber " + sub);
1194:                    removedSubscribers.remove(sub);
1195:                    subscribers.remove(sub);
1196:                    ((ClientConsumer) sub.clientConsumer)
1197:                            .removeRemovedSubscription(sub.subscriptionId);
1198:                }
1199:            }
1200:
1201:            /**
1202:             * Check whether a subscription has unacknowledged messages.
1203:             * This method is private because it assumes external synchronization
1204:             *
1205:             * @param sub the subscription to check
1206:             * @return true when it has unacknowledged messages
1207:             */
1208:            private boolean hasUnackedMessages(Subscription sub) {
1209:                return unackedBySubscription.containsKey(sub);
1210:            }
1211:
1212:            /**
1213:             * Rollback an add message
1214:             */
1215:            class AddMessagePostRollBackTask implements  Runnable {
1216:                MessageReference message;
1217:
1218:                AddMessagePostRollBackTask(MessageReference m) {
1219:                    message = m;
1220:                }
1221:
1222:                public void run() {
1223:                    try {
1224:                        server.getMessageCache().remove(message);
1225:                    } catch (JMSException e) {
1226:                        log
1227:                                .error(
1228:                                        "Could not remove message from the message cache after an add rollback: ",
1229:                                        e);
1230:                    }
1231:                }
1232:            }
1233:
1234:            /**
1235:             * Add a message to the queue
1236:             */
1237:            class AddMessagePostCommitTask implements  Runnable {
1238:                MessageReference message;
1239:
1240:                AddMessagePostCommitTask(MessageReference m) {
1241:                    message = m;
1242:                }
1243:
1244:                public void run() {
1245:                    internalAddMessage(message);
1246:
1247:                    // update message counter
1248:                    if (counter != null) {
1249:                        counter.incrementCounter();
1250:                    }
1251:                }
1252:            }
1253:
1254:            /**
1255:             * Restore a message to the queue
1256:             */
1257:            class RestoreMessageTask implements  Runnable {
1258:                MessageReference message;
1259:
1260:                RestoreMessageTask(MessageReference m) {
1261:                    message = m;
1262:                }
1263:
1264:                public void run() {
1265:                    nackMessage(message);
1266:                }
1267:            }
1268:
1269:            /**
1270:             * Remove a message
1271:             */
1272:            class RemoveMessageTask implements  Runnable {
1273:                MessageReference message;
1274:
1275:                RemoveMessageTask(MessageReference m) {
1276:                    message = m;
1277:                }
1278:
1279:                public void run() {
1280:                    try {
1281:                        clearEvent(message);
1282:                        server.getMessageCache().remove(message);
1283:                    } catch (JMSException e) {
1284:                        log
1285:                                .error(
1286:                                        "Could not remove an acknowleged message from the message cache: ",
1287:                                        e);
1288:                    }
1289:                }
1290:            }
1291:
1292:            /**
1293:             * Schedele message delivery
1294:             */
1295:            private class EnqueueMessageTask implements  TimeoutTarget {
1296:                private MessageReference messageRef;
1297:
1298:                public EnqueueMessageTask(MessageReference messageRef) {
1299:                    this .messageRef = messageRef;
1300:                }
1301:
1302:                public void timedOut(Timeout timeout) {
1303:                    if (log.isTraceEnabled())
1304:                        log.trace("scheduled message delivery: " + messageRef);
1305:                    events.remove(messageRef);
1306:                    scheduledMessages.remove(messageRef);
1307:                    internalAddMessage(messageRef);
1308:                }
1309:            }
1310:
1311:            /**
1312:             * Drop a message when it expires
1313:             */
1314:            private class ExpireMessageTask implements  TimeoutTarget, Runnable {
1315:                private MessageReference messageRef;
1316:
1317:                public ExpireMessageTask(MessageReference messageRef) {
1318:                    this .messageRef = messageRef;
1319:                }
1320:
1321:                public void timedOut(Timeout timout) {
1322:                    events.remove(messageRef);
1323:                    scheduledMessages.remove(messageRef);
1324:                    synchronized (messages) {
1325:                        // If the message was already sent, then do nothing
1326:                        // (This probably happens more than not)
1327:                        if (messages.remove(messageRef) == false)
1328:                            return;
1329:                    }
1330:                    expireMessage(messageRef);
1331:                }
1332:
1333:                public void run() {
1334:                    expireMessage(messageRef);
1335:                }
1336:            }
1337:
1338:            /**
1339:             * Information about unacknowledged messages
1340:             */
1341:            private static class UnackedMessageInfo {
1342:                public MessageReference messageRef;
1343:                public Subscription sub;
1344:
1345:                public UnackedMessageInfo(MessageReference messageRef,
1346:                        Subscription sub) {
1347:                    this.messageRef = messageRef;
1348:                    this.sub = sub;
1349:                }
1350:            }
1351:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.