本来想写一个库,看能不能提高pandas把dataframe持久化成h5的读写速度,结果无功而返,速度差距很大。
感觉难点之一在数据转换上,现在还没有很好的办法。做个标记,也算是对h5折腾的一个回顾。
先说观点:
1、df的持久化最好还是优先选择pandas自带的HDFStore(pd.HDFStore)方法,现成,效率也算很快。
2、如果是array,那么,选择h5py.
3、如果对速度有极致要求,那么尽量选择h5py.
# -*- coding: utf-8 -*-
#import tools
import os
import pandas as pd
import os.path
import numpy as np
import h5py
import time as t
# Input location passed to get_contracts_files() and get_code_from_file()
# (presumably a directory of per-contract minute-bar CSV files -- confirm).
_dir =r"S:\join_quant_data\futures\minute\IF.CCFX"
# Path of a single sample CSV file; not referenced elsewhere in the visible code.
_dr = r"S:\join_quant_data\futures\minute\IF.CCFX\IF1910.CCFX_2019-09-05_2019-09-13.csv"
# File-name suffix used to select input files in get_csv_data_test().
data_type ='.csv'
# HDF5 file that store_test() writes and then reads back.
save_path = r"C:\Users\Administrator\Desktop\t1.h5"
#basecode ="ic"
--------------------
#data =np.random.rand(20000000,15)
#save_path = r"C:\Users\rustr\Desktop\new34.h5"
---------------------
# 简单数值型
def write_h5(save_path, data):
    """Write ``data`` to the dataset named ``data`` in a fresh HDF5 file.

    The file is opened in ``'w'`` mode, so an existing file at ``save_path``
    is truncated. The elapsed wall-clock time is printed once the file has
    been closed.

    Args:
        save_path: Path of the HDF5 file to create.
        data: Value assigned to ``f['data']``.
    """
    t0 = t.time()
    # A context manager closes the file on every path. The previous
    # ``finally: f.close()`` raised NameError (hiding the real error)
    # whenever opening the file itself failed, because ``f`` was unbound.
    with h5py.File(save_path, 'w') as f:
        f['data'] = data
    print("cost time:", t.time() - t0)
def read_h5(save_path):
    """Read the whole ``data`` dataset of an HDF5 file into memory.

    Prints the elapsed wall-clock time and the shape of the loaded value.

    NOTE: a later definition ``read_h5(save_path, basecode)`` in this file
    rebinds the name, so this one-argument version is only reachable before
    that definition executes.

    Args:
        save_path: Path of the HDF5 file to read.

    Returns:
        The content of ``f['data']`` as returned by ``dataset[()]``.
    """
    t0 = t.time()
    # ``with`` replaces try/finally: the old ``finally: f.close()`` failed
    # with NameError when the file could not be opened (``f`` never bound).
    with h5py.File(save_path, 'r') as f:
        data = f['data'][()]
    print("cost time:", t.time() - t0, 's')
    print("shape", data.shape)
    return data
# CSV:
def get_all_files_by_root_sub_dirs(directory, file_type):
    """Collect paths ending with ``file_type`` under ``directory``.

    Args:
        directory: A directory to walk recursively, or a single path.
        file_type: Suffix the path must end with, e.g. ``'.csv'``.

    Returns:
        A list of matching paths. When ``directory`` is not a directory it is
        treated as a single candidate path and returned in a one-element list
        if it ends with ``file_type`` (otherwise the list is empty).
    """
    data = []
    if os.path.isdir(directory):
        for root, _sub, files in os.walk(directory):
            for file in files:
                # os.path.join picks the platform separator; the previous
                # hard-coded '\\' produced unusable paths outside Windows.
                path = os.path.join(root, file)
                if path.endswith(file_type):
                    data.append(path)
    elif directory.endswith(file_type):
        # Not a directory: treat the argument itself as the only candidate.
        data.append(directory)
    return data
def get_contracts_files(_dir, data_type, exclude=('8888', '9999')):
    """List data files under ``_dir``, skipping paths with excluded markers.

    Args:
        _dir: Directory (or single path) handed to
            ``get_all_files_by_root_sub_dirs``.
        data_type: File suffix to keep, e.g. ``'.csv'``.
        exclude: Substrings that disqualify a path. The default keeps the
            original behaviour of dropping any path containing ``'8888'`` or
            ``'9999'`` (presumably continuous/index contract files -- confirm).

    Returns:
        The list of remaining file paths; its length is also printed.
    """
    files = get_all_files_by_root_sub_dirs(_dir, data_type)
    # The substring test runs against the full path, exactly as before.
    fls = [file for file in files if not any(tag in file for tag in exclude)]
    print("files count :", len(fls))
    return fls
