SpringCloudAlibaba:消息驱动之RocketMQ学习

news/2024/4/27 22:29:11/文章来源:https://blog.csdn.net/Microhoo_/article/details/131711892

目录

一、MQ简介

(一)什么是MQ

(二)MQ的应用场景

1、异步解耦

2、流量削峰

(三)常见的MQ产品

二、RocketMQ入门

(一)RocketMQ安装部署

1、环境要求

2、下载RocketMQ

3、安装RocketMQ

4、启动RocketMQ

5、测试RocketMQ

6、关闭RocketMQ

(二)RocketMQ控制台安装与启动

下载并解压

三、springcloud集成rocketmq

(一)产品微服务-发送消息

1、pom添加依赖

2、application.yml配置

3、controller发送消息

(二)用户微服务-订阅消息

1、pom添加依赖

2、application.yml配置

3、消息接收服务

4、测试

(三)控制台

四、不同类型的消息发送与接收

(一)普通消息

1、可靠同步发送(sync)

2、可靠异步发送(async)

3、单向发送(oneway)

4、三种发送方式的对比

5、发消息代码案例

(二)顺序消息

(三)广播模式

(四)延时消息

(五)批量消息

(六)过滤消息

(七)事务消息

(八)消息消费要注意的细节


一、MQ简介

(一)什么是MQ

MQ(Message Queue <消息队列>)是一种跨进程的通信机制,用于传递消息。通俗点说,就是一个先进先出的数据结构。

(二)MQ的应用场景

1、异步解耦

最常见的一个场景是用户注册后,需要发送注册邮件和短信通知,以告知用户注册成功。传统的做法如下:

此架构下注册、邮件、短信三个任务全部完成后,才返回注册结果到客户端,用户才能使用账号登录。 但是对于用户来说,注册功能实际只需要注册系统存储用户的账户信息后,该用户便可以登录,而后续的注册短信和邮件不是即时需要关注的步骤。


所以实际当数据写入注册系统后,注册系统就可以把其他的操作放入对应的消息队列 MQ 中然后马上返回用户结果,由消息队列 MQ 异步地进行这些操作。架构图如下:

异步解耦是消息队列 MQ 的主要特点,主要目的是减少请求响应时间和解耦。主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。同时,由于使用了消息队列MQ,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦合。

2、流量削峰

流量削峰也是消息队列 MQ 的常用场景,一般在秒杀或团队抢购(高并发)活动中使用广泛。在秒杀或团队抢购活动中,由于用户请求量较大,导致流量暴增,秒杀的应用在处理如此大量的访问流量后,下游的通知系统无法承载海量的调用量,甚至会导致系统崩溃等问题而发生漏通知的情况。为解决这些问题,可在应用和下游通知系统之间加入消息队列 MQ。

秒杀处理流程如下所述:

  • 用户发起海量秒杀请求到秒杀业务处理系统。

  • 秒杀处理系统按照秒杀处理逻辑将满足秒杀条件的请求发送至消息队列 MQ。

  • 下游的通知系统订阅消息队列 MQ 的秒杀相关消息,再将秒杀成功的消息发送到相应用户。

  • 用户收到秒杀成功的通知。

(三)常见的MQ产品

ZeroMQ、RabbitMQ、ActiveMQ、RocketMQ、Kafka

二、RocketMQ入门

RocketMQ是阿里巴巴开源的分布式消息中间件,现在是Apache的一个顶级项目。在阿里内部使用非常广泛,已经经过了"双11"这种万亿级的消息流转。

(一)RocketMQ安装部署

接下来我们先在linux平台下安装一个RocketMQ的服务

1、环境要求

  • Linux 64位操作系统

  • 64bit JDK 1.8+

2、下载RocketMQ

Release Notes - Apache RocketMQ - Version 4.9.5 | RocketMQ

下载时要注意与springcloudalibaba版本匹配 版本说名链接地址

