因公司业务要求,需要增加Mysql数据库数量,所以需要把数据重新从老的数据库根据分库规则分布到不同Mysql数据库中。本来可以使用ETL工具的,but,公司VM太破不支持呀。嗯,正好最近学了点python,就拿它练练手吧。
功能:1、多对多数据迁移
2、断点续传(从哪里跌倒,就从哪里爬起来)
3、数据条数对比。就是对比源数据库和目标数据库之间数据条数是否一致。
4、数据表过滤。过滤掉不需要的数据表。
注意:这个工具读取数据时必须把id放在首位,因为脚本会对id进行重新编辑。当然,如果不需要可以把这段注掉就可以了。
主文件DataTanslate.py
import pymysql
import hashlib
import hascode
import nev_dataTransferSql
import datetime
import configparser
import math
import sys
# Load runtime parameters from the ini config file
conf = configparser.ConfigParser()
conf.read("./dataBaseConfig.ini")
# Source database parameters (comma-separated, one entry per source DB)
sourceDBUrl = (conf.get('soureDB', 'sourceDBUrl')).split(',')
sourceDBUser = (conf.get('soureDB', 'sourceDBUser')).split(',')
sourceDBKey = (conf.get('soureDB', 'sourceDBKey')).split(',')
sourceDataBse = (conf.get('soureDB', 'sourceDataBse')).split(',')
# Target database parameters (comma-separated, one entry per target shard)
targetDBUrl = (conf.get('targetDB', 'targetDBUrl')).split(',')
targetDBUser = (conf.get('targetDB', 'targetDBUser')).split(',')
targetDBKey = (conf.get('targetDB', 'targetDBKey')).split(',')
targetDBs = (conf.get('targetDB', 'targetDBs')).split(',')
# Unique flag per source database, plus the flag of the one being migrated now
sourcedatabseFlg =(conf.get('soureDB', 'sourcedatabseFlg')).split(',')
currentSourcedatabseFlg = conf.get('progress', 'currentSourcedatabseFlg')
# ID offset between source databases, so re-numbered ids never collide
dataBaseInterval = int(conf.get('config', 'dataBaseInterval'))
# Tables excluded from migration
filterTables = (conf.get('filter', 'tables')).split(',')
# Index of the source database currently being processed
dataBaseNo = 0
def connectDB(url,user,key,dataBase):
    """Open a MySQL connection and return (connection, cursor).

    url: host; user/key: credentials; dataBase: schema to use.
    """
    # Use keyword arguments: PyMySQL 1.0+ removed support for positional
    # connect() arguments, so the original positional call breaks there.
    db = pymysql.connect(host=url, user=user, password=key, database=dataBase)
    cursor = db.cursor()
    return db, cursor
def getDataFromTarget(sourceDBUrl,sourceDBUser, sourceDBKey,sql,dataBase):
    """Run a SELECT against a source database and return all rows.

    Returns the tuple of row tuples produced by cursor.fetchall().
    """
    db, cursor = connectDB(sourceDBUrl,sourceDBUser, sourceDBKey,dataBase)
    try:
        cursor.execute(sql)
        data = cursor.fetchall()
    finally:
        # Bug fix: without finally, a failing execute() leaked the
        # cursor and the connection.
        cursor.close()
        db.close()
    return data
def insertDataToTarget(index,sql,args):
    """Bulk-insert rows into the index-th target shard.

    index selects the target database; sql is the INSERT template with %s
    placeholders; args is a sequence of row tuples for executemany.
    """
    db, cursor = connectDB(targetDBUrl[index], targetDBUser[index],targetDBKey[index], targetDBs[index])
    try:
        cursor.executemany(sql, args)
        db.commit()
    except Exception:
        # Bug fix: roll back the failed batch instead of leaving the
        # transaction dangling, then re-raise for the caller.
        db.rollback()
        raise
    finally:
        # Bug fix: close even when executemany raises (the original leaked
        # the connection on error).
        cursor.close()
        db.close()
