EmsTemplate.cs :  » Inversion-of-Control-Dependency-Injection » Spring.net » Spring » Messaging » Ems » Core » C# / CSharp Open Source

Home
C# / CSharp Open Source
1.2.6.4 mono .net core
2.2.6.4 mono core
3.Aspect Oriented Frameworks
4.Bloggers
5.Build Systems
6.Business Application
7.Charting Reporting Tools
8.Chat Servers
9.Code Coverage Tools
10.Content Management Systems CMS
11.CRM ERP
12.Database
13.Development
14.Email
15.Forum
16.Game
17.GIS
18.GUI
19.IDEs
20.Installers Generators
21.Inversion of Control Dependency Injection
22.Issue Tracking
23.Logging Tools
24.Message
25.Mobile
26.Network Clients
27.Network Servers
28.Office
29.PDF
30.Persistence Frameworks
31.Portals
32.Profilers
33.Project Management
34.RSS RDF
35.Rule Engines
36.Script
37.Search Engines
38.Sound Audio
39.Source Control
40.SQL Clients
41.Template Engines
42.Testing
43.UML
44.Web Frameworks
45.Web Service
46.Web Testing
47.Wiki Engines
48.Windows Presentation Foundation
49.Workflows
50.XML Parsers
C# / C Sharp
C# / C Sharp by API
C# / CSharp Tutorial
C# / CSharp Open Source » Inversion of Control Dependency Injection » Spring.net 
Spring.net » Spring » Messaging » Ems » Core » EmsTemplate.cs
#region License

