using System;
using grof.protocols;
using grof.protocols.membership;
using System.Threading;
using System.Diagnostics;
using grof.util;
namespace grof{
/// <summary>
/// Signature of the callback to which received messages are handed
/// over. A callback of this type is registered through
/// <c>GroupMember.SetMessageReceiver</c> and is invoked by the
/// group member's receiver thread once for each message taken from
/// the incoming queue.
/// </summary>
/// <param name="msg">The received message.</param>
public delegate void MessageReceivedDelegate( Message msg );
/// <summary>
/// The <c>GroupMember</c> class provides the functionality to join
/// or leave a group, to send objects to the group and to listen for
/// incoming <c>Message</c> objects.
/// The group member is initialized by an <c>Initializer</c> object
/// which holds the following information:
/// <para>
/// <list type="bullet" >
/// <item>Group address: each group is defined by an
/// IP address.</item>
/// <item>Group port: the port number of the group.</item>
/// <item>Listener address: the IP address the group
/// member listens on for incoming messages.</item>
/// <item>Listener port: the port number the group
/// member listens on for incoming messages.</item>
/// <item>Group name: the name of the group the
/// group member belongs to.</item>
/// <item>Member name: a unique name of the group member.</item>
/// </list>
/// </para>
/// </summary>
public class GroupMember
{
    /// <summary>
    /// The initializer holds all relevant data for
    /// initializing the group member.
    /// </summary>
    private Initializer init;

    /// <summary>
    /// The helper class for creating messages.
    /// </summary>
    private MessageCreator msgCreator;

    /// <summary>
    /// This thread retrieves the messages from the incoming
    /// queue of the topmost protocol and delegates them to
    /// the message receiver.
    /// </summary>
    private Thread receiverThread;

    /// <summary>
    /// Indicates that the group member left the group and
    /// stops receiving messages. Written by the caller of
    /// <c>JoinGroup</c>/<c>LeaveGroup</c> and read by the
    /// receiver thread, hence <c>volatile</c>.
    /// </summary>
    private volatile bool stopped = false;

    /// <summary>
    /// Indicates that the receiver thread left its receive
    /// loop. Guarded by <c>stateLock</c>.
    /// </summary>
    private bool stoppingReceiverThreadFinished = false;

    /// <summary>
    /// Indicates that the receiver thread was started by
    /// <c>JoinGroup</c>. Guarded by <c>stateLock</c>.
    /// </summary>
    private bool receiverThreadStarted = false;

    /// <summary>
    /// A delegate object to which the received messages
    /// are delegated. May be <c>null</c> as long as
    /// <c>SetMessageReceiver</c> was not called.
    /// </summary>
    private volatile MessageReceivedDelegate messageDelegate;

    /// <summary>
    /// The group member information.
    /// </summary>
    private GroupMemberInfo gmInfo;

    /// <summary>
    /// The protocol stack. Index 0 is the topmost protocol; it is
    /// the one the group member sends to and receives from.
    /// </summary>
    private readonly AbstractProtocol[] protocolStack;

    /// <summary>
    /// Private lock object for the hand-shake between
    /// <c>LeaveGroup</c> and the receiver thread. A private object
    /// is used instead of <c>this</c> so that code outside this
    /// class cannot interfere by locking on the instance.
    /// </summary>
    private readonly object stateLock = new object();

    /// <summary>
    /// Creates an instance of class <c>GroupMember</c> whose protocol
    /// stack consists of a single <c>UdpMembershipProtocol</c>.
    /// </summary>
    /// <param name="initializer">The initializer object.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="initializer"/> is <c>null</c>.
    /// </exception>
    public GroupMember( Initializer initializer )
    {
        if ( initializer == null )
        {
            throw new ArgumentNullException( "initializer" );
        }

        this.protocolStack = new AbstractProtocol[1];
        this.protocolStack[0] = new UdpMembershipProtocol( this.Initialize( initializer ) );
        this.ConnectProtocols();
    }

    /// <summary>
    /// Creates an instance of class <c>GroupMember</c> which uses
    /// a protocol stack supplied by the caller.
    /// </summary>
    /// <param name="initializer">
    /// The initializer object.
    /// </param>
    /// <param name="protocolStack">
    /// The protocol stack defined by the user. Index 0 is treated as
    /// the topmost protocol. The array is used as passed in, it is
    /// not copied.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="initializer"/> or
    /// <paramref name="protocolStack"/> is <c>null</c>.
    /// </exception>
    /// <exception cref="ArgumentException">
    /// <paramref name="protocolStack"/> is empty or contains a
    /// <c>null</c> element.
    /// </exception>
    public GroupMember( Initializer initializer, AbstractProtocol[] protocolStack )
    {
        if ( initializer == null )
        {
            throw new ArgumentNullException( "initializer" );
        }
        if ( protocolStack == null )
        {
            throw new ArgumentNullException( "protocolStack" );
        }
        // Send, GetGroupMembers and Receive all access element 0, so an
        // empty stack could never work; reject it here instead of failing
        // later with an IndexOutOfRangeException.
        if ( protocolStack.Length == 0 )
        {
            throw new ArgumentException( "The protocol stack must contain at least one protocol.", "protocolStack" );
        }
        for ( int i = 0; i < protocolStack.Length; i++ )
        {
            if ( protocolStack[i] == null )
            {
                throw new ArgumentException( "The protocol stack must not contain null (index " + i + ").", "protocolStack" );
            }
        }

        // The returned protocol initializer is not needed here because
        // the protocols were already created by the caller.
        this.Initialize( initializer );
        this.protocolStack = protocolStack;
        this.ConnectProtocols();
    }

