【Spark分布式内存计算框架——Spark Streaming】3.入门案例(上)官方案例运行

news/2024/4/26 17:34:03/文章来源:https://blog.csdn.net/CSDNGuoYuying/article/details/129220190

2.1 官方案例运行

运行官方提供案例,使用【$SPARK_HOME/bin/run-example】命令运行,效果如下:
在这里插入图片描述

具体步骤如下:

第一步、准备数据源启动端口,准备数据

nc -lk 9999
spark spark hive hadoop spark hive

第二步、运行官方案例

  • 使用官方提供命令行运行案例
# 官方入门案例运行:词频统计
/export/server/spark/bin/run-example --master local[2] streaming.NetworkWordCount node1.itcast.cn 9999 

第三步、运行结果
在这里插入图片描述
SparkStreaming模块对流式数据处理,介于Batch批处理和RealTime实时处理之间处理数据方式。

2.2 编程实现

基于IDEA集成开发环境,编程实现:从TCP Socket实时读取流式数据,对每批次中数据进行词频统计WordCount。

StreamingContext
回顾SparkCore和SparkSQL及SparkStreaming处理数据时编程:

1)、SparkCore

  • 数据结构:RDD
  • SparkContext:上下文实例对象

2)、SparkSQL

  • 数据结构:Dataset/DataFrame = RDD + Schema
  • SparkSession:会话实例对象, 在Spark 1.x中SQLContext/HiveContext

3)、SparkStreaming

  • 数据结构:DStream = Seq[RDD]
  • StreamingContext:流式上下文实例对象,底层还是SparkContext
  • 参数:划分流式数据时间间隔BatchInterval:1s,5s(演示)

文档:http://spark.apache.org/docs/2.2.0/streaming-programming-guide.html#initializing-streamingcontext
从官方文档可知,提供两种方式构建StreamingContext实例对象,截图如下:

第一种方式:构建SparkConf对象
在这里插入图片描述
第二种方式:构建SparkContext对象
在这里插入图片描述
编写代码
针对SparkStreaming流式应用来说,代码逻辑大致如下五个步骤:

1、Define the input sources by creating input DStreams.
定义从哪个数据源接收流式数据,封装到DStream中
2、Define the streaming computations by applying transformation and output operations to DStreams.
针对业务调用DStream中函数,进行数据处理和输出
3、Start receiving data and processing it using streamingContext.start().
4 、 Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination().
5、The processing can be manually stopped using streamingContext.stop().
启动流式应用,并且一直等待程序终止(人为或异常),最后停止运行

完整StreamingWordCount代码如下所示:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 基于IDEA集成开发环境,编程实现从TCP Socket实时读取流式数据,对每批次中数据进行词频统计。
*/
object StreamingWordCount {
def main(args: Array[String]): Unit = {
// TODO: 1. 构建StreamingContext流式上下文实例对象
val ssc: StreamingContext = {
// a. 创建SparkConf对象,设置应用配置信息
val sparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[3]")
// b.创建流式上下文对象, 传递SparkConf对象,TODO: 时间间隔 -> 用于划分流式数据为很多批次Batch
val context = new StreamingContext(sparkConf, Seconds(5))
// c. 返回
context
}
// TODO: 2. 从数据源端读取数据,此处是TCP Socket读取数据
/*
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String]
*/
val inputDStream: ReceiverInputDStream[String] = ssc.socketTextStream(
"node1.itcast.cn", 9999
)
// TODO: 3. 对每批次的数据进行词频统计
val resultDStream: DStream[(String, Int)] = inputDStream
// 过滤不合格的数据
.filter(line => null != line && line.trim.length > 0)
// 按照分隔符划分单词
.flatMap(line => line.trim.split("\\s+"))
// 转换数据为二元组,表示每个单词出现一次
.map(word => (word, 1))
// 按照单词分组,聚合统计
.reduceByKey((tmp, item) => tmp + item)
// TODO: 4. 将结果数据输出 -> 将每批次的数据处理以后输出
resultDStream.print(10)
// TODO: 5. 对于流式应用来说,需要启动应用
ssc.start()
// 流式应用启动以后,正常情况一直运行(接收数据、处理数据和输出数据),除非人为终止程序或者程序异常停止
ssc.awaitTermination()
// 关闭流式应用(参数一:是否关闭SparkContext,参数二:是否优雅的关闭)
ssc.stop(stopSparkContext = true, stopGracefully = true)
}
}

