消息队列--Kafka

news/2024/4/26 18:37:22/文章来源:https://blog.csdn.net/qq_47800859/article/details/129195486
  1. Kafka简介
  2. 集群部署
  3. 配置Kafka
  4. 测试Kafka

1.Kafka简介

数据缓冲队列。同时提高了可扩展性。具有峰值处理能力,使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

Kafka是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

特性:

  • 高吞吐量:kafka每秒可以处理几十万条消息。

  • 可扩展性:kafka集群支持热扩展- 持久性、

  • 可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失

  • 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)

  • 高并发:支持数千个客户端同时读写

它主要包括以下组件:

话题(Topic):是特定类型的消息流。(每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的。)
生产者(Producer):是能够发布消息到话题的任何对象(发布消息到 kafka 集群的终端或服务).
消费者(Consumer):可以订阅一个或多个话题,从而消费这些已发布的消息。
服务代理(Broker):已发布的消息保存在一组服务器中,它们被称为代理(Broker)或Kafka集群。

partition(区):每个 topic 包含一个或多个 partition。
replication:partition 的副本,保障 partition 的高可用。
leader:replica 中的一个角色, producer 和 consumer 只跟 leader 交互。
follower:replica 中的一个角色,从 leader 中复制数据。
zookeeper:kafka 通过 zookeeper 来存储集群的信息。

Zookeeper:

ZooKeeper是一个分布式协调服务,它的主要作用是为分布式系统提供一致性服务,提供的功能包括:配置维护、分布式同步等。Kafka的运行依赖ZooKeeper。  也是java微服务里面使用的一个注册中心服务

ZooKeeper主要用来协调Kafka的各个broker,不仅可以实现broker的负载均衡,而且当增加了broker或者某个broker故障了,ZooKeeper将会通知生产者和消费者,这样可以保证整个系统正常运转。

在Kafka中,一个topic会被分成多个区并被分到多个broker上,分区的信息以及broker的分布情况与消费者当前消费的状态信息都会保存在ZooKeeper中。

2.集群部署

        2.1环境

系统:Centos-Stream7

节点:

192.168.26.166   es01 ​

192.168.26.170   es02 ​

192.168.26.171   es03

软件版本:kafka_2.12-3.0.2.tgz

        2.2  安装配置jdk8

#yum install -y java-1.8.0-openjdk

        2.3  安装配置zookeeper

在配置中要注意每个配置项后面不要有空格否则会导致zookeeper启动不起来!!!!

Kafka运行依赖ZK,Kafka官网提供的tar包中,已经包含了ZK,这里不再额外下载ZK程序。

配置相互解析---三台机器(在es集群上安装的kafka):

# vim /etc/hosts

192.168.26.166   es01 ​

192.168.26.170   es02 ​

192.168.26.171   es03

安装Kafka:

# wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.0.2/kafka_2.12-3.0.2.tgz

# tar xzvf kafka_2.12-2.8.0.tgz -C /usr/local/

# mv /usr/local/kafka_2.12-2.8.0/ /usr/local/kafka/

配置zookeeper:

在es01节点中:

# sed -i 's/^[^#]/#&/' /usr/local/kafka/config/zookeeper.properties

# vim /usr/local/kafka/config/zookeeper.properties  #添加如下配置
dataDir=/opt/data/zookeeper/data  # 需要创建,所有节点一致
dataLogDir=/opt/data/zookeeper/logs # 需要创建,所有节点一致
clientPort=2181 
tickTime=2000 
initLimit=20 
syncLimit=10 

# 以下 IP 信息根据自己服务器的 IP 进行修改
server.1=192.168.19.20:2888:3888  //kafka集群IP:Port
server.2=192.168.19.21:2888:3888
server.3=192.168.19.22:2888:3888


#创建data、log目录

# mkdir -p /opt/data/zookeeper/{data,logs}

#创建myid文件

# echo 1 > /opt/data/zookeeper/data/myid     #myid号按顺序排

在es02节点中:

# sed -i 's/^[^#]/#&/' /usr/local/kafka/config/zookeeper.properties

