SqlBatchProcessor.cs :  » Persistence-Frameworks » ORM.NET » OrmLib » C# / CSharp Open Source

Home
C# / CSharp Open Source
1.2.6.4 mono .net core
2.2.6.4 mono core
3.Aspect Oriented Frameworks
4.Bloggers
5.Build Systems
6.Business Application
7.Charting Reporting Tools
8.Chat Servers
9.Code Coverage Tools
10.Content Management Systems CMS
11.CRM ERP
12.Database
13.Development
14.Email
15.Forum
16.Game
17.GIS
18.GUI
19.IDEs
20.Installers Generators
21.Inversion of Control Dependency Injection
22.Issue Tracking
23.Logging Tools
24.Message
25.Mobile
26.Network Clients
27.Network Servers
28.Office
29.PDF
30.Persistence Frameworks
31.Portals
32.Profilers
33.Project Management
34.RSS RDF
35.Rule Engines
36.Script
37.Search Engines
38.Sound Audio
39.Source Control
40.SQL Clients
41.Template Engines
42.Testing
43.UML
44.Web Frameworks
45.Web Service
46.Web Testing
47.Wiki Engines
48.Windows Presentation Foundation
49.Workflows
50.XML Parsers
C# / C Sharp
C# / C Sharp by API
C# / CSharp Tutorial
C# / CSharp Open Source » Persistence Frameworks » ORM.NET 
ORM.NET » OrmLib » SqlBatchProcessor.cs
//This file is part of ORM.NET.
//
//ORM.NET is free software; you can redistribute it and/or
//modify it under the terms of the GNU Lesser General Public
//License as published by the Free Software Foundation; either
//version 2.1 of the License, or (at your option) any later version.
//
//ORM.NET is distributed in the hope that it will be useful,
//but WITHOUT ANY WARRANTY; without even the implied warranty of
//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
//Lesser General Public License for more details.
//
//You should have received a copy of the GNU Lesser General Public
//License along with ORM.NET; if not, write to the Free Software
//Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA


using System;
using System.IO;
using System.Diagnostics;
using System.Data;
using System.Data.SqlClient;
using System.Data.SqlTypes;
using System.Configuration;
using System.Threading;
using System.ComponentModel;

namespace OrmLib{
  [EditorBrowsable(EditorBrowsableState.Never)]
  public class SqlBatchProcessor
  {
    // Minimum number of seconds that should pass after the last database
    // round trip before GetNextRecord commits the batch file and reloads rows.
    private const int MIN_SECONDS_BETWEEN_DB_CALLS = 8;
    
    // Waited on (10 second timeout) around reads of and appends to the batch file.
    private Mutex FileProcessingLock = new Mutex();
    // The pending-updates file: <SqlBatchesDir app setting>\<TableName>.batch
    private FileInfo BatchUpdate;
    // Set to DateTime.Now every time CommitLastBatchToDb starts.
    private DateTime LastDbRefresh;
    // Rows of the batch currently being handed out by GetNextRecord.
    private DataTable dt;
    // The most recent statement produced by GenerateBatchSelect.
    private string SqlSelect;
    // Table the batch rows are selected from and updated in.
    private string TableName;
    // Connection string used for every SqlConnection this class creates.
    private string Dsn;
    // Column that is NULL for unclaimed rows and holds 'PROCESSING <guid>'
    // while a row is claimed by a batch.
    private string BatchStatusColumnName;
    // Value passed to SET ROWCOUNT when a batch is claimed.
    private int NumRowsToProcess;
    // Rows still marked PROCESSING whose dtStamp is older than this many
    // minutes are reset to NULL when the next batch is claimed.
    private int MaxMinutesToProcess;
    // Index into dt.Rows of the next row GetNextRecord will return.
    private int CurrentRowNum = 0;
    // Incremented by SetThreadFinished; reset to 0 when a new batch is loaded.
    private int NumThreadsFinished = 0;

