using System;
using grof.protocols;
using grof;
using grof.util;
using System.Diagnostics;
namespace grof.protocols{
/// <summary>
/// Callback signature for processing a single retrieved message.
/// <c>AbstractProtocol.ProcessIncomingMessage</c> and
/// <c>AbstractProtocol.ProcessOutgoingMessage</c> both match this
/// signature.
/// </summary>
/// <param name="msg">The retrieved message.</param>
/// <returns>
/// A flag evaluated by the caller of the delegate. NOTE(review):
/// presumably <c>true</c> means the message has to be forwarded to the
/// target queue, as documented on the <c>AbstractProtocol</c> process
/// methods - confirm against the code which invokes the delegate.
/// </returns>
public delegate bool ProcessMessageDelegate( Message msg );
/// <summary>
/// Base class for all protocol implementations. An <c>IProtocol</c>
/// implementation processes the incoming and outgoing messages in a
/// protocol specific way. The <c>UdpMembershipProtocol</c>, for
/// example, is responsible for recognizing new group members and for
/// registering the local group member at the group. Once a protocol
/// has done its job, the processed message is handed over to the next
/// lower protocol (outgoing messages) or to the next upper protocol
/// (incoming messages).
/// <para>
/// Each protocol instance manages two queues, one for caching the
/// incoming messages and one for caching the outgoing messages.
/// </para>
/// </summary>
public abstract class AbstractProtocol : IProtocol
{
    /// <summary>
    /// The queue for holding the incoming messages.
    /// </summary>
    protected IBlockingQueue<Message> incomingQueue;

    /// <summary>
    /// The queue for holding the outgoing messages.
    /// </summary>
    protected IBlockingQueue<Message> outgoingQueue;

    /// <summary>
    /// Optional lower protocol to which outgoing messages
    /// are delegated. May be <c>null</c>.
    /// </summary>
    protected AbstractProtocol lowerProtocol;

    /// <summary>
    /// Optional upper protocol to which incoming messages
    /// are delegated. May be <c>null</c>.
    /// </summary>
    protected AbstractProtocol upperProtocol;

    /// <summary>
    /// Worker thread which forwards messages from the
    /// incoming queue of the lower protocol to the
    /// internal incoming queue. Only created by
    /// <see cref="Start"/> if a lower protocol is set.
    /// </summary>
    protected WorkerThread incomingWorker;

    /// <summary>
    /// Worker thread which forwards messages from the
    /// outgoing queue of the upper protocol to the
    /// internal outgoing queue. Only created by
    /// <see cref="Start"/> if an upper protocol is set.
    /// </summary>
    protected WorkerThread outgoingWorker;

    /// <summary>
    /// The name of the protocol.
    /// </summary>
    protected string name;

    /// <summary>
    /// Creates the protocol and its two internal message queues.
    /// Both queues are created with the protocol name.
    /// </summary>
    /// <param name="name">
    /// The name of the protocol.
    /// </param>
    public AbstractProtocol( string name )
    {
        this.name = name;
        this.incomingQueue = new BlockingQueueImpl<Message>( this.name );
        this.outgoingQueue = new BlockingQueueImpl<Message>( this.name );
    }

    /// <summary>
    /// Puts a message into the outgoing queue.
    /// </summary>
    /// <param name="msg">The message which is put into
    /// the outgoing queue.</param>
    public virtual void PutOutgoing( Message msg )
    {
        Debug.WriteLine( "[AbstractProtocol#PutOutgoing] [" + this.name + "] Putting message into outgoing queue..." );
        this.outgoingQueue.Put( msg );
        Debug.WriteLine( "[AbstractProtocol#PutOutgoing] [" + this.name + "] Message put into outgoing queue." );
    }

    /// <summary>
    /// Removes a <c>Message</c> object from
    /// the incoming queue.
    /// </summary>
    /// <returns>The <c>Message</c> object
    /// which is taken from the incoming queue.</returns>
    public virtual Message TakeIncoming()
    {
        Debug.WriteLine( "[AbstractProtocol#TakeIncoming] [" + this.name + "] Taking message from incoming queue..." );

        // Keep the message in a local so that the completion trace below
        // is actually executed; placed after a return statement it would
        // be unreachable.
        Message msg = this.incomingQueue.Take();

        Debug.WriteLine( "[AbstractProtocol#TakeIncoming] [" + this.name + "] Message taken from incoming queue." );
        return msg;
    }

