Sender.cs :  » Network-Clients » GROF » grof » protocols » membership » C# / CSharp Open Source

Home
C# / CSharp Open Source
1.2.6.4 mono .net core
2.2.6.4 mono core
3.Aspect Oriented Frameworks
4.Bloggers
5.Build Systems
6.Business Application
7.Charting Reporting Tools
8.Chat Servers
9.Code Coverage Tools
10.Content Management Systems CMS
11.CRM ERP
12.Database
13.Development
14.Email
15.Forum
16.Game
17.GIS
18.GUI
19.IDEs
20.Installers Generators
21.Inversion of Control Dependency Injection
22.Issue Tracking
23.Logging Tools
24.Message
25.Mobile
26.Network Clients
27.Network Servers
28.Office
29.PDF
30.Persistence Frameworks
31.Portals
32.Profilers
33.Project Management
34.RSS RDF
35.Rule Engines
36.Script
37.Search Engines
38.Sound Audio
39.Source Control
40.SQL Clients
41.Template Engines
42.Testing
43.UML
44.Web Frameworks
45.Web Service
46.Web Testing
47.Wiki Engines
48.Windows Presentation Foundation
49.Workflows
50.XML Parsers
C# / C Sharp
C# / C Sharp by API
C# / CSharp Tutorial
C# / CSharp Open Source » Network Clients » GROF 
GROF » grof » protocols » membership » Sender.cs

using System;
using grof.protocols;
using grof.util;
using grof;
using System.Threading;
using System.Collections;
using System.Diagnostics;

namespace grof.protocols.membership{
  /// <summary>
  /// The <c>Sender</c> object is responsible for sending
  /// all messages (which are queued in the outgoing queue
  /// of the <c>UdpMembershipProtocol</c> instance) to
  /// all members of the same group.
  /// </summary>
  /// <remarks>
  /// JOIN and LEAVE messages are handed to the multicast sender;
  /// every other message type is handed to the TCP sender together
  /// with the current group member infos. The send loop runs on a
  /// dedicated thread created in the constructor and started by
  /// <see cref="Start"/>.
  /// </remarks>
  class Sender
  {
    /// <summary>
    /// Set to <c>true</c> (under <see cref="semaphore"/>) once the
    /// sender thread has left its send loop.
    /// </summary>
    private bool stopped = false;

    /// <summary>
    /// The thread executing <see cref="Run"/>.
    /// </summary>
    private readonly Thread senderThread;

    /// <summary>
    /// The TCP sender object used for all messages which are
    /// neither JOIN nor LEAVE messages.
    /// </summary>
    private readonly TcpSender tcpSender;

    /// <summary>
    /// The multicast sender object used for JOIN and
    /// LEAVE messages.
    /// </summary>
    private readonly MulticastSender multicastSender;

    /// <summary>
    /// The incoming queue which holds all received
    /// messages. It is stored here but not read by this class.
    /// </summary>
    private readonly IBlockingQueue<Message> incomingQueue;

    /// <summary>
    /// The outgoing queue caching all messages
    /// which ought to be sent.
    /// </summary>
    private readonly IBlockingQueue<Message> outgoingQueue;

    /// <summary>
    /// The <c>GroupMemberInfoMap</c> object which stores
    /// all relevant data of all group members of the same
    /// group. It is also used as the lock object while a
    /// message is sent via TCP.
    /// </summary>
    private readonly GroupMemberInfoMap gmInfoMap;

    /// <summary>
    /// Monitor object guarding <see cref="stopped"/> and used to
    /// wake up threads waiting in <see cref="Stop"/>.
    /// </summary>
    private readonly object semaphore;

    /// <summary>
    /// The <c>ProtocolInitializer</c> object providing the member
    /// name, group address and group port.
    /// </summary>
    private readonly ProtocolInitializer protInit;

    /// <summary>
    /// Creates instances of class <c>Sender</c>. The sender thread
    /// is created but not started; call <see cref="Start"/> to start it.
    /// </summary>
    /// <param name="protInit">The initializer object.</param>
    /// <param name="gmInfoMap">The map for storing group member data.</param>
    /// <param name="incomingQueue">The incoming queue holding all received
    /// messages.</param>
    /// <param name="outgoingQueue">The outgoing queue caching all messages which
    /// are sent.</param>
    public Sender( ProtocolInitializer protInit,
                   GroupMemberInfoMap gmInfoMap,
                   IBlockingQueue<Message> incomingQueue,
                   IBlockingQueue<Message> outgoingQueue )
    {
      this.semaphore = new object();
      this.tcpSender = new TcpSender( protInit.MemberName );
      this.multicastSender = new MulticastSender( protInit.MemberName, protInit.GroupAddress,
                                                  protInit.GroupPort );
      this.senderThread = new Thread( new ThreadStart( this.Run ) );
      this.incomingQueue = incomingQueue;
      this.outgoingQueue = outgoingQueue;
      this.gmInfoMap = gmInfoMap;
      this.protInit = protInit;
      Debug.WriteLine( "[Sender#constructor] " + protInit.MemberName + ": Sender object created." );
    }

