We need dimension-table data here, so HBase and MySQL were chosen as the dimension-table stores; the main reference is the official documentation.
If you are unsure about the dependencies, check the official docs or borrow from other people's code.
Also worth reading is this person's blog (I call him "white-cut-chicken bro"): a classic case of a content-rich blog that somehow never got popular.
https://blog.csdn.net/weixin_47482194/article/details/105854970
GitHub link: https://github.com/lonelyGhostisdog/flinksql
This post was most likely also written based on the official docs and his articles. Of course, you still have to try everything yourself; without hands-on practice, how would you run into the problems and learn?
1) Loading Kafka data with SQL
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
EnvironmentSettings blinkStreamSettings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment blinkStreamTableEnv = StreamTableEnvironment.create(env, blinkStreamSettings);

String ddlSource = "CREATE TABLE source_kafka (\n" +
        "  order_key STRING,\n" +
        "  order_number STRING,\n" +
        "  company_code STRING,\n" +
        "  ts BIGINT,\n" +
        "  proctime AS PROCTIME()\n" +
        ") WITH (\n" +
        "  'connector.type' = 'kafka',\n" +
        "  'connector.version' = '0.11',\n" +
        "  'connector.topic' = 'test_window_01',\n" +
        "  'connector.startup-mode' = 'earliest-offset',\n" +
        "  'connector.properties.zookeeper.connect' = 'xxx:2181',\n" +
        "  'connector.properties.bootstrap.servers' = 'xxx:9092',\n" +
        "  'format.type' = 'json'\n" +
        ")";
blinkStreamTableEnv.sqlUpdate(ddlSource);
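For reference, a message in `test_window_01` matching this schema could look like the sample below (all field values are made up; `proctime` is a computed column, so it is not part of the payload):

```json
{"order_key": "ok_001", "order_number": "no_001", "company_code": "c_001", "ts": 1588000000000}
```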
2) Reading from MySQL; all basic code
String dimDDL = "CREATE TABLE dim_mysql (\n" +
        "  aaa BIGINT,\n" +
        "  bbbb STRING,\n" +
        "  ccc STRING\n" +
        ") WITH (\n" +
        "  'connector.type' = 'jdbc',\n" +
        "  'connector.url' = 'jdbc:mysql://xxx:3306/test',\n" +
        "  'connector.table' = 'test',\n" +
        "  'connector.driver' = 'com.mysql.jdbc.Driver',\n" +
        "  'connector.username' = 'root',\n" +
        "  'connector.password' = '12345678',\n" +           // note: this trailing comma is required; it was missing originally and breaks the DDL
        "  'connector.lookup.cache.max-rows' = '5000',\n" +  // max number of cached rows
        "  'connector.lookup.cache.ttl' = '360s',\n" +       // cache TTL: entries are reloaded after it expires; set it to match how often the dimension table changes
        "  'connector.lookup.max-retries' = '3'\n" +
        ")";
blinkStreamTableEnv.sqlUpdate(dimDDL);

String querySql = "select * from dim_mysql";
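Conceptually (this is a sketch, not Flink's actual implementation), the lookup cache controlled by `connector.lookup.cache.max-rows` and `connector.lookup.cache.ttl` behaves roughly like the class below: at most `maxRows` entries are kept, and an entry older than the TTL counts as a miss, so the next lookup goes back to MySQL for fresh data.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Conceptual sketch of a bounded TTL lookup cache, to illustrate what the
// two connector options mean. Class and method names are made up.
public class LookupCacheSketch {
    private static final class CacheEntry {
        final String value;
        final long loadedAt;
        CacheEntry(String value, long loadedAt) { this.value = value; this.loadedAt = loadedAt; }
    }

    private final long ttlMillis;
    private final LinkedHashMap<String, CacheEntry> cache;

    public LookupCacheSketch(final int maxRows, long ttlMillis) {
        this.ttlMillis = ttlMillis;
        // access-order LinkedHashMap gives simple LRU eviction once maxRows is exceeded
        this.cache = new LinkedHashMap<String, CacheEntry>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, CacheEntry> eldest) {
                return size() > maxRows;
            }
        };
    }

    /** Returns the cached value, or null on miss/expiry (the caller then queries MySQL). */
    public String get(String key, long now) {
        CacheEntry e = cache.get(key);
        if (e == null) {
            return null; // miss: caller queries the database and calls put()
        }
        if (now - e.loadedAt > ttlMillis) {
            cache.remove(key); // expired: force a fresh database lookup
            return null;
        }
        return e.value;
    }

    public void put(String key, String value, long now) {
        cache.put(key, new CacheEntry(value, now));
    }
}
```

So a larger TTL means fewer queries against MySQL but staler dimension data; pick it based on how often the table is updated.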
3) Joining an HBase dimension table. The effect:
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class Dw_count_user_all_bak {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        EnvironmentSettings blinkStreamSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment blinkStreamTableEnv = StreamTableEnvironment.create(env, blinkStreamSettings);

        SingleOutputStreamOperator<Row> ds = env.addSource(new RichSourceFunction<Row>() {
            @Override
            public void run(SourceContext<Row> ctx) throws Exception {
                Row r = new Row(2);
                r.setField(0, "a");
                r.setField(1, "a");
                ctx.collect(r);
            }

            @Override
            public void cancel() {
            }
        }).returns(Types.ROW(Types.STRING, Types.STRING));
        // blinkStreamTableEnv.createTemporaryView("t", ds, "id,order_key,proctime.proctime");

        String ddlSource = "CREATE TABLE t (\n" +
                "  order_key STRING,\n" +
                "  order_number STRING,\n" +
                "  company_code STRING,\n" +
                "  ts BIGINT,\n" +
                "  proctime AS PROCTIME()\n" +
                ") WITH (\n" +
                "  'connector.type' = 'kafka',\n" +
                "  'connector.version' = '0.11',\n" +
                "  'connector.topic' = 'test_window_01',\n" +
                "  'connector.startup-mode' = 'earliest-offset',\n" +
                "  'connector.properties.zookeeper.connect' = 'xxx:2181',\n" +
                "  'connector.properties.bootstrap.servers' = 'xxx:9092',\n" +
                "  'format.type' = 'json'\n" +
                ")";
        blinkStreamTableEnv.sqlUpdate(ddlSource);

        // todo: join the HBase dimension table
        String hbaseDDL = "CREATE TABLE dim_hbase (\n" +
                "  rowkey STRING,\n" +
                "  cf ROW<aaa STRING>\n" +
                ") WITH (\n" +
                "  'connector.type' = 'hbase',\n" +
                "  'connector.version' = '1.4.3',\n" +
                "  'connector.table-name' = 'test',\n" +
                "  'connector.zookeeper.quorum' = 'xxx:2181,xxx:2181,xxx:2181',\n" +
                "  'connector.zookeeper.znode.parent' = '/hbase'\n" +
                ")";
        System.out.println("hbaseDDL = " + hbaseDDL);
        blinkStreamTableEnv.sqlUpdate(hbaseDDL);

        // String queryHbase = "select * from dim_hbase";
        // Table hbaseTable = blinkStreamTableEnv.sqlQuery(queryHbase);
        // blinkStreamTableEnv.toAppendStream(hbaseTable, Row.class).print();
        // todo: join the Kafka stream with the HBase dimension table
        Table table2 = blinkStreamTableEnv.sqlQuery(
                "select a.*, b.* from t a" +
                " left join dim_hbase FOR SYSTEM_TIME AS OF a.proctime AS b" +
                " on a.order_key = b.rowkey");
        blinkStreamTableEnv.toRetractStream(table2, Row.class).print();

        // note: this must reference the class we are actually in (the original referenced Dw_count_user_all)
        blinkStreamTableEnv.execute(Dw_count_user_all_bak.class.getSimpleName());
        // env.execute("dont't kill me please");
    }
}
4) To sum up:
After creating the MySQL DDL, we can query that table with a plain select *.
After creating the HBase DDL, we cannot query it with select *; it throws an error. This is a pitfall, so just remember it.