RocketMQ之(一)RocketMQ入门

news/2024/4/24 18:01:54/文章来源:https://blog.csdn.net/qq_50994235/article/details/129028957

一、RocketMQ入门

  • 一、RocketMQ 介绍
    • 1.1 RocketMQ 是什么?
    • 1.2 RocketMQ 应用场景
      • 01、应用解耦
      • 02、流量削峰
      • 03、数据分发
    • 1.3 RocketMQ 核心组成
      • 01、NameServer
      • 02、Broker
      • 03、Producer
      • 04、Consumer
    • 1.6 运转流程
    • 1.5 RocketMQ 架构
      • 01、NameServer 集群
      • 02、Broker 集群
      • 03、Producer 集群
      • 04、Consumer 集群
      • 07、集群工作流程
      • 06、集群间的交互方式
    • 1.6 RocketMQ 优缺点
      • 01、优点
      • 02、缺点
    • 1.7 各种 MQ 比较
  • 二、RocketMQ 安装(Linux 版本)
    • 2.1 环境要求
    • 2.2 安装步骤
      • 01、上传安装包
      • 02、解压安装包
      • 03、参数配置
    • 2.3 目录介绍
    • 2.4 启动 RocketMQ
    • 2.5 测试 RocketMQ
    • 2.6 关闭 RocketMQ
  • 三、rocketmq-console 集群监控平台搭建
    • 3.1 简介
    • 3.2 搭建集群监控平台
      • 01、下载
      • 02、上传解压
      • 03、修改配置参数
      • 04、打包
      • 05、启动和访问
      • 06、问题点
  • 四、RocketMQ 发送消息基本样例
    • 4.1 普通消息发送
    • 4.2 普通消息消费

一、RocketMQ 介绍

1.1 RocketMQ 是什么?

RocketMQ 是一款纯 java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。

1.2 RocketMQ 应用场景

消息队列是一种"先进先出"的数据结构,其应用场景主要包含以下三个方面:

01、应用解耦

系统的耦合性越高,容错性就越低。

以电商应用为例:用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。
在这里插入图片描述
使用消息队列解耦,系统的耦合性就会提高了。比如:如果物流系统发生故障,需要几分钟才能修复好,在这段时间内,物流系统要处理的数据被缓存到消息队列中,用户的下单操作正常完成。当物流系统恢复后,补充处理存在消息队列中的订单消息即可,终端系统感知不到物流系统发生过几分钟故障。
在这里插入图片描述

02、流量削峰

应用系统如果遇到系统请求流量的瞬间猛增,有可能会将系统压垮。有了消息队列后,可以将大量请求缓存起来,分散到很长一段时间处理,这样可以大大提升系统的稳定性和用户体验。
在这里插入图片描述
一般情况下,为了保证系统的稳定性,如果系统负载超过阈值,就会阻止用户请求,这会影响用户体验。而如果使用消息队列将请求缓存起来,等待系统处理完毕后通知用户下单完毕,这样的体验应该要好很多。
在这里插入图片描述
处于经济考量的目的:
业务系统正常时段的 QPS 如果是 1000,流量最高峰是 10000,为了应对流量高峰配置高性能的服务器显然不划算,这时就可以考虑使用消息队列对峰值流量削峰。

03、数据分发

通过消息队列可以让数据在多个系统之间进行流通。数据的产生方不需要关心谁来使用数据,只需要将数据发送到消息队列,数据使用方直接在消息队列中直接获取数据即可。
在这里插入图片描述
在这里插入图片描述

1.3 RocketMQ 核心组成

RocketMQ 主要有四大核心组成部分:NameServer、Broker、Producer、Consumer。

在这里插入图片描述
Topic:区分消息的种类。一个发送者可以发送消息给一个或多个 Topic,一个消息的接收者可以订阅一个或多个 Topic 消息。

Message Queue:相当于是 Topic 的分区,用于并行发送和接收消息。

01、NameServer

