kafka入门到精通

news/2024/4/26 22:07:21/文章来源:https://blog.csdn.net/User_bie/article/details/128386503

文章目录

  • 一、kafka概述?
    • 1.定义
    • 1.2消息队列
      • 1.2.1 传统消息队列的使用场景
      • 1.2.2 消息队列好处
      • 1.2.3 消息队列两种模式
    • 1.3 kafka基础架构
  • 二、kafka快速入门
    • 1.1使用docker-compose安装kafka
    • 1.2测试访问kafka-manager
    • 1.3 查看kafka版本号
    • 1.4 查看zookeeper版本号
    • 1.5 扩展kafka的broker
    • 1.6 使用kafka
    • 1.7 测试生产者和消费者
    • 1.8 到zk 中查看节点信息,如下
    • 1.9 kafka群起脚本
  • 三、kafka架构深入
    • 1.kafka数据文件的存储
    • 2.kafka生产者
      • 2.1 分区策略
      • 2.2 数据可靠性保证
        • 2.2.1.**副本数据同步策略**
        • 2.2.2.ISR队列
        • 2.2.3 ack应答机制
        • 2.2.4 故障处理细节
      • 2.3Exactly Once语义(精准一次)
    • 3.kafka消费者
      • 3.1消费方式
      • 3.2分区分配策略(针对group)
        • 1.round robin
        • 2.range(默认分区分配策略)
      • 3.3offet的维护
      • 3.4消费者组案例
    • 4.kafka高效读写数据
    • 5.zk在kafka中的作用
    • 6.kafka事务
      • 6.1producer事务
      • 6.2comsumer事务(很少聊)
  • 四、kafka的API
    • 1.producer API
      • 1.1消息发送流程
      • 1.2.异步发送api
      • 1.3.同步发送api
    • 2.consumer API
    • 3.自定义interceptor
  • 五、kafka监控
    • **kafka eagle**
  • 六、flume对接kafka
  • 七、kafka面试题


文章整理自:尚硅谷kafka教程

一、kafka概述?

1.定义

kafka是一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据实时处理领域。

1.2消息队列

1.2.1 传统消息队列的使用场景

在这里插入图片描述

1.2.2 消息队列好处

1.解耦
允许程序独立的扩展或两边的处理过程,只要确保它们遵守相同的接口约束

2.可恢复性
*系统一部分组件失效时,不会影响整个系统。消息队列降低了进程间的耦合度,所以即使一个消息处理进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理

3.缓冲
有助于控制和优化数据流经过系统的速度,解决生产消息远大于消费消息处理速度不一致的问题

4.灵活性(kafka服务扩缩容)&削峰处理
在访问量突增的情况下,应用仍然要继续发挥作用,但这样的突发流量并不常见。
如果因为以处理这类峰值的标准来部署应用则会造成巨大的资源浪费。使用消息队列能够使关键应用顶住突发访问压力,而不会因为突发的超负荷请求使整个服务崩溃。

5.异步通信
很多情况,用户也不需要立即处理消息。消息队列提供了异步处理机制,允许程序把一个消息放入队列,但并不需要立即处理。向消息队列中放入大量消息,在需要的时候程序再去进行处理

1.2.3 消息队列两种模式

点对点模式(一对一,消费者主动拉取数据,消息收到后清除消息)

消息生产者发送消息到queue中,消息消费者从queue中取出消息并消费消息。
消息被消费后,queue中不再存储该消息,所以消费者不可能消费到已经被消费的消息。
queue支持存在多个消费者,但对于一个消息,只会有一个消费者进行消费。

在这里插入图片描述

发布/订阅模式(一对多,消费者消费数据后不会清除消息)

消息生产者(发布)将消息发布到topic中,同时有多个消费者(订阅)消费该消息。
和点对点模式不同,发布到topic的消息会被所有订阅者消费
在这里插入图片描述
发布订阅模式也有两种:
1.消费者主动拉取;
缺点:消费者不知道topic中是否有消息,需要定时去轮询,比较浪费资源,所以出现了主动推送的模式
2.消息队列主动推送(如微信公众号)

1.3 kafka基础架构

