以下源码基于Rocket MQ 4.7.0
Broker
消息中转角色,负责存储消息、转发消息。Broker在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
Broker作用
主要四个作用:
- 消息存储
- 消息投递
- 消息查询
- 服务的高可用
这四个功能由五个模块来实现。
Broker启动流程分析
总体的启动流程和NameServer的启动流程差不多但是比NameServer的启动流程复杂:
这个流程大致分为以下的几个步骤:
- BrokerController创建
- NettyServer和NettyClient的配置处理
- 命令行参数的处理
- Broker角色的处理
- 创建BrokerController
- 初始化BrokerController通过调用方法
- BrokerController启动
BrokerController创建
BrokerStartUp.createBrokerController
调用方法创建
上图是设置Netty的发送和接收缓冲区的大小。
上图主要是处理在集群条件小的Broker角色的,从这里看出来brokerId为0的为Mater节点,其他的为Slave节点。
角色类型:SYNC_MASTER/ASYNC_MASTER/SLAVE 默认为ASYNC_MASTER,
上图是处理命令行参数
初始化BrokerController。然后返回controller。
BrokerController启动
public static BrokerController start(BrokerController controller) {
try {
controller.start();
String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
+ controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
if (null != controller.getBrokerConfig().getNamesrvAddr()) {
tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
}
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
复制代码
启动比较简单,调用start方法。
Broker配置
Property Name | Default value | Details |
---|---|---|
listenPort | 10911 | listen port for client |
namesrvAddr | null | name server address |
brokerIP1 | InetAddress for network interface | Should be configured if having multiple addresses |
brokerName | null | broker name |
brokerClusterName | DefaultCluster | this broker belongs to which cluster |
brokerId | 0 | broker id, 0 means master, positive integers mean slave |
storePathCommitLog | $HOME/store/commitlog/ | file path for commit log |
storePathConsumerQueue | $HOME/store/consumequeue/ | file path for consume queue |
mapedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | mapped file size for commit log |
deleteWhen | 04 | When to delete the commitlog which is out of the reserve time |
fileReserverdTime | 72 | The number of hours to keep a commitlog before deleting it |
brokerRole | ASYNC_MASTER | SYNC_MASTER/ASYNC_MASTER/SLAVE |
flushDiskType | ASYNC_FLUSH | {SYNC_FLUSH/ASYNC_FLUSH}. Broker of SYNC_FLUSH mode flushes each message onto disk before acknowledging producer. Broker of ASYNC_FLUSH mode, on the other hand, takes advantage of group-committing, achieving better performance. |