001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.mq.server;
023:
024: import java.util.ArrayList;
025: import java.util.Iterator;
026: import java.util.TreeMap;
027:
028: import javax.jms.IllegalStateException;
029: import javax.jms.JMSException;
030:
031: import org.jboss.mq.DestinationFullException;
032: import org.jboss.mq.DurableSubscriptionID;
033: import org.jboss.mq.SpyDestination;
034: import org.jboss.mq.SpyJMSException;
035: import org.jboss.mq.SpyMessage;
036: import org.jboss.mq.SpyTopic;
037: import org.jboss.mq.Subscription;
038: import org.jboss.mq.pm.NewPersistenceManager;
039: import org.jboss.mq.pm.PersistenceManager;
040: import org.jboss.mq.pm.Tx;
041:
042: import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
043:
044: /**
045: * This class is a message queue which is stored (hashed by Destination) on the
046: * JMS provider
047: *
048: * @author Norbert Lataille (Norbert.Lataille@m4x.org)
049: * @author Hiram Chirino (Cojonudo14@hotmail.com)
050: * @author David Maplesden (David.Maplesden@orion.co.nz)
051: * @author Adrian Brock (adrian@jboss.com)
052: * @created August 16, 2001
053: * @version $Revision: 61446 $
054: */
055: public class JMSTopic extends JMSDestination {
056:
057: //Hashmap of ExclusiveQueues
058: ConcurrentReaderHashMap durQueues = new ConcurrentReaderHashMap();
059: ConcurrentReaderHashMap tempQueues = new ConcurrentReaderHashMap();
060:
061: public JMSTopic(SpyDestination dest, ClientConsumer temporary,
062: JMSDestinationManager server,
063: BasicQueueParameters parameters) throws JMSException {
064: super (dest, temporary, server, parameters);
065:
066: // This is a bit of a hack, for backwards compatibility
067: PersistenceManager pm = server.getPersistenceManager();
068: parameters.lateClone = (pm instanceof NewPersistenceManager);
069: }
070:
071: public void addSubscriber(Subscription sub) throws JMSException {
072: SpyTopic topic = (SpyTopic) sub.destination;
073: DurableSubscriptionID id = topic.getDurableSubscriptionID();
074:
075: if (id == null) {
076: // create queue
077: ExclusiveQueue q = new ExclusiveQueue(server, destination,
078: sub, parameters);
079:
080: // create topic queue message counter
081: q.createMessageCounter(destination.getName(), q
082: .getDescription(), true, false,
083: parameters.messageCounterHistoryDayLimit);
084:
085: tempQueues.put(sub, q);
086: q.addSubscriber(sub);
087: } else {
088: PersistentQueue q = (PersistentQueue) durQueues.get(id);
089:
090: // Check for already in use
091: if (q != null && q.isInUse())
092: throw new IllegalStateException(
093: "The durable subscription is already in use. "
094: + id);
095:
096: // Check for a changed selector
097: boolean selectorChanged = false;
098: if (q != null) {
099: String newSelector = sub.messageSelector;
100: String oldSelector = null;
101: if (q instanceof SelectorPersistentQueue)
102: oldSelector = ((SelectorPersistentQueue) q).selectorString;
103: if ((newSelector == null && oldSelector != null)
104: || (newSelector != null && newSelector
105: .equals(oldSelector) == false))
106: selectorChanged = true;
107: }
108:
109: if (q == null || //Brand new durable subscriber
110: !q.destination.equals(topic) || selectorChanged) {
111: //subscription changed to new topic
112: server.getStateManager().setDurableSubscription(server,
113: id, topic);
114:
115: // Pickup the new queue
116: synchronized (durQueues) {
117: q = (PersistentQueue) durQueues.get(id);
118: }
119: }
120: q.addSubscriber(sub);
121: }
122: }
123:
124: public void removeSubscriber(Subscription sub) throws JMSException {
125: BasicQueue queue = null;
126: SpyTopic topic = (SpyTopic) sub.destination;
127: DurableSubscriptionID id = topic.getDurableSubscriptionID();
128: if (id == null)
129: queue = (BasicQueue) tempQueues.get(sub);
130: else
131: queue = (BasicQueue) durQueues.get(id);
132: // The queue may be null if the durable subscription
133: // is destroyed before the consumer is unsubscribed!
134: if (queue == null)
135: ((ClientConsumer) sub.clientConsumer)
136: .removeRemovedSubscription(sub.subscriptionId);
137: else
138: queue.removeSubscriber(sub);
139: }
140:
141: public void nackMessages(Subscription sub) throws JMSException {
142: BasicQueue queue = null;
143: SpyTopic topic = (SpyTopic) sub.destination;
144: DurableSubscriptionID id = topic.getDurableSubscriptionID();
145: if (id == null)
146: queue = (BasicQueue) tempQueues.get(sub);
147: else
148: queue = (BasicQueue) durQueues.get(id);
149: if (queue != null) {
150: queue.nackMessages(sub);
151: }
152: }
153:
154: void cleanupSubscription(Subscription sub) {
155: //just try to remove from tempQueues, don't worry if its not there
156: BasicQueue queue = (BasicQueue) tempQueues.remove(sub);
157: try {
158: if (queue != null)
159: queue.removeAllMessages();
160: } catch (JMSException e) {
161: cat.debug(
162: "Error removing messages for subscription " + sub,
163: e);
164: }
165: }
166:
167: public void addReceiver(Subscription sub) throws JMSException {
168: getQueue(sub).addReceiver(sub);
169: }
170:
171: public void removeReceiver(Subscription sub) {
172: try {
173: getQueue(sub).removeReceiver(sub);
174: } catch (JMSException e) {
175: cat.trace("Subscription is not registered: " + sub, e);
176: }
177: }
178:
179: public void restoreMessage(MessageReference messageRef) {
180: try {
181: SpyMessage spyMessage = messageRef.getMessage();
182: updateNextMessageId(spyMessage);
183: if (spyMessage.header.durableSubscriberID == null) {
184: cat
185: .debug("Trying to restore message with null durableSubscriberID");
186: } else {
187: BasicQueue queue = ((BasicQueue) durQueues
188: .get(spyMessage.header.durableSubscriberID));
189: messageRef.queue = queue;
190: queue.restoreMessage(messageRef);
191: }
192: } catch (JMSException e) {
193: cat.error("Could not restore message:", e);
194: }
195: }
196:
197: public void restoreMessage(SpyMessage message, Tx txid, int type) {
198: try {
199: updateNextMessageId(message);
200: if (message.header.durableSubscriberID == null) {
201: cat
202: .debug("Trying to restore message with null durableSubscriberID");
203: } else {
204: BasicQueue queue = (BasicQueue) durQueues
205: .get(message.header.durableSubscriberID);
206: MessageReference messageRef = server.getMessageCache()
207: .add(message, queue, MessageReference.STORED);
208: queue.restoreMessage(messageRef, txid, type);
209: }
210: } catch (JMSException e) {
211: cat.error("Could not restore message:", e);
212: }
213: }
214:
215: public void restoreMessage(SpyMessage message,
216: DurableSubscriptionID id) {
217: try {
218: updateNextMessageId(message);
219: if (id == null) {
220: cat
221: .debug("Trying to restore message with null durableSubscriberID");
222: } else {
223: BasicQueue queue = (BasicQueue) durQueues.get(id);
224: MessageReference messageRef = server.getMessageCache()
225: .add(message, queue, MessageReference.STORED,
226: id);
227: queue.restoreMessage(messageRef);
228: }
229: } catch (JMSException e) {
230: cat.error("Could not restore message:", e);
231: }
232: }
233:
234: //called by state manager when a durable sub is created
235: public void createDurableSubscription(DurableSubscriptionID id)
236: throws JMSException {
237: if (temporaryDestination != null)
238: throw new JMSException(
239: "Not a valid operation on a temporary topic");
240:
241: SpyTopic dstopic = new SpyTopic((SpyTopic) destination, id);
242:
243: Throwable error = null;
244: for (int i = 0; i <= parameters.recoveryRetries; ++i) {
245: // Create a subscription
246: BasicQueue queue;
247: if (id.getSelector() == null)
248: queue = new PersistentQueue(server, dstopic, parameters);
249: else
250: queue = new SelectorPersistentQueue(server, dstopic, id
251: .getSelector(), parameters);
252:
253: // create topic queue message counter
254: queue.createMessageCounter(destination.getName(), id
255: .toString(), true, true,
256: parameters.messageCounterHistoryDayLimit);
257:
258: durQueues.put(id, queue);
259:
260: try {
261: // restore persistent queue data
262: server.getPersistenceManager().restoreQueue(this ,
263: dstopic);
264:
265: // done
266: break;
267: } catch (Throwable t) {
268: if (i < parameters.recoveryRetries)
269: cat.warn("Error restoring topic subscription "
270: + queue + " retries=" + i + " of "
271: + parameters.recoveryRetries, t);
272: else
273: error = t;
274: try {
275: queue.stop();
276: } catch (Throwable ignored) {
277: cat.trace(
278: "Ignored error stopping topic subscription "
279: + queue, ignored);
280: } finally {
281: durQueues.remove(id);
282: queue = null;
283: }
284: }
285: }
286:
287: if (error != null)
288: SpyJMSException.rethrowAsJMSException(
289: "Unable to recover topic subscription " + id
290: + " retries=" + parameters.recoveryRetries,
291: error);
292: }
293:
294: //called by JMSServer when a destination is being closed.
295: public void close() throws JMSException {
296: if (temporaryDestination != null)
297: throw new JMSException(
298: "Not a valid operation on a temporary topic");
299:
300: Iterator i = tempQueues.values().iterator();
301: while (i.hasNext()) {
302: ExclusiveQueue queue = (ExclusiveQueue) i.next();
303: queue.stop();
304: }
305:
306: i = durQueues.values().iterator();
307: while (i.hasNext()) {
308: PersistentQueue queue = (PersistentQueue) i.next();
309: queue.stop();
310: server.getPersistenceManager().closeQueue(this ,
311: queue.getSpyDestination());
312: }
313: }
314:
315: //called by state manager when a durable sub is deleted
316: public void destroyDurableSubscription(DurableSubscriptionID id)
317: throws JMSException {
318: boolean inUse = false;
319: BasicQueue queue = null;
320: synchronized (durQueues) {
321: queue = (BasicQueue) durQueues.get(id);
322: if (queue != null && queue.isInUse())
323: inUse = true;
324: else
325: durQueues.remove(id);
326: }
327: if (queue == null)
328: throw new IllegalStateException(
329: "Unable to find durable subscription: " + id);
330: if (inUse)
331: throw new IllegalStateException(
332: "The durable subscription is in use: " + id);
333: queue.removeAllMessages();
334: }
335:
336: public SpyMessage receive(Subscription sub, boolean wait)
337: throws javax.jms.JMSException {
338: return getQueue(sub).receive(sub, wait);
339: }
340:
341: public void acknowledge(org.jboss.mq.AcknowledgementRequest req,
342: Subscription sub, org.jboss.mq.pm.Tx txId)
343: throws JMSException {
344: getQueue(sub).acknowledge(req, txId);
345: }
346:
347: public void addMessage(SpyMessage message, org.jboss.mq.pm.Tx txId)
348: throws JMSException {
349: StringBuffer errorMessage = null;
350:
351: // Whether the message was added to a persistence queue
352: boolean added = false;
353:
354: //Number the message so that we can preserve order of delivery.
355: long messageId = nextMessageId();
356:
357: if (parameters.lateClone)
358: message.header.messageId = messageId;
359:
360: Iterator iter = durQueues.keySet().iterator();
361: while (iter.hasNext()) {
362: DurableSubscriptionID id = (DurableSubscriptionID) iter
363: .next();
364: PersistentQueue q = (PersistentQueue) durQueues.get(id);
365: MessageReference ref;
366: if (parameters.lateClone) {
367: ref = server.getMessageCache().add(message, q,
368: MessageReference.NOT_STORED, id);
369: } else {
370: SpyMessage clone = message.myClone();
371: clone.header.durableSubscriberID = id;
372: clone.header.messageId = messageId;
373: clone.setJMSDestination(q.getSpyDestination());
374: ref = server.getMessageCache().add(clone, q,
375: MessageReference.NOT_STORED);
376: }
377: try {
378: // For shared blob write it for the first durable subscription
379: if (added == false && parameters.lateClone
380: && ref.isPersistent()) {
381: NewPersistenceManager pm = (NewPersistenceManager) server
382: .getPersistenceManager();
383: pm.addMessage(message);
384: added = true;
385: }
386: q.addMessage(ref, txId);
387: } catch (DestinationFullException e) {
388: if (errorMessage == null)
389: errorMessage = new StringBuffer(e.getText());
390: else
391: errorMessage.append(", ").append(e.getText());
392: }
393: }
394:
395: iter = tempQueues.values().iterator();
396: while (iter.hasNext()) {
397: BasicQueue q = (BasicQueue) iter.next();
398: MessageReference ref;
399: if (parameters.lateClone) {
400: ref = server.getMessageCache().add(message, q,
401: MessageReference.NOT_STORED);
402: } else {
403: SpyMessage clone = message.myClone();
404: clone.header.messageId = messageId;
405: ref = server.getMessageCache().add(clone, q,
406: MessageReference.NOT_STORED);
407: }
408: try {
409: q.addMessage(ref, txId);
410: } catch (DestinationFullException e) {
411: if (errorMessage == null)
412: errorMessage = new StringBuffer(e.getText());
413: else
414: errorMessage.append(", ").append(e.getText());
415: }
416: }
417:
418: if (errorMessage != null)
419: throw new DestinationFullException(errorMessage.toString());
420: }
421:
422: public int getAllMessageCount() {
423: return calculateMessageCount(getAllQueues());
424: }
425:
426: public int getDurableMessageCount() {
427: return calculateMessageCount(getPersistentQueues());
428: }
429:
430: public int getNonDurableMessageCount() {
431: return calculateMessageCount(getTemporaryQueues());
432: }
433:
434: public ArrayList getAllQueues() {
435: ArrayList result = new ArrayList(getAllSubscriptionsCount());
436: result.addAll(getPersistentQueues());
437: result.addAll(getTemporaryQueues());
438: return result;
439: }
440:
441: public ArrayList getTemporaryQueues() {
442: return new ArrayList(tempQueues.values());
443: }
444:
445: public ArrayList getPersistentQueues() {
446: return new ArrayList(durQueues.values());
447: }
448:
449: public int getAllSubscriptionsCount() {
450: return durQueues.size() + tempQueues.size();
451: }
452:
453: public int getDurableSubscriptionsCount() {
454: return durQueues.size();
455: }
456:
457: public int getNonDurableSubscriptionsCount() {
458: return tempQueues.size();
459: }
460:
461: public ArrayList getAllSubscriptions() {
462: ArrayList result = new ArrayList(getAllSubscriptionsCount());
463: result.addAll(getDurableSubscriptions());
464: result.addAll(getNonDurableSubscriptions());
465: return result;
466: }
467:
468: public ArrayList getDurableSubscriptions() {
469: return new ArrayList(durQueues.keySet());
470: }
471:
472: public ArrayList getNonDurableSubscriptions() {
473: return new ArrayList(tempQueues.keySet());
474: }
475:
476: public PersistentQueue getDurableSubscription(
477: DurableSubscriptionID id) {
478: return (PersistentQueue) durQueues.get(id);
479: }
480:
481: public BasicQueue getQueue(Subscription sub) throws JMSException {
482: SpyTopic topic = (SpyTopic) sub.destination;
483: DurableSubscriptionID id = topic.getDurableSubscriptionID();
484: BasicQueue queue = null;
485: if (id != null)
486: queue = getDurableSubscription(id);
487: else
488: queue = (BasicQueue) tempQueues.get(sub);
489:
490: if (queue != null)
491: return queue;
492: else
493: throw new JMSException("Subscription not found: " + sub);
494: }
495:
496: // Package protected ---------------------------------------------
497:
498: /*
499: * @see JMSDestination#isInUse()
500: */
501: public boolean isInUse() {
502: if (tempQueues.size() > 0)
503: return true;
504: Iterator iter = durQueues.values().iterator();
505: while (iter.hasNext()) {
506: PersistentQueue q = (PersistentQueue) iter.next();
507: if (q.isInUse())
508: return true;
509: }
510: return false;
511: }
512:
513: /**
514: * @see JMSDestination#destroy()
515: */
516: public void removeAllMessages() throws JMSException {
517: Iterator i = durQueues.values().iterator();
518: while (i.hasNext()) {
519: PersistentQueue queue = (PersistentQueue) i.next();
520: queue.removeAllMessages();
521: }
522: }
523:
524: private int calculateMessageCount(ArrayList queues) {
525: int count = 0;
526: for (Iterator i = queues.listIterator(); i.hasNext();) {
527: BasicQueue queue = (BasicQueue) i.next();
528: count += queue.getQueueDepth();
529: }
530: return count;
531: }
532:
533: /**
534: * Get message counter of all topic internal queues
535: *
536: * @return MessageCounter[] topic queue message counter array
537: */
538: public MessageCounter[] getMessageCounter() {
539: TreeMap map = new TreeMap();
540:
541: Iterator i = durQueues.values().iterator();
542:
543: while (i.hasNext()) {
544: BasicQueue queue = (BasicQueue) i.next();
545: MessageCounter counter = queue.getMessageCounter();
546:
547: if (counter != null) {
548: String key = counter.getDestinationName()
549: + counter.getDestinationSubscription();
550: map.put(key, counter);
551: }
552: }
553:
554: i = tempQueues.values().iterator();
555:
556: while (i.hasNext()) {
557: BasicQueue queue = (BasicQueue) i.next();
558: MessageCounter counter = queue.getMessageCounter();
559:
560: if (counter != null) {
561: String key = counter.getDestinationName()
562: + counter.getDestinationSubscription();
563: map.put(key, counter);
564: }
565: }
566:
567: return (MessageCounter[]) map.values().toArray(
568: new MessageCounter[0]);
569: }
570: }
|