MASA Framework 事件总线 - 进程内事件总线

news/2024/5/18 12:23:15/文章来源:https://blog.csdn.net/MASAteam/article/details/128011875

概述

事件总线是一种事件发布/订阅结构,通过发布订阅模式可以解耦不同架构层级,同样它也可以来解决业务之间的耦合,它有以下优点

  • 松耦合
  • 横切关注点
  • 可测试性
  • 事件驱动

发布订阅模式

通过下图我们可以快速了解发布订阅模式的本质

  1. 订阅者将自己关心的事件在调度中心进行注册
  2. 事件的发布者通过调度中心把事件发布出去
  3. 订阅者收到自己关心的事件变更并执行相对应业务

EventBus.png

其中发布者无需知道订阅者是谁,订阅者彼此之间也互不认识,彼此之间互不干扰

事件总线类型

在Masa Framework中,将事件划分为

  • 进程内事件 (Event)

本地事件,它的发布与订阅需要在同一个进程中,订阅方与发布方需要在同一个项目中

  • 跨进程事件 (IntegrationEvent)

集成事件,它的发布与订阅一定不在同一个进程中,订阅方与发布方可以在同一个项目中,也可以在不同的项目中

下面我们会用一个注册用户的例子来说明如何使用本地事件

入门

  • 安装.NET 6.0
  1. 新建ASP.NET Core 空项目Assignment.InProcessEventBus,并安装Masa.Contrib.Dispatcher.Events
dotnet new web -o Assignment.InProcessEventBus
cd Assignment.InProcessEventBus
dotnet add package Masa.Contrib.Dispatcher.Events --version 0.7.0-preview.7
  1. 注册EventBus (用于发布本地事件), 修改Program.cs
builder.Services.AddEventBus();
  1. 新增RegisterUserEvent类并继承Event,用于发布注册用户事件
public record RegisterEvent : Event
{public string Account { get; set; }public string Email { get; set; }public string Password { get; set; }
}
  1. 新增注册用户处理程序

在指定事件处理程序方法上增加特性 EventHandler,并在方法中增加参数 RegisterUserEvent

public class UserHandler
{private readonly ILogger<UserHandler>? _logger;public UserHandler(ILogger<UserHandler>? logger = null){//todo: 根据需要可在构造函数中注入其它服务 (需支持从DI获取)_logger = logger;}[EventHandler]public void RegisterUser(RegisterUserEvent @event){//todo: 1. 编写注册用户业务_logger?.LogDebug("-----------{Message}-----------", "检测用户是否存在并注册用户");//todo: 2. 编写发送注册通知等_logger?.LogDebug("-----------{Account} 注册成功 {Message}-----------", @event.Account, "发送邮件提示注册成功");}
}

注册用户的处理程序可以放到任意一个类中,但其构造函数参数必须支持从DI获取,且处理程序的方法仅支持 TaskVoid 两种, 不支持其它类型

  1. 发送注册用户事件,修改Program.cs
app.MapPost("/register", async (RegisterUserEvent @event, IEventBus eventBus) =>
{await eventBus.PublishAsync(@event);
});

进阶

处理流程

EventBus的 请求管道包含一系列请求委托,依次调用。 它们与ASP.NET Core中间件有异曲同工之妙,区别点在于中间件的执行顺序与注册顺序相反,最先注册的最后执行

EventBus.png

每个委托均可在下一个委托前后执行操作,其中TransactionMiddleware是EventBus发布后第一个要进入的中间件 (默认提供),并且它是不支持多次嵌套的。

EventBus 支持嵌套,这意味着我们可以在Handler中重新发布一个新的Event,但TransactionMiddleware仅会在最外层进入时被触发一次

自定义中间件

根据需要我们可以自定义中间件,并注册到EventBus的请求管道中,比如通过增加FluentValidation, 将参数验证从业务代码中剥离开来,从而使得处理程序更专注于业务

