using System;
using System.Net;
using System.Net.Sockets;
using System.IO;
using System.Diagnostics;
using System.Threading;
namespace grof.protocols.membership{
/// <summary>
/// The <code>TcpReceiver</code> class listens for incoming TCP
/// connections on its own thread. Connections are handled one
/// after another: all bytes sent on a connection are read until
/// the remote side closes it, decoded via <code>Message.FromBytes</code>
/// and the resulting message is handed to the
/// <code>MessageReceivedDelegate</code> passed to the constructor.
/// </summary>
public class TcpReceiver
{
    /// <summary>
    /// Size in bytes of the buffer used for a single read
    /// from the network stream.
    /// </summary>
    private const int ReadBufferSize = 4096;

    /// <summary>
    /// Private gate guarding <code>stopped</code>, <code>server</code>
    /// and <code>client</code>. A dedicated object is used instead of
    /// <code>lock( this )</code> so that code outside this class can
    /// never contend on (or deadlock with) the same monitor.
    /// </summary>
    private readonly object syncRoot = new object();

    /// <summary>
    /// The port the TCP receiver listens on for
    /// incoming messages.
    /// </summary>
    private int listenerPort;

    /// <summary>
    /// The IP address the TCP receiver is bound to.
    /// </summary>
    private IPAddress address;

    /// <summary>
    /// The delegate every received message is
    /// forwarded to.
    /// </summary>
    private MessageReceivedDelegate msgListener;

    /// <summary>
    /// Listens for incoming connections. Created by the
    /// receiver thread, so it is <code>null</code> until
    /// that thread has entered <code>Receive</code>.
    /// </summary>
    private TcpListener server;

    /// <summary>
    /// Indicates whether the TCP receiver
    /// was stopped.
    /// </summary>
    private bool stopped = false;

    /// <summary>
    /// The most recently accepted TCP client.
    /// </summary>
    private TcpClient client;

    /// <summary>
    /// The thread that accepts connections and
    /// reads the incoming messages.
    /// </summary>
    private Thread receiverThread;

    /// <summary>
    /// The name of the group member; only used to
    /// tag the log output.
    /// </summary>
    private string memberName;

    /// <summary>
    /// Creates instances of class <code>TcpReceiver</code>. The receiver
    /// thread is created here but not started; call <code>Start</code>.
    /// </summary>
    /// <param name="address">The IP address the receiver is bound to,
    /// in a format accepted by <code>IPAddress.Parse</code>.</param>
    /// <param name="port">The port the receiver listens on.</param>
    /// <param name="memberName">The name of the group member, used as
    /// a tag in the log output.</param>
    /// <param name="listener">The delegate the received messages are
    /// forwarded to.</param>
    public TcpReceiver( string address, int port, string memberName,
                        MessageReceivedDelegate listener )
    {
        this.listenerPort = port;
        this.address = IPAddress.Parse( address );
        this.msgListener = listener;
        this.memberName = memberName;
        this.receiverThread = new Thread( new ThreadStart( this.Receive ) );
        Debug.WriteLine( "[TcpReceiver#constructor] " + this.memberName + ": TCP receiver created" );
    }

    /// <summary>
    /// Starts the receiver thread, which then waits
    /// for incoming connections.
    /// </summary>
    public void Start()
    {
        this.receiverThread.Start();
        Debug.WriteLine( "[TcpReceiver#Start] " + this.memberName + ": TcpReceiver thread was started." );
    }

    /// <summary>
    /// Body of the receiver thread: opens the listener and
    /// accepts connections one at a time until the receiver
    /// is stopped. The listener is always shut down before
    /// this method returns.
    /// </summary>
    void Receive()
    {
        Debug.WriteLine( "[TcpReceiver] " + this.memberName + ": Receive method entered." );
        TcpListener listener = null;
        try
        {
            listener = new TcpListener( this.address, this.listenerPort );
            lock( this.syncRoot )
            {
                // Publish the listener so that Stop() can shut it down.
                this.server = listener;
            }
            // Start listening for client requests.
            listener.Start();
            while( !this.isStopped() )
            {
                Debug.WriteLine("[TcpReceiver#Receive] " + this.memberName + ": Waiting for a connection... ");
                // Blocking call; Stop() unblocks it by stopping the listener.
                TcpClient connection = listener.AcceptTcpClient();
                bool stopRequested;
                lock( this.syncRoot )
                {
                    // Publish the connection so that Stop() can close it and
                    // re-check the flag in the same critical section: either
                    // Stop() sees the connection, or we see the stop request.
                    this.client = connection;
                    stopRequested = this.stopped;
                }
                if ( stopRequested )
                {
                    connection.Close();
                    break;
                }
                this.HandleConnection( connection );
            }
        }
        catch( SocketException e )
        {
            // This exception is thrown when something went wrong during
            // starting the TcpListener, or when a blocking accept was
            // interrupted by Stop().
            Trace.WriteLine("[TcpReceiver#Receive] " + this.memberName + ": SocketException, Message: " +
                            e.Message + ", Error code: " + e.ErrorCode );
        }
        catch( InvalidOperationException )
        {
            // AcceptTcpClient throws this (or its subclass ObjectDisposedException)
            // when Stop() shut the listener down between two loop iterations.
            // Anything else is a genuine error and must not be hidden.
            if ( !this.isStopped() )
            {
                throw;
            }
            Trace.WriteLine( "[TcpReceiver#Receive] " + this.memberName + ": Listener was shut down." );
        }
        catch( ThreadAbortException )
        {
            // This exception is thrown when TcpReceiver thread was stopped
            // by Stop() method.
            Trace.WriteLine( "[TcpReceiver#Receive] " + this.memberName + ": TcpReceiver thread was stopped." );
        }
        finally
        {
            // Release the listening socket even when Stop() ran before the
            // listener was published and therefore could not stop it.
            if ( listener != null )
            {
                listener.Stop();
            }
        }
        Trace.WriteLine( "[TcpReceiver#Receive] " + this.memberName + ": Receive method left." );
    }