在这里插入图片描述
topic:主题,对消息进行分类
partition:分区,主要提高kafka集群负载均衡,同时也提高了并发量
leader:针对于分区,消费者连接kafa只会连接leader
follower:针对于分区,仅仅数据备份,同一个partition的leader和follower一定不在同一台kafka服务上
comsumer goup:消费组,提高消费能力。多个消费者在同一个group时,消费消息时,一个分区的消息只能被同一个消费组的同一个消费者消费;消费者group中消费者的个数等于topic的partition数时,消费能力最合理,group中消费者数量大于partition分区数时,会造成资源浪费,多余消费者依然无法消费到消息。
zookeeper:管理kafa集群信息,存储消费位置信息(0.9版本前);
如:消费者A需要消费topic-partition0的10条消息,在消费到第5条时挂了,这时候消费进度的信息保存在zk里和内存中;
0.9版本后,offset存储在kafka集群的系统级topic中(默认存储磁盘7天),有kafka集群维护,主动拉取时高并发情况下对zk访问压力较大。

二、kafka快速入门

kafka的jar下载

1.1使用docker-compose安装kafka

version: '3'
services:zookeeper:image: wurstmeister/zookeeperports:- "2181:2181"kafka:image: wurstmeister/kafkaports:- "9092:9092"environment:KAFKA_ADVERTISED_HOST_NAME: 192.168.58.100 # 不能通过hostname来配置(消费者和生产者识别不到)KAFKA_CREATE_TOPICS: "test:1:1"KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181volumes:- /var/run/docker.sock:/var/run/docker.sockkafka-manager:image: sheepkiller/kafka-managerenvironment:ZK_HOSTS: zookeeper:2181ports:- "19000:9000"

1.2测试访问kafka-manager

192.168.253.100:19000
在这里插入图片描述
出现以上问题 一般是docker-compose.yml文件中的kafka配置ip地址有误

添加kafka节点
在这里插入图片描述

1.3 查看kafka版本号

docker exec kafka_compose-kafka-1 find / -name *kafka_* | head -1 | grep -o ‘\kafka[^\n]*’
在这里插入图片描述

1.4 查看zookeeper版本号

docker exec kafka_compose-zookeeper-1 pwd
在这里插入图片描述

1.5 扩展kafka的broker

启动时指定kafka的broker数量

#端口一致所以只能启动一个broker,可能需要配置swarm网络。未亲测
docker-compose up --scale kafka=3 -d 

查看kafka扩展后的容器名称

docker ps;CONTAINER ID   IMAGE                       COMMAND                   CREATED         STATUS         PORTS                                                                   NAMES
0b32ee5a3744   wurstmeister/kafka          "start-kafka.sh"          9 minutes ago   Up 9 minutes   0.0.0.0:32777->9092/tcp, :::32777->9092/tcp                             kafka_compose-kafka-4
0a655887b672   wurstmeister/kafka          "start-kafka.sh"          9 minutes ago   Up 9 minutes   0.0.0.0:32778->9092/tcp, :::32778->9092/tcp                             kafka_compose-kafka-1
e8fcd6cafa2c   sheepkiller/kafka-manager   "./start-kafka-manag…"   9 minutes ago   Up 9 minutes   0.0.0.0:19000->9000/tcp, :::19000->9000/tcp                             kafka_compose-kafka-manager-1
402829709dc9   wurstmeister/zookeeper      "/bin/sh -c '/usr/sb…"   9 minutes ago   Up 9 minutes   22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp   kafka_compose-zookeeper-1
fbee5d80662b   wurstmeister/kafka          "start-kafka.sh"          9 minutes ago   Up 9 minutes   0.0.0.0:32779->9092/tcp, :::32779->9092/tcp                             kafka_compose-kafka-2
8c78ffde4538   wurstmeister/kafka          "start-kafka.sh"          9 minutes ago   Up 9 minutes   0.0.0.0:32780->9092/tcp, :::32780->9092/tcp                             kafka_compose-kafka-3

1.6 使用kafka

需要进入一个容器

docker exec -it kafka_compose-kafka-1 /bin/sh

创建topic:

kafka-topics.sh --create --topic topic001 --partitions 4 --zookeeper zookeeper:2181 --replication-factor 2

查看topic

#查看一个topic
kafka-topics.sh --list --zookeeper zookeeper:2181 topic001
#查看所有topic
kafka-topics.sh --list --zookeeper zookeeper:2181

删除topic

# 需要配置server.properties的delete.topic.enable=true
kafka-topics.sh --delete --topic topic002 --zookeeper zookeeper:2181

查看刚刚创建的topic的情况,borker和副本情况

kafka-topics.sh --describe --zookeeper zookeeper:2181 topic001

1.7 测试生产者和消费者

(1) 启动消费者客户端

