Source Code Cross Referenced for JMSDestinationManager.java in  » EJB-Server-JBoss-4.2.1 » messaging » org » jboss » mq » server » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » EJB Server JBoss 4.2.1 » messaging » org.jboss.mq.server 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * JBoss, Home of Professional Open Source.
003:         * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004:         * as indicated by the @author tags. See the copyright.txt file in the
005:         * distribution for a full listing of individual contributors.
006:         *
007:         * This is free software; you can redistribute it and/or modify it
008:         * under the terms of the GNU Lesser General Public License as
009:         * published by the Free Software Foundation; either version 2.1 of
010:         * the License, or (at your option) any later version.
011:         *
012:         * This software is distributed in the hope that it will be useful,
013:         * but WITHOUT ANY WARRANTY; without even the implied warranty of
014:         * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015:         * Lesser General Public License for more details.
016:         *
017:         * You should have received a copy of the GNU Lesser General Public
018:         * License along with this software; if not, write to the Free
019:         * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020:         * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021:         */
022:        package org.jboss.mq.server;
023:
024:        import java.util.Collection;
025:        import java.util.HashMap;
026:        import java.util.Iterator;
027:        import java.util.Map;
028:        import java.util.TreeMap;
029:
030:        import javax.jms.Destination;
031:        import javax.jms.InvalidDestinationException;
032:        import javax.jms.JMSException;
033:        import javax.jms.Queue;
034:        import javax.jms.TemporaryQueue;
035:        import javax.jms.TemporaryTopic;
036:        import javax.jms.Topic;
037:        import javax.transaction.xa.Xid;
038:
039:        import org.jboss.mq.AcknowledgementRequest;
040:        import org.jboss.mq.ConnectionToken;
041:        import org.jboss.mq.DurableSubscriptionID;
042:        import org.jboss.mq.SpyDestination;
043:        import org.jboss.mq.SpyJMSException;
044:        import org.jboss.mq.SpyMessage;
045:        import org.jboss.mq.SpyQueue;
046:        import org.jboss.mq.SpyTemporaryQueue;
047:        import org.jboss.mq.SpyTemporaryTopic;
048:        import org.jboss.mq.SpyTopic;
049:        import org.jboss.mq.SpyTransactionRolledBackException;
050:        import org.jboss.mq.Subscription;
051:        import org.jboss.mq.TransactionRequest;
052:        import org.jboss.mq.pm.PersistenceManager;
053:        import org.jboss.mq.pm.Tx;
054:        import org.jboss.mq.pm.TxManager;
055:        import org.jboss.mq.sm.StateManager;
056:        import org.jboss.util.threadpool.ThreadPool;
057:        import org.jboss.util.timeout.TimeoutFactory;
058:
059:        import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
060:
061:        /**
062:         * This class implements the JMS provider
063:         *
064:         * @author    Norbert Lataille (Norbert.Lataille@m4x.org)
065:         * @author    Hiram Chirino (Cojonudo14@hotmail.com)
066:         * @author    David Maplesden (David.Maplesden@orion.co.nz)
067:         * @author <a href="mailto:pra@tim.se">Peter Antman</a>
068:         * @version   $Revision: 57198 $
069:         */
070:        public class JMSDestinationManager extends JMSServerInterceptorSupport {
071:            /** The version */
072:            public final static String JBOSS_VERSION = "JBossMQ Version 4.0";
073:
074:            /** Destinations SpyDestination -> JMSDestination */
075:            public Map destinations = new ConcurrentReaderHashMap();
076:
077:            /** Destinations being closed SpyDestination -> JMSDestination */
078:            public Map closingDestinations = new ConcurrentReaderHashMap();
079:
080:            /** Thread pool */
081:            public ThreadPool threadPool;
082:
083:            /** Thread group */
084:            public ThreadGroup threadGroup;
085:
086:            /** Timeout factory */
087:            public TimeoutFactory timeoutFactory;
088:
089:            /** The list of ClientConsumers hased by ConnectionTokens */
090:            Map clientConsumers = new ConcurrentReaderHashMap();
091:
092:            /** last id given to a client */
093:            private int lastID = 1;
094:
095:            /** last id given to a temporary topic */
096:            private int lastTemporaryTopic = 1;
097:
098:            private Object lastTemporaryTopicLock = new Object();
099:
100:            /** last id given to a temporary queue */
101:            private int lastTemporaryQueue = 1;
102:
103:            private Object lastTemporaryQueueLock = new Object();
104:
105:            /** The security manager */
106:            private StateManager stateManager;
107:
108:            /** The persistence manager */
109:            private PersistenceManager persistenceManager;
110:
111:            /** The Cache Used to hold messages */
112:            private MessageCache messageCache;
113:
114:            private Object stateLock = new Object();
115:
116:            private Object idLock = new Object();
117:
118:            /**
119:             * Because there can be a delay between killing the JMS service and the
120:             * service actually dying, this field is used to tell external classes that
121:             * that server has actually stopped.
122:             */
123:            private boolean stopped = true;
124:
125:            /** Temporary queue/topic parameters */
126:            BasicQueueParameters parameters;
127:
/**
 * Creates the destination manager.
 *
 * @param parameters  the queue parameters passed on to every temporary
 *                    queue/topic this manager creates
 */
public JMSDestinationManager(BasicQueueParameters parameters) {
    this.parameters = parameters;
}
134:
135:            /**
136:             * Sets the Enabled attribute of the JMSServer object
137:             *
138:             * @param dc                The new Enabled value
139:             * @param enabled           The new Enabled value
140:             * @exception JMSException  Description of Exception
141:             */
142:            public void setEnabled(ConnectionToken dc, boolean enabled)
143:                    throws JMSException {
144:                ClientConsumer ClientConsumer = getClientConsumer(dc);
145:                ClientConsumer.setEnabled(enabled);
146:            }
147:
148:            /**
149:             * Sets the StateManager attribute of the JMSServer object
150:             *
151:             * @param newStateManager  The new StateManager value
152:             */
153:            public void setStateManager(StateManager newStateManager) {
154:                stateManager = newStateManager;
155:            }
156:
157:            /**
158:             * Sets the PersistenceManager attribute of the JMSServer object
159:             *
160:             * @param newPersistenceManager  The new PersistenceManager value
161:             */
162:            public void setPersistenceManager(
163:                    org.jboss.mq.pm.PersistenceManager newPersistenceManager) {
164:                persistenceManager = newPersistenceManager;
165:            }
166:
167:            /**
168:             * Returns <code>false</code> if the JMS server is currently running and
169:             * handling requests, <code>true</code> otherwise.
170:             *
171:             * @return   <code>false</code> if the JMS server is currently running and
172:             *      handling requests, <code>true</code> otherwise.
173:             */
174:            public boolean isStopped() {
175:                synchronized (stateLock) {
176:                    return this .stopped;
177:                }
178:            }
179:
180:            protected void checkStopped() throws IllegalStateException {
181:                if (isStopped())
182:                    throw new IllegalStateException("Server is stopped.");
183:            }
184:
185:            /**
186:             *
187:             * @return the current client count
188:             */
189:            public int getClientCount() {
190:                return clientConsumers.size();
191:            }
192:
193:            /** 
194:             * Obtain a copy of the current clients
195:             * 
196:             * @return a HashMap<ConnectionToken, ClientConsumer> of current clients
197:             */
198:            public HashMap getClients() {
199:                return new HashMap(clientConsumers);
200:            }
201:
202:            public void setThreadPool(ThreadPool threadPool) {
203:                this .threadPool = threadPool;
204:            }
205:
206:            public ThreadPool getThreadPool() {
207:                return threadPool;
208:            }
209:
210:            public void setThreadGroup(ThreadGroup threadGroup) {
211:                this .threadGroup = threadGroup;
212:            }
213:
214:            public ThreadGroup getThreadGroup() {
215:                return threadGroup;
216:            }
217:
218:            public TimeoutFactory getTimeoutFactory() {
219:                return timeoutFactory;
220:            }
221:
222:            /**
223:             * Gets the ID attribute of the JMSServer object
224:             *
225:             * @return   The ID value
226:             */
227:            public String getID() {
228:                String ID = null;
229:
230:                while (isStopped() == false) {
231:                    if (stateManager == null)
232:                        throw new IllegalStateException("No statemanager");
233:                    try {
234:                        synchronized (idLock) {
235:                            ID = "ID:" + (new Integer(lastID++).toString());
236:                        }
237:                        stateManager.addLoggedOnClientId(ID);
238:                        break;
239:                    } catch (Exception e) {
240:                    }
241:                }
242:
243:                checkStopped();
244:
245:                return ID;
246:            }
247:
248:            public TemporaryTopic getTemporaryTopic(ConnectionToken dc)
249:                    throws JMSException {
250:                checkStopped();
251:
252:                String topicName;
253:                synchronized (lastTemporaryTopicLock) {
254:                    topicName = "JMS_TT"
255:                            + (new Integer(lastTemporaryTopic++).toString());
256:                }
257:                SpyTemporaryTopic topic = new SpyTemporaryTopic(topicName, dc);
258:
259:                ClientConsumer ClientConsumer = getClientConsumer(dc);
260:                JMSDestination queue = new JMSTopic(topic, ClientConsumer,
261:                        this , parameters);
262:                destinations.put(topic, queue);
263:
264:                return topic;
265:            }
266:
267:            public TemporaryQueue getTemporaryQueue(ConnectionToken dc)
268:                    throws JMSException {
269:                checkStopped();
270:
271:                String queueName;
272:                synchronized (lastTemporaryQueueLock) {
273:                    queueName = "JMS_TQ"
274:                            + (new Integer(lastTemporaryQueue++).toString());
275:                }
276:                SpyTemporaryQueue newQueue = new SpyTemporaryQueue(queueName,
277:                        dc);
278:
279:                ClientConsumer ClientConsumer = getClientConsumer(dc);
280:                JMSDestination queue = new JMSQueue(newQueue, ClientConsumer,
281:                        this , parameters);
282:                destinations.put(newQueue, queue);
283:
284:                return newQueue;
285:            }
286:
287:            public ClientConsumer getClientConsumer(ConnectionToken dc)
288:                    throws JMSException {
289:                ClientConsumer cq = (ClientConsumer) clientConsumers.get(dc);
290:                if (cq == null) {
291:                    cq = new ClientConsumer(this , dc);
292:                    clientConsumers.put(dc, cq);
293:                }
294:                return cq;
295:            }
296:
297:            public JMSDestination getJMSDestination(SpyDestination dest) {
298:                return (JMSDestination) destinations.get(dest);
299:            }
300:
301:            /**
302:             * Gets the JMSDestination attribute of the JMSServer object
303:             * which might be being closed 
304:             *
305:             * @param dest  Description of Parameter
306:             * @return      The JMSDestination value
307:             */
308:            protected JMSDestination getPossiblyClosingJMSDestination(
309:                    SpyDestination dest) {
310:                JMSDestination result = (JMSDestination) destinations.get(dest);
311:                if (result == null)
312:                    result = (JMSDestination) closingDestinations.get(dest);
313:                return result;
314:            }
315:
316:            /**
317:             * Gets the StateManager attribute of the JMSServer object
318:             *
319:             * @return   The StateManager value
320:             */
321:            public StateManager getStateManager() {
322:                return stateManager;
323:            }
324:
325:            /**
326:             * Gets the PersistenceManager attribute of the JMSServer object
327:             *
328:             * @return   The PersistenceManager value
329:             */
330:            public PersistenceManager getPersistenceManager() {
331:                return persistenceManager;
332:            }
333:
334:            /**
335:             * Start the server
336:             */
337:            public void startServer() {
338:                synchronized (stateLock) {
339:                    this .stopped = false;
340:                    this .timeoutFactory = new TimeoutFactory(this .threadPool);
341:                }
342:            }
343:
344:            /**
345:             * Stop the server
346:             */
347:            public void stopServer() {
348:                synchronized (stateLock) {
349:                    this .stopped = true;
350:                    this .timeoutFactory.cancel();
351:
352:                    for (Iterator i = clientConsumers.keySet().iterator(); i
353:                            .hasNext();) {
354:                        ConnectionToken token = (ConnectionToken) i.next();
355:                        try {
356:                            connectionClosing(token);
357:                        } catch (Throwable t) {
358:                            log.trace(
359:                                    "Ignored error closing client connection "
360:                                            + token, t);
361:                        }
362:                    }
363:                }
364:            }
365:
366:            public void checkID(String ID) throws JMSException {
367:                checkStopped();
368:                stateManager.addLoggedOnClientId(ID);
369:            }
370:
371:            public void addMessage(ConnectionToken dc, SpyMessage val)
372:                    throws JMSException {
373:                addMessage(dc, val, null);
374:            }
375:
376:            public void addMessage(ConnectionToken dc, SpyMessage val, Tx txId)
377:                    throws JMSException {
378:                checkStopped();
379:                JMSDestination queue = (JMSDestination) destinations.get(val
380:                        .getJMSDestination());
381:                if (queue == null)
382:                    throw new InvalidDestinationException(
383:                            "This destination does not exist! "
384:                                    + val.getJMSDestination());
385:
386:                // Reset any redelivered information
387:                val.setJMSRedelivered(false);
388:                val.header.jmsProperties
389:                        .remove(SpyMessage.PROPERTY_REDELIVERY_COUNT);
390:
391:                //Add the message to the queue
392:                val.setReadOnlyMode();
393:                queue.addMessage(val, txId);
394:            }
395:
396:            public void transact(ConnectionToken dc, TransactionRequest t)
397:                    throws JMSException {
398:                checkStopped();
399:                boolean trace = log.isTraceEnabled();
400:                TxManager txManager = persistenceManager.getTxManager();
401:                if (t.requestType == TransactionRequest.ONE_PHASE_COMMIT_REQUEST) {
402:                    Tx txId = txManager.createTx();
403:                    if (trace)
404:                        log.trace(dc + " 1PC " + t.xid + " txId="
405:                                + txId.longValue());
406:                    try {
407:                        if (t.messages != null) {
408:                            for (int i = 0; i < t.messages.length; i++) {
409:                                addMessage(dc, t.messages[i], txId);
410:                            }
411:                        }
412:                        if (t.acks != null) {
413:                            for (int i = 0; i < t.acks.length; i++) {
414:                                acknowledge(dc, t.acks[i], txId);
415:                            }
416:                        }
417:                        txManager.commitTx(txId);
418:                    } catch (JMSException e) {
419:                        log
420:                                .debug(
421:                                        "Exception occured, rolling back transaction: ",
422:                                        e);
423:                        txManager.rollbackTx(txId);
424:                        throw new SpyTransactionRolledBackException(
425:                                "Transaction was rolled back.", e);
426:                    }
427:                } else if (t.requestType == TransactionRequest.TWO_PHASE_COMMIT_PREPARE_REQUEST) {
428:                    Tx txId = txManager.createTx(dc, t.xid);
429:                    if (trace)
430:                        log.trace(dc + " 2PC PREPARE " + t.xid + " txId="
431:                                + txId.longValue());
432:                    try {
433:                        if (t.messages != null) {
434:                            for (int i = 0; i < t.messages.length; i++) {
435:                                addMessage(dc, t.messages[i], txId);
436:                            }
437:                        }
438:                        if (t.acks != null) {
439:                            for (int i = 0; i < t.acks.length; i++) {
440:                                acknowledge(dc, t.acks[i], txId);
441:                            }
442:                        }
443:
444:                        txManager.markPrepared(dc, t.xid, txId);
445:                    } catch (JMSException e) {
446:                        log
447:                                .debug(
448:                                        "Exception occured, rolling back transaction: ",
449:                                        e);
450:                        txManager.rollbackTx(txId);
451:                        throw new SpyTransactionRolledBackException(
452:                                "Transaction was rolled back.", e);
453:                    }
454:                } else if (t.requestType == TransactionRequest.TWO_PHASE_COMMIT_ROLLBACK_REQUEST) {
455:                    if (trace)
456:                        log.trace(dc + " 2PC ROLLBACK " + t.xid);
457:                    txManager.rollbackTx(dc, t.xid);
458:                } else if (t.requestType == TransactionRequest.TWO_PHASE_COMMIT_COMMIT_REQUEST) {
459:                    if (trace)
460:                        log.trace(dc + " 2PC COMMIT " + t.xid);
461:                    txManager.commitTx(dc, t.xid);
462:                }
463:            }
464:
465:            public Xid[] recover(ConnectionToken dc, int flags)
466:                    throws Exception {
467:                checkStopped();
468:                TxManager txManager = persistenceManager.getTxManager();
469:                return txManager.recover(dc, flags);
470:            }
471:
472:            public void acknowledge(ConnectionToken dc,
473:                    AcknowledgementRequest item) throws JMSException {
474:                acknowledge(dc, item, null);
475:            }
476:
477:            public void acknowledge(ConnectionToken dc,
478:                    AcknowledgementRequest item, Tx txId) throws JMSException {
479:                checkStopped();
480:                ClientConsumer cc = getClientConsumer(dc);
481:                cc.acknowledge(item, txId);
482:            }
483:
484:            public void connectionClosing(ConnectionToken dc)
485:                    throws JMSException {
486:                if (dc == null)
487:                    return;
488:
489:                // Close it's ClientConsumer
490:                ClientConsumer cq = (ClientConsumer) clientConsumers.remove(dc);
491:                if (cq != null)
492:                    cq.close();
493:
494:                //unregister its clientID
495:                if (dc.getClientID() != null)
496:                    stateManager.removeLoggedOnClientId(dc.getClientID());
497:
498:                //Remove any temporary destinations the consumer may have created.
499:                Iterator i = destinations.entrySet().iterator();
500:                while (i.hasNext()) {
501:                    Map.Entry entry = (Map.Entry) i.next();
502:                    JMSDestination sq = (JMSDestination) entry.getValue();
503:                    if (sq != null) {
504:                        ClientConsumer cc = sq.temporaryDestination;
505:                        if (cc != null && dc.equals(cc.connectionToken)) {
506:                            i.remove();
507:                            deleteTemporaryDestination(dc, sq);
508:                        }
509:                    }
510:                }
511:                // Close the clientIL
512:                try {
513:                    if (dc.clientIL != null)
514:                        dc.clientIL.close();
515:                } catch (Exception ex) {
516:                    // We skip warning, to often the client will always
517:                    // have gone when we get here
518:                    //log.warn("Could not close clientIL: " +ex,ex);
519:                }
520:            }
521:
522:            public void connectionFailure(ConnectionToken dc)
523:                    throws JMSException {
524:                //We should try again :) This behavior should under control of a Failure-Plugin
525:                log.error("The connection to client " + dc.getClientID()
526:                        + " failed.");
527:                connectionClosing(dc);
528:            }
529:
530:            public void subscribe(ConnectionToken dc, Subscription sub)
531:                    throws JMSException {
532:                checkStopped();
533:                ClientConsumer clientConsumer = getClientConsumer(dc);
534:                clientConsumer.addSubscription(sub);
535:            }
536:
537:            public void unsubscribe(ConnectionToken dc, int subscriptionId)
538:                    throws JMSException {
539:                checkStopped();
540:                ClientConsumer clientConsumer = getClientConsumer(dc);
541:                clientConsumer.removeSubscription(subscriptionId);
542:            }
543:
544:            public void destroySubscription(ConnectionToken dc,
545:                    DurableSubscriptionID id) throws JMSException {
546:                checkStopped();
547:                getStateManager().setDurableSubscription(this , id, null);
548:            }
549:
550:            public SpyMessage[] browse(ConnectionToken dc, Destination dest,
551:                    String selector) throws JMSException {
552:                checkStopped();
553:                JMSDestination queue = (JMSDestination) destinations.get(dest);
554:                if (queue == null)
555:                    throw new InvalidDestinationException(
556:                            "That destination does not exist! " + dest);
557:                if (!(queue instanceof  JMSQueue))
558:                    throw new JMSException("That destination is not a queue");
559:
560:                return ((JMSQueue) queue).browse(selector);
561:            }
562:
563:            public SpyMessage receive(ConnectionToken dc, int subscriberId,
564:                    long wait) throws JMSException {
565:                checkStopped();
566:                ClientConsumer clientConsumer = getClientConsumer(dc);
567:                SpyMessage msg = clientConsumer.receive(subscriberId, wait);
568:                return msg;
569:            }
570:
571:            public Queue createQueue(ConnectionToken dc, String name)
572:                    throws JMSException {
573:                checkStopped();
574:                SpyQueue newQueue = new SpyQueue(name);
575:                if (!destinations.containsKey(newQueue))
576:                    throw new JMSException("This destination does not exist !"
577:                            + newQueue);
578:                return newQueue;
579:            }
580:
581:            public Topic createTopic(ConnectionToken dc, String name)
582:                    throws JMSException {
583:                checkStopped();
584:                SpyTopic newTopic = new SpyTopic(name);
585:                if (!destinations.containsKey(newTopic))
586:                    throw new JMSException("This destination does not exist !"
587:                            + newTopic);
588:                return newTopic;
589:            }
590:
591:            public Queue createQueue(String queueName) throws JMSException {
592:                checkStopped();
593:
594:                SpyTemporaryQueue newQueue = new SpyTemporaryQueue(queueName,
595:                        null);
596:
597:                JMSDestination queue = new JMSQueue(newQueue, null, this ,
598:                        parameters);
599:                destinations.put(newQueue, queue);
600:
601:                return newQueue;
602:            }
603:
604:            public Topic createTopic(String topicName) throws JMSException {
605:                checkStopped();
606:
607:                SpyTemporaryTopic topic = new SpyTemporaryTopic(topicName, null);
608:
609:                JMSDestination queue = new JMSTopic(topic, null, this ,
610:                        parameters);
611:                destinations.put(topic, queue);
612:
613:                return topic;
614:            }
615:
616:            public void deleteTemporaryDestination(ConnectionToken dc,
617:                    SpyDestination dest) throws JMSException {
618:                checkStopped();
619:                JMSDestination destination = (JMSDestination) destinations
620:                        .get(dest);
621:                if (destination == null)
622:                    throw new InvalidDestinationException(
623:                            "That destination does not exist! " + destination);
624:
625:                if (destination.isInUse())
626:                    throw new JMSException(
627:                            "Cannot delete temporary queue, it is in use.");
628:
629:                destinations.remove(dest);
630:                deleteTemporaryDestination(dc, destination);
631:            }
632:
633:            protected void deleteTemporaryDestination(ConnectionToken dc,
634:                    JMSDestination destination) throws JMSException {
635:                try {
636:                    destination.removeAllMessages();
637:                } catch (Exception e) {
638:                    log
639:                            .error(
640:                                    "An exception happened while removing all messages from temporary destination "
641:                                            + destination.getSpyDestination()
642:                                                    .getName(), e);
643:                }
644:
645:            }
646:
647:            public String checkUser(String userName, String password)
648:                    throws JMSException {
649:                checkStopped();
650:                return stateManager.checkUser(userName, password);
651:            }
652:
653:            public String authenticate(String id, String password)
654:                    throws JMSException {
655:                checkStopped();
656:                // do nothing
657:                return null;
658:            }
659:
660:            public void addDestination(JMSDestination destination)
661:                    throws JMSException {
662:                if (destinations.containsKey(destination.getSpyDestination()))
663:                    throw new JMSException(
664:                            "This destination has already been added to the server!");
665:
666:                //Add this new destination to the list
667:                destinations.put(destination.getSpyDestination(), destination);
668:
669:                // Restore the messages
670:                if (destination instanceof  JMSTopic) {
671:                    Collection durableSubs = getStateManager()
672:                            .getDurableSubscriptionIdsForTopic(
673:                                    (SpyTopic) destination.getSpyDestination());
674:                    for (Iterator i = durableSubs.iterator(); i.hasNext();) {
675:                        DurableSubscriptionID sub = (DurableSubscriptionID) i
676:                                .next();
677:                        log.debug("creating the durable subscription for :"
678:                                + sub);
679:                        ((JMSTopic) destination).createDurableSubscription(sub);
680:                    }
681:                }
682:            }
683:
684:            /**
685:             * Closed a destination that was opened previously
686:             *
687:             * @param dest              the destionation to close
688:             * @exception JMSException  Description of Exception
689:             */
690:            public void closeDestination(SpyDestination dest)
691:                    throws JMSException {
692:                JMSDestination destination = (JMSDestination) destinations
693:                        .remove(dest);
694:                if (destination == null)
695:                    throw new InvalidDestinationException(
696:                            "This destination is not open! " + dest);
697:
698:                log.debug("Closing destination " + dest);
699:
700:                // Add it to the closing list
701:                closingDestinations.put(dest, destination);
702:                try {
703:                    destination.close();
704:                } finally {
705:                    closingDestinations.remove(dest);
706:                }
707:            }
708:
709:            public String toString() {
710:                return JBOSS_VERSION;
711:            }
712:
713:            public void ping(ConnectionToken dc, long clientTime)
714:                    throws JMSException {
715:                checkStopped();
716:                try {
717:                    dc.clientIL.pong(System.currentTimeMillis());
718:                } catch (Exception e) {
719:                    throw new SpyJMSException("Could not pong", e);
720:                }
721:            }
722:
723:            /**
724:             * Gets the messageCache
725:             * @return Returns a MessageCache
726:             */
727:            public MessageCache getMessageCache() {
728:                return messageCache;
729:            }
730:
731:            /**
732:             * Sets the messageCache
733:             * @param messageCache The messageCache to set
734:             */
735:            public void setMessageCache(MessageCache messageCache) {
736:                this .messageCache = messageCache;
737:            }
738:
739:            public SpyTopic getDurableTopic(DurableSubscriptionID sub)
740:                    throws JMSException {
741:                checkStopped();
742:                return getStateManager().getDurableTopic(sub);
743:            }
744:
745:            public Subscription getSubscription(ConnectionToken dc,
746:                    int subscriberId) throws JMSException {
747:                checkStopped();
748:                ClientConsumer clientConsumer = getClientConsumer(dc);
749:                return clientConsumer.getSubscription(subscriberId);
750:            }
751:
752:            /**
753:             * Gets message counters of all configured destinations
754:             *
755:             * @return MessageCounter[]      message counter array sorted by name
756:             */
757:            public MessageCounter[] getMessageCounter() {
758:                TreeMap map = new TreeMap(); // for sorting
759:
760:                Iterator i = destinations.values().iterator();
761:
762:                while (i.hasNext()) {
763:                    JMSDestination dest = (JMSDestination) i.next();
764:
765:                    MessageCounter[] counter = dest.getMessageCounter();
766:
767:                    for (int j = 0; j < counter.length; j++) {
768:                        // sorting order name + subscription + type
769:                        String key = counter[j].getDestinationName()
770:                                + "-"
771:                                + counter[j].getDestinationSubscription()
772:                                + "-"
773:                                + (counter[j].getDestinationTopic() ? "Topic"
774:                                        : "Queue");
775:
776:                        map.put(key, counter[j]);
777:                    }
778:                }
779:
780:                return (MessageCounter[]) map.values().toArray(
781:                        new MessageCounter[0]);
782:            }
783:
784:            /**
785:             * Resets message counters of all configured destinations
786:             */
787:            public void resetMessageCounter() {
788:                Iterator i = destinations.values().iterator();
789:
790:                while (i.hasNext()) {
791:                    JMSDestination dest = (JMSDestination) i.next();
792:
793:                    MessageCounter[] counter = dest.getMessageCounter();
794:
795:                    for (int j = 0; j < counter.length; j++) {
796:                        counter[j].resetCounter();
797:                    }
798:                }
799:            }
800:
801:            public BasicQueueParameters getParameters() {
802:                return parameters;
803:            }
804:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.