基于Scala开发Spark ML的ALS推荐模型实战

news/2024/6/16 10:09:33/文章来源:https://blog.csdn.net/pblh123/article/details/137288160

推荐系统,广泛应用到电商,营销行业。本文通过Scala,开发Spark ML的ALS算法训练推荐模型,用于电影评分预测推荐。

算法简介

ALS算法是Spark ML中实现协同过滤的矩阵分解方法。

ALS,即交替最小二乘法(Alternating Least Squares),是协同过滤技术中的一种经典算法。它通过对用户和物品的潜在特征进行建模,来预测用户对未知物品的评分或偏好。具体介绍如下:

  1. 矩阵分解模型:在推荐系统中,我们通常有一个用户-物品的评分矩阵,其中行表示用户,列表示物品,矩阵中的值代表用户对物品的评分。然而,这个矩阵通常是非常稀疏的,因为用户只给少数物品评分。ALS算法就是在这样的不完整评分矩阵上操作,通过矩阵分解来补全缺失值,进而产生推荐。
  2. 算法原理:ALS算法的核心思想是通过迭代过程更新用户和物品的潜在因子向量。在每次迭代中,一个评分被建模为用户潜在特征向量和物品潜在特征向量的点积,加上一个偏差项。通过最小化实际评分和预测评分之间的差异来不断优化这些潜在特征向量。
  3. Spark ML实现:在Spark ML库中,ALS算法被用于处理大规模的数据集,并提供了多种参数以适应不同的数据特性和需求。例如,可以设置潜在因子的数量、正则化参数、迭代次数等。此外,Spark ML的ALS还支持隐式反馈数据的变体,这对于无法获取明确评分的数据非常有用。

总的来说,ALS是一种强大的推荐系统算法,尤其适用于处理大规模稀疏数据集。通过合理地选择和调整参数,可以在保持高效计算的同时获得良好的推荐质量。

代码实战

pom.xml文件更新,加入相关依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>sparkGNU2023</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><scala.version>2.13</scala.version><spark.version>3.4.1</spark.version><log4j.version>1.2.17</log4j.version><slf4j.version>1.7.22</slf4j.version></properties><dependencies><!--日志相关依赖--><dependency><groupId>org.slf4j</groupId><artifactId>jcl-over-slf4j</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>com.thoughtworks.paranamer</groupId><artifactId>paranamer</artifactId><version>2.8</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.13</artifactId><version>3.4.1</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.13</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.13</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.13</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.13</artifactId><version>3.4.1</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_2.13</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-8_2.11</artifactId><version>2.4.8</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.30</version></dependency><dependency><groupId>org.apache.flume.flume-ng-clients</groupId><artifactId>flume-ng-log4jappender</artifactId><version>1.11.0</version></dependency><!--        flume 拦截器相关依赖--><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.6.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build></project>

训练ALS模型

基于scala训练ALS模型