    /// <summary>
    /// Stores the batch settings, flushes any batch file left over from a
    /// previous run to the database, and then loads the first batch of rows.
    /// </summary>
    /// <param name="dsn">Connection string for the database.</param>
    /// <param name="tableName">Table that holds the rows to process.</param>
    /// <param name="batchStatusColumnName">Column used to mark rows as claimed.</param>
    /// <param name="numRowsToProcess">Maximum number of rows claimed per batch.</param>
    /// <param name="maxMinutesToProcess">Age in minutes after which a claimed row is released again.</param>
    public SqlBatchProcessor(string dsn, string tableName,
      string batchStatusColumnName, int numRowsToProcess, 
      int maxMinutesToProcess)
    {
      this.MaxMinutesToProcess = maxMinutesToProcess;
      this.NumRowsToProcess = numRowsToProcess;
      this.BatchStatusColumnName = batchStatusColumnName;
      this.TableName = tableName;
      this.Dsn = dsn;

      // Updates recorded by an earlier run may still be waiting in the batch
      // file; send them to the database first. The result is not inspected.
      this.CommitLastBatchToDb();

      // Claim and load the first batch. The select statement is generated
      // inside PopulateDataTable on every call.
      this.dt = this.PopulateDataTable();
    }

    /// <summary>
    /// Executes the SQL accumulated in the batch file against the database
    /// and deletes the file. Does nothing when the file does not exist.
    /// Also records the current time in LastDbRefresh.
    /// </summary>
    /// <returns>
    /// true when there was nothing to commit, or the batch was executed and
    /// the file deleted; false when the file lock could not be obtained
    /// within 10 seconds or an exception occurred (the exception is written
    /// to the debug output and is not rethrown).
    /// </returns>
    private bool CommitLastBatchToDb()
    {
      LastDbRefresh = DateTime.Now;
      BatchUpdate = new FileInfo( ConfigurationSettings.AppSettings["SqlBatchesDir"] + "\\" + TableName + ".batch");

      bool lockTaken = false;
      try
      {
        BatchUpdate.Refresh();
        if (!BatchUpdate.Exists)
        {
          // nothing pending
          return true;
        }

        // Keep writers away from the file while it is read and deleted.
        // WaitOne returns false on timeout; only a successful wait may be
        // paired with ReleaseMutex, otherwise ReleaseMutex itself throws.
        lockTaken = FileProcessingLock.WaitOne(10000, false);
        if (!lockTaken)
        {
          Debug.WriteLine( "Timed out waiting for the batch file lock.", "SqlBatchProcessor::CommitLastBatchToDb()");
          return false;
        }

        // Read the whole file; the using blocks close the handles even when
        // reading fails, so the file is never left open.
        string sqlBatch;
        using (FileStream fs = BatchUpdate.OpenRead())
        using (StreamReader sr = new StreamReader( fs ))
        {
          sqlBatch = sr.ReadToEnd();
        }

        ExecuteSql(sqlBatch);

        // Only reached when ExecuteSql did not throw.
        BatchUpdate.Delete();
        return true;
      }
      catch(Exception e)
      {
        // ToString() carries both the message and the stack trace.
        Debug.WriteLine( e.ToString(), "SqlBatchProcessor::CommitLastBatchToDb()");
        return false;
      }
      finally
      {
        // release the file to the writers again
        if (lockTaken) FileProcessingLock.ReleaseMutex();
      }
    }

    /// <summary>
    /// Wraps the given SQL in BEGIN TRANSACTION / COMMIT TRANSACTION and
    /// executes it on a new connection built from Dsn.
    /// </summary>
    /// <param name="sql">One or more SQL statements to execute.</param>
    /// <remarks>
    /// Exceptions from opening the connection or executing the command
    /// propagate to the caller with their original stack trace; the
    /// connection and command are disposed in every case.
    /// </remarks>
    private void ExecuteSql(string sql)
    {
      sql = "BEGIN TRANSACTION\r\n" + sql + "\r\nCOMMIT TRANSACTION\r\n";

      using (SqlConnection conn = new SqlConnection(Dsn))
      using (SqlCommand cmd = new SqlCommand(sql, conn))
      {
        conn.Open();
        cmd.ExecuteNonQuery();
      }
    }

