学习spark笔记

news/2024/4/27 16:57:27/文章来源:https://blog.csdn.net/qq_42496461/article/details/130307727

✨ 学习 Spark 和 Scala

一 ​ 🐦Spark 算子

spark常用算子详解(小部分算子使用效果与描述不同)

Spark常用的算子以及Scala函数总结

Spark常用Transformations算子(二)

  1. Transformation 算子(懒算子):不会提交spark作业,从一个RDD转换成另一个RDD
  2. Action 算子:触发 SparkContext 提交job作业,将数据输出到Spark系统。返回类型是一个其他的数据类型

ps.show 属于action算子

一 Transformation 算子(Value数据类型)

1. 输入分区与输出分区 一对一

map,flatMap,mapPartirions

glom:算子将一个RDD分区中的元素 组成Array数组的形式 eg.RDD[Int] => RDD[Arrray[Int]]

请添加图片描述
请添加图片描述
在这里插入图片描述

2. 输入分区与输出分区多对一

union:我测试代码分区是叠加 网上别人文档是取最大分区 存疑。输出分区应大于输入分区,或者说输入2+2输出4。

cartesian:取笛卡儿积。问题同union

在这里插入图片描述在这里插入图片描述

3. 输入分区与输出分区多对多

groupby Key 算子 也是shuffle类算子。 (默认分区数与原RDD分区数一致可以重新指定)

sortby key 算子 也是shuffle类算子。 (默认分区数与原RDD分区数一致可以重新指定,默认使用范围分区器) 在spark ui界面查看 job sortby算子会触发一个job 这个Job是用于评估数据分布,评估结果用于后续的排序操作,并不是真正的排序操作。

4. 输出分区为输入分区子集

distnct 算子:也是 shuffle类算子 。map reducebykey map (默认分区数与原RDD分区数一致可以重新指定)

filter 算子:过滤算子

subtract 算子:也是 shuffle类算子 。RDD1 去除 RDD1与RDD2交集的 剩下的RDD1 (默认分区数与原RDD分区数一致可以重新指定)

intersection 算子: 也是 shuffle类算子 。返回两个RDD的交集并去重 (默认分区数与原RDD分区数一致可以重新指定)

sample 算子。采样 将 RDD 这个集合内的元素进行采样,获取所有元素的子集。用户可以设定是否有放回的抽样、百分比、随机种子,进而决定采样方式。内部实现是生成 SampledRDD(withReplacement, fraction, seed)。函数参数设置:withReplacement=true,表示有放回的抽样。withReplacement=false,表示无放回的抽样。
关于spark的sample()算子参数详解

takeSample:算子 不使用相对比例采样,而是按设定的采样个数进行采样

    println("distinct")val d1:RDD[String] = sparkSession1.sparkContext.makeRDD(Array("hello","hello","world"))val distinctRDD:RDD[String] = d1.map((_,1)).reduceByKey(_+_).map(_._1)distinctRDD.foreach(println)

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

5. Cache 型算子

cache:算子将RDD元素从磁盘缓存到内存 相当于 persist(MEMORY_ONLY)函数。分区一对一的

persist:算子有缓存 内存 磁盘 压缩等多个参数可选

二 Transformation 算子(Key - Value数据类型)

1. 输入分区与输出分区一对一

mapValues:算子 对value处理 可以用map代替

在这里插入图片描述

2. 单个RDD聚集算子

combineByKey:combineByKey的使用 也是 shuffle类算子 。

reduceBykey:通过key值相同会去聚合 也是 shuffle类算子 。reuduceByKey 会在map端 先进行本地combine (预聚合),减少传输到reduce端的数据量,减少网络传输的开销。只有reduceByKey处理不了的时候,会用 groupByKey().map() 代替

eg.SparkStreaming 算子:reduceByKeyAndWindow 窗口函数 每10秒计算一下前15秒的内容

  1. 存储上一个window的reduce值
  2. 计算出上一个window的begin 时间到 重复段的开始时间的reduce 值 =》 oldRDD
  3. 重复时间段的值结束时间到当前window的结束时间的值 =》 newRDD
  4. 重复时间段的值等于上一个window的值减去oldRDD =》coincodeRDD
  5. coincodeRDD + newRDD