3、安装RocketMQ

  1. 上传文件到Linux系统

  2. 解压到安装目录

    [root@bogon RocketMQ]# unzip rocketmq-all-4.9.5-bin-release.zip
    [root@bogon RocketMQ]# ll
    total 32136
    drwxr-xr-x. 6 root root      103 Mar 27 14:47 rocketmq-all-4.9.5-bin-release
    -rw-r--r--. 1 root root 32906177 May 24 10:22 rocketmq-all-4.9.5-bin-release.zip
  3. 修改RocketMQ启动配置

    bin 下的 3 个配置文件不然会报insufficient memory:

    1)runserver.sh

    vi runserver.sh 
    ​
    # JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

    2)runbroker.sh

    vi runbroker.sh
    ​
    # JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"

    3)tools.sh

    vi tools.sh
    ​
    # JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
  4. 开启自动动创建Topic功能 在conf/broker.conf⽂件中加⼊如下配置,开启自动动创建Topic功能。

    autoCreateTopicEnable=true

4、启动RocketMQ

  1. 启动NameServer 执行命令启动NameServer

    ## 创建日志目录 
    cd bin
    mkdir logs
    ​
    # nohup ./mqnamesrv &:属于后台以静默⽅式启动
    # ./mqnamesrv:属于终端启动,直接输出日志信息,按 ctrl+c 可直接关闭退出
    ​
    nohup ./mqnamesrv > logs/mqnamesrv.out 2>1 &

    查看启动状态,在当前目录下会有一个nohup.out的日志文件,可以打开查看。

    ## 查看日志
    tail -f logs/mqnamesrv.out
    ​
    ## 看到以下表示启动成功
    The Name Server boot success. serializeType=JSON

    解决报错

    ## 报错
    ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !!
    ​
    ## 解决 配置jdk环境变量
    # 需要 export 环境变量

  2. 启动Broker

    同样进入 RocketMQ 安装目录下的 /bin目录进行操作 执行启动命令,并且常驻内存,注意ip地址要配置成为服务的ip地址,保证地址以及端口能够访问。

    # 启动命令,并且常驻内存:注意ip地址要配置成为服务的ip地址,保证地址以及端口能够访问
    # nohup ./mqbroker -n 192.168.109.149:9876 & :属于后台以静默⽅式启动
    # sh ./mqbroker -n 92.168.109.149:9876 :属于终端启动,直接输出日志信息,按 ctrl+c 可直接关闭退出
    ​
    nohup ./mqbroker -n 192.168.109.149:9876 > logs/mqbroker.out 2>1 &

    查看启动状态,启动之后同样提示将日志信息追加到了当前目录下的nohup.out文件中。

    ## 查看日志
    tail -f logs/mqbroker.out
    ​
    ## 看到以下表示启动成功
    The broker[linux1, 192.168.109.149:10911] boot success. serializeType=JSON and name server is 192.168.109.149:9876

    解决没反应:

    #删除/root/store/*
    cd /root/store
    rm -rf *
    ​
    # 重新启动broker
    nohup ./mqbroker -n 192.168.109.149:9876 > logs/mqbroker.out 2>1 &

5、测试RocketMQ

发送/接收消息之前,需要告诉客户端(Producer、Consumer)名称服务器的位置,RocketMQ 提供了多种方法来实现这一点.