    /// <summary>
    /// Appends one SQL statement as a line to the batch file while holding
    /// the file lock.
    /// </summary>
    /// <param name="sqlUpdate">The SQL text to append.</param>
    /// <exception cref="ApplicationException">
    /// The file lock could not be obtained within 10 seconds. Nothing is
    /// written in that case.
    /// </exception>
    private void AddToBatch( string sqlUpdate)
    {
      // WaitOne returns false on timeout. Writing without the lock, and then
      // calling ReleaseMutex on a mutex this thread does not own, would both
      // be wrong, so stop here instead.
      if (!FileProcessingLock.WaitOne( 10000, false))
      {
        throw new ApplicationException("Timed out waiting for the batch file lock for table '" + TableName + "'.");
      }

      try
      {
        // Disposing the writer flushes and closes it, also when WriteLine throws.
        using (StreamWriter sw = BatchUpdate.AppendText())
        {
          sw.WriteLine(sqlUpdate);
        }
      }
      finally
      {
        // always hand the lock back, even after an I/O failure
        FileProcessingLock.ReleaseMutex();
      }
    }

    /// <summary>
    /// Generates a fresh batch select, runs it, and returns the resulting
    /// rows in a new DataTable. On success CurrentRowNum and
    /// NumThreadsFinished are reset to 0.
    /// </summary>
    /// <returns>The table filled with the newly claimed batch.</returns>
    /// <remarks>
    /// Any exception is written to the debug output and rethrown with its
    /// original stack trace.
    /// </remarks>
    private DataTable PopulateDataTable()
    {
      // A single table is created and the locale is set on the instance that
      // is actually filled and returned.
      DataTable table = new DataTable();
      table.Locale = System.Globalization.CultureInfo.CurrentCulture;

      try
      {
        // generate a sql statement to select this type of batch
        // this needs to be done each time so old jobs won't be
        // re-fetched.
        SqlSelect = GenerateBatchSelect( );

        using (SqlConnection conn = new SqlConnection(Dsn))
        using (SqlCommand cmd = new SqlCommand( SqlSelect, conn))
        using (SqlDataAdapter da = new SqlDataAdapter( cmd ))
        {
          // The connection is not opened explicitly; Fill is relied on to
          // open it for the duration of the call.
          da.Fill( table );
        }

        CurrentRowNum = 0;
        NumThreadsFinished = 0;
      }
      catch(Exception e)
      {
        // TODO: log error
        Debug.WriteLine( e.Message, "SqlBatchProcessor::PopulateDataTable()");
        throw;
      }

      return table;
    }

    /// <summary>
    /// Builds the SQL that, inside one transaction, (1) resets rows that
    /// have been in a PROCESSING state for longer than MaxMinutesToProcess,
    /// (2) marks up to NumRowsToProcess unclaimed, due rows with a new
    /// 'PROCESSING &lt;guid&gt;' value, and (3) selects the rows just marked.
    /// </summary>
    /// <returns>The SQL text; every call uses a new GUID.</returns>
    private string GenerateBatchSelect()
    {
      // The marker is concatenated directly rather than substituted with
      // string.Format, so braces in the table or column name cannot be
      // mistaken for format placeholders.
      string batchMarker = "PROCESSING " + Guid.NewGuid().ToString("N");

      string s =
        "BEGIN TRANSACTION\r\n\r\n" +

        // reset older records that are still in a processing state..
        "UPDATE\t" + TableName + "\r\n" +
        "SET\t" + BatchStatusColumnName + " = NULL\r\n" +
        "WHERE\tdtStamp < dateadd( n, -" + MaxMinutesToProcess.ToString() + ", getDate()) AND\r\n" +
        BatchStatusColumnName + "\tLIKE 'PROCESSING%' \r\n" +

        // mark the batch of records we want to work on
        // with a unique processing guid
        "SET ROWCOUNT " + NumRowsToProcess.ToString() + "\r\n" +
        "UPDATE\t" + TableName + "\r\n" +
        "SET\t" + BatchStatusColumnName + " = '" + batchMarker + "'\r\n" +
        "WHERE\t" + BatchStatusColumnName + " IS NULL AND DateToProcess <= getDate()\r\n" +

        // retrieve the batch we just marked
        "SELECT\t*\r\n" +
        "FROM\t" + TableName + "\r\n" +
        "WHERE\t" + BatchStatusColumnName + " = '" + batchMarker + "'\r\n" +
        "SET ROWCOUNT 0\r\n" +
        "COMMIT TRANSACTION\r\n";

      return s;
    }