  1. 注册FluentValidation, 修改Program.cs
builder.Services.AddValidatorsFromAssembly(Assembly.GetEntryAssembly());
  1. 自定义验证中间件ValidatorMiddleware.cs,用于验证参数
public class ValidatorMiddleware<TEvent> : Middleware<TEvent>where TEvent : IEvent
{private readonly ILogger<ValidatorMiddleware<TEvent>>? _logger;private readonly IEnumerable<IValidator<TEvent>> _validators;public ValidatorMiddleware(IEnumerable<IValidator<TEvent>> validators, ILogger<ValidatorMiddleware<TEvent>>? logger = null){_validators = validators;_logger = logger;}public override async Task HandleAsync(TEvent @event, EventHandlerDelegate next){var typeName = @event.GetType().FullName;_logger?.LogDebug("----- Validating command {CommandType}", typeName);var failures = _validators.Select(v => v.Validate(@event)).SelectMany(result => result.Errors).Where(error => error != null).ToList();if (failures.Any()){_logger?.LogError("Validation errors - {CommandType} - Event: {@Command} - Errors: {@ValidationErrors}",typeName,@event,failures);throw new ValidationException("Validation exception", failures);}await next();}
}
  1. 注册EventBus并使用验证中间件ValidatorMiddleware
builder.Services.AddEventBus(eventBusBuilder=>eventBusBuilder.UseMiddleware(typeof(ValidatorMiddleware<>)));
  1. 添加注册用户验证类RegisterUserEventValidator.cs
public class RegisterUserEventValidator : AbstractValidator<RegisterUserEvent>
{public RegisterUserEventValidator(){RuleFor(e => e.Account).NotNull().WithMessage("用户名不能为空");RuleFor(e => e.Email).NotNull().WithMessage("邮箱不能为空");RuleFor(e => e.Password).NotNull().WithMessage("密码不能为空").MinimumLength(6).WithMessage("密码必须大于6位").MaximumLength(20).WithMessage("密码必须小于20位");}
}

编排

EventBus 支持事件编排,它们可以用来处理一些对执行顺序有要求的业务,比如: 注册用户必须成功之后才可以发送注册邮件通知,发送奖励等等,那我们可以这样做

将注册用户业务拆分为三个Handler,并通过指定Order的值来对执行事件排序

public class UserHandler
{private readonly ILogger<UserHandler>? _logger;public UserHandler(ILogger<UserHandler>? logger = null){_logger = logger;}[EventHandler(1)]public void RegisterUser(RegisterUserEvent @event){_logger?.LogDebug("-----------{Message}-----------", "检测用户是否存在并注册用户");//todo: 编写注册用户业务}[EventHandler(2)]public void SendAwardByRegister(RegisterUserEvent @event){_logger?.LogDebug("-----------{Account} 注册成功 {Message}-----------", @event.Account, "发送注册奖励");//todo: 编写发送奖励等}[EventHandler(3)]public void SendNoticeByRegister(RegisterUserEvent @event){_logger?.LogDebug("-----------{Account} 注册成功 {Message}-----------", @event.Account, "发送注册成功邮件");//todo: 编写发送注册通知等}
}

Saga

EventBus支持Saga模式

Saga

具体是怎么做呢?

[EventHandler(1, IsCancel = true)]
public void CancelSendAwardByRegister(RegisterUserEvent @event)
{_logger?.LogDebug("-----------{Account} 注册成功,发放奖励失败 {Message}-----------", @event.Account, "发放奖励补偿");
}

当发送奖励出现异常时,则执行补偿机制,执行顺序为 (2 - 1) > 0,由于目前仅存在一个Order为1的Handler,则执行奖励补偿后退出

但对于部分不需要执行失败但不需要执行回退的方法,我们可以修改 FailureLevels 确保不会因为当前方法的异常而导致执行补偿机制

[EventHandler(3, FailureLevels = FailureLevels.Ignore)]
public void SendNoticeByRegister(RegisterUserEvent @event)
{_logger?.LogDebug("-----------{Account} 注册成功 {Message}-----------", @event.Account, "发送邮件提示注册成功");//todo: 编写发送注册通知等
}

源码解读

EventHandler

