kafka使用入门案例与踩坑记录

news/2024/3/29 18:41:35/文章来源:https://blog.csdn.net/weixin_44107140/article/details/129228849

每次用到kafka时都会出现各种奇怪的问题,综合实践,下面汇总下主要操作步骤:

Docker镜像形式启动

zookeeper启动

docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

kafka启动

docker run --name kafka01 -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=150.158.16.123:12348 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://150.158.16.123:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -d  wurstmeister/kafka

进入kafka容器

docker exec -it [容器id] /bin/bash

创建topic

进入容器,在/opt/kafka_2.13-2.8.1/bin 目录下创建topic

./kafka-topics.sh --create --zookeeper 150.158.16.123:12348 --replication-factor 1 --partitions 1 --topic mykafka
./kafka-topics.sh --create --zookeeper 150.158.16.123:2181 --replication-factor 1 --partitions 1 --topic mykafka

运行生产者

image-20220923112024973

运行消费者

image-20220923112035890


单机形式启动

前提

1、Linux 机器

2、环境已准备好JDK,如果还没有装,推荐用yum一键安装

yum  install  -y  java-1.8.0-openjdk.x86_64

检验:

[root@localhost ~]# java -version
openjdk version "1.8.0_362"
OpenJDK Runtime Environment (build 1.8.0_362-b08)
OpenJDK 64-Bit Server VM (build 25.362-b08, mixed mode)

3、将kafka压缩包上传到你的Linux

配置文件关注config目录下的zookeeper.propertiesserver.properties,启动服务时要指定

配置-启动

有默认配置,可不做修改(有需要可以自定义启动端口和数据存放位置等参数)

1、先启动自带的 Zookeeper:

[root@localhost bin]# ./zookeeper-server-start.sh ../config/zookeeper.properties 
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
[2023-02-26 14:14:52,759] INFO Reading configuration from: ../config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2023-02-26 14:14:52,766] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2023-02-26 14:14:52,767] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2023-02-26 14:14:52,767] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2023-02-26 14:14:52,767] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2023-02-26 14:14:52,783] INFO Reading configuration from: ../config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2023-02-26 14:14:52,784] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2023-02-26 14:14:52,796] INFO Server environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT (org.apache.zookeeper.server.ZooKeeperServer)
[2023-02-26 14:14:52,796] INFO Server environment:host.name=localhost (org.apache.zookeeper.server.ZooKeeperServer)
[2023-02-26 14:14:52,796] INFO Server environment:java.version=1.8.0_362 (org.apache.zookeeper.server.ZooKeeperServer)
[2023-02-26 14:14:52,796] INFO Server environment:java.vendor=Red Hat, Inc. (org.apache.zookeeper.server.ZooKeeperServer)
[2023-02-26 14:14:52,796] INFO Server environment:java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.362.b08-1.el7_9.x86_64/jre (org.apache.zookeeper.server.ZooKeeperServer)
(省略大部分)

2、启动 Kafka

[root@localhost kafka_2.12-2.3.0]# bin/kafka-server-start.sh config/server.properties 
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
[2023-02-26 14:16:00,261] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2023-02-26 14:16:01,004] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2023-02-26 14:16:01,024] INFO starting (kafka.server.KafkaServer)
[2023-02-26 14:16:01,025] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2023-02-26 14:16:01,068] INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
[2023-02-26 14:16:01,072] INFO Client environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT (org.apache.zookeeper.ZooKeeper)
[2023-02-26 14:16:01,072] INFO Client environment:host.name=localhost (org.apache.zookeeper.ZooKeeper)
[2023-02-26 14:16:01,072] INFO Client environment:java.version=1.8.0_362 (org.apache.zookeeper.ZooKeeper)
[2023-02-26 14:16:01,072] INFO Client environment:java.vendor=Red Hat, Inc. (org.apache.zookeeper.ZooKeeper)
[2023-02-26 14:16:01,072] INFO Client environment:java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.362.b08-1.el7_9.x86_64/jre (org.apache.zookeeper.ZooKeeper)
(省略大部分)

