Flink- 物理分区、Sink输出

news/2024/5/4 4:28:44/文章来源:https://blog.csdn.net/dafsq/article/details/129707652

物理分区

        随机分区(shuffle)

        轮询分区(Round-Robin) 

        重缩放分区(rescale) 

        广播(broadcast)

        全局分区(global)

        自定义分区(Custom)

输出算子(Sink)

        连接到外部系统

        输出到文件

        输出到 Kafka

        输出到 Redis

        输出到 MySQL(JDBC)


物理分区

        “分区”(partitioning)操作就是要将数据进行重新分布,传递到不同的流分区去进行下一 步计算。keyBy()是一种逻辑分区(logical partitioning)操作。 Flink 对于经过转换操作之后的 DataStream,提供了一系列的底层操作算子,能够帮我们 实现数据流的手动重分区。为了同 keyBy()相区别,我们把这些操作统称为“物理分区”操作。

        常见的物理分区策略有随机分区、轮询分区、重缩放和广播,还有一种特殊的分区策略— —全局分区,并且 Flink 还支持用户自定义分区策略。

        随机分区(shuffle)

        最简单的重分区方式就是直接“洗牌”。通过调用 DataStream 的 shuffle()方法,将数据随 机地分配到下游算子的并行任务中去。

        创建一个数据流类作为数据源 

class f4 extends SourceFunction[ Event ]{ //实现SourceFunction接口 泛型为之前定义好的样例类Event//标志位var running = true//重写抽象方法override def run(sourceContext: SourceFunction.SourceContext[Event]): Unit = {//随机数生成器val random = new Random ()//定义数据随机选择的范围val user = Array ("张三", "李四", "王五")val url = Array ("02", "01", "03", "04")//用标志位作为循环判断条件,不停的发出数据while (running) {val event = Event (user (random.nextInt (user.length) ), url (random.nextInt (url.length) ), Calendar.getInstance.getTimeInMillis)//调用ctx的方法向下游发送数据sourceContext.collect (event)//每隔1秒发送一条数据Thread.sleep (1000)}}override def cancel(): Unit = running = false
}

        进行随机分区操作

def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//设置全局并行度env.setParallelism(1)//读取数据流文件val stream: DataStream[Event] = env.addSource( new f4)//洗牌之后打印输出stream.shuffle.print().setParallelism(4) //并行度设置为4//执行env.execute()}

        分区结果基本都是均匀随机的 

 

        轮询分区(Round-Robin) 

         轮询也是一种常见的重分区方式。简单来说就是“发牌”,按照先后顺序将数据做依次分发。通过调用 DataStream的.rebalance()方法,就可以实现轮询重分区。rebalance() 使用的是 Round-Robin 负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去 

def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//设置全局并行度env.setParallelism(1)//读取数据流文件val stream: DataStream[Event] = env.addSource( new f4)//轮询重分区之后打印输出stream.rebalance.print().setParallelism(4) //并行度设置为4//执行env.execute()}

        按照规律顺序的轮询分区输出

        重缩放分区(rescale) 

 重缩放分区和轮询分区非常相似。当调用 rescale()方法时,其实底层也是使用 Round-Robin算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中。也就 是说,“发牌人”如果有多个,那么 rebalance()的方式是每个发牌人都面向所有人发牌;而 rescale()的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌。

def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//设置全局并行度env.setParallelism(1)//读取数据流文件val stream: DataStream[Int] = env.addSource(new RichParallelSourceFunction[Int] { //定义一个并行的数据源override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {for (i <- 0 to 10) {//利用运行时上下文中的subTask的信息来控制数据由哪个并行子任务生成if( getRuntimeContext.getIndexOfThisSubtask == (i + 1) % 2 )ctx.collect(i + 1)}}override def cancel(): Unit = ???})setParallelism(2) //并行度设置为2//分区之后打印输出stream.rescale.print().setParallelism(4) //并行度设置为4//执行env.execute()}

        广播(broadcast)

        这种方式其实不应该叫作“重分区”,因为经过广播之后,数据会在不同的分区都保留一 份,可能进行重复处理。可以通过调用 DataStream 的 broadcast()方法,将输入数据复制并发送 到下游算子的所有并行任务中去。

