Flink Cluster Installation and Deployment

2024/4/28 6:43:10 / Article source: https://blog.csdn.net/qq_35370485/article/details/130582536

1. Download

Official download: Downloads | Apache Flink

Aliyun Drive download (includes the dependency jars): Aliyun Drive share

Extraction code: 9bl2

2. Extract

tar -zxvf flink-1.12.7-bin-scala_2.11.tgz -C /opt/module

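3. Edit the configuration files

cd flink-1.12.7/conf/

Edit the flink-conf.yaml file

Edit the masters file

Create the workers file (or edit it if it already exists)

3.1 Edit flink-conf.yaml

Adjust the values below to match your own cluster.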

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

#==============================================================================
# Common
#==============================================================================

# The external address of the host on which the JobManager runs and can be
# reached by the TaskManagers and any clients which want to connect. This setting
# is only used in Standalone mode and may be overwritten on the JobManager side
# by specifying the --host <hostname> parameter of the bin/jobmanager.sh executable.
# In high availability mode, if you use the bin/start-cluster.sh script and setup
# the conf/masters file, this will be taken care of automatically. Yarn/Mesos
# automatically configure the host name based on the hostname of the node where the
# JobManager runs.

jobmanager.rpc.address: hadoop102

# The RPC port where the JobManager is reachable.

jobmanager.rpc.port: 6123

# The heap size for the JobManager JVM

jobmanager.heap.size: 800m

# The total process memory size for the TaskManager.
#
# Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.

taskmanager.memory.process.size: 1024m

# To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
# It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
#
#taskmanager.memory.flink.size: 1280m

# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.

taskmanager.numberOfTaskSlots: 1

# The parallelism used for programs that did not specify and other parallelism.

parallelism.default: 1

# The default file system scheme and authority.
#
# By default file paths without scheme are interpreted relative to the local
# root file system 'file:///'. Use this to override the default and interpret
# relative paths relative to a different file system,
# for example 'hdfs://mynamenode:12345'
#
# fs.default-scheme

#==============================================================================
# High Availability
#==============================================================================

# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
#
high-availability: zookeeper

# The path where metadata for master recovery is persisted. While ZooKeeper stores
# the small ground truth for checkpoint and leader election, this location stores
# the larger objects, like persisted dataflow graphs.
#
# Must be a durable file system that is accessible from all nodes
# (like HDFS, S3, Ceph, nfs, ...)
#high-availability.storageDir: hdfs://192.168.233.130:8020/flink/ha/

# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
#
high-availability.zookeeper.quorum: 192.168.233.130:2181,192.168.233.131:2181,192.168.233.132:2181

# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
# The default value is "open" and it can be changed to "creator" if ZK security is enabled
#
high-availability.zookeeper.client.acl: open

#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================

# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#state.backend: filesystem

# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#state.checkpoints.dir: hdfs://192.168.233.130:8020/flink-checkpoints

# Default target directory for savepoints, optional.
#state.savepoints.dir: hdfs://192.168.233.130:8020/flink-checkpoints

# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend).
#
# state.backend.incremental: false

# The failover strategy, i.e., how the job computation recovers from task failures.
# Only restart tasks that may have been affected by the task failure, which typically includes
# downstream tasks and potentially upstream tasks if their produced data is no longer available for consumption.

jobmanager.execution.failover-strategy: region

#==============================================================================
# Rest & web frontend
#==============================================================================

# The port to which the REST client connects to. If rest.bind-port has
# not been specified, then the server will bind to this port as well.
#
rest.port: 8081

# The address to which the REST client will connect to
#
rest.address: 0.0.0.0

# Port range for the REST and web server to bind to.
#
rest.bind-port: 8080-8090

# The address that the REST & web server binds to
#
rest.bind-address: 0.0.0.0

# Flag to specify whether job submission is enabled from the web-based
# runtime monitor. Uncomment to disable.

web.submit.enable: true

#==============================================================================
# Advanced
#==============================================================================

# Override the directories for temporary files. If not specified, the
# system-specific Java temporary directory (java.io.tmpdir property) is taken.
#
# For framework setups on Yarn or Mesos, Flink will automatically pick up the
# containers' temp directories without any need for configuration.
#
# Add a delimited list for multiple directories, using the system directory
# delimiter (colon ':' on unix) or a comma, e.g.:
#     /data1/tmp:/data2/tmp:/data3/tmp
#
# Note: Each directory entry is read from and written to by a different I/O
# thread. You can include the same directory multiple times in order to create
# multiple I/O threads against that directory. This is for example relevant for
# high-throughput RAIDs.
#
# io.tmp.dirs: /tmp

# The classloading resolve order. Possible values are 'child-first' (Flink's default)
# and 'parent-first' (Java's default).
#
# Child first classloading allows users to use different dependency/library
# versions in their application than those in the classpath. Switching back
# to 'parent-first' may help with debugging dependency issues.
#
# classloader.resolve-order: child-first

# The amount of memory going to the network stack. These numbers usually need
# no tuning. Adjusting them may be necessary in case of an "Insufficient number
# of network buffers" error. The default min is 64MB, the default max is 1GB.
#
# taskmanager.memory.network.fraction: 0.1
#taskmanager.memory.network.min: 128mb
#taskmanager.memory.network.max: 1gb

#==============================================================================
# Flink Cluster Security Configuration
#==============================================================================

# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -
# may be enabled in four steps:
# 1. configure the local krb5.conf file
# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
# 3. make the credentials available to various JAAS login contexts
# 4. configure the connector to use JAAS/SASL

# The below configure how Kerberos credentials are provided. A keytab will be used instead of
# a ticket cache if the keytab path and principal are set.

# security.kerberos.login.use-ticket-cache: true
# security.kerberos.login.keytab: /path/to/kerberos/keytab
# security.kerberos.login.principal: flink-user

# The configuration below defines which JAAS login contexts

# security.kerberos.login.contexts: Client,KafkaClient

#==============================================================================
# ZK Security Configuration
#==============================================================================

# Below configurations are applicable if ZK ensemble is configured for security

# Override below configuration to provide custom ZK service name if configured
# zookeeper.sasl.service-name: zookeeper

# The configuration below must match one of the values set in "security.kerberos.login.contexts"
# zookeeper.sasl.login-context-name: Client

#==============================================================================
# HistoryServer
#==============================================================================

# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)

# Directory to upload completed jobs to. Add this directory to the list of
# monitored directories of the HistoryServer as well (see below).
jobmanager.archive.fs.dir: hdfs://192.168.233.130:8020/completed-jobs/

# The address under which the web-based HistoryServer listens.
#historyserver.web.address: 0.0.0.0

# The port under which the web-based HistoryServer listens.
historyserver.web.port: 8082

# Comma separated list of directories to monitor for completed jobs.
historyserver.archive.fs.dir: hdfs://192.168.233.130:8020/completed-jobs/

# Interval in milliseconds for refreshing the monitored directories.
historyserver.archive.fs.refresh-interval: 10000
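
One thing worth checking in the listing above: high-availability is set to zookeeper, but the high-availability.storageDir line starts with '#'. The Flink documentation lists that key as required for ZooKeeper HA, so if the JobManager refuses to start, look there first. Below is a minimal sketch of a sanity check, not an official Flink tool. The function names conf_value and check_ha_conf are made up for this example, and it assumes the simple "key: value" layout shown above.

```shell
# Hypothetical helpers (not part of Flink): read flink-conf.yaml style files.

# Print the value of an uncommented "key: value" line.
# Prints nothing if the key is missing or commented out with '#'.
conf_value() {
  cv_key=$1
  cv_file=$2
  sed -n "s/^${cv_key}:[[:space:]]*//p" "$cv_file" | sed 's/[[:space:]]*$//' | head -n 1
}

# Report whether the keys needed for ZooKeeper HA are set.
check_ha_conf() {
  ha_file=$1
  if [ "$(conf_value "high-availability" "$ha_file")" != "zookeeper" ]; then
    echo "HA disabled"
    return 0
  fi
  for ha_key in high-availability.storageDir high-availability.zookeeper.quorum; do
    if [ -z "$(conf_value "$ha_key" "$ha_file")" ]; then
      echo "missing: $ha_key"
      return 1
    fi
  done
  echo "HA settings complete"
}

# Usage (path is an assumption based on the install directory used here):
#   check_ha_conf /opt/module/flink-1.12.7/conf/flink-conf.yaml
```

If the check reports a missing key, either uncomment the line or switch high-availability back to NONE for a single-JobManager setup.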

3.2 Edit masters

192.168.233.130:8081

3.3 Create workers

List the IP address or hostname of every machine in the cluster, one per line.

192.168.233.130
192.168.233.131
192.168.233.132
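
If you rebuild the cluster often, the two files from 3.2 and 3.3 can be written by a small helper instead of by hand. This is only a sketch: write_cluster_files is a name invented for this example, and it overwrites whatever masters and workers files are already in the target directory.

```shell
# Hypothetical helper: write conf/masters and conf/workers in one go.
# Usage: write_cluster_files CONF_DIR MASTER_HOST:PORT WORKER [WORKER...]
write_cluster_files() {
  wcf_dir=$1
  wcf_master=$2
  shift 2
  # masters holds one "host:webui-port" entry per JobManager.
  printf '%s\n' "$wcf_master" > "$wcf_dir/masters"
  # workers holds one host or IP per line.
  printf '%s\n' "$@" > "$wcf_dir/workers"
}

# Usage with the addresses from this article:
#   write_cluster_files /opt/module/flink-1.12.7/conf 192.168.233.130:8081 \
#     192.168.233.130 192.168.233.131 192.168.233.132
```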

4. Distribute Flink to the other nodes

scp -r flink-1.12.7 hadoop103:/opt/module
scp -r flink-1.12.7 hadoop104:/opt/module

5. Configure environment variables

vim /etc/profile.d/my_env.sh

#FLINK_HOME
export FLINK_HOME=/opt/module/flink-1.12.7
export PATH=$FLINK_HOME/bin:$PATH

6. Distribute the environment variables

scp /etc/profile.d/my_env.sh hadoop103:/etc/profile.d/
scp /etc/profile.d/my_env.sh hadoop104:/etc/profile.d/
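
Steps 4 and 6 both run the same scp once per target host, so a loop saves typing as the cluster grows. The sketch below uses a made-up function name, distribute, and by default only prints the commands it would run. The DRY_RUN variable is also an invention of this example, not something scp or Flink knows about.

```shell
# Hypothetical helper: copy a file or directory to the same path on several hosts.
# By default it only prints the scp commands; set DRY_RUN=0 to really run them.
# Usage: distribute SOURCE DEST_DIR HOST [HOST...]
distribute() {
  dist_src=$1
  dist_dest=$2
  shift 2
  for dist_host in "$@"; do
    if [ "${DRY_RUN:-1}" = "1" ]; then
      echo "scp -r $dist_src $dist_host:$dist_dest"
    else
      scp -r "$dist_src" "$dist_host:$dist_dest"
    fi
  done
}

# Usage:
#   distribute flink-1.12.7 /opt/module hadoop103 hadoop104
#   DRY_RUN=0 distribute /etc/profile.d/my_env.sh /etc/profile.d/ hadoop103 hadoop104
```

Printing first makes it easy to spot a wrong hostname or path before anything is copied.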

7. Source the environment variables

Cluster node 1: source /etc/profile.d/my_env.sh
Cluster node 2: source /etc/profile.d/my_env.sh
Cluster node 3: source /etc/profile.d/my_env.sh

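8. Add the dependency jars

You can try skipping this step and start the Flink cluster directly with bin/start-cluster.sh.

Most likely you will then run into a series of errors.

https://mvnrepository.com/

Search for commons-cli-1.4.jar and flink-shaded-hadoop-3-uber-3.1.1.7.2.1.0-327-9.0.jar, download them, and put them in the lib directory of the Flink installation on every node.

The download can be slow.

Both jars are also in the Aliyun Drive share mentioned above, so you can get them from there instead.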
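
Before starting the cluster, a quick check that both jars are actually in place on a node can save a round of startup errors. The sketch below assumes the jars belong in the lib directory of the Flink installation (Flink loads everything in lib onto its classpath). The function name missing_jars is invented for this example.

```shell
# Hypothetical helper: list the required jars that are absent from a lib directory.
# Returns non-zero if at least one jar is missing.
# Usage: missing_jars LIB_DIR
missing_jars() {
  mj_dir=$1
  mj_status=0
  for mj_jar in commons-cli-1.4.jar \
                flink-shaded-hadoop-3-uber-3.1.1.7.2.1.0-327-9.0.jar; do
    if [ ! -f "$mj_dir/$mj_jar" ]; then
      echo "$mj_jar"
      mj_status=1
    fi
  done
  return "$mj_status"
}

# Usage:
#   missing_jars /opt/module/flink-1.12.7/lib && echo "all dependency jars present"
```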

9. Start the Flink cluster

bin/start-cluster.sh

Starting standalonesession daemon on host hadoop102.
Starting taskexecutor daemon on host hadoop102.
Starting taskexecutor daemon on host hadoop103.
Starting taskexecutor daemon on host hadoop104.
 

If you want to be sure, run jps on each machine and check that the corresponding processes are there.
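
In a standalone session cluster, the JobManager shows up in jps as StandaloneSessionClusterEntrypoint and each TaskManager as TaskManagerRunner. A small filter makes the check less error-prone than reading the list by eye. This is a sketch, and flink_daemons is a name made up for this example.

```shell
# Hypothetical helper: read `jps` output on stdin and report the Flink daemons.
flink_daemons() {
  fd_out=$(cat)
  for fd_name in StandaloneSessionClusterEntrypoint TaskManagerRunner; do
    if printf '%s\n' "$fd_out" | grep -q "$fd_name"; then
      echo "$fd_name: running"
    else
      echo "$fd_name: not found"
    fi
  done
}

# Usage on the local node, or on a remote one over ssh:
#   jps | flink_daemons
#   ssh hadoop103 jps | flink_daemons
```

On the worker-only nodes, "StandaloneSessionClusterEntrypoint: not found" is expected, since only the host listed in masters runs the JobManager.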

10. Open the Flink web UI

http://192.168.233.130:8081/ 
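
The same port also serves Flink's REST API, so the cluster can be checked from a terminal without a browser. The /overview endpoint returns a small JSON object with fields such as taskmanagers and slots-total. The sketch below pulls one integer field out of that JSON with sed. The function name overview_field is invented here, and the parsing is deliberately naive (a real script would use a JSON tool such as jq).

```shell
# Hypothetical helper: extract an integer field from the /overview JSON on stdin.
# Usage: ... | overview_field FIELD_NAME
overview_field() {
  sed -n "s/.*\"$1\":\([0-9][0-9]*\).*/\1/p"
}

# Usage (address taken from this article):
#   curl -s http://192.168.233.130:8081/overview | overview_field taskmanagers
```

With the three workers configured above, the taskmanagers field should read 3 once every TaskManager has registered.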

11. Test

bin/flink run examples/streaming/WordCount.jar

Log output:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/flink-1.12.7/lib/chunjun/chunjun-dist/connector/iceberg/chunjun-connector-iceberg.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/flink-1.12.7/lib/chunjun-dist/connector/iceberg/chunjun-connector-iceberg.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/flink-1.12.7/lib/chunjun/lib/log4j-slf4j-impl-2.16.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/flink-1.12.7/lib/log4j-slf4j-impl-2.16.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/flink-1.12.7/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID c537bf865e69549f725bb167bfa64c18
Program execution finished
Job with JobID c537bf865e69549f725bb167bfa64c18 has finished.
Job Runtime: 2678 ms

Success!
