JobRunShell.cs :  » Business-Application » Quartz-Enterprise-Scheduler » Quartz » Core » C# / CSharp Open Source

Home
C# / CSharp Open Source
1.2.6.4 mono .net core
2.2.6.4 mono core
3.Aspect Oriented Frameworks
4.Bloggers
5.Build Systems
6.Business Application
7.Charting Reporting Tools
8.Chat Servers
9.Code Coverage Tools
10.Content Management Systems CMS
11.CRM ERP
12.Database
13.Development
14.Email
15.Forum
16.Game
17.GIS
18.GUI
19.IDEs
20.Installers Generators
21.Inversion of Control Dependency Injection
22.Issue Tracking
23.Logging Tools
24.Message
25.Mobile
26.Network Clients
27.Network Servers
28.Office
29.PDF
30.Persistence Frameworks
31.Portals
32.Profilers
33.Project Management
34.RSS RDF
35.Rule Engines
36.Script
37.Search Engines
38.Sound Audio
39.Source Control
40.SQL Clients
41.Template Engines
42.Testing
43.UML
44.Web Frameworks
45.Web Service
46.Web Testing
47.Wiki Engines
48.Windows Presentation Foundation
49.Workflows
50.XML Parsers
C# / C Sharp
C# / C Sharp by API
C# / CSharp Tutorial
C# / CSharp Open Source » Business Application » Quartz Enterprise Scheduler 
Quartz Enterprise Scheduler » Quartz » Core » JobRunShell.cs
/* 
* Copyright 2004-2009 James House 
* 
* Licensed under the Apache License, Version 2.0 (the "License"); you may not 
* use this file except in compliance with the License. You may obtain a copy 
* of the License at 
* 
*   http://www.apache.org/licenses/LICENSE-2.0 
*   
* Unless required by applicable law or agreed to in writing, software 
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 
* License for the specific language governing permissions and limitations 
* under the License.
* 
*/

/*
* Previously Copyright (c) 2001-2004 James House
*/
using System;
using System.Globalization;
using System.Threading;

using Common.Logging;

using Quartz.Spi;

namespace Quartz.Core{
  /// <summary> 
  /// JobRunShell instances are responsible for providing the 'safe' environment
  /// for <see cref="IJob" /> s to run in, and for performing all of the work of
  /// executing the <see cref="IJob" />, catching ANY thrown exceptions, updating
  /// the <see cref="Trigger" /> with the <see cref="IJob" />'s completion code,
  /// etc.
  /// <p>
  /// A <see cref="JobRunShell" /> instance is created by a <see cref="IJobRunShellFactory" />
  /// on behalf of the <see cref="QuartzSchedulerThread" /> which then runs the
  /// shell in a thread from the configured <see cref="ThreadPool" /> when the
  /// scheduler determines that a <see cref="IJob" /> has been triggered.
  /// </p>
  /// </summary>
  /// <seealso cref="IJobRunShellFactory" /> 
  /// <seealso cref="QuartzSchedulerThread" />
  /// <seealso cref="IJob" />
  /// <seealso cref="Trigger" />
  /// <author>James House</author>
  /// <author>Marko Lahma (.NET)</author>
  public class JobRunShell : IThreadRunnable
  {
    private readonly ILog log;

    private JobExecutionContext jec = null;
    private QuartzScheduler qs = null;
    private readonly IScheduler scheduler = null;
    private readonly SchedulingContext schdCtxt = null;
    private readonly IJobRunShellFactory jobRunShellFactory = null;
    private bool shutdownRequested = false;


    /// <summary>
    /// Create a JobRunShell instance with the given settings.
    /// </summary>
    /// <param name="jobRunShellFactory">A handle to the <see cref="IJobRunShellFactory" /> that produced
    /// this <see cref="JobRunShell" />.</param>
    /// <param name="scheduler">The <see cref="IScheduler" /> instance that should be made
    /// available within the <see cref="JobExecutionContext" />.</param>
    /// <param name="schdCtxt">the <see cref="SchedulingContext" /> that should be used by the
    /// <see cref="JobRunShell" /> when making updates to the <see cref="IJobStore" />.</param>
    public JobRunShell(IJobRunShellFactory jobRunShellFactory, IScheduler scheduler, SchedulingContext schdCtxt)
    {
      this.jobRunShellFactory = jobRunShellFactory;
      this.scheduler = scheduler;
      this.schdCtxt = schdCtxt;
            log = LogManager.GetLogger(GetType());
    }