    /// <summary>
    /// Adds one to the finished counter that GetNextRecord compares against
    /// the row count of the current batch.
    /// </summary>
    public void SetThreadFinished()
    {
      // same monitor as GetNextRecord, so the counter is never changed
      // while a batch is being examined or reloaded
      lock (this)
      {
        NumThreadsFinished += 1;
      }
    }

    /// <summary>
    /// Returns the next unprocessed row of the current batch, or null when
    /// no row is available.
    /// </summary>
    /// <returns>
    /// The next row, or null. When every row has been handed out and
    /// NumThreadsFinished has reached the row count, the call may commit the
    /// batch file and load a new batch (at most once per
    /// MIN_SECONDS_BETWEEN_DB_CALLS); it still returns null in that case,
    /// so rows of the new batch are only returned by later calls.
    /// </returns>
    public DataRow GetNextRecord()
    {
      lock(this)
      {
        int rowCount = dt.Rows.Count;

        // rows left in the current batch: hand out the next one
        if (CurrentRowNum < rowCount)
        {
          // The row stays attached to the table it was read from, even
          // after dt is replaced by a newer batch.
          DataRow r = dt.Rows[CurrentRowNum];
          CurrentRowNum++;
          return r;
        }

        // Every row has been handed out. Refresh only once all of them have
        // been reported finished; until then just wait.
        if (NumThreadsFinished >= rowCount)
        {
          TimeSpan sinceLastRefresh = DateTime.Now - LastDbRefresh;

          // TotalSeconds is the whole elapsed time. Seconds is only the
          // 0-59 component and would wrongly block the refresh during the
          // first seconds of every later minute.
          if (sinceLastRefresh.TotalSeconds > MIN_SECONDS_BETWEEN_DB_CALLS)
          {
            // commit the last batch, and only reload when that worked
            if (CommitLastBatchToDb())
            {
              dt = PopulateDataTable();
            }
          }
        }

        return null;
      }
    }

    /// <summary>
    /// Queues the update for a processed row: builds its UPDATE statement
    /// and appends that statement to the batch file.
    /// </summary>
    /// <param name="r">The row whose status and processed date are to be saved.</param>
    public void UpdateRecord( DataRow r)
    {
      AddToBatch( GenerateUpdateSql( r ) );
    }

    /// <summary>
    /// Builds the UPDATE statement that stores a row's batch status and
    /// DateProcessed value, addressed by the row's ID.
    /// </summary>
    /// <param name="r">
    /// Row providing the status column (string), "DateProcessed" (DateTime)
    /// and "ID" (int); a value of another type makes the cast throw.
    /// </param>
    /// <returns>The SQL text, ending with a line break.</returns>
    private string GenerateUpdateSql(DataRow r)
    {
      // The statement is stored as text and executed later, so parameters
      // cannot be used; double any single quote so the status value cannot
      // end the SQL string literal early.
      string status = ((string)r[BatchStatusColumnName]).Replace("'", "''");

      string sqlUpdate = "";
      sqlUpdate += "UPDATE\t" + TableName + "\r\n";
      sqlUpdate += "SET\t" + BatchStatusColumnName + " = '" + status + "',\r\n";
      sqlUpdate += "\tDateProcessed = '" + FormatDateString(((DateTime)r["DateProcessed"])) + "'\r\n";
      sqlUpdate += "WHERE\t ID= " + ((int)r["ID"]).ToString() + "\r\n";

      return sqlUpdate;
    }

    /// <summary>
    /// Formats a date for use inside a SQL string literal.
    /// </summary>
    /// <param name="d">The value to format.</param>
    /// <returns>
    /// The value as yyyy-MM-ddTHH:mm:ss (second precision). The format is
    /// fixed and does not depend on the culture of the current thread, so
    /// the generated SQL is the same on every machine.
    /// </returns>
    private string FormatDateString( DateTime d)
    {
      return d.ToString("yyyy-MM-dd'T'HH:mm:ss", System.Globalization.CultureInfo.InvariantCulture);
    }
  }
}
www.java2v.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.