//This file is part of ORM.NET.
//
//ORM.NET is free software; you can redistribute it and/or
//modify it under the terms of the GNU Lesser General Public
//License as published by the Free Software Foundation; either
//version 2.1 of the License, or (at your option) any later version.
//
//ORM.NET is distributed in the hope that it will be useful,
//but WITHOUT ANY WARRANTY; without even the implied warranty of
//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
//Lesser General Public License for more details.
//
//You should have received a copy of the GNU Lesser General Public
//License along with ORM.NET; if not, write to the Free Software
//Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
using System;
using System.IO;
using System.Diagnostics;
using System.Data;
using System.Data.SqlClient;
using System.Data.SqlTypes;
using System.Configuration;
using System.Threading;
using System.ComponentModel;
namespace OrmLib{
[EditorBrowsable(EditorBrowsableState.Never)]
public class SqlBatchProcessor
{
private const int MIN_SECONDS_BETWEEN_DB_CALLS = 8;
private Mutex FileProcessingLock = new Mutex();
private FileInfo BatchUpdate;
private DateTime LastDbRefresh;
private DataTable dt;
private string SqlSelect;
private string TableName;
private string Dsn;
private string BatchStatusColumnName;
private int NumRowsToProcess;
private int MaxMinutesToProcess;
private int CurrentRowNum = 0;
private int NumThreadsFinished = 0;
public SqlBatchProcessor(string dsn, string tableName,
string batchStatusColumnName, int numRowsToProcess,
int maxMinutesToProcess)
{
Dsn = dsn;
TableName = tableName;
BatchStatusColumnName = batchStatusColumnName;
NumRowsToProcess = numRowsToProcess;
MaxMinutesToProcess = maxMinutesToProcess;
// any left over processed batch from last run, that need
// to go to the DB?
CommitLastBatchToDb();
// generate a sql statement to select this type of batch
//SqlSelect = GenerateBatchSelect( );
// populate the batch
dt = PopulateDataTable();
}
private bool CommitLastBatchToDb()
{
bool bSuccess = true;
LastDbRefresh = DateTime.Now;
BatchUpdate = new FileInfo( ConfigurationSettings.AppSettings["SqlBatchesDir"] + "\\" + TableName + ".batch");
bool IsOwned = false;
try
{
BatchUpdate.Refresh();
if (BatchUpdate.Exists )
{
// lock anyone else from writing to the file
FileProcessingLock.WaitOne(10000, false);
IsOwned = true;
// read the file into a string
FileStream fs = BatchUpdate.OpenRead();
StreamReader sr = new StreamReader( fs );
string sqlBatch = sr.ReadToEnd();
sr.Close();
fs.Close();
// execute the sql and delete the file
ExecuteSql(sqlBatch);
// if there were no exceptions, then delete it
BatchUpdate.Delete();
}
}
catch(Exception e)
{
string message = e.Message;
Debug.WriteLine( message, e.StackTrace);
bSuccess = false;
}
finally
{
// release the file to the writers again
if (IsOwned) FileProcessingLock.ReleaseMutex();
}
return bSuccess;
}
private void ExecuteSql(string sql)
{
sql = "BEGIN TRANSACTION\r\n" + sql + "\r\nCOMMIT TRANSACTION\r\n";
SqlConnection conn = new SqlConnection();
try
{
conn.ConnectionString = Dsn;
conn.Open();
SqlCommand cmd = new SqlCommand(sql, conn);
cmd.ExecuteNonQuery();
}
catch (Exception ex)
{
if (conn != null) conn.Close();
throw ex;
}
if (conn != null) conn.Close();
}
private void AddToBatch( string sqlUpdate)
{
// lock the file while we append to it
FileProcessingLock.WaitOne( 10000, false);
StreamWriter sw = BatchUpdate.AppendText();
sw.WriteLine(sqlUpdate);
sw.Flush();
sw.Close();
// unlock it
FileProcessingLock.ReleaseMutex();
}
private DataTable PopulateDataTable()
{
DataTable dt = new DataTable();
dt.Locale = System.Globalization.CultureInfo.CurrentCulture;
SqlConnection conn = new SqlConnection();
try
{
// generate a sql statement to select this type of batch
// this needs to be done each time so old jobs wont be
// re-fetched.
SqlSelect = GenerateBatchSelect( );
conn.ConnectionString = Dsn;
SqlCommand cmd = new SqlCommand( SqlSelect, conn);
SqlDataAdapter da = new SqlDataAdapter( cmd );
dt = new DataTable();
da.Fill( dt );
CurrentRowNum = 0;
NumThreadsFinished = 0;
}
catch(Exception e)
{
// TODO: log error
Debug.WriteLine( e.Message, "SqlBatchProcessor::PopulateDataTable()");
if (conn != null) conn.Close();
throw e;
}
if (conn != null) conn.Close();
return dt;
}
private string GenerateBatchSelect()
{
string s = "";
s += "BEGIN TRANSACTION\r\n\r\n";
// reset older records that are still in a processing state..
s += "UPDATE\t" + TableName + "\r\n";
s += "SET\t" + BatchStatusColumnName + " = NULL\r\n";
s += "WHERE\tdtStamp < dateadd( n, -" + MaxMinutesToProcess.ToString() + ", getDate()) AND\r\n";
s += BatchStatusColumnName + "\tLIKE 'PROCESSING%' \r\n";
// mark the batch of records we want to work on
// with a unique processing guid
s += "SET ROWCOUNT " + NumRowsToProcess.ToString() + "\r\n";
s += "UPDATE\t" + TableName + "\r\n";
s += "SET\t" + BatchStatusColumnName + " = '{0}'\r\n";
s += "WHERE\t" + BatchStatusColumnName + " IS NULL AND DateToProcess <= getDate()\r\n";
// retrieve the batch we just marked
s += "SELECT\t*\r\n";
s += "FROM\t" + TableName + "\r\n";
s += "WHERE\t" + BatchStatusColumnName + " = '{0}'\r\n";
s += "SET ROWCOUNT 0\r\n";
s += "COMMIT TRANSACTION\r\n";
s = string.Format( s, "PROCESSING " + Guid.NewGuid().ToString("N") );
//Debug.WriteLine(s);
return s;
}
public void SetThreadFinished()
{
lock(this)
{
NumThreadsFinished++;
}
}
public DataRow GetNextRecord()
{
DataRow r = null;
lock(this)
{
// is the table fully processed?
if ((NumThreadsFinished) >= dt.Rows.Count && (CurrentRowNum >= dt.Rows.Count))
{
//if (dt.Rows.Count > 0) Debug.WriteLine("RowCount=" + dt.Rows.Count + " ThreadsReturned=" + NumFinishedThreads + " CurrentRow=" + CurrentRowNum,"SqlBatchProcessor");
TimeSpan ts = DateTime.Now - LastDbRefresh;
// if its not too early to refresh...
if (ts.Seconds > MIN_SECONDS_BETWEEN_DB_CALLS)
{
// commit the last batch
if (CommitLastBatchToDb())
{
// refresh table
dt = PopulateDataTable();
}
}
return null;
}
// dont return a row if job is finished and we are waiting for threads to return
//if (CurrentRowNum <= dt.Rows.Count)
if (dt.Rows.Count > CurrentRowNum)
{
// this copy of the row (and table schema) needs to be made
// so that we still contain a valid row when dt gets assigned
// a new set of rows.
//DataTable dt_copy = dt.Clone();
//dt_copy.ImportRow(dt.Rows[CurrentRowNum]);
//r = dt_copy.Rows[0];
r = dt.Rows[CurrentRowNum];
CurrentRowNum++;
}
}
return r;
}
public void UpdateRecord( DataRow r)
{
// generate the sql to update this row
string sqlUpdate = GenerateUpdateSql( r );
// append the sql statement to the batch file
AddToBatch( sqlUpdate);
}
private string GenerateUpdateSql(DataRow r)
{
string sqlUpdate = "";
sqlUpdate += "UPDATE\t" + TableName + "\r\n";
sqlUpdate += "SET\t" + BatchStatusColumnName + " = '" + (string)r[BatchStatusColumnName] + "',\r\n";
sqlUpdate += "\tDateProcessed = '" + FormatDateString(((DateTime)r["DateProcessed"])) + "'\r\n";
sqlUpdate += "WHERE\t ID= " + ((int)r["ID"]).ToString() + "\r\n";
return sqlUpdate;
}
private string FormatDateString( DateTime d)
{
return d.ToShortDateString() + " " + d.ToLongTimeString();
}
}
}
|