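Series contents
Spark Chapter 1: Environment Installation
Spark Chapter 2: SparkCore Examples
Spark Chapter 3: Engineering the Code
Spark Chapter 4: SparkSQL Basic Operations
Spark Chapter 5: SparkSQL Examples
Spark Chapter 6: SparkStreaming Basic Operations
Spark Chapter 7: SparkStreaming Examples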
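Contents
- Series contents
- Preface
- I. Environment Setup
- 1. Modifying the pom
- 2. File preparation
- 3. Data preparation
- II. Project Case Studies
- 1. Requirement 1: Ad blacklist
- 2. Requirement 2: Real-time ad click statistics
- 3. Requirement 3: Ad clicks over the last hour
- Summary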
Preface
Today we finish the last hands-on case study of the Spark series.
I. Environment Setup
1. Modifying the pom
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.2.3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.2.3</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.2.3</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>1.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.2.3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.2.3</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.14.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.10</version>
</dependency>
These are all the dependencies in the pom; add whichever ones your project is still missing.
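2. File preparation
To avoid confusion with the earlier projects, I created a new package.
3. Data preparation
We generate data by sending it to Kafka from code, then consume it on the other side for analysis.
Each record has five fields:
Time (represented by a timestamp)
Area
City
User
Ad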
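As a minimal sketch of this record format, the snippet below builds and parses one space-separated line in plain Scala, without Kafka or Spark. The names ClickRecord and RecordCodec are hypothetical and exist only for this illustration; the code in this article uses a case class with all-String fields instead.

```scala
// Hypothetical names (ClickRecord, RecordCodec) used for illustration only.
final case class ClickRecord(ts: Long, area: String, city: String, userId: Int, adId: Int)

object RecordCodec {
  // Render a record in the wire format: "timestamp area city userid adid".
  def format(r: ClickRecord): String =
    s"${r.ts} ${r.area} ${r.city} ${r.userId} ${r.adId}"

  // Parse one line. Returns None when the line does not have exactly five
  // fields or when a numeric field is not a valid number.
  def parse(line: String): Option[ClickRecord] =
    line.trim.split(" ") match {
      case Array(ts, area, city, user, ad) =>
        scala.util.Try(ClickRecord(ts.toLong, area, city, user.toInt, ad.toInt)).toOption
      case _ => None
    }
}
```

Returning an Option keeps a malformed line from throwing inside a streaming job; whether dropping such lines is acceptable depends on the use case.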
MockData.scala
package com.atguigu.bigdata.spark.streaming.exp

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import java.util.Properties
import scala.collection.mutable.ListBuffer
import scala.util.Random

object MockData {
  def main(args: Array[String]): Unit = {
    // Generate mock data
    // Format : timestamp area city userid adid
    // Meaning: timestamp, area, city, user, ad

    // Create the configuration object
    val prop = new Properties()
    // Add the producer settings
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092")
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](prop)

    // Send a batch of 30 records every 3 seconds
    while (true) {
      mockdata().foreach((data: String) => {
        val record = new ProducerRecord[String, String]("atguigu", data)
        producer.send(record)
      })
      Thread.sleep(3000)
    }
  }

  def mockdata(): ListBuffer[String] = {
    val list: ListBuffer[String] = ListBuffer[String]()
    val areaList: ListBuffer[String] = ListBuffer[String]("华北", "华东", "华南")
    val cityList: ListBuffer[String] = ListBuffer[String]("北京", "上海", "深圳")
    for (_ <- 1 to 30) {
      val area: String = areaList(new Random().nextInt(3))
      val city: String = cityList(new Random().nextInt(3))
      // User ids and ad ids both range from 1 to 6
      val userid: Int = new Random().nextInt(6) + 1
      val adid: Int = new Random().nextInt(6) + 1
      list.append(s"${System.currentTimeMillis()} $area $city $userid $adid")
    }
    list
  }
}
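This uses the atguigu topic created earlier; if you have deleted it, create it again.
To check the generated data, we first consume it in the simplest possible way and just print it.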
req1.scala
package com.atguigu.bigdata.spark.streaming.exp

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object req1 {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara)
      )

    kafkaDataDS.map(_.value()).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
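Start ZooKeeper and Kafka on the cluster, then run the consumer.
Once the batch timestamps start appearing in the console, start producing data.
When the records start printing, the whole pipeline is working, and we can move on to processing the data.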
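II. Project Case Studies
1. Requirement 1: Ad blacklist
Implement a real-time, dynamic blacklist: any user who clicks a given ad more than 100 times in one day is blacklisted. (The code below uses a threshold of 30 instead.)
Note: the blacklist is stored in MySQL.
Creating the MySQL tables
I simply created a new database named spark-streaming.
Table creation statements
Table that stores the blacklisted users
CREATE TABLE black_list (userid CHAR(1) PRIMARY KEY);
Table that stores, per day, how many times each user clicked each ad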
CREATE TABLE user_ad_count (
dt varchar(255),
userid CHAR (1),
adid CHAR (1),
count BIGINT,
PRIMARY KEY (dt, userid, adid)
);
Wrapping a MySQL utility class
JDBCUtil.scala
package com.atguigu.bigdata.spark.streaming.exp.Util

import com.alibaba.druid.pool.DruidDataSourceFactory

import java.sql.{Connection, PreparedStatement}
import java.util.Properties
import javax.sql.DataSource

object JDBCUtil {
  // Initialize the connection pool
  var dataSource: DataSource = init()

  // Build the Druid connection pool
  def init(): DataSource = {
    val properties = new Properties()
    properties.setProperty("driverClassName", "com.mysql.jdbc.Driver")
    properties.setProperty("url", "jdbc:mysql://hadoop102:3306/spark-streaming?useUnicode=true&characterEncoding=UTF-8&useSSL=false")
    properties.setProperty("username", "root")
    properties.setProperty("password", "000000")
    properties.setProperty("maxActive", "50")
    DruidDataSourceFactory.createDataSource(properties)
  }

  // Get a MySQL connection from the pool
  def getConnection: Connection = {
    dataSource.getConnection
  }
}
Implementing the requirement
req1_BlackList.scala
package com.atguigu.bigdata.spark.streaming.exp

import com.atguigu.bigdata.spark.streaming.exp.Util.JDBCUtil
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.sql.{Connection, PreparedStatement, ResultSet}
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable.ListBuffer

object req1_BlackList {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara)
      )

    // Parse each Kafka record into an AdClickData object
    val adClickData: DStream[AdClickData] = kafkaDataDS.map((kafkaData: ConsumerRecord[String, String]) => {
      val data: String = kafkaData.value()
      val datas: Array[String] = data.split(" ")
      AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
    })

    // Compute the per-batch counts
    val ds: DStream[((String, String, String), Int)] = adClickData.transform((rdd: RDD[AdClickData]) => {
      // Reload the blacklist from MySQL once per batch
      val blackList: ListBuffer[String] = ListBuffer[String]()
      val conn: Connection = JDBCUtil.getConnection
      val pstat: PreparedStatement = conn.prepareStatement("select userid from black_list")
      val rs: ResultSet = pstat.executeQuery()
      while (rs.next()) {
        blackList.append(rs.getString(1))
      }
      rs.close()
      pstat.close()
      conn.close()

      // Drop clicks from users that are already blacklisted
      val filterRDD: RDD[AdClickData] = rdd.filter((data: AdClickData) => {
        !blackList.contains(data.user)
      })

      // Count clicks per (day, user, ad) within the batch
      filterRDD.map((data: AdClickData) => {
        val sdf = new SimpleDateFormat("yyyy-MM-dd")
        val day: String = sdf.format(new Date(data.ts.toLong))
        val user: String = data.user
        val ad: String = data.ad
        ((day, user, ad), 1)
      }).reduceByKey((_: Int) + (_: Int))
    })

    ds.foreachRDD((rdd: RDD[((String, String, String), Int)]) => {
      rdd.foreach {
        case ((day, user, ad), count) => {
          println(((day, user, ad), count))
          if (count >= 30) {
            // The count in this batch alone reaches 30: blacklist the user
            val conn: Connection = JDBCUtil.getConnection
            val pstat: PreparedStatement = conn.prepareStatement(
              """
                |insert into black_list (userid) values (?)
                |on DUPLICATE KEY
                |UPDATE userid=?
                |""".stripMargin)
            pstat.setString(1, user)
            pstat.setString(2, user)
            pstat.executeUpdate()
            pstat.close()
            conn.close()
          } else {
            // Below the threshold: update the stored click count
            val conn: Connection = JDBCUtil.getConnection
            val pstat: PreparedStatement = conn.prepareStatement(
              """
                | select *
                | from user_ad_count
                | where dt =? and userid=? and adid=?
                |""".stripMargin)
            pstat.setString(1, day)
            pstat.setString(2, user)
            pstat.setString(3, ad)
            val rs: ResultSet = pstat.executeQuery()
            if (rs.next()) {
              // A row already exists: update it
              val pstat1: PreparedStatement = conn.prepareStatement(
                """
                  | update user_ad_count
                  | set count=count+?
                  | where dt =? and userid=? and adid=?
                  |""".stripMargin)
              pstat1.setInt(1, count)
              pstat1.setString(2, day)
              pstat1.setString(3, user)
              pstat1.setString(4, ad)
              pstat1.executeUpdate()
              pstat1.close()

              // If the updated total reaches the threshold, blacklist the user
              val pstat2: PreparedStatement = conn.prepareStatement(
                """
                  | select *
                  | from user_ad_count
                  | where dt =? and userid=? and adid=? and count>=30
                  |""".stripMargin)
              pstat2.setString(1, day)
              pstat2.setString(2, user)
              pstat2.setString(3, ad)
              val rs2: ResultSet = pstat2.executeQuery()
              if (rs2.next()) {
                val pstat3: PreparedStatement = conn.prepareStatement(
                  """
                    |insert into black_list (userid) values (?)
                    |on DUPLICATE KEY
                    |UPDATE userid=?
                    |""".stripMargin)
                pstat3.setString(1, user)
                pstat3.setString(2, user)
                pstat3.executeUpdate()
                pstat3.close()
              }
              rs2.close()
              pstat2.close()
            } else {
              // No row yet: insert one
              val pstat1: PreparedStatement = conn.prepareStatement(
                """
                  | insert into user_ad_count (dt,userid,adid,count) values (?,?,?,?)
                  |""".stripMargin)
              pstat1.setString(1, day)
              pstat1.setString(2, user)
              pstat1.setString(3, ad)
              pstat1.setInt(4, count)
              pstat1.executeUpdate()
              pstat1.close()
            }
            rs.close()
            pstat.close()
            conn.close()
          }
        }
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }

  // Ad click record
  case class AdClickData(ts: String, area: String, city: String, user: String, ad: String)
}
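Now test it. As before, start the consumer first and the producer second, so that any backlog in Kafka is consumed before fresh data is produced.
If records show up before the producer is started, Kafka had a backlog; clear the database tables, then start producing data.
Refresh the database afterwards and you will see the rows keep changing until the last column, the click count, reaches 30 and the user is added to the blacklist.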
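The decision flow of the blacklist job can be traced without Kafka or MySQL. The sketch below is an in-memory model of it, assuming two mutable collections stand in for the black_list and user_ad_count tables; BlackListModel and processBatch are hypothetical names, and the threshold is a constructor parameter rather than the hard-coded 30.

```scala
import scala.collection.mutable

// In-memory stand-ins for the two MySQL tables (illustrative names only).
class BlackListModel(threshold: Int) {
  val blackList: mutable.Set[String] = mutable.Set.empty
  val userAdCount: mutable.Map[(String, String, String), Long] = mutable.Map.empty

  // Apply one micro-batch of raw clicks, each given as (day, user, ad).
  def processBatch(clicks: Seq[(String, String, String)]): Unit = {
    // Step 1: drop clicks from users that are already blacklisted.
    val allowed = clicks.filterNot { case (_, user, _) => blackList.contains(user) }
    // Step 2: count clicks per (day, user, ad) within the batch.
    val batchCounts = allowed.groupBy(identity).map { case (k, v) => (k, v.size) }
    batchCounts.foreach { case (key @ (_, user, _), count) =>
      if (count >= threshold) {
        // Step 3a: the batch alone reaches the threshold.
        blackList += user
      } else {
        // Step 3b: accumulate, then re-check the running total.
        val total = userAdCount.getOrElse(key, 0L) + count
        userAdCount(key) = total
        if (total >= threshold) blackList += user
      }
    }
  }
}
```

As in the job itself, a user who is blacklisted in one batch is only filtered out from the next batch onward, because the blacklist is read once at the start of each batch.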
Code optimization
Modifying the utility class
JDBCUtil.scala
package com.atguigu.bigdata.spark.streaming.exp.Util

import com.alibaba.druid.pool.DruidDataSourceFactory

import java.sql.{Connection, PreparedStatement}
import java.util.Properties
import javax.sql.DataSource

object JDBCUtil {
  // Initialize the connection pool
  var dataSource: DataSource = init()

  // Build the Druid connection pool
  def init(): DataSource = {
    val properties = new Properties()
    properties.setProperty("driverClassName", "com.mysql.jdbc.Driver")
    properties.setProperty("url", "jdbc:mysql://hadoop102:3306/spark-streaming?useUnicode=true&characterEncoding=UTF-8&useSSL=false")
    properties.setProperty("username", "root")
    properties.setProperty("password", "000000")
    properties.setProperty("maxActive", "50")
    DruidDataSourceFactory.createDataSource(properties)
  }

  // Get a MySQL connection from the pool
  def getConnection: Connection = {
    dataSource.getConnection
  }

  // Execute a SQL statement that inserts or updates a single row
  def executeUpdate(connection: Connection, sql: String, params: Array[Any]): Int = {
    var rtn = 0
    var pstmt: PreparedStatement = null
    try {
      connection.setAutoCommit(false)
      pstmt = connection.prepareStatement(sql)
      if (params != null && params.length > 0) {
        for (i <- params.indices) {
          pstmt.setObject(i + 1, params(i))
        }
      }
      rtn = pstmt.executeUpdate()
      connection.commit()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      // Close the statement even when the update fails
      if (pstmt != null) pstmt.close()
    }
    rtn
  }

  // Check whether a matching row exists
  def isExist(connection: Connection, sql: String, params: Array[Any]): Boolean = {
    var flag: Boolean = false
    var pstmt: PreparedStatement = null
    try {
      pstmt = connection.prepareStatement(sql)
      for (i <- params.indices) {
        pstmt.setObject(i + 1, params(i))
      }
      flag = pstmt.executeQuery().next()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      // Closing the statement also closes its result set
      if (pstmt != null) pstmt.close()
    }
    flag
  }
}
req1_BlackList1.scala
package com.atguigu.bigdata.spark.streaming.exp

import com.atguigu.bigdata.spark.streaming.exp.Util.JDBCUtil
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.sql.{Connection, PreparedStatement, ResultSet}
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable.ListBuffer

object req1_BlackList1 {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara)
      )

    // Parse each Kafka record into an AdClickData object
    val adClickData: DStream[AdClickData] = kafkaDataDS.map((kafkaData: ConsumerRecord[String, String]) => {
      val data: String = kafkaData.value()
      val datas: Array[String] = data.split(" ")
      AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
    })

    // Compute the per-batch counts
    val ds: DStream[((String, String, String), Int)] = adClickData.transform((rdd: RDD[AdClickData]) => {
      // Reload the blacklist from MySQL once per batch
      val blackList: ListBuffer[String] = ListBuffer[String]()
      val conn: Connection = JDBCUtil.getConnection
      val pstat: PreparedStatement = conn.prepareStatement("select userid from black_list")
      val rs: ResultSet = pstat.executeQuery()
      while (rs.next()) {
        blackList.append(rs.getString(1))
      }
      rs.close()
      pstat.close()
      conn.close()

      // Drop clicks from users that are already blacklisted
      val filterRDD: RDD[AdClickData] = rdd.filter((data: AdClickData) => {
        !blackList.contains(data.user)
      })

      // Count clicks per (day, user, ad) within the batch
      filterRDD.map((data: AdClickData) => {
        val sdf = new SimpleDateFormat("yyyy-MM-dd")
        val day: String = sdf.format(new Date(data.ts.toLong))
        val user: String = data.user
        val ad: String = data.ad
        ((day, user, ad), 1)
      }).reduceByKey((_: Int) + (_: Int))
    })

    ds.foreachRDD((rdd: RDD[((String, String, String), Int)]) => {
      // Alternative: create one connection object per partition
      // rdd.foreachPartition(
      //   iter => {
      //     val conn: Connection = JDBCUtil.getConnection
      //     iter.foreach {
      //       case ((day, user, ad), count) => {
      //
      //       }
      //     }
      //     conn.close()
      //   }
      // )
      rdd.foreach {
        case ((day, user, ad), count) => {
          println(((day, user, ad), count))
          if (count >= 30) {
            // The count in this batch alone reaches 30: blacklist the user
            val conn: Connection = JDBCUtil.getConnection
            val sql: String =
              """
                | insert into black_list (userid) values (?)
                | on DUPLICATE KEY
                | UPDATE userid=?
                |""".stripMargin
            JDBCUtil.executeUpdate(conn, sql, Array(user, user))
            conn.close()
          } else {
            // Below the threshold: update the stored click count
            val conn: Connection = JDBCUtil.getConnection
            val sql0: String =
              """
                | select *
                | from user_ad_count
                | where dt =? and userid=? and adid=?
                |""".stripMargin
            val flg: Boolean = JDBCUtil.isExist(conn, sql0, Array(day, user, ad))
            if (flg) {
              // A row already exists: update it
              val sql1: String =
                """
                  | update user_ad_count
                  | set count=count+?
                  | where dt =? and userid=? and adid=?
                  |""".stripMargin
              JDBCUtil.executeUpdate(conn, sql1, Array(count, day, user, ad))

              // If the updated total reaches the threshold, blacklist the user
              val sql2: String =
                """
                  | select *
                  | from user_ad_count
                  | where dt =? and userid=? and adid=? and count>=30
                  |""".stripMargin
              val flg1: Boolean = JDBCUtil.isExist(conn, sql2, Array(day, user, ad))
              if (flg1) {
                val sql3: String =
                  """
                    | insert into black_list (userid) values (?)
                    | on DUPLICATE KEY
                    | UPDATE userid=?
                    |""".stripMargin
                JDBCUtil.executeUpdate(conn, sql3, Array(user, user))
              }
            } else {
              // No row yet: insert one
              val sql4: String =
                """
                  |insert into user_ad_count (dt,userid,adid,count) values (?,?,?,?)
                  |""".stripMargin
              JDBCUtil.executeUpdate(conn, sql4, Array(day, user, ad, count))
            }
            conn.close()
          }
        }
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }

  // Ad click record
  case class AdClickData(ts: String, area: String, city: String, user: String, ad: String)
}
The result is the same as before, so I won't demonstrate it again.
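The commented-out foreachPartition block in req1_BlackList1.scala points at a further optimization: one connection per partition instead of one per record. The sketch below illustrates the difference in plain Scala, assuming a nested Seq stands in for an RDD's partitions; FakeConnection and PartitionWrites are hypothetical stand-ins, not Spark or JDBC classes.

```scala
// FakeConnection is a hypothetical stand-in for java.sql.Connection so the
// sketch runs without a database; `opened` counts how many were created.
object PartitionWrites {
  final class FakeConnection {
    val written = scala.collection.mutable.ListBuffer.empty[String]
    var closed = false
    def write(row: String): Unit = { written += row }
    def close(): Unit = { closed = true }
  }

  var opened = 0
  def open(): FakeConnection = { opened += 1; new FakeConnection }

  // One connection per record: the shape of the rdd.foreach version.
  def perRecord(partitions: Seq[Seq[String]]): Unit =
    partitions.foreach(_.foreach { row =>
      val conn = open()
      try conn.write(row) finally conn.close()
    })

  // One connection per partition: the shape of rdd.foreachPartition.
  def perPartition(partitions: Seq[Seq[String]]): Unit =
    partitions.foreach { part =>
      val conn = open()
      try part.foreach(conn.write) finally conn.close()
    }
}
```

With two partitions holding five records in total, perRecord opens five connections and perPartition opens two. With a connection pool the saving is smaller than with raw connections, but borrowing once per partition still avoids repeated checkout overhead.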
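2. Requirement 2: Real-time ad click statistics
Description: count, in real time, the total daily clicks for each ad in each city of each area, and store the result in MySQL.
Creating the MySQL table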
CREATE TABLE area_city_ad_count (
dt VARCHAR(255),
area VARCHAR(255),
city VARCHAR(255),
adid VARCHAR(255),
count BIGINT,
PRIMARY KEY (dt,area,city,adid)
);
req2.scala
package com.atguigu.bigdata.spark.streaming.exp

import com.atguigu.bigdata.spark.streaming.exp.Util.JDBCUtil
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.sql.{Connection, PreparedStatement}
import java.text.SimpleDateFormat
import java.util.Date

object req2 {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara)
      )

    // Parse each Kafka record into an AdClickData object
    val adClickData: DStream[AdClickData] = kafkaDataDS.map((kafkaData: ConsumerRecord[String, String]) => {
      val data: String = kafkaData.value()
      val datas: Array[String] = data.split(" ")
      AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
    })

    // Count clicks per (day, area, city, ad) within the batch
    val reduceDS: DStream[((String, String, String, String), Int)] = adClickData.map((data: AdClickData) => {
      val sdf = new SimpleDateFormat("yyyy-MM-dd")
      val day: String = sdf.format(new Date(data.ts.toLong))
      val area: String = data.area
      val city: String = data.city
      val ad: String = data.ad
      ((day, area, city, ad), 1)
    }).reduceByKey((_: Int) + (_: Int))

    reduceDS.foreachRDD(rdd => {
      // One connection and one prepared statement per partition
      rdd.foreachPartition(iter => {
        val conn: Connection = JDBCUtil.getConnection
        val pstat: PreparedStatement = conn.prepareStatement(
          """
            | insert into area_city_ad_count (dt ,area,city,adid,count)
            | values (?,?,?,?,?)
            | on DUPLICATE KEY
            | UPDATE count=count+?
            |""".stripMargin)
        iter.foreach {
          case ((day, area, city, ad), sum) => {
            pstat.setString(1, day)
            pstat.setString(2, area)
            pstat.setString(3, city)
            pstat.setString(4, ad)
            pstat.setInt(5, sum)
            pstat.setInt(6, sum)
            pstat.executeUpdate()
          }
        }
        pstat.close()
        conn.close()
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }

  // Ad click record
  case class AdClickData(ts: String, area: String, city: String, user: String, ad: String)
}
Again, start the consumer first and the producer second, then check the database.
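The "insert ... on DUPLICATE KEY UPDATE count=count+?" statement in req2.scala either inserts a new row or adds the batch sum to the existing one. A minimal in-memory sketch of that behaviour, assuming a mutable Map keyed by (dt, area, city, adid) stands in for the area_city_ad_count table; AreaCityAdCount and upsert are hypothetical names.

```scala
import scala.collection.mutable

object AreaCityAdCount {
  // Same columns as the table's primary key: (dt, area, city, adid).
  type Key = (String, String, String, String)

  // Mimics: INSERT ... VALUES (..., sum) ON DUPLICATE KEY UPDATE count = count + sum
  def upsert(table: mutable.Map[Key, Long], key: Key, sum: Int): Unit =
    table(key) = table.getOrElse(key, 0L) + sum
}
```

Because the update is additive, replaying a batch after a failure would count its clicks twice; the sketch shares that property with the SQL statement.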
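3. Requirement 3: Ad clicks over the last hour
An hour is too long for a demo, so we use a one-minute window instead and recompute it every 10 seconds.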
req3.scala
package com.atguigu.bigdata.spark.streaming.exp

import com.atguigu.bigdata.spark.streaming.exp.Util.JDBCUtil
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.sql.{Connection, PreparedStatement}
import java.text.SimpleDateFormat
import java.util.Date

object req3 {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
    // The window length and slide interval below must be multiples of this batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara)
      )

    // Parse each Kafka record into an AdClickData object
    val adClickData: DStream[AdClickData] = kafkaDataDS.map((kafkaData: ConsumerRecord[String, String]) => {
      val data: String = kafkaData.value()
      val datas: Array[String] = data.split(" ")
      AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
    })

    // Last minute of data, recomputed every 10 seconds
    val reduceDS: DStream[(Long, Int)] = adClickData.map(data => {
      val ts: Long = data.ts.toLong
      // Round the timestamp down to the start of its 10-second bucket
      val newTs: Long = ts / 10000 * 10000
      (newTs, 1)
    }).reduceByKeyAndWindow((_: Int) + (_: Int), Seconds(60), Seconds(10))

    reduceDS.print()

    ssc.start()
    ssc.awaitTermination()
  }

  // Ad click record
  case class AdClickData(ts: String, area: String, city: String, user: String, ad: String)
}
Again, start the consumer first and the producer second.
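The bucketing expression ts / 10000 * 10000 in req3.scala relies on integer division to round a millisecond timestamp down to a 10-second boundary. The sketch below shows that arithmetic and a simplified windowed count in plain Scala. WindowSketch and countWindow are hypothetical names, and the window here filters by event timestamp, which is an assumption made for illustration: Spark's reduceByKeyAndWindow selects records by the batch they arrived in.

```scala
object WindowSketch {
  // Round a millisecond timestamp down to the start of its 10-second bucket,
  // using the same integer arithmetic as `ts / 10000 * 10000`.
  def bucket(ts: Long): Long = ts / 10000 * 10000

  // Count clicks per bucket among timestamps inside (windowEnd - windowMs, windowEnd].
  // Simplification: filters by event time rather than by batch arrival time.
  def countWindow(timestamps: Seq[Long], windowEnd: Long, windowMs: Long): Seq[(Long, Int)] =
    timestamps
      .filter(ts => ts > windowEnd - windowMs && ts <= windowEnd)
      .groupBy(bucket)
      .map { case (b, xs) => (b, xs.size) }
      .toSeq
      .sortBy(_._1)
}
```

For example, bucket(1700000012345L) gives 1700000010000L, so every click within the same 10 seconds lands on the same key before the windowed reduce.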
Summary
That wraps up my study of Spark for now; next I will probably start digging into Flink.