kafka如何保证数据不丢失?

news/2024/4/26 15:11:28/文章来源:https://blog.csdn.net/sis12e/article/details/130288868

1.生产者

生产者生产数据有两种模式:一种是同步模式,一种是异步模式。

同步模式:生产者生产一条数据,就保存一条数据,保存成功后,再生产下一条数据,能够保证数据不丢失,但是效率太低了。

异步模式(采用ack机制):

 Properties properties = new Properties();properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");properties.put("acks", "all");properties.put("retries", 0);properties.put("batch.size", 16384);properties.put("linger.ms", 1);properties.put("buffer.memory", 33554432);properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);for (int i = 1; i <= 600; i++) {kafkaProducer.send(new ProducerRecord<String, String>("z_test_20190430", "testkafka0613"+i));System.out.println("testkafka"+i);}kafkaProducer.close();

在producer端开启一块buff缓冲,用来缓存数据,缓存一批数据,保存到partition当中.

0:生产者生产数据,不管leader是否保存成功,follower是否同步成功,继续发送下一批数据

1:生产者生产数据,只保证leader保存成功,不管follower是否同步成功,继续发送下一批数据.

-1或者all:生产者生产数据,既要保证leader保存成功,也要保证follower同步成功,继续发送下一批数据.

2.broker端

在Broker端,可以给Topic配置更大的备份因子replication-factors。配置了备份因子后,Kafka会给每个Partition分配多个备份Partition。这些Partiton会尽量平均的分配到多个Broker上。并且,在这些Partiton中,会选举产生Leader Partition和Follower Partition。这样,当Leader Partition发生故障时,其他Follower Partition上还有消息的备份。就可以重新选举产生Leader Partition,继续提供服务。

这样整个集群内的消息不会丢失。

3.消费者

自动提交偏移量改成手动提交偏移量

设置 enable.auto.commit = false , 默认值true,自动提交

手动提交offset

使用kafka的Consumer的类,用方法consumer.commitSync()提交

或者使用spring-kafka的 Acknowledgment类,用方法ack.acknowledge()提交(推荐使用)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_103216.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java编程设计语言-集合类

API(application programming interface)是JDK的重要组成部分&#xff0c;API提供了Java程序与运行它的系统软件&#xff08;Java虚拟机&#xff09;之间的接口&#xff0c;可以帮助开发者方便、快捷地开发Java程序 集合在程序设计中是一种重要的是数据结构&#xff0c;Java中提…

Semantic Kernel 知多少 | 开启面向 AI 编程新篇章

在 ChatGPT 火热的当下, 即使没有上手亲自体验&#xff0c;想必也对 ChatGPT 的强大略有耳闻。当一些人在对 ChatGPT 犹犹豫豫之时&#xff0c;一些敏锐的企业主和开发者们已经急不可耐地开展基于 ChatGPT 模型 AI 应用的落地探索。 因此&#xff0c;可以明确预见的是&#xf…

Java+Angular开发的医院信息管理系统源码,系统部署于云端,支持多租户

云HIS系统源码&#xff0c;采用云端SaaS服务的方式提供 基于云计算技术的B/S架构的云HIS系统源码&#xff0c;采用云端SaaS服务的方式提供&#xff0c;使用用户通过浏览器即能访问&#xff0c;无需关注系统的部署、维护、升级等问题&#xff0c;系统充分考虑了模板化、配置化、…

系统分析师之软件工程(十二)

目录 一、 软件开发生命周期 1.1 开发阶段工作细分 二、软件开发模型 2.1 瀑布模型 2.2 原型模型 2.3 增量模型与螺旋模型 2.4 V模型 2.5 喷泉模型 2.6 快速应用开发模型RAD 2.7 构件主装模型 2.8 统一过程 2.9 敏捷方法 三、逆向工程 四、净室软件工程 一、 软件…

斯坦福| ChatGPT用于生成式搜索引擎的可行性

文&#xff5c;智商掉了一地 随着 ChatGPT 在文本生成领域迈出了重要一步&#xff0c;Bing 浏览器也接入了聊天机器人功能&#xff0c;因此如何保证 Bing Chat 等搜索引擎结果的精确率和真实性也成为了搜索领域的热门话题之一。 当我们使用搜索引擎时&#xff0c;往往希望搜索结…

电子阅读器市场角力,AI成为关键变量

配图来自Canva可画 近年来&#xff0c;随着国家“书香型社会”建设政策的出台&#xff0c;公众的阅读需求正在逐年增加&#xff0c;各类读书产品和读书活动&#xff0c;也如同雨后春笋般涌现&#xff0c;人们的阅读体验日益得到丰富。比如&#xff0c;昨天世界读书日举行的“不…

更简单的存取Bean方式-@Bean方法注解

