Spark 分析计算连续三周登录的用户数

news/2024/4/26 9:18:21/文章来源:https://blog.csdn.net/weixin_46389691/article/details/129177858

前言:本文用到了窗口函数 range between,可以参考这篇博客进行了解——窗口函数rows between 、range between的使用

创建数据环境

在 MySQL 中创建数据测试表 log_data

create table if not exists  log_data(
log_id varchar(200) comment '日志id',
user_id  varchar(200) comment '用户id', 
log_time datetime NULL DEFAULT NULL comment '登录时间');

插入数据:

insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('1', '2022-03-10 10:08:13', '1000');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('2', '2022-03-18 10:33:22', '1000');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('3', '2022-03-26 18:59:19', '1000');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('4', '2022-03-03 20:59:13', '1001');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('5', '2022-03-10 05:53:49', '1001');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('6', '2022-02-26 02:27:51', '1001');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('7', '2022-03-01 20:59:13', '1002');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('8', '2022-03-07 05:53:49', '1002');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('9', '2022-02-28 02:27:51', '1002');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('10', '2022-02-27 20:59:13', '1003');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('11', '2022-03-05 05:53:49', '1003');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('12', '2022-03-12 02:27:51', '1003');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('13', '2022-02-28 20:59:13', '1004');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('14', '2022-03-05 05:53:49', '1004');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('15', '2022-03-18 02:27:51', '1004');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('16', '2022-02-25 20:59:13', '1005');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('17', '2022-03-04 05:53:49', '1005');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('18', '2022-03-11 02:27:51', '1005');

需求介绍

login_time 字段值为 2022-03-10 的最近连续三周登录的用户数,最终使用 Spark 中的 show 算子输出如下字段:

字段名介绍备注
end_date数据统计日期2022-03-10
active_total活跃用户数
date_range统计周期格式:统计开始时间_结束时间

需求分析

根据 login_time 字段值为 2022-03-10 的最近连续三周登录的用户数,想要完成这个需求,我们得先拿到最近三周的开始时间和结束时间,然后筛选范围数据,最后利用窗口函数计算其连续性。

  1. 确定当前 2022-03-10 是周几,然后求得周日的日期(也就是这三周的最后一天)。
  2. 拿到 2022-03-10 这周的周日时间后,获取两周前的开始日期(也就是这三周的第一天)。
  3. 筛选范围,计算每位用户是否符合三周的连续性。

需求实现