def get_code_from_file(file):
    """Derive the instrument code from a data file path.

    The code is the part of the base name before the first underscore, with a
    trailing ``.csv`` extension (any letter case) removed.

    Args:
        file: Path or bare file name.

    Returns:
        The extracted code string, e.g. ``'IF1910.CCFX'`` for
        ``'IF1910.CCFX_2019-09-05_2019-09-13.csv'``.
    """
    s = os.path.basename(file).split('_')[0]
    # Case-insensitive check: the old test only matched '.csv' and '.CSV',
    # so a mixed-case extension such as '.Csv' leaked into the code.
    if s.lower().endswith('.csv'):
        s = s[:-4]
    return s
def get_data_from_csv_files(files):
    """Load several CSV files into one DataFrame with a leading ``code`` column.

    Each file is read with its first column as the index; the code derived
    from the file name by ``get_code_from_file`` is inserted as column 0.

    Args:
        files: Iterable of CSV file paths.

    Returns:
        The concatenation of all frames. As in the original implementation,
        files read later come first in the result. An empty ``files`` yields
        an empty DataFrame.
    """
    frames = []
    for file in files:
        code = get_code_from_file(file)
        temp = pd.read_csv(file, sep=',', index_col=0, encoding='utf-8')
        # A scalar is broadcast to every row of the new column.
        temp.insert(0, "code", code)
        frames.append(temp)
    if not frames:
        return pd.DataFrame()
    # One concat instead of DataFrame.append per file: append was removed in
    # pandas 2.0 and re-copied the accumulated frame on every iteration.
    # Reversing reproduces the old ``temp.append(df)`` ordering.
    return pd.concat(frames[::-1])
def get_base_code(code):
    """Return the product prefix of an instrument code.

    Only the part before the first ``'.'`` is inspected:

    * all digits (e.g. ``'000002.XSHE'``) -> ``code`` is returned unchanged;
    * first two characters alphabetic (e.g. ``'IC1906.CCFX'``) -> those two
      characters (``'IC'``); the exchange suffix is deliberately not appended;
    * only the first character alphabetic (e.g. ``'A1905.XDCE'``) -> that
      single character;
    * anything else -> ``''``.

    Args:
        code: Instrument code string.

    Returns:
        The base code as described above.
    """
    head = code.split(".")[0]
    if head.isdigit():
        return code
    # Try the two-letter prefix first, then fall back to a single letter.
    for width in (2, 1):
        prefix = head[:width]
        if prefix.isalpha():
            return prefix
    return ""
def get_csv_data_test():
    """Load every contract CSV selected by the module-level settings.

    Uses the module constants ``_dir`` and ``data_type`` to pick the files via
    ``get_contracts_files`` and returns the combined DataFrame produced by
    ``get_data_from_csv_files``.
    """
    contract_files = get_contracts_files(_dir, data_type)
    return get_data_from_csv_files(contract_files)
# dataframe
def pd_store_write_h5(save_path, basecode, data):
    """Store ``data`` under key ``basecode`` in a pandas HDFStore.

    The store is opened with zlib compression at level 5 and pandas' default
    open mode.

    Args:
        save_path: Path of the HDF5 file.
        basecode: Key under which the object is stored.
        data: Object to store (the callers in this file pass a DataFrame).
    """
    # HDFStore is a context manager; this closes the store on every path and
    # avoids the NameError the old ``finally: store.close()`` raised when
    # opening the store failed.
    with pd.HDFStore(save_path, complevel=5, complib='zlib') as store:
        store[basecode] = data
def pd_store_read_h5(save_path, basecode):
    """Load the object stored under ``basecode`` from a pandas HDFStore.

    Args:
        save_path: Path of the HDF5 file, opened read-only.
        basecode: Key of the stored object.

    Returns:
        The object read from ``store[basecode]``.
    """
    # Context manager instead of try/finally: the old code referenced an
    # unbound ``store`` in ``finally`` when the file could not be opened.
    with pd.HDFStore(save_path, mode='r') as store:
        return store[basecode]