上述步骤只要启动过程没有报错信息,一般是没有问题的

测试

1、创建个topic

[root@localhost bin]# ./kafka-topics.sh --create --zookeeper 192.168.154.134:2181 --replication-factor 1 --partitions 1 --topic test
Created topic test.

2、查看topic列表

[root@localhost bin]# ./kafka-topics.sh --zookeeper 192.168.154.134:2181 --list
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
test

3、启动生产者

[root@localhost bin]# ./kafka-console-producer.sh --broker-list 192.168.154.134:9092 --topic test
>hi
>什么意思啊

4、启动消费者

[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.154.134:9092 --topic test
hi
什么意思啊

正常启动,OK!

可视化:kafka-manager

镜像下载

docker pull sheepkiller/kafka-manager

运行容器

docker run -d --name kafka-manager -p 12349:9000 --link zookeeper --link kafka01 --env ZK_HOSTS=zookeeper:2181 sheepkiller/kafka-manager  

然后访问对应的IP:端口即可进入管理页面

注意:ZK_HOSTS 后面在web页面上要用到!

管理界面

进入主页面后,点击 Add Cluster 添加集群信息

image-20220818233716973

然后填写配置信息,主要填写集群名称,Zookeeper的Hosts,还有指定kafka版本(选个跟你所使用的kafka版本号最接近的就行),其他的一些配置按默认的就行。

当你正确连接上以后,就能看到你的集群啦,如:

image-20220818235108897

image-20220818235134261

更多关于kafka可视化操作就由你慢慢探索吧!这里将你引进门!

注意:

  1. 如果你在启动kafka manager这个容器时指定了 ZK_HOSTS ,那么Cluster Zookeeper Hosts这项填的内容要和 ZK_HOSTS 一致,否则会出现连接不上,连接超时等情况。如下图:

    image-20220818234605319

  2. 另外有些配置默认值是1,但是你得将其改成1以上的整数,否则不能正确保存提交。如:

    image-20220818234844587


注意

kafka版本不同,响应的api有区别

本版本是2.11

注意3.x是 --bootstrap-server localhost:9092方式新建,kafka2.x是以–zookeeper方式创建。下面查看新建的topic。

image-20220925165659756

image-20220925165713665

奇葩问题

1.重启docker失败?

[root@localhost ~]# systemctl restart docker
Job for docker.service failed because the control process exited with error code. See "systemctl status docker.service" and "journalctl -xe" for details.
[root@localhost ~]# journalctl -xe
-- The result is failed.
222 02:01:53 localhost.localdomain systemd[1]: Unit docker.service entered failed state.
222 02:01:53 localhost.localdomain systemd[1]: Unit docker.service entered failed state.
222 02:01:53 localhost.localdomain systemd[1]: docker.service failed.
222 02:01:55 localhost.localdomain systemd[1]: docker.service holdoff time over, scheduling restart.
222 02:01:55 localhost.localdomain systemd[1]: Stopped Docker Application Container Engine.
-- Subject: Unit docker.service has finished shutting down
-- Defined-By: systemd
-- Support: http://lists.freedesktop.org/mailman/listinfo/systemd-devel
-- 
-- Unit docker.service has finished shutting down.
222 02:01:55 localhost.localdomain systemd[1]: start request repeated too quickly for docker.service
222 02:01:55 localhost.localdomain systemd[1]: Failed to start Docker Application Container Engine.
-- Subject: Unit docker.service has failed
-- Defined-By: systemd
-- Support: http://lists.freedesktop.org/mailman/listinfo/systemd-devel
-- 
-- Unit docker.service has failed.
-- 
-- The result is failed.

原因:修改文件/etc/docker/daemon.json时不规范,可能存在空格什么的

解决:

[root@localhost ~]# cat <<EOF >/etc/docker/daemon.json
> {
> "registry-mirrors": ["https://registry.docker-cn.com"]
> }
> EOF
[root@localhost ~]# cat /etc/docker/daemon.json 
{
"registry-mirrors": ["https://registry.docker-cn.com"]
}
[root@localhost ~]# 
[root@localhost ~]# systemctl daemon-reload
[root@localhost ~]# systemctl restart docker

2.查询镜像无果?

[root@localhost ~]# docker search kafka
Error response from daemon: Get "https://index.docker.io/v1/search?q=kafka&n=25": x509: certificate has expired or is not yet valid: current time 2023-02-22T02:08:25+08:00 is before 2023-02-22T00:00:00Z

原因:虚拟机时间与外部时间不一致

解决:

[root@localhost ~]# date
2023年 02月 22日 星期三 02:09:50 CST
[root@localhost ~]# ntpdate cn.pool.ntp.org
26 Feb 13:31:38 ntpdate[44996]: step time server 119.28.206.193 offset 386475.634457 sec
[root@localhost ~]# date
2023年 02月 26日 星期日 13:31:48 CST
[root@localhost ~]# docker search kafka
NAME                                         DESCRIPTION                                     STARS     OFFICIAL   AUTOMATED
bitnami/kafka                                Apache Kafka is a distributed streaming plat…   615                  [OK]
ubuntu/kafka                                 Apache Kafka, a distributed event streaming …   25                   
bitnami/kafka-exporter                                                                       9                    
ibmcom/kafka                                 Docker Image for IBM Cloud Private-CE (Commu…   6                    
bitnami/kafka-trigger-controller             Source for this controller is in the kubeles…   5                    
ibmcom/kafka-python-console-sample           Docker image for the IBM Event Streams Pytho…   2                    
openwhisk/kafkaprovider                      Apache OpenWhisk event provider service for2                    [OK]

3.Docker容器内如何安装vim?

  1. apt-get install vim (可能提示你安装失败!继续往下)

  2. agt-get update 同步 /etc/apt/sources.list 和 /etc/apt/sources.list.d 中列出的源的索引

    配置国内镜像源:

    echo "deb http://mirrors.163.com/debian/ jessie main non-free contrib" >/etc/apt/sources.listecho "deb http://mirrors.163.com/debian/ jessie-proposed-updates main non-free contrib" >>/etc/apt/sources.listecho "deb-src http://mirrors.163.com/debian/ jessie main non-free contrib" >>/etc/apt/sources.listecho "deb-src http://mirrors.163.com/debian/ jessie-proposed-updates main non-free contrib" >>/etc/apt/sources.list
    
  3. 返回第一步

4.无法启动kafka?

kafka.common.KafkaException: Socket server failed to bind to 150.158.16.123:9092: 无法指定被请求的地址.at kafka.network.Acceptor.openServerSocket(SocketServer.scala:327)at kafka.network.Acceptor.<init>(SocketServer.scala:252)at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:91)at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:83)at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)at kafka.network.SocketServer.startup(SocketServer.scala:83)at kafka.server.KafkaServer.startup(KafkaServer.scala:222)at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)at kafka.Kafka$.main(Kafka.scala:65)at kafka.Kafka.main(Kafka.scala)