    /// <summary>
    /// Sets for each protocol of the protocol stack
    /// the upper and lower protocols. The protocol at index
    /// <c>i-1</c> becomes the upper and the protocol at index
    /// <c>i+1</c> the lower protocol of the protocol at index
    /// <c>i</c>; the first protocol gets no upper and the last
    /// protocol no lower protocol assigned.
    /// </summary>
    private void ConnectProtocols()
    {
        int count = this.protocolStack.Length;
        for ( int i = 0; i < count; i++ )
        {
            if ( i > 0 )
            {
                Debug.WriteLine( "[GroupMember#ConnectProtocols] Set the upper protocol of protocol with index:" + i );
                this.protocolStack[i].UpperProtocol = this.protocolStack[i - 1];
            }

            if ( ( i + 1 ) < count )
            {
                if ( i == 0 )
                {
                    Debug.WriteLine( "[GroupMember#ConnectProtocols] Set the lower protocol of the uppest protocol, index:" + i );
                }
                else
                {
                    Debug.WriteLine( "[GroupMember#ConnectProtocols] Set the lower protocol of protocol with index:" + i );
                }
                this.protocolStack[i].LowerProtocol = this.protocolStack[i + 1];
            }
        }
    }

    /// <summary>
    /// Helper method for initializing. Stores the initializer,
    /// creates the group member info, the message creator and the
    /// (not yet started) receiver thread.
    /// </summary>
    /// <param name="init">
    /// The initializer object.
    /// </param>
    /// <returns>
    /// A protocol initializer object filled with the values of
    /// <paramref name="init"/> and the created message creator.
    /// </returns>
    private ProtocolInitializer Initialize( Initializer init )
    {
        this.init = init;

        ProtocolInitializer protInit = new ProtocolInitializer();
        protInit.GroupAddress = init.GroupAddress;
        protInit.GroupName = init.GroupName;
        protInit.GroupPort = init.GroupPort;
        protInit.ListenerAddress = init.ListenerAddress;
        protInit.ListenerPort = init.ListenerPort;
        protInit.MemberName = init.MemberName;

        this.gmInfo = new GroupMemberInfo( protInit.MemberName,
                                           protInit.ListenerAddress,
                                           protInit.ListenerPort );
        this.msgCreator = new MessageCreator( protInit.MemberName,
                                              protInit.ListenerAddress,
                                              protInit.ListenerPort );
        protInit.MessageCreator = this.msgCreator;

        this.receiverThread = new Thread( new ThreadStart( this.Receive ) );
        return protInit;
    }

    /// <summary>
    /// Joins a group. Group name and group member name need
    /// not be specified here because they were set in the
    /// constructor. Starts the receiver thread first and then
    /// every protocol of the stack, beginning at index 0.
    /// </summary>
    /// <exception cref="ThreadStateException">
    /// Thrown by <c>Thread.Start</c> if the receiver thread was
    /// already started, i.e. if this method is called a second time
    /// on the same instance.
    /// </exception>
    public void JoinGroup()
    {
        Debug.WriteLine( "[GroupMember#JoinGroup] Joining group..." );
        this.stopped = false;

        lock ( this.stateLock )
        {
            this.receiverThread.Start();
            // Remember that there is a receiver thread LeaveGroup can wait for.
            this.receiverThreadStarted = true;
        }

        foreach ( AbstractProtocol protocol in this.protocolStack )
        {
            Debug.WriteLine( "[GroupMember#JoinGroup] Starting protocol..." );
            protocol.Start();
        }
        Debug.WriteLine( "[GroupMember#JoinGroup] Joining group - FINISHED" );
    }