    /// <summary>
    /// Initializes the job execution context with given scheduler and bundle.
    /// </summary>
    /// <param name="sched">The scheduler.</param>
    /// <param name="firedBundle">The bundle offired triggers.</param>
    public virtual void Initialize(QuartzScheduler sched, TriggerFiredBundle firedBundle)
    {
      qs = sched;

      IJob job;
      JobDetail jobDetail = firedBundle.JobDetail;

      try
      {
        job = sched.JobFactory.NewJob(firedBundle);
      }
      catch (SchedulerException se)
      {
        sched.NotifySchedulerListenersError(string.Format(CultureInfo.InvariantCulture, "An error occured instantiating job to be executed. job= '{0}'", jobDetail.FullName), se);
        throw;
      }
      catch (Exception e)
      {
        SchedulerException se = new SchedulerException(string.Format(CultureInfo.InvariantCulture, "Problem instantiating type '{0}'", jobDetail.JobType.FullName), e);
        sched.NotifySchedulerListenersError(string.Format(CultureInfo.InvariantCulture, "An error occured instantiating job to be executed. job= '{0}'", jobDetail.FullName), se);
        throw se;
      }

      jec = new JobExecutionContext(scheduler, firedBundle, job);
    }

    /// <summary>
    /// Requests the Shutdown.
    /// </summary>
    public virtual void RequestShutdown()
    {
      shutdownRequested = true;
    }

    /// <summary>
    /// This method has to be implemented in order that starting of the thread causes the object's
    /// run method to be called in that separately executing thread.
    /// </summary>
    public virtual void Run()
    {
            try
            {
                Trigger trigger = jec.Trigger;
                JobDetail jobDetail = jec.JobDetail;
                do
                {
                    JobExecutionException jobExEx = null;
                    IJob job = jec.JobInstance;

                    try
                    {
                        Begin();
                    }
                    catch (SchedulerException se)
                    {
                        qs.NotifySchedulerListenersError(
                            string.Format(CultureInfo.InvariantCulture, "Error executing Job ({0}: couldn't begin execution.", jec.JobDetail.FullName),
                            se);
                        break;
                    }

                    // notify job & trigger listeners...
                    SchedulerInstruction instCode;
                    try
                    {
                        if (!NotifyListenersBeginning(jec))
                        {
                            break;
                        }
                    }
                    catch (VetoedException)
                    {
                        try
                        {
                            instCode = trigger.ExecutionComplete(jec, null);
                            try
                            {
                                qs.NotifyJobStoreJobVetoed(schdCtxt, trigger, jobDetail, instCode);
                            }
                            catch (JobPersistenceException)
                            {
                                VetoedJobRetryLoop(trigger, jobDetail, instCode);
                            }
                            Complete(true);
                        }
                        catch (SchedulerException se)
                        {
                            qs.NotifySchedulerListenersError(
                                string.Format(CultureInfo.InvariantCulture, "Error during veto of Job ({0}: couldn't finalize execution.",
                                              jec.JobDetail.FullName), se);
                        }
                        break;
                    }

                    DateTime startTime = DateTime.UtcNow;
                    DateTime endTime;

                    // Execute the job
                    try
                    {
                        if (log.IsDebugEnabled)
                        {
                            log.Debug("Calling Execute on job " + jobDetail.FullName);
                        }
                        job.Execute(jec);
                        endTime = DateTime.UtcNow;
                    }
                    catch (JobExecutionException jee)
                    {
                        endTime = DateTime.UtcNow;
                        jobExEx = jee;
                        log.Info(string.Format(CultureInfo.InvariantCulture, "Job {0} threw a JobExecutionException: ", jobDetail.FullName), jobExEx);
                    }
                    catch (Exception e)
                    {
                        endTime = DateTime.UtcNow;
                        log.Error(string.Format(CultureInfo.InvariantCulture, "Job {0} threw an unhandled Exception: ", jobDetail.FullName), e);
                        SchedulerException se = new SchedulerException("Job threw an unhandled exception.", e);
                        se.ErrorCode = SchedulerException.ErrorJobExecutionThrewException;
                        qs.NotifySchedulerListenersError(
                            string.Format(CultureInfo.InvariantCulture, "Job ({0} threw an exception.", jec.JobDetail.FullName), se);
                        jobExEx = new JobExecutionException(se, false);
                        jobExEx.ErrorCode = JobExecutionException.ErrorJobExecutionThrewException;
                    }

                    jec.JobRunTime = endTime - startTime;

                    // notify all job listeners
                    if (!NotifyJobListenersComplete(jec, jobExEx))
                    {
                        break;
                    }

                    instCode = SchedulerInstruction.NoInstruction;

                    // update the trigger
                    try
                    {
                        instCode = trigger.ExecutionComplete(jec, jobExEx);
                        if (log.IsDebugEnabled)
                        {
                            log.Debug(string.Format(CultureInfo.InvariantCulture, "Trigger instruction : {0}", instCode));
                        }
                    }
                    catch (Exception e)
                    {
                        // If this happens, there's a bug in the trigger...
                        SchedulerException se = new SchedulerException("Trigger threw an unhandled exception.", e);
                        se.ErrorCode = SchedulerException.ErrorTriggerThrewException;
                        qs.NotifySchedulerListenersError("Please report this error to the Quartz developers.", se);
                    }

                    // notify all trigger listeners
                    if (!NotifyTriggerListenersComplete(jec, instCode))
                    {
                        break;
                    }
                    // update job/trigger or re-Execute job
                    if (instCode == SchedulerInstruction.ReExecuteJob)
                    {
                        if (log.IsDebugEnabled)
                        {
                            log.Debug("Rescheduling trigger to reexecute");
                        }
                        jec.IncrementRefireCount();
                        try
                        {
                            Complete(false);
                        }
                        catch (SchedulerException se)
                        {
                            qs.NotifySchedulerListenersError(
                                string.Format(CultureInfo.InvariantCulture, "Error executing Job ({0}: couldn't finalize execution.",
                                              jec.JobDetail.FullName), se);
                        }
                        continue;
                    }

                    try
                    {
                        Complete(true);
                    }
                    catch (SchedulerException se)
                    {
                        qs.NotifySchedulerListenersError(
                            string.Format(CultureInfo.InvariantCulture, "Error executing Job ({0}: couldn't finalize execution.",
                                          jec.JobDetail.FullName), se);
                        continue;
                    }

                    try
                    {
                        qs.NotifyJobStoreJobComplete(schdCtxt, trigger, jobDetail, instCode);
                    }
                    catch (JobPersistenceException jpe)
                    {
                        qs.NotifySchedulerListenersError(
                            string.Format(CultureInfo.InvariantCulture, "An error occured while marking executed job complete. job= '{0}'",
                                          jobDetail.FullName), jpe);
                        if (!CompleteTriggerRetryLoop(trigger, jobDetail, instCode))
                        {
                        }
                        return;
                    }

                    break;
                } while (true);

            }
        finally
            {
                jobRunShellFactory.ReturnJobRunShell(this);
            }
    }

