1. Flink standalone
This mode is convenient for initial testing.
# we assume to be in the root directory of the unzipped Flink distribution
# (1) Start Cluster
$ ./bin/start-cluster.sh
# (2) You can now access the Flink Web Interface on http://localhost:8081
# (3) Submit example job
$ ./bin/flink run ./examples/streaming/TopSpeedWindowing.jar
# (4) Stop the cluster again
$ ./bin/stop-cluster.sh
2. Flink on yarn-per-job
flink run -t yarn-per-job \
-d -ynm FlinkRetention \
-Dyarn.application.name=FlinkRetention \
-c com.bigdata.etl.FlinkRetention /data/bigdata/flink-dw/target/flink-dw.jar \
--consumer.bootstrap.servers ${consumer_bootstrap_servers} \
--producer.bootstrap.servers ${producer_bootstrap_servers} \
--retentionGroupId ${retentionGroupId} \
--flinkSeconds ${flinkSeconds} \
--redis.ip ${redis_ip} \
--redis.port ${redis_port} \
--redis.password ${redis_password}
The same command, annotated (the trailing // comments are explanatory and must be removed before running):
flink run -t yarn-per-job \ // execution target (deployment mode)
-d -ynm FlinkRetention \ // -d: detached mode; -ynm: name shown in the JobManager web UI
-Dyarn.application.name=FlinkRetention \ // application name shown in YARN
-c com.bigdata.etl.FlinkRetention \ // entry (main) class
/data/bigdata/flink-dw/target/flink-dw.jar \ // your job's jar
--consumer.bootstrap.servers ${consumer_bootstrap_servers} \ // arguments passed to the job's main()
--producer.bootstrap.servers ${producer_bootstrap_servers} \
--retentionGroupId ${retentionGroupId} \
--flinkSeconds ${flinkSeconds} \
--redis.ip ${redis_ip} \
--redis.port ${redis_port} \
--redis.password ${redis_password}
Reading the parameters in code:
// val parameter = ParameterTool.fromSystemProperties()
val parameter = ParameterTool.fromArgs(args)
println(parameter.toMap.toString)
env.getConfig.setGlobalJobParameters(parameter)
env.enableCheckpointing(Integer.parseInt(parameter.get("flinkSeconds"))) // checkpoint interval; note enableCheckpointing expects milliseconds, so despite its name "flinkSeconds" must be passed in ms
// read the parameters inside the map function
val sinkStream = dataStream.map(
  new RichMapFunction[String, String] {
    var redisIp = ""
    var redisPort = ""
    var redisPassword = ""

    override def open(parameters: Configuration): Unit = {
      val globalParams = getRuntimeContext.getExecutionConfig.getGlobalJobParameters
      val parameterTool = globalParams.asInstanceOf[ParameterTool]
      redisIp = parameterTool.get("redis.ip")
      redisPort = parameterTool.get("redis.port", "")
      redisPassword = parameterTool.get("redis.password", "")
    }

    override def map(value: String): String = {
      retentionCount(value, redisIp, redisPort, redisPassword)
    }
  }
)
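The lookups above rely on Flink's ParameterTool. As a rough, Flink-free sketch of how "--key value" arguments end up as a key/value map with optional defaults (this mimics the behavior the job depends on; the object and method names here are hypothetical, not Flink's code):

```scala
// Minimal sketch of ParameterTool-style argument handling, no Flink needed.
object ArgSketch {
  // Turn Array("--redis.ip", "10.0.0.1", "--redis.port", "6379")
  // into Map("redis.ip" -> "10.0.0.1", "redis.port" -> "6379").
  def fromArgs(args: Array[String]): Map[String, String] =
    args.grouped(2).collect {
      case Array(k, v) if k.startsWith("--") => k.drop(2) -> v
    }.toMap

  // Like parameterTool.get: with no default, a missing key yields null,
  // which is why calls such as get("redis.ip") deserve either a default
  // value or an up-front presence check.
  def get(params: Map[String, String], key: String, default: String = null): String =
    params.getOrElse(key, default)

  def main(args: Array[String]): Unit = {
    val params = fromArgs(Array("--redis.ip", "10.0.0.1", "--flinkSeconds", "5000"))
    println(get(params, "redis.ip"))           // prints 10.0.0.1
    println(get(params, "redis.port", "6379")) // missing key, prints the default 6379
  }
}
```

This also explains the difference between the lines in open(): redis.port and redis.password fall back to "" when absent, while a missing redis.ip would come back as null.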
3. Flink on yarn-application
yarn-application differs from yarn-per-job in where the submission work happens. In application mode the client only sends the request to deploy the application; everything else (fetching the dependencies the job needs, analyzing the execution environment to build the logical plan, i.e. StreamGraph → JobGraph, and uploading the dependencies and the JobGraph to the cluster) is handled by the JobManager inside the cluster.
Also, if a single main() method calls env.execute()/executeAsync() multiple times, in application mode those jobs are treated as one application and run in the same cluster, whereas in per-job mode each call would start its own cluster. In both modes the JobManager runs inside the cluster.
flink run-application -t yarn-application \
-ynm FlinkRetention \
-Dyarn.application.name=FlinkRetention \
-Dyarn.provided.lib.dirs="hdfs:///tmp/flink-1.13.0/lib;hdfs:///tmp/flink-1.13.0/plugins" \
-c com.bigdata.etl.FlinkRetention /data/bigdata/flink-dw/target/flink-dw.jar \
--consumer.bootstrap.servers ${consumer_bootstrap_servers} \
--producer.bootstrap.servers ${producer_bootstrap_servers} \
--retentionGroupId ${retentionGroupId} \
--flinkSeconds ${flinkSeconds} \
--redis.ip ${redis_ip} \
--redis.port ${redis_port} \
--redis.password ${redis_password}
The same command, annotated (remove the trailing // comment before running):
flink run-application -t yarn-application \
-ynm FlinkRetention \
-Dyarn.application.name=FlinkRetention \
-Dyarn.provided.lib.dirs="hdfs:///tmp/flink-1.13.0/lib;hdfs:///tmp/flink-1.13.0/plugins" \ // ship Flink's own libs from HDFS instead of uploading them from the client on every submission, saving bandwidth
-c com.bigdata.etl.FlinkRetention /data/bigdata/flink-dw/target/flink-dw.jar \
--consumer.bootstrap.servers ${consumer_bootstrap_servers} \
--producer.bootstrap.servers ${producer_bootstrap_servers} \
--retentionGroupId ${retentionGroupId} \
--flinkSeconds ${flinkSeconds} \
--redis.ip ${redis_ip} \
--redis.port ${redis_port} \
--redis.password ${redis_password}
Exception:
Caused by: org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1617678262990_0114 failed 2 times in previous 10000 milliseconds due to AM Container for appattempt_1617678262990_0114_000002 exited with exitCode: -1000
Failing this attempt.Diagnostics: java.io.IOException: Resource hdfs:///user/hadoop/flink-1.13.0/lib/jackson-core-2.7.3.jar is not publicly accessable and as such cannot be part of the public cache.
For more detailed output, check the application tracking page: /applicationhistory/app/application_1617678262990_0114 Then click on links to logs of each attempt.
. Failing the application.
Fix:
Move the dependency directory flink-1.13.0/ to /tmp on HDFS (the path used by yarn.provided.lib.dirs above). YARN can only place a resource in its public cache when the file and every ancestor directory are world-readable; /tmp typically is, while /user/hadoop/... is not. Making the original path world-readable (e.g. hdfs dfs -chmod -R o+rx on it) should also resolve the error.