.Net Core对于`RabbitMQ`封装分布式事件总线

news/2024/5/9 9:38:07/文章来源:https://blog.csdn.net/ma_nong33/article/details/128996466

首先我们需要了解到分布式事件总线是什么;


分布式事件总线是一种在分布式系统中提供事件通知、订阅和发布机制的技术。它允许多个组件或微服务之间的协作和通信,而无需直接耦合或了解彼此的实现细节。通过事件总线,组件或微服务可以通过发布或订阅事件来实现异步通信。

例如,当一个组件完成了某项任务并生成了一个事件,它可以通过事件总线发布该事件。其他相关组件可以通过订阅该事件来接收通知,并做出相应的反应。这样,组件之间的耦合就被减轻了,同时也提高了系统的可维护性和可扩展性。

然后了解一下RabbitMQ


RabbitMQ是一种开源的消息代理和队列管理系统,用于在分布式系统中进行异步通信。它的主要功能是接收和分发消息,并且支持多种协议,包括AMQP,STOMP,MQTT等。RabbitMQ通过一个中间层,可以把消息发送者与消息接收者隔离开来,因此消息发送者和消息接收者并不需要在同一时刻在线,并且也不需要互相知道对方的地址。

  1. RabbitMQ的主要功能包括:

  1. 消息存储:RabbitMQ可以将消息存储在内存或硬盘上,以保证消息的完整性。

  1. 消息路由:RabbitMQ支持消息的路由功能,可以将消息从生产者发送到消费者。

  1. 消息投递:RabbitMQ提供了多种消息投递策略,包括简单模式、工作队列、发布/订阅模式等。

  1. 可靠性:RabbitMQ保证消息的可靠性,即消息不会丢失、不重复、按顺序投递。

  1. 可扩展性:RabbitMQ支持水平扩展,可以通过增加节点来扩展系统的处理能力。

本文将讲解使用RabbitMQ实现分布式事件

实现我们创建一个EventsBus.Contract的类库项目,用于提供基本接口,以支持其他实现

在项目中添加以下依赖引用,并且记得添加EventsBus.Contract项目引用

<ItemGroup><PackageReferenceInclude="Microsoft.Extensions.DependencyInjection.Abstractions"Version="7.0.0" /><PackageReferenceInclude="Microsoft.Extensions.Options"Version="7.0.0" /><PackageReferenceInclude="Microsoft.Extensions.Options.ConfigurationExtensions"Version="7.0.0" /><PackageReferenceInclude="RabbitMQ.Client"Version="6.4.0" /></ItemGroup>

创建项目完成以后分别创建EventsBusOptions.cs,IEventsBusHandle.cs,RabbitMQEventsManage.cs,ILoadEventBus.cs ,提供我们的分布式事件基本接口定义

EventsBusOptions.cs:

namespace EventsBus.Contract;public classEventsBusOptions
{/// <summary>/// 接收时异常事件/// </summary>public static Action<IServiceProvider, Exception,byte[]>? ReceiveExceptionEvent;
}

IEventsBusHandle.cs:

namespace EventsBus.Contract;public interface IEventsBusHandle<in TEto> where TEto : class
{Task HandleAsync(TEto eventData);
}

ILoadEventBus.cs:

namespace EventsBus.Contract;public interface ILoadEventBus
{/// <summary>/// 发布事件/// </summary>/// <param name="eto"></param>/// <typeparam name="TEto"></typeparam>/// <returns></returns>Task PushAsync<TEto>(TEto eto) where TEto : class;
}

EventsBusAttribute.cs:用于Eto(Eto 是我们按照约定使用的Event Transfer Objects(事件传输对象)的后缀. s虽然这不是必需的,但我们发现识别这样的事件类很有用(就像应用层上的DTO 一样))的名称,对应到RabbitMQ的通道

namespace EventsBus.RabbitMQ;[AttributeUsage(AttributeTargets.Class)]
public classEventsBusAttribute : Attribute
{public readonly string Name;public EventsBusAttribute(string name){Name = name;}
}

