Exception in thread "main" org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException: Cannot reference field by field expression on GenericType<UserActionLogPOJO>Field expressions are only supported on POJO types, tuples, and case classes. (See the Flink documentation on what is considered a POJO.)
at org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:193)
at org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator.<init>(ComparableAggregator.java:67)
at org.apache.flink.streaming.api.datastream.KeyedStream.max(KeyedStream.java:836)
at Aggregate.main(Aggregate.java:52)
Process finished with exit code 1
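Solution: enable log4j output so that Flink's TypeExtractor reports why the class was rejected as a POJO.
① Create a folder named resources under src/main.
② Create a file named log4j.properties in resources.
③ Add the following to log4j.properties: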
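# Available levels, from most to least verbose: debug, info, error
# debug: shows debug, info, and error
# info: shows info and error
# error: shows error only
# In other words, only messages at or above the configured level are shown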
log4j.rootLogger=info,appender1
#log4j.rootLogger=info,appender1
#log4j.rootLogger=error,appender1
# Output to the console
log4j.appender.appender1=org.apache.log4j.ConsoleAppender
# Use TTCCLayout as the layout
log4j.appender.appender1.layout=org.apache.log4j.TTCCLayout
④ Add the following to pom.xml:
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
⑤ Run the code again. The output now includes:
[main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class UserActionLogPOJO does not contain a getter for field productPrice
[main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class UserActionLogPOJO cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
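Go to the POJO class in your project, fix it according to the INFO messages above, and run the code again; the problem should then be resolved.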
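To make the kind of check behind those INFO messages more concrete, here is a minimal sketch in plain Java reflection. It is not Flink's TypeExtractor and only approximates the documented POJO rules (public class, public no-argument constructor, and a getter and setter for every non-public field). The names PojoCheck, BrokenLog, and problems are hypothetical, and the case-insensitive matching of method names is an assumption of this sketch.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

public class PojoCheck {

    // Hypothetical example class: productPrice has a setter but no getter.
    public static class BrokenLog {
        private String userId;
        private int productPrice;

        public BrokenLog() {
        }

        public String getUserId() { return userId; }
        public void setUserId(String userId) { this.userId = userId; }
        public void setProductPrice(int productPrice) { this.productPrice = productPrice; }
    }

    // Returns one message per problem found; an empty list means the simplified rules pass.
    public static List<String> problems(Class<?> type) {
        List<String> out = new ArrayList<>();
        if (!Modifier.isPublic(type.getModifiers())) {
            out.add("class is not public");
        }
        try {
            type.getConstructor(); // only finds a public no-argument constructor
        } catch (NoSuchMethodException e) {
            out.add("no public no-argument constructor");
        }
        for (Field f : type.getDeclaredFields()) {
            int mods = f.getModifiers();
            // Static, transient, synthetic, and public fields need no accessors in this sketch.
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods)
                    || Modifier.isPublic(mods) || f.isSynthetic()) {
                continue;
            }
            String name = f.getName().toLowerCase(Locale.ROOT);
            if (!hasMethod(type, "get" + name, 0) && !hasMethod(type, "is" + name, 0)) {
                out.add("no getter for field " + f.getName());
            }
            if (!hasMethod(type, "set" + name, 1)) {
                out.add("no setter for field " + f.getName());
            }
        }
        return out;
    }

    // Looks for a public method by lower-cased name and parameter count.
    private static boolean hasMethod(Class<?> type, String lowerName, int paramCount) {
        for (Method m : type.getMethods()) {
            if (m.getName().toLowerCase(Locale.ROOT).equals(lowerName)
                    && m.getParameterCount() == paramCount) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        for (String p : problems(BrokenLog.class)) {
            System.out.println(p);
        }
    }
}
```

Running main on BrokenLog reports the missing getter for productPrice, which mirrors the first INFO line above. For a real diagnosis, rely on Flink's own log output rather than this approximation.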
Note that this logging setup also works for other Maven projects, not only Flink.
Finally, here is an example of a POJO class that Flink accepts:
import java.io.Serializable;

public class UserActionLogPOJO implements Serializable {
    private String userId;     // user ID
    private String itemId;     // product category ID
    private int productPrice;  // product price

    public UserActionLogPOJO(String userId, String itemId, int productPrice) {
        this.userId = userId;
        this.itemId = itemId;
        this.productPrice = productPrice;
    }

    /** Default constructor; Flink requires a public no-argument constructor for POJO types. */
    public UserActionLogPOJO() {
    }

    // Returns the same value as getUserId().
    public String getUserID() {
        return this.userId;
    }

    // Has the same effect as setItemId(String).
    public void setProductID(String itemId) {
        this.itemId = itemId;
    }

    public String getItemId() {
        return itemId;
    }

    public void setItemId(String itemId) {
        this.itemId = itemId;
    }

    // The parameter name matches the field, so the assignment actually updates the field.
    public void setProductPrice(int productPrice) {
        this.productPrice = productPrice;
    }

    public int getProductPrice() {
        return productPrice;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getUserId() {
        return userId;
    }

    @Override
    public String toString() {
        return "userId=" + userId + "," + "price=" + productPrice;
    }
}