package base.charpter10import breeze.linalg.sum
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.functions.{col, count, explode, when}
import org.apache.spark.sql.{DataFrame, SparkSession}/*** @projectName sparkGNU2023  * @package base.charpter10  * @className base.charpter10.MovieRecommender  * @description ${description}  * @author pblh123* @date 2024/3/29 15:18* @version 1.0**/object MovieRecommender {def main(args: Array[String]): Unit = {// 创建Spark会话val spark = SparkSession.builder().appName("MovieRecommender").master("local[*]").getOrCreate()import spark.implicits._// 假设我们有一个用户-物品评分数据集,格式为(userId, itemId, rating)/*** UserID,MovieID,Rating,Timestamp*  1,1193,5,978300760*  1,661,3,978302109*/// 指定CSV文件的路径,以及解析选项val csvFilePath = "data/ratings.csv"val csvOptions = Map("header" -> "true", // 是否有列名头"inferSchema" -> "true", // 是否自动推断数据类型"encoding" -> "UTF-8", // 如果有特定的编码格式,例如对于包含中文的CSV文件:)// 读取CSV文件并创建DataFrameval ratingsDF = spark.read.format("csv").options(csvOptions).load(csvFilePath)// 显示DataFrame的前几行以验证数据是否正确加载println("查看原始据数据样例:")ratingsDF.show(5)val ratings: DataFrame = ratingsDF.select("UserID", "MovieID", "Rating").withColumnRenamed("UserID", "userId").withColumnRenamed("MovieID", "itemId").withColumnRenamed("Rating", "rating")// 将数据集分割为训练集和测试集val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))println("查看训练集数据")training.show(5)println("查看测试集数据")test.show(5)// 设置ALS参数// 创建一个ALS实例并配置参数val als = new ALS().setMaxIter(10) // 设置最大迭代次数为5,10,本地测试时,设置过大,会报错.setRegParam(0.01) // 设置正则化参数为0.01.setUserCol("userId") // 设置用户列名为"userId".setItemCol("itemId") // 设置物品列名为"itemId".setRatingCol("rating") // 设置评分列名为"rating"/*** ALS(Alternating Least Squares)是一种基于矩阵分解的协同过滤算法,用于处理用户和物品之间的评分数据。各参数说明如下:*  setMaxIter: 设置最大迭代次数,决定模型训练的精细程度。迭代次数越多,模型通常越精确,但训练时间也可能更长。*  setRegParam: 设置正则化参数,用于控制模型的复杂度和过拟合程度。较小的正则化参数值可能导致模型过复杂,容易过拟合;较大的值则可能导致模型过于简单,欠拟合。*  setUserCol, setItemCol, setRatingCol: 分别设置用户ID列、物品ID列和评分列的名称。这些列名根据实际的数据结构来确定,用于告诉ALS算法在哪些列中查找用户、物品和评分信息。*/// 训练ALS模型println("开始训练模型")val model = als.fit(training)// 对测试集进行预测val predictions = model.transform(test)predictions.show()predictions.filter($"rating".isNotNull && $"prediction".isNotNull).count() // 确认有非空的评分和预测值// 评估模型val evaluator = new RegressionEvaluator().setMetricName("rmse").setLabelCol("rating").setPredictionCol("prediction")val rmse = evaluator.evaluate(predictions)println(s"Root-mean-square error = $rmse")// 为用户生成推荐// 该函数是基于一个模型(model)为所有用户推荐项目的函数。它将为每个用户推荐5个项目/*** +------+--------------------------------------------------------------------------------------------+*  |userId|recommendations[{itemid,pred_rating},{itemid,pred_rating},...]                                                                             |*  +------+--------------------------------------------------------------------------------------------+*  |12    |[{1864, 9.721167}, {2964, 8.815781}, {3867, 8.480173}, {1539, 7.8904114}, {563, 7.8829007}] |*  |22    |[{2964, 6.090676}, {3215, 5.6165895}, {1534, 5.4731245}, {718, 5.462125}, {2632, 5.4482727}]|*/val userRecs = model.recommendForAllUsers(5)userRecs.show(5,false)println("保存预测结果")
//    userRecs.write.mode("overwrite").parquet("models/recomALSmodel") // 保存为parquet格式,一般用于集群中
// userRecs是一个DataFrame,其中"recommendations"列是数组类型val explodedUserRecs = userRecs.withColumn("recommendations", explode($"recommendations")).select($"userId", $"recommendations.itemId".as("itemId"), $"recommendations.rating".as("PredRating"))explodedUserRecs.write.mode("overwrite").format("csv").save("predictRes/recomALS")  // PC 调试使用// 保存模型到指定路径val modelPath = "models/recomALSmodel"model.write.overwrite().save(modelPath)println(s"Model saved to $modelPath")// 停止Spark会话spark.stop()/*当程序试图停止Spark会话时,可能会触发清理临时文件的操作,从而导致出现NoSuchFileException异常。通常情况下,这不是代码逻辑的问题,而是Spark内部在清理资源时可能出现的问题。可以尝试重启Spark环境或者适当增大Spark的临时目录空间来避免此类问题。*/}}

运行代码,效果图如下

TodoList:目前RMSE计算出问题,原数据清洗没有做,模型参数还可以调整。后期调整更新后,再发一篇文章。

使用训练的模型预测新数据

scala开发应用模型demo代码

package base.charpter10import org.apache.spark.ml.recommendation.ALSModel
import org.apache.spark.sql.SparkSession/*** @projectName sparkGNU2023  * @package base.charpter10  * @className base.charpter10.RecommendationModelLoadDemo  * @description ${description}  * @author pblh123* @date 2024/3/29 15:36* @version 1.0**/object RecommendationModelLoadDemo {def main(args: Array[String]): Unit = {// 创建Spark会话val spark = SparkSession.builder().master("local[*]").appName("RecommendationModelUsageDemo").getOrCreate()import spark.implicits._// 加载之前保存的ALS模型val modelPath = "models/recomALSmodel"val loadedModel: ALSModel = ALSModel.load(modelPath)// 假设我们有一些新的用户-物品对,我们想要预测它们的评分val userItemPairs = Seq((1, 4), // 用户1对物品4的评分预测(2, 2) // 用户2对物品2的评分预测).toDF("userId", "itemId")// 使用模型进行评分预测val predictions = loadedModel.transform(userItemPairs)predictions.show()// 现在,假设我们想要为用户1生成前N个推荐物品val numRecommendations = 5 // 为用户推荐的物品数量val userRecs = loadedModel.recommendForAllUsers(numRecommendations)userRecs.show(5,false)// 停止Spark会话spark.stop()}}

运行效果如下

评估效果说明:目前的预测评分不合理,是因为模型没有经过精挑,优化,预测的记过会依据预测评分高低排序,选取得分高的前5个结果返回。后期模型调优后,结果就正常了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_1034745.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

VUE——生命周期

概念&#xff1a; mounted:挂载 new Vue({el: "#x",data: {},methods: {},mounted() {}, }) 系统会自己调用&#xff0c;不需要我们调用。 案例 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><…

走进《与凤行》感受维达棉韧联名魅力

随着《与凤行》的热播&#xff0c;维达也陆续推出了各个纸抽系列的联名&#xff0c;棉韧联名就是这样一款结合了维达品牌优质棉韧面巾纸和《与凤行》IP元素的产品。这款软抽不仅在质地上保持了维达棉韧系列的柔软舒适&#xff0c;还融入了《与凤行》的设计元素&#xff0c;为用…

【威胁情报综述阅读3】Cyber Threat Intelligence Mining for Proactive Cybersecurity Defense

【威胁情报综述阅读1】Cyber Threat Intelligence Mining for Proactive Cybersecurity Defense: A Survey and New Perspectives 写在最前面一、介绍二、网络威胁情报挖掘方法和分类A. 研究方法1&#xff09; 第 1 步 - 网络场景分析&#xff1a;2&#xff09; 第 2 步 - 数据…

如何优化Flutter应用以通过iOS应用商店的审核流程

本文探讨了使用Flutter开发的iOS应用能否上架&#xff0c;以及上架的具体流程。苹果提供了App Store作为正式上架渠道&#xff0c;同时也有TestFlight供开发者进行内测。合规并通过审核后&#xff0c;Flutter应用可以顺利上架。但上架过程可能存在一些挑战&#xff0c;因此可能…

百度语音识别

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 一、建号—获取试用KEY二、测试代码三、运行四、运行结果五、验证五、总结 一、建号—获取试用KEY https://console.bce.baidu.com/ai/#/ai/speech/overview/index…

第117讲:深入MySQL性能优化:从多个角度提升数据库性能

文章目录 1.从哪些角度去考虑MySQL的优化2.数据库服务器的选型3.从操作系统层面去优化MySQL数据库3.1.关于CPU方面的优化3.2.关于内存方面的优化3.3.关于磁盘IO方面 4.应用端的优化5.数据库系统优化工具6.数据库系统参数优化6.1.最大连接数的优化&#xff08;max_connections&a…

【蓝桥杯选拔赛真题54】C++分糖果游戏 第十四届蓝桥杯青少年创意编程大赛 算法思维 C++编程选拔赛真题解

目录 C分糖果游戏 一、题目要求 1、编程实现 2、输入输出 二、算法分析 三、程序编写 四、程序说明 五、运行结果 六、考点分析 七、推荐资料 C分糖果游戏 第十四届蓝桥杯青少年创意编程大赛C选拔赛真题 一、题目要求 1、编程实现 现有N罐糖果&#xff0c;且已知…

数据库加载驱动问题(java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver)

java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver 遇到此问题&#xff0c;首先检查IDEA外部库中是否有mysql数据库驱动。如下所示&#xff1a; 如果发现外部库中存有mysql数据库驱动&#xff0c;需要在数据库配置文件中查看是否设置有时区mysql8.0以上版本需要设…

鸿蒙TypeScript入门学习第6天:【条件语句】

1、TypeScript 条件语句 条件语句用于基于不同的条件来执行不同的动作。 TypeScript 条件语句是通过一条或多条语句的执行结果&#xff08;True 或 False&#xff09;来决定执行的代码块。 可以通过下图来简单了解条件语句的执行过程: 2、条件语句 通常在写代码时&#xff0…

CQI-17:2021 V2 英文 、中文版。特殊过程:电子组装制造-锡焊系统评审标准

锡焊作为一个特殊的工艺过程&#xff0c;由于其材料特性的差异性、工艺参数的复杂性和过程控制的不确定性&#xff0c;长期以来一直视为汽车零部件制造业的薄弱环节&#xff0c;并将很大程度上直接导致整车产品质量的下降和召回风险的上升。 美国汽车工业行动集团AIAG的特别工…

AWS创建IAM用户,以及通过IAM用户登录

基本概念&#xff1a; IAM Identity Center&#xff08;AWS SSO&#xff09; 跨账户访问&#xff1a;IAM Identity Center允许用户使用他们自己的单一登录凭证来访问多个AWS账户和应用程序。这意味着你可以拥有一个账户和密码&#xff0c;通过IAM Identity Center的用户门户&…

Linux_进程的优先级环境变量上下文切换

文章目录 一、进程的优先级二、进程的四个重要概念三、上下文切换四、环境变量4.1 查看当前shell环境下的环境变量与内容 一、进程的优先级 什么是优先级&#xff1f; 指定一个进程获取某种资源的先后顺序本质是进程获取cpu资源的优先顺序 为什么要有优先级 进程访问的资源&am…

基于单片机和PCF8591波形发生器可调系统设计

**单片机设计介绍&#xff0c;基于单片机和PCF8591波形发生器可调系统设计 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序六、 文章目录 一 概要 基于单片机和PCF8591的波形发生器可调系统设计是一个结合了硬件与软件技术的综合性项目。这种设计旨在通…

VXLAN技术解析

什么是VXLAN VXLAN&#xff08;Virtual eXtensible Local Area Network&#xff0c;虚拟扩展局域网&#xff09;&#xff0c;是由IETF定义的NVO3&#xff08;Network Virtualization over Layer 3&#xff09;标准技术之一&#xff0c;是对传统VLAN协议的一种扩展。VXLAN的特点…

Python快速入门系列-8(Python数据分析与可视化)

第八章:Python数据分析与可视化 8.1 数据处理与清洗8.1.1 数据加载与查看8.1.2 数据清洗与处理8.1.3 数据转换与整理8.2 数据可视化工具介绍8.2.1 Matplotlib8.2.2 Seaborn8.2.3 Plotly8.3 数据挖掘与机器学习简介8.3.1 Scikit-learn8.3.2 TensorFlow总结在本章中,我们将探讨…

史上最强 PyTorch 2.2 GPU 版最新安装教程

一 深度学习主机 1.1 配置 先附上电脑配置图&#xff0c;如下&#xff1a; 利用公司的办公电脑对配置进行升级改造完成。除了显卡和电源&#xff0c;其他硬件都是公司电脑原装。 1.2 显卡 有钱直接上 RTX4090&#xff0c;也不能复用公司的电脑&#xff0c;其他配置跟不上。…

势拓伺服科技将莅临2024年第13届生物发酵展

参展企业介绍 厦门势拓伺服科技股份有限公司是福建省冶金(控股)有限责任公司的控股子公司&#xff0c;厦门钨业股份有限公司参股&#xff0c;是厦钨永磁电机产业园的主体之一。公司专业从事永磁同步电机的研发、设计与制造及电机的配套及延伸产业。 公司生产的永磁同步电机包括…

Transformer - 掩码张量

Transformer - 掩码张量 flyfish 在编写注意力函数时&#xff0c;其中之一参数为掩码张量mask attention(query, key, value, maskmask)先看一个函数np.triu import numpy as npsize 10 attn_shape (1, size, size)subsequent_mask np.triu(np.ones(attn_shape), k0).asty…

3.6k star, 免费开源跨平台的数据库管理工具 dbgate

3.6k star, 免费开源跨平台的数据库管理工具 dbgate 分类 开源分享 项目名: dbgate -- 免费开源跨平台的数据库管理工具 Github 开源地址&#xff1a; GitHub - dbgate/dbgate: Database manager for MySQL, PostgreSQL, SQL Server, MongoDB, SQLite and others. Runs under…

TRL校准的解析计算过程

目录 0 引言 1 信号流图 2 信号流图的分解 3 TRL校准的解析 4 仿真验证 5 总结 参考文献 0 引言 矢量网络分析仪(VNA)通常被校准到它们自己的参考平面上,但是,由于测量夹具以及线缆等的使用,这些参考平面通常不同于待测件(DUT)的参考平面,而这些连接装置,例如射…