# vim /usr/local/kafka/config/zookeeper.properties
dataDir=/opt/data/zookeeper/data 
dataLogDir=/opt/data/zookeeper/logs
clientPort=2181 
tickTime=2000 
initLimit=20 
syncLimit=10 
server.1=192.168.19.20:2888:3888
server.2=192.168.19.21:2888:3888
server.3=192.168.19.22:2888:3888


#创建data、log目录

# mkdir -p /opt/data/zookeeper/{data,logs}

#创建myid文件

# echo 2 > /opt/data/zookeeper/data/myid

在es03节点中:

# sed -i 's/^[^#]/#&/' /usr/local/kafka/config/zookeeper.properties

# vim /usr/local/kafka/config/zookeeper.properties
dataDir=/opt/data/zookeeper/data 
dataLogDir=/opt/data/zookeeper/logs
clientPort=2181 
tickTime=2000 
initLimit=20 
syncLimit=10 
server.1=192.168.19.20:2888:3888
server.2=192.168.19.21:2888:3888
server.3=192.168.19.22:2888:3888


#创建data、log目录

# mkdir -p /opt/data/zookeeper/{data,logs}

#创建myid文件

# echo 3 > /opt/data/zookeeper/data/myid

配置项含义:

dataDir     ZK数据存放目录。
dataLogDir  ZK日志存放目录。
clientPort  客户端连接ZK服务的端口。
tickTime    ZK服务器之间或客户端与服务器之间维持心跳的时间间隔。
initLimit   允许follower连接并同步到Leader的初始化连接时间,当初始化连接时间超过该值,则表示连接失败。
syncLimit   Leader与Follower之间发送消息时如果follower在设置时间内不能与leader通信,那么此follower将会被丢弃。
server.1=192.168.19.20:2888:3888    2888是follower与leader交换信息的端口,3888是当leader挂了时用来执行选举时服务器相互通信的端口。

3.配置Kafka

        3.1  配置

# sed -i 's/^[^#]/#&/' /usr/local/kafka/config/server.properties

# vim /usr/local/kafka/config/server.properties  #在最后添加

broker.id=1  #改 
listeners=PLAINTEXT://192.168.19.20:9092   #改
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/data/kafka/logs  
num.partitions=6
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.19.20:2181,192.168.19.21:2181,192.168.19.22:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0


[root@es01 ~]# mkdir -p /opt/data/kafka/logs

        3.2  其他节点配置

只需把配置好的安装包直接分发到其他节点,修改 Kafka的broker.id和 listeners就可以了。

        3.3  配置项含义

broker.id 
    每一个broker在集群中的唯一标识,要求是正数。在改变IP地址,不改变broker.id的时不会影响consumers
listeners=PLAINTEXT://192.168.19.22:9092       
    监听地址
num.network.threads  
    broker 处理消息的最大线程数,一般情况下不需要去修改
num.io.threads
    broker处理磁盘IO 的线程数 ,数值应该大于你的硬盘数
socket.send.buffer.bytes          
    socket的发送缓冲区
socket.receive.buffer.bytes        
    socket的接收缓冲区
socket.request.max.bytes
    socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖
log.dirs        日志文件目录
num.partitions
num.recovery.threads.per.data.dir   每个数据目录(数据目录即指的是上述log.dirs配置的目录路径)用于日志恢复启动和关闭时的线程数量。
offsets.topic.replication.factor

transaction state log replication factor  事务主题的复制因子(设置更高以确保可用性)。 内部主题创建将失败,直到群集大小满足此复制因素要求

log.cleanup.policy = delete
    日志清理策略 选择有:delete和compact 主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖
log.cleanup.interval.mins=1
    指定日志每隔多久检查看是否可以被删除,默认1分钟    
log.retention.hours
    数据存储的最大时间 超过这个时间 会根据log.cleanup.policy设置的策略处理数据,也就是消费端能够多久去消费数据。log.retention.bytes和log.retention.minutes或者log.retention.hours任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖

log.segment.bytes
    topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖
log.retention.check.interval.ms 
    文件大小检查的周期时间,是否触发 log.cleanup.policy中设置的策略
