Spark(37):Streaming DataFrame 和 Streaming DataSet 创建

news/2024/4/24 12:44:58/文章来源:https://blog.csdn.net/yang_shibiao/article/details/131959537

目录

0. 相关文章链接

1. 概述

2. socket source

3. file source

3.1. 读取普通文件夹内的文件

3.2. 读取自动分区的文件夹内的文件

4. kafka source

4.1. 导入依赖

4.2. 以 Streaming 模式创建 Kafka 工作流

4.3. 通过 Batch 模式创建 Kafka 工作流

5. Rate Source


0. 相关文章链接

 Spark文章汇总 

1. 概述

        使用 Structured Streaming 最重要的就是对 Streaming DataFrame 和 Streaming DataSet 进行各种操作。从 Spark2。0 开始, DataFrame 和 DataSet 可以表示静态有界的表, 也可以表示流式无界表。与静态 Datasets/DataFrames 类似,我们可以使用公共入口点 SparkSession 从流数据源创建流式 Datasets/DataFrames,并对它们应用与静态 Datasets/DataFrames 相同的操作。通过spark.readStream()得到一个DataStreamReader对象, 然后通过这个对象加载流式数据源, 就得到一个流式的 DataFrame。

// 1. 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()
import spark.implicits._// 2. 从数据源(socket)中加载数据.
val lines: DataFrame = spark.readStream.format("socket") // 设置数据源.option("host", "localhost").option("port", 9999).load

spark 内置了几个流式数据源, 基本可以满足我们的所有需求:

  • File source 读取文件夹中的文件作为流式数据。 支持的文件格式: text, csv, josn, orc, parquet。 注意, 文件必须放置的给定的目录中, 在大多数文件系统中, 可以通过移动操作来完成。
  • kafka source 从 kafka 读取数据。 目前兼容 kafka 0。10。0+ 版本
  • socket source 用于测试。 可以从 socket 连接中读取 UTF8 的文本数据。 侦听的 socket 位于驱动中。 注意, 这个数据源仅仅用于测试。
  • rate source 用于测试。 以每秒指定的行数生成数据,每个输出行包含一个 timestamp 和 value。其中 timestamp 是一个 Timestamp类型(信息产生的时间),并且 value 是 Long 包含消息的数量。 用于测试和基准测试。
SourceOptionsFault-tolerantNotes
File sourcepath: path to the input directory, and common to all file formats. maxFilesPerTrigger: maximum number of new files to be considered in every trigger (default: no max) latestFirst: whether to process the latest new files first, useful when there is a large backlog of files (default: false) fileNameOnly: whether to check new files based on only the filename instead of on the full path (default: false). With this set to true, the following files would be considered as the same file, because their filenames, “dataset.txt”, are the same: “file:///dataset.txt” “s3://a/dataset.txt” “s3n://a/b/dataset.txt” “s3a://a/b/c/dataset.txt” For file-format-specific options, see the related methods in DataStreamReader(Scala/Java/Python/R). E.g. for “parquet” format options see DataStreamReader.parquet(). In addition, there are session configurations that affect certain file-formats. See the SQL Programming Guide for more details. E.g., for “parquet”, see Parquet configuration section.YesSupports glob paths, but does not support multiple comma-separated paths/globs.
Socket Sourcehost: host to connect to, must be specified port: port to connect to, must be specifiedNo
Rate SourcerowsPerSecond (e.g. 100, default: 1): How many rows should be generated per second. rampUpTime (e.g. 5s, default: 0s): How long to ramp up before the generating speed becomes rowsPerSecond. Using finer granularities than seconds will be truncated to integer seconds. numPartitions (e.g. 10, default: Spark’s default parallelism): The partition number for the generated rows. The source will try its best to reach rowsPerSecond, but the query may be resource constrained, and numPartitions can be tweaked to help reach the desired speed.Yes
Kafka SourceSee the Kafka Integration Guide.Yes

