001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.mq.server;
023:
024: import java.util.ArrayList;
025:
026: import javax.jms.JMSException;
027:
028: import org.jboss.mq.SpyDestination;
029: import org.jboss.mq.SpyJMSException;
030: import org.jboss.mq.SpyMessage;
031: import org.jboss.mq.Subscription;
032: import org.jboss.mq.pm.Tx;
033:
034: /**
035: * This class is a message queue which is stored (hashed by Destination) on the
036: * JMS provider
037: *
038: * @author Norbert Lataille (Norbert.Lataille@m4x.org)
039: * @author Hiram Chirino (Cojonudo14@hotmail.com)
040: * @author David Maplesden (David.Maplesden@orion.co.nz)
041: * @author Adrian Brock (adrian@jboss.com)
042: * @created August 16, 2001
043: * @version $Revision: 57198 $
044: */
045: public class JMSQueue extends JMSDestination {
046:
047: public BasicQueue queue;
048:
049: public JMSQueue(SpyDestination dest, ClientConsumer temporary,
050: JMSDestinationManager server,
051: BasicQueueParameters parameters) throws JMSException {
052: super (dest, temporary, server, parameters);
053:
054: // If this is a non-temp queue, then we should persist data
055: if (temporaryDestination == null) {
056: Throwable error = null;
057: for (int i = 0; i <= parameters.recoveryRetries; ++i) {
058: // create queue
059: queue = new PersistentQueue(server, dest, parameters);
060:
061: try {
062: // restore persistent queue data
063: server.getPersistenceManager().restoreQueue(this ,
064: dest);
065:
066: // done
067: break;
068: } catch (Throwable t) {
069: if (i < parameters.recoveryRetries)
070: cat.warn("Error restoring queue " + queue
071: + " retries=" + i + " of "
072: + parameters.recoveryRetries, t);
073: else
074: error = t;
075: try {
076: queue.stop();
077: } catch (Throwable ignored) {
078: cat.trace("Ignored error stopping queue "
079: + queue, ignored);
080: } finally {
081: queue = null;
082: }
083: }
084: }
085:
086: if (error != null)
087: SpyJMSException.rethrowAsJMSException(
088: "Unable to recover queue " + dest + " retries="
089: + parameters.recoveryRetries, error);
090: } else {
091: // create queue
092: queue = new BasicQueue(server, destination.toString(),
093: parameters);
094: }
095:
096: // create queue message counter
097: queue.createMessageCounter(dest.getName(), null, false, false,
098: parameters.messageCounterHistoryDayLimit);
099: }
100:
101: public void addSubscriber(Subscription sub) throws JMSException {
102: queue.addSubscriber(sub);
103: }
104:
105: public void removeSubscriber(Subscription sub) {
106: queue.removeSubscriber(sub);
107: }
108:
109: public void nackMessages(Subscription sub) {
110: queue.nackMessages(sub);
111: }
112:
113: public void addReceiver(Subscription sub) throws JMSException {
114: queue.addReceiver(sub);
115: }
116:
117: public void removeReceiver(Subscription sub) {
118: queue.removeReceiver(sub);
119: }
120:
121: public void restoreMessage(MessageReference messageRef) {
122: try {
123: SpyMessage spyMessage = messageRef.getMessage();
124: updateNextMessageId(spyMessage);
125: messageRef.queue = queue;
126: queue.restoreMessage(messageRef);
127: } catch (JMSException e) {
128: cat.error("Could not restore message:", e);
129: }
130: }
131:
132: public void restoreMessage(SpyMessage message, Tx txid, int type) {
133: try {
134: updateNextMessageId(message);
135: MessageReference messageRef = server.getMessageCache().add(
136: message, queue, MessageReference.STORED);
137: queue.restoreMessage(messageRef, txid, type);
138: } catch (JMSException e) {
139: cat.error("Could not restore message:", e);
140: }
141: }
142:
143: public SpyMessage[] browse(String selector) throws JMSException {
144: return queue.browse(selector);
145: }
146:
147: public String toString() {
148: return "JMSDestination:" + destination;
149: }
150:
151: public void acknowledge(org.jboss.mq.AcknowledgementRequest req,
152: Subscription sub, org.jboss.mq.pm.Tx txId)
153: throws JMSException {
154: queue.acknowledge(req, txId);
155: }
156:
157: public void addMessage(SpyMessage mes, org.jboss.mq.pm.Tx txId)
158: throws JMSException {
159: //Number the message so that we can preserve order of delivery.
160: mes.header.messageId = nextMessageId();
161: MessageReference message = server.getMessageCache().add(mes,
162: queue, MessageReference.NOT_STORED);
163: queue.addMessage(message, txId);
164: }
165:
166: public org.jboss.mq.SpyMessage receive(
167: org.jboss.mq.Subscription sub, boolean wait)
168: throws javax.jms.JMSException {
169: return queue.receive(sub, wait);
170: }
171:
172: /*
173: * @see JMSDestination#isInUse()
174: */
175: public boolean isInUse() {
176: return queue.isInUse();
177: }
178:
179: /*
180: * @see JMSDestination#close()
181: */
182: public void close() throws JMSException {
183: queue.stop();
184: server.getPersistenceManager().closeQueue(this ,
185: getSpyDestination());
186: }
187:
188: /**
189: * @see JMSDestination#destroy()
190: */
191: public void removeAllMessages() throws JMSException {
192: queue.removeAllMessages();
193: }
194:
195: /**
196: * Get message counter of internal queue
197: *
198: * @return MessageCounter[] internal queue message counter
199: */
200: public MessageCounter[] getMessageCounter() {
201: ArrayList array = new ArrayList();
202:
203: MessageCounter counter = queue.getMessageCounter();
204:
205: if (counter != null)
206: array.add(counter);
207:
208: return (MessageCounter[]) array.toArray(new MessageCounter[0]);
209: }
210: }
|