NameServer 是一个几乎无状态节点,可集群部署,节点之间没有任何信息同步。所以 RocketMQ 需要先启动 NameServer 再启动 Broker。

  • 作用

    NameServer 是整个 RocketMQ 的 "大脑",它相当于是服务注册中心的角色,用来管理 Broker。举例:各个邮局的管理机构。

    每个 NameServer 中都保存着 Broker 集群的整个路由信息和用于客户端查询的队列信息,生产者和消费者通过 NameServer 去获取整个 Broker 集群的路由信息,从而进行消息的投递和消费。

    每个 Broker 在启动的时候会到 NameServer 中注册,Producer 在发送消息前会根据 Topic 到 NameServer 获取到 Broker 的路由信息,进而和 Broker 取得连接。Consumer 也会定时获取 Topic 的路由信息。所以从功能上看应该是和 ZooKeeper 差不多的,但是据说 RocketMQ 的早期版本确实是使用的ZooKeeper ,后来改为了自己实现 NameServer。

  • 与 ZooKeeper 的区别

    NameServer 和 ZooKeeper 的作用大致是相同的。从宏观上来看,NameServer 做的东西很少,就是保存一些运行数据,NameServer 之间不互相连,这就需要 Broker 端连接所有的 NameServer,运行数据的改动要发送到每一个 NameServer ,从而来保证运行数据的一致性(这个一致性确实有点弱),这样就变成了 NameServer很轻量级,但是 Broker 端就要做更多的东西了。

    但是在 ZooKeeper 中,Broker 只需要连接其中的一台机器,运行数据分发、一致性都交给了 ZooKeeper 来完成。

  • 高可用保障

    Broker 在启动时向所有 NameServer 注册(主要是服务器地址等) ,生产者在发送消息之前先从NameServer 获取 Broker 服务器地址列表(消费者一样),然后根据负载均衡算法从列表中选择一台服务器进行消息发送。

    NameServer 与每台 Broker 服务保持长连接,并间隔 30S 检查 Broker 是否存活,如果检测到 Broker 宕机,则从路由注册表中将其移除,这样就可以实现 RocketMQ 的高可用。

02、Broker

Broker 是消息存储中心,主要作用是接收来自 Producer 的消息并存储,Consumer 从这里取得消息。举例:邮局。

它还存储与消息相关的元数据,包括用户组、消费进度偏移量、队列信息等。从部署结构图中可以看出 Broker 有 Master 和 Slave 两种类型,Master 既可以写又可以读,Slave 只可以读不可以写。

从物理结构上看 Broker 的集群部署方式有四种:单 Master 、多 Master 、多 Master 多 Slave(同步刷盘)、多 Master 多 Slave(异步刷盘)。

  • 单 Master

    这种方式风险较大,一旦 Broker 重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。

  • 多 Master

    所有消息都是 Master,没有 Slave。这种方式优缺点:

    • 优点:配置简单,单个 Master 宕机或重启维护对应用无影响。在磁盘配置为 RAID10 时,即使机器宕机不可恢复的情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
    • 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
  • 多 Master 多 Slave(异步复制)

    异步:先响应后再存入磁盘。

    每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用异步复制方式,主备之间有毫秒级消息延迟。这种方式优缺点:

    • 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受到影响。同时 Master 宕机后,消费者仍然可以从 Salve 消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;
    • 缺点:Master宕机,磁盘损坏情况下会丢失少量消息。
  • 多 Master 多 Slave(同步双写)

    同步:立刻存入磁盘后响应。

    每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用同步双写方式,即只有主备都写成功,才向应用返回成功。这种模式的优缺点:

    • 优点:数据与服务都无单点故障,Master 宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
    • 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的 RT 会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。
  • 高可用保障

    每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时(每隔30s)注册 Topic 信息到所有 NameServer。NameServer定时(每隔10s)扫描所有存活 Broker 的连接,如果 NameServer 超过2分钟没有收到心跳,则 NameServer 断开与 Broker 的连接。

03、Producer

