Hangfire Redis实现秒级定时任务与CQRS实现动态执行代码的实践

笔记哥 / 04-18 / 37点赞 / 0评论 / 308阅读
目录 - 定时任务需求 - 核心逻辑 - 使用 Redis 实现秒级定时任务 - **第一步** - **第二步** - **第三步** - **第四步** - 业务服务实现动态代码 - 第一步 - 第二步 - 第三步 - 第四步 - 第五步 - 最后 ### 定时任务需求 本文示例项目仓库:whuanle/HangfireDemo 主要有两个核心需求: - 需要实现秒级定时任务; - 开发者使用定时任务要简单,不要弄复杂了; 在微服务架构中中,定时任务是最常用的基础设施组件之一,社区中有很多定时任务类库或平台,例如 Quartz.NET、xxx-job,使用方法差异很大,比如 xxx-job 的核心是 http 请求,配置定时任务实现 http 请求具体的接口,不过用起来还是比较复杂的。 在微服务中,使用的组件太多了,如果每个组件的集成都搞得很麻烦,那么服务的代码很可能会大量膨胀,并且容易出现各种 bug。以 xxx-job 为例,如果项目中有 N 个定时任务,设计 N 个 http 接口被 xxx-job 回调触发,除了 http 接口数量庞大,在各个环节中还容易出现 bug。 在近期项目需求中,刚好要用到定时任务,结合 C# 语言的特性,笔者的方法是利用 Hangfire 框架和语言特性,封装一些方法,使得开发者可以无感使用定时任务,大大简化链路和使用难度。 使用示例,结合 MediatR 框架定义 CQRS ,该 Command 将会被定时任务触发执行: ```csharp public class MyTestRequest : HangfireRequest, IRequest { } /// /// 要被定时任务执行的代码. /// public class MyTestHandler : IRequestHandler { public async Task Handle(MyTestRequest request, CancellationToken cancellationToken) { // 逻辑 return new ExecteTasResult { CancelTask = false }; } } ``` 要启动一个定时任务,只需要: ```csharp private readonly SendHangfireService _hangfireService; public SendTaskController(SendHangfireService hangfireService) {_hangfireService = hangfireService; } [HttpGet("aaa")] public async Task SendAsync() {await _hangfireService.Send(new MyTestRequest{ CreateTime = DateTimeOffset.Now, CronExpression = "* * * * * *", TaskId = Guid.NewGuid().ToString(),}); return "aaa"; } ``` 通过这种方式使用定时任务,开发者只需要使用很简单的代码即可完成需求,不需要关注细节,也不需要定义各种 http 接口,并且犹豫不需要关注使用的外部定时任务框架,所以随时可以切换不同的定时任务实现。 ### 核心逻辑 本文示例项目仓库:whuanle/HangfireDemo 示例项目结构如下: ![image-20250418084329362](https://cdn.res.knowhub.vip/c/2504/19/41acec4d.png?G1YAAMTsdJxIPhK026hD2jvFHc2ARBZBpYT1es9Z%2byb6fhcWjc9offr%2b8JfWpxMSTC8jYTFWBI8KtoLMhpAlJS1QjWs4) HangfireServer 是定时任务服务实现,HangfireServer 服务只需要暴露两个接口 `addtask`、`cancel`,分别用于添加定时任务和取消定时任务,无论什么业务的服务,都通过 `addtask` 服务添加。 DemoApi 则是业务服务,业务服务只需要暴露一个· `execute` 接口用于触发定时任务即可。 基础逻辑如下: graph LR subgraph DemoApi A[定义 Command] -- 序列化参数Command --> AA[发送定时任务] E[DemoApi:execute 接口] --> F[DemoApi:执行 Command] end subgraph Hangfire B[addtask] --> C[Hangfire:存储任务] C --> D[Hangfire:执行任务] D --> DD[发起请求] end %% 同时建立必要的连接 AA -- 添加定时任务 --> B DD -- 请求 --> E 由于项目中使用的是 MediatR 框架实现 CQRS 模式,因此很容易实现定时任务动态调用代码,只需要按照平时的 CQRS 发送定时任务命令,指定定时任务要执行的 Command 即可。 例如,有以下 Command 需要被定时任务执行: ```csharp ACommand BCommand CCommand ``` 首先这些命令会被序列化为 json ,发送到 HangfireServer 服务,HangfireServer 在恰当时机将参数原封不动推送到 DemoApi 服务,DemoApi 服务拿到这些参数序列化为对应的类型,然后通过 MediatR 发送命令,即可实现任意命令的定时任务动态调用。 下面来分别实现 HangfireServer 、DemoApi 服务。 在 Shred 项目中添加以下文件。 ![image-20250418093956062](https://cdn.res.knowhub.vip/c/2504/19/54ffea92.png?G1cAAMTW3DgpPEJF22gDdWfqnTYDFmkElRLW6917rpvo%2b0NYND%2b9thHrw29qG0EoMHUjYTFWpIALbCfgagmHQIrAPc8e) 其中 TaskRequest 内容如下,其它文件请参考示例项目。 ```csharp public class TaskRequest { /// /// 任务 id. /// public string TaskId { get; set; } = ""; /// /// 定时任务要请求的服务地址或服务名称. /// public string ServiceName { get; set; } = ""; /// /// 参数类型名称. /// public string CommandType { get; set; } = ""; /// /// 请求参数内容,json 序列化后的字符串. /// public string CommandBody { get; set; } = ""; /// /// Cron 表达式. /// public string CronExpression { get; set; } = ""; /// /// 创建时间. /// public string CreateTime { get; set; } = ""; } ``` ### 使用 Redis 实现秒级定时任务 Hangfire 本身配置比较复杂,其分布式实现对数据库性能要求比较高,因此使用 Mysql、Sqlserver 等数据库存储数据会带了很大的压力,而且要求实现秒级定时任务,NoSql 数据库可以更加好地实现这一需求,笔者这里使用 Redis 来存储任务数据。 HangfireServer 项目结构如下: ![image-20250418094109409](https://cdn.res.knowhub.vip/c/2504/19/d0c36d68.png?G1YAAMTW3DgpD0JR22gDdWfqnTYDElkElRLW6917rpvo%2b0Mgmp9e24j14S%2b1jSAubHoZCcSgSJ6dYSfz4Z4cAIu65dkD) 对 HangfireServer 的设计主要分为几步: - Hangfire 支持容器管理; - 配置 Hangfire ; - 定义 RecurringJobHandler 执行任务发起 http 请求到业务系统; - 定义 http 接口,接收定时任务; 引入类库: ```csharp ``` 首先是关于 Hangfire 本身的配置,现在几乎都是基于依赖注入的设计,不搞静态类型,所以我们需要实现定时任务执行器创建服务实例的,以便每次定时任务请求时,服务实例都是在一个新的容器,处以一个新的上下文中。 #### **第一步** 创建 HangfireJobActivatorScope、HangfireActivator 两个文件,实现 Hangfire 支持容器上下文。 ```csharp /// /// 任务容器. /// public class HangfireJobActivatorScope : JobActivatorScope { private readonly IServiceScope _serviceScope; private readonly string _jobId; /// /// Initializes a new instance of the class. /// /// /// public HangfireJobActivatorScope([NotNull] IServiceScope serviceScope, string jobId) { _serviceScope = serviceScope ?? throw new ArgumentNullException(nameof(serviceScope)); _jobId = jobId; } /// public override object Resolve(Type type) { var res = ActivatorUtilities.GetServiceOrCreateInstance(_serviceScope.ServiceProvider, type); return res; } /// public override void DisposeScope() { _serviceScope.Dispose(); } } ``` ```csharp /// /// JobActivator. /// public class HangfireActivator : JobActivator { private readonly IServiceScopeFactory _serviceScopeFactory; /// /// Initializes a new instance of the class. /// /// public HangfireActivator(IServiceScopeFactory serviceScopeFactory) { _serviceScopeFactory = serviceScopeFactory ?? throw new ArgumentNullException(nameof(serviceScopeFactory)); } /// public override JobActivatorScope BeginScope(JobActivatorContext context) { return new HangfireJobActivatorScope(_serviceScopeFactory.CreateScope(), context.BackgroundJob.Id); } } ``` #### **第二步** 配置 Hangfire 服务,使其支持 Redis,并且配置一些参数。 ```csharp private void ConfigureHangfire(IServiceCollection services) {var options = new RedisStorageOptions { // 配置 redis 前缀,每个任务实例都会创建一个 key Prefix = "aaa:aaa:hangfire", }; services.AddHangfire( config => { config.UseRedisStorage("{redis连接字符串}", options) .SetDataCompatibilityLevel(CompatibilityLevel.Version_180) .UseSimpleAssemblyNameTypeSerializer() .UseRecommendedSerializerSettings(); config.UseActivator(new HangfireActivator(services.BuildServiceProvider().GetRequiredService())); }); services.AddHangfireServer(options =>{ // 注意,这里必须设置非常小的间隔 options.SchedulePollingInterval = TimeSpan.FromSeconds(1); // 如果考虑到后续任务比较多,则需要调大此参数 options.WorkerCount = 50;}); } ``` ![1744940846417](https://cdn.res.knowhub.vip/c/2504/19/4856dafa.png?G1YAAMTW3DgpgHxF22gDdWfqnTYDElkElRLW6917rpvo%2b0NZLT%2b9thHrw19qG0FSBOYgZQUbkpdLGKcITJPD7XBX5NkD) #### **第三步** 实现 RecurringJobHandler 执行定时任务,发起 http 请求业务系统。 被调用方要返回 TaskInterfaceResponse 类型,主要考虑如果被调用方后续不需要在继续此定时任务,那么返回参数 `CancelTask = tre` 时,定时任务服务直接取消后续的任务即可,不需要被调用方手动调用接口取消。 ```csharp public class RecurringJobHandler { private readonly IServiceProvider _serviceProvider; public RecurringJobHandler(IServiceProvider serviceProvider) { _serviceProvider = serviceProvider; } /// /// 执行任务. /// /// /// Task. public async Task Handler(TaskRequest taskRequest) { var ioc = _serviceProvider; var recurringJobManager = ioc.GetRequiredService(); var httpClientFactory = ioc.GetRequiredService(); var logger = ioc.GetRequiredService>(); using var httpClient = httpClientFactory.CreateClient(taskRequest.ServiceName); // 无论是否请求成功,都算完成了本次任务 try { // 请求子系统的接口 var response = await httpClient.PostAsJsonAsync(taskRequest.ServiceName, taskRequest); var execteResult = await response.Content.ReadFromJsonAsync(); // 被调用方要求取消任务 if (execteResult != null && execteResult.CancelTask) { recurringJobManager.RemoveIfExists(taskRequest.TaskId); } } catch (Exception ex) { logger.LogError(ex, "Task error."); } } } ``` #### **第四步** 配置好 Hangfire 后,开始考虑如何接收任务和发起请求,首先定义一个 Http 接口或 grpc 接口。 ```csharp [ApiController] [Route("/execute")] public class HangfireController : ControllerBase { private readonly IRecurringJobManager _recurringJobManager; public HangfireController(IRecurringJobManager recurringJobManager) { _recurringJobManager = recurringJobManager; } [HttpPost("addtask")] public async Task AddTask(TaskRequest value) { await Task.CompletedTask; _recurringJobManager.AddOrUpdate( value.TaskId, task => task.Handler(value), cronExpression: value.CronExpression, options: new RecurringJobOptions { }); return new TaskResponse { }; } [HttpPost("cancel")] public async Task Cancel(CancelTaskRequest value) { await Task.CompletedTask; _recurringJobManager.RemoveIfExists(value.TaskId); return new TaskResponse { }; } } ``` ### 业务服务实现动态代码 业务服务只需要暴露一个 `exceute` 接口给 HangfireServer 即可,DemoApi 将 Command 序列化包装为请求参数给 HangfireServer ,然后 HangfireServer 原封不动地将参数请求到 `exceute` 接口。 ![image-20250418095553964](https://cdn.res.knowhub.vip/c/2504/19/97ad2f96.png?G1cAAER17rxga9ky9TvxQBMEEmwGLNIIKiWs1%2fP%2fa18i79eotHyP2mfbH35T%2b2yCArfThUpXQwo4oB4goiSYKYJk5DUa) 对 DemoApi 主要设计过程如下: - 定义 SendHangfireService 服务,包装 Command 数据和一些定时任务参数,通过 http 发送到 HangfireServer 中; - 定义 ExecuteTaskHandler ,当接口被触发时,实现反序列化参数并使用 MediatR 发送 Command,实现动态执行; - 定义 ExecuteController 接口,接收 HangfireServer 请求,并调用 ExecuteTaskHandler 处理请求; DemoApi 引入类库如下-: ```csharp - ``` > > > Maomi.Core 是一个模块化和自动服务注册框架。 > #### 第一步 定义 SendHangfireService 服务,包装 Command 数据和一些定时任务参数,通过 http 发送到 HangfireServer 中。 接收 HangfireServer 请求时,需要通过字符串查找出 Type,这就需要 DemoApi 启动时,自动扫描程序集并将对应的类型缓存起来。 为了将定时任务命令和其它 Command 区分处理,需要定义一个统一的抽象,当然也可以不这样做,也可以通过特性注解的方式做处理。 ```csharp /// /// 定时任务抽象参数. /// public abstract class HangfireRequest : IRequest { /// /// 定时任务 id. /// public string TaskId { get; init; } = string.Empty; /// /// 该任务创建时间. /// public DateTimeOffset CreateTime { get; init; } } ``` 定义 HangireTypeFactory ,以便能够通过字符串快速查找 Type。 ```csharp /// /// 记录 CQRS 中的命令类型,以便能够通过字符串快速查找 Type. /// public class HangireTypeFactory { private readonly ConcurrentDictionary _typeDictionary; public HangireTypeFactory() { _typeDictionary = new ConcurrentDictionary(); } public void Add(Type type) { if (!_typeDictionary.ContainsKey(type.Name)) { _typeDictionary[type.Name] = type; } } public Type? Get(string typeName) { if (_typeDictionary.TryGetValue(typeName, out var type)) { return type; } return _typeDictionary.FirstOrDefault(x => x.Value.FullName == typeName).Value; } } ``` 最后实现 SendHangfireService 服务,能够包装参数发送到 HangfireServer 中。 > > > 当然,可以使用 CQRS 处理。 > ```csharp /// /// 定时任务服务,用于发送定时任务请求. /// [InjectOnScoped] public class SendHangfireService { private static readonly JsonSerializerOptions JsonOptions = new JsonSerializerOptions { AllowTrailingCommas = true, PropertyNameCaseInsensitive = true, PropertyNamingPolicy = JsonNamingPolicy.CamelCase, ReadCommentHandling = JsonCommentHandling.Skip }; private readonly IHttpClientFactory _httpClientFactory; public SendHangfireService(IHttpClientFactory httpClientFactory) { _httpClientFactory = httpClientFactory; } /// /// 发送定时任务请求. /// /// /// /// /// /// public async Task Send(TCommand request) where TCommand : HangfireRequest { using var httpClient = _httpClientFactory.CreateClient(); var taskRequest = new TaskRequest { TaskId = request.TaskId, CommandBody = JsonSerializer.Serialize(request, JsonOptions), ServiceName = "http://127.0.0.1:5000/hangfire/execute", CommandType = typeof(TCommand).Name ?? throw new TypeLoadException(typeof(TCommand).Name), CreateTime = request.CreateTime.ToUnixTimeMilliseconds().ToString(), CronExpression = request.CronExpression, }; _ = await httpClient.PostAsJsonAsync("http://127.0.0.1:5001/execute/addtask", taskRequest); } /// /// 取消定时任务. /// /// /// public async Task Cancel(string taskId) { using var httpClient = _httpClientFactory.CreateClient(); _ = await httpClient.PostAsJsonAsync("http://127.0.0.1:5001/hangfire/cancel", new CancelTaskRequest { TaskId = taskId }); } } ``` #### 第二步 要实现通过 Type 动态执行某个 Command ,其实思路比较简单,也并不需要表达式树等麻烦的方式。 笔者的实现思路如下,定义 ExecuteTaskHandler 泛型类,直接以强类型的方式触发 Command,但是为了屏蔽泛型类型强类型在代码调用中的麻烦,需要再抽象一个接口 IHangfireTaskHandler 屏蔽泛型。 ```csharp /// /// 定义执行任务的抽象,便于忽略泛型处理. /// public interface IHangfireTaskHandler { /// /// 执行任务. /// /// /// Task Handler(TaskRequest taskRequest); } ``` ```csharp /// /// 用于反序列化参数并发送 Command. /// /// 命令. public class ExecuteTaskHandler : IHangfireTaskHandler where TCommand : HangfireRequest, IRequest { private readonly IMediator _mediator; /// /// Initializes a new instance of the class. /// /// public ExecuteTaskHandler(IMediator mediator) { _mediator = mediator; } private static readonly JsonSerializerOptions JsonSerializerOptions = new JsonSerializerOptions { AllowTrailingCommas = true, PropertyNameCaseInsensitive = true, PropertyNamingPolicy = JsonNamingPolicy.CamelCase, ReadCommentHandling = JsonCommentHandling.Skip }; /// public async Task Handler(TaskRequest taskRequest) { var command = JsonSerializer.Deserialize(taskRequest.CommandBody, JsonSerializerOptions)!; if (command == null) { throw new Exception("解析命令参数失败"); } // 处理命令的逻辑 var response = await _mediator.Send(command); return response; } } ``` #### 第三步 实现定时任务 `execute` 触发接口,然后将参数转发到 ExecuteTaskHandler 中,这里通过依赖注入的方式屏蔽和解决强类型的问题。 ```csharp /// /// 定时任务触发入口. /// [ApiController] [Route("/hangfire")] public class ExecuteController : ControllerBase { private readonly IServiceProvider _serviceProvider; private readonly HangireTypeFactory _hangireTypeFactory; public ExecuteController(IServiceProvider serviceProvider, HangireTypeFactory hangireTypeFactory) { _serviceProvider = serviceProvider; _hangireTypeFactory = hangireTypeFactory; } [HttpPost("execute")] public async Task ExecuteTask([FromBody] TaskRequest request) { var commandType = _hangireTypeFactory.Get(request.CommandType); // 找不到该事件类型,取消后续事件执行 if (commandType == null) { return new ExecteTasResult { CancelTask = true }; } var commandTypeHandler = typeof(ExecuteTaskHandler<>).MakeGenericType(commandType); var handler = _serviceProvider.GetService(commandTypeHandler) as IHangfireTaskHandler; if(handler == null) { return new ExecteTasResult { CancelTask = true }; } return await handler.Handler(request); } } ``` #### 第四步 封装好代码后,开始最后一个环境,配置和注册服务,由于笔者使用 `Maomi.Core` 框架,因此服务注册配置和扫描程序集变得非常简单,只需要通过 `Maomi.Core` 框架提供的接口即可最简单地实现功能。 ```csharp public class ApiModule : Maomi.ModuleCore, IModule { private readonly HangireTypeFactory _hangireTypeFactory; public ApiModule() { _hangireTypeFactory = new HangireTypeFactory(); } public override void ConfigureServices(ServiceContext context) { context.Services.AddTransient(typeof(ExecuteTaskHandler<>)); context.Services.AddSingleton(_hangireTypeFactory); context.Services.AddHttpClient(); context.Services.AddMediatR(o => { o.RegisterServicesFromAssemblies(context.Modules.Select(x => x.Assembly).ToArray()); }); } public override void TypeFilter(Type type) { if (!type.IsClass || type.IsAbstract) { return; } if (type.IsAssignableTo(typeof(HangfireRequest))) { _hangireTypeFactory.Add(type); } } } ``` ![1744942983410](https://cdn.res.knowhub.vip/c/2504/19/9ae0f030.png?G1YAAGSd87ygi2Lrd%2bJRTRBIoBmQyCKolLBe7zlr3wDfH4xc8jNan7E%2f%2fKX1GUBCWlyBkRULkicj1Iu4uiVVdjMRz2sE) #### 第五步 开发者可以这样写定时任务 Command 以及执行器,然后通过接口触发定时任务。 ```csharp public class MyTestRequest : HangfireRequest, IRequest { } /// /// 要被定时任务执行的代码. /// public class MyTestHandler : IRequestHandler { private static volatile int _count; private static DateTimeOffset _lastTime; public async Task Handle(MyTestRequest request, CancellationToken cancellationToken) { _count++; if (_lastTime == default) { _lastTime = DateTimeOffset.Now; } Console.WriteLine($""" 执行时间:{DateTimeOffset.Now.ToString("HH:mm:ss.ffff")} 执行频率(每 10s):{(_count / (DateTimeOffset.Now - _lastTime).TotalSeconds * 10)} """); return new ExecteTasResult { CancelTask = false }; } } ``` ```csharp [ApiController] [Route("/test")] public class SendTaskController : ControllerBase { private readonly SendHangfireService _hangfireService; public SendTaskController(SendHangfireService hangfireService) { _hangfireService = hangfireService; } [HttpGet("aaa")] public async Task SendAsync() { await _hangfireService.Send(new MyTestRequest { CreateTime = DateTimeOffset.Now, CronExpression = "* * * * * *", TaskId = Guid.NewGuid().ToString(), }); return "aaa"; } } ``` ### 最后 启动项目测试代码,记录执行频率和时间间隔。 ![image-20250418103509714](https://cdn.res.knowhub.vip/c/2504/19/422be2ae.png?G1YAAMR0rnGCXr8gbiMO1QSBBJoBiSyCSgnr9Z6z9k30%2fSEQy89ofcb%2b8JfWZxAru1UngTgMyfPF8MJqUpMUVrhC8xoB) ![动画](https://cdn.res.knowhub.vip/c/2504/19/4ce9e8d4.gif?G1YAAMTm2vOlAB2G%2fZ62RW6WbdYMSGQRVEpYr%2fv%2fv3UQ9SeUFfl825ixPvyljRkkmxiqkbIaA8mLC9sOca4JKM6lwnP0Cw%3d%3d)