大数据培训之RDD的转换

news/2024/4/29 2:59:53/文章来源:https://blog.csdn.net/zjjcchina/article/details/126929152

RDD的转换(面试开发重点)

RDD整体上分为Value类型和Key-Value类型

1 Value类型

1.1 map(func)案例

  1. 作用:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
  2. 需求:创建一个1-10数组的RDD,将所有元素*2形成新的RDD

(1)创建

scala> var source  = sc.parallelize(1 to 10)

source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24

(2)打印

scala> source.collect()

res7: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

(3)将所有元素*2

scala> val mapadd = source.map(_ * 2)

mapadd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at map at <console>:26

(4)打印最终结果

scala> mapadd.collect()

res8: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

1.2 mapPartitions(func) 案例

  1. 作用:类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]。假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区。
  2. 需求:创建一个RDD,使每个元素*2组成新的RDD

(1)创建一个RDD

scala> val rdd = sc.parallelize(Array(1,2,3,4))

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24

(2)使每个元素*2组成新的RDD

scala> rdd.mapPartitions(x=>x.map(_*2))

res3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at mapPartitions at <console>:27

(3)打印新的RDD

scala> res3.collect

res4: Array[Int] = Array(2, 4, 6, 8)

1.3 mapPartitionsWithIndex(func) 案例

  1. 作用:类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U];
  2. 需求:创建一个RDD,使每个元素跟所在分区形成一个元组组成一个新的RDD

(1)创建一个RDD

scala> val rdd = sc.parallelize(Array(1,2,3,4))

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24

(2)使每个元素跟所在分区形成一个元组组成一个新的RDD

scala> val indexRdd = rdd.mapPartitionsWithIndex((index,items)=>(items.map((index,_))))

indexRdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[5] at mapPartitionsWithIndex at <console>:26

(3)打印新的RDD

scala> indexRdd.collect

res2: Array[(Int, Int)] = Array((0,1), (0,2), (1,3), (1,4))

1.4 map()和mapPartition()的区别

  1. map():每次处理一条数据。
  2. mapPartition():每次处理一个分区的数据,这个分区的数据处理完后,原RDD中分区的数据才能释放,可能导致OOM。
  3. 开发指导:当内存空间较大的时候建议使用mapPartition(),以提高处理效率。

1.5 flatMap(func) 案例

  1. 作用:类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
  2. 需求:创建一个元素为1-5的RDD,运用flatMap创建一个新的RDD,新的RDD为原RDD的每个元素的扩展(1->1,2->1,2……5->1,2,3,4,5)

scala> flatMap.collect()

res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)

1.6 glom案例

  1. 作用:将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]
  2. 需求:创建一个4个分区的RDD,并将每个分区的数据放到一个数组

(1)创建

scala> val rdd = sc.parallelize(1 to 16,4)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24

(2)将每个分区的数据放到一个数组并收集到Driver端打印

scala> rdd.glom().collect()

res25: Array[Array[Int]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8), Array(9, 10, 11, 12), Array(13, 14, 15, 16))

1.7 groupBy(func)案例

  1. 作用:分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。
  2. 需求:创建一个RDD,按照元素模以2的值进行分组。

(1)创建

scala> val rdd = sc.parallelize(1 to 4)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24

(2)按照元素模以2的值进行分组

scala> val group = rdd.groupBy(_%2)

group: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[2] at groupBy at <console>:26

(3)打印结果

scala> group.collect

res0: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4)), (1,CompactBuffer(1, 3)))

1.8 filter(func) 案例

  1. 作用:过滤。返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成。
  2. 需求:创建一个RDD(由字符串组成),过滤出一个新RDD(包含”xiao”子串)

1.9 sample(withReplacement, fraction, seed) 案例

  1. 作用:以指定的随机种子随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样,seed用于指定随机数生成器种子。
  2. 需求:创建一个RDD(1-10),从中选择放回和不放回抽样

(1)创建RDD

scala> val rdd = sc.parallelize(1 to 10)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:24

(2)打印

scala> rdd.collect()

res15: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

(3)放回抽样

scala> var sample1 = rdd.sample(true,0.4,2)

sample1: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[21] at sample at <console>:26