注意,上面是配置里面有个地址写得不对,listeners=PLAINTEXT://10.20.30.153:9092后接的是内网地址,通过ip addr即可查看,如我的机器

image-20221001140813960

一个写内网地址,一个写外网地址即可

image-20221001140917719

本次分享到这,下期见!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_74434.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

YOLOV5s+Shufflenetv2+VOC数据集+迁移学习

前言&#xff1a;更改YOLOV5的backbone网络为 Shufflenetv2&#xff0c;便于达到轻量化的目的 1. 试运行YOLOv5 b站推土机 2. VOC数据集处理 3. 更改轻量级网络 参考魔改yolov5 3.1 在common.py末尾加入以下代码 #添加轻量化模块Shufflenetv2 # ------------------------…

安装配置DHCP

本次实验采用CentOS71.检查在安装DHCP之前先使用rpm命令查看系统中已有的DHCP软件包rpm -qa | grep dhcp由此可知&#xff0c;系统中尚未安装DHCP软件包2.安装我们可以使用yum命令为系统安装DHCP软件包yum -y install dhcp安装完成后再次检查可以看到DHCP软件包3.配置dhcp配置文…

办公室人员离岗识别检测系统 yolov7

办公室人员离岗识别检测系统根据yolov7网络模型深度学习技术&#xff0c;办公室人员离岗识别检测算法能够7*24小时全天候自动识别人员是否在岗位。YOLOv7 在 5 FPS 到 160 FPS 范围内&#xff0c;速度和精度都超过了所有已知的目标检测器&#xff0c;并在V100 上&#xff0c;30…