#分库规则
def formatVin(args):
    """Sharding rule: map one row to a target-database index.

    args is a row tuple whose third column (index 2) is the VIN.  The VIN's
    MD5 hex digest is hashed Java-style (hascode.getHashCode) and reduced
    modulo the number of target databases.
    Returns an int in [0, len(targetDBs)).
    """
    vin = args[2]
    m = hashlib.md5()
    m.update(vin.encode(encoding='UTF-8'))
    num = hascode.getHashCode(m.hexdigest())
    # getHashCode may be negative; masking with 2**31-1 makes it
    # non-negative, so the modulo is always in range.  The original
    # `if mvalue <= len(targetDBs)` guard was therefore dead code.
    return (num & 2147483647) % len(targetDBs)
#根据分库规则对数据分组
def shunt(args):
    """Group rows by target shard according to formatVin.

    Returns a list with one tuple of rows per target database; a shard that
    received no rows keeps the ((),) placeholder, as before.
    """
    # Bug fix: `[[()]] * n` repeats the SAME inner list object n times, so
    # mutating one slot would silently change every slot.  Build an
    # independent placeholder per shard instead (observable output is
    # unchanged because the first assignment below rebinds the slot).
    list_ = [[()] for _ in range(len(targetDBs))]
    for row in args:
        shard = formatVin(row)
        if len(list_[shard][0]) == 0:
            # First row for this shard: replace the placeholder
            list_[shard] = [row]
        else:
            list_[shard].append(row)
    # executemany expects a sequence of rows per shard
    return [tuple(group) for group in list_]
def tables(sourceDBUrl,sourceDBUser, sourceDBKey,sourceDataBse,pageIndex,size,getData_sql,insert_sql,currentPage,currentTable):
    """Migrate one page of rows from a source table to the sharded targets.

    pageIndex is the row offset and size the page size; getData_sql and
    insert_sql are the SELECT/INSERT templates from nev_dataTransferSql.
    Returns len(result) (the shard count) after processing a page, or an
    implicit None when the source page is empty.
    """
    #print(getData_sql)
    # Complete the SELECT template: "... LIMIT <offset> ,<count>"
    getData_sql = getData_sql + str(pageIndex) + " ," + str(size)
    # Fetch the page of rows to be migrated
    args = getDataFromTarget(sourceDBUrl,sourceDBUser, sourceDBKey,getData_sql,sourceDataBse)
    if(len(list(args)) == 0):
        # NOTE(review): returns None here, but the caller tests `== 0`;
        # None == 0 is False, so the caller's break never fires — confirm
        # whether `return 0` was intended.
        return
    # Re-number ids so pages from different source DBs land in disjoint ranges
    args = editResultID(args, pageIndex,currentPage,currentTable)
    # Group the rows per target shard
    result = shunt(args)
    for index in range(len(result)):
        # NOTE(review): len(result) is the constant shard count, so the
        # else-branch is unreachable; `len(result[index])` (rows destined
        # for this shard) looks like the intended condition — confirm.
        if(len(result)>0):
            print("插入数据:")
            # insertDataToTarget(index,insert_sql,result[index])
        else:
            break
    return len(result)
# Persist the current insert position so an interrupted run can resume
def saveCurrentData(data,currentPage,currentTable):
    """Record the current migration position in the [progress] section.

    data is the page of re-numbered rows; its first and last ids are stored
    as beginId/endId (used by deleteDataFromTables.py to undo a partial page).
    """
    print("当前源数据库",sourcedatabseFlg[dataBaseNo])
    print("当前表",currentTable)
    print("总页数",pageCount)
    print("页码", currentPage)
    print("开始ID",(data[0])[0])
    print("结束ID",(data[len(data)-1])[0])
    conf = configparser.ConfigParser()
    conf.read("./dataBaseConfig.ini")
    conf.set("progress", "table", currentTable)
    conf.set("progress", "pageNo", str(currentPage))
    conf.set("progress", "beginId", str((data[0])[0]))
    conf.set("progress", "endId", str((data[len(data)-1])[0]))
    # Bug fix: the original passed a bare open(...) to conf.write and leaked
    # the file handle; a with-block guarantees flush and close.
    with open("./dataBaseConfig.ini", "w") as f:
        conf.write(f)
