Flink示例——Table、SQL
其他
2020-03-04 10:51:16
阅读次数: 0
Flink示例——Table、SQL
版本信息
产品 |
版本 |
Flink |
1.7.2 |
Java |
1.8.0_231 |
Scala |
2.11.12 |
Maven依赖
自定义SourceFunction
- 提供一个SourceFunction,方便后面测试
/**
 * Test source that emits one random CSV record roughly every 100 ms until cancelled.
 *
 * <p>Each record has the form {@code id,name,age,address,eventTime}:
 * <ul>
 *   <li>{@code id} - counter starting at 0, incremented per emitted record</li>
 *   <li>{@code name} / {@code address} - picked at random from fixed lists</li>
 *   <li>{@code age} - random value in [10, 29]</li>
 *   <li>{@code eventTime} - current wall-clock millis shifted by a random whole number of seconds</li>
 * </ul>
 */
public class CustomSourceFunction extends RichSourceFunction<String> {

    /**
     * Loop guard for {@link #run}. Declared volatile because {@link #cancel()} may be
     * invoked from a different thread than the one executing {@link #run}; without it the
     * emitting loop is not guaranteed to observe the update and could spin forever.
     */
    private volatile boolean flag = true;

    /** Next record id to hand out. */
    private long idAdder = 0L;

    /**
     * Emits random records until {@link #cancel()} clears the running flag.
     *
     * @param ctx context used to emit the generated CSV lines
     * @throws Exception if the sleep between records is interrupted
     */
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        List<String> nameList = Arrays.asList("xiaowang", "lilei", "yangyang", "zhangsan", "lisi", "wangwu", "meimei");
        List<String> addressList = Arrays.asList("beijing", "chongqing", "shanghai", "nanjing", "chengdu", "guangzhou");
        Random random = new Random();

        while (flag) {
            // Throttle the source to about ten records per second.
            Thread.sleep(100);

            long id = idAdder++;
            String name = nameList.get(random.nextInt(nameList.size()));
            int age = random.nextInt(20) + 10;
            String address = addressList.get(random.nextInt(addressList.size()));
            // Shift the timestamp by a random number of seconds centred on zero
            // (the name-list size is reused as the width of the jitter range).
            long eventTime = System.currentTimeMillis() + (random.nextInt(nameList.size()) - nameList.size() / 2) * 1000;

            ctx.collect(id + "," + name + "," + age + "," + address + "," + eventTime);
        }
    }

    /** Requests the emitting loop in {@link #run} to stop after the current iteration. */
    @Override
    public void cancel() {
        flag = false;
    }
}
TableAPI、SQL 简单示例
- 使用Tuple方式转换DataStream(较繁琐)
/**
 * Minimal Table API example: reads CSV lines from {@code CustomSourceFunction}, converts
 * them to a five-field tuple stream, exposes the stream as a table, keeps rows whose age
 * is strictly between 20 and 25, and prints name, age and address.
 */
public class SimpleDemo {

    /**
     * Builds and runs the job. Any exception raised by job execution is printed to
     * standard error rather than propagated.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);

        // Raw CSV lines produced by the test source.
        DataStreamSource<String> customDS = env.addSource(new CustomSourceFunction());

        // Parse every line into a typed tuple; the result type is declared explicitly via returns(...).
        DataStream<Tuple5<Long, String, Integer, String, Long>> personDS = customDS
                .map(SimpleDemo::parseLine)
                .returns(Types.TUPLE(Types.LONG, Types.STRING, Types.INT, Types.STRING, Types.LONG));

        // Name the tuple positions so they can be referenced in Table API expressions.
        Table srcTable = tableEnv.fromDataStream(personDS, "id, name, age, address, eventTime");
        srcTable.printSchema();

        Table filtered = srcTable.filter("age > 20 && age < 25");
        Table projected = filtered.select("name, age, address");

        DataStream<Row> resultDS = tableEnv.toAppendStream(projected, Row.class);
        resultDS.print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Splits one {@code id,name,age,address,eventTime} line on commas and converts the
     * pieces into a tuple.
     *
     * @param line CSV record as emitted by the source
     * @return tuple of (id, name, age, address, eventTime)
     */
    private static Tuple5<Long, String, Integer, String, Long> parseLine(String line) {
        String[] parts = line.split(",");
        return Tuple5.of(
                Long.parseLong(parts[0]),
                parts[1],
                Integer.parseInt(parts[2]),
                parts[3],
                Long.parseLong(parts[4]));
    }
}
- 使用POJO方式转换DataStream(需要定义POJO类)
- POJO类 Person
/**
 * Simple data holder for one person record with public fields and a public no-argument
 * constructor, used in the examples as the element type of a {@code DataStream}.
 */