kafka-console-consumer.sh --topic topic001 --bootstrap-server kafka_compose-kafka-1:9092,kafka_compose-kafka-2:9092,kafka_compose-kafka-3:9092,kafka_compose-kafka-4:9092
# --from-beginning 参数会从topic开始的位置消费,如果不指定,不会消费当前时刻之前的信息

启动后控制台不会打印消息,因为没有生产者生产消息。
(2) 启动生产者并且发送消息客户端

kafka-console-producer.sh --topic topic001 --broker-list kafka_compose-kafka-1:9092,kafka_compose-kafka-2:9092,kafka_compose-kafka-3:9092,kafka_compose-kafka-4:9092

现在已经进入了生产消息的命令行模式,输入一些字符串然后回车,再去消费消息的控制台窗口看看,已经有消息打印出来,说明消息的生产和消费都成功了。

创建kafka集群时,需要勾选启动JMX PORT,broker通讯需要用到
在这里插入图片描述

1.8 到zk 中查看节点信息,如下

1.安装prettyZoo可视化软件
prettyZoo可视化安装

2.连接并查看zookeeper情况
在这里插入图片描述

1.9 kafka群起脚本

#/bin/bash
case #1 in
"start"){for i in hadoop102 hadoop103 hadoop104doecho "***********************$1***************************"ssh $1 "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/mudole/kafka/config/server.properties"	done
};;"stop"){for i in hadoop102 hadoop103 hadoop104doecho "***********************$1***************************"ssh $1 "/opt/module/kafka/bin/kafka-server-stop.sh "	done
};;esac

kafka无法启动时,主要是看server.log日志

三、kafka架构深入

生产者生产消息时,如主题不存在,默认会创建一个分区和一个副本。在server.properties中进行修改
kafka集群可以保证分区有序,不能保证全局数据有序

1.kafka数据文件的存储

.log文件和.index文件

kafka分区的两个重要文件
0000000000000000.log只存储数据
0000000000000000.index记录消费数据offet

1.log文件默认存储大小1g,超过1个g了,生成新文件讲如何命名?
.index 和 .log 文件的命名规则就是当前文件的最小offset值(偏移量值)

2.怎么快速定位到想要消费的位置?
引入分片和索引机制。
分片规则:log文件1g后会新增分片,每片段都包含一个对应的.log和.index文件
索引机制:index文件中,存储了每条消息的id、起始偏移量和消息的大小。
文件索引原理:index文件通过二分查找找到是哪个消息,通过消息去log文件中找到消息内容

2.kafka生产者

2.1 分区策略

1.分区原因
方便在集群扩展
提高并发,可以以partition为单位进行读写

2.分区原则(3种)
在这里插入图片描述
1).在指定partition分区时,直接将数据存放在指定的分区中
2).未指定partition分区时,根据key的值hash后取余topic的分区数,得到存放数据的partition
3).未指定partition分区和key时,第一次调用时随机生成一个整数(后面如果没有指定partition和key时,会在这个整数的基础上自增),根据这个值与topic的partion数量取余得到存放数据的partition。
这是round-robin算法。

2.2 数据可靠性保证

topic的每个partition收到producer发送的数据后,都会向producer发送akc(acknowledgment确认收到),如果producer收到ack,就会发送下一轮数据,否则重复发送

kafka何时发送akc方案:
在这里插入图片描述

2.2.1.副本数据同步策略

在这里插入图片描述

全同步策略:
5台机器最多容忍4台机器故障,topicA分区下的5个副本(leader+follower)分别位于5台机器上且都是全量数据,5个副本中只要保证1个正常,那么kafka数据就是稳定的。则需要n+1个副本

超半数同步策略:
同样如果要容忍4台机器故障且kafka可用,则至少有一个副本在4台故障后依然是有topicA的全量数据,最坏情况下,这4台故障机器都是半数已经同步的机器,那么为了保证kafka数据稳定且可用需要4*2+1个副本。

kafka最终选择的方案:所有fowoller同步完成后发送ack
1.优点:
解决数据冗余:同样为了容忍n台节点故障,超半数同步策略需要2n+1个副本,而全同步策略只需要n+1个副本,针对kafka的使用场景,每个分区都有大量的数据,第一种方案会造成大量数据冗余
2.缺点:
网络延迟:虽然全量同步方案网络延迟高,但对kafka的影响较小

2.2.2.ISR队列