运行结果监控截图:
在这里插入图片描述

Streaming 应用监控
运行上述词频统计案例,登录到WEB UI监控页面:http://localhost:4040,查看相关监控信息。

其一、Streaming流式应用概要信息
在这里插入图片描述
每批次Batch数据处理总时间TD = 批次调度延迟时间SD + 批次数据处理时间PT。
在这里插入图片描述
其二、性能衡量标准
SparkStreaming实时处理数据性能如何(是否可以实时处理数据)??如何衡量的呢??

每批次数据处理时间TD <= BatchInterval每批次时间间隔

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_74334.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

面试官: 你知道 JWT、JWE、JWS 、JWK嘛?

想起了 之前做过的 很多 登录授权 的项目 它相比原先的session、cookie来说&#xff0c;更快更安全&#xff0c;跨域也不再是问题&#xff0c;更关键的是更加优雅 &#xff0c;所以今天总结了一篇文章来介绍他 JWT 指JSON Web Token&#xff0c;如果在项目中通过 jjwt 来支持 J…

hook与mixin

看完vue3就开始看vue3的源码&#xff0c;表示很懵~ 刚把rollup打包搞完&#xff0c;这不响应式就接着来了&#xff01;&#xff0c;还是写篇直接使用vue3的博客清清脑吧&#xff01; 什么是hook、mixin&#xff1f; mixin: Vue2中多个组件内存在重复JS业务逻辑&#xff0c;使…

k8s学习之路 | Day15 k8s 中的 yaml 语法

文章目录yaml 基础什么是 yaml&#xff1f;yaml 特性适用场景基本语法规则数据类型yaml 对象yaml 数组yaml 纯量yaml 引用k8s 中的 yaml 语法\<string>\<Object>\<map[string]string>\<[]Object>\<boolean>示例 yaml 说明我在学习过程中&#xf…

Mr. Cappuccino的第44杯咖啡——Kubernetes之Service

Kubernetes之ServiceService的概念Service的类型Service演示案例环境准备ClusterIP&#xff08;集群内部访问&#xff09;IptablesIPVSEndpointNodePort&#xff08;对外暴露应用&#xff09;LoadBalancer&#xff08;对外暴露应用&#xff0c;适用于公有云&#xff09;Ingress…

echo命令

这是一条内置命令。 输出指定的字符串 一、语法 echo [选项] [参数] 二、选项 -e&#xff1a;激活转义字符。 使用-e选项时&#xff0c;若字符串中出现以下字符&#xff0c;则特别加以处理&#xff0c;而不会将它当成一般文字输出&#xff1a; \a 发出警告声&#xff1b; \b 删…

产业链金融的前世今生

产业链金融脱胎于供应链金融&#xff0c;又不同于供应链金融。二者的区别是&#xff0c; 供应链金融服务于单个环节、单个企业&#xff0c;而产业链金融是以产业链的核心 企业为依托&#xff0c;针对产业链的各个环节&#xff0c;设计个性化、标准化的金融服务产品&#xff0c;…

阿里巴巴内网 Java 面试 2000 题解析(2023 最新版)

前言 这份面试清单是今年 1 月份之后开始收集的&#xff0c;一方面是给公司招聘用&#xff0c;另一方面是想用它来挖掘在 Java 技术栈中&#xff0c;还有一些知识点是我还在探索的&#xff0c;我想找到这些技术盲点&#xff0c;然后修复它&#xff0c;以此来提高自己的技术水平…

DNS 域名解析

介绍域名 网域名称&#xff08;英语&#xff1a;Domain Name&#xff0c;简称&#xff1a;Domain&#xff09;&#xff0c;简称域名、网域。 域名是互联网上某一台计算机或计算机组的名称。 域名可以说是一个 IP 地址的代称&#xff0c;目的是为了便于记忆。例如&#xff0c…

3.2 网站图的爬取路径

深度优先与广度优先方法都是遍历树的一种方法&#xff0c;但是网站的各个网页 之间的关系未必是树的结构&#xff0c;它们可能组成一个复杂的图形结构&#xff0c;即有回路。如果在前面的网站中每个网页都加一条Home的语句&#xff0c;让每个网页都能回到主界面&#xff0c;那么…

JasperReports studio相关操作

