binlog2sql代码解读

这是学习笔记的第 1895 篇文章


  今天看了下binlog2sql的代码,总体来说是代码质量是很高的。

通过阅读好的开源项目,也能够让自己多沉淀些学习经验。 

 binlog2sql的工程主要包含两个Python文件,一个是binlog2sql.py,这个文件是对外统一使用的脚本入口,另外一个是工具类binlog2sql_util.py



640?wx_fmt=png

util类的方法略多一些,核心的方法有2个。

640?wx_fmt=png

核心的方法是generate_sql_pattern,主要的闪回逻辑在这里。

640?wx_fmt=png

这部分逻辑是相对通用的,而不是只局限于使用binlog2sql

通读源代码后,发现有一个潜在的瓶颈点就是在处理binlog的时候,使用了开源项目python-my-replication来模拟从库

640?wx_fmt=png

这类方案和阿里的canal是类似的。

里面解析binlog的方法是基于BinLogStreamReader

我们来看一个简单的小例子,使用BinlogStreamReader来做下解析。

#!/usr/bin/env python

# -*- coding: utf-8 -*-


from pymysqlreplication import BinLogStreamReader

from pymysqlreplication.row_event import (

    DeleteRowsEvent,

    UpdateRowsEvent,

    WriteRowsEvent,

)

import sys

import json

def main():

    MYSQL_SETTINGS = {'host': 'xxxx',

                      'port': 3306, 'user': 'replicator', 'passwd': 'xxxx'}

    stream = BinLogStreamReader(

        connection_settings=MYSQL_SETTINGS,

        server_id=3,

        blocking=True,

        only_schemas=["test"],

        only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent])

    for binlogevent in stream:

        for row in binlogevent.rows:

            event = {"schema": binlogevent.schema, "table": binlogevent.table}

            if isinstance(binlogevent, DeleteRowsEvent):

                event["action"] = "delete"

                event["values"] = dict(row["values"].items())

                event = dict(event.items())

            elif isinstance(binlogevent, UpdateRowsEvent):

                event["action"] = "update"

                event["before_values"] = dict(row["before_values"].items())

                event["after_values"] = dict(row["after_values"].items())

                event = dict(event.items())

            elif isinstance(binlogevent, WriteRowsEvent):

                event["action"] = "insert"

                event["values"] = dict(row["values"].items())

                event = dict(event.items())

            print json.dumps(event)

            sys.stdout.flush()

    stream.close()

if __name__ == "__main__":

    main()


执行结果如下:

{"action": "insert", "table": "tb", "values": {"id": 30, "v": "aaa"}, "schema": "test"}

{"action": "insert", "table": "tb", "values": {"id": 31, "v": "aaa"}, "schema": "test"}

{"action": "insert", "table": "tb", "values": {"id": 32, "v": "aaa"}, "schema": "test"}

{"action": "insert", "table": "tb", "values": {"id": 33, "v": "aaa"}, "schema": "test"}

{"action": "insert", "table": "tb", "values": {"id": 34, "v": "aaa"}, "schema": "test"}

可以看到采用了集成的方式解析Binlog,格式是相对比较规范的,但是这种方式的一个明显问题就是性能,如果文件在50M以内,解析效果还能接受,如果binlog超过了200M,解析性能就很差了,如果是1G的规模,基本上就没有反应了。

所以明确了binlog2sql的代码逻辑后,我们需要做的一些改进就可以主要在BinLogStreamReader的方案思路上进行改进。




640?


猜你喜欢

转载自blog.csdn.net/weixin_36250635/article/details/87745560