Spark 对hadoopnamenode-log文件进行数据清洗并存入mysql数据库

news/2024/5/5 17:57:45/文章来源:https://blog.csdn.net/weixin_53898747/article/details/130118719

一.查找需要清洗的文件

1.1查看hadoopnamenode-log文件位置

 1.2 开启Hadoop集群和Hive元数据、Hive远程连接

具体如何开启可以看我之前的文章:(10条消息) SparkSQL-liunx系统Spark连接Hive_难以言喻wyy的博客-CSDN博客

 1.3 将这个文件传入到hdfs中:

hdfs dfs -put hadoop-root-namenode-gree2.log   /tmp/hadoopNamenodeLogs/hadooplogs/hadoop-root-namenode-gree2.log

二.日志分析

将里面部分字段拿出来分析:

 2023-02-10 16:55:33,123 INFO org.apache.hadoop.hdfs.server.namenode.NameNode: registered UNIX signal handlers for [TERM, HUP, INT]
2023-02-10 16:55:33,195 INFO org.apache.hadoop.hdfs.server.namenode.NameNode: createNameNode []
2023-02-10 16:55:33,296 INFO org.apache.hadoop.metrics2.impl.MetricsConfig: loaded properties from hadoop-metrics2.properties
2023-02-10 16:55:33,409 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).

可以看出其可以以INFO来作为中间字段,用indexof读取出该位置索引,以截取字符段的方式来将清洗的数据拿出。

三.代码实现

3.1 对数据进行清洗

