设计一个相对通用型的表格合并程序,并将其封装成exe格式
【核心设计思路】
1.使用pandas读取待合并的文件,并校验各个待合并的文件的样式是否一致
2.合并df
3.将合并df写入到表格中输出
【注意点】
合并表格的程序设计相对简单,但仍然需要注意几个点:
1.待合并的子表文件夹内是否存在正在编辑状态的表格和非表格类型文件
2.待合并的表格内含有多个子表的情形
3.跳行的情形
【完整代码】
import pandas as pd
import os
import time
import xlwings as xw
import tqdm
import random
import httplib2
import datetime
app=xw.App(visible=False,add_book=False)
app.display_alerts=False
app.screen_updating=False
f_0='./待合并子表/'
f_s='./合并结果/'
f_k='./样表/'
forbidden_symbol=['<', '>', '?', ':','|', '*', '/', '"', '\\','[',']']
def read_data(data,sht_name,skip_row,level):
if data[-4:]=='.csv':
try:
d_u = pd.read_csv(data, header=0, chunksize=10000, encoding='gbk') # encoding='gbk',
except UnicodeDecodeError:
d_u = pd.read_csv(data, header=0, chunksize=10000, encoding='utf-8')
chunk_0 = [i for i in d_u]
df = pd.concat(chunk_0)
else:
if level==0 or level==1:
header=skip_row
else:
header=[skip_row + i for i in range(level)]
df = pd.read_excel(data, sheet_name=sht_name,header=header)
return df
def basic_check(DataSet_list,OpenCheck_list,SimpleName_list):
u=0
ori_data=[]
for i in DataSet_list:
if not os.path.exists(i):
os.mkdir(i)
for i in range(len(OpenCheck_list)):
if len(os.listdir(OpenCheck_list[i]))==0:
print('{}为空,请核验!'.format(SimpleName_list[i]))
u+=1
break
if u==0:
for i in range(len(OpenCheck_list)):
v = []
f_x = [OpenCheck_list[i] + j for j in os.listdir(OpenCheck_list[i])]
for s in f_x:
if '~$' in s:
v.append((s.replace(OpenCheck_list[i],'')).replace('~$',''))
f_x.remove(s)
for s in f_x:
if (s[-4:] !='.csv' ) and ('.xls' not in s[-5:]):
f_x.remove(s)
ori_data.append(f_x)
if len(v)>0:
print('Hint:\n【{}】文件夹内的以下文件正处于编辑状态:'.format(OpenCheck_list[i][2:-2]))
refix_col(v,5)
else:
print('\n※----基础验证未通过······※')
return ori_data
def choose_from_list(list_0,file_0,sht_type,positon,type=1):
choosed_data=''
if type>0:
list_1=[i.replace(file_0,'') for i in list_0]
else:
list_1=list_0[:]
s=0
t=0
if len(list_0)==1:
choosed_data=list_0[0]
else:
if sht_type=='源表':
print('\n检测到【{}】内存在多个{},请选择其中一个进行后续操作:\n\t若{}是以整数命名的,请输入·[完整文件名].如:{}.xlsx'.format(positon,sht_type,sht_type,random.randint(1,9)))
else:
print('\n检测到【{}】内存在多个{},请选择其中一个进行后续操作:\n\t若{}是以整数命名的,请使用·[整数+]·的格式,如:{}+.'.format(positon,sht_type,sht_type,random.randint(1,9)))
while s==0:
if t<=2:
print('可选{}清单:'.format(sht_type))
refix_col(list_1,9)
sht_x = input('请输入{}名称或{}位次:'.format(sht_type,sht_type))
try:
if sht_x == '':
sht_num = 0
choosed_data = list_0[sht_num]
s=1
else:
sht_num=int(sht_x)
if sht_num==1 :
sht_num=0
choosed_data = list_0[sht_num]
s=1
else:
if (sht_num>=len(list_0)+1) or (sht_num<=-len(list_0)-1):
print('\t※----下标越界-----------下标越界-----------下标越界----※\n')
t+=1
else:
if sht_num>0:
sht_num-=1
else:
pass
s=1
choosed_data = list_0[sht_num]
except (TypeError,ValueError):
if sht_x[0] or sht_x[-1]=="'":
sht_x=sht_x.replace("'",'')
if sht_x[0] or sht_x[-1]=='"':
sht_x=sht_x.replace('"','')
if sht_x[-1]=='+':
sht_x=sht_x[:-1]
else:
pass
if sht_x not in list_1:
print('\t输入的{}名称不在【{}】内,请核验!\n'.format(sht_type,positon))
t+=1
else:
choosed_data=list_0[list_1.index(sht_x)]
s=1
else:
s=1
print('\n-----*-----您已多次输错{}名称,请核验后重启程序再次尝试!-----*-----\nHint:\n\t①注意审查输入的{}名称是否存在空格\n\t②直接在可选{}清单内复制对应名称'.format(sht_type ,sht_type,sht_type))
choosed_data=''
if len(choosed_data)>0:
if type <=1:
if len(list_0)>1:
print('\t□······已选{}·[{}].'.format(sht_type,choosed_data.replace(file_0,'')))
else:
print('\t□······即将使用{}·[{}]的数据进行后续操作.\n'.format(sht_type, choosed_data.replace(file_0, '')))
else:
if len(list_0)>1:
print('\t□······已选{}·[{}]作为样表写入数据.'.format(sht_type,choosed_data.replace(file_0,'')))
else:
print('\t□······将使用·[{}]·作为{}写入数据.'.format(choosed_data.replace(file_0, ''),sht_type))
time.sleep(0.5)
return choosed_data
def refix_col(col_0,step,diff=''):
col_plus = int(len(col_0) % step)
c = 1
col_x = []
for i in range(len(col_0)):
col_x.append(col_0[i])
if c % step == 0:
print('\t{}{}'.format(diff,col_x))
col_x = []
else:
pass
c += 1
col_left = col_0[-col_plus:]
if col_plus > 0:
print('\t{}{}'.format(diff,col_left))
def Get_Common_SheetName(sheet_list):
s=pd.ExcelFile(sheet_list[0]).sheet_names
set_x=set(pd.ExcelFile(sheet_list[0]).sheet_names)
for i in sheet_list[1:]:#获取交集与并集
sht_name=pd.ExcelFile(i).sheet_names
s=set(s).intersection(set(sht_name))
set_x=set_x.union(set(sht_name))
xs=[len(pd.ExcelFile(i).sheet_names) for i in sheet_list]#获取各个表格的工作簿数量
z={
}
v=[]
for i in list(set_x):
z[i]=0
for j in sheet_list:
if i in pd.ExcelFile(j).sheet_names:
z[i]+=1
z=dict(sorted(z.items(),key=lambda x:x[1],reverse=True))
for i in z.keys():
v.append(i+': '+str(z[i]))
return list(s) ,list(set_x),v,xs
def set_file_name(exist_list,sheet_type='.csv'):
s_name=''
es = 0
sp = 0
while es == 0:
sk = 0
s_name = input('\n【请输入内容】※-----请输入即将应用至生成文件的规范文件名:')
for ep in forbidden_symbol:
if ep in s_name:
sk += 1
if sk > 0:
print('\t※----命名错误----键入的文件名不符合规范,请重新输入!----命名错误----※')
sp += 1
if sp >= 2:
print('\t提示:由本程序生成的Excel文件名称不可包含以下字符:\n\t\t{}'.format(forbidden_symbol))
else:
if s_name=='':
print('\t※----值为空----键入的文件名不能为空,请重新输入!----值为空----※')
else:
if '~$'+s_name+sheet_type in exist_list:
print('\t该文件已在[合并结果]文件夹内且处于编辑状态,请使用其他文件名或关闭该文件后再次尝试!')
else:
if s_name + sheet_type in exist_list:
IsCover=input('\t[合并结果]文件夹内已包含以该名字命名的文件,是否覆盖?(1/0):')
if IsCover in ['yes', '1', 'y', 'YES', 'Yes', '是', '']:
es = 1
print('\t□····已选[是]:[合并结果]文件夹内对应文件将被覆盖.')
else:
pass
else:
es = 1
print('\t文件名验证通过···············√')
time.sleep(1)
print('\n〇···开始写入数据:')
return s_name
def set_row(type_s='s'):
u = 0
skip_row=0
while u == 0:
try:
skip_row = int(input('\n【请输入内容】※-----请输入{}:'.format(type_s)))
if skip_row < 0:
print('\t※···×···请输入非负整数···×···※')
else:
u = 1
except (TypeError, ValueError):
print('\t※···×···请输入整数值···×···※')
return skip_row
def Generate_Xls(ds,file_loc,level):
if level==1:
ds.to_excel(file_loc, index=False,na_rep='')
else:
app = xw.App(visible=False, add_book=False)
app.display_alerts = False
app.screen_updating = False
ex = app.books.add()
st = ex.sheets[0]
st.range('A1').options(transpose=True).value = ds.columns.tolist() # .options(transpose=False)
st.range('A{}'.format(level + 1)).options(transpose=False).value = ds.values.tolist()
ex.save(file_loc)
ex.close()
app.kill()
if __name__ == '__main__':
try:
dataset=basic_check([f_0,f_s],[f_0],['待合并子表'])
if len(dataset)>0:
if len(dataset[0])<2:
print('待合并子表内工作表数量过少,无需合并.')
else:
common_sheet,union_set,dict_0,xs=Get_Common_SheetName(dataset[0])
s=0
if sum(xs)==len(dataset[0]):
print('检测到待合并子表内各个工作表中只有1个工作簿,将默认使用各个工作表的唯一工作簿数据进行合并.')
s=2
else:
if len(common_sheet)==0:
print('※···×···基础验证未通过···×···※')
print('\t[待合并子表]内表格的子表名称无公共子集,请核对!\n[Hint]:\n\t[待合并子表]文件夹内各个工作表的工作簿名称数量分布如下:')#\n\t{}'.format(dict_0))
refix_col(dict_0,1 if round(len(dict_0)/9,0)==0 else round(len(dict_0)/9,0),'\t')
else:
s=1
if s==0:
pass
else:
if s==1:
print('※···√···基础验证通过···√···※')
sht_name=choose_from_list(common_sheet,f_0,'待合并工作表','可合并工作簿',type=1)
else:
sht_name=0
skip_row=set_row(type_s='需要跳过的行数')
level=set_row(type_s='表头层级数')
col_list=[]
for i in dataset[0]:
d_s=read_data(i,sht_name,skip_row,level)
col_list.append(list(d_s.columns))
s = set(col_list[0])
for i in col_list[1:]: # 获取交集与并集
s = set(s).intersection(i)
if len(s)==0:
print('\t※···×···待合并对象无公共字段,无法完成合并,请核验···×···※')
else:
print('\n表格合并进程开始:')
print('\t①:提取公共字段数据···')
v=[]
for i in tqdm.trange(len(dataset[0])):
d_s = read_data(dataset[0][i], sht_name, skip_row, level)
d_f=d_s.loc[:,list(s)].copy()
v.append(d_f)
concated_data=pd.concat(v)
print('\t②:表头重排序···')
ori_col = []
for col in col_list[0]:
if col in s:
ori_col.append(col)
re_sorted_data=concated_data.loc[:,ori_col].copy()
print('数据准备完毕.')
file_name=set_file_name(os.listdir(f_s),sheet_type='.xlsx')
whole_name=f_s+file_name+'.xlsx'
t_x=time.time()
Generate_Xls(re_sorted_data,whole_name,level)
t_y=time.time()
print('\t数据写入完成,耗时{}s.'.format(round(t_y-t_x,2)))
except Exception as e:
print('※发生了意料之外的错误:\n\t{}'.format(e))