package com.wudl.flink.action;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
/**
* @author :wudl
* @date :Created in 2021-12-07 0:16
* @description:广播流的使用
* @modified By:
* @version: 1.0
*/
public class BroadcastStateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Tuple4<String, String, String, Integer>> logDs = env.addSource(new MySource());
DataStream<Map<String, Tuple2<String, Integer>>> userDs = env.addSource(new MySqlSource());
MapStateDescriptor<Void, Map<String, Tuple2<String, Integer>>> descriptor = new MapStateDescriptor<>("user", Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));
//.根据状态描述器将userInfoDS作为状态进行广播
BroadcastStream<Map<String, Tuple2<String, Integer>>> broadcastDs = userDs.broadcast(descriptor);
//将实时日志事件流和广播流进行连接
BroadcastConnectedStream<Tuple4<String, String, String, Integer>, Map<String, Tuple2<String, Integer>>> connectDs = logDs.connect(broadcastDs);
//--4.处理连接流中的数据
SingleOutputStreamOperator<Object> resultDs = connectDs.process(new BroadcastProcessFunction<Tuple4<String, String, String, Integer>, Map<String, Tuple2<String, Integer>>, Object>() {
@Override
public void processElement(Tuple4<String, String, String, Integer> value, ReadOnlyContext ctx, Collector<Object> collector) throws Exception {
String userId = value.f0;
//拿到状态
ReadOnlyBroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(descriptor);
Map<String, Tuple2<String, Integer>> userMap = broadcastState.get(null);
if (userMap != null) {
Tuple2<String, Integer> user = userMap.get(userId);
String userName = user.f0;
Integer userAage = user.f1;
collector.collect(Tuple6.of(userId, userName, userAage, value.f1, value.f2, value.f3));
}
}
// 处理广播的状态
@Override
public void processBroadcastElement(Map<String, Tuple2<String, Integer>> value, Context ctx, Collector<Object> collector) throws Exception {
BroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(descriptor);
broadcastState.clear();
broadcastState.put(null, value);
}
});
resultDs.print("----------------->");
env.execute();
}
/**
* .实时日志事件流
*/
public static class MySource implements SourceFunction<Tuple4<String, String, String, Integer>> {
private boolean isRunning = true;
@Override
public void run(SourceContext<Tuple4<String, String, String, Integer>> ctx) throws Exception {
Random random = new Random();
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
while (isRunning) {
int id = random.nextInt(4) + 1;
String user_id = "user_" + id;
String eventTime = df.format(new Date());
String eventType = "type_" + random.nextInt(3);
int productId = random.nextInt(4);
ctx.collect(Tuple4.of(user_id, eventTime, eventType, productId));
Thread.sleep(500);
}
}
@Override
public void cancel() {
isRunning = false;
}
}
public static class MySqlSource extends RichSourceFunction<Map<String, Tuple2<String, Integer>>> {
private boolean flag = true;
private Connection conn = null;
private PreparedStatement ps = null;
private ResultSet rs = null;
@Override
public void open(Configuration parameters) throws Exception {
conn = DriverManager.getConnection("jdbc:mysql://192.168.1.180:3306/test", "root", "123456");
String sql = "select `userID`, `userName`, `userAge` from `user_info`";
ps = conn.prepareStatement(sql);
}
@Override
public void run(SourceContext<Map<String, Tuple2<String, Integer>>> sourceContext) throws Exception {
while (flag) {
Map<String, Tuple2<String, Integer>> map = new HashMap<>();
ResultSet rs = ps.executeQuery();
while (rs.next()) {
String userID = rs.getString("userID");
String userName = rs.getString("userName");
int userAge = rs.getInt("userAge");
//Map<String, Tuple2<String, Integer>>
map.put(userID, Tuple2.of(userName, userAge));
}
sourceContext.collect(map);
Thread.sleep(5000);//每隔5s更新一下用户的配置信息!
}
}
@Override
public void cancel() {
flag = false;
}
@Override
public void close() throws Exception {
if (conn != null) {
conn.close();
}
if (ps != null) {
ps.close();
}
if (rs != null) {
rs.close();
}
}
}
}
Flink --广播流BroadcastState 的应用
猜你喜欢
转载自blog.csdn.net/wudonglianga/article/details/121759864
今日推荐
周排行