    /// <summary>
    /// Reads the complete message from one accepted connection,
    /// forwards it to the message delegate and closes the
    /// connection. Errors are logged and never propagate, so one
    /// broken connection does not end the accept loop.
    /// </summary>
    /// <param name="connection">The accepted connection; it is
    /// closed before this method returns.</param>
    private void HandleConnection( TcpClient connection )
    {
        try
        {
            Debug.WriteLine("[TcpReceiver#Receive] " + this.memberName + ": Connected!");
            byte[] payload = this.ReadAllBytes( connection.GetStream() );
            Message msg = Message.FromBytes( payload );
            this.msgListener( msg );
            Debug.WriteLine( "[TcpReceiver#Receive] " + this.memberName + ": Received message: " + msg );
        }
        catch( SocketException se )
        {
            Trace.WriteLine( "[TcpReceiver#Receive] " + this.memberName + ": Network error during reading message:" +
                             se.ToString() );
        }
        catch( IOException ioe )
        {
            // NetworkStream.Read reports network failures as IOException,
            // so treat it as a network error as well.
            Trace.WriteLine( "[TcpReceiver#Receive] " + this.memberName + ": Network error during reading message:" +
                             ioe.ToString() );
        }
        catch( Exception e )
        {
            // Decoding or delegate failures. Logged through Trace so they
            // are not silently lost in builds without the DEBUG symbol.
            Trace.WriteLine( "[TcpReceiver#Receive] " + this.memberName + ": Error occurred during reading message: " +
                             e.ToString() );
        }
        finally
        {
            // Shutdown and end connection (this also disposes its stream).
            connection.Close();
        }
    }

    /// <summary>
    /// Reads from the stream until it signals its end
    /// (a read that returns no bytes).
    /// </summary>
    /// <param name="stream">The stream to drain.</param>
    /// <returns>All bytes read from the stream.</returns>
    private byte[] ReadAllBytes( Stream stream )
    {
        byte[] buffer = new byte[ReadBufferSize];
        using ( MemoryStream collected = new MemoryStream() )
        {
            while ( true )
            {
                Debug.WriteLine( "[TcpReceiver#Receive] " + this.memberName + ": Reading message..." );
                int read = stream.Read( buffer, 0, buffer.Length );
                // Report the bytes actually received, not the buffer capacity.
                Debug.WriteLine( "[TcpReceiver#Receive] " + read + " bytes read." );
                if ( read <= 0 )
                {
                    break;
                }
                collected.Write( buffer, 0, read );
            }
            return collected.ToArray();
        }
    }

    /// <summary>
    /// Helper method which indicates whether
    /// receiver was stopped.
    /// </summary>
    /// <returns><code>true</code> once <code>Stop</code> has been called.</returns>
    private bool isStopped()
    {
        lock( this.syncRoot )
        {
            return this.stopped;
        }
    }

    /// <summary>
    /// Stops the <code>TcpReceiver</code>
    /// thread and cleans resources.
    /// </summary>
    public void Stop()
    {
        TcpListener listener;
        TcpClient connection;
        lock( this.syncRoot )
        {
            this.stopped = true;
            listener = this.server;
            connection = this.client;
        }
        // Closing the sockets unblocks a pending accept or read, which
        // lets the receiver thread notice the stop flag and leave.
        if ( listener != null )
        {
            listener.Stop();
        }
        if ( connection != null )
        {
            connection.Close();
        }
        try
        {
            // Last resort for a receiver thread that hangs outside of
            // socket calls (e.g. inside the message delegate).
            this.receiverThread.Abort();
        }
        catch( PlatformNotSupportedException )
        {
            // Thread.Abort is not available on .NET Core / .NET 5+.
            // The cooperative shutdown above is all that can be done there.
        }
        Debug.WriteLine( "[TcpReceiver#Stop] " + this.memberName + ": called." );
    }
}
}
|