/*
 * Copyright  2002-2006 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#endregion

using System;
using Common.Logging;
using Spring.Messaging.Ems.Common;
using Spring.Messaging.Ems.Connections;
using Spring.Messaging.Ems.Support;
using Spring.Messaging.Ems.Support.Converter;
using Spring.Messaging.Ems.Support.Destinations;
using Spring.Transaction.Support;
using Spring.Util;
using TIBCO.EMS;
using QueueTIBCO.EMS.Queue;

namespace Spring.Messaging.Ems.Core{
    /// <summary> Helper class that simplifies EMS access code.</summary>
    /// <remarks>
    /// <para>If you want to use dynamic destination creation, you must specify
    /// the type of EMS destination to create, using the "pubSubDomain" property.
    /// For other operations, this is not necessary.
    /// Point-to-Point (Queues) is the default domain.</para>
    ///
    /// <para>Default settings for EMS Sessions is "auto-acknowledge".</para>
    ///
    /// <para>This template uses a DynamicDestinationResolver and a SimpleMessageConverter
    /// as default strategies for resolving a destination name or converting a message,
    /// respectively.</para>
    ///
    /// </remarks>
    /// <author>Mark Pollack</author>
    /// <author>Juergen Hoeller</author>
    /// <author>Mark Pollack (.NET)</author>
    public class EmsTemplate : EmsDestinationAccessor, IEmsOperations
    {
        #region Logging

        private readonly ILog logger = LogManager.GetLogger(typeof(EmsTemplate));


        #endregion
        #region Fields

        /// <summary>
        /// Timeout value indicating that a receive operation should
      /// check if a message is immediately available without blocking.   
        /// </summary>
        public static readonly long DEFAULT_RECEIVE_TIMEOUT = -1;

        private EmsTemplateResourceFactory transactionalResourceFactory;

        private object defaultDestination;

        private IMessageConverter messageConverter;


        private bool messageIdEnabled = true;

        private bool messageTimestampEnabled = true;

        private bool pubSubNoLocal = false;

        private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;

        private bool explicitQosEnabled = false;

        private int priority = Message.DEFAULT_PRIORITY;

        private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;

        private long timeToLive = Message.DEFAULT_TIME_TO_LIVE;
           
        #endregion

        #region Constructor (s)

        /// <summary> Create a new EmsTemplate.</summary>
        /// <remarks>
        /// <para>Note: The ConnectionFactory has to be set before using the instance.
        /// This constructor can be used to prepare a EmsTemplate via an ObjectFactory,
        /// typically setting the ConnectionFactory.</para>
        /// </remarks>
        public EmsTemplate()
        {
            transactionalResourceFactory = new EmsTemplateResourceFactory(this);
            InitDefaultStrategies();
        }


        /// <summary> Create a new EmsTemplate, given a ConnectionFactory.</summary>
        /// <param name="connectionFactory">the ConnectionFactory to obtain Connections from
        /// </param>
        public EmsTemplate(IConnectionFactory connectionFactory)
            : this()
        {
            ConnectionFactory = connectionFactory;
            AfterPropertiesSet();
        }

        #endregion

        #region Methods

        /// <summary> Initialize the default implementations for the template's strategies:
        /// DynamicDestinationResolver and SimpleMessageConverter.
        /// </summary>
        protected virtual void InitDefaultStrategies()
        {
            MessageConverter = new SimpleMessageConverter();
        }

        private void CheckDefaultDestination()
        {
            if (defaultDestination == null)
            {
                throw new SystemException(
                    "No defaultDestination or defaultDestinationName specified. Check configuration of EmsTemplate.");
            }
        }


        private void CheckMessageConverter()
        {
            if (MessageConverter == null)
            {
                throw new SystemException("No messageConverter registered. Check configuration of EmsTemplate.");
            }
        }

        /// <summary> Execute the action specified by the given action object within a
        /// EMS Session.
        /// </summary>
        /// <remarks> Generalized version of <code>execute(SessionCallback)</code>,
        /// allowing the EMS Connection to be started on the fly.
        /// <p>Use <code>execute(SessionCallback)</code> for the general case.
        /// Starting the EMS Connection is just necessary for receiving messages,
        /// which is preferably achieved through the <code>receive</code> methods.</p>
        /// </remarks>
        /// <param name="action">callback object that exposes the session
        /// </param>
        /// <param name="startConnection">Start the connection before performing callback action.
        /// </param>
        /// <returns> the result object from working with the session
        /// </returns>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public virtual object Execute(ISessionCallback action, bool startConnection)
        {
            return Execute(action.DoInEms, startConnection);
        }


        /// <summary> Execute the action specified by the given action object within a
        /// EMS Session.
        /// </summary>
        /// <remarks> Generalized version of <code>execute(SessionCallback)</code>,
        /// allowing the EMS Connection to be started on the fly.
        /// <p>Use <code>execute(SessionCallback)</code> for the general case.
        /// Starting the EMS Connection is just necessary for receiving messages,
        /// which is preferably achieved through the <code>receive</code> methods.</p>
        /// </remarks>
        /// <param name="action">callback object that exposes the session
        /// </param>
        /// <param name="startConnection">Start the connection before performing callback action.
        /// </param>
        /// <returns> the result object from working with the session
        /// </returns>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public virtual object Execute(SessionDelegate action, bool startConnection)
        {
            AssertUtils.ArgumentNotNull(action, "Callback object must not be null");

            IConnection conToClose = null;
            ISession sessionToClose = null;
            // bool sessionInTLS = true;

            //NOTE: Not closing session or connection unless session is not returned from 
            //      ConnectionFactoryUtils.DoGetTransactionalSession and CacheEmsResources is set to false
            try
            {
                ISession sessionToUse =
                    ConnectionFactoryUtils.DoGetTransactionalSession(ConnectionFactory, transactionalResourceFactory,
                                                                     startConnection);
                if (sessionToUse == null)
                {
                    //sessionInTLS = false;
                    conToClose = CreateConnection();
                    sessionToClose = CreateSession(conToClose);
                    if (startConnection)
                    {
                        conToClose.Start();
                    }
                    sessionToUse = sessionToClose;
                }
                if (logger.IsDebugEnabled)
                {
                    logger.Debug("Executing callback on EMS Session [" + sessionToUse + "]");
                }
                return action(sessionToUse);
            }
            finally
            {
                EmsUtils.CloseSession(sessionToClose);
                ConnectionFactoryUtils.ReleaseConnection(conToClose, ConnectionFactory, startConnection);
                /*
                if (!sessionInTLS && !CacheEmsResources)
                {
                    EmsUtils.CloseSession(session);
                    ConnectionFactoryUtils.ReleaseConnection(con, ConnectionFactory, startConnection);
                }*/
            }
        }

        #endregion

        #region Properties

        /// <summary>
        /// Gets or sets the default destination to be used on send/receive operations that do not
        /// have a destination parameter.
        /// </summary>
        /// <remarks>Alternatively, specify a "defaultDestinationName", to be
        /// dynamically resolved via the DestinationResolver.</remarks>
        /// <value>The default destination.</value>
        virtual public Destination DefaultDestination
        {
            get { return (defaultDestination as Destination); }

            set { defaultDestination = value; }
        }


        /// <summary>
        /// Gets or sets the name of the default destination name
        /// to be used on send/receive operations that
        /// do not have a destination parameter.
        /// </summary>
        /// <remarks>
        /// Alternatively, specify a EMS Destination object as "DefaultDestination"
        /// </remarks>
        /// <value>The name of the default destination.</value>
        virtual public string DefaultDestinationName
        {
            get { return (defaultDestination as string); }

            set { defaultDestination = value; }
        }

        /// <summary>
        /// Gets or sets the message converter for this template.
        /// </summary>
        /// <remarks>
        /// Used to resolve
        /// Object parameters to convertAndSend methods and Object results
        /// from receiveAndConvert methods.
        /// <p>The default converter is a SimpleMessageConverter, which is able
        /// to handle BytesMessages, TextMessages and ObjectMessages.</p>
        /// </remarks>
        /// <value>The message converter.</value>
        virtual public IMessageConverter MessageConverter
        {
            get { return messageConverter; }

            set { messageConverter = value; }
        }

        /// <summary>
        /// Gets or sets a value indicating whether Message Ids are enabled.
        /// </summary>
        /// <value><c>true</c> if message id enabled; otherwise, <c>false</c>.</value>
        virtual public bool MessageIdEnabled
        {
            get { return messageIdEnabled; }

            set { messageIdEnabled = value; }
        }

        /// <summary>
        /// Gets or sets a value indicating whether message timestamps are enabled.
        /// </summary>
        /// <value>
        ///   <c>true</c> if [message timestamp enabled]; otherwise, <c>false</c>.
        /// </value>
        virtual public bool MessageTimestampEnabled
        {
            get { return messageTimestampEnabled; }

            set { messageTimestampEnabled = value; }
        }


        /// <summary>
        /// Gets or sets a value indicating whether to inhibit the delivery of messages published by its own connection.
        ///
        /// </summary>
        /// <value><c>true</c> if inhibit the delivery of messages published by its own connection; otherwise, <c>false</c>.</value>
        virtual public bool PubSubNoLocal
        {
            get { return pubSubNoLocal; }

            set { pubSubNoLocal = value; }
        }

        /// <summary>
        /// Gets or sets the receive timeout to use for recieve calls.
        /// </summary>
        /// <remarks>The default is -1, which means no timeout.</remarks>
        /// <value>The receive timeout.</value>
        virtual public long ReceiveTimeout
        {
            get { return receiveTimeout; }

            set { receiveTimeout = value; }
        }

        /// <summary>
        /// Gets or sets a value indicating whether to use explicit Quality of Service values.
        /// </summary>
        /// <remarks>If "true", then the values of deliveryMode, priority, and timeToLive
        /// will be used when sending a message. Otherwise, the default values,
        /// that may be set administratively, will be used</remarks>
        /// <value><c>true</c> if use explicit QoS values; otherwise, <c>false</c>.</value>
        virtual public bool ExplicitQosEnabled
        {
            get { return explicitQosEnabled; }

            set { explicitQosEnabled = value; }
        }

        /// <summary>
        /// Sets a value indicating the delivery mode QOS
        /// </summary>
        /// <remarks>
        /// This will set the delivery to persistent, non-persistent, or reliable delivery.
        /// Default value is Message.DEFAULT_DELIVERY_MODE (aka TIBCO.EMS.DeliveryMode.PERSISTENT)
        /// </remarks>
        /// <value>Integer value representing the delivery mode [delivery persistent]; otherwise, <c>false</c>.</value>
        virtual public int DeliveryMode
        {
      get { return deliveryMode; }
      
            set { deliveryMode = value; }
        }

        /// <summary>
        /// Gets or sets the priority when sending.
        /// </summary>
        /// <remarks>Since a default value may be defined administratively,
        /// this is only used when "isExplicitQosEnabled" equals "true".</remarks>
        /// <value>The priority.</value>
        virtual public int Priority
        {
            get { return priority; }

            set { priority = value; }
        }

        /// <summary>
        /// Gets or sets the time to live when sending
        /// </summary>
        /// <remarks>Since a default value may be defined administratively,
        /// this is only used when "isExplicitQosEnabled" equals "true".</remarks>
        /// <value>The time to live.</value>
        virtual public long TimeToLive
        {
            get { return timeToLive; }

            set { timeToLive = value; }
        }

        /*        
        /// <summary>
        /// Gets or sets a value indicating whether the EmsTemplate should itself
        /// be responsible for caching EMS Connection/Session/MessageProducer as compared to
        /// creating new instances per operation (unless such resources are already
        /// present in Thread-Local storage either due to the use of EmsTransactionMananger or
        /// SimpleMessageListenerContainer at an outer calling layer.
        /// </summary>
        /// <remarks>Connection/Session/MessageProducer are thread-safe classes in TIBCO EMS.</remarks>
        /// <value><c>true</c> to locally cache ems resources; otherwise, <c>false</c>.</value>
        virtual public bool CacheEmsResources
        {
            get { return cacheEmsResources; }
            set { cacheEmsResources = value; }
        }*/


        #endregion

        /// <summary>
        /// Extract the content from the given JMS message.
        /// </summary>
        /// <param name="message">The Message to convert (can be <code>null</code>).</param>
        /// <returns>The content of the message, or <code>null</code> if none</returns>
        protected virtual object DoConvertFromMessage(Message message)
        {
            if (message != null)
            {
                return MessageConverter.FromMessage(message);
            }
            return null;
        }

        #region EMS Factory Methods

        /// <summary> Fetch an appropriate Connection from the given EmsResourceHolder.
        /// </summary>
        /// <param name="holder">the EmsResourceHolder
        /// </param>
        /// <returns> an appropriate Connection fetched from the holder,
        /// or <code>null</code> if none found
        /// </returns>
        protected virtual IConnection GetConnection(EmsResourceHolder holder)
        {
            return holder.GetConnection();
        }

        /// <summary> Fetch an appropriate Session from the given EmsResourceHolder.
        /// </summary>
        /// <param name="holder">the EmsResourceHolder
        /// </param>
        /// <returns> an appropriate Session fetched from the holder,
        /// or <code>null</code> if none found
        /// </returns>
        protected virtual ISession GetSession(EmsResourceHolder holder)
        {
            return holder.GetSession();
        }

        /// <summary> Create a EMS MessageProducer for the given Session and Destination,
        /// configuring it to disable message ids and/or timestamps (if necessary).
        /// <p>Delegates to <code>doCreateProducer</code> for creation of the raw
        /// EMS MessageProducer</p>
        /// </summary>
        /// <param name="session">the EMS Session to create a MessageProducer for
        /// </param>
        /// <param name="destination">the EMS Destination to create a MessageProducer for
        /// </param>
        /// <returns> the new EMS MessageProducer
        /// </returns>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        /// <seealso cref="DoCreateProducer">
        /// </seealso>
        /// <seealso cref="MessageIdEnabled">
        /// </seealso>
        /// <seealso cref="MessageTimestampEnabled">
        /// </seealso>
        protected virtual IMessageProducer CreateProducer(ISession session, Destination destination)
        {
            IMessageProducer producer = DoCreateProducer(session, destination);
            if (!MessageIdEnabled)
            {
                producer.DisableMessageID = true;
            }
            if (!MessageTimestampEnabled)
            {
                producer.DisableMessageTimestamp = true;
            }
            return producer;
        }


        /// <summary>
        /// Determines whether the given Session is locally transacted, that is, whether
        /// its transaction is managed by this template class's Session handling
        /// and not by an external transaction coordinator. 
        /// </summary>
        /// <remarks>
        /// The Session's own transacted flag will already have been checked
        /// before. This method is about finding out whether the Session's transaction
        /// is local or externally coordinated.
        /// </remarks>
        /// <param name="session">The session to check.</param>
        /// <returns>
        ///   <c>true</c> if the session is locally transacted; otherwise, <c>false</c>.
        /// </returns>
        protected virtual bool IsSessionLocallyTransacted(ISession session)
        {
            return SessionTransacted &&
                !ConnectionFactoryUtils.IsSessionTransactional(session, ConnectionFactory);
        }

        /// <summary> Create a raw EMS MessageProducer for the given Session and Destination.
        /// </summary>
        /// <remarks>If CacheJmsResource is true, then the producer 
        /// will be created upon the first invocation and will retrun the same
        /// producer (per destination) on all subsequent calls.
        /// </remarks>
        /// <param name="session">the EMS Session to create a MessageProducer for
        /// </param>
        /// <param name="destination">the EMS Destination to create a MessageProducer for
        /// </param>
        /// <returns> the new EMS MessageProducer
        /// </returns>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        protected virtual IMessageProducer DoCreateProducer(ISession session, Destination destination)
        {
            return session.CreateProducer(destination);
            /*
            if (CacheEmsResources)
            {
                if (destination == null)
                {
                    if (emsResources.UnspecifiedDestinationMessageProducer == null)
                    {
                        emsResources.UnspecifiedDestinationMessageProducer = session.CreateProducer(destination);
                    }
                    return emsResources.UnspecifiedDestinationMessageProducer;
                }
                IMessageProducer producer = (IMessageProducer)emsResources.Producers[destination];
                if (producer != null)
                {
                    #region Logging

                    if (logger.IsDebugEnabled)
                    {
                        logger.Debug("Found cached MessageProducer for destination [" + destination + "]");
                    }

                    #endregion
                }
                else
                {
                    producer = session.CreateProducer(destination);
                    emsResources.Producers.Add(destination, producer);
                    #region Logging

                    if (logger.IsDebugEnabled)
                    {
                        logger.Debug("Created cached MessageProducer for destination [" + destination + "]");
                    }

                    #endregion
                }
                return producer;
            }
            else
            {
                return session.CreateProducer(destination);
            }*/      
        }

        /// <summary> Create a EMS MessageConsumer for the given Session and Destination.
        /// </summary>
        /// <param name="session">the EMS Session to create a MessageConsumer for
        /// </param>
        /// <param name="destination">the EMS Destination to create a MessageConsumer for
        /// </param>
        /// <param name="messageSelector">the message selector for this consumer (can be <code>null</code>)
        /// </param>
        /// <returns> the new EMS MessageConsumer
        /// </returns>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        protected virtual IMessageConsumer CreateConsumer(ISession session, Destination destination,
                                                          string messageSelector)
        {
            // Only pass in the NoLocal flag in case of a Topic:
            // Some EMS providers, such as WebSphere MQ 6.0, throw IllegalStateException
            // in case of the NoLocal flag being specified for a Queue.
            if (PubSubDomain)
            {
                return session.CreateConsumer(destination, messageSelector, PubSubNoLocal);
            }
            else
            {
                return session.CreateConsumer(destination, messageSelector);
            }
        }
        /*
        /// <summary>Create a EMS Connection via this template's ConnectionFactory.
        /// </summary>
        /// <remarks>If CacheJmsResource is true, then the connection 
        /// will be created upon the first invocation and will retrun the same
        /// connection on all subsequent calls.
        /// </remarks>
        /// <returns>A EMS Connection
        /// </returns>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        protected override IConnection CreateConnection()
        {
            if (CacheEmsResources)
            {
                if (emsResources.Connection == null)
                {
                    emsResources.Connection = ConnectionFactory.CreateConnection();
                }
                return emsResources.Connection;

            }
            else
            {
                return ConnectionFactory.CreateConnection();
            }
        }*/
        /*
        /// <summary> Create a EMS Session for the given Connection.
        /// </summary>
        /// <remarks>If CacheJmsResource is true, then the session 
        /// will be created upon the first invocation and will retrun the same
        /// session on all subsequent calls.
        /// </remarks>
        /// <param name="con">the EMS Connection to create a Session for
        /// </param>
        /// <returns> the new EMS Session
        /// </returns>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        protected override ISession CreateSession(IConnection con)
        {
            if (CacheEmsResources)
            {
                if (emsResources.Session == null)
                {
                    emsResources.Session = emsResources.Connection.CreateSession(SessionTransacted, SessionAcknowledgeMode);
                }
                return emsResources.Session;
            }
            else
            {
                return con.CreateSession(SessionTransacted, SessionAcknowledgeMode);
            }
        }*/

        /// <summary>
        /// Send the given message.
        /// </summary>
        /// <param name="session">The session to operate on.</param>
        /// <param name="destination">The destination to send to.</param>
        /// <param name="messageCreatorDelegate">The message creator delegate callback to create a Message.</param>
        protected internal virtual void DoSend(ISession session, Destination destination, MessageCreatorDelegate messageCreatorDelegate)
        {
            AssertUtils.ArgumentNotNull(messageCreatorDelegate, "IMessageCreatorDelegate must not be null");
            DoSend(session, destination, null, messageCreatorDelegate);
        }

        /// <summary>
        /// Send the given message.
        /// </summary>
        /// <param name="session">The session to operate on.</param>
        /// <param name="destination">The destination to send to.</param>
        /// <param name="messageCreator">The message creator callback to create a Message.</param>
        protected internal virtual void DoSend(ISession session, Destination destination, IMessageCreator messageCreator)
        {
            AssertUtils.ArgumentNotNull(messageCreator, "IMessageCreator must not be null");
            DoSend(session, destination, messageCreator, null);
        }

        /// <summary> Send the given EMS message.</summary>
        /// <param name="session">the EMS Session to operate on
        /// </param>
        /// <param name="destination">the EMS Destination to send to
        /// </param>
        /// <param name="messageCreator">callback to create a EMS Message
        /// </param>
        /// <param name="messageCreatorDelegate">delegate callback to create a EMS Message
        /// </param>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        protected internal virtual void DoSend(ISession session, Destination destination, IMessageCreator messageCreator,
                                               MessageCreatorDelegate messageCreatorDelegate)
        {


            IMessageProducer producer = CreateProducer(session, destination);            
            try
            {
                
                Message message;
                if (messageCreator != null)
                {
                    message = messageCreator.CreateMessage(session) ;
                }
                else {
                    message = messageCreatorDelegate(session);
                }
                if (logger.IsDebugEnabled)
                {
                    logger.Debug("Sending created message [" + message + "]");
                }
                DoSend(producer, message);

                // Check commit, avoid commit call is Session transaction is externally coordinated.
                if (session.Transacted && IsSessionLocallyTransacted(session))
                {
                    // Transacted session created by this template -> commit.
                    EmsUtils.CommitIfNecessary(session);
                }
            }
            finally
            {
                EmsUtils.CloseMessageProducer(producer);
            }
        }


        /// <summary> Actually send the given EMS message.</summary>
        /// <param name="producer">the EMS MessageProducer to send with
        /// </param>
        /// <param name="message">the EMS Message to send
        /// </param>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        protected virtual void DoSend(IMessageProducer producer, Message message)
        {
            if (ExplicitQosEnabled)
            {
                producer.Send(message, DeliveryMode, Priority, TimeToLive);
            }
            else
            {
                producer.Send(message);
            }
        }


        #endregion

        #region IEmsOperations Implementation

        /// <summary>
        /// Execute the action specified by the given action object within
        /// a EMS Session.
        /// </summary>
        /// <param name="del">delegate that exposes the session</param>
        /// <returns>
        /// the result object from working with the session
        /// </returns>
        /// <remarks>
        ///   <para>Note that the value of PubSubDomain affects the behavior of this method.
        /// If PubSubDomain equals true, then a Session is passed to the callback.
        /// If false, then a Session is passed to the callback.</para>b
        /// </remarks>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public object Execute(SessionDelegate del)
        {
            return Execute(new ExecuteSessionCallbackUsingDelegate(del));
        }

        /// <summary> Execute the action specified by the given action object within
        /// a EMS Session.
        /// <p>Note: The value of PubSubDomain affects the behavior of this method.
        /// If PubSubDomain equals true, then a Session is passed to the callback.
        /// If false, then a Session is passed to the callback.</p>
        /// </summary>
        /// <param name="action">callback object that exposes the session
        /// </param>
        /// <returns> the result object from working with the session
        /// </returns>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public object Execute(ISessionCallback action)
        {
            return Execute(action, false);
        }

        /// <summary> Send a message to a EMS destination. The callback gives access to
        /// the EMS session and MessageProducer in order to do more complex
        /// send operations.
        /// </summary>
        /// <param name="action">callback object that exposes the session/producer pair
        /// </param>
        /// <returns> the result object from working with the session
        /// </returns>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public object Execute(IProducerCallback action)
        {
            return Execute(new ProducerCreatorCallback(this, action));
        }

        /// <summary> Send a message to a EMS destination. The callback gives access to
        /// the EMS session and MessageProducer in order to do more complex
        /// send operations.
        /// </summary>
        /// <param name="del">delegate that exposes the session/producer pair
        /// </param>
        /// <returns> the result object from working with the session
        /// </returns>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public object Execute(ProducerDelegate del)
        {
            return Execute(new ProducerCreatorCallback(this, del));
        }

        /// <summary> Send a message to the default destination.
        /// <p>This will only work with a default destination specified!</p>
        /// </summary>
        /// <param name="messageCreatorDelegate">delegate callback to create a message
        /// </param>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public void SendWithDelegate(MessageCreatorDelegate messageCreatorDelegate)
        {
            CheckDefaultDestination();
            if (DefaultDestination != null)
            {
                SendWithDelegate(DefaultDestination, messageCreatorDelegate);
            }
            else
            {
                SendWithDelegate(DefaultDestinationName, messageCreatorDelegate);
            }
        }

        /// <summary> Send a message to the specified destination.
        /// The MessageCreator callback creates the message given a Session.
        /// </summary>
        /// <param name="destination">the destination to send this message to
        /// </param>
        /// <param name="messageCreatorDelegate">delegate callback to create a message
        /// </param>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public void SendWithDelegate(Destination destination, MessageCreatorDelegate messageCreatorDelegate)
        {
            Execute(new SendDestinationCallback(this, destination, messageCreatorDelegate), false);
        }

        /// <summary> Send a message to the specified destination.
        /// The MessageCreator callback creates the message given a Session.
        /// </summary>
        /// <param name="destinationName">the name of the destination to send this message to
        /// (to be resolved to an actual destination by a DestinationResolver)
        /// </param>
        /// <param name="messageCreatorDelegate">delegate callback to create a message
        /// </param>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public void SendWithDelegate(string destinationName, MessageCreatorDelegate messageCreatorDelegate)
        {
            Execute(new SendDestinationCallback(this, destinationName, messageCreatorDelegate), false);
        }

        /// <summary> Send a message to the default destination.
        /// <p>This will only work with a default destination specified!</p>
        /// </summary>
        /// <param name="messageCreator">callback to create a message
        /// </param>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public void Send(IMessageCreator messageCreator)
        {
            CheckDefaultDestination();
            if (DefaultDestination != null)
            {
                Send(DefaultDestination, messageCreator);
            }
            else
            {
                Send(DefaultDestinationName, messageCreator);
            }
        }

        /// <summary> Send a message to the specified destination.
        /// The MessageCreator callback creates the message given a Session.
        /// </summary>
        /// <param name="destination">the destination to send this message to
        /// </param>
        /// <param name="messageCreator">callback to create a message
        /// </param>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public void Send(Destination destination, IMessageCreator messageCreator)
        {
            
            Execute(new SendDestinationCallback(this, destination, messageCreator), false);
        }

        /// <summary> Send a message to the specified destination.
        /// The MessageCreator callback creates the message given a Session.
        /// </summary>
        /// <param name="destinationName">the destination to send this message to
        /// </param>
        /// <param name="messageCreator">callback to create a message
        /// </param>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public void Send(string destinationName, IMessageCreator messageCreator)
        {
            Execute(new SendDestinationCallback(this, destinationName, messageCreator), false);
        }
        /// <summary> Send the given object to the default destination, converting the object
        /// to a EMS message with a configured IMessageConverter.
        /// <p>This will only work with a default destination specified!</p>
        /// </summary>
        /// <param name="message">the object to convert to a message
        /// </param>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public void ConvertAndSend(object message)
        {
            CheckDefaultDestination();
            if (DefaultDestination != null)
            {
                ConvertAndSend(DefaultDestination, message);
            }
            else
            {
                ConvertAndSend(DefaultDestinationName, message);
            }
        }

        /// <summary> Send the given object to the specified destination, converting the object
        /// to a EMS message with a configured IMessageConverter.
        /// </summary>
        /// <param name="destination">the destination to send this message to
        /// </param>
        /// <param name="message">the object to convert to a message
        /// </param>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public void ConvertAndSend(Destination destination, object message)
        {
            CheckMessageConverter();
            Send(destination, new SimpleMessageCreator(this, message));
        }

        /// <summary> Send the given object to the specified destination, converting the object
        /// to a EMS message with a configured IMessageConverter.
        /// </summary>
        /// <param name="destinationName">the name of the destination to send this message to
        /// (to be resolved to an actual destination by a DestinationResolver)
        /// </param>
        /// <param name="message">the object to convert to a message
        /// </param>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public void ConvertAndSend(string destinationName, object message)
        {
            Send(destinationName, new SimpleMessageCreator(this, message));
        }

        /// <summary> Send the given object to the default destination, converting the object
        /// to a EMS message with a configured IMessageConverter. The IMessagePostProcessor
        /// callback allows for modification of the message after conversion.
        /// <p>This will only work with a default destination specified!</p>
        /// </summary>
        /// <param name="message">the object to convert to a message
        /// </param>
        /// <param name="postProcessor">the callback to modify the message
        /// </param>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public void ConvertAndSend(object message, IMessagePostProcessor postProcessor)
        {
            CheckDefaultDestination();
            if (DefaultDestination != null)
            {
                ConvertAndSend(DefaultDestination, message, postProcessor);
            }
            else
            {
                ConvertAndSend(DefaultDestinationName, message, postProcessor);
            }
        }

        /// <summary> Send the given object to the specified destination, converting the object
        /// to a EMS message with a configured IMessageConverter. The IMessagePostProcessor
        /// callback allows for modification of the message after conversion.
        /// </summary>
        /// <param name="destination">the destination to send this message to
        /// </param>
        /// <param name="message">the object to convert to a message
        /// </param>
        /// <param name="postProcessor">the callback to modify the message
        /// </param>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public void ConvertAndSend(Destination destination, object message, IMessagePostProcessor postProcessor)
        {
            CheckMessageConverter();
            Send(destination, new ConvertAndSendMessageCreator(this, message, postProcessor));
            
        }

        /// <summary> Send the given object to the specified destination, converting the object
        /// to a EMS message with a configured IMessageConverter. The IMessagePostProcessor
        /// callback allows for modification of the message after conversion.
        /// </summary>
        /// <param name="destinationName">the name of the destination to send this message to
        /// (to be resolved to an actual destination by a DestinationResolver)
        /// </param>
        /// <param name="message">the object to convert to a message.
        /// </param>
        /// <param name="postProcessor">the callback to modify the message
        /// </param>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public void ConvertAndSend(string destinationName, object message, IMessagePostProcessor postProcessor)
        {
            CheckMessageConverter();
            Send(destinationName, new ConvertAndSendMessageCreator(this, message, postProcessor));  
        }
        /// <summary>
        /// Send the given object to the default destination, converting the object
        /// to a EMS message with a configured IMessageConverter. The IMessagePostProcessor
        /// callback allows for modification of the message after conversion.
        /// <p>This will only work with a default destination specified!</p>
        /// </summary>
        /// <param name="message">the object to convert to a message</param>
        /// <param name="postProcessor">the callback to modify the message</param>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public void ConvertAndSendWithDelegate(object message, MessagePostProcessorDelegate postProcessor)
        {
            //Execute(new SendDestinationCallback(this, destination, messageCreatorDelegate), false);
            CheckDefaultDestination();
            if (DefaultDestination != null)
            {
                ConvertAndSendWithDelegate(DefaultDestination, message, postProcessor);
            }
            else
            {
                ConvertAndSendWithDelegate(DefaultDestinationName, message, postProcessor);
            }
        }

        /// <summary>
        /// Send the given object to the specified destination, converting the object
        /// to a EMS message with a configured IMessageConverter. The IMessagePostProcessor
        /// callback allows for modification of the message after conversion.
        /// </summary>
        /// <param name="destination">the destination to send this message to</param>
        /// <param name="message">the object to convert to a message</param>
        /// <param name="postProcessor">the callback to modify the message</param>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public void ConvertAndSendWithDelegate(Destination destination, object message,
                                               MessagePostProcessorDelegate postProcessor)
        {
            CheckMessageConverter();
            Send(destination, new ConvertAndSendMessageCreator(this, message, postProcessor));
        }

        /// <summary>
        /// Send the given object to the specified destination, converting the object
        /// to a EMS message with a configured IMessageConverter. The IMessagePostProcessor
        /// callback allows for modification of the message after conversion.
        /// </summary>
        /// <param name="destinationName">the name of the destination to send this message to
        /// (to be resolved to an actual destination by a DestinationResolver)</param>
        /// <param name="message">the object to convert to a message.</param>
        /// <param name="postProcessor">the callback to modify the message</param>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public void ConvertAndSendWithDelegate(string destinationName, object message,
                                               MessagePostProcessorDelegate postProcessor)
        {
            CheckMessageConverter();
            Send(destinationName, new ConvertAndSendMessageCreator(this, message, postProcessor));

        }

        /// <summary> Receive a message synchronously from the default destination, but only
        /// wait up to a specified time for delivery.
        /// <p>This method should be used carefully, since it will block the thread
        /// until the message becomes available or until the timeout value is exceeded.</p>
        /// <p>This will only work with a default destination specified!</p>
        /// </summary>
        /// <returns> the message received by the consumer, or <code>null</code> if the timeout expires
        /// </returns>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public Message Receive()
        {
            CheckDefaultDestination();
            if (DefaultDestination != null)
            {
                return Receive(DefaultDestination);
            }
            else
            {
                return Receive(DefaultDestinationName);
            }
        }

        /// <summary> Receive a message synchronously from the specified destination, but only
        /// wait up to a specified time for delivery.
        /// <p>This method should be used carefully, since it will block the thread
        /// until the message becomes available or until the timeout value is exceeded.</p>
        /// </summary>
        /// <param name="destination">the destination to receive a message from
        /// </param>
        /// <returns> the message received by the consumer, or <code>null</code> if the timeout expires
        /// </returns>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public Message Receive(Destination destination)
        {
            return Execute(new ReceiveCallback(this, destination)) as Message;
        }


        /// <summary> Receive a message synchronously from the specified destination, but only
        /// wait up to a specified time for delivery.
        /// <p>This method should be used carefully, since it will block the thread
        /// until the message becomes available or until the timeout value is exceeded.</p>
        /// </summary>
        /// <param name="destinationName">the name of the destination to send this message to
        /// (to be resolved to an actual destination by a DestinationResolver)
        /// </param>
        /// <returns> the message received by the consumer, or <code>null</code> if the timeout expires
        /// </returns>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public Message Receive(string destinationName)
        {
            return Execute(new ReceiveCallback(this, destinationName)) as Message;
        }

        /// <summary> Receive a message synchronously from the default destination, but only
        /// wait up to a specified time for delivery.
        /// <p>This method should be used carefully, since it will block the thread
        /// until the message becomes available or until the timeout value is exceeded.</p>
        /// <p>This will only work with a default destination specified!</p>
        /// </summary>
        /// <param name="messageSelector">the EMS message selector expression (or <code>null</code> if none).
        /// See the EMS specification for a detailed definition of selector expressions.
        /// </param>
        /// <returns> the message received by the consumer, or <code>null</code> if the timeout expires
        /// </returns>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public Message ReceiveSelected(string messageSelector)
        {
            CheckDefaultDestination();
            if (DefaultDestination!= null)
            {
                return ReceiveSelected(DefaultDestination, messageSelector);
            }
            else
            {
                return ReceiveSelected(DefaultDestinationName, messageSelector);
            }
        }

        /// <summary> Receive a message synchronously from the specified destination, but only
        /// wait up to a specified time for delivery.
        /// <p>This method should be used carefully, since it will block the thread
        /// until the message becomes available or until the timeout value is exceeded.</p>
        /// </summary>
        /// <param name="destination">the destination to receive a message from
        /// </param>
        /// <param name="messageSelector">the EMS message selector expression (or <code>null</code> if none).
        /// See the EMS specification for a detailed definition of selector expressions.
        /// </param>
        /// <returns> the message received by the consumer, or <code>null</code> if the timeout expires
        /// </returns>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public Message ReceiveSelected(Destination destination, string messageSelector)
        {
            return Execute(new ReceiveSelectedCallback(this, destination, messageSelector), true) as Message;
        }

        /// <summary> Receive a message synchronously from the specified destination, but only
        /// wait up to a specified time for delivery.
        /// <p>This method should be used carefully, since it will block the thread
        /// until the message becomes available or until the timeout value is exceeded.</p>
        /// </summary>
        /// <param name="destinationName">the name of the destination to send this message to
        /// (to be resolved to an actual destination by a DestinationResolver)
        /// </param>
        /// <param name="messageSelector">the EMS message selector expression (or <code>null</code> if none).
        /// See the EMS specification for a detailed definition of selector expressions.
        /// </param>
        /// <returns> the message received by the consumer, or <code>null</code> if the timeout expires
        /// </returns>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public Message ReceiveSelected(string destinationName, string messageSelector)
        {
            return Execute(new ReceiveSelectedCallback(this, destinationName, messageSelector), true) as Message;
        
        }

        /// <summary>
        /// Receive a message.
        /// </summary>
        /// <param name="session">The session to operate on.</param>
        /// <param name="destination">The destination to receive from.</param>
        /// <param name="messageSelector">The message selector for this consumer (can be <code>null</code></param>
        /// <returns>The Message received, or <code>null</code> if none.</returns>
        protected virtual Message DoReceive(ISession session, Destination destination, string messageSelector)
        {
            return DoReceive(session, CreateConsumer(session, destination, messageSelector));
        }

        /// <summary>
        /// Receive a message.
        /// </summary>
        /// <param name="session">The session to operate on.</param>
        /// <param name="consumer">The consumer to receive with.</param>
        /// <returns>The Message received, or <code>null</code> if none</returns>
        protected virtual Message DoReceive(ISession session, IMessageConsumer consumer)
        {
            try
            {
                long timeout = ReceiveTimeout;
                EmsResourceHolder resourceHolder =
                (EmsResourceHolder)TransactionSynchronizationManager.GetResource(ConnectionFactory);
                if (resourceHolder != null && resourceHolder.HasTimeout)
                {
                    timeout = Convert.ToInt64(resourceHolder.TimeToLiveInMilliseconds);
                }
                Message message = (timeout > 0)
                                      ? consumer.Receive(timeout)
                                      : consumer.Receive();
                if (session.Transacted)
                {
                    // Commit necessary - but avoid commit call is Session transaction is externally coordinated.
                    if (IsSessionLocallyTransacted(session))
                    {
                        // Transacted session created by this template -> commit.
                        EmsUtils.CommitIfNecessary(session);
                    }
                }
                else if (IsClientAcknowledge(session))
                {
                    // Manually acknowledge message, if any.
                    if (message != null)
                    {
                        message.Acknowledge();
                    }
                }
                return message;
            }
            finally
            {
                EmsUtils.CloseMessageConsumer(consumer);
            }
        }


        /// <summary> Receive a message synchronously from the default destination, but only
        /// wait up to a specified time for delivery. Convert the message into an
        /// object with a configured IMessageConverter.
        /// <p>This method should be used carefully, since it will block the thread
        /// until the message becomes available or until the timeout value is exceeded.</p>
        /// <p>This will only work with a default destination specified!</p>
        /// </summary>
        /// <returns> the message produced for the consumer or <code>null</code> if the timeout expires.
        /// </returns>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public object ReceiveAndConvert()
        {
            CheckMessageConverter();
            return DoConvertFromMessage(Receive());
        }

        /// <summary> Receive a message synchronously from the specified destination, but only
        /// wait up to a specified time for delivery. Convert the message into an
        /// object with a configured IMessageConverter.
        /// <p>This method should be used carefully, since it will block the thread
        /// until the message becomes available or until the timeout value is exceeded.</p>
        /// </summary>
        /// <param name="destination">the destination to receive a message from
        /// </param>
        /// <returns> the message produced for the consumer or <code>null</code> if the timeout expires.
        /// </returns>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public object ReceiveAndConvert(Destination destination)
        {
            CheckMessageConverter();
            return DoConvertFromMessage(Receive(destination));
        }


        /// <summary> Receive a message synchronously from the specified destination, but only
        /// wait up to a specified time for delivery. Convert the message into an
        /// object with a configured IMessageConverter.
        /// <p>This method should be used carefully, since it will block the thread
        /// until the message becomes available or until the timeout value is exceeded.</p>
        /// </summary>
        /// <param name="destinationName">the name of the destination to send this message to
        /// (to be resolved to an actual destination by a DestinationResolver)
        /// </param>
        /// <returns> the message produced for the consumer or <code>null</code> if the timeout expires.
        /// </returns>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public object ReceiveAndConvert(string destinationName)
        {
            CheckMessageConverter();
            return DoConvertFromMessage(Receive(destinationName));
        }

        /// <summary> Receive a message synchronously from the default destination, but only
        /// wait up to a specified time for delivery. Convert the message into an
        /// object with a configured IMessageConverter.
        /// <p>This method should be used carefully, since it will block the thread
        /// until the message becomes available or until the timeout value is exceeded.</p>
        /// <p>This will only work with a default destination specified!</p>
        /// </summary>
        /// <param name="messageSelector">the EMS message selector expression (or <code>null</code> if none).
        /// See the EMS specification for a detailed definition of selector expressions.
        /// </param>
        /// <returns> the message produced for the consumer or <code>null</code> if the timeout expires.
        /// </returns>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public object ReceiveSelectedAndConvert(string messageSelector)
        {
            CheckMessageConverter();
            return DoConvertFromMessage(ReceiveSelected(messageSelector));
        }

        /// <summary> Receive a message synchronously from the specified destination, but only
        /// wait up to a specified time for delivery. Convert the message into an
        /// object with a configured IMessageConverter.
        /// <p>This method should be used carefully, since it will block the thread
        /// until the message becomes available or until the timeout value is exceeded.</p>
        /// </summary>
        /// <param name="destination">the destination to receive a message from
        /// </param>
        /// <param name="messageSelector">the EMS message selector expression (or <code>null</code> if none).
        /// See the EMS specification for a detailed definition of selector expressions.
        /// </param>
        /// <returns> the message produced for the consumer or <code>null</code> if the timeout expires.
        /// </returns>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public object ReceiveSelectedAndConvert(Destination destination, string messageSelector)
        {
            CheckMessageConverter();
            return DoConvertFromMessage(ReceiveSelected(destination, messageSelector));
        }

        /// <summary> Receive a message synchronously from the specified destination, but only
        /// wait up to a specified time for delivery. Convert the message into an
        /// object with a configured IMessageConverter.
        /// <p>This method should be used carefully, since it will block the thread
        /// until the message becomes available or until the timeout value is exceeded.</p>
        /// </summary>
        /// <param name="destinationName">the name of the destination to send this message to
        /// (to be resolved to an actual destination by a DestinationResolver)
        /// </param>
        /// <param name="messageSelector">the EMS message selector expression (or <code>null</code> if none).
        /// See the EMS specification for a detailed definition of selector expressions.
        /// </param>
        /// <returns> the message produced for the consumer or <code>null</code> if the timeout expires.
        /// </returns>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public object ReceiveSelectedAndConvert(string destinationName, string messageSelector)
        {
            CheckMessageConverter();
            return DoConvertFromMessage(ReceiveSelected(destinationName, messageSelector));
        }


        /// <summary>
        /// Browses messages in the default EMS queue. The callback gives access to the EMS
        /// Session and QueueBrowser in order to browse the queue and react to the contents.
        /// </summary>
        /// <param name="action">The action callback object that exposes the session/browser pair.</param>
        /// <returns>the result object from working with the session</returns>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>        
        public object Browse(IBrowserCallback action)
        {
            AssertUtils.ArgumentNotNull(action, "action");
            return BrowseWithDelegate(action.DoInEms);
        }

        /// <summary>
        /// Browses messages in a EMS queue. The callback gives access to the EMS Session
        /// and QueueBrowser in order to browse the queue and react to the contents.
        /// </summary>
        /// <param name="queue">The queue to browse.</param>
        /// <param name="action">The action callback object that exposes the session/browser pair.</param>
        /// <returns>the result object from working with the session</returns>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>        
        public object Browse(Queue queue, IBrowserCallback action)
        {
            AssertUtils.ArgumentNotNull(action, "action");
            return BrowseSelectedWithDelegate(queue, null, action.DoInEms);
        }

        /// <summary>
        /// Browses messages in a EMS queue. The callback gives access to the EMS Session
        /// and QueueBrowser in order to browse the queue and react to the contents.
        /// </summary>
        /// <param name="queueName">Name of the queue to browse, 
        /// (to be resolved to an actual destination by a DestinationResolver)</param>
        /// <param name="action">The action callback object that exposes the session/browser pair.</param>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public object Browse(string queueName, IBrowserCallback action)
        {
            AssertUtils.ArgumentNotNull(action, "action");
            return BrowseSelectedWithDelegate(queueName, null, action.DoInEms);
        }

        /// <summary>
        /// Browses messages in a EMS queue. The callback gives access to the EMS Session
        /// and QueueBrowser in order to browse the queue and react to the contents.
        /// </summary>
        /// <param name="messageSelector">The EMS message selector expression (or <code>null</code> if none).</param>
        /// <param name="action">The action callback object that exposes the session/browser pair.</param>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public object BrowseSelected(string messageSelector, IBrowserCallback action)
        {
            AssertUtils.ArgumentNotNull(action, "action");
            return BrowseSelectedWithDelegate(messageSelector, action.DoInEms);
        }

        /// <summary>
        /// Browses messages in a EMS queue. The callback gives access to the EMS Session
        /// and QueueBrowser in order to browse the queue and react to the contents.
        /// </summary>
        /// <param name="queue">The queue to browse.</param>
        /// <param name="messageSelector">The EMS message selector expression (or <code>null</code> if none).</param>
        /// <param name="action">The action callback object that exposes the session/browser pair.</param>
        /// <returns></returns>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public object BrowseSelected(Queue queue, string messageSelector, IBrowserCallback action)
        {
            AssertUtils.ArgumentNotNull(action, "action");
            return BrowseSelectedWithDelegate(queue, messageSelector, action.DoInEms);        
        }


        /// <summary>
        /// Browses messages in a EMS queue. The callback gives access to the EMS Session
        /// and QueueBrowser in order to browse the queue and react to the contents.
        /// </summary>
        /// <param name="queueName">Name of the queue to browse, 
        /// (to be resolved to an actual destination by a DestinationResolver)</param>
        /// <param name="messageSelector">The EMS message selector expression (or <code>null</code> if none).</param>
        /// <param name="action">The action callback object that exposes the session/browser pair.</param>
        /// <returns></returns>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public object BrowseSelected(string queueName, string messageSelector, IBrowserCallback action)
        {
            AssertUtils.ArgumentNotNull(action, "action");
            return BrowseSelectedWithDelegate(queueName, messageSelector, action.DoInEms);
        }



        /// <summary>
        /// Browses messages in the default EMS queue. The callback gives access to the EMS
        /// Session and QueueBrowser in order to browse the queue and react to the contents.
        /// </summary>
        /// <param name="action">The action callback delegate that exposes the session/browser pair.</param>
        /// <returns>the result object from working with the session</returns>
        public object BrowseWithDelegate(BrowserDelegate action)
        {
            if (DefaultDestinationName != null)
            {
                return BrowseSelectedWithDelegate(DefaultDestinationName, action);
            }
            else
            {
                Destination destination = DefaultDestination;
                if (!(destination is Queue))
                {
                    throw new InvalidOperationException("defaultDestination does not correspond to a Queue. Check configuration of EmsTemplate.");
                }
                return BrowseWithDelegate((Queue)destination, action);
            }
        }

        /// <summary>
        /// Browses messages in a EMS queue. The callback gives access to the EMS Session
        /// and QueueBrowser in order to browse the queue and react to the contents.
        /// </summary>
        /// <param name="queue">The queue to browse.</param>
        /// <param name="action">The action callback delegate that exposes the session/browser pair.</param>
        /// <returns>the result object from working with the session</returns>
        public object BrowseWithDelegate(Queue queue, BrowserDelegate action)
        {
            return BrowseSelectedWithDelegate(queue, null, action);
        }

        /// <summary>
        /// Browses messages in a EMS queue. The callback gives access to the EMS Session
        /// and QueueBrowser in order to browse the queue and react to the contents.
        /// </summary>
        /// <param name="queueName">Name of the queue to browse, 
        /// (to be resolved to an actual destination by a DestinationResolver)</param>
        /// <param name="action">The action callback delegate that exposes the session/browser pair.</param>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public object BrowseWithDelegate(string queueName, BrowserDelegate action)
        {
            return BrowseSelectedWithDelegate(queueName, null, action);
        }

        /// <summary>
        /// Browses messages in a EMS queue. The callback gives access to the EMS Session
        /// and QueueBrowser in order to browse the queue and react to the contents.
        /// </summary>
        /// <param name="messageSelector">The EMS message selector expression (or <code>null</code> if none).</param>
        /// <param name="action">The action callback delegate that exposes the session/browser pair.</param>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public object BrowseSelectedWithDelegate(string messageSelector, BrowserDelegate action)
        {
            if (DefaultDestinationName != null)
            {
                return BrowseSelectedWithDelegate(DefaultDestinationName, messageSelector, action);
            }
            else
            {
                Destination destination = DefaultDestination;
                if (!(destination is Queue))
                {
                    throw new InvalidOperationException("defaultDestination does not correspond to a Queue. Check configuration of EmsTemplate.");
                }
                return BrowseSelectedWithDelegate((Queue)destination, messageSelector, action);
            }
        }

        /// <summary>
        /// Browses messages in a EMS queue. The callback gives access to the EMS Session
        /// and QueueBrowser in order to browse the queue and react to the contents.
        /// </summary>
        /// <param name="queue">The queue to browse.</param>
        /// <param name="messageSelector">The EMS message selector expression (or <code>null</code> if none).</param>
        /// <param name="action">The action callback delegate that exposes the session/browser pair.</param>
        /// <returns></returns>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public object BrowseSelectedWithDelegate(Queue queue, string messageSelector, BrowserDelegate action)
        {
            AssertUtils.ArgumentNotNull(action, "action");
            return Execute(delegate(ISession session)
                               {
                                   QueueBrowser browser = CreateBrowser(session, queue, messageSelector);
                                   try
                                   {
                                       return action(session, browser);
                                   }
                                   finally
                                   {
                                       EmsUtils.CloseQueueBrowser(browser);
                                   }
                               }, true); 
        }

        /// <summary>
        /// Browses messages in a EMS queue. The callback gives access to the EMS Session
        /// and QueueBrowser in order to browse the queue and react to the contents.
        /// </summary>
        /// <param name="queueName">Name of the queue to browse, 
        /// (to be resolved to an actual destination by a DestinationResolver)</param>
        /// <param name="messageSelector">The EMS message selector expression (or <code>null</code> if none).</param>
        /// <param name="action">The action callback delegate that exposes the session/browser pair.</param>
        /// <returns></returns>
        /// <exception cref="EMSException">If there is any problem accessing the EMS API</exception>
        public object BrowseSelectedWithDelegate(string queueName, string messageSelector, BrowserDelegate action)
        {
            AssertUtils.ArgumentNotNull(action, "action");
            return Execute(delegate(ISession session)
                               {
                                   Queue queue = (Queue)DestinationResolver.ResolveDestinationName(session, queueName, false);
                                   QueueBrowser browser = CreateBrowser(session, queue, messageSelector);
                                   try
                                   {
                                       return action(session, browser);
                                   }
                                   finally
                                   {
                                       EmsUtils.CloseQueueBrowser(browser);
                                   }
                               }, true);  
        }

        #endregion

        /// <summary>
        /// Creates the queue browser.
        /// </summary>
        /// <param name="session">The session.</param>
        /// <param name="queue">The queue.</param>
        /// <param name="selector">The selector.</param>
        /// <returns>A new queue browser</returns>
        protected virtual QueueBrowser CreateBrowser(ISession session, Queue queue, string selector)
        {
            return session.CreateBrowser(queue, selector);
        }


        #region Supporting Internal Classes

        /// <summary>
        /// ResourceFactory implementation that delegates to this template's callback methods.
        /// </summary>
        private class EmsTemplateResourceFactory : ConnectionFactoryUtils.ResourceFactory
        {
            private EmsTemplate enclosingTemplateInstance;
      
            public EmsTemplateResourceFactory(EmsTemplate enclosingInstance)
            {
                InitBlock(enclosingInstance);
            }

            private void InitBlock(EmsTemplate enclosingInstance)
            {
                enclosingTemplateInstance = enclosingInstance;
            }

            public EmsTemplate EnclosingInstance
            {
                get { return enclosingTemplateInstance; }
            }

            public virtual IConnection GetConnection(EmsResourceHolder holder)
            {
                return EnclosingInstance.GetConnection(holder);
            }

            public virtual ISession GetSession(EmsResourceHolder holder)
            {
                return EnclosingInstance.GetSession(holder);
            }

            public virtual IConnection CreateConnection()
            {
                return EnclosingInstance.CreateConnection();
            }

            public virtual ISession CreateSession(IConnection con)
            {
                return EnclosingInstance.CreateSession(con);
            }

            public bool SynchedLocalTransactionAllowed
            {
                get { return EnclosingInstance.SessionTransacted; }
            }
        }

        private class ProducerCreatorCallback : ISessionCallback
        {
            private EmsTemplate jmsTemplate;
            private IProducerCallback producerCallback;
            private ProducerDelegate producerDelegate;

            public ProducerCreatorCallback(EmsTemplate jmsTemplate, IProducerCallback producerCallback)
            {
                this.jmsTemplate = jmsTemplate;
        this.producerCallback = producerCallback;
            }

            public ProducerCreatorCallback(EmsTemplate jmsTemplate, ProducerDelegate producerDelegate)
            {
                this.jmsTemplate = jmsTemplate;
                this.producerDelegate = producerDelegate;
            }


            public object DoInEms(ISession session)
            {
                IMessageProducer producer = jmsTemplate.CreateProducer(session, null);
                try
                {
                    if (producerCallback != null)
                    {
                        return producerCallback.DoInEms(session, producer);
                    }
                    else
                    {
                        return producerDelegate(session, producer);
                    }
                }
                finally
                {
                    EmsUtils.CloseMessageProducer(producer);
                }

            }
        }

        private class ReceiveCallback : ISessionCallback
        {
            private EmsTemplate jmsTemplate;
            private Destination destination;
            private string destinationName;


            public ReceiveCallback(EmsTemplate jmsTemplate, string destinationName)
            {
                this.jmsTemplate = jmsTemplate;
                this.destinationName = destinationName;
            }

            public ReceiveCallback(EmsTemplate jmsTemplate, Destination destination)
            {
                this.jmsTemplate = jmsTemplate;
                this.destination = destination;
            }

            public object DoInEms(ISession session)
            {
                if (destination != null)
                {
                    return jmsTemplate.DoReceive(session, destination, null);
                }
                else
                {
                    return jmsTemplate.DoReceive(session,
                                                 jmsTemplate.ResolveDestinationName(session, destinationName),
                                                 null);
                }
                
            }
        }

        private class ConvertAndSendMessageCreator : IMessageCreator
        {
            private EmsTemplate jmsTemplate;
            private object objectToConvert;
            private IMessagePostProcessor messagePostProcessor;
            private MessagePostProcessorDelegate messagePostProcessorDelegate;

            public ConvertAndSendMessageCreator(EmsTemplate jmsTemplate, object message, IMessagePostProcessor messagePostProcessor)
            {
                this.jmsTemplate = jmsTemplate;
                objectToConvert = message;
                this.messagePostProcessor = messagePostProcessor;
            }

            public ConvertAndSendMessageCreator(EmsTemplate jmsTemplate, object message, MessagePostProcessorDelegate messagePostProcessorDelegate)
            {
                this.jmsTemplate = jmsTemplate;
                objectToConvert = message;
                this.messagePostProcessorDelegate = messagePostProcessorDelegate;
            }

            public Message CreateMessage(ISession session)
            {
                Message msg = jmsTemplate.MessageConverter.ToMessage(objectToConvert, session);
                if (messagePostProcessor != null)
                {
                    return messagePostProcessor.PostProcessMessage(msg);
                }
                else
                {
                    return messagePostProcessorDelegate(msg);
                }
            }
        }

        private class ReceiveSelectedCallback : ISessionCallback
        {
            private EmsTemplate jmsTemplate;
            private string messageSelector;
            private string destinationName;
            private Destination destination;

            public ReceiveSelectedCallback(EmsTemplate jmsTemplate,
                               Destination destination,
                               string messageSelector)
            {
                this.jmsTemplate = jmsTemplate;
                this.destination = destination;
                this.messageSelector = messageSelector;
            }
            public ReceiveSelectedCallback(EmsTemplate jmsTemplate,
                                           string destinationName,
                                           string messageSelector)
            {
                this.jmsTemplate = jmsTemplate;
                this.destinationName = destinationName;
                this.messageSelector = messageSelector;
            }

            public object DoInEms(ISession session)
            {
                if (destination != null)
                {
                    return jmsTemplate.DoReceive(session, destination, messageSelector);
                }
                else
                {
                    return jmsTemplate.DoReceive(session,
                                                 jmsTemplate.ResolveDestinationName(session, destinationName),
                                                 messageSelector);
                }

            }

        }

        private class ExecuteSessionCallbackUsingDelegate : ISessionCallback
        {
            private SessionDelegate del;
            public ExecuteSessionCallbackUsingDelegate(SessionDelegate del)
            {
                this.del = del;
            }

            public object DoInEms(ISession session)
            {
                return del(session);
            }
        }
        
        #endregion
    }

    /// <summary>
    /// This is a TIBCO specific class so that we can reuse connections, session, and
    /// message producers instead of creating/destroying them on each operation.
    /// </summary>