zookeeper.connect   
    ZK主机地址,如果zookeeper是集群则以逗号隔开。
zookeeper.connection.timeout.ms     
    连接到Zookeeper的超时时间。

4.测试Kafka

        4.1  启动zookeeper集群

在三个节点依次执行:

# cd /usr/local/kafka

# nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

查看端口:

# netstat -lntp | grep 2181

        4.2  启动Kafka

在三个节点依次执行:

# cd /usr/local/kafka

# nohup bin/kafka-server-start.sh config/server.properties &

        4.3  测验

验证  在192.168.26.166上创建topic:

# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testtopic

参数解释:

–zookeeper指定zookeeper的地址和端口,
–partitions指定partition的数量,
–replication-factor指定数据副本的数量

在26.170上面查询192.168.26.166上的topic:

[root@es03 kafka]# bin/kafka-topics.sh --zookeeper 192.168.26.166:2181 --list
testtopic

        (二)模拟消息生产和消费

发送消息到192.168.26.166:

[root@es01 kafka]# bin/kafka-console-producer.sh --broker-list 192.168.19.20:9092 --topic testtopic

>hello
>你好呀
>

从192.168.26.171接受消息:

[root@es02 kafka]# bin/kafka-console-consumer.sh --bootstrap-server  192.168.19.21:9092 --topic testtopic --from-beginning

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_73803.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JAVA 8 新特性 Lamdba表达式

Java8 新特性: 1、Lamdba表达式 2、函数式接口 3、方法引用和构造引用 4、Stream API 5、接口中的默认方法和静态方法 6、新时间日期API 7、Optional 8、其他特性 Java8 优势:速度快、代码更少(增加了新的语法 Lambda 表达式)、强…

Android 架构 MVC MVP MVVM,这一波你应该了然于心

MVC,MVP和MVVM是软件比较常用的三种软件架构,这三种架构的目的都是分离,避免将过多的逻辑全部堆积在一个类中。在Android中,Activity中既有UI的相关处理逻辑,又有数据获取逻辑,从而导致Activity逻辑复杂不单…

Wireshark抓包

Wireshark 1 抓包时间显示格式 2 界面显示列设置 3 protocol协议解析 4 过滤器 tcp.port:TCP端口tcp.dstport:TCP目的端口tcp.srcport:TCP源端口udp.port:UDP端口udp.dstport:UDP目的端口udp.srcport:UDP…

月薪过3W的软件测试工程师,都是怎么做到的?

对任何职业而言,薪资始终都会是众多追求的重要部分。前几年的软件测试行业还是一个风口,随着不断地转行人员以及毕业的大学生疯狂地涌入软件测试行业,目前软件测试行业“缺口”已经基本饱和。当然,我说的是最基础的功能测试的岗位…

良许也成为砖家啦~

大家好,我是良许。 没错,良许成为砖家啦,绝不是口嗨,有图有真相! 有人会说,咦,这明明是严宇啊,跟你良许有啥关系? 额。。老读者应该知道良许的来历—— 鄙人真名严宇&a…

Python-datetime、time包常用功能汇总

目录基础知识时间格式有哪些?Python中的时间格式化时间戳datetimedatedatetimetimedeltatime常用获取今天凌晨字符串?将一个时间格式的字符串转为时间戳将一个时间戳转为指定格式的字符串全部代码参考基础知识 时间格式有哪些? 「格林威治标…

最新OpenMVG编译安装与逐命令运行增量式和全局式SfM教程

openmvg是一个轻便的可以逐步运行的SfM开源库,它同时实现了增量式和全局式两种算法。 说明文档地址:https://openmvg.readthedocs.io/en/latest/ github主页地址:https://github.com/openMVG/openMVG 1 编译安装 openmvg的安装比较简单&…

Windows 11 22H2 中文版、英文版 (x64、ARM64) 下载 (updated Feb 2023)

Windows 11, version 22H2,2023 年 2 月 更新 请访问原文链接:https://sysin.org/blog/windows-11/,查看最新版。原创作品,转载请保留出处。 作者主页:www.sysin.org 全新推出 Windows 11 全新 Windows 体验&#x…

论坛项目小程序和h5登录

