Elasticsearch:使用 Logstash 构建从 Kafka 到 Elasticsearch 的管道 - Nodejs

news/2024/5/3 17:10:19/文章来源:https://blog.csdn.net/UbuntuTouch/article/details/129341993

在我之前的文章 “Elastic:使用 Kafka 部署 Elastic Stack”,我构建了从 Beats => Kafka => Logstash => Elasticsearch 的管道。在今天的文章中,我将描述从 Nodejs => Kafka => Logstash => Elasticsearch 这样的一个数据流。在之前的文章 “Elastic:Data pipeline:使用 Kafka => Logstash => Elasticsearch” 中,我也展示了使用 Python 的方法。我的配置如下:

在上面的架构中,有几个重要的组件:

  • Kafka Server:这就是数据首先发布的地方。
  • Producer:扮演将数据发布到 Kafka topic 的角色。 在现实世界中,你可以具有任何可以为 kafka 主题生成数据的实体。 在我们的示例中,我们将生成伪造的用户注册数据。
  • Elasticsearch:这将充当将用户注册数据存储到其自身的数据库,并提供搜索及分析。
  • Logstash:Logstash 将扮演中间人的角色,在这里我们将从 Kafka topic 中读取数据,然后将其插入到 Elasticsearch 中。
  • Kibana:Kibana 将扮演图形用户界面的角色,它将以可读或图形格式显示数据。

为了演示的方便,你可以在地址下载演示文件 GitHub - liu-xiao-guo/data-pipeline8。我的文件目录是这样的:

$ pwd
/Users/liuxg/data/data-pipeline8
$ tree -L 3
.
├── README.md
├── docker-elk
│   ├── docker-compose.yml
│   └── logstash_pipeline
│       └── kafka-elastic.conf
├── docker-kafka
│   └── kafka-docker-compose.yml
└── kafka_producer.js
$ pwd
/Users/liuxg/data/data-pipeline8/docker-elk
$ ls -al
total 16
drwxr-xr-x  5 liuxg  staff   160 May 14  2021 .
drwxr-xr-x  8 liuxg  staff   256 Mar  5 07:36 ..
-rw-r--r--  1 liuxg  staff    29 May  7  2021 .env
-rw-r--r--  1 liuxg  staff  1064 May 13  2021 docker-compose.yml
drwxr-xr-x  3 liuxg  staff    96 May 13  2021 logstash_pipeline
$ vi .env
$ cat .env
ELASTIC_STACK_VERSION=8.6.2

上面的其它文件将在我下面的章节中介绍。如果你自己想通过手动的方式部署 Kafka 请参阅我的另外一篇文章 “使用 Kafka 部署 Elastic Stack”。

安装

Kafka,Zookeeper 及 Kafka Manager

我将使用 docker-compose 来进行安装。一旦安装好,我们可以看到:

  • Kafka 在 PORT 9092 侦听
  • Zookeeper 在 PORT 2181 侦听
  • Kafka Manager 侦听 PORT 9000 侦听

kafka-docker-compose.yml

version: "3"
services:zookeeper:image: zookeeperrestart: alwayscontainer_name: zookeeperhostname: zookeeperports:- 2181:2181environment:ZOO_MY_ID: 1kafka:image: wurstmeister/kafkacontainer_name: kafkaports:- 9092:9092environment:KAFKA_ADVERTISED_HOST_NAME: 192.168.0.3 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181kafka_manager:image: hlebalbau/kafka-manager:stablecontainer_name: kakfa-managerrestart: alwaysports:- "9000:9000"environment:ZK_HOSTS: "zookeeper:2181"APPLICATION_SECRET: "random-secret"command: -Dpidfile.path=/dev/null

我们可以使用如下的命令来进行启动(在 Docker 运行的前提下):

docker-compose -f kafka-docker-compose.yml up

 一旦运行起来后,我们可以使用如下的命令来进行查看:

docker ps
$ docker ps
CONTAINER ID   IMAGE                            COMMAND                  CREATED              STATUS              PORTS                                                  NAMES
a4acc0730467   zookeeper                        "/docker-entrypoint.…"   About a minute ago   Up About a minute   2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp   zookeeper
02ec8e8a1e30   hlebalbau/kafka-manager:stable   "/kafka-manager/bin/…"   About a minute ago   Up About a minute   0.0.0.0:9000->9000/tcp                                 kakfa-manager
a85c32c0c08e   wurstmeister/kafka               "start-kafka.sh"         About a minute ago   Up About a minute   0.0.0.0:9092->9092/tcp                                 kafka