object hadoopDemo {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().master("local[*]").appName("HadoopLogsEtlDemo").getOrCreate()val sc: SparkContext = spark.sparkContextimport spark.implicits._import org.apache.spark.sql.functions._
//    TODO 根据INFO这个字段来对数据进行封装到Row中。val row: RDD[Row] = sc.textFile("hdfs://192.168.61.146:9000/tmp/hadoopNamenodeLogs/hadooplogs/hadoop-root-namenode-gree2.log").filter(x => {x.startsWith("2023")}).map(x => {val strings: Array[String] = x.split(",")val num1: Int = strings(1).indexOf(" INFO ")val num2: Int = strings(1).indexOf(":")if(num1!=(-1)){val str1: String = strings(1).substring(0, num1)val str2: String = strings(1).substring(num1 + 5, num2)val str3: String = strings(1).substring(num2 + 1, strings(1).length)Row(strings(0), str1, "INFO",str2, str3)}else {val num3: Int = strings(1).indexOf(" WARN ")val num4: Int = strings(1).indexOf(" ERROR ")if(num3!=(-1)&&num4==(-1)){val str1: String = strings(1).substring(0, num3)val str2: String = strings(1).substring(num3 + 5, num2)val str3: String = strings(1).substring(num2 + 1, strings(1).length)Row(strings(0), str1,"WARN", str2, str3)}else{val str1: String = strings(1).substring(0, num4)val str2: String = strings(1).substring(num4 + 6, num2)val str3: String = strings(1).substring(num2 + 1, strings(1).length)Row(strings(0), str1,"ERROR", str2, str3)}}})val schema: StructType = StructType(Array(StructField("event_time", StringType),StructField("number", StringType),StructField("status", StringType),StructField("util", StringType),StructField("info", StringType),))val frame: DataFrame = spark.createDataFrame(row, schema)frame.show(80,false)}}

清洗后的效果图:

 

3.2  创建jdbcUtils来将其数据导入到数据库:

object jdbcUtils {val url = "jdbc:mysql://192.168.61.141:3306/jsondemo?createDatabaseIfNotExist=true"val driver = "com.mysql.cj.jdbc.Driver"val user = "root"val password = "root"val table_access_logs: String = "access_logs"val table_full_access_logs: String = "full_access_logs"val table_day_active:String="table_day_active"val table_retention:String="retention"val table_loading_json="loading_json"val table_ad_json="ad_json"val table_notification_json="notification_json"val table_active_background_json="active_background_json"val table_comment_json="comment_json"val table_praise_json="praise_json"val table_teacher_json="teacher_json"val properties = new Properties()properties.setProperty("user", jdbcUtils.user)properties.setProperty("password", jdbcUtils.password)properties.setProperty("driver", jdbcUtils.driver)def dataFrameToMysql(df: DataFrame, table: String, op: Int = 1): Unit = {if (op == 0) {df.write.mode(SaveMode.Append).jdbc(jdbcUtils.url, table, properties)} else {df.write.mode(SaveMode.Overwrite).jdbc(jdbcUtils.url, table, properties)}}def getDataFtameByTableName(spark:SparkSession,table:String):DataFrame={val frame: DataFrame = spark.read.jdbc(jdbcUtils.url, table, jdbcUtils.properties)frame}}

3.3 数据导入

jdbcUtils.dataFrameToMysql(frame,jdbcUtils.table_day_active,1)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_285722.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

windows系统管理_windows server 2016 用户管理

用户账户的概述 **计算机用户账户:**由将用户定义到某一系统的所有信息组成的记录,账户为用户或计算机提供安 全凭证,包括用户名和用户登陆所需要的密码,以及用户使用以便用户和计算机能够登录到网络并 访问域资源的权利和权限。不同的身份拥…

【Obsidian】基础使用手册(包括如何将Obsidian页面设置为中文)

💗 未来的游戏开发程序媛,现在的努力学习菜鸡 💦本专栏是我关于工具类软件的笔记 🈶本篇是Obsidian的基础使用 Obsidian的基础使用将页面设置为中文常用的默认快捷键常用的格式标题代码块表格字体样式列表任务列表官方下载地址&am…

【音视频第11天】GCC论文阅读(2)

A Google Congestion Control Algorithm for Real-Time Communication draft-alvestrand-rmcat-congestion-03论文理解 看中文的GCC算法一脸懵。看一看英文版的,找一找感觉。 目录Abstract1. Introduction1.1 Mathematical notation conventions2. System model3.Fe…

获取淘宝商品分类详情API,抓取淘宝全品类目API接口分享(代码展示、参数说明)

商品分类技巧 淘宝店铺分类怎么设置?我们登录卖家账号的时候,我们看到自己的商品,会想要给商品进行分类,一个好的分类可以帮助提高商品的曝光率。那么在给商品分类前,如果您毫无头绪,以下几点可以给您带来…

车载网络 - Autosar网络管理 - 网络管理简介

一、什么是CAN网络管理及它的作用 现在的车辆是由大量的ECU节点组成的,为了能使各ECU能够正确并及时地进行CAN通信,需要有一套机制来统一协调总线上各节点的休眠唤醒,这套机制就是CAN网络管理(NM)。 网络管理的目的是保…

【算法题解】24. 模拟机器人行走

这是一道 中等难度 的题 https://leetcode.cn/problems/walking-robot-simulation/description/ 题目 机器人在一个无限大小的 XY 网格平面上行走,从点 (0, 0) 处开始出发,面向北方。该机器人可以接收以下三种类型的命令 commands : -2 &am…

WPF mvvm框架Stylet使用教程-基础用法

Stylet框架基础用法 安装Nuget包 在“管理Nuget程序包”中搜索Stylet,查看Stylet包支持的net版本,然后选择第二个Stylet.Start包进行安装,该包会自动安装stylet并且生成基本的配置 注意事项:安装时要把需要安装的程序设为启动项…

PyCharm2021安装教程

PyCharm是一种Python IDE,带有一整套可以帮助用户在使用Python语言开发时提高其效率的工具,比如调试、语法高亮、Project管理、代码跳转、智能提示、自动完成、单元测试、版本控制。此外,该IDE提供了一些高级功能,以用于支持Djang…

IntersectionObserver与无限滚动加载

学习链接 IntersectionObserver MDN Api IntersectionObserver API详解 Intersection observer 的概念和用法 过去,要检测一个元素是否可见或者两个元素是否相交并不容易,比如实现图片懒加载、内容无限滚动等功能时,都需要通过​getBound…

[Date structure]时间/空间复杂度

⭐作者介绍:大二本科网络工程专业在读,持续学习Java,努力输出优质文章 ⭐作者主页:逐梦苍穹 ⭐所属专栏:数据结构。数据结构专栏主要是在讲解原理的基础上拿Java实现,有时候有C/C代码。 ⭐如果觉得文章写的…

linux文件类型和根目录结构

目录 一、Linux文件类型 二、Linux系统的目录结构 1. FHS 2. 路径以及工作目录 (1)路径 (2)工作目录 一、Linux文件类型 使用ls -l命令查看到的第一个字符文件类型说明-普通文件类似于Windows的记事本d目录文件类似于Windo…

[NOIP2000 提高组] 进制转换

[NOIP2000 提高组] 进制转换 题目描述 我们可以用这样的方式来表示一个十进制数: 将每个阿拉伯数字乘以一个以该数字所处位置为指数,以 10为底数的幂之和的形式。例如 123 可表示为 10^22*10^13*10^0 这样的形式。 与之相似的,对二进制数来说,也可表示成…

WordPress添加阿里云OSS对象云储存配置教程

背景:随着页面文章增多,内置图片存储拖连网站响应速度,这里对我来说主要是想提升速度 目的:使用第三方云存储作为图片外存储(图床),这样处理可以为服务器节省很多磁盘空间,在网站搬家的时候减少文件迁移的工…

2023TYUT移动应用软件开发程序设计和填空

目录 程序设计 程序设计1:根据要求设计UI,补充相应布局文件,即.xml文件 程序设计2:根据要求,补充Activity.java文件 程序填空 说明: 程序设计 程序设计1:根据要求设计UI,补充相应布局文件,即.xml文件…

安装Nginx——docker安装

使用docker安装Nginx 1.开启docker systemctl start docker docker search nginx[rootlocalhost ~]# systemctl start docker //开启docker [rootlocalhost ~]# docker search nginx //搜素镜像 2. docker pull nginxdocker imagesdocker run -…

【ROS】基于WIFI网络实现图像消息跨机实时传输

【开发背景】 研究机器人目标检测算法的时候,常常需要把推理图像实时展示出来,以供观摩。而ROS1提供的跨机通信方法,要么是配置单Master,要么是配置多Master;一方面配置麻烦,另一方面传输效率低下&#xf…

SQL select总结(基于选课系统)

表详情: 学生表: 学院表: 学生选课记录表: 课程表: 教师表: 查询: 1. 查全表 -- 01. 查询所有学生的所有信息 -- 方法一:会更复杂,进行了两次查询,第一…

基于灵动微SPIN系列开发的水泵方案介绍 以 MM32SPIN040C/MM32SPIN560C为主控

水泵是输送液体或使液体增压的机械。它将原动机的机械能或其他外部能量传送给液体,使液体能量增加,主要用来输送液体包括水、油、酸碱液、乳化液、悬乳液和液态金属等。 水泵以 MM32SPIN040C/MM32SPIN560C为主控。 水泵方案 MCU: MM32SPIN系列 1.输入…

【JavaWeb】后端(Maven+SpringBoot+HTTP+Tomcat)

目录一、Maven1.什么是Maven?2.Maven的作用?3.介绍4.安装5.IDEA集成Maven6.IDEA创建Maven项目7.IDEA导入Maven项目8.依赖配置9.依赖传递10.依赖范围11.生命周期二、SpringBoot1.Spring2.SpringBoot3.SpringBootWeb快速入门二、HTTP1.HTTP-概述2.HTTP-请求协议3.HTTP-响应协议…

机器学习实战:Python基于Logistic逻辑回归进行分类预测

目录1 前言1.1 Logistic回归的介绍1.2 Logistic回归的应用2 iris数据集数据处理2.1 导入函数2.2 导入数据2.3 简单数据查看3 可视化3.1 条形图/散点图3.2 箱线图3.3 三维散点图4 建模预测4.1 二分类预测4.2 多分类预测5 讨论1 前言 1.1 Logistic回归的介绍 逻辑回归&#xff…