维表join
代码编写
- 之前在讲Flink Sql的时候和大家聊过维表以及如何用Flink Sql来完成维表Join
- 现在带大家看看如何用Zeppelin来实现这个功能
- 首先,我们先引入我们所需的依赖包,目前大家先跟着我这么写,之后会讲解引入依赖的多种方式和区别。
%flink.conf # 这是第一个paragraph,大家不要把所有代码写在一个paragraph里面 # 配置一下依赖包,这种方式会自动下载依赖 flink.execution.packages org.apache.flink:flink-connector-kafka_2.11:1.10.0,org.apache.flink:flink-connector-kafka-base_2.11:1.10.0,org.apache.flink:flink-json:1.10.0,org.apache.flink:flink-jdbc_2.11:1.10.0 # 大家千万注意,如果用的是org.apache.flink:flink-connector-kafka_2.11:1.10.0,org.apache.flink:flink-connector-kafka-base_2.11:1.10.0这2个包,那么kafka 那边的 version请写universal,否则你会发现莫名其妙的错误 # 如果kafka版本低于0.11,请用org.apache.flink:link-connector-kafka-0.11_2.11 替换上面的kafka的包,kafka版本和scala版本也请替换成对应的版本,ddl语句中的version也同样如此 # 下面会用到Mysql,如果大家已经在Flink的lib目录下放了Mysql的驱动包,那么配这么多的包就行 # 否则的话,再加上mysql:mysql-connector-java:5.1.37这个包
- 然后我们注册个
File System Source
,再注册Kafka Sink
,之后会将从文件中读取的数据写入到kafka中。注意!大家不要把所有代码写在一个paragraph里面,建议一个paragraph写一段单一功能的语句%flink.ssql -- File System Source DDL DROP TABLE IF EXISTS t1; CREATE TABLE t1 ( user_id bigint, item_id bigint, category_id bigint, behavior varchar, ts bigint ) WITH ( 'connector.type' = 'filesystem', 'connector.path' = 'hdfs:///test/UserBehavior.csv', 'format.type' = 'csv', 'format.field-delimiter' = ',' ) ;
%flink.ssql -- Kafka Sink DDL DROP TABLE IF EXISTS t2; CREATE TABLE t2 ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts BIGINT ) WITH ( 'update-mode' = 'append', 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'zeppelin_01_test', 'connector.properties.zookeeper.connect' = '127.0.0.1:2181', 'connector.properties.bootstrap.servers' = '127.0.0.1:9092', 'format.type'='json' )
%flink.ssql -- 将我们的数据写入kafka -- 这里之所以用了UNIX_TIMESTAMP()这个udf来代替我们原生的ts --是因为这个ts太老了,之后我们要做窗口计算的话,会一直没法输出数据的 insert into t2 select user_id,item_id,category_id,behavior,UNIX_TIMESTAMP() as ts from t1;
- 让我们运行一下看看什么情况
- 可以看到任务在持续的执行,点击这个按钮可以跳转到Flink集群上对应的任务页面,可以查看相关信息,这里就不给大家演示了
- 接下来让我们再注册个
Kafka Source
,然后从中读取数据%flink.ssql -- Kafka Source DDL DROP TABLE IF EXISTS t3; CREATE TABLE t3( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts BIGINT, r_t AS TO_TIMESTAMP(FROM_UNIXTIME(ts,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss'),-- 计算列,因为ts是bigint,没法作为水印,所以用UDF转成TimeStamp WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND -- 指定水印生成方式 )WITH ( 'update-mode' = 'append', 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'zeppelin_01_test', 'connector.properties.zookeeper.connect' = '127.0.0.1:2181', 'connector.properties.bootstrap.servers' = '127.0.0.1:9092', 'connector.properties.group.id' = 'zeppelin_01_test', 'connector.startup-mode' = 'latest-offset', 'format.type'='json' )
%flink.ssql(type=update) select * from t3
- 有个要注意的地方是,
select
语句必须指定type
,什么意思呢?type
指的是流式数据分析的三种模式- single
- append
- update
- single模式适合当输出结果是一行的情况。使用这种模式,永远只有一行数据,但这行数据会持续不断的更新
- Append模式适合不断有新数据输出,但不会覆盖原有数据,只会不断append的情况。值得注意的是,append模式的第一列一定要是timestamp,因为需要根据时间来设置一个threshold,不然数据源源不断进来,最后会OOM,如果你要预览数据的话,可以用
-- 来自简锋大佬的教诲 %flink.ssql(type=update) select * from table order by time_column desc limit 10```
- Update模式适合多行输出的情况,适合和聚合语句配合一起使用,持续不断的更新数据,配合Zeppelin的可视化控件一起使用,效果更好
- 有个要注意的地方是,
- 瞄一眼输出的内容,没什么问题,那我们开始整合
Mysql Dim
,先去Mysql库里建个表-- Mysql 建表语句,注意这是在Mysql执行的!不要在Zeppelin执行 CREATE TABLE `dim_behavior` ( `id` int(10) NOT NULL AUTO_INCREMENT COMMENT '自增主键', `en_behavior` varchar(255) COLLATE utf8_bin DEFAULT NULL COMMENT '英文 行为', `zh_behavior` varchar(255) COLLATE utf8_bin DEFAULT NULL COMMENT '中文 行为', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COLLATE=utf8_bin; -- 搞两条数据 INSERT INTO `dijie_test`.`dim_behavior`(`id`, `en_behavior`, `zh_behavior`) VALUES (1, 'buy', '购买'); INSERT INTO `dijie_test`.`dim_behavior`(`id`, `en_behavior`, `zh_behavior`) VALUES (2, 'pv', '浏览');
- 接下来让我们回到Zeppelin,开始写我们的建表语句和查询语句
%flink.ssql DROP TABLE IF EXISTS dim_behavior; CREATE TABLE `dim_behavior` ( `id` int , `en_behavior` varchar , `zh_behavior` varchar )WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://127.0.0.1:3306/dijie_test', 'connector.table' = 'dim_behavior', 'connector.driver' = 'com.mysql.jdbc.Driver', 'connector.username' = 'hive', 'connector.password' = 'hive' , 'connector.lookup.cache.max-rows' = '5000', 'connector.lookup.cache.ttl' = '10s' )
%flink.ssql(type = update) select zh_behavior, count(distinct user_id) as cnt_distin_user, tumble_start(c.r_t,interval '10' second) as tumble_start from ( select b.*,a.* from ( select *,proctime() as p from t3 ) a left join dim_behavior FOR SYSTEM_TIME AS OF a.p AS b on a.behavior = b.en_behavior where b.zh_behavior is not null ) c group by c.zh_behavior,tumble(c.r_t,interval '10' second)
- 我们在Sql里进行了判断,把维表中没有的数据给过滤了。瞄一眼结果,发现确实正确的过滤了。而且数据正在持续不断的更新
- 大家可以对比先之前的Flink Sql 03课程,发现之前写Sql的方式实在low爆了
- 以前还得先写Java代码,不会Java就不配写Flink Sql。而现在,除了Sql,别的什么语言都没见着,纯Sql方式完成了从读取到匹配到输出的操作,实在太方便了
- 可能有同学在用Zeppelin之前也以为也要写代码,甚至会对Zeppelin嗤之以鼻:我的IDEA不香吗?当你真正开始用上的时候,你会发现,Zeppelin才是终极杀手!The Answer!
- 好了,今天的学习就到此为止,下一次带大家学习如何在Zeppelin中,使用我们Java编写的UDF和自定义的Table Factory
采坑记录
- 如果在执行
flink.conf
的内容报如下错误时,请先去Interpreter页面,重启Interpreter,再执行语句java.io.IOException: Can not change interpreter properties when interpreter process has already been launched at org.apache.zeppelin.interpreter.InterpreterSetting.setInterpreterGroupProperties(InterpreterSetting.java:958) at org.apache.zeppelin.interpreter.ConfInterpreter.interpret(ConfInterpreter.java:73) at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:479) at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:75) at org.apache.zeppelin.scheduler.Job.run(Job.java:172) at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130) at org.apache.zeppelin.scheduler.FIFOScheduler.lambda$runJobInScheduler$0(FIFOScheduler.java:39) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
- 在执行
insert
或者select
时,如果发现任务一点执行就立刻结束,没有报错,Flink Web Ui 也看不到相应的任务信息,同时,Zeppelin的日志也查不到有些的信息时,请将该paragraph的注释内容全部删除,再点击执行,你就会发现任务能够正常运行了。 - 简锋大佬怀疑是Zeppelin把这条sql当成comment了,应该是个bug。我已经提ticket了,链接
- 本次教程的notebook,我也都放到了我github的仓库里了,传送门
最后,向大家宣传一下Flink on Zeppelin 的钉钉群,大家有问题可以在里面讨论,简锋大佬也在里面,有问题直接提问就好