001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.mq.server;
023:
024: import java.util.HashMap;
025: import java.util.Iterator;
026: import java.util.LinkedList;
027:
028: import javax.jms.InvalidDestinationException;
029: import javax.jms.JMSException;
030:
031: import org.jboss.logging.Logger;
032: import org.jboss.util.threadpool.ThreadPool;
033: import org.jboss.mq.AcknowledgementRequest;
034: import org.jboss.mq.ConnectionToken;
035: import org.jboss.mq.ReceiveRequest;
036: import org.jboss.mq.SpyMessage;
037: import org.jboss.mq.Subscription;
038:
039: /**
040: * This represent the clients queue which consumes messages from the
041: * destinations on the provider.
042: *
043: * @author Hiram Chirino (Cojonudo14@hotmail.com)
044: * @author <a href="mailto:pra@tim.se">Peter Antman</a>
045: * @created August 16, 2001
046: * @version $Revision: 57198 $
047: */
048: public class ClientConsumer implements Runnable {
049: private static Logger log = Logger.getLogger(ClientConsumer.class);
050: //The JMSServer object
051: JMSDestinationManager server;
052: //The connection this queue will send messages over
053: ConnectionToken connectionToken;
054: //Is this connection enabled (Can we transmit to the receiver)
055: boolean enabled;
056: //Has this connection been closed?
057: boolean closed = false;
058: //Maps a subscription id to a Subscription
059: HashMap subscriptions = new HashMap();
060: //Maps a subscription id to a Subscription for subscriptions that have finished receiving
061: HashMap removedSubscriptions = new HashMap();
062:
063: LinkedList blockedSubscriptions = new LinkedList();
064:
065: //List of messages waiting to be transmitted to the client
066: private LinkedList messages = new LinkedList();
067:
068: /**
069: * Flags that I am enqueued as work on my thread pool.
070: */
071: private boolean enqueued = false;
072:
073: // Static ---------------------------------------------------
074:
075: /**
076: * The {@link org.jboss.util.threadpool.ThreadPool} that
077: * does the actual message pushing for us.
078: */
079: private ThreadPool threadPool = null;
080:
081: // Constructor ---------------------------------------------------
082:
083: public ClientConsumer(JMSDestinationManager server,
084: ConnectionToken connectionToken) throws JMSException {
085: this .server = server;
086: this .connectionToken = connectionToken;
087: this .threadPool = server.getThreadPool();
088: }
089:
090: public void setEnabled(boolean enabled) throws JMSException {
091: if (log.isTraceEnabled())
092: log.trace("" + this + "->setEnabled(enabled=" + enabled
093: + ")");
094:
095: // queues might be waiting for messages.
096: synchronized (blockedSubscriptions) {
097: this .enabled = enabled;
098: if (enabled) {
099: for (Iterator it = blockedSubscriptions.iterator(); it
100: .hasNext();) {
101: Subscription sub = (Subscription) it.next();
102: JMSDestination dest = server
103: .getJMSDestination(sub.destination);
104: if (dest != null)
105: dest.addReceiver(sub);
106: }
107: blockedSubscriptions.clear();
108: }
109: }
110: }
111:
112: public void queueMessageForSending(RoutedMessage r) {
113:
114: synchronized (messages) {
115: if (closed)
116: return; // Wouldn't be delivered anyway
117:
118: messages.add(r);
119: if (!enqueued) {
120: threadPool.run(this );
121: enqueued = true;
122: }
123: }
124: }
125:
126: public void addSubscription(Subscription req) throws JMSException {
127: if (log.isTraceEnabled())
128: log.trace("Adding subscription for: " + req);
129: req.connectionToken = connectionToken;
130: req.clientConsumer = this ;
131:
132: JMSDestination jmsdest = server
133: .getJMSDestination(req.destination);
134: if (jmsdest == null)
135: throw new InvalidDestinationException("The destination "
136: + req.destination + " does not exist !");
137:
138: jmsdest.addSubscriber(req);
139:
140: synchronized (subscriptions) {
141: subscriptions.put(new Integer(req.subscriptionId), req);
142: }
143: }
144:
145: public void close() {
146: boolean trace = log.isTraceEnabled();
147: if (trace)
148: log.trace("" + this + "->close()");
149:
150: synchronized (messages) {
151: closed = true;
152: if (enqueued) {
153: enqueued = false;
154: }
155: messages.clear();
156: }
157:
158: // Remove all the subscriptions for this client
159: HashMap subscriptionsClone = null;
160: synchronized (subscriptions) {
161: subscriptionsClone = (HashMap) subscriptions.clone();
162: }
163: Iterator i = subscriptionsClone.keySet().iterator();
164: while (i.hasNext()) {
165: Integer subscriptionId = (Integer) i.next();
166: try {
167: removeSubscription(subscriptionId.intValue());
168: } catch (JMSException ignore) {
169: }
170: }
171:
172: // Nack the removed subscriptions, the connection is gone
173: HashMap removedSubsClone = null;
174: synchronized (subscriptions) {
175: removedSubsClone = (HashMap) removedSubscriptions.clone();
176: }
177: i = removedSubsClone.values().iterator();
178: while (i.hasNext()) {
179: Subscription removed = (Subscription) i.next();
180: JMSDestination queue = server
181: .getJMSDestination(removed.destination);
182: if (queue == null)
183: log
184: .warn("The subscription was registered with a destination that does not exist: "
185: + removed);
186: else {
187: try {
188: queue.nackMessages(removed);
189: } catch (JMSException e) {
190: log.warn("Unable to nack removed subscription: "
191: + removed, e);
192: }
193: }
194: // Make sure the subscription is gone
195: removeRemovedSubscription(removed.subscriptionId);
196: }
197: }
198:
199: public SpyMessage receive(int subscriberId, long wait)
200: throws JMSException {
201: Subscription req = getSubscription(subscriberId);
202: if (req == null) {
203: throw new JMSException(
204: "The provided subscription does not exist");
205: }
206:
207: JMSDestination queue = server
208: .getJMSDestination(req.destination);
209: if (queue == null)
210: throw new InvalidDestinationException(
211: "The subscription's destination " + req.destination
212: + " does not exist");
213:
214: // Block the receiver if we are not enabled and it is not noWait, otherwise receive a message
215: if (addBlockedSubscription(req, wait))
216: return queue.receive(req, (wait != -1));
217:
218: return null;
219: }
220:
221: public void removeSubscription(int subscriptionId)
222: throws JMSException {
223: if (log.isTraceEnabled())
224: log.trace("" + this + "->removeSubscription(subscriberId="
225: + subscriptionId + ")");
226:
227: Integer subId = new Integer(subscriptionId);
228: Subscription req;
229: synchronized (subscriptions) {
230: req = (Subscription) subscriptions.remove(subId);
231: if (req != null)
232: removedSubscriptions.put(subId, req);
233: }
234:
235: if (req == null)
236: throw new JMSException(
237: "The subscription had not been previously registered");
238:
239: JMSDestination queue = server
240: .getPossiblyClosingJMSDestination(req.destination);
241: if (queue == null)
242: throw new InvalidDestinationException(
243: "The subscription was registered with a destination that does not exist !");
244:
245: queue.removeSubscriber(req);
246:
247: }
248:
249: /**
250: * Push some messages.
251: */
252: public void run() {
253: try {
254:
255: ReceiveRequest[] job;
256:
257: synchronized (messages) {
258: if (closed)
259: return;
260:
261: job = new ReceiveRequest[messages.size()];
262: Iterator iter = messages.iterator();
263: for (int i = 0; iter.hasNext(); i++) {
264: RoutedMessage rm = (RoutedMessage) iter.next();
265: job[i] = rm.toReceiveRequest();
266: iter.remove();
267: }
268: enqueued = false;
269: }
270:
271: connectionToken.clientIL.receive(job);
272:
273: } catch (Throwable t) {
274: synchronized (messages) {
275: if (closed)
276: log.warn("Could not send messages to a receiver.",
277: t);
278: else
279: log
280: .trace(
281: "Could not send messages to a receiver. It is closed.",
282: t);
283: }
284: try {
285: server.connectionFailure(connectionToken);
286: } catch (Throwable ignore) {
287: log.warn("Could not close the client connection..",
288: ignore);
289: }
290: }
291: }
292:
293: public String toString() {
294: return "ClientConsumer:" + connectionToken.getClientID();
295: }
296:
297: public void acknowledge(AcknowledgementRequest request,
298: org.jboss.mq.pm.Tx txId) throws JMSException {
299: Subscription sub = retrieveSubscription(request.subscriberId);
300:
301: if (sub == null) {
302: //might be in removed subscriptions
303: synchronized (subscriptions) {
304: sub = (Subscription) removedSubscriptions
305: .get(new Integer(request.subscriberId));
306: }
307: }
308:
309: if (sub == null) {
310: throw new JMSException(
311: "The provided subscription does not exist");
312: }
313:
314: JMSDestination queue = server
315: .getJMSDestination(sub.destination);
316: if (queue == null)
317: throw new InvalidDestinationException(
318: "The subscription's destination " + sub.destination
319: + " does not exist");
320:
321: queue.acknowledge(request, sub, txId);
322: }
323:
324: boolean addBlockedSubscription(Subscription sub, long wait) {
325: synchronized (blockedSubscriptions) {
326: if (enabled == false && wait != -1)
327: blockedSubscriptions.add(sub);
328: return enabled;
329: }
330: }
331:
332: void removeRemovedSubscription(int subId) {
333: Subscription sub = null;
334: synchronized (subscriptions) {
335: sub = (Subscription) removedSubscriptions
336: .remove(new Integer(subId));
337: }
338: if (sub != null) {
339: JMSDestination topic = server
340: .getPossiblyClosingJMSDestination(sub.destination);
341: if (topic != null && topic instanceof JMSTopic)
342: ((JMSTopic) topic).cleanupSubscription(sub);
343: }
344: }
345:
346: /**
347: * Get a subscription for the subscriberid
348: *
349: * @exception JMSException if it can not find the subscription.
350: */
351: public Subscription getSubscription(int subscriberId)
352: throws JMSException {
353: Subscription req = retrieveSubscription(subscriberId);
354: if (req == null)
355: throw new JMSException(
356: "The provided subscription does not exist");
357:
358: return req;
359: }
360:
361: /**
362: * Retrieve a subscription for the subscriberid
363: */
364: private Subscription retrieveSubscription(int subscriberId)
365: throws JMSException {
366: Integer id = new Integer(subscriberId);
367: synchronized (subscriptions) {
368: return (Subscription) subscriptions.get(id);
369: }
370: }
371: }
|