using System;
using grof.protocols;
using grof.util;
using grof;
using System.Threading;
using System.Collections;
using System.Diagnostics;
namespace grof.protocols.membership{
/// <summary>
/// The <c>Sender</c> object is responsible for sending
/// all message (which are queued in the outgoing queue
/// of the <c>UdpMembershipProtocol</c> instance) to
/// all members of the same group.
/// </summary>
class Sender
{
/// <summary>
/// Indicates whether <c>Sender</c> object was
/// stopped sending messages.
/// </summary>
private bool stopped = false;
/// <summary>
/// The <c>Sender</c> object works in its own
/// thread.
/// </summary>
private Thread senderThread;
/// <summary>
/// The TCP sender objects for sending messages
/// via the TCP protocol.
/// </summary>
private TcpSender tcpSender;
/// <summary>
/// The multicast sender object for sending
/// messages via the UDP protocol (JOIN and
/// LEAVE messages)
/// </summary>
private MulticastSender multicastSender;
/// <summary>
/// The incoming queue which holds all received
/// messages.
/// </summary>
private IBlockingQueue<Message> incomingQueue;
/// <summary>
/// The outgoing queue caching all messages
/// which ought to be sent.
/// </summary>
private IBlockingQueue<Message> outgoingQueue;
/// <summary>
/// The <c>GroupMemberInfoMap</c> object which stores
/// all relevant data of all group members of the same
/// group.
/// </summary>
private GroupMemberInfoMap gmInfoMap;
/// <summary>
/// Semaphore used for synchronizing.
/// </summary>
private object semaphore;
/// <summary>
/// The <c>ProtocolInitializer</c> class
/// containing member relevant information.
/// </summary>
private ProtocolInitializer protInit;
/// <summary>
/// Creates instances of class <c>Sender</c>.
/// </summary>
/// <param name="protInit">The initializer object.</param>
/// <param name="gmInfoMap">The map for storing group member data.</param>
/// <param name="incomingQueue">The incoming queue holding all received
/// messages.</param>
/// <param name="outgoingQueue">The outgoing queue caching all messages which
/// are sent.</param>
public Sender( ProtocolInitializer protInit,
GroupMemberInfoMap gmInfoMap,
IBlockingQueue<Message> incomingQueue,
IBlockingQueue<Message> outgoingQueue )
{
this.semaphore = new object();
this.tcpSender = new TcpSender( protInit.MemberName );
this.multicastSender = new MulticastSender( protInit.MemberName, protInit.GroupAddress,
protInit.GroupPort );
this.senderThread = new Thread( new ThreadStart( this.Run ) );
this.incomingQueue = incomingQueue;
this.outgoingQueue = outgoingQueue;
this.gmInfoMap = gmInfoMap;
this.protInit = protInit;
Debug.WriteLine( "[Sender#constructor] " + protInit.MemberName + ": Sender object created." );
}
/// <summary>
/// Starts a thread which takes messages
/// from the outgoing queue and sends
/// them to the group members.
/// </summary>
public void Start()
{
this.senderThread.Start();
Debug.WriteLine( "[Sender#Start] " + protInit.MemberName + ": Sender started." );
}
/// <summary>
/// This method blocks until
/// all elements of the outgoing queue
/// were sent. Then the sender thread
/// is stopped.
/// </summary>
public void Stop()
{
Monitor.Enter( this.semaphore );
while ( !this.stopped )
{
Monitor.Wait( this.semaphore );
}
Monitor.Exit( this.semaphore );
Debug.WriteLine( "[Sender#Stop] " + protInit.MemberName + ": Sender stopped." );
}
/// <summary>
/// This method run in its own thread. It retrieves
/// messages from the outgoing queue and sends them
/// via TCP or UDP.
/// </summary>
public void Run()
{
while( true )
{
Message msg = null;
try
{
Debug.WriteLine( "[Sender#run] " + protInit.MemberName + ": Taking message from outgoing queue..." );
msg = this.outgoingQueue.Take();
Debug.WriteLine( "[Sender#Run] " + protInit.MemberName + ": Message taken from outgoing queue, msg: " + msg.ToString() );
} catch( BlockingQueueStoppedException e )
{
Debug.WriteLine( "[Sender#Run] " + protInit.MemberName + ": Outgoing queue was stopped, break..." );
break;
}
if ( ( ( Message.MessageType ) msg.GetType() ) == Message.MessageType.JOIN ||
( ( Message.MessageType ) msg.GetType() ) == Message.MessageType.LEAVE )
{
Debug.WriteLine( "[Sender#Run] " + protInit.MemberName + ": Message is of type JOIN/LEAVE, multicast..." );
// send a JOIN/LEAVE message to all group members
this.multicastSender.Send( msg );
} else
{
Debug.WriteLine( "[Sender#Run] " + protInit.MemberName + ": Message is NOT of type JOIN/LEAVE, send via TCP..." );
// for all other message types send the message
// to the group members via TCP
// As long as the message is sent to all
// group members, the group member info map
// must not be changed. Therefore the access
// has to be synchronized
lock( this.gmInfoMap )
{
this.tcpSender.Send( msg, this.gmInfoMap.GetGroupMemberInfos() );
}
}
}
this.multicastSender.Stop();
this.finished();
}
/// <summary>
/// This method is called when sender thread
/// was stopped.
/// </summary>
private void finished()
{
Monitor.Enter( this.semaphore );
this.stopped = true;
Monitor.PulseAll( this.semaphore );
Monitor.Exit( this.semaphore );
}
}
}
|