1. canal原理概述
*** canal将自己伪装成mysql的从节点,当mysql的binlog日志发生改变,作为从节点的canal服务端会发生dump行为,
拿到日志变化的数据,然后通过canal客户端进行交互获取到变化数据,进行实时处理
2. 实现条件
*** mysql5以上、mysql作为主节点开启binlog日志功能
2.1 修改mysql配置文件
vi /etc/my.cnf
[client]
user=root
password=
port = 3306
socket = /tmp/mysql.sock
[mysqld]
init-connect='SET NAMES utf8'
basedir=/usr/local/mysql
datadir=/usr/local/mysql/data
socket=/tmp/mysql.sock
max_connections=50
character-set-server=utf8
default-storage-engine=INNODB
# 添加此处三个配置
server_id=1 // 作为主节点的节点ID,之后canal作为从节点时,ID不同跟这个ID相同
log-bin=mysql-bin // 开始binlog日志功能
binlog-format=ROW // 日志的格式为ROW格式
2.2 重启mysql
service mysql restart
2.3 查看是否开启binlog
show variables like 'log_bin';
3. 安装部署canal服务端(用以监控mysql的binlog)
3.1 canal服务端下载
链接: https://pan.baidu.com/s/1_XqtkNBQ7od89CnwWcOLoA 密码: 6cij
3.2 解压压缩包(指定解压路径,因为canal解压直接解压到当前目录)
tar xzvf canal.deployer-1.1.4.tar.gz -C /opt/bigdata/canal
3.3 解读目录文件
3.3.1 canal.properties--服务端配置文件
我们需要关注修改如下几个配置:
canal.port = 11111 //canal服务的连接端口,可自行配置
canal.destinations = example //我们订阅的是哪个实例,可填写多个,每一个实例都会在根目录下有一个以实例命名的文件夹
canal.conf.dir = ../conf //实例配置文件的根目录
canal.instance.global.spring.xml = classpath:spring/file-instance.xml // 如果是单实例,默认读取该Spring配置
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml // 如果是多实例,默认读取该Spring配置
3.3.2 conf/example/instance.properties--具体实例的配置文件
针对mysql我们需要关注如下几个配置:
canal.instance.mysql.slaveId=1234 //此处即为该实例作为mysql从节点的节点ID,不能与上面mysql的ID相同,1.0.26版本以后其实已经不用配置,会自动生成一个不一样的ID
canal.instance.tsdb.enable=false //这个配置大家都说改为false,具体原因我也没搞懂
canal.instance.master.address=ip:3306 //mysql的地址
canal.instance.dbUsername=user //mysql用户名
canal.instance.dbPassword=pwd //mysql密码
canal.instance.filter.regex=.*\\..* //监控的库名、表名 前面是库名 后面是表名 此处默认为全库全表。一般我们会写成 db.table监控某一个库的某一张表 做具体逻辑处理
3.4 启动canal服务端
./bin/startup.sh
3.5 查看日志logs文件夹
4. 开发客户端代码(代码有详细注释,无需详细解读)
package com.qu.canal;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import java.net.InetSocketAddress;
import java.util.List;
/**
* canal客户端程序: canal是cs架构,server端不需要编写代码 直接部署在Linux即可
* 客户端程序需要开发 连接服务端 与canal服务端进行交互
*/
public class CanalClient {
public static void main(String[] args) {
/**
* 实现步骤:
* 1 创建连接
* 2 建立连接
* 3 订阅主题
* 4 获取数据
* 5 递交确认
* 6 关闭连接
*/
// 1. 创建连接
CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("IP", 11111),
"example", "", "");
// 定义一个标记 不停的拉取数据
boolean isRunning = true;
try{
// 2. 建立连接
canalConnector.connect();
// 宕机后重启 设置回滚上一次的请求,重新获取数据
canalConnector.rollback();
// 3. 订阅主题
canalConnector.subscribe("test.person");
// 4. 不停的拉取数据
while (isRunning){
// 获取数据
Message message = canalConnector.getWithoutAck(1000);
// 获取batch的ID
long batchId= message.getId();
// 获取binlog日志的数据总数
List<CanalEntry.Entry> entries = message.getEntries();
if(entries.size() > 0){
// 有数据再处理
dealDatas(entries);
}
// 5 递交确认
canalConnector.ack(batchId);
}
}catch (Exception e){
e.printStackTrace();
}finally {
canalConnector.disconnect();
}
}
public static void dealDatas(List<CanalEntry.Entry> entries){
for (CanalEntry.Entry entry: entries) {
// entry 就是mysql的一条记录
// 首先判断数据类型 当是binlog日志数据的开始和结尾 便不做任何处理
// binlog日志数据的格式为: 事务头BEGIN|数据ROWDATA|事务尾END
CanalEntry.EntryType entryType = entry.getEntryType();
if(entryType == CanalEntry.EntryType.TRANSACTIONBEGIN || entryType == CanalEntry.EntryType.TRANSACTIONEND){
continue;
}
// 获取数据的ROWDATA
// 获取binlog文件名
CanalEntry.Header header = entry.getHeader();
String logfileName = header.getLogfileName();
// 获取log file的偏移量
long offset = header.getLogfileOffset();
// 获取sql语句执行的时间戳
long executeTime = header.getExecuteTime();
// 获取数据库名
String database = header.getSchemaName();
// 获取表名
String tableName = header.getTableName();
// 获取时间的类型 insert/update/delete
CanalEntry.EventType eventType = header.getEventType();
System.out.println("事件类型:" + eventType.toString().toLowerCase());
CanalEntry.RowChange rowChange = null;
try{
// 获取存储数据,并将二进制字节数据解析转换为RowChange对象
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
}catch (InvalidProtocolBufferException e){
// canal中传递的数据是Protocol格式的数据
// 解析时格式不对 便会抛出异常
e.printStackTrace();
}
CanalEntry.EventType eventType1 = rowChange.getEventType();
System.out.println("事件类型1:" + eventType1.toString().toLowerCase());
// 当事件类型是 DDl语句
if(eventType1 == CanalEntry.EventType.QUERY || rowChange.getIsDdl()){
System.out.println("查询语句 ===> " + rowChange.getSql());
}
// 当事件类型是 DML语句
for (CanalEntry.RowData rowData: rowChange.getRowDatasList()) {
// 如果是删除事件 输出删除前的数据
if(eventType1 == CanalEntry.EventType.DELETE){
System.out.println("delete了 ===> " + rowData.getBeforeColumnsList());
}
// 如果是插入操作 输出插入的数据
else if(eventType1 == CanalEntry.EventType.INSERT){
System.out.println("insert了 ===> " + rowData.getAfterColumnsList());
System.out.println("insert了 ===> " + rowData.getAfterColumnsList());
}
// 如果是更新操作 输出更新前后的数据
else if(eventType1 == CanalEntry.EventType.UPDATE){
System.out.println("update前 ===> " + rowData.getBeforeColumnsList());
System.out.println("update后 ===> " + rowData.getAfterColumnsList());
}
}
}
}
}
5. 效果展示