【SpringBoot框架篇】35.kafka环境搭建和收发消息

news/2024/2/29 17:06:36/文章来源:https://blog.csdn.net/ming19951224/article/details/135634610

kafka环境搭建

kafka依赖java环境,如果没有则需要安装jdk

yum install java-1.8.0-openjdk* -y

1.下载安装kafka

kafka3.0版本后默认自带了zookeeper,3.0之前的版本需要单独再安装zookeeper,我使用的最新的3.6.1版本。

cd /usr/local
wget https://dlcdn.apache.org/kafka/3.6.1/kafka_2.12-3.6.1.tgz
tar -zxvf  kafka_2.12-3.6.1.tgz
cd kafka_2.12-3.6.1.tgz

2.启动zookeeper

cd到kafka的安装根目录后,执行下面命令指令zookeeper.properties文件路径启动zookeeper,默认启动的zk服务使用内存是512m,可以修改zookeeper-server-start.sh脚本中参数调大使用堆内存

bin/zookeeper-server-start.sh config/zookeeper.properties

也可以通过指定-daemon以守护进程方式启动zookeeper,如果不指定关闭终端时zookeeper服务则会被杀死

bin/zookeeper-server-start.sh -daemon  config/zookeeper.properties
#通过tail命令查看zookeeper实时日志
tail -f logs/zookeeper.out

启动完看到下面的日志表示启动成功了
在这里插入图片描述
停止zookeeper服务

bin/zookeeper-server-stop.sh

zk默认的端口是2181,可以修改zookeeper.properties里的clientPort字段改变zk监控的端口
在这里插入图片描述

可以再开一个终端启动zk客户端测试连接

bin/zookeeper-shell.sh 127.0.0.1:2181

执行ls查看根目录下的文件信息,默认只有zookeeper目录,由于我之前启动过kafka,所以这里会有kafka-server注册到zk中元数据信息

ls /

在这里插入图片描述

3.配置kafka

修改配置kafka配置文件,方便后面在idea中访问

vi config/server.properties

如果kafka需要被外部机器访问需要配置listeners和advertised.listeners字段,下图圈中的是我虚拟机的访问ip,如果不配置的话在笔记本上idea中访问会报错。
在这里插入图片描述
如果kafka和zookeeper不在同一台机器上面,需要修改zookeeper.connect字段
在这里插入图片描述

4.启动kafka

下面指定了kafka配置文件路径的方式启动kafka

bin/kafka-server-start.sh  config/server.properties

也可以通过指定-daemon以守护进程方式启动kafka,如果不指定关闭终端时kafka服务则会被杀死

bin/kafka-server-start.sh -daemon  config/server.properties
#指定了-daemon参数可以通过tail命令查看kafka实时日志
tail -f logs/server.log

看到下面的日志表示kafka启动成功
在这里插入图片描述
通过jps命令可以看到kafka和zookeeper两个java进程
在这里插入图片描述

停止kafka服务

bin/kafka-server-stop.sh

5.创建主题

通过kafka-topics.sh脚本可以对主题操作,由于我修改过server.properties监听地址为服务器的ip,所以不能使用localhost访问,只能用服务器ip访问

#bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic kafkatest
bin/kafka-topics.sh --create --bootstrap-server 192.168.1.7:9092 --replication-factor 1 --partitions 1 --topic kafkatest
  • –bootstrap-server 指定kafka的server地址
  • –replicator-factor 指定了副本因子(即副本数量); 表示该topic需要在不同的broker中保存几份,这里设置成1,表示在两个broker中保存两份Partitions分区数。
  • –partitions 指定分区个数
  • –topic 指定所要创建主题的名称,比如kafkatest

查看所有kafka主题信息

bin/kafka-topics.sh --list --bootstrap-server 192.168.1.7:9092

可以看到下面有刚刚创建的kafkatest主题
在这里插入图片描述
查看主题的详细信息

bin/kafka-topics.sh --describe --bootstrap-server 192.168.1.7:9092 --topic kafkatest

在这里插入图片描述

6.生产者发送消息

执行kafka-console-producer.sh命令给主题名称为kafkatest主题的发送消息

bin/kafka-console-producer.sh --broker-list 192.168.1.7:9092 --topic kafkatest

输入消息后按回车键就会发送消息
在这里插入图片描述

7.消费者消费消息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092    --topic=kafkatest --from-beginning
  • –from-beginning 参数从主题头开始消费消息,不指令只会消费实时消息

可以看到下图有刚才生产者发送的三条消息
在这里插入图片描述

