TransactionalMessageListenerContainer.cs :  » Inversion-of-Control-Dependency-Injection » Spring.net » Spring » Messaging » Listener » C# / CSharp Open Source

Home
C# / CSharp Open Source
1.2.6.4 mono .net core
2.2.6.4 mono core
3.Aspect Oriented Frameworks
4.Bloggers
5.Build Systems
6.Business Application
7.Charting Reporting Tools
8.Chat Servers
9.Code Coverage Tools
10.Content Management Systems CMS
11.CRM ERP
12.Database
13.Development
14.Email
15.Forum
16.Game
17.GIS
18.GUI
19.IDEs
20.Installers Generators
21.Inversion of Control Dependency Injection
22.Issue Tracking
23.Logging Tools
24.Message
25.Mobile
26.Network Clients
27.Network Servers
28.Office
29.PDF
30.Persistence Frameworks
31.Portals
32.Profilers
33.Project Management
34.RSS RDF
35.Rule Engines
36.Script
37.Search Engines
38.Sound Audio
39.Source Control
40.SQL Clients
41.Template Engines
42.Testing
43.UML
44.Web Frameworks
45.Web Service
46.Web Testing
47.Wiki Engines
48.Windows Presentation Foundation
49.Workflows
50.XML Parsers
C# / C Sharp
C# / C Sharp by API
C# / CSharp Tutorial
C# / CSharp Open Source » Inversion of Control Dependency Injection » Spring.net 
Spring.net » Spring » Messaging » Listener » TransactionalMessageListenerContainer.cs
#region License

