// 问题:Kafka 已正常启动,但 Flink 无法正常消费数据,也不报错。
// 解决办法:将 Kafka 连接器的起始消费位置配置为从所有分区的最新偏移量开始读取,即调用 startFromLatest()。
package com.atguigu.apitest.tabletest
/*import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, Table, TableEnvironment}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors._*/
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{
DataTypes, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors._
/**
* Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
*
* Project: FlinkTutorial
* Package: com.atguigu.apitest.tabletest
* Version: 1.0
*
* Created by wushengran on 2020/8/10 14:23
*/
object TableApiTest {

  /**
   * Entry point of the demo job.
   *
   * Registers a Kafka-backed temporary table named "kafkaInputTable", reads it
   * back as a [[Table]], converts it to an append-only stream of
   * `(String, Long, Double)` tuples, prints every element and then runs the job.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Step 1: build the streaming environment (parallelism 1) and the table
    // environment that sits on top of it.
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)
    val tableEnv = StreamTableEnvironment.create(streamEnv)

    // Step 2: describe the Kafka source. startFromLatest() makes the consumer
    // begin reading at the latest offset of every partition.
    // NOTE(review): "zookeeper.connect" is presumably only needed by older
    // Kafka consumer versions - confirm before removing.
    val kafkaSource = new Kafka()
      .version("0.11")
      .topic("sensor")
      .startFromLatest()
      .property("zookeeper.connect", "node001:2181")
      .property("bootstrap.servers", "node001:9092")

    // Step 3: declare the three columns of the table.
    val sensorSchema = new Schema()
      .field("id", DataTypes.STRING())
      .field("timestamp", DataTypes.BIGINT())
      .field("temperature", DataTypes.DOUBLE())

    // Step 4: register the source, with CSV format, as a temporary table.
    tableEnv
      .connect(kafkaSource)
      .withFormat(new Csv())
      .withSchema(sensorSchema)
      .createTemporaryTable("kafkaInputTable")

    // Step 5: read the registered table and print it as an append stream.
    val sensorRows: Table = tableEnv.from("kafkaInputTable")
    val rowStream = sensorRows.toAppendStream[(String, Long, Double)]
    rowStream.print()

    // Step 6: submit the job; nothing runs until execute() is called.
    streamEnv.execute("table api test! ")
  }
}