using System;
using grof.protocols;
using grof.util;
using grof;
using System.Threading;
using System.Collections;
using System.Diagnostics;
namespace grof.protocols.membership{
/// <summary>
/// The <c>Receiver</c> class is responsible for
/// receiving messages via TCP and UDP and putting the
/// messages into the incoming queue of the
/// <c>UdpMembershipProtocol</c> class.
/// </summary>
class Receiver
{
/// <summary>
/// The multicast receiver for receiving
/// multicast messages via UDP.
/// </summary>
private MulticastReceiver multicastReceiver;
/// <summary>
/// The receiver object for receiving TCP
/// messages.
/// </summary>
private TcpReceiver tcpReceiver;
/// <summary>
/// The incoming queue where all received
/// messages are queued.
/// </summary>
private IBlockingQueue<Message> incomingQueue;
/// <summary>
/// The outgoing queue where all messages which
/// are sent are queued.
/// </summary>
private IBlockingQueue<Message> outgoingQueue;
/// <summary>
/// The <c>GroupMemberInfoMap</c> object for
/// storing relevant data of all group members.
/// </summary>
private GroupMemberInfoMap gmInfoMap;
/// <summary>
/// The message creator which makes creating
/// messages easier.
/// </summary>
private MessageCreator msgCreator;
/// <summary>
/// The <c>ProtocolInitializer</c> class
/// containing member relevant information.
/// </summary>
private ProtocolInitializer protInit;
/// <summary>
/// Creates instances of class <c>Receiver</c>.
/// </summary>
/// <param name="protInit">The initializer object.</param>
/// <param name="incomingQueue">The incoming queue storing all
/// received messages.</param>
/// <param name="outgoingQueue">The outgoing queue storing all
/// messages which are sent.</param>
/// <param name="gmInfoMap">The <c>GroupMemberInfoMap</c> object
/// which caches <c>GroupMemberInfo</c> objects.</param>
public Receiver( ProtocolInitializer protInit,
IBlockingQueue<Message> incomingQueue,
IBlockingQueue<Message> outgoingQueue,
GroupMemberInfoMap gmInfoMap )
{
this.msgCreator = protInit.MessageCreator;
this.gmInfoMap = gmInfoMap;
multicastReceiver = new MulticastReceiver( protInit.GroupAddress,
protInit.GroupPort, protInit.MemberName,
MessageReceived );
tcpReceiver = new TcpReceiver( protInit.ListenerAddress,
protInit.ListenerPort, protInit.MemberName, MessageReceived );
this.incomingQueue = incomingQueue;
this.outgoingQueue = outgoingQueue;
this.protInit = protInit;
Debug.WriteLine( "[Receiver#constructor] " + protInit.MemberName + ": Receiver object created." );
}
/// <summary>
/// Starts the multicast and TCP receiver
/// object for receiving messages from other
/// group members.
/// </summary>
public void Start()
{
this.multicastReceiver.Start();
this.tcpReceiver.Start();
Debug.WriteLine( "[Receiver#Start] " + protInit.MemberName + ": Receiver started." );
}
/// <summary>
/// Stops the multicast receiver and TCP receiver
/// objects. No further messages can be received.
/// </summary>
public void Stop()
{
Debug.WriteLine( "[Receiver#Stop] " + protInit.MemberName + ": Stopping receiver..." );
this.multicastReceiver.Stop();
this.tcpReceiver.Stop();
Debug.WriteLine( "[Receiver#Stop] " + protInit.MemberName + ": Receiver stopped." );
}
/// <summary>
/// This method is called by the <c>MulticastReceiver</c>
/// or <c>TcpReceiver</c> object when a message was
/// received.
/// </summary>
/// <param name="msg">The received message.</param>
public void MessageReceived( Message msg )
{
Debug.WriteLine( "[Receiver#MessageReceived] " + protInit.MemberName + ": Called." );
this.incomingQueue.Put( msg );
if ( msg.GetType() == Message.MessageType.JOIN )
{
Debug.WriteLine( "[Receiver#MessageReceived] " + protInit.MemberName + ": Message is of type JOIN, message:" +
msg.ToString() );
GroupMemberInfo gmInfo = msg.GetGroupMemberInfo();
this.gmInfoMap.Put( gmInfo.GetName(), gmInfo );
Debug.WriteLine( "[Receiver#MessageReceived] " + protInit.MemberName + ": Group member info put in map." );
this.msgCreator.CheckAndSetTimeStamp( msg.GetTimeStamp() );
// put message into outgoing queue (in front?!)
// i think not in front, because LEAVE or JOIN message
// could take over each other.
// in case of LEAVE no message should be sent back?!
// put reponse message into outgoing queue
Debug.WriteLine( "[Receiver#MessageReceived] " + protInit.MemberName + ": Putting response of JOIN message into outgoing queue..." );
this.outgoingQueue.Put( this.msgCreator.CreateAliveMessage( null ) );
} else if ( msg.GetType() == Message.MessageType.LEAVE )
{
Debug.WriteLine( "[Receiver#MessageReceived] " + protInit.MemberName + ": Message is of type LEAVE, message:" +
msg.ToString() );
GroupMemberInfo gmInfo = msg.GetGroupMemberInfo();
this.gmInfoMap.Remove( gmInfo.GetName() );
Debug.WriteLine( "[Receiver#MessageReceived] " + protInit.MemberName + ": Group member info removed from map." );
// no response to left member necessary !?
} else if ( msg.GetType() == Message.MessageType.ALIVE )
{
Debug.WriteLine( "[Receiver#MessageReceived] " + protInit.MemberName + ": Message is of type ALIVE, message:" +
msg.ToString() );
GroupMemberInfo gmInfo = msg.GetGroupMemberInfo();
this.gmInfoMap.Put( gmInfo.GetName(), gmInfo );
Debug.WriteLine( "[Receiver#MessageReceived] " + protInit.MemberName + ": Group member info added to map." );
this.msgCreator.CheckAndSetTimeStamp( msg.GetTimeStamp() );
} else
{
Debug.WriteLine( "[Receiver#MessageReceived] " + protInit.MemberName + ": Message is NOT of type JOIN/LEAVE, message:" +
msg.ToString() );
}
}
}
}
|