持久化机制
什么是持久化机制?简单的说就是activemq服务器宕机了,那些已发送的消息数据不会丢;就好比redis持久化rdb、aof一个概念。可以找到mq安装目录conf/activemq.xml
<!--
Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:
http://activemq.apache.org/persistence.html 官网持久化说明
-->
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/> <!-- 位置在/data/kahadb -->
</persistenceAdapter>
持久化过程
1. mq启动以后,首先要检查存储位置(也就是kahadb);如果有未发送成功的消息,则需要把消息发送出去
2. mq收到生产者的消息后,首先将消息存储到本地数据文件(kahadb);再将消息发送给消费者;成功则将消息从存储中(kahadb)删除,失败则继续尝试发送
kahadb介绍
使用mysql作为持久化机制
拷贝mysql驱动包到activemq安装目录的lib下,修改activemq.xml
<!--配置持久化适配器 http://activemq.apache.org/persistence.html-->
<persistenceAdapter>
<!-- <kahaDB directory="${activemq.data}/kahadb"/> -->
<!--mysql-ds是bean的id,createTablesOnStartup:每次启动mq是否创建持久化相关的表,默认true,第一次启动true,之后需要改为false -->
<!--其实不改为false也没关系,经测试,为true时,会清空activemq_msgs表已被消费的数据,未消费的仍然在 -->
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true" />
</persistenceAdapter>
<!--创建mysql数据源,官网示例使用的dbcp2,若要使用其他数据源需要拷贝jar包到lib下 -->
<!-- bean标签需要写在</broker>之后,<import /> 之前 -->
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://192.168.1.3:3306/activemq?relaxAutoCommit=true"/> <!--需要先创建activemq库-->
<property name="username" value="admin"/>
<property name="password" value="cmbc.123"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
启动mq,查看数据库,多了三张表
发送一条队列消息,activemq_msgs表增加一条记录
接收队列消息后, activemq_msgs记录消失
消费者启动持久主题订阅,activemq_acks表增加一条持久订阅记录
生产者发送一条主题消息,activemq_msgs表增加一条主题消息,消费者接收后,主题消息记录是不会消失的,除非从控制台手动清除
再次打开 activemq_acks表,last_acked_id最后被签收的id变为了2,也就是activemq_msgs表的id
使用高速缓存日志Journal
消息生产者每次有消息需要持久化都会通过JDBC去调用数据库服务以持久化数据,但像队列中的消息,大部分在消费后又需要清除,进行及其短暂的持久化意义不大,但却依然使JDBC频繁的和数据库进行交互,效率低下。因此可以在ActiveMQ和数据库服务器之间加一层高速缓存,用来暂时缓存消息,若队列中的消息长时间没有被消费时才进行持久化,这样就会大大减少ActiveMQ和数据库服务交互的次数,提高效率。举例来说:生产者生产了1000条消息,这1000条消息会保存到缓存文件journal文件中,如果消费者的消费速度很快,在journal文件还没有同步到数据库之前,消费者已经消费了900条,那么这时就只需要将剩余的100条同步即可,如果消费者消费的很慢,journal文件可以批量的将消息同步到数据库,大大减少了ActiveMQ和数据库服务器的交互次数。
<!--注释掉persistenceAdapter -->
<!-- <persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
mysql-ds是bean的id,createTablesOnStartup:每次启动mq是否创建持久化相关的表,默认true,第一次启动true,之后需要改为false
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true" />
</persistenceAdapter> -->
<!--增加persistenceFactory -->
<persistenceFactory>
<!--journalLogFiles:日志文件个数,默认5个-->
<!--journalLogFileSize:日志文件大小,单位字节,默认20Mb,可以使用20Mb、1024kb, 1g-->
<!--useJournal:是否启用Journal,默认true-->
<!--useQuickJournal:
Enables or disables the use of quick journal, which keeps messages in the journal and just stores a reference to the messages in JDBC.
Defaults to false so that messages actually reside long term in the JDBC database
翻译:启用或禁用quick journal,它将消息保存在日志中,并只在JDBC中存储对消息的引用。默认值为false,这样消息实际上长期驻留在JDBC数据库中
-->
<!--dataDirectory:日志存放目录-->
<!--参考文档 http://activemq.apache.org/schema/core/activemq-core-5.15.10-schema.html#journalPersistenceAdapterFactory-->
<journalPersistenceAdapterFactory
journalLogFiles="4"
journalLogFileSize="32768"
useJournal="true"
useQuickJournal="true"
dataSource="#mysql-ds"
dataDirectory="${activemq.data}"
/>
</persistenceFactory>