def editResultID(result,pageIndex,currentPage,currentTable):
    """Rewrite the id column (index 0) of every row so ids from different
    source databases occupy disjoint ranges, then persist the progress.

    Returns the rows as a tuple of tuples with the new ids.
    """
    base = pageIndex + 1 + dataBaseInterval * dataBaseNo
    renumbered = []
    for offset, row in enumerate(result):
        fields = list(row)
        fields[0] = base + offset
        renumbered.append(tuple(fields))
    saveCurrentData(renumbered, currentPage, currentTable)
    return tuple(renumbered)
def getPageCount(sourceDBUrl,sourceDBUser, sourceDBKey,sourceDb,currentTable,pageSize):
    """Return the page count for currentTable, plus one so that
    range(1, pageCount) also covers the last partial page."""
    count_sql = "select count(*) from " + currentTable
    rows = getDataFromTarget(sourceDBUrl,sourceDBUser, sourceDBKey,count_sql,sourceDb)
    total = rows[0][0]
    pages = math.ceil(total / pageSize) + 1
    print("总页数为:" +str(pages))
    return pages
def saveCurrentProcess(currentTable,currentPage):
    """Reset the [progress] section when migration moves to the next source
    database: record its flag and restart at currentTable/currentPage."""
    global currentSourcedatabseFlg
    currentSourcedatabseFlg = sourcedatabseFlg[dataBaseNo]
    conf = configparser.ConfigParser()
    conf.read("./dataBaseConfig.ini")
    conf.set("progress", "table", currentTable)
    conf.set("progress", "pageNo", str(currentPage))
    conf.set("progress", "beginId", str(1))
    conf.set("progress", "endId", str((0)))
    conf.set("progress", "currentsourcedatabseflg", str(sourcedatabseFlg[dataBaseNo]))
    # Bug fix: close the config file deterministically instead of leaking
    # the handle returned by a bare open(...).
    with open("./dataBaseConfig.ini", "w") as f:
        conf.write(f)
def start(thread_num):
    """Run the full migration: iterate source databases, tables and pages,
    resuming from the table/page recorded in the [progress] section.

    thread_num is only used in the progress log output.
    """
    try:
        # Resume point recorded by a previous (possibly interrupted) run
        beginPageNo = int(conf.get("progress", "pageNo"))
        beginTable = conf.get("progress", "table")
        # Rows fetched per page
        pageSize = int(conf.get('config', 'pageSize'))
        print("开始页码:",beginPageNo)
        starttime = datetime.datetime.now()
        # Build the SELECT/INSERT sql pairs and the raw table list
        sqlList,tablesList = nev_dataTransferSql.getDataTransferSql(sourceDBUrl[0], sourceDBUser[0], sourceDBKey[0],sourceDataBse[0], filterTables, [], " ORDER BY id desc ", " LIMIT ")
        tableList = []
        for table in tablesList:
            if(table[0] not in filterTables):
                tableList.append(table[0])
        print("要迁移的表:%s"% tableList)
        for dataBaseIndex in range(len(sourcedatabseFlg)):
            # Skip source DBs until the one flagged as "current" is reached
            if(currentSourcedatabseFlg != sourcedatabseFlg[dataBaseIndex]):
                continue
            # Number of tables finished so far in this database
            amount = 0
            print("第%d个库"%dataBaseIndex)
            for value in sqlList:
                currentTable = str(tableList[amount])
                print(currentTable,"开始")
                # Only the interrupted table resumes mid-way; others start at page 1
                if(beginTable != currentTable):
                    beginPageNo = 1
                global pageCount
                # Total pages for this table (also read by saveCurrentData)
                pageCount = getPageCount(sourceDBUrl[dataBaseIndex],sourceDBUser[dataBaseIndex], sourceDBKey[dataBaseIndex],sourceDataBse[dataBaseIndex],currentTable,pageSize)
                for i in range(beginPageNo,pageCount):
                    index = (i-1)*pageSize
                    # NOTE(review): tables() returns None (not 0) on an empty
                    # page, and None == 0 is False, so this break never
                    # fires — confirm whether that is intended.
                    if(tables(sourceDBUrl[dataBaseIndex],sourceDBUser[dataBaseIndex], sourceDBKey[dataBaseIndex],sourceDataBse[dataBaseIndex],index,pageSize,value[0] ,value[1],i,currentTable)==0):
                        break
                    print("表 %s 已完成 %s" % (str(tableList[amount]), str(((i / (pageCount - 1)) * 100)) + "%"))
                print("%d-----整体已完成:%s" % (thread_num, str((amount / (len(tableList) - 1)) * 100) + "%"))
                if(amount == len(tableList)-1):
                    global dataBaseNo
                    # All tables done: advance to the next source database
                    # and reset the recorded progress for it
                    if(dataBaseIndex +1 < len(sourcedatabseFlg)):
                        dataBaseNo = dataBaseIndex + 1
                        saveCurrentProcess(tableList[0],0)
                amount += 1
        endtime = datetime.datetime.now()
        print("总耗时:%s"% (endtime - starttime).seconds)
    except OSError as err:
        print("OS error: {0}".format(err))
    except ValueError:
        print("Could not convert data to an integer.")
    except:
        # Bare except, but it logs and re-raises, so nothing is swallowed
        print("Unexpected error:", sys.exc_info()[0])
        raise
