using System;
using System.Threading;
using grof.util;
using System.Diagnostics;
namespace grof.protocols{
/// <summary>
/// A <c>WorkerThread</c> moves messages from a source queue to a sink
/// queue on its own thread, passing every message through a delegate
/// first. It is used in one of two directions:
/// <list type="bullet">
/// <item>Taking messages from the incoming queue of the lower protocol,
/// processing them and putting them into its own incoming queue.</item>
/// <item>Taking messages from the outgoing queue of the upper protocol,
/// processing them and putting them into its own outgoing queue.</item>
/// </list>
/// A message is only forwarded to the sink when the delegate
/// returns <c>true</c>.
/// </summary>
public class WorkerThread
{
    /// <summary>
    /// The name of the worker thread (used in debug output and as the
    /// name of the thread created by <see cref="Start"/>).
    /// </summary>
    private readonly string name;

    /// <summary>
    /// The delegate which processes a message before it is put into
    /// the sink queue. Its return value decides whether the message
    /// is forwarded (<c>true</c>) or dropped (<c>false</c>).
    /// </summary>
    private readonly ProcessMessageDelegate msgDelegate;

    /// <summary>
    /// The source queue where the messages are taken
    /// (incoming queue from the lower protocol or
    /// outgoing queue of the upper protocol).
    /// </summary>
    private readonly IBlockingQueue<Message> source;

    /// <summary>
    /// The sink queue where the messages are put
    /// (own incoming or outgoing queue).
    /// </summary>
    private readonly IBlockingQueue<Message> sink;

    /// <summary>
    /// Private monitor object guarding <c>started</c> and
    /// <c>stoppingReceiverThreadFinished</c>. A dedicated object is used
    /// instead of <c>this</c> so that outside code locking on the worker
    /// instance cannot interfere with the stop handshake.
    /// </summary>
    private readonly object stateLock = new object();

    /// <summary>
    /// Indicates whether the worker thread was asked to stop.
    /// Written by <see cref="Stop"/> and polled by <see cref="Run"/>
    /// on a different thread, hence <c>volatile</c>.
    /// </summary>
    private volatile bool stopped = false;

    /// <summary>
    /// Indicates that the worker was started (either through
    /// <see cref="Start"/> or by calling <see cref="Run"/> directly).
    /// Guarded by <c>stateLock</c>.
    /// </summary>
    private bool started = false;

    /// <summary>
    /// Indicates that stopping the worker thread was finished,
    /// i.e. <see cref="Run"/> has left its loop.
    /// Guarded by <c>stateLock</c>.
    /// </summary>
    private bool stoppingReceiverThreadFinished = false;

    /// <summary>
    /// Creates instances of class <c>WorkerThread</c>.
    /// </summary>
    /// <param name="name">
    /// The name of the worker thread, used in debug output.
    /// </param>
    /// <param name="source">
    /// The source queue where messages are taken.
    /// </param>
    /// <param name="sink">
    /// The sink queue where messages are put.
    /// </param>
    /// <param name="processMsg">
    /// The delegate which processes the message.
    /// </param>
    public WorkerThread( string name, IBlockingQueue<Message> source, IBlockingQueue<Message> sink,
                         ProcessMessageDelegate processMsg )
    {
        this.name = name;
        this.source = source;
        this.sink = sink;
        this.msgDelegate = processMsg;
    }

    /// <summary>
    /// Starts the worker thread.
    /// <para>
    /// Every call creates and starts a new thread executing
    /// <see cref="Run"/>. Only a single completion flag is tracked,
    /// so the method is presumably meant to be called once per instance.
    /// </para>
    /// </summary>
    public void Start()
    {
        // Mark as started before the thread exists so that a Stop()
        // racing with Start() waits for the thread instead of returning early.
        lock ( this.stateLock )
        {
            this.started = true;
        }

        Thread t = new Thread( new ThreadStart( this.Run ) );
        t.Name = this.name;
        t.Start();
    }

