001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.mq.server;
023:
024: import java.util.Collection;
025: import java.util.HashMap;
026: import java.util.Iterator;
027: import java.util.Map;
028: import java.util.TreeMap;
029:
030: import javax.jms.Destination;
031: import javax.jms.InvalidDestinationException;
032: import javax.jms.JMSException;
033: import javax.jms.Queue;
034: import javax.jms.TemporaryQueue;
035: import javax.jms.TemporaryTopic;
036: import javax.jms.Topic;
037: import javax.transaction.xa.Xid;
038:
039: import org.jboss.mq.AcknowledgementRequest;
040: import org.jboss.mq.ConnectionToken;
041: import org.jboss.mq.DurableSubscriptionID;
042: import org.jboss.mq.SpyDestination;
043: import org.jboss.mq.SpyJMSException;
044: import org.jboss.mq.SpyMessage;
045: import org.jboss.mq.SpyQueue;
046: import org.jboss.mq.SpyTemporaryQueue;
047: import org.jboss.mq.SpyTemporaryTopic;
048: import org.jboss.mq.SpyTopic;
049: import org.jboss.mq.SpyTransactionRolledBackException;
050: import org.jboss.mq.Subscription;
051: import org.jboss.mq.TransactionRequest;
052: import org.jboss.mq.pm.PersistenceManager;
053: import org.jboss.mq.pm.Tx;
054: import org.jboss.mq.pm.TxManager;
055: import org.jboss.mq.sm.StateManager;
056: import org.jboss.util.threadpool.ThreadPool;
057: import org.jboss.util.timeout.TimeoutFactory;
058:
059: import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
060:
061: /**
062: * This class implements the JMS provider
063: *
064: * @author Norbert Lataille (Norbert.Lataille@m4x.org)
065: * @author Hiram Chirino (Cojonudo14@hotmail.com)
066: * @author David Maplesden (David.Maplesden@orion.co.nz)
067: * @author <a href="mailto:pra@tim.se">Peter Antman</a>
068: * @version $Revision: 57198 $
069: */
070: public class JMSDestinationManager extends JMSServerInterceptorSupport {
071: /** The version */
072: public final static String JBOSS_VERSION = "JBossMQ Version 4.0";
073:
074: /** Destinations SpyDestination -> JMSDestination */
075: public Map destinations = new ConcurrentReaderHashMap();
076:
077: /** Destinations being closed SpyDestination -> JMSDestination */
078: public Map closingDestinations = new ConcurrentReaderHashMap();
079:
080: /** Thread pool */
081: public ThreadPool threadPool;
082:
083: /** Thread group */
084: public ThreadGroup threadGroup;
085:
086: /** Timeout factory */
087: public TimeoutFactory timeoutFactory;
088:
089: /** The list of ClientConsumers hased by ConnectionTokens */
090: Map clientConsumers = new ConcurrentReaderHashMap();
091:
092: /** last id given to a client */
093: private int lastID = 1;
094:
095: /** last id given to a temporary topic */
096: private int lastTemporaryTopic = 1;
097:
098: private Object lastTemporaryTopicLock = new Object();
099:
100: /** last id given to a temporary queue */
101: private int lastTemporaryQueue = 1;
102:
103: private Object lastTemporaryQueueLock = new Object();
104:
105: /** The security manager */
106: private StateManager stateManager;
107:
108: /** The persistence manager */
109: private PersistenceManager persistenceManager;
110:
111: /** The Cache Used to hold messages */
112: private MessageCache messageCache;
113:
114: private Object stateLock = new Object();
115:
116: private Object idLock = new Object();
117:
118: /**
119: * Because there can be a delay between killing the JMS service and the
120: * service actually dying, this field is used to tell external classes that
121: * that server has actually stopped.
122: */
123: private boolean stopped = true;
124:
125: /** Temporary queue/topic parameters */
126: BasicQueueParameters parameters;
127:
/**
 * Creates a destination manager.
 *
 * @param parameters the parameters passed on to every queue or topic this
 *        manager constructs itself
 */
public JMSDestinationManager(BasicQueueParameters parameters) {
    this.parameters = parameters;
}
134:
135: /**
136: * Sets the Enabled attribute of the JMSServer object
137: *
138: * @param dc The new Enabled value
139: * @param enabled The new Enabled value
140: * @exception JMSException Description of Exception
141: */
142: public void setEnabled(ConnectionToken dc, boolean enabled)
143: throws JMSException {
144: ClientConsumer ClientConsumer = getClientConsumer(dc);
145: ClientConsumer.setEnabled(enabled);
146: }
147:
148: /**
149: * Sets the StateManager attribute of the JMSServer object
150: *
151: * @param newStateManager The new StateManager value
152: */
153: public void setStateManager(StateManager newStateManager) {
154: stateManager = newStateManager;
155: }
156:
157: /**
158: * Sets the PersistenceManager attribute of the JMSServer object
159: *
160: * @param newPersistenceManager The new PersistenceManager value
161: */
162: public void setPersistenceManager(
163: org.jboss.mq.pm.PersistenceManager newPersistenceManager) {
164: persistenceManager = newPersistenceManager;
165: }
166:
167: /**
168: * Returns <code>false</code> if the JMS server is currently running and
169: * handling requests, <code>true</code> otherwise.
170: *
171: * @return <code>false</code> if the JMS server is currently running and
172: * handling requests, <code>true</code> otherwise.
173: */
174: public boolean isStopped() {
175: synchronized (stateLock) {
176: return this .stopped;
177: }
178: }
179:
180: protected void checkStopped() throws IllegalStateException {
181: if (isStopped())
182: throw new IllegalStateException("Server is stopped.");
183: }
184:
185: /**
186: *
187: * @return the current client count
188: */
189: public int getClientCount() {
190: return clientConsumers.size();
191: }
192:
193: /**
194: * Obtain a copy of the current clients
195: *
196: * @return a HashMap<ConnectionToken, ClientConsumer> of current clients
197: */
198: public HashMap getClients() {
199: return new HashMap(clientConsumers);
200: }
201:
202: public void setThreadPool(ThreadPool threadPool) {
203: this .threadPool = threadPool;
204: }
205:
206: public ThreadPool getThreadPool() {
207: return threadPool;
208: }
209:
210: public void setThreadGroup(ThreadGroup threadGroup) {
211: this .threadGroup = threadGroup;
212: }
213:
214: public ThreadGroup getThreadGroup() {
215: return threadGroup;
216: }
217:
218: public TimeoutFactory getTimeoutFactory() {
219: return timeoutFactory;
220: }
221:
222: /**
223: * Gets the ID attribute of the JMSServer object
224: *
225: * @return The ID value
226: */
227: public String getID() {
228: String ID = null;
229:
230: while (isStopped() == false) {
231: if (stateManager == null)
232: throw new IllegalStateException("No statemanager");
233: try {
234: synchronized (idLock) {
235: ID = "ID:" + (new Integer(lastID++).toString());
236: }
237: stateManager.addLoggedOnClientId(ID);
238: break;
239: } catch (Exception e) {
240: }
241: }
242:
243: checkStopped();
244:
245: return ID;
246: }
247:
248: public TemporaryTopic getTemporaryTopic(ConnectionToken dc)
249: throws JMSException {
250: checkStopped();
251:
252: String topicName;
253: synchronized (lastTemporaryTopicLock) {
254: topicName = "JMS_TT"
255: + (new Integer(lastTemporaryTopic++).toString());
256: }
257: SpyTemporaryTopic topic = new SpyTemporaryTopic(topicName, dc);
258:
259: ClientConsumer ClientConsumer = getClientConsumer(dc);
260: JMSDestination queue = new JMSTopic(topic, ClientConsumer,
261: this , parameters);
262: destinations.put(topic, queue);
263:
264: return topic;
265: }
266:
267: public TemporaryQueue getTemporaryQueue(ConnectionToken dc)
268: throws JMSException {
269: checkStopped();
270:
271: String queueName;
272: synchronized (lastTemporaryQueueLock) {
273: queueName = "JMS_TQ"
274: + (new Integer(lastTemporaryQueue++).toString());
275: }
276: SpyTemporaryQueue newQueue = new SpyTemporaryQueue(queueName,
277: dc);
278:
279: ClientConsumer ClientConsumer = getClientConsumer(dc);
280: JMSDestination queue = new JMSQueue(newQueue, ClientConsumer,
281: this , parameters);
282: destinations.put(newQueue, queue);
283:
284: return newQueue;
285: }
286:
287: public ClientConsumer getClientConsumer(ConnectionToken dc)
288: throws JMSException {
289: ClientConsumer cq = (ClientConsumer) clientConsumers.get(dc);
290: if (cq == null) {
291: cq = new ClientConsumer(this , dc);
292: clientConsumers.put(dc, cq);
293: }
294: return cq;
295: }
296:
297: public JMSDestination getJMSDestination(SpyDestination dest) {
298: return (JMSDestination) destinations.get(dest);
299: }
300:
301: /**
302: * Gets the JMSDestination attribute of the JMSServer object
303: * which might be being closed
304: *
305: * @param dest Description of Parameter
306: * @return The JMSDestination value
307: */
308: protected JMSDestination getPossiblyClosingJMSDestination(
309: SpyDestination dest) {
310: JMSDestination result = (JMSDestination) destinations.get(dest);
311: if (result == null)
312: result = (JMSDestination) closingDestinations.get(dest);
313: return result;
314: }
315:
316: /**
317: * Gets the StateManager attribute of the JMSServer object
318: *
319: * @return The StateManager value
320: */
321: public StateManager getStateManager() {
322: return stateManager;
323: }
324:
325: /**
326: * Gets the PersistenceManager attribute of the JMSServer object
327: *
328: * @return The PersistenceManager value
329: */
330: public PersistenceManager getPersistenceManager() {
331: return persistenceManager;
332: }
333:
334: /**
335: * Start the server
336: */
337: public void startServer() {
338: synchronized (stateLock) {
339: this .stopped = false;
340: this .timeoutFactory = new TimeoutFactory(this .threadPool);
341: }
342: }
343:
344: /**
345: * Stop the server
346: */
347: public void stopServer() {
348: synchronized (stateLock) {
349: this .stopped = true;
350: this .timeoutFactory.cancel();
351:
352: for (Iterator i = clientConsumers.keySet().iterator(); i
353: .hasNext();) {
354: ConnectionToken token = (ConnectionToken) i.next();
355: try {
356: connectionClosing(token);
357: } catch (Throwable t) {
358: log.trace(
359: "Ignored error closing client connection "
360: + token, t);
361: }
362: }
363: }
364: }
365:
366: public void checkID(String ID) throws JMSException {
367: checkStopped();
368: stateManager.addLoggedOnClientId(ID);
369: }
370:
371: public void addMessage(ConnectionToken dc, SpyMessage val)
372: throws JMSException {
373: addMessage(dc, val, null);
374: }
375:
376: public void addMessage(ConnectionToken dc, SpyMessage val, Tx txId)
377: throws JMSException {
378: checkStopped();
379: JMSDestination queue = (JMSDestination) destinations.get(val
380: .getJMSDestination());
381: if (queue == null)
382: throw new InvalidDestinationException(
383: "This destination does not exist! "
384: + val.getJMSDestination());
385:
386: // Reset any redelivered information
387: val.setJMSRedelivered(false);
388: val.header.jmsProperties
389: .remove(SpyMessage.PROPERTY_REDELIVERY_COUNT);
390:
391: //Add the message to the queue
392: val.setReadOnlyMode();
393: queue.addMessage(val, txId);
394: }
395:
396: public void transact(ConnectionToken dc, TransactionRequest t)
397: throws JMSException {
398: checkStopped();
399: boolean trace = log.isTraceEnabled();
400: TxManager txManager = persistenceManager.getTxManager();
401: if (t.requestType == TransactionRequest.ONE_PHASE_COMMIT_REQUEST) {
402: Tx txId = txManager.createTx();
403: if (trace)
404: log.trace(dc + " 1PC " + t.xid + " txId="
405: + txId.longValue());
406: try {
407: if (t.messages != null) {
408: for (int i = 0; i < t.messages.length; i++) {
409: addMessage(dc, t.messages[i], txId);
410: }
411: }
412: if (t.acks != null) {
413: for (int i = 0; i < t.acks.length; i++) {
414: acknowledge(dc, t.acks[i], txId);
415: }
416: }
417: txManager.commitTx(txId);
418: } catch (JMSException e) {
419: log
420: .debug(
421: "Exception occured, rolling back transaction: ",
422: e);
423: txManager.rollbackTx(txId);
424: throw new SpyTransactionRolledBackException(
425: "Transaction was rolled back.", e);
426: }
427: } else if (t.requestType == TransactionRequest.TWO_PHASE_COMMIT_PREPARE_REQUEST) {
428: Tx txId = txManager.createTx(dc, t.xid);
429: if (trace)
430: log.trace(dc + " 2PC PREPARE " + t.xid + " txId="
431: + txId.longValue());
432: try {
433: if (t.messages != null) {
434: for (int i = 0; i < t.messages.length; i++) {
435: addMessage(dc, t.messages[i], txId);
436: }
437: }
438: if (t.acks != null) {
439: for (int i = 0; i < t.acks.length; i++) {
440: acknowledge(dc, t.acks[i], txId);
441: }
442: }
443:
444: txManager.markPrepared(dc, t.xid, txId);
445: } catch (JMSException e) {
446: log
447: .debug(
448: "Exception occured, rolling back transaction: ",
449: e);
450: txManager.rollbackTx(txId);
451: throw new SpyTransactionRolledBackException(
452: "Transaction was rolled back.", e);
453: }
454: } else if (t.requestType == TransactionRequest.TWO_PHASE_COMMIT_ROLLBACK_REQUEST) {
455: if (trace)
456: log.trace(dc + " 2PC ROLLBACK " + t.xid);
457: txManager.rollbackTx(dc, t.xid);
458: } else if (t.requestType == TransactionRequest.TWO_PHASE_COMMIT_COMMIT_REQUEST) {
459: if (trace)
460: log.trace(dc + " 2PC COMMIT " + t.xid);
461: txManager.commitTx(dc, t.xid);
462: }
463: }
464:
465: public Xid[] recover(ConnectionToken dc, int flags)
466: throws Exception {
467: checkStopped();
468: TxManager txManager = persistenceManager.getTxManager();
469: return txManager.recover(dc, flags);
470: }
471:
472: public void acknowledge(ConnectionToken dc,
473: AcknowledgementRequest item) throws JMSException {
474: acknowledge(dc, item, null);
475: }
476:
477: public void acknowledge(ConnectionToken dc,
478: AcknowledgementRequest item, Tx txId) throws JMSException {
479: checkStopped();
480: ClientConsumer cc = getClientConsumer(dc);
481: cc.acknowledge(item, txId);
482: }
483:
484: public void connectionClosing(ConnectionToken dc)
485: throws JMSException {
486: if (dc == null)
487: return;
488:
489: // Close it's ClientConsumer
490: ClientConsumer cq = (ClientConsumer) clientConsumers.remove(dc);
491: if (cq != null)
492: cq.close();
493:
494: //unregister its clientID
495: if (dc.getClientID() != null)
496: stateManager.removeLoggedOnClientId(dc.getClientID());
497:
498: //Remove any temporary destinations the consumer may have created.
499: Iterator i = destinations.entrySet().iterator();
500: while (i.hasNext()) {
501: Map.Entry entry = (Map.Entry) i.next();
502: JMSDestination sq = (JMSDestination) entry.getValue();
503: if (sq != null) {
504: ClientConsumer cc = sq.temporaryDestination;
505: if (cc != null && dc.equals(cc.connectionToken)) {
506: i.remove();
507: deleteTemporaryDestination(dc, sq);
508: }
509: }
510: }
511: // Close the clientIL
512: try {
513: if (dc.clientIL != null)
514: dc.clientIL.close();
515: } catch (Exception ex) {
516: // We skip warning, to often the client will always
517: // have gone when we get here
518: //log.warn("Could not close clientIL: " +ex,ex);
519: }
520: }
521:
522: public void connectionFailure(ConnectionToken dc)
523: throws JMSException {
524: //We should try again :) This behavior should under control of a Failure-Plugin
525: log.error("The connection to client " + dc.getClientID()
526: + " failed.");
527: connectionClosing(dc);
528: }
529:
530: public void subscribe(ConnectionToken dc, Subscription sub)
531: throws JMSException {
532: checkStopped();
533: ClientConsumer clientConsumer = getClientConsumer(dc);
534: clientConsumer.addSubscription(sub);
535: }
536:
537: public void unsubscribe(ConnectionToken dc, int subscriptionId)
538: throws JMSException {
539: checkStopped();
540: ClientConsumer clientConsumer = getClientConsumer(dc);
541: clientConsumer.removeSubscription(subscriptionId);
542: }
543:
544: public void destroySubscription(ConnectionToken dc,
545: DurableSubscriptionID id) throws JMSException {
546: checkStopped();
547: getStateManager().setDurableSubscription(this , id, null);
548: }
549:
550: public SpyMessage[] browse(ConnectionToken dc, Destination dest,
551: String selector) throws JMSException {
552: checkStopped();
553: JMSDestination queue = (JMSDestination) destinations.get(dest);
554: if (queue == null)
555: throw new InvalidDestinationException(
556: "That destination does not exist! " + dest);
557: if (!(queue instanceof JMSQueue))
558: throw new JMSException("That destination is not a queue");
559:
560: return ((JMSQueue) queue).browse(selector);
561: }
562:
563: public SpyMessage receive(ConnectionToken dc, int subscriberId,
564: long wait) throws JMSException {
565: checkStopped();
566: ClientConsumer clientConsumer = getClientConsumer(dc);
567: SpyMessage msg = clientConsumer.receive(subscriberId, wait);
568: return msg;
569: }
570:
571: public Queue createQueue(ConnectionToken dc, String name)
572: throws JMSException {
573: checkStopped();
574: SpyQueue newQueue = new SpyQueue(name);
575: if (!destinations.containsKey(newQueue))
576: throw new JMSException("This destination does not exist !"
577: + newQueue);
578: return newQueue;
579: }
580:
581: public Topic createTopic(ConnectionToken dc, String name)
582: throws JMSException {
583: checkStopped();
584: SpyTopic newTopic = new SpyTopic(name);
585: if (!destinations.containsKey(newTopic))
586: throw new JMSException("This destination does not exist !"
587: + newTopic);
588: return newTopic;
589: }
590:
591: public Queue createQueue(String queueName) throws JMSException {
592: checkStopped();
593:
594: SpyTemporaryQueue newQueue = new SpyTemporaryQueue(queueName,
595: null);
596:
597: JMSDestination queue = new JMSQueue(newQueue, null, this ,
598: parameters);
599: destinations.put(newQueue, queue);
600:
601: return newQueue;
602: }
603:
604: public Topic createTopic(String topicName) throws JMSException {
605: checkStopped();
606:
607: SpyTemporaryTopic topic = new SpyTemporaryTopic(topicName, null);
608:
609: JMSDestination queue = new JMSTopic(topic, null, this ,
610: parameters);
611: destinations.put(topic, queue);
612:
613: return topic;
614: }
615:
616: public void deleteTemporaryDestination(ConnectionToken dc,
617: SpyDestination dest) throws JMSException {
618: checkStopped();
619: JMSDestination destination = (JMSDestination) destinations
620: .get(dest);
621: if (destination == null)
622: throw new InvalidDestinationException(
623: "That destination does not exist! " + destination);
624:
625: if (destination.isInUse())
626: throw new JMSException(
627: "Cannot delete temporary queue, it is in use.");
628:
629: destinations.remove(dest);
630: deleteTemporaryDestination(dc, destination);
631: }
632:
633: protected void deleteTemporaryDestination(ConnectionToken dc,
634: JMSDestination destination) throws JMSException {
635: try {
636: destination.removeAllMessages();
637: } catch (Exception e) {
638: log
639: .error(
640: "An exception happened while removing all messages from temporary destination "
641: + destination.getSpyDestination()
642: .getName(), e);
643: }
644:
645: }
646:
647: public String checkUser(String userName, String password)
648: throws JMSException {
649: checkStopped();
650: return stateManager.checkUser(userName, password);
651: }
652:
653: public String authenticate(String id, String password)
654: throws JMSException {
655: checkStopped();
656: // do nothing
657: return null;
658: }
659:
660: public void addDestination(JMSDestination destination)
661: throws JMSException {
662: if (destinations.containsKey(destination.getSpyDestination()))
663: throw new JMSException(
664: "This destination has already been added to the server!");
665:
666: //Add this new destination to the list
667: destinations.put(destination.getSpyDestination(), destination);
668:
669: // Restore the messages
670: if (destination instanceof JMSTopic) {
671: Collection durableSubs = getStateManager()
672: .getDurableSubscriptionIdsForTopic(
673: (SpyTopic) destination.getSpyDestination());
674: for (Iterator i = durableSubs.iterator(); i.hasNext();) {
675: DurableSubscriptionID sub = (DurableSubscriptionID) i
676: .next();
677: log.debug("creating the durable subscription for :"
678: + sub);
679: ((JMSTopic) destination).createDurableSubscription(sub);
680: }
681: }
682: }
683:
684: /**
685: * Closed a destination that was opened previously
686: *
687: * @param dest the destionation to close
688: * @exception JMSException Description of Exception
689: */
690: public void closeDestination(SpyDestination dest)
691: throws JMSException {
692: JMSDestination destination = (JMSDestination) destinations
693: .remove(dest);
694: if (destination == null)
695: throw new InvalidDestinationException(
696: "This destination is not open! " + dest);
697:
698: log.debug("Closing destination " + dest);
699:
700: // Add it to the closing list
701: closingDestinations.put(dest, destination);
702: try {
703: destination.close();
704: } finally {
705: closingDestinations.remove(dest);
706: }
707: }
708:
709: public String toString() {
710: return JBOSS_VERSION;
711: }
712:
713: public void ping(ConnectionToken dc, long clientTime)
714: throws JMSException {
715: checkStopped();
716: try {
717: dc.clientIL.pong(System.currentTimeMillis());
718: } catch (Exception e) {
719: throw new SpyJMSException("Could not pong", e);
720: }
721: }
722:
723: /**
724: * Gets the messageCache
725: * @return Returns a MessageCache
726: */
727: public MessageCache getMessageCache() {
728: return messageCache;
729: }
730:
731: /**
732: * Sets the messageCache
733: * @param messageCache The messageCache to set
734: */
735: public void setMessageCache(MessageCache messageCache) {
736: this .messageCache = messageCache;
737: }
738:
739: public SpyTopic getDurableTopic(DurableSubscriptionID sub)
740: throws JMSException {
741: checkStopped();
742: return getStateManager().getDurableTopic(sub);
743: }
744:
745: public Subscription getSubscription(ConnectionToken dc,
746: int subscriberId) throws JMSException {
747: checkStopped();
748: ClientConsumer clientConsumer = getClientConsumer(dc);
749: return clientConsumer.getSubscription(subscriberId);
750: }
751:
752: /**
753: * Gets message counters of all configured destinations
754: *
755: * @return MessageCounter[] message counter array sorted by name
756: */
757: public MessageCounter[] getMessageCounter() {
758: TreeMap map = new TreeMap(); // for sorting
759:
760: Iterator i = destinations.values().iterator();
761:
762: while (i.hasNext()) {
763: JMSDestination dest = (JMSDestination) i.next();
764:
765: MessageCounter[] counter = dest.getMessageCounter();
766:
767: for (int j = 0; j < counter.length; j++) {
768: // sorting order name + subscription + type
769: String key = counter[j].getDestinationName()
770: + "-"
771: + counter[j].getDestinationSubscription()
772: + "-"
773: + (counter[j].getDestinationTopic() ? "Topic"
774: : "Queue");
775:
776: map.put(key, counter[j]);
777: }
778: }
779:
780: return (MessageCounter[]) map.values().toArray(
781: new MessageCounter[0]);
782: }
783:
784: /**
785: * Resets message counters of all configured destinations
786: */
787: public void resetMessageCounter() {
788: Iterator i = destinations.values().iterator();
789:
790: while (i.hasNext()) {
791: JMSDestination dest = (JMSDestination) i.next();
792:
793: MessageCounter[] counter = dest.getMessageCounter();
794:
795: for (int j = 0; j < counter.length; j++) {
796: counter[j].resetCounter();
797: }
798: }
799: }
800:
801: public BasicQueueParameters getParameters() {
802: return parameters;
803: }
804: }
|