Pyspark基础入门4_RDD转换算子

news/2024/4/26 22:51:20/文章来源:https://blog.csdn.net/weixin_53280379/article/details/129131110

Pyspark

注:大家觉得博客好的话,别忘了点赞收藏呀,本人每周都会更新关于人工智能和大数据相关的内容,内容多为原创,Python Java Scala SQL 代码,CV NLP 推荐系统等,Spark Flink Kafka Hbase Hive Flume等等~写的都是纯干货,各种顶会的论文解读,一起进步。
今天继续和大家分享一下Pyspark基础入门3
#博学谷IT学习技术支持


文章目录

  • Pyspark
  • 前言
  • 一、RDD算子的分类
  • 二、转换算子
    • 1.map算子
    • 2.groupBy算子
    • 3.filter算子
    • 4.flatMap算子
    • 5.union(并集) 和 intersection(交集)算子
    • 6.groupByKey算子
    • 7.reduceByKey算子
    • 8.sortByKey算子
    • 9.countByKey和 countByValue算子
  • 三、动作算子
    • 1.reduce算子
    • 2.first算子
    • 3.take算子
    • 4.top算子
    • 5.count算子
    • 6.foreach算子
    • 7.takeSample算子
  • 总结


前言

今天和大家分享的是Spark RDD算子相关的操作。
RDD算子: 指的是RDD对象中提供了非常多的具有特殊功能的函数, 我们一般将这样的函数称为算子(大白话: 指的RDD的API)


一、RDD算子的分类

整个RDD算子, 共分为二大类: Transformation(转换算子) 和 Action(动作算子)
转换算子:
1- 所有的转换算子在执行完成后, 都会返回一个新的RDD
2- 所有的转换算子都是LAZY(惰性),并不会立即执行, 此时可以认为通过转换算子来定义RDD的计算规则
3- 转换算子必须遇到Action算子才会触发执行

动作算子:
1- 动作算子在执行后, 不会返回一个RDD, 要不然没有返回值, 要不就返回其他的
2- 动作算子都是立即执行, 一个动作算子就会产生一个Job执行任务,运行这个动作算子所依赖的所有的RDD

二、转换算子

在这里插入图片描述

1.map算子

  • 格式: rdd.map(fn)
  • 说明: 根据传入的函数, 对数据进行一对一的转换操作, 传入一行, 返回一行
from pyspark import SparkContext, SparkConfif __name__ == '__main__':conf = SparkConf().setAppName("demo1").setMaster("local[*]")sc = SparkContext(conf=conf)rdd_init = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])rdd_map = rdd_init.map(lambda num: num + 1)rdd_res = rdd_map.collect()print(rdd_res)sc.stop()

2.groupBy算子

  • 格式: groupBy(fn)
  • 说明: 根据传入的函数对数据进行分组操作
from pyspark import SparkContext, SparkConfif __name__ == '__main__':conf = SparkConf().setAppName("demo2").setMaster("local[*]")sc = SparkContext(conf=conf)rdd_init = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])# def jo(num):#     if num % 2 == 0:#         return 'o'#     else:#         return 'j'rdd_group_by = rdd_init.groupBy(lambda num: 'o' if num % 2 == 0 else 'j')rdd_res = rdd_group_by.mapValues(list).collect()print(rdd_res)sc.stop()

3.filter算子

  • 格式: filter(fn)
  • 说明: 过滤算子, 可以根据函数中指定的过滤条件, 对数据进行过滤操作, 条件返回True表示保留, 返回False表示过滤掉
from pyspark import SparkContext, SparkConfif __name__ == '__main__':conf = SparkConf().setAppName("demo3").setMaster("local[*]")sc = SparkContext(conf=conf)rdd_init = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])rdd_res = rdd_init.filter(lambda num: num > 3).collect()print(rdd_res)sc.stop()

4.flatMap算子

  • 格式: flatMap(fn)
  • 说明: 在map算子的基础上, 在加入一个压扁的操作, 主要适用于一行中包含多个内容的操作, 实现一转多的操作
from pyspark import SparkContext, SparkConfif __name__ == '__main__':conf = SparkConf().setAppName("demo4").setMaster("local[*]")sc = SparkContext(conf=conf)rdd_init = sc.parallelize(['张三 李四 王五 赵六', '田七 周八 李九'])rdd_res = rdd_init.flatMap(lambda line: line.split(' ')).collect()print(rdd_res)sc.stop()

5.union(并集) 和 intersection(交集)算子

格式: rdd1.union|intersection(rdd2)