(4)打印放回抽样结果

scala> sample1.collect()

res16: Array[Int] = Array(1, 2, 2, 7, 7, 8, 9)

(5)不放回抽样

scala> var sample2 = rdd.sample(false,0.2,3)

sample2: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[22] at sample at <console>:26

(6)打印不放回抽样结果

scala> sample2.collect()

res17: Array[Int] = Array(1, 9)

1.10 distinct([numTasks])) 案例

  1. 作用:对源RDD进行去重后返回一个新的RDD。
  2. 需求:创建一个RDD,使用distinct()对其去重。

1.11 coalesce(numPartitions) 案例

  1. 作用:缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。
  2. 需求:创建一个4个分区的RDD,对其缩减分区

(1)创建一个RDD

scala> val rdd = sc.parallelize(1 to 16,4)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[54] at parallelize at <console>:24

(2)查看RDD的分区数

scala> rdd.partitions.size

res20: Int = 4

(3)对RDD重新分区

scala> val coalesceRDD = rdd.coalesce(3)

coalesceRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[55] at coalesce at <console>:26

(4)查看新RDD的分区数

scala> coalesceRDD.partitions.size

res21: Int = 3

1.12 repartition(numPartitions) 案例

  1. 作用:根据分区数,重新通过网络随机洗牌所有数据。
  2. 需求:创建一个4个分区的RDD,对其重新分区

(1)创建一个RDD

scala> val rdd = sc.parallelize(1 to 16,4)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[56] at parallelize at <console>:24

(2)查看RDD的分区数

scala> rdd.partitions.size

res22: Int = 4

(3)对RDD重新分区

scala> val rerdd = rdd.repartition(2)

rerdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[60] at repartition at <console>:26

(4)查看新RDD的分区数

scala> rerdd.partitions.size

res23: Int = 2

1.13 coalesce和repartition的区别

  1. coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。
  2. repartition实际上是调用的coalesce,进行shuffle。源码如下:

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}

1.14 sortBy(func,[ascending], [numTasks]) 案例

  1. 作用;使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为正序。
  2. 需求:创建一个RDD,按照不同的规则进行排序

(1)创建一个RDD

scala> val rdd = sc.parallelize(List(2,1,3,4))

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24

(2)按照自身大小排序

scala> rdd.sortBy(x => x).collect()

res11: Array[Int] = Array(1, 2, 3, 4)

(3)按照与3余数的大小排序

scala> rdd.sortBy(x => x%3).collect()

res12: Array[Int] = Array(3, 4, 1, 2)

1.15 pipe(command, [envVars]) 案例

  1. 作用:管道,针对每个分区,都执行一个shell脚本,返回输出的RDD。

注意:脚本需要放在Worker节点可以访问到的位置

  1. 需求:编写一个脚本,使用管道将脚本作用于RDD上。

(1)编写一个脚本

Shell脚本

#!/bin/sh

echo “AA”

while read LINE; do

   echo “>>>”${LINE}

done

(2)创建一个只有一个分区的RDD

scala> val rdd = sc.parallelize(List(“hi”,”Hello”,”how”,”are”,”you”),1)

rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[50] at parallelize at <console>:24

(3)将脚本作用该RDD并打印

scala> rdd.pipe(“/opt/module/spark/pipe.sh”).collect()

res18: Array[String] = Array(AA, >>>hi, >>>Hello, >>>how, >>>are, >>>you)

(4)创建一个有两个分区的RDD

scala> val rdd = sc.parallelize(List(“hi”,”Hello”,”how”,”are”,”you”),2)

rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[52] at parallelize at <console>:24

(5)将脚本作用该RDD并打印

scala> rdd.pipe(“/opt/module/spark/pipe.sh”).collect()

res19: Array[String] = Array(AA, >>>hi, >>>Hello, AA, >>>how, >>>are, >>>you)

想要了解跟多关于大数据培训课程内容欢迎关注尚硅谷大数据培训,尚硅谷除了这些技术文章外还有免费的高质量大数据培训课程视频供广大学员下载学习。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_10322.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue:实现锚点双向滚动/文章章节联动滚动效果