针对kafka选择全量同步方案时,网络延迟较高的问题,kafka做了优化
ISR队列(in-sync replicats):作用1在生产消息时快速响应、2在leader选举时做了优化
在0.9版本以前:ISR队列中存储的副本取决于两个因素,即同步数据快的follower、同步数据多的follower将会优先存储在ISR队列中,以备leader选举时进行挑选,那些同步慢、同步数据小的副本将会被剔除,不被考虑作为下一任leader

在0.9版本及以后:ISR队列只考同步快的副本进入队列

  • 为什么0.9版本后ISR队列取消了follower同步数量的条件?
    producer批量发送消息时,如果这个batch数量大于ISR同步的数量,那么久会造成延迟且数据不在范围内,可能就会从ISR中批量剔除副本,会造成频繁的ISR队列进出和zk的写入

现任版本:Leader维护了一个动态的ISR队列,意为和leader保持同步的follower集合。当ISR中的leader存储完producer的消息后,leader会给follower发送ack,如果follower长时间没有从leader同步数据,将会被剔除ISR队列(该时间阈值由replicat.lag.time.max.ms参数设定),一旦收到了ISR队列中所有follower的ACK,该消息就被确认commit了,leader将增加HW并且想producer发送ACK。
leader故障后,会从ISR队列中重新选区leader。

2.2.3 ack应答机制

对于某些不太重要的数据,对数据可靠性不是很高,能够容忍少量数据丢失,所以没有必要等ISR中所有follower全部接收成功。
kafka提供了3中可靠性的级别,用户根据可靠性和延迟的要求进行权衡进行选择
acks参数配置
acks=0时,producer不等待broker的ack,这种操作延迟最低,broker接收到消息还没写入磁盘就返回,当broker发生故障时有可能丢失数据
acks=1时,producer等待broker的ack,partition的leader落盘成功后返回ack,如果follower在同步成功前leader发生故障有可能丢失数据
acks=-1(all)时,producer等待broker的ack,partition的leader和follower全部落盘成功后才会返回ack。但是如果follower同步完成后,broker返回producer的ack之前,leader发生故障,那么会造成数据重复;当ISR中没有可用的follower时,leader返回ack后此时leader宕机并未将消息同步给其他ISR之外的follower时,就会丢失数据

2.2.4 故障处理细节

在这里插入图片描述
LEO:每个副本最大的offset
HW:高水位,是指消费者能见到的最大offset,ISR队列中最小的LEO;保证消费者获取数据一致性;保证副本之间的数据一致性

1.follower故障
follower发生故障后会被临时剔除ISR,等待follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件中高于HW的部分截取掉,从HW开始想leader同步。等该follower的LEO大于等于该topic的HW,即follower追上leader后,就可以重新加入ISR
2.leader故障
leader发生故障后,会从ISR中选举一个新的leader,之后为保证副本之前的一致性,其余的follower会先将各自的log文件中高于HW部分截取掉,然后重新想leader同步数据

这里只能保证副本之间数据的一致性。并不能保证数据不丢失或重复(ack决定)。

2.3Exactly Once语义(精准一次)

将kafkaserver的ack级别设置为-1,可以保证producer到server之间不会丢失数据,即是at least once语义;
相对的将ack级别设置为0,可以保证生产者每条消息只会发送一次,即at most once语义;
at least once可以保证数据不丢失,但是不能保证数据不重复;
at most once可以保证数据不重复,但是不能保证数据不丢失;
但对于一些非常重要的消息,比如说交易数据,下游的消费者要求数据即不能重复也不能丢失,即是Exactly Once语义;

kafka在0.11以前,对此是无能为力,只能保证数据不丢失,在消费者对数据做全局去重。对于多个下游应用的情况下,每个应用都要单独去重,这对性能造成了很大影响。

0.11之后,引入了幂等性;指的是producer无论向server发送多少次重复数据,server端都只会持久化一条。幂等性+at least once = exactly once

3.kafka消费者

3.1消费方式

1.comsumer采用pull的方式从broker拉去数据;pull可以根据消费能力调整消费速率。
2.broker向comsumer push的方式很难适应comsumer,因为push的频率是由broker决定的。push频率过快会comsumer来不及消费,表现为拒绝服务或网络阻塞。

pull不足:kafka没有消息被消费时,会陷入循环中,一直返回空数据,会造成资源浪费。kafka引入了timeout参数,comsumer pull返回空后,在timeout时间之后再去拉去。

3.2分区分配策略(针对group)

一个消费组有多个comsumer,一个topic有多个partition,消费时会涉及到partiion分配问题。

1.round robin

