【博学谷学习记录】超强总结,用心分享|狂野大数据课程【Spark SQL函数定义】的总结分析

news/2024/5/20 2:06:34/文章来源:https://blog.csdn.net/qq_42198232/article/details/129348370

5.1 如何使用窗口函数

回顾:

窗口函数格式:分析函数 over(partition by xxx order by xxx [asc|desc] [rows between xxx and xxx])学习的相关分析函数有那些? 第一类: row_number() rank() dense_rank() ntile()第二类: 和聚合函数组合使用  sum() avg() max() min() count()第三类: lag() lead() first_value() last_value()

如何在Spark SQL中使用呢?

  • SQL中: 与HIVE中应用基本没啥区别, 更多关注的是DSL写法
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import Window as win
import os# 锁定远端环境, 确保环境统一
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':print("演示: 如何在Spark SQL中使用窗口函数...")# 1- 创建SparkSession对象spark = SparkSession.builder.appName('df_write').master('local[*]').getOrCreate()# 2-读取外部文件的数据df = spark.read.csv(path='file:///export/data/workspace/ky06_pyspark/_03_SparkSql/data/pv.csv',header=True,inferSchema=True)df.createTempView('t1')# 3- 执行相关的操作# 需要: 统计每个cookie中, pv数量排名前二内容是哪些? (分组TOPN 问题)# SQLspark.sql("""with t2 as(select*,row_number() over (partition by cookieid order by pv desc) as rank1from t1 )select  * from  t2 where rank1 <=2""").show()# DSL:df.select('*',F.row_number().over(win.partitionBy('cookieid').orderBy(F.desc('pv'))).alias('rank1')).where('rank1 <= 2').show()

2 SQL函数的分类说明

整个SQL函数, 主要是分为以下三大类:

  • UDF函数: 用户自定义函数
    • 表示: 一进一出
    • 整个函数中, 大多数的函数都是属于一进一出的函数: split() substr()
  • UDAF函数: 用户自定义聚合函数
    • 表示: 多进一出
    • 例如: sum() avg() count() ….
  • UDTF函数: 用户自定义表生成函数
    • 表示: 一进多出
    • 指的: 进去一行数据, 最终产生多行 或者多列的数据
    • 例如: explode

在SQL中提供的内置函数, 都是属于以上三类中某一类函数

思考: 提供了那么多的函数, 为啥还需要自定义函数呢?

扩充函数. 在实际使用中, 并不能保证所有的操作可能用的函数都已经提前的内置好, 尤其是很多具有特殊业务处理功能, 其实并没有相对应函数 , 提供的函数更多是以公共的功能为主函数, 此时需要进行自定义, 从而扩充新的功能

​ 在Spark SQL中, 对于自定义函数, 原生支持的粒度并不是特别好, 目前原生的PY方案仅支持自定义UDF函数, 无法自定义UDAF函数和UDTF函数, 在1.6版本之后, Java 和scala语言支持了自定义UDAF函数,但是Python并不支持,Spark官方提供了解决的方案: 基于pandas来自定义UDF 和 UDAF函数. 但是对于UDTF函数, Spark是不支持自定义,但是Spark支持HIVE的函数定义, 所以可以通过HIVE自定义函数来解决

在这里插入图片描述

	虽然Python支持自定义UDF函数, 但是其效率并不是特别的高效, 因为在使用的时候, 传递一行处理一行, 返回一行的操作, 这样会带来非常大的序列化开销问题, 以及网络开销问题, 导致原生UDF函数效率不好早期解决方案: 基于 scala/Java来编写自定义UDF函数, 然后基于Python使用即可目前主要解决方案: 引入Arrow框架, 可以基于内存来完成数据传输工作, 可以大大降低了序列化开销问题, 提供传输的效率, 解决了原生问题, 同时还可以基于pandas的自定义函数, 利用pandas函数优势, 完成各种处理操作所以后期主推的方案: 基于pandas 自定义函数, 然后底层基于arrow完成数据传输工作

3 Spark SQL原生自定义函数