需求描述 需要实现类似doc中文档大纲的效果&#xff0c;点击对应章节的名称时定位到相应的正文&#xff1b;而当正文滚动时&#xff0c;高亮显示对应的章节名称 实现思路 其实笔者一开始想到的是利用a标签页内跳转&#xff08;也就是“锚点”&#xff09;&#xff0c;类似于…

我惊了,花重金求来的并发编程笔记,颠覆了我以往“正确“的认知

对于一个 Java 程序员而言&#xff0c;能否熟练掌握并发编程是判断他优秀与否的重要标准之一。因为并发编程是 Java 语言中最为晦涩的知识点&#xff0c;它涉及操作系统、内存、CPU、编程语言等多方面的基础能力&#xff0c;更为考验一个程序员的内功。那到底应该怎么学习并发编…

每周一坑--阿里云欠费之网站504

阿里云欠费之网站504想不到周一就遇到大坑,网站访问不了,堡垒机远程登录不上服务器,只能控制台操作。   我详细描述下现象,希望大家不要踩前人的坑~(我可是以写故障报告为代价的 = =) 现象:该网站因为接入到WAF上,所以80、443端口只对WAF段开放,排查过程中,我一度以…

浅谈java单元测试框架junit4/5

0 前言 junit是一个开源的Java语言的单元测试框架。目前junit主要有版本junit3&#xff0c;junit4和junit5。因在junit3中&#xff0c;是通过对测试类和测试方法的命名来确定是否是测试&#xff0c;且所有的测试类必须继承junit的测试基类TestCase&#xff0c;所以本文不再讨论…

c++11 map自定义key

文章目录一、map简介二、自定义key三、demo演示四、自定义模板参数总结参考资料一、map简介 map一般采用红黑树存储数据&#xff0c;是一种映射类型的关联容器&#xff0c;map中的元素是关键字 - 值&#xff08;key-value&#xff09;对&#xff1a;关键字起到索引的作用&…

物联网威胁监测系统最新发现一款针对IoT设备的RAT远控木马

一、背景概述 2022年7月20日&#xff0c;天穹威胁监测系统监测到IoT蜜罐系统中的D-Link Dir 817LW路由器遭受来自澳大利亚IP的攻击。系统显示目标设备被攻击成功&#xff0c;并且下载了恶意样本。实验室相关人员第一时间对该攻击事件样本进行分析&#xff0c;发现该次攻击投递…

【新学期 新Flag】新方向 新动力

暑假结束&#xff0c;新学期开始&#xff0c;快来立下你的Flag吧&#xff01; 一、自我介绍 本人来自陕西某大学计算机科学与技术专业的一名学生&#xff0c;作为一个创作者&#xff0c;在CSDN写文章&#xff0c;记录下自己学习的过程&#xff0c;通过社区互相交流&#xff0…

ESXi重置密码以及修改网络IP地址的方法

Study From https://www.cnblogs.com/mk21/p/15784082.html前期公司有部分虚拟化的服务器因为只通过vCenter进行管理. 导致密码遗失. 最近因为公司的服务器要切换IP地址, 发现没有密码无法进行修改,比较难处理. 为了能够将网络进行正常的迁移. 从网上找到了个比较简单的重置…

某大型保险集团在线财险业务系统数据库存储架构由集中式向分布式转型实践

【摘要】随着某机构业务自传统B2B类型向互联网的转变,访问量的激增、用户量持续爆炸式增长、数据量爆炸式增长,业务场景具备高吞吐量、高并发量等等新需求,这些都要求后台数据库具备支持高TPS、高QPS(每秒的查询量)以及支持高并发量的能力。传统的集中式存储架构已不适应新…

美化页面元素

目录 1、为什么要美化网页 span标签&#xff1a; 2、字体样式 &#xff08;1&#xff09;选择字体 font-family &#xff08;2&#xff09;font-size 字体大小 &#xff08;3&#xff09;font-weight 字体粗细 &#xff08;4&#xff09;color 字体颜色 连着写 扩展&am…

kafka和flink的入门到精通 1 大数据时代,分布式数据存储,数仓

