配置binlog并使用Canal实现Mysql定制同步数据的功能

binlog概述

binlog是一个二进制文件,它保存在磁盘中,是用来记录数据库表结构变更、表数据修改的二进制日志。其实除了数据复制外,它还可以实现数据恢复、增量备份等功能。

首先需要确保mysql服务已经启用了binlog

show variables like 'log_bin';

如果为值为OFF,表示没有启用,那么需要首先启用binlog,修改mysql的配置文件:

log_bin= /var/log/mysql/mysql-bin.log #指定binlog路径
binlog-format=ROW
server-id=1
expire_logs_days    = 10
max_binlog_size     = 100M

对参数做一个简要说明:

  • 在配置文件中加入了log_bin配置项后,表示启用了binlog
  • binlog-format是binlog的日志格式,支持三种类型,分别是STATEMENT、ROW、MIXED,我们在这里使用ROW模式
  • server-id用于标识一个sql语句是从哪一个server写入的,这里一定要进行设置,否则我们在后面的代码中会无法正常监听到事件
  • 主要用来控制binlog日志文件保留时间,超过保留时间的binlog日志会被自动删除

注意:

  1. 该文件默认不允许修改,需要右键“管理员取得所有权”之后才能保存修改。
  2. binlog文件路若指定为绝对路径,则为指定路径:
    log_bin=C:\mysql-binlog\mysql-bin
    若不指定绝对路径则默认当前目录下Data文件夹下:log_bin=mysql-bin

在更改完配置文件后,重启mysql服务。再次查看是否启用binlog,返回为ON,表示已经开启成功。

这时候随意的对某一个数据库中的表做一下增删改,对应的日志就会记录在/var/log/mysql/这个文件夹下了。我们看一下这个文件夹里的东西:

在这里插入图片描述
这里的文件是没有办法正常查看的,需要使用mysql提供的命令来查看,命令是这个样子的:

1. 查看
mysqlbinlog mysql-bin.000002
2. 指定位置查看
mysqlbinlog --start-position="120" --stop-position="332" mysql-bin.000002

binlog相关命令

-- 查询binglog日志列表
show binary logs;

-- 查询第一个(最早)的binlog日志
show binlog events; 
 
-- 指定查询 mysql-bin.000077 日志
show binlog events in 'mysql-bin.000077';
 
-- 指定查询 mysql-bin.000077 日志,并且从pos=1024开始查
show binlog events in 'mysql-bin.000077' from 1024;
 
-- 指定查询 mysql-bin.000077 日志,并且从pos=1024开始查起,查询10条
show binlog events in 'mysql-bin.000077' from 1024 limit 10;
 
-- 指定查询 mysql-bin.000077 日志,并且从pos=1024开始查起,偏移2行,查询10条
show binlog events in 'mysql-bin.000077' from 1024 limit 2,10;

因为我们现在的binlog_format指定的格式是ROW(就在上面写的,还记得吗?),所谓binlog文件的内容没有办法正常查看,因为他是这个样子的:

在这里插入图片描述
这时,我们需要对输出进行解码

mysqlbinlog --base64-output=decode-rows -v mysql-bin.000001

这时候,显示的结果就变成了:

在这里插入图片描述
虽然还不是正常的sql,但是好赖是有一定的格式了。

but自己来做解析的话还是很麻烦,so~放弃这种操作。
在这个过程中,我发现阿里巴巴有一款开源的软件可以用。就是标题上说道的:canal。看了一下网站上的介绍,简直美滋滋。

canal概述

canal简介

canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

canal官网地址:https://github.com/alibaba/canal

基于日志增量订阅和消费的业务包括

数据库镜像
数据库实时备份
索引构建和实时维护(拆分异构索引、倒排索引等)
业务 cache 刷新
带业务逻辑的增量数据处理

canal要做的就是基于binlog实现增量而不是全量的数据同步

技术选型

基于binlog实现数据同步的方案有两种:

mysql-binlog-connector ali的canal
通过引入依赖jar包实现,需要自行实现解析,但是相对轻量。 是数据同步中间件,需要单独部署维护,功能强大,支持数据库及MQ的同步,维护成本高。

根据实际业务场景,按需索取,业务量小,业务简单,轻量可以通过mysql-binlog-connector,业务量大,逻辑复杂,有专门的运维团队,可以考虑canal,比较经过阿里高并发验证,相对稳定。

Canal监听mysql的binlog日志实现数据同步:https://blog.csdn.net/m0_37583655/article/details/119517336
Java监听mysql的binlog详解(mysql-binlog-connector):https://blog.csdn.net/m0_37583655/article/details/119148470

原理分析

MySQL主备复制原理

在这里插入图片描述

  1. MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  2. MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  3. MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal原理

  1. canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  2. MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  3. canal 解析 binary log 对象(原始为 byte 流)。
    在这里插入图片描述

canal安装配置

mysql环境准备

  1. mysql版本
    当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。
  2. mysql开启binlog

canal配置启动

打开配置文件conf/example/instance.properties,配置数据库连接等信息如下:

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
# 在MySQL服务器授权的账号密码字符集
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex.*\\..*表示监听所有表 也可以写具体的表名,用,隔开
canal.instance.filter.regex=.*\\..*
# table black regex
# mysql 数据解析表的黑名单,多个表用,隔开
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################

启动canal.deployer-1.1.5\bin路径下:startup.bat

开发

前期准备

  1. 项目启动的时候,开启canal的链接,以及初始化一些配置。
@Bean
public CanalConnector canalConnector() {
    
    
    CanalConnector connector = CanalConnectors.newSingleConnector(
            //对应canal服务的链接
            new InetSocketAddress(canalConf.getIp(), canalConf.getPort()),
            //链接的目标,这里对应canal服务中的配置,需要查阅文档
            canalConf.getDestination(),
            //不知道是什么用户,使用“”
            canalConf.getUser(),
            //不知道是什么密码,使用“”
            canalConf.getPassword()
    );
    return connector;
}

2)先开启一个线程,里面写一个死循环,用于从canal的服务中获取binlog中的消息。这个消息类是:com.alibaba.otter.canal.protocol.Message。

Message message = connector.getWithoutAck(100);
  • connector:canal链接的实例化对象。
  • connector.getWithoutAck(100):从连接中获取100条binlog中的数据。

3)取出Message中的事件集合,就是binlog中的每一条数据。将类型为增删改的数据取出,之后每一条数据放在一个线程中,用线程池去执行它。

List<Entry> entries = message.getEntries();
message.getEntries():从链接中获取的数据集合,每一条代表1条binlog数据

4)在每一个线程中,取出Entry中的数据,根据其类型拼接各种sql,并执行。

Header header = entry.getHeader();
//获取发生变化的表名称,可能会没有
String tableName = header.getTableName();

//获取发生变化的数据库名称,可能会没有
String schemaName = header.getSchemaName();

//获取事件类型
EventType eventType = rowChange.getEventType();
/**
这里我们只是用其中的三种类型:
    EventType.DELETE 删除
    EventType.INSERT 插入
    EventType.UPDATE 更新
*/
//获取发生变化的数据
RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());

//遍历其中的数据
int rowDatasCount = rowChange.getRowDatasCount();
for (int i = 0; i < rowDatasCount; i++) {
    
    
    //每一行中的数据
    RowData rowData = rowChange.getRowDatas(i);
}

//获取修改前的数据
List<Column> before = rowData.getBeforeColumnsList();

//获取修改后的数据
List<Column> after = rowData.getAfterColumnsList();

Column中有一系列方法,比如是否发生修改,时候为key,是否是null等,就不在细说了。扩展:阿里Canal框架(数据同步中间件)初步实践
https://mp.weixin.qq.com/s?__biz=MzU2MTI4MjI0MQ==&mid=2247486253&idx=1&sn=28112ccd3f5b20e93a3a98836f70948b&scene=45#wechat_redirect

开发

1)这里先写一个线程,用于不停的从canal服务中获取消息,然后创建新的线程并让其处理其中的数据。代码如下:

@Override
public void run() {
    
    
    while (true) {
    
    
        //主要用于在链接失败后用于再次尝试重新链接
        try {
    
    
            if (!run) {
    
    
                //打开链接,并设置 run=true
                startCanal();
            }
        } catch (Exception e) {
    
    
            System.err.println("连接失败,尝试重新链接。。。");
            threadSleep(3 * 1000);
        }
        System.err.println("链接成功。。。");
        //不停的从CanalConnector中获取消息
        try {
    
    
            while (run) {
    
    
                //获取一定数量的消息,这里为线程池数量×3
                Message message = connector.getWithoutAck(batchSize * 3);
                long id = message.getId();

                //处理获取到的消息
                process(message);
                connector.ack(id);
            }
        } catch (Exception e) {
    
    
            System.err.println(e.getMessage());
        } finally {
    
    
            //如果发生异常,最终关闭连接,并设置run=false
            stopCanal();
        }
    }

}
void process(Message message) {
    
    
    List<Entry> entries = message.getEntries();
    if (entries.size() <= 0) {
    
    
        return;
    }
    log.info("process message.entries.size:{}", entries.size());
    for (Entry entry : entries) {
    
    
        Header header = entry.getHeader();
        String tableName = header.getTableName();
        String schemaName = header.getSchemaName();

        //这里判断是否可以取出数据库名称和表名称,如果不行,跳过循环
        if (StringUtils.isAllBlank(tableName, schemaName)) {
    
    
            continue;
        }

        //创建新的线程,并执行
        jobList.stream()
                .filter(job -> job.isMatches(tableName, schemaName))
                .forEach(job -> executorService.execute(job.newTask(entry)));
    }
}

这里的jobList是我自己定义List,代码如下:

package com.hebaibai.miner.job;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;

import static com.alibaba.otter.canal.protocol.CanalEntry.Entry;

@Slf4j
@Data
public abstract class Job {
    
    


    /**
     * 数据库链接
     */
    protected JdbcTemplate jdbcTemplate;

    /**
     * 额外配置
     */
    protected JSONObject prop;

    /**
     * 校验目标是否为合适的数据库和表
     *
     * @param table
     * @param database
     * @return
     */
    abstract public boolean isMatches(String table, String database);

    /**
     * 实例化一个Runnable
     *
     * @param entry
     * @return
     */
    abstract public Runnable newTask(final Entry entry);


    /**
     * 获取RowChange
     *
     * @param entry
     * @return
     */
    protected CanalEntry.RowChange getRowChange(Entry entry) {
    
    
        try {
    
    
            return CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        } catch (InvalidProtocolBufferException e) {
    
    
            e.printStackTrace();
        }
        return null;
    }

}

jobList里面放的是Job的实现类。

写一个Job的实现类,并用于同步表,并转换字段名称。因为需求中要求两个同步的数据中可能字段名称不一致,所以我写了一个josn用来配置两个表的字段对应关系:

//省略其他配置
"prop": {
    
    
//来源数据库
  "database": "pay",
//来源表
  "table": "p_pay_msg",
//目标表(目标库在其他地方配置)
  "target": "member",
//字段对应关系
//key  :来源表的字段名
//value:目标表的字段名
  "mapping": {
    
    
    "id": "id",
    "mch_code": "mCode",
    "send_type": "mName",
    "order_id": "phone",
    "created_time": "create_time",
    "creator": "remark"
  }
}
//省略其他配置

下面是全部的代码,主要做的就是取出变动的数据,按照对应的字段名重新拼装sql,然后执行就好了。扩展:基于canal进行日志的订阅和转换
https://mp.weixin.qq.com/s?__biz=MzI4Njc5NjM1NQ==&mid=2247489997&idx=2&sn=c28567f0248601443c066ed63f4fcedb&chksm=ebd626e1dca1aff7ae57cfa1c5ebe7f121c5e85e463eae713621c12b6773d851ed94e4ec6bf2&scene=21#wechat_redirect

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import static com.alibaba.otter.canal.protocol.CanalEntry.*;

/**
 * 单表同步,表的字段名称可以不同,类型需要一致
 * 表中需要有id字段
 */
@SuppressWarnings("ALL")
@Slf4j
public class TableSyncJob extends Job {
    
    


    /**
     * 用于校验是否适用于当前的配置
     *
     * @param table
     * @param database
     * @return
     */
    @Override
    public boolean isMatches(String table, String database) {
    
    
        return prop.getString("database").equals(database) &&
                prop.getString("table").equals(table);
    }

    /**
     * 返回一个新的Runnable
     *
     * @param entry
     * @return
     */
    @Override
    public Runnable newTask(final Entry entry) {
    
    
        return () -> {
    
    
            RowChange rowChange = super.getRowChange(entry);
            if (rowChange == null) {
    
    
                return;
            }
            EventType eventType = rowChange.getEventType();
            int rowDatasCount = rowChange.getRowDatasCount();
            for (int i = 0; i < rowDatasCount; i++) {
    
    
                RowData rowData = rowChange.getRowDatas(i);
                if (eventType == EventType.DELETE) {
    
    
                    delete(rowData.getBeforeColumnsList());
                }
                if (eventType == EventType.INSERT) {
    
    
                    insert(rowData.getAfterColumnsList());
                }
                if (eventType == EventType.UPDATE) {
    
    
                    update(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList());
                }
            }
        };
    }