刷题28-有效的变位词

32-有效的变位词 解题思路&#xff1a; 注意变位词的条件&#xff0c;当两个字符串完全相等或者长度不等时&#xff0c;就不是变位词。 把字符串中的字符映射成整型数组&#xff0c;统计每个字符出现的次数 注意数组怎么初始化&#xff1a; int [] s1new int[26]代码如下&a…

Docker buildx 的跨平台编译

docker buildx 默认的 docker build 命令无法完成跨平台构建任务&#xff0c;我们需要为 docker 命令行安装 buildx 插件扩展其功能。buildx 能够使用由 Moby BuildKit 提供的构建镜像额外特性&#xff0c;它能够创建多个 builder 实例&#xff0c;在多个节点并行地执行构建任…

社畜大学生的Python之pandas学习笔记,保姆入门级教学

接上期&#xff0c;上篇介绍了 NumPy&#xff0c;本篇介绍 pandas。 目录 pandas 入门pandas 的数据结构介绍基本功能汇总和计算描述统计处理缺失数据层次化索引 pandas 入门 Pandas 是基于 Numpy 构建的&#xff0c;让以 NumPy 为中心的应用变的更加简单。 Pandas是基于Numpy…

NLP中的对话机器人——预训练基准模型

引言 本文是七月在线《NLP中的对话机器人》的视频笔记&#xff0c;主要介绍FAQ问答型聊天机器人的实现。 场景二 上篇文章中我们解决了给定一个问题和一些回答&#xff0c;从中找到最佳回答的任务。 在场景二中&#xff0c;我们来实现&#xff1a; 给定新问题&#xff0c;从…

基础夯实,字节内部总结240道算法LeetCode刷题笔记,直呼太全