/*    internal class EmsResources
    {
        private IConnection connection;
        private ISession session;

        private IDictionary cachedProducers = new Hashtable();
        private IMessageProducer cachedUnspecifiedDestinationMessageProducer;

        public IConnection Connection
        {
            get { return connection; }
            set { connection = value; }
        }

        public ISession Session
        {
            get { return session; }
            set { session = value; }
        }

        public IMessageProducer UnspecifiedDestinationMessageProducer
        {
            get { return cachedUnspecifiedDestinationMessageProducer; }
            set { cachedUnspecifiedDestinationMessageProducer = value; }
        }


        public IDictionary Producers
        {
            get { return cachedProducers; }
            set { cachedProducers = value; }
        }
    }*/


    internal class SimpleMessageCreator : IMessageCreator
    {
        private EmsTemplate jmsTemplate;
        private object objectToConvert;
        
        public SimpleMessageCreator(EmsTemplate jmsTemplate, object objectToConvert)
        {
            this.jmsTemplate = jmsTemplate;
            this.objectToConvert = objectToConvert;
        }

        public Message CreateMessage(ISession session)
        {
            return jmsTemplate.MessageConverter.ToMessage(objectToConvert, session);
        }


    }



    internal class SendDestinationCallback : ISessionCallback
    {
        private string destinationName;
        private Destination destination;
        private EmsTemplate jmsTemplate;
        private IMessageCreator messageCreator;
        private MessageCreatorDelegate messageCreatorDelegate;

        public SendDestinationCallback(EmsTemplate jmsTemplate, string destinationName, IMessageCreator messageCreator)
        {
            this.jmsTemplate = jmsTemplate;
            this.destinationName = destinationName;
            this.messageCreator = messageCreator;
        }

        public SendDestinationCallback(EmsTemplate jmsTemplate, Destination destination, IMessageCreator messageCreator)
        {
            this.jmsTemplate = jmsTemplate;
            this.destination = destination;
            this.messageCreator = messageCreator;
        }

        public SendDestinationCallback(EmsTemplate jmsTemplate, string destinationName, MessageCreatorDelegate messageCreatorDelegate)
        {
            this.jmsTemplate = jmsTemplate;
            this.destinationName = destinationName;
            this.messageCreatorDelegate = messageCreatorDelegate;
        }

        public SendDestinationCallback(EmsTemplate jmsTemplate, Destination destination, MessageCreatorDelegate messageCreatorDelegate)
        {
            this.jmsTemplate = jmsTemplate;
            this.destination = destination;
            this.messageCreatorDelegate = messageCreatorDelegate;
        }


        public object DoInEms(ISession session)
        {
            if (destination == null)
            {
                destination = jmsTemplate.ResolveDestinationName(session, destinationName);
            }
            if (messageCreator != null)
            {
                jmsTemplate.DoSend(session, destination, messageCreator);
            }
            else
            {
                jmsTemplate.DoSend(session, destination, messageCreatorDelegate);
            }
            return null;
        }
    }
}
www.java2v.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.