按照消费组来进行轮训分配
使用前提:消费组消费的是一个topic

2.range(默认分区分配策略)

根据topic来进行范围分配,多个主题被消费时,消费者消费数据不对等问题

当comsumer个数发生变化时,都会触发分配策略,重新分配消费分区。
当comsumer个数大于partition个数时,也会触发重新分配,会有多余的comsumer分配不到partition。

3.3offet的维护

comsumer group+topic+partition确定一个offset

3.4消费者组案例

4.kafka高效读写数据

1.顺序写磁盘
2.零拷贝

5.zk在kafka中的作用

controller:broker抢占zk中的controller,谁先来谁就是controller,controller是哪个broker无所谓,kafka的数据都是共享的,只是由这个身份为controller的broker来维护与zk的信息

在这里插入图片描述

6.kafka事务

kafka从0.11版本开始支持事务。事务可以保证kafka在exactly once语义的基础上,生产者和消费者可以跨分区和会话,要么全部成功,要么全部失败。

6.1producer事务

为了实现跨分区会话的事务,需要引入一个全局的transactionID(producer客户端自己生成),并且producer的pid和tid绑定,这样当producer重启后就可以通过正在进行的tid来获得原来的pid;
为了管理transaction,kafka引入了一个新的组件transaction coordinator。producer就是通过transaction coordinator交互获得TID对应的任务状态;coordinator还负责将事务所有写入kafka内部的一个topic,这样即使整个服务重启,由于事务状态可以保存,进行中的事务状态可以恢复,从而继续运行。

6.2comsumer事务(很少聊)

上述的事务机制主要是从producer方面去考虑,对于comsumer而言,事务的保证相对较弱,尤其是无法保证commit的信息被精确消费。这是由于comsumer可以通过offset访问任何信息,而且segmentFile的生命周期不同,同一事务的消息可能会出现重启后被删除的情况。(如segmnetFile01刚好7天过期,consumer批量消费时,恰好跨了两个segmentFile,重启后发现segmentFile01刚好过期,则无法读取到过期的数据)

四、kafka的API

1.producer API

1.1消息发送流程

kafka的producer消息时异步发送的。在消息发送过程中涉及到了两个线程main和sender,以及线程共享的一个变量RecordAccumulator。main线程将消息发送给RecordAccumulator,sender线程不断从RecordAccumulator中拉去并发送到broker。

在这里插入图片描述
相关参数:
batch.size: 只有数据累计到这个值的时候,sender才会发送数据
linger.ms:当batch.size迟迟没有累计到这个值,到了linger.ms时间是,sender也会发送

1.2.异步发送api

1.导入依赖
在这里插入图片描述
2.编写代码
需要用到的类:
KafkaProducer:创建一个生产者对象用来发送数据
ProducerConfig:获取所需的一系列参数
ProducerRecord:每一条消息封装成为一个对象

1.3.同步发送api

2.consumer API

3.自定义interceptor

五、kafka监控

kafka eagle

六、flume对接kafka

七、kafka面试题

kafka基础架构

1.Kafka中的ISR(InSyncRepli)、OSR(OutSyncRepli)、AR(AllRepli)代表什么?

ISR:副本同步队列,速率和leader相差小于10秒的follower集合
OSR:非副本同步队列,速率和leader相差大于10秒的follower集合
AR:所有分区的follower,AR=ISR+OSR

2.Kafka 中的 HW、LEO 等分别代表什么?

HW:高水位,根据同一分区中,最低的LEO所决定,是消费者能见可消费的数据
LEO: 每个副本最高的offset
在leader、follower节点故障后,partition会对HW和每个副本的LEO进行调整

3.Kafka的用途有哪些?使用场景如何?

1.用户追踪:根据用户在web或app上的操作,将这些操作记录到topic中,消费者订阅这些消息做实时的分析和数据挖掘
2.日志收集:通过kafka对各个服务的日志进行收集,再开放给comsumer
3.系统消息:缓存消息
运营指标:记录运营监控数据,搜集操作应用数据的集中反馈,如报错和报告

4.Kafka中是怎么体现消息顺序性的?

每个分区内,每条消息都有offset,所以只能在同一分区内有序;不同的分区无法做到消息的顺序性

5.“消费组中的消费者个数如果超过topic的分区,那么就会有消费者消费不到数据”这句话是否正确?

是。超过分区数的消费者是接受不到数据的,只要有消费者接入 就会触发分区分配策略

6.有哪些情形会造成重复消费?或丢失信息?