Producer 也称为消息发布者,负责生产并发送消息至 Topic。举例:发信者。生产者向 brokers 发送由业务应用程序系统生成的消息。RocketMQ 提供了发送:同步、异步和单向(one-way)的多种范例。

  • 同步发送

    同步发送消息指消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包。一般用于重要通知消息,比如重要通知邮件、营销短信等。

  • 异步发送

    异步发送指发送方发出数据后,不等接收方发回响应,接着发送下个数据包,一般用于可能链路耗时较长而对响应时间敏感的业务场景,例如用户视频上传后通知启动转码服务。假如过一段时间检测到某个信息发送失败,可以选择重新发送。

  • 单向发送

    单向发送是指只负责发送消息而不等待服务器回应且没有回调函数触发,适用于某些耗时非常短但对可靠性要求并不高的场景,例如日志收集。

  • 生产者组

    生产者组(Producer Group)是一类 Producer 的集合,这类 Producer 通常发送一类消息并且发送逻辑一致,所以将这些 Producer 分组在一起。从部署结构上看生产者通过 Producer Group 的名字来标记自己是一个集群。

  • 高可用保障

    Producer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 取 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态,可集群部署。

    Producer 每隔30s(由 ClientConfig 的 pollNameServerInterval )从 Nameserver 获取所有 topic 队列的最新情况,这意味着如果 Broker 不可用,Producer 最多30s能够感知,在此期间内发往 Broker 的所有消息都会失败。

    Producer 每隔30s(由 ClientConfig 中 heartbeatBrokerInterval 决定)向所有关联的 broker 发送心跳,Broker 每隔10s扫描所有存活的连接,如果 Broker 在2分钟内没有收到心跳数据,则关闭与 Producer 的连接。

04、Consumer

Consumer 也称为消息订阅者,负责从 Topic 接收并消费消息。举例:收信者。消费者从 brokers 里拉取信息并将其输入应用程序中。

  • 消费者组

    消费者组(Consumer Group)是一类 Consumer 的集合名称,这类 Consumer 通常消费同一类消息并且消费逻辑一致,所以将这些 Consumer 分组在一起。消费者组与生产者组类似,都是将相同角色的分组在一起并命名。

    RocketMQ 中的消息有个特点:同一条消息,只能被某一消费组其中的一台机器消费,但是可以同时被不同的消费组消费。

  • 高可用保障

    Consumer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,订阅规则由 Broker 配置决定。

    Consumer 每隔30s从 Nameserver 获取 topic 的最新队列情况,这意味着 Broker 不可用时,Consumer 最多最需要30s才能感知。

    Consumer 每隔30s(由 ClientConfig 中 heartbeatBrokerInterval 决定)向所有关联的 broker 发送心跳,Broker 每隔10s扫描所有存活的连接,若某个连接2分钟内没有发送心跳数据,则关闭连接;并向该 Consumer Group 的所有 Consumer 发出通知,Group 内的 Consumer 重新分配队列,然后继续消费。

    当 Consumer 得到 master 宕机通知后,转向 slave 消费,slave 不能保证 master 的消息100%都同步过来了,因此会有少量的消息丢失。但是一旦 master 恢复,未同步过去的消息会被最终消费掉。

1.6 运转流程

在这里插入图片描述

  1. NameServer 先启动;
  2. Broker 启动时向 NameServer 注册;
  3. 生产者在发送某个主题的消息之前先从 NamerServer 获取 Broker 服务器地址列表(有可能是集群),然后根据负载均衡算法从列表中选择一台 Broker 进行消息发送;
  4. NameServer 与每台 Broker 服务器保持长连接,并间隔 30S 检测 Broker 是否存活,如果检测到 Broker 宕机(使用心跳机制, 如果检测超120S),则从路由注册表中将其移除;
  5. 消费者在订阅某个主题的消息之前从 NamerServer 获取 Broker 服务器地址列表(有可能是集群),但是消费者选择从 Broker 中 订阅消息,订阅规则由 Broker 配置决定。

1.5 RocketMQ 架构

在这里插入图片描述
RocketMQ 架构图中展示了四个集群:

01、NameServer 集群

提供轻量级的服务发现及路由,每个 NameServer 记录完整的路由信息,提供相应的读写服务,支持快速存储扩展。