编程方式,如:producer.setNamesrvAddr(“ip:port”) Java 选项,如:rocketmq.namesrv.addr 环境变量,如:NAMESRV_ADDR HTTP 端点

  1. 测试消息发送

    [root@linux1 rocketmq]# export NAMESRV_ADDR=localhost:9876
    [root@linux1 rocketmq]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
    OpenJDK 64-Bit Server VM warning: MaxNewSize (262144k) is equal to or greater than the entire heap (262144k).  A new max generation size of 261632k will be used.
    16:19:05.806 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
    RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
    RocketMQLog:WARN Please initialize the logger system properly.
    SendResult [sendStatus=SEND_OK, msgId=AC11000176396FF3C5B512F379FA0000, offsetMsgId=AC11000100002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=linux1, queueId=3], queueOffset=0]
    ......
    SendResult [sendStatus=SEND_OK, msgId=AC11000176396FF3C5B512F382B603E7, offsetMsgId=AC11000100002A9F00000000000317BF, messageQueue=MessageQueue [topic=TopicTest, brokerName=linux1, queueId=2], queueOffset=249]
    16:19:08.609 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[172.17.0.1:10911] result: true
    16:19:08.631 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
  2. 测试消息接收

    [root@linux1 rocketmq]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
    OpenJDK 64-Bit Server VM warning: MaxNewSize (262144k) is equal to or greater than the entire heap (262144k).  A new max generation size of 261632k will be used.
    16:21:15.395 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
    Consumer Started.
    ConsumeMessageThread_3 Receive New Messages: [MessageExt [brokerName=linux1, queueId=2, storeSize=201, queueOffset=1, sysFlag=0, bornTimestamp=1659601146477, bornHost=/192.168.0.101:48216, storeTimestamp=1659601146478, storeHost=/172.17.0.1:10911, msgId=AC11000100002A9F000000000000057F, commitLogOffset=1407, bodyCRC=988340972, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1659601275866, UNIQ_KEY=AC11000176396FF3C5B512F37A6D0007, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55], transactionId='null'}]] 
    ConsumeMessageThread_4 Receive New Messages: [MessageExt [brokerName=linux1, queueId=2, storeSize=202, queueOffset=2, sysFlag=0, bornTimestamp=1659601146500, bornHost=/192.168.0.101:48216, storeTimestamp=1659601146501, storeHost=/172.17.0.1:10911, msgId=AC11000100002A9F00000000000008A4, commitLogOffset=2212, bodyCRC=2088767104, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1659601275867, UNIQ_KEY=AC11000176396FF3C5B512F37A84000B, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49, 49], transactionId='null'}]] 

    消息发送完毕之后就会退出,在同一窗口中可以使用消费者类来进行接收消息,消费是多线程的。

6、关闭RocketMQ

与启动顺序相反进行关闭,先关闭 broker、在关闭 nameserv

./mqshutdown broker
./mqshutdown namesrv

(二)RocketMQ控制台安装与启动

下载并解压

下载地址:Tags · apache/rocketmq-externals · GitHub

下载rocketmq-console-1.0.0:https://codeload.github.com/apache/rocketmq-externals/zip/refs/tags/rocketmq-console-1.0.0

2、将压缩包放到你平时放项目的文件夹中,解压到当前文件夹

3、点进去,选中rocketmq-console

4、进入idea,找到解压的文件夹

5、选中下面 rocketmq-console ,点击ok,打开

6、然后点击 Trust Object(可信的实体),再点击建一个新的窗口,进去之后找到配置文件:按图修改

7、运行启动类,访问访问控制台 http://127.0.0.1:8080

 

三、springcloud集成rocketmq

接下来我们模拟一种场景: 下单成功之后,向下单用户发送短信。设计图如下:

(一)产品微服务-发送消息

1、pom添加依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version>
</dependency>

2、application.yml配置

rocketmq:name-server: 虚拟机ip.XXX.XXX.XXX:9876producer:group: product-server # 生产者消息分组

3、controller发送消息

@RestController
@RequestMapping("/mq")
public class RocketMQController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/send")public void send(){rocketMQTemplate.convertAndSend("product-topic", "hello world from repository!!");}
}

(二)用户微服务-订阅消息

1、pom添加依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version>
</dependency>

2、application.yml配置

rocketmq:name-server: 虚拟机ip.XXX.XXX.XXX:9876

3、消息接收服务

//@RocketMQMessageListener 注解用于配置一个消息监听器,它包含以下两个重要的参数
//topic:指定监听器要消费哪个主题的消息。一个主题可以有多个消费者组订阅。
//consumerGroup:指定监听器属于哪个消费者组。一个消费者组内的消费者会负责消费主题的一部分消息,实现负载均衡。
@Service
@RocketMQMessageListener(consumerGroup = "product-server", topic = "product-topic")
public class SmsService implements RocketMQListener<String> {private Logger logger = LoggerFactory.getLogger("user-server");@Overridepublic void onMessage(String s) {logger.info("收到一个订单信息{},接下来发送短信"+s);}
}

4、测试

调用产品微服务的接口发送消息,在用户微服务的控制台查看接收到的消息即可。

 

(三)控制台

在控制台可以查看到主题,可以在控制台发送消息: 

然后idea中user模块控制台可以接收到消息。

四、不同类型的消息发送与接收

