using System;
using NUnit.Framework;
using System.Threading;
using grof;
using System.Diagnostics;
using grof.protocols;
using grof.util;
namespace grof.protocols{
/// <summary>
/// Description of TestWorkerThread.
/// </summary>
[TestFixture]
public class TestWorkerThread : AbstractTest
{
private MessageCreator msgCreator;
private BlockingQueueImpl<Message> source;
private BlockingQueueImpl<Message> sink;
private int counter = 0;
public TestWorkerThread()
{
}
[SetUp]
protected void SetUp()
{
base.SetUp();
this.source = new BlockingQueueImpl<Message>( "source" );
this.sink = new BlockingQueueImpl<Message>( "sink" );
this.msgCreator = new MessageCreator( "Foo", "127.0.0.1", 9010 );
}
[TearDown]
protected void TearDown()
{
base.TearDown();
}
[Test]
/// <summary>
/// Tests processing messages.<br>
/// <list type="bullet">
/// <listheader>Steps:</listheader>
/// <item>The source queue is filled with 10 messages.</item>
/// <item>The worker thread is started.</item>
/// <item>The worker thread must foward the messages of source queue
/// to the <c>ProcessMessageDelegate</c> and afterwards put into
/// the sink queue.</item>
/// </list>
/// </summary>
public void TestProcessingMessages()
{
for ( int i=0;i<10;i++ )
{
source.Put( this.msgCreator.CreateApplicationMessage( null ) );
}
WorkerThread wThread = new WorkerThread( "thread", source, sink, new ProcessMessageDelegate( this.ProcessMessage ) );
wThread.Start();
this.waitForProcessedMsgs();
// Waiting till last message was put into sink
Thread.Sleep( 200 );
Assert.AreEqual( 10, sink.Size() );
Assert.AreEqual( 0, source.Size() );
this.sink.Stop();
this.source.Stop();
wThread.Stop();
}
/// <summary>
/// Tests processing messages.<br>
/// <list type="bullet">
/// <listheader>Steps:</listheader>
/// <item>The worker thread is started.</item>
/// <item>The source queue is filled with 10 messages.</item>
/// <item>The worker thread must foward the messages of source queue
/// to the <c>ProcessMessageDelegate</c> and afterwards put into
/// the sink queue.</item>
/// </list>
/// </summary>
[Test]
public void TestProcessingMessages2()
{
WorkerThread wThread = new WorkerThread( "thread", source, sink, new ProcessMessageDelegate( this.ProcessMessage ) );
wThread.Start();
this.waitForProcessedMsgs();
for ( int i=0;i<10;i++ )
{
source.Put( this.msgCreator.CreateApplicationMessage( null ) );
}
// waiting till last message is put into sink
Thread.Sleep( 200 );
Assert.AreEqual( 10, sink.Size() );
Assert.AreEqual( 0, source.Size() );
}
/// <summary>
/// Tests stopping worker thread.<br>
/// <list type="bullet">
/// <listheader>Steps:</listheader>
/// <item>The worker thread is started.</item>
/// <item>After the worker thread is started the queue is stopped.</item>
/// <item>The worker thread is stopped and has to finish working.</item>
/// </list>
/// </summary>
[Test]
public void TestStoppingWorkerThreadStopped()
{
WorkerThread wThread = new WorkerThread( "thread", source, sink, new ProcessMessageDelegate( this.ProcessMessage ) );
wThread.Start();
// waiting till worker thread is fully started.
Thread.Sleep( 200 );
// stopping the queue
this.source.Stop();
// stopping the worker thread
wThread.Stop();
}
bool ProcessMessage( Message msg )
{
lock ( this )
{
counter++;
Monitor.Pulse( this );
}
return true;
}
void waitForProcessedMsgs()
{
lock ( this )
{
while ( this.counter != 10 )
{
Monitor.Wait( this );
}
}
}
}
}
|