"""Migration check between the new ``tms`` schema and the old ``tms_tmp`` schema.

The script compares column definitions, a CRC32-based content checksum and
row counts between the new and the old return-order tables.
"""
import contextlib
import datetime
import decimal
import json

import pymysql


class DecimalEncoder(json.JSONEncoder):
    """JSON encoder that also accepts ``Decimal`` and ``datetime`` values."""

    def default(self, obj):
        """Return a JSON-serializable stand-in for *obj*.

        ``decimal.Decimal`` is converted to ``float`` and
        ``datetime.datetime`` to its ``str()`` form.  Any other type is
        handed to ``json.JSONEncoder.default``.
        """
        if isinstance(obj, decimal.Decimal):
            return float(obj)
        if isinstance(obj, datetime.datetime):
            return str(obj)
        return super(DecimalEncoder, self).default(obj)


# NOTE(review): the credentials are hard-coded in the source; they should be
# moved to configuration or environment variables.
_CONNECT_KWARGS = {
    'host': '192.168.103.116',
    'port': 3306,
    'user': 'db_select',
    'password': 'LTyIt7AIPkWzwOrF',
    'charset': 'utf8',
}

# Columns folded into the content checksum.  The two tuples are listed in the
# same order, as they were in the original hand-written SQL statements.
_NEW_CHECKSUM_COLUMNS = (
    'a.related_order_no', 'logistics_code', 'order_no', 'express_no',
    'total_price', 'collection_price', 'number', 'weight',
    'order_closed_at', 'remarks', 'insurance_price', 'send_status',
    'order_created_at', 'order_outhoused_at', 'need_invoice', 'created_at',
    'delivery_method', 'pay_at', 'location', 'STATUS', 'order_source',
)
_OLD_CHECKSUM_COLUMNS = (
    'saleAfterOrderId', 'logisticCompanyId', 'txLogisticID', 'mailNo',
    'goodsValue', 'itemsValue', 'totalPCS', 'totalWeight',
    'closed_time', 'remark', 'insuranceValue', 'sendstatus',
    'createTime', 'outhousetime', 'needInvoice', 'arriveTime',
    'deliverymode', 'paytime', 'location', 'deletestatus', 'orderSource',
)

# (new column name, old column name) pairs printed side by side by
# ``RUNSQL.sql_test``, in output order.
_FIELD_PAIRS = (
    ('order_no', 'txLogisticID'),
    ('express_no', 'logisticsOutNo'),
    ('logistics_code', 'logisticCompanyId'),
    ('order_transport_type', 'flag'),
    ('number', 'totalPCS'),
    ('weight', 'totalweight'),
    ('total_price', 'goodsValue'),
    ('collection_price', 'itemsValue'),
    ('insurance_price', 'insuranceValue'),
    ('order_source', 'orderSource'),
    ('remarks', 'remark'),
    ('order_created_at', 'createTime'),
    ('order_outhoused_at', 'outhousetime'),
    ('order_review_at', 'reviewtime'),
    ('status', 'deletestatus'),
    ('created_at', 'arriveTime'),
)


def _checksum_sql(columns, source):
    """Build ``SELECT SUM(CRC32(CONCAT(IFNULL(col, ''), ...))) <source>``."""
    fields = ", ".join("IFNULL({}, '')".format(column) for column in columns)
    return "SELECT SUM( CRC32( CONCAT( {} ) ) ) {}".format(fields, source)


def _connect(db_name):
    """Open a connection to schema *db_name* with the shared settings."""
    return pymysql.connect(db=db_name, **_CONNECT_KWARGS)


def _report(new_value, old_value):
    """Print both values and whether they are equal."""
    print("新表:", new_value)
    print("旧表:", old_value)
    if new_value == old_value:
        print("新旧数据相等")
    else:
        print("数据不等,请重新检查")


