文章目录
异常
Exception in thread "main" org.apache.flink.table.api.TableException: Only the first field can reference an atomic type.
at org.apache.flink.table.api.TableEnvironment.$anonfun$getFieldInfo$7(TableEnvironment.scala:1117)
at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:240)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:194)
at scala.collection.TraversableLike.flatMap(TraversableLike.scala:240)
at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:237)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:194)
at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:1112)
at org.apache.flink.table.api.BatchTableEnvironment.registerDataSetInternal(BatchTableEnvironment.scala:423)
at org.apache.flink.table.api.java.BatchTableEnvironment.registerDataSet(BatchTableEnvironment.scala:129)
at com.tt.study.sql.WordCountSql.main(WordCountSql.java:33)
maven依赖
<properties>
<flink.version>1.7.2</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
flink sql实例
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import java.util.ArrayList;
import java.util.List;
/**
 * Batch word-count example that aggregates word frequencies with Flink SQL.
 *
 * <p>The job splits a fixed sentence into words, registers the resulting
 * {@link WC} records as the table {@code WordCount}, sums the frequency per
 * word with a SQL query and prints the aggregated records.
 */
public class WordCountSql {

    /**
     * Builds and runs the word-count job.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the Flink job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // One WC record with frequency 1 per word occurrence; SQL does the summing.
        List<WC> list = new ArrayList<>();
        String wordsStr = "Hello Flink Hello immoc";
        for (String word : wordsStr.split("\\W+")) {
            list.add(new WC(word, 1));
        }

        DataSet<WC> input = env.fromCollection(list);
        // Referencing two fields by name only works when WC is recognised as a
        // POJO; otherwise registration fails with
        // "Only the first field can reference an atomic type".
        tEnv.registerDataSet("WordCount", input, "word, frequency");

        Table table = tEnv.sqlQuery(
                "select word, sum(frequency) as frequency from WordCount group by word");
        DataSet<WC> result = tEnv.toDataSet(table, WC.class);
        result.print();
    }

    /**
     * A word together with the number of times it occurs.
     *
     * <p>Shaped as a Flink POJO: public static class, public no-argument
     * constructor, and a public getter/setter pair for every field. Without
     * the accessors (or public fields) Flink treats the class as an atomic
     * type and the fields cannot be referenced by name.
     */
    public static class WC {
        private String word;
        private long frequency;

        /** No-argument constructor required for POJO instantiation. */
        public WC() {
        }

        /**
         * Creates a record for one word.
         *
         * @param word  the word
         * @param count the number of occurrences to record for the word
         */
        public WC(String word, long count) {
            this.word = word;
            this.frequency = count;
        }

        /** @return the word */
        public String getWord() {
            return word;
        }

        /** @param word the word */
        public void setWord(String word) {
            this.word = word;
        }

        /** @return the number of occurrences */
        public long getFrequency() {
            return frequency;
        }

        /** @param frequency the number of occurrences */
        public void setFrequency(long frequency) {
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "WC{" +
                    "word='" + word + '\'' +
                    ", frequency=" + frequency +
                    '}';
        }
    }
}
方案
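参考网上的方案，添加了依赖并为 Bean 补充了无参构造方法，但异常仍然存在。于是查看官方样例，仔细对比代码实现，发现样例中静态内部类的属性是 public 的。Flink 要求 POJO 的每个属性要么是 public，要么有对应的 get、set 方法，否则该类会被当作原子类型处理，注册表时只能引用第一个字段，也就出现了上面的异常。于是在 WC 类中生成 get、set 方法，或者使用 lombok 包的 @Data 后，异常得以解决。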
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
WC{word='Hello', frequency=2}
WC{word='immoc', frequency=1}
WC{word='Flink', frequency=1}
Disconnected from the target VM, address: '127.0.0.1:58738', transport: 'socket'
Process finished with exit code 0