这是我参与11月更文挑战的第5天,活动详情查看:2021最后一次更文挑战。
上一章节,我们讲了canal的配置和以及MySQL的配置,并实现了canal通过伪装MySQL的slave,模拟MySQL slave的交互协议向MySQL Mater发送 dump协议,MySQL mater收到canal发送过来的dump请求,开始推送binary log给canal,然后canal解析binary log,再发送到存储目的地,比如MySQL,Kafka,Elastic Search等等。
今天我们通过canal对接rabbitmq,把增量数据写到rabbitmq,然后进行处理,写入数据库mysql或者elastic。
配置canal:
配置文件../canal/canal.properties
需要提前获取mq的连接信息,并建立对应的exchange。
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rabbitMQ # 这里改成rabbitMQ模式
##################################################
######### RabbitMQ #############
##################################################
# 配置rabbitMQ连接信息
rabbitmq.host = 192.168.1.1
rabbitmq.virtual.host = /
rabbitmq.exchange = exchange.fanout.canal # exchange的名字,需要提前新建好
rabbitmq.username = admin
rabbitmq.password = admin
rabbitmq.deliveryMode = fanout # exchange的模式
复制代码
配置/../canal/instance.properties
配置binlog日志文件名称和点位positon,登录MySQL,输入show master status;可以看到master节点的信息,binlog的File名称和positon以及binlog_Ignore_DB信息。
添加对应配置
canal.instance.master.journal.name
就是File字段【mysql-bin.004911】binlog文件名
canal.instance.master.position
就是binlog的偏移的位置【471834950】
...
# 填写mysql信息
# position info
canal.instance.master.address=192.168.1.1:3306
canal.instance.master.journal.name=mysql-bin.004911
canal.instance.master.position=471834950
canal.instance.master.timestamp=
canal.instance.master.gtid=
...
# 填写mysql用户信息
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 指定监听某个库、表
canal.instance.filter.regex=.*\..* # 【.*\..*】则是监听所有的库表。【test..*】代表监听test库下面的所有表。
复制代码
启动canal,windows下startup.bat 启动canal server脚本,观察rabbitmq日志。
- 观察日志
tail -f /tmp/canal/logs/example/example.log
- 观察MQ情况:新建一个queue对接exchange,看看能否接受到数据
rabbitmq配置exchange和queue,直接贴代码:
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 修改了messageConverter后需要重新设置手动确认
return factory;
}
复制代码
@RabbitListener( queues = {"testDirectQueue"})
public void receiver5(Message msg, Channel channel) throws IOException, InterruptedException {
//打印数据
String message = new String(msg.getBody(), StandardCharsets.UTF_8);
log.info("队列消费消息{}"+message);
//channel.basicReject(msg.getMessageProperties().getDeliveryTag(), false);
}
复制代码
观察监听队列的日志输出信息。
今天时间有点紧,就到这里了。