flink问题解决集合一:集群安装使用

(1)安装standlode集群模式启动

启动脚本是 bin/start-cluster.sh  不能用sh start-cluster

flink-1.8.1/bin/flink list

flink-1.8.1/bin/flink cancel 9b99be4eed871c4e62562f9035ebef65

(2)flink任务停不掉

执行取消命令后,查看一直在取消状态

是由于有台机器挂掉了,没有响应

提交任务的时候,报错

flink scala.Predef$.refArrayOps([Ljava/lang/Object;)Lscala/collection/mutabl

项目用到了import scala.collection.JavaConverters._

所以需要引入scala的jar包

我本地用的scala2.11,我相信自己线上下的也是2.11,找了半天,在flink安装目录opt/jar中看到是2.12的scala,版本不一致导致的

(3)web监控页面

Bytes received       Records received  Bytes sent      Records sent

如果源和sink在同一个里面是不会有显示值的

需要额外说明的是,这里的输入/输出只是在 flink 的各个节点之间,不包含与外界组件的交互信息。所以,这里的统计里, flink source 的 read-bytes/read-records 都是 0;flink sink 的 write-bytes/write-records 都是 0。在 flink UI 上的显示也是如此。(yuchuanchen的博客有说明:https://blog.csdn.net/yuchuanchen/article/details/88406438)

通过在写出前加个keyby就可以了

val source = env
  .addSource(kafkaConsumerA)
  .map(a =>{
    val simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
    val day = simpleDateFormat.format(new Date())
    parseToBean(a,day)
  })
source.keyBy(a =>{
  a.carid
}).map(a =>toClickHouseInsertFormat(a))
   .addSink(new ClickhouseSink(props)).name("ck_sink")

 也可以自己定义metric,然后提交上去就能看到指定位置的数据信息

猜你喜欢

转载自www.cnblogs.com/lichar/p/11687085.html