0001: /*
0002: * JBoss, Home of Professional Open Source.
0003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
0004: * as indicated by the @author tags. See the copyright.txt file in the
0005: * distribution for a full listing of individual contributors.
0006: *
0007: * This is free software; you can redistribute it and/or modify it
0008: * under the terms of the GNU Lesser General Public License as
0009: * published by the Free Software Foundation; either version 2.1 of
0010: * the License, or (at your option) any later version.
0011: *
0012: * This software is distributed in the hope that it will be useful,
0013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
0014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
0015: * Lesser General Public License for more details.
0016: *
0017: * You should have received a copy of the GNU Lesser General Public
0018: * License along with this software; if not, write to the Free
0019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
0020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
0021: */
0022: package org.jboss.mq.server;
0023:
0024: import java.util.ArrayList;
0025: import java.util.Date;
0026: import java.util.HashMap;
0027: import java.util.HashSet;
0028: import java.util.Iterator;
0029: import java.util.LinkedList;
0030: import java.util.List;
0031: import java.util.Map;
0032: import java.util.Set;
0033: import java.util.SortedSet;
0034: import java.util.TreeSet;
0035:
0036: import javax.jms.IllegalStateException;
0037: import javax.jms.JMSException;
0038:
0039: import org.jboss.logging.Logger;
0040: import org.jboss.mq.AcknowledgementRequest;
0041: import org.jboss.mq.DestinationFullException;
0042: import org.jboss.mq.SpyDestination;
0043: import org.jboss.mq.SpyJMSException;
0044: import org.jboss.mq.SpyMessage;
0045: import org.jboss.mq.Subscription;
0046: import org.jboss.mq.pm.Tx;
0047: import org.jboss.mq.pm.TxManager;
0048: import org.jboss.mq.selectors.Selector;
0049: import org.jboss.util.NestedRuntimeException;
0050: import org.jboss.util.timeout.Timeout;
0051: import org.jboss.util.timeout.TimeoutTarget;
0052:
0053: import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
0054: import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet;
0055:
0056: /**
0057: * This class represents a queue which provides its messages exclusively to one
0058: * consumer at a time.<p>
0059: *
0060: * Notes about synchronization: Much of the work is synchronized on
0061: * the receivers or messages depending on the work performed.
0062: * However, anything to do with unacknowledged messages and removed
0063: * subscriptions must be done synchronized on both (receivers first).
0064: * This is because there are multiple entry points with the possibility
0065: * that a message acknowledgement (or NACK) is being processed at
0066: * the same time as a network failure removes the subscription.
0067: *
0068: *
0069: * @author Hiram Chirino (Cojonudo14@hotmail.com)
0070: * @author Norbert Lataille (Norbert.Lataille@m4x.org)
0071: * @author David Maplesden (David.Maplesden@orion.co.nz)
0072: * @author Adrian Brock (Adrian@jboss.org)
0073: * @created August 16, 2001
0074: * @version $Revision: 61446 $
0075: */
0076: public class BasicQueue {
/** Logger shared by all instances of this class */
static final Logger log = Logger.getLogger(BasicQueue.class);

/** Sorted set of message references waiting to be dispatched<p>
    synchronized access on itself */
SortedSet messages = new TreeSet();

/** Pending timeouts keyed by message reference (MessageReference -> Timeout);
    filled by addTimeout, emptied by clearEvent/clearEvents */
ConcurrentHashMap events = new ConcurrentHashMap();

/** The scheduled messages: references whose scheduled delivery time was
    still in the future when they were added */
CopyOnWriteArraySet scheduledMessages = new CopyOnWriteArraySet();

/** The JMSServer object */
JMSDestinationManager server;

/** The subscribers waiting for messages - synchronized access on itself */
Receivers receivers;

/** The description used to separate persistence for multiple subscriptions to a topic */
String description;

/** Simple Counter for gathering message add statistic data;
    null until createMessageCounter is called */
MessageCounter counter;

/** Unacknowledged messages AcknowledgementRequest -> UnackedMessageInfo<p>
    synchronized access on receivers and messages */
HashMap unacknowledgedMessages = new HashMap();
/** Unacknowledged messages MessageReference -> AcknowledgementRequest <p>
    synchronized access on receivers and messages */
HashMap unackedByMessageRef = new HashMap();
/** Unacknowledged messages Subscription -> HashMap of
    (MessageReference -> AcknowledgementRequest) <p>
    synchronized access on receivers and messages */
HashMap unackedBySubscription = new HashMap();

/** Subscribers <p>
    synchronized access on receivers */
HashSet subscribers = new HashSet();

/** Removed subscribers whose removal is delayed because they still have
    unacknowledged messages <p>
    synchronized access on receivers and messages */
HashSet removedSubscribers = new HashSet();

/** The basic queue parameters */
BasicQueueParameters parameters;

/** Have we been stopped; set by stop() while holding the receivers lock */
boolean stopped = false;
0124:
0125: /**
0126: * Construct a new basic queue
0127: *
0128: * @param server the destination manager
0129: * @param description a description to uniquely identify the queue
0130: * @param parameters the basic queue parameters
0131: * @throws JMSException for any error
0132: */
0133: public BasicQueue(JMSDestinationManager server, String description,
0134: BasicQueueParameters parameters) throws JMSException {
0135: this .server = server;
0136: this .description = description;
0137: this .parameters = parameters;
0138:
0139: Class receiversImpl = parameters.receiversImpl;
0140: if (receiversImpl == null)
0141: receiversImpl = ReceiversImpl.class;
0142:
0143: try {
0144: receivers = (Receivers) receiversImpl.newInstance();
0145: } catch (Throwable t) {
0146: throw new SpyJMSException(
0147: "Error instantiating receivers implementation: "
0148: + receiversImpl, t);
0149: }
0150: }
0151:
0152: /**
0153: * Retrieve the unique description for this queue
0154: *
0155: * @return the description
0156: */
0157: public String getDescription() {
0158: return description;
0159: }
0160:
0161: /**
0162: * Retrieve the number of receivers waiting for a message
0163: *
0164: * @return the number of receivers
0165: */
0166: public int getReceiversCount() {
0167: return receivers.size();
0168: }
0169:
0170: /**
0171: * Retrieve the receivers waiting for a message
0172: *
0173: * @return an array of subscriptions
0174: */
0175: public ArrayList getReceivers() {
0176: synchronized (receivers) {
0177: return receivers.listReceivers();
0178: }
0179: }
0180:
0181: /**
0182: * Test whether the queue is in use
0183: *
0184: * @return true when there are subscribers
0185: */
0186: public boolean isInUse() {
0187: synchronized (receivers) {
0188: // In use if we have subscribers or there are unacknowledged messages
0189: return (subscribers.isEmpty() == false || getInProcessMessageCount() > 0);
0190: }
0191: }
0192:
0193: /**
0194: * Add a receiver to the queue
0195: *
0196: * @param sub the subscription to add
0197: * @throws JMSException for any error
0198: */
0199: public void addReceiver(Subscription sub) throws JMSException {
0200: boolean trace = log.isTraceEnabled();
0201: if (trace)
0202: log.trace("addReceiver " + sub + " " + this );
0203:
0204: MessageReference found = null;
0205: synchronized (messages) {
0206: if (messages.size() != 0) {
0207: for (Iterator it = messages.iterator(); it.hasNext();) {
0208: MessageReference message = (MessageReference) it
0209: .next();
0210: try {
0211: if (message.isExpired()) {
0212: it.remove();
0213: expireMessageAsync(message);
0214: } else if (sub.accepts(message.getHeaders())) {
0215: //queue message for sending to this sub
0216: it.remove();
0217: found = message;
0218: break;
0219: }
0220: } catch (JMSException ignore) {
0221: log
0222: .info(
0223: "Caught unusual exception in addToReceivers.",
0224: ignore);
0225: }
0226: }
0227: }
0228: }
0229: if (found != null)
0230: queueMessageForSending(sub, found);
0231: else
0232: addToReceivers(sub);
0233: }
0234:
0235: /**
0236: * Get the subscribers
0237: *
0238: * @return the subscribers
0239: */
0240: public Set getSubscribers() {
0241: synchronized (receivers) {
0242: return (Set) subscribers.clone();
0243: }
0244: }
0245:
0246: /**
0247: * Add a subscription from the queue
0248: *
0249: * @param sub the subscription to add
0250: * @throws JMSException for any error
0251: */
0252: public void addSubscriber(Subscription sub) throws JMSException {
0253: boolean trace = log.isTraceEnabled();
0254: if (trace)
0255: log.trace("addSubscriber " + sub + " " + this );
0256: synchronized (receivers) {
0257: if (stopped)
0258: throw new IllegalStateException(
0259: "The destination is stopped "
0260: + getDescription());
0261: subscribers.add(sub);
0262: }
0263: }
0264:
0265: /**
0266: * Removes a subscription from the queue
0267: *
0268: * @param sub the subscription to remove
0269: */
0270: public void removeSubscriber(Subscription sub) {
0271: boolean trace = log.isTraceEnabled();
0272: if (trace)
0273: log.trace("removeSubscriber " + sub + " " + this );
0274: synchronized (receivers) {
0275: removeReceiver(sub);
0276: synchronized (messages) {
0277: if (hasUnackedMessages(sub)) {
0278: if (trace)
0279: log
0280: .trace("Delaying removal of subscriber is has unacked messages "
0281: + sub);
0282: removedSubscribers.add(sub);
0283: } else {
0284: if (trace)
0285: log.trace("Removing subscriber " + sub);
0286: subscribers.remove(sub);
0287: ((ClientConsumer) sub.clientConsumer)
0288: .removeRemovedSubscription(sub.subscriptionId);
0289: }
0290: }
0291: }
0292: }
0293:
0294: /**
0295: * Retrieve the queue depth
0296: *
0297: * @return the number of messages in the queue
0298: */
0299: public int getQueueDepth() {
0300: return messages.size();
0301: }
0302:
0303: /**
0304: * Returns the number of scheduled messages in the queue
0305: *
0306: * @return the scheduled message count
0307: */
0308: public int getScheduledMessageCount() {
0309: return scheduledMessages.size();
0310: }
0311:
0312: /**
0313: * Returns the number of in process messages for the queue
0314: *
0315: * @return the in process count
0316: */
0317: public int getInProcessMessageCount() {
0318: synchronized (messages) {
0319: return unacknowledgedMessages.size();
0320: }
0321: }
0322:
0323: /**
0324: * Add a message to the queue
0325: *
0326: * @param mes the message reference
0327: * @param txId the transaction
0328: * @throws JMSException for any error
0329: */
0330: public void addMessage(MessageReference mes, Tx txId)
0331: throws JMSException {
0332: boolean trace = log.isTraceEnabled();
0333: if (trace)
0334: log.trace("addMessage " + mes + " " + txId + " " + this );
0335:
0336: try {
0337: synchronized (receivers) {
0338: if (stopped)
0339: throw new IllegalStateException(
0340: "The destination is stopped "
0341: + getDescription());
0342: }
0343:
0344: if (parameters.maxDepth > 0) {
0345: synchronized (messages) {
0346: if (messages.size() >= parameters.maxDepth) {
0347: dropMessage(mes);
0348: String message = "Maximum size "
0349: + parameters.maxDepth
0350: + " exceeded for " + description;
0351: log.warn(message);
0352: throw new DestinationFullException(message);
0353: }
0354: }
0355: }
0356:
0357: performOrPrepareAddMessage(mes, txId);
0358: } catch (Throwable t) {
0359: String error = "Error in addMessage " + mes;
0360: log.trace(error, t);
0361: dropMessage(mes, txId);
0362: SpyJMSException.rethrowAsJMSException(error, t);
0363: }
0364: }
0365:
0366: /**
0367: * Either perform or prepare the add message
0368: *
0369: * @param mes the message reference
0370: * @param txId the transaction id
0371: * @throws Exception for any error
0372: */
0373: protected void performOrPrepareAddMessage(MessageReference mes,
0374: Tx txId) throws Exception {
0375: TxManager txManager = server.getPersistenceManager()
0376: .getTxManager();
0377:
0378: // The message is removed from the cache on a rollback
0379: Runnable task = new AddMessagePostRollBackTask(mes);
0380: txManager.addPostRollbackTask(txId, task);
0381:
0382: // The message gets added to the queue after the transaction commits
0383: task = new AddMessagePostCommitTask(mes);
0384: txManager.addPostCommitTask(txId, task);
0385: }
0386:
0387: /**
0388: * Restores a message.
0389: *
0390: * @param mes the message reference
0391: */
0392: public void restoreMessage(MessageReference mes) {
0393: restoreMessage(mes, null, Tx.UNKNOWN);
0394: }
0395:
0396: /**
0397: * Restores a message.
0398: *
0399: * @param mes the message reference
0400: * @param txid the transaction id
0401: * @param type the type of restoration
0402: */
0403: public void restoreMessage(MessageReference mes, Tx txid, int type) {
0404: boolean trace = log.isTraceEnabled();
0405: if (trace)
0406: log.trace("restoreMessage " + mes + " " + this + " txid="
0407: + txid + " type=" + type);
0408:
0409: try {
0410: if (txid == null) {
0411: internalAddMessage(mes);
0412: } else if (type == Tx.ADD) {
0413: performOrPrepareAddMessage(mes, txid);
0414: } else if (type == Tx.REMOVE) {
0415: performOrPrepareAcknowledgeMessage(mes, txid);
0416: } else {
0417: throw new IllegalStateException("Unknown restore type "
0418: + type + " for message " + mes + " txid="
0419: + txid);
0420: }
0421: } catch (RuntimeException e) {
0422: throw e;
0423: } catch (Exception e) {
0424: throw new NestedRuntimeException(
0425: "Unable to restore message " + mes, e);
0426: }
0427: }
0428:
0429: /**
0430: * Nacks a message.
0431: */
0432: protected void nackMessage(MessageReference message) {
0433: if (log.isTraceEnabled())
0434: log.trace("Restoring message: " + message);
0435:
0436: try {
0437: message.redelivered();
0438: // Set redelivered, vendor-specific flags
0439: message.invalidate();
0440: // Update the persistent message outside the transaction
0441: // We want to know the message might have been delivered regardless
0442: if (message.isPersistent())
0443: server.getPersistenceManager().update(message, null);
0444: } catch (JMSException e) {
0445: log.error("Caught unusual exception in nackMessage for "
0446: + message, e);
0447: }
0448:
0449: internalAddMessage(message);
0450: }
0451:
0452: /**
0453: * Browse the queue
0454: *
0455: * @param selector the selector to apply, pass null for
0456: * all messages
0457: * @return the messages
0458: * @throws JMSException for any error
0459: */
0460: public SpyMessage[] browse(String selector) throws JMSException {
0461: if (selector == null) {
0462: SpyMessage list[];
0463: synchronized (messages) {
0464: list = new SpyMessage[messages.size()];
0465: Iterator iter = messages.iterator();
0466: for (int i = 0; iter.hasNext(); i++)
0467: list[i] = ((MessageReference) iter.next())
0468: .getMessageForDelivery();
0469: }
0470: return list;
0471: } else {
0472: Selector s = new Selector(selector);
0473: LinkedList selection = new LinkedList();
0474:
0475: synchronized (messages) {
0476: Iterator i = messages.iterator();
0477: while (i.hasNext()) {
0478: MessageReference m = (MessageReference) i.next();
0479: if (s.test(m.getHeaders()))
0480: selection.add(m.getMessageForDelivery());
0481: }
0482: }
0483:
0484: SpyMessage list[];
0485: list = new SpyMessage[selection.size()];
0486: list = (SpyMessage[]) selection.toArray(list);
0487: return list;
0488: }
0489: }
0490:
0491: /**
0492: * Browse the scheduled messages
0493: *
0494: * @param selector the selector to apply, pass null for
0495: * all messages
0496: * @return the messages
0497: * @throws JMSException for any error
0498: */
0499: public List browseScheduled(String selector) throws JMSException {
0500: if (selector == null) {
0501: ArrayList list;
0502: synchronized (messages) {
0503: list = new ArrayList(scheduledMessages.size());
0504: Iterator iter = scheduledMessages.iterator();
0505: while (iter.hasNext()) {
0506: MessageReference ref = (MessageReference) iter
0507: .next();
0508: list.add(ref.getMessageForDelivery());
0509: }
0510: }
0511: return list;
0512: } else {
0513: Selector s = new Selector(selector);
0514: LinkedList selection = new LinkedList();
0515:
0516: synchronized (messages) {
0517: Iterator iter = scheduledMessages.iterator();
0518: while (iter.hasNext()) {
0519: MessageReference ref = (MessageReference) iter
0520: .next();
0521: if (s.test(ref.getHeaders()))
0522: selection.add(ref.getMessageForDelivery());
0523: }
0524: }
0525:
0526: return selection;
0527: }
0528: }
0529:
0530: /**
0531: * Browse the in process messages
0532: *
0533: * @param selector the selector to apply, pass null for
0534: * all messages
0535: * @return the messages
0536: * @throws JMSException for any error
0537: */
0538: public List browseInProcess(String selector) throws JMSException {
0539: if (selector == null) {
0540: ArrayList list;
0541: synchronized (messages) {
0542: list = new ArrayList(unacknowledgedMessages.size());
0543: Iterator iter = unacknowledgedMessages.values()
0544: .iterator();
0545: while (iter.hasNext()) {
0546: UnackedMessageInfo unacked = (UnackedMessageInfo) iter
0547: .next();
0548: MessageReference ref = unacked.messageRef;
0549: list.add(ref.getMessageForDelivery());
0550: }
0551: }
0552: return list;
0553: } else {
0554: Selector s = new Selector(selector);
0555: LinkedList selection = new LinkedList();
0556:
0557: synchronized (messages) {
0558: Iterator iter = unacknowledgedMessages.values()
0559: .iterator();
0560: while (iter.hasNext()) {
0561: UnackedMessageInfo unacked = (UnackedMessageInfo) iter
0562: .next();
0563: MessageReference ref = unacked.messageRef;
0564: if (s.test(ref.getHeaders()))
0565: selection.add(ref.getMessageForDelivery());
0566: }
0567: }
0568:
0569: return selection;
0570: }
0571: }
0572:
0573: /**
0574: * Receive a message from the queue
0575: *
0576: * @param sub the subscription requiring a message
0577: * @param wait whether to wait for a message
0578: * @return the message
0579: * @throws JMSException for any error
0580: */
0581: public SpyMessage receive(Subscription sub, boolean wait)
0582: throws JMSException {
0583: boolean trace = log.isTraceEnabled();
0584: if (trace)
0585: log.trace("receive " + sub + " wait=" + wait + " " + this );
0586:
0587: MessageReference messageRef = null;
0588: synchronized (receivers) {
0589: if (stopped)
0590: throw new IllegalStateException(
0591: "The destination is stopped "
0592: + getDescription());
0593: // If the subscription is not picky, the first message will be it
0594: if (sub.getSelector() == null && sub.noLocal == false) {
0595: synchronized (messages) {
0596: // find a non-expired message
0597: while (messages.size() != 0) {
0598: messageRef = (MessageReference) messages
0599: .first();
0600: messages.remove(messageRef);
0601:
0602: if (messageRef.isExpired()) {
0603: expireMessageAsync(messageRef);
0604: messageRef = null;
0605: } else
0606: break;
0607: }
0608: }
0609: } else {
0610: // The subscription is picky, so we have to iterate.
0611: synchronized (messages) {
0612: Iterator i = messages.iterator();
0613: while (i.hasNext()) {
0614: MessageReference mr = (MessageReference) i
0615: .next();
0616: if (mr.isExpired()) {
0617: i.remove();
0618: expireMessageAsync(mr);
0619: } else if (sub.accepts(mr.getHeaders())) {
0620: messageRef = mr;
0621: i.remove();
0622: break;
0623: }
0624: }
0625: }
0626: }
0627:
0628: if (messageRef == null) {
0629: if (wait)
0630: addToReceivers(sub);
0631: } else {
0632: setupMessageAcknowledgement(sub, messageRef);
0633: }
0634: }
0635:
0636: if (messageRef == null)
0637: return null;
0638: return messageRef.getMessageForDelivery();
0639: }
0640:
0641: /**
0642: * Acknowledge a message
0643: *
0644: * @param item the acknowledgement request
0645: * @param txId the transaction
0646: * @throws JMSException for any error
0647: */
0648: public void acknowledge(AcknowledgementRequest item, Tx txId)
0649: throws JMSException {
0650: boolean trace = log.isTraceEnabled();
0651: if (trace)
0652: log.trace("acknowledge " + item + " " + txId + " " + this );
0653:
0654: UnackedMessageInfo unacked = null;
0655: synchronized (messages) {
0656: unacked = (UnackedMessageInfo) unacknowledgedMessages
0657: .remove(item);
0658: if (unacked == null)
0659: return;
0660: unackedByMessageRef.remove(unacked.messageRef);
0661: HashMap map = (HashMap) unackedBySubscription
0662: .get(unacked.sub);
0663: if (map != null)
0664: map.remove(unacked.messageRef);
0665: if (map == null || map.isEmpty())
0666: unackedBySubscription.remove(unacked.sub);
0667: }
0668:
0669: MessageReference m = unacked.messageRef;
0670:
0671: // Was it a negative acknowledge??
0672: if (!item.isAck) {
0673: Runnable task = new RestoreMessageTask(m);
0674: server.getPersistenceManager().getTxManager()
0675: .addPostCommitTask(txId, task);
0676: server.getPersistenceManager().getTxManager()
0677: .addPostRollbackTask(txId, task);
0678: } else {
0679: try {
0680: if (m.isPersistent())
0681: server.getPersistenceManager().remove(m, txId);
0682: } catch (Throwable t) {
0683: // Something is wrong with the persistence manager,
0684: // force a NACK with a rollback/error
0685: Runnable task = new RestoreMessageTask(m);
0686: server.getPersistenceManager().getTxManager()
0687: .addPostCommitTask(txId, task);
0688: server.getPersistenceManager().getTxManager()
0689: .addPostRollbackTask(txId, task);
0690: SpyJMSException.rethrowAsJMSException(
0691: "Error during ACK ref=" + m, t);
0692: }
0693:
0694: performOrPrepareAcknowledgeMessage(m, txId);
0695: }
0696:
0697: synchronized (receivers) {
0698: synchronized (messages) {
0699: checkRemovedSubscribers(unacked.sub);
0700: }
0701: }
0702: }
0703:
0704: /**
0705: * Either perform or prepare the acknowledge message
0706: *
0707: * @param mes the message reference
0708: * @param txId the transaction id
0709: * @throws Exception for any error
0710: */
0711: protected void performOrPrepareAcknowledgeMessage(
0712: MessageReference mes, Tx txId) throws JMSException {
0713: TxManager txManager = server.getPersistenceManager()
0714: .getTxManager();
0715:
0716: // The message is restored to the queue on a rollback
0717: Runnable task = new RestoreMessageTask(mes);
0718: txManager.addPostRollbackTask(txId, task);
0719:
0720: // The message is fully removed after the transaction commits
0721: task = new RemoveMessageTask(mes);
0722: txManager.addPostCommitTask(txId, task);
0723: }
0724:
0725: /**
0726: * Nack all messages for a subscription
0727: *
0728: * @param sub the subscription
0729: */
0730: public void nackMessages(Subscription sub) {
0731: boolean trace = log.isTraceEnabled();
0732: if (trace)
0733: log.trace("nackMessages " + sub + " " + this );
0734:
0735: // Send nacks for unacknowledged messages
0736: synchronized (receivers) {
0737: synchronized (messages) {
0738: int count = 0;
0739: HashMap map = (HashMap) unackedBySubscription.get(sub);
0740: if (map != null) {
0741: Iterator i = ((HashMap) map.clone()).values()
0742: .iterator();
0743: while (i.hasNext()) {
0744: AcknowledgementRequest item = (AcknowledgementRequest) i
0745: .next();
0746: try {
0747: acknowledge(item, null);
0748: count++;
0749: } catch (JMSException ignore) {
0750: log.debug(
0751: "Unable to nack message: " + item,
0752: ignore);
0753: }
0754: }
0755: if (log.isDebugEnabled())
0756: log.debug("Nacked " + count
0757: + " messages for removed subscription "
0758: + sub);
0759: }
0760: }
0761: }
0762: }
0763:
0764: public void removeAllMessages() throws JMSException {
0765: boolean trace = log.isTraceEnabled();
0766: if (trace)
0767: log.trace("removeAllMessages " + this );
0768:
0769: // Drop scheduled messages
0770: for (Iterator i = events.entrySet().iterator(); i.hasNext();) {
0771: Map.Entry entry = (Map.Entry) i.next();
0772: MessageReference message = (MessageReference) entry
0773: .getKey();
0774: Timeout timeout = (Timeout) entry.getValue();
0775: if (timeout != null) {
0776: timeout.cancel();
0777: i.remove();
0778: dropMessage(message);
0779: }
0780: }
0781: scheduledMessages.clear();
0782:
0783: synchronized (receivers) {
0784: synchronized (messages) {
0785: Iterator i = ((HashMap) unacknowledgedMessages.clone())
0786: .keySet().iterator();
0787: while (i.hasNext()) {
0788: AcknowledgementRequest item = (AcknowledgementRequest) i
0789: .next();
0790: try {
0791: acknowledge(item, null);
0792: } catch (JMSException ignore) {
0793: }
0794: }
0795:
0796: // Remove all remaining messages
0797: i = messages.iterator();
0798: while (i.hasNext()) {
0799: MessageReference message = (MessageReference) i
0800: .next();
0801: i.remove();
0802: dropMessage(message);
0803: }
0804: }
0805: }
0806: }
0807:
0808: public void stop() {
0809: HashSet subs;
0810: synchronized (receivers) {
0811: stopped = true;
0812: subs = new HashSet(subscribers);
0813: if (log.isTraceEnabled())
0814: log.trace("Stopping " + this + " with subscribers "
0815: + subs);
0816: clearEvents();
0817: }
0818:
0819: for (Iterator i = subs.iterator(); i.hasNext();) {
0820: Subscription sub = (Subscription) i.next();
0821: ClientConsumer consumer = (ClientConsumer) sub.clientConsumer;
0822: try {
0823: consumer.removeSubscription(sub.subscriptionId);
0824: } catch (Throwable t) {
0825: log.warn("Error during stop - removing subscriber "
0826: + sub, t);
0827: }
0828: nackMessages(sub);
0829: }
0830:
0831: MessageCache cache = server.getMessageCache();
0832: synchronized (messages) {
0833: for (Iterator i = messages.iterator(); i.hasNext();) {
0834: MessageReference message = (MessageReference) i.next();
0835: try {
0836: cache.remove(message);
0837: } catch (JMSException ignored) {
0838: log
0839: .trace(
0840: "Ignored error removing message from cache",
0841: ignored);
0842: }
0843: }
0844: }
0845:
0846: // Help the garbage collector
0847: messages.clear();
0848: unacknowledgedMessages.clear();
0849: unackedByMessageRef.clear();
0850: unackedBySubscription.clear();
0851: subscribers.clear();
0852: removedSubscribers.clear();
0853: }
0854:
0855: /**
0856: * Create message counter object
0857: *
0858: * @param name topic/queue name
0859: * @param subscription topic subscription
0860: * @param topic topic flag
0861: * @param durable durable subscription flag
0862: * @param daycountmax message history day count limit
0863: * 0: disabled,
0864: * >0: max day count,
0865: * <0: unlimited
0866: */
0867: public void createMessageCounter(String name, String subscription,
0868: boolean topic, boolean durable, int daycountmax) {
0869: // create message counter object
0870: counter = new MessageCounter(name, subscription, this , topic,
0871: durable, daycountmax);
0872: }
0873:
0874: /**
0875: * Get message counter object
0876: *
0877: * @return MessageCounter message counter object or null
0878: */
0879: public MessageCounter getMessageCounter() {
0880: return counter;
0881: }
0882:
0883: public String toString() {
0884: return super .toString() + "{id=" + description + '}';
0885: }
0886:
0887: /**
0888: * Clear all the events
0889: */
0890: protected void clearEvents() {
0891: for (Iterator i = events.entrySet().iterator(); i.hasNext();) {
0892: Map.Entry entry = (Map.Entry) i.next();
0893: Timeout timeout = (Timeout) entry.getValue();
0894: if (timeout != null) {
0895: timeout.cancel();
0896: i.remove();
0897: }
0898: }
0899: scheduledMessages.clear();
0900: }
0901:
0902: /**
0903: * Clear the event for a message
0904: *
0905: * @param message the message reference
0906: */
0907: protected void clearEvent(MessageReference message) {
0908: Timeout timeout = (Timeout) events.remove(message);
0909: if (timeout != null)
0910: timeout.cancel();
0911: scheduledMessages.remove(message);
0912: }
0913:
0914: /**
0915: * Add a receiver
0916: *
0917: * @param sub the receiver to add
0918: */
0919: protected void addToReceivers(Subscription sub) throws JMSException {
0920: boolean trace = log.isTraceEnabled();
0921: if (trace)
0922: log.trace("addReceiver " + " " + sub + " " + this );
0923:
0924: synchronized (receivers) {
0925: if (stopped)
0926: throw new IllegalStateException(
0927: "The destination is stopped "
0928: + getDescription());
0929: receivers.add(sub);
0930: }
0931: }
0932:
0933: /**
0934: * Remove a receiver
0935: *
0936: * @param sub the receiver to remove
0937: */
0938: protected void removeReceiver(Subscription sub) {
0939: boolean trace = log.isTraceEnabled();
0940: if (trace)
0941: log.trace("removeReceiver " + " " + sub + " " + this );
0942:
0943: synchronized (receivers) {
0944: receivers.remove(sub);
0945: }
0946: }
0947:
0948: private void addTimeout(MessageReference message, TimeoutTarget t,
0949: long ts) {
0950: Timeout timeout = server.getTimeoutFactory().schedule(ts, t);
0951: events.put(message, timeout);
0952: }
0953:
0954: /**
0955: * Add a message
0956: *
0957: * @param message the message to add
0958: */
0959: private void internalAddMessage(MessageReference message) {
0960: boolean trace = log.isTraceEnabled();
0961: if (trace)
0962: log.trace("internalAddMessage " + " " + message + " "
0963: + this );
0964:
0965: // If scheduled, put in timer queue
0966: long ts = message.messageScheduledDelivery;
0967: if (ts > 0 && ts > System.currentTimeMillis()) {
0968: scheduledMessages.add(message);
0969: addTimeout(message, new EnqueueMessageTask(message), ts);
0970: if (trace)
0971: log.trace("scheduled message at " + new Date(ts) + ": "
0972: + message);
0973: // Can't deliver now
0974: return;
0975: }
0976:
0977: // Don't bother with expired messages
0978: if (message.isExpired()) {
0979: expireMessageAsync(message);
0980: return;
0981: }
0982:
0983: try {
0984: Subscription found = null;
0985: synchronized (receivers) {
0986: if (receivers.size() != 0) {
0987: for (Iterator it = receivers.iterator(); it
0988: .hasNext();) {
0989: Subscription sub = (Subscription) it.next();
0990: if (sub.accepts(message.getHeaders())) {
0991: it.remove();
0992: found = sub;
0993: break;
0994: }
0995: }
0996: }
0997:
0998: if (found == null) {
0999: synchronized (messages) {
1000: messages.add(message);
1001:
1002: // If a message is set to expire, and nobody wants it, put its reaper in
1003: // the timer queue
1004: if (message.messageExpiration > 0) {
1005: addTimeout(message, new ExpireMessageTask(
1006: message), message.messageExpiration);
1007: }
1008: }
1009: }
1010: }
1011:
1012: // Queue to the receiver
1013: if (found != null)
1014: queueMessageForSending(found, message);
1015: } catch (JMSException e) {
1016: // Could happen at the accepts() calls
1017: log.error(
1018: "Caught unusual exception in internalAddMessage.",
1019: e);
1020: // And drop the message, otherwise we have a leak in the cache
1021: dropMessage(message);
1022: }
1023: }
1024:
1025: /**
1026: * Queue a message for sending through the client consumer
1027: *
1028: * @param sub the subscirption to receive the message
1029: * @param message the message reference to queue
1030: */
1031: protected void queueMessageForSending(Subscription sub,
1032: MessageReference message) {
1033: boolean trace = log.isTraceEnabled();
1034: if (trace)
1035: log.trace("queueMessageForSending " + " " + sub + " "
1036: + message + " " + this );
1037:
1038: try {
1039: setupMessageAcknowledgement(sub, message);
1040: RoutedMessage r = new RoutedMessage();
1041: r.message = message;
1042: r.subscriptionId = new Integer(sub.subscriptionId);
1043: ((ClientConsumer) sub.clientConsumer)
1044: .queueMessageForSending(r);
1045: } catch (Throwable t) {
1046: log
1047: .warn(
1048: "Caught unusual exception sending message to receiver.",
1049: t);
1050: }
1051: }
1052:
1053: /**
1054: * Setup a message acknowledgement
1055: *
1056: * @param sub the subscription receiving the message
1057: * @param messageRef the message to be acknowledged
1058: * @throws JMSException for any error
1059: */
1060: protected void setupMessageAcknowledgement(Subscription sub,
1061: MessageReference messageRef) throws JMSException {
1062: SpyMessage message = messageRef.getMessage();
1063: AcknowledgementRequest nack = new AcknowledgementRequest(false);
1064: nack.destination = message.getJMSDestination();
1065: nack.messageID = message.getJMSMessageID();
1066: nack.subscriberId = sub.subscriptionId;
1067:
1068: synchronized (messages) {
1069: UnackedMessageInfo unacked = new UnackedMessageInfo(
1070: messageRef, sub);
1071: unacknowledgedMessages.put(nack, unacked);
1072: unackedByMessageRef.put(messageRef, nack);
1073: HashMap map = (HashMap) unackedBySubscription.get(sub);
1074: if (map == null) {
1075: map = new HashMap();
1076: unackedBySubscription.put(sub, map);
1077: }
1078: map.put(messageRef, nack);
1079: }
1080: }
1081:
1082: /**
1083: * Remove a message
1084: *
1085: * @param message the message to remove
1086: */
1087: protected void dropMessage(MessageReference message) {
1088: dropMessage(message, null);
1089: }
1090:
1091: /**
1092: * Remove a message
1093: *
1094: * @param message the message to remove
1095: * @param txid the transaction context for the removal
1096: */
1097: protected void dropMessage(MessageReference message, Tx txid) {
1098: boolean trace = log.isTraceEnabled();
1099: if (trace)
1100: log.trace("dropMessage " + this + " txid=" + txid);
1101:
1102: clearEvent(message);
1103: try {
1104: if (message.isPersistent()) {
1105: try {
1106: server.getPersistenceManager()
1107: .remove(message, txid);
1108: } catch (JMSException e) {
1109: try {
1110: log.warn(
1111: "Message removed from queue, but not from the persistent store: "
1112: + message.getMessage(), e);
1113: } catch (JMSException x) {
1114: log.warn(
1115: "Message removed from queue, but not from the persistent store: "
1116: + message, e);
1117: }
1118: }
1119: }
1120: server.getMessageCache().remove(message);
1121: } catch (JMSException e) {
1122: log.warn("Error dropping message " + message, e);
1123: }
1124: }
1125:
1126: /**
1127: * Expire a message asynchronously.
1128: *
1129: * @param messageRef the message to remove
1130: */
1131: protected void expireMessageAsync(MessageReference messageRef) {
1132: server.getThreadPool().run(new ExpireMessageTask(messageRef));
1133: }
1134:
1135: /**
1136: * Expire a message
1137: *
1138: * @param messageRef the message to remove
1139: */
1140: protected void expireMessage(MessageReference messageRef) {
1141: boolean trace = log.isTraceEnabled();
1142: if (trace)
1143: log.trace("message expired: " + messageRef);
1144:
1145: SpyDestination ed = parameters.expiryDestination;
1146: if (ed == null) {
1147: dropMessage(messageRef);
1148: return;
1149: }
1150:
1151: if (trace)
1152: log.trace("sending to: " + ed);
1153:
1154: try {
1155: SpyMessage orig = messageRef.getMessage();
1156: SpyMessage copy = orig.myClone();
1157: copy.header.jmsPropertiesReadWrite = true;
1158: copy.setJMSExpiration(0);
1159: copy.setJMSDestination(ed);
1160: copy.setLongProperty(SpyMessage.PROPERTY_ORIG_EXPIRATION,
1161: orig.getJMSExpiration());
1162: copy.setStringProperty(
1163: SpyMessage.PROPERTY_ORIG_DESTINATION, orig
1164: .getJMSDestination().toString());
1165: TxManager tm = server.getPersistenceManager()
1166: .getTxManager();
1167: Tx tx = tm.createTx();
1168: try {
1169: server.addMessage(null, copy, tx);
1170: dropMessage(messageRef, tx);
1171: tm.commitTx(tx);
1172: } catch (JMSException e) {
1173: tm.rollbackTx(tx);
1174: throw e;
1175: }
1176: } catch (JMSException e) {
1177: log.error("Could not move expired message: " + messageRef,
1178: e);
1179: }
1180: }
1181:
1182: /**
1183: * Check whether a removed subscription can be permenantly removed.
1184: * This method is private because it assumes external synchronization
1185: *
1186: * @param the subscription to check
1187: */
1188: private void checkRemovedSubscribers(Subscription sub) {
1189: boolean trace = log.isTraceEnabled();
1190: if (removedSubscribers.contains(sub)
1191: && hasUnackedMessages(sub) == false) {
1192: if (trace)
1193: log.trace("Removing subscriber " + sub);
1194: removedSubscribers.remove(sub);
1195: subscribers.remove(sub);
1196: ((ClientConsumer) sub.clientConsumer)
1197: .removeRemovedSubscription(sub.subscriptionId);
1198: }
1199: }
1200:
1201: /**
1202: * Check whether a subscription has unacknowledged messages.
1203: * This method is private because it assumes external synchronization
1204: *
1205: * @param sub the subscription to check
1206: * @return true when it has unacknowledged messages
1207: */
1208: private boolean hasUnackedMessages(Subscription sub) {
1209: return unackedBySubscription.containsKey(sub);
1210: }
1211:
1212: /**
1213: * Rollback an add message
1214: */
1215: class AddMessagePostRollBackTask implements Runnable {
1216: MessageReference message;
1217:
1218: AddMessagePostRollBackTask(MessageReference m) {
1219: message = m;
1220: }
1221:
1222: public void run() {
1223: try {
1224: server.getMessageCache().remove(message);
1225: } catch (JMSException e) {
1226: log
1227: .error(
1228: "Could not remove message from the message cache after an add rollback: ",
1229: e);
1230: }
1231: }
1232: }
1233:
1234: /**
1235: * Add a message to the queue
1236: */
1237: class AddMessagePostCommitTask implements Runnable {
1238: MessageReference message;
1239:
1240: AddMessagePostCommitTask(MessageReference m) {
1241: message = m;
1242: }
1243:
1244: public void run() {
1245: internalAddMessage(message);
1246:
1247: // update message counter
1248: if (counter != null) {
1249: counter.incrementCounter();
1250: }
1251: }
1252: }
1253:
1254: /**
1255: * Restore a message to the queue
1256: */
1257: class RestoreMessageTask implements Runnable {
1258: MessageReference message;
1259:
1260: RestoreMessageTask(MessageReference m) {
1261: message = m;
1262: }
1263:
1264: public void run() {
1265: nackMessage(message);
1266: }
1267: }
1268:
1269: /**
1270: * Remove a message
1271: */
1272: class RemoveMessageTask implements Runnable {
1273: MessageReference message;
1274:
1275: RemoveMessageTask(MessageReference m) {
1276: message = m;
1277: }
1278:
1279: public void run() {
1280: try {
1281: clearEvent(message);
1282: server.getMessageCache().remove(message);
1283: } catch (JMSException e) {
1284: log
1285: .error(
1286: "Could not remove an acknowleged message from the message cache: ",
1287: e);
1288: }
1289: }
1290: }
1291:
1292: /**
1293: * Schedele message delivery
1294: */
1295: private class EnqueueMessageTask implements TimeoutTarget {
1296: private MessageReference messageRef;
1297:
1298: public EnqueueMessageTask(MessageReference messageRef) {
1299: this .messageRef = messageRef;
1300: }
1301:
1302: public void timedOut(Timeout timeout) {
1303: if (log.isTraceEnabled())
1304: log.trace("scheduled message delivery: " + messageRef);
1305: events.remove(messageRef);
1306: scheduledMessages.remove(messageRef);
1307: internalAddMessage(messageRef);
1308: }
1309: }
1310:
1311: /**
1312: * Drop a message when it expires
1313: */
1314: private class ExpireMessageTask implements TimeoutTarget, Runnable {
1315: private MessageReference messageRef;
1316:
1317: public ExpireMessageTask(MessageReference messageRef) {
1318: this .messageRef = messageRef;
1319: }
1320:
1321: public void timedOut(Timeout timout) {
1322: events.remove(messageRef);
1323: scheduledMessages.remove(messageRef);
1324: synchronized (messages) {
1325: // If the message was already sent, then do nothing
1326: // (This probably happens more than not)
1327: if (messages.remove(messageRef) == false)
1328: return;
1329: }
1330: expireMessage(messageRef);
1331: }
1332:
1333: public void run() {
1334: expireMessage(messageRef);
1335: }
1336: }
1337:
1338: /**
1339: * Information about unacknowledged messages
1340: */
1341: private static class UnackedMessageInfo {
1342: public MessageReference messageRef;
1343: public Subscription sub;
1344:
1345: public UnackedMessageInfo(MessageReference messageRef,
1346: Subscription sub) {
1347: this.messageRef = messageRef;
1348: this.sub = sub;
1349: }
1350: }
1351: }
|