如何自定义原生函数流程(非常重要):

第一步: 在Python中创建一个python的函数, 在这个函数中书写自定义函数的功能逻辑代码即可第二步: 将Python函数注册到Spark SQL中, 成为Spark SQL的函数注册方式一: udf对象 = sparkSession.udf.register(参数1,参数2,参数3)参数1: UDF函数的名称, 此名称用于后续在SQL语法中使用 , 可以任意定义名称, 但是要符合定义名称规范参数2: python函数的名称, 表示将哪个python的函数注册为Spark SQL的函数参数3: 返回值的类型, 用于表示当前这个Python的函数返回的类型对应的Spark SQL的数据类型udf对象: 此对象主要是用于DSL中注册方式二:  udf对象 = F.udf(参数1,参数2)参数1: python函数的名称, 表示将哪个python的函数注册为Spark SQL的函数参数2: 返回值的类型, 用于表示当前这个Python的函数返回的类型对应的Spark SQL的数据类型udf对象: 此对象主要是用于DSL中说明: 此种方式还支持语法糖写法:  @F.udf(returnType=返回值类型) 需要放置到对应函数上面
第三步: 在Spark SQL的DSL/SQL中进行使用即可

演示操作: 请自定义一个函数, 完成对数据统一添加一个后缀名的操作

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
import os# 锁定远端环境, 确保环境统一
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':print("演示原生的自定义函数:")# 1- 创建SparkSession对象spark = SparkSession.builder.appName('df_write').master('local[*]').getOrCreate()# 2- 初始化一些数据df = spark.createDataFrame(data=[(1,'张三','北京'),(2,'李四','上海'),(3,'王五','广州'),(4,'赵六','深圳'),(5,'田七','杭州')],schema='id int,name string,address string')df.createTempView('t1')# 3- 执行相关的操作:# 请自定义一个函数, 完成对数据统一添加一个后缀名的操作# 3.1 定义一个Python的函数, 接收一个数据, 给数据添加一个后缀返回@F.udf(returnType=StringType())def add_post(data):return data+'_boxuegu'# 3.2 对函数进行注册操作# 注册方式一# 当采用注解方式注册函数的使用, 如果依然想在SQL中使用, 可以再次使用方式一注册,但是不需要设置返回值类型了spark.udf.register('add_post_sql',add_post)# 注册方式二: 还有一种语法糖模式#add_post_dsl = F.udf(add_post,StringType())# 3.3 使用自定义函数# SQLspark.sql("""select*,add_post_sql(address) as r1from t1""").show()# DSLdf.select('*',add_post('address').alias('r1')).show()

演示操作: 自定义一个函数, 让其返回值的类型为字典 列表 元组

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
import os# 锁定远端环境, 确保环境统一
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':print("演示原生的自定义函数:")# 1- 创建SparkSession对象spark = SparkSession.builder.appName('df_write').master('local[*]').getOrCreate()# 2- 初始化一些数据df = spark.createDataFrame(data=[(1,'张三 北京'),(2,'李四 上海'),(3,'王五 广州'),(4,'赵六 深圳'),(5,'田七 杭州')],schema='id int,name_address string')df.createTempView('t1')# 3- 执行相关的操作:# 需求: 自定义一个函数, 将姓名和地址拆分开schema = StructType().add('name',StringType()).add('address',StringType())@F.udf(returnType=schema)def split_data(data):res = data.split(' ')#return [res[0],res[1]]#return (res[0], res[1])return {'name':res[0],'address':res[1]}# 使用字典返回, key值 必须和schema中定义字段名称保持一致# 注册函数spark.udf.register('split_data',split_data)# 使用函数# SQLdf1 = spark.sql("""select*,split_data(name_address)['name'] as name,split_data(name_address)['address'] as addressfrom t1""")df1.printSchema()df1.show()# DSLdf.select('*',split_data('name_address')['name'].alias('name'),split_data('name_address')['address'].alias('address')).show()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_77918.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

sklearn使用入门

