activeMQ(三)broker源码浅析

架构图

activeMQ的架构如图所示,抛开右边的部分,我们看到自上而下有三部分组成:

1、connector:连接器

2、region:主要分为topic和queue,分别处理相关内容

3、message store:存储

DEMO

我们从broker的IDERunner这个测试类开始,看看broker源码,代码如下

 1 // 配置管理broker、connector
 2 BrokerService brokerService = new BrokerService();
 3 // 配置----------------------------------------------------------
 4 // 设定broker绑定的address地址
 5 String bindAddress = "tcp://0.0.0.0:61616?trace=" + TRANSPORT_TRACE + "&transport.wireFormat.maxFrameSize=104857600";
 6 // 添加连接器
 7 // 1、将根据bindAddress的schema构建connector连接器,构建TransportServer(如果是Http,那么使用Jetty的connector实现)
 8 brokerService.addConnector(bindAddress);
 9 // broker是否持久化消息
10 // 1、true表示开启持久化
11 brokerService.setPersistent(true);
12 // 是否使用jmx
13 brokerService.setUseJmx(false);
14 // 允许警报降级
15 brokerService.setAdvisorySupport(false);
16 // 启动-----------------------------------------------------------
17 // 启动brokerService
18 brokerService.start();
19 // 等待结束-------------------------------------------------------
20 // 主线程await,直到brokerService状态是stop
21 brokerService.waitUntilStopped();

示例8行代码,加上注释共21行。

源码解析

addConnector

第2行,构建了BrokerService对象,第4行addConnector构建了连接器,该方法创建了TransportConnector并添加到BrokerService中。

1 public TransportConnector addConnector(URI bindAddress) throws Exception {
2   return addConnector(createTransportConnector(bindAddress));
3 }

进入createTransportConnector方法,构造TransportConnector之前通过共产构建了TransportServer

1 protected TransportConnector createTransportConnector(URI brokerURI) throws Exception {
2     TransportServer transport = TransportFactorySupport.bind(this, brokerURI);
3     return new TransportConnector(transport);
4 }

再进入bind方法,选择了TransportFactory并调用其doBind方法完成TransportServer的创建。比如HttpTransportServer由HttpTransportFactory来创建。

 1 public static TransportServer bind(BrokerService brokerService, URI location) throws IOException {
 2     TransportFactory tf = TransportFactory.findTransportFactory(location); // 根据schema找到factory
 3     if( brokerService!=null && tf instanceof BrokerServiceAware) {
 4         ((BrokerServiceAware)tf).setBrokerService(brokerService);
 5     }
 6     try {
 7         if( brokerService!=null ) {
 8             SslContext.setCurrentSslContext(brokerService.getSslContext());
 9         }
10         return tf.doBind(location); // 构建Transportserver
11     } finally {
12         SslContext.setCurrentSslContext(null);
13     }
14 }

总的来说,addConnector方法就是创建了一个TransportConnector其中包含了TransportServer用于处理通信(比如Http通信由HttpTransportServer实现)。

start

再看第18行,brokerService调用start方法,进入方法,该方法核心在于启动broker

startBroker(startAsync)
 1 private void startBroker(boolean async) throws Exception {
 2     if (async) {
 3         new Thread("Broker Starting Thread") {
 4             @Override
 5             public void run() {
 6                 // ...
 7                 doStartBroker();
 8                 // ...
 9             }
10         }.start();
11     } else {
12         doStartBroker();
13     }
14 }

跟进doStartBroker

1 // ...
2 broker = getBroker(); // 获取broker实例对象
3 // ...
4 broker.start(); // 启动broker, region, destination
5 // ...
6 startAllConnectors(); // 启动connectors连接器
7 // ...

第2行getBroker方法创建了Broker的实现类RegionBroker,其中包含了QueueRegion和TopicRegion等region实现。

第4行start方法启动broker,以及启动所有region