  • FailureLevels: 失败级别, 默认: Throw
    • Throw:发生异常后,依次执行Order小于当前Handler的Order的取消动作,比如:Handler顺序为 1、2、3,CancelHandler为 1、2、3,如果执行 Handler3 异常,则依次执行 2、1
    • ThrowAndCancel:发生异常后,依次执行Order小于等于当前Handler的Order的取消动作,比如:Handler顺序为 1、2、3,CancelHandler为 1、2、3,如果执行 Handler3 异常,则依次执行 3、2、1
    • Ignore:发生异常后,忽略当前异常(不执行取消动作),继续执行其他Handler
  • Order: 执行顺序,默认: int.MaxValue,用于控制当前方法的执行顺序
  • EnableRetry: 当Handler异常后是否启用重试, 默认: false
  • RetryTimes: 重试次数,当出现异常后执行多少次重试, 需开启重试配置
  • IsCancel: 是否是补偿机制,默认: false

Middleware

  • SupportRecursive: 是否支持递归 (嵌套), 默认: true
    • 部分中间件仅在最外层被触发一次,像TransactionMiddleware 就是如此,但也有很多中间件是需要被多次执行的,比如ValidatorMiddleware,每次发布事件时都需要验证参数是否正确
  • HandleAsync(TEvent @event, EventHandlerDelegate next): 处理程序,通过调用 next() 使得请求进入下一个Handler

IEventHandler 与 ISagaEventHandler

  • HandleAsync(TEvent @event): 提供事件的Handler
  • CancelAsync(TEvent @event): 提供事件的补偿Handler

EventHandler功能类似,提供基本的Handler以及补偿Handler,推荐使用EventHandler的方式使用

TransactionMiddleware

提供事务中间件,当EventBusUoW以及Masa提供的Repository来使用时,当存在待提交的数据时,会自动执行保存并提交,当出现异常后,会执行事务回滚,无需担心脏数据入库

性能测试

与市面上使用较多的MeidatR作了对比,结果如下图所示:

BenchmarkDotNet=v0.13.1, OS=Windows 10.0.19043.1023 (21H1/May2021Update)
11th Gen Intel Core i7-11700 2.50GHz, 1 CPU, 16 logical and 8 physical cores
.NET SDK=7.0.100-preview.4.22252.9
[Host] : .NET 6.0.6 (6.0.622.26707), X64 RyuJIT DEBUG
Job-MHJZJL : .NET 6.0.6 (6.0.622.26707), X64 RyuJIT

Runtime=.NET 6.0 IterationCount=100 RunStrategy=ColdStart

MethodMeanErrorStdDevMedianMinMax
AddShoppingCartByEventBusAsync124.80 us346.93 us1,022.94 us8.650 us6.500 us10,202.4 us
AddShoppingCartByMediatRAsync110.57 us306.47 us903.64 us7.500 us5.300 us9,000.1 us

根据性能测试我们发现,EventBus与MediatR性能差距很小,但EventBus提供的功能却要强大的多

常见问题

  1. 按照文档操作,通过EventBus发布事件后,对应的Handler并没有执行,也没有发现错误?

①. EventBus.PublishAsync(@event) 是异步方法,确保等待方法调用成功,检查是否出现同步方法调用异步方法的情况
②. 注册EventBus时指定程序集集合, Assembly被用于注册时获取并保存事件与Handler的对应关系

var assemblies = new[]
{typeof(UserHandler).Assembly
};
builder.Services.AddEventBus(assemblies);

程序集: 手动指定Assembly集合 -> MasaApp.GetAssemblies() -> AppDomain.CurrentDomain.GetAssemblies()

但由于NetCore按需加载,未使用的程序集在当前域中不存在,因此可能会导致部分事件以及Handler的对应关系未正确保存,因此可通过手动指定Assembly集合或者修改全局配置中的Assembly集合来修复这个问题

  1. 通过EventBus发布事件,Handler出错,但数据依然保存到数据库中

①. 检查是否禁用事务

  1. DisableRollbackOnFailure是否为true (是否失败时禁止回滚)
  2. UseTransaction是否为false (禁止使用事务)

②. 检查当前数据库是否支持回滚。例如: 使用的是Mysql数据库,但回滚数据失败,请查看

本章源码

Assignment11

https://github.com/zhenlei520/MasaFramework.Practice

开源地址

MASA.Framework:https://github.com/masastack/MASA.Framework

如果你对我们的 MASA Framework 感兴趣,无论是代码贡献、使用、提 Issue,欢迎联系我们

