using System;
using System.Net;
using System.Net.Sockets;
using System.IO;
using System.Text;
using System.Threading;
using System.Diagnostics;
using grof;
namespace grof.protocols.membership{
/// <summary>
/// The <c>MulticastReceiver</c> is part of the
/// <c>UdpMembershipProtocol</c> and listens
/// for multicast messages of type JOIN and LEAVE.
/// </summary>
/// <remarks>
/// The receiver owns a dedicated background thread which blocks in
/// <c>Receive</c>, decodes each datagram into a <c>Message</c> and hands
/// it to the registered listener delegate. <see cref="Stop"/> sets the
/// stop flag and closes the socket so the blocked thread can leave its
/// loop. A receiver instance is meant to be started once; the underlying
/// <see cref="Thread"/> cannot be restarted.
/// </remarks>
public class MulticastReceiver
{
    /// <summary>
    /// Second argument handed to <c>JoinMulticastGroup</c>. For the
    /// framework's <c>UdpClient</c> this is the multicast time-to-live;
    /// presumably <c>ReusableUdpClient</c> has the same meaning
    /// (TODO confirm against its declaration).
    /// </summary>
    private const int MulticastTimeToLive = 50;

    /// <summary>
    /// Private monitor guarding <see cref="stopped"/>. A dedicated object
    /// is used instead of <c>this</c> so that code outside this class can
    /// never contend for, or deadlock on, the same lock.
    /// </summary>
    private readonly object syncRoot = new object();

    /// <summary>
    /// The IP address of the multicast group
    /// defined in class <c>Initializer</c>.
    /// </summary>
    private readonly IPAddress groupAddress;

    /// <summary>
    /// The port number of the multicast group
    /// defined in class <c>Initializer</c>.
    /// </summary>
    private readonly int groupPort;

    /// <summary>
    /// The <c>ReusableUdpClient</c> receives
    /// multicast UDP messages and makes it possible
    /// that several group members on the same local
    /// machine are able to listen to the same group
    /// port.
    /// </summary>
    private readonly ReusableUdpClient receiver;

    /// <summary>
    /// The <c>MulticastReceiver</c> instance works in
    /// its own thread and listens for incoming UDP
    /// packets.
    /// </summary>
    private readonly Thread t;

    /// <summary>
    /// Delegate that every successfully decoded message is passed to.
    /// </summary>
    private readonly MessageReceivedDelegate listener;

    /// <summary>
    /// Indicates when the <c>MulticastReceiver</c> was
    /// stopped. Always accessed while holding <see cref="syncRoot"/>.
    /// </summary>
    private bool stopped;

    /// <summary>
    /// The name of the group member; only used to tag debug output.
    /// </summary>
    private readonly string memberName;

    /// <summary>
    /// Creates instances of class
    /// <c>MulticastReceiver</c>. The receiver thread is created but not
    /// started; call <see cref="Start"/> to begin listening.
    /// </summary>
    /// <param name="groupAddress">The IP address of the multicast group.</param>
    /// <param name="groupPort">The port number of the multicast group.</param>
    /// <param name="memberName">The name of the group member, used in debug output.</param>
    /// <param name="listener">The listener where the messages are delegated to.</param>
    /// <exception cref="ArgumentNullException"><paramref name="groupAddress"/> is null (thrown by <c>IPAddress.Parse</c>).</exception>
    /// <exception cref="FormatException"><paramref name="groupAddress"/> is not a valid IP address (thrown by <c>IPAddress.Parse</c>).</exception>
    public MulticastReceiver( string groupAddress, int groupPort, string memberName, MessageReceivedDelegate listener )
    {
        this.groupAddress = IPAddress.Parse( groupAddress );
        this.groupPort = groupPort;
        this.listener = listener;
        this.memberName = memberName;

        // Sharing the group port between several local members is left to
        // ReusableUdpClient, so no socket options are set here.
        this.receiver = new ReusableUdpClient( this.groupPort );

        this.t = new Thread( new ThreadStart( this.Listen ) );
        // setting the thread as background thread
        // so it is stopped when the last foreground thread is stopped.
        this.t.IsBackground = true;

        Debug.WriteLine( "[MulticastReceiver#constructor] " + this.memberName + ": Multicast receiver object created." );
    }