from pyspark import SparkContext, SparkConfif __name__ == '__main__':conf = SparkConf().setAppName("demo5").setMaster("local[*]")sc = SparkContext(conf=conf)rdd1 = sc.parallelize([3, 1, 5, 7, 9])rdd2 = sc.parallelize([5, 8, 2, 4, 0])# rdd_res = rdd1.union(rdd2).collect()# rdd_res = rdd1.union(rdd2).distinct().collect()rdd_res = rdd1.intersection(rdd2).collect()print(rdd_res)sc.stop()

6.groupByKey算子

  • 格式: groupByKey()
  • 说明: 根据key进行分组操作
from pyspark import SparkContext, SparkConfif __name__ == '__main__':conf = SparkConf().setAppName("demo6").setMaster("local[*]")sc = SparkContext(conf=conf)rdd_init = sc.parallelize([('c01', '张三'), ('c02', '李四'), ('c02', '王五'),('c02', '赵六'), ('c03', '田七'), ('c03', '周八')])rdd_res = rdd_init.groupByKey().mapValues(list).collect()print(rdd_res)sc.stop()

7.reduceByKey算子

  • 格式: reduceByKey(fn)
  • 说明: 根据key进行分组, 将一个组内的value数据放置到一个列表中, 对这个列表基于 传入函数进行聚合计算操作
from pyspark import SparkContext, SparkConfif __name__ == '__main__':conf = SparkConf().setAppName("demo7").setMaster("local[*]")sc = SparkContext(conf=conf)rdd_init = sc.parallelize([('c01', '张三'), ('c02', '李四'), ('c02', '王五'),('c02', '赵六'), ('c03', '田七'), ('c03', '周八')])rdd_res = rdd_init.map(lambda kv: (kv[0], 1)).reduceByKey(lambda arr, curr: arr + curr).collect()print(rdd_res)sc.stop()

8.sortByKey算子

  • 格式: sortByKey(ascending = True|False)
  • 说明: 根据key进行排序操作, 默认按照key进行升序排序, 如果需要倒序, 设置 ascending 为False
from pyspark import SparkContext, SparkConfif __name__ == '__main__':conf = SparkConf().setAppName("demo8").setMaster("local[*]")sc = SparkContext(conf=conf)rdd_init = sc.parallelize([('c03', '张三'), ('c04', '李四'), ('c05', '王五'),('c01', '赵六'), ('c07', '田七'), ('c08', '周八')])rdd_res = rdd_init.sortByKey(ascending=False).collect()print(rdd_res)sc.stop()

9.countByKey和 countByValue算子

  • countByKey() 根据key进行分组 统计每个分组下有多少个元素
  • countByValue() 根据value进行分组, 统计相同value有多少个
from pyspark import SparkContext, SparkConfif __name__ == '__main__':conf = SparkConf().setAppName("demo9").setMaster("local[*]")sc = SparkContext(conf=conf)rdd_init = sc.parallelize([('c01', '张三'), ('c02', '李四'), ('c02', '王五'),('c02', '赵六'), ('c03', '田七'), ('c03', '周八'), ('c01', '张三')])rdd_res0 = rdd_init.countByKey()rdd_res1 = rdd_init.countByValue()print(rdd_res0)print(rdd_res1)sc.stop()

三、动作算子

在这里插入图片描述

1.reduce算子

  • 格式: reduce(fn)
  • 作用: 根据传入的函数对数据进行聚合操作
from pyspark import SparkContext, SparkConfif __name__ == '__main__':conf = SparkConf().setAppName("demo1").setMaster("local[*]")sc = SparkContext(conf=conf)rdd_init = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])rdd_res = rdd_init.reduce(lambda agg, curr: agg + curr)print(rdd_res)sc.stop()

2.first算子

  • 格式: first()
  • 说明: 获取第一个元素
from pyspark import SparkContext, SparkConfif __name__ == '__main__':conf = SparkConf().setAppName("demo2").setMaster("local[*]")sc = SparkContext(conf=conf)rdd_init = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])rdd_res = rdd_init.first()print(rdd_res)sc.stop()

3.take算子

  • 格式: take(N)
  • 说明: 获取前N个元素, 类似于limit操作
from pyspark import SparkContext, SparkConfif __name__ == '__main__':conf = SparkConf().setAppName("demo3").setMaster("local[*]")sc = SparkContext(conf=conf)rdd_init = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])rdd_res = rdd_init.take(3)print(rdd_res)sc.stop()

4.top算子

  • 格式: top(N, [fn])
  • 说明: 对数据集进行倒序排序操作, 如果是kv类型, 默认是针对key进行排序, 获取前N个元素
  • fn: 可以自定义排序, 根据谁来排序