    /// <summary>
    /// Runs begin procedures on this instance.
    /// </summary>
    protected internal virtual void Begin()
    {
    }

    /// <summary>
    /// Completes the execution.
    /// </summary>
    /// <param name="successfulExecution">if set to <c>true</c> [successful execution].</param>
    protected internal virtual void Complete(bool successfulExecution)
    {
    }

    /// <summary>
    /// Passivates this instance.
    /// </summary>
    public virtual void Passivate()
    {
      jec = null;
      qs = null;
    }

    private bool NotifyListenersBeginning(JobExecutionContext ctx)
    {
      bool vetoed;

      // notify all trigger listeners
      try
      {
        vetoed = qs.NotifyTriggerListenersFired(ctx);
      }
      catch (SchedulerException se)
      {
        qs.NotifySchedulerListenersError(
          string.Format(CultureInfo.InvariantCulture, "Unable to notify TriggerListener(s) while firing trigger (Trigger and Job will NOT be fired!). trigger= {0} job= {1}", ctx.Trigger.FullName, ctx.JobDetail.FullName), se);

        return false;
      }

      if (vetoed)
      {
        try
        {
          qs.NotifyJobListenersWasVetoed(ctx);
        }
        catch (SchedulerException se)
        {
          qs.NotifySchedulerListenersError(
            string.Format(CultureInfo.InvariantCulture, "Unable to notify JobListener(s) of vetoed execution while firing trigger (Trigger and Job will NOT be fired!). trigger= {0} job= {1}", ctx.Trigger.FullName, ctx.JobDetail.FullName), se);
        }
        throw new VetoedException(this);
      }

      // notify all job listeners
      try
      {
        qs.NotifyJobListenersToBeExecuted(ctx);
      }
      catch (SchedulerException se)
      {
        qs.NotifySchedulerListenersError(
          string.Format(CultureInfo.InvariantCulture, "Unable to notify JobListener(s) of Job to be executed: (Job will NOT be executed!). trigger= {0} job= {1}", ctx.Trigger.FullName, ctx.JobDetail.FullName), se);

        return false;
      }

      return true;
    }