    /// <summary>
    /// Starts the thread which takes messages
    /// from the outgoing queue and sends
    /// them to the group members.
    /// </summary>
    public void Start()
    {
      this.senderThread.Start();
      Debug.WriteLine( "[Sender#Start] " + protInit.MemberName + ": Sender started." );
    }

    /// <summary>
    /// Blocks the calling thread until the sender thread has left
    /// its send loop and signalled completion.
    /// </summary>
    /// <remarks>
    /// This method only waits; it does not stop the outgoing queue
    /// itself. The send loop in <see cref="Run"/> ends when taking
    /// from the outgoing queue throws
    /// <c>BlockingQueueStoppedException</c>, or when sending fails
    /// with an exception.
    /// </remarks>
    public void Stop()
    {
      // lock guarantees the monitor is released even if Wait throws.
      lock ( this.semaphore )
      {
        // Loop guards against wake-ups that happen before the flag is set.
        while ( !this.stopped )
        {
          Monitor.Wait( this.semaphore );
        }
      }
      Debug.WriteLine( "[Sender#Stop] " + protInit.MemberName + ": Sender stopped." );
    }

    /// <summary>
    /// This method runs in its own thread. It retrieves
    /// messages from the outgoing queue and sends them
    /// via the multicast sender (JOIN/LEAVE) or the TCP sender
    /// (all other types) until the outgoing queue is stopped.
    /// </summary>
    /// <remarks>
    /// Whether the loop ends normally or because a send threw, the
    /// multicast sender is stopped and waiting <see cref="Stop"/>
    /// callers are released; an exception from a send still
    /// propagates out of this method afterwards.
    /// </remarks>
    public void Run()
    {
      try
      {
        while ( true )
        {
          Message msg = null;
          try
          {
            Debug.WriteLine( "[Sender#run] " + protInit.MemberName + ": Taking message from outgoing queue..." );
            msg = this.outgoingQueue.Take();
            Debug.WriteLine( "[Sender#Run] " + protInit.MemberName + ": Message taken from outgoing queue, msg: " + msg.ToString() );
          }
          catch ( BlockingQueueStoppedException )
          {
            Debug.WriteLine( "[Sender#Run] " + protInit.MemberName + ": Outgoing queue was stopped, break..." );
            break;
          }

          // Evaluate the message type once instead of once per comparison.
          Message.MessageType msgType = ( Message.MessageType ) msg.GetType();
          if ( msgType == Message.MessageType.JOIN ||
               msgType == Message.MessageType.LEAVE )
          {
            Debug.WriteLine( "[Sender#Run] " + protInit.MemberName + ": Message is of type JOIN/LEAVE, multicast..." );
            // send a JOIN/LEAVE message to all group members
            this.multicastSender.Send( msg );
          }
          else
          {
            Debug.WriteLine( "[Sender#Run] " + protInit.MemberName + ": Message is NOT of type JOIN/LEAVE, send via TCP..." );

            // for all other message types send the message
            // to the group members via TCP
            // As long as the message is sent to all
            // group members, the group member info map
            // must not be changed. Therefore the access
            // has to be synchronized
            lock ( this.gmInfoMap )
            {
              this.tcpSender.Send( msg, this.gmInfoMap.GetGroupMemberInfos() );
            }
          }
        }
      }
      finally
      {
        // Always run the shutdown steps; otherwise a failing send would
        // leave Stop() callers blocked forever.
        try
        {
          this.multicastSender.Stop();
        }
        finally
        {
          this.finished();
        }
      }
    }

    /// <summary>
    /// Marks the sender as stopped and wakes up all threads
    /// waiting in <see cref="Stop"/>. Called by the sender thread
    /// when it leaves <see cref="Run"/>.
    /// </summary>
    private void finished()
    {
      lock ( this.semaphore )
      {
        this.stopped = true;
        Monitor.PulseAll( this.semaphore );
      }
    }
  }
  
}
www.java2v.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.