(一)普通消息

RocketMQ提供三种方式来发送普通消息:可靠同步发送、可靠异步发送和单向发送。

1、可靠同步发送(sync)

同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。 此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等可靠性要求高的的数据或者需要实时响应的数据。

这种方式具有内部重试机制,即在主动声明本次消息发送失败之前,内部实现将重试一定次数,默认为2次(DefaultMQProducer#getRetryTimesWhenSendFailed)。 发送的结果存在同一个消息可能被多次发送给给broker,这里需要应用的开发者自己在消费端处理幂等性问题。

2、可靠异步发送(async)

异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。发送方通过回调接口接收服务器响应,并对响应结果进行处理。 异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

同同步发送一样,异步模式也在内部实现了重试机制,默认次数为2次(DefaultMQProducer#getRetryTimesWhenSendAsyncFailed})。发送的结果同样存在同一个消息可能被多次发送给给broker,需要应用的开发者自己在消费端处理幂等性问题。

3、单向发送(oneway)

单向发送是指发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。 适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

4、三种发送方式的对比

在实际使用场景中,利用何种发送方式,可以总结如下:

  • 当发送的消息很重要,且对响应时间不敏感的时候采用sync方式;

  • 当发送的消息很重要,且对响应时间非常敏感的时候采用async方式;

  • 当发送的消息不重要,采用one-way方式,以提高吞吐量;

5、发消息代码案例

product模块

@Slf4j
@RestController
@RequestMapping("/mq")
public class RocketMQController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;// 同步消息@GetMapping("/testSyncSend")public void testSyncSend(){//参数一: topic, 如果想添加tag 可以使用"topic:tag"的写法//参数二: 消息内容SendResult sendResult = rocketMQTemplate.syncSend("product-topic", "这是一条同步消息");log.info(sendResult.toString());}// 异步消息@GetMapping("/testASyncSend")public void testASyncSend(){//参数一: topic, 如果想添加tag 可以使用"topic:tag"的写法//参数二: 消息内容//参数三: 回调函数, 处理返回结果rocketMQTemplate.asyncSend("product-topic", "这是一条异步消息", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info(sendResult.toString());}@Overridepublic void onException(Throwable throwable) {log.info(throwable.toString());}});//因为是异步发送,所以让线程不要终止以便测试try {Thread.sleep(60000);} catch (Exception e) {throw new RuntimeException(e);}}// 单向消息@GetMapping("/testOneWay")public void testOneWay(){rocketMQTemplate.sendOneWay("product-topic", "这是一条单向消息");}}

接收异步、同步消息可以使用RocketMQ的消息监听器。通过实现RocketMQListener接口来监听指定Topic上的消息,异步处理消息时不需要等待,示例代码如下:

user模块

@Service
@RocketMQMessageListener(consumerGroup = "product-server", topic = "product-topic")
public class SmsService implements RocketMQListener<String> {private Logger logger = LoggerFactory.getLogger("user-server");@Overridepublic void onMessage(String s) {logger.info("收到一个产品信息{},接下来发送短信"+s);}
}

在上面的示例中,通过使用@RocketMQMessageListener注解指定了Topic和Consumer Group,消息被接收到后会自动调用onMessage方法进行处理。

需要注意的是,在异步接收消息时,RocketMQ会启动多个消费线程来处理消息,需要确保消息处理的线程安全性。

(二)顺序消息

顺序消息是消息队列提供的一种严格按照顺序来发布和消费的消息类型

//同步顺序消息[异步顺序 单向顺序写法类似]

// 顺序消息
@GetMapping("/testSyncSendOrderly")
public void testSyncSendOrderly(){//第三个参数用于队列的选择rocketMQTemplate.syncSendOrderly("product-topic", "这是一条异步顺序消息", "xxxx");
}

(三)广播模式

在RocketMQ中,消息可以以广播模式发送,广播模式的消息会被所有订阅了该topic的消费者都接收到。下面是一个简单的广播模式下消息的发送和接收的示例:

  1. 发送广播模式的消息 (一个服务发,两个服务收)

    @GetMapping("/sendBroadcastMsg")
    public void sendBroadcastMsg(@RequestParam("msg") String msg){Message<String> message = MessageBuilder.withPayload(msg).build();SendResult sendResult = rocketMQTemplate.syncSend("product-topic", message);System.out.printf("广播消息发送结果:MsgId:%s SendStatus:%s%n", sendResult.getMsgId(), sendResult.getSendStatus());
    }

    这个消息发给一个主题(product-topic),只要是这个主题都可以收到消息,但消费者的组名得不同。

  2. 接收广播模式的消息

    order-server

    @Service                // consumerGroup  消费者消息分组  topic 主题
    @RocketMQMessageListener(consumerGroup = "order-server", topic = "product-topic")
    public class MessageListener implements RocketMQListener<String> {private Logger logger = LoggerFactory.getLogger("order-server");@Overridepublic void onMessage(String message) {logger.info("收到一个短信:"+message);}
    }

    user-server

    @Service
    @RocketMQMessageListener(consumerGroup = "user-server", topic = "product-topic")
    public class MessageListener implements RocketMQListener<String> {private Logger logger = LoggerFactory.getLogger("user-server");@Overridepublic void onMessage(String s) {logger.info("收到一个短信: "+s);}
    }

    生产者发送消息,两个消费者都收到消息了。

上述示例代码中,通过使用@RocketMQMessageListener注解指定了订阅的Topic和Consumer Group,实现对广播消息的接收。接收广播模式的消息时,消息广播到所有订阅该topic的消费者,每个消费者都会独立接收到广播的消息。

需要注意的是,广播消息可能会被重复消费,消费者收到广播消息后会自动提交offset,若后续有新的消费者加入,则会从消费组最早的消费offset位置重新消费。

(四)延时消息

可以使用Spring Boot提供的RocketMQTemplate来发送延时消息,具体代码如下:

当使用 syncSend 发送延迟消息时,必须注意以下两个点:

  1. 必须设置 timeout 超时时间,防止方法堵塞

  2. timeout 时间建议设置稍大于 delayLevel 对应的延迟时间

如果不设置 timeout 时间,方法会永久堵塞,直到消息被消费后才返回。

@GetMapping("/sendMsg")public void sendMsg(String msg) {// 发送延时消息,设置延时10秒(即该消息将在10秒后被消费)Message<String> message = MessageBuilder.withPayload(msg).build();// 这里设置 timeout = 10000 毫秒,即 10 秒。同时设置 delayLevel = 3,对应 10 秒的延迟。// 这样可以确保,即使消息在 broker 延迟 10 秒未消费,syncSend 方法也不会永久堵塞,它会在超过 10 秒后返回。rocketMQTemplate.syncSend("product-topic", message, 10000, 3);}

RocketMQTemplate提供了syncSend方法,第三个参数是超时时间,第四个参数是延时时间。以上示例中延时级别为3,即延时10秒。

如果需要在Spring Boot应用中接收延时消息,推荐使用@RocketMQMessageListener注解,具体代码如下:

@Service
@RocketMQMessageListener(consumerGroup = "user-server", topic = "product-topic")
public class MessageListener implements RocketMQListener<String> {private Logger logger = LoggerFactory.getLogger("user-server");@Overridepublic void onMessage(String s) {logger.info("收到一个短信: "+s);}
}

等待十秒钟,订阅主题的消费者们会收到消息。当收到延时消息后,Spring会自动调用onMessage方法来处理消息。

RocketMQ支持通过延时级别来控制消息的延时时间,延时级别设置在消息发送时,具体的延时时间由该延时级别对应的配置参数决定。RocketMQ定义了18个延时级别,级别从1到18,延时时间从1秒到2小时不等,各级别对应的延时时间和配置参数如下:

例如,若要设置一个延时30秒的消息,可以将延时级别设为4,参数设置为delayLevel=4。

需要注意的是,这里提到的延时时间是指消息发送后到达Broker存储的时间,与消息从Broker发送到消费者端的时间无关。在实际使用过程中,考虑到网络延迟等因素,消息的最终消费时间可能会比预设的延时时间略晚。

(五)批量消息

下面是一个使用SpringBoot整合RocketMQ的示例代码,演示如何发送和接收批量消息。

发送批量消息:

// 批量消息
@GetMapping("/sendBatchMsg")
public void sendBatchMsg() {List<String> messages = new ArrayList<>();messages.add("Message 1");messages.add("Message 2");messages.add("Message 3");messages.add("Message 4");messages.add("Message 5");try {SendResult result = rocketMQTemplate.syncSend("product-topic", messages);System.out.println("Batch message send result: " + result);} catch (MessagingException e) {e.printStackTrace();}
}

在以上示例中,使用了RocketMQTemplate的syncSend()方法,来发送批量消息。syncSend()方法的第二个参数可以是一个List集合,每个元素表示一条消息。具体使用时,可以将多条消息打包成一个List集合,然后将集合作为syncSend()方法的第二个参数。最后通过获取SendResult对象,来获取消息发送的结果信息。

接收批量消息的RocketMQListener:(user-server接收代码,order-server和下面差不多,只是消费消息分组不同)

@Service
@RocketMQMessageListener(consumerGroup = "user-server", topic = "product-topic")
public class MessageListener implements RocketMQListener<List<String>> {private Logger logger = LoggerFactory.getLogger("user-server");@Overridepublic void onMessage(List<String> s) {logger.info("收到一个短信: "+s);}
}

在以上示例中,通过实现RocketMQListener<List>接口,来接收List类型的批量消息。在重写的onMessage()方法中,处理接收到的批量消息。

需要特别注意的是,RocketMQListener的泛型需要根据发送的消息类型进行设置。如果发送的是String类型的批量消息,那么RocketMQListener的泛型就应该设置为List。如果发送的是其他类型的批量消息,比如自定义的对象,那么需要自定义RocketMQMessageConverter实现类,来将消息转换为指定的类型。

(六)过滤消息

下面是一个使用SpringBoot整合RocketMQ的示例代码,演示如何发送和接收过滤消息。

@GetMapping("/sendFilterMsg")
public void sendMsg(@RequestParam("msg") String msg, @RequestParam("tag") String tag) {rocketMQTemplate.convertAndSend("product-topic" + ":" + tag, message);
}

在以上示例中,使用Autowired注解自动注入RocketMQTemplate实例,并通过setMessageProperty()方法和setTags()方法指定消息的属性和标签。最后调用convertAndSend()方法,将消息发送到名为product-topic的主题。

接收过滤消息的RocketMQListener:

@Slf4j
@Service
@RocketMQMessageListener(topic = "product-topic",consumerGroup = "filter-user-server",selectorExpression = "tagA || tagB",selectorType = SelectorType.TAG
)
public class FilterMsgListener implements RocketMQListener<String> {@Overridepublic void onMessage(String msg) {log.info("收到一个过滤短信:"+ msg);}
}

在以上示例中,通过@RocketMQMessageListener注解指定了消费组、主题和选择器表达式(相当于过滤表达式)。在选择器表达式中使用了消息的属性prop1和prop2,以及消息的标签tag1来过滤消息。最后通过实现RocketMQListener接口,来处理接收到的过滤消息。

(七)事务消息

RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致。 事务消息交互流程:

两个概念:

  1. 半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了RocketMQ服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。

  2. 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,RocketMQ服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查。

事务消息发送步骤:

  1. 发送方将半事务消息发送至RocketMQ服务端。

  2. RocketMQ服务端将消息持久化之后,向发送方返回Ack确认消息已经发送成功,此时消息为半事务消息。

  3. 发送方开始执行本地事务逻辑。

  4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,订阅方将不会接受该消息。

事务消息回查步骤:

  1. 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。

  2. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

  3. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。

(八)消息消费要注意的细节

@RocketMQMessageListener(consumerGroup = "shop",//消费者分组topic = "order-topic",//要消费的主题consumeMode = ConsumeMode.CONCURRENTLY, //消费模式:无序和有序messageModel = MessageModel.CLUSTERING, //消息模式:广播和集群,默认是集群
)
public class SmsService implements RocketMQListener<Order> {
}

RocketMQ支持两种消息模式:

  • 广播消费: 每个消费者实例都会收到消息,也就是一条消息可以被每个消费者实例处理;

  • 集群消费: 一条消息只能被一个消费者实例消费

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_331095.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

nginx的前端集成

对于springcloud项目&#xff0c;后端我们有很多的微服务&#xff0c;当然前端我们也可以有很多的小项目进行集成 前端项目部署思路 通过nginx来进行配置&#xff0c;功能如下 通过nginx的反向代理功能访问后台的网关资源 通过nginx的静态服务器功能访问前端静态页面 配置ng…

CSS3绘制3D银行卡片层叠展示特效

使用纯css3绘制3D银行卡层叠展示特效 具体示例如下 <template><div><div class"tariffCards"><div class"economy"><img src"../images/css-article-imgs/example-css3D-card/tarcs.png" alt"中信银行" he…

图腾柱电路

驱动MOS或者IGBT管&#xff0c;需要比较大的驱动电流或者灌电流 使用图腾柱电路或许是一个好的办法 电流路径是这样的 当CTL1端口输出为高电平的时候 三极管Q2的2脚为高&#xff0c;三极管Q2不导通 三极管Q1的2脚为高&#xff0c;三极管导通 所以Q1的3脚和1脚导通 VCC--…

Linux线程的生产者消费者模型 --- 阻塞队列(blockqueue)

文章目录 线程同步条件变量条件变量的接口 生产者消费者场景消费者和消费者的关系生产者和生产者的关系生产者和消费者的关系从何体现出效率的提高 Blockqueueblockqueue.hpp为什么条件变量的接口有锁作为参数 CP.cc生产者 -> queue -> 消费者兼生产者 -> queue ->…

【HarmonyOS】Stage模型二维码/条码生成与解析

HarmonyOS的官方API中提供了QRCode组件&#xff08;QRCode-基础组件-组件参考&#xff08;基于ArkTS的声明式开发范式&#xff09;-ArkTS API参考-HarmonyOS应用开发&#xff09;&#xff0c;这个组件有个缺点只能用于显示二维码&#xff0c;无法显示条码与解析码内容&#xff…

【已解决】Flask项目报错TypeError: tuple indices must be integers or slices, not str

文章目录 问题情境报错及分析报错代码分析 解决方案必要的解决方法可能有用的解决方法 问题情境 本解决方案适用情境&#xff1a;在本地可以正常运行的flask项目&#xff0c;放到云服务器报错TypeError: tuple indices must be integers or slices, not str&#xff0c;即代码…

《深度学习推荐系统》笔记

目录 一、推荐系统是什么1.作用和意义2.推荐系统的架构2.1 逻辑架构2.2 技术架构 二、传统的推荐系统方法1. 协同过滤算法1.1 userCF&&ItemCF1.3 矩阵分解算法 2. 逻辑回归算法3. 因子分解机3.1 POLY2模型3.2 FM模型3.3 FFM模型3.4 小结 4. 组合模型4.1 GBDTLR组合模型…

【C++/嵌入式笔试面试八股】二、24.TCP三次握手四次挥手 | TCP可靠性

TCP三次握手四次挥手 64.TCP头部中有哪些信息?❤️ TCP数据报格式(左图) UDP数据报格式也放这(右图),不具体解释了。 结合三次握手四次挥手来看 端口: 区分应用层的不同应用进程 扩展:应用程序的端口号和应用程序所在主机的 IP 地址统称为 socket(套接字),IP:端口…

Docker安装ElasticSearch/ES

目录 前言准备拉取ElasticSearch镜像安装ElasticSearch拉取elasticsearch-head镜像安装elasticsearch-head参考 前言 TencentOS Server 3.1Docker version 19.03.14, build 5eb3275d40 准备 docker 已安装。 安装 docker 参考&#xff1a;【Centos 8】【Centos 7】安装 docke…

基于STM32 ARM+FPGA伺服控制系统总体设计方案(一)

设计需求 一套完整的伺服控制方案包括了上位机、驱控一体控制器和功率板三者。操作人员 通过上位机发送各种不同指令&#xff0c;然后控制器解析指令后执行相应的伺服功能&#xff0c;其次控 制器将驱动信号传输至功率板驱动电机&#xff0c;最后控制器采集反馈信息进行闭环…

了解PostgreSQL sql shell和VACUUM命令

从SQL Shell进入PostgreSQL&#xff1b;没用过这东西&#xff0c;看一下&#xff1b; 一直回车&#xff1b;最后输入口令就登入了&#xff1b;此时是登入默认的数据库postgres&#xff1b;这个数据库是默认安装的&#xff1b; 看一下有没有表&#xff0c;根据资料可以用 \d 或…

大坝安全监测中需要做好检查监测

大坝安全监测是人们了解大坝运行状态和安全状况的有效手段和方法。它的目的主要是了解大坝安全状况及其发展态势&#xff0c;是一个包括由获取各种环境、水文、结构、安全信息到经过识别、计算、判断等步骤&#xff0c;最终给出一个大坝安全 程度的全过程。 此过程包括&#xf…

Linux中常用的监控性能的命令(sar、mpstat,vmstat, iostat,)详解

Linux中常用的监控性能的命令有&#xff1a; sar&#xff1a;能查看CPU的平均信息&#xff0c;还能查看指定CPU的信息。与mpstat相比&#xff0c;sar能查看CPU历史信息 mpstat&#xff1a;能查看所有CPU的平均信息&#xff0c;还能查看指定CPU的信息。 与sar相比&#xff0c…

九五从零开始的运维之路(其二十)

[TOC](文章目录) 文章目录 前言一、LAMP是什么二、配置环境及安装1.配置yum源2.关闭防火墙、网络图形化工具及SElinux3.安装软件包 三、配置apache服务器内容四、启动服务五、访问验证总结 前言 本篇将简述的内容&#xff1a;Linux系统下的LAMP平台部署 基于discuz框架的论坛搭…

阿里云无影云电脑价格_企业办公型1元_云桌面入口

阿里云无影云电脑配置费用&#xff0c;4核8G企业办公型云电脑可以免费使用3个月&#xff0c;无影云电脑地域不同费用不同&#xff0c;无影云电脑是由云桌面配置、云盘、互联网访问带宽、AD Connector、桌面组共用桌面session等费用组成&#xff0c;阿里云百科分享阿里云无影云电…

中文数据下载

研究AI离不开数据&#xff0c;数据库可以说是AI的半壁天下。有链接的数据库下载是很nice的。 语音数据集整理 目录 1.Mozilla Common Voice. 2 2.翻译和口语音频的大型数据库Tatoeba. 2 3.VOiCES Dataset 3 4. LibriSpeech. 4 5.2000 HUB5 English&#xff1a;... 4 6.…

如何用Three.js + Blender打造一个web 3D展览馆

作者&#xff1a;vivo 互联网前端团队- Wei Xing 运营活动新玩法层出不穷&#xff0c;web 3D炙手可热&#xff0c;本文将一步步带大家了解如何利用Three.js和Blender来打造一个沉浸式web 3D展览馆。 一、前言 3D展览馆是什么&#xff0c;先来预览下效果&#xff1a; 看起来像…

Linux离线环境Jenkins部署SpringBoot

Jenkins服务器 把Jar包上传到Linux服务器的/jenkins/目录下 Dashboard----》新建任务----》构建一个自由风格的软件项目----》test 修改jenkins工作空间 新建构建前执行命令stop.sh&#xff0c;停止SpringBoot并备份 &#xff08;这里是目标服务器&#xff0c;即部署项目的…

激斗云计算:互联网大厂打响新一轮排位战

大模型如同一辆时代列车&#xff0c;所有科技大厂都想上车。 自去年底ChatGPT一炮而红&#xff0c;国内外数十家科技大厂、创业公司、机构相继下场&#xff0c;一时间掀起大模型的热浪。 《中国人工智能大模型地图研究报告》显示&#xff0c;截至今年5月28日&#xff0c;中国…

第八章:SegNet——一个用于强大的语义像素级标注的深度卷积编码-解码架构

0.摘要 我们提出了一种新颖的深度架构SegNet&#xff0c;用于语义像素级图像标注。SegNet具有一些吸引人的特性&#xff1a; (i)它只需要对完全学习的函数进行前向评估&#xff0c;就可以获得平滑的标签预测&#xff1b; (ii)随着深度增加&#xff0c;像素标注考虑了更大的上下…