#def df_to_array(df,index=True):#?
# ra = df.to_records(index=index)
# np_array = np.asarray(ra)
# return np_array
def test():
    """Time how long ``get_csv_data_test`` takes and print the result."""
    t0 = t.time()
    # Only the elapsed time matters here; the loaded frame is discarded
    # (the previously assigned but unused ``df`` and ``basecode`` locals
    # were removed).
    get_csv_data_test()
    print("get csv data cost time :", t.time() - t0)
def store_test(data):
    """Round-trip ``data`` through a pandas HDFStore and print timings.

    The store key is derived from the module-level ``_dir`` via
    ``get_code_from_file`` and ``get_base_code``; the file location is the
    module-level ``save_path``.

    Args:
        data: Object to write (must expose ``.shape`` once read back).
    """
    print("store_test..............")
    basecode = get_base_code(get_code_from_file(_dir))

    print("write h5...")
    write_started = t.time()
    pd_store_write_h5(save_path, basecode, data)
    print("write h5 cost time:", t.time() - write_started)

    read_started = t.time()
    loaded = pd_store_read_h5(save_path, basecode)
    print("read h5 cost time:", t.time() - read_started)
    print("data shape :", loaded.shape)
#h5py
def test_h5py_write(s_path=r"C:\Users\rustr\Desktop\new33.h5"):
    """Write a small structured (compound dtype) array with h5py.

    Creates group ``SZ000001`` with an LZF-compressed dataset ``data`` that
    holds four sample records.

    Args:
        s_path: Destination HDF5 file (truncated if it exists). Defaults to
            the path that used to be hard-coded in the body.
    """
    d_type = np.dtype([
        ('stk_code', 'S8'),
        ('trans_time', 'i4'),
        ('price', 'i8'),
        ('volumn', 'i8'),
        ('buy_order_id', 'i8'),
        ('sell_order_id', 'i8'),
        ('b_s', 'c'),
        ('trans_type', 'c'),
    ])
    dat = [
        ('SZ000001', 112233156, 90100, 30000, 12345, 67890, b'B', b'F'),
        ('SZ000001', 112233256, 90100, 30000, 12345, 67890, b'B', b'F'),
        ('SZ000001', 112233356, 90100, 30000, 12345, 67890, b'B', b'F'),
        ('SZ000001', 112233456, 90100, 30000, 12345, 67890, b'B', b'F'),
    ]
    arr = np.array(dat, dtype=d_type)
    # ``with`` guarantees the file is closed; the old ``finally: f.close()``
    # raised NameError when the file could not be created.
    with h5py.File(s_path, 'w') as f:
        g = f.create_group("SZ000001")
        # Alternative: compression='gzip', compression_opts=6
        g.create_dataset("data", data=arr, compression='lzf')
def test_h5py_read(s_path=r"C:\Users\rustr\Desktop\new33.h5"):
    """Read the ``SZ000001/data`` dataset back and print its Python type.

    Args:
        s_path: HDF5 file to read. The default now matches the file that
            ``test_h5py_write`` creates; the body previously hard-coded
            ``new34.h5``, which the writer in this file never produces with a
            ``SZ000001/data`` dataset.

    Returns:
        The dataset content as returned by ``dataset[()]``.
    """
    # ``with`` replaces try/finally, whose ``f.close()`` hit an unbound ``f``
    # when the file did not exist.
    with h5py.File(s_path, 'r') as f:
        data = f['SZ000001/data'][()]
        print("type:", type(data))
    return data