start(1)
用于生成SQL文 nev_dataTransferSql.py
import pymysql
import configparser
# Load configuration.  NOTE(review): `conf` is not referenced by any function
# in this module — presumably kept for parity with the other scripts; verify
# before removing.
conf = configparser.ConfigParser()
conf.read("./dataBaseConfig.ini")
def connectDB(url,user,key,table):
    """Open a MySQL connection and return (connection, cursor).

    url: host; user/key: credentials; table: schema name to use.
    """
    # Use keyword arguments: PyMySQL 1.0+ removed positional connect() args.
    db = pymysql.connect(host=url, user=user, password=key, database=table)
    cursor = db.cursor()
    return db, cursor
def getDataFromTarget(sourceDBUrl, sourceDBUser, sourceDBKey,sql,table):
    """Execute sql against the given database and return all rows."""
    db, cursor = connectDB(sourceDBUrl, sourceDBUser, sourceDBKey,table)
    try:
        cursor.execute(sql)
        data = cursor.fetchall()
    finally:
        # Bug fix: without finally, a failing execute() leaked the connection.
        cursor.close()
        db.close()
    return data
def editeCloumn(cloumn,reType,reCloumn):
    """Turn one DESC row into a SELECT column expression.

    cloumn is a (name, type, ...) row from `desc <table>`; datetime columns
    are wrapped in date_format so they transfer as fixed-format strings.
    reType and reCloumn are accepted for interface parity but unused here.
    """
    name = cloumn[0]
    is_datetime = (cloumn[1] == "datetime")
    return ("date_format(" + name + ",'%Y-%m-%d %H:%i:%S') AS " + name
            if is_datetime else name)
def editeSql(table,cloumns,_cloumns,orderBy,limit):
    """Build the (SELECT, INSERT) SQL pair for one table.

    table: table name; cloumns: SELECT expressions; _cloumns: raw column
    names for the INSERT; orderBy: ORDER BY clause (skipped for `user`);
    limit: when non-empty, a trailing " limit " is appended so the caller
    can concatenate the offset/count.
    """
    select_sql = "select " + cloumns + " from " + table
    if orderBy and table != "user":
        select_sql += orderBy
    if limit:
        select_sql += " limit "
    placeholders = "%s," * (len(_cloumns.split(",")) - 1) + "%s )"
    insert_sql = "insert into " + table + "(" + _cloumns + ")" + "values( " + placeholders
    return select_sql, insert_sql
# Read the table structure and build the column strings for SQL generation
def getTableStructure(sourceDBUrl, sourceDBUser, sourceDBKey,table,filterCloumn,reType,reCloumn,dataBase="toyota"):
    """Describe `table` and build the column lists used to generate SQL.

    table: a 1-tuple holding the table name (one row of `show tables`);
    filterCloumn: column names to exclude; reType/reCloumn: passed through
    to editeCloumn; dataBase: schema to run DESC against — the original
    hard-coded "toyota", which is kept as the default for backward
    compatibility but can now be overridden by callers.
    Returns (select_columns, insert_columns) as comma-joined strings.
    """
    structure = getDataFromTarget(sourceDBUrl, sourceDBUser, sourceDBKey,'desc ' + table[0], dataBase)
    cloumns = ""
    _cloumns = ""
    for cloumn in structure:
        if(cloumn[0] not in filterCloumn):
            value = editeCloumn(cloumn,reType,reCloumn)
            if(len(cloumns)>0):
                cloumns += "," + value
                _cloumns += "," + cloumn[0]
            else:
                cloumns = value
                _cloumns = cloumn[0]
    return cloumns,_cloumns