参考006 - 大数据 - 系统架构 - 总览_哔哩哔哩_bilibili 目录 一、大数据时代 ◼ 信息化浪潮 ◼ 技术支撑 ◼ 三阶段 ◼ 大数据的发展历程 二、大数据概述 ◼ 大数据 特点&#xff1a; ◼ 大数据部门组织 三、分布式数据存储 ◼ 单点数据存储 ◼ 主从架构模式 ◼…

知物由学 | AI与黑产的攻守之道,详解攻击类文字图像的检测

导读&#xff1a;随着 OCR 系统识别能力的提升&#xff0c;专业对抗 OCR 的黑产也越来越多&#xff0c;这个过程中 AI 如何抵御黑产攻击类的文字图像&#xff1f;本文通过分享相似性特征训练的常见算法&#xff0c;并选择了其中一些有代表性的工作进行介绍&#xff0c;希望能给…

数据库干货 | 防止重复记录的发生

许多数据库管理员&#xff08;DBA&#xff09;需要至少花费一段时间来尝试从数据库表中识别和删除重复记录。如果一开始多注意防止重复插入&#xff0c;那么识别和删除重复记录所花费的大部分时间都可以用于其他工作上。原则上&#xff0c;这并不难做到。但是&#xff0c;实际上…

Windows环境下Hadoop的安装和配置

Windows环境下Hadoop的安装和配置Windows环境下Hadoop的安装和配置平台及版本安装 Java1.8&#xff0c;并配置环境变量安装Hadoop2.7.3Hadoop核心配置文件启动Hadoop服务Windows环境下Hadoop的安装和配置 平台及版本 Windows10JDK1.8.0_192Hadoop2.7.3 安装 Java1.8&#xf…

Java Math

本博客具体总结了java中的api Math中部分函数使用方法&#xff1a; 取绝对值 Modifier and TypeMethod and Descriptionstatic doubleabs(double a) 返回一个 double值的绝对值。static floatabs(float a) 返回一个 float值的绝对值。static intabs(int a) 返回一个值的绝对值…

卜算法学习笔记-02-分而治之算法02

数组中的逆序对计数 算法分析 所谓逆序对&#xff0c;是指数组中的两个元素 A[i]A[i]A[i] 和 A[j]A[j]A[j]&#xff0c;其下标 i<ji < ji<j&#xff0c;但是考察元素的值&#xff0c;却有 A[i]>A[j]A[i] > A[j]A[i]>A[j]。 输入&#xff1a;一个包含 nnn 个…

vue项目实战-完成路由组件的搭建

vue项目实战-完成路由组件的搭建 1.安装vue-router npm i vue-router --save分析结构可知&#xff0c;路由组件有四个&#xff1a;Home、Search、Login、Register 2.创建路由组件文件夹pages以及各路由组件 3.配置路由 项目中配置路由一般配置在router文件夹中&#xff0c;…

工业智能网关BL110应用之八十一: 实现西门子S7-400 PLC 接入亚马逊云平台

LAN 接口的配置COM口采集西门子S7-400 PLC的配置 工业智能网关BL110一共有一 个LAN 接口&#xff0c;一个WAN接口&#xff0c;可以通过LAN 接口采集数据&#xff0c;通过WAN接口接入局域网&#xff0c;设置过程不一样&#xff0c;WAN接口可以自动获取IP以及相关以太网设置。 …

硅光电子器件模拟:“RSoft光电器件设计仿真技术与应用”

RSoft光子器件工具包括业界最广泛的模拟器和优化器&#xff0c;一款非常优秀的设计仿真软件&#xff0c;能够帮助用户轻松的设计光学元件、纳米级光学结构&#xff0c;同时也可以模拟无源或有源的光电子器等。RSoft具有高度精确的算法能快速建立虚拟样机&#xff0c;同时降低了…

FPGA 20个例程篇:15.VGA显示八种颜色的彩条

第六章 图像显示处理&#xff0c;经典再现 15.VGA显示八种颜色的彩条 图像和视频处理可以说是FPGA中又一个经典地应用&#xff0c;使用FPGA做图像处理最核心的优势就在于&#xff1a;FPGA能进行实时流水线运算&#xff0c;从而达到更高的实时性&#xff0c;围绕着图像处理又有…