【Spark分布式内存计算框架——Spark Streaming】9. 获取偏移量 应用案例:百度搜索风云榜(上)

news/2024/4/26 20:22:54/文章来源:https://blog.csdn.net/CSDNGuoYuying/article/details/129221453

4.4 获取偏移量

当SparkStreaming集成Kafka时,无论是Old Consumer API中Direct方式还是New Consumer API方式获取的数据,每批次的数据封装在KafkaRDD中,其中包含每条数据的元数据信息。
在这里插入图片描述
文档:http://spark.apache.org/docs/2.4.5/streaming-kafka-0-10-integration.html#obtaining-offsets
具体说明如下:
在这里插入图片描述
获取偏移量信息代码如下:
在这里插入图片描述
代码演示获取每批次RDD中对应Kafka分区中数据偏移量信息:
在这里插入图片描述
具体演示代码如下:

import java.util.Date
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
/**
* 集成Kafka,实时消费Topic中数据,获取每批次数据对应Topic各个分区数据偏移量
*/
object StreamingKafkaOffset {
def main(args: Array[String]): Unit = {
// TODO: 1. 构建StreamingContext流式上下文实例对象
val ssc: StreamingContext = {
// a. 创建SparkConf对象,设置应用配置信息
val sparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[3]")
// b.创建流式上下文对象, 传递SparkConf对象,TODO: 时间间隔 -> 用于划分流式数据为很多批次Batch
val context = new StreamingContext(sparkConf, Seconds(5))
// c. 返回
context
}
// 2. 读取Kafka Topic中数据
/*
def createDirectStream[K, V](
ssc: StreamingContext,
locationStrategy: LocationStrategy,
consumerStrategy: ConsumerStrategy[K, V]
): InputDStream[ConsumerRecord[K, V]]
*/
// i.位置策略
val locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent
/*
def Subscribe[K, V](
topics: ju.Collection[jl.String],
kafkaParams: ju.Map[String, Object]
): ConsumerStrategy[K, V]
*/
// ii.读取哪些Topic数据
val topics = Array("wc-topic")
// iii.消费Kafka 数据配置参数
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "node1.itcast.cn:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "group_id_streaming_0001",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
// iv.消费数据策略
val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Subscribe(
topics, kafkaParams
)
// v.采用新消费者API获取数据,类似于Direct方式
val kafkaDStream: DStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
ssc, locationStrategy, consumerStrategy
)
// TODO:其一、定义数组存储每批次数据对应RDD中各个分区的Topic Partition中偏移量信息
var offsetRanges: Array[OffsetRange] = Array.empty
// 3. 对每批次的数据进行词频统计
val resultDStream: DStream[(String, Int)] = kafkaDStream.transform(kafkaRDD => {
// TODO:其二、直接从Kafka获取的每批次KafkaRDD中获取偏移量信息
offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges
val resultRDD: RDD[(String, Int)] = kafkaRDD
.map(record => record.value()) // 获取Message数据
// 过滤不合格的数据
.filter(line => null != line && line.trim.length > 0)
// 按照分隔符划分单词
.flatMap(line => line.trim.split("\\s+"))
// 转换数据为二元组,表示每个单词出现一次
.map(word => (word, 1))
// 按照单词分组,聚合统计
.reduceByKey((tmp, item) => tmp + item)
resultRDD
})
// 4. 将结果数据输出 -> 将每批次的数据处理以后输出
resultDStream.foreachRDD{ (rdd, time) =>
val batchTime: String = FastDateFormat.getInstance("yyyy/MM/dd HH:mm:ss")
.format(new Date(time.milliseconds))
println("-------------------------------------------")
println(s"Time: $batchTime")
println("-------------------------------------------")
// 先判断RDD是否有数据,有数据在输出
if(!rdd.isEmpty()){
rdd
// 对于结果RDD输出,需要考虑降低分区数目
.coalesce(1)
// 对分区数据操作
.foreachPartition{iter =>iter.foreach(item => println(item))}
}
// TODO: 其三、当DStream进行Output操作完成以后,更新偏移量至外部存储系统(如Zookeeper、Redis等)
for (offsetRange <- offsetRanges) {
println(s"topic: ${offsetRange.topic} partition: ${offsetRange.partition} offsets:${o
ffsetRange.fromOffset} to ${offsetRange.untilOffset}")
}
}
// 5. 对于流式应用来说,需要启动应用
ssc.start()
// 流式应用启动以后,正常情况一直运行(接收数据、处理数据和输出数据),除非人为终止程序或者程序异常停止
ssc.awaitTermination()
// 关闭流式应用(参数一:是否关闭SparkContext,参数二:是否优雅的关闭)
ssc.stop(stopSparkContext = true, stopGracefully = true)
}
}