def getDataTransferSql(sourceDBUrl, sourceDBUser, sourceDBKey,dataBase,filterTable,filterCloumn,orderBy,limit):
    """Build a (select, insert) SQL pair for every non-filtered table.

    Returns (sql_pairs, all_tables) where all_tables is the raw, unfiltered
    `show tables` result — callers re-apply the filter themselves.
    """
    allTables = getDataFromTarget(sourceDBUrl, sourceDBUser, sourceDBKey,'show tables',dataBase)
    # Renamed from `list`, which shadowed the builtin
    sqlPairs = []
    for table in allTables:
        if(table[0] not in filterTable):
            cloumns,_cloumns = getTableStructure(sourceDBUrl, sourceDBUser, sourceDBKey,table,filterCloumn,"","")
            sqlPairs.append(editeSql(table[0], cloumns,_cloumns,orderBy,limit))
    return sqlPairs,allTables
hascode相关
def convert_n_bytes(n, b):
    """Wrap n into a signed b-byte two's-complement integer
    (e.g. b=4 reproduces Java int overflow semantics)."""
    bits = 8 * b
    wrapped = n & ((1 << bits) - 1)
    if wrapped >= 1 << (bits - 1):
        wrapped -= 1 << bits
    return wrapped
def convert_4_bytes(n):
    # Wrap n into a signed 32-bit integer (Java `int` semantics).
    return convert_n_bytes(n, 4)
def getHashCode(s):
    """Replicate Java's String.hashCode(): h = 31*h + ord(c) over the
    characters, wrapped to a signed 32-bit integer.

    Rewritten with Horner's scheme: the original computed
    ord(c) * 31 ** (n - 1 - i) per character, which is quadratic in big-int
    work; this form is O(n) and yields the identical value.
    """
    h = 0
    for c in s:
        # Mask each step to 32 bits so h never grows unbounded
        h = (31 * h + ord(c)) & 0xFFFFFFFF
    # Reinterpret as a signed 32-bit value (what convert_4_bytes produced)
    return h - 0x100000000 if h >= 0x80000000 else h
从目标数据库删除数据 deleteDataFromTables.py
import pymysql
import configparser
# Load database parameters from the config file
conf = configparser.ConfigParser()
conf.read("./dataBaseConfig.ini")
# Target database parameters (comma-separated, one entry per shard)
targetDBUrl = (conf.get('targetDB', 'targetDBUrl')).split(',')
targetDBUser = (conf.get('targetDB', 'targetDBUser')).split(',')
targetDBKey = (conf.get('targetDB', 'targetDBKey')).split(',')
targetDBList = (conf.get('targetDB', 'targetDBs')).split(',')
# Resume info written by DataTanslate.py: the id range of the last
# (possibly partial) page and the table it belonged to
beginPageNo = int(conf.get("progress", "pageNo"))
beginid = int(conf.get("progress", "beginid"))
endid = int(conf.get("progress", "endid"))
targetTables = conf.get("progress", "table")
def connectDB(url,user,key,table):
    """Open a MySQL connection and return (connection, cursor)."""
    # Use keyword arguments: PyMySQL 1.0+ removed positional connect() args.
    db = pymysql.connect(host=url, user=user, password=key, database=table)
    cursor = db.cursor()
    return db, cursor
def deletData(beginId,endId):
    """Delete rows with id in [beginId, endId] from the current table in
    every target shard — undoes a partially-inserted page before a resumed
    migration re-inserts it."""
    for DBIndex in range(len(targetDBUrl)):
        db, cursor = connectDB(targetDBUrl[DBIndex], targetDBUser[DBIndex], targetDBKey[DBIndex], targetDBList[DBIndex])
        print("<-------------------------正在删除" + targetDBUser[DBIndex] + "库----------------------------->")
        print("正在删除" + targetTables + "表")
        sql = "delete from " + targetTables + " where id>= " + str(beginId) + " and id<= " + str(endId) + " ;"
        print(sql)
        try:
            cursor.execute(sql)
            db.commit()
        except Exception:
            # Bug fix: narrowed from a bare except so KeyboardInterrupt /
            # SystemExit are no longer swallowed; still best-effort per shard.
            print("操作异常" )
            db.rollback()
        finally:
            # Bug fix: close even when execute() raises
            cursor.close()
            db.close()
