概述
需求
计算每小时每个页面的 PV 和 UV
数据样本(设备ID,页面ID,时间戳(单位:秒))
{"device_id": "d1", "page_id": "p1", "timestamp": 3600}
{"device_id": "d1", "page_id": "p2", "timestamp": 3601}
{"device_id": "d2", "page_id": "p2", "timestamp": 3602}
{"device_id": "d1", "page_id": "p3", "timestamp": 3603}
{"device_id": "d2", "page_id": "p3", "timestamp": 3604}
{"device_id": "d3", "page_id": "p3", "timestamp": 3610}
{"device_id": "d3", "page_id": "p3", "timestamp": 3610}
{"device_id": "d11", "page_id": "p1", "timestamp": 7200}
{"device_id": "d11", "page_id": "p2", "timestamp": 7201}
{"device_id": "d12", "page_id": "p2", "timestamp": 7202}
{"device_id": "d11", "page_id": "p3", "timestamp": 7203}
{"device_id": "d12", "page_id": "p3", "timestamp": 7204}
{"device_id": "d13", "page_id": "p3", "timestamp": 7210}
{"device_id": "d11", "page_id": "p1", "timestamp": 7210}
{"device_id": "d1", "page_id": "p1", "timestamp": 10800}
{"device_id": "d1", "page_id": "p2", "timestamp": 10801}
{"device_id": "d2", "page_id": "p2", "timestamp": 10802}
{"device_id": "d1", "page_id": "p3", "timestamp": 10803}
{"device_id": "d2", "page_id": "p3", "timestamp": 10811}
{"device_id": "d3", "page_id": "p3", "timestamp": 10811}
环境和依赖
本地开发环境:WIN10+JDK1.8+IDEA+Maven3.6.3
<!-- 配置 -->
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.14.6</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<fastjson.version>2.0.19</fastjson.version>
<lombok.version>1.18.24</lombok.version>
</properties>
<!-- https://mvnrepository.com/ -->
<dependencies>
<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- JSON解析 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<!-- 简化JavaBean书写 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
</dependencies>
PV计算代码
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.Scanner;
/**
 * Flink streaming job that counts page views (PV) per page in one-hour tumbling
 * event-time windows.
 *
 * <p>Input is read line by line from standard input. Each line is expected to be a
 * JSON object carrying a page id and a timestamp in seconds. Lines that are not JSON
 * objects, or that lack a page id or a timestamp, are dropped instead of failing the job.
 */
public class PvCount {

    /** Size of the tumbling window; the sink derives the window start from it. */
    private static final Time WINDOW_SIZE = Time.hours(1);