/*
 * Copyright 2002-2008 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#endregion

using System;
using System.Messaging;
using Common.Logging;
using Spring.Data.Core;
using Spring.Messaging.Core;
using Spring.Messaging.Support;
using Spring.Transaction;
using Spring.Transaction.Support;

namespace Spring.Messaging.Listener{
    /// <summary>
    /// A MessageListenerContainer that uses local (non-DTC) based transactions.  Exceptions are
    /// handled by instances of <see cref="IMessageTransactionExceptionHandler"/>.
    /// </summary>
    /// <remarks>
    /// <para>
    /// This container distinguishes between two types of <see cref="IPlatformTransactionManager"/>
    /// implementations.
    /// </para>  
    /// <para>If you specify a <see cref="MessageQueueTransactionManager"/> then
    /// a MSMQ <see cref="MessageQueueTransaction"/> will be started 
    /// before receiving the message and used as part of the container's recieve operation.  The 
    /// <see cref="MessageQueueTransactionManager"/> binds the <see cref="MessageQueueTransaction"/> 
    /// to thread local storage and as such will implicitly be used by 
    /// <see cref="MessageQueueTemplate"/> send and receive operations to a transactional queue.
    /// </para>
    /// <para>
    /// Service layer operations that are called inside the message listener will typically
    /// be transactional based on using standard Spring declarative transaction management
    /// functionality.  In case of exceptions in the service layer, the database operation 
    /// will have been rolled back and the <see cref="IMessageTransactionExceptionHandler"/> 
    /// that is later invoked should decide to either commit the surrounding local 
    /// MSMQ based transaction (removing the message from the queue) or to rollback 
    /// (placing the message back on the queue for redelivery).
    /// </para>
    /// <para>
    /// The use of a transactional service layer in combination with 
    /// a container managed <see cref="MessageQueueTransaction"/> is a powerful combination
    /// that can be used to achieve "exactly one" transaction message processing with 
    /// database operations that are commonly associated with using transactional messaging and 
    /// distributed transactions (i.e. both the messaging and database operation commit or rollback
    /// together).  
    /// </para>
    /// <para>
    /// The additional programming logic needed to achieve this is to keep track of the Message.Id
    /// that has been processed successfully within the transactional service layer.  
    /// This is needed as there may be a system failure (e.g. power goes off) 
    /// between the 'inner' database commit and the 'outer' messaging commit, resulting
    /// in message redelivery.  The transactional service layer needs logic to detect if incoming
    /// message was processed successfully. It can do this by checking the database for an
    /// indication of successfull processing, perhaps by recording the Message.Id itself in a
    /// status table.  If the transactional service layer determines that the message has 
    /// already been processed, it can throw a specific exception for thise case.  The 
    /// container's exception handler will recognize this exception type and vote to commit 
    /// (remove from the queue) the 'outer' messaging transaction.  
    /// Spring provides an exception handler with this functionality, 
    /// see <see cref="SendToQueueExceptionHandler"/> for more information.
    /// </para>
    /// <para>If you specify an implementation of <see cref="IResourceTransactionManager"/> 
    /// (e.g. <see cref="AdoPlatformTransactionManager"/> or HibernateTransactionManager) then
    /// an local database transaction will be started before receiving the message.  By default,
    /// the container will also start a local <see cref="MessageQueueTransaction"/>
    /// after the local database transaction has started, but before the receiving the message.
    /// The <see cref="MessageQueueTransaction"/> will be used to receive the message.
    /// If you do not want his behavior set <see cref="UseContainerManagedMessageQueueTransaction"/>
    /// to false.  Also by default, the <see cref="MessageQueueTransaction"/>
    /// will be bound to thread local storage such that any <see cref="MessageQueueTemplate"/>
    /// send or recieve operations will participate transparently in the same
    /// <see cref="MessageQueueTransaction"/>.  If you do not want this behavior 
    /// set the property <see cref="ExposeContainerManagedMessageQueueTransaction"/> to false.
    /// </para>
    /// <para>In case of exceptions during <see cref="IMessageListener"/> processing
    /// when using an implementation of 
    /// <see cref="IResourceTransactionManager"/> (e.g. and starting a container managed 
    /// <see cref="MessageQueueTransaction"/>) the container's
    /// <see cref="IMessageTransactionExceptionHandler"/> will determine if the 
    /// <see cref="MessageQueueTransaction"/> should commit (removing it from the queue)
    /// or rollback (placing it back on the queue for redelivery).  The listener 
    /// exception will always
    /// trigger a rollback in the 'outer' (e.g. <see cref="AdoPlatformTransactionManager"/> 
    /// or HibernateTransactionManager) based transaction.
    /// </para>
    /// <para>
    /// PoisonMessage handing, that is endless redelivery of a message due to exceptions
    /// during processing, can be detected using implementatons of the 
    /// <see cref="IMessageTransactionExceptionHandler"/> interface.  A specific implementation 
    /// is provided that will move the poison message to another queue after a maximum number
    /// of redelivery attempts.  See <see cref="SendToQueueExceptionHandler"/> for more information.
    /// </para>
    /// </remarks>
    public class TransactionalMessageListenerContainer : AbstractTransactionalMessageListenerContainer
    {
        #region Logging Definition

        private static readonly ILog LOG = LogManager.GetLogger(typeof (TransactionalMessageListenerContainer));

        #endregion

        #region Fields

        private bool useContainerManagedMessageQueueTransaction = false;

        private bool useMessageQueueTransactionManagerCalled = false;

        private bool exposeContainerManagedMessageQueueTransaction = true;

        private IMessageTransactionExceptionHandler messageTransactionExceptionHandler;

        #endregion

        #region Properties

        /// <summary>
        /// Gets or sets a value indicating whether the MessageListenerContainer should be
        /// responsible for creating a MessageQueueTransaction
        /// when receiving a message.
        /// </summary>
        /// <remarks>
        /// <para>
        /// Creating MessageQueueTransactions is usually the responsibility of the
        /// IPlatformTransactionManager, e.g. TxScopePlatformTransactionManager (when using DTC)
        /// or MessageQueueTransactionManager (when using local messaging transactions).
        /// </para>
        /// <para>
        /// For all other IPlatformTransactionManager implementations, including when none is
        /// specified, the MessageListenerContainer will itself create a MessageQueueTransaction
        /// (assuming the container is consuming from a transactional queue).
        /// </para>
        /// <para>
        /// Set the ExposeContainerManagedMessageQueueTransaction property to true if you want
        /// the MessageQueueTransaction to be exposed to Spring's MessageQueueTemplate class
        /// </para>
        /// </remarks>
        /// <value>
        ///   <c>true</c> to use a container managed MessageQueueTransaction; otherwise, <c>false</c>.
        /// </value>
        public bool UseContainerManagedMessageQueueTransaction
        {
            get { return useContainerManagedMessageQueueTransaction; }
            set
            {
                useContainerManagedMessageQueueTransaction = value;
                useMessageQueueTransactionManagerCalled = true;
            }
        }

        /// <summary>
        /// Gets or sets a value indicating whether expose the 
        /// container managed <see cref="MessageQueueTransaction"/> to thread local storage
        /// where it will be automatically used by <see cref="MessageQueueTemplate"/> send
        /// and receive operations.
        /// </summary>
        /// <remarks>
        /// Using an <see cref="MessageQueueTransactionManager"/> will always exposes a
        /// <see cref="MessageQueueTransaction"/> to thread local storage.  This property
        /// only has effect when using a non-DTC based 
        /// </remarks>
        /// <value>
        ///   <c>true</c> if [expose container managed message queue transaction]; otherwise, <c>false</c>.
        /// </value>
        public bool ExposeContainerManagedMessageQueueTransaction
        {
            get { return exposeContainerManagedMessageQueueTransaction; }
            set { exposeContainerManagedMessageQueueTransaction = value; }
        }

        /// <summary>
        /// Gets or sets the message transaction exception handler.
        /// </summary>
        /// <value>The message transaction exception handler.</value>
        public IMessageTransactionExceptionHandler MessageTransactionExceptionHandler
        {
            get { return messageTransactionExceptionHandler; }
            set { messageTransactionExceptionHandler = value; }
        }

        #endregion

        #region Public Methods

        /// <summary>
        /// Determine if the container should create its own
        /// MessageQueueTransaction when a IResourceTransactionManager is specified.
        /// Set the transaction name to the name of the spring object.
        /// Call base class Initialize() funtionality
        /// </summary>
        public override void Initialize()
        {
            //using non-DTC based transaction manager?
            bool isRtm = PlatformTransactionManager is IResourceTransactionManager;
            //using MessageQueueTransactionManager?
            bool isQtm = PlatformTransactionManager is MessageQueueTransactionManager;

            if (!isRtm && !isQtm)
            {
                throw new ArgumentException("Can not use the provied IPlatformTransactionManager of type " 
                    + PlatformTransactionManager.GetType()                     
                    + ".  It must implement IResourceTransactionManager or be a MessageQueueTransactionManager.");
            }

            //Set useContainerManagedMessageQueueTransaction = true when using 
            // 1. non-DTC based transaction manager
            // 2. not the MessageQueueTransactionManager.
            if (!useMessageQueueTransactionManagerCalled && isRtm && !isQtm)
            {
                useContainerManagedMessageQueueTransaction = true;
            }

            // Use object name as default transaction name.
            if (TransactionDefinition.Name == null)
            {
                TransactionDefinition.Name = ObjectName;
            }

            // Proceed with superclass initialization.
            base.Initialize();
        }

        #endregion

        #region Protected Methods

        /// <summary>
        /// Does the receive and execute using platform transaction manager.
        /// </summary>
        /// <param name="mq">The message queue.</param>
        /// <param name="status">The transactional status.</param>
        /// <returns>
        /// true if should continue peeking, false otherwise.
        /// </returns>
        protected override bool DoReceiveAndExecuteUsingPlatformTransactionManager(MessageQueue mq,
                                                                                   ITransactionStatus status)
        {
            if (PlatformTransactionManager is MessageQueueTransactionManager)
            {
                return DoRecieveAndExecuteUsingMessageQueueTransactionManager(mq, status);
            }
            else if (PlatformTransactionManager is IResourceTransactionManager)
            {
                if (UseContainerManagedMessageQueueTransaction)
                {
                    return DoRecieveAndExecuteUsingResourceTransactionManagerWithTxQueue(mq, status);
                }
                else
                {
                    //recieve non-transactionally from transactional queue but
                    //use ResourceBasedTransactionManagement.
                    DoRecieveAndExecuteUsingResourceTransactionManager();
                }
            }
            return false;
        }

        private void DoRecieveAndExecuteUsingResourceTransactionManager()
        {
            //This is a bit of an odd case since really one is better off using
            //NonTransactionalMessageListenerContainer and having the database
            //transaction done in the service tier.

            throw new NotSupportedException("Try using NonTransactionalMessageListenerContainer instead.");
        }

        /// <summary>
        /// Does the recieve and execute using message queue transaction manager.
        /// </summary>
        /// <param name="mq">The message queue.</param>
        /// <param name="status">The transactional status.</param>
        /// <returns>true if should continue peeking, false otherise</returns>
        protected virtual bool DoRecieveAndExecuteUsingMessageQueueTransactionManager(MessageQueue mq,
                                                                                      ITransactionStatus status)
        {
            #region Logging

            if (LOG.IsDebugEnabled)
            {
                LOG.Debug("Executing DoRecieveAndExecuteUsingMessageQueueTransactionManager");
            }

            #endregion Logging

            Message message;

            #region Receive message

            try
            {
                message = mq.Receive(TimeSpan.Zero, QueueUtils.GetMessageQueueTransaction(null));
            }
            catch (MessageQueueException ex)
            {
                if (ex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                {
                    //expected to occur occasionally
                    if (LOG.IsTraceEnabled)
                    {
                        LOG.Trace("IOTimeout: Message to receive was already processed by another thread.");
                    }
                    status.SetRollbackOnly();
                    return false; // no more peeking unless this is the last listener thread
                }
                else
                {
                    // A real issue in receiving the message

                    #region Logging

                    if (LOG.IsErrorEnabled)
                    {
                        LOG.Error("Error receiving message from DefaultMessageQueue [" + mq.Path +
                                  "], closing queue and clearing connection cache.");
                    }

                    #endregion

                    lock (messageQueueMonitor)
                    {
                        mq.Close();
                        MessageQueue.ClearConnectionCache();
                    }
                    throw; // will cause rollback in MessageQueueTransactionManager and log exception
                }
            }

            #endregion

            if (message == null)
            {
                #region Logging

                if (LOG.IsTraceEnabled)
                {
                    LOG.Trace("Message recieved is null from Queue = [" + mq.Path + "]");
                }

                #endregion

                status.SetRollbackOnly();
                return false; // no more peeking unless this is the last listener thread
            }

            try
            {
                #region Logging

                if (LOG.IsDebugEnabled)
                {
                    LOG.Debug("Received message [" + message.Id + "] on queue [" + mq.Path + "]");
                }

                #endregion

                MessageReceived(message);
                DoExecuteListener(message);

                #region Logging

                if (LOG.IsTraceEnabled)
                {
                    LOG.Trace("MessageListener executed");
                }

                #endregion
            }
            catch (Exception ex)
            {
                //Exception may indicate rollback of database transaction in service layer.
                //Let the handler determine if the message should be removed from the queue.
                TransactionAction action =
                    HandleTransactionalListenerException(ex, message, QueueUtils.GetMessageQueueTransaction(null));
                if (action == TransactionAction.Rollback)
                {
                    #region Logging

                    if (LOG.IsDebugEnabled)
                    {
                        LOG.Debug(
                            "Exception handler's TransactionAction has rolled back MessageQueueTransaction for queue [" +
                            mq.Path + "]");
                    }

                    #endregion

                    status.SetRollbackOnly();
                    return false; // no more peeking unless this is the last listener thread
                }
                else
                {
                    LOG.Info("Committing MessageQueueTransaction due to explicit commit request by exception handler.");
                }
            }
            finally
            {
                message.Dispose();
            }
            return true;
        }

        /// <summary>
        /// Does the recieve and execute using a local MessageQueueTransaction.
        /// </summary>
        /// <param name="mq">The mqessage queue.</param>
        /// <param name="status">The transactional status.</param>
        /// <returns>true if should continue peeking, false otherwise.</returns>
        protected virtual bool DoRecieveAndExecuteUsingResourceTransactionManagerWithTxQueue(MessageQueue mq,
                                                                                             ITransactionStatus status)
        {
            #region Logging

            if (LOG.IsDebugEnabled)
            {
                LOG.Debug("Executing DoRecieveAndExecuteUsingResourceTransactionManagerWithTxQueue");
            }

            #endregion Logging

            using (MessageQueueTransaction messageQueueTransaction = new MessageQueueTransaction())
            {
                messageQueueTransaction.Begin();

                #region Logging

                if (LOG.IsTraceEnabled)
                {
                    LOG.Trace("Started MessageQueueTransaction for queue = [" + mq.Path + "]");
                }

                #endregion

                Message message;

                #region ReceiveMessage

                try
                {
                    #region Logging

                    if (LOG.IsTraceEnabled)
                    {
                        LOG.Trace("Receiving message with zero timeout for queue = [" + mq.Path + "]");
                    }

                    #endregion

                    message = mq.Receive(TimeSpan.Zero, messageQueueTransaction);
                }
                catch (MessageQueueException ex)
                {
                    if (ex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                    {
                        //expected to occur occasionally

                        #region Logging

                        if (LOG.IsTraceEnabled)
                        {
                            LOG.Trace(
                                "MessageQueueErrorCode.IOTimeout: No message available to receive.  May have been processed by another thread.");
                        }

                        #endregion

                        status.SetRollbackOnly();
                        return false; // no more peeking unless this is the last listener thread
                    }
                    else
                    {
                        // A real issue in receiving the message

                        #region Logging

                        if (LOG.IsErrorEnabled)
                        {
                            LOG.Error("Error receiving message from DefaultMessageQueue [" + mq.Path +
                                      "], closing queue and clearing connection cache.");
                        }

                        #endregion

                        lock (messageQueueMonitor)
                        {
                            mq.Close();
                            MessageQueue.ClearConnectionCache();
                        }
                        throw; // will cause rollback in surrounding platform transaction manager and log exception
                    }
                }

                #endregion

                if (message == null)
                {
                    #region Logging

                    if (LOG.IsTraceEnabled)
                    {
                        LOG.Trace("Message recieved is null from Queue = [" + mq.Path + "]");
                    }

                    #endregion

                    status.SetRollbackOnly();
                    return false; // no more peeking unless this is the last listener thread
                }

                try
                {
                    #region Logging

                    if (LOG.IsDebugEnabled)
                    {
                        LOG.Debug("Received message [" + message.Id + "] on queue [" + mq.Path + "]");
                    }

                    #endregion

                    MessageReceived(message);

                    if (ExposeContainerManagedMessageQueueTransaction)
                    {
                        TransactionSynchronizationManager.BindResource(
                            MessageQueueTransactionManager.CURRENT_TRANSACTION_SLOTNAME,
                            new LocallyExposedMessageQueueResourceHolder(messageQueueTransaction));
                    }

                    DoExecuteListener(message);

                    #region Logging

                    if (LOG.IsTraceEnabled)
                    {
                        LOG.Trace("MessageListener executed");
                    }

                    #endregion

                    messageQueueTransaction.Commit();

                    #region Logging

                    if (LOG.IsTraceEnabled)
                    {
                        LOG.Trace("Committed MessageQueueTransaction for queue [" + mq.Path + "]");
                    }

                    #endregion
                }
                catch (Exception ex)
                {
                    TransactionAction action =
                        HandleTransactionalListenerException(ex, message, messageQueueTransaction);
                    if (action == TransactionAction.Rollback)
                    {
                        messageQueueTransaction.Abort();

                        #region Logging

                        if (LOG.IsDebugEnabled)
                        {
                            LOG.Debug(
                                "Exception handler's TransactionAction has rolled back MessageQueueTransaction for queue [" +
                                mq.Path + "]");
                        }

                        #endregion
                    }
                    else
                    {
                        // Will remove from the message queue
                        messageQueueTransaction.Commit();

                        #region Logging

                        if (LOG.IsDebugEnabled)
                        {
                            LOG.Debug(
                                "Exception handler's TransactionAction has committed MessageQueueTransaction for queue [" +
                                mq.Path + "]");
                        }

                        #endregion
                    }
                    //Outer db-tx will rollback
                    throw;
                }
                finally
                {
                    if (ExposeContainerManagedMessageQueueTransaction)
                    {
                        TransactionSynchronizationManager.UnbindResource(
                            MessageQueueTransactionManager.CURRENT_TRANSACTION_SLOTNAME);
                    }
                    message.Dispose();
                }
                return true;
            }
        }


        /// <summary>
        /// Handles the transactional listener exception.
        /// </summary>
        /// <param name="e">The exception thrown while processing the message.</param>
        /// <param name="message">The message.</param>
        /// <param name="messageQueueTransaction">The message queue transaction.</param>
        /// <returns>The TransactionAction retruned by the TransactionalExceptionListener</returns>
        protected virtual TransactionAction HandleTransactionalListenerException(Exception e, Message message,
                                                                                 MessageQueueTransaction
                                                                                     messageQueueTransaction)
        {
            try
            {
                TransactionAction transactionAction =
                    InvokeTransactionalExceptionListener(e, message, messageQueueTransaction);
                if (Active)
                {
                    // Regular case: failed while active.
                    // Log at error level.
                    LOG.Error("Execution of message listener failed", e);
                }
                else
                {
                    // Rare case: listener thread failed after container shutdown.
                    // Log at debug level, to avoid spamming the shutdown log.
                    LOG.Debug("Listener exception after container shutdown", e);
                }
                return transactionAction;
            }
            catch (Exception ex)
            {
                LOG.Error("Exception invoking MessageTransactionExceptionHandler.  Rolling back transaction.", ex);
                return TransactionAction.Rollback;
            }
        }


        /// <summary>
        /// Invokes the transactional exception listener.
        /// </summary>
        /// <param name="e">The exception thrown during message processing.</param>
        /// <param name="message">The message.</param>
        /// <param name="messageQueueTransaction">The message queue transaction.</param>
        /// <returns>TransactionAction.Rollback if no exception handler is defined, otherwise the 
        /// TransactionAction returned by the exception handler</returns>
        protected virtual TransactionAction InvokeTransactionalExceptionListener(Exception e, Message message,
                                                                                 MessageQueueTransaction
                                                                                     messageQueueTransaction)
        {
            IMessageTransactionExceptionHandler exceptionHandler = MessageTransactionExceptionHandler;
            if (exceptionHandler != null)
            {
                return exceptionHandler.OnException(e, message, messageQueueTransaction);
            }
            else
            {
                LOG.Warn("No MessageTransactionExceptionHandler defined.  Defaulting to TransactionAction.Rollback.");
                return TransactionAction.Rollback;
            }
        }

        #endregion
    }
}
www.java2v.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.