NameServer 是一个功能齐全的服务器,主要包含两个功能:

  1. Broker 管理,接收来自 Broker 集群的注册请求,提供心跳机制检测 Broker 是否存活;
  2. 路由管理,每个 NameServer 持有全部有关 Broker 集群和客户端请求队列的路由信息。

02、Broker 集群

通过提供轻量级的 Topic 和 Queue 机制处理消息存储。同时支持推(Push)和拉(Pull)两种模型,包含容错机制。提供强大的峰值填充和以原始时间顺序累积数千亿条消息的能力。此外还提供灾难恢复,丰富的指标统计数据和警报机制,这些都是传统的消息系统缺乏的。

Broker 有几个重要的子模块:

  1. 远程处理模块,Broker 入口,处理来自客户端的请求;
  2. 客户端管理(包括消息生产者和消费者),维护消费者的主题订阅;
  3. 存储服务,提供在物理硬盘上存储和查询消息的简单 API;
  4. HA 服务,提供主从 Broker 间数据同步;
  5. 索引服务,通过指定键为消息建立索引并提供快速消息查询。

03、Producer 集群

消息生产者支持分布式部署,分布式生产者通过多种负载均衡模式向 Broker 集群发送消息。

04、Consumer 集群

消息消费者也支持 Push 和 Pull 模型的分布式部署,还支持集群消费和消息广播。提供了实时的消息订阅机制,可以满足大多数消费者的需求。

DefultMQPullConsumer :consumer 定时向 broker 发送请求获取内存数据,避免给 broker 造成巨大的压力。一般会在本地使用定时任务实现。

DefultMQPushConsumer :consumer 向 broker 发送请求,两者保持长链接的状态。broker 会定时(每 5 秒)去查询 consumer 中是否有要订阅的数据,有就将消息推送给 consumer。

无论是 pull 还是 push,其实本质上都是拉取消息。

07、集群工作流程

  1. 启动 NameServer,NameServer 启动后监听端口,等待 Broker、Producer、Consumer 连接,相当于一个路由控制中心。
  2. Broker 启动后,跟所有的 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker 信息(IP + 端口等)以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic 与 Broker 的映射关系。
  3. 收发消息前,先创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic。
  4. Producer 发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker 建立长连接从而向 Broker 发消息。
  5. Consumer 跟 Producer 类似,跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息。

06、集群间的交互方式

  1. Broker Master 和 Broker Slave 是主从结构,会执行数据同步 Data Sync
    每个 Broker 与 NameServer 集群中所有节点建立长连接,定时注册 Topic 信息到所有 NameServer;
  2. Producer 与 NameServer 集群中的其中一个节点(随机)建立长连接,定期从 NameServer 获取 Topic 路由信息,并与提供 Topic 服务的 Broker Master 建立长连接,定时向 Broker 发送心跳;
  3. Producer 只能将消息发送到 Broker Master,但是 Consumer 同时和 Broker Master 和 Broker Slave 建立长连接,既可以从 Master 订阅消息,也可以从 Slave 订阅消息。

1.6 RocketMQ 优缺点

01、优点

  • 单机吞吐量:十万级
  • 可用性:非常高,分布式架构
  • 消息可靠性:经过参数优化配置,消息可以做到 0 丢失
  • 功能支持:MQ 功能较为完善,还是分布式的,扩展性好
  • 支持 10 亿级别的消息堆积,不会因为堆积导致性能下降
  • 源码是 Java,方便结合公司自己的业务进行二次开发
  • 天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况
  • RoketMQ 在稳定性上可能更值得信赖,这些业务场景在阿里双11已经经历了多次考验。

02、缺点

  • 支持的客户端语言不多,目前仅支持 Java 及 C++,而且 C++ 还不成熟
  • 没有在 MQ 核心中去实现 JMS 等接口,有些系统要迁移需要修改大量代码

1.7 各种 MQ 比较

在这里插入图片描述

二、RocketMQ 安装(Linux 版本)

安装 RocketMQ 版本:4.5.0,我这里使用的是阿里云服务器,也可以在虚拟机上操作。

