kafka 报错 - Cannot assign requested address

news/2024/5/20 1:09:27/文章来源:https://blog.csdn.net/Enchanter06/article/details/131194986

背景

在华为云服务器上跑了 zookeeper 和 kafka 的 broker,想内外网分流,重点就是做不到从外网去消费,比如用自己的 windows 笔记本去消费。

配置 server.properties 的 listener 为 broker 所在机子的的内网 IP 后,终于能 start 了:

listener=PLAINTEXT://192.168.0.154:9092

zookeeper 查看 kafka broker 的地址:

get /brokers/ids/0
{"features":{},"listener_security_protocol_map":{"PUBLIC":"PLAINTEXT"},"endpoints":["PUBLIC://192.168.0.154:9092"],"jmx_port":-1,"port":9092,"host":"192.168.0.154","version":5,"timestamp":"1686651266529"}

地址是 “endpoints”:[“PUBLIC://192.168.0.154:9092”]

从 broker 机子的本地创建 topic:

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test0

结果疯狂滚屏:

workClient)
[2023-06-13 18:21:48,162] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.1.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2023-06-13 18:21:49,266] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

换个姿势创建:

bin/kafka-topics.sh --create --bootstrap-server 192.168.0.154:9092 --topic test0

成功了:

在这里插入图片描述

broker 本机来 produce。

在这里插入图片描述

换同网段机子 192.168.0.28 来消费:

在这里插入图片描述

外网 windows 去消费:

D:\Programs\MQ\kafka\kafka_2.12-3.2.3\bin\windows>kafka-console-consumer.bat --topic test0 --from-beginning --bootstrap-server 121.37.xx.xxx:9092
[2023-06-13 18:44:40,678] WARN [Consumer clientId=console-consumer, groupId=console-consumer-76813] Connection to node 0 (/192.168.0.154:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

访问不到。

listeners & advertised.listeners

让我们来看下官网的说明:

章节来源:(7. Security - 7.2 Listener Configuration)

每个服务器上必须至少定义一个监听器。 listeners 中定义的每个监听器的格式如下:

{LISTENER_NAME}://{hostname}:{port}

LISTENER_NAME通常是一个描述性的名字,定义了监听器的用途。例如,许多配置为客户端流量使用单独的监听器,所以他们可能在配置中把相应的监听器称为`CLIENT’.

listener.security.protocol.map。该值是一个逗号分隔的列表,列出了映射到其安全协议的每个监听器。例如,下面值配置指定 CLIENT 监听器将使用 SSL,而BROKER 监听器将使用明文(plaintext)。

listener.security.protocol.map=CLIENT:SSL,BROKER:PLAINTEXT

下面给出了安全协议的可能选项:

  1. PLAINTEXT
  2. SSL
  3. SASL_PLAINTEXT
  4. SASL_SSL

明文(PLAINTEXT)协议不提供安全性,不需要任何额外的配置。在下面的章节中,本文将介绍如何配置其余的协议。

也可以在监听器中使用安全协议名称作为监听器名称。

在 listeners list 中,可以通过将 inter.broker.listener.name ,来哪一个声明监听器用于 broker 间的通信。broker 间监听器的主要目的是分区复制。如果没有定义,那么 broker 间的监听器由 security.inter.broker.protocol 定义的安全协议决定,该协议默认为PLAINTEXT

对于依赖 Zookeeper 存储集群元数据 metadata 的传统集群 cluster,可以声明一个单独的 listener,用于从活动控制器 controller 到 broker 的元数据 metadata 传播。这是由 control.plane.listener.name 定义的。当 controller 需要向 cluster 中的 broker 推送 metadata 更新时,它将使用这个监听器 listener。使用控制平面监听器(contol.plane.listener)的好处是,它使用一个单独的处理线程,这使得应用程序流量不太可能阻碍元数据变化的及时传播(如分区领导和ISR更新)。

控制器 controller 接收来自其他控制器 controller 和 broker 的请求。由于这个原因,即使一个服务器没有启用控制器角色(即它只是一个 broker),它仍然必须定义控制器监听器以及配置它所需的任何安全属性。例如,我们可以在一个独立的 broker 上使用以下配置:

process.roles=broker
listeners=BROKER://localhost:9092
inter.broker.listener.name=BROKER
controller.quorum.voters=0@localhost:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL

在这个例子中,控制器监听器仍然被配置为使用 SASL_SSL 安全协议,但它不包括在 listeners 中,因为 broker 没有暴露控制器监听器本身。在这种情况下,将使用的端口来自 controller.quorum.voters 配置,它定义了完整的控制器列表。


(3.1 Broker Configs)

🌵 listeners

\############################# Socket Server Settings #############################
\# The address the socket server listens on. If not configured, the host name will be equal to the value of\# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.\#   FORMAT:
\#     listeners = listener_name://host_name:port
\#   EXAMPLE:
\#     listeners = PLAINTEXT://your.host.name:9092
\#listeners=PLAINTEXT://:9092

Listener List - Comma-separated list of URIs we will listen on and the listener names. If the listener name is not a security protocol, listener.security.protocol.map must also be set.
Listener names and port numbers must be unique.
Specify hostname as 0.0.0.0 to bind to all interfaces.
Leave hostname empty to bind to default interface.
Examples of legal listener lists:
PLAINTEXT://myhost:9092,SSL://:9091
CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093

监听器
Listener List - 以逗号分隔的我们要监听的 URI 列表和监听器名称。如果监听器的名字不是安全协议,listenener.security.protocol.map 也必须被设置。
监听器名称和端口号必须是唯一的。
指定 hostname 为 0.0.0.0 以绑定所有接口。
把 hostname 留空就可以绑定到默认接口。
合法监听器列表的例子。
PLAINTEXT://myhost:9092,SSL://:9091
CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093

(/key:value -> ListenerName: Uri/)

🌵 advertised.listeners

\# Listener name, hostname and port the broker will advertise to clients.
\# If not set, it uses the value for "listeners".
\#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=

/监听器名称、主机名和端口,broker将向 clients 公布/

Listeners to publish to ZooKeeper for clients to use, if different than the listeners config property. In IaaS environments, this may need to be different from the interface to which the broker binds. If this is not set, the value for listeners will be used. Unlike listeners, it is not valid to advertise the 0.0.0.0 meta-address.

发布至 zookeeper 注册中心的 listeners, 为了让 clients (从zk拿来)使用。


只需外网访问 kafka

你肯定想到了最简单的一个方法,listeners使用外网ip

listeners=PLAINTEXT://101.89.163.1:9092

如果宿主机有外网网卡,这么配当然没问题。如果没有(ifconfig看不到外网ip的网卡,基本上就不存在这个外网网卡),很可能和我使用的的宿主机(华为云ECS)一样是通过 NAT 映射或者啥办法搞出来的外网 ip,此时 kafka 无法监听这个外网 ip(因为不存在,启动就会报错)。

[2023-06-13 18:57:53,738] INFO App info kafka.server for 0 unregistered (org.apache.kafka.common.utils.AppInfoParser)
[2023-06-13 18:57:53,738] INFO [KafkaServer id=0] shut down completed (kafka.server.KafkaServer)
[2023-06-13 18:57:53,738] ERROR Exiting Kafka due to fatal exception during startup. (kafka.Kafka$)
org.apache.kafka.common.KafkaException: Socket server failed to bind to 121.37.xx.xxx:9092: Cannot assign requested address.at kafka.network.Acceptor.openServerSocket(SocketServer.scala:684)at kafka.network.Acceptor.<init>(SocketServer.scala:576)

failed to bind to 121.37.xx.xxx:9092: Cannot assign requested address.

这时候就是advertised.listeners真正发挥作用的时候了。使用如下配置:

listeners=PLAINTEXT://192.168.0.213:9092 //内网IP
advertised.listeners=PLAINTEXT://101.89.163.1:9092 //外网IP

此时一个完整的 kafka 客户端访问服务端的流程:

  • 客户端访问 101.89.163.1:9092,被 kafka 宿主机所在环境映射到内网192.168.0.213:9092,访问到了kafka节点,请求获得 kafka 服务端的访问地址
  • kafka 从 zookeeper 拿到自己和其他兄弟节点通过 advertised.listeners 注册到 zookeeper 的101.89.163.1:9092 等外网地址,作为 kafka 的服务端访问地址返回给客户端
  • 客户端拿这些地址访问 kafka 集群,被 kafka 宿主机所在环境映射到各kafka节点的内网ip,访问到了kafka服务端…完美循环

你可能会问已经配置了访问地址,为什么还要在第一次访问的时候请求获得 kafka 的访问地址。因为如果是 kafka 集群,你可以选择只给客户端配置一个 kafka 节点的地址(这样是不推荐的),但是客户端必须要访问集群中的每一个节点,所以必须通过这个节点获得集群中每一个节点的访问地址。
如果不配置advertised.listeners=PLAINTEXT://101.89.163.1:9092,你会发现虽然你给kafka 客户端配置的访问地址是 101.89.163.1:9092,但是kafka客户端访问时报错,报错原因是Connection to node -1[192.168.0.213:9092] could not be established. Broker may not be available.。这就是因为不配置advertised.listenersadvertised.listeners 默认使用listeners配置的地址,客户端拿到的就是listeners配置的内网地址


内外网分流

如果是有外网网卡的情况,直接配置外网 ip 有没有问题呢?
如果既要内网访问,又要外网访问,本来可以走内网的流量都走外网网卡,显然不合适;而且有的环境可能被配置成这些 kafka 宿主机是没有外网访问权限的,即虽然他可以访问自己的外网ip,但是访问不了兄弟节点的外网ip。这时候就要配置内外网分流。网上教程就有很多了。


像上面这样设置完后,再从外网(我的windows)访问 broker:

成功。

broker 的日志:

[2023-06-13 19:11:22,003] INFO [GroupMetadataManager brokerId=0] Finished loading offsets and group metadata from __consumer_offsets-28 in 48 milliseconds for epoch 0, of which 48 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
[2023-06-13 19:11:49,972] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group console-consumer-85874 in Empty state. Created a new member id console-consumer-da4137d8-6b3a-4983-bf0e-e1dcd49f876b and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2023-06-13 19:11:50,009] INFO [GroupCoordinator 0]: Preparing to rebalance group console-consumer-85874 in state PreparingRebalance with old generation 0 (__consumer_offsets-23) (reason: Adding new member console-consumer-da4137d8-6b3a-4983-bf0e-e1dcd49f876b with group instance id None; client reason: rebalance failed due to MemberIdRequiredException) (kafka.coordinator.group.GroupCoordinator)
[2023-06-13 19:11:50,015] INFO [GroupCoordinator 0]: Stabilized group console-consumer-85874 generation 1 (__consumer_offsets-23) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2023-06-13 19:11:50,048] INFO [GroupCoordinator 0]: Assignment received from leader console-consumer-da4137d8-6b3a-4983-bf0e-e1dcd49f876b for group console-consumer-85874 for generation 1. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)

…for group console-consumer-85874 for generation 1. The group has 1 members,


consumer端:

在这里插入图片描述

跑了五次,创了五个consumer:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_506531.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ECC算法学习(一)算法公式

ECC 一、ECC简介优缺点运用 二、算法理论基础1. 椭圆曲线的加法2. 椭圆曲线的二倍运算3. 同余运算4. 有限域5. 乘法逆元 三、算法公式1、有限域的负元2、有限域的加法&#xff0c; P Q P Q PQ3. 斜率计算&#xff08;PQ即要计算P点切线&#xff0c;需要求导&#xff09;4. 椭…

【位图布隆过滤器海量数据面试题】

文章目录 1 位图2 布隆过滤器 1 位图 首先我们来看看一个腾讯的面试题&#xff1a;给40亿个不重复的无符号整数&#xff0c;没排过序。给一个无符号整数&#xff0c;如何快速判断一个数是否在这40亿个数中。 分析&#xff1a; 40亿个不重复整形数据&#xff0c;大概有160亿字节…

Axios和Spring MVC[前端和后端的请求和响应处理]

在前后端交互中&#xff0c;Axios和Spring MVC扮演着不同的角色&#xff0c;分别负责前端和后端的请求和响应处理。它们之间的作用如下&#xff1a; Axios&#xff08;前端&#xff09;&#xff1a; 发送HTTP请求&#xff1a;前端使用Axios库发送HTTP请求到后端。可以使用Axi…

机器学习实践(1.1)XGBoost分类任务

前言 XGBoost属于Boosting集成学习模型&#xff0c;由华盛顿大学陈天齐博士提出&#xff0c;因在机器学习挑战赛中大放异彩而被业界所熟知。相比越来越流行的深度神经网络&#xff0c;XGBoost能更好的处理表格数据&#xff0c;并具有更强的可解释性&#xff0c;还具有易于调参…

hard fault on thread: mqtt0解决办法

rt thread版本4.1.0 使用paho mqtt软件包 运行一段时间后出现 psr: 0x21000000 r00: 0x5036fc8f r01: 0x5036fc88 r02: 0x00000000 r03: 0x5036fc8f r04: 0x00000007 r05: 0x00000063 r06: 0x00005f70 r07: 0x2001f1d8 r08: 0xdeadbeef r09: 0xdeadbeef r10: 0xdeadbeef r11…

关于Java SSM框架的面试题

一、Spring面试题 1、Spring 在ssm中起什么作用&#xff1f; Spring&#xff1a;轻量级框架作用&#xff1a;Bean工厂&#xff0c;用来管理Bean的生命周期和框架集成。两大核心&#xff1a;1、IOC/DI(控制反转/依赖注入) &#xff1a;把dao依赖注入到service层&#xff0c;se…

28.vite

目录 1 一些概念 1.1 单页面应用程序SPA 1.2 vite 2 初始化vite项目 3 项目中的文件 1 一些概念 1.1 单页面应用程序SPA 单页面应用程序是只有一个页面的前端&#xff0c;切换页面通过前端路由来切换 特点如下 实现了前后端分离&#xff0c;后端仅出接口&#…

动态规划III (买股票-121、122、123、188、309)

CP121 买股票的最佳时机 题目描述&#xff1a; 给定一个数组 prices &#xff0c;它的第 i 个元素 prices[i] 表示一支给定股票第 i 天的价格。你只能选择 某一天 买入这只股票&#xff0c;并选择在 未来的某一个不同的日子 卖出该股票。设计一个算法来计算你所能获取的最大利…

YOLOv5-7.0添加解耦头

Decoupled Head Decoupled Head是由YOLOX提出的用来替代YOLO Head&#xff0c;可以用来提升目标检测的精度。那么为什么解耦头可以提升检测效果呢&#xff1f; 在阅读YOLOX论文时&#xff0c;找到了两篇引用的论文&#xff0c;并加以阅读。 第一篇文献是Song等人在CVPR2020发表…

【59天|503.下一个更大元素II ● 42. 接雨水】

503.下一个更大元素II class Solution { public:vector<int> nextGreaterElements(vector<int>& nums) {stack<int> st;int n nums.size();vector<int> res (n, -1);for(int i0; i<2*n;i){while(!st.empty()&&nums[i%n]>nums[st.t…

随机的乐趣和游戏

1、猜数字游戏 #GuessingGame.py import random the_number random.randint(1, 10) print("计算机已经在1到10之间随机生成了一个数字&#xff0c;") guess int(input("请你猜猜是哪一个数字: ")) while guess ! the_number:if guess > the_number:p…

PHP设计模式21-工厂模式的讲解及应用

文章目录 前言基础知识简单工厂模式工厂方法模式抽象工厂模式 详解工厂模式普通的实现更加优雅的实现 总结 前言 本文已收录于PHP全栈系列专栏&#xff1a;PHP快速入门与实战 学会好设计模式&#xff0c;能够对我们的技术水平得到非常大的提升。同时也会让我们的代码写的非常…

淘宝详情页分发推荐算法总结:用户即时兴趣强化

转子&#xff1a;https://juejin.cn/post/6992169847207493639 商品详情页是手淘内流量最大的模块之一&#xff0c;它加载了数十亿级商品的详细信息&#xff0c;是用户整个决策过程必不可少的一环。这个区块不仅要承接用户对当前商品充分感知的诉求&#xff0c;同时也要能肩负起…

Spark大数据处理学习笔记1.5 掌握Scala内建控制结构

文章目录 一、学习目标二、条件表达式&#xff08;一&#xff09;语法格式&#xff08;二&#xff09;执行情况&#xff08;三&#xff09;案例演示任务1、根据输入值的不同进行判断任务2、编写Scala程序&#xff0c;判断奇偶性 三、块表达式&#xff08;一&#xff09;语法格式…

Redis入门 - Lua脚本

原文首更地址&#xff0c;阅读效果更佳&#xff01; Redis入门 - Lua脚本 | CoderMast编程桅杆https://www.codermast.com/database/redis/redis-scription.html Redis 脚本使用 Lua 解释器来执行脚本。 Redis 2.6 版本通过内嵌支持 Lua 环境。执行脚本的常用命令为 EVAL。 …

不要把异常当做业务逻辑,这性能可能你无法承受

一&#xff1a;背景 1. 讲故事 在项目中摸爬滚打几年&#xff0c;应该或多或少的见过有人把异常当做业务逻辑处理的情况(┬&#xff3f;┬)&#xff0c;比如说判断一个数字是否为整数,就想当然的用try catch包起来&#xff0c;再进行 int.Parse&#xff0c;如果抛异常就说明不…

Unity入门8——音效系统

一、音频文件参数面板 Force To Mono&#xff1a;多声道转单声道 Normalize&#xff1a;强制为单声道时&#xff0c;混合过程中被标准化 Load In Background&#xff1a;后台加载&#xff0c;不阻塞主线程&#xff0c;适合大音效 Ambisonic&#xff1a;立体混响声 非常适合 36…

JUC并发编程初学

什么是JUC进程和线程回顾Lock锁生产者和消费者8锁的线程集合类不安全CallableCountDownLatch、CyclicBarrier、Semaphore读写锁阻塞队列线程池四大函数式接口Stream流式计算分支合并异步回调JMMvolatile深入单例模式深入理解CAS原子引用可重入锁、公平锁非公平锁、自旋锁、死锁…

使用单元测试框架unittest进行有效测试

一、介绍 在软件开发中&#xff0c;单元测试是一种测试方法&#xff0c;它用于检查单个软件组件&#xff08;例如函数或方法&#xff09;的正确性。Python 提供了一个内置的单元测试库&#xff0c;名为 unittest&#xff0c;可以用来编写测试代码&#xff0c;然后运行测试&…

MyCat总结

目录 什么是mycat 核心概念 逻辑库 逻辑表 分片节点 数据库主机 用户 mycat原理 目录结构 配置文件 读写分离 搭建读写分离 搭建主从复制&#xff1a; 搭建读写分离&#xff1a; 分片技术 垂直拆分 实现分库&#xff1a; 水平拆分 实现分库&#xff1a; ER表 全局表 分…