deletData(beginid,endid)
表数据条数对比
import pymysql
import configparser
# Load parameters from the config file
conf = configparser.ConfigParser()
conf.read("./dataBaseConfig.ini")
# Source database parameters (the original comment mislabeled these "target")
sourceDBUrl = (conf.get('soureDB', 'sourceDBUrl')).split(',')
sourceDBUser = (conf.get('soureDB', 'sourceDBUser')).split(',')
sourceDBKey = (conf.get('soureDB', 'sourceDBKey')).split(',')
sourceDataBase = (conf.get('soureDB', 'sourceDataBse')).split(',')
# Target database parameters (comma-separated, one entry per shard)
targetDBUrl = (conf.get('targetDB', 'targetDBUrl')).split(',')
targetDBUser = (conf.get('targetDB', 'targetDBUser')).split(',')
targetDBKey = (conf.get('targetDB', 'targetDBKey')).split(',')
targetDBs = (conf.get('targetDB', 'targetDBs')).split(',')
# Tables excluded from the comparison
filterTables = (conf.get('filter', 'tables')).split(',')
# Optional id range applied to the target-side count(*) queries
beginId = conf.get('compare', 'beginId')
enId = conf.get('compare', 'enId')
def connectDB(url,user,key,table):
    """Open a MySQL connection and return (connection, cursor)."""
    # Use keyword arguments: PyMySQL 1.0+ removed positional connect() args.
    db = pymysql.connect(host=url, user=user, password=key, database=table)
    cursor = db.cursor()
    return db, cursor
def getDataFromTarget(sourceDBUrl,sourceDBUser,sourceDBKey,sql,dataBase):
    """Execute sql against the given database and return all rows."""
    db, cursor = connectDB(sourceDBUrl, sourceDBUser, sourceDBKey,dataBase)
    try:
        cursor.execute(sql)
        data = cursor.fetchall()
    finally:
        # Bug fix: without finally, a failing execute() leaked the connection.
        cursor.close()
        db.close()
    return data
def editeCloumn(cloumn,reType,reCloumn):
    """Render one DESC row as a SELECT expression; datetime columns are
    date_format-ed so both sides compare as identical strings.
    reType and reCloumn exist for interface parity and are unused."""
    if cloumn[1] != "datetime":
        return cloumn[0]
    return "date_format(" + cloumn[0] + ",'%Y-%m-%d %H:%i:%S') AS " + cloumn[0]
def editeSql(table):
    """Build the count(*) queries for one table: the target-side query is
    bounded by the module-level beginId/enId range, the source-side query
    counts everything."""
    target_sql = "select count(*)" + " from " + table + " where 1=1"
    if beginId:
        target_sql = target_sql + " and id >= " + str(beginId)
    if enId:
        target_sql = target_sql + " and id <= " + str(enId)
    source_sql = "select count(*)" + " from " + table
    return target_sql, source_sql
def getTableRecodAmount(sourceDBUrl,sourceDBUser,sourceDBKey,sql,dataBase):
    """Run a count(*) query against the given database and return the raw
    fetchall() result (a 1x1 tuple of tuples)."""
    db, cursor = connectDB(sourceDBUrl, sourceDBUser, sourceDBKey,dataBase)
    try:
        cursor.execute(sql)
        amount = cursor.fetchall()
    finally:
        # Bug fix: without finally, a failing execute() leaked the connection.
        cursor.close()
        db.close()
    return amount