from pyspark import SparkContext, SparkConfif __name__ == '__main__':conf = SparkConf().setAppName("demo4").setMaster("local[*]")sc = SparkContext(conf=conf)rdd_init = sc.parallelize([('c03', 10), ('c04', 30), ('c05', 20),('c01', 20), ('c07', 80), ('c08', 5)])rdd_res = rdd_init.top(3, lambda kv: kv[1])print(rdd_res)sc.stop()

5.count算子

  • 格式: count()
  • 说明: 统计多少个
from pyspark import SparkContext, SparkConfif __name__ == '__main__':conf = SparkConf().setAppName("demo5").setMaster("local[*]")sc = SparkContext(conf=conf)rdd_init = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])rdd_res = rdd_init.count()print(rdd_res)sc.stop()

6.foreach算子

  • 格式: foreach(fn)
  • 说明: 对数据集进行遍历操作, 遍历后做什么, 取决于传入的函数
from pyspark import SparkContext, SparkConfif __name__ == '__main__':conf = SparkConf().setAppName("demo6").setMaster("local[*]")sc = SparkContext(conf=conf)rdd_init = sc.parallelize([('c03', 10), ('c04', 30), ('c05', 20),('c01', 20), ('c07', 80), ('c08', 5)])rdd_res = rdd_init.foreach(lambda kv : print(kv))print(rdd_res)sc.stop()

7.takeSample算子

  • 格式: takeSample(True|False, N,seed(种子值))

    • 参数1: 是否允许重复采样
    • 参数2: 采样多少个, 如果允许重复采样, 采样个数不限制, 否则最多等于本身数量个数
    • 参数3: 设置种子值, 值可以随便写, 一旦写死了, 表示每次采样的内容也是固定的(可选的) 如果没有特殊需要, 一般不设置
  • 作用: 数据抽样

from pyspark import SparkContext, SparkConfif __name__ == '__main__':conf = SparkConf().setAppName("demo6").setMaster("local[*]")sc = SparkContext(conf=conf)rdd_init = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])rdd_res = rdd_init.takeSample(withReplacement=True, num=5, seed=1)print(rdd_res)sc.stop()

总结

今天主要分享了RDD的转换算子和动作算子,下次继续分享RDD的一些其他重要算子。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_71838.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Head First设计模式---2.观察者模式

观察者(Observer)模式,是一种行为型设计模式,允许你定义一种订阅机制,可以在对象事件发生时通知更多个“观察”该对象的其他对象,类似于“订阅—通知” 问题 假如你有两种类型的对象,顾客和商…

将默认安装的 WSL2 迁移至指定目录

将默认安装的 WSL2 迁移至指定目录WSL2 默认安装在 C 盘下,系统盘空间有限,推荐更改安装目录。 1. 默认安装的 WSL2 目录 C:\Users\cheng\AppData\Local\Packages\CanonicalGroupLimited.Ubuntu20.04onWindows_79rhkp1fndgsc\LocalState\ext4.vhdx 2. …

运筹系列65:TSP问题的精确求解法概述

1. 给定upbound的Christofides方法 这是可以给出上界的一个方法,可以证明构造出的路线不超过最优路线的1.5倍。步骤为: 1)构造MST(最小生成树) 2)将里面的奇点连接起来构成欧拉回路称为完美匹配。Edmonds给…

Docker--------Day2

1.Docker镜像 1.1 是什么 镜像 是一种轻量级、可执行的独立软件包,它包含运行某个软件所需的所有内容,我们把应用程序和配置依赖打包好形成一个可交付的运行环境(包括代码、运行时需要的库、环境变量和配置文件等),这个打包好的运行环境就是…

盘点2023年大企业都在用的优秀项目管理软件

行内有句话:每个成功的项目背后肯定有一个成功的项目经理,而每个项目经理背后都少不了一些专业的项目管理工具。要在任何项目中取得成功,对项目进行全面的管理非常关键,包括项目的执行、计划、推进、监控、结果等,有了…

[架构之路-114]-《软考-系统架构设计师》-软件架构设计-7-软件架构评估

前言第7节 软件架构评估7.1 什么是架构评估/为什么要软件架构评估在软硬件系统总体架构设计完成之后,为保证架构设计的合理性、完整性和针对性,从根本上保证系统质量,降低成本及投资风险,需要对总体架构进行评估。7.2 软件架构评估…

rk3568网口CAN串口通信速率性能

通信接口性能参数外设接口性能参数测试结果为实验室实测值,可作为设计参考,但因测试环境和器件批次差异,可能会存在一定的误差,且测试结果依赖评估板性能,核心板搭配不同底板性能也可能存在差异,请结合实际…

