using System;
using System.Collections.Generic;
using System.Threading;
using System.Diagnostics;
namespace grof{
namespace util
{
/// <summary>
/// Implementation of the <c>IBlockingQueue</c> interface backed by a
/// <see cref="Queue{T}"/>. Every access to the backing queue and to the
/// stopped flag happens while holding a private monitor object, and
/// consumers block in <see cref="Take"/> until an element is available
/// or the queue is stopped.
/// </summary>
/// <typeparam name="T">The type of the queued elements.</typeparam>
public class BlockingQueueImpl<T> : IBlockingQueue<T>
{
    /// <summary>
    /// The name given to this queue at construction. It is only used
    /// as a label in the debug trace output of each method.
    /// </summary>
    private readonly string name;

    /// <summary>
    /// The internal data structure for queuing elements.
    /// </summary>
    private readonly Queue<T> queue;

    /// <summary>
    /// The monitor object guarding <see cref="queue"/> and
    /// <see cref="stopped"/>. It is also the object producers pulse
    /// and consumers wait on.
    /// </summary>
    private readonly object semaphore;

    /// <summary>
    /// Indicates whether the queue was stopped. Only read and written
    /// while holding <see cref="semaphore"/>.
    /// </summary>
    private bool stopped;

    /// <summary>
    /// Creates an empty, running (not stopped) queue.
    /// </summary>
    /// <param name="name">The label written into the debug trace
    /// output of this queue.</param>
    public BlockingQueueImpl( string name )
    {
        this.name = name;
        this.semaphore = new object();
        this.queue = new Queue<T>();
        this.stopped = false;
    }

    /// <summary>
    /// Puts an element into the queue and wakes up all threads that are
    /// currently waiting in <see cref="Take"/>.
    /// </summary>
    /// <param name="element">The element which is cached
    /// in the queue.</param>
    /// <exception cref="BlockingQueueStoppedException">Thrown if the
    /// queue was stopped; the element is not queued in that case.</exception>
    public void Put( T element )
    {
        Debug.WriteLine( "[BlockingQueueImpl#Put] [" + this.name + "] Called." );

        // 'lock' acquires the monitor before entering its protected region,
        // so the release can never run on a monitor this thread failed to
        // acquire (which would raise SynchronizationLockException and hide
        // the original failure).
        lock ( this.semaphore )
        {
            if ( this.stopped )
            {
                throw new BlockingQueueStoppedException( "Queue was stopped." );
            }

            this.queue.Enqueue( element );
            Monitor.PulseAll( this.semaphore );
        }
    }

    /// <summary>
    /// Retrieves the next element from the queue, blocking the calling
    /// thread while the queue is empty and has not been stopped.
    /// </summary>
    /// <returns>The retrieved element.</returns>
    /// <exception cref="BlockingQueueStoppedException">Thrown if the
    /// queue was stopped, either before the call or while waiting.
    /// Elements still held by a stopped queue are not handed out.</exception>
    public T Take()
    {
        Debug.WriteLine( "[BlockingQueueImpl#Take] [" + this.name + "] Called." );

        lock ( this.semaphore )
        {
            // Monitor.Wait releases the monitor while blocked and re-acquires
            // it before returning; the condition is re-checked in a loop
            // because PulseAll wakes every waiter, not only one.
            while ( !this.stopped && this.queue.Count == 0 )
            {
                Monitor.Wait( this.semaphore );
            }

            // A stopped queue never yields elements, even if some remain.
            if ( this.stopped )
            {
                throw new BlockingQueueStoppedException( "Queue was stopped" );
            }

            return this.queue.Dequeue();
        }
    }

    /// <summary>
    /// Returns the size of the queue.
    /// </summary>
    /// <returns>The number of cached elements at the moment the
    /// monitor was held.</returns>
    public int Size()
    {
        Debug.WriteLine( "[BlockingQueueImpl#Size] [" + this.name + "] Called." );

        lock ( this.semaphore )
        {
            return this.queue.Count;
        }
    }

    /// <summary>
    /// Stops the queue, that means no further elements can be queued
    /// into or dequeued from it. All threads currently blocked in
    /// <see cref="Take"/> are woken up so they can observe the stop.
    /// </summary>
    public void Stop()
    {
        Debug.WriteLine( "[BlockingQueueImpl#Stop] [" + this.name + "] Called." );

        // 'lock' guarantees the monitor is released even if the critical
        // section is left by an exception.
        lock ( this.semaphore )
        {
            this.stopped = true;
            Monitor.PulseAll( this.semaphore );
        }
    }
}
}
}
|