2.1 环境要求

  1. Linux 64 位操作系统
  2. JDK 1.8
  3. Maven 3.9.0(maven 版本不固定,但是最好不要使用最高版本)

2.2 安装步骤

01、上传安装包

将下载好的 RocketMQ 安装包上传到服务器上:
在这里插入图片描述

02、解压安装包

# 解压
unzip rocketmq-all-4.5.0-bin-release.zip# 将解压包移动到指定路径下
mv rocketmq-all-4.5.0-bin-release ../software

在这里插入图片描述

03、参数配置

这里需要配置三个文件:

  1. /conf/broker.conf

    指定 broker 的命名空间地址和当前 broker 监听的 IP:
    在这里插入图片描述
    默认情况下,namesrvAddr = 127.0.0.1:9876brokerIP1 = 127.0.0.1

  2. /bin/runserver.sh

    RocketMQ 默认的虚拟机内存比较大,启动 Broker 如果因为内存不足失败,就需要编辑这两个配置文件,修改 JVM 内存大小:

    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m  -XX:MaxMetaspaceSize=320m"
    

    在这里插入图片描述

  3. /bin/runbroker.sh

    同上,修改 JVM 内存大小:

    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
    

    在这里插入图片描述

2.3 目录介绍

  • bin:启动脚本,包括 shell 脚本和 cmd 脚本
  • conf:实例配置文件,包括 broker 配置文件、logback 配置文件等
  • lib:依赖 jar 包,包括 Netty、commons-lang、FastJSON 等

2.4 启动 RocketMQ

启动前先查看进程:
在这里插入图片描述
启动命令(在 bin 目录下执行):

# 启动 nameserver
nohup sh mqnamesrv -n 公网IP:9876 &# 启动 broker
nohup sh mqbroker -n 公网IP:9876 -c conf/broker.conf autoCreateTopicEnable=true &

启动后查看进程,有这两个进程即启动成功:
在这里插入图片描述

2.5 测试 RocketMQ

  • 模拟生产者发送消息

    # 生产者
    sh tools.sh org.apache.rocketmq.example.quickstart.Producer
    

    输入这个命令后,控制台会输出很多的信息,不报错就说明发送成功了:
    在这里插入图片描述

  • 模拟消费者接收消息

    # 消费者
    sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
    

    在这里插入图片描述

2.6 关闭 RocketMQ

# 关闭服务
sh mqshutdown namesrv
sh mqshutdown broker

三、rocketmq-console 集群监控平台搭建

3.1 简介

RocketMQ 有个可视化的管理界面,通过可视化界面,我们可以方便地监控 RocketMQ 集群,并实现很多操作。比如:创建管理 Topic,查看和发送 ,essage等等。

3.2 搭建集群监控平台

01、下载

RocketMQ提供了UI管理工具,名为rocketmq-console,项目地址:https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console。

这个是 rocketmq 的扩展,里面不仅包含控制台的扩展,也包含对大数据 flume、hbase 等组件的对接和扩展。

在这里插入图片描述

02、上传解压

  • 上传
    在这里插入图片描述
  • 解压
    # 解压
    unzip rocketmq-console.zip# 移动到 software 目录下
    mv rocketmq-console ../software
    
    在这里插入图片描述

03、修改配置参数

修改 rocketmq-console\src\main\resources\application.properties 配置文件:
在这里插入图片描述

04、打包

进入 rocketmq-console 目录下执行命令:

# 打包
mvn clean package ‐Dmaven.test.skip=true

打包完成后,会在 /software/rocketMQ/rocketmq-console 目录下生成一个 target 文件夹

05、启动和访问

  • 启动

    进入 /rocketmq-console/target 目录下执行:

    # 指定端口号和命名空间地址
    java -jar rocketmq-console-ng-1.0.1.jar --server.port=8088 --rocketmq.config.namesrvAddr=公网IP:9876
    

    在这里插入图片描述

  • 访问

    http://106.15.0.30:8088
    在这里插入图片描述