import org.apache.spark.sql.SparkSessionobject Get3WeekUserCnt {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("Get3WeekUserCnt").master("local[*]").getOrCreate()// 1.读取 MySQL 数据spark.read.format("jdbc").option("driver","com.mysql.jdbc.Driver").option("url","jdbc:mysql://master:3306/test").option("user","root").option("password","123456").option("dbtable","log_data").load().createTempView("data")// 2.获取 2022-03-10 这周的周日时间spark.sql("""|select|   user_id,|   log_time, |   date_add("2022-03-10",if(pmod(datediff("2022-03-10","1970-01-01")-3,7)=0,0,7-pmod(datediff("2022-03-10","1970-01-01")-3,7))) date_end|from|   data|""".stripMargin).createTempView("first_data")// 3.获取两周前的日期,筛选符合要求的数据spark.sql("""|select|   *|from|   (select|      user_id,|      date(log_time) log_date,|      date_end,|      date_sub(date_end,20) date_begin|   from|      first_data)t1|where|   log_date <= date_end|   and|   log_date >= date_begin|""".stripMargin).createTempView("second_data")// 4.计算各个用户三周连续性spark.sql("""|select|   end_date,|   count(distinct user_id) active_total,|   date_range|from|   (select|      "2022-03-10" end_date,|       user_id,|       concat(date_begin,"_",date_end) date_range,|       count(user_id) over(partition by user_id order by unix_timestamp(log_date,"yyyy-MM-dd") range between current row and 6*3600*24 following) cnt_1week,|       count(user_id) over(partition by user_id order by unix_timestamp(log_date,"yyyy-MM-dd") range between 7*3600*24 following and 13*3600*24 following) cnt_2week,|       count(user_id) over(partition by user_id order by unix_timestamp(log_date,"yyyy-MM-dd") range between 14*3600*24 following and 20*3600*24 following) cnt_3week|   from|      second_data)t1|where|   cnt_1week >= 1|   and|   cnt_2week >= 1|   and|   cnt_3week >= 1|group by|   end_date,|   date_range|""".stripMargin).show()spark.stop()}}

输出结果如下:

结果验证:

输出结果有误!

通过对日历与日期进行对比,我们可以发现,实际上一共有3位用户满足连续登录三个月的请求。那么这是什么原因呢?首先,我们主要来看一下日期计算的这部分代码:

count(user_id) over(partition by user_id order by unix_timestamp(log_date,"yyyy-MM-dd") range between current row and 6*3600*24 following) cnt_1week,count(user_id) over(partition by user_id order by unix_timestamp(log_date,"yyyy-MM-dd") range between 7*3600*24 following and 13*3600*24 following) cnt_2week,count(user_id) over(partition by user_id order by unix_timestamp(log_date,"yyyy-MM-dd") range between 14*3600*24 following and 20*3600*24 following) cnt_3week

其中,第一行 cnt_1week 统计的是各个用户第一周登录的次数,后面两行以此类推。我们将用户 1001 代入其中,很快就发现了问题,如下:

我们设置的时间跨度为一周,但是没有考虑到该日期为本周的第几天。通过用户 1001 可以看出,如果是这样算的话,就相当于我们把每个日期都当成了每周的第一天。就比如我们这里将 26 号作为了第一天,而六天后,也就是3月4号,变成了这周的最后一天。它将2.26号——3.4号作为了一个自然周。很明显,我们这样计算是错误的。

解决方法也很简单,我们可以通过一个字段来记录每个用户登录日期的那一周的周天日期,然后在日期计算的时候将该字段作为计算的考量日期。

修改后代码如下:

import org.apache.spark.sql.SparkSessionobject Get3WeekUserCnt {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("Get3WeekUserCnt").master("local[*]").getOrCreate()// 1.读取 MySQL 数据spark.read.format("jdbc").option("driver","com.mysql.jdbc.Driver").option("url","jdbc:mysql://master:3306/test").option("user","root").option("password","123456").option("dbtable","log_data").load().createTempView("data")// 2.获取 2022-03-10 这周的周日时间spark.sql("""|select|   user_id,|   date(log_time) log_date,|   date_add("2022-03-10",if(pmod(datediff("2022-03-10","1970-01-01")-3,7)=0,0,7-pmod(datediff("2022-03-10","1970-01-01")-3,7))) date_end|from|   data|""".stripMargin).createTempView("first_data")// 3.获取两周前的日期与当前日期周天的日期,筛选符合要求的数据spark.sql("""|select|   *|from|   (select|      user_id,|      date_add(log_date,if(pmod(datediff(log_date,"1970-01-01")-3,7)=0,0,7-pmod(datediff(log_date,"1970-01-01")-3,7))) log_week_end_date,|      date_end,|      date_sub(date_end,20) date_begin|   from|      first_data)t1|where|   log_date <= date_end|   and|   log_date >= date_begin|""".stripMargin).createTempView("second_data")// 4.计算各个用户三周连续性spark.sql("""|select|   end_date,|   count(distinct user_id) active_total,|   date_range|from|   (select|      "2022-03-10" end_date,|	   user_id,|      concat(date_begin,"_",date_end) date_range,|      count(user_id) over(partition by user_id order by unix_timestamp(log_week_end_date,"yyyy-MM-dd") range between 6*3600*24 preceding and current row) cnt_1week,|      count(user_id) over(partition by user_id order by unix_timestamp(log_week_end_date,"yyyy-MM-dd") range between 1*3600*24 following and 7*3600*24 following) cnt_2week,|      count(user_id) over(partition by user_id order by unix_timestamp(log_week_end_date,"yyyy-MM-dd") range between 8*3600*24 following and 14*3600*24 following) cnt_3week|   from|      second_data)t1|where|   cnt_1week >= 1|   and|   cnt_2week >= 1|   and|   cnt_3week >= 1|group by|   end_date,|   date_range|""".stripMargin).show()spark.stop()}}

注意,最后统计数量的时候不要忘记对用户的 ID 进行去重!

输出结果如下:

Bingo~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_74756.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

代码随想录【Day27】| 39. 组合总和、40. 组合总和 II、131. 分割回文串

39. 组合总和 题目链接 题目描述&#xff1a; 给定一个无重复元素的数组 candidates 和一个目标数 target &#xff0c;找出 candidates 中所有可以使数字和为 target 的组合。 candidates 中的数字可以无限制重复被选取。 说明&#xff1a; 所有数字&#xff08;包括 tar…

taobao.top.secret.bill.detail( 服务商的商家解密账单详情查询 )

&#xffe5;免费必须用户授权 服务商的商家解密账单详情查询&#xff0c;仅对90天内的账单提供SLA保障。 公共参数 请求地址: HTTP地址 http://gw.api.taobao.com/router/rest 公共请求参数: 公共响应参数: 请求参数 响应参数 点击获取key和secret 请求示例 TaobaoClient…

js 拖动--动态改变div的宽高大小

index.html 如下&#xff1a;&#xff08;可以新建一个index.html文件直接复制&#xff0c;打开运行&#xff09; <!DOCTYPE html> <html lang"en"> <head> <meta charset"UTF-8"> <meta http-equiv"X-UA-Compatible&qu…

Mybatis源码学习笔记(五)之Mybatis框架缓存机制原理解析

1 Mybatis框架的缓存模块 MyBatis 内置了一个强大的事务性查询缓存机制&#xff0c;它可以非常方便地配置和定制。Mybatis框架中的缓存分为一级缓存和二级缓存&#xff0c;三级缓存基本都要借助自定义缓存或第三方服务来进行实现。但本质上是一样的&#xff0c;都是借助Cache接…

【Python学习笔记】第十九节 Python 面向对象(一)

在现实世界中&#xff0c;随处可见的一种事物就是对象&#xff0c;对象是事物存在的实体&#xff0c;如学生、汽车等。人类解决问题的方式总是将复杂的事物简单化&#xff0c;于是就会思考这些对象都是由哪些部分组成的。通常都会将对象划分为两个部分&#xff0c;即静态部分与…

SSL证书对虚拟主机的用处有哪些?

虚拟主机是指在同一台服务器上&#xff0c;通过不同的域名或IP地址为多个网站提供服务的一种网络主机。而SSL证书则是一种数字证书&#xff0c;它用于加密网站与用户之间的通信&#xff0c;确保数据传输的安全性和完整性。在虚拟主机上&#xff0c;SSL证书有以下几个用处&#…

HiveSql一天一个小技巧:如何巧用分布函数percent_rank()求去掉最大最小值的平均薪水问题

0 问题描述参考链接(3条消息) HiveSql面试题12--如何分析去掉最大最小值的平均薪水&#xff08;字节跳动&#xff09;_莫叫石榴姐的博客-CSDN博客文中已经给出了三种解法&#xff0c;这里我们借助于此题&#xff0c;来研究如何用percent_rank()函数求解&#xff0c;简化解题思路…

深入理解C#的协变和逆变及其限制原因

阅读本文需要的一些前置知识&#xff1a; C#基本语法、C#的泛型使用、C#的运行过程 由于协变和逆变存在一些细节&#xff0c;在阅读时请注意“接口”和“类型”的差异&#xff0c;此外&#xff0c;文中有可能在不同的语境中将“结构体”和“值类型”混用&#xff0c;但表达的同…

深入浅出1588v2(PTP)里的时间同步原理

1.时间同步1.1 单步同步(OneStep)单步同步最为简单&#xff0c;master向slave发送一个sync的同步包&#xff0c;同步包里带有这条信息发送时master的当前时间t1&#xff0c;假如这条信息从master传输到slave需要的传输时间是D&#xff0c;那么slave收到信息时&#xff0c;maste…

BIM小技巧丨关于如何在Revit明细表中显示门窗面积

在明细表中显示门窗面积(以门明细表为例)在新建一个门明细表后&#xff0c;可以发现在Revit中不能直接使用明细表统计门窗面积。 这时&#xff0c;可以通过使用添加“计算值”的方式来处理&#xff0c;得到如下图所示&#xff0c;两种不同的面积统计结果&#xff1a; 除此之外&…

前端基础之CSS扫盲

文章目录一. CSS基本规范1. 基本语法格式2. 在HTML引入CSS3. 选择器分类二. CSS常用属性1. 文本属性2. 文本格式3. 背景属性4. 圆角矩形和圆5. 元素的显示模式6. CSS盒子模型7. 弹性布局光使用HTML来写一个前端页面的话其实只是写了一个大体的框架, 整体的页面并不工整美观, 而…

ledcode【用队列实现栈】

目录 题目描述&#xff1a; 解析题目 代码解析 1.封装一个队列 1.2封装带两个队列的结构体 1.3封装指向队列的结构体 1.4入栈函数实现 1.5出栈函数实现 1.6取栈顶数据 1.7判空函数实现 题目描述&#xff1a; 解析题目 这个题我是用c语言写的&#xff0c;所以队列的pu…

JavaSE-3 Java运行原理

一、Java的运行过程 &#x1f34e;Java程序运行时,必须经过编译和运行两个步骤。 首先将后缀名为.java的源文件进行编译,最终生成后缀名为.class的字节码文件。然后Java虚拟机将字节码文件进行解释执行,并将结果显示出来。具体过程如下图所示。 &#x1f349;Java程序的运行过…

【Python数据挖掘入门】2.2文本分析-中文分词(jieba库cut方法/自定义词典load_userdict/语料库分词)

中文分词就是将一个汉字序列切分成一个一个单独的词。例如&#xff1a; 另外还有停用词的概念&#xff0c;停用词是指在数据处理时&#xff0c;需要过滤掉的某些字或词。 一、jieba库 安装过程见&#xff1a;https://blog.csdn.net/momomuabc/article/details/128198306 ji…

数字IC手撕代码--小米科技(除法器设计)

前言&#xff1a; 本专栏旨在记录高频笔面试手撕代码题&#xff0c;以备数字前端秋招&#xff0c;本专栏所有文章提供原理分析、代码及波形&#xff0c;所有代码均经过本人验证。目录如下&#xff1a;1.数字IC手撕代码-分频器&#xff08;任意偶数分频&#xff09;2.数字IC手撕…

原始GAN-pytorch-生成MNIST数据集(代码)

文章目录原始GAN生成MNIST数据集1. Data loading and preparing2. Dataset and Model parameter3. Result save path4. Model define6. Training7. predict原始GAN生成MNIST数据集 原理很简单&#xff0c;可以参考原理部分原始GAN-pytorch-生成MNIST数据集&#xff08;原理&am…

记一次线上es慢查询导致的服务不可用

现象 某日线上业务同学反馈订单列表查询页面一直loding&#xff0c;然后提示请求超时&#xff0c;几分钟之后恢复正常 接到报障之后&#xff0c;马上根据接口URL&#xff0c;定位到了请求链路&#xff0c;发现是es查询超时&#xff0c;这里我们的业务订单表数据是由几百万的&a…

如何基于MLServer构建Python机器学习服务

文章目录前言一、数据集二、训练 Scikit-learn 模型三、基于MLSever构建Scikit-learn服务四、测试模型五、训练 XGBoost 模型六、服务多个模型七、测试多个模型的准确性总结参考前言 在过去我们训练模型&#xff0c;往往通过编写flask代码或者容器化我们的模型并在docker中运行…

Python学习笔记202302

1、numpy.empty 作用&#xff1a;根据给定的维度和数值类型返回一个新的数组&#xff0c;其元素不进行初始化。 用法&#xff1a;numpy.empty(shape, dtypefloat, order‘C’) 2、logging.debug 作用&#xff1a;Python 的日志记录工具&#xff0c;这个模块为应用与库实现了灵…

C# Sqlite数据库加密

sqlite官方的数据库加密是收费的&#xff0c;而且比较贵。 幸亏微软提供了一种免费的方法。 1 sqlite加密demo 这里我做了一个小的demo演示如下&#xff1a; 在界面中拖入数据库名、密码、以及保存的路径 比如我选择保存路径桌面的sqlite目录&#xff0c;数据库名guigutool…