重复消费:先处理业务后提交offset,可能会造成重复消费
丢失信息:先提交offset再处理业务,可能会造成信息丢失

7.Kafka 分区的目的?

对kafka集群来说,分区做到负载均衡;对于消费者来说,可以提高并发度,提高读取效率

8.Kafka 的高可靠性是怎么实现的?

为了实现高可靠性,kafka使用了订阅模式,并且使用ISR和ack应答机制
能进入ISR中的follower和leadeer之间同步速率相差小于10秒
当ack=0时, producr不等待broker的ack,不管数据有没有写成,都不再重发这个数据
当ack=1时,broker会等leader写完数据后想producer发送ack,但不会等follower同步数据;在follower数据未同步前,leader挂掉,producer会再次发送新的消息到新的leader中,old的leader未同步的消息就会丢失
当ack=all(-1)时,broker会等到leader和isr中的所有follower都同步完成后再想producer发送ack;当follower数据同步完成返回producer ack前,leader挂掉,producer数据重发就会造成数据重复。

9.topic的分区数可不可以增加?如果可以怎么增加?如果不可以,那又是为什么?

可以增加

bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic topic-config --partitions 3

10.topic的分区数可不可以减少?如果可以怎么减少?如果不可以,那又是为什么?

不可以,现有的分区数据难以处理

11.简述Kafka的日志目录结构?

每一个分区对应一个文件夹,命名为topic-0或topic-1,,每个文件夹内有.index文件和.log文件;
.log文件存储数据
.index文件存储.log文件的数据id,起始偏移量和数据大小;通过index

12.如何解决消费者速率低的问题?

增加topic分区的数量、增加消费者个数

13.Kafka的那些设计让它有如此高的性能??

1.kafka是分布式的消息队列
2.对log文件进行了segment,并对segment文件进行索引
3.对于单节点使用了顺序读写,速度可达600M/S
4.引入了零拷贝,在os系统上就能完成读写操作,无需进入kafka应用(用户态)

14.kafka启动不起来的原因?

在关闭kafka时,先关闭了zk,zk中保留了kafka的id信息,会导致kafka下一次启动时报节点已经存在
把zk中的zkdata/version-2的文件删除就可以了

15.聊一聊Kafka Controller的作用?

负责kafka集群上下线工作,所有topic的副本分区分配和leader选举

16.Kafka中有那些地方需要选举?这些地方的选举策略又有哪些?

1.kafka的controller选举,先到先得
2.partitioin的leader选举,从isr中随机选取

17.失效副本是指什么?有那些应对措施?

失效副本:同步速率比leader相差大于10秒的副本
将失效的副本剔除ISR队列,进入OSR队列
失效副本等于leader同步速率小于10秒后从新进入ISR队列

18.Kafka消息是采用Pull模式,还是Push模式?

在producer阶段,采用的是push模式
在comsumer阶段,采用的是pull模式
comsumer在pull模式下:
优点:
comsumer可以根据自己消费能力调整消费速率,避免了broker push到comsumer时消费不及时而导致的崩溃问题;
缺点:consumer要时不时的去询问broker是否有新数据,容易发生死循环,内存溢出
解决办法:拉去不到数据时,增加下次拉去的时间,有api

19.Kafka创建Topic时如何将分区放置到不同的Broker中?

1.副本数不能超过broker数量
2.第一个分区是controller从broker中随机选取一个,然后其他分区相对0号分区依次向后移,第一个分区是用nextReplicatShift决定,而这个数也是随机产生

20.Kafka中的事务是怎么实现的?☆☆☆☆☆

kafka有两种事务:producer事务和consumer事务
producer事务是为了解决kafka跨分区跨会话的问题
kafka早起版本不能跨分区是因为producer的pid是kafka server根据producer生成的
为了解决这个问题,在java代码中给producer指定id,也就是transaction id,简称TID
我们将TID和PID进行绑定,在producer带着TID和PID第一次想broker注册时,broker会记录TID,并生成一个新的组件_transaction_state用来保存TID的事务状态信息
当producer重启后,就会带着TID和新的PID想broker发起请求,当发现TID一致时,producer就会获取之前的PID,将新的PID覆盖掉,并且去上一次事务的状态信息,从而继续上次的工作;
consumer事务相对于producer的事务相对弱一点,需要先确保consumer的消费和提交位置为一致且具有事务功能,才能保证数据的完成,不然就会造成数据的丢失或重复

21.Kafka中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么?

拦截器>序列化器>分区器
拦截器拦截处理无效信息
序列化加密数据
分区分配原则

