RabbitMq-接收消息+redis消费者重复接收

news/2024/4/17 2:40:48/文章来源:https://blog.csdn.net/zw899004/article/details/130381269

在接触RammitMQ时,好多文章都说在配置中设置属性 

# rabbitmq 配置
rabbitmq:host: xxx.xxx.xxx.xxxport: xxxxusername: xxxpassword: xxxxxx## 生产端配置# 开启发布确认,就是confirm模式. 消费端ack应答后,才将消息从队列中删除#确认消息已发送到队列(Queue)publisher-returns: true#确认消息已发送到交换机(Exchange)publisher-confirm-type: correlatedlistener: #消费者 端配置retry:enabled: true # 是否支持重试default-requeue-rejected: falsemax-attempts: 5 #最大重试次数initial-interval: 3000 # 重试时间间隔direct:acknowledge-mode: manualsimple:acknowledge-mode: manual

消息接收消息失败时,可以重复调用5次;按照此操作,发现没有重复调用。

----------------------------------正确思路---------------------------------------------------------------------------------

设置完配置文件属性后,在代码中利用redis与channel.basicNack联合使用,将错误记录保存至数据库,方便查找原因;

---------------------------------------代码

package com.charg.listener;import com.charg.common.constant.CacheConstants;
import com.charg.common.constant.Constants;
import com.charg.common.utils.JsonUtils;
import com.charg.common.utils.redis.RedisUtils;
import com.charg.constant.RabbitConstants;
import com.charg.product.domain.bo.ProductDeviceBo;
import com.charg.product.domain.bo.RabMsgLogBo;
import com.charg.product.service.IProductDeviceService;
import com.charg.product.service.IRabMsgLogService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.time.Duration;/*** rabbitmq 监听*/
@Slf4j
@Component
public class RabbitQueueListener {/*** 最大重试次数*/private static int maxReconsumeCount = 3;@Autowiredprivate StringRedisTemplate redisTemplate;/*** 监听  队列的处理器** @param message*/@RabbitListener(queues = "队列名称")@RabbitHandlerpublic void onMessage(Message message, Channel channel) {//唯一标识String messageId = message.getMessageProperties().getMessageId();try {//判断messageId在redis中是否存在if (verificationMessageId(messageId)) {log.error("消息已重复处理,拒绝再次接收...");// 拒绝消息channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} else {//不存在 则处理消息// 接收消息if (StringUtils.isNotBlank(new String(message.getBody()))) {//修改业务逻辑if (!false) {log.error("消息即将再次返回队列处理...逻辑错误");// 处理最大回调次数getMaximumNumber(message, channel);} else {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//加入缓存addMessageId(message);}} else {log.info("消息为空拒绝接收...");// 拒绝消息channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);}}} catch (Exception e) {e.printStackTrace();try {if (message.getMessageProperties().getRedelivered()) {log.error("消息已重复处理,拒绝再次接收----...");// 拒绝消息channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} else {log.error("消息即将再次返回队列处理...");// 处理最大回调次数getMaximumNumber(message, channel);}} catch (Exception exception) {exception.printStackTrace();}}}/*** 记录消息最大次数** @param message* @param channel* @throws IOException*/private void getMaximumNumber(Message message, Channel channel) {try {int recounsumeCounts = RedisUtils.getCacheObject("messageMaxKey"+message.getMessageProperties().getMessageId()) == null ? 0 : RedisUtils.getCacheObject("messageMaxKey"+message.getMessageProperties().getMessageId());if (maxReconsumeCount > recounsumeCounts) {log.info("maxMessageId(message.getMessageProperties().getMessageId())=" + recounsumeCounts);channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);// 记录重试次数maxMessageId(message.getMessageProperties().getMessageId());} else {log.info("次数达到三次了呢---------" + RedisUtils.getCacheObject(CacheConstants.MESSAGE_MAX_KEY + message.getMessageProperties().getMessageId()));// 将消息重新放回队列channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);// 清除缓存RedisUtils.deleteObject("messageMaxKey" + message.getMessageProperties().getMessageId());//重试三次后,还是失败 需记录到数据库addRabMsgLog(message);}} catch (Exception e) {e.printStackTrace();}}/*** 设置消息的最大重试次数*/public void maxMessageId(String messageId) {String messageMax ="messageMaxKey"+ messageId;// 存入缓存,用来记录该消息重试了几次if (RedisUtils.hasKey(messageMax)) {RedisUtils.incrAtomicValue(messageMax);} else {//错误的消息-插入数据库RedisUtils.setCacheObject(messageMax, 1, Duration.ofHours(Constants.MESSAGE_TIME));}}/*** 校验消息是否消费过该消息** @param messageId 消息id* @return*/public boolean verificationMessageId(String messageId) {// 消息是否存在keyString verifyIsExistKey ="messageExistKey" + messageId;if ((RedisUtils.hasKey(verifyIsExistKey))) {return true;}return false;}/*** 保存消费过消息** @param message 消息* @return*/public void addMessageId(Message message) {// 存入缓存RedisUtils.setCacheObject("messageExistKey" + message.getMessageProperties().getMessageId(), message.getMessageProperties().getMessageId(), 1);}/*** 消息队列 失败日志 操作* 自己存数据库逻辑*/public void addRabMsgLog(Message message) {log.info("====操作日志===");//将内容记录到数据库}}
--------------------------------数据库表

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_103738.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

排查和解决CentOS系统上Nacos服务启动报错“java.net.UnknownHostException: jmenv.tbsite.net“问题

背景 环境是CentOS7操作系统,nacos服务宕掉了,启动服务的时候报错。 Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.boot.web.servlet.FilterRegistrationBean]: Factory method ‘di…

大数据管理中心规划设计方案(ppt可编辑)

本资料来源公开网络,仅供个人学习,请勿商用,如有侵权请联系删除。 统一汇聚 推动业务数据协同5 价值提炼 支撑精准服务与科学管理6 实时感知 辅助城市治理高效运行7 大数据资源平台目标体系规划11 建设目标与思路12 使能高效协同&#xff0…

Qt+MySql开发笔记:Qt5.9.3的msvc2017x64版本编译MySql8.0.16版本驱动并Demo连接数据库测试

若该文为原创文章,转载请注明原文出处 本文章博客地址:https://hpzwl.blog.csdn.net/article/details/130381428 红胖子网络科技博文大全:开发技术集合(包含Qt实用技术、树莓派、三维、OpenCV、OpenGL、ffmpeg、OSG、单片机、软硬…

【数据结构初阶】第七节.树和二叉树的基本操作

作者简介:大家好,我是未央; 博客首页:未央.303 系列专栏:Java初阶数据结构 每日一句:人的一生,可以有所作为的时机只有一次,那就是现在!!! 文章目…

港科夜闻|香港科技大学(广州)与中国电信广东公司签署战略合作协议

关注并星标 每周阅读港科夜闻 建立新视野 开启新思维 1、香港科技大学(广州)与中国电信广东公司签署战略合作协议。根据协议,双方将围绕各自科技创新及发展需要,整合双方的优质资源和优势能力,务实开展多方位的战略合作。本次合作&#xff0c…

6.微服务项目实战---Sleuth--链路追踪

6.1 链路追踪介绍 在大型系统的微服务化构建中,一个系统被拆分成了许多模块。这些模块负责不同的功能,组合成 系统,最终可以提供丰富的功能。在这种架构中,一次请求往往需要涉及到多个服务。互联网应用构建在不同的软件模块集上…

【算法与数据结构】6 学会对算法进行性能测试

欢迎来到爱书不爱输的程序猿的博客, 本博客致力于知识分享,与更多的人进行学习交流 本文收录于算法与数据结构体系专栏,本专栏对于0基础者极为友好,欢迎与我一起完成算法与数据结构的从0到1的跨越 算法性能测试 一、前情回顾二、算法性能测试1.生成测试用例2.使用测…

多种内网穿透的实现方案

1. 内网穿透的应用场景 1.1. 开发调试 比如企业微信、钉钉等开发,需要一个回调地址,开发的时候,希望回调到开发的电脑上,打断点进行调试,这就需要穿透到内网的开发机器。 1.2. 演示测试 有需要演示或测试的系统&am…

SLAM论文速递:SLAM—— MID-Fusion:基于八叉树的对象级多实例动态SLAM—4.26(1)

论文信息 题目: MID-Fusion:Octree-based Object-Level Multi-Instance Dynamic SLAMMID-Fusion:基于八叉树的对象级多实例动态SLAM 论文地址: https://ieeexplore.ieee.org/abstract/document/8794371发表期刊: 2019 International Conf…

凌恩生物文献分享|一株细菌完成图也能发一区10分+!

期刊:Science of the Total Environment 影响因子:10.753 发表时间:2022 样本类型:Bosea sp. Ads-6菌株 客户单位:中国科学院微生物研究所 一、研究背景 环境中抗生素残留和耐药性的增加引发了许多…

亚马逊美国站纽扣电池标准

近日,亚马逊美国站公布要求卖家需遵守扭电池和硬币电池的新包装和警示标签规定公告。 在亚马逊销售单独的纽扣电池和硬币电池,则从2023年3月2日开始,您需要证明您的符合儿童安全包装和警告标签要求。 适用产品有;单独的纽扣电池或硬币电池&a…

Android SeekBar控制视频播放进度(二)——seekTo()不准确

Android SeekBar控制视频播放进度二——seekTo不准确 简介seekTo()视频帧 和 视频关键帧解决办法方法一方法二 简介 上一篇文章中,我们介绍了使用SeekBar控制视频播放,使用过程中发现,对于一些视频,我们拖动SeekBar进度条调节播放…

谈谈如何用开源网关进行 API 管理

需求痛点 1.企业不清楚到底有多少个API,无法形成API资产管理等问题。 2.API在不同集群的生命周期问题。 3.API运行状态监控和告警问题。 4.API请求限流、流量控制以及安全等问题。 功能介绍 Apinto的API管理提供API生命周期控制:可管理所有API&…

【DRF配置管理】如何在视图类使用get_objects()

原文作者:我辈李想 版权声明:文章原创,转载时请务必加上原文超链接、作者信息和本声明。 DRF应用和管理 【DRF配置管理】Django使用DRF框架 【DRF配置管理】如何在视图类配置参数(一) 【DRF配置管理】如何在视图类配置参数(二) 【DRF配置管理…

跨数据中心下的 Kafka 高可用架构分析

导语 本文介绍了 Kafka 跨数据中心的两种部署方式,简要分析两种方式下的不同架构以及优缺点,对这些架构可能碰到的问题也提供了一些解决思路;同时也说明了 Kafka 跨数据中心部署的社区解决方案和商业化解决方案。 背景 Kafka 作为世界上最…

Excel技能之实用技巧,高手私藏

今天来讲一下Excel技巧,工作常用,高手私藏。能帮到你是我最大的荣幸。 与其加班熬夜赶进度,不如下班学习提效率。能力有成长,效率提上去,自然不用加班。 消化吸收,工作中立马使用,感觉真不错。…

宠物领养系统【GUI/Swing+MySQL】(Java课设)

系统类型 Swing窗口类型Mysql数据库存储数据 使用范围 适合作为Java课设!!! 部署环境 jdk1.8Mysql8.0Idea或eclipsejdbc 运行效果 本系统源码地址:https://download.csdn.net/download/qq_50954361/87708775 更多系统资源库…

「OceanBase 4.1 体验」|大厂开始接入的国产分布式数据库,不来了解了解?

OceanBase 4.1 体验 前言OCP Express在线升级功能租户级物理备库TP(事务处理)和AP(分析处理)优化TP 性能优化AP 性能优化 结尾 前言 上次我们讲了本人自己亲自上手OceanBase 4.1的初体验,国产的分布式数据库也太太太太…

【STM32】基础知识 第八课 MDK 工程

【STM32】基础知识 第八课 MDK 工程 准备工作新建寄存器版本 MDK 工程步骤新建工程文件夹添加文件魔术棒设置绝对路径和相对路径对比测试程序 新建 HAL 库版本 MDK 工程CMSISHAL 库简介DriversMiddlewaresDevice 和 Include HAL 库文件介绍HAL 库 API 函数和比那辆命名规则HAL …

【ArcGIS】常见问题总结

1 arcgis如何打开*.adf文件 在处理数据时发现,获取到的土地利用类型数据有两个文件夹,一个叫info,另一个叫lucc2010(年份),打开lucc2010里面是一系列的*.adf文件,数据应该如何打开呢&#xff1…