    /// <summary>
    /// Stops the worker thread and blocks until <see cref="Run"/>
    /// has left its processing loop.
    /// <para>
    /// The stop flag is only checked between two messages. While the
    /// worker is inside <c>source.Take()</c> it can only observe the flag
    /// after that call returns or throws
    /// <c>BlockingQueueStoppedException</c>, so stopping depends on
    /// stopping the source queue.
    /// </para>
    /// <para>
    /// If the worker was never started there is nothing to wait for and
    /// the method returns immediately. The method must not be called from
    /// the message delegate, because the worker would then wait for itself.
    /// </para>
    /// </summary>
    public void Stop()
    {
        this.stopped = true;

        lock ( this.stateLock )
        {
            if ( !this.started )
            {
                // No thread will ever signal completion -> do not block.
                return;
            }
        }

        this.WaitForStoppingFinished();
    }

    /// <summary>
    /// Runs the worker loop on the calling thread.
    /// <para>
    /// The worker takes a message from the source queue, forwards it to
    /// the delegate and, if the delegate returns <c>true</c>, puts the
    /// message into the sink queue. The loop ends when the stop flag is
    /// set or a <c>BlockingQueueStoppedException</c> is caught.
    /// </para>
    /// <para>
    /// Any other exception propagates to the caller, but completion is
    /// still signalled so that a pending <see cref="Stop"/> is released.
    /// </para>
    /// </summary>
    public void Run()
    {
        // Run() is public; mark as started for callers bypassing Start().
        lock ( this.stateLock )
        {
            this.started = true;
        }

        Debug.WriteLine( "[WorkerThread#Run] [" + this.name + "] Started." );
        try
        {
            while ( !this.stopped )
            {
                try
                {
                    Debug.WriteLine( "[WorkerThread#Run] [" + this.name + "] Taking message from source" );
                    Message msg = this.source.Take();
                    Debug.WriteLine( "[WorkerThread#Run] [" + this.name + "] Process message, msg ID: " + msg.ToString() );
                    if ( this.msgDelegate( msg ) )
                    {
                        Debug.WriteLine( "[WorkerThread#Run] [" + this.name + "] Put message to sink." );
                        this.sink.Put( msg );
                    }
                    else
                    {
                        Debug.WriteLine( "[WorkerThread#Run] [" + this.name + "] Message is not put to sink." );
                    }
                }
                catch ( BlockingQueueStoppedException )
                {
                    // A stopped queue is the regular shutdown signal, not an error.
                    this.stopped = true;
                    Debug.WriteLine( "[WorkerThread#Run] [" + this.name + "] Incoming queue of UDP protocol was stopped -> stop receiver thread." );
                }
            }
        }
        finally
        {
            // Always signal completion, even when the delegate or the sink
            // threw; otherwise Stop() would wait forever.
            this.StoppingFinished();
        }
    }

    /// <summary>
    /// Records that the worker loop has finished and wakes up
    /// a thread waiting in <see cref="WaitForStoppingFinished"/>.
    /// </summary>
    private void StoppingFinished()
    {
        lock ( this.stateLock )
        {
            this.stoppingReceiverThreadFinished = true;
            // PulseAll so that every concurrent Stop() caller is released.
            Monitor.PulseAll( this.stateLock );
            Debug.WriteLine( "[WorkerThread#stoppingFinished] [" + this.name + "] Called." );
        }
    }

    /// <summary>
    /// Blocks the calling thread until the worker loop has signalled
    /// that it finished.
    /// </summary>
    private void WaitForStoppingFinished()
    {
        lock ( this.stateLock )
        {
            // Loop guards against wake-ups that happen before the flag is set.
            while ( !this.stoppingReceiverThreadFinished )
            {
                Debug.WriteLine( "[WorkerThread#waitForStoppingFinished] [" + this.name + "] Waiting for finished stopping..." );
                Monitor.Wait( this.stateLock );
            }
            Debug.WriteLine( "[WorkerThread#waitForStoppingFinished] [" + this.name + "] Stopping was finished." );
        }
    }
}
}
|