def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//设置全局并行度env.setParallelism(1)//读取数据流文件val stream: DataStream[Event] = env.addSource( new f4)//分区之后打印输出stream.broadcast.print().setParallelism(4) //并行度设置为4//执行env.execute()}

        全局分区(global)

        全局分区也是一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所 有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行 度变成了 1,所以使用这个操作需要非常谨慎,可能对程序造成很大的压力。

 def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//设置全局并行度env.setParallelism(1)//读取数据流文件val stream: DataStream[Event] = env.addSource( new f4)//分区之后打印输出stream.global.print().setParallelism(4) //并行度设置为4//执行env.execute()}

        执行后全部被划分到一个分区中

        自定义分区(Custom)

        当Flink提供的所有分区策略都不能满足用户的需求时,我们可以通过使用partitionCustom()方法来自定义分区策略。 在调用时,方法需要传入两个参数,第一个是自定义分区器(Partitioner)对象,第二个 是应用分区器的字段,它的指定方式与 keyBy 指定 key 基本一样:可以通过字段名称指定, 也可以通过字段位置索引来指定,还可以实现一个 KeySelector 接口。

def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.fromElements(1,2,3,4,5,6,7,8).partitionCustom(new Partitioner[Int] {// 根据 key 的奇偶性计算出数据将被发送到哪个分区override def partition(key: Int, numPartitions: Int): Int = key % 2},data => data // 以自身作为 key).print()env.execute()}

输出算子(Sink)

        连接到外部系统

        与 Source 算子非常类似,除去一些 Flink 预实现的 Sink,一般情况下 Sink 算子的创建是 通过调用 DataStream 的 addSink()方法实现的。

stream.addSink(new SinkFunction(…))

        addSource 的参数需要实现一个 SourceFunction 接口;类似地,addSink 方法同样需要传入 一个参数,实现的是 SinkFunction 接口。在这个接口中只需要重写一个方法 invoke(),用来将指 定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用:

default void invoke(IN value, Context context) throws Exception

        输出到文件

case class Event(user: String, url: String, timestamp: Long)
 def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//设置全局并行度env.setParallelism(1)//创建当前样例类Event的对象val stream: DataStream[Event] = env.fromElements(Event("张三", "01", 1000L),Event("李四", "04", 2000L),Event("王五", "01", 6000L),Event("赵六", "03", 1000L))//以文本形式分布式的写入文件中//定义fileSinkval fileSink: StreamingFileSink[String] = StreamingFileSink.forRowFormat(new Path("D:\\Flink\\datas"), new SimpleStringEncoder[String]("UTF-8")).build()//转换为String              StreamingFileSink.forRowFormat("路径","编码器")stream.map(_.toString).addSink(fileSink)//执行env.execute()}

        输出到 Kafka

        Flink 官方为 Kafka 提供了 Source 和 Sink 的连接器,我们可以用它方便地从 Kafka 读写数 据。Flink 与 Kafka 的连接器提供了端到端的精确一次(exactly once)语义保证,这在实际项 目中是最高级别的一致性保证。

配置步骤:

  1. 添加 Kafka 连接器依赖 。
  2. 启动 Kafka 集群
  3. 编写输出到 Kafka 的示例代码

        我们可以直接将用户行为数据保存为文件 clicks.csv,读取后不做转换直接写入 Kafka,主 题(topic)命名为“clicks”。 

def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//设置全局并行度env.setParallelism(1)//将数据写入到kafa//创建连接配置val properties = new Properties()properties.put("bootstrap.servers", "hadoop102:9092")val stream = env.readTextFile("input/clicks.csv")stream.addSink(new FlinkKafkaProducer[String]("clicks",new SimpleStringSchema(),properties))//执行env.execute()}

        我们可以直接将用户行为数据保存为文件 clicks.csv,读取后不做转换直接写入 Kafka,主 题(topic)命名为“clicks”。

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic clicks

        输出到 Redis

        Flink 没有直接提供官方的 Redis 连接器,不过 Bahir 项目还是担任了合格的辅助角色,为 我们提供了 Flink-Redis 的连接工具。但版本升级略显滞后,目前连接器版本为 1.1,支持的 Scala 版本最新到 2.11。

导入的 Redis 连接器依赖

<dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version>
</dependency>
 def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)//创建连接配置val conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop102").build()env.addSource(new ClickSource).addSink(new RedisSink[Event](conf, new MyRedisMapper())) //写入env.execute()}

        输出到 MySQL(JDBC)

        添加依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version>
</dependency>

        准备数据表 

        写入mysql 

def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//设置全局并行度env.setParallelism(1)//将数据写入到mysqlval stream: DataStream[Event] = env.fromElements(Event(4, "aa", 18),Event(5, "bb", 23),Event(6, "cc", 60),Event(7, "dd",15))//JdbcSink.sink(需要写入的sql语句,添加的元素,连接jdbc的配置)stream.addSink( JdbcSink.sink("INSERT INTO user(id,name,age) VALUES(?,?,?)", //定义写入mysql的语句new JdbcStatementBuilder[Event] {override def accept(t: PreparedStatement, u: Event): Unit = {//setInt(索引位置,元素)t.setInt(1,u.id)t.setString(2,u.name)t.setInt(3,u.age)}},//创建JDBC连接的配置项new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://master:3306/spark-sql").withDriverName("com.mysql.jdbc.Driver") //添加驱动类.withUsername("root") //指定用户名.withPassword("p@ssw0rd") //指定密码.build()))//执行env.execute()}

         如果运行时报如下错误:

 Wed Mar 22 20:36:28 CST 2023 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.

        警告:不建议在没有服务器身份验证的情况下建立SSL连接。根据MySQL 5.5.45+、5.6.26+和5.7.6+的要求,如果没有设置显式选项,默认必须建立SSL连接。为了符合不使用SSL的现有应用程序,verifyServerCertificate属性被设置为'false'。您需要通过设置useSSL=false显式禁用SSL,或者设置useSSL=true并为服务器证书验证提供信任存储区。

        hive中conf目录下的hive-site.xml中在mysql连接字符串的url中添加配置 ?useSSL=false即可

useSSL=false              //禁用SSL
useServerPrepStmts=true   //开启预编译功能

         到数据表中查看添加成功

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_274776.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Studio One6中文语言版DAW数字音频音乐创作软件

Studio One6是一款非常实用的数字音乐创作软件&#xff0c;专门用于创作现代化音乐&#xff0c;软件具有简洁的界面和强大的功能&#xff0c;能够很好地辅助用户创作音乐。顾名思义就是“一个工作室”的意思&#xff0c;它所倡导的制作理念是直接在一个制作软件里完成音乐制作的…

Android 解包payload.bin文件,获取system.img

解析payload.bin获取.img文件 payload.bin payload.bin是Android OTA镜像打包文件&#xff0c;将包括system.img、boot.img和lk.img等在内的Android系统进行&#xff0c;打包为一个payload.bin文件。 在系统OTA过程中&#xff0c;系统会自动解压安装。 前期准备 需要安装py…

学习Java日志框架之——搞懂日志门面(JCL+SLF4J)

文章目录一、什么是日志门面1、门面模式&#xff08;外观模式&#xff09;2、日志门面二、了解JCL1、JCL组件结构2、JCL案例&#xff08;1&#xff09;JCL默认实现&#xff08;2&#xff09;导入log4j测试原有程序三、SLF4J简介四、SLF4J基本使用1、入门案例2、动态打印信息3、…

一次内存泄露排查

前因&#xff1a; 因为测试 长时间压测导致 接口反应越来越慢&#xff0c;甚至 导致服务器 崩溃 排查过程 1、top 查看是 哪个进程 占用 内存过高 2、根据 进程 id 去查找 具体是哪个 程序的问题 ps -ef| grep 41356 可以看到 具体的 容器位置 排查该进程 对象存活 状态…

23年PMP考试会使用第七版教材吗?

大家都知道了&#xff0c;今年的考纲是改版了的&#xff0c;为啥要改版呢&#xff0c;因为《PMBOK指南》更新到第七版了&#xff0c;考纲自然也要更新&#xff0c;据PMI的市场调查&#xff0c;近年来&#xff0c;项目管理行业新趋势在第六版和旧考纲中未收纳&#xff0c;为了确…

三、数据链路层

&#xff08;一&#xff09;纠错与检错1、奇偶校验码&#xff08;再研究下&#xff0c;原理知道&#xff0c;具体过程无法重现&#xff09;分为奇校验和偶校验&#xff0c;奇偶校验位在首部或尾部&#xff0c;奇偶校验满信息位奇偶校验位&#xff08;1&#xff09;原理&#xf…

Redis 数据结构

这里写目录标题Redis 数据结构一、String类型String数据类型的使用场景key 的设置约定二、Hash数据类型string存储对象&#xff08;json&#xff09;与hash存储对象的区别三、list 类型四、set 类型set数据交并差操作set 类型数据操作的注意事项六、sorted_set 类型Redis 数据结…

算法----火柴拼正方形

题目 你将得到一个整数数组 matchsticks &#xff0c;其中 matchsticks[i] 是第 i 个火柴棒的长度。你要用 所有的火柴棍 拼成一个正方形。你 不能折断 任何一根火柴棒&#xff0c;但你可以把它们连在一起&#xff0c;而且每根火柴棒必须 使用一次 。 如果你能使这个正方形&a…

Junit单元测试框架

1)Junit是一个开源的JAVA语言的单元测试框架&#xff0c;也是JAVA方向使用最广泛的单元测试框架&#xff0c;使用JAVA开发者都应该学习junit框架&#xff0c;并且掌握单元测试的编写 2)selenium和Junit都可以被导入到maven项目里面 3)先进行创建maven项目&#xff0c;导入相关依…