def getDataTransferSql(targetDBUrl,targetDBUser,targetDBKey,targetDBs):
    """Build the per-table count(*) SQL for both sides of the comparison.

    Returns (target_sqls, source_sqls, tables) for every table of the given
    target database that is not in filterTables.
    """
    allTables = getDataFromTarget(targetDBUrl,targetDBUser,targetDBKey,'show tables',targetDBs)
    # Renamed from `list`/`list2`, which shadowed the builtin
    targetSqls = []
    sourceSqls = []
    newTables=[]
    for table in allTables:
        if(table[0] not in filterTables):
            sql_selectTarget, sql_selectSource = editeSql(table[0])
            targetSqls.append(sql_selectTarget)
            sourceSqls.append(sql_selectSource)
            newTables.append(table)
    return targetSqls,sourceSqls,newTables
def start():
    """Compare per-table row counts: sum count(*) across all target shards
    and across all source databases, then print the difference per table."""
    index = 0
    # The table list and SQL are derived from the first target database
    sqlList, sqlList2,tables = getDataTransferSql(targetDBUrl[index],targetDBUser[index],targetDBKey[index],targetDBs[index])
    for index2 in range(len(sqlList)):
        amount = 0
        amount2 = 0
        # Total rows of this table across every target shard
        for index3 in range(len(targetDBs)):
            amount = amount + ((getTableRecodAmount(targetDBUrl[index3],targetDBUser[index3],targetDBKey[index3],sqlList[index2],targetDBs[index3]) )[0])[0]
        print("目标库 " + "表:" + (tables[index2])[0] + "数据总数为:" + str(amount))
        # Total rows of this table across every source database
        for index4 in range(len(sourceDataBase)):
            amount2 = amount2 + ((getTableRecodAmount(sourceDBUrl[index4],sourceDBUser[index4],sourceDBKey[index4],sqlList2[index2],sourceDataBase[index4]) )[0])[0]
        print("源数据库 " + "表:" + (tables[index2])[0] + "数据总数为:" + str(amount2))
        print( "表:" + (tables[index2])[0] + "相差" + str(amount2 - amount))
start()
配置文件dataBaseConfig.ini
[soureDB]
sourcedburl =
sourcedbuser =
sourcedbkey =
sourcedatabse =
sourcedatabseflg =
[targetDB]
targetdburl =
targetdbuser =
targetdbkey =
targetdbs =
[progress]
currentsourcedatabseflg =
table =
pageno = 1
beginid =
endid =
[config]
pagesize =
databaseinterval = 10000000
[filter]
tables =
[compare]
beginId =
enId =
配置参数(dataBaseConfig.ini) | |||||||||
源数据库配置 | [soureDB] | ||||||||
sourcedburl = | 源数据库链接 | 源数据库,多个数据库之间使用“,”隔开 | |||||||
sourcedbuser = | 用户名 | ||||||||
sourcedbkey = | 密码 | ||||||||
sourcedatabse = | 数据库名 | ||||||||
sourcedatabseflg = | 源数据库标志位(必填,每个数据库对应一个标志位,且是唯一) | ||||||||
目标数据库配置 | [targetDB] | ||||||||
targetdburl = | 目标数据库链接,多个数据库用“,”间隔 | ||||||||
targetdbuser = | 用户名 | ||||||||
targetdbkey = | 密码 | ||||||||
targetdbs = | 数据库名 | ||||||||
进度配置 | [progress] | ||||||||
currentsourcedatabseflg = | 当前源数据库标志位(必填) | 初始全部设置为空即可,脚本会自动更新数据;一般再开始转移数据时设置 | |||||||
table = alarm | 当前表 | ||||||||
pageno = 1 | 当前页 | ||||||||
beginid = | 开始id | ||||||||
endid = | 结束id | ||||||||
自定义 | |||||||||
[config] | |||||||||
pagesize = 5 | 每次抽取数据条数(必填) | ||||||||
databaseinterval = 10000000 | 不同源数据库,相同表格之间ID间隔。比如第一个数据库ID从1开始,第二个库同一张表ID从1+10000000开始,避免ID冲突 | ||||||||
[filter] | |||||||||
tables = user | 统计数据和迁移数据时需要迁移的表 | ||||||||
数据对比时从目标数据库取数范围 | [compare] | ||||||||
beginId =0 | 起始ID | ||||||||
enId =40000000 | 终止ID | ||||||||
配置完参数之后,直接将脚本放到已配置好python3环境的VM任意目录执行DataTanslate.py即可;如果中途失败,中止脚本,先执行deleteDataFromTables.py再执行DataTanslate.py。 |