默认所有kafka消费者都会消费kafka生产者发送到主题的消息(有兴趣的可以再开一个终端启动kafka消费者,然后再用生产者发送消息,可以看到消息被两个消费者消费了,效果如下图)
在这里插入图片描述

可以指定kafka消费者的组Id让在同一组的客户端只有一个实例能消费消息。

bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092 --topic kafkatest  -consumer-property group.id=testGroup --consumer-property client.id=consumer-1
  • group.id 用于指定消费者分组
  • client.id 用于指定消费者在组中的客户端Id

再另外一个终端启动上面的命令,需要把client.id改成consumer-2

然后再用生产者发送消息,可以看到下图只有一个消费者在消费消息
在这里插入图片描述

在SpringBoot中使用

1.引入依赖

     <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency>

2.application.yml

server:port: 8035spring:kafka:bootstrap-servers: 192.168.1.7:9092 #kafka server的地址producer:batch-size: 16384 #批量大小acks: -1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)retries: 10 # 消息发送重试次数buffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerproperties:linger:ms: 2000 #提交延迟consumer:group-id: testGroup #默认的消费组IDenable-auto-commit: true #是否自动提交offsetauto-commit-interval: 2000 #提交offset延时auto-offset-reset: latestmax-poll-records: 100 #单次拉取消息的最大条数key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:session:timeout:ms: 10000 # 消费会话超时时间(超过这个时间 consumer 没有发送心跳,就会触发 rebalance 操作)request:timeout:ms: 30000 # 消费请求的超时时间listener:#type: batch #设置批量消费,注释掉则是单次消费missing-topics-fatal: false # consumer listener topics 不存在时,启动项目就会报错concurrency: 10 # 默认消费者线程数 也可以在@KafkaListener注解内配置concurrency字段值

3.创建主题

有两种创建主题的方式
通过TopicBuilder+ @Bean自动创建主题

@Configuration
public class KafkaConfig {public static final String DEFALUT_TOPIC = "autoTopic";@Beanpublic NewTopic newTopic() {//如果存在则不会创建, 参数:主题名称、分区数、副本数return TopicBuilder.name(DEFALUT_TOPIC ).partitions(1).replicas(1).build();}}

通过AdminClient 手动创建主题

@Configuration
public class KafkaConfig {@Beanpublic AdminClient adminClient(@Value("${spring.kafka.bootstrap-servers}") String bootstrapServers) {Properties prop = new Properties();prop.put("bootstrap.servers",bootstrapServers);return AdminClient.create(prop);}
}    

web接口,下面定义了两个接口分别用于创建主题和查看所有主题

@Slf4j
@RestController
public class KafkaAdminController {@Resourceprivate AdminClient adminClient;/*** 创建主题*/@GetMapping("/create/{topicName}")public String createTopic(@PathVariable String topicName) throws Exception {//需要判主题是否已存在,已存在再创建会报错if (getTopicSet().contains(topicName)) {return "topicExists ";}// 创建主题  参数:主题名称、分区数、副本数CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(new NewTopic(topicName, 1, (short) 1)));result.all().get();return "success";}/*** 查看所有主题*/@GetMapping("/listTopic")public String listTopic() throws Exception {Set<String> set = getTopicSet();return String.format("topics[%s]", getTopicSet().stream().collect(Collectors.joining(",")));}public Set<String> getTopicSet() throws Exception {ListTopicsResult listTopicsResult = adminClient.listTopics();KafkaFuture<Set<String>> future = listTopicsResult.names();return future.get();}}

启动项目后调用创建主题接口创建名称为newTopic的主题

用浏览器访问http://localhost:8035/create/newTopic 两次可以看到返回了主题已存在的错误信息
在这里插入图片描述
用浏览器访问http://localhost:8035/listTopic查看所有主题,可以看到通过TopicBuilder和AdminClient创建的主题都存在。其它的是之前测试造出来的脏数据
在这里插入图片描述

4.发送消息

4.1.正常消息

@RestController
public class KafkaProducerController {@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;/*** 正常消息发送*/@GetMapping("/send/{msg}")public String sendMessage(@PathVariable String msg) {log.info("sendMsg=" + msg);kafkaTemplate.send(KafkaConfig.DEFALUT_TOPIC, msg);return "success";}

4.2.带回调函数的消息

