datax + crontab 同步 mysql库实践
1.环境介绍与准备
项目 | 版本 |
---|---|
mysql | 5.7 |
python | 3.6.8 |
我们将datax编译好的带有bin文件夹的datax文件放到/opt目录下:
bin conf job lib log plugin script tmp
bin文件夹下2_datax.py 2_dxprof.py 2_perftrace.py datax.py dxprof.py perftrace.py
job文件夹存放job.json文件
script文件夹放脚本文件
log文件放日志,当然我们也放在这
bin的三个py文件更换成适应python3版本的了:WeiYe-Jing/datax-web
的doc/datax源码阅读笔记/datax-python3
下
2.源码及详解与使用场景和拓展思考
-
初步版本的要求是这样的:
整个库表的迁移/同步:给定源库、目的库jdbc连接方式的参数,给定需要迁移的库名;用sql手段先保证两库的表结构一致,这个我不会。
注意路径:crondatax定时配置文件的、datax执行同步数据的日志【表名_191217.log】、此脚本的路径、datax.py的路径
同步时间参数配置:CRON_TIME = "1,2,3 15 * * * "
这里表示年月*天的15时1,2,3分
执行操作"python3 /opt/datax/bin/datax.py /opt/datax/job/%s.json >> /opt/datax/log/%s_%s.log 2>&1"
其中,%s.json
是表名.json
即datax的job.json文件;l%s_%s.log
是表名_191217.log
即此表今日同步行为记录的日志文件,今天的所有关于此表的记录都会在这里,增量记录。目前我是这么思考的,一天之内的方便查错,如果明天再次备份,就变成了表名_191218.log
了。其实我要是做成前端进行配置的话,会配置几种比较常见记录日志的方式【天\月,其他因素】以供选择。另外还有为什么FirstBlood配置job时候比较麻烦,它的思考是根据应用场景得到的,它假设同步的任务每天都一样,如果新增就增加一个,如果有上千个表,第一次配置的时候要填一千次【估计首次的话应该会由脚本直接在数据库先写好】。而对于没准备好脚本的用户,我们更需要的其实是能较为方便的去配置job,或者说批量配置好job的体验。
这里使用threadpool并发执行同步job,crontab的开关可以设置一下。
注意路径处理统一切换成了pathlib的;
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import csv
import json
import time
import threadpool
import pymysql.cursors
from pathlib import *
local_time = time.strftime("%y%m%d",time.localtime(time.time()))
CMD = "python3 /opt/datax/bin/datax.py /opt/datax/job/%s.json >> /opt/datax/log/%s_%s.log 2>&1"
FLAG_TIMING = True # 是否启用crontab
CRON_TIME = "58 10 * * * "
CMD_CRON = CRON_TIME + CMD
WD = Path(__file__).resolve().parent
crondatax = Path(WD).joinpath("crondatax")
JOB_JSON ={
"job": {
"content": [{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ['*'],
"connection": [{
"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf8"],
"table": []
}],
"password": "asdf",
"username": "root"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": ['*'],
"connection": [{
"jdbcUrl": "jdbc:mysql://192.168.255.134:3306/test?characterEncoding=utf8",
"table": []
}],
"password": "asdf",
"username": "root"
}
}
}],
"setting": {
"speed": {
"channel": "4", # 并发数限速【根据CPU合理控制并发数】
"byte": 524288, # 字节流限速【根据磁盘和网络合理控制字节数】
"record": 10000 # 记录流限速【根据数据合理空行数】
}
}
}
}
tables_from = JOB_JSON["job"]["content"][0]["reader"]["parameter"]["connection"][0]["table"] # 源表
tables_to = JOB_JSON["job"]["content"][0]["writer"]["parameter"]["connection"][0]["table"] # 目标表
def fetch_tables_jdbc(dbname):
connection = pymysql.connect(host='localhost',
user='root',
password='asdf',
db=dbname,
charset='utf8',
cursorclass=pymysql.cursors.DictCursor)
results =[]
try:
with connection.cursor() as cursor:
sql = '''SHOW TABLES'''
cursor.execute(sql)
result = cursor.fetchall()
for i in range(len(result)):
results.append(result[i]['Tables_in_%s' % dbname])
finally:
connection.close()
return results
def fetch_tables_csv(csvpath):
with open(csvpath, 'r', encoding = "utf-8") as f:
reader = csv.reader(f)
datas = []
for row in reader:
datas.append(row[0])
return datas
def rundatax(table):
if table:
each_job = table + ".json"
tables_from.append(table)
tables_to.append(table)
each_job_dir = Path(WD.parent).joinpath("job", each_job)
with each_job_dir.open('w') as f:
f.write(json.dumps(JOB_JSON))
tables_from.remove(table)
tables_to.remove(table)
if FLAG_TIMING == False:
print("1. run datax")
flag = os.system(CMD % (table, table, local_time)) # 立即执行同步命令
elif FLAG_TIMING == True:
fs = os.listdir(Path(WD.parent).joinpath("job"))
with crondatax.open("a") as f_cron:
if each_job in fs: # 若有job,则写crontab任务
CRONTAB = CMD_CRON % (table, table, local_time) + "\n"
f_cron.write(CRONTAB)
print("2. run crondatax")
flag = os.system("crontab " + str(crondatax)) # 运行定时任务
if flag == 0: # 命令执行成功
pass
else:
with Path(WD.parent).joinpath("log", "errortable.list").open() as f:
f.write(table + "\n")
def test(tables):
print(type(tables), tables)
# for table in tables:
if tables:
print(tables)
if __name__ == '__main__':
tables = fetch_tables_jdbc("test") # 根据库取表
# tables = fetch_tables_csv("tables.csv") # 根据csv文件取表名【一列,没对header判断处理】
# crontab任务配置文件清空
with crondatax.open("w") as f:
f.truncate() # 清空文件
pool = threadpool.ThreadPool(2)
requests = threadpool.makeRequests(rundatax, tables)
[pool.putRequest(req) for req in requests]
pool.wait()