使用kafka-clients操作数据(java)

news/2024/4/27 12:14:22/文章来源:https://blog.csdn.net/qq_29752857/article/details/131995916

一、添加依赖

     <!--    kafka-clients--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.5.1</version></dependency>

二、生产者

自定义分区,可忽略

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;public class MyPatitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {String msgStr = value.toString();if(msgStr.contains("a")){return 1;}return 0;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

1、普通消息

 public static void main(String[] args) throws ExecutionException, InterruptedException {//配置Properties properties = new Properties();//连接参数properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.157.130:9092");//序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//关联自定义分区器 可选properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.minos.kafka.producer.MyPatitioner");//优化参数 可选//缓冲器大小 32Mproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 30 * 1024 * 1024);//批次大小properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);//Linger.msproperties.put(ProducerConfig.LINGER_MS_CONFIG, 5);//压缩properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");//acksproperties.put(ProducerConfig.ACKS_CONFIG, "-1");//重试次数properties.put(ProducerConfig.RETRIES_CONFIG, 3);//创建生产者KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//异步发送数据for (int i = 0; i < 10; i++) {//给first主题发消息kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i));//回调异步发送kafkaProducer.send(new ProducerRecord<String, String>("first", "hello2" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {System.out.println("主题:" + recordMetadata.topic() + "分区:" + recordMetadata.partition());}}});kafkaProducer.send(new ProducerRecord<String, String>("first", "a" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {System.out.println("主题:" + recordMetadata.topic() + "分区" + recordMetadata.partition() + "a");}}});Thread.sleep(500);}//同步for (int i = 0; i < 10; i++) {//给first主题发消息kafkaProducer.send(new ProducerRecord<String, String>("first", "sync_hello" + i)).get();}//关闭资源kafkaProducer.close();}
root@ubuntu2203:/usr/local/kafka_2.12-3.5.1/bin# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
a0
hello0
hello20
a1
hello1
hello21
a2
hello2
hello22
a3
hello3
hello23
a4
hello4
hello24
a5
hello5
hello25
a6
hello6
hello26
a7
hello7
hello27
a8
hello8
hello28
a9
hello9
hello29
sync_hello0
sync_hello1
sync_hello2
sync_hello3
sync_hello4
sync_hello5
sync_hello6
sync_hello7
sync_hello8
sync_hello9

2、事务消息

 public static void main(String[] args) throws ExecutionException, InterruptedException {//配置Properties properties = new Properties();//连接参数properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.157.130:9092");//序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//关联自定义分区器 可选properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.minos.kafka.producer.MyPatitioner");//优化参数 可选//缓冲器大小 32Mproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 30 * 1024 * 1024);//批次大小properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);//Linger.msproperties.put(ProducerConfig.LINGER_MS_CONFIG, 5);//压缩properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");//acksproperties.put(ProducerConfig.ACKS_CONFIG, "-1");//重试次数properties.put(ProducerConfig.RETRIES_CONFIG, 3);//指定事务IDproperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactional_id_01");properties.put("enable.idempotence", "true");//创建生产者KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//事务消息 初始化kafkaProducer.initTransactions();//开始事务kafkaProducer.beginTransaction();try {kafkaProducer.send(new ProducerRecord<String, String>("first", "Transactions")).get();//提交事务kafkaProducer.commitTransaction();} catch (Exception e) {//终止事务kafkaProducer.abortTransaction();} finally {//关闭资源kafkaProducer.close();}}
root@ubuntu2203:/usr/local/kafka_2.12-3.5.1/bin# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
Transactions

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_337339.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[个人笔记] vCenter设置时区和NTP同步

VMware虚拟化 - 运维篇 第三章 vCenter设置时区和NTP同步 VMware虚拟化 - 运维篇系列文章回顾vCenter设置时区和NTP同步&#xff08;附加&#xff09;ESXi设置alias参考链接 系列文章回顾 第一章 vCenter给虚机添加RDM磁盘 第二章 vCenter回收活跃虚拟机的剩余可用空间 vCente…

【算法基础:动态规划】5.4 数位统计DP(计数问题)(数位DP)

文章目录 例题&#xff1a;338. 计数问题解法1——转换成1067. 范围内的数字计数&#xff0c;数位DP模板解法2——分情况讨论&#xff08;TODO&#xff0c;还没理解&#xff09; 相关链接⭐ 例题&#xff1a;338. 计数问题 https://www.acwing.com/problem/content/340/ 解法…

软考A计划-系统集成项目管理工程师-项目人力资源管理-中

点击跳转专栏>Unity3D特效百例点击跳转专栏>案例项目实战源码点击跳转专栏>游戏脚本-辅助自动化点击跳转专栏>Android控件全解手册点击跳转专栏>Scratch编程案例点击跳转>软考全系列 &#x1f449;关于作者 专注于Android/Unity和各种游戏开发技巧&#xff…

运算放大器--------加减运算电路

反向求和运算电路 电路 公式 同向加法运算电路 电路 公式 加减运算电路 分别求正向输入的输出和反相输入的输出&#xff0c;然后求和就可以得到到最终的输出。 切记&#xff0c;虚短虚断不是真正的断路和短路。

M1/M2 通过VM Fusion安装Win11 ARM,解决联网和文件传输

前言 最近新入了Macmini M2&#xff0c;但是以前的老电脑的虚拟机运行不起来了。&#x1f605;&#xff0c;实际上用过K8S的时候&#xff0c;会发现部分镜像也跑不起来&#xff0c;X86的架构和ARM实际上还是有很多隐形兼容问题。所以只能重新安装ARM Win11&#xff0c;幸好微软…

MySQL的JSON操作

官网地址 1. MySQL json介绍 As of MySQL 5.7.8, MySQL supports a native JSON data type defined by RFC 7159 that enables efficient access to data in JSON (JavaScript Object Notation) documents. Automatic validation of JSON documents stored in JSON columns. …

iOS - 检测项目中无用类和无用图片

一、无引用图片检测 LSUnusedResources 安装插件 LSUnusedResources &#xff0c;用【My Mac】模拟器运行,如下图&#xff1a; Project Path 就是项目所在的路径&#xff0c;然后点击右下角 Search按钮&#xff0c;就可以看到被搜索出来的图片资源。 注意&#xff1a;这里被搜…

绕过TLS/akamai指纹护盾

文章目录 前言TLS指纹什么是TLS指纹测试TLS指纹绕过TLS指纹使用原生urllib使用其他成熟库&#xff01;&#xff01;修改requests底层代码 Akamai指纹相关&#xff08;HTTP/2指纹&#xff09;什么是Akamai指纹测试Akamai指纹绕过Akamai指纹使用其他成熟库 实操参考 前言 有道是…

【计算机网络】11、网桥(bridge)、集线器(hub)、交换机(switch)、路由器(router)、网关(gateway)

文章目录 一、网桥&#xff08;bridge)二、集线器&#xff08;hub&#xff09;三、交换机&#xff08;switch)四、路由器&#xff08;router&#xff09;五、网关&#xff08;gateway&#xff09; 对于hub&#xff0c;一个包过来后&#xff0c;直接将包转发到其他口。 对于桥&…

基于RK3588+FPGA+AI算法定制的智慧交通与智能安防解决方案

随着物联网、大数据、人工智能等技术的快速发展&#xff0c;边缘计算已成为当前信息技术领域的一个热门话题。在物联网领域&#xff0c;边缘计算被广泛应用于智慧交通、智能安防、工业等多个领域。因此&#xff0c;基于边缘计算技术的工业主板设计方案也受到越来越多人的关注。…

【1.1】Java微服务:初识微服务

✅作者简介&#xff1a;大家好&#xff0c;我是 Meteors., 向往着更加简洁高效的代码写法与编程方式&#xff0c;持续分享Java技术内容。 &#x1f34e;个人主页&#xff1a;Meteors.的博客 &#x1f49e;当前专栏&#xff1a; 微服务 ✨特色专栏&#xff1a; 知识分享 &#x…

KWP2000协议和OBD-K线

KWP2000最初是基于K线的诊断协议&#xff0c; 但是由于后来无法满足越来越复杂的需求&#xff0c;以及自身的局限性&#xff0c;厂商又将这套应用层协议移植到CAN上面&#xff0c;所以有KWP2000-K和KWP2000-CAN两个版本。 这篇文章主要讲基于K线的早期版本协议&#xff0c;认…

NICE-SLAM: Neural Implicit Scalable Encoding for SLAM论文阅读

论文信息 标题&#xff1a;NICE-SLAM: Neural Implicit Scalable Encoding for SLAM 作者&#xff1a;Zihan Zhu&#xff0c; Songyou Peng&#xff0c;Viktor Larsson — Zhejiang University 来源&#xff1a;CVPR 代码&#xff1a;https://pengsongyou.github.io/nice-slam…

Spring-mybatis结合的底层原理

1.项目前期准备 1.1 导入maven jar包 <dependencies><!-- spring依赖 --><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>5.2.5.RELEASE</version></depende…

Java中对Redis的常用操作

目录 数据类型五种常用数据类型介绍各种数据类型特点 常用命令字符串操作命令哈希操作命令列表操作命令集合操作命令有序集合操作命令通用命令 在Java中操作RedisRedis的Java客户端Spring Data Redis使用方式介绍环境搭建配置Redis数据源编写配置类&#xff0c;创建RedisTempla…

QT--day5(网络聊天室、学生信息管理系统)

服务器&#xff1a; #include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this);//给服务器指针实例化空间servernew QTcpServer(this); }Widget::~Widget() {delete ui; …

texshop mac中文版-TeXShop for Mac(Latex编辑预览工具)

texshop for mac是一款可以在苹果电脑MAC OS平台上使用的非常不错的Mac应用软件&#xff0c;texshop for mac是一个非常有用的工具&#xff0c;广泛使用在数学&#xff0c;计算机科学&#xff0c;物理学&#xff0c;经济学等领域的合作&#xff0c;这些程序的标准tetex分布特产…

【雕爷学编程】MicroPython动手做(15)——掌控板之AB按键

知识点&#xff1a;什么是掌控板&#xff1f; 掌控板是一块普及STEAM创客教育、人工智能教育、机器人编程教育的开源智能硬件。它集成ESP-32高性能双核芯片&#xff0c;支持WiFi和蓝牙双模通信&#xff0c;可作为物联网节点&#xff0c;实现物联网应用。同时掌控板上集成了OLED…

MacOS Monterey VM Install ESXi to 7 U2

一、MacOS Monterey ISO 准备 1.1 下载macOS Monterey 下载&#x1f517;链接 一定是 ISO 格式的&#xff0c;其他格式不适用&#xff1a; https://www.mediafire.com/file/4fcx0aeoehmbnmp/macOSMontereybyTechrechard.com.iso/file 1.2 将 Monterey ISO 文件上传到数据…

jenkins 配置git

在linux 中输入 保证git 安装成功 git --version使用查看git 安装目录&#xff08;非源码安装直接用yum 安装的&#xff09; which gitjenkins 中到 系统管理–>全局工具配置–> Git installations 新建一个项目 选择自由风格 源码管理选择 git 如果使用的是码云&a…