运行Streaming应用程序,控制台输出结果如下:
在这里插入图片描述

5. 应用案例:百度搜索风云榜

百度搜索风云榜(http://top.baidu.com/)以数亿网民的单日搜索行为作为数据基础,以搜索关键词为统计对象建立权威全面的各类关键词排行榜,以榜单形式向用户呈现基于百度海量搜索数据的排行信息,线上覆盖十余个行业类别,一百多个榜单。
在这里插入图片描述
在【热点榜单】中,可以看到依据搜索关键词实时统计各种维度热点,下图展示【实时热点】。
在这里插入图片描述

5.1 业务场景

仿【百度搜索风云榜】对用户使用百度搜索时日志进行分析:【百度搜索日志实时分析】,主要业务需求如下三个方面:
在这里插入图片描述

  • 业务一:搜索日志数据存储HDFS,实时对日志数据进行ETL提取转换,存储HDFS文件系统;
  • 业务二:百度热搜排行榜Top10,累加统计所有用户搜索词次数,获取Top10搜索词及次数;
  • 业务三:近期时间内热搜Top10,统计最近一段时间范围(比如,最近半个小时或最近2个小时)内用户搜索词次数,获取Top10搜索词及次数;

开发Maven Project中目录结构如下所示:
在这里插入图片描述

5.2 初始化环境

编程实现业务之前,首先编写程序模拟产生用户使用百度搜索产生日志数据和创建工具类StreamingContextUtils提供StreamingContext对象与从Kafka接收数据方法。

创建 Topic
启动Kafka Broker服务,创建Topic【search-log-topic】,命令如下所示:

# 1. 启动Zookeeper 服务
zookeeper-daemon.sh start
# 2. 启动Kafka 服务
kafka-daemon.sh start
# 3. Create Topic
kafka-topics.sh --create --topic search-log-topic \
--partitions 3 --replication-factor 1 --zookeeper node1.itcast.cn:2181/kafka200
# List Topics
kafka-topics.sh --list --zookeeper node1.itcast.cn:2181/kafka200
# Producer
kafka-console-producer.sh --topic search-log-topic --broker-list node1.itcast.cn:9092
# Consumer
kafka-console-consumer.sh --topic search-log-topic \
--bootstrap-server node1.itcast.cn:9092 --from-beginning

模拟日志数据
模拟用户搜索日志数据,字段信息封装到CaseClass样例类【SearchLog】类,代码如下:

package cn.itcast.spark.app.mock
/**
* 用户百度搜索时日志数据封装样例类CaseClass
* <p>
* @param sessionId 会话ID
* @param ip IP地址
* @param datetime 搜索日期时间
* @param keyword 搜索关键词
*/
case class SearchLog(
sessionId: String, //
ip: String, //
datetime: String, //
keyword: String //
) {
override def toString: String = s"$sessionId,$ip,$datetime,$keyword"
}

模拟产生搜索日志数据类【MockSearchLogs】具体代码如下:

package cn.itcast.spark.app.mock
import java.util.{Properties, UUID}
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import scala.util.Random
/**
* 模拟产生用户使用百度搜索引擎时,搜索查询日志数据,包含字段为:
* uid, ip, search_datetime, search_keyword
*/
object MockSearchLogs {
def main(args: Array[String]): Unit = {
// 搜索关键词,直接到百度热搜榜获取即可
val keywords: Array[String] = Array("罗志祥", "谭卓疑", "当当网", "裸海蝶", "张建国")
// 发送Kafka Topic
val props = new Properties()
props.put("bootstrap.servers", "node1.itcast.cn:9092")
props.put("acks", "1")
props.put("retries", "3")
props.put("key.serializer", classOf[StringSerializer].getName)
props.put("value.serializer", classOf[StringSerializer].getName)
val producer = new KafkaProducer[String, String](props)
val random: Random = new Random()
while (true){
// 随机产生一条搜索查询日志
val searchLog: SearchLog = SearchLog(
getUserId(), //
getRandomIp(), //
getCurrentDateTime(), //
keywords(random.nextInt(keywords.length)) //
)
println(searchLog.toString)
Thread.sleep(10 + random.nextInt(100))
val record = new ProducerRecord[String, String]("search-log-topic", searchLog.toString)
producer.send(record)
}
// 关闭连接
producer.close()
}
/**
* 随机生成用户SessionId
*/
def getUserId(): String = {
val uuid: String = UUID.randomUUID().toString
uuid.replaceAll("-", "").substring(16)
}
/**
* 获取当前日期时间,格式为yyyyMMddHHmmssSSS
*/
def getCurrentDateTime(): String = {
val format = FastDateFormat.getInstance("yyyyMMddHHmmssSSS")
val nowDateTime: Long = System.currentTimeMillis()
format.format(nowDateTime)
}
/**
* 获取随机IP地址
*/
def getRandomIp(): String = {
// ip范围
val range: Array[(Int, Int)] = Array(
(607649792,608174079), //36.56.0.0-36.63.255.255
(1038614528,1039007743), //61.232.0.0-61.237.255.255
(1783627776,1784676351), //106.80.0.0-106.95.255.255
(2035023872,2035154943), //121.76.0.0-121.77.255.255
(2078801920,2079064063), //123.232.0.0-123.235.255.255
(-1950089216,-1948778497),//139.196.0.0-139.215.255.255
(-1425539072,-1425014785),//171.8.0.0-171.15.255.255
(-1236271104,-1235419137),//182.80.0.0-182.92.255.255
(-770113536,-768606209),//210.25.0.0-210.47.255.255
(-569376768,-564133889) //222.16.0.0-222.95.255.255
)
// 随机数:IP地址范围下标
val random = new Random()
val index = random.nextInt(10)
val ipNumber: Int = range(index)._1 + random.nextInt(range(index)._2 - range(index)._1)
//println(s"ipNumber = ${ipNumber}")
// 转换Int类型IP地址为IPv4格式
number2IpString(ipNumber)
}
/**
* 将Int类型IPv4地址转换为字符串类型
*/
def number2IpString(ip: Int): String = {
val buffer: Array[Int] = new Array[Int](4)
buffer(0) = (ip >> 24) & 0xff
buffer(1) = (ip >> 16) & 0xff
buffer(2) = (ip >> 8) & 0xff
buffer(3) = ip & 0xff
// 返回IPv4地址
buffer.mkString(".")
}
}

运行应用程序,源源不断产生日志数据,发送至Kafka(同时在控制台打印),截图如下:
在这里插入图片描述

StreamingContextUtils 工具类
所有SparkStreaming应用都需要构建StreamingContext实例对象,并且从采用New Kafka Consumer API消费Kafka数据,编写工具类【StreamingContextUtils】,提供两个方法:

方法一:getStreamingContext,获取StreamingContext实例对象
在这里插入图片描述
方法二:consumerKafka,消费Kafka Topic中数据
在这里插入图片描述
具体代码如下:

package cn.itcast.spark.app
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 工具类提供:构建流式应用上下文StreamingContext实例对象和从Kafka Topic消费数据
*/
object StreamingContextUtils {
/**
* 获取StreamingContext实例,传递批处理时间间隔
* @param batchInterval 批处理时间间隔,单位为秒
*/
def getStreamingContext(clazz: Class[_], batchInterval: Int): StreamingContext = {
// i. 创建SparkConf对象,设置应用配置信息
val sparkConf = new SparkConf()
.setAppName(clazz.getSimpleName.stripSuffix("$"))
.setMaster("local[3]")
// 设置Kryo序列化
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(Array(classOf[ConsumerRecord[String, String]]))
// ii.创建流式上下文对象, 传递SparkConf对象和时间间隔
val context = new StreamingContext(sparkConf, Seconds(batchInterval))
// iii. 返回
context
}
/**
* 从指定的Kafka Topic中消费数据,默认从最新偏移量(largest)开始消费
* @param ssc StreamingContext实例对象
* @param topicName 消费Kafka中Topic名称
*/
def consumerKafka(ssc: StreamingContext, topicName: String): DStream[ConsumerRecord[String, String]] = {
// i.位置策略
val locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent
// ii.读取哪些Topic数据
val topics = Array(topicName)
// iii.消费Kafka 数据配置参数
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "node1.itcast.cn:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "group_id_streaming_0001",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
// iv.消费数据策略
val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Subscribe(
topics, kafkaParams
)
// v.采用新消费者API获取数据,类似于Direct方式
val kafkaDStream: DStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
ssc, locationStrategy, consumerStrategy
)
// vi.返回DStream
kafkaDStream
}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_75719.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux系统介绍及熟悉Linux基础操作

一、什么是Liunx Linux&#xff0c;全称GNU/Linux&#xff0c;是一种免费使用和自由传播的类UNIX操作系统&#xff0c;其内核由林纳斯本纳第克特托瓦兹&#xff08;Linus Benedict Torvalds&#xff09;于1991年10月5日首次发布&#xff0c;它主要受到Minix和Unix思想的启发&am…

【图像处理】数字图像处理基础(分辨率,像素,显示...)

Table of Contents1.数字图像处理基础1.1 图像表示1.1.1 图像成像模型1.1.2 数字图像的表示a.图像采样b.图像灰度的量化c.算比特数1.2 分辨率1.2.1 空间分辨率1.2.2 灰度分辨率1.3 像素间的关系1.3.1 像素邻域a.4邻域b.4对角邻域c.8邻域1.3.2 像素邻接1.3.3 像素连通1.3.4 像素…

“速通“ 老生常谈的HashMap [实现原理源码解读]

&#x1f473;我亲爱的各位大佬们好&#x1f618;&#x1f618;&#x1f618; ♨️本篇文章记录的为 HashMap 实现原理&&源码解读 相关内容&#xff0c;适合在学Java的小白,帮助新手快速上手,也适合复习中&#xff0c;面试中的大佬&#x1f649;&#x1f649;&#x1f…

【Leedcode】栈和队列必备的面试题(第二期)

【Leedcode】栈和队列必备的面试题&#xff08;第二期&#xff09; 文章目录【Leedcode】栈和队列必备的面试题&#xff08;第二期&#xff09;一、题目&#xff08;用两个队列实现栈&#xff09;二、思路图解1.定义两个队列2.初始化两个队列3.往两个队列中放入数据4.两个队列出…

对账平台设计

背景 随着公司业务的蓬勃发展&#xff0c;交易履约清结算业务的复杂性也在不断的增高&#xff0c;资金以及各种数据的一致性和准确性也变得越发重要。 以交易链路为例&#xff0c;存在着如下一些潜在的不一致场景&#xff1a; 订单支付成功了&#xff0c;但是订单状态却还是“…

JVM方法区详解有这篇就够了

1、方法区在哪里《Java虚拟机规范》中明确说明&#xff1a;“尽管所有的方法区在逻辑上是属于堆的一部分&#xff0c;但一些简单的实现可能不会选择去进行垃圾收集或者进行压缩。”但对于HotSpotJVM而言&#xff0c;方法区还有一个别名叫做Non-Heap&#xff08;非堆&#xff09…

机械键盘不只有轴体的区别!键帽高度也有些学问

键盘键帽的学问有很多&#xff0c;上篇文章中&#xff0c;笔者和大家聊了键帽的材质和耐油污的问题。 除此之外&#xff0c;键帽的高度和字符的印刷方式也有不同&#xff0c;对于多数机械键盘来说&#xff0c;会发现每一列键帽的倾斜角度都略有不同&#xff0c;使用起来可以减少…

Android TV UI开发常用知识

导入依赖 Google官方为Android TV的UI开发提供了一系列的规范组件&#xff0c;在leanback的依赖库中&#xff0c;这里介绍一些常用的组件&#xff0c;使用前需要导入leanback库。 implementation androidx.leanback:leanback:$version常用的页面 这些Fragment有设计好的样式&…

3.ffmpeg命令行环境搭建、ffmpeg命令行初步了解

在上章,我们讲过: ffmpeg.exe: 主要用于转码或者剪切的应用程序, 也可以从url/现场音频/视频源抓取输入源ffplay.exe: 主要用于播放视频的应用程序,该应用程序源码是开源的,我们后面章节会去源码分析ffprobe.exe: 主要用于分析视频码流的应用程序, 可以获取媒体文件的详细信息,…

【Azure 架构师学习笔记】-Azure Data Factory (4)-触发器详解-事件触发器

本文属于【Azure 架构师学习笔记】系列。 本文属于【Azure Data Factory】系列。 接上文【Azure 架构师学习笔记】-Azure Data Factory (3)-触发器详解-翻转窗口 前言 事件触发指的是存储事件&#xff0c;所以在新版的ADF 中&#xff0c;已经明确了是“存储事件”&#xff0c;…

【C语言】结构体进阶

一、结构体 1. 结构体的声明 &#xff08;1&#xff09; 结构的基础知识 结构是一些值的集合&#xff0c;这些值称为成员变量。结构的每个成员可以是不同类型的变量。&#xff08;2&#xff09;结构的声明 struct tag {member-list; }variable-list;例如描述一个学生&#x…

【SPSS】两配对样本T检验分析详细操作教程(附案例实战)

&#x1f935;‍♂️ 个人主页&#xff1a;艾派森的个人主页 ✍&#x1f3fb;作者简介&#xff1a;Python学习者 &#x1f40b; 希望大家多多支持&#xff0c;我们一起进步&#xff01;&#x1f604; 如果文章对你有帮助的话&#xff0c; 欢迎评论 &#x1f4ac;点赞&#x1f4…

RocketMQ的一些使用理解

1.RocketMQ的生产者生产负载策略&#xff08;3种&#xff09; (1)SelectMessageQueueByHash &#xff08;一致性hash&#xff09; (2)SelectMessageQueueByMachineRoom &#xff08;机器随机&#xff09; (3)SelectMessageQueueByRandom &#xff08;随机&#xff09; 第1种一…

VBA之正则表达式(41)-- 快速标记两个星号之后的字符

实例需求&#xff1a;工作表中的数据保存在A列~G列&#xff0c;现需要识别D列中包含超过两个星号的内容&#xff0c;并将第3个星号及其之后的字符设置为红色字体&#xff0c;如图所示。 示例代码如下。 Sub Demo1()Dim objRegExp As ObjectDim objMatch As ObjectDim strMatch…

08 自研or借力(上):集成Gin替换已有核心

我们的框架和这些顶级的框架相比&#xff0c;差了什么呢&#xff1f;如何才能快速地把我们的框架可用性&#xff0c;和这些框架提升到同一个级别&#xff1f;我们做这个框架除了演示每个实现细节&#xff0c;它的优势是什么呢&#xff1f; 不妨带着这些问题&#xff0c;把我们…

ClickHouse的架构与基本概念

一、ClickHouse的定义 ClickHouse是一个完全的列式分布式数据库管理系统(DBMS)&#xff0c;允许在运行时创建表和数据库&#xff0c;加载数据和运行查询&#xff0c;而无需重新配置和重新启动服务器&#xff0c;支持线性扩展&#xff0c;简单方便&#xff0c;高可靠性&#xf…

C++学习笔记-内存空间

考虑这样一种情况&#xff0c;当我们使用相同的名称&#xff0c;叫Zara的两个人在同一个班级。我们需要明确区分它们将不得不使用一些额外的信息&#xff0c;如他们的名字&#xff0c;如他们生活在不同的区域或母亲或父亲的名字等等。 同样的情况也出现在C应用程序中。例如&am…

iphone系统崩溃数据能恢复吗?教你三招方法

最近有些苹果用户反应自己手机的屏幕无法滑动&#xff0c;桌面上APP也无法点开&#xff0c;想要关机重启下试试&#xff0c;可是&#xff0c;连关机都关不了&#xff0c;甚至连Siri都罢工了。苹果手机系统崩溃&#xff0c;出现黑屏、白屏、无限重启之类的故障&#xff0c;导致手…

大数据处理学习笔记1.6 Scala数据结构

文章目录零、本讲学习目标一、数组 (Array)&#xff08;一&#xff09;定长数组1、数组定义&#xff08;1&#xff09;定义数组时初始化数据&#xff08;2&#xff09;定义时指定数组长度&#xff0c;后赋值2、数组遍历&#xff08;1&#xff09;传统for循环方式&#xff08;2&…

Databend 开源周报 第 82 期

Databend 是一款现代云数仓。专为弹性和高效设计&#xff0c;为您的大规模分析需求保驾护航。自由且开源。即刻体验云服务&#xff1a;https://app.databend.com 。Whats New探索 Databend 本周新进展&#xff0c;遇到更贴近你心意的 Databend 。Features & Improvements :…