1、什么是算法算法(algorithm&#xff0c;[ˈlɡərɪəm]&#xff0c;计算程序)&#xff1a;就是定义良好的计算过程&#xff0c;他取一个或一组的值为输入&#xff0c;并产生出一个或一组值作为输出。简单来说算法就是一系列的计算步骤&#xff0c;用来将输入数据转化成输出结…

java spring AOP 完全注解开发

我们先创建一个项目 然后引入java spring aop的依赖 然后 在src下创建目录 我这里 直接就叫 Aop了 下面创建一个User类 参考代码如下 package Aop;import org.springframework.stereotype.Component;Component public class User {public void add(){System.out.println(&qu…

Redis高级-主从复制相关操作

2.1 主从复制简介 2.1.1 高可用 首先我们要理解互联网应用因为其独有的特性我们演化出的三高架构 高并发 应用要提供某一业务要能支持很多客户端同时访问的能力&#xff0c;我们称为并发&#xff0c;高并发意思就很明确了 高性能 性能带给我们最直观的感受就是&#xff1a;速…

Java EE|TCP/IP协议栈之应用层协议DNS详解

文章目录一、对DNS的感性认识简介特点一些常见疑问二、DNSDNS域名结构域名的分级三、域名服务器四、域名解析过程参考一、对DNS的感性认识 简介 DNS&#xff0c;即Domain Name System,是域名系统的简称。它是Internet上解决网上机器命名的一种系统。 TCP/IP中的IP地址是由四…

【蓝桥集训】第七天并查集

作者&#xff1a;指针不指南吗 专栏&#xff1a;Acwing 蓝桥集训每日一题 &#x1f43e;或许会很慢&#xff0c;但是不可以停下来&#x1f43e; 文章目录1.亲戚2.合并集合3.连通块中点的数量有关并查集的知识学习可以移步至—— 【算法】——并查集1.亲戚 或许你并不知道&#…

c语言tips-大端小端存储介绍和使用union判断大小端

1. 大小端介绍 大端&#xff08;Big Endian&#xff09;和小端&#xff08;Little Endian&#xff09;是两种CPU或者计算机系统存储数据的方式。 在大端系统中&#xff0c;数据的高位字节&#xff08;MSB&#xff09;存储在内存地址的低位&#xff0c;低位字节&#xff08;LSB…

【C++】C++入门(下)

引用 什么是引用&#xff1f;   引用是给一个已经存在的变量取一个别名&#xff0c;在语法上并不会给这个别名开一个空间&#xff0c;它和她引用的变量共用一个空间。但是实际上引用也是开了一块空间的&#xff0c;用来存放引用名。引用是按照指针的方式来实现的。引用语法&…

《分布式技术原理与算法解析》学习笔记Day23

分布式数据复制 我们在进行分布式数据存储设计时&#xff0c;通常会考虑对数据进行备份&#xff0c;以提高数据的可用性和可靠性&#xff0c;“数据复制技术”就是实现数据备份的关键技术。 什么是数据复制技术&#xff1f; 在分布式数据库系统中&#xff0c;通常会设置主备…

华为OD机试用Python实现 -【统一限载货物数最小值】(2023-Q1 新题)

华为OD机试题 华为OD机试300题大纲统一限载货物数最小值题目描述输入描述输出描述说明示例一输入输出说明示例二输入输出说明Python 代码实现算法逻辑华为OD机试300题大纲 参加华为od机试,一定要注意不要完全背诵代码,需要理解之后模仿写出,通过率才会高。 华为 OD 清单查…

python爬虫常见错误

python爬虫常见错误前言python常见错误1. AttributeError: WebDriver object has no attribute find_element_by_id1. 问题描述2. 解决办法2. selenium&#xff1a;DeprecationWarning: executable_path has been deprecated, please pass in1. 问题描述2. 解决办法3. 下载了包…

k8s-资源限制-探针检查

文章目录一、资源限制1、资源限制的使用2、reuqest资源&#xff08;请求&#xff09;和limit资源&#xff08;约束&#xff09;3、Pod和容器的资源请求和限制4、官方文档示例5、资源限制实操5.1 编写yaml资源配置清单5.2 释放内存&#xff08;node节点&#xff0c;以node01为例…

《程序员思维修炼》速读笔记

文章目录书籍信息概览绪论从新手到专家的历程认识大脑利用右脑调试大脑主动学习积累经验控制注意力超越专家图解书籍信息 书名&#xff1a;《程序员思维修炼&#xff08;修订版&#xff09;》 作者&#xff1a;[美] Andy Hunt 概览 绪论 再提“实用”关注情境所有人都关注这…

Flutter3引用原生播放器-IOS(Swift)篇

前言由于Flutter项目中需要使用到播放器功能&#xff0c;因此对flutter中各种播放器解决方案进行了一番研究和比对&#xff0c;最后决定还是自己通过Plugin的方法去引用原生播放器符合自己的需求&#xff0c;本篇文章会对各种解决方案做一个简单的比较&#xff0c;以及讲解一下…