   /*** 带回调的消息发送*/@GetMapping("/sendCallback/{msg}")public String sendCallbackMessage(@PathVariable String msg) {kafkaTemplate.send(KafkaConfig.DEFALUT_TOPIC, msg).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable throwable) {log.error("send msg to kafka error:{}", throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> result) {log.info("send msg to kafka success topic={},partition={},msg={}", result.getRecordMetadata().topic(), result.getRecordMetadata().partition(),result.getProducerRecord().value());}});return "success";}

4.3.全局监听回调函数配置

4.2.使用的ListenableFutureCallback下文使用的ProducerListener两种监听的回调函数都会执行

@Slf4j
@Configuration
public class KafkaConfig {@ResourceProducerFactory producerFactory;@Beanpublic KafkaTemplate<String, Object> kafkaTemplate() {KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<String, Object>(producerFactory);kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {@Overridepublic void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) {log.info("send susscess , data= {}", producerRecord.toString());}@Overridepublic void onError(ProducerRecord<String, Object> producerRecord, Exception exception) {//当消息发送失败可以拿到消息存在缓存或数据中 定时重试发送log.error("send fail , data{}", producerRecord.toString());}});return kafkaTemplate;}
}    

分布用浏览器访问下面两个地址发送消息
http://localhost:8035/send/testmsg1
http://localhost:8035/sendCallback/testmsg2
在这里插入图片描述
由上图可以看到sendCallback接口两个监听器的回调函数都执行了。

5.消费消息

5.1.单次消费

通过@KafkaListener配置消费者信息

  • topics 订阅的主题,可以是多个
  • concurrency 线程数,如果不配置,则会使用用配置文件中的全局参数spring.kafka.listener.concurrency字段值,都不配置默认是单线程
@Slf4j
@Component
public class KafkaConsumer {/*** 监听消息*/@KafkaListener(topics = {KafkaConfig.DEFALUT_TOPIC}, concurrency = "5")public void onMessage(ConsumerRecord<String, Object> record) {log.info("onMessage msg={}",record.value());}}

5.2.批量消费消息

需要注释掉5.1的单次消息的代码,要不然会报错

批量消费需要在配置文件设置spring.kafka.listener.type=batch,可以通过max-poll-records指定最大条数

spring:kafka:consumer:max-poll-records: 100 #单次拉取消息的最大条数listener:type: batch #设置批量消费,注释掉则是单次消费
   /*** 同一主题批量消费groupId不能和单次消费的一样*/@KafkaListener(topics = {KafkaConfig.DEFALUT_TOPIC}, errorHandler = KafkaConstant.CONSUMER_ERROR_HANDLER_NAME,groupId = "batchGroup")public void onBatchMessage(List<ConsumerRecord<String, Object>> records) throws Exception {log.info("batch size={}", records.size());for (ConsumerRecord<String, Object> record : records) {log.info("onBatchMessage   msg={}", record.value());}}

用生产者发送多条消息,由下图可以看到消费者同时消费了6条消息
在这里插入图片描述

5.3.配置消费异常监听

@Slf4j
@Configuration
public class KafkaConfig {public static final String CONSUMER_LISTENER_ERROR_HANDLER_NAME ="consumerAwareListenerErrorHandler";@Bean(CONSUMER_LISTENER_ERROR_HANDLER_NAME)public ConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler() {return new ConsumerAwareListenerErrorHandler() {@Overridepublic Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {log.error("consumer fail:{}" ,exception.getMessage());return null;}};}}

在@KafkaListener注解里通过errorHandler字段指定消费异常监听器的Bean名称