1.Bean方法存储 类注解是添加在某个类上的,那么方法注解是添加在某个方法前的 public class UserBeans {Beanpublic User user1(){User user new User();user.setUid(001);user.setUname("zhangsan");user.setAge(19);user.setPassword("123123");retur…

【分布式搜索引擎ES01】

分布式搜索引擎ES 分布式搜索引擎ES1.elasticsearch概念1.1.ES起源1.2.倒排索引1.2.1.正向索引1.2.2.倒排索引 1.3.es的一些概念1.3.1.文档和字段1.3.2.索引和映射1.3.3.mysql与elasticsearch 1.4.1安装es、kibana、IK分词器1.4.2扩展词词典与停用词词典 2.索引库操作2.1.mappi…

Springcloud连接nacos集群,nacos地址配置为nginx,报错:requst nacos server failed

先说下版本&#xff1a; Spring cloud&#xff1a; Hoxton.SR12 spring.cloud.alibaba&#xff1a; 2.2.9.RELEASE spring.boot&#xff1a; 2.3.12.RELEASE Linux Centos7 nacos-server&#xff1a;2.1.0 nginx&#xff1a; 1.20.2 环境说明&#xff1a; nacos正常搭建三个集…

supervisor安装

说明 Supervisor翻译过来是监管人&#xff0c;在Linux中Supervisor是一个进程管理工具&#xff0c;当进程中断的时候Supervisor能自动重新启动它。可以运行在各种类Linux/unix的机器上&#xff0c;supervisor就是用Python开发的一套通用的进程管理程序&#xff0c;能将一个普通…

【虚幻引擎】UE4/UE5科大讯飞文字合成语音

一、链接地址 链接&#xff1a;https://pan.baidu.com/s/15Qoc48x3DLpw4eW1qHXInQ 提取码&#xff1a;jqpx B站视频链接&#xff1a;https://space.bilibili.com/449549424?spm_id_from333.1007.0.0 二、案例介绍 第一步&#xff1a;首先进入讯飞开放平台注册一个账号&…

ThreadPoolExecutor源码阅读流程图

1.创建线程池 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), def…

Automa函数学习(三)

从变量中获取数据 当我们想要用automa获取文本标签获取到网页的文本内容后,想要将获取到的文本内容当做参数往后面的标签里进行传递时就需要用到automa提供的传参格式 {{ variables.自定义参数名}} 举例: 先建立打开百度首页工作流 前面自定义的变量名为text,所以这里参数拼接…

开放式耳机有什么好处,盘点几款性能不错的开放式耳机

随着人们对生活质量要求的提高&#xff0c;大家在运动的时候都喜欢戴上耳机&#xff0c;享受运动的乐趣。但是传统耳机戴久了之后就会出现耳朵酸痛的情况&#xff0c;这是因为传统耳机佩戴方式是通过空气振动来传递声音&#xff0c;而人在运动时就会伴随着大量的汗水&#xff0…

深入学习RabbitMQ五种模式(一)

1.安装erlang 下载otp_win64_25.3.exe https://www.erlang.org/downloads erlang安装完成&#xff0c;需要配置erlang环境变量 ERLANG_HOMEE:\software\Erlang OTPPATH%PATH%;%ERLANG_HOME%\bin; 2.安装RabbitMQ 下载rabbitmq-server-3.11.13.exe https://www.rabbitmq.com/dow…

【Python 协程详解】

0.前言 前面讲了线程和进程&#xff0c;其实python还有一个特殊的线程就是协程。 协程不是计算机提供的&#xff0c;计算机只提供&#xff1a;进程、线程。协程是人工创造的一种用户态切换的微进程&#xff0c;使用一个线程去来回切换多个进程。 为什么需要协程&#xff1f; …

IntelliJ IDEA 接入ChatGPT (免费,无需注册)生产力被干爆了!

IntelliJ IDEA 接入ChatGPT 前言 : 今天给大家介绍一款好用的 IntelliJ IDEA ChatGPT 插件 可以帮助我们写代码&#xff0c;以及语言上的处理工作&#xff0c;以及解释代码。让我们的生产力大大提高&#xff01; 一. ChatGPT-Plus 功能介绍 支持最新idea版本AI询问功能,写好…

Adobe Photoshop 软件下载

Adobe Photoshop&#xff0c;简称“PS”&#xff0c;是由Adobe Systems开发和发行的图像处理软件。Photoshop主要处理以像素所构成的数字图像。 时至今日&#xff0c;Adobe Photoshop 已经成为当今世界上最流行、应用最广泛的图像处理软件。不但设计专业的学生要系统的学习这个…

智能建筑中电力监控系统的应用与产品选型

摘要&#xff1a;近几十年&#xff0c;中国现代化经济不断发展&#xff0c;计算机技术、信息技术等相关产业也取得了飞跃性的进步。随着商业、生活以及公共建筑不断提高智能管理和节能的要求&#xff0c;电力监控系统开始逐渐渗入人们的日常生活&#xff0c;发挥着不可替代的作…

算法刷题|0-1背包问题、416.分割等和子集

0-1背包问题 什么是0-1背包&#xff1f; 有i个物品和一个容量为j的背包&#xff0c;每个物品有重量和价值两个属性&#xff1b;求容量为j的背包能装的物品的最大价值是多少。每个物品智能使用一次。 二维dp数组 dp[i][j]的含义&#xff1a;表示从前i个物品中&#xff0c;当前…