    private bool NotifyJobListenersComplete(JobExecutionContext ctx, JobExecutionException jobExEx)
    {
      try
      {
        qs.NotifyJobListenersWasExecuted(ctx, jobExEx);
      }
      catch (SchedulerException se)
      {
        qs.NotifySchedulerListenersError(
          string.Format(CultureInfo.InvariantCulture, "Unable to notify JobListener(s) of Job that was executed: (error will be ignored). trigger= {0} job= {1}", ctx.Trigger.FullName, ctx.JobDetail.FullName), se);

        return false;
      }

      return true;
    }

    private bool NotifyTriggerListenersComplete(JobExecutionContext ctx, SchedulerInstruction instCode)
    {
      try
      {
        qs.NotifyTriggerListenersComplete(ctx, instCode);
      }
      catch (SchedulerException se)
      {
        qs.NotifySchedulerListenersError(
          string.Format(CultureInfo.InvariantCulture, "Unable to notify TriggerListener(s) of Job that was executed: (error will be ignored). trigger= {0} job= {1}", ctx.Trigger.FullName, ctx.JobDetail.FullName), se);

        return false;
      }

      if (!ctx.Trigger.GetNextFireTimeUtc().HasValue)
      {
        qs.NotifySchedulerListenersFinalized(ctx.Trigger);
      }

      return true;
    }

    /// <summary>
    /// Completes the trigger retry loop.
    /// </summary>
    /// <param name="trigger">The trigger.</param>
    /// <param name="jobDetail">The job detail.</param>
    /// <param name="instCode">The inst code.</param>
    /// <returns></returns>
        public virtual bool CompleteTriggerRetryLoop(Trigger trigger, JobDetail jobDetail, SchedulerInstruction instCode)
    {
            long count = 0;
            while (!shutdownRequested)
            { // FIXME: jhouse: note that there is no longer anthing that calls requestShutdown()
                try
                {
                    Thread.Sleep(TimeSpan.FromSeconds(15)); 
                    // retry every 15 seconds (the db
                    // connection must be failed)
                    qs.NotifyJobStoreJobComplete(schdCtxt, trigger, jobDetail, instCode);
                    return true;
                }
                catch (JobPersistenceException jpe)
                {
                    if (count % 4 == 0)
                        qs.NotifySchedulerListenersError(
                            "An error occured while marking executed job complete (will continue attempts). job= '"
                                    + jobDetail.FullName + "'", jpe);
                }
                catch (ThreadInterruptedException)
                {
                }
                count++;
            }
            return false;
    }

    /// <summary>
    /// Vetoeds the job retry loop.
    /// </summary>
    /// <param name="trigger">The trigger.</param>
    /// <param name="jobDetail">The job detail.</param>
    /// <param name="instCode">The inst code.</param>
    /// <returns></returns>
        public bool VetoedJobRetryLoop(Trigger trigger, JobDetail jobDetail, SchedulerInstruction instCode)
        {
            while (!shutdownRequested)
            {
                try
                {
                    Thread.Sleep(5 * 1000); // retry every 5 seconds (the db
                    // connection must be failed)
                    qs.NotifyJobStoreJobVetoed(schdCtxt, trigger, jobDetail, instCode);
                    return true;
                }
                catch (JobPersistenceException jpe)
                {
                    qs.NotifySchedulerListenersError(
                            string.Format(CultureInfo.InvariantCulture, "An error occured while marking executed job vetoed. job= '{0}'", jobDetail.FullName), jpe);
                }
                catch (ThreadInterruptedException)
                {
                }
            }
            return false;
        }


    [Serializable]
    internal class VetoedException : Exception
    {
      private readonly JobRunShell enclosingInstance;

      public JobRunShell EnclosingInstance
      {
        get { return enclosingInstance; }
      }

      public VetoedException(JobRunShell shell)
      {
        enclosingInstance = shell;
      }
    }
  }
}
www.java2v.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.