  • WeChat:MasaStackTechOps
  • QQ:7424099

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_224767.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python学习 - 异常处理

Python学习 - 语法入门&#xff1a;https://blog.csdn.net/wanzijy/article/details/125287855 Python学习 - 数据类型&#xff1a;https://blog.csdn.net/wanzijy/article/details/125341568 Python学习 - 流程控制&#xff1a;https://blog.csdn.net/wanzijy/article/details…

什么是软件测试?

什么是软件测试&#xff1f; 软件测试的定义&#xff1a;在一定条件下对软件进行操作&#xff0c;发现软件的问题&#xff0c;提高软件的质量。 软件测试在开发中的有着重要地位。软件测试在各阶段的完成相应的任务&#xff0c;需求测试&#xff0c;架构测试&#xff0c;详细测…

关于windows的文件监控管理系统(Java)

目 录 摘 要 I Abstract II 1.绪论 1 1.1课题背景 1 1.2系统开发的目的和意义 2 1.3国内外概况 3 1.4研究主要内容 3 2.windows文件监控管理系统相关技术介绍 4 2.1 API 4 2.2 API HOOK 5 2.3 Java 5 2.4 DLL 6 2.4 Windows系统的Socket编程 6 2.4.1使用WinSock API 6 2.4.2 使…

MCE | 为什么肥胖经常被“针对”?

近年来&#xff0c;肥胖问题受到越来越多的关注&#xff0c;肥胖不只影响美丽身材&#xff0c;过度肥胖还可能导致肥胖症&#xff0c;这是很多疾病的高风险因素。所以肥胖是一种病&#xff1f;肥胖的标准是什么&#xff1f;别急&#xff0c;等小编慢慢道来。 认识肥胖症 (Obesi…

运动用品品牌排行榜,2022年值得买的运动用品推荐

如今&#xff0c;人们的生活节奏越来越快&#xff0c;工作和生活压力大。因此&#xff0c;人们越来越重视体育运动&#xff0c;通过体育运动达到放松和锻炼身体的目的&#xff0c;运动装备也就跟着火热起来。无论是进行室内或户外活动&#xff0c;选一套合适的运动装备是很有必…

顶象首届业务安全保卫战完美落幕,快来看看TOP10里有没有你!

今年双十一&#xff0c;顶象特别发起了首届业务安全保卫战&#xff0c;旨在召集白帽子们为业务安全贡献自己的一份力量。历经一个月&#xff0c;顶象首届业务安全保卫战已于20日正式落下帷幕。 截止11月20 日&#xff0c;顶象业务安全保卫战通过审核的业务安全情报&业务安…

Jetpack Compose中的state核心思想

Compose 中的状态 应用的“状态”是指可以随时间变化的任何值。这是一个非常宽泛的定义&#xff0c;从 Room 数据库到类的变量&#xff0c;全部涵盖在内。 所有 Android 应用都会向用户显示状态。下面是 Android 应用中的一些状态示例&#xff1a; 聊天应用中最新收到的消息…

MES必懂知识,市场需求下的生产管理系统

任何事物的产生和发展都与市场的需求是分不开的&#xff0c;只有当市场需求新生的事物的时候&#xff0c;他才会兴起&#xff0c;有的事物早已经产生&#xff0c;在当时的环境下并未兴起&#xff0c;却在后来才兴盛&#xff0c;这是市场的需求的影响。 MES便是在市场需求下诞生…

105-120-Hadoop-MapReduce-outputformat:

105-Hadoop-MapReduce-outputformat&#xff1a; OutputFormat 数据输出&#xff0c;OutputFormat接口实现类 OutputFormat是MapReduce输出的基类&#xff0c;所有实现MapReduce输出都实现了 OutputFormat 接口。下面我们介绍几种常见的OutputFormat实现类。 1&#xff0e;O…

系统分析与设计 复习

文章目录系统分析与设计 复习第 1 章 系统分析与设计概述系统特性DevOps第 2 章 系统规划**系统规划步骤**规划模型诺兰模型**CMM 模型**系统规划方法战略集合转换法 SST关键成功因素法 CSF企业资源规划法 BSPCSB 三者联系和区别第 3 章系统分析系统分析概述业务流程图系统流程…

linux进程间通信-FIFO,让你全方位理解

有名管道(FIFO) 有名管道也被称为FIFO文件&#xff0c;是一种特殊的文件。由于linux所有的事物都可以被视为文件&#xff0c;所以对有名管道的使用也就变得与文件操作非常统一。 (1)创建有名管道 用如下两个函数中的其中一个&#xff0c;可以创建有名管道。 #include #include …

我们的程序是如何跑起来的?

1.我们写的代码写完并测试以后是如何部署给用户使用的? 1. 准备所需要的服务器 2. 在服务器上安装JDK、mysql、redis、Tomcat、Nginx等环境 3. 进行mysql、redis、nginx的连接配置 4. 项目打包。前端构建打包成功后在根目录dist文件夹中&#xff1b;后端打成jar包&#xff0c…

用HTML+CSS做一个简单好看的汽车网页

&#x1f389;精彩专栏推荐 &#x1f4ad;文末获取联系 ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业&#xff1a; 【&#x1f4da;毕设项目精品实战案例 (10…

第二章 计算机算术

数据表示决定了计算机所执行操作的类型&#xff0c;数据从一个位置传到另一个位置的方法&#xff0c; 以及对存储元件的特性要求。浮点运算是非常重要的&#xff0c;因为它的实现决定了计算机执行复杂图形变换和图像处理的速度&#xff0c; 而且浮点运算对计算的准确度也有很重…

生物素标记试剂:(1458576-00-5,1802908-00-4)Biotin-PEG4-alkyne,Dde-生物素-四聚乙二醇-炔

一、Biotin-PEG4-alkyne 【中文名称】生物素-四聚乙二醇-炔&#xff0c;生物素-四聚乙二醇-丙炔基 【英文名称】 Biotin-PEG4-alkyne 【CAS】1458576-00-5 【分子式】C21H35N3O6S 【分子量】457.58 【纯度】95% 【外观】 淡黄色或白色固体 &#xff08;具体由其分子量大小决定…

XSS game -xss学习

网址 https://xss-game.appspot.com/level1Level 1: Hello, world of XSS payload: <script> alert(1); </script>漏洞产生处: message "Sorry, no results were found for <b>" query "</b>."Level 2: Persistence is key…

Solidity vs. Vyper:不同的智能合约语言的优缺点

本文探讨以下问题&#xff1a;哪种智能合约语言更有优势&#xff0c;Solidity 还是 Vyper&#xff1f;最近&#xff0c;关于哪种是“最好的”智能合约语言存在很多争论&#xff0c;当然了&#xff0c;每一种语言都有它的支持者。 这篇文章是为了回答这场辩论最根本的问题&…

Python读取CSV文件,数值精度丢失

Excel保存为csv以后&#xff0c;大数值的列&#xff0c;会把转换为科学计数法&#xff0c;而且后边几位都会被转为0. 搞了很多方法,最后直接安装 openpyxl 组件 和 pandas&#xff0c; 读取Excel文件就行了。 data pd.read_excel("C:/work/20221111AI/cleaned_data_noTi…

WinForm,可能是Windows上手最快的图形框架了

文章目录Label和控件属性按钮和回调逻辑事件常用控件前文提要&#xff1a;超快速成&#xff0c;零基础掌握C#开发中最重要的概念抽丝剥茧&#xff0c;C#面向对象快速上手源码地址&#xff1a;我的第一个WinForm程序 Label和控件属性 WinForm是一门非常经济实惠的技术&#xf…

软件测试V模型

以“编码”为黄金分割线&#xff0c;将整个过程分为开发和测试&#xff0c;并且开发和测试之间是串行的关系 特点&#xff1a; 明确标注了测试的类型 明确标注了测试阶段和开发阶段之间的对应关系 缺点&#xff1a; 测试后置 V模型是基于瀑布模型的&#xff0c;将测试放在…