001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.mq;
023:
024: import java.util.ArrayList;
025: import java.util.LinkedList;
026:
027: import javax.jms.ConnectionConsumer;
028: import javax.jms.Destination;
029: import javax.jms.JMSException;
030: import javax.jms.ServerSession;
031: import javax.jms.ServerSessionPool;
032:
033: import org.jboss.logging.Logger;
034:
035: import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
036:
037: /**
038: * This class implements javax.jms.ConnectionConsumer
039: *
040: * @author Hiram Chirino (Cojonudo14@hotmail.com)
041: * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
042: * @version $Revision: 62240 $
043: */
044: public class SpyConnectionConsumer implements ConnectionConsumer,
045: SpyConsumer, Runnable {
046: // Constants -----------------------------------------------------
047:
048: /** The log */
049: static Logger log = Logger.getLogger(SpyConnectionConsumer.class);
050:
051: /** Whether trace is enabled */
052: static boolean trace = log.isTraceEnabled();
053:
054: // Attributes ----------------------------------------------------
055:
056: /** The connection is the consumer was created with */
057: Connection connection;
058: /** The destination this consumer will receive messages from */
059: Destination destination;
060: /** The ServerSessionPool that is implemented by the AS */
061: javax.jms.ServerSessionPool serverSessionPool;
062: /** The maximum number of messages that a single session will be loaded with. */
063: int maxMessages;
064: /** This queue will hold messages until they are dispatched to the
065: MessageListener */
066: LinkedList queue = new LinkedList();
067: /** Is the ConnectionConsumer closed? */
068: boolean closed = false;
069: /** Whether we are waiting for a message */
070: boolean waitingForMessage = false;
071: /** The subscription info the consumer */
072: Subscription subscription = new Subscription();
073: /** The "listening" thread that gets messages from destination and queues
074: them for delivery to sessions */
075: Thread internalThread;
076: /** The thread id */
077: int id;
078: /** The thread id generator */
079: static SynchronizedInt threadId = new SynchronizedInt(0);
080:
081: // Static --------------------------------------------------------
082:
083: // Constructors --------------------------------------------------
084:
085: /**
086: * SpyConnectionConsumer constructor
087: *
088: * @param connection the connection
089: * @param destination destination
090: * @param messageSelector the message selector
091: * @param serverSessionPool the server session pool
092: * @param maxMessages the maxmimum messages
093: * @exception JMSException for any error
094: */
095: public SpyConnectionConsumer(Connection connection,
096: Destination destination, String messageSelector,
097: ServerSessionPool serverSessionPool, int maxMessages)
098: throws JMSException {
099: trace = log.isTraceEnabled();
100:
101: this .connection = connection;
102: this .destination = destination;
103: this .serverSessionPool = serverSessionPool;
104: this .maxMessages = maxMessages;
105: if (this .maxMessages < 1)
106: this .maxMessages = 1;
107:
108: subscription.destination = (SpyDestination) destination;
109: subscription.messageSelector = messageSelector;
110: subscription.noLocal = false;
111:
112: connection.addConsumer(this );
113: id = threadId.increment();
114: internalThread = new Thread(this ,
115: "Connection Consumer for dest " + subscription + " id="
116: + id);
117: internalThread.start();
118:
119: if (trace)
120: log.trace("New " + this );
121: }
122:
123: // Public --------------------------------------------------------
124:
125: /**
126: * Get the subscription
127: *
128: * @return the subscription
129: */
130: public Subscription getSubscription() {
131: return subscription;
132: }
133:
134: /**
135: * Add a message
136: *
137: * @mes the message
138: * @throws JMSException for any error
139: */
140: public void addMessage(SpyMessage mes) throws JMSException {
141: synchronized (queue) {
142: if (closed) {
143: if (trace)
144: log.trace("Consumer close nacking message="
145: + mes.header.jmsMessageID + " " + this );
146: log
147: .warn("NACK issued. The connection consumer was closed.");
148: connection.send(mes.getAcknowledgementRequest(false));
149: return;
150: }
151:
152: if (trace)
153: log.trace("Add message=" + mes.header.jmsMessageID
154: + " " + this );
155:
156: if (waitingForMessage) {
157: queue.addLast(mes);
158: queue.notifyAll();
159: } else {
160: if (trace)
161: log.trace("Consumer not waiting nacking message="
162: + mes.header.jmsMessageID + " " + this );
163: connection.send(mes.getAcknowledgementRequest(false));
164: }
165: }
166: }
167:
168: // ConnectionConsumer implementation -----------------------------
169:
170: public ServerSessionPool getServerSessionPool() throws JMSException {
171: return serverSessionPool;
172: }
173:
174: public void close() throws javax.jms.JMSException {
175: synchronized (queue) {
176: if (closed)
177: return;
178:
179: closed = true;
180: queue.notifyAll();
181: }
182:
183: if (trace)
184: log.trace("Close " + this );
185:
186: if (internalThread != null
187: && !internalThread.equals(Thread.currentThread())) {
188: try {
189:
190: if (trace)
191: log.trace("Joining thread " + this );
192: internalThread.join();
193: } catch (InterruptedException e) {
194: if (trace)
195: log
196: .trace("Ignoring interrupting while joining thread "
197: + this );
198: }
199: }
200: synchronized (queue) {
201: if (trace)
202: log.trace("Nacking messages on queue " + this );
203: try {
204: while (queue.isEmpty() == false) {
205: SpyMessage message = (SpyMessage) queue
206: .removeFirst();
207: connection.send(message
208: .getAcknowledgementRequest(false));
209: }
210: } catch (Throwable ignore) {
211: if (trace)
212: log.trace(
213: "Ignoring error nacking messages in queue "
214: + this , ignore);
215: }
216: try {
217: connection.removeConsumer(this );
218: } catch (Throwable ignore) {
219: if (trace)
220: log.trace(
221: "Ignoring error removing consumer from connection "
222: + this , ignore);
223: }
224: }
225: }
226:
227: // Runnable implementation ---------------------------------------
228:
229: public void run() {
230: ArrayList mesList = new ArrayList();
231: try {
232: outer: while (true) {
233: synchronized (queue) {
234: if (closed) {
235: if (trace)
236: log.trace("run() closed " + this );
237: break outer;
238: }
239: }
240:
241: for (int i = 0; i < maxMessages; i++) {
242: SpyMessage mes = connection.receive(subscription,
243: -1);
244: if (mes == null) {
245: if (trace)
246: log
247: .trace("run() receivedNoWait got no message"
248: + this );
249: break;
250: } else {
251: if (trace)
252: log.trace("run() receivedNoWait message="
253: + mes.header.jmsMessageID + " "
254: + this );
255: mesList.add(mes);
256: }
257: }
258:
259: if (mesList.isEmpty()) {
260: SpyMessage mes = null;
261: synchronized (queue) {
262: mes = connection.receive(subscription, 0);
263: if (mes == null) {
264: waitingForMessage = true;
265: while (queue.isEmpty() && !closed) {
266: if (trace)
267: log
268: .trace("run() waiting for message "
269: + this );
270: try {
271: queue.wait();
272: } catch (InterruptedException e) {
273: if (trace)
274: log.trace(
275: "Ignoring interruption waiting for message "
276: + this , e);
277: }
278: }
279: if (closed) {
280: if (trace)
281: log
282: .trace("run() closed while waiting "
283: + this );
284: waitingForMessage = false;
285: break outer;
286: }
287: mes = (SpyMessage) queue.removeFirst();
288: waitingForMessage = false;
289: if (trace)
290: log.trace("run() got message message="
291: + mes.header.jmsMessageID + " "
292: + this );
293: }
294: }
295: mesList.add(mes);
296: }
297:
298: if (trace)
299: log.trace("Waiting for serverSesionPool " + this );
300: ServerSession serverSession = serverSessionPool
301: .getServerSession();
302: SpySession spySession = (SpySession) serverSession
303: .getSession();
304: if (trace)
305: log.trace("Waited serverSesion=" + serverSession
306: + " session=" + spySession + " " + this );
307:
308: if (spySession.sessionConsumer == null) {
309: if (trace)
310: log
311: .trace("Session did not have a set MessageListner "
312: + spySession + " " + this );
313: } else {
314: spySession.sessionConsumer.subscription = subscription;
315: }
316:
317: for (int i = 0; i < mesList.size(); i++)
318: spySession.addMessage((SpyMessage) mesList.get(i));
319:
320: if (trace)
321: log.trace(" Starting the ServerSession="
322: + serverSession + " " + this );
323: serverSession.start();
324: mesList.clear();
325: }
326: } catch (Throwable t) {
327: try {
328: for (int i = 0; i < mesList.size(); i++) {
329: SpyMessage msg = (SpyMessage) mesList.get(i);
330: connection.send(msg
331: .getAcknowledgementRequest(false));
332: }
333: } catch (Throwable ignore) {
334: if (trace)
335: log.trace("Ignoring error nacking message " + this ,
336: ignore);
337: }
338: try {
339: close();
340: } catch (Throwable ignore) {
341: if (trace)
342: log.trace("Ignoring error during close " + this ,
343: ignore);
344: }
345:
346: connection.asynchFailure(
347: "Connection consumer closing due to error in listening thread"
348: + this , t);
349: }
350: }
351:
352: // Object overrides ----------------------------------------------
353:
354: public String toString() {
355: StringBuffer buffer = new StringBuffer(100);
356: buffer.append("SpyConnectionConsumer[sub=")
357: .append(subscription);
358: if (closed)
359: buffer.append(" CLOSED");
360: buffer.append(" messages=").append(queue.size());
361: buffer.append(" waitingForMessage=").append(waitingForMessage);
362: if (internalThread != null)
363: buffer.append(" internalThread=").append(internalThread);
364: buffer.append(" sessionPool=").append(serverSessionPool);
365: buffer.append(" connection=").append(connection);
366: buffer.append(']');
367: return buffer.toString();
368: }
369:
370: // Package protected ---------------------------------------------
371:
372: // Protected -----------------------------------------------------
373:
374: // Private -------------------------------------------------------
375:
376: // Inner classes -------------------------------------------------
377: }
|