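This article walks through the startup flow of ZooKeeper in cluster (quorum) mode. For the standalone startup flow, see my other post: ZooKeeper Standalone Mode Startup: Source Code Analysis.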
1. org.apache.zookeeper.server.quorum.QuorumPeerMain
The main method of this class calls initializeAndRun, which decides whether to start in cluster or standalone mode. In cluster mode it calls runFromConfig.
public class QuorumPeerMain {
private static final Logger LOG = LoggerFactory.getLogger(QuorumPeerMain.class);
private static final String USAGE = "Usage: QuorumPeerMain configfile";
protected QuorumPeer quorumPeer;
/**
* To start the replicated server specify the configuration file name on
* the command line.
* @param args path to the configfile
*/
public static void main(String[] args) {
//create the QuorumPeerMain instance
QuorumPeerMain main = new QuorumPeerMain();
try {
//all of the main logic lives in a single initialization method
main.initializeAndRun(args);
//other exception handlers omitted
} catch (Exception e) {
LOG.error("Unexpected exception, exiting abnormally", e);
System.exit(1);
}
LOG.info("Exiting normally");
System.exit(0);
}
protected void initializeAndRun(String[] args)
throws ConfigException, IOException, AdminServerException
{
//create the configuration object
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
//parse the configuration file, see step 2
config.parse(args[0]);
}
// create and start DatadirCleanupManager, which periodically purges old snapshots and transaction logs
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();
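//decide whether to run in cluster mode: there must be exactly one argument (the config file) and config.isDistributed() must return true,
//which in 3.5.x means server.N entries were configured and there is more than one voting member (or standaloneEnabled=false)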
if (args.length == 1 && config.isDistributed()) {
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
//otherwise (standalone mode), delegate to ZooKeeperServerMain
ZooKeeperServerMain.main(args);
}
}
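The mode decision above can be isolated into a few lines. The sketch below is hypothetical (ModeDecision, decide and its parameters are not ZooKeeper APIs), and it assumes, based on the 3.5.x source, that isDistributed() returns true when server.N entries were configured and either there is more than one voting member or standaloneEnabled is false.

```java
import java.util.List;

/** Hypothetical stand-in for the branch in initializeAndRun(); not a ZooKeeper class. */
final class ModeDecision {
    enum Mode { QUORUM, STANDALONE }

    /**
     * @param argCount          number of command-line arguments
     * @param votingMembers     ids of the configured voting servers (empty if no server.N entries)
     * @param standaloneEnabled value of the standaloneEnabled setting (true by default)
     */
    static Mode decide(int argCount, List<String> votingMembers, boolean standaloneEnabled) {
        boolean quorumConfigured = !votingMembers.isEmpty(); // stands in for quorumVerifier != null
        boolean distributed = quorumConfigured
                && (!standaloneEnabled || votingMembers.size() > 1);
        return (argCount == 1 && distributed) ? Mode.QUORUM : Mode.STANDALONE;
    }

    public static void main(String[] args) {
        System.out.println(decide(1, List.of("server.1", "server.2", "server.3"), true)); // QUORUM
        System.out.println(decide(1, List.of("server.1"), true));                       // STANDALONE
        System.out.println(decide(1, List.of("server.1"), false));                      // QUORUM
        System.out.println(decide(0, List.of(), true));                                 // STANDALONE
    }
}
```

Under this model, a configuration listing a single server still falls through to ZooKeeperServerMain unless standaloneEnabled is explicitly set to false.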
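2. Loading and parsing the configuration: org.apache.zookeeper.server.quorum.QuorumPeerConfig
For the meaning of the configuration options involved, see the configuration section of my post Getting Started with ZooKeeper: Standalone and Cluster Setup.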
public void parse(String path) throws ConfigException {
LOG.info("Reading configuration from: " + path);
try {
File configFile = (new VerifyingFileFactory.Builder(LOG)
.warnForRelativePath()
.failForNonExistingPath()
.build()).create(path);
//load the specified configuration file into a Properties object
Properties cfg = new Properties();
FileInputStream in = new FileInputStream(configFile);
try {
cfg.load(in);
configFileStr = path;
} finally {
in.close();
}
//convert the Properties into QuorumPeerConfig: each property is read, validated or lightly processed,
//and assigned to the corresponding QuorumPeerConfig field
parseProperties(cfg);
} catch (IOException e) {
throw new ConfigException("Error processing " + path, e);
} catch (IllegalArgumentException e) {
throw new ConfigException("Error processing " + path, e);
}
//dynamic configuration handling omitted
}
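Stripped of validation and dynamic configuration, the parse step amounts to loading zoo.cfg into a java.util.Properties object and mapping keys onto fields. The following is a simplified, hypothetical sketch: ConfigSketch is not a ZooKeeper class, only tickTime, dataDir and server.N are handled, and it reads from a string instead of a file so it runs on its own. Like the real parser (as of 3.5.x, to my knowledge), it reports a missing dataDir with an IllegalArgumentException, which parse() above would wrap in a ConfigException.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

/** Hypothetical, heavily simplified stand-in for QuorumPeerConfig's parse step. */
final class ConfigSketch {
    int tickTime;
    String dataDir;
    final Map<Long, String> servers = new TreeMap<>(); // server id -> "host:quorumPort:electionPort"

    static ConfigSketch parse(String cfgText) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(cfgText)); // the real code loads from a FileInputStream
        ConfigSketch cfg = new ConfigSketch();
        for (String key : props.stringPropertyNames()) {
            String value = props.getProperty(key).trim();
            if (key.equals("tickTime")) {
                cfg.tickTime = Integer.parseInt(value);
            } else if (key.equals("dataDir")) {
                cfg.dataDir = value;
            } else if (key.startsWith("server.")) {
                // server.N=host:quorumPort:electionPort, where N is the server's myid
                long sid = Long.parseLong(key.substring("server.".length()));
                cfg.servers.put(sid, value);
            }
            // every other key is ignored in this sketch
        }
        if (cfg.dataDir == null) {
            throw new IllegalArgumentException("dataDir is not set");
        }
        return cfg;
    }

    public static void main(String[] args) throws IOException {
        String zooCfg = String.join("\n",
                "tickTime=2000",
                "dataDir=/tmp/zookeeper",
                "clientPort=2181",
                "server.1=zk1:2888:3888",
                "server.2=zk2:2888:3888",
                "server.3=zk3:2888:3888");
        ConfigSketch cfg = parse(zooCfg);
        System.out.println(cfg.tickTime + " " + cfg.servers.keySet()); // 2000 [1, 2, 3]
    }
}
```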
3. org.apache.zookeeper.server.DatadirCleanupManager
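After the main method in step 1 has loaded the configuration, it creates a DatadirCleanupManager from the configured snapshot and transaction log directories and purge settings, then starts the scheduled purge task.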
public class DatadirCleanupManager {
private static final Logger LOG = LoggerFactory.getLogger(DatadirCleanupManager.class);
/**
* purge task status
*/
public enum PurgeTaskStatus {
NOT_STARTED, STARTED, COMPLETED;
}
private PurgeTaskStatus purgeTaskStatus = PurgeTaskStatus.NOT_STARTED;
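//snapshot (data) directory
private final File snapDir;
//transaction log (data) directory
private final File dataLogDir;
//used together with purgeInterval below: the number of most recent snapshots to retain (default 3); available since 3.4.0
private final int snapRetainCount;
//since 3.4.0, ZK can purge old transaction logs and snapshots automatically; this is the purge interval in hours and must be an integer of 1 or more. The default, 0, disables auto-purge.
private final int purgeInterval;
//Timer that runs the purge periodically according to the settings above
private Timer timer;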
private Timer timer;
//remaining logic omitted
}
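The scheduling behaviour described by these fields can be sketched with java.util.Timer. PurgeScheduler below is hypothetical and deletes no files; it assumes only what the comments above state (purgeInterval is in hours, and a value of 0 or less disables the task) and reuses the PurgeTaskStatus values, moving from NOT_STARTED to STARTED, and to COMPLETED when this sketch is shut down.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of purge scheduling; it counts runs instead of deleting files. */
final class PurgeScheduler {
    enum PurgeTaskStatus { NOT_STARTED, STARTED, COMPLETED }

    private final int purgeInterval; // hours; 0 or less disables auto-purge
    private PurgeTaskStatus status = PurgeTaskStatus.NOT_STARTED;
    private Timer timer;
    final AtomicInteger runs = new AtomicInteger();

    PurgeScheduler(int purgeInterval) {
        this.purgeInterval = purgeInterval;
    }

    synchronized void start() {
        if (status == PurgeTaskStatus.STARTED) {
            return; // already scheduled
        }
        if (purgeInterval <= 0) {
            return; // auto-purge disabled; status stays NOT_STARTED
        }
        timer = new Timer("PurgeTask", true); // daemon thread, so it never keeps the JVM alive
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                // A real task would keep the newest snapRetainCount snapshots and delete older
                // snapshots and transaction logs; this sketch only counts executions.
                runs.incrementAndGet();
            }
        }, 0, TimeUnit.HOURS.toMillis(purgeInterval));
        status = PurgeTaskStatus.STARTED;
    }

    synchronized void shutdown() {
        if (status == PurgeTaskStatus.STARTED) {
            timer.cancel();
            status = PurgeTaskStatus.COMPLETED;
        }
    }

    synchronized PurgeTaskStatus status() {
        return status;
    }

    public static void main(String[] args) {
        PurgeScheduler disabled = new PurgeScheduler(0);
        disabled.start();
        System.out.println(disabled.status()); // NOT_STARTED

        PurgeScheduler hourly = new PurgeScheduler(1);
        hourly.start();
        System.out.println(hourly.status()); // STARTED
        hourly.shutdown();
        System.out.println(hourly.status()); // COMPLETED
    }
}
```

Using a daemon Timer thread means a forgotten shutdown() never blocks JVM exit, which suits a background housekeeping task.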
4. org.apache.zookeeper.server.quorum.QuorumPeerMain.runFromConfig(QuorumPeerConfig)
The startup method used in cluster mode, as mentioned in step 1.
public void runFromConfig(QuorumPeerConfig config)
throws IOException, AdminServerException
{
try {
//register the log4j MBeans; this can be disabled with the system property zookeeper.jmx.log4j.disable=true
ManagedUtil.registerLog4jMBeans();
} catch (JMException e) {
LOG.warn("Unable to register log4j JMX control", e);
}
LOG.info("Starting quorum peer");
try {
/*
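* ServerCnxnFactory is a key ZooKeeper component that handles connections between clients and the server. It has two main implementations:
* NIOServerCnxnFactory, which handles network I/O events with Java's native NIO;
* NettyServerCnxnFactory, which handles network I/O events with Netty. As the component that handles client connections, it starts several threads listening on the client port (2181 in the sample configuration)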
*/
ServerCnxnFactory cnxnFactory = null;
ServerCnxnFactory secureCnxnFactory = null;
if (config.getClientPortAddress() != null) {
//create the CnxnFactory for plain client connections; defaults to NIOServerCnxnFactory and can be overridden with the system property zookeeper.serverCnxnFactory
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns(),
false);
}
if (config.getSecureClientPortAddress() != null) {
//create the secureCnxnFactory for TLS client connections; it is chosen via the same zookeeper.serverCnxnFactory property, and TLS requires NettyServerCnxnFactory
secureCnxnFactory = ServerCnxnFactory.createFactory();
secureCnxnFactory.configure(config.getSecureClientPortAddress(),
config.getMaxClientCnxns(),
true);
}
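//create the QuorumPeer and populate its properties from the configuration.
//QuorumPeer is the core class of a ZooKeeper ensemble; it exists only in cluster mode and acts as the custodian of the ZooKeeper server instance.
//At the cluster level, a QuorumPeer represents one server in the ensemble; while running, it continuously checks the state of its server instance and runs leader election when needed.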
quorumPeer = getQuorumPeer();
//create FileTxnSnapLog, ZooKeeper's manager for transaction logs and snapshots
quorumPeer.setTxnFactory(new FileTxnSnapLog(
config.getDataLogDir(),
config.getDataDir()));
quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
quorumPeer.enableLocalSessionsUpgrading(
config.isLocalSessionsUpgradingEnabled());
//quorumPeer.setQuorumPeers(config.getAllMembers());
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
quorumPeer.setConfigFileName(config.getConfigFilename());
//create the in-memory database ZKDatabase
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
if (config.getLastSeenQuorumVerifier()!=null) {
quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
}
//write the current quorum configuration into ZKDatabase (the /zookeeper/config node)
quorumPeer.initConfigInZKDatabase();
quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
quorumPeer.setSslQuorum(config.isSslQuorum());
quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.setSyncEnabled(config.getSyncEnabled());
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
if (config.sslQuorumReloadCertFiles) {
quorumPeer.getX509Util().enableCertFileReloading();
}
// sets quorum sasl authentication configurations
quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
if(quorumPeer.isQuorumSaslAuthEnabled()){
quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
}
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
//initialize the QuorumPeer
quorumPeer.initialize();
//start the QuorumPeer
quorumPeer.start();
quorumPeer.join();
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Quorum Peer interrupted", e);
}
}
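Both createFactory() calls above pick the implementation from the zookeeper.serverCnxnFactory system property, falling back to NIOServerCnxnFactory. Below is a hypothetical sketch of that selection: ZooKeeper itself loads the named class reflectively, whereas this version uses a lookup table to stay self-contained, and NioFactory/NettyFactory are stand-ins rather than the real classes. The NIO stand-in rejecting secure=true reflects the assumption that TLS client ports need NettyServerCnxnFactory.

```java
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical model of choosing the client connection factory from a system property. */
final class FactorySelection {
    interface CnxnFactory {
        void configure(int port, int maxClientCnxns, boolean secure);
    }

    /** Stand-in for NIOServerCnxnFactory: plain sockets only. */
    static final class NioFactory implements CnxnFactory {
        @Override
        public void configure(int port, int maxClientCnxns, boolean secure) {
            if (secure) {
                throw new UnsupportedOperationException("SSL isn't supported by the NIO stand-in");
            }
        }
    }

    /** Stand-in for NettyServerCnxnFactory: accepts both plain and TLS ports. */
    static final class NettyFactory implements CnxnFactory {
        @Override
        public void configure(int port, int maxClientCnxns, boolean secure) {
            // nothing to set up in this sketch
        }
    }

    static final String PROPERTY = "zookeeper.serverCnxnFactory";
    static final String NIO = "org.apache.zookeeper.server.NIOServerCnxnFactory";
    static final String NETTY = "org.apache.zookeeper.server.NettyServerCnxnFactory";

    // ZooKeeper instantiates the named class reflectively; a lookup table keeps this self-contained.
    private static final Map<String, Supplier<CnxnFactory>> REGISTRY =
            Map.of(NIO, NioFactory::new, NETTY, NettyFactory::new);

    static CnxnFactory createFactory() {
        String name = System.getProperty(PROPERTY, NIO); // NIO is the default
        Supplier<CnxnFactory> supplier = REGISTRY.get(name);
        if (supplier == null) {
            throw new IllegalStateException("Unknown connection factory: " + name);
        }
        return supplier.get();
    }

    public static void main(String[] args) {
        CnxnFactory plain = createFactory();
        plain.configure(2181, 60, false);
        System.out.println(plain.getClass().getSimpleName()); // NioFactory (property unset)

        System.setProperty(PROPERTY, NETTY);
        CnxnFactory secure = createFactory();
        secure.configure(2281, 60, true);
        System.out.println(secure.getClass().getSimpleName()); // NettyFactory
    }
}
```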
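5. org.apache.zookeeper.server.quorum.QuorumPeer.start()
The start method of QuorumPeer. QuorumPeer is itself a thread; once it has been started, runFromConfig calls join() on it.
start() mainly does the following: loads the in-memory database, starts the ServerCnxnFactory, prepares leader election, and starts the QuorumPeer thread. The ZooKeeper main thread then blocks in join() (WAITING state) while the peer serves requests.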
public synchronized void start() {
if (!getView().containsKey(myid)) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
//zkServer holds an in-memory database object, ZKDatabase; at startup, the persisted data must be loaded into memory, i.e. into ZKDatabase.
loadDataBase();
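// this starts a thread that accepts client requests. Note that although the thread is running and can accept client connections,
//zkServer has not been initialized yet, so requests are actually rejected until zkServer initialization completes.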
startServerCnxnFactory();
try {
adminServer.start();
} catch (AdminServerException e) {
LOG.warn("Problem starting AdminServer", e);
System.out.println(e);
}
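//this method does not actually run the election; it only does preparation, such as creating the initial vote and the election algorithm instance.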
startLeaderElection();
// start the thread; its run() method performs leader election and zkServer initialization.
super.start();
}
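The ordering in start() and the join() in runFromConfig can be modelled with a plain Thread subclass. PeerStartupSketch is hypothetical: the steps are recorded as strings rather than performed, and run() returns immediately instead of looping over the peer states.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical model of the QuorumPeer startup order; none of these steps call ZooKeeper APIs. */
final class PeerStartupSketch extends Thread {
    final List<String> steps = new CopyOnWriteArrayList<>();
    private final long myid;
    private final List<Long> view; // ids taken from the server.N entries

    PeerStartupSketch(long myid, List<Long> view) {
        super("QuorumPeer-sketch");
        this.myid = myid;
        this.view = view;
    }

    @Override
    public synchronized void start() {
        if (!view.contains(myid)) {
            throw new RuntimeException("My id " + myid + " not in the peer list");
        }
        steps.add("loadDataBase");           // restore persisted data into memory
        steps.add("startServerCnxnFactory"); // accept connections; requests refused until zkServer is ready
        steps.add("startLeaderElection");    // preparation only, no voting yet
        super.start();                       // the new thread executes run()
    }

    @Override
    public void run() {
        // The real run() loops on the peer state (LOOKING/FOLLOWING/LEADING/OBSERVING);
        // this sketch just records that the thread ran and returns.
        steps.add("run");
    }

    public static void main(String[] args) throws InterruptedException {
        PeerStartupSketch peer = new PeerStartupSketch(1L, List.of(1L, 2L, 3L));
        peer.start();
        peer.join(); // the calling thread waits here (Thread.State.WAITING) until run() ends
        System.out.println(peer.steps); // [loadDataBase, startServerCnxnFactory, startLeaderElection, run]
    }
}
```

Because start() executes on the caller's thread, loading the database and opening the client port happen before the peer thread exists; only the work in run() happens on the new thread.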
For leader election and the logic of run() once this thread has started, see my other post: ZooKeeper Cluster Mode Leader Election: Source Code Analysis.