      @KafkaListener(topics = {KafkaConfig.DEFALUT_TOPIC}, errorHandler = KafkaConfig.CONSUMER_LISTENER_ERROR_HANDLER_NAME,groupId = "batchGroup")public void onBatchMessage(List<ConsumerRecord<String, Object>> records) throws Exception {log.info("batch size={}", records.size());for (ConsumerRecord<String, Object> record : records) {log.info("onBatchMessage msg={}", record.value());}//模拟异常throw  new Exception("test errorHandler");}

使用生产者发送消息,可以看到控制台打印了消费异常监听器里的日志
在这里插入图片描述

项目配套代码

gitee代码地址

创作不易,要是觉得我写的对你有点帮助的话,麻烦在gitee上帮我点下 Star

【SpringBoot框架篇】其它文章如下,后续会继续更新。

  • 1.搭建第一个springboot项目
  • 2.Thymeleaf模板引擎实战
  • 3.优化代码,让代码更简洁高效
  • 4.集成jta-atomikos实现分布式事务
  • 5.分布式锁的实现方式
  • 6.docker部署,并挂载配置文件到宿主机上面
  • 7.项目发布到生产环境
  • 8.搭建自己的spring-boot-starter
  • 9.dubbo入门实战
  • 10.API接口限流实战
  • 11.Spring Data Jpa实战
  • 12.使用druid的monitor工具查看sql执行性能
  • 13.使用springboot admin对springboot应用进行监控
  • 14.mybatis-plus实战
  • 15.使用shiro对web应用进行权限认证
  • 16.security整合jwt实现对前后端分离的项目进行权限认证
  • 17.使用swagger2生成RESTful风格的接口文档
  • 18.使用Netty加websocket实现在线聊天功能
  • 19.使用spring-session加redis来实现session共享
  • 20.自定义@Configuration配置类启用开关
  • 21.对springboot框架编译后的jar文件瘦身
  • 22.集成RocketMQ实现消息发布和订阅
  • 23.集成smart-doc插件零侵入自动生成RESTful格式API文档
  • 24.集成FastDFS实现文件的分布式存储
  • 25.集成Minio实现文件的私有化对象存储
  • 26.集成spring-boot-starter-validation对接口参数校验
  • 27.集成mail实现邮件推送带网页样式的消息
  • 28.使用JdbcTemplate操作数据库
  • 29.Jpa+vue实现单模型的低代码平台
  • 30.使用sharding-jdbc实现读写分离和分库分表
  • 31.基于分布式锁或xxx-job实现分布式任务调度
  • 32.基于注解+redis实现表单防重复提交
  • 33.优雅集成i18n实现国际化信息返回
  • 34.使用Spring Retry完成任务的重试
  • 35.kafka环境搭建和收发消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_926183.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis主从架构、哨兵集群原理实战

1.主从架构简介 背景 单机部署简单&#xff0c;但是可靠性低&#xff0c;且不能很好利用CPU多核处理能力生产环境必须要保证高可用&#xff0c;一般不可能单机部署读写分离是可用性要求不高、性能要求较高、数据规模小的情况 目标 读写分离&#xff0c;扩展主节点的读能力&…

canvas绘制美队盾牌

查看专栏目录 canvas示例教程100专栏&#xff0c;提供canvas的基础知识&#xff0c;高级动画&#xff0c;相关应用扩展等信息。canvas作为html的一部分&#xff0c;是图像图标地图可视化的一个重要的基础&#xff0c;学好了canvas&#xff0c;在其他的一些应用上将会起到非常重…

STC8H8K蓝牙智能巡线小车——1. 环境搭建(基于RTX51操作系统)

1. 基本介绍 开发环境准备&#xff1a;Keil uVision5 烧录软件&#xff1a;STC-ISP&#xff08;V6.92A&#xff09; 芯片&#xff1a; STC8H8K64U-45I-LQFP64 芯片引脚&#xff1a; 2.创建项目 打开Keil&#xff0c;点击【Project】&#xff0c;选择【new uVersion proje…

快乐学Python,如何使用爬虫从网页中提取感兴趣的内容?

前面的内容&#xff0c;我们了解了使用urllib3和selenium来下载网页&#xff0c;但下载下来的是整个网页的内容&#xff0c;那我们又怎么从下载下来的网页中提取我们自己感兴趣的内容呢&#xff1f;这里就需要Python的另一个库来实现-BeautifulSoup。 BeautifulSoup 是一个 Py…

数据仓库(2)-认识数仓

1、数据仓库是什么 数据仓库 &#xff0c;由数据仓库之父比尔恩门&#xff08;Bill Inmon&#xff09;于1990年提出&#xff0c;主要功能仍是将组织透过资讯系统之联机事务处理(OLTP)经年累月所累积的大量资料&#xff0c;透过数据仓库理论所特有的资料储存架构&#xff0c;做…

可以在微信群里使用midjourney,gpt4,gemini,文心一言4.0,且免费

免费使用gpt4和midjourney 免费使用 参考链接&#xff1a; https://chat.xutongbao.top/

【银行测试】银行项目,信用卡业务测试+常问面试(三)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 银行测试-信用卡业…

轻松识别Midjourney等AI生成图片,开源GenImage

AIGC时代&#xff0c;人人都可以使用Midjourney、Stable Diffusion等AI产品生成高质量图片&#xff0c;其逼真程度肉眼难以区分真假。这种虚假照片有时会对社会产生不良影响&#xff0c;例如&#xff0c;生成公众人物不雅图片用于散播谣言&#xff1b;合成虚假图片用于金融欺诈…

Angular系列教程之DOM操作

文章目录 引言1. ElementRef2. Renderer23. ViewChild结论 引言 在Angular中&#xff0c;DOM操作是开发Web应用程序的一个重要方面。通过对DOM进行操作&#xff0c;我们可以动态地修改页面内容、样式和元素行为。本文将详细介绍如何在Angular中进行DOM操作&#xff0c;并提供相…

从数据可视化到场景渲染:山海鲸的创新与实践

作为山海鲸的开发者&#xff0c;我们深知可视化模型场景渲染在数据分析和决策支持中的重要作用。因此在保证山海鲸可视化软件免费编辑、分享、部署的同时也在场景渲染方面不断优化&#xff0c;本文将介绍山海鲸在可视化模型场景渲染方面的技术革新与实践探索。 首先&#xff0…

【STM32】STM32学习笔记-USART串口数据包(28)

00. 目录 文章目录 00. 目录01. 串口简介02. HEX数据包03. 文本数据包04. HEX数据包接收05. 文本数据包接收06. 预留07. 附录 01. 串口简介 串口通讯(Serial Communication)是一种设备间非常常用的串行通讯方式&#xff0c;因为它简单便捷&#xff0c;因此大部分电子设备都支持…

Go并发快速入门:Goroutine

Go并发&#xff1a;Goroutine 1.并发基础概念&#xff1a;进程、线程、协程 (1) 进程 可以比作食材加工的一系列动作 进程就是程序在操作系统中的一次执行过程&#xff0c;是由系统进行资源分配和调度的基本单位&#xff0c;进程是一个动态概念&#xff0c;是程序在执行过程…

unity面试题

一&#xff1a;什么是协同程序&#xff1f; 在主线程运行的同时开启另一段逻辑处理&#xff0c;来协助当前程序的执行&#xff0c;协程很像多线程&#xff0c;但是不是多线程&#xff0c;Unity的协程实在每帧结束之后去检测yield的条件是否满足。 二&#xff1a;Unity3d中的碰…

汽车线束的汽配企业MES管理系统解决方案

随着科技的飞速发展和环保需求的日益提升&#xff0c;新能源汽车在全球范围内崭露头角&#xff0c;成为未来出行的主导力量。在这股浪潮中&#xff0c;中国凭借其强大的研发实力和市场敏锐度&#xff0c;迅速崛起为新能源汽车领域的佼佼者。而作为汽车数字化控制与智能化应用的…

Excel地址

解题思路&#xff1a; 根据题中歪歪和笨笨的话可以有两种解法。 1.输入的数为多大&#xff0c;则循环1多少次&#xff0c;当值为27时就要进行进位操作。这时要分情况讨论。 当集合中元素为一个时&#xff0c;如26&#xff0c;则需要变为1 1&#xff0c;集合元素个数加一。 当…

Maven 依赖传递和冲突、继承和聚合

一、依赖传递和冲突 1.1 Maven 依赖传递特性 1.1.1 概念 假如有三个 Maven 项目 A、B 和 C&#xff0c;其中项目 A 依赖 B&#xff0c;项目 B 依赖 C。那么我们可以说 A 依赖 C。也就是说&#xff0c;依赖的关系为&#xff1a;A—>B—>C&#xff0c; 那么我们执行项目 …

IC验证——perl脚本ccode_standard——c代码寄存器配置标准化

目录 1 脚本名称 2 脚本路径 3 脚本参数说明 4 脚本操作说明 5 脚本代码 1 脚本名称 ccode_standard 2 脚本路径 /scripts/bin/ccode_standard 3 脚本参数说明 次序 参数名 说明 1 address (./rfdig&#xff1b;.&#xff1b;..&#xff1b;./boot) 指定脚本执行路…

ES高级查询

ES中提供了一种强大的检索数据方式&#xff0c;这种检索方式称为Query DSL&#xff0c;这种方式的丰富查询语法让ES检索变得更强大&#xff0c;更简洁。 1.常见查询 1.1查询所有[match_all] match_all关键字&#xff1a;返回索引中的全部文档。 GET /products/_search { &…

Redis-redis.conf配置文件中的RDB与AOF持久化方式的详解与区别

RDB&#xff08;Redis Database&#xff09; RDB是Redis的默认持久化方式&#xff0c;它将内存中的数据以二进制格式写入磁盘&#xff0c;形成一个快照。RDB持久化有以下几个重要的配置选项&#xff1a; save&#xff1a;指定了保存RDB的策略&#xff0c;默认的配置是每900秒&…

域名群站开源系统分享开源域名授权系统

一、需要自己安装PHP和MYSQL服务器环境。 二、务必设置伪静态规则&#xff0c;否则将无法访问文章栏目页面。 三、启用伪静态功能&#xff0c;请在站点设置中选择使用thinkphp的伪静态规则。 四、在域名的根目录下找到”data/config.php”文件&#xff0c;填入数据库的账号和…