然后可以创建我们的RabbitMQ实现了,创建EventsBus.RabbitMQ类库项目,用于编写EventsBus.Contract的RabbitMQ实现

创建项目完成以后分别创建Extensions\EventsBusRabbitMQExtensions.cs,Options\RabbitMQOptions.cs,EventsBusAttribute.cs,,RabbitMQFactory.cs,RabbitMQLoadEventBus.cs

Extensions\EventsBusRabbitMQExtensions.cs:提供我们RabbitMQ扩展方法让使用者更轻松的注入,命名空间使用Microsoft.Extensions.DependencyInjection,这样就在注入的时候减少过度使用命名空间了

using EventsBus.Contract;
using EventsBus.RabbitMQ;
using EventsBus.RabbitMQ.Options;
using Microsoft.Extensions.Configuration;namespace Microsoft.Extensions.DependencyInjection;public staticclassEventsBusRabbitMQExtensions
{public static IServiceCollection AddEventsBusRabbitMQ(this IServiceCollection services,IConfiguration configuration){services.AddSingleton<RabbitMQFactory>();services.AddSingleton(typeof(RabbitMQEventsManage<>));services.Configure<RabbitMQOptions>(configuration.GetSection(nameof(RabbitMQOptions)));services.AddSingleton<ILoadEventBus, RabbitMQLoadEventBus>();return services;}
}

Options\RabbitMQOptions.cs:提供基本的Options 读取配置文件中并且注入,services.Configure<RabbitMQOptions>(configuration.GetSection(nameof(RabbitMQOptions)));的方法是读取IConfiguration的名称为RabbitMQOptions的配置东西,映射到Options中,具体使用往下看。

using RabbitMQ.Client;namespace EventsBus.RabbitMQ.Options;public classRabbitMQOptions
{/// <summary>/// 要连接的端口。 <see cref="AmqpTcpEndpoint.UseDefaultPort"/>/// 指示应使用的协议的缺省值。/// </summary>public int Port { get; set; } = AmqpTcpEndpoint.UseDefaultPort;/// <summary>/// 地址/// </summary>public string HostName { get; set; }/// <summary>/// 账号/// </summary>public string UserName { get; set; }/// <summary>/// 密码/// </summary>public string Password { get; set; }
}

RabbitMQEventsManage.cs:用于管理RabbitMQ的数据接收,并且将数据传输到指定的事件处理程序

