转自:https://juejin.cn/post/7136928319450136606
10. 监听器Listener实现AOP
注意:确保监听器永远不会抛出异常(使用 try-catch)并且它们可以处理内部问题。当监听器失败时,Quartz 无法确定监听器中所需的逻辑是否成功完成后,作业可能会卡住。
侦听器在运行时向调度程序注册,并且不与作业和触发器一起存储在 JobStore 中。这是因为侦听器通常是与您的应用程序的集成点。因此,每次您的应用程序运行时,侦听器都需要重新注册到调度程序。
10.1 TriggerListener
TriggerListeners 接收与触发器相关的事件,用于根据调度程序中发生的事件执行操作。
ITriggerListener 接口
public interface ITriggerListener
{
string Name { get; }
Task TriggerFired(ITrigger trigger, IJobExecutionContext context);
Task<bool> VetoJobExecution(ITrigger trigger, IJobExecutionContext context);
Task TriggerMisfired(ITrigger trigger);
Task TriggerComplete(ITrigger trigger, IJobExecutionContext context, int triggerInstructionCode);
}
定义一个类,实现ITriggerListener 接口或者扩展TriggerListenerSupport类简单地覆盖您感兴趣的事件,而不是实现这些接口
public class CustomTriggerListener : ITriggerListener
{
public string Name => "CustomTriggerListener";
/// <summary>
/// 触发
/// </summary>
/// <param name="trigger"></param>
/// <param name="context"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task TriggerFired(ITrigger trigger, IJobExecutionContext context, CancellationToken cancellationToken = default)
{
Console.WriteLine("【***************************************************************************************************************】");
Console.WriteLine($"【{Name}】---【TriggerFired】-【触发】");
await Task.CompletedTask;
}
/// <summary>
/// 判断作业是否继续
/// </summary>
/// <param name="trigger"></param>
/// <param name="context"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task<bool> VetoJobExecution(ITrigger trigger, IJobExecutionContext context, CancellationToken cancellationToken = default)
{
Console.WriteLine($"【{Name}】---【VetoJobExecution】-【判断作业是否继续】-{true}");
return await Task.FromResult(cancellationToken.IsCancellationRequested);
}
/// <summary>
/// 触发完成
/// </summary>
/// <param name="trigger"></param>
/// <param name="context"></param>
/// <param name="triggerInstructionCode"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task TriggerComplete(ITrigger trigger, IJobExecutionContext context, SchedulerInstruction triggerInstructionCode, CancellationToken cancellationToken = default)
{
Console.WriteLine($"【{Name}】---【TriggerComplete】-【触发完成】");
await Task.CompletedTask;
}
/// <summary>
/// 触发作业
/// </summary>
/// <param name="trigger"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task TriggerMisfired(ITrigger trigger, CancellationToken cancellationToken = default)
{
Console.WriteLine($"【{Name}】---【TriggerMisfired】【触发作业】");
await Task.CompletedTask;
}
}
注册到调度器
//实例化调度器工厂
ISchedulerFactory schedulefactory = new StdSchedulerFactory();
//实例化调度器
IScheduler scheduler = schedulefactory.GetScheduler().Result;
scheduler.Start();
//创建一个作业
IJobDetail job1 = JobBuilder.Create<MyJob>()
.WithIdentity("job1", "groupa")//名称,分组
.Build();
//创建一个触发器
ITrigger trigger1 = TriggerBuilder.Create()
.WithIdentity("trigger1", "groupa")//名称,分组
.StartNow()//从启动的时候开始执行
.WithSimpleSchedule(b =>
{
b.WithIntervalInSeconds(2);//2秒执行一次
})
.Build();
//将trigger监听器注册到调度器
scheduler.ListenerManager.AddTriggerListener(new CustomTriggerListener());
//把作业,触发器加入调度器
scheduler.ScheduleJob(job1, trigger1);
运行结果
【***************************************************************************************************************】
【CustomTriggerListener】---【TriggerFired】-【触发】
【CustomTriggerListener】---【VetoJobExecution】-【判断作业是否继续】-True
【任务执行】:2022/8/15 10:47:19
【触发时间】:2022/8/15 10:47:19
【下次触发时间】:2022/8/15 10:47:21
【CustomTriggerListener】---【TriggerComplete】-【触发完成】
10.2 JobListener
JobListeners 接收与作业相关的事件,用于根据调度程序中发生的事件执行操作。
IJobListener 接口
public interface IJobListener
{
string Name { get; }
Task JobToBeExecuted(IJobExecutionContext context);
Task JobExecutionVetoed(IJobExecutionContext context);
Task JobWasExecuted(IJobExecutionContext context, JobExecutionException jobException);
}
定义一个类,实现IJobListener接口或者扩展JobListenerSupport类简单地覆盖您感兴趣的事件,而不是实现这些接口
public class CustomJobListener : IJobListener
{
public string Name => "CustomJobListener";
/// <summary>
/// 任务执行前
/// </summary>
/// <param name="context"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task JobToBeExecuted(IJobExecutionContext context, CancellationToken cancellationToken = default)
{
Console.WriteLine($"【{Name}】-【JobToBeExecuted】-【要执行的任务】");
await Task.CompletedTask;
}
/// <summary>
/// 任务执行后
/// </summary>
/// <param name="context"></param>
/// <param name="jobException"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task JobWasExecuted(IJobExecutionContext context, JobExecutionException? jobException, CancellationToken cancellationToken = default)
{
Console.WriteLine($"【{Name}】-【JobWasExecuted】-【作业已执行】");
await Task.CompletedTask;
}
/// <summary>
/// 任务被拒绝执行的时候
/// </summary>
/// <param name="context"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task JobExecutionVetoed(IJobExecutionContext context, CancellationToken cancellationToken = default)
{
Console.WriteLine($"【{Name}】-【JobExecutionVetoed】-【工作执行被否决】");
await Task.CompletedTask;
}
}
注册到调度器
//实例化调度器工厂
ISchedulerFactory schedulefactory = new StdSchedulerFactory();
//实例化调度器
IScheduler scheduler = schedulefactory.GetScheduler().Result;
scheduler.Start();
//创建一个作业
IJobDetail job1 = JobBuilder.Create<MyJob>()
.WithIdentity("job1", "groupa")//名称,分组
.Build();
//创建一个触发器
ITrigger trigger1 = TriggerBuilder.Create()
.WithIdentity("trigger1", "groupa")//名称,分组
.StartNow()//从启动的时候开始执行
.WithSimpleSchedule(b =>
{
b.WithIntervalInSeconds(2);//2秒执行一次
})
.Build();
//将job监听器注册到调度器
scheduler.ListenerManager.AddJobListener(new CustomJobListener());
//把作业,触发器加入调度器
scheduler.ScheduleJob(job1, trigger1);
运行结果
【CustomJobListener】-【JobToBeExecuted】-【要执行的任务】
【任务执行】:2022/8/15 11:02:25
【触发时间】:2022/8/15 11:02:25
【下次触发时间】:
【CustomJobListener】-【JobWasExecuted】-【作业已执行】
多种使用场景
添加一个对特定工作感兴趣的 JobListener:
scheduler.ListenerManager.AddJobListener(myJobListener, KeyMatcher<JobKey>.KeyEquals(new JobKey("myJobName", "myJobGroup")));
添加一个对特定组的所有作业感兴趣的 JobListener:
scheduler.ListenerManager.AddJobListener(myJobListener, GroupMatcher<JobKey>.GroupEquals("myJobGroup"));
添加一个对两个特定组的所有作业感兴趣的 JobListener:
scheduler.ListenerManager.AddJobListener(myJobListener,
OrMatcher<JobKey>.Or(GroupMatcher<JobKey>.GroupEquals("myJobGroup"), GroupMatcher<JobKey>.GroupEquals("yourGroup")));
添加一个对所有作业感兴趣的 JobListener:
scheduler.ListenerManager.AddJobListener(myJobListener, GroupMatcher<JobKey>.AnyGroup());
10.3 SchedulerListener
SchedulerListener ,它们只接收调度程序本身的事件通知 - 不一定是与特定触发器或作业相关的事件。
与调度器相关的事件包括:作业/触发器的添加、作业/触发器的移除、调度器内部的严重错误、调度器被关闭的通知等。
ISchedulerListener 接口
public interface ISchedulerListener
{
Task JobScheduled(Trigger trigger);
Task JobUnscheduled(string triggerName, string triggerGroup);
Task TriggerFinalized(Trigger trigger);
Task TriggersPaused(string triggerName, string triggerGroup);
Task TriggersResumed(string triggerName, string triggerGroup);
Task JobsPaused(string jobName, string jobGroup);
Task JobsResumed(string jobName, string jobGroup);
Task SchedulerError(string msg, SchedulerException cause);
Task SchedulerShutdown();
}
添加调度器监听器:
scheduler.ListenerManager.AddSchedulerListener(mySchedListener);
删除 SchedulerListener:
scheduler.ListenerManager.RemoveSchedulerListener(mySchedListener);
11. 线程池ThreadPool
11.1 简介
SimpleThreadPool是Quartz.Net中自带的线程池,默认个数为10个,代表一个Scheduler同一时刻并发的最多只能执行10个job,超过10个的job需要排队等待。
下面通过四种配置方式来实现线程的配置,同时了解下有四种参数配置的方式。
4种方式的优先级为:quartz.config < app.config < 环境变量 < namevaluecollection
11.2 NameValueCollection方式配置
需要利用StdSchedulerFactory的构造函数进行传进去,向哪个Sheduler中传,即配置哪个Sheduler的对应的线程池。
NameValueCollection pars = new NameValueCollection
{
//线程池个数20
["quartz.threadPool.threadCount"] = "20"
};
ISchedulerFactory schedulefactory = new StdSchedulerFactory(pars);
IScheduler scheduler = schedulefactory.GetScheduler().Result;
11.3 系统配置文件的方式配置
App.config/web.config
在.net framwork程序中,可以配置在App.config/web.config文件中,该模式代码中不需要进行任何的额外配置,应用于所有的Sheduler。
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<!--线程池个数设置 开始-->
<configSections>
<section name="quartz" type="System.Configuration.NameValueSectionHandler, System, Version=1.0.5000.0,Culture=neutral, PublicKeyToken=b77a5c561934e089"/>
</configSections>
<quartz>
<!--设置Sheduler的线程池个数为22-->
<add key="quartz.threadPool.threadCount" value="22"/>
</quartz>
<!--线程池个数设置 结束-->
<startup>
<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.6"/>
</startup>
</configuration>
appsettings.json
在net core程序中,可以配置在appsettings.json文件中,该模式代码中不需要进行任何的额外配置,应用于所有的Sheduler。
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information"
}
},
"Quartz": {
"quartz.scheduler.instanceName": "Quartz ASP.NET Core Sample Scheduler"
}
}
11.4 quartz.config的方式进行配置
添加一个配置文件quartz.config,属性设置成 始终复制,该模式代码中不需要进行任何的额外配置,应用于所有的Sheduler。
quartz.threadPool.threadCount=15
11.5 设置环境变量来实现
应用于所有的Sheduler。
Environment.SetEnvironmentVariable("quartz.threadPool.threadCount", "26");
var factory = new StdSchedulerFactory();
var scheduler = factory.GetScheduler();
12. 工作存储JobStore
12.1 RAMJobStore
RAMJobStore是最简单的 JobStore,它也是性能最高,速度最快。它将所有数据保存在 RAM 中。缺点是当您的应用程序结束(或崩溃)时,所有调度信息都会丢失 - 这意味着 RAMJobStore 无法遵守作业和触发器的“非易失性”设置。
配置 Quartz 以使用 RAMJobStore
quartz.jobStore.type = Quartz.Simpl.RAMJobStore, Quartz
如果使用StdSchedulerFactory构建Schedule,不需要做任何特别的配置。Quartz.NET 的默认配置RAMJobStore用作作业存储实现。
12.2 AdoJobStore
AdoJobStore通过 ADO.NET 将所有数据保存在数据库中。它的配置要复杂一些,而且速度也没有那么快。
首先创建数据库表
创建表的SQL: https://github.com/quartznet/quartznet/tree/main/database/tables
- qrtz_blob_triggers : 以Blob 类型存储的触发器。
- qrtz_calendars:日历信息, quartz可配置一个日历来指定一个时间范围。
- qrtz_cron_triggers:cron类型的触发器。
- qrtz_fired_triggers:已触发的触发器。
- qrtz_job_details:jobDetail信息。
- qrtz_locks: 程序的悲观锁的信息(假如使用了悲观锁)。
- qrtz_paused_trigger_graps:暂停掉的触发器。
- qrtz_scheduler_state:调度器状态。
- qrtz_simple_triggers:简单触发器的信息。
- qrtz_simprop_triggers:简单触发器的一些其他信息。
- qrtz_triggers:将Trigger和job进行关联的表。
配置使用 AdoJobStore
目前,作业存储内部实现的唯一选择是JobStoreTX自己创建事务。
quartz.jobStore.type = Quartz.Impl.AdoJobStore.JobStoreTX, Quartz
配置使用 DriverDelegate
选择一个IDriverDelegate实现供 JobStore 使用, StdAdoDelegate是一个通用委托。但是针对不同类型数据库的特殊委托通常具有更好的性能或针对性。可以在Quartz.Impl.AdoJobStore命名空间中找到这些委托,具体的可以参考配置参考。
quartz.jobStore.driverDelegateType = Quartz.Impl.AdoJobStore.StdAdoDelegate, Quartz
配置表前缀
提供的sql脚本中所有表格都以前缀QRTZ_开头,这个前缀实际上可以配置的,使用不同的前缀可能有助于在同一数据库中为多个调度程序实例创建多组表。
quartz.jobStore.tablePrefix = QRTZ_
配置数据源名称
quartz.jobStore.dataSource = myDS
配置数据源的连接字符串和数据库提供者
具体的配置可以查看配置参考
quartz.dataSource.myDS.connectionString = Server=localhost;Database=quartz;Uid=quartznet;Pwd=quartznet
quartz.dataSource.myDS.provider = MySql
如果您的调度程序非常繁忙(即几乎总是执行与线程池大小相同数量的作业,那么您可能应该将数据源中的连接数设置为大约线程池大小 + 1。这通常在 ADO.NET 连接字符串中配置。
配置使用字符串作为 JobDataMap 值
可以设置为“ quartz.jobStore.useProperties=true(默认为 false),以指示 AdoJobStore JobDataMaps 中的所有值都是字符串,因此可以存储为键值对,而不是以序列化形式存储更复杂的对象在 BLOB 列。大大降低了类型序列化问题的可能性。
quartz.jobStore.useProperties = true
配置存储数据用的序列化程序
Quartz.NET 支持二进制和 JSON 序列化来存储数据到数据库。JSON序列化来自单独的NuGet 包 Quartz.Serialization.Json。建议使用JSON序列化。
quartz.serializer.type = json
12.3 实现Quartz持久化
- nuget引入mysql驱动程序
MySql.Data - nuget引入序列化包
Quartz.Serialization.Json - 安装数据库:这里使用docker安装数据库
docker run --name mysqlserver -v /data/mysql/conf:/etc/mysql/conf.d -v /data/mysql/logs:/logs -v /data/mysql/data:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=123456 -d -i -p 3306:3306 mysql:latest --lower_case_table_names=1,linux相关知识请参考linux详解,docker相关知识请参考docker详解,mysql相关知识请参考mysql详解,创建数据库quartzmanager。 - 数据库
quartzmanager中执行mysql数据库对应的sql脚本: https://github.com/quartznet/quartznet/tree/main/database/tables - 连接字符串参考:www.connectionstrings.com,这里填上自己的数据库服务器IP地址。
- 运行程序