linux 全局环境变量删除后 还有 仍然存在

linux 全局环境变量删除后 还有 仍然存在1、编辑 /etc/profile2、设置REDISCLI_AUTH后&#xff0c;redis-cli 进去redis后不需要再次认证2、删除全局环境后 source后 仍然存在3、unset释放全局环境变量4、总结1、编辑 /etc/profile 设置redis环境变量 在末尾加入一行 export R…

家电企业数字工厂系统解决方案

国内小型家电生产商的中小企业普遍使用传统的手工作业模式&#xff0c;依靠大量的人力&#xff0c;线下管理各种数据&#xff0c;如&#xff1a;纸质文档、excel制作等&#xff0c;信息化程度非常低&#xff0c;严重限制着企业生产效率的提升和生产规模的扩大。对传统制造企业来…

基于WEB的网上购物系统的设计与实现(附:源码 论文 sql文件)

摘 要 随着计算机网络技术的飞速发展和人们生活节奏的不断加快&#xff0c;电子商务技术已经逐渐融入了人们的日常生活当中&#xff0c;网上商城作为电子商务最普遍的一种形式&#xff0c;已被大众逐渐接受。因此开发一个网上商城系统&#xff0c;适合当今形势&#xff0c;更加…

AWS白皮书 – 成本优化

