001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.mq.server;
023:
024: import javax.jms.JMSException;
025:
026: import org.jboss.logging.Logger;
027: import org.jboss.mq.SpyDestination;
028: import org.jboss.mq.SpyMessage;
029: import org.jboss.mq.Subscription;
030: import org.jboss.mq.pm.Tx;
031:
032: /**
033: * This class is a message queue which is stored (hashed by Destination) on the
034: * JMS provider
035: *
036: * @author Norbert Lataille (Norbert.Lataille@m4x.org)
037: * @author Hiram Chirino (Cojonudo14@hotmail.com)
038: * @author David Maplesden (David.Maplesden@orion.co.nz)
039: * @author Adrian Brock (adrian@jboss.com)
040: * @created August 16, 2001
041: * @version $Revision: 57198 $
042: */
043: public abstract class JMSDestination {
044: //the Destination of this queue
045: SpyDestination destination;
046: //If this is a temporaryDestination, temporaryDestination=ClientConsumer of the owner, otherwise it's null
047: ClientConsumer temporaryDestination;
048: //The JMSServer object
049: JMSDestinationManager server;
050:
051: //Counter used to number incomming messages. (Used to order the messages.)
052: long nextMessageIdCounter = 0;
053: Object nextMessageIdLock = new Object();
054:
055: static long nextSharedMessageIdCounter = 0;
056: static Object nextSharedMessageIdLock = new Object();
057:
058: /** The basic queue parameters */
059: public BasicQueueParameters parameters;
060:
061: static Logger cat = Logger.getLogger(JMSDestination.class);
062:
063: JMSDestination(SpyDestination dest, ClientConsumer temporary,
064: JMSDestinationManager server,
065: BasicQueueParameters parameters) throws JMSException {
066: destination = dest;
067: temporaryDestination = temporary;
068: this .server = server;
069: this .parameters = parameters;
070: }
071:
072: public SpyDestination getSpyDestination() {
073: return destination;
074: }
075:
076: public abstract void addSubscriber(Subscription sub)
077: throws JMSException;
078:
079: public abstract void removeSubscriber(Subscription sub)
080: throws JMSException;
081:
082: public abstract void nackMessages(Subscription sub)
083: throws JMSException;
084:
085: public abstract SpyMessage receive(Subscription sub, boolean wait)
086: throws JMSException;
087:
088: public abstract void addReceiver(Subscription sub)
089: throws JMSException;
090:
091: public abstract void removeReceiver(Subscription sub);
092:
093: public abstract void restoreMessage(MessageReference message);
094:
095: public void restoreMessage(SpyMessage message) {
096: restoreMessage(message, null, Tx.UNKNOWN);
097: }
098:
099: /**
100: * Restore a message
101: *
102: * @param message the message
103: * @param tx any transaction
104: * @param type the type of restoration
105: */
106: public abstract void restoreMessage(SpyMessage message, Tx tx,
107: int type);
108:
109: public abstract boolean isInUse();
110:
111: public abstract void close() throws JMSException;
112:
113: public abstract void removeAllMessages() throws JMSException;
114:
115: /**
116: * @param req org.jboss.mq.AcknowledgementRequest
117: * @param sub org.jboss.mq.Subscription
118: * @param txId org.jboss.mq.pm.Tx
119: * @exception javax.jms.JMSException The exception description.
120: */
121: public abstract void acknowledge(
122: org.jboss.mq.AcknowledgementRequest req,
123: org.jboss.mq.Subscription sub, org.jboss.mq.pm.Tx txId)
124: throws javax.jms.JMSException;
125:
126: /**
127: * @param mes org.jboss.mq.SpyMessage
128: * @param txId org.jboss.mq.pm.Tx
129: * @exception javax.jms.JMSException The exception description.
130: */
131: public abstract void addMessage(org.jboss.mq.SpyMessage mes,
132: org.jboss.mq.pm.Tx txId) throws javax.jms.JMSException;
133:
134: public abstract MessageCounter[] getMessageCounter();
135:
136: protected static long nextSharedMessageId() {
137: synchronized (nextSharedMessageIdLock) {
138: return nextSharedMessageIdCounter++;
139: }
140: }
141:
142: protected static void updateSharedNextMessageId(SpyMessage message) {
143: synchronized (nextSharedMessageIdLock) {
144: nextSharedMessageIdCounter = Math.max(
145: nextSharedMessageIdCounter,
146: message.header.messageId + 1);
147: }
148: }
149:
150: protected long nextMessageId() {
151: if (parameters.lateClone)
152: return nextSharedMessageId();
153:
154: synchronized (nextMessageIdLock) {
155: return nextMessageIdCounter++;
156: }
157: }
158:
159: protected void updateNextMessageId(SpyMessage message) {
160: if (parameters.lateClone) {
161: updateSharedNextMessageId(message);
162: return;
163: }
164:
165: synchronized (nextMessageIdLock) {
166: nextMessageIdCounter = Math.max(nextMessageIdCounter,
167: message.header.messageId + 1);
168: }
169: }
170: }
|