我们发现 Kafka Manager 运行于 9000 端口。我们打开本地电脑的 9000 端口:

在上面它显示了一个默认的 topic,虽然不是我们想要的。

 

这样,我们就把 Kafka 上的 kafka_logstash topic 创建好了。

我们可以登录 kafka 容器来验证我们已经创建的 topic。我们使用如下的命令来找到 kafka 容器的名称:

docker ps -s
$ docker ps -s
CONTAINER ID   IMAGE                            COMMAND                  CREATED         STATUS         PORTS                                                  NAMES           SIZE
de7453250529   hlebalbau/kafka-manager:stable   "/kafka-manager/bin/…"   9 minutes ago   Up 9 minutes   0.0.0.0:9000->9000/tcp                                 kakfa-manager   117kB (virtual 427MB)
65eba68350f1   zookeeper                        "/docker-entrypoint.…"   9 minutes ago   Up 9 minutes   2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp   zookeeper       33kB (virtual 288MB)
3394868b23e9   wurstmeister/kafka               "start-kafka.sh"         9 minutes ago   Up 9 minutes   0.0.0.0:9092->9092/tcp                                 kafka           210kB (virtual 457MB)

上面显示 kafka 的容器名称为 wurstmeister/kafka。我们使用如下的命令来进行登录:

docker exec -it wurstmeister/kafka  /bin/bash

然后我们在容器里 打入如下的命令:

$ docker exec -it kafka  /bin/bash
root@3394868b23e9:/# kafka-topics.sh --list -zookeeper zookeeper:2181
__consumer_offsets
kafka_logstash

上面的命令显示已经存在的被创建的 kafka_logstash topic。我们可以使用如下的命令来向这个被创建的 topic 来发送数据:

kafka-console-consumer.sh --bootstrap-server 192.168.0.3:9092 --topic kafka_logstash --from-beginning
root@3394868b23e9:/# kafka-console-consumer.sh --bootstrap-server 192.168.0.3:9092 --topic kafka_logstash --from-beginning

Elastic Stack 安装

 我们接下来安装 Elastic Stack。同样地,我使用 docker-compose 来部署 Elasticsearch, Logstash 及 Kibana。你们可以参考我之前的文章 “Logstash:在 Docker 中部署 Logstash”。为了能够把数据传入到 Elasticsearch 中,我们需要在 Logstash 中配置一个叫做 kafka-elastic.conf 的配置文件:

kafka-elastic.conf

input {kafka {bootstrap_servers => "192.168.0.3:9092"topics => ["kafka_logstash"]}
}output {elasticsearch {hosts => ["elasticsearch:9200"]index => "kafka_logstash"workers => 1}
}

请注意:在上面的 192.168.0.3 为我自己电脑的本地 IP 地址。为了说明问题的方便,我们没有对来自 kafka 里的 registered_user 这个 topic 做任何的数据处理,而直接发送到 Elasticsearch 中。

我们的 docker-compose.yml 配置文件如下:

docker-compose.yml

version: "3.9"
services:elasticsearch:image: elasticsearch:${ELASTIC_STACK_VERSION}container_name: elasticsearchenvironment:- discovery.type=single-node- ES_JAVA_OPTS=-Xms1g -Xmx1g- xpack.security.enabled=falsevolumes:- type: volumesource: es_datatarget: /usr/share/elasticsearch/dataports:- target: 9200published: 9200networks:- elastickibana:image: kibana:${ELASTIC_STACK_VERSION}container_name: kibanaports:- target: 5601published: 5601depends_on:- elasticsearchnetworks:- elastic   logstash:image: logstash:${ELASTIC_STACK_VERSION}container_name: logstashports:- 5200:5200volumes: - type: bindsource: ./logstash_pipeline/target: /usr/share/logstash/pipelineread_only: truenetworks:- elastic           volumes:es_data:driver: localnetworks:elastic:name: elasticdriver: bridge

为方便起见,在我的安装中,我没有配置安全。如果你需要为 Elasticsearch 设置安全的话,请参考我之前的文章 “Elasticsearch:使用 Docker compose 来一键部署 Elastic Stack 8.x”。