文章目录1.机器学习1.1 机器学习简介1.2 有监督学习(supervised learning)1.3 无监督学习(unsupervised learning)1.4 半监督学习2. 机器学习工具SKlearn2.1 sklearn2.2 sklearn常用模块2.2.1 分类2.2.2 回归2.2.3 聚类2.2.4 降维2.2.5 模型选择2.2.6 数据预处理2.3 sklearn使用…

ChatGPT vs Bard 背后的技术对比分析和未来发展趋势

ChatGPT vs Bard 背后的技术对比分析和未来发展趋势 目录 ChatGPT vs Bard 背后的技术对比分析和未来发展趋势

SQLI-Labs(3)8-14关【布尔盲注和时间盲注】

目录 第八关 第九关&#xff1a; 第十关 第十一关 第十二关 第十三关 第十四关 第八关 我们用测试语句来测试是否为注入点 从上图中得知存在注入点&#xff0c;那么接下来就是爆列 一共有三列&#xff0c;接下来用union select 和报错注入都试一下发现没有回显点&…

嵌入式安防监控项目——前期知识复习

目录 一、概述 二、C语言 三、数据结构 四、IO进程 五、网络 六、ARM体系结构和接口技术 七、系统移植 八、内核驱动 一、概述 我再报班之前学过51和32&#xff0c;不过都是自学的。报班开始先从应用层入手的&#xff0c;C语言和数据结构。只要是个IT专业的大学这都是必…

cadence专题【1】--多引脚IC如何创建orcad原理图库

cadense下载说明新建工程一、采用传统方式创建1、新建库文件2、放置pin array3、修改管脚信息二、采用电子表格方式创建1、新建库文件2、Ctrlc、Ctrlvcadense下载说明 cadence是目前最流行的EDA&#xff0c;下载装机全交给阿狸狗即可。 浏览器搜索cadence吴川斌或点击链接: ht…

既然有MySQL了,为什么还要有Redis?

目录专栏导读一、同样是缓存&#xff0c;用map不行吗&#xff1f;二、Redis为什么是单线程的&#xff1f;三、Redis真的是单线程的吗&#xff1f;四、Redis优缺点1、优点2、缺点五、Redis常见业务场景六、Redis常见数据类型1、String2、List3、Hash4、Set5、Zset6、BitMap7、Bi…

GDScript 导出变量 (Godot4.0)

概述 导出变量的功能在3.x版本中也是有的&#xff0c;但是4.0版本对其进行了语法上的改进。 导出变量在日常的游戏制作中提供节点的自定义参数化调节功能时非常有用&#xff0c;除此之外还用于自定义资源。 本文是&#xff08;Bilibili巽星石&#xff09;在4.0官方文档《GDScr…

Java学习笔记 --- jQuery

一、jQuery介绍 jQuery&#xff0c;顾名思义&#xff0c;也就是JavaScript和查询&#xff08;Query&#xff09;&#xff0c;它就是辅助JavaScript开发的js类库。它的核心思想是write less&#xff0c;do more&#xff08;写得更少&#xff0c;做得更多&#xff09;&#xff0c…

C语言实现扫雷【详细讲解+全部源码】

扫雷的实现1. 配置运行环境2. 扫雷游戏的初步实现2.1 建立扫雷分布模块2.2 创建名为board的二维数组并进行棋盘初始化2.3 打印棋盘3. 接下来该讨论的事情3.1 布置雷3.2 排查雷3.3 统计坐标周围有几个雷4. 完整扫雷游戏的实现4.1 game.h4.2 game.c4.3 扫雷.c1. 配置运行环境 本游…

信息安全与数学基础-笔记-③一次同余方程

知识目录一次同余方程的解中国剩余定理中国剩余定理的应用一次同余方程的解 本文只研究一次同余方程的解。 f(x) 三 0 (mod m)&#xff0c; 若有一个s能够满足该式子&#xff0c;那么该数字就是该式子的解&#xff0c; 在同余方程式中的解一般写成&#xff1a;x三s (mod m) 同…

04_Apache Pulsar的可视化监控管理、Apache Pulsar的可视化监控部署