1 public void start() throws Exception {
2     started = true; // 修改broker的状态
3     // 启动region,region下的所有destination也会启动
4     queueRegion.start(); // 启动queue region
5     topicRegion.start(); // 启动topic region
6     tempQueueRegion.start(); // 启动temp queue region
7     tempTopicRegion.start(); // 启动temp topic region
8     // ...
9 }

接下来第6行startAllConnectors方法启动我们创建的所有TransportConnector

1 // ...
2 for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
3     TransportConnector connector = iter.next();
4     al.add(startTransportConnector(connector)); // 启动连接器
5 }
6 // ...

进入startTransportConnector方法,其调用了Connector的start

1 public TransportConnector startTransportConnector(TransportConnector connector) throws Exception {
2     // ...
3     connector.start();
4     return connector;
5 }

跟进TransportConnector的start方法

 1 public void start() throws Exception {
 2     broker = brokerService.getBroker();
 3     // ...
 4     // 添加transport监听器
 5     // 由jetty的连接器回调当前listener的onAccept方法
 6     getServer().setAcceptListener(new TransportAcceptListener() {
 7         @Override
 8         public void onAccept(final Transport transport) {
 9             // ...
10             try {
11                 brokerService.getTaskRunnerFactory().execute(new Runnable() {
12                     @Override
13                     public void run() {
14                         try {
15                             if (!brokerService.isStopping()) {
16                                 // 接收到客户端transport连接
17                                 Connection connection = createConnection(transport); // 创建transport的connection
18                                 connection.start(); // 启动该connection
19                             } else {
20                                 // ...
21                             }
22                         } catch (Exception e) {
23                             // ...
24                         }
25                     }
26                 });
27             } catch (Exception e) {
28                 // ...
29             }
30         }
31         // ...
32     });
33     getServer().setBrokerInfo(brokerInfo);
34     getServer().start(); // transport server
35     // ...
36 }

start方法的核心有两块

1、设置了一个Transport的监听器,也就是客户端如果发起一个transport连接,那么就会触发监听器。

2、启动TransportServer

 我们先看看TransportServer.doStart做了什么(以HttpTransportServer为例)

 1 protected void doStart() throws Exception {
 2     createServer(); // 创建jetty的server对象
 3     // ...
 4     ServletHolder holder = new ServletHolder();
 5     holder.setServlet(new HttpTunnelServlet());
 6     contextHandler.addServlet(holder, "/"); // 所有映射路径
 7     // ...
 8     server.start(); // 启动jetty的server
 9     // ...
10 }

主要是使用了Jetty的server来做HTTP服务,这里创建了一个HttpTunnelServlet来处理请求。当一个请求从HttpTunnelServlet进来以后,被包装成Transport,然后就会触发我们刚刚第一步所说的TransportServerListener监听器并创建Connection,进入createConnection方法

1 protected Connection createConnection(Transport transport) throws IOException {
2     //...
3     TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null
4             : taskRunnerFactory, brokerService.getTaskRunnerFactory());
5     // ...
6     return answer;
7 }

跟进TransportConnection构造方法,该方法做了两件事

1、service方法处理command,比如发送Message将调用broker的send,broker将找到对应的region -> destination -> message store -> addMessage这样一个链路去处理

2、dispatchSync分发消息

 1 public TransportConnection(TransportConnector connector, final Transport transport, Broker broker,
 2                            TaskRunnerFactory taskRunnerFactory, TaskRunnerFactory stopTaskRunnerFactory) {
 3     // ...
 4     this.transport.setTransportListener(new DefaultTransportListener() {
 5         @Override
 6         public void onCommand(Object o) {
 7             // ...
 8             try {
 9                 // ...
10                 Command command = (Command) o;
11                 if (!brokerService.isStopping()) {
12                     Response response = service(command);
13                     if (response != null && !brokerService.isStopping()) {
14                         // 接收到消息
15                         dispatchSync(response);
16                     }
17                 } else {
18                     // ...
19                 }
20             } finally {
21                 // ...
22             }
23         }
24         // ...
25     });
26     // ...
27 }

UML

猜你喜欢

转载自www.cnblogs.com/lay2017/p/11129966.html