1. Dependency conflicts. When first running the code, the reported error was a missing ORC dependency, so I added several extra dependencies, which made the program behave differently from what I expected (see the dependency check right after this list).
2. Errors caused by not being familiar with the specific APIs.
3. All of the code here was run locally in IDEA; it has not been tested on a cluster yet.
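On point 1: rather than piling on new dependencies whenever a class goes missing, it is usually better to find out which declared artifact drags in the conflicting jars and exclude it. Maven's dependency tree is the standard tool for this; a minimal sketch (the -Dincludes filter value is only an example, matching the ORC group):

mvn dependency:tree -Dincludes=org.apache.orc

The output shows which of the declared dependencies pulls in the matched jars, so a targeted <exclusion> can be added instead of extra dependencies.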
POM file, clean version:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
    <!--<scope>provided</scope>-->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    <!--<scope>provided</scope>-->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.11.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>1.1.0</version>
</dependency>
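The snippet above references ${flink.version} and ${scala.binary.version} without defining them. A minimal <properties> block consistent with the hard-coded 1.11.0 / _2.11 artifacts above might look like this (the exact values are my assumption; adjust them to your build):

<properties>
    <!-- Assumed values, inferred from the hard-coded 1.11.0 and _2.11 artifacts above -->
    <flink.version>1.11.0</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>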
Code:
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class StreamingWriteHive2 {

    private static final String KAFKA_SQL = "CREATE TABLE kafkaTable22 (\n" +
            " code STRING," +
            " total_emp INT ," +
            " ts bigint ," +
            " r_t AS TO_TIMESTAMP(FROM_UNIXTIME(ts,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss'),\n" +
            " WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND " +
            ") WITH (" +
            " 'connector' = 'kafka'," +
            " 'topic' = 'flink_dwd_test4'," +
            " 'properties.bootstrap.servers' = 'node1:9092'," +
            " 'properties.group.id' = 'test1'," +
            " 'format' = 'json'," +
            " 'scan.startup.mode' = 'latest-offset'" +
            ")";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
        bsEnv.enableCheckpointing(5000);
        bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        String name = "myhive";
        String defaultDatabase = "flink";
        String hiveConfDir = "G:\\=Flink SQL开发文件"; // a local path
        String version = "1.1.0";

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
        tEnv.registerCatalog("myhive", hive);
        tEnv.useCatalog("myhive");

        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        // remove any previous definition of the Kafka table from the catalog
        tEnv.executeSql("drop table kafkaTable22");
        tEnv.executeSql(KAFKA_SQL);

        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        // tEnv.executeSql("drop table fs_table1111");
        // If the table already exists in Hive, this DDL can be skipped
        String hiveSql = "CREATE TABLE fs_table1111 (\n" +
                "  f_random_str STRING,\n" +
                "  f_sequence INT" +
                ") partitioned by (dt string,hr string) " +
                "stored as PARQUET " +
                "TBLPROPERTIES (\n" +
                "  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',\n" +
                "  'sink.partition-commit.delay'='5 s',\n" +
                "  'sink.partition-commit.trigger'='partition-time',\n" +
                // "  'sink.partition-commit.delay'='1 m',\n" +
                "  'sink.partition-commit.policy.kind'='metastore,success-file'" +
                ")";
        tEnv.executeSql(hiveSql);

        // Note: the sink columns (f_random_str, f_sequence, dt, hr) are matched by position, not by name
        String insertSql = "insert into fs_table1111 SELECT code, total_emp, " +
                " DATE_FORMAT(r_t, 'yyyy-MM-dd'), DATE_FORMAT(r_t, 'HH') FROM kafkaTable22";
        tEnv.executeSql(insertSql).print();
    }
}
One point deserves special emphasis:

'sink.partition-commit.policy.kind'='metastore'
versus
'sink.partition-commit.policy.kind'='metastore,success-file'

I hit a pitfall here: without the success-file policy, on the very first run (reading Kafka and writing to Hive, with both the Hive table and the Kafka table freshly created), if no new data arrives in Kafka, Hive's metadata is never updated, so querying the Hive table returns nothing. With success-file added, as soon as the small marker file lands on HDFS the partition commit completes and the metadata gets updated.
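For completeness, the marker file's name is configurable as well. A minimal sketch of the relevant TBLPROPERTIES, per the Flink 1.11 filesystem/Hive sink options ('sink.partition-commit.success-file.name' defaults to _SUCCESS; the value below just restates that default):

TBLPROPERTIES (
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='5 s',
  -- commit to the metastore AND write a marker file
  'sink.partition-commit.policy.kind'='metastore,success-file',
  -- optional; '_SUCCESS' is already the default name
  'sink.partition-commit.success-file.name'='_SUCCESS'
)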
Kafka producer code:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Random;

public class KafkaProducerUtil extends Thread {

    private String topic;

    public KafkaProducerUtil(String topic) {
        super();
        this.topic = topic;
    }

    private Producer<String, String> createProducer() {
        // Configure the producer via Properties
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<String, String>(properties);
    }

    @Override
    public void run() {
        Producer<String, String> producer = createProducer();
        Random random = new Random();
        Random random2 = new Random();
        while (true) {
            int nums = random.nextInt(10);
            int nums2 = random2.nextInt(10);
            // event time: current epoch seconds plus 5
            String time = System.currentTimeMillis() / 1000 + 5 + "";
            String type = "pv";
            try {
                if (nums2 % 2 == 0) {
                    type = "pv";
                } else {
                    type = "uv";
                }
                String kaifa_log = "{\"code\":\"" + type + "\",\"total_emp\":\"1" + "\",\"ts\":" + time + "}";
                System.out.println("kaifa_log = " + kaifa_log);
                producer.send(new ProducerRecord<String, String>(this.topic, kaifa_log));
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("========= one iteration ==========");
            try {
                sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        new KafkaProducerUtil("flink_dwd_test4").run();
    }
}
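For reference, each record the producer emits looks like this (the ts value here is illustrative):

kaifa_log = {"code":"pv","total_emp":"1","ts":1594700005}

Note that total_emp is produced as the quoted string "1" while the Kafka table DDL declares it as INT; whether the JSON format coerces that quietly can vary by Flink version, so emitting a bare number would be safer.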
If the job runs without errors, check HDFS to see whether data is being written. If no data shows up, it may be a problem with the Hadoop environment. Then query the Hive table to confirm the data is there.
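A quick way to check from the command line (the warehouse path below is an assumption; substitute your own hive.metastore.warehouse.dir and partition values):

# Hypothetical warehouse path; adjust to your hive.metastore.warehouse.dir
hdfs dfs -ls /user/hive/warehouse/flink.db/fs_table1111/dt=2020-07-14/hr=18/
# After a successful partition commit you should see the data files plus the _SUCCESS marker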
Reading from Hive:
Two points to note here. Without the following two settings, every query of the Hive table runs once and then the job simply finishes; with them, the job keeps running and reads continuously:

tEnv.getConfig().getConfiguration().setBoolean("table.dynamic-table-options.enabled", true);

/*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-07-14') */
The code is as follows:
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class QueryHiveTable {

    public static void main(String[] args) {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
        bsEnv.enableCheckpointing(5000);
        bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        /*
        Configuration configuration = tEnv.getConfig().getConfiguration();
        configuration.setString("streaming-source.enable", "true");
        configuration.setString("streaming-source.monitor-interval", "1 m");
        */
        // Allow /*+ OPTIONS(...) */ hints in SQL
        tEnv.getConfig().getConfiguration().setBoolean("table.dynamic-table-options.enabled", true);

        String name = "myhive";
        String defaultDatabase = "flink";
        String hiveConfDir = "G:\\汪小剑的文件夹\\司机宝文件夹\\Flink SQL开发文件"; // a local path
        String version = "1.1.0";

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
        tEnv.registerCatalog("myhive", hive);
        tEnv.useCatalog("myhive");
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

        // String querySql = "select * from fs_table1111";
        String querySql = "select * from fs_table1111 /*+ OPTIONS('streaming-source.enable'='true') */";
        String querySql2 = "SELECT count(*) FROM fs_table1111 /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-07-14') */";

        // TableResult tableResult = tEnv.executeSql("select * from fs_table1111 /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-07-14') */");
        System.out.println("querySql = " + querySql2);
        tEnv.executeSql(querySql2).print();
        // tableResult.print();

        // String querySql22 = "select count(*) from fs_table21";
        // tEnv.executeSql(querySql22).print();
    }
}
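One more knob worth knowing for streaming reads: per the Flink 1.11 Hive connector options, 'streaming-source.monitor-interval' controls how often new partitions/files are discovered (the default is 1 minute). A sketch combining it with the hints used above:

SELECT * FROM fs_table1111
/*+ OPTIONS(
      'streaming-source.enable'='true',
      'streaming-source.monitor-interval'='1 min',
      'streaming-source.consume-start-offset'='2020-07-14'
   ) */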
All of these were pitfalls I stepped in myself. Fortunately I had pointers from the expert 白斩鸡; I need to read his blog more. Blog address: