springboot集成kafka消费数据

news/2024/2/25 12:17:30/文章来源:https://blog.csdn.net/weixin_47874230/article/details/135589570

springboot集成kafka消费数据

文章目录

  • springboot集成kafka消费数据
  • 1.引入pom依赖
  • 2.添加配置文件
    • 2.1.添加KafkaConsumerConfig.java
    • 2.2.添加KafkaIotCustomProperties.java
    • 2.3.添加application.yml配置
  • 3.消费者代码

1.引入pom依赖

        <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.11</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.1.2</version></dependency>

2.添加配置文件

2.1.添加KafkaConsumerConfig.java

@Configuration
@EnableConfigurationProperties(KafkaIotCustomProperties.class)
@Slf4j
public class KafkaConsumerConfig {@AutowiredKafkaIotCustomProperties kafkaIotCustomProperties;@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());// 并发数 多个微服务实例会均分factory.setConcurrency(3);factory.setBatchListener(true);ContainerProperties containerProperties = factory.getContainerProperties();// 是否设置手动提交containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}private ConsumerFactory<String, String> consumerFactory() {Map<String, Object> consumerConfigs = consumerConfigs();log.info("消费者的配置信息:{}",JSONObject.toJSONString(consumerConfigs));return new DefaultKafkaConsumerFactory<>(consumerConfigs);}@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = new HashMap<>();// 服务器地址propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaIotCustomProperties.getBootstrapServers());// 是否自动提交propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaIotCustomProperties.isEnableAutoCommit());// 自动提交间隔propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaIotCustomProperties.getAutoCommitInterval());//会话时间propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaIotCustomProperties.getSessionTimeOut());//key序列化propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaIotCustomProperties.getKeyDeserializer());//value序列化propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaIotCustomProperties.getValueDeserializer());// 心跳时间propsMap.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, kafkaIotCustomProperties.getHeartbeatInterval());// 分组idpropsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaIotCustomProperties.getGroupId());//消费策略propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaIotCustomProperties.getAutoOffsetReset());// poll记录数propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaIotCustomProperties.getMaxPollRecords());//poll时间propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaIotCustomProperties.getMaxPollInterval());return propsMap;}}

2.2.添加KafkaIotCustomProperties.java

@Component
@ConfigurationProperties(prefix = "fxyh.realdata.kafka")
@Data
public class KafkaIotCustomProperties {private List<String> topics;private String groupId;private String sessionTimeOut;private String bootstrapServers;private String autoOffsetReset;private boolean enableAutoCommit;private String autoCommitInterval;private String fetchMinSize;private String fetchMaxWait;private String maxPollRecords;private String maxPollInterval;private String heartbeatInterval;private String keyDeserializer;private String valueDeserializer;
}

2.3.添加application.yml配置

fxyh:realdata:kafka:bootstrapServers:  192.168.80.251:9092topics: ["test1","test2"]groupId: shengtingrealdatagroup#后台的心跳线程必须在30秒之内提交心跳,否则会reBalancesessionTimeOut: 30000#      autoOffsetReset: earliest#取消自动提交,即便如此 spring会帮助我们自动提交enableAutoCommit: false#自动提交间隔autoCommitInterval: 1000#拉取的最小字节fetchMinSize: 1#拉去最小字节的最大等待时间fetchMaxWait: 500maxPollRecords: 50#300秒的提交间隔,如果程序大于300秒提交,会报错maxPollInterval: 300000#心跳间隔heartbeatInterval: 10000keyDeserializer: org.apache.kafka.common.serialization.StringDeserializervalueDeserializer: org.apache.kafka.common.serialization.StringDeserializerauto-offset-reset: latest

3.消费者代码


