Flink 1.11 came out last night. There are a lot of changes in this release, but the only part I care about is the Hive integration.
So far I've spent a few hours trying to read from Hive in code, following the official docs, without success. While the topic is still hot, here are my notes.
First, the dependencies:
Note: I kept hitting all kinds of errors; community posts say flink-clients is required and has to be downloaded and added as a dependency manually.
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0-cdh5.16.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.6.0-cdh5.16.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0-cdh5.16.1</version>
        <scope>provided</scope>
    </dependency>
    <!-- Hive dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>1.1.0</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
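Based on that community hint, the missing dependency should presumably look like the snippet below (my assumptions: the same ${flink.version} property and _2.11 Scala suffix as the other artifacts; also note that with provided scope it may still be missing from the runtime classpath when running inside an IDE, so compile scope might be needed there):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>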
The code (posted in full below) runs without errors as far as it goes, but the API has changed and I couldn't figure out how to print the query results. I tried the new way:

table.execute().print();
Error message:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/optimizer/costs/CostEstimator

CostEstimator lives in flink-optimizer, which is pulled in by flink-clients; as far as I can tell, starting with Flink 1.11 flink-clients is no longer a transitive dependency of the other Flink modules, which matches the community advice above about adding it manually.
Here's the full code:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.hive.HiveCatalog;

// note: this ExecutionEnvironment is not actually used by the TableEnvironment below
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Build EnvironmentSettings and select the Blink planner
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();

// Build the TableEnvironment
TableEnvironment tableEnv = TableEnvironment.create(bsSettings);
// StreamTableEnvironment tableEnv2 = StreamTableEnvironment.create(bsSettings);

String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "G:\\xxxx\\Flink SQL开发文件"; // directory containing the Hive config files (hive-site.xml)
String version = "1.1.0";

// Register the Hive catalog and make it the current catalog
Catalog catalog = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tableEnv.registerCatalog("myhive", catalog);
tableEnv.useCatalog("myhive");

String querySql = "SELECT code, total_emp FROM sample_07";

// List the tables visible in the current catalog/database
String[] tables = tableEnv.listTables();
for (int i = 0; i < tables.length; i++) {
    System.out.println(tables[i]);
}

Table table = tableEnv.sqlQuery(querySql);
table.printSchema();
// env.execute() is not needed here: the TableEnvironment manages execution itself,
// and calling execute() on an environment with no sinks defined throws an exception.
// env.execute();
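For reference, once flink-clients is on the classpath, printing the results in 1.11 should come down to the new TableResult API, which is what I was attempting above (a minimal sketch, not yet verified end to end against this Hive setup):

// requires: import org.apache.flink.table.api.TableResult;
// assumes the table object from the code above
TableResult result = table.execute();
result.print(); // collects the rows and prints them to stdout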
Next I'll try querying data from Hive and inserting it straight into Kafka. To be continued.