在之前文章《如何用Lua脚本基于OpenResty、Redis实现数据的缓存?》中介绍了一种简洁实现数据缓存的一种方式。但是对于数据缓存还有一个重要的事就是可能存在数据的不一致问题。就拿上篇文章中的广告来说,临时更换广告内容或者下架广告的场景下,如果不立刻更新缓存中的数据而是任由其等到失效时间过后重新获取,这就有可能造成很严重的后果。所以要在极短的时间内实现缓存与数据库中的数据相一致。这里又介绍一种有局限性的解决方案——基于Canal。所谓的局限性就是因为Canal本身只是针对于Mysql,所以也只能实现Mysql与缓存数据同步,其他的数据库就无能为力。
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求。所以Canal也就随之诞生,主要是通过尝试数据库日志解析获取增量变更进行同步。因此Canal的工作原理其实也就是遵从了Mysql 的 Master Slave的交互协议,将自己伪装成一个Mysql的Slave,不停的接收来自Mysql Master的binary log,然后进行解析成相应的binary log对象。然后再将这些数据发送到不同的存储目的地,比如Mysql、Kafka、ElasticSearch等。
在使用Canal之前需要做一些准备工作,因为Canal是基于Mysql的Master Slave的交互协议以及解析二进制日志文件。所以需要将Mysql的配置文件my.cnf文件中的log-bin开启以及设置一个server_id【全局唯一】。
# 开启mysql的binlog模块及文件存储目录
log-bin=mysql-bin
# 选择ROW(行)模式
binlog-format=ROW
# server_id需保证唯一,不能和canal的slaveId重复
server_id=123456
另外为了保证数据的安全性,需要建立一个专门用于Canal的一个用户并授权。
Canal安装
为了解决安装过程中的繁琐配置问题,这里介绍一下使用Docker安装Canal。
# 拉取镜像
docker pull canal/canal-server:latest
# 运行
docker run -p 11111:11111 --name canal -id canal/canal-server
运行后需要进入容器中修改Canal的一些配置文件。
docker exec -it canal /bin/bash
vim /etc/mysql/mysql.conf/canal.properties
canal.properties是Canal的一些配置文件,这里必须修改canal.id,且不能和mysql的my.cnf中的server_id一样。
除了canal.properties外还有example文件夹下的instance.properties,这个文件是配置指定要同步的数据库。需要修改canal.instance.master.address参数,这个就是配置Mysql数据库的信息。
另外还有访问的用户和密码。
如果需要针对于某个特定的数据库或者表可以在这里单独配置。
这里的是配置过滤的规则:
.*:表示所有数据库
.\\..*:表示所有表。
.*\\..*:表示所有数据库中所有表的所有变化
以上Canal的基本配置基本完成,配置修改后需要重启Docker容器。
项目搭建
对于Canal的集成,这里使用了一个开源的项目Spring-boot-starter-canal【点击可下载】,它集成了SpringBoot和Canal,比原生更加优雅。但是没有在maven的中央仓库中,因此需要我们将该项目下载到本地手动install到本地仓库中。
①引入依赖
<dependency>
<groupId>com.xpand</groupId>
<artifactId>starter-canal</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
②添加properties
canal:
client:
instances:
example:
host: 192.168.132.132
port: 11111
③创建启动类
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
@EnableCanalClient
public class CanalApplication {
public static void main(String[] args) {
SpringApplication.run(CanalApplication.class);
}
}
④创建监听类
/**
* @author SunRains
* @date 2021/3/10 0010
*/
@CanalEventListener
public class CanalDataEventListener {
/**
* @param eventType 当前操作的类型 增加数据类型
* @param rowData 发生变更的一行数据
* @InsertListerPoint 增加监听 只有增加后的数据
* rowData.getAfterColumnsList:增加、修改
* rowData.getBeforeColumnList:删除、修改
*/
@InsertListenPoint
public void onEventInsert(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
System.out.println("列名:" + column.getName() + "-------变更的数据" + column.getValue());
}
}
/**
* 修改监听
*/
@UpdateListenPoint
public void onEventUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
System.out.println("列名:" + column.getName() + "-------变更的数据" + column.getValue());
}
}
/**
* 删除添加
*/
@DeleteListenPoint
public void onEventDelete(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
System.out.println("列名:" + column.getName() + "-------变更的数据" + column.getValue());
}
}
/***
* 自定义数据修改监听
* @param eventType
* @param rowData
*/
@ListenPoint(destination = "example", schema = "changgou_content", table = {"tb_content_category", "tb_content"}, eventType = CanalEntry.EventType.UPDATE)
public void onEventCustomUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
System.err.println("DeleteListenPoint");
rowData.getAfterColumnsList().forEach((c) -> System.out.println("By--Annotation: " + c.getName() + " :: " + c.getValue()));
}
}
最后运行的结果如图所示:
有了上面的实例,就可以轻松实现,数据库数据的变更立即更新缓存中的数据,从而保持数据的一致性。