    /**
     * 修改后的数据
     *
     * @param after
     */
    private void insert(List<Column> after) {
    
    
        //找到改动的数据
        List<Column> collect = after.stream().filter(column -> column.getUpdated() || column.getIsKey()).collect(Collectors.toList());
        //根据表映射关系拼装更新sql
        JSONObject mapping = prop.getJSONObject("mapping");
        String target = prop.getString("target");
        List<String> columnNames = new ArrayList<>();
        List<String> columnValues = new ArrayList<>();
        for (int i = 0; i < collect.size(); i++) {
    
    
            Column column = collect.get(i);
            if (!mapping.containsKey(column.getName())) {
    
    
                continue;
            }
            String name = mapping.getString(column.getName());
            columnNames.add(name);
            if (column.getIsNull()) {
    
    
                columnValues.add("null");
            } else {
    
    
                columnValues.add("'" + column.getValue() + "'");
            }
        }
        StringBuilder sql = new StringBuilder();
        sql.append("REPLACE INTO ").append(target).append("( ")
                .append(StringUtils.join(columnNames, ", "))
                .append(") VALUES ( ")
                .append(StringUtils.join(columnValues, ", "))
                .append(");");
        String sqlStr = sql.toString();
        log.debug(sqlStr);
        jdbcTemplate.execute(sqlStr);
    }

    /**
     * 更新数据
     *
     * @param before 原始数据
     * @param after  更新后的数据
     */
    private void update(List<Column> before, List<Column> after) {
    
    
        //找到改动的数据
        List<Column> updataCols = after.stream().filter(column -> column.getUpdated()).collect(Collectors.toList());
        //找到之前的数据中的keys
        List<Column> keyCols = before.stream().filter(column -> column.getIsKey()).collect(Collectors.toList());
        //没有key,执行更新替换
        if (keyCols.size() == 0) {
    
    
            return;
        }
        //根据表映射关系拼装更新sql
        JSONObject mapping = prop.getJSONObject("mapping");
        String target = prop.getString("target");
        //待更新数据
        List<String> updatas = new ArrayList<>();
        for (int i = 0; i < updataCols.size(); i++) {
    
    
            Column updataCol = updataCols.get(i);
            if (!mapping.containsKey(updataCol.getName())) {
    
    
                continue;
            }
            String name = mapping.getString(updataCol.getName());
            if (updataCol.getIsNull()) {
    
    
                updatas.add("`" + name + "` = null");
            } else {
    
    
                updatas.add("`" + name + "` = '" + updataCol.getValue() + "'");
            }
        }
        //如果没有要修改的数据,返回
        if (updatas.size() == 0) {
    
    
            return;
        }
        //keys
        List<String> keys = new ArrayList<>();
        for (Column keyCol : keyCols) {
    
    
            String name = mapping.getString(keyCol.getName());
            keys.add("`" + name + "` = '" + keyCol.getValue() + "'");
        }
        StringBuilder sql = new StringBuilder();
        sql.append("UPDATE ").append(target).append(" SET ");
        sql.append(StringUtils.join(updatas, ", "));
        sql.append(" WHERE ");
        sql.append(StringUtils.join(keys, "AND "));
        String sqlStr = sql.toString();
        log.debug(sqlStr);
        jdbcTemplate.execute(sqlStr);
    }

    /**
     * 删除数据
     *
     * @param before
     */
    private void delete(List<Column> before) {
    
    
        //找到改动的数据
        List<Column> keyCols = before.stream().filter(column -> column.getIsKey()).collect(Collectors.toList());
        if (keyCols.size() == 0) {
    
    
            return;
        }
        //根据表映射关系拼装更新sql
        JSONObject mapping = prop.getJSONObject("mapping");
        String target = prop.getString("target");
        StringBuilder sql = new StringBuilder();
        sql.append("DELETE FROM `").append(target).append("` WHERE ");
        List<String> where = new ArrayList<>();
        for (Column column : keyCols) {
    
    
            String name = mapping.getString(column.getName());
            where.add(name + " = '" + column.getValue() + "' ");
        }
        sql.append(StringUtils.join(where, "and "));
        String sqlStr = sql.toString();
        log.debug(sqlStr);
        jdbcTemplate.execute(sqlStr);
    }
}

引用

mysql-binlog-connector-java
https://github.com/shyiko/mysql-binlog-connector-java
mysql原理 ~ binlog系列之 table_id详谈https://www.cnblogs.com/danhuangpai/p/11484256.html

Canal监听mysql的binlog日志实现数据同步https://blog.csdn.net/m0_37583655/article/details/119517336

用 canal 监控 binlog 并实现mysql定制同步数据的功能https://blog.51cto.com/u_12302929/3294157

猜你喜欢

转载自blog.csdn.net/qq_43961619/article/details/127674569