class RUNSQL(object):
    """Runs the comparison queries and prints the results."""

    def run_sql(self):
        """Execute every comparison query and keep the cursors on ``self``.

        Sets ``new_cursor1``/``old_cursor1`` (column definitions),
        ``new_all_data``/``old_all_data`` (content checksum) and
        ``new_count_data``/``old_count_data`` (row count).  Both connections
        are closed before returning, also when a query raises.  The cursors
        are read afterwards by the other methods, which relies on the
        default pymysql cursor having buffered its rows during ``execute``.
        """
        with contextlib.closing(_connect('tms')) as new_db:
            with contextlib.closing(_connect('tms_tmp')) as old_db:
                # Column definitions of both tables.
                # NOTE(review): the old side describes
                # t_wms2tms_saleafter_item while the data queries below read
                # t_wms2tms_saleafter_order -- confirm this is intended.
                self.new_cursor1 = new_db.cursor()
                self.old_cursor1 = old_db.cursor()
                self.new_cursor1.execute("desc order_returns;")
                self.old_cursor1.execute('desc t_wms2tms_saleafter_item ;')

                # Checksum (sum of CRC32) over the migrated rows.
                self.new_all_data = new_db.cursor()
                self.old_all_data = old_db.cursor()
                self.new_all_data.execute(_checksum_sql(
                    _NEW_CHECKSUM_COLUMNS,
                    "FROM tms.order_returns a JOIN tms.order_id_after_tmp b "
                    "ON a.related_order_no = b.related_order_no;"))
                self.old_all_data.execute(_checksum_sql(
                    _OLD_CHECKSUM_COLUMNS,
                    "FROM tms_tmp.t_wms2tms_saleafter_order a "
                    "JOIN tms.order_id_after_tmp b "
                    "ON a.saleAfterOrderId = b.related_order_no;"))

                # Row counts.
                # NOTE(review): the new-side count is not joined against
                # order_id_after_tmp while the old-side count is -- confirm.
                self.new_count_data = new_db.cursor()
                self.old_count_data = old_db.cursor()
                self.new_count_data.execute(
                    "SELECT count( CRC32( CONCAT( IFNULL(related_order_no, '')))) "
                    "FROM order_returns")
                self.old_count_data.execute(
                    "SELECT count( CRC32( CONCAT( IFNULL(saleAfterOrderId, '')))) "
                    "FROM tms_tmp.t_wms2tms_saleafter_order a "
                    "JOIN tms.order_id_after_tmp b "
                    "ON a.saleAfterOrderId = b.related_order_no")

    def sql_test(self):
        """Print the definitions of corresponding new and old columns.

        New-table columns that take part in no pair are printed first.  Then,
        for every pair in ``_FIELD_PAIRS``, the ``desc`` row of the new column
        is printed, followed by the ``desc`` row of the old column.
        """
        self.run_sql()
        print('字段名称', '字段类型', '是否为空', '主键外键', '默认值', '附加说明')

        new_rows = self.new_cursor1.fetchall()
        old_rows = self.old_cursor1.fetchall()

        paired_new_names = {new_name for new_name, _ in _FIELD_PAIRS}
        for row in new_rows:
            if row[0] not in paired_new_names:
                print(row)
        print('\n')

        for new_name, old_name in _FIELD_PAIRS:
            for row in new_rows:
                if row[0] == new_name:
                    print(row)
            # Each result set is scanned over its own length; the original
            # indexed the old rows with the length of the new list, which
            # could raise IndexError or skip rows.
            for row in old_rows:
                if row[0] == old_name:
                    print(row)
            print('\n')

    def all_data(self):
        """Compare the content checksum of the new and the old table."""
        print('平移数据')
        self.run_sql()
        new_data = self.new_all_data.fetchall()[0][0]
        old_data = self.old_all_data.fetchall()[0][0]
        _report(new_data, old_data)

    def count_data(self):
        """Compare the row count of the new and the old table."""
        self.run_sql()
        new_count = self.new_count_data.fetchall()[0][0]
        old_count = self.old_count_data.fetchall()[0][0]
        _report(new_count, old_count)


if __name__ == '__main__':
    s = RUNSQL()
    s.all_data()
    # s.count_data()
数据库迁移测试--平移
猜你喜欢
转载自blog.csdn.net/lssrain/article/details/80606865
今日推荐
周排行