def df_to_h5_v1(df, save_path, basecode):
    """Persist a DataFrame with h5py as three datasets inside one group.

    Layout inside group ``basecode``:

    * ``index``: the DataFrame index as variable-length strings (LZF);
    * ``code``: the first column as variable-length strings (LZF);
    * ``float``: all remaining columns cast to float, uncompressed.

    Column names are not stored. The elapsed time is printed at the end.

    Args:
        df: DataFrame whose first column is string-like and whose remaining
            columns are convertible to float.
        save_path: Destination HDF5 file (truncated if it exists).
        basecode: Name of the group to create. The old body overwrote this
            argument with the constant ``"IC"``; it is now honoured.
    """
    t0 = t.time()
    index = np.array(df.index, dtype=np.dtype(('U', 19)))
    code = np.array(df.iloc[:, 0], dtype='str')
    float_content = np.array(df.iloc[:, 1:], dtype='float')
    d_type = h5py.special_dtype(vlen=str)
    with h5py.File(save_path, 'w') as f:
        g = f.create_group(basecode)
        g_index = g.create_dataset("index", index.shape, dtype=d_type, compression='lzf')
        g_index[:] = index
        g_code = g.create_dataset("code", code.shape, dtype=d_type, compression='lzf')
        g_code[:] = code
        g.create_dataset("float", data=float_content)
    print("df_to_h5 cost time:", t.time() - t0)
def to_1dim_array(data, dtype=None):
    """Wrap the items of ``data`` into a single-row NumPy array.

    Despite the name, the result has a leading axis of length 1 (shape
    ``(1, n)`` for ``n`` scalar items).

    Args:
        data: Any iterable.
        dtype: Optional dtype forwarded to ``np.array``.
    """
    row = tuple(data)
    return np.array([row], dtype)
def to_ndim_array(data, dtype=None):
    """Materialise ``data`` as a list and convert it to a NumPy array.

    Args:
        data: Any iterable (generators are consumed).
        dtype: Optional dtype forwarded to ``np.array``.
    """
    items = list(data)
    return np.array(items, dtype)
def df_to_h5_v2(df, save_path, basecode):
    """Persist each DataFrame column as its own gzip-compressed dataset.

    Inside group ``basecode`` one dataset per column is created, named after
    the column. String-like columns (NumPy kind ``O``/``U``/``S``) are stored
    as variable-length strings, every other column as ``float32``. The index
    is not written (only its type and length are printed). The elapsed time
    is printed at the end.

    Args:
        df: DataFrame to persist.
        save_path: Destination HDF5 file (truncated if it exists).
        basecode: Name of the group to create.
    """
    t0 = t.time()
    str_dtype = h5py.special_dtype(vlen=str)
    with h5py.File(save_path, 'w') as f:
        g = f.create_group(basecode)
        index = np.asarray(df.index)
        print("index type", type(index), "index len:", len(index))
        for col in df.columns:
            values = np.asarray(df.loc[:, col])
            # Decide by the column's *data* dtype. The old code tested
            # ``isinstance(col, str)`` on the column name (always true after
            # casting the names to str) and then hit ``continue``, so no
            # dataset was ever written.
            if values.dtype.kind in ('O', 'U', 'S'):
                ty = str_dtype
            else:
                ty = np.float32
            g_temp = g.create_dataset(
                str(col), values.shape, dtype=ty,
                compression='gzip', compression_opts=6,
            )
            g_temp[:] = values
    print("df_to_h5 cost time:", t.time() - t0)
def df_to_array_v1(df):
    """Convert a DataFrame to a 2-D array with the index as first column.

    Each column (index first, then the data columns in order) is turned into
    an ``(n, 1)`` array and all of them are joined along axis 1.

    Args:
        df: DataFrame to convert.

    Returns:
        A NumPy array of shape ``(len(df), 1 + number_of_columns)``; the
        dtype is whatever NumPy promotion yields for the joined columns.
    """
    def _as_column(values):
        # One row holding all items, transposed into a single column.
        return np.array([tuple(values)]).T

    # The index is handled explicitly instead of through an "index" sentinel
    # in the column list: a real data column named "index" used to reset the
    # accumulated result.
    parts = [_as_column(df.index)]
    parts.extend(_as_column(df.loc[:, col]) for col in df.columns)
    # A single concatenate instead of re-copying the result per column.
    return np.concatenate(parts, axis=1)
# np.array(list(tuple),dtype=types)
def df_to_array(df):
    """Return the DataFrame as a 2-D array with the index prepended.

    Args:
        df: DataFrame to convert.

    Returns:
        ``np.concatenate`` of the index (as an ``(n, 1)`` column) and the
        frame's values along axis 1.
    """
    index_column = np.array([tuple(df.index)]).T
    body = np.array(df[:])
    return np.concatenate((index_column, body), axis=1)
def to_tuple_list(arr):
    """Return a one-element list containing ``arr`` converted to a tuple."""
    return [tuple(arr)]