本文讲解AWS良好架构框架&#xff08;AWS Well-Architected Framework&#xff09;里其中五大支柱之一&#xff1a;成本优化&#xff08;Cost Optimization&#xff09;。 一套成本优化型系统应充分利用全部资源、以最低价格来实现业务成果&#xff0c;同时充分满足你的功能需…

Google Bard VS ChatGPT:哪个是更好的AI聊天机器人?

文章目录前言一、Bard和ChatGPT的宏观对比二、应用场景不同三、知识的时效性四、未来的归宿总结前言 自从 OpenAI 向公众发布ChatGPT以来的过去几个月里&#xff0c;我们都见证了围绕 ChatGPT 的各种测评&#xff0c;并为它带来的效果感到惊艳。 昨晚Google开放了自家研发的A…

SpringBoot的简介和使用

文章目录1. SpringBoot简介和概述2. SpringBoot的使用3.SpringBoot 项目打包及运行4.切换web服务器1. SpringBoot简介和概述 Spring Boot是由Pivotal团队提供的一套开源框架&#xff0c;可以简化spring应用的创建及部署。它提供了丰富的Spring模块化支持&#xff0c;可以帮助开…

JUC并发编程共享模型之不可变(五)

5.1 问题引出 public interface Account {// 获取余额Integer getBalance();void withdraw(Integer amount);/*** 方法内会启动1000个线程&#xff0c;每个线程做-10元的操作* 如果初始余额为 10000 那么正确的结果应当是0*/static void demo(Account account){List<Thread…

整数拼接(思维枚举,两变量满足某条件-->通过其中一变量根据条件推断另一变量

2068.整数拼接&#xff08;思维&#xff0c;枚举&#xff09; 输入样例&#xff1a; 4 2 1 2 3 4输出样例&#xff1a; 6大佬思路 很多需要双重循环两个值&#xff0c;暴力判断组合在一起是否满足某个条件(比如等式是否成立)&#xff0c; 其实可以换个角度&#xff0c;遍历…

WPF中阴影效果和模糊效果的使用

总目录 文章目录总目录前言一、DropShadowEffect1、DropShadowEffect各属性效果图2、实现代码二、BlurEffect1、BlurEffect各属性效果图2、实现代码3、进阶使用结语前言 WPF中的控件效果主要通过Effect来实现&#xff0c;而Effect有DropShadowEffect&#xff08;投影效果&…

【并发】详解redis的incr、decr命令

一、前言 redis是一个单线程的服务&#xff0c;那么所有的命令肯定会排队被redis执行&#xff0c;redis提供的命令都是原子性的&#xff0c;百度搜索incr\decr就是说将对应的key1&#xff0c;key-1的值重新set到redis中&#xff0c;而且很多都是认为incr\decr原子性的&#xf…

chatgpt优化使用手册

提问方式的优化 我们在首次使用chatgpt的时候&#xff0c;当我们问它一些问题的时候&#xff0c;我们会发现它的有些回答广泛且空洞&#xff0c;不够专业&#xff0c;但是chatgpt是能实现角色的扮演和切换的&#xff0c;所以我们在提问它时需要先给它输入一些剧本&#xff0c;…