在做数据分析时可能会需要很多数据来做测试,但数据并不是每次都能找到合适的,所以可以通过一些方法生成一些数据来做计算,可以根据自己的需求来生成需要的数据。
实现方式就是利用多进程多线程来并发的生成数据,主要用于生成大量数据集的数据CSV文件
主要代码如下:
# create_data_util.py
import time
import random
from multiprocessing import Process, Manager, Pool
from threading import Thread
import sys
import pandas as pd
def get_result(val_type, num):
'''
用于生成指定类型随机数据的方法
'''
if val_type == 'str':
base_str = 'qwertyuiopasdfghjklzxcvbnm12345679890'
result = [''.join(random.sample(base_str,random.randint(0,20))) for _ in range(num)]
elif val_type == 'int':
result = [random.randint(0,10000) for _ in range(num)]
elif val_type == 'float':
result = [random.random() * random.randint(0,100000) for _ in range(num)]
elif val_type == 'bool':
result = [bool(random.randint(0,1)) for _ in range(num)]
elif val_type == 'None':
result = [None for _ in range(num)]
else:
result = [random.random() * random.randint(0,100000) for _ in range(num)]
return result
def multi_row(cols, df_ls, val_ls, v_tpyes):
'''
生成dataframe的方法,将得到的参数变成一个100列10000行的dataframe
'''
# print('开启一个线程')
tmp_df = pd.DataFrame(columns=cols)
for key in cols:
tmp_df[key] = get_result(v_tpyes[cols.index(key)], 10000)
df_ls.append(tmp_df)
# print('完成一个df')
def multi_col(cols, rows, val_ls):
'''
将数据按照每一万行开启一个线程写数据,将最终得到的dataframe合并成一个大dataframe
'''
# print('开启一个进程')
v_tpyes = [random.choice(val_ls) for _ in cols]
df_ls = []
# 当数据小于一万行时,直接生成即可
if rows < 10000:
tmp_df = pd.DataFrame(columns=cols)
for key in cols:
tmp_df[key] = get_result(v_tpyes[cols.index(key)], rows)
df_ls.append(tmp_df)
# 当数据大于一万行时,将数据按照每一万行开启一个线程的方式来生成数据
else:
ts = []
for _ in range(rows // 10000):
t = Thread(target=multi_row, args=(cols, df_ls, val_ls, v_tpyes))
t.start()
ts.append(t)
[t.join() for t in ts]
# print('准备合并dataframe')
pd_con = pd.concat(df_ls)
return pd_con
def split_data_computed(data, sept):
'''
拆分数据的方法,根据传入的数据和步长,将数据拆分为指定的块大小,例如传入data长度为200 sept为100,将返回[(0,100),(100,200)]
'''
tmp = len(data) // sept
res_data = []
for i in range(tmp):
if i != tmp -1:
res_data.append(tuple((i*sept, (i +1) * sept)))
else:
res_data.append(tuple((i*sept, len(data))))
if tmp == 0:
res_data.append((0, len(data)))
return res_data
def run_concat_data(cols, rows, val_ls, file_name):
'''
多进程处理数据,将数据按没一百列分成一个进程处理,将每个进程放入到队列中的dataframe取出进行再合并,得到最终的dataframe
'''
print('开始生成数据...')
tmp_df = []
def call_back_data(*args, **kwargs):
tmp_df.append(args[0])
# 使用进程池来开启多进程跑数据
pool = Pool()
for item in split_data_computed(cols, 100):
pool.apply_async(func=multi_col, args=(cols[item[0]:item[1]], rows, val_ls), callback=call_back_data)
pool.close()
pool.join()
# print('长度:', len(tmp_df))
print('分片数据生成完毕,准备合并数据...')
pd_con = pd.concat(tmp_df,axis=1)
print('合并完成,准备存储中。', time.strftime('%Y-%m-%d %H:%M:%S'))
pd_con.to_csv(file_name, index=False)
pkl_name = file_name.split('.')[0] + '.pkl'
pd_con.to_pickle(pkl_name)
print('保存完毕,主进程结束。', time.strftime('%Y-%m-%d %H:%M:%S'))
def main(input_data):
rule = '''
定义运行脚本时的参数:
1:指定需要生成的列数 int 示例 100列 -> 100
2:指定需要生成的行数 str 示例 100行 -> 100 10万行 -> 10w
3:指定生成的数据格式 str 示例 默认是生成包含 str int float bool None等混合的类型,当指定时按照指定的来生成,例如传入 float,int 将生成只包含该类型的数据
示例:
python create_data_util.py 100 10w 会生成默认值的100列10万行的csv数据
python create_data_util.py 100 10w str,float 会生成值只有str和float的100列10万行的csv数据
100列 10000行
'''
if len(input_data) < 3:
print('传入参数有误')
print(rule)
return
cols = int(input_data[1])
rows = int(input_data[2]) if 'w' not in input_data[2] else int(input_data[2][:-1]) * 10000
if len(input_data) == 3:
val_ls = ['str', 'int', 'float', 'bool', 'None']
else:
val_ls = input_data[3].split(',')
print('待生成的数据 列数:%d;行数:%d;值类型:%s' % (cols, rows, ','.join(val_ls)))
# 生成列名数据
col_names = ['field' + str(num) for num in range(cols)]
file_name = '%s_col_%s_row.csv' % (cols, input_data[2])
run_concat_data(col_names, rows, val_ls, file_name)
if __name__ == '__main__':
start_time = time.time()
main(sys.argv)
print('耗时:', time.time() - start_time)
运行示例:
python create_data_util.py 100 10w # 会生成默认值的100列10万行的csv数据
python create_data_util.py 10 10w str,float # 会生成值只有str float的10列10万行的csv数据