在Flink中会有很多配置,比如像算子的并行度配置、Kafka 数据源的配置(broker 地址、topic 名、group.id)、Checkpoint 是否开启、状态后端存储路径、数据库地址、用户名和密码等。这些配置肯定不能写死在代码中,需要配置在配置文件中。
先指定环境
可以先配置一个application.properties,里面配置哪个环境,比如
ykc.profile=dev
#ykc.profile=test
#ykc.profile=prod
分别表示读取的是
application-dev.properties、application-test.properties、application-prod.properties的配置文件。
从具体环境的配置文件中读取配置
读取配置文件的工具类
public class PropertiesConstants {
public static String PROPERTIES_FILE_NAME = "/application-dev-location.properties";
static {
// 读取配置文件 application.properties 中的 ykc.profile
String profile=PropConfigUtil.getProperties("ykc.profile");
if(profile.equals("dev-location")){
PROPERTIES_FILE_NAME="/application-dev.properties";
}
if(profile.equals("test")){
PROPERTIES_FILE_NAME="/application-test.properties";
}
if(profile.equals("prod")){
PROPERTIES_FILE_NAME="/application-prod.properties";
}
}
/**
* flink环境参数
*/
public static final String STREAM_PARALLELISM = "ykc.flink.stream.parallelism";
public static final String STREAM_SINK_PARALLELISM = "ykc.flink.stream.sink.parallelism";
public static final String STREAM_DEFAULT_PARALLELISM = "ykc.flink.stream.default.parallelism";
public static final String STREAM_CHECKPOINT_ENABLE = "ykc.flink.stream.checkpoint.enable";
public static final String STREAM_CHECKPOINT_DIR = "ykc.flink.stream.checkpoint.dir";
public static final String STREAM_CHECKPOINT_TYPE = "ykc.flink.stream.checkpoint.type";
public static final String STREAM_CHECKPOINT_INTERVAL = "ykc.flink.stream.checkpoint.interval";
/**
* kafka参数 消费者
*/
public static final String KAFKA_BROKERS = "ykc.kafka.brokers";
public static final String KAFKA_GROUP_ID = "ykc.kafka.groupId";
public static final String CONSUMER_KAFKA_TOPIC = "ykc.kafka.consumer.topic";
/**
* hbase参数
*/
public static final String HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
public static final String HBASE_CLIENT_RETRIES_NUMBER = "hbase.zookeeper.property.clientPort";
public static final String HBASE_NAMESPACE = "hbase.namespace";
/**
* hdfs 配置参数
*/
public static final String HDFS_PATH = "hdfs.path";
public static final String HDFS_PATH_DATE_FORMAT = "hdfs.path.date.format";
public static final String HDFS_PATH_DATE_ZONE = "hdfs.path.date.zone";
/**
* hive参数
*/
public static final String HIVE_JDBC_URL = "hive.jdbc.url";
public static final String HIVE_DATABASE = "hive.database";
public static final String HIVE_LOCATION = "hive.hdfs.location";
/**
* impala参数
*/
public static final String IMPALA_JDBC_URL = "impala.jdbc.url";
/**
* mysql参数
*/
public static final String MYSQL_DRIVER = "ykc.mysql.driver";
public static final String MYSQL_URL = "ykc.mysql.url";
public static final String MYSQL_USERNAME = "ykc.mysql.username";
public static final String MYSQL_PASSWORD = "ykc.mysql.password";
/**
* redis的参数
*/
public static final String REDIS_HOST = "ykc.redis.host";
public static final String REDIS_PORT = "ykc.redis.port";
public static final String REDIS_PASSWORD = "ykc.redis.password";
public static final String REDIS_TIMEOUT = "ykc.redis.timeout";
public static final String REDIS_DATABASE = "ykc.redis.database";
public static final String REDIS_MAXIDLE = "ykc.redis.maxidle";
public static final String REDIS_MINIDLE = "ykc.redis.minidle";
public static final String REDIS_MAXTOTAL = "ykc.redis.maxtotal";
@Slf4j
public class ExecutionEnvUtil {
/**
* ParameterTool全局参数
* mergeWith()的会覆盖前面的
*
* @param args
* @return
*/
public static ParameterTool createParameterTool(final String[] args) {
try {
return ParameterTool
.fromPropertiesFile(ExecutionEnvUtil.class.getResourceAsStream(PropertiesConstants.PROPERTIES_FILE_NAME))
.mergeWith(ParameterTool.fromArgs(args))
.mergeWith(ParameterTool.fromSystemProperties());
} catch (Exception e) {
log.error("获取ParameterTool全局参数异常");
}
return ParameterTool.fromSystemProperties();
}
/**
* ParameterTool全局参数
*
* @return
*/
public static ParameterTool createParameterTool() {
try {
return ParameterTool
.fromPropertiesFile(ExecutionEnvUtil.class.getResourceAsStream(PropertiesConstants.PROPERTIES_FILE_NAME))
.mergeWith(ParameterTool.fromSystemProperties());
} catch (IOException e) {
log.error("获取ParameterTool全局参数异常");
}
return ParameterTool.fromSystemProperties();
}
}
由于ParameterTool 是可序列化的,所以你可以将它当作参数进行传递给自定义的函数,然后在函数内部使用 ParameterTool 来获取命令行参数,这样就意味着你在作业任何地方都可以获取到参数。
先将ParameterTool 注册为全作业参数的参数:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(ExecutionEnvUtil.createParameterTool(args));
再从ParameterTool 中获取参数
parameterTool.getInt(PropertiesConstants.STREAM_SINK_PARALLELISM);
......
// 也可以从上下文中获取ParameterTool ,比如在用户自定义的Rich 函数中
ParameterTool parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();