//PairRDDFunction ,PairRDD属于RDD RDD的方法也通用 PairRDD 就是键值对的RDD 也是 RDD[Tuple2[]]   
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {reduceByKey(defaultPartitioner(self), func)}
//入参的func 必须是(V,V)=>V的类型 操作两个Value;返回的结果必须是(Key,Value) 
//可以用self和this访问自身成员
//withScope 源码参照private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](sc)(body)

Spark源码之withScope方法的理解

partitionBy:自定义分区器 也是 shuffle类算子 。

repartitionAndSortWithinPartitions(partitioner):该方法根据partitioner对RDD进行分区,并且在每个结果分区中按key进行排序。也是 shuffle类算子 。

coalesce(numPartitions):重新分区,减少RDD中分区的数量到numPartitions。也是 shuffle类算子 。

repartition(numPartitions):repartition是coalesce接口中shuffle为true的简易实现,即Reshuffle RDD并随机分区,使各分区数据量尽可能平衡。若分区之后分区数远大于原分区数,则需要shuffle。

aggregateBykey :也是 shuffle类算子 。没有使用过 对所有分区的元素先聚合再fold操作?

3. 两个RDD聚集算子

cogroup :合并两个RDD,生成一个新的RDD。实例中包含两个Iterable值,第一个表示RDD1中相同值,第二个表示RDD2中相同值(key值),这个操作需要通过partitioner进行重新分区,因此需要执行一次shuffle操作。(若两个RDD在此之前进行过shuffle,则不需要)

4. 连接

join:对两个需要连接的 RDD 进行 cogroup函数操作,将相同 key 的数据能够放到一个分区,在 cogroup 操作之后形成的新 RDD 对每个key 下的元素进行笛卡尔积的操作,返回的结果再展平,对应 key 下的所有元组形成一个集合。最后返回 RDD[(K, (V, W))]。
  下 面 代 码 为 join 的 函 数 实 现, 本 质 是通 过 cogroup 算 子 先 进 行 协 同 划 分, 再 通 过flatMapValues 将合并的数据打散。
this.cogroup(other,partitioner).f latMapValues{case(vs,ws) => for(v<-vs;w<-ws)yield(v,w) }
图 20是对两个 RDD 的 join 操作示意图。大方框代表 RDD,小方框代表 RDD 中的分区。函数对相同 key 的元素,如 V1 为 key 做连接后结果为 (V1,(1,1)) 和 (V1,(1,2))。

leftOutJoin和rightOutJoin: LeftOutJoin(左外连接)和RightOutJoin(右外连接)相当于在join的基础上先判断一侧的RDD元素是否为空,如果为空,则填充为空。 如果不为空,则将数据进行连接运算,并
返回结果。
下面代码是leftOutJoin的实现。
if (ws.isEmpty) {
vs.map(v => (v, None))
} else {
for (v <- vs; w <- ws) yield (v, Some(w))
}

zip:拉齐算子 包括 zipWithIndex(下标为value) 该函数将RDD中的元素和这个元素在RDD中的ID(索引号)组合成键/值对,zipWithUniqueId 该函数将RDD中元素和一个唯一ID组合成键/值对,该唯一ID生成算法如下:
每个分区中第一个元素的唯一ID值为:该分区索引号,
每个分区中第N个元素的唯一ID值为:(前一个元素的唯一ID值) + (该RDD总的分区数)
该函数将RDD中的元素和这个元素在RDD中的ID(索引号)组合成键/值对。

所以分区不用值也不一样

通过SparkContext 提交作业 触发RDD DAG 的执行

  /*** Return an array that contains all of the elements in this RDD.** @note This method should only be used if the resulting array is expected to be small, as* all the data is loaded into the driver's memory.*/
//collect 算子的方法 def collect(): Array[T] = withScope {//提交Jobval results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)//_* 变长参数Array.concat(results: _*)}

三 Action 算子

1. 无输出算子

foreach:对RDD的每个元素应用 f 函数操作,不返回 RDD 和 Array ,返回的Unit

2. 输出到HDFS

saveAsTextFile : 算子 通过调用 saveAsHadoopFile 进行实现:
this.map(x => (NullWritable.get(), new Text(x.toString))).saveAsHadoopFileTextOutputFormat[NullWritable, Text]
将 RDD 中的每个元素映射转变为 (null, x.toString),然后再将其写入 HDFS

 /*** Save this RDD as a compressed text file, using string representations of elements.*/