2. socket source

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.streaming.StreamingQueryobject StreamTest {def main(args: Array[String]): Unit = {// 1. 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 2. 从数据源(socket)中加载数据.val lines: DataFrame = spark.readStream.format("socket") // 设置数据源.option("host", "localhost").option("port", 9999).load// 3. 把每行数据切割成单词val words: Dataset[String] = lines.as[String].flatMap((_: String).split("\\W"))// 4. 计算 word countval wordCounts: DataFrame = words.groupBy("value").count()// 5. 启动查询, 把结果打印到控制台val query: StreamingQuery = wordCounts.writeStream.outputMode("complete").format("console").startquery.awaitTermination()spark.stop()}
}

3. file source

3.1. 读取普通文件夹内的文件

代码示例:

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.types.{LongType, StringType, StructType}object StreamTest {def main(args: Array[String]): Unit = {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 定义 Schema, 用于指定列名以及列中的数据类型val userSchema: StructType = new StructType().add("name", StringType).add("job", StringType).add("age", LongType)// 使用SparkSession通过readStream方法读取文件(必须是目录, 不能是文件名)val user: DataFrame = spark.readStream.format("csv").schema(userSchema).load("/Project/Data/csv")// DataStreamReader中还有csv、json、text等方法,可以直接读取对应的文件val userCopy: DataFrame = spark.readStream.schema(userSchema).csv("/Project/Data/csv")// 将对应的数据输出(trigger表示触发器:数字表示毫秒值. 0 表示立即处理)val query: StreamingQuery = user.writeStream.outputMode("append").trigger(Trigger.ProcessingTime(0)).format("console").start()// 启动执行器query.awaitTermination()spark.stop()}
}

模板数据:

lisi,male,18
zhiling,female,28

结果输出:

3.2. 读取自动分区的文件夹内的文件

        当文件夹被命名为 “key=value” 形式时, Structured Streaming 会自动递归遍历当前文件夹下的所有子文件夹, 并根据文件名实现自动分区。如果文件夹的命名规则不是“key=value”形式, 则不会触发自动分区。 另外, 同级目录下的文件夹的命名规则必须一致。

步骤一:创建如下目录结构

year=2023month=07month=08
year=2024month=07

步骤二:写入文件数据

lisi,male,18
zhiling,female,28

步骤三:编写代码(如上 读取普通文件夹内的文件 代码完全一致)

步骤四:启动运行打印日志

4. kafka source

4.1. 导入依赖

在其余Spark依赖的情况下,还需要导入如下SparkSQL的kafka依赖,参考文档: Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) - Spark 3.4.1 Documentation

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql-kafka-0-10_2.12</artifactId><version>2.4.3</version>
</dependency>

4.2. 以 Streaming 模式创建 Kafka 工作流

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}object StreamTest {def main(args: Array[String]): Unit = {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 使用spark通过readStream方法可以以流的方式读取kafka里面的数据// 通过format设置 kafka 数据源// 通过 kafka.bootstrap.servers 设置kafka的参数// 通过 subscribe 设置订阅的主题,也可以订阅多个主题:   "topic1,topic2"// load后会返回一个DataFrame类型, 其schema是固定的: key,value,topic,partition,offset,timestamp,timestampTypeval df: DataFrame = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "bigdata1:9092,bigdata2:9092,bigdata3:9092").option("subscribe", "topic1").load// 通过 selectExpr 只获取其中的value字段// 通过as转换成 Datasetval lines: Dataset[String] = df.selectExpr("CAST(value AS string)").as[String]// 可以对 Dataset 进行各种操作val query: DataFrame = lines.flatMap((_: String).split("\\W+")).groupBy("value").count()// 进行输出,并且可以通过checkpointLocation来设置checkpoint// 下次启动的时候, 可以从上次的位置开始读取query.writeStream.outputMode("complete").format("console").option("checkpointLocation", "./ck1") .start.awaitTermination()// 关闭执行环境spark.stop()}
}

4.3. 通过 Batch 模式创建 Kafka 工作流

        这种模式一般需要设置消费的其实偏移量和结束偏移量, 如果不设置 checkpoint 的情况下, 默认起始偏移量 earliest, 结束偏移量为 latest。该模式为一次性作业(批处理), 而非持续性的处理数据。

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}object StreamTest {def main(args: Array[String]): Unit = {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._// 使用 read 方法,而不是 readStream 方法val lines: Dataset[String] = spark.read.format("kafka").option("kafka.bootstrap.servers", "bigdata1:9092,bigdata2:9092,bigdata3:9092").option("subscribe", "topic1").option("startingOffsets", "earliest").option("endingOffsets", "latest").load.selectExpr("CAST(value AS STRING)").as[String]// 同样对 Dataset[String] 进行各种操作val query: DataFrame = lines.flatMap(_.split("\\W+")).groupBy("value").count()// 使用 write 而不是 writeStreamquery.write.format("console").save()// 关闭执行环境spark.stop()}
}

5. Rate Source

以固定的速率生成固定格式的数据, 用来测试 Structured Streaming 的性能

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, SparkSession}object StreamTest {def main(args: Array[String]): Unit = {// 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("StreamTest").getOrCreate()import spark.implicits._val rows: DataFrame = spark.readStream.format("rate") // 设置数据源为 rate.option("rowsPerSecond", 10) // 设置每秒产生的数据的条数, 默认是 1.option("rampUpTime", 1) // 设置多少秒到达指定速率 默认为 0.option("numPartitions", 2) /// 设置分区数  默认是 spark 的默认并行度.loadrows.writeStream.outputMode("append").trigger(Trigger.Continuous(1000)).format("console").start().awaitTermination()// 关闭执行环境spark.stop()}
}

注:其他Spark相关系列文章链接由此进 ->  Spark文章汇总 


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_337137.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2023牛客暑期多校-J-Qu‘est-ce Que C‘est?(DP)

题意&#xff1a; 给定长度为n的数列,要求每个数都在的范围&#xff0c;且任意长度大于等于2的区间和都大于等于0&#xff0c;问方案数。。 思路&#xff1a; 首先要看出是dp题&#xff0c;用来表示遍历到第i位且后缀和最小为x的可行方案数&#xff08;此时的后缀可以只有最…

Java 版 spring cloud +spring boot 工程系统管理 工程项目管理系统源码 工程项目各模块及其功能点清单

工程项目各模块及其功能点清单 一、系统管理 1、数据字典&#xff1a;实现对数据字典标签的增删改查操作 2、编码管理&#xff1a;实现对系统编码的增删改查操作 3、用户管理&#xff1a;管理和查看用户角色 4、菜单管理&#xff1a;实现对系统菜单的增删改查操…

linux下nginx的安装和使用

文章目录 &#x1f4d2;安装nginx正常使用nginx包安装1️⃣上传到对应目录2️⃣解压nginx3️⃣检查是否启动成功 使用docker安装nginx &#x1f4d2;使用nginx1️⃣简单的反向代理2️⃣介绍location配置中root和alias的区别 &#x1f4d2;安装nginx 正常使用nginx包安装 官网…

BLE基础理论/Android BLE开发示例

参考&#xff1a;https://blog.csdn.net/qq_36075612/article/details/127739150?spm1001.2014.3001.5502 参考&#xff1a; https://blog.csdn.net/qq_36075612/article/details/122772966?spm1001.2014.3001.5502 目录 蓝牙的分类传统蓝牙低功耗蓝牙 蓝牙专业词汇&#xff…

SpringAOP的相关概念

文章目录 一.什么是AOP二.AOP的组成部分三.SpringAOP的实现3.1 增加SpringAOP依赖3.2 创建切面3.2 创建切点3.3 创建通知3.4 创建连接点 四.SpringAOP的实现原理4.1 JDK动态代理4.2 CGLIB 动态代理总结 一.什么是AOP AOP&#xff0c;全称为Aspect-Oriented Programming&#x…

解决 tensorflow 出现的 ImportError: Could not find the DLL(s) ‘msvcp140_1.dll‘. 问题

在安装完tensorflow库后出现 问题详述&#xff1a; ImportError: Could not find the DLL(s) msvcp140_1.dll. TensorFlow requires that these DLLs be installed in a directory that is named in your %PATH% environment variable. You may install these DLLs by downlo…

新零售行业如何做会员管理和会员营销

蚓链数字化营销系统全渠道会员管理解决方案&#xff0c;线上线下统一管理&#xff0c;打造私域流量&#xff0c;微信、门店会员全渠道管理&#xff0c;打通私域流量池&#xff0c;实现裂变营销&#xff1a; 开启新零售之路&#xff0c;必然要摒弃原有的管理模式&#xff0c;大…

郑州多域名https证书

多域名https证书是https证书中比较特殊的一款&#xff0c;它保护的域名记录是众多https证书中最灵活的。不管是DV基础型的多域名https证书还是OV企业型和EV增强型的多域名https证书既可以保护多个主域名或者子域名&#xff0c;还可以主域名子域名随意组合&#xff0c;只要申请者…

【动态规划part11】| 123.买卖股票的最佳时机III、188.买卖股票的最佳时机IV

目录 &#x1f388;LeetCode123.买卖股票的最佳时机III &#x1f388;LeetCode188.买卖股票的最佳时机IV &#x1f388;LeetCode123.买卖股票的最佳时机III 链接&#xff1a;123.买卖股票的最佳时机III 给定一个数组&#xff0c;它的第 i 个元素是一支给定的股票在第 i…

无涯教程-jQuery - Pulsate方法函数

Pulsate 效果可以与effect()方法一起使用。这会使元素的不透明性产生多次脉冲。 Pulsate - 语法 selector.effect( "pulsate", {arguments}, speed ); 这是所有参数的描述- times - 脉动的时间。默认值为3。model - 效果的模式。可以是"显示(show)"&a…

基于Java+SpringBoot+vue前后端分离技术交流和分享平台设计实现

博主介绍&#xff1a;✌全网粉丝30W,csdn特邀作者、博客专家、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专…

Moke 一百万条 Mysql 的数据

文章目录 前言创建数据库创建表结构生成数据 前言 想研究一下&#xff0c;数据量大的情况下&#xff0c;如何优化前端分页&#xff0c;所以需要 Moke 一些数据 创建数据库 在 Mysql的基础上&#xff0c;可以写个语句执行 CREATE DATABASE test_oneMillion; USE test_oneMi…

JAVA SE -- 第十一天

&#xff08;全部来自“韩顺平教育”&#xff09; 异常-Exception 一、异常介绍 1、基本介绍 Java语言中&#xff0c;将程序执行中发生的不正常情况为“异常”&#xff08;开发过程中的语法错误和逻辑错误不是异常&#xff09; 2、执行过程中发生的异常事件可分为两大类 …

Reinforcement Learning with Code 【Chapter 9. Policy Gradient Methods】

Reinforcement Learning with Code This note records how the author begin to learn RL. Both theoretical understanding and code practice are presented. Many material are referenced such as ZhaoShiyu’s Mathematical Foundation of Reinforcement Learning, . 文章…

opencv+ffmpeg环境(ubuntu)搭建全面详解

一.先讲讲opencv和ffmpeg之间的关系 1.1它们之间的联系 我们知道opencv主要是用来做图像处理的&#xff0c;但也包含视频解码的功能&#xff0c;而在视频解码部分的功能opencv是使用了ffmpeg。所以它们都是可以处理图像和视频的编解码&#xff0c;我个人感觉两个的侧重点不一…

SpringBoot项目部署(前后端分离、Linux部署项目)

一、架构 部署环境说明&#xff1a; 192.168.122.100(服务器A)&#xff1a; Nginx&#xff1a;部署前端项目、配置反向代理 Mysql&#xff1a;主从复制结构中的主库 192.168.122.131 (服务器B)&#xff1a; jdk: 运行Java项目 git:版本控制工具 (从gitee中拉取源码) maven:…

数据结构:快速的Redis有哪些慢操作?

redis 为什么要这莫快&#xff1f;一个就是他是基于内存的&#xff0c;另外一个就是他是他的数据结构 说到这儿&#xff0c;你肯定会说&#xff1a;“这个我知道&#xff0c;不就是 String&#xff08;字符串&#xff09;、List&#xff08;列表&#xff09;、 Hash&#xff08…

【SpringCloud Alibaba】(五)服务雪崩与容错方案

在前面的文章中&#xff0c;我们实现了用户微服务、商品微服务和订单微服务之间的远程调用&#xff0c;并且实现了服务调用的负载均衡。 但是&#xff0c;现在系统中存在着一个很明显的问题&#xff1a;那就是如果系统的并发量上来后&#xff0c;系统并没有容错的能力&#xf…

Java | 继承、多态、抽象类与接口

目录 一、类的继承 二、Object类 2.1 getClass()方法 2.2 toString()方法 2.3 equals()方法 三 、对象类型的转换 3.1 向上转换 3.2 向下转型 四、使用instanceof关键字判断对象类型 五、方法的重载 六、final关键字 6.1 final变量 6.2 final方法 6.3 final类 七…

LeetCode 1857. Largest Color Value in a Directed Graph【拓扑排序,动态规划】困难

本文属于「征服LeetCode」系列文章之一&#xff0c;这一系列正式开始于2021/08/12。由于LeetCode上部分题目有锁&#xff0c;本系列将至少持续到刷完所有无锁题之日为止&#xff1b;由于LeetCode还在不断地创建新题&#xff0c;本系列的终止日期可能是永远。在这一系列刷题文章…