0001: /*
0002: * JBoss, Home of Professional Open Source.
0003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
0004: * as indicated by the @author tags. See the copyright.txt file in the
0005: * distribution for a full listing of individual contributors.
0006: *
0007: * This is free software; you can redistribute it and/or modify it
0008: * under the terms of the GNU Lesser General Public License as
0009: * published by the Free Software Foundation; either version 2.1 of
0010: * the License, or (at your option) any later version.
0011: *
0012: * This software is distributed in the hope that it will be useful,
0013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
0014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
0015: * Lesser General Public License for more details.
0016: *
0017: * You should have received a copy of the GNU Lesser General Public
0018: * License along with this software; if not, write to the Free
0019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
0020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
0021: */
0022: package org.jboss.ejb.plugins.jms;
0023:
0024: import java.lang.reflect.Method;
0025: import java.security.AccessController;
0026: import java.security.Principal;
0027: import java.security.PrivilegedAction;
0028: import java.util.Collection;
0029:
0030: import javax.ejb.EJBMetaData;
0031: import javax.jms.Connection;
0032: import javax.jms.ConnectionConsumer;
0033: import javax.jms.Destination;
0034: import javax.jms.ExceptionListener;
0035: import javax.jms.JMSException;
0036: import javax.jms.Message;
0037: import javax.jms.MessageListener;
0038: import javax.jms.Queue;
0039: import javax.jms.QueueConnection;
0040: import javax.jms.ServerSessionPool;
0041: import javax.jms.Topic;
0042: import javax.jms.TopicConnection;
0043: import javax.management.MBeanServer;
0044: import javax.management.Notification;
0045: import javax.management.ObjectName;
0046: import javax.naming.Context;
0047: import javax.naming.InitialContext;
0048: import javax.naming.NamingException;
0049: import javax.transaction.Transaction;
0050: import javax.transaction.TransactionManager;
0051:
0052: import org.jboss.deployment.DeploymentException;
0053: import org.jboss.ejb.Container;
0054: import org.jboss.ejb.EJBProxyFactory;
0055: import org.jboss.invocation.Invocation;
0056: import org.jboss.invocation.InvocationType;
0057: import org.jboss.jms.ConnectionFactoryHelper;
0058: import org.jboss.jms.asf.ServerSessionPoolFactory;
0059: import org.jboss.jms.asf.StdServerSessionPool;
0060: import org.jboss.jms.jndi.JMSProviderAdapter;
0061: import org.jboss.logging.Logger;
0062: import org.jboss.metadata.ActivationConfigPropertyMetaData;
0063: import org.jboss.metadata.InvokerProxyBindingMetaData;
0064: import org.jboss.metadata.MessageDestinationMetaData;
0065: import org.jboss.metadata.MessageDrivenMetaData;
0066: import org.jboss.metadata.MetaData;
0067: import org.jboss.system.ServiceMBeanSupport;
0068: import org.w3c.dom.Element;
0069:
0070: /**
0071: * EJBProxyFactory for JMS MessageDrivenBeans
0072: *
0073: * @author <a href="mailto:peter.antman@tim.se">Peter Antman</a> .
0074: * @author <a href="mailto:rickard.oberg@telkel.com">Rickard Oberg</a>
0075: * @author <a href="mailto:sebastien.alborini@m4x.org">Sebastien Alborini</a>
0076: * @author <a href="mailto:marc.fleury@telkel.com">Marc Fleury</a>
0077: * @author <a href="mailto:jason@planet57.com">Jason Dillon</a>
0078: * @author <a href="mailto:adrian@jboss.com">Adrian Brock</a>
0079: * @version <tt>$Revision: 57506 $</tt>
0080: */
0081: public class JMSContainerInvoker extends ServiceMBeanSupport implements
0082: EJBProxyFactory, JMSContainerInvokerMBean {
/** The logger */
private static final Logger log = Logger
        .getLogger(JMSContainerInvoker.class);

/** Notification sent before connecting */
private static final String CONNECTING_NOTIFICATION = "org.jboss.ejb.plugins.jms.CONNECTING";

/** Notification sent after connection */
private static final String CONNECTED_NOTIFICATION = "org.jboss.ejb.plugins.jms.CONNECTED";

/** Notification sent before disconnection */
private static final String DISCONNECTING_NOTIFICATION = "org.jboss.ejb.plugins.jms.DISCONNECTING";

/** Notification sent after disconnection */
private static final String DISCONNECTED_NOTIFICATION = "org.jboss.ejb.plugins.jms.DISCONNECTED";

/** Notification sent at connection failure */
private static final String FAILURE_NOTIFICATION = "org.jboss.ejb.plugins.jms.FAILURE";

/** {@link MessageListener#onMessage} reference; assigned once by the static initializer. */
protected static Method ON_MESSAGE;

/**
 * Default destination type. Used when no message-driven-destination is given
 * in ejb-jar, and a lookup of destinationJNDI from jboss.xml is not
 * successful. Default value: javax.jms.Topic.
 */
protected final static String DEFAULT_DESTINATION_TYPE = "javax.jms.Topic";
0111:
0112: /**
0113: * Initialize the ON_MESSAGE reference.
0114: */
0115: static {
0116: try {
0117: final Class type = MessageListener.class;
0118: final Class arg = Message.class;
0119: ON_MESSAGE = type.getMethod("onMessage",
0120: new Class[] { arg });
0121: } catch (Exception e) {
0122: throw new ExceptionInInitializerError(e);
0123: }
0124: }
0125:
/** Value of the Optimized attribute (see setOptimized / isOptimized). */
protected boolean optimize;

/** Maximum number provider is allowed to stuff into a session. */
protected int maxMessagesNr = 1;

/** Minimum pool size of server sessions. */
protected int minPoolSize = 1;

/** Keep alive time for server sessions, in milliseconds (default 30 seconds). */
protected long keepAlive = 30 * 1000;

/** Maximum pool size of server sessions. */
protected int maxPoolSize = 15;

/**
 * Time to wait before retrying to reconnect a lost connection, in
 * milliseconds (configured in seconds via ReconnectIntervalSec).
 */
protected long reconnectInterval = 10000;

/** If Dead letter queue should be used or not. */
protected boolean useDLQ = false;

/**
 * JNDI name of the provider adapter.
 *
 * @see org.jboss.jms.jndi.JMSProviderAdapter
 */
protected String providerAdapterJNDI;

/**
 * JNDI name of the server session factory.
 *
 * @see org.jboss.jms.asf.ServerSessionPoolFactory
 */
protected String serverSessionPoolFactoryJNDI;

/** JMS acknowledge mode, used when session is not XA. */
protected int acknowledgeMode;

/** Whether the bean metadata reports container managed transactions. */
protected boolean isContainerManagedTx;

/** Whether onMessage is declared with the NotSupported transaction type. */
protected boolean isNotSupportedTx;

/** The container. */
protected Container container;

/** The JMS connection. */
protected Connection connection;

/** The JMS connection consumer. */
protected ConnectionConsumer connectionConsumer;

/** Transaction manager obtained from the container when delivery starts. */
protected TransactionManager tm;

/** Server session pool backing the connection consumer. */
protected ServerSessionPool pool;

/** Exception listener installed on the JMS connection; also handles start failures. */
protected ExceptionListenerImpl exListener;

/** Dead letter queue handler. */
protected DLQHandler dlqHandler;

/** DLQConfig element from MDBConfig element from jboss.xml. */
protected Element dlqConfig;

/** Invoker proxy binding metadata; supplies the proxy factory config XML. */
protected InvokerProxyBindingMetaData invokerMetaData;

/** The invoker jndi binding. */
protected String invokerBinding;

/** Whether message delivery is enabled; cleared by DeliveryActive=false or stopDelivery(). */
protected boolean deliveryActive = true;

/** Whether a missing destination may be created through the JBossMQ DestinationManager. */
protected boolean createJBossMQDestination = true;
0191:
0192: /**
0193: * Set the invoker meta data so that the ProxyFactory can initialize
0194: * properly
0195: */
0196: public void setInvokerMetaData(InvokerProxyBindingMetaData imd) {
0197: invokerMetaData = imd;
0198: }
0199:
0200: /**
0201: * Set the invoker jndi binding
0202: */
0203: public void setInvokerBinding(String binding) {
0204: invokerBinding = binding;
0205: }
0206:
0207: /**
0208: * Set the container for which this is an invoker to.
0209: *
0210: * @param container The container for which this is an invoker to.
0211: */
0212: public void setContainer(final Container container) {
0213: this .container = container;
0214: }
0215:
0216: public int getMinPoolSize() {
0217: return minPoolSize;
0218: }
0219:
0220: public void setMinPoolSize(int minPoolSize) {
0221: this .minPoolSize = minPoolSize;
0222: }
0223:
0224: public int getMaxPoolSize() {
0225: return maxPoolSize;
0226: }
0227:
0228: public void setMaxPoolSize(int maxPoolSize) {
0229: this .maxPoolSize = maxPoolSize;
0230: }
0231:
0232: public long getKeepAliveMillis() {
0233: return keepAlive;
0234: }
0235:
0236: public void setKeepAliveMillis(long keepAlive) {
0237: this .keepAlive = keepAlive;
0238: }
0239:
0240: public int getMaxMessages() {
0241: return maxMessagesNr;
0242: }
0243:
0244: public void setMaxMessages(int maxMessages) {
0245: this .maxMessagesNr = maxMessages;
0246: }
0247:
0248: public MessageDrivenMetaData getMetaData() {
0249: MessageDrivenMetaData config = (MessageDrivenMetaData) container
0250: .getBeanMetaData();
0251: return config;
0252: }
0253:
0254: public boolean getDeliveryActive() {
0255: return deliveryActive;
0256: }
0257:
0258: public boolean getCreateJBossMQDestination() {
0259: return createJBossMQDestination;
0260: }
0261:
0262: public void startDelivery() throws Exception {
0263: if (getState() != STARTED)
0264: throw new IllegalStateException("The MDB is not started");
0265: if (deliveryActive)
0266: return;
0267: deliveryActive = true;
0268: startService();
0269: }
0270:
0271: public void stopDelivery() throws Exception {
0272: if (getState() != STARTED)
0273: throw new IllegalStateException("The MDB is not started");
0274: if (deliveryActive == false)
0275: return;
0276: deliveryActive = false;
0277: stopService();
0278: }
0279:
0280: /**
0281: * Sets the Optimized attribute of the JMSContainerInvoker object
0282: *
0283: * @param optimize The new Optimized value
0284: */
0285: public void setOptimized(final boolean optimize) {
0286: this .optimize = optimize;
0287: }
0288:
0289: public boolean isIdentical(Container container, Invocation mi) {
0290: throw new Error("Not valid for MessageDriven beans");
0291: }
0292:
0293: public Object getEJBHome() {
0294: throw new Error("Not valid for MessageDriven beans");
0295: }
0296:
0297: public EJBMetaData getEJBMetaData() {
0298: throw new Error("Not valid for MessageDriven beans");
0299: }
0300:
0301: public Collection getEntityCollection(Collection ids) {
0302: throw new Error("Not valid for MessageDriven beans");
0303: }
0304:
0305: public Object getEntityEJBObject(Object id) {
0306: throw new Error("Not valid for MessageDriven beans");
0307: }
0308:
0309: public Object getStatefulSessionEJBObject(Object id) {
0310: throw new Error("Not valid for MessageDriven beans");
0311: }
0312:
0313: public Object getStatelessSessionEJBObject() {
0314: throw new Error("Not valid for MessageDriven beans");
0315: }
0316:
0317: public boolean isOptimized() {
0318: return optimize;
0319: }
0320:
0321: /**
0322: * XmlLoadable implementation.
0323: *
0324: * @todo FIXME - we ought to move all config into MDBConfig, but I do not do that
0325: * now due to backward compatibility.
0326: *
0327: * @param element Description of Parameter
0328: * @throws DeploymentException Description of Exception
0329: */
0330: public void importXml(final Element element) throws Exception {
0331: try {
0332: if ("false".equalsIgnoreCase(MetaData
0333: .getElementContent(MetaData.getUniqueChild(element,
0334: "CreateJBossMQDestination")))) {
0335: createJBossMQDestination = false;
0336: }
0337: } catch (Exception ignore) {
0338: }
0339:
0340: try {
0341: String maxMessages = MetaData.getElementContent(MetaData
0342: .getUniqueChild(element, "MaxMessages"));
0343: maxMessagesNr = Integer.parseInt(maxMessages);
0344: } catch (Exception ignore) {
0345: }
0346:
0347: try {
0348: String minSize = MetaData.getElementContent(MetaData
0349: .getUniqueChild(element, "MinimumSize"));
0350: minPoolSize = Integer.parseInt(minSize);
0351: } catch (Exception ignore) {
0352: }
0353:
0354: try {
0355: String maxSize = MetaData.getElementContent(MetaData
0356: .getUniqueChild(element, "MaximumSize"));
0357: maxPoolSize = Integer.parseInt(maxSize);
0358: } catch (Exception ignore) {
0359: }
0360:
0361: try {
0362: String keepAliveMillis = MetaData
0363: .getElementContent(MetaData.getUniqueChild(element,
0364: "KeepAliveMillis"));
0365: keepAlive = Integer.parseInt(keepAliveMillis);
0366: } catch (Exception ignore) {
0367: }
0368:
0369: Element mdbConfig = MetaData.getUniqueChild(element,
0370: "MDBConfig");
0371:
0372: try {
0373: String reconnect = MetaData.getElementContent(MetaData
0374: .getUniqueChild(mdbConfig, "ReconnectIntervalSec"));
0375: reconnectInterval = Long.parseLong(reconnect) * 1000;
0376: } catch (Exception ignore) {
0377: }
0378:
0379: try {
0380: if ("false".equalsIgnoreCase(MetaData
0381: .getElementContent(MetaData.getUniqueChild(
0382: mdbConfig, "DeliveryActive")))) {
0383: deliveryActive = false;
0384: }
0385: } catch (Exception ignore) {
0386: }
0387:
0388: // Get Dead letter queue config - and save it for later use
0389: Element dlqEl = MetaData.getOptionalChild(mdbConfig,
0390: "DLQConfig");
0391: if (dlqEl != null) {
0392: dlqConfig = (Element) dlqEl.cloneNode(true);
0393: useDLQ = true;
0394: } else {
0395: useDLQ = false;
0396: }
0397:
0398: // If these are not found we will get a DeploymentException, I hope
0399: providerAdapterJNDI = MetaData.getElementContent(MetaData
0400: .getUniqueChild(element, "JMSProviderAdapterJNDI"));
0401:
0402: serverSessionPoolFactoryJNDI = MetaData
0403: .getElementContent(MetaData.getUniqueChild(element,
0404: "ServerSessionPoolFactoryJNDI"));
0405:
0406: // Check java:/ prefix
0407: if (!providerAdapterJNDI.startsWith("java:/")) {
0408: providerAdapterJNDI = "java:/" + providerAdapterJNDI;
0409: }
0410:
0411: if (!serverSessionPoolFactoryJNDI.startsWith("java:/")) {
0412: serverSessionPoolFactoryJNDI = "java:/"
0413: + serverSessionPoolFactoryJNDI;
0414: }
0415: }
0416:
0417: /**
0418: * Initialize the container invoker. Sets up a connection, a server session
0419: * pool and a connection consumer for the configured destination.
0420: *
0421: * <p>Any JMSExceptions produced while initializing will be assumed to be caused
0422: * due to JMS Provider failure.
0423: *
0424: * @throws Exception Failed to initalize.
0425: */
0426: protected void createService() throws Exception {
0427: importXml(invokerMetaData.getProxyFactoryConfig());
0428:
0429: exListener = new ExceptionListenerImpl(this );
0430: }
0431:
0432: /**
0433: * Initialize the container invoker. Sets up a connection, a server session
0434: * pool and a connection consumer for the configured destination.
0435: *
0436: * @throws Exception Failed to initalize.
0437: */
0438: protected void innerStartDelivery() throws Exception {
0439: if (deliveryActive == false) {
0440: log.debug("Delivery is disabled");
0441: return;
0442: }
0443:
0444: sendNotification(CONNECTING_NOTIFICATION, null);
0445:
0446: log.debug("Initializing");
0447:
0448: // Get the JMS provider
0449: JMSProviderAdapter adapter = getJMSProviderAdapter();
0450: log.debug("Provider adapter: " + adapter);
0451:
0452: // Set up Dead Letter Queue handler
0453: if (useDLQ) {
0454: dlqHandler = new DLQHandler(adapter, this );
0455: dlqHandler.importXml(dlqConfig);
0456: dlqHandler.create();
0457: }
0458:
0459: // Store TM reference locally - should we test for CMT Required
0460: tm = container.getTransactionManager();
0461:
0462: // Get configuration information - from EJB-xml
0463: MessageDrivenMetaData config = getMetaData();
0464:
0465: // Selector
0466: String messageSelector = config.getMessageSelector();
0467: String activationConfig = getActivationConfigProperty("messageSelector");
0468: if (activationConfig != null)
0469: messageSelector = activationConfig;
0470:
0471: // Queue or Topic - optional unfortunately
0472: String destinationType = config.getDestinationType();
0473: activationConfig = getActivationConfigProperty("destinationType");
0474: if (activationConfig != null)
0475: destinationType = activationConfig;
0476:
0477: // Is container managed?
0478: isContainerManagedTx = config.isContainerManagedTx();
0479: acknowledgeMode = config.getAcknowledgeMode();
0480: activationConfig = getActivationConfigProperty("acknowledgeMode");
0481: if (activationConfig != null) {
0482: if (activationConfig.equals("DUPS_OK_ACKNOWLEDGE"))
0483: acknowledgeMode = MessageDrivenMetaData.DUPS_OK_ACKNOWLEDGE_MODE;
0484: else
0485: acknowledgeMode = MessageDrivenMetaData.AUTO_ACKNOWLEDGE_MODE;
0486: }
0487:
0488: byte txType = config.getMethodTransactionType("onMessage",
0489: new Class[] { Message.class }, InvocationType.LOCAL);
0490: isNotSupportedTx = txType == MetaData.TX_NOT_SUPPORTED;
0491:
0492: // Get configuration data from jboss.xml
0493: String destinationJNDI = config.getDestinationJndiName();
0494: activationConfig = getActivationConfigProperty("destination");
0495: if (activationConfig != null)
0496: destinationJNDI = activationConfig;
0497: // Try any EJB21 destination link
0498: if (destinationJNDI == null) {
0499: String link = config.getDestinationLink();
0500: if (link != null) {
0501: link = link.trim();
0502: if (link.length() > 0) {
0503: MessageDestinationMetaData destinationMetaData = container
0504: .getMessageDestination(link);
0505: if (destinationMetaData == null)
0506: log
0507: .warn("Unresolved message-destination-link '"
0508: + link
0509: + "' no message-destination in ejb-jar.xml");
0510: else {
0511: String jndiName = destinationMetaData
0512: .getJNDIName();
0513: if (jndiName == null)
0514: log
0515: .warn("The message-destination '"
0516: + link
0517: + "' has no jndi-name in jboss.xml");
0518: else
0519: destinationJNDI = jndiName;
0520: }
0521: }
0522: }
0523: }
0524:
0525: String user = config.getUser();
0526: String password = config.getPasswd();
0527:
0528: // Connect to the JNDI server and get a reference to root context
0529: Context context = adapter.getInitialContext();
0530: log.debug("context: " + context);
0531:
0532: // if we can't get the root context then exit with an exception
0533: if (context == null) {
0534: throw new RuntimeException("Failed to get the root context");
0535: }
0536:
0537: // Get the JNDI suffix of the destination
0538: String jndiSuffix = parseJndiSuffix(destinationJNDI, config
0539: .getEjbName());
0540: log.debug("jndiSuffix: " + jndiSuffix);
0541:
0542: // Unfortunately the destination is optional, so if we do not have one
0543: // here we have to look it up if we have a destinationJNDI, else give it
0544: // a default.
0545: if (destinationType == null) {
0546: log
0547: .warn("No message-driven-destination given; using; guessing type");
0548: destinationType = getDestinationType(context,
0549: destinationJNDI);
0550: }
0551:
0552: if ("javax.jms.Topic".equals(destinationType)) {
0553: log.debug("Got destination type Topic for "
0554: + config.getEjbName());
0555:
0556: // create a topic connection
0557: Object factory = context.lookup(adapter
0558: .getTopicFactoryRef());
0559: TopicConnection tConnection = null;
0560: try {
0561: tConnection = ConnectionFactoryHelper
0562: .createTopicConnection(factory, user, password);
0563: connection = tConnection;
0564: } catch (ClassCastException e) {
0565: throw new DeploymentException(
0566: "Expected a TopicConnection check your provider adaptor: "
0567: + adapter.getTopicFactoryRef());
0568: }
0569:
0570: try {
0571: // Fix: ClientId must be set as the first method call after connection creation.
0572: // Fix: ClientId is necessary for durable subscriptions.
0573:
0574: String clientId = config.getClientId();
0575: activationConfig = getActivationConfigProperty("clientID");
0576: if (activationConfig != null)
0577: clientId = activationConfig;
0578:
0579: log.debug("Using client id: " + clientId);
0580: if (clientId != null && clientId.length() > 0)
0581: connection.setClientID(clientId);
0582:
0583: // lookup or create the destination topic
0584: Topic topic = null;
0585: try {
0586: // First we try the specified topic
0587: if (destinationJNDI != null)
0588: topic = (Topic) context.lookup(destinationJNDI);
0589: else if (createJBossMQDestination == false)
0590: throw new DeploymentException(
0591: "Unable to determine destination for '"
0592: + container.getBeanMetaData()
0593: .getEjbName()
0594: + "' use destination-jndi-name in jboss.xml, an activation config property or a message-destination-link");
0595: } catch (NamingException e) {
0596: if (createJBossMQDestination == false)
0597: throw new DeploymentException(
0598: "Could not find the topic destination-jndi-name="
0599: + destinationJNDI, e);
0600: log.warn(
0601: "Could not find the topic destination-jndi-name="
0602: + destinationJNDI, e);
0603: } catch (ClassCastException e) {
0604: throw new DeploymentException(
0605: "Expected a Topic destination-jndi-name="
0606: + destinationJNDI, e);
0607: }
0608:
0609: // FIXME: This is not portable, only works for JBossMQ
0610: if (topic == null)
0611: topic = (Topic) createDestination(Topic.class,
0612: context, "topic/" + jndiSuffix, jndiSuffix);
0613:
0614: // set up the server session pool
0615: pool = createSessionPool(topic, tConnection,
0616: minPoolSize, maxPoolSize, keepAlive, true, // tx
0617: acknowledgeMode, new MessageListenerImpl(this ));
0618:
0619: int subscriptionDurablity = config
0620: .getSubscriptionDurability();
0621: activationConfig = getActivationConfigProperty("subscriptionDurability");
0622: if (activationConfig != null) {
0623: if (activationConfig.equals("Durable"))
0624: subscriptionDurablity = MessageDrivenMetaData.DURABLE_SUBSCRIPTION;
0625: else
0626: subscriptionDurablity = MessageDrivenMetaData.NON_DURABLE_SUBSCRIPTION;
0627: }
0628: // To be no-durable or durable
0629: if (subscriptionDurablity != MessageDrivenMetaData.DURABLE_SUBSCRIPTION) {
0630: // Create non durable
0631: connectionConsumer = tConnection
0632: .createConnectionConsumer(topic,
0633: messageSelector, pool,
0634: maxMessagesNr);
0635: } else {
0636: // Durable subscription
0637: String durableName = config.getSubscriptionId();
0638: activationConfig = getActivationConfigProperty("subscriptionName");
0639: if (activationConfig != null)
0640: durableName = activationConfig;
0641:
0642: connectionConsumer = tConnection
0643: .createDurableConnectionConsumer(topic,
0644: durableName, messageSelector, pool,
0645: maxMessagesNr);
0646: }
0647: log.debug("Topic connectionConsumer set up");
0648: } catch (Throwable t) {
0649: try {
0650: tConnection.close();
0651: } catch (Throwable ignored) {
0652: }
0653: DeploymentException.rethrowAsDeploymentException(
0654: "Error during topic setup", t);
0655: }
0656: } else if ("javax.jms.Queue".equals(destinationType)) {
0657: log.debug("Got destination type Queue for "
0658: + config.getEjbName());
0659:
0660: // create a queue connection
0661: Object qFactory = context.lookup(adapter
0662: .getQueueFactoryRef());
0663: QueueConnection qConnection = null;
0664: try {
0665: qConnection = ConnectionFactoryHelper
0666: .createQueueConnection(qFactory, user, password);
0667: connection = qConnection;
0668: } catch (ClassCastException e) {
0669: throw new DeploymentException(
0670: "Expected a QueueConnection check your provider adaptor: "
0671: + adapter.getQueueFactoryRef());
0672: }
0673:
0674: try {
0675: // Set the optional client id
0676: String clientId = config.getClientId();
0677: activationConfig = getActivationConfigProperty("clientID");
0678: if (activationConfig != null)
0679: clientId = activationConfig;
0680:
0681: log.debug("Using client id: " + clientId);
0682: if (clientId != null && clientId.length() > 0)
0683: connection.setClientID(clientId);
0684:
0685: // lookup or create the destination queue
0686: Queue queue = null;
0687: try {
0688: // First we try the specified queue
0689: if (destinationJNDI != null)
0690: queue = (Queue) context.lookup(destinationJNDI);
0691: else if (createJBossMQDestination == false)
0692: throw new DeploymentException(
0693: "Unable to determine destination for '"
0694: + container.getBeanMetaData()
0695: .getEjbName()
0696: + "' use destination-jndi-name in jboss.xml, an activation config property or a message-destination-link");
0697: } catch (NamingException e) {
0698: if (createJBossMQDestination == false)
0699: throw new DeploymentException(
0700: "Could not find the queue destination-jndi-name="
0701: + destinationJNDI, e);
0702: log
0703: .warn("Could not find the queue destination-jndi-name="
0704: + destinationJNDI);
0705: } catch (ClassCastException e) {
0706: throw new DeploymentException(
0707: "Expected a Queue destination-jndi-name="
0708: + destinationJNDI);
0709: }
0710:
0711: // FIXME: This is not portable, only works for JBossMQ
0712: if (queue == null)
0713: queue = (Queue) createDestination(Queue.class,
0714: context, "queue/" + jndiSuffix, jndiSuffix);
0715:
0716: // set up the server session pool
0717: pool = createSessionPool(queue, qConnection,
0718: minPoolSize, maxPoolSize, keepAlive, true, // tx
0719: acknowledgeMode, new MessageListenerImpl(this ));
0720: log.debug("Server session pool: " + pool);
0721:
0722: // create the connection consumer
0723: connectionConsumer = qConnection
0724: .createConnectionConsumer(queue,
0725: messageSelector, pool, maxMessagesNr);
0726: log.debug("Connection consumer: " + connectionConsumer);
0727: } catch (Throwable t) {
0728: try {
0729: qConnection.close();
0730: } catch (Throwable ignored) {
0731: }
0732: DeploymentException.rethrowAsDeploymentException(
0733: "Error during queue setup", t);
0734: }
0735: } else
0736: throw new DeploymentException("Unknown destination-type "
0737: + destinationType);
0738:
0739: log.debug("Initialized with config " + toString());
0740:
0741: context.close();
0742:
0743: if (dlqHandler != null) {
0744: dlqHandler.start();
0745: }
0746:
0747: if (connection != null) {
0748: connection.setExceptionListener(exListener);
0749: connection.start();
0750: }
0751:
0752: sendNotification(CONNECTED_NOTIFICATION, null);
0753: }
0754:
0755: protected void startService() throws Exception {
0756: try {
0757: innerStartDelivery();
0758: } catch (final Throwable t) {
0759: // start a thread up to handle recovering the connection. so we can
0760: // attach to the jms resources once they become available
0761: exListener.handleFailure(t);
0762: return;
0763: } finally {
0764: // Clear any security context established by the jms connection
0765: SecurityActions.clear();
0766: }
0767: }
0768:
0769: protected void stopService() throws Exception {
0770: // Silence the exception listener
0771: if (exListener != null) {
0772: exListener.stop();
0773: }
0774:
0775: innerStopDelivery();
0776: }
0777:
0778: /**
0779: * Stop done from inside, we should not stop the exceptionListener in inner
0780: * stop.
0781: */
0782: protected void innerStopDelivery() {
0783: log.debug("innerStop");
0784:
0785: sendNotification(DISCONNECTING_NOTIFICATION, null);
0786:
0787: try {
0788: if (connection != null) {
0789: connection.setExceptionListener(null);
0790: log.debug("unset exception listener");
0791: }
0792: } catch (Throwable t) {
0793: log.trace("Could not set ExceptionListener to null", t);
0794: }
0795:
0796: // Stop the connection
0797: try {
0798: if (connection != null) {
0799: connection.stop();
0800: log.debug("connection stopped");
0801: }
0802: } catch (Throwable t) {
0803: log.trace("Could not stop JMS connection", t);
0804: }
0805:
0806: try {
0807: if (dlqHandler != null)
0808: dlqHandler.stop();
0809: } catch (Throwable t) {
0810: log.trace("Failed to stop the dlq handler", t);
0811: }
0812:
0813: // close the connection consumer
0814: try {
0815: if (connectionConsumer != null)
0816: connectionConsumer.close();
0817: } catch (Throwable t) {
0818: log.trace("Failed to close connection consumer", t);
0819: }
0820: connectionConsumer = null;
0821:
0822: // clear the server session pool (if it is clearable)
0823: try {
0824: if (pool instanceof StdServerSessionPool) {
0825: StdServerSessionPool p = (StdServerSessionPool) pool;
0826: p.clear();
0827: }
0828: } catch (Throwable t) {
0829: log.trace("Failed to clear session pool", t);
0830: }
0831:
0832: // close the connection
0833: if (connection != null) {
0834: try {
0835: connection.close();
0836: } catch (Throwable t) {
0837: log.trace("Failed to close connection", t);
0838: }
0839: }
0840: connection = null;
0841:
0842: // Take down DLQ
0843: try {
0844: if (dlqHandler != null) {
0845: dlqHandler.destroy();
0846: }
0847: } catch (Throwable t) {
0848: log.trace("Failed to close the dlq handler", t);
0849: }
0850: dlqHandler = null;
0851:
0852: sendNotification(DISCONNECTED_NOTIFICATION, null);
0853: }
0854:
0855: public Object invoke(Object id, Method m, Object[] args,
0856: Transaction tx, Principal identity, Object credential)
0857: throws Exception {
0858:
0859: Invocation invocation = new Invocation(id, m, args, tx,
0860: identity, credential);
0861: invocation.setType(InvocationType.LOCAL);
0862:
0863: // Set the right context classloader
0864: ClassLoader oldCL = TCLAction.UTIL.getContextClassLoader();
0865: TCLAction.UTIL
0866: .setContextClassLoader(container.getClassLoader());
0867: try {
0868: return container.invoke(invocation);
0869: } finally {
0870: TCLAction.UTIL.setContextClassLoader(oldCL);
0871: }
0872: }
0873:
0874: /**
0875: * Try to get a destination type by looking up the destination JNDI, or
0876: * provide a default if there is not destinationJNDI or if it is not possible
0877: * to lookup.
0878: *
0879: * @param ctx The naming context to lookup destinations from.
0880: * @param destinationJNDI The name to use when looking up destinations.
0881: * @return The destination type, either derived from destinationJDNI or
0882: * DEFAULT_DESTINATION_TYPE
0883: */
0884: protected String getDestinationType(Context ctx,
0885: String destinationJNDI) {
0886: String destType = null;
0887:
0888: if (destinationJNDI != null) {
0889: try {
0890: Destination dest = (Destination) ctx
0891: .lookup(destinationJNDI);
0892: if (dest instanceof javax.jms.Topic) {
0893: destType = "javax.jms.Topic";
0894: } else if (dest instanceof javax.jms.Queue) {
0895: destType = "javax.jms.Queue";
0896: }
0897: } catch (NamingException ex) {
0898: log.debug(
0899: "Could not do heristic lookup of destination ",
0900: ex);
0901: }
0902:
0903: }
0904: if (destType == null) {
0905: log
0906: .warn("Could not determine destination type, defaults to: "
0907: + DEFAULT_DESTINATION_TYPE);
0908:
0909: destType = DEFAULT_DESTINATION_TYPE;
0910: }
0911:
0912: return destType;
0913: }
0914:
0915: /**
0916: * Return the JMSProviderAdapter that should be used.
0917: *
0918: * @return The JMSProviderAdapter to use.
0919: */
0920: protected JMSProviderAdapter getJMSProviderAdapter()
0921: throws NamingException {
0922: Context context = new InitialContext();
0923: try {
0924: log.debug("Looking up provider adapter: "
0925: + providerAdapterJNDI);
0926: return (JMSProviderAdapter) context
0927: .lookup(providerAdapterJNDI);
0928: } finally {
0929: context.close();
0930: }
0931: }
0932:
0933: /**
0934: * Create and or lookup a JMS destination.
0935: *
0936: * @param type Either javax.jms.Queue or javax.jms.Topic.
0937: * @param ctx The naming context to lookup destinations from.
0938: * @param jndiName The name to use when looking up destinations.
0939: * @param jndiSuffix The name to use when creating destinations.
0940: * @return The destination.
0941: * @throws IllegalArgumentException Type is not Queue or Topic.
0942: * @throws Exception Description of Exception
0943: */
0944: protected Destination createDestination(final Class type,
0945: final Context ctx, final String jndiName,
0946: final String jndiSuffix) throws Exception {
0947: try {
0948: // first try to look it up
0949: return (Destination) ctx.lookup(jndiName);
0950: } catch (NamingException e) {
0951: // if the lookup failes, the try to create it
0952: log.warn("destination not found: " + jndiName + " reason: "
0953: + e);
0954: log.warn("creating a new temporary destination: "
0955: + jndiName);
0956:
0957: //
0958: // jason: we should do away with this...
0959: //
0960: // attempt to create the destination (note, this is very
0961: // very, very unportable).
0962: //
0963:
0964: MBeanServer server = org.jboss.mx.util.MBeanServerLocator
0965: .locateJBoss();
0966:
0967: String methodName;
0968: if (type == Topic.class) {
0969: methodName = "createTopic";
0970: } else if (type == Queue.class) {
0971: methodName = "createQueue";
0972: } else {
0973: // type was not a Topic or Queue, bad user
0974: throw new IllegalArgumentException(
0975: "Expected javax.jms.Queue or javax.jms.Topic: "
0976: + type);
0977: }
0978:
0979: // invoke the server to create the destination
0980: server.invoke(new ObjectName(
0981: "jboss.mq:service=DestinationManager"), methodName,
0982: new Object[] { jndiSuffix },
0983: new String[] { "java.lang.String" });
0984:
0985: // try to look it up again
0986: return (Destination) ctx.lookup(jndiName);
0987: }
0988: }
0989:
0990: protected String getActivationConfigProperty(String property) {
0991: MessageDrivenMetaData mdmd = getMetaData();
0992: ActivationConfigPropertyMetaData acpmd = mdmd
0993: .getActivationConfigProperty(property);
0994: if (acpmd != null)
0995: return acpmd.getValue();
0996: else
0997: return null;
0998: }
0999:
1000: /**
1001: * Create a server session pool for the given connection.
1002: *
1003: * @param destination the destination
1004: * @param connection The connection to use.
1005: * @param minSession The minumum number of sessions
1006: * @param maxSession The maximum number of sessions.
1007: * @param keepAlive The time to keep sessions alive
1008: * @param isTransacted True if the sessions are transacted.
1009: * @param ack The session acknowledgement mode.
1010: * @param listener The message listener.
1011: * @return A server session pool.
1012: * @throws JMSException
1013: * @throws NamingException Description of Exception
1014: */
1015: protected ServerSessionPool createSessionPool(
1016: final Destination destination, final Connection connection,
1017: final int minSession, final int maxSession,
1018: final long keepAlive, final boolean isTransacted,
1019: final int ack, final MessageListener listener)
1020: throws NamingException, JMSException {
1021: ServerSessionPool pool;
1022: Context context = new InitialContext();
1023:
1024: try {
1025: // first lookup the factory
1026: log.debug("looking up session pool factory: "
1027: + serverSessionPoolFactoryJNDI);
1028: ServerSessionPoolFactory factory = (ServerSessionPoolFactory) context
1029: .lookup(serverSessionPoolFactoryJNDI);
1030:
1031: // the create the pool
1032: pool = factory.getServerSessionPool(destination,
1033: connection, minSession, maxSession, keepAlive,
1034: isTransacted, ack, !isContainerManagedTx
1035: || isNotSupportedTx, listener);
1036: } finally {
1037: context.close();
1038: }
1039:
1040: return pool;
1041: }
1042:
1043: /**
1044: * Notify of an event
1045: *
1046: * @param event the event
1047: * @param userData any user data, e.g. the exception on a failure
1048: */
1049: protected void sendNotification(String event, Object userData) {
1050: Notification notif = new Notification(event, getServiceName(),
1051: getNextNotificationSequenceNumber());
1052: notif.setUserData(userData);
1053: sendNotification(notif);
1054: }
1055:
1056: /**
1057: * Parse the JNDI suffix from the given JNDI name.
1058: *
1059: * @param jndiname The JNDI name used to lookup the destination.
1060: * @param defautSuffix Description of Parameter
1061: * @return The parsed suffix or the defaultSuffix
1062: */
1063: protected String parseJndiSuffix(final String jndiname,
1064: final String defautSuffix) {
1065: // jndiSuffix is merely the name that the user has given the MDB.
1066: // since the jndi name contains the message type I have to split
1067: // at the "/" if there is no slash then I use the entire jndi name...
1068: String jndiSuffix = "";
1069:
1070: if (jndiname != null) {
1071: int indexOfSlash = jndiname.indexOf("/");
1072: if (indexOfSlash != -1) {
1073: jndiSuffix = jndiname.substring(indexOfSlash + 1);
1074: } else {
1075: jndiSuffix = jndiname;
1076: }
1077: } else {
1078: // if the jndi name from jboss.xml is null then lets use the ejbName
1079: jndiSuffix = defautSuffix;
1080: }
1081:
1082: return jndiSuffix;
1083: }
1084:
1085: /**
1086: * An implementation of MessageListener that passes messages on to the
1087: * container invoker.
1088: */
1089: class MessageListenerImpl implements MessageListener {
1090: /** The container invoker. */
1091: JMSContainerInvoker invoker;
1092:
1093: /**
1094: * Construct a <tt>MessageListenerImpl</tt> .
1095: *
1096: * @param invoker The container invoker. Must not be null.
1097: */
1098: MessageListenerImpl(final JMSContainerInvoker invoker) {
1099: this .invoker = invoker;
1100: }
1101:
1102: /**
1103: * Process a message.
1104: *
1105: * @param message The message to process.
1106: */
1107: public void onMessage(final Message message) {
1108: if (log.isTraceEnabled()) {
1109: log.trace("processing message: " + message);
1110: }
1111:
1112: Object id;
1113: try {
1114: id = message.getJMSMessageID();
1115: } catch (JMSException e) {
1116: // what ?
1117: id = "JMSContainerInvoker";
1118: }
1119:
1120: // Invoke, shuld we catch any Exceptions??
1121: try {
1122: Transaction tx = tm.getTransaction();
1123:
1124: // DLQHandling
1125: if (useDLQ && // Is Dead Letter Queue used at all
1126: message.getJMSRedelivered() && // Was message resent
1127: dlqHandler
1128: .handleRedeliveredMessage(message, tx)) //Did the DLQ handler take care of the message
1129: {
1130: // Message will be placed on Dead Letter Queue,
1131: // if redelivered to many times
1132: return;
1133: }
1134:
1135: invoker.invoke(id, // Object id - where used?
1136: ON_MESSAGE, // Method to invoke
1137: new Object[] { message }, // argument
1138: tx, // Transaction
1139: null, // Principal
1140: null); // Cred
1141:
1142: } catch (Exception e) {
1143: log.error("Exception in JMSCI message listener", e);
1144: }
1145: }
1146: }
1147:
1148: /** ExceptionListener for failover handling. */
1149: class ExceptionListenerImpl implements ExceptionListener {
1150: Object lock = new Object();
1151: JMSContainerInvoker invoker;
1152: Thread currentThread;
1153: boolean notStopped = true;
1154:
1155: /**
1156: * Create a new ExceptionListenerImpl.
1157: *
1158: * @param invoker the container invoker
1159: */
1160: ExceptionListenerImpl(final JMSContainerInvoker invoker) {
1161: this .invoker = invoker;
1162: }
1163:
1164: /**
1165: * Called on jms connection failure events
1166: *
1167: * @param ex the jms connection failure exception
1168: */
1169: public void onException(JMSException ex) {
1170: handleFailure(ex);
1171: }
1172:
1173: /**
1174: * Handle a failure
1175: *
1176: * @param t the failure
1177: */
1178: public void handleFailure(Throwable t) {
1179: MessageDrivenMetaData metaData = invoker.getMetaData();
1180: log.warn("JMS provider failure detected for "
1181: + metaData.getEjbName(), t);
1182:
1183: // JBAS-3750 - Help debug integration with foreign JMS providers
1184: if (t instanceof JMSException) {
1185: Exception le = ((JMSException) t).getLinkedException();
1186: if (le != null)
1187: log.debug("Linked exception: " + le + ", cause: "
1188: + le.getCause());
1189: }
1190:
1191: // Run the reconnection in the background
1192: String name = "JMSContainerInvoker("
1193: + metaData.getEjbName() + ") Reconnect";
1194: synchronized (lock) {
1195: if (currentThread != null) {
1196: log.debug("Already a reconnect thread: "
1197: + currentThread + " for "
1198: + metaData.getEjbName());
1199: return;
1200: }
1201: Runnable runnable = new ExceptionListenerRunnable(t);
1202: currentThread = new Thread(runnable, name);
1203: try {
1204: currentThread.setDaemon(true);
1205: currentThread.start();
1206: } catch (RuntimeException rethrow) {
1207: currentThread = null;
1208: throw rethrow;
1209: } catch (Error rethrow) {
1210: currentThread = null;
1211: throw rethrow;
1212: }
1213: }
1214: }
1215:
1216: class ExceptionListenerRunnable implements Runnable {
1217: Throwable failure;
1218:
1219: /**
1220: * Create a new ExceptionListenerRunnable.
1221: *
1222: * @param failure the error
1223: */
1224: public ExceptionListenerRunnable(Throwable failure) {
1225: this .failure = failure;
1226: }
1227:
1228: /**
1229: * Try to reconnect to the jms provider until explicitly stopped.
1230: */
1231: public void run() {
1232: MessageDrivenMetaData metaData = invoker.getMetaData();
1233: try {
1234: boolean tryIt = true;
1235: while (tryIt && notStopped) {
1236: try {
1237: invoker.innerStopDelivery();
1238: } catch (Throwable t) {
1239: log.error(
1240: "Unhandled error stopping connection for "
1241: + metaData.getEjbName(), t);
1242: }
1243:
1244: sendNotification(FAILURE_NOTIFICATION, failure);
1245:
1246: try {
1247: log.info("Waiting for reconnect internal "
1248: + reconnectInterval + "ms for "
1249: + metaData.getEjbName());
1250: try {
1251: Thread.sleep(reconnectInterval);
1252: } catch (InterruptedException ie) {
1253: tryIt = false;
1254: return;
1255: }
1256:
1257: // Reboot container
1258: log
1259: .info("Trying to reconnect to JMS provider for "
1260: + metaData.getEjbName());
1261: invoker.innerStartDelivery();
1262: tryIt = false;
1263:
1264: log.info("Reconnected to JMS provider for "
1265: + metaData.getEjbName());
1266: } catch (Throwable t) {
1267: log.error(
1268: "Reconnect failed: JMS provider failure detected for "
1269: + metaData.getEjbName(), t);
1270: }
1271: }
1272: } finally {
1273: synchronized (lock) {
1274: currentThread = null;
1275: }
1276: }
1277: }
1278: }
1279:
1280: void stop() {
1281: synchronized (lock) {
1282: log.debug("Stop requested for recovery thread: "
1283: + currentThread);
1284: notStopped = false;
1285: if (currentThread != null) {
1286: currentThread.interrupt();
1287: log.debug("Recovery thread interrupted: "
1288: + currentThread);
1289: }
1290: }
1291: }
1292: }
1293:
1294: /**
1295: * Return a string representation of the current config state.
1296: */
1297: public String toString() {
1298: MessageDrivenMetaData metaData = getMetaData();
1299: String destinationJNDI = metaData.getDestinationJndiName();
1300: return super .toString() + "{ maxMessagesNr=" + maxMessagesNr
1301: + ", maxPoolSize=" + maxPoolSize
1302: + ", reconnectInterval=" + reconnectInterval
1303: + ", providerAdapterJNDI=" + providerAdapterJNDI
1304: + ", serverSessionPoolFactoryJNDI="
1305: + serverSessionPoolFactoryJNDI + ", acknowledgeMode="
1306: + acknowledgeMode + ", isContainerManagedTx="
1307: + isContainerManagedTx + ", isNotSupportedTx="
1308: + isNotSupportedTx + ", useDLQ=" + useDLQ
1309: + ", dlqHandler=" + dlqHandler + ", destinationJNDI="
1310: + destinationJNDI + " }";
1311: }
1312:
/**
 * Strategy for reading and replacing thread context class loaders.
 *
 * Two implementations are provided: one that calls the Thread methods
 * directly and one that wraps each call in
 * <tt>AccessController.doPrivileged</tt>. The static helpers in
 * <tt>UTIL</tt> select the privileged variant whenever a security
 * manager is installed.
 */
interface TCLAction {
    /** @return the context class loader of the calling thread */
    ClassLoader getContextClassLoader();

    /**
     * @param thread the thread to inspect
     * @return the context class loader of the given thread
     */
    ClassLoader getContextClassLoader(Thread thread);

    /** @param cl the new context class loader of the calling thread */
    void setContextClassLoader(ClassLoader cl);

    /**
     * @param thread the thread to modify
     * @param cl     the new context class loader of that thread
     */
    void setContextClassLoader(Thread thread, ClassLoader cl);

    /** Implementation that talks to the Thread API directly. */
    TCLAction NON_PRIVILEGED = new TCLAction() {
        public ClassLoader getContextClassLoader() {
            final Thread self = Thread.currentThread();
            return self.getContextClassLoader();
        }

        public ClassLoader getContextClassLoader(Thread thread) {
            return thread.getContextClassLoader();
        }

        public void setContextClassLoader(ClassLoader cl) {
            final Thread self = Thread.currentThread();
            self.setContextClassLoader(cl);
        }

        public void setContextClassLoader(Thread thread, ClassLoader cl) {
            thread.setContextClassLoader(cl);
        }
    };

    /** Implementation that performs every call as a privileged action. */
    TCLAction PRIVILEGED = new TCLAction() {
        /** Reusable action reading the calling thread's loader. */
        private final PrivilegedAction currentThreadLoaderAction =
                new PrivilegedAction() {
                    public Object run() {
                        return Thread.currentThread().getContextClassLoader();
                    }
                };

        public ClassLoader getContextClassLoader() {
            final Object loader =
                    AccessController.doPrivileged(currentThreadLoaderAction);
            return (ClassLoader) loader;
        }

        public ClassLoader getContextClassLoader(final Thread thread) {
            final PrivilegedAction readLoader = new PrivilegedAction() {
                public Object run() {
                    return thread.getContextClassLoader();
                }
            };
            return (ClassLoader) AccessController.doPrivileged(readLoader);
        }

        public void setContextClassLoader(final ClassLoader cl) {
            final PrivilegedAction writeLoader = new PrivilegedAction() {
                public Object run() {
                    Thread.currentThread().setContextClassLoader(cl);
                    return null;
                }
            };
            AccessController.doPrivileged(writeLoader);
        }

        public void setContextClassLoader(final Thread thread,
                final ClassLoader cl) {
            final PrivilegedAction writeLoader = new PrivilegedAction() {
                public Object run() {
                    thread.setContextClassLoader(cl);
                    return null;
                }
            };
            AccessController.doPrivileged(writeLoader);
        }
    };

    /** Static entry points that dispatch to the appropriate strategy. */
    class UTIL {
        /**
         * @return PRIVILEGED when a security manager is installed,
         *         NON_PRIVILEGED otherwise
         */
        static TCLAction getTCLAction() {
            if (System.getSecurityManager() != null) {
                return PRIVILEGED;
            }
            return NON_PRIVILEGED;
        }

        static ClassLoader getContextClassLoader() {
            final TCLAction action = getTCLAction();
            return action.getContextClassLoader();
        }

        static ClassLoader getContextClassLoader(Thread thread) {
            final TCLAction action = getTCLAction();
            return action.getContextClassLoader(thread);
        }

        static void setContextClassLoader(ClassLoader cl) {
            final TCLAction action = getTCLAction();
            action.setContextClassLoader(cl);
        }

        static void setContextClassLoader(Thread thread, ClassLoader cl) {
            final TCLAction action = getTCLAction();
            action.setContextClassLoader(thread, cl);
        }
    }
}
1408: }
|