//第一个参数:Path为保存的路径;第二个参数:codec为压缩编码格式;def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = withScope {// https://issues.apache.org/jira/browse/SPARK-2075val nullWritableClassTag: ClassTag[NullWritable] = implicitly[ClassTag[NullWritable]]val textClassTag: ClassTag[Text] = implicitly[ClassTag[Text]]val r : RDD[(NullWritable,Text)] = this.mapPartitions { iter =>val text:Text = new Text()iter.map { x =>text.set(x.toString)(NullWritable.get(), text)}}RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null).saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)}

saveAsObjectFile 算子:将分区中的每10个元素组成一个Array,然后将这个Array序列化,映射为(Null,BytesWritable(Y))的元素,写入HDFS为SequenceFile的格式。
  下面代码为函数内部实现。
  map(x=>(NullWritable.get(),new BytesWritable(Utils.serialize(x))))
  图24中的左侧方框代表RDD分区,右侧方框代表HDFS的Block。 通过函数将RDD的每个分区存储为HDFS上的一个Block。

3. Scala集合和数据类型

collect 算子 相当于 toArray, toArray 已经过时不推荐使用, collect 将分布式的 RDD 返回为一个单机的 scala Array 数组。在这个数组上运用 scala 的函数式操作。
  图 25中左侧方框代表 RDD 分区,右侧方框代表单机内存中的数组。通过函数操作,将结果返回到 Driver 程序所在的节点,以数组形式存储。

collectAsMap: 对(K,V)型的RDD数据返回一个单机HashMap。 对于重复K的RDD元素,后面的元素覆盖前面的元素。

lookup(key:K):Seq[V]查找
Lookup函数对(Key,Value)型的RDD操作,返回指定Key对应的元素形成的Seq。 这个函数处理优化的部分在于,如果这个RDD包含分区器,则只会对应处理K所在的分区,然后返回由(K,V)形成的Seq。 如果RDD不包含分区器,则需要对全RDD元素进行暴力扫描处理,搜索指定K对应的元素。
 在这里插入图片描述

count : 返回整个RDD的元素个数

top:返回最大的K个元素 ·top返回最大的k个元素。·take返回最小的k个元素。·takeOrdered返回最小的k个元素,并且在返回的数组中保持元素的顺序。·first相当于top(1)返回整个RDD中的前k个元素,可以定义排序的方式Ordering[T]。返回的是一个含前k个元素的数组。

reduceByKeyLocally: 实现的是先reduce再collectAsMap的功能,先对RDD的整体进行reduce操作,然后再收集所有结果返回为一个HashMap。

在这里插入图片描述

reduce: 规约操作 scala当中的reduce可以对集合当中的元素进行归约操作。
reduce包含reduceLeft和reduceRight。reduceLeft就是从左向右归约,reduceRight就是从右向左归约。

(1 to 9).reduceLeft( _ * _) //相当于1 * 2 * 3 * 4 * 5 * 6 * 7 * 8 * 9 
(1 to 9).reduceLeft( _ + _) //相当于1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 
(1 to 9).reduce(_ + _) //默认是reduceLeft

fold :RDD牵涉到多个分区时,每个分区的初始值都会被累加一次 在累加的时候会再加一次

Spark算子 - fold

2*(2*1*2)*(2*3*4) = 192

在这里插入图片描述

四 有状态算子

val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
/*
参数1: reduce 计算规则
参数2: 窗口长度
参数3: 窗口滑动步长. 每隔这么长时间计算一次.*/
val count: DStream[(String, Int)] = wordAndOne.reduceByKeyAndWindow((x: Int, y: Int) => x + y,Seconds(15), Seconds(10))

countByWindow(windowLength, slideInterval): 返回一个滑动窗口的元素个数

countByValueAndWindow(windowLength, slideInterval, [numTasks]): 对(K,V)对的DStream调用,返回(K,Long)对的新DStream,其中每个key的的对象的v是其在滑动窗口中频率。如上,可配置reduce任务数量。

// Shuffle 性能优化
new SparkConf().set("spark.shuffle.consolidateFiles","true")spark.shuffle.consolidateFiles: 是否开启 shuffle block file 的合并,默认为 false;spark.reducer.maxSizeInFlight: reduce task 的拉取缓存,默认 48Mspark.shuffle.file.buffer : map task 的写磁盘缓存,默认 32k;spark.shuffle.io.maxRetries: 拉取失败的最大重试次数,默认 3 次;spark.shuffle.io.retryWait : 拉取失败的重试间隔,默认 5sspark.shuffle.memoryFraction: 用于 reduce 端聚合的内存比例,默认 0.2, 超过比例就会溢出到磁盘上;

