衡量网站流量一个最简单的指标,就是网站的页面浏览量(Page View,PV)。用户每次打开一个页面便记录1次PV,多次打开同一页面则浏览量累计。
一般来说,PV与来访者的数量成正比,但是PV并不直接决定页面的真实来访者数量,如同一个来访者通过不断的刷新页面,也可以制造出非常高的PV。接下来我们就用咱们之前学习的Flink算子来实现PV的统计
1.假设我们已经采集到数据UserBehavior,并将数据放在工程目录input下面,截图如下:
文件格式如下:
自己可以随便写一个csv文件就行,用excel打开就是每个填充一个,用notepad++打开就是以,分开
2.创建bean:
package com.mischen.it.entity;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** @ClassName UserBehavior* @Description DOTO* @Author mischen* @Date 2021/6/30 0030 7:50* @Version 1.0**/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class UserBehavior {private Long userId;private Long itemId;private Integer categoryId;private String behavior;private Long timestamp;
}
3.书写main方法,代码如下:
package com.mischen.it;import com.mischen.it.entity.UserBehavior;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @ClassName Flink01_Project_PV* @Description DOTO* @Author mischen* @Date 2021/6/30 0030 7:52* @Version 1.0**/
public class Flink01_Project_PV {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.readTextFile("input/UserBehavior.csv").map(line -> { // 对数据切割, 然后封装到POJO中String[] split = line.split(",");return new UserBehavior(Long.valueOf(split[0]),Long.valueOf(split[1]),Integer.valueOf(split[2]),split[3],Long.valueOf(split[4]));}).filter(behavior -> "pv".equals(behavior.getBehavior())) //过滤出pv行为.map(behavior -> Tuple2.of("pv", 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG)) // 使用Tuple类型, 方便后面求和.keyBy(value -> value.f0) // keyBy: 按照key分组.sum(1) // 求和.print();env.execute();}
}
4.运行结果如下:
最开始的数据条数为:
从上面对比中,可以看出最后过滤出了434349条数据。