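1. Introduction
In a Flink deployment, the JobManager coordinates job scheduling and resource management.
By default, each Flink cluster has a single JobManager instance. This creates a single point of failure (SPOF): if the JobManager process crashes, no new jobs can be submitted and running jobs fail.
With JobManager high availability (HA) enabled, the cluster can recover from JobManager crashes, which eliminates the SPOF. Flink currently supports HA for two kinds of deployments: standalone clusters and YARN clusters.
On YARN, HA relies mainly on YARN's own HA mechanism, and the exact behavior depends on the YARN version. This article focuses on HA for standalone clusters.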
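2. Standalone Cluster HA
In a standalone cluster, HA works as follows: at any time there is one leader JobManager and several standby JobManagers. When the leader crashes, a new leader is elected from the standby JobManagers.
There is no built-in distinction between leader and standby JobManagers: any JobManager can take on either role.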
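To make the role symmetry concrete, here is a minimal in-memory sketch in Java. The class `FailoverSketch` and its methods are hypothetical and not part of Flink. It assumes the earliest-started standby takes over when the leader crashes; in a real deployment the choice is made by the HA backend (for example ZooKeeper).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical model of standalone HA failover: every JobManager is an identical contender,
// and "leader" is a role assigned at runtime rather than a property of the process.
final class FailoverSketch {
    private final List<String> alive = new ArrayList<>(); // live JobManager ids, in start order
    private String leader; // id of the current leader, or null if no JobManager is alive

    /** A JobManager starts; the first live instance takes leadership, later ones stand by. */
    void start(String jobManagerId) {
        alive.add(jobManagerId);
        if (leader == null) {
            leader = jobManagerId;
        }
    }

    /** A JobManager crashes; if it was the leader, a standby is promoted. */
    void crash(String jobManagerId) {
        alive.remove(jobManagerId);
        if (jobManagerId.equals(leader)) {
            // Assumption: the earliest-started standby wins; the real order is up to the HA backend.
            leader = alive.isEmpty() ? null : alive.get(0);
        }
    }

    Optional<String> leader() {
        return Optional.ofNullable(leader);
    }

    boolean isStandby(String jobManagerId) {
        return alive.contains(jobManagerId) && !jobManagerId.equals(leader);
    }
}
```

Any started instance can end up as leader; nothing in the type marks a JobManager as "the" leader or "a" standby.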
3. Source Code Analysis
3.1 HighAvailabilityMode
public enum HighAvailabilityMode {
    NONE(false),
    ZOOKEEPER(true),
    FACTORY_CLASS(true);
    // Whether this mode actually enables high availability
    private final boolean haActive;
    HighAvailabilityMode(boolean haActive) {
        this.haActive = haActive;
    }
}
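Flink defines three high-availability modes in this enum:
- NONE: non-HA mode
- ZOOKEEPER: HA based on ZooKeeper (ZK)
- FACTORY_CLASS: HA provided by a custom factory class, which must implement the HighAvailabilityServicesFactory interface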
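The following sketch uses a hypothetical `HaModeSketch` enum that mirrors the three modes above and shows how a raw HA mode string could be mapped to a mode: an unset value means NONE, a built-in name selects that mode, and any other value is treated as a custom factory class name. This mapping is an assumption based on the HA_MODE description in this article, not a copy of Flink's own parser.

```java
import java.util.Locale;

// Hypothetical stand-in for Flink's HighAvailabilityMode enum, plus a parser for the HA mode value.
enum HaModeSketch {
    NONE(false),
    ZOOKEEPER(true),
    FACTORY_CLASS(true);

    private final boolean haActive; // whether this mode enables HA

    HaModeSketch(boolean haActive) {
        this.haActive = haActive;
    }

    boolean isHaActive() {
        return haActive;
    }

    /**
     * Maps a raw HA mode string to a mode.
     * Assumptions: an unset or blank value means NONE; a built-in mode name (case-insensitive)
     * selects that mode; anything else is treated as the class name of a custom factory.
     */
    static HaModeSketch fromConfigValue(String haMode) {
        if (haMode == null || haMode.trim().isEmpty()) {
            return NONE;
        }
        try {
            return valueOf(haMode.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            // Not a built-in mode name, so interpret it as a factory class name.
            return FACTORY_CLASS;
        }
    }
}
```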
3.2 HighAvailabilityOptions
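HighAvailabilityOptions defines the HA-related configuration options:
- HA_MODE: the HA mode; set it to NONE, ZOOKEEPER, or the class name of a custom HA factory
- HA_CLUSTER_ID: the ID of the Flink cluster, used to keep the HA data of multiple Flink clusters apart
- HA_STORAGE_PATH: the file system path where Flink stores HA metadata
- HA_JOB_MANAGER_PORT_RANGE: the port range the JobManager listens on in HA mode; the JobManager binds to an available port from this range
- HA_ZOOKEEPER_QUORUM: the addresses and ports of the ZK quorum nodes
- HA_ZOOKEEPER_ROOT: the root path in ZK under which Flink persists its data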
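In flink-conf.yaml these options appear, to our understanding of the Flink 1.8 documentation, under keys such as `high-availability`, `high-availability.storageDir`, `high-availability.zookeeper.quorum` and `high-availability.jobmanager.port`. The hypothetical `HaConfigCheckSketch` below is not part of Flink: it checks that a ZooKeeper setup has a storage path and a quorum, and expands a port spec such as `50000-50002` into candidate ports. The comma-separated form it also accepts is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical pre-flight check for a ZooKeeper HA setup; not part of Flink.
final class HaConfigCheckSketch {
    // Option keys as we understand them from the Flink 1.8 docs (assumption).
    static final String HA_MODE = "high-availability";
    static final String HA_STORAGE_PATH = "high-availability.storageDir";
    static final String HA_ZOOKEEPER_QUORUM = "high-availability.zookeeper.quorum";
    static final String HA_JOB_MANAGER_PORT_RANGE = "high-availability.jobmanager.port";

    /** Returns the keys a ZooKeeper HA setup needs but that are missing or blank. */
    static List<String> missingForZooKeeper(Map<String, String> conf) {
        List<String> missing = new ArrayList<>();
        if (!"zookeeper".equalsIgnoreCase(conf.get(HA_MODE))) {
            return missing; // this sketch only validates ZooKeeper mode
        }
        for (String key : new String[] {HA_STORAGE_PATH, HA_ZOOKEEPER_QUORUM}) {
            String value = conf.get(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    /** Expands a port spec such as "50010", "50000-50002" or "50010,50020" into candidate ports. */
    static List<Integer> expandPortRange(String spec) {
        List<Integer> ports = new ArrayList<>();
        for (String part : spec.split(",")) {
            String p = part.trim();
            int dash = p.indexOf('-');
            int start = Integer.parseInt(dash < 0 ? p : p.substring(0, dash).trim());
            int end = dash < 0 ? start : Integer.parseInt(p.substring(dash + 1).trim());
            for (int port = start; port <= end; port++) {
                ports.add(port);
            }
        }
        return ports;
    }
}
```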
3.3 HighAvailabilityServicesUtils
HighAvailabilityServicesUtils is mainly used to create concrete HighAvailabilityServices instances.
3.3.1 createHighAvailabilityServices
Creates a HighAvailabilityServices instance according to the configured HA mode:
public static HighAvailabilityServices createHighAvailabilityServices(
        Configuration configuration,
        Executor executor,
        AddressResolution addressResolution) throws Exception {
    // Determine the HA mode from the configuration
    HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(configuration);
    switch (highAvailabilityMode) {
        case NONE:
            // (part of the code omitted)
            // Return the non-HA services instance
            return new StandaloneHaServices(
                resourceManagerRpcUrl,
                dispatcherRpcUrl,
                jobManagerRpcUrl,
                String.format("%s%s:%s", protocol, address, port));
        case ZOOKEEPER:
            BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);
            // Return the ZooKeeper-based HA services instance
            return new ZooKeeperHaServices(
                ZooKeeperUtils.startCuratorFramework(configuration),
                executor,
                configuration,
                blobStoreService);
        case FACTORY_CLASS:
            // Return an HA services instance created by the custom factory class
            return createCustomHAServices(configuration, executor);
        default:
            throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
    }
}
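Stripped of Flink types, this method is a switch from HA mode to a services implementation. The sketch below is hypothetical: `HaServicesDispatchSketch`, its `HaServices` stand-in and the `customFactoryLoader` parameter are invented for illustration, and it throws `IllegalArgumentException` instead of a checked `Exception`.

```java
import java.util.function.Function;

// Hypothetical, Flink-free sketch of the mode-based dispatch in createHighAvailabilityServices.
final class HaServicesDispatchSketch {
    enum Mode { NONE, ZOOKEEPER, FACTORY_CLASS }

    /** Minimal stand-in for HighAvailabilityServices: it only reports what backs it. */
    interface HaServices {
        String describe();
    }

    static HaServices create(
            Mode mode,
            String zkQuorum,
            String factoryClassName,
            Function<String, HaServices> customFactoryLoader) {
        switch (mode) {
            case NONE:
                // Non-HA: components are reached at fixed, pre-configured addresses.
                return () -> "standalone, no HA";
            case ZOOKEEPER:
                // HA: leader election and leader lookup go through the ZooKeeper quorum.
                return () -> "zookeeper at " + zkQuorum;
            case FACTORY_CLASS:
                // Custom HA: the loader turns a class name into a services instance.
                return customFactoryLoader.apply(factoryClassName);
            default:
                throw new IllegalArgumentException("Recovery mode " + mode + " is not supported.");
        }
    }
}
```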
3.3.2 createCustomHAServices
This method creates a custom HighAvailabilityServices implementation through the configured factory class:
private static HighAvailabilityServices createCustomHAServices(Configuration config, Executor executor) throws FlinkException {
    // Get the context class loader of the current thread
    final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    // Read the factory class name from the HA mode option
    final String haServicesClassName = config.getString(HighAvailabilityOptions.HA_MODE);
    final HighAvailabilityServicesFactory highAvailabilityServicesFactory;
    try {
        // Instantiate the factory class from its name using the class loader
        highAvailabilityServicesFactory = InstantiationUtil.instantiate(
            haServicesClassName,
            HighAvailabilityServicesFactory.class,
            classLoader);
    } catch (Exception e) {
        throw new FlinkException(
            String.format(
                "Could not instantiate the HighAvailabilityServicesFactory '%s'. Please make sure that this class is on your class path.",
                haServicesClassName),
            e);
    }
    try {
        // Call the factory's createHAServices method to obtain the HighAvailabilityServices instance
        return highAvailabilityServicesFactory.createHAServices(config, executor);
    } catch (Exception e) {
        throw new FlinkException(
            String.format(
                "Could not create the ha services from the instantiated HighAvailabilityServicesFactory %s.",
                haServicesClassName),
            e);
    }
}
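The same load-by-name pattern can be written with plain JDK reflection. In this hypothetical sketch, `HaServicesFactory` stands in for HighAvailabilityServicesFactory and returns a string instead of a services object. The class loader is passed in explicitly (the Flink code above uses the thread's context class loader), and `Class.forName` plus `asSubclass` take the place of Flink's `InstantiationUtil.instantiate`.

```java
// Hypothetical, Flink-free sketch of createCustomHAServices: load a factory class by name,
// check its type, instantiate it, and wrap any failure in a descriptive exception.
final class CustomHaLoaderSketch {
    /** Stand-in for HighAvailabilityServicesFactory. */
    interface HaServicesFactory {
        String createHaServices(String clusterId);
    }

    /** Example factory; it needs a public no-arg constructor to be instantiated reflectively. */
    public static final class InMemoryHaServicesFactory implements HaServicesFactory {
        public InMemoryHaServicesFactory() {}

        @Override
        public String createHaServices(String clusterId) {
            return "in-memory HA services for " + clusterId;
        }
    }

    static HaServicesFactory loadFactory(String className, ClassLoader classLoader) {
        try {
            Class<?> clazz = Class.forName(className, true, classLoader);
            // asSubclass fails fast if the class does not implement the factory interface.
            return clazz.asSubclass(HaServicesFactory.class).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalStateException(
                String.format(
                    "Could not instantiate the HaServicesFactory '%s'. Please make sure that this class is on your class path.",
                    className),
                e);
        }
    }
}
```

Using `asSubclass` means a class that exists but does not implement the interface fails with the same wrapped error as a class that cannot be found.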
3.4 HighAvailabilityServices
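The HighAvailabilityServices interface defines the methods that an HA services class must implement. It extends the AutoCloseable interface, so the services can be released through close().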
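Because HighAvailabilityServices extends AutoCloseable, callers can manage it with try-with-resources. The hypothetical `HaServicesLifecycleSketch` below is not Flink code: it records lifecycle events to show that `close()` runs even when the body throws. Its `HaServices` interface redeclares `close()` without a checked exception so the example needs no `catch (Exception e)`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of why extending AutoCloseable matters for HA services.
final class HaServicesLifecycleSketch {
    /** Minimal stand-in for HighAvailabilityServices. */
    interface HaServices extends AutoCloseable {
        String leaderAddress();

        @Override
        void close(); // narrowed: no checked exception in this sketch
    }

    static final List<String> events = new ArrayList<>();

    static HaServices open(String name) {
        events.add("open " + name);
        return new HaServices() {
            @Override
            public String leaderAddress() {
                return "host-" + name; // placeholder address
            }

            @Override
            public void close() {
                // A real implementation would release ZooKeeper clients, caches, etc. here.
                events.add("close " + name);
            }
        };
    }

    /** Uses the services inside try-with-resources, so close() runs even though the body throws. */
    static void useAndFail(String name) {
        try (HaServices services = open(name)) {
            events.add("use " + services.leaderAddress());
            throw new IllegalStateException("simulated failure");
        }
    }
}
```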
3.5 Implementations of HighAvailabilityServices
The main implementations of HighAvailabilityServices are StandaloneHaServices (non-HA), ZooKeeperHaServices and YarnHighAvailabilityServices.
3.6 LeaderElectionService
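The LeaderElectionService interface defines how a contender takes part in leader election and how leadership is confirmed and checked. Looking up the current leader is handled separately, by LeaderRetrievalService.
import java.util.UUID;
import javax.annotation.Nonnull;
public interface LeaderElectionService {
    // Starts the leader election service; the contender is notified once it is granted leadership
    void start(LeaderContender contender) throws Exception;
    // Stops the leader election service
    void stop() throws Exception;
    // Confirms that the new leader has received the given leader session ID
    void confirmLeaderSessionID(UUID leaderSessionID);
    // Returns whether the contender currently holds leadership under the given leader session ID
    boolean hasLeadership(@Nonnull UUID leaderSessionId);
}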
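A single-process sketch of this contract follows. `InMemoryLeaderElectionSketch` and its simplified `LeaderContender` (with only `grantLeadership` and `revokeLeadership`) are hypothetical. With just one contender it grants leadership immediately on `start`, whereas a real implementation such as the ZooKeeper-based one only grants leadership once the contender wins the election.

```java
import java.util.UUID;

// Hypothetical single-process sketch of the LeaderElectionService contract; not Flink code.
final class InMemoryLeaderElectionSketch {
    /** Simplified stand-in for Flink's LeaderContender. */
    interface LeaderContender {
        void grantLeadership(UUID leaderSessionId);

        void revokeLeadership();
    }

    private LeaderContender contender;
    private UUID issuedSessionId;    // session ID handed to the contender
    private UUID confirmedSessionId; // session ID the contender has confirmed

    /** Starts the service; in this sketch the only contender is granted leadership at once. */
    void start(LeaderContender contender) {
        this.contender = contender;
        this.issuedSessionId = UUID.randomUUID();
        contender.grantLeadership(issuedSessionId);
    }

    /** Stops the service and revokes leadership from the contender. */
    void stop() {
        if (contender != null) {
            contender.revokeLeadership();
        }
        contender = null;
        issuedSessionId = null;
        confirmedSessionId = null;
    }

    /** Called by the new leader to confirm that it received its session ID. */
    void confirmLeaderSessionID(UUID leaderSessionId) {
        if (leaderSessionId.equals(issuedSessionId)) {
            confirmedSessionId = leaderSessionId;
        }
    }

    /** True only while a contender holds leadership under exactly this session ID. */
    boolean hasLeadership(UUID leaderSessionId) {
        return contender != null && leaderSessionId.equals(issuedSessionId);
    }

    boolean isConfirmed() {
        return confirmedSessionId != null;
    }
}
```

Tying leadership to a session ID lets a component reject requests that carry a stale ID from an earlier leadership term.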
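4. Summary
Using ZooKeeper's ephemeral nodes, Flink provides high availability for the JobManager in standalone clusters. However, ZooKeeper is a CP system in CAP terms: it favors consistency over availability, so during a network partition or a loss of quorum it may be unable to serve requests. This limit on availability should be taken into account in practice.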
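The ephemeral-node idea can be modeled in memory. This hypothetical sketch follows the common ZooKeeper leader-election recipe (each contender creates an ephemeral sequential node, and the lowest sequence number wins), the approach behind Curator's LeaderLatch. It is an illustration only, not Flink's or ZooKeeper's code.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical in-memory model of leader election on ZooKeeper ephemeral sequential nodes.
final class EphemeralNodeElectionSketch {
    private final TreeMap<Long, String> nodes = new TreeMap<>(); // sequence number -> owning session
    private long nextSequence = 0;

    /** A JobManager's session creates an ephemeral sequential node under the election path. */
    long register(String sessionOwner) {
        long seq = nextSequence++;
        nodes.put(seq, sessionOwner);
        return seq;
    }

    /** Session expiry (e.g. a crashed JobManager) removes all of that session's ephemeral nodes. */
    void expireSession(String sessionOwner) {
        nodes.values().removeIf(owner -> owner.equals(sessionOwner));
    }

    /** The owner of the lowest-numbered surviving node is the leader. */
    Optional<String> leader() {
        Map.Entry<Long, String> first = nodes.firstEntry();
        return first == null ? Optional.empty() : Optional.of(first.getValue());
    }
}
```

Because the nodes are ephemeral, a crashed leader's node disappears on its own and the next-lowest contender takes over without any explicit handover.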
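5. References
- Apache Flink 1.8 documentation, JobManager High Availability (HA): https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/jobmanager_high_availability.html
- Apache Flink Wiki, JobManager High Availability: https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability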