def get_df_dtype(df):
    """Build a structured-dtype field list for a DataFrame plus its index.

    The field named ``"index"`` comes first, followed by one field per
    column. Fields named ``'code'`` or ``'index'`` get the type ``'S8'``
    (byte strings of at most 8 bytes); every other field gets ``'float'``.

    Args:
        df: DataFrame whose columns name the fields.

    Returns:
        A list of ``(name, type_string)`` tuples.
    """
    names = ["index", *df.columns]
    return [
        (name, 'S8') if (name == 'code' or name == "index") else (name, 'float')
        for name in names
    ]
def df_to_array_type(df):
    """Convert a DataFrame (index included) into a structured NumPy array.

    Field names and types come from ``get_df_dtype``; the row values come
    from ``df_to_array``.

    Args:
        df: DataFrame to convert.

    Returns:
        A 1-D structured array with one record per DataFrame row.
    """
    d_types = get_df_dtype(df)
    arr = df_to_array(df)
    # NumPy fills structured records from *tuples*. Passing the 2-D array
    # directly treats every cell as a whole record instead of mapping the
    # columns onto the fields, so each row is converted to a tuple first.
    return np.array([tuple(row) for row in arr], dtype=d_types)
def df_to_h5_update(df, save_path, basecode):
    """Create an HDF5 file containing only an empty group named ``basecode``.

    Work-in-progress variant: the dataset-writing steps are still commented
    out, so ``df`` is currently unused. The elapsed time is printed at the end.

    Args:
        df: DataFrame intended to be persisted (not used yet).
        save_path: Destination HDF5 file (truncated if it exists).
        basecode: Name of the group to create.
    """
    started = t.time()
    with h5py.File(save_path, 'w') as h5_file:
        h5_file.create_group(basecode)
        # Planned steps, not enabled yet:
        # d_types = get_df_dtype(df)
        # types =np.dtype(d_types)
        # arr = df_to_array(df)
        # data =np.array(arr,dtype =types)
        # g.create_dataset()
    print("df_to_h5 cost time:", t.time() - started)
def read_h5(save_path, basecode):
    """Read the datasets of group ``basecode`` and return the first as a column.

    Every dataset in the group is loaded and converted with
    ``to_1dim_array``; the first one is transposed and kept as the result.
    Stacking the remaining datasets onto it is currently disabled, so they
    are only loaded and timed. Progress and timing information is printed.

    NOTE: this definition rebinds the name of the earlier one-argument
    ``read_h5(save_path)`` in this file.

    Args:
        save_path: HDF5 file to read.
        basecode: Name of the group whose datasets are read.

    Returns:
        The transposed array of the first dataset, or ``[]`` when the group
        has no datasets.
    """
    t0 = t.time()
    arr = []
    # Read-only mode is sufficient; the old 'r+' demanded write access.
    with h5py.File(save_path, 'r') as f:
        d = f[basecode]
        have_first = False
        for k in d.keys():
            temp = to_1dim_array(d[k][()])  # load the dataset's values
            print("temp type:", type(temp))
            # An explicit flag replaces ``arr == []``: once ``arr`` is an
            # ndarray that comparison is element-wise and its truth value is
            # ambiguous.
            if not have_first:
                arr = temp.T
                have_first = True
                print("arr len", len(arr))
            else:
                TT = t.time()
                # Stacking is intentionally left disabled:
                # arr = np.concatenate((arr,temp.T),axis=1)
                print("stack cost:", t.time() - TT)
    print("read_h5 cost time:", t.time() - t0)
    return arr
# a =[1,2,3] =>list
# b =np.array(a) =>array
# c =np.array([a]) => ndarray
# d =[(1,2,3)] =>np.array(d)
# e =[1,2,3] => tuple; tuple(e)=>(1,2,3)
# b =list(a)
#c=tuple(b)
#d =np.array([c])
有意思的尝试:
如果把array存入h5时带上类型标识dtype,对读写速度会有多大影响?
如果有兴趣,大家可以试一下。
# Illustrative snippet: `df` is not defined at module level in this file, so
# run it in a scope where a DataFrame named `df` exists.
# Builds a structured-dtype field list with one 'f' (float32) field per column.
dtype =[(col,'f') for col in df.columns]
#arr = np.array(df,dtype=dtype)