OpenEuler安装软件方法

在树莓派上烧录好OpenEuler后上面是什么软件都没有的,像一些gcc的环境都需要自己进行配置。官方提供的安装命令是yum,但是执行yum是找不到命令的:   这个其实是因为OpenEuler中默认的安装软件使用了dnf而不是yum,所以软件的安装…

《Python机器学习》安装anaconda + numpy使用示例

👂 小宇(治愈版) - 刘大拿 - 单曲 - 网易云音乐 目录 一,安装 二,Numpy使用示例 (一)Numpy数组的创建和访问 1,创建和访问Numpy的一维数组和二维数组 2,Numpy数组…

可调恒流驱动LED电路分析

https://www.icxbk.com/article/detail?aid884 常规使用的pwm调亮度不仅会导致频闪,而且在长时间使用的时候,有损坏led的风险,所以这次设计了一个恒流调亮度电路,其电路图如下所示 电路原理的解读: 左侧的电位计起着…

Eclipse各版本安装Tomcat插件全攻略

Eclipse Tomcat 插件的作用 Eclipse Tomcat 插件可以将Tomcat 集成到Eclipse中,插件安装之后在Eclipse中可以看到类似下面的几个图标: Eclipse Tomcat 插件的主要作用有: 在Eclipse 中可以直接启动,关闭和重启本机的Tomcat可以…

电容的参数-详细描述

贴片电容 如同如所示,MLCC(Multi-layer Ceramic Capacitors),外形很好区分。 实际内部结构 使用的还是平行板电容器原理,只是这个是叠层结构;电解电容是卷起来的圆柱状; 容值: …

Ubuntu22.04设置独显用于深度学习运算,核显用于屏幕显示

目录摘要主板bios设置第一步:切换prime-select第二步:关机重启,并将显示器接口插到主板上第三步:设置PRIME Profiles为NVIDIA On-Demand模式注意事项参考文献摘要 目前有需求配置台式机win11Ubuntu的双系统,安装双系统…

linux线程的基本知识

这里用的是Linux的pthread线程库,需要加pthread线程库。 线程的创建 第一个参数是线程id的地址。第二个参数是线程属性,一般为NULL。第三个是要执行的函数。第四个是函数的参数,一般也为NULL 线程的等待,第一个参数是线程的id,第…

SpringBoot之DEBUG远程调试黑科技?

所谓的远程调试就是服务端程序运行在一台远程服务器上,我们可以在本地服务端的代码(前提是本地 的代码必须和远程服务器运行的代码一致)中设置断点,每当有请求到远程服务器时时能够在本地知道 远程服务端的此时的内部状态。 简单的…

10.现代循环神经网络

10.现代循环神经网络 目录 门控循环单元(GRU)门控隐状态 重置门和更新门候选隐状态 隐状态从零开始实现 初始化模型参数定义模型训练与预测 简洁实现总结 长短期记忆网络(LSTM) 门控记忆元 输入门、忘记门和输出门候选记忆元记忆…

论文复现:模拟风电不确定性——拉丁超立方抽样生成及缩减场景(Matlab)

风电出力的不确定性主要源于预测误差,而研究表明预测误差(e)服从正态分布且大概为预测出力的10%。本代码采用拉丁超立方抽样实现场景生成[1,2]、基于概率距离的快速前代消除法实现场景缩减[3],以此模拟了风电出力的不确定性。 1 …

蓝桥杯刷题025——推导部分和(加权并查集)

2022省赛 问题描述 对于一个长度为 N 的整数数列 ​, 小蓝想知道下标 l 到 r 的部 分和是多少? 然而, 小蓝并不知道数列中每个数的值是多少, 他只知道它的 M 个部分和 的值。其中第 i 个部分和是下标 ​ 到 的部分和 , 值是 。 输入格式 第一行包含 3 个整数 N、M 和 Q 。分…

基于DSP+FPGA的机载雷达伺服控制系统的硬件设计与开发

机载雷达是以飞机为载体的各种雷达天线的总称,主要用于空中侦察、警戒、保 证航行准确与安全[1]。随着航空航天技术的飞速发展,以及微电子、计算机和高速集 成电路等新型技术在军事领域的广泛应用[2],各国都研制出了许多新型战机和导弹,机 载…

企业微信的聊天机器人来了,免费下载(Python版)

大家好,这里是程序员晚枫,个人网址:python-office.com 上次分享了微信机器人的视频以后,视频下面有一个热门评论: 什么时候开发企业版微信机器人?自动回复、自动群发等等~ 在经历了一段时间的查找和开发以…