    /// <summary>
    /// Leaves the group. Stops the protocols in reverse order
    /// (last index first) and then waits until the receiver thread
    /// left its receive loop. The wait is skipped if the receiver
    /// thread was never started or if this method is called from
    /// the receiver thread itself (e.g. from within the message
    /// receiver delegate), because waiting could never end in
    /// those cases.
    /// </summary>
    public void LeaveGroup()
    {
        Debug.WriteLine( "[GroupMember#LeaveGroup] Leaving group..." );
        this.stopped = true;

        for ( int i = this.protocolStack.Length - 1; i >= 0; i-- )
        {
            Debug.WriteLine( "[GroupMember#LeaveGroup] Stopping protocol..." );
            this.protocolStack[i].Stop();
        }

        if ( Thread.CurrentThread == this.receiverThread )
        {
            // The receiver thread cannot wait for its own termination; it
            // leaves the receive loop once the delegate returns because
            // 'stopped' is set.
            Debug.WriteLine( "[GroupMember#LeaveGroup] Called from receiver thread -> not waiting for it." );
        }
        else
        {
            this.WaitForStoppingFinished();
        }
        Debug.WriteLine( "[GroupMember#LeaveGroup] Leaving group - FINISHED" );
    }

    /// <summary>
    /// Sends an object to all group members of the
    /// group by wrapping it into an application message and
    /// putting that message into the outgoing queue of the
    /// topmost protocol.
    /// <note>The passed objects must be serializable!</note>
    /// </summary>
    /// <param name="obj">The object which shall be sent.</param>
    public void Send( object obj )
    {
        Message msg = this.msgCreator.CreateApplicationMessage( obj );
        this.protocolStack[0].PutOutgoing( msg );
    }

    /// <summary>
    /// Sets the <c>MessageReceivedDelegate</c> delegate for delegating
    /// received messages. Messages received while no delegate is set
    /// are dropped.
    /// </summary>
    /// <param name="messageDelegate">Delegate for received messages.</param>
    public void SetMessageReceiver( MessageReceivedDelegate messageDelegate )
    {
        this.messageDelegate = messageDelegate;
    }

    /// <summary>
    /// Returns an array which contains group member infos
    /// of all group members, as reported by the topmost protocol.
    /// </summary>
    /// <returns>An array containing the <c>GroupMemberInfo</c> objects
    /// of all group members.</returns>
    public GroupMemberInfo[] GetGroupMembers()
    {
        return this.protocolStack[0].GetGroupMembers();
    }

    /// <summary>
    /// Returns the information about this group member which was
    /// created from member name, listener address and listener port
    /// of the initializer.
    /// </summary>
    /// <returns>The group member information.</returns>
    public GroupMemberInfo GetGroupMemberInfo()
    {
        return this.gmInfo;
    }

    /// <summary>
    /// Body of the receiver thread. Takes messages from the incoming
    /// queue of the topmost protocol and passes them to the message
    /// receiver delegate until the group member is stopped or the
    /// queue reports that it was stopped.
    /// </summary>
    protected void Receive()
    {
        try
        {
            while ( !this.stopped )
            {
                try
                {
                    Debug.WriteLine( "[GroupMember#Receive] Taking message from incoming queue of UDP membership protocol..." );
                    Message msg = this.protocolStack[0].TakeIncoming();
                    Debug.WriteLine( "[GroupMember#Receive] Message '" + msg + "' received." );

                    // Copy to a local so that a concurrent SetMessageReceiver
                    // call cannot null the field between check and invocation.
                    MessageReceivedDelegate receiver = this.messageDelegate;
                    if ( receiver != null )
                    {
                        receiver( msg );
                    }
                    else
                    {
                        Debug.WriteLine( "[GroupMember#Receive] No message receiver set -> message dropped." );
                    }
                }
                catch ( BlockingQueueStoppedException )
                {
                    this.stopped = true;
                    Debug.WriteLine( "[GroupMember#Receive] Incoming queue of UDP protocol was stopped -> stop receiver thread." );
                }
            }
        }
        finally
        {
            // Signal in any case, so that LeaveGroup is not left waiting
            // when the loop is terminated by an unexpected exception.
            this.StoppingFinished();
        }
    }

    /// <summary>
    /// Indicates that stopping the receiver thread
    /// was finished and wakes up all threads waiting in
    /// <c>WaitForStoppingFinished</c>.
    /// </summary>
    private void StoppingFinished()
    {
        lock ( this.stateLock )
        {
            this.stoppingReceiverThreadFinished = true;
            Monitor.PulseAll( this.stateLock );
        }
    }

    /// <summary>
    /// Waits till the receiver thread was stopped. Returns
    /// immediately if the receiver thread was never started.
    /// </summary>
    private void WaitForStoppingFinished()
    {
        lock ( this.stateLock )
        {
            if ( !this.receiverThreadStarted )
            {
                // Nobody will ever signal -> waiting would block forever.
                return;
            }

            // Loop to guard against wake-ups without the flag being set.
            while ( !this.stoppingReceiverThreadFinished )
            {
                Monitor.Wait( this.stateLock );
            }
        }
    }
}
}
|