    public static void main(String[] args) throws Exception {
        // Create the execution environment and set the default parallelism.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(2);

        // Attach the custom stdin-backed source.
        DataStreamSource<String> lines = env.addSource(new MySource());

        // Watermark strategy: timestamps are assumed to be ascending; the record carries
        // seconds, so convert to the milliseconds Flink expects.
        // Alternative for out-of-order input:
        //   WatermarkStrategy.<PV>forBoundedOutOfOrderness(Duration.ofSeconds(2)) with the same assigner.
        WatermarkStrategy<PV> watermarks = WatermarkStrategy.<PV>forMonotonousTimestamps()
                .withTimestampAssigner((SerializableTimestampAssigner<PV>) (e, recordTimestamp) -> e.timestamp * 1000L);

        // ---------------------------------- JSON parsing ----------------------------------
        SingleOutputStreamOperator<Tuple2<String, Long>> pageHits = lines
                // 1. Drop lines that do not parse to a JSON object.
                .filter(PvCount::isValidJSON)
                // 2. String -> JSON object.
                .map(JSONObject::parseObject)
                // 3. JSON object -> JavaBean.
                .map(j -> j.toJavaObject(PV.class))
                // 4. Drop records without a page id or a timestamp; the timestamp check
                //    keeps the timestamp assigner from unboxing null.
                .filter(pv -> pv.getPageId() != null && pv.getTimestamp() != null)
                // 5. Assign event timestamps and watermarks.
                .assignTimestampsAndWatermarks(watermarks)
                // 6. Emit (pageId, 1) for counting.
                .map(pv -> Tuple2.of(pv.getPageId(), 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // Hourly total PV over all pages could be computed instead with:
        //   pageHits.windowAll(TumblingEventTimeWindows.of(WINDOW_SIZE)).sum(1).print("总PV");

        // ----------------------------- hourly PV for each page ----------------------------
        pageHits
                // 7. Partition by page id.
                .keyBy(t -> t.f0)
                // 8. One-hour tumbling event-time window.
                .window(TumblingEventTimeWindows.of(WINDOW_SIZE))
                // Accept elements up to one second late.
                .allowedLateness(Time.seconds(1))
                // 9. Sum the counts.
                .sum(1)
                // 10. Output (stands in for a database write).
                .addSink(new RichSinkFunction<Tuple2<String, Long>>() {
                    @Override
                    public void open(Configuration parameters) {
                        System.out.println("假设创建数据库连接");
                    }

                    @Override
                    public void invoke(Tuple2<String, Long> value, Context context) {
                        // With allowed lateness, a late element re-fires the window, so a real
                        // sink would have to update the previously written row.
                        // The code treats the record timestamp as the inclusive window end
                        // (window end - 1 ms), hence start = end - size + 1.
                        long windowEnd = context.timestamp();
                        long windowStart = windowEnd - WINDOW_SIZE.toMilliseconds() + 1;
                        System.out.println(windowStart + "~" + windowEnd + ":" + value);
                    }

                    @Override
                    public void close() {
                        System.out.println("假设关闭数据库连接");
                    }
                });

        // Run the job.
        env.execute();
    }

    /**
     * Tells whether the string parses to a non-null JSON object.
     *
     * @param string candidate text
     * @return {@code true} only if parsing succeeds and yields an object; {@code false}
     *         when parsing throws or yields a null reference (for example the literal
     *         {@code null}), since a null would break the downstream map step
     */
    public static boolean isValidJSON(String string) {
        try {
            return JSONObject.parseObject(string) != null;
        } catch (JSONException e) {
            return false;
        }
    }

    /** JavaBean describing one page-view event. */
    @Data
    @AllArgsConstructor
    public static class PV {
        String deviceId;
        String pageId;
        /** Event time; multiplied by 1000 in the timestamp assigner, i.e. treated as seconds. */
        Long timestamp;
    }

    /**
     * Custom source that forwards non-blank lines from standard input.
     *
     * <p>The source ends when the line {@code STOP} is read, when standard input reaches
     * end-of-file, or when {@link #cancel()} has been called. The cancel flag is only
     * checked between lines, so a read that is already blocked is not interrupted.
     */
    public static class MySource implements SourceFunction<String> {

        /** Cleared by {@link #cancel()}; volatile because cancel runs on another thread. */
        private volatile boolean running = true;

        public MySource() {
        }

        @Override
        public void run(SourceContext<String> sc) {
            try (Scanner scanner = new Scanner(System.in)) {
                // hasNextLine() ends the loop at end-of-file instead of letting
                // nextLine() throw NoSuchElementException.
                while (running && scanner.hasNextLine()) {
                    String line = scanner.nextLine().trim();
                    if (line.equals("STOP")) {
                        break;
                    }
                    if (!line.isEmpty()) {
                        sc.collect(line);
                    }
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
并行度设1,关闭允许迟到,测试结果如下
UV计算代码
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.HashSet;
import java.util.Scanner;
/**
 * Flink streaming job that counts unique visitors (UV, distinct device ids) per page
 * in one-hour tumbling event-time windows.
 *
 * <p>Input is read line by line from standard input. Each line is expected to be a
 * JSON object carrying a device id, a page id and a timestamp in seconds. Lines that
 * are not JSON objects, or that lack a page id or a timestamp, are dropped instead of
 * failing the job.
 */
public class UvCount {

    public static void main(String[] args) throws Exception {
        // Create the execution environment and set the default parallelism.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(2);

        // Attach the custom stdin-backed source.
        DataStreamSource<String> lines = env.addSource(new MySource());

        // Watermark strategy: timestamps are assumed to be ascending; the record carries
        // seconds, so convert to the milliseconds Flink expects.
        WatermarkStrategy<PV> watermarks = WatermarkStrategy.<PV>forMonotonousTimestamps()
                .withTimestampAssigner((SerializableTimestampAssigner<PV>) (e, recordTimestamp) -> e.timestamp * 1000L);

        // ---------------------------------- JSON parsing ----------------------------------
        SingleOutputStreamOperator<PV> events = lines
                // 1. Drop lines that do not parse to a JSON object (uses this class's own
                //    validator rather than reaching into PvCount).
                .filter(UvCount::isValidJSON)
                // 2. String -> JSON object.
                .map(JSONObject::parseObject)
                // 3. JSON object -> JavaBean.
                .map(j -> j.toJavaObject(PV.class))
                // 4. Drop records without a page id or a timestamp; the timestamp check
                //    keeps the timestamp assigner from unboxing null.
                .filter(pv -> pv.getPageId() != null && pv.getTimestamp() != null)
                // 5. Assign event timestamps and watermarks.
                .assignTimestampsAndWatermarks(watermarks);

        // ----------------------------- hourly UV for each page ----------------------------
        events
                // 6. Partition by page id.
                .keyBy(PV::getPageId)
                // 7. One-hour tumbling event-time window.
                .window(TumblingEventTimeWindows.of(Time.hours(1)))
                // Accept elements up to ten seconds late.
                .allowedLateness(Time.seconds(10L))
                // 8. ProcessWindowFunction<IN, OUT, KEY, W extends Window>
                .process(new ProcessWindowFunction<PV, String, String, TimeWindow>() {
                    @Override
                    public void process(String k, Context context, Iterable<PV> elements, Collector<String> out) {
                        // Collect the distinct device ids seen for this page in the window.
                        HashSet<String> deviceIds = new HashSet<>();
                        for (PV element : elements) {
                            deviceIds.add(element.getDeviceId());
                        }
                        TimeWindow window = context.window();
                        long start = window.getStart();
                        // Report the end as one less than getEnd().
                        long end = window.getEnd() - 1L;
                        out.collect(start + "~" + end + ":" + k + ":" + deviceIds.size());
                    }
                }).print();

        // Run the job.
        env.execute();
    }

    /**
     * Tells whether the string parses to a non-null JSON object.
     *
     * @param string candidate text
     * @return {@code true} only if parsing succeeds and yields an object; {@code false}
     *         when parsing throws or yields a null reference (for example the literal
     *         {@code null}), since a null would break the downstream map step
     */
    public static boolean isValidJSON(String string) {
        try {
            return JSONObject.parseObject(string) != null;
        } catch (JSONException e) {
            return false;
        }
    }

    /** JavaBean describing one page-view event. */
    @Data
    @AllArgsConstructor
    public static class PV {
        String deviceId;
        String pageId;
        /** Event time; multiplied by 1000 in the timestamp assigner, i.e. treated as seconds. */
        Long timestamp;
    }

    /**
     * Custom source that forwards non-blank lines from standard input.
     *
     * <p>The source ends when the line {@code STOP} is read, when standard input reaches
     * end-of-file, or when {@link #cancel()} has been called. The cancel flag is only
     * checked between lines, so a read that is already blocked is not interrupted.
     */
    public static class MySource implements SourceFunction<String> {

        /** Cleared by {@link #cancel()}; volatile because cancel runs on another thread. */
        private volatile boolean running = true;

        public MySource() {
        }

        @Override
        public void run(SourceContext<String> sc) {
            try (Scanner scanner = new Scanner(System.in)) {
                // hasNextLine() ends the loop at end-of-file instead of letting
                // nextLine() throw NoSuchElementException.
                while (running && scanner.hasNextLine()) {
                    String line = scanner.nextLine().trim();
                    if (line.equals("STOP")) {
                        break;
                    }
                    if (!line.isEmpty()) {
                        sc.collect(line);
                    }
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
并行度设1,关闭允许迟到,测试结果如下