我们使用如下的命令来启动 Elastic Stack。在 docker-compose.yml 所在的目录中打入如下的命令:

$ pwd
/Users/liuxg/data/data-pipeline8/docker-elk
$ ls
docker-compose.yml logstash_pipeline
$ docker-compose up

 等所有的 Elastic Stack 运行起来后,我们再次通过如下的命令来进行查看:

docker ps
$ docker ps
CONTAINER ID   IMAGE                            COMMAND                  CREATED              STATUS              PORTS                                                  NAMES
3db5e4e6e23e   kibana:8.6.2                     "/bin/tini -- /usr/l…"   About a minute ago   Up About a minute   0.0.0.0:5601->5601/tcp                                 kibana
210b673dd89a   logstash:8.6.2                   "/usr/local/bin/dock…"   About a minute ago   Up About a minute   5044/tcp, 9600/tcp, 0.0.0.0:5200->5200/tcp             logstash
05c434edd823   elasticsearch:8.6.2              "/bin/tini -- /usr/l…"   About a minute ago   Up About a minute   0.0.0.0:9200->9200/tcp, 9300/tcp                       elasticsearch
de7453250529   hlebalbau/kafka-manager:stable   "/kafka-manager/bin/…"   51 minutes ago       Up 51 minutes       0.0.0.0:9000->9000/tcp                                 kakfa-manager
65eba68350f1   zookeeper                        "/docker-entrypoint.…"   51 minutes ago       Up 51 minutes       2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp   zookeeper
3394868b23e9   wurstmeister/kafka               "start-kafka.sh"         51 minutes ago       Up 51 minutes       0.0.0.0:9092->9092/tcp                                 kafka

我们可以看到 Elasticsearch 运用于 9000 端口,Kibana 运行于 5601 端口,而 Logstash 运行 5000 端口。 我们可以访问 Kibana 的端口地址 5601: 

 

运行 Nodejs 应用导入模拟数据

我们接下来建立一个 Nodejs 的应用来模拟一些数据。首先,我们需要安装如下的包:

npm install kafkajs uuid randomstring random-mobile

我们在根目录下打入如下的命令:

npm init -y
$ npm init -y
Wrote to /Users/liuxg/data/data-pipeline8/package.json:{"dependencies": {"kafkajs": "^2.2.4"},"name": "data-pipeline8","description": "This is a sample code showing how to realize the following data pipeline:","version": "1.0.0","main": "kafka_producer.js","devDependencies": {},"scripts": {"test": "echo \"Error: no test specified\" && exit 1"},"repository": {"type": "git","url": "git+https://github.com/liu-xiao-guo/data-pipeline8.git"},"keywords": [],"author": "","license": "ISC","bugs": {"url": "https://github.com/liu-xiao-guo/data-pipeline8/issues"},"homepage": "https://github.com/liu-xiao-guo/data-pipeline8#readme"
}

上述命令生成一个叫做 package.json 的文件。在以后安装的 packages,它也会自动添加到这个文件中。默认的设置显然不是我们想要的。我们需要对它做一些修改。

kafka_producer.js

