using System;
using grof.protocols;
using grof.util;
using grof;
using System.Threading;
using System.Collections.Generic;
using System.Diagnostics;
// TODO: unit tests for protocol stack
namespace grof.protocols.membership{
/// <summary>
/// The <code>UdpMembershipProtocol</code> manages the
/// membership of a group member. On <see cref="Start"/> it queues
/// a join message, on <see cref="Stop"/> it queues a leave message;
/// the actual sending and receiving is delegated to the internal
/// <code>Sender</code> and <code>Receiver</code> objects.
/// </summary>
public class UdpMembershipProtocol : AbstractProtocol
{
    /// <summary>
    /// The <code>Sender</code> object is responsible
    /// for sending the queued messages to other
    /// group members.
    /// </summary>
    private readonly Sender sender;

    /// <summary>
    /// The <code>Receiver</code> object is responsible
    /// for receiving messages from other group members,
    /// processing them and delegating to higher
    /// protocol instances.
    /// </summary>
    private readonly Receiver receiver;

    /// <summary>
    /// The <code>GroupMemberInfoMap</code> contains
    /// all members of a group. The same instance is handed
    /// to the sender and the receiver, and it is used as the
    /// lock object in <see cref="GetGroupMembers"/>.
    /// </summary>
    protected GroupMemberInfoMap gmInfoMap;

    /// <summary>
    /// The <code>MessageCreator</code> class simplifies
    /// the creation of <code>Message</code> objects.
    /// </summary>
    private readonly MessageCreator msgCreator;

    /// <summary>
    /// Creates instances of class
    /// <code>UdpMembershipProtocol</code>.
    /// </summary>
    /// <param name="protInit">
    /// The <code>ProtocolInitializer</code>
    /// object which contains all relevant data for initializing
    /// the membership protocol (message creator, protocol name and
    /// whatever the sender and receiver read from it).
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// Thrown if <paramref name="protInit"/> is <c>null</c>.
    /// </exception>
    public UdpMembershipProtocol( ProtocolInitializer protInit ) : base( "UDP-membership-protocol" )
    {
        // Fail fast with a descriptive exception instead of a
        // NullReferenceException somewhere inside the collaborators.
        if ( protInit == null )
        {
            throw new ArgumentNullException( "protInit" );
        }

        this.gmInfoMap = new GroupMemberInfoMap();
        this.sender = new Sender( protInit, this.gmInfoMap,
                                  this.incomingQueue, this.outgoingQueue );
        this.receiver = new Receiver( protInit,
                                      this.incomingQueue,
                                      this.outgoingQueue,
                                      this.gmInfoMap );
        this.msgCreator = protInit.MessageCreator;

        // The default name passed to the base constructor is replaced
        // by the name configured in the initializer.
        this.name = protInit.ProtocolName;

        Debug.WriteLine( "[UdpMembershipProtocol#constructor] [" + this.name + "] UDP membership protocol created." );
    }

    /// <summary>
    /// Returns the currently known group members as a newly
    /// allocated array.
    /// </summary>
    /// <returns>
    /// An array holding the <code>GroupMemberInfo</code> objects that
    /// were in the map at the time of the call; empty if the map
    /// contains no members.
    /// </returns>
    public override GroupMemberInfo[] GetGroupMembers()
    {
        // The gmInfoMap has to be locked in order to avoid
        // parallel access to the map while it is copied.
        lock( this.gmInfoMap )
        {
            ICollection<GroupMemberInfo> gmInfos = this.gmInfoMap.GetGroupMemberInfos();
            GroupMemberInfo[] gmInfoArray = new GroupMemberInfo[gmInfos.Count];
            gmInfos.CopyTo( gmInfoArray, 0 );
            return gmInfoArray;
        }
    }

    /// <summary>
    /// Starts the protocol. After the base class has been started,
    /// the sender object is started for taking the <code>Message</code>
    /// objects from the outgoing queue and sending them
    /// to other group members, and the receiver object is started
    /// for receiving messages from other group members and putting
    /// them into the incoming queue. Finally a join message is put
    /// into the outgoing queue.
    /// </summary>
    public override void Start()
    {
        base.Start();
        this.sender.Start();
        this.receiver.Start();

        // NOTE(review): the meaning of the null argument is defined by
        // MessageCreator.CreateJoinMessage - confirm against that class.
        this.outgoingQueue.Put( this.msgCreator.CreateJoinMessage( null ) );

        Debug.WriteLine( "[UdpMembershipProtocol#Start] [" + this.name + "] Join message put into outgoing queue." );
        Debug.WriteLine( "[UdpMembershipProtocol#Start] [" + this.name + "] UDP membership protocol started." );
    }

    /// <summary>
    /// Stops the protocol. A leave message is put into the outgoing
    /// queue, then the receiver, both queues and finally the sender
    /// are stopped, in that order.
    /// </summary>
    public override void Stop()
    {
        base.Stop();
        Debug.WriteLine( "[UdpMembershipProtocol#Stop] [" + this.name + "] Stopping UDP membership protocol..." );

        // The leave message must be queued before the outgoing queue
        // is stopped below.
        this.outgoingQueue.Put( this.msgCreator.CreateLeaveMessage( null ) );

        // Stopping the receiver, so
        // no further messages are received.
        this.receiver.Stop();

        // Stopping the queues:
        // no further elements can be inserted
        // into the queues. The remaining elements in the
        // queues are executed.
        this.outgoingQueue.Stop();
        this.incomingQueue.Stop();

        // Stopping the sender, which only stops
        // when all remaining messages were sent.
        this.sender.Stop();

        // Deliberately do NOT set sender, receiver or the queues to
        // null here: that could lead to null reference errors.

        Debug.WriteLine( "[UdpMembershipProtocol#Stop] [" + this.name + "] UDP membership protocol stopped." );
    }

    /// <summary>
    /// Is called by <c>WorkerThread</c> for
    /// processing incoming message. This protocol only traces
    /// the message and lets it pass.
    /// </summary>
    /// <param name="msg">The incoming message.</param>
    /// <returns>True, if incoming message must be put into internal
    /// incoming queue. This implementation always returns true.
    /// </returns>
    public override bool ProcessIncomingMessage( Message msg )
    {
        // Plain concatenation is null-safe; an explicit msg.ToString()
        // would throw for a null message in DEBUG builds only, because
        // Debug.WriteLine calls are compiled out otherwise.
        Debug.WriteLine( "[UdpMembershipProtocol#ProcessIncomingMessage] [" + this.name + "] Called, message: " + msg );
        return true;
    }

    /// <summary>
    /// Is called by <c>WorkerThread</c> for
    /// processing outgoing message. This protocol only traces
    /// the message and lets it pass.
    /// </summary>
    /// <param name="msg">The outgoing message.</param>
    /// <returns>True, if outgoing message must be put into internal
    /// outgoing queue. This implementation always returns true.
    /// </returns>
    public override bool ProcessOutgoingMessage( Message msg )
    {
        // Plain concatenation is null-safe; see ProcessIncomingMessage.
        Debug.WriteLine( "[UdpMembershipProtocol#ProcessOutgoingMessage] [" + this.name + "] Called, message: " + msg );
        return true;
    }
}
}
|