    /// <summary>
    /// Starts working of the protocol instance. For each existing
    /// neighbour protocol a worker thread is created and started:
    /// one worker reading the incoming queue of the lower protocol,
    /// one worker reading the outgoing queue of the upper protocol.
    /// </summary>
    public virtual void Start()
    {
        if ( this.lowerProtocol != null )
        {
            this.incomingWorker = new WorkerThread( this.name, this.lowerProtocol.IncomingQueue,
                                                    this.incomingQueue, this.ProcessIncomingMessage );
            Debug.WriteLine( "[AbstractProtocol#Start] [" + this.name + "] Lower protocol exists, create worker thread for incoming messages." );
            this.incomingWorker.Start();
        }

        if ( this.upperProtocol != null )
        {
            this.outgoingWorker = new WorkerThread( this.name, this.upperProtocol.OutgoingQueue,
                                                    this.outgoingQueue, this.ProcessOutgoingMessage );
            Debug.WriteLine( "[AbstractProtocol#Start] [" + this.name + "] Upper protocol exists, create worker thread for outgoing messages." );
            this.outgoingWorker.Start();
        }
    }

    /// <summary>
    /// Returns the name of the protocol.
    /// </summary>
    public string Name
    {
        get
        {
            // Read the backing field. Returning the property itself
            // would call this getter again and recurse endlessly.
            return this.name;
        }
    }

    /// <summary>
    /// Is called by <c>WorkerThread</c> for
    /// processing an incoming message.
    /// </summary>
    /// <param name="msg">The incoming message.</param>
    /// <returns>True, if the incoming message must be put into the
    /// internal incoming queue.
    /// </returns>
    public abstract bool ProcessIncomingMessage( Message msg );

    /// <summary>
    /// Is called by <c>WorkerThread</c> for
    /// processing an outgoing message.
    /// </summary>
    /// <param name="msg">The outgoing message.</param>
    /// <returns>True, if the outgoing message must be put into the
    /// internal outgoing queue.
    /// </returns>
    public abstract bool ProcessOutgoingMessage( Message msg );

    /// <summary>
    /// Stops working of the protocol instance. The queues the worker
    /// threads read from are stopped first, afterwards the worker
    /// threads themselves are stopped.
    /// </summary>
    public virtual void Stop()
    {
        // The incoming worker takes messages of the incoming queue of the lower protocol.
        // Therefore the incoming queue of the lower protocol has to be stopped.
        if ( this.lowerProtocol != null )
        {
            Debug.WriteLine( "[AbstractProtocol#Stop] [" + this.name + "] Stopping incoming queue of lower protocol." );
            this.lowerProtocol.IncomingQueue.Stop();
        }

        // The outgoing worker takes messages of the outgoing queue of the upper protocol.
        // Therefore the outgoing queue of the upper protocol has to be stopped.
        if ( this.upperProtocol != null )
        {
            Debug.WriteLine( "[AbstractProtocol#Stop] [" + this.name + "] Stopping outgoing queue of upper protocol." );
            this.upperProtocol.OutgoingQueue.Stop();
        }

        // NOTE(review): the queues owned by this instance are not stopped
        // here. Presumably the neighbour protocols stop them in their own
        // Stop() call - confirm for the top and bottom of the stack.

        if ( this.incomingWorker != null )
        {
            Debug.WriteLine( "[AbstractProtocol#Stop] [" + this.name + "] Stop worker thread for incoming messages." );
            this.incomingWorker.Stop();
        }

        if ( this.outgoingWorker != null )
        {
            Debug.WriteLine( "[AbstractProtocol#Stop] [" + this.name + "] Stop worker thread for outgoing messages." );
            this.outgoingWorker.Stop();
        }
    }

    /// <summary>
    /// Returns all members of the group this group member
    /// belongs to.
    /// </summary>
    ///
    /// <returns>
    /// An array of all group members.
    /// </returns>
    public abstract GroupMemberInfo[] GetGroupMembers();

    /// <summary>
    /// Sets the reference to the lower protocol
    /// within the protocol stack for delegating
    /// outgoing messages.
    /// </summary>
    public AbstractProtocol LowerProtocol
    {
        set
        {
            this.lowerProtocol = value;
        }
    }

    /// <summary>
    /// Sets the reference to the upper protocol
    /// within the protocol stack for delegating
    /// incoming messages.
    /// </summary>
    public AbstractProtocol UpperProtocol
    {
        set
        {
            this.upperProtocol = value;
        }
    }

    /// <summary>
    /// Reference to the incoming queue.
    /// </summary>
    public IBlockingQueue<Message> IncomingQueue
    {
        get
        {
            return this.incomingQueue;
        }
    }

    /// <summary>
    /// Reference to the outgoing queue.
    /// </summary>
    public IBlockingQueue<Message> OutgoingQueue
    {
        get
        {
            return this.outgoingQueue;
        }
    }
}
}
|