22.Kafka生产者客户端的整体结构是什么样子的?使用了几个线程来处理?分别是什么?
在这里插入图片描述

使用了2个线程:main线程和sender线程
main线程会依次经过拦截器、序列化器、分区器、将数据发送到RecordAccumlator(x线程共享变量),再有sender线程从RecordAccumlate中拉取数据并发送到kafka broker,
batch.size:只有数据累计到batch.size后,sender才会发送数据。
linger.ms:如果数据迟迟未达到batch.size,sender线程等待linger.ms之后发送数据

23.消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?

答案是:offset+1;测试证明:

生产者发送数据offset是从0开始的:如下
在这里插入图片描述
消费者消费的数据offset是从offset+1开始的:如下
在这里插入图片描述

24.当你使用 kafka-topics.sh 创建(删除)了一个 topic 之后,Kafka 背后会执行什么逻辑?

1)会在 zookeeper 中的/brokers/topics 节点下创建一个新的 topic 节点,如:/brokers/topics/first
2)触发 Controller 的监听程序
3)kafka Controller 负责 topic 的创建工作,并更新 metadata cache

25.Kafka 有内部的 topic 吗?如果有是什么?有什么所用?

有 __consumer_offsets,保存消费者offset

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_73823.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python学习之OpenCV-Python模块的部分应用示例(生成素描图和动漫图)

文章目录前言一、图片转灰度二、对图片进行二值化处理三、对图片去除噪点四、调整图片透明度五、生成素描滤镜效果图(方法结合应用)六、生成动漫卡通滤镜效果图(方法结合应用)总结前言 OpenCV 是一个图像和视频处理库&#xff0c…

掌握饮食健康:了解你的宏量营养素摄入

谷禾健康 // 俗话说“病从口入”,我们的健康状况很大一部分取决于饮食。而食物基本上是由各种营养素构成的。 宏量营养素是人体大量需要的必需营养成分。宏量营养素指的是“三大”营养素:蛋白质、脂肪和碳水化合物,它们是我们饮食中的关键。 …

【JavaScript】基本语法大全

前言: 大家好,我是程序猿爱打拳。在学习C和Java这样的后端编程语言后,我们大概率会学习一些关于前端的语言如HTMLJavaScript。又因为前后端基本语法有些许不同,因此我整理出来。今天给大家讲解的是JS中的数据类型、运算符、选择结…

【华为OD机试模拟题】用 C++ 实现 - 最低位排序(2023.Q1)

最近更新的博客 【华为OD机试模拟题】用 C++ 实现 - 货币单位换算(2023.Q1) 【华为OD机试模拟题】用 C++ 实现 - 选座位(2023.Q1) 【华为OD机试模拟题】用 C++ 实现 - 停车场最大距离(2023.Q1) 【华为OD机试模拟题】用 C++ 实现 - 重组字符串(2023.Q1) 【华为OD机试模…

Eth-trunk :LACP模式链路聚合实战

Eth-trunk : LACP模式链路聚合实战 需求描述 PC1和PC3数据vlan10 ,网段为192.168.10.0 /24PC2和PC4数据vlan20 ,网段为192.168.20.0 /24确保设备之间互联互通,使用最大互联带宽并没有环路确保相同网段的PC可以互通判断交换机之间的每个端口…

ros下用kinectv2运行orbslam2

目录 前提 创建工作空间 orbslam2源码配置、测试: 配置usb_cam ROS功能包 配置kinect 前提 vim 、 cmake 、 git 、 gcc 、 g 这些一般都装了 主要是Pangolin 、 OpenCV 、 Eigen的安装 18.04建议Pangolin0.5 创建工作空间 我们在主目录下创建一个catkin_…

Node 10.0.8.6:9003 is unknown to cluster

解决方案解决方案一解决方案一 ① 概念介绍 公网ip:就是任意两台连接了互联网的电脑可以互相ping ip,能够通的ip 内网ip:只是在内网中使用无法与外网连接的ip ②问题背景 在腾讯云上搭建的一个redis集群,集群启动后 可以看到启动节点…

TX Text Control .NET Server for ASP.NET 31.0 SP2 CRK

用于 ASP.NET 31.0 SP2 的 TX 文本控件 .NET 服务器 用于 ASP.NET 的 TX 文本控件 .NET 服务器 TX Text Control Server for ASP.NET 是用于 Web 应用程序或服务的服务器端组件。它是一个完全可编程的 ASP.NET 文字处理器引擎,提供了广泛的文字处理功能。使用 TX Te…