    /// <summary>
    /// Starts the multicast receiver for listening.
    /// </summary>
    /// <exception cref="ThreadStateException">The receiver thread was already started.</exception>
    public void Start()
    {
        // The flag is read under the lock by the receiver thread, so it is
        // written under the same lock here.
        lock ( this.syncRoot )
        {
            this.stopped = false;
        }
        this.t.Start();
        Debug.WriteLine( "[MulticastReceiver#Start] " + this.memberName + ": Multicast receiver started." );
    }

    /// <summary>
    /// This method is called when the <c>MulticastReceiver</c>
    /// thread was started. It joins the multicast group and then receives
    /// datagrams until the receiver is stopped or the socket fails.
    /// </summary>
    /// <remarks>
    /// Exceptions raised by the socket end the loop and are only written
    /// to the debug output. Failures while decoding or dispatching a
    /// single message are logged and that message is dropped, so one bad
    /// datagram or one failing listener call does not silence the
    /// receiver.
    /// </remarks>
    public void Listen()
    {
        try
        {
            this.receiver.JoinMulticastGroup( this.groupAddress, MulticastTimeToLive );
            IPEndPoint iep = new IPEndPoint( IPAddress.Any, 0 );
            while ( !this.isStopped() )
            {
                Debug.WriteLine( "[MulticastReceiver#Listen] " + this.memberName + ": Waiting for multicast message" );
                byte[] data = this.receiver.Receive( ref iep );

                // Stopping the receiver unblocks Receive(); if that yields
                // no payload there is nothing to decode, so go back to the
                // loop condition which re-checks the stop flag.
                if ( data == null )
                {
                    continue;
                }

                this.ProcessDatagram( data, iep );
            }
        }
        catch ( SocketException se )
        {
            Debug.WriteLine( "[MulticastReceiver#Listen] " + this.memberName + ": SocketException occurred! Message: " + se.Message + ", Error code: " + se.ErrorCode );
        }
        catch ( Exception e )
        {
            Debug.WriteLine( "[MulticastReceiver#listen] " + this.memberName + ": Exception occured! Message: " + e.Message );
        }
        Debug.WriteLine( "[MulticastReceiver#listen] " + this.memberName + ": Receiver left receive loop." );
    }

    /// <summary>
    /// Decodes one received datagram and forwards the resulting message
    /// to the listener. Any exception raised while doing so is logged and
    /// swallowed so that the receive loop keeps running.
    /// </summary>
    /// <param name="data">The raw datagram payload; never null.</param>
    /// <param name="sender">The endpoint the datagram was received from.</param>
    private void ProcessDatagram( byte[] data, IPEndPoint sender )
    {
        try
        {
            Message msg = Message.FromBytes( data );
            Debug.WriteLine( "[MulticastReceiver#Listen] " + this.memberName + ": Received multicast message from "
                             + sender.ToString() );

            // FromBytes may hand back null; only real messages are
            // delegated to the listener.
            if ( msg != null )
            {
                Debug.WriteLine( "[MulticastReceiver#Listen] " + this.memberName + ": Received message: "
                                 + msg.ToString() );
                this.listener( msg );
            }
        }
        catch ( Exception e )
        {
            Debug.WriteLine( "[MulticastReceiver#Listen] " + this.memberName + ": Dropped multicast message, processing failed! Message: " + e.Message );
        }
    }

    /// <summary>
    /// Helper method which indicates whether
    /// receiver was stopped.
    /// </summary>
    /// <returns><c>true</c> once <see cref="Stop"/> has been called; otherwise <c>false</c>.</returns>
    private bool isStopped()
    {
        lock ( this.syncRoot )
        {
            return this.stopped;
        }
    }

    /// <summary>
    /// Stops listening for incoming multicast messages.
    /// </summary>
    public void Stop()
    {
        lock ( this.syncRoot )
        {
            this.stopped = true;
        }
        // closes the multicast socket
        // which causes a SocketException in listener
        // thread
        this.receiver.Close();
        Debug.WriteLine( "[MulticastReceiver#Stop] " + this.memberName + ": Multicast receiver stopped." );
    }
}
}
|