StructuredStreaming

🎋 Spark 内存模型

在这里插入图片描述

https://blog.csdn.net/j904538808/article/details/78854742?utm_medium=distribute.pc_relevant.none-task-blog-baidulandingword-2&spm=1001.2101.3001.4242

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_103185.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SLAM论文速递:SLAM—— 流融合:基于光流的动态稠密RGB-D SLAM—4.25(2)

论文信息 题目&#xff1a; FlowFusion:Dynamic Dense RGB-D SLAM Based on Optical Flow 流融合:基于光流的动态稠密RGB-D SLAM论文地址&#xff1a; https://arxiv.org/pdf/2003.05102.pdf发表期刊&#xff1a; 2020 IEEE International Conference on Robotics and Automa…

flex布局属性详解

Flex布局 flex-directionflex-wrapflex-flowjustify-contentalign-itemsalign-content其他orderflexalign-self 含义:Flex是Flexible Box的缩写&#xff0c;意为”弹性布局”&#xff0c;用来为盒状模型提供最大的灵活性。 flex-direction flex-direction属性决定主轴的方向&…

危险区域闯入识别系统 yolov8

危险区域闯入识别系统通过YOLOv8网络模型技术&#xff0c;危险区域闯入识别系统对现场画面中发现有人违规闯入禁区&#xff0c;系统立即抓拍告警同步回传后台。YOLOv8 提供了一个全新的 SOTA 模型&#xff0c;包括 P5 640 和 P6 1280 分辨率的目标检测网络和基于 YOLACT 的实例…

Model-Contrastive Federated Learning 论文解读(CVPR 2021)

Model-Contrastive Federated Learning 论文解读 对比学习SimCLR 对比学习的基本想法是同类相聚&#xff0c;异类相离 从不同的图像获得的表征应该相互远离&#xff0c;从相同的图像获得的表征应该彼此靠近 具体框架&#xff1a; T随机数据增强模块&#xff1a;随机裁剪然…

光波导相控阵技术

在简述电光效应和热光效应的基础上综述了国内外光波导相控阵技术研究进展&#xff0c;包括一维和二维光波导相控阵的技术途径、结构特点和性能指标&#xff0c;给出了光波导相控阵的优势以及在激光雷达、成像等领域的应用前景。结果表明&#xff0c;光波导相控阵技术正向着大扫…

JavaScript Debugger 调试断点模式

在代码中加入debugger&#xff0c;相当于断点停顿&#xff0c;可用于查看变量传递情况&#xff0c;比如&#xff1a;Vue组件中生命周期onLoad(options) &#xff0c;在上一页面进入下一页面后&#xff0c;传递进来的参数值。 备注 &#xff1a;options 参数为字符串&#xff0…

从需求分析到上线发布,一步步带你开发收废品小程序

在如今的环保和可持续性的大趋势下&#xff0c;废品回收已经成为了人们日常生活中不可或缺的一部分。收废品小程序的开发可以帮助人们更方便地找到回收废品的地点&#xff0c;并有效减少废品对环境造成的污染。因此&#xff0c;我们的收废品小程序需要满足以下需求&#xff1a;…

2023年电信推出新套餐:月租19元=135G流量+长期套餐+无合约期!

在三大运营商推出的流量卡当中&#xff0c;电信可以说是性价比最高的一个&#xff0c;相对于其他两家运营商&#xff0c;完全符合我们低月租&#xff0c;大流量的要求&#xff0c;所以&#xff0c;今天小编介绍的还是电信流量卡。 在这里说一下&#xff0c;小编推荐的卡都是免…

中国制造再击败一家海外企业,彻底取得垄断地位

中国制造已在13个行业取得领先优势&#xff0c;凸显出中国制造的快速崛起&#xff0c;日前中国制造又在一个行业彻底击败海外同行&#xff0c;再次证明了中国制造的实力。 一、海外企业承认失败 提前LGD宣布它位于广州的8.5代液晶面板生产线停产&#xff0c;预计该项目将出售给…

Linux命令rsync增量同步目录下的文件

