Spark Chapter 7: Spark Streaming Examples


Series Contents

Spark Chapter 1: Environment Setup
Spark Chapter 2: Spark Core Examples
Spark Chapter 3: Structuring Project Code
Spark Chapter 4: Spark SQL Basics
Spark Chapter 5: Spark SQL Examples
Spark Chapter 6: Spark Streaming Basics
Spark Chapter 7: Spark Streaming Examples


Contents

  • Series Contents
  • Preface
  • I. Environment Preparation
    • 1. pom Changes
    • 2. File Preparation
    • 3. Data Preparation
  • II. Project Cases
    • 1. Requirement 1: Ad Blacklist
    • 2. Requirement 2: Real-Time Ad Click Statistics
    • 3. Requirement 3: Ad Clicks in the Last Hour
  • Summary


Preface

Today we complete the final hands-on case study of the Spark series.


I. Environment Preparation

1. pom Changes

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.2.3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.2.3</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.2.3</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>1.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.2.3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.2.3</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.14.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.10</version>
</dependency>

These are the complete pom dependencies; compare them against your project and add whatever is missing.

2. File Preparation

To avoid mixing things up with the earlier projects, I created a new package for this chapter.

3. Data Preparation

We produce data by sending records to Kafka from code, then consume them on the other side for analysis.
Each record has five fields:
Time (represented by a timestamp)
Area
City
User
Ad
A sample record is shown below.
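For illustration, one generated line might look like this (the timestamp and the randomly chosen fields will of course differ on every run):

    1681029968000 华北 北京 3 5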
MockData.scala

package com.atguigu.bigdata.spark.streaming.exp

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import java.util.Properties
import scala.collection.mutable.ListBuffer
import scala.util.Random

object MockData {
  def main(args: Array[String]): Unit = {
    // Generate mock data
    // Format  : timestamp area city userid adid
    // Meaning : timestamp, region, city, user, ad

    // Create the producer configuration
    val prop = new Properties()
    // Add configuration entries
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092")
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](prop)

    while (true) {
      mockdata().foreach((data: String) => {
        val record = new ProducerRecord[String, String]("atguigu", data)
        producer.send(record)
      })
      Thread.sleep(3000)
    }
  }

  def mockdata(): ListBuffer[String] = {
    val list: ListBuffer[String] = ListBuffer[String]()
    val areaList: ListBuffer[String] = ListBuffer[String]("华北", "华东", "华南")
    val cityList: ListBuffer[String] = ListBuffer[String]("北京", "上海", "深圳")

    for (_ <- 1 to 30) {
      val area: String = areaList(new Random().nextInt(3))
      val city: String = cityList(new Random().nextInt(3))
      val userid: Int = new Random().nextInt(6) + 1
      val adid: Int = new Random().nextInt(6) + 1
      list.append(s"${System.currentTimeMillis()} $area $city $userid $adid")
    }
    list
  }
}

This uses the atguigu topic created in an earlier chapter; if you have deleted it, create it again.
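If you do need to recreate it, a command along these lines should work; the partition and replication counts are assumptions for a three-broker cluster, so adjust them to your setup:

    # Recreate the topic used by these examples (adjust partitions/replication as needed).
    # On Kafka versions before 2.2, use --zookeeper hadoop102:2181 instead of --bootstrap-server.
    kafka-topics.sh --create --bootstrap-server hadoop102:9092 --topic atguigu --partitions 3 --replication-factor 3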

To test the produced data, we first write a simple consumer that just prints each record.
req1.scala

package com.atguigu.bigdata.spark.streaming.exp

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object req1 {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara)
    )

    kafkaDataDS.map(_.value()).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Start ZooKeeper and Kafka on the cluster, then start the consumer.
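On a typical installation the startup commands look roughly like the following; the paths are assumptions, so substitute your own install directories or cluster scripts:

    # Assumed commands; run on each ZooKeeper / Kafka node respectively
    zkServer.sh start
    kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties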
Once timestamps begin to appear in the console, start producing data.
Once records are being printed, the whole pipeline works, and we can move on to processing the data.

II. Project Cases

1. Requirement 1: Ad Blacklist

Implement a real-time, dynamic blacklist mechanism: blacklist any user who clicks a given ad more than 100 times in one day.
Note: the blacklist is stored in MySQL. (The code below lowers the threshold to 30 so the effect shows up quickly with the mock data.)
Create the MySQL tables
I simply created a new database named spark-streaming.
Table creation statements:
Table for blacklisted users (userid is CHAR(1) here only because the mock user IDs are the single digits 1-6):
CREATE TABLE black_list (userid CHAR(1) PRIMARY KEY);

Table for each user's daily click count per ad:
CREATE TABLE user_ad_count (
    dt VARCHAR(255),
    userid CHAR(1),
    adid CHAR(1),
    count BIGINT,
    PRIMARY KEY (dt, userid, adid)
);
Wrap the MySQL access in a utility class
JDBCUtil.scala

package com.atguigu.bigdata.spark.streaming.exp.Util

import com.alibaba.druid.pool.DruidDataSourceFactory

import java.sql.{Connection, PreparedStatement}
import java.util.Properties
import javax.sql.DataSource

object JDBCUtil {
  // Initialize the connection pool
  var dataSource: DataSource = init()

  // Connection pool initialization
  def init(): DataSource = {
    val properties = new Properties()
    properties.setProperty("driverClassName", "com.mysql.jdbc.Driver")
    properties.setProperty("url", "jdbc:mysql://hadoop102:3306/spark-streaming?useUnicode=true&characterEncoding=UTF-8&useSSL=false")
    properties.setProperty("username", "root")
    properties.setProperty("password", "000000")
    properties.setProperty("maxActive", "50")
    DruidDataSourceFactory.createDataSource(properties)
  }

  // Get a MySQL connection
  def getConnection: Connection = {
    dataSource.getConnection
  }
}

Implementing the requirement
req1_BlackList.scala

package com.atguigu.bigdata.spark.streaming.exp

import com.atguigu.bigdata.spark.streaming.exp.Util.JDBCUtil
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.sql.{Connection, PreparedStatement, ResultSet}
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable.ListBuffer

object req1_BlackList {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara)
    )

    val adClickData: DStream[AdClickData] = kafkaDataDS.map((kafkaData: ConsumerRecord[String, String]) => {
      val data: String = kafkaData.value()
      val datas: Array[String] = data.split(" ")
      AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
    })

    // Compute the aggregated data
    val ds: DStream[((String, String, String), Int)] = adClickData.transform((rdd: RDD[AdClickData]) => {
      // Read the current blacklist from MySQL for this batch
      val blackList: ListBuffer[String] = ListBuffer[String]()
      val conn: Connection = JDBCUtil.getConnection
      val pstat: PreparedStatement = conn.prepareStatement("select userid from black_list")
      val rs: ResultSet = pstat.executeQuery()
      while (rs.next()) {
        blackList.append(rs.getString(1))
      }
      rs.close()
      pstat.close()
      conn.close()

      // Check whether the user is already in the blacklist
      val filterRDD: RDD[AdClickData] = rdd.filter((data: AdClickData) => {
        !blackList.contains(data.user)
      })

      filterRDD.map((data: AdClickData) => {
        val sdf = new SimpleDateFormat("yyyy-MM-dd")
        val day: String = sdf.format(new Date(data.ts.toLong))
        val user: String = data.user
        val ad: String = data.ad
        ((day, user, ad), 1)
      }).reduceByKey((_: Int) + (_: Int))
    })

    ds.foreachRDD((rdd: RDD[((String, String, String), Int)]) => {
      rdd.foreach {
        case ((day, user, ad), count) => {
          println((day, user, ad), count)
          if (count >= 30) {
            // If the count exceeds 30, pull the user into the blacklist
            val conn: Connection = JDBCUtil.getConnection
            val pstat: PreparedStatement = conn.prepareStatement(
              """
                |insert into black_list (userid) values (?)
                |on DUPLICATE KEY
                |UPDATE userid=?
                |""".stripMargin)
            pstat.setString(1, user)
            pstat.setString(2, user)
            pstat.executeUpdate()
            pstat.close()
            conn.close()
          } else {
            // Otherwise, update the click count
            val conn: Connection = JDBCUtil.getConnection
            val pstat: PreparedStatement = conn.prepareStatement(
              """
                | select *
                | from user_ad_count
                | where dt =? and userid=? and adid=?
                |""".stripMargin)
            pstat.setString(1, day)
            pstat.setString(2, user)
            pstat.setString(3, ad)
            val rs: ResultSet = pstat.executeQuery()
            if (rs.next()) {
              // The row exists, so update it
              val pstat1: PreparedStatement = conn.prepareStatement(
                """
                  | update user_ad_count
                  | set count=count+?
                  | where dt =? and userid=? and adid=?
                  |""".stripMargin)
              pstat1.setInt(1, count)
              pstat1.setString(2, day)
              pstat1.setString(3, user)
              pstat1.setString(4, ad)
              pstat1.executeUpdate()
              pstat1.close()

              // If the updated total crosses the threshold, pull the user into the blacklist
              val pstat2: PreparedStatement = conn.prepareStatement(
                """
                  | select *
                  | from user_ad_count
                  | where dt =? and userid=? and adid=? and count>=30
                  |""".stripMargin)
              pstat2.setString(1, day)
              pstat2.setString(2, user)
              pstat2.setString(3, ad)
              val rs2: ResultSet = pstat2.executeQuery()
              if (rs2.next()) {
                val pstat3: PreparedStatement = conn.prepareStatement(
                  """
                    |insert into black_list (userid) values (?)
                    |on DUPLICATE KEY
                    |UPDATE userid=?
                    |""".stripMargin)
                pstat3.setString(1, user)
                pstat3.setString(2, user)
                pstat3.executeUpdate()
                pstat3.close()
              }
              rs2.close()
              pstat2.close()
            } else {
              // The row does not exist, so insert it
              val pstat1: PreparedStatement = conn.prepareStatement(
                """
                  | insert into user_ad_count (dt,userid,adid,count) values (?,?,?,?)
                  |""".stripMargin)
              pstat1.setString(1, day)
              pstat1.setString(2, user)
              pstat1.setString(3, ad)
              pstat1.setInt(4, count)
              pstat1.executeUpdate()
              pstat1.close()
            }
            rs.close()
            pstat.close()
            conn.close()
          }
        }
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }

  // Ad click data
  case class AdClickData(ts: String, area: String, city: String, user: String, ad: String)
}

Now test it. As before, start the consumer first and the producer second; drain any data backlogged in Kafka first, then start producing again.
If data shows up before the producer has been started, Kafka still had a backlog; once the database tables have been cleared, start producing data.
Refresh the database and you will see the counts keep changing until a user's click count for some ad exceeds 30 and that user is pulled into the blacklist.
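To double-check the result from the MySQL side, a couple of ad-hoc queries like the following can be used (these are just suggested checks, not part of the job itself):

    -- Users that have been pulled into the blacklist
    SELECT * FROM black_list;

    -- Daily per-user, per-ad click counts, largest first
    SELECT * FROM user_ad_count ORDER BY `count` DESC;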
Code optimization
Modify the utility class
JDBCUtil.scala

package com.atguigu.bigdata.spark.streaming.exp.Util

import com.alibaba.druid.pool.DruidDataSourceFactory

import java.sql.{Connection, PreparedStatement}
import java.util.Properties
import javax.sql.DataSource

object JDBCUtil {
  // Initialize the connection pool
  var dataSource: DataSource = init()

  // Connection pool initialization
  def init(): DataSource = {
    val properties = new Properties()
    properties.setProperty("driverClassName", "com.mysql.jdbc.Driver")
    properties.setProperty("url", "jdbc:mysql://hadoop102:3306/spark-streaming?useUnicode=true&characterEncoding=UTF-8&useSSL=false")
    properties.setProperty("username", "root")
    properties.setProperty("password", "000000")
    properties.setProperty("maxActive", "50")
    DruidDataSourceFactory.createDataSource(properties)
  }

  // Get a MySQL connection
  def getConnection: Connection = {
    dataSource.getConnection
  }

  // Execute an SQL statement (single-row insert/update)
  def executeUpdate(connection: Connection, sql: String, params: Array[Any]): Int = {
    var rtn = 0
    var pstmt: PreparedStatement = null
    try {
      connection.setAutoCommit(false)
      pstmt = connection.prepareStatement(sql)
      if (params != null && params.length > 0) {
        for (i <- params.indices) {
          pstmt.setObject(i + 1, params(i))
        }
      }
      rtn = pstmt.executeUpdate()
      connection.commit()
      pstmt.close()
    } catch {
      case e: Exception => e.printStackTrace()
    }
    rtn
  }

  // Check whether a matching row exists
  def isExist(connection: Connection, sql: String, params: Array[Any]): Boolean = {
    var flag: Boolean = false
    var pstmt: PreparedStatement = null
    try {
      pstmt = connection.prepareStatement(sql)
      for (i <- params.indices) {
        pstmt.setObject(i + 1, params(i))
      }
      flag = pstmt.executeQuery().next()
      pstmt.close()
    } catch {
      case e: Exception => e.printStackTrace()
    }
    flag
  }
}

req1_BlackList1.scala

package com.atguigu.bigdata.spark.streaming.exp

import com.atguigu.bigdata.spark.streaming.exp.Util.JDBCUtil
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.sql.{Connection, PreparedStatement, ResultSet}
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable.ListBuffer

object req1_BlackList1 {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara)
    )

    val adClickData: DStream[AdClickData] = kafkaDataDS.map((kafkaData: ConsumerRecord[String, String]) => {
      val data: String = kafkaData.value()
      val datas: Array[String] = data.split(" ")
      AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
    })

    // Compute the aggregated data
    val ds: DStream[((String, String, String), Int)] = adClickData.transform((rdd: RDD[AdClickData]) => {
      // Read the current blacklist from MySQL for this batch
      val blackList: ListBuffer[String] = ListBuffer[String]()
      val conn: Connection = JDBCUtil.getConnection
      val pstat: PreparedStatement = conn.prepareStatement("select userid from black_list")
      val rs: ResultSet = pstat.executeQuery()
      while (rs.next()) {
        blackList.append(rs.getString(1))
      }
      rs.close()
      pstat.close()
      conn.close()

      // Check whether the user is already in the blacklist
      val filterRDD: RDD[AdClickData] = rdd.filter((data: AdClickData) => {
        !blackList.contains(data.user)
      })

      filterRDD.map((data: AdClickData) => {
        val sdf = new SimpleDateFormat("yyyy-MM-dd")
        val day: String = sdf.format(new Date(data.ts.toLong))
        val user: String = data.user
        val ad: String = data.ad
        ((day, user, ad), 1)
      }).reduceByKey((_: Int) + (_: Int))
    })

    ds.foreachRDD((rdd: RDD[((String, String, String), Int)]) => {
      // One connection object per partition
      //        rdd.foreachPartition(
      //          iter=>{
      //            val conn: Connection = JDBCUtil.getConnection
      //            iter.foreach{
      //              case ((day, user, ad), count)=>{
      //
      //              }
      //            }
      //            conn.close()
      //          }
      //        )
      rdd.foreach {
        case ((day, user, ad), count) => {
          println((day, user, ad), count)
          if (count >= 30) {
            // If the count exceeds 30, pull the user into the blacklist
            val conn: Connection = JDBCUtil.getConnection
            val sql: String =
              """
                |  insert into black_list (userid) values (?)
                |  on DUPLICATE KEY
                |  UPDATE userid=?
                |""".stripMargin
            JDBCUtil.executeUpdate(conn, sql, Array(user, user))
            conn.close()
          } else {
            // Otherwise, update the click count
            val conn: Connection = JDBCUtil.getConnection
            val sql0: String =
              """
                | select *
                | from user_ad_count
                | where dt =? and userid=? and adid=?
                |""".stripMargin
            val flg: Boolean = JDBCUtil.isExist(conn, sql0, Array(day, user, ad))
            if (flg) {
              // The row exists, so update it
              val sql1: String =
                """
                  |   update user_ad_count
                  |   set count=count+?
                  |   where dt =? and userid=? and adid=?
                  |""".stripMargin
              JDBCUtil.executeUpdate(conn, sql1, Array(count, day, user, ad))

              // If the updated total crosses the threshold, pull the user into the blacklist
              val sql2: String =
                """
                  | select *
                  | from user_ad_count
                  | where dt =? and userid=? and adid=? and count>=30
                  |""".stripMargin
              val flg1: Boolean = JDBCUtil.isExist(conn, sql2, Array(day, user, ad))
              if (flg1) {
                val sql3: String =
                  """
                    |  insert into black_list (userid) values (?)
                    |  on DUPLICATE KEY
                    |  UPDATE userid=?
                    |""".stripMargin
                JDBCUtil.executeUpdate(conn, sql3, Array(user, user))
              }
            } else {
              // The row does not exist, so insert it
              val sql4: String =
                """
                  |insert into user_ad_count (dt,userid,adid,count) values (?,?,?,?)
                  |""".stripMargin
              JDBCUtil.executeUpdate(conn, sql4, Array(day, user, ad, count))
            }
            conn.close()
          }
        }
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }

  // Ad click data
  case class AdClickData(ts: String, area: String, city: String, user: String, ad: String)
}

The result is the same as before, so I won't demonstrate it again. One further optimization is sketched below.
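The commented-out skeleton in the code above hints at a further step: opening one JDBC connection per partition instead of one per record. A minimal sketch of that foreachRDD body, reusing the names (ds, JDBCUtil, Connection) from req1_BlackList1 and with the per-record logic elided, could look like this:

    ds.foreachRDD((rdd: RDD[((String, String, String), Int)]) => {
      rdd.foreachPartition(iter => {
        // One connection for the whole partition, created on the executor
        val conn: Connection = JDBCUtil.getConnection
        iter.foreach {
          case ((day, user, ad), count) =>
            // ... same blacklist / user_ad_count logic as above, but reusing `conn`
            // instead of opening and closing a connection for every record ...
        }
        conn.close()
      })
    })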

2. Requirement 2: Real-Time Ad Click Statistics

Description: count, in real time, the total daily click traffic per area, city, and ad, and store the results in MySQL.
Create the MySQL table
CREATE TABLE area_city_ad_count (
    dt VARCHAR(255),
    area VARCHAR(255),
    city VARCHAR(255),
    adid VARCHAR(255),
    count BIGINT,
    PRIMARY KEY (dt, area, city, adid)
);
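The job below writes into this table with MySQL's INSERT ... ON DUPLICATE KEY UPDATE: if the (dt, area, city, adid) key does not exist yet the row is inserted, otherwise the existing count is incremented. With illustrative values, two successive batches behave roughly like this:

    -- First batch: the key does not exist, so a row is inserted with count = 5
    INSERT INTO area_city_ad_count (dt, area, city, adid, count)
    VALUES ('2023-04-10', '华北', '北京', '3', 5)
    ON DUPLICATE KEY UPDATE count = count + 5;

    -- Next batch: the same key arrives again, so count becomes 5 + 7 = 12
    INSERT INTO area_city_ad_count (dt, area, city, adid, count)
    VALUES ('2023-04-10', '华北', '北京', '3', 7)
    ON DUPLICATE KEY UPDATE count = count + 7;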
req2.scala

package com.atguigu.bigdata.spark.streaming.exp

import com.atguigu.bigdata.spark.streaming.exp.Util.JDBCUtil
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.sql.{Connection, PreparedStatement}
import java.text.SimpleDateFormat
import java.util.Date

object req2 {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara)
    )

    val adClickData: DStream[AdClickData] = kafkaDataDS.map((kafkaData: ConsumerRecord[String, String]) => {
      val data: String = kafkaData.value()
      val datas: Array[String] = data.split(" ")
      AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
    })

    val reduceDS: DStream[((String, String, String, String), Int)] = adClickData.map((data: AdClickData) => {
      val sdf = new SimpleDateFormat("yyyy-MM-dd")
      val day: String = sdf.format(new Date(data.ts.toLong))
      val area: String = data.area
      val city: String = data.city
      val ad: String = data.ad
      ((day, area, city, ad), 1)
    }).reduceByKey((_: Int) + (_: Int))

    reduceDS.foreachRDD(rdd => {
      rdd.foreachPartition(iter => {
        val conn: Connection = JDBCUtil.getConnection
        val pstat: PreparedStatement = conn.prepareStatement(
          """
            |  insert into area_city_ad_count (dt ,area,city,adid,count)
            |  values (?,?,?,?,?)
            |  on DUPLICATE KEY
            |  UPDATE count=count+?
            |""".stripMargin)
        iter.foreach {
          case ((day, area, city, ad), sum) => {
            pstat.setString(1, day)
            pstat.setString(2, area)
            pstat.setString(3, city)
            pstat.setString(4, ad)
            pstat.setInt(5, sum)
            pstat.setInt(6, sum)
            pstat.executeUpdate()
          }
        }
        pstat.close()
        conn.close()
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }

  // Ad click data
  case class AdClickData(ts: String, area: String, city: String, user: String, ad: String)
}

Again, start the consumer first and the producer second, then check the database.

3. Requirement 3: Ad Clicks in the Last Hour

A full hour is too long to wait for in a demo, so we compute a one-minute window instead, updated every 10 seconds.
req3.scala

package com.atguigu.bigdata.spark.streaming.exp

import com.atguigu.bigdata.spark.streaming.exp.Util.JDBCUtil
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.sql.{Connection, PreparedStatement}
import java.text.SimpleDateFormat
import java.util.Date

object req3 {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara)
    )

    val adClickData: DStream[AdClickData] = kafkaDataDS.map((kafkaData: ConsumerRecord[String, String]) => {
      val data: String = kafkaData.value()
      val datas: Array[String] = data.split(" ")
      AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
    })

    // Last minute, recomputed every 10 seconds
    val reduceDS: DStream[(Long, Int)] = adClickData.map(data => {
      val ts: Long = data.ts.toLong
      val newTs: Long = ts / 10000 * 10000
      (newTs, 1)
    }).reduceByKeyAndWindow((_: Int) + (_: Int), Seconds(60), Seconds(10))

    reduceDS.print()

    ssc.start()
    ssc.awaitTermination()
  }

  // Ad click data
  case class AdClickData(ts: String, area: String, city: String, user: String, ad: String)
}

Again, start the consumer first, then the producer.
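To get the actual one-hour statistic from the requirement, only the window parameters should need to change; a sketch using Spark's Minutes helper, with the rest of req3 left unchanged:

    // Drop-in change to req3: widen the window to the last 60 minutes, still sliding every 10 seconds
    import org.apache.spark.streaming.{Minutes, Seconds}

    val reduceDS: DStream[(Long, Int)] = adClickData.map(data => {
      val ts: Long = data.ts.toLong
      (ts / 10000 * 10000, 1)   // still bucket timestamps into 10-second slots
    }).reduceByKeyAndWindow((_: Int) + (_: Int), Minutes(60), Seconds(10))

For a window this large you would normally also enable checkpointing and use the inverse-reduce overload of reduceByKeyAndWindow, so Spark can subtract expired batches instead of recomputing the whole hour on every slide.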

Summary

This wraps up my Spark studies for now; next I will probably start chewing through Flink.
