网站日志实时分析之Flink处理实时热门和PVUV统计

news/2024/5/14 4:11:45/文章来源:https://blog.csdn.net/u013411339/article/details/107925298

点击上方蓝色字体,选择“设为星标

回复”资源“获取更多资源

大数据技术与架构

点击右侧关注,大数据开发领域最强公众号!

暴走大数据

点击右侧关注,暴走大数据!

实时热门统计

操作步骤:

  • 先从Kafka读取消费数据

  • 使用map算子对数据进行预处理

  • 过滤数据,只留住pv数据

  • 使用timewindow,每隔10秒创建一个20秒的window

  • 然后将窗口自定义预聚合,并且兹定于窗口函数,按指定输入输出case操作数据

  • 上面操作时候返回的是DataStream,那么就根据timestampEnd进行keyby

  • 使用底层API操作,对每个时间窗口内的数据进行排序,取top

package com.ongbo.hotAnalysisimport java.sql.Timestamp
import java.util.Propertiesimport org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collectorimport scala.collection.mutable.ListBuffer/*
*定义输入数据的样例类*/
case class UserBehavior(userId: Long, itemId: Long, cateGoryId: Int,behavior: String, timestamp: Long)
//定义窗口聚合结果样例类
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)object HotItems {def main(args: Array[String]): Unit = {//1:创建执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)//设置为事件事件env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//2:读取数据/*kafka源*/val properties = new Properties()properties.setProperty("bootstrap.servers","114.116.219.197:5008,114.116.220.98:5008,114.116.199.154:5008")properties.setProperty("group.id","web-consumer-group")properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")properties.setProperty("auto.offset.reset","latest")val dataStream = env.addSource(new FlinkKafkaConsumer[String]("weblog", new SimpleStringSchema(),properties))
//    val dataStream = env.readTextFile("/Users/ongbo/Maven/bin/UserBehaviorAnalysis/HotItemAnalysis/src/main/resources/UserBehavior.csv").map(data =>{System.out.println("data:"+data)val dataArray = data.split(",")
//        if(dataArray(0).equals("ij"))UserBehavior(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong)}).assignAscendingTimestamps(_.timestamp * 1000L)//3:transform处理数据val processStream = dataStream//筛选出埋点pv数据.filter(_.behavior.equals("pv"))//先对itemID进行分组.keyBy(_.itemId)//然后设置timeWindow,size为1小时,步长为5分钟的滑动窗口.timeWindow(Time.seconds(20), Time.seconds(10))//窗口聚合,按道理说应该不用窗口聚合,但是因为达到的数据可能时间顺序会扰乱,所以聚合后要keyby.aggregate(new CountAgg(), new WindowResult()).keyBy(_.windowEnd)      //按照窗口分组.process(new TopNHotItems(10))//sink:输出数据processStream.print("processStream::")
//    dataStream.print()//执行env.execute("hot Items Job")}
}/*自定义预聚合函数*/
class CountAgg() extends AggregateFunction[UserBehavior, Long, Long]{//累加器初始值override def createAccumulator(): Long = 0//每来一次就加一override def add(in: UserBehavior, acc: Long): Long = acc+1//override def getResult(acc: Long): Long = accoverride def merge(acc: Long, acc1: Long): Long = acc + acc1
}//自定义窗口函数,输出ItemViewCount
class WindowResult() extends WindowFunction[Long,ItemViewCount, Long, TimeWindow]{override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit =  {out.collect(ItemViewCount(key,window.getEnd,input.iterator.next()))}
}//自定义处理函数
class TopNHotItems(topsize: Int) extends KeyedProcessFunction[Long, ItemViewCount, String] {private var itemState: ListState[ItemViewCount] = _override def open(parameters: Configuration): Unit = {itemState = getRuntimeContext.getListState(new ListStateDescriptor[ItemViewCount]("item-state", classOf[ItemViewCount]))}override def processElement(value: ItemViewCount, ctx: KeyedProcessFunction[Long, ItemViewCount, String]#Context, out: Collector[String]): Unit = {//把每条数据存入状态列表itemState.add(value)//注册一个定时器ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)}//定时器触发时,对所有的数据排序,并输出结果override def onTimer(timestamp: Long, ctx: _root_.org.apache.flink.streaming.api.functions.KeyedProcessFunction[Long, _root_.com.ongbo.hotAnalysis.ItemViewCount, _root_.scala.Predef.String]#OnTimerContext, out: _root_.org.apache.flink.util.Collector[_root_.scala.Predef.String]): Unit = {//将所有state中的数据取出,放到一个list Buffer中val allItems: ListBuffer[ItemViewCount] = new ListBuffer()import scala.collection.JavaConversions._for(item <- itemState.get()){allItems += item}//按照点计量count大小排序,sortBy默认是升序,并且取前三个val sortedItems = allItems.sortBy(_.count)(Ordering.Long.reverse).take(topsize)//清空状态itemState.clear()//格式化输出排名结果val result : StringBuilder = new StringBuilderresult.append("时间:").append(new Timestamp(timestamp - 1)).append("\n")//输出每一个商品信息for(i<- sortedItems.indices){val currentItem = sortedItems(i)result.append("No").append(i+1).append(":").append("  商品ID:").append(currentItem.itemId).append("  浏览量:").append(currentItem.count).append("\n")}result.append("============================\n")//控制输出频率Thread.sleep(1000)out.collect(result.toString())}
}
/*自定义预聚合函数计算平均数*/
class AverageAgg() extends AggregateFunction[UserBehavior, (Long,Int), Double]{override def createAccumulator(): (Long, Int) = (0L,0)override def add(in: UserBehavior, acc: (Long, Int)): (Long, Int) = (acc._1+in.timestamp, acc._2+1)override def getResult(acc: (Long, Int)): Double = acc._1 /acc._2override def merge(acc: (Long, Int), acc1: (Long, Int)): (Long, Int) = (acc._1+acc1._1, acc._2+acc1._2)
}

实时PV统计

这里按道理应该也要从Kafka读取数据的,但是这里暂时先从本地读,因为当时本地网络的原因,暂时不在服务器上创建数据,而直接用本地的。
这个很简单,直接创建滚动窗口,从而能够计算一个小时的PV,然后每隔一个小时更新一次。
package com.ongbo.NetWorkFlow_Analysisimport org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time/*
*定义输入数据的样例类*/
case class UserBehavior(userId: Long, itemId: Long, cateGoryId: Int,behavior: String, timestamp: Long)object PageVies {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)env.setParallelism(1)//用相对路径定义数据集val resource = getClass.getResource("/UserBehavior.csv")val dataStream = env.readTextFile(resource.getPath).map(data =>{val dataArray = data.split(",")UserBehavior(dataArray(0).trim.toLong,dataArray(1).trim.toLong,dataArray(2).trim.toInt,dataArray(3).trim,dataArray(4).trim.toLong)}).assignAscendingTimestamps(_.timestamp * 1000L).filter(_.behavior.equals("pv")).map(data => ("pv", 1)).keyBy(_._1).timeWindow(Time.hours(1)).sum(1)dataStream.print("pv count")env.execute("PV")}
}

实时UV统计:布隆过滤器

我们统计UV需要注意,很多重复的user会占用到内存,所以我们采用布隆过滤器优化,减少Flink缓存user从而降低性能。而且将数据count保存在Redis,可以给后端使用的。
package com.ongbo.NetWorkFlow_Analysisimport com.ongbo.NetWorkFlow_Analysis.UniqueView.getClass
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedisobject UvWithBloom {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)env.setParallelism(1)//用相对路径定义数据集val resource = getClass.getResource("/UserBehavior.csv")val dataStream = env.readTextFile("/Users/ongbo/Maven/bin/UserBehaviorAnalysis/NetWorkFlowAnalysis/src/main/resources/UserBehavior.csv").map(data =>{val dataArray = data.split(",")UserBehavior(dataArray(0).trim.toLong,dataArray(1).trim.toLong,dataArray(2).trim.toInt,dataArray(3).trim,dataArray(4).trim.toLong)}).assignAscendingTimestamps(_.timestamp * 1000L).filter(_.behavior.equals("pv")).map( data => ("dummyKey",data.userId)).keyBy(_._1).timeWindow(Time.hours(1)).trigger(new MyTrigger()).process(new UvCountWithBloom())dataStream.print()env.execute()}
}//自定义窗口触发器
class MyTrigger() extends Trigger[(String,Long),TimeWindow]{override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {//每来一条数据就直接触发窗口操作,并清空所有状态TriggerResult.FIRE_AND_PURGE}override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUEoverride def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUEoverride def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {}
}
class UvCountWithBloom() extends ProcessWindowFunction[(String,Long),UvCount,String, TimeWindow] {// 定义Redis连接lazy val jedis = new Jedis("114.116.219.97",5000)//29位,也就是64Mlazy val bloom = new Bloom(1 << 29)override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = {//位图的存储方式 , key是windowwen,value是位图val storeKey = context.window.getEnd.toStringvar count = 0L//把每个窗口的count值,也存入Redis表里,存放内容位(windowEnd,uccount),所以要先从Redis中读取if(jedis.hget("count",storeKey) != null){
//      System.out.println(v)count = jedis.hget("count",storeKey).toLong}//用布隆过滤器判断当前用户是否已经存在val userId = elements.last._2.toStringval offset = bloom.hash(userId, 61)//定义一个标志位,判断Redis位图中有没有这一位val isExist = jedis.getbit(storeKey, offset)if(!isExist){//如果不存在位图对应位置变成1,count+1jedis.setbit(storeKey,offset,true)jedis.hset("count",storeKey,(count+1).toString)out.collect(UvCount(storeKey.toLong,count+1))}else{out.collect(UvCount(storeKey.toLong,count))}}
}class Bloom(size: Long) extends Serializable{//位图大小private val cap = if(size>0) size else 1 << 27//定义Hash函数def hash(value: String, seed: Int) : Long = {var result:Long = 0Lfor(i <- 0 until value.length){result = result * seed + value.charAt(i)}result & (cap-1)}
}

欢迎点赞+收藏+转发朋友圈素质三连

文章不错?点个【在看】吧! ????

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_766089.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

nodejs爬虫-通过抓取搜狗微信网站获取微信文章信息

展示地址 : http://39.108.162.233/wxlist github地址 : https://github.com/zzwwjjdj319/wechat_crawler 一 总量统计 二 按日期搜索 三 文章列表 四 微信文章链接

将ASP.NET网站部署到服务器IIS上

ASP.NET编写的网站程序&#xff0c;在网站编写完成所有流程都测试通过后&#xff0c;需要将网站发布到IIS的Web服务器上&#xff0c;此文将介绍发布的流程以及IIS相关设置过程&#xff0c;帮助读者了解网站发布的流程。 一、首先在Visual Studio中选择网站项目&#xff0c;然后…

WEB服务器之一:创建一个网站

一、实验准备网站的名字为&#xff1a;www.itat.com这里我们要用两台虚拟机&#xff1a;1、 Nanjing&#xff08;DNS服务器、WEB服务器&#xff09; IP&#xff1a;192.168.11.732、 shanghai&#xff08;做测试&#xff09;IP&#xff1a;192.168.11.72 DNS&#xff1a;192.1…

Asp.net MVC 2 网站轻松实现多语言支持

本文短地址&#xff1a;http://zdd.me/aspnetmultilingual 现在的网站大多数都支持多语言&#xff0c;为不同语言的用户访问网站提供方便。我在前几天用asp.net mvc 2 做了一个网站同时提供了中文和英文的支持&#xff0c;在这里将我的网站的多语言的实现方式与各位网友分享一下…

怪异而美丽的网站

怪异而美丽的网站 记录跳闸 记录跳闸是一个漂亮的贝尔兄弟的实验品。 邀请您来解决刮伤用鼠标的滚动轮记录的一系列难题。 聪明的游戏和一个可爱的界面&#xff0c;使这个有趣的在线Flash游戏&#xff0c;因为它是怪异。 声汉堡 这里是最有创造力的和非常规的闪光&#xff0c;我…

Windows安装node环境,部署静态网站

1、进入官网&#xff0c;下载nodejs https://nodejs.org/zh-cn/ 2、安装nodejs win10怎么安装nodejs和npm https://jingyan.baidu.com/article/d169e1860e6d8c436611d89a.html 3、查看版本信息 C:\Users\admin>node -v v12.2.0 4、部署静态网页 在当前页面打开命令行&#x…

创建自定义主机头的网站集

当我们在一个SharePoint Web应用程序中创建新网站集时&#xff0c;虽然我们可以指定网站集的路径&#xff0c;但是网站集的主机头&#xff0c;似乎必须使用Web应用程序所定义的主机头。比如&#xff0c;当在“http://sp2010”这个Web应用程序中创建一个新网站集时&#xff0c;网…

使用django建站系列之登录页面(一)

先秀一下我的登录页&#xff1a; 废话没有&#xff0c;直接上操作步骤。 1&#xff09;建立工程 #django-admin.py startproject MyWeb 2)建立应用 #cd MyWeb/ #django-admin.py startapp app51cto 3)修改settings.py添加app vim MyWeb/settings.py INSTALLED_APPS ( django.c…

WordPress网站制作静态化插件:Cos-Html-Cache介绍

现在很多做网站的新手朋友、甚至网站建设公司都会采用现成的网站管理系统来建站&#xff0c;国内的PageAdmin、Discuz、Ecshop等网站管理系统都已经很成熟&#xff0c;大量的被用于网站建设中&#xff0c;之前小熊优化的小编分别介绍了这些网站管理系统&#xff0c;并且几乎都默…

网站经验谈:网站被降权怎么办 如何恢复权重

虽然百度一再强调百度本身并没有权重一说&#xff0c;但是对于各位站长朋友来说&#xff0c;权重还是衡量一个网站的重要指标之一。一个网站的权重高&#xff0c;代表流量高&#xff0c;收录好&#xff0c;关键词排名也相对较好。那么&#xff0c;这么重要的指标&#xff0c;一…

如何判断一个网站是否被墙

http://www.cnblogs.com/wangkangluo1/archive/2012/04/15/2447921.html 大家都知道GFW 平日作恶多端&#xff0c;一旦有网站不能访问&#xff0c;很多人都把矛头直指它了…. 虽然一般都是它干的&#xff0c;但实际上也不排除一些人别有用心…. 那么下面就来简单判断一下吧… 方…

学用MVC4做网站五:5.2我的文章

文章管理这一块&#xff0c;按照左侧导航这一块向下写 到了“我的文章”这一块。 先还是打开【ArticleController】&#xff0c;添加public ActionResult UserOwn(int id 0, int page 1) 这里的id是指栏目id&#xff0c;可以显示自己发布的指定栏目的文章&#xff0c;默认为0…

企业网站优化切忌心浮气躁

为什么80%的码农都做不了架构师&#xff1f;>>> 互联网时代&#xff0c;大部分的企业都有自己的网站&#xff0c;随着网站管理员的更新换代&#xff0c;企业网站的优化成了遗留问题。很多企业网站域名年龄都有几年以上了&#xff0c;但是从seo的角度来说还是那么烂…

网站用户分析知识总结

本文是《数据蛙三个月强化课》的第七篇总结教程&#xff0c;如果想要了解数据蛙社群&#xff0c;可以阅读给DataFrog社群同学的学习建议。温馨提示&#xff1a;如果您已经熟悉网站用户分析知识,大可不必再看这篇文章&#xff0c;或是只挑选部分文章 一&#xff1a;用户分析概…

浙大海洋法律与治理研究中心网站

近日&#xff0c;为浙江大学海洋法律与治理研究中心开发官方网站正式上线。2012年8月21日&#xff0c;浙江大学海洋法律与治理研究中心成立。中心将汇聚浙江大学跨学科优势资源&#xff0c;适应国家战略需求和促进海洋法制保障&#xff0c;致力打造一支国内顶尖、结构合理的海洋…

curl网站开发指南

转载 http://www.ruanyifeng.com/blog/2011/09/curl.html 我一向以为&#xff0c;curl只是一个编程用的函数库。最近才发现&#xff0c;这个命令本身&#xff0c;就是一个无比有用的网站开发工具&#xff0c;请看我整理的它的用法。curl网站开发指南阮一峰 整理curl是一种命令行…

ASP.NET MVC5 网站开发实践(二) Member区域 - 修改及删除文章

上次做了显示文章列表&#xff0c;再实现修改和删除文章这部分内容就结束了&#xff0c;这次内容比较简单&#xff0c;由于做过了添加文章&#xff0c;修改文章非常类似&#xff0c;就是多了一个TryUpdateModel部分更新模型数据。 目录&#xff1a; ASP.NET MVC5 网站开发实践 …

ASP.NET MVC5 网站开发实践(二) Member区域–管理列表、回复及删除

本来想接着上次把这篇写完的&#xff0c;没想到后来工作的一些事落下了&#xff0c;放假了赶紧补上。 目录&#xff1a; ASP.NET MVC5 网站开发实践 - 概述 ASP.NET MVC5 网站开发实践(一) - 项目框架 ASP.NET MVC5 网站开发实践(一) - 框架&#xff08;续&#xff09; 模型、数…

网站创建自定义百度地图

第一步&#xff1a;百度搜索“创建地图-百度地图生成器”或者直接点击链接&#xff1a;http://api.map.baidu.com/lbsapi/creatmap/&#xff0c;打开页面 如下图所示&#xff1a; 第二步&#xff1a;输入你要查找的地址名称&#xff0c;点击查找&#xff0c;如下图所示&#xf…

【网站国际化必备】Asp.Net MVC 集成Paypal(贝宝)快速结账 支付接口 ,附源码demo...

【网站国际化必备】Asp.Net MVC 集成Paypal&#xff08;贝宝&#xff09;快速结账 支付接口 &#xff0c;附源码demo 原文:【网站国际化必备】Asp.Net MVC 集成Paypal&#xff08;贝宝&#xff09;快速结账 支付接口 &#xff0c;附源码demo开篇先给大家讲段历史故事&#xff0…