06、问题点

  • 防火墙

    防火墙需要开放访问 RocketMQ 的一系列端口号:

    # 查看防火墙状态
    systemctl status firewalld# 关闭防火墙
    systemctl stop firewalld# 启动防火墙
    systemctl start firewalld# 永久开放指定端口号【把用到的端口号都开放】
    firewall-cmd --zone=public --add-port=10909/tcp --permanent
    firewall-cmd --zone=public --add-port=10911/tcp --permanent
    firewall-cmd --zone=public --add-port=9876/tcp --permanent
    firewall-cmd --zone=public --add-port=9877/tcp --permanent# 重新加载防火墙
    firewall-cmd --reload# 或者重启防火墙
    systemctl restart firewalld.service# 查看防火墙信息列表
    firewall-cmd --list-all# 只查看防火墙开放端口号列表
    firewall-cmd --list-ports
    

    除了这一层防火墙之外,阿里云服务器自己还有一层防火墙 iptables,是默认配置的,我们也需要关闭这层防火墙或者开放对应的端口号:

    # 查看防火墙状态出现的问题
    service iptables status# 关闭防火墙
    service iptables stop
    
  • 安全组

    防火墙端口号开放之后,同时也需要在 ECS 服务器安全组中添加端口规则:
    在这里插入图片描述

四、RocketMQ 发送消息基本样例

引入 jar 包:

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.5.0</version>
</dependency>

4.1 普通消息发送

/*** 同步发送消息** @author qiaohaojie* @date 2023/2/20  17:00*/
public class SyncProducer {public static void main(String[] args) throws Exception {// 1. 实例化消息生产者 producerDefaultMQProducer producer = new DefaultMQProducer("group_producer_demo1");// 2. 设置 nameServer 的地址producer.setNamesrvAddr("公网IP:9876");// 关闭 VIP 通道
//        producer.setVipChannelEnabled(false);producer.setSendMsgTimeout(3000);// 3. 启动 Producer 实例producer.start();for (int i = 0; i < 10; i++) {// 4. 创建消息 messageMessage message = new Message("Topic_Demo", "Tags", "Hello RocketMQ" + i, "hello".getBytes(RemotingHelper.DEFAULT_CHARSET));// 5. 发送消息SendResult result = producer.send(message);System.out.println(result);}// 6. 关闭 producerproducer.shutdown();}
}

这里的 message 实例中有几个参数:
在这里插入图片描述

  1. topic:代表消息的主题
  2. tags:主要用于消息过滤
  3. keys:消息的唯一值
  4. body:消息体,代表消息的内容
    在这里插入图片描述

4.2 普通消息消费

/*** 同步发送消息-消费者** @author qiaohaojie* @date 2023/2/20  21:39*/
public class Consumer {public static void main(String[] args) throws Exception {// 1. 创建DefaultMQPushConsumer实例DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_consumer_demo1");// 2. 设置nameServer地址consumer.setNamesrvAddr("公网IP:9876");// 设置消息拉取最大数consumer.setConsumeMessageBatchMaxSize(2);// 3. 设置subscribe,这里是要读取的主题信息consumer.subscribe("Topic_Demo", "*");// 4. 设置消息监听consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {// 5. 获取消息信息// 迭代消息信息for (MessageExt msg : msgs) {try {// 获取主题String topic = msg.getTopic();// 获取标签String tags = msg.getTags();// 获取信息String result = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);System.out.println("Consumer消费消息--topic:" + topic + ",targs:" + tags + ",result:" + result);} catch (UnsupportedEncodingException e) {e.printStackTrace();// 消息重试return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}// 6. 返回消息读取状态// 消息消费完成return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 开启Consumerconsumer.start();}
}

消费消息时有两个参数:
在这里插入图片描述

