Kafka由浅入深(二)—— 生产者工作原理

news/2024/5/20 4:14:58/文章来源:https://blog.csdn.net/dreamcatcher1314/article/details/127328194

1、生产者的流程架构

 

生产者主体逻辑整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender 线程(发送线程)。

1.1 主线程:

在主线程中由KafkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作 用之后缓存到消息累加器( RecordAccumulator ,也称为消息收集器〉中。

1.2 Sender线程:

Sender 线程负责从RecordAccumulator 中获取消息并将其发送到Kafka

2、拦截器主线程核心功能

2.1、拦截器:

生产者拦截器可以用来在消息发送前做一些准备工作,可以修改消息发送的内容,但是拦截器的所有方法都不会对外抛出异常。

拦截器常见使用场景:

1、按照某个规则过滤不符合要求的消息

2、修改消息的内容等

3、统计类工作

查看org.apache.kafka.clients.producer.KafkaProducer 类的send() 方法,拦截器的处理是通过责任链设计模式去注入,拦截器是在消息发送第一步处理的逻辑。

自定义生产者拦截器,只需要实现org.apache.kafka.clients.producer.ProducerInterceptor接口和对应的方法

1、onSend() 方法: 可以对消息进行定制化的操作,但是一般不修改ProducerRecord的topic、key和partition等信息,如果修改可能会影响到分区计算、broker端日志压缩功能

2、onAcknowledgement() 方法:

    a、消息被应答之前或消息发送失败调用生产者拦截器的onAcknowledgement()方法,优先于用户设定的Callback之前执行。
    b、这个方法通常在Producer的后台I/O线程中执行,所以这个方法的逻辑越简单越好,否则,会影响到消息发送的效率和速度。

3、close() 方法:关闭拦截器的时候,清理资源

public interface ProducerInterceptor<K, V> extends Configurable, AutoCloseable {// 可以对消息进行定制化的操作,但是一般不修改ProducerRecord的topic、key和partition等信息,如果修改可能会影响到分区计算、broker端日志压缩功能ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);// 消息被应答之前或消息发送失败调用生产者拦截器的onAcknowledgement()方法,优先于用户设定的Callback之前执行// 这个方法通常在Producer的后台I/O线程中执行,所以这个方法的逻辑越简单越好,否则,会影响到消息发送的效率和速度// 调用方将忽略此方法引发的任何异常void onAcknowledgement(RecordMetadata metadata, Exception exception);// 关闭拦截器的时候,清理资源void close();
}// 在拦截器中所抛出的异常都会记录到日志中,不会向上传递,所以拦截器所产生的异常,不会影响到主流程。

拦截器注入位置

1、onSend() 拦截器注入在调用KafkaProducer的send()方法第一步执行

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {// intercept the record, which can be potentially modified; this method does not throw exceptionsProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);return doSend(interceptedRecord, callback);}

 2、onAcknowledgement() 拦截器注入分两种场景

     a、消息发送完成 KafkaProducer 的onCompletion()方法

     b、消息发送异常 this.interceptors.onSendError(record, appendCallbacks.topicPartition(), e);


public void onCompletion(RecordMetadata metadata, Exception exception) {if (metadata == null) {metadata = new RecordMetadata(topicPartition(), -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1);}this.interceptors.onAcknowledgement(metadata, exception);if (this.userCallback != null)this.userCallback.onCompletion(metadata, exception);
}

消息发送异常 的处理源代码: org.apache.kafka.clients.producer.internals.ProducerInterceptors 拦截器中 onSendError()中,也调用应答的处理。

 onSendError的处理逻辑:

public void onSendError(ProducerRecord<K, V> record, TopicPartition interceptTopicPartition, Exception exception) {for (ProducerInterceptor<K, V> interceptor : this.interceptors) {try {if (record == null && interceptTopicPartition == null) {interceptor.onAcknowledgement(null, exception);} else {if (interceptTopicPartition == null) {interceptTopicPartition = extractTopicPartition(record);}interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1,RecordBatch.NO_TIMESTAMP, -1, -1), exception);}} catch (Exception e) {// do not propagate interceptor exceptions, just loglog.warn("Error executing interceptor onAcknowledgement callback", e);}}}

2.2、序列化:

序列化的原因:Kafka服务端接收的数据格式是字节数组(byte[]), 所以生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给Kafka, 消费者从Kafak中获取字节数组数据,再通过反序列化器(Deserializer)成相应的对象。因此,生产者和消费者的序列化规则需要保持一致。

常见的序列化方式:org.apache.kafka.common.serialization.Serializer接口是Kafka的父接口,客户端自带的String的序列化器StringSerializer(org.apache.kafka.common.serialization.StringSerializer),以及ByteArray、ByteBuffer、Double、Integer、Long 等类型,都是实现与 Serializer 接口。

 org.apache.kafka.common.serialization.Serializer接口提供的三个方法:

1、configure(Map<String, ?> configs, boolean isKey) 

    Serializer类的方法是配置当前类, Map<String, ?> configs 参数是key/value键值对的配置,boolean isKey 是key还是value得参数。

2、serialize()

    对序列化方式的具体处理逻辑

3、close()

public interface Serializer<T> extends Closeable {default void configure(Map<String, ?> configs, boolean isKey) {// intentionally left blank}byte[] serialize(String topic, T data);default byte[] serialize(String topic, Headers headers, T data) {return serialize(topic, data);}@Overridedefault void close() {// intentionally left blank}
}

我们可以来看一下Kafka客户端StringSerializer 是如何进行字符串序列化。StringSerializer默认的编码集是UTF-8,也提供了自定义设计编码集。serialize(String topic, String data) 的实现逻辑也很简单,是通过String.getBytes()来实现字符串转byte[]。


public class StringSerializer implements Serializer<String> {private String encoding = StandardCharsets.UTF_8.name();@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";Object encodingValue = configs.get(propertyName);if (encodingValue == null)encodingValue = configs.get("serializer.encoding");if (encodingValue instanceof String)encoding = (String) encodingValue;}@Overridepublic byte[] serialize(String topic, String data) {try {if (data == null)return null;elsereturn data.getBytes(encoding);} catch (UnsupportedEncodingException e) {throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);}}
}

下面的代码也是Kafka基于Jackson 实现Json转byte[]的序列化器。

package org.apache.kafka.connect.json;
public class JsonSerializer implements Serializer<JsonNode> {private final ObjectMapper objectMapper = new ObjectMapper();public JsonSerializer() {this(Collections.emptySet(), JsonNodeFactory.withExactBigDecimals(true));}JsonSerializer(final Set<SerializationFeature> serializationFeatures,final JsonNodeFactory jsonNodeFactory) {serializationFeatures.forEach(objectMapper::enable);objectMapper.setNodeFactory(jsonNodeFactory);}@Overridepublic byte[] serialize(String topic, JsonNode data) {if (data == null)return null;try {return objectMapper.writeValueAsBytes(data);} catch (Exception e) {throw new SerializationException("Error serializing JSON message", e);}}
}

可以通过JSON、Protostuff、ProtoBuf、Thrift等常用的序列化工具实现,实现自定义序列化,满足业务自定义需求。

2.3、分区器:

分区器的作用是为消息分配分区,使得消息存储分布式存储。

1、如果消息ProducerRecord指定发送分区发送,则就不会使用到分区器。指定分区发送的实现方式,是在消息ProducerRecord设置partition 分区号;

2、如果不指定分区发送,则需要使用分区器经过规则计算出partition分区号,将消息发送到指定的分区。

Partitioner(org.apache.kafka.clients.producer.Partitioner)是Kafka的分区器父接口。Kafka的默认分区器是DefaultPartitioner(org.apache.kafka.clients.producer.internals.DefaultPartitioner),

默认分区器DefaultPartitioner的实现是partition()方法中定义了主要的分区分配逻辑。如果key不为null,默认的分区器会key 进行哈希(采用MurmurHash2 算法,具备高运算性能及低碰撞率),最终根据得到的哈希值来计算得到分区号, 相同key 的消息会被写入同一个分区。如果key 为null ,消息将通过轮询的方式发往主题内的各个可用分区。

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,int numPartitions) {if (keyBytes == null) {// 没有key值,则通过粘性分区分配分区return stickyPartitionCache.partition(topic, cluster);}// 对key 进行哈希,采用MurmurHash2算法,具备高运算性能及低碰撞率return BuiltInPartitioner.partitionForKey(keyBytes, numPartitions);}

但是在新版的Kafka(3.3.1)客户端不建议使用默认分区器,信息如下。

NOTE this partitioner is deprecated and shouldn't be used.  To use default partitioning logicremove partitioner.class configuration setting.  See KIP-794 for more info.

KIP-794 改进了默认分区器,以在健康的代理之间分批均匀分布非键控数据,而向不健康的代理分配更少的数据。例如,具有异常行为的生产者工作负载的 p99 延迟从 11 秒减少到 154 毫秒

Kafka官方KIP-794链接:

K​​​​​IP-794: Strictly Uniform Sticky Partitioner - Apache Kafka - Apache Software Foundation

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_24042.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

带你吃透Servlet核心编程下篇(完整图文教程)

本文被 系统学习JavaWeb 收录点击订阅专栏 文章目录1 Http协议1.1 什么是 HTTP 协议1.2 GET请求与POST请求1.3 响应的HTTP协议格式1.4 MIME数据类型2 HttpServletRequest类2.1 HttpServletRequest说明及常用方法2.2 HttpServletRequest类演示2.3 获取请求表单中的参数值&#x…

车车基础知识扫盲

排量 排量是指发动机气缸工作容积之和。所谓工作容积就是活塞在一个冲程内经过的区域的体积。气缸的总容积减去活塞的工作容积&#xff0c;剩下的就是压缩容积&#xff0c;压缩容积是用来燃烧的。 排量的单位是升(L)&#xff0c;常见的排量的标识有三种&#xff0c;T&#xff…

SpringMvc模块

SpingMVC 模块 简介 Spring MVC是一种基于MVC架构模式的轻量级Web框架。 SpringMVC处理过程 Spring MVC的处理过程&#xff1a; DispatcherServlet 接收用户的请求找到用于处理request的 handler 和Interceptors&#xff0c;构造成 HandlerExecutionChain执行链找到 handle…

宏任务与微任务

原文:做一些动图,学习一下EventLoop (https://juejin.cn/post/6969028296893792286)一、任务队列JavaScript 是单线程执行的语言, 在同一时间只能干一件事情。如果前面的任务很耗时后面的任务就会一直等待,为了解决这个问题,js中出现了同步任务和异步任务 1.1 同步任务在主…

Linux服务器部署Mysql5.7全过程记录

1、先下载安装包文件 mysql-5.7.27-linux-glibc2.12-x86_64.tar Mysql5.7.27 Linux安装包 链接&#xff1a;https://pan.baidu.com/s/1p4KmDp5O2bGJLXUHOHMQFQ 提取码&#xff1a;4692 2、解压 cd /usr/local 切换到安装包所在目录 tar -zxvf mysql-5.7.30-l…

【数据获取】可以公开获取到的百度迁徙数据

百度迁徙数据是一种较为常用的互联网数据&#xff0c;在之前的文章里小编已经讲了百度迁徙数据是什么、怎么获取、该如何处理、怎么用它做和弦图这些内容。但是其中数据的获取部分一直没有详细讲解&#xff0c;那么该如何获取它呢&#xff1f; 今天&#xff0c;就告诉大家一个…

教学设计题-教学过程

空间中直线与平面之间的位置关系 生活中的三种位置关系的实例 直线在平面内&#xff1a;开门关门时&#xff0c;门轴所在的直线在门所在平面内 直线与平面相交&#xff1a;操场上&#xff0c;升旗的旗杆所在直线与地面所在平面相交 直线与平面平行&#xff1a;黑板的一条边所在…

护肤 第三课

皮肤的生长周期一般是1-2个月 所以护肤品想要其效果 一般就是这个周期才会有效果 外用护肤品只能渗透到表皮层或者真皮层的表层&#xff0c;只有医疗美容的方法才有机会到真皮层 黑色素 黑色素细胞在基底层 黑色素细胞能产生黑色素 黑色素的作用&#xff1a;吸收和散射紫外线…

A Survey on Big Data Market: Pricing, Trading and Protection

基于大数据市场&#xff1a;定价、交易、保护的研究 作者&#xff1a;FAN LIANG, WEI YU , DOU AN, QINGYU YANG, XINWEN FU, AND WEI ZHAO 文章目录基于大数据市场&#xff1a;定价、交易、保护的研究Abstract1.Intro2.大数据的基本概念2.1.大数据的定义2.2.大数据的好处和挑…

【23秋招c++后端面试技术突围】mysql通俗易懂的数据库连接池原理及模拟实现

什么是数据库连接池&#xff1f; 当系统使用JDBC技术访问数据库时会创建一个connection对象&#xff0c;而该对象的创建过程是非常消耗资源的&#xff0c;并且创建对象的时间也特别长&#xff0c;假设系统一天有1万次的访问量&#xff0c;那么一天就会有1万个connection对象被…

Acetal-NHS (SDMB),乙缩醛-琥珀酰亚胺酯

An English name&#xff1a;Acetal-NHS (SDMB) Chinese name&#xff1a;乙缩醛-琥珀酰亚胺酯 Item no&#xff1a;X-GF-0136 Density: PEG density is approximately 1.125 g/mL Molecular formula&#xff1a; Physical form&#xff1a;PEG products generally appear…

CMake中find_file的使用

CMake中的命令find_file用于查找指定文件(named file)的完整路径&#xff0c;其格式如下&#xff1a;将创建一个由<VAR>命名的缓存条目即cache变量&#xff0c;将<VAR>的值存入CMakeCache.txt中);或如果指定了NO_CACHE&#xff0c;由<VAR>命名的普通变量来存…

文件IO操作笔记

目录 1.文件操作 2.File类 3.流&#xff08;针对文件内容操作读写&#xff09; 3.1 InputStream 3.2 Scanner 4.练习 1.文件操作 狭义文件就是存储在硬盘上的数据&#xff0c;以“文件”为单位 广义上的文件就是操作系统负责管理硬件资源&#xff0c;操作系统&#xff08;…

按钮(Button)组件

1.按钮组件的属性 属性 说明 onPressed 必填参数&#xff0c;按下按钮时触发的回调&#xff0c;接收一个方法&#xff0c;传null表示按钮禁用&#xff0c;会显示禁用相关样式 child 子组件 style 通过ButtonStyle装饰 ButtonStylee里面的常用的参数 属性名称 值类型 …

15 个实用的 Linux find 命令示例

除了在目录结构下查找文件的基本操作外&#xff0c;我们还可以使用 find 命令执行一些实际操作&#xff0c;这将使我们的命令行之旅变得轻松。 在本文中&#xff0c;让我们回顾 15 个 Linux find 命令的实际示例&#xff0c;它们对新手和专家都非常有用。 首先&#xff0c;在…

公众号网课查题功能搭建系统使用

公众号网课查题功能搭建系统使用 本平台优点&#xff1a; 多题库查题、独立后台、响应速度快、全网平台可查、功能最全&#xff01; 1.想要给自己的公众号获得查题接口&#xff0c;只需要两步&#xff01; 2.题库&#xff1a; 查题校园题库&#xff1a;查题校园题库后台&…

图学习——02.Graph Neural Network

Graph Neural Network GCN A矩阵表示邻接矩阵&#xff0c;I矩阵表示单位阵&#xff0c;D~矩阵是对A~的矩阵按行求和后把每行的值写在对角线上&#xff08;度矩阵&#xff0c;包括自连接的度&#xff09;&#xff0c;W(l)是要学习的参数&#xff0c;H(l)是这一层的节点的特征 …

2键/双按键触摸触控检测IC ,VK3602KA抗电源电压波动干扰/超高稳定性,适用门禁、台灯,家电等电源供电的应用

型号&#xff1a;VK3602KA 品牌&#xff1a;VINKA/永嘉微电 封装形式&#xff1a;SOP8 年份&#xff1a;新年份 KPP2436 VK3602KA具有2个触摸按键&#xff0c;可用来检测外部触摸按键上人手的触摸动作。该芯片具有较高的集成度&#xff0c;仅需极少的外部组件便可实现触摸按…

Vue基础语法(二)

目录 一、条件判断 1、概念 2、v-if、v-else-if、v-else 3、v-show 4、v-show与v-if的异同 二、v-for 1、遍历数组 2、遍历对象 3、key属性 三、数据更新检测 1、末尾的添加、删除 2、前面的插入、删除 3、splice(index,length,替换内容) 4、sort()排序 5、反转&a…

谷粒商城 spu保存

主要记录一下收获 spu 一类物品 sku 具体什么物品&#xff08;由销售属性组合定义&#xff09; 写保存接口步骤 对比前端拿到的VO和实际的POJO属性有哪些差异将差异属性通过其他方式拿到&#xff0c;其余属性直接BeanUtils.copyProperties保存&#xff0c;保存时还有其他关联…