using System.Reflection;
using System.Text.Json;
using EventsBus.Contract;
using Microsoft.Extensions.DependencyInjection;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;namespace EventsBus.RabbitMQ;public classRabbitMQEventsManage<TEto> where TEto : class
{private readonly IServiceProvider _serviceProvider;private readonly RabbitMQFactory _rabbitMqFactory;public RabbitMQEventsManage(IServiceProvider serviceProvider, RabbitMQFactory rabbitMqFactory){_serviceProvider = serviceProvider;_rabbitMqFactory = rabbitMqFactory;_ = Task.Run(Start);}private voidStart(){var channel = _rabbitMqFactory.CreateRabbitMQ();var eventBus = typeof(TEto).GetCustomAttribute<EventsBusAttribute>();var name = eventBus?.Name ?? typeof(TEto).Name;channel.QueueDeclare(name, false, false, false, null);var consumer = new EventingBasicConsumer(channel); //消费者channel.BasicConsume(name, true, consumer); //消费消息consumer.Received += async (model, ea) =>{var bytes = ea.Body.ToArray();try{// 这样就可以实现多个订阅var events = _serviceProvider.GetServices<IEventsBusHandle<TEto>>();foreach (var handle in events){await handle?.HandleAsync(JsonSerializer.Deserialize<TEto>(bytes));}}catch (Exception e){EventsBusOptions.ReceiveExceptionEvent?.Invoke(_serviceProvider, e, bytes);}};}
}

RabbitMQFactory.cs:提供RabbitMQ链接工厂,在这里你可以自己去定义和管理RabbitMQ工厂

using EventsBus.RabbitMQ.Options;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;namespace EventsBus.RabbitMQ;public classRabbitMQFactory : IDisposable
{private readonly RabbitMQOptions _options;private readonly ConnectionFactory _factory;private IConnection? _connection;public RabbitMQFactory(IOptions<RabbitMQOptions> options){_options = options?.Value;// 将Options中的参数添加到ConnectionFactory_factory = new ConnectionFactory{HostName = _options.HostName,UserName = _options.UserName,Password = _options.Password,Port = _options.Port};}public IModel CreateRabbitMQ(){// 当第一次创建RabbitMQ的时候进行链接_connection ??= _factory.CreateConnection();return _connection.CreateModel();}public voidDispose(){_connection?.Dispose();}
}

RabbitMQLoadEventBus.cs:用于实现ILoadEventBus.cs通过ILoadEventBus发布事件RabbitMQLoadEventBus.cs是RabbitMQ的实现

using System.Reflection;
using System.Text.Json;
using EventsBus.Contract;
using Microsoft.Extensions.DependencyInjection;namespace EventsBus.RabbitMQ;public classRabbitMQLoadEventBus : ILoadEventBus
{private readonly IServiceProvider _serviceProvider;private readonly RabbitMQFactory _rabbitMqFactory;public RabbitMQLoadEventBus(IServiceProvider serviceProvider, RabbitMQFactory rabbitMqFactory){_serviceProvider = serviceProvider;_rabbitMqFactory = rabbitMqFactory;}public async Task PushAsync<TEto>(TEto eto) where TEto : class{//创建一个通道//这里Rabbit的玩法就是一个通道channel下包含多个队列Queueusing var channel = _rabbitMqFactory.CreateRabbitMQ();// 获取Eto中的EventsBusAttribute特性,获取名称,如果没有默认使用类名称var eventBus = typeof(TEto).GetCustomAttribute<EventsBusAttribute>();var name = eventBus?.Name ?? typeof(TEto).Name;// 使用获取的名称创建一个通道channel.QueueDeclare(name, false, false, false, null);var properties = channel.CreateBasicProperties();properties.DeliveryMode = 1;// 将数据序列号,然后发布channel.BasicPublish("", name, false, properties, JsonSerializer.SerializeToUtf8Bytes(eto)); //生产消息// 让其注入启动管理服务,RabbitMQEventsManage需要手动激活,由于RabbitMQEventsManage是单例,只有第一次激活才有效,var eventsManage = _serviceProvider.GetService<RabbitMQEventsManage<TEto>>();await Task.CompletedTask;}
}

在这里我们的RabbitMQ分布式事件就设计完成了,注:这只是简单的一个示例,并未经过大量测试,请勿直接在生产使用;

然后我们需要使用RabbitMQ分布式事件总线工具包

使用RabbitMQ分布式事件总线的示例


首先我们需要准备一个RabbitMQ,可以在官网自行下载,我就先使用简单的,通过docker compose启动一个RabbitMQ,下面提供一个compose文件

version:'3.1'services:rabbitmq:restart:always# 开机自启image:rabbitmq:3.11-management# RabbitMQ使用的镜像container_name:rabbitmq# docker名称hostname:rabbitports:-5672:5672# 只是RabbitMQ SDK使用的端口-15672:15672# 这是RabbitMQ管理界面使用的端口environment:TZ:Asia/Shanghai# 设置RabbitMQ时区RABBITMQ_DEFAULT_USER:token# rabbitMQ账号RABBITMQ_DEFAULT_PASS:dd666666# rabbitMQ密码volumes:-./data:/var/lib/rabbitmq

启动以后我们创建一个WebApi项目,项目名称Demo,创建完成打开项目文件添加引用

<ProjectSdk="Microsoft.NET.Sdk.Web"><PropertyGroup><TargetFramework>net7.0</TargetFramework><Nullable>enable</Nullable><ImplicitUsings>enable</ImplicitUsings></PropertyGroup><ItemGroup><PackageReferenceInclude="Microsoft.AspNetCore.OpenApi"Version="7.0.0" /><PackageReferenceInclude="Swashbuckle.AspNetCore"Version="6.4.0" /></ItemGroup><ItemGroup><!-- 引用RabbitMQ事件总线项目--><ProjectReferenceInclude="..\EventsBus.RabbitMQ\EventsBus.RabbitMQ.csproj" /></ItemGroup></Project>

修改appsettings.json配置文件:将RabbitMQ的配置写上,RabbitMQOptions名称对应在EventsBus.RabbitMQ中的RabbitMQOptions文件![image-20230211022801105]

在这里注入的时候将配置注入好了

{"Logging":{"LogLevel":{"Default":"Information","Microsoft.AspNetCore":"Warning"}},"AllowedHosts":"*","RabbitMQOptions":{"HostName":"127.0.0.1","UserName":"token","Password":"dd666666"}}

创建DemoEto.cs文件:

using EventsBus.RabbitMQ;namespace Demo;[EventsBus("Demo")]
public classDemoEto
{public int Size { get; set; }public string Value { get; set; }
}

创建DemoEventsBusHandle.cs文件:这里是订阅DemoEto事件,相当于是DemoEto的处理程序

using System.Text.Json;
using EventsBus.Contract;namespace Demo;/// <summary>/// 事件处理服务,相当于订阅事件/// </summary>
public classDemoEventsBusHandle : IEventsBusHandle<DemoEto>
{public async Task HandleAsync(DemoEto eventData){Console.WriteLine($"DemoEventsBusHandle: {JsonSerializer.Serialize(eventData)}");await Task.CompletedTask;}
}

打开Program.cs 修改代码: 在这里注入了事件总线服务,和我们的事件处理服务

using Demo;
using EventsBus.Contract;var builder = WebApplication.CreateBuilder(args);builder.Services.AddControllers();builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();// 注入事件处理服务
builder.Services.AddSingleton(typeof(IEventsBusHandle<DemoEto>),typeof(DemoEventsBusHandle));// 注入RabbitMQ服务
builder.Services.AddEventsBusRabbitMQ(builder.Configuration);var app = builder.Build();// 只有在Development显示Swaggerif (app.Environment.IsDevelopment())
{app.UseSwagger();app.UseSwaggerUI();
}// 强制Https
app.UseHttpsRedirection();app.UseAuthorization();app.MapControllers();app.Run();

创建Controllers\EventBusController.cs控制器:我们在控制器中注入了ILoadEventBus ,通过调用接口实现发布事件;

using EventsBus.Contract;
using Microsoft.AspNetCore.Mvc;namespace Demo.Controllers;[ApiController]
[Route("[controller]")]
public classEventBusController : ControllerBase
{private readonly ILoadEventBus _loadEventBus;public EventBusController(ILoadEventBus loadEventBus){_loadEventBus = loadEventBus;}/// <summary>/// 发送信息/// </summary>/// <param name="eto"></param>[HttpPost]public async Task Send(DemoEto eto){await _loadEventBus.PushAsync(eto);}
}

然后我们启动程序会打开Swagger调试界面:

然后我们发送一下事件:

我们可以看到,在数据发送的时候也同时订阅到了我们的信息,也可以通过分布式事件总线限流等实现,

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_256844.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

大家心心念念的RocketMQ5.x入门手册来喽

1、前言 为了更好的拥抱云原生&#xff0c;RocketMQ5.x架构进行了大的重构&#xff0c;提出了存储与计算分离的设计架构&#xff0c;架构设计图如下所示&#xff1a; RocketMQ5.x提供了一套非常建议的消息发送、消费API&#xff0c;并统一放在Apache顶级开源项目rocketmq-clie…

TC3xx FlexRay™ 协议控制器 (E-Ray)-01

1 FlexRay™ 协议控制器 (E-Ray) E-Ray IP 模块根据为汽车应用开发的 FlexRay™ 协议规范 v2.1 执行通信【performs communication according to the FlexRay™ 1) protocol specification v2.1】。使用最大指定时钟&#xff0c;比特率可以编程为高达 10 Mbit/s 的值。连接到物…

就现在!为元宇宙和Web3对互联网的改造做准备!

欢迎来到Hubbleverse &#x1f30d; 关注我们 关注宇宙新鲜事 &#x1f4cc; 预计阅读时长&#xff1a;8分钟 本文仅代表作者个人观点&#xff0c;不代表平台意见&#xff0c;不构成投资建议。 如今&#xff0c;互联网是各种不同的网站、应用程序和平台的集合。由于彼此分离…

STM32单片机GSM短信自动存取快递柜

实践制作DIY- GC0104-自动存取快递柜 一、功能说明&#xff1a; 基于STM32单片机设计-自动存取快递柜 二、功能介绍&#xff1a; STM32F103C系列最小系统板0.96寸OLED显示器DY-SV17F串口语音播报模块4*4矩阵键盘GSM短信模块4路舵机&#xff08;模拟4个柜子&#xff09; ***…

【openGauss实战9】深度分析分区表

&#x1f4e2;&#x1f4e2;&#x1f4e2;&#x1f4e3;&#x1f4e3;&#x1f4e3; 哈喽&#xff01;大家好&#xff0c;我是【IT邦德】&#xff0c;江湖人称jeames007&#xff0c;10余年DBA及大数据工作经验 一位上进心十足的【大数据领域博主】&#xff01;&#x1f61c;&am…

Syzkaller学习笔记---更新syz-extract/syz-sysgen(一)

Syzkaller学习笔记Syzkaller 安装文件系统内核Android common kernel参考文献syzkaller 源码阅读笔记-1前言syz-extractmainarchListcreateArchesworkerprocessArchprocessFileextractcheckUnsupportedCallsarchList小结syz-sysgenmainprocessJob()generateExecutorSyscalls()w…

2016-ICLR-Order Matters- Sequence to sequence for sets

2016-ICLR-Order Matters- Sequence to sequence for sets Paper: [https://arxiv.org/pdf/1511.06391.pdf](https://arxiv.org/pdf/1511.06391.pdf) Code: 顺序重要性&#xff1a;集合的顺序到序列 摘要 许多需要从观察序列映射或映射到观察序列的复杂任务现在可以使用序列…

C++创建多线程的方法总结

下个迭代有个任务很有趣&#xff0c;用大量的线程去访问一个接口&#xff0c;直至其崩溃为止&#xff0c;这就需要多线程的知识&#xff0c;这也不是什么难事&#xff0c;总结一下C中的多线程方法&#xff1a;std、boost、pthread、windows api。 目录 一、多线程预备知识 二…

基于SpringBoot实现ChatGPT-QQ机器人

概述 近期ChatGPT火爆全球&#xff0c;在其官方网站上也列举了非常全面的应用案例&#xff0c;仅仅上线两个月活跃用户数已经达到1亿&#xff0c;成为历史上用户数增长最快的面向消费者的应用 快速体验 OpenAI官网对外提供了标准的 API 接口&#xff0c;可以通过HTTP请求进行…

简单的密码加密

用户的密码必须被加密后再存储到数据库, 否则就存在用户账号安全问题用户使用的原始密码通常称之为"原文"或"明文", 经过算法的运算, 得到的结果通常称之为"密文"在处理密码加密时, 不可以使用任何加密算法, 因为所有加密算法都是可以被逆向运算…

centos学习记录

遇到的问题及其解决办法 centos7安装图形化界面 yum groupinstall ‘X Window System’ yum groupinstall -y ‘GNOME Desktop’ 安装完成后输入init 5进入图形化界面 centos7安装vmware-tools 第一步卸载open-vm-tools 输入命令 yum remove open-vm-tools 输入命令 reboot 在…

微前端基础

一、什么是微前端 微前端是一种软件架构&#xff0c;可以将前端应用拆解成一些更小的能够独立开发部署的微型应用&#xff0c;然后再将这些微应用进行组合使其成为整体应用的架构模式。微前端架构类似于组件架构&#xff0c;但不同的是&#xff0c;组件不能独立构建和发布&…

大数据时代的小数据神器 - asqlcell

自从Google发布了经典的MapReduce论文&#xff0c;以及Yahoo开源了Hadoop的实现&#xff0c;大数据这个词就成为了一个行业的热门。在不断提高的机器性能和各种层出不穷的工具框架加持下&#xff0c;数据分析开始从过去的采样抽查变成全量整体&#xff0c;原先被抽样丢弃的隐藏…

网络安全实验室7.综合关

7.综合关 1.渗透测试第一期 url&#xff1a;http://lab1.xseclab.com/base14_2d7aae2ae829d1d5f45c59e8046bbc54/ 进入忘记密码页面&#xff0c;右键查看源码&#xff0c;发现一个手机号 解题思路&#xff1a;通过给admin用户绑定13388758688手机号码&#xff0c;然后再进行…

使用vue3,vite,less,flask,python从零开始学习硅谷外卖(16-40集)

严正声明&#xff01; 重要的事情说一遍&#xff0c;本文章仅供分享&#xff0c;文章和代码都是开源的&#xff0c;严禁以此牟利&#xff0c;严禁侵犯尚硅谷原作视频的任何权益&#xff0c;我知道学习编程的人各种各样的心思都有&#xff0c;但这不是你对开源社区侵权的理由&am…

【算法题解】15. 设计最小栈

这是一道 中等难度 的题。 题目来自&#xff1a;leetcode 题目 设计一个支持 push &#xff0c;pop &#xff0c;top 操作&#xff0c;并能在 常数时间 内检索到最小元素的栈。 实现 MinStack 类: MinStack() 初始化堆栈对象。void push(int val) 将元素val推入堆栈。void p…

驱动 | Linux | NVMe 不完全总结

本文主要参考这里 1’ 2 的解析和 linux 源码 3。 此处推荐一个可以便捷查看 linux 源码的网站 bootlin 4。 更新&#xff1a;2022 / 02 / 11 驱动 | Linux | NVMe 不完全总结NVMe 的前世今生从系统角度看 NVMe 驱动NVMe CommandPCI 总线从架构角度看 NVMe 驱动NVMe 驱动的文件…

详细解读503服务不可用的错误以及如何解决503服务不可用

文章目录1. 问题引言2. 什么是503服务不可用错误3 尝试解决问题3.1 重新加载页面3.2 检查该站点是否为其他人关闭3.3 重新启动设备3.3 联系网站4. 其他解决问的方法1. 问题引言 你以前遇到过错误503吗&#xff1f; 例如&#xff0c;您可能会收到消息&#xff0c;如503服务不可…

三种方式查看linux终端terminal是否可以访问外网ping,curl,wget

方法1&#xff1a;ping注意不要用ping www.google.com.hk来验证&#xff0c;因为有墙&#xff0c;墙阻止了你接受网址发回的响应数据。即使你那啥过&#xff0c;浏览器都可以访问Google&#xff0c;terminal里面也是无法得到响应 百度在墙内&#xff0c;所以可以正常拿到响应信…

sklearn降维算法1 - 降维思想与PCA实现

目录1、概述1.1 维度概念2、PCA与SVD2.1 降维实现2.2 重要参数n_components2.2.1 案例&#xff1a;高维数据的可视化2.2.2 最大似然估计自选超参数2.2.3 按信息量占比选超参数1、概述 1.1 维度概念 shape返回的结果&#xff0c;几维几个方括号嵌套 特征矩阵特指二维的 一般来…