  1. topic:指定要消费的主题
  2. subExpression:消息过滤规则
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_71752.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux docker(03)可使用GPU渲染的x11docker实战总结

该系列文章的目的旨在之前的章节基础上&#xff0c;使用x11docker构建一个可以使用GPU的docker容器。该容器可以用于3D图形渲染/XR 等使用GPU渲染的程序调试和运行。 0 why docker 为什么非要用x11docker&#xff0c;而不是其他的docker呢&#xff1f; 因为一般的docker是不…

第2讲-数据库系统的结构抽象与演变(测试题总结)

一、测试题 DBS的三级模式&#xff1a;外模式&#xff08;也叫用户模式或子模式&#xff09;&#xff0c;模式&#xff08;也叫逻辑模式&#xff09;&#xff0c;内模式&#xff08;也叫存储模式&#xff09; 外模式/模式映像 实现了数据的逻辑独立性 模式/内模式映像 实现了…

C++ 入门篇(八) auto关键字

目录 一、auto简介 二、auto的使用场景 三、注意事项 四、源代码 一、auto简介 在早期C/C中auto的含义是&#xff1a;使用auto修饰的变量&#xff0c;是具有自动存储器的局部变量&#xff0c;C11中&#xff0c;标准委员会赋予了auto全新的含义即&#xff1a;auto不再是一个存…

c++ 那些事 笔记

GitHub - Light-City/CPlusPlusThings: C那些事 1. ① extern extern关键字&#xff0c;C语言extern关键字用法详解 如果全局变量不在文件的开头定义&#xff0c;其有效的作用范围只限于其定义处到文件结束。如果在定义点之前的函数想引用该全局变量&#xff0c;则应该在…

前缀和差分(C/C++)

目录 1. 前缀和的定义 2. 一维前缀和 2.1 计算公式 2.2 用途 2.3 小试牛刀 3. 二维前缀和 3.1 用途 1. 前缀和的定义 对于一个给定的数列A&#xff0c;他的前缀和数中 S 中 S[ i ] 表示从第一个元素到第 i 个元素的总和。 如下图&#xff1a;绿色区域的和就是前缀和数组…

清洁级动物(CL)实验室设计SICOLAB

清洁级动物&#xff08;CL&#xff09;实验室设计清洁级动物&#xff08;CL&#xff09;实验室设计有哪些内容&#xff1f;工艺流程是如何&#xff1f;功能房间的划分清洁级动物实验室&#xff08;CL实验室&#xff09;是进行高洁净度动物实验的专门场所&#xff0c;需要满足一…

Shopee、ebay、亚马逊等跨境卖家了解测评的一篇干货

随着时代的发展&#xff0c;大家越来越喜欢网购&#xff0c;国外也有亚马逊、沃尔码、阿里国际、速卖通、ebay、shopee、Lazada、ozon、temu等等&#xff0c;而国外这些平台也有很大的市场&#xff0c;跨境电商也随时诞生&#xff0c;而当今社会环境实体生意越来越难做&#xf…

Kubernetes二 Kubernetes之实战以及pod详解

Kubernetes入门 一 Kubernetes实战 本章节将介绍如何在kubernetes集群中部署一个nginx服务&#xff0c;并且能够对其进行访问。 1.1 Namespace Namespace是kubernetes系统中的一种非常重要资源&#xff0c;它的主要作用是用来实现多套环境的资源隔离或者多租户的资源隔离。…

【java】Spring Cloud --Spring Cloud 的核心组件

文章目录前言一、Eureka&#xff08;注册中心&#xff09;二、Zuul&#xff08;服务网关&#xff09;三、 Ribbon&#xff08;负载均衡&#xff09;四、Hystrix&#xff08;熔断保护器&#xff09;五、 Feign&#xff08;REST转换器&#xff09;六、 Config&#xff08;分布式配…

飞塔Fortinet防火墙SSL VPN双因素身份认证(2FA)方案

作为行业领先的防火墙厂商&#xff0c;飞塔Fortinet结合了高性能 VPN 功能&#xff0c;代表了网络安全的新概念。其中飞塔Fortinet防火墙 SSL VPN 因其突出的安全性能而被广泛应用在远程办公场景中。但在 SSL VPN 登录时用户仅需输入用户名和固定的静态密码&#xff0c;若遭遇账…

kettle安装部署_简单认识_Spoon勺子界面---大数据之kettle工作笔记002

然后我们来看一下这个kettle的安装,很简单,下载解压就可以了 上面的地址是官网很烂 下面的地址好一些 这个是官网可以看到很慢,很不友好 这个是下面那个地址,可以看到 最新的是9.0了,一般都用 一般都用8.2 这里下载这个就可以了 下载以后可以看到有个pdi

LeetCode 每日一题2347. 最好的扑克手牌

Halo&#xff0c;这里是Ppeua。平时主要更新C语言&#xff0c;C&#xff0c;数据结构算法......感兴趣就关注我吧&#xff01;你定不会失望。 &#x1f308;个人主页&#xff1a;主页链接 &#x1f308;算法专栏&#xff1a;专栏链接 我会一直往里填充内容哒&#xff01; &…

《爆肝整理》保姆级系列教程python接口自动化(十九)--Json 数据处理---实战(详解)

简介 上一篇说了关于json数据处理&#xff0c;是为了断言方便&#xff0c;这篇就带各位小伙伴实战一下。首先捋一下思路&#xff0c;然后根据思路一步一步的去实现和实战&#xff0c;不要一开始就盲目的动手和无头苍蝇一样到处乱撞&#xff0c;撞得头破血流后而放弃了。不仅什么…

【大数据离线开发】7.2 搭建HBase环境

7.2 搭建HBase的环境 准备工作&#xff1a; 解压Hbase安装包 [rootbigdata111 tools]# tar -zxvf hbase-1.3.1-bin.tar.gz -C ~/training/设置Hadoop的环境变量 vi ~/.bash_profile HBASE_HOME/root/training/hbase-1.3.1 export HBASE_HOMEPATH$HBASE_HOME/bin:$PATH export…

005 利用fidder抓取app的api,获得股票数据

一、下载安装fidder 百度搜索fidder直接下载&#xff0c;按提示安装即可。 二、配置fidder 1. 打开fidder&#xff0c;选择tools——options。 2. 选择HTTPS选项卡&#xff0c;勾选前三项&#xff0c;然后点击右侧【actions】&#xff0c;选择【trust root certificate】&a…

黑马程序员-Linux系统编程-01

课程链接 01-Linux命令基础习惯-Linux系统编程_哔哩哔哩_bilibili 课程重点笔记 01-linux命令基础习惯 终端 终端&#xff1a;一切输入、输出的总称&#xff0c;因此终端并不是一定指的是命令行&#xff0c;只要是能进行输入或者输出即可&#xff0c;但是在linux终端上‘’内…

Java EE|TCP/IP协议栈之传输层UDP协议详解

文章目录一、对UDP协议的感性认识简介主要特点二、UDP的报文结构协议端格式概览报文结构详解源端口目的端口16位UDP报文长度16位校验和参考一、对UDP协议的感性认识 简介 UDP&#xff0c;是User Datagram Protocol的简称&#xff0c;中文名是用户数据报协议&#xff0c;是OSI…

RocketMQ 第二章

RocketMQ 第二章 7、SpringBoot整合RocketMQ SpringBoot 提供了快捷操作 RocketMQ 的 RocketMQTemplate 对象。 7.1、引入依赖 注意依赖的版本需要和 RocketMQ 的版本相同。 <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rock…

【Java期末复习】《面向对象程序设计》练习库

目录 一、单选题 二、填空题 三、程序填空题 1、 super使用--有如下父类和子类的定义&#xff0c;根据要求填写代码 2、简单加法计算器的实现 3、House类 4、矩形类 5、创建一个Box类&#xff0c;求其体积 四、函数题 6-1 求圆面积自定义异常类 6-2 判断一个数列是…

会利用信息差赚钱的人才是聪明人

毕业后找不到工作&#xff0c;穷到只剩下时间&#xff0c;大小做了20多份副业兼职&#xff0c;终于找到了可靠的渠道&#xff0c; 我是专科生&#xff0c;学历不好&#xff0c;专业拉胯。毕业后&#xff0c;我找了两三份工作。要么工资太低&#xff0c;只能交房租&#xff0c;…