搭建集群环境
消息的持久化和签收机制等都是为了保证消息的可靠性,但仅保证可靠性是不够的,有时我们还需要保证高可用性,这时就需要搭建ActiveMQ集群,以避免单点故障提升可用性。官方为我们介绍了三种集群搭建的方式
Master Slave Type | Requirements | Pros | Cons |
---|---|---|---|
Shared File System Master Slave | A shared file system such as a SAN | Run as many slaves as required. Automatic recovery of old masters | Requires shared file system |
JDBC Master Slave | A Shared database | Run as many slaves as required. Automatic recovery of old masters | Requires a shared database. Also relatively slow as it cannot use the high performance journal |
Replicated LevelDB Store | ZooKeeper Server | Run as many slaves as required. Automatic recovery of old masters. Very fast. | Requires a ZooKeeper server. |
5.6版本之后推出了LevelDB的持久化引擎,它使用自定义的索引代替常用的BTree索引,持久化性能高于KahaDB。虽然目前默认的持久化方式还是KahaDB,但是LevelDB会是趋势。5.9版本还提供了基于Zookeeper和LevelDB的数据主从复制方式(即Replicated LevelDB Store),是主从数据复制方案的首选。
集群工作原理
使用Zookeeper集群注册所有的ActiveMQ Broker,但只有其中的一个Broker可以提供服务,这个提供服务的Broker被视为Master,其他的Broker处于待机状态,被视为Slave。客户端只能连接Master,不能连接Slave。如果Master因故障不能提供服务,Zookeeper会从Slave中选举出一个Broker充当Master。Slave连接Master并同步它的存储状态,Slave不接受客户端连接。所有的存储操作都将被复制到连接至Master的Slaves。如果Master宕机则得到了最新更新的Slave会成为Master,故障节点在恢复后会重新加入到集群中并连接Master进入Slave模式。所有需要同步的消息操作都将等待存储状态被复制到其他法定节点的操作完成才算完成。所以,如果你配置了replicas=3(集群参数3台服务器),那么法定大小是(3/2)+1=2。Master将会存储并更新然后等待(2-1)=1个Slave存储和更新完成才汇报success。有一个node要作为观察者存在,当一个新的Master被选中,你需要至少保障一个法定node在线以能够找到拥有最新状态的node,这个node才可以成为新的Master。因此推荐运行至少3个replica nodes以防止一个node失败后导致服务中断。(对这段话的理解,最好先学习下zookeeper选举机制,至少需要3台机器)
集群配置
这里我们模拟一台服务器启动3个activemq实例的方式创建集群,先按下表规划规划集群配置,zookeeper集群搭建不再这里讲解
主机 | Zookeeper集群端口 | AMQ集群bind端口 | AMQ消息tcp端口 | 管理控制台端口 | AMQ节点安装目录 |
---|---|---|---|---|---|
192.168.1.3 | 2181、2182、2183 | tcp://0.0.0.0:63631 | tcp://0.0.0.0:61616 | 8161 | /opt/activemq-cluster/activemq-01 |
192.168.1.3 | 2181、2182、2183 | tcp://0.0.0.0:63632 | tcp://0.0.0.0:61617 | 8162 | /opt/activemq-cluster/activemq-02 |
192.168.1.3 | 2181、2182、2183 | tcp://0.0.0.0:63633 | tcp://0.0.0.0:61618 | 8163 | /opt/activemq-cluster/activemq-03 |
分别配置每台机器:
1. vim /opt/activemq-cluster/activemq-01/conf/jetty.xml
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8161"/> <!-- 管理台端口号 -->
</bean>
2. vim /opt/activemq-cluster/activemq-01/conf/activemq.xml
<!--三台机器的brokerName必须要一致 -->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
<!-- 持久化配置 -->
<persistenceAdapter>
<!-- LevelDB集群配置 -->
<!--
replicas:集群节点个数(也就是服务器数量)
bind:集群绑定端口
zkAddress:zookeeper集群地址
hostname:主机名
sync:控制更新在被认为完成之前驻留的位置,可用值local_mem, local_disk, remote_mem, remote_disk, quorum_mem, quorum_disk
zkPath: ZooKeeper目录的路径,在该目录中将交换主/从选举信息
更多参数参考:http://activemq.apache.org/replicated-leveldb-store
-->
<replicatedLevelDB
directory="${activemq.data}/leveldb"
replicas="3"
bind="tcp://0.0.0.0:63631"
zkAddress="192.168.1.3:2181,192.168.1.3:2182,192.168.1.3:2183"
hostname="192.168.1.3"
sync="local_disk"
zkPath="/activemq/leveldb-stores"/>
</persistenceAdapter>
<transportConnectors>
<!-- 修改tcp端口号 -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
集群启动与验证
1. 启动集群,记得要先启动zookeeper
/opt/activemq-cluster/activemq-01/bin/activemq start
/opt/activemq-cluster/activemq-02/bin/activemq start
/opt/activemq-cluster/activemq-03/bin/activemq start
2. 使用zookeeper客户端查看zookeeper节点中是否有activemq集群信息
/opt/zookeeper-cluster/zookeeper-03/bin/zkCli.sh #连接到本地zkServer
/opt/zookeeper-cluster/zookeeper-03/bin/zkCli.sh -server 192.168.1.3:2182 #连接到其他zkServer
#选择其中一种连接方式即可
3. 查看哪一台才是master,因为activemq集群,只有master才对外暴露服务,slave是用来同步数据用的
4. 访问master的控制台,http://192.168.1.3:8161,其他slave的控制台是无法访问的
Java连接集群
ActiveMQ的客户端只能访问Master的Broker,其他处于Slave的Broker不能访问,所以客户端连接的Broker应该使用failover协议在发生连接失败时进行转移。在我们上面配置的集群环境中,ZK和MQ各有3个服务,当一个ActiveMQ节点挂掉或者一个ZK节点挂掉,ActiveMQ服务依然能够正常运转,但如果仅剩下一个ActiveMQ节点,由于不能选举Master,则ActiveMQ不能正常运转。同样地,如果ZK仅剩一个节点活动,不管ActiveMQ各节点的存活情况,ActiveMQ都不能正常提供服务,也就是说此时ActiveMQ集群的高可用依赖于Zookeeper集群的高可用。在代码中使用failover协议(故障转移传输)连接服务器:生产者和消费者都是如此
public static void main(String[] args) throws JMSException {
//randomize=false,是否随机链接,建议false,且把master的地址放第一位
//因为其他从机属于待机状态,端口都不通,如果从机放前面,会报Connection refused,一直到maser才会连接成功
//参数配置参考http://activemq.apache.org/failover-transport-reference
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://192.168.1.3:61616,tcp://192.168.1.3:61617,tcp://192.168.1.3:61618)?randomize=false");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("cluster01");
MessageProducer messageProducer = session.createProducer(queue);
TextMessage textMessage = session.createTextMessage("这是一个消息" + System.currentTimeMillis());
messageProducer.send(textMessage);
messageProducer.close();
session.close();
connection.close();
System.out.println("消息发送完毕");
}