C++中的内存管理

文章目录前言1.C中内存空间的划分2.C内存管理方式1.对内置类型的处理2.对自定义类型的处理3.new和delete实现原理4.定位new3.总结1. malloc/free和new/delete的区别2. 内存泄漏前言 C中的内存空间划分和C语言是很像的,基本上区别不大。但是因C中,引入了…

davis2016评估教程

DAVIS 2016是VOS任务中的一个经典的benchmark,但是一些VOT的算法有时候也可以预测mask,所以也会在上面测一测性能,本次就随手记录一下自己评测的过程,有需要的小伙伴可以往下看。 DAVIS 2016数据集官方项目网站:https:…

TCP四次挥手

TCP 四次挥手过程是怎样的? TCP 断开连接是通过四次挥手方式。 双方都可以主动断开连接,断开连接后主机中的「资源」将被释放,四次挥手的过程如下图: 客户端打算关闭连接,此时会发送一个 TCP 首部 FIN 标志位被置为 1…

node报错

记录bug:运行 npx -p storybook/cli sb init 时报错gyp info spawn C:\Program Files\Microsoft Visual Studio\2022\Community\MSBuild\Current\Bin\MSBuild.exegyp info spawn args [gyp info spawn args build/binding.sln,gyp info spawn args /nologo,gyp info spawn args…

prometheus + alterManager + 飞书通知,实现服务宕机监控告警;实测可用

架构设计图 最终效果图 项目准备 xml依赖 <!-- 监控相关 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>io.…

消息队列--Kafka

Kafka简介集群部署配置Kafka测试Kafka1.Kafka简介 数据缓冲队列。同时提高了可扩展性。具有峰值处理能力&#xff0c;使用消息队列能够使关键组件顶住突发的访问压力&#xff0c;而不会因为突发的超负荷的请求而完全崩溃。 Kafka是一个分布式、支持分区的&#xff08;partition…

JAVA 8 新特性 Lamdba表达式

Java8 新特性&#xff1a; 1、Lamdba表达式 2、函数式接口 3、方法引用和构造引用 4、Stream API 5、接口中的默认方法和静态方法 6、新时间日期API 7、Optional 8、其他特性 Java8 优势&#xff1a;速度快、代码更少&#xff08;增加了新的语法 Lambda 表达式&#xff09;、强…

Android 架构 MVC MVP MVVM,这一波你应该了然于心

MVC&#xff0c;MVP和MVVM是软件比较常用的三种软件架构&#xff0c;这三种架构的目的都是分离&#xff0c;避免将过多的逻辑全部堆积在一个类中。在Android中&#xff0c;Activity中既有UI的相关处理逻辑&#xff0c;又有数据获取逻辑&#xff0c;从而导致Activity逻辑复杂不单…

Wireshark抓包

Wireshark 1 抓包时间显示格式 2 界面显示列设置 3 protocol协议解析 4 过滤器 tcp.port&#xff1a;TCP端口tcp.dstport&#xff1a;TCP目的端口tcp.srcport&#xff1a;TCP源端口udp.port&#xff1a;UDP端口udp.dstport&#xff1a;UDP目的端口udp.srcport&#xff1a;UDP…

月薪过3W的软件测试工程师,都是怎么做到的?

对任何职业而言&#xff0c;薪资始终都会是众多追求的重要部分。前几年的软件测试行业还是一个风口&#xff0c;随着不断地转行人员以及毕业的大学生疯狂地涌入软件测试行业&#xff0c;目前软件测试行业“缺口”已经基本饱和。当然&#xff0c;我说的是最基础的功能测试的岗位…

良许也成为砖家啦~

大家好&#xff0c;我是良许。 没错&#xff0c;良许成为砖家啦&#xff0c;绝不是口嗨&#xff0c;有图有真相&#xff01; 有人会说&#xff0c;咦&#xff0c;这明明是严宇啊&#xff0c;跟你良许有啥关系&#xff1f; 额。。老读者应该知道良许的来历—— 鄙人真名严宇&a…

Python-datetime、time包常用功能汇总

目录基础知识时间格式有哪些&#xff1f;Python中的时间格式化时间戳datetimedatedatetimetimedeltatime常用获取今天凌晨字符串&#xff1f;将一个时间格式的字符串转为时间戳将一个时间戳转为指定格式的字符串全部代码参考基础知识 时间格式有哪些&#xff1f; 「格林威治标…