// import { Kafka, logLevel } from "kafkajs";
const { Kafka } = require('kafkajs');
const logLevel = require("kafkajs");// import { v4 as uuidv4 } from "uuid";
const { v4: uuidv4 } = require('uuid');console.log(uuidv4());const kafka = new Kafka({clientId: "random-producer",brokers: ["localhost:9092"],connectionTimeout: 3000,
});var randomstring = require("randomstring");
var randomMobile = require("random-mobile");
const producer = kafka.producer({});
const topic = "kafka_logstash";const produce = async () => {await producer.connect();let i = 0;setInterval(async () => {var event = {};try {event = {globalId: uuidv4(),event: "USER-CREATED",data: {id: uuidv4(),firstName: randomstring.generate(8),lastName: randomstring.generate(6),country: "China",email: randomstring.generate(10) + "@gmail.com",phoneNumber: randomMobile(),city: "Hyderabad",createdAt: new Date(),},};await producer.send({topic,acks: 1,messages: [{value: JSON.stringify(event),},],});// if the message is written successfully, log it and increment `i`console.log("writes: ", event);i++;} catch (err) {console.error("could not write message " + err);}}, 5000);
};produce().catch(console.log)

我们运行上面的 Nodejs 代码:

npm start

 我们接下来在 Kibana 中来查看索引 kafka_logstash:

GET kafka_logstash/_count
{"count": 103,"_shards": {"total": 1,"successful": 1,"skipped": 0,"failed": 0}
}

我们可以看到文档的数值在不断地增加。我们可以查看文档:

很显然我们收到了数据。从上面的结果中,我们可以看出来是一些非结构化的数据。我们可以针对 Logstash 的 pipeline 进行修改:

kafka-elastic.conf

input {kafka {bootstrap_servers => "192.168.0.3:9092"topics => ["kafka_logstash"]}
}filter {json {source => "message"}mutate {add_field => {"id" => "%{[data][id]}"}add_field => {"firstName" => "%{[data][firstName]}"}add_field => {"lastName" => "%{[data][lastName]}"}add_field => {"city" => "%{[data][city]}"}add_field => {"country" => "%{[data][country]}"}add_field => {"email" => "%{[data][email]}"}add_field => {"phoneNumber" => "%{[data][phoneNumber]}"}add_field => {"createdAt" => "%{[data][createdAt]}"}remove_field => ["data", "@version", "@timestamp", "message", "event", "globalId"]}  
}output {elasticsearch {hosts => ["elasticsearch:9200"]index => "kafka_logstash"workers => 1}
}

我们在 Kibana 中删除 kafka_logstash:

DELETE kafka_logstash

我们停止运行 Nodejs 应用。我们把运行 Elastic Stack 的 docker-compose 关掉,并再次重新启动它:

docker-compose down
docker-compose up

我们再次运行 Nodejs 应用:

 我们再次到 Kibana 中进行查看:

很显然,这次,我们看到结构化的输出文件。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_78052.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C#进阶】C# 特性

序号系列文章10【C#基础】C# 正则表达式11【C#基础】C# 预处理器指令12【C#基础】C# 文件与IO文章目录前言1,特性的概念1.1 特性的属性1.2 特性的用途2,特性的定义2.1 特性参数2.2 特性目标3,预定义特性3.1 AttributeUsage3.2 Conditional3.2…

2023年再不会Redis,就要被淘汰了

目录专栏导读一、同样是缓存,用map不行吗?二、Redis为什么是单线程的?三、Redis真的是单线程的吗?四、Redis优缺点1、优点2、缺点五、Redis常见业务场景六、Redis常见数据类型1、String2、List3、Hash4、Set5、Zset6、BitMap7、Bi…

2023款欧拉好猫上市,12.98万起

上周,2023款欧拉好猫焕新上市。2023款好猫共推出5个车型: •401km标续航,舒享型/豪华型/尊贵型,分别是12.98/13.98/14.98万元; •501km长续航,豪华型/尊贵型,分别是15.58/16.58万元;…

QT的下载与安装

下载安装工具 https://download.qt.io/official_releases/online_installers/ 双击打开安装包 一步一步安装 选择需要的包,没想好的话QT装好了也可以重新使用安装程序添加 然后就装好了

SAP UI5 Upload/Download file through NetWeaver Gateway

1、创建 SEGW对象 2、创建Entity Type 要把Media 标识打上 3、 激活对象然后到DPC Class的扩展对象里面重定义 /IWBEP/IF_MGW_APPL_SRV_RUNTIME~GET_STREAM /IWBEP/IF_MGW_APPL_SRV_RUNTIME~CREATE_STREAM /IWBEP/IF_MGW_APPL_SRV_RUNTIME~UPDATE_STREAM METHOD /iwbep/if_m…

1497. 树的遍历

文章目录1.二叉树的遍历2.二叉树的构造3.例题二叉树的构造:没有中序遍历则无法唯一构造1.二叉树的遍历 2.二叉树的构造 3.例题 一个二叉树,树中每个节点的权值互不相同。 现在给出它的后序遍历和中序遍历,请你输出它的层序遍历。 输入格式…

5.深入理解HttpSecurity的设计

深入理解HttpSecurity的设计 一、HttpSecurity的应用 在前章节的介绍中我们讲解了基于配置文件的使用方式,也就是如下的使用。 也就是在配置文件中通过 security:http 等标签来定义了认证需要的相关信息,但是在SpringBoot项目中,我们慢慢脱离…

ubuntu20修改网卡静态ip或者动态ip

1、查看所有网卡信息 ifconfig -a 2、修改信息 sudo gedit /etc/netplan/01-network-manager-all.yaml # Let NetworkManager manage all devices on this system network:ethernets:ens33: #配置的网卡的名称dhcp4: trueens38:dhcp4: trueversion: 2renderer: networkd…

用Python按时间分割txt文件中的数据

案例 有一个监测系统,每隔两分钟就会记录一下监测结果,如下图所示:现在要求按小时将数据提取,并存为新的txt文件,也就是1天会对应有24个txt文件。先整理一下思路: 读取数据将每行数据的时间戳转换成“日期-小时”格式,并按此分类数据,存入字典 按“日期-小时”分断,将…

没有钱怎么创业?一分钱没有如何能创业成功?

限制人创业成功的从来都不是资金,而是能力,这个道理很多人都可能不懂,多数人习惯了庸庸碌碌、日复一日地打工行为,却不知如何创业,那么,没有钱怎么创业?一分钱没有如何能创业成功呢?…

【虹科案例】虹科任意波形发生器在量子计算中的应用

虹科AWG在量子计算中的应用精度在研究中始终很重要,很少有研究领域需要比量子研究更高的精度。奥地利因斯布鲁克大学的量子光学和量子信息研究所需要一个任意波形发生器(AWG)来为他们的研究生成各种各样的信号。01无线电频率第一个应用是在射…

python线上商城网站项目前台和后台源码

wx供重浩:创享日记 对话框发送:python51 获取完整源码源文件说明文档配置教程等 1、网站前台 在虚拟环境中启动程序后,使用浏览器访问“http://127.0.0.1:5000”即可进入网站前台首页。如图1所示。 单击首页左上角“注册”按钮,进…

【MySQL】第17章_触发器

第17章_触发器 在实际开发中,我们经常会遇到这样的情况:有 2 个或者多个相互关联的表,如商品信息和库存信息分别存放在 2 个不同的数据表中,我们在添加一条新商品记录的时候,为了保证数据的完整性,必须同时…

正版Scrivener 3 论文/小说写作工具神器软件

一款非常优秀的写作软件,提供了各种写作辅助功能,如标注多个文档、概述介绍、全屏幕编辑、快照等,能够轻松、便捷的辅助作者从作品构思、搜集资料、组织结构、增删修改到排版输出的整个写作流程。 作为一个专业的写作软件,Scriven…

给文档添加签名,介绍用iPhone的实例

环境:iOS 16 实现电子文档上的签名不是什么新鲜事,也不需要高级的技术,原理基本一致,就是菜单路径有所不同,故在此记录一下,不然容易忘记。 这里介绍的解决方法: 需要一个签名,背…

面向对象设计模式:行为型模式之迭代器模式

一、迭代器模式,Iterator Pattern aka:Cursor Pattern 1.1 Intent 意图 Provide a way to access the elements of an aggregate object sequentially without exposing its underlying representation. 提供一种按顺序访问聚合对象的元素而不公开其基…

XShell连接ubuntu20.04.LTS

1 下载XshellXShell官方下载地址打开XSHELL官方下载地址,我们可以选择【家庭和学校用户的免费许可证】,输入邮箱之后即可获得下载链接安装非常简单,跟着提示进行即可。2 连接ubuntu2.1 查看ubuntu的ip地址输入命令查看ip地址ifconfig刚开始可…

C++ | 你真的了解namespace吗?

文章目录一、前言二、命名冲突三、命名空间1、域作用限定符2、命名空间的概念👉示例1👉示例23、命名空间的定义4、命名空间的使用① 指定命名空间访问【做项目】② 使用using部分展开【做项目】③ 使用using namespace全局展开【日常练习】5、小结解答&a…

通用业务平台设计(五):预警平台建设

前言 在上家公司,随着业务的不断拓展(从支持单个国家单个主体演变成支持多个国家多个主体),对预警的诉求越来越紧迫;如何保障业务的稳定性那?预警可以帮我们提前甄别风险,从而让我们可以在风险来临前将其消灭&#xff…

3DEXPERIENCE Works 成为了中科赛凌实现科技克隆环境的催化剂

您的企业是否想过实现设计数据的统筹管理,在设计上实现标准化,并把每位设计工程师串联起来协同办公?中科赛凌通过使用3DEXPERIENCE Works 实现了上述内容,一起来看本期案例分享吧!中科赛凌 通过其自主研发的单压缩机制冷技术实现零下190℃制…