public class Person {

    /** Number of leading CSV fields that {@link #parseCSV(String)} requires. */
    private static final int REQUIRED_FIELDS = 5;

    public long id;
    public String name;
    public int age;
    public String address;
    public long eventTime;

    /** Creates an empty instance with all fields at their default values. */
    public Person() {
    }

    /**
     * Creates a fully populated instance.
     *
     * @param id        record id
     * @param name      person name
     * @param age       person age
     * @param address   person address
     * @param eventTime timestamp carried by the record
     */
    public Person(long id, String name, int age, String address, long eventTime) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.address = address;
        this.eventTime = eventTime;
    }

    /**
     * Parses a comma-separated line of the form {@code id,name,age,address,eventTime}.
     * Fields beyond the fifth are ignored.
     *
     * @param line the CSV record to parse
     * @return the parsed person
     * @throws IllegalArgumentException if the line has fewer than five fields
     * @throws NumberFormatException    if id, age or eventTime is not a valid number
     */
    public static Person parseCSV(String line) {
        String[] fields = line.split(",");
        // Fail with a message naming the bad line instead of a bare ArrayIndexOutOfBoundsException.
        if (fields.length < REQUIRED_FIELDS) {
            throw new IllegalArgumentException(
                    "Expected at least " + REQUIRED_FIELDS + " comma-separated fields but got "
                            + fields.length + ": \"" + line + "\"");
        }
        long id = Long.parseLong(fields[0]);
        String name = fields[1];
        int age = Integer.parseInt(fields[2]);
        String address = fields[3];
        long eventTime = Long.parseLong(fields[4]);
        return new Person(id, name, age, address, eventTime);
    }
}
- 处理代码
// Parse each CSV line from the source into a Person.
DataStream<Person> personDS = customDS.map(Person::parseCSV);

// Convert the Person stream to a table, then keep rows with 20 < age < 25
// and project the three columns of interest.
Table personTable = tableEnv.fromDataStream(personDS);
Table filtered = personTable.filter("age > 20 && age < 25");
Table table = filtered.select("name, age, address");

// Turn the result back into a stream of rows and print it.
DataStream<Row> resultDS = tableEnv.toAppendStream(table, Row.class);
resultDS.print();
- 使用SQL处理DataStream
// Parse each CSV line into a Person and register the stream under the table name "tb_person".
DataStream<Person> personDS = customDS.map(Person::parseCSV);
tableEnv.registerDataStream("tb_person", personDS);

// age is an int field on Person, so it is compared against numeric literals
// (not quoted strings), matching the Table API filter "age > 20 && age < 25".
Table table = tableEnv.sqlQuery(
        "SELECT name, age, address FROM tb_person WHERE age > 20 AND age < 25"
);

// Convert the query result to a stream of rows and print it.
tableEnv.toAppendStream(table, Row.class).print();
TableAPI、SQL 窗口聚合示例
- 代码 WindowAggDemo
/**
 * Windowed aggregation example: groups the person stream by address inside 5-second
 * tumbling windows over the {@code UserActionTime} proctime attribute and prints the
 * average age per address.
 */
public class WindowAggDemo {

    /**
     * Builds and runs the job. Any exception raised by job execution is printed to
     * standard error rather than propagated.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);

        // CSV lines from the test source, parsed into Person objects.
        DataStreamSource<String> customDS = env.addSource(new CustomSourceFunction());
        DataStream<Person> personDS = customDS.map(Person::parseCSV);

        // Expose the Person fields plus an extra attribute declared as "UserActionTime.proctime".
        String schema = "id, name, age, address, eventTime, UserActionTime.proctime";
        Table srcTable = tableEnv.fromDataStream(personDS, schema);

        // 5-second tumbling window on UserActionTime, referred to as "myWindow" below.
        Window myWindow = Tumble.over("5.seconds").on("UserActionTime").as("myWindow");

        Table windowed = srcTable
                .window(myWindow)
                .groupBy("myWindow, address")
                .select("address, age.avg");

        // Keep only the elements whose boolean flag (f0) is true before printing.
        tableEnv.toRetractStream(windowed, Row.class)
                .filter(flagged -> flagged.f0)
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
发布了146 篇原创文章 ·
获赞 54 ·
访问量 17万+
转载自blog.csdn.net/alionsss/article/details/104282584