1.4.Apache Pulsar的可视化监控管理 1.4.1.Apache Pulsar的可视化监控部署 1.4.Apache Pulsar的可视化监控管理 1.4.1.Apache Pulsar的可视化监控部署 第一步&#xff1a;下载Pulsar-Manager https://archive.apache.org/dist/pulsar/pulsar-manager/pulsar-manager-0.2.0/…

分布式对象存储——Apache Hadoop Ozone

前言 本文隶属于专栏《大数据技术体系》&#xff0c;该专栏为笔者原创&#xff0c;引用请注明来源&#xff0c;不足和错误之处请在评论区帮忙指出&#xff0c;谢谢&#xff01; 本专栏目录结构和参考文献请见大数据技术体系 1. 概述 Ozone是Apache Hadoop项目的子项目&#xf…

嵌入式和Python(二):python初识及其基本使用规则

目录 一&#xff0c;python基本特点 二&#xff0c;python使用说明 ● 两种编程方式 ① 交互式编程 ② 脚本式编程 ● python中文编码 ● python行和缩进 ● python引号 ● python空行 ● python等待用户输入 ① 没有转换变量类型 ② 转换变量类型 ● python变…

Raspbian镜像无头烧录

Raspbian镜像无头烧录1. 源由2. 需求3. 分析4. 步骤4.1 删除tf卡分区内容4.2 balena烧录镜像4.3 配置USB直接登录4.4 配置WiFi 2.4G网络登录4.5 修改登录账号密码4.6 数据同步和弹出tf卡5. 登录5.1 登录异常处理5.2 WiFi 2.4G网络登录5.3 USB直接登录6. 参考资料7. 补充资料这里…

套接字实现TCP

套接字 套接字的意义就是客户端与服务器进行双向通信的端点&#xff0c;如果有不理解点上面套接字三字更近距离了解套接字。 网络套接字与客户连接的特定网络有关的服务端口号&#xff0c;这个端口号允许linux进入特定的端口号的连接转到正确的服务器进程。 套接字通信的建立过…

JVM运行时数据区—程序计数器

JVM中的程序计数寄存器&#xff08;Program Counter Register&#xff09;中&#xff0c;Register的命名源于CPU的寄存器&#xff0c;寄存器存储指令相关的现场信息。CPU只有把数据装载到寄存器才能够运行。JVM中的PC寄存器是对物理PC寄存器的一种抽象模拟。 一个线程对应一个…

JavaScript事件循环及任务处理

JavaScript事件循环及任务处理## 浏览器中 JavaScript 的执行流程和 Node.js 中的流程都是基于 事件循环 的。 理解事件循环的工作方式对于代码优化、性能优化很重要&#xff0c;有时对于正确的架构也很重要。 我们首先介绍事件循环工作方式的理论细节&#xff0c;然后介绍该知…

MMSeg绘制模型指定层的Heatmap热力图

文章首发及后续更新&#xff1a;https://mwhls.top/4475.html&#xff0c;无图/无目录/格式错误/更多相关请至首发页查看。 新的更新内容请到mwhls.top查看。 欢迎提出任何疑问及批评&#xff0c;非常感谢&#xff01; 摘要&#xff1a;绘制模型指定层的热力图 可视化环境安装 …

Mysql数据库的(超详细)安装及环境变量的配置

一、 下载MySQL Mysql官网下载地址&#xff1a;https://downloads.mysql.com/archives/installer/ 1. 选择需要的版本点击Download进行下载 本篇文章选择的8.0.26版本 二、 安装MySQL 1. 选择设置类型 双击运行mysql-installer-community-8.0.26.msi&#xff0c;这里选择是…

数据库复习

什么是数据库系统 数据库系统是指在计算机系统中引入数据库后构成的系统&#xff0c;一般由数据库、数据库管理系统(及其开发工具)、应用系统、数据库管理员和用户构成 数据库系统的特点是什么&#xff1f; 数据结构化数据的共享性高&#xff0c;冗余度低且易扩充数据独立性高数…