项目中安装uview出现npm安装uview 直接报错:创建一个package.json配置文件在进行安装。cmd到项目。初始化一个package.json文件(vue项目的配置文件) npm init --yes 安装uview项目点击关注进入管页面,需要验证用户是否登录查用户是…

Linux学习(8)Linux文件与目录管理

以下内容转载自鸟哥的Linux私房菜 绝对路径与相对路径 绝对路径:路径的写法『一定由根目录 / 写起』,例如: /usr/share/doc 这个目录。相对路径:路径的写法『不是由 / 写起』,例如由 /usr/share/doc 要到 /usr/share…

Java实现在线沟通功能

文章目录1、介绍 和 特点2、整合SpringBoot2.1、导入依赖2.2、websocket 配置类2.3、消息处理类2.4、启动服务2.5、前端代码:张三2.6、前端代码:李四3、效果4、小结1、介绍 和 特点 t-io是基于JVM的网络编程框架,和netty属同类,所…

【LeetCode】剑指 Offer 14- I. 剪绳子 p96 -- Java Version

题目链接:https://leetcode.cn/problems/jian-sheng-zi-lcof/ 1. 题目介绍(14- I. 剪绳子) 给你一根长度为 n 的绳子,请把绳子剪成整数长度的 m 段(m、n都是整数,n>1并且m>1)&#xff0c…

计算机网络你都懂了吗

文章目录一、计算机网络的定义简单定义通用定义二、计算机网络通信过程三、什么是网络协议(Protocol)四、网络协议组成及功能一、计算机网络的定义 简单定义 计算机网络是一些相互连接的、自治的计算机系统的集合。 通用定义 将处于不同位置并具有独…

MySQL简介、M有SQL的存储引擎、表、字段和数据

Java知识点总结:想看的可以从这里进入 目录2、MySQL特性介绍2.1、MySQL简介2.2、存储引擎2.3、表、字段、数据2、MySQL特性介绍 2.1、MySQL简介 MySQL 是一个关系型数据库管理系统(RDBMS),于2009年被 Oracle 公司收购。它是一种关…

Hive---排序

Hive语法之排序 文章目录Hive语法之排序全局排序(Order By)升序降序按照别名排序多个列排序每个 Reduce 内部排序(Sort By)设置 reduce 个数查看设置 reduce 个数分区排序(Distribute By)设置 reduce 个数簇…

仅花半年时间,他从外包月薪5K到阿里月薪15K,究竟经历了什么?

背景介绍:“渣渣”二本,95年Java程序员**外包类型:**传统外包公司**内容简介:**朋友从一个传统公司是如何修仙到阿里巴巴?分享一些他的真实经历,希望对你有帮助。**学习路线:**基础(…

为什么HR眼中,Python是真正的简历加分项?

教育部在发布的关于《2023届高校毕业生预计1158万 校园招聘月启动》文中明确指出:“2023届高校毕业生预计1158万,同比增加82万人”。除开考研、考公的少数同学,几百万大军拼命往大企业投简历,求职竞争十分激烈。 来源&#xff1a…

优化长尾关键词有什么好处?在线长尾关键词挖掘

​想知道为什么要使用长尾关键词? 好吧,它们可以帮助你轻松找到合适的受众。 ​ 1.获得更高的转化率 长尾关键词对于搜索特定信息更有用。使用长尾关键词时通常会获得更高的转化率,因为内容与受众的需求更相关。 举个例子:你正…

数影周报:动视暴雪疑似数据泄露,数据出境安全评估申报最新进展

本周看点:动视暴雪疑似员工敏感信息及游戏数据泄露;谷歌云计算部门:两名员工合用一个工位;数据出境安全评估申报最新进展;TikTok Shop东南亚商城在泰国和菲律宾公布;智己汽车获九大金融机构50亿元贷款签约.…

Redis:实现全局唯一ID

Redis:实现全局唯一ID一. 概述二. 实现(1)获取初始时间戳(2)生成全局ID三. 测试为什么可以实现全局唯一?其他唯一ID策略补充:countDownLatch一. 概述 全局ID生成器:是一种在【分布式…