1.2 JasperReports JasperReports是一个强大、灵活的报表生成工具&#xff0c;能够展示丰富的页面内容&#xff0c;并将之转换成PDF&#xff0c;HTML&#xff0c;或者XML格式。该库完全由Java写成&#xff0c;可以用于在各种Java应用程序&#xff0c;包括J2EE&#xff0c;Web应…

Playbook的用法

目录 Playbook Playbook 与 Ad-Hoc 对比 YAML 语言特性 YAML语法简介 支持的数据类型 写法格式 1 scalar 标量 建议缩进两个空格&#xff0c;可多 2 Dictionary 字典 3 List 列表 三种常见的数据格式 Playbook 核心组件 不要用 tab 可以#注释 hosts remote_us…

Oracle-01-简介篇

&#x1f3c6;一、Oracle的历史和发展 Oracle公司成立于1977年&#xff0c;由拉里埃里森&#xff08;Larry Ellison&#xff09;、鲍勃明特&#xff08;Bob Miner&#xff09;和埃德奥茨&#xff08;Ed Oates&#xff09;共同创立。起初&#xff0c;公司的主要业务是开发和销售…

Lenovo Legion Y530-15ICH电脑 Hackintosh 黑苹果efi引导文件

原文来源于黑果魏叔官网&#xff0c;转载需注明出处。硬件型号驱动情况主板Lenovo Legion Y530-15ICH处理器Intel Core™ i7-8750H (Coffee-Lake)已驱动内存16GB RAM DDR4 2667MHz已驱动硬盘2TB HP EX950 PCI-E Gen3 x4 NVMe SSD已驱动显卡Intel UHD Graphics 630Nvidia GTX 10…

aws console 使用fargate部署aws服务快速跳转前端搜索栏

测试过程中需要在大量资源之间跳转&#xff0c;频繁的点击不如直接搜索来的快&#xff0c;于是写了一个搜索框方便跳转。 前端的静态页面可以通过s3静态网站托管实现&#xff0c;但是由于中国区需要备案的原因&#xff0c;可以使用ecs fargate部署 步骤如下&#xff1a; 编写…

DHCP服务器的使用以及可能出现的问题(图文详细版)

DHCP服务的使用 开始&#xff0d;管理工具&#xff0d;DHCP,打开DHCP服务器选项窗口 新建作用域 在此处输入名称和描述,单击下一步 随机确定一组IP地址的范围,并指定其子网掩码 , 单击下一步 若想要排除某一个/组特定的IP地址,我们可以在此界面输入该IP地址,若没有,则可…

如何使用 FreeSql 无缝接替 EF Core ?

如何使用 FreeSql 无缝接替 EF Core&#xff0c;并实现数据表的 CRUD 操作项目说明DB & 数据表结构DB & 数据表创建数据表 User 实体模型创建使用 EF Core 实现 User 表新增用户信息添加 EF Core 相关的 nuget 包编写 EF Core 操作 User 表的 CRUD 代码FreeSql 使用 Db…

AI_Papers周刊:第三期

CV - 计算机视觉 | ML - 机器学习 | RL - 强化学习 | NLP 自然语言处理 2023.02.20—2023.02.26 文摘词云 Top Papers Subjects: cs.CL 1.LLaMA: Open and Efficient Foundation Language Models 标题&#xff1a;LLaMA&#xff1a;开放高效的基础语言模型 作者&#…

zookeeper集群的搭建,菜鸟升级大神必看

一、下载安装zookeeperhttp://archive.apache.org/dist/zookeeper/下载最新版本2.8.1http://archive.apache.org/dist/zookeeper/zookeeper-3.8.1/二、上传安装包到服务器上并且解压&#xff0c;重命名tar -zxvf apache-zookeeper-3.8.1-bin.tar.gzmv apache-zookeeper-3.8.1-b…

Python安装教程(附带安装包)

首先&#xff0c;打开python安装包的下载地址&#xff0c;https://www.python.org/downloads/&#xff0c;会有些慢 点击downloads中的windows 左侧是稳定的版本&#xff0c;我这边下的是3.8的&#xff0c;不想去官网下载的可以直接用我下载的这个3.8版本&#xff0c;https://…

WebGPU学习(4)---使用 UniformBuffer

接下来让我们使用 UniformBuffer。UniformBuffer 是一个只读内存区域&#xff0c;可以在着色器上访问。 这次&#xff0c;我们将传递给着色器的矩阵存储在 UniformBuffer 中。演示示例 1.在顶点着色器中的 UniformBuffer 这次我们在顶点着色器里定义一个名为Uniforms的新结构体…