业务场景描述 最近遇到一个问题&#xff0c;需要编写相应的Linux命令&#xff0c;增量同步/var/mysql里的所有文件到另外一个目录/opt/mysql&#xff0c;但是里面相关的日志文件xx.log是不同步的&#xff0c;这个场景&#xff0c;可以使用rsync来实现 什么是rsync命令&#x…

6、什么是类型断言?

虽然 TypeScript 很强大&#xff0c;但有时还不如我们了解一个值的类型方便&#xff0c;这时候我们更希望 TypeScript 不要帮我们进行类型检查&#xff0c;而是交给我们自己来&#xff0c;所以就用到了类型断言。类型断言有点像是一种类型转换&#xff0c;它把某个值强行指定为…

当,Kotlin Flow与Channel相逢

前言 之前的文章已经分析了Flow的相关原理与简单使用&#xff0c;Flow之所以用起来香&#xff0c;Flow便捷的操作符功不可没&#xff0c;而想要熟练使用更复杂的操作符&#xff0c;那么需要厘清Flow和Channel的关系。 本篇文章构成&#xff1a; 1. Flow与Channel 对比 1.1 Fl…

论文实验1、安装tensorflow运行节点嵌入相关方法

还是官方的教程好使 使用 pip 安装 TensorFlow 只有三步 1.安装python&#xff0c;版本太高不行&#xff0c;在推荐版本里选最高的。 2.安装python虚拟环境venv python -m venv --system-site-packages .\venv .\venv\Scripts\activate 3.在虚拟环境里装tensorflow pip…

开发人员应考虑使用 Edge浏览器的 8 个理由

1.无限访问ChatGPT 这是正确的。您可以通过 Bing 访问 GPT-4。但与 2021 年后没有数据的 ChatGPT 不同&#xff0c;必应通过从自己的搜索引擎中提取最新数据来对其进行补偿。 首先&#xff0c;点击Edge 浏览器左上角的Bing 小图标 Bing 具有三个选项卡&#xff1a;Chat、Compo…

VSCode连接远程服务器调试代码详细流程

文章目录 1.远程连接服务器2. 打开项目文件目录3. 配置调试环境 在研究人工智能项目时&#xff0c;很多时候本地机器性能不够&#xff0c;只能把代码拉倒服务器上&#xff0c;然后利用服务器资源来运行代码。遇到问题时需要调试&#xff0c;本文详细介绍利用VScode来调试远程服…

DAB-Deformable-DETR代码学习记录之模型构建

DAB-DETR的作者在Deformable-DETR基础上&#xff0c;将DAB-DETR的思想融入到了Deformable-DETR中&#xff0c;取得了不错的成绩。今天博主通过源码来学习下DAB-Deformable-DETR模型。 首先我么看下Deformable的创新之处&#xff1a; Deformable-DETR创新 多尺度融合 首先便是…

layui 表格中嵌入下拉框被遮挡

1、代码 单元格样式&#xff1a; * 设置下拉框的高度与表格单元相同 */.layui-table-cell {width: 100%;height: 100%;border: 1px;border-color: #F2F2F2;} 表格初始化后的回调&#xff1a; done: function (res, curr, count) {$(".layui-table-body, .layui-tabl…

MC9S12G128开发板—实现按键发送CAN报文指示小车移动功能

实验环境&#xff1a;MC9S12G128开发板 基本功能&#xff1a;控制开发板上的按键&#xff0c;模拟车辆移动的上下左右四个方位&#xff0c;通过can通信告诉上位机界面&#xff0c;车辆轨迹的移动方位。 1. 1939报文发送的示例代码 MC9S12G128开发板1939协议发送can报文数据的…

php+vue 校友交流平台

1.普通用户功能分析 &#xff08;1&#xff09;用户注册&#xff1a;用于注册校友录用户。 &#xff08;2&#xff09;用户登录&#xff1a;供校友录用户登录。 &#xff08;3&#xff09;资料修改&#xff1a;修改当前登录使用者信息。 &#xff08;4&#xff09;…

“量子+生成式AI”!IBM联合生物制药公司Moderna进行疫苗研究

​ &#xff08;图片来源&#xff1a;网络&#xff09; 4月20日&#xff0c;以COVID-19疫苗而闻名的生物技术和制药公司Moderna Inc.表示&#xff0c;宣布正在与IBM公司合作&#xff0c;利用量子计算和生成式人AI探索推进研究mRNA技术的方法。 双方签署了一项协议&#xff0c;允…