@Slf4j
@Component
public class DeviceDataConsumer {@Autowiredprivate KafkaIotCustomProperties kafkaIotCustomProperties;@KafkaListener(topics = {"#{@kafkaIotCustomProperties.topics}"}, groupId = "#{@kafkaIotCustomProperties.groupId}", containerFactory = "kafkaListenerContainerFactory",properties = {"#{@kafkaIotCustomProperties.autoOffsetReset}"})public void topicTest(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {for (ConsumerRecord<String, String> record : records) {log.info("topic_test 消费了: Topic:" + record.topic() + ",groupId:" + kafkaIotCustomProperties.getGroupId() + ",Message:" + record.value());//手动提交偏移量ack.acknowledge();}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_925643.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

不同打包工具下的环境变量配置方式对比

本文作者为 360 奇舞团前端开发工程师 天明 前言 在现代的JavaScript应用程序开发中&#xff0c;环境变量的配置是至关重要的。不同的应用场景和部署环境可能需要不同的配置&#xff0c;例如开发、测试和生产环境。最常见的需求是根据不同的环境&#xff0c;配置如是否开启sour…

excel统计分析——Sidak、Bonferroni法多重比较

参考资料&#xff1a;生物统计学 Sidak法和Bonferroni法针对LSD法犯第Ⅰ类错误风险较大的问题进行了改进&#xff0c;通过根据平均数个数k&#xff0c;减小显著水平α的值来增大t值&#xff0c;从而增大差数显著显著性。 Sidak法的显著水平调整公式为&#xff1a; Bonferroni法…

多输入多输出 | Matlab实现基于LightGBM多输入多输出预测

多输入多输出 | Matlab实现基于LightGBM多输入多输出预测 目录 多输入多输出 | Matlab实现基于LightGBM多输入多输出预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 Matlab实现基于LightGBM多输入多输出预测&#xff08;完整源码和数据&#xff09; 1.data为数据集&a…

【目标检测实验系列】YOLOv5模型改进:融入坐标注意力机制CA,多维度关注数据特征,高效涨点!(内含源代码,超详细改进代码流程)

自我介绍&#xff1a;本人硕士期间全程放养&#xff0c;目前成果:一篇北大核心CSCD录用,两篇中科院三区已见刊&#xff0c;一篇中科院四区在投。如何找创新点&#xff0c;如何放养过程厚积薄发&#xff0c;如何写中英论文&#xff0c;找期刊等等。本人后续会以自己实战经验详细…

【2023 我的编程之旅】

前言 转眼 2024 年都过去 14 天了。回顾 2023 有太多技术上的思考以及人生的感悟&#xff0c;接下来趁着 CSDN 官方活动&#xff0c;顺便记录下来。 技术的价值 与现在的年轻人一心只想搞钱不同&#xff0c;刚毕业的时候&#xff0c;我的梦想是进入一家有实力的科技企业&…

机器学习---xgboost算法

1. xgboost算法原理 XGBoost&#xff08;Extreme Gradient Boosting&#xff09;全名叫极端梯度提升树&#xff0c;XGBoost是集成学习方法的王 牌&#xff0c;在Kaggle数据挖掘比赛中&#xff0c;大部分获胜者用了XGBoost。 XGBoost在绝大多数的回归和分类 问题上表现的十分…

STM32——ADC知识总结及多通道采样实验

1.ADC概念 ADC&#xff0c;全称&#xff1a;Analog-to-Digital Converter&#xff0c;指模拟/数字转换器 2 STM32各系列ADC的主要特性 3.F4框图 4.转换序列与转换时间 A/D转换被组织为两组&#xff1a;规则组&#xff08;常规转换组&#xff09;和注入组&#xff08;注入…

【征服redis1】基础数据类型详解和应用案例

博客计划 &#xff0c;我们从redis开始&#xff0c;主要是因为这一块内容的重要性不亚于数据库&#xff0c;但是很多人往往对redis的问题感到陌生&#xff0c;所以我们先来研究一下。 本篇&#xff0c;我们先看一下redis的基础数据类型详解和应用案例。 1.redis概述 以mysql为…

使用composer生成的DMG和PKG格式软件包有何区别

在使用Composer从包源构建软件包时候&#xff0c;有两种不同类型的包&#xff1a;PKG和DMG。你知道两者之间的区别吗? 以及如何选取吗&#xff1f; 每种格式都有各自的优势具体取决于软件包的预期用途以及用于部署软件包的工具。下面我们来了解一下PKG和DMG格式的区别和用途。…

科研绘图(八)线性热图

线性热图&#xff08;Linear Heat Map&#xff09;是一种数据可视化技术&#xff0c;用于展示数值在一维线性空间上的分布情况。它通常用于展示沿着一条线&#xff08;例如时间线或任何一维序列&#xff09;的数据密度或强度变化。线性热图与传统的二维热图不同&#xff0c;后者…

InternLM第5次课笔记

LMDeploy 大模型量化部署实践 1 大模型部署背景 2 LMDeploy简介 3 动手实践环节 https://github.com/InternLM/tutorial/blob/main/lmdeploy/lmdeploy.md 3

Spring Security-查询数据库认证

查询数据库认证权限(未自定义页面) 整合mybatis-plus 完成数据库操作 1.引入相关依赖 再父工程中 增加 mybatis-plus lombok mysql 相关依赖及版本号 <dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</grou…

AIGC ChatGPT 4 Prompt 万能提示词公式

最近大家都在使用ChatGPT来帮助自己完成相应的工作。很多时候大家提出的问题得不到很清晰,很明确的答案。 我们应该怎么样来和ChatGPT进行有效的沟通呢? 例如我们先来问一问ChatGPT: 要获得最准确的回复,请确保遵循以下建议: 明确性:请尽量明确描述您的问题。确保提供足…

AI大模型预先学习笔记一:transformer和fine tune技术介绍

一、商业观点&#xff1a;企业借助大模型获得业务增长可能 二、底层原理&#xff1a;transformer 1&#xff09;备注 ①下面每个步骤都是自回归的过程&#xff08;aotu-regressive&#xff09;&#xff1a;已输出内容的每个字作为输入&#xff0c;一起生成下一个字 ②合起来就…

Android中的SPI实现

Android中的SPI实现 SPI是JVM世界中的标准API&#xff0c;但在Android应用程序中并不常用。然而&#xff0c;它可以非常有用地实现插件架构。让我们探讨一下如何在Android中利用SPI。 问题 在Android中&#xff0c;不同的提供者为推送功能提供服务&#xff0c;而在大型项目中…

使用micro-app将现有项目改造成微前端,对现有项目实现增量升级

使用micro-app将现有项目改造成微前端&#xff0c;对现有项目实现增量升级 基座应用 1、安装依赖 npm i micro-zoe/micro-app --save2、在入口引入 //main.js import microApp from micro-zoe/micro-appnew Vue({ }) //在new Vue 下面执行 microApp.start()3、新增一个vue页…

Nacos和Eureka比较、统一配置管理、Nacos热更新、多环境配置共享、Nacos集群搭建步骤

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、Nacos和eureka的对比二、统一配置管理二、Nacos热更新方式一方式二 三、多环境配置共享四、Nacos集群搭建步骤&#xff08;黑马springCloud的p29&#xff0…

SpringCloud 源码系列之全局 Fegin 日志收集(okHttpClient、httpClient)

SpringCloud 源码系列之全局 Fegin 日志收集&#xff08;okHttpClient、httpClient&#xff09;目录 HttpClient 全局日志收集思路切换成HttpClient验证配置效果HttpClient 全局日志收集源码分析看源码顺带产物okHttpClient 全局日志收集总结 接上文SpringCloud OpenFegin 底层…

Python爬虫---scrapy shell 调试

Scrapy shell是Scrapy提供的一个交互式shell工具&#xff0c;它可以帮助我们进行爬虫的开发和调试。可以使用它来测试xpath或css表达式&#xff0c;查看它们是如何工作的&#xff0c;以及它们从你试图抓取的网页中提取的数据。它允许你在编写spider时交互地测试表达式&#xff…

【QT】自定义对话框及其调用

目录 1 对话框的不同调用方式 2 对话框QWDialogSize的创建和使用 3 对话框QWDialogHeaders的创建和使用 4 对话框QWDialogLocate的创建与使用 5 利用信号与槽实现交互操作 1 对话框的不同调用方式 在一个应用程序设计中&#xff0c;为了实现一些特定的功能&#xff0c;必须设计…