001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.test.cts.ejb;
023:
024: import javax.ejb.MessageDrivenBean;
025: import javax.ejb.MessageDrivenContext;
026: import javax.ejb.EJBException;
027: import javax.jms.MessageListener;
028: import javax.jms.QueueConnection;
029: import javax.jms.QueueSession;
030: import javax.jms.QueueSender;
031: import javax.jms.QueueConnectionFactory;
032: import javax.jms.Queue;
033: import javax.jms.Message;
034: import javax.jms.TextMessage;
035: import javax.jms.JMSException;
036: import javax.naming.InitialContext;
037: import org.jboss.logging.Logger;
038:
039: /** An MDB that validates that no more than maxActiveCount MDB instances
040: are active in the onMessage method.
041:
042: @author Scott.Stark@jboss.org
043: @version $Revision: 57211 $
044: */
045: public class StrictlyPooledMDB implements MessageDrivenBean,
046: MessageListener {
047: private static Logger log = Logger
048: .getLogger(StrictlyPooledMDB.class);
049: /** The class wide max count of instances allows */
050: private static int maxActiveCount = 5;
051: /** The class wide count of instances active in business code */
052: private static int activeCount;
053:
054: private MessageDrivenContext ctx = null;
055: private QueueConnection queConn;
056: private QueueSession session;
057: private QueueSender sender;
058:
059: private static synchronized int incActiveCount() {
060: return activeCount++;
061: }
062:
063: private static synchronized int decActiveCount() {
064: return activeCount--;
065: }
066:
067: public void setMessageDrivenContext(MessageDrivenContext ctx)
068: throws EJBException {
069: this .ctx = ctx;
070: try {
071: InitialContext iniCtx = new InitialContext();
072: Integer i = (Integer) iniCtx
073: .lookup("java:comp/env/maxActiveCount");
074: maxActiveCount = i.intValue();
075: QueueConnectionFactory factory = (QueueConnectionFactory) iniCtx
076: .lookup("java:/ConnectionFactory");
077: queConn = factory.createQueueConnection();
078: session = queConn.createQueueSession(false,
079: QueueSession.AUTO_ACKNOWLEDGE);
080: Queue queue = (Queue) iniCtx.lookup("queue/B");
081: sender = session.createSender(queue);
082: } catch (Exception e) {
083: log.error("Setup failure", e);
084: throw new EJBException("Setup failure", e);
085: }
086: }
087:
088: public void ejbCreate() {
089: }
090:
091: public void ejbRemove() {
092: try {
093: if (sender != null)
094: sender.close();
095: if (session != null)
096: session.close();
097: if (queConn != null)
098: queConn.close();
099: } catch (Exception e) {
100: log.error("Failed to close JMS resources", e);
101: }
102: }
103:
104: public void onMessage(Message message) {
105: int count = incActiveCount();
106: log.debug("Begin onMessage, activeCount=" + count + ", ctx="
107: + ctx);
108: try {
109: Message reply = null;
110: if (count > maxActiveCount) {
111: String msg = "IllegalState, activeCount > maxActiveCount, "
112: + count + " > " + maxActiveCount;
113: // Send an exception
114: Exception e = new IllegalStateException(msg);
115: reply = session.createObjectMessage(e);
116: } else {
117: TextMessage tm = (TextMessage) message;
118: // Send an ack
119: reply = session.createTextMessage("Recevied msg="
120: + tm.getText());
121: }
122: Thread.currentThread().sleep(1000);
123: sender.send(reply);
124: } catch (JMSException e) {
125: log.error("Failed to send error message", e);
126: } catch (InterruptedException e) {
127: } finally {
128: count = decActiveCount();
129: log.debug("End onMessage, activeCount=" + count + ", ctx="
130: + ctx);
131: }
132: }
133: }
|