版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/weixin_39532362/article/details/89397309
项目练习_爬取股票财报数据【requests,多线程,struct】
爬取股票财报数据
# -*- coding:utf-8 -*-
# 爬取股票财报信息
import requests
from bs4 import BeautifulSoup
import pandas as pd
import numpy as np
import urllib
import json
import threadpool
from sqlalchemy import create_engine
import itertools
import threading
import struct
import ctypes
import binascii
class crawlfinancial:
    """Crawl the four financial-report tables for A-share stocks from
    quotes.money.163.com and persist them as CSV files, MySQL rows, or
    struct-packed binary ``.dat`` files.

    NOTE(review): constructing an instance immediately launches the crawl
    (network, disk and/or DB I/O) via ``action_crawl``.
    """

    def __init__(self, tdc, method, *codes):
        """
        tdc    -- number of worker threads for the thread pool
        method -- one of 'csv', 'sql', 'dat' (how results are persisted)
        codes  -- optional explicit stock codes; when empty/None/'' the full
                  code list is fetched from the 163 ranking API
        """
        self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; rv2.0.1) Gecko/20100101 Firefox/4.0.1'}
        self.rp_engine = self.get_engine('report')
        self.ct = itertools.count(1)   # shared progress counter (safe in CPython)
        self.lock = threading.Lock()   # serializes DB writes across workers
        self.path = './down'
        self.log_tpes = []             # struct-format rows, flushed to log_tpe.txt
        self.action_crawl(tdc, method, *codes)

    def __del__(self):
        # Best-effort flush on teardown; action_crawl also flushes explicitly.
        self._flush_log()

    def _flush_log(self):
        """Persist the struct-format log so read_dat can decode .dat files."""
        pd.DataFrame(self.log_tpes).to_csv('%s/log_tpe.txt' % self.path)
        print('is end...')

    def get_engine(self, db):
        """Return a SQLAlchemy engine for database `db`.

        HACK: credentials are hard-coded in the URL; move to configuration.
        """
        engine = create_engine("mysql+pymysql://root:[email protected]:3306/%s?charset=utf8mb4" % db)
        return engine

    def get_codes(self):
        """Fetch the full list of stock codes from the 163 ranking API."""
        sreq = requests.Session()
        url = 'http://quotes.money.163.com/hs/service/diyrank.php?host=http%3A%2F%2Fquotes.money.163.com%2Fhs%2Fservice%2Fdiyrank.php&page={page}&query=STYPE%3AEQA&fields=NO%2CSYMBOL%2CNAME%2CPRICE%2CPERCENT%2CUPDOWN%2CFIVE_MINUTE%2COPEN%2CYESTCLOSE%2CHIGH%2CLOW%2CVOLUME%2CTURNOVER%2CHS%2CLB%2CWB%2CZF%2CPE%2CMCAP%2CTCAP%2CMFSUM%2CMFRATIO.MFRATIO2%2CMFRATIO.MFRATIO10%2CSNAME%2CCODE%2CANNOUNMT%2CUVSNEWS&sort=SYMBOL&order=asc&count=24&type=query'
        # First request only to learn the total page count.
        uqurl = urllib.parse.unquote(url.format(page=0))
        response = sreq.get(uqurl, headers=self.headers)
        pagecount = json.loads(response.text).get('pagecount')
        # Walk every page and collect the codes.
        codes = []
        for page in range(pagecount):
            uqurl = urllib.parse.unquote(url.format(page=page))
            response = sreq.get(uqurl, headers=self.headers)
            result = json.loads(response.text)
            for market in result.get('list'):
                # CODE carries a leading exchange digit; drop it.
                codes.append(market.get('CODE')[1:])
            print('page %s is ok...' % page)
        print('crawl_codes is ok...', len(codes), codes[:5])
        return codes

    def req(self, url):
        """GET `url` and parse every '.table_bg001' table into a DataFrame list."""
        response = requests.get(url, headers=self.headers)
        response.encoding = 'utf-8'
        soup = BeautifulSoup(response.text, 'lxml')
        tbls = soup.select('.table_bg001')
        tbls_str = '\n'.join(tbl.prettify() for tbl in tbls)
        dfs = pd.read_html(tbls_str, encoding='utf-8')
        return dfs

    def frm(self, rs):
        """Normalize a raw report table: report dates become the (datetime)
        index, every cell is cast to float64, and missing markers
        ('--', '0', 0) end up as 0.0.
        """
        rs = rs.set_index('报告日期').T
        rs.columns.name = '指标'
        rs.index.name = '报告日期'
        rs = rs.replace(['--', '0', 0], np.nan)
        rs = rs.apply(lambda se: se.astype(np.float64))
        rs = rs.replace(np.nan, 0)
        rs.index = pd.DatetimeIndex(rs.index)
        return rs

    # ---
    def crawl_zycwzb(self, code):
        """主要财务指标 — key financial indicators for `code`."""
        dfs = self.req('http://quotes.money.163.com/f10/zycwzb_%s.html' % code)
        # The first table's index column arrives split across two frames.
        rs_ls = [pd.merge(dfs[0], dfs[1], left_index=True, right_index=True).set_index('报告日期')]
        for df in dfs[2:]:
            rs_ls.append(df.set_index('Unnamed: 0'))
        rs = pd.concat(rs_ls, ignore_index=False).replace(['--', '0', 0], np.nan).apply(lambda se: se.astype(np.float64)).T
        rs = rs.replace(np.nan, 0)
        rs.index = pd.DatetimeIndex(rs.index).rename('报告日期')
        rs['code'] = code
        rs['report'] = 'zycwzb'
        return rs

    def _crawl_std(self, code, report):
        """Shared fetch/format path for the zcfzb / lrb / xjllb reports."""
        dfs = self.req('http://quotes.money.163.com/f10/%s_%s.html' % (report, code))
        rs = self.frm(pd.merge(dfs[0], dfs[1], left_index=True, right_index=True))
        rs['code'] = code
        rs['report'] = report
        return rs

    def crawl_zcfzb(self, code):
        """资产负债表 — balance sheet for `code`."""
        return self._crawl_std(code, 'zcfzb')

    def crawl_lrb(self, code):
        """利润表 — income statement for `code`."""
        return self._crawl_std(code, 'lrb')

    def crawl_xjllb(self, code):
        """现金流量表 — cash-flow statement for `code`."""
        return self._crawl_std(code, 'xjllb')

    # ---
    def save_csv(self, code):
        """Crawl all four reports for `code` and write each as a CSV file."""
        reports = (('zycwzb', self.crawl_zycwzb(code)),
                   ('zcfzb', self.crawl_zcfzb(code)),
                   ('lrb', self.crawl_lrb(code)),
                   ('xjllb', self.crawl_xjllb(code)))
        for name, df in reports:
            df.to_csv('%s/%s_%s.csv' % (self.path, name, code), encoding='utf-8')
        print('num(%s) code(%s) is ok...' % (next(self.ct), code))

    def save_sql(self, code):
        """Crawl all four reports and append them to MySQL tables.

        Fix: the original only locked the thread that drew counter value 1,
        so concurrent workers raced on implicit table creation; every write
        batch is now serialized under the instance lock.
        """
        zycwzb = self.crawl_zycwzb(code).reset_index()
        zcfzb = self.crawl_zcfzb(code).reset_index()
        lrb = self.crawl_lrb(code).reset_index()
        xjllb = self.crawl_xjllb(code).reset_index()
        with self.lock:
            zycwzb.to_sql('zycwzb', con=self.rp_engine, index=False, if_exists='append')
            zcfzb.to_sql('zcfzb', con=self.rp_engine, index=False, if_exists='append')
            lrb.to_sql('lrb', con=self.rp_engine, index=False, if_exists='append')
            xjllb.to_sql('xjllb', con=self.rp_engine, index=False, if_exists='append')
        print('code(%s) is ok...' % code)

    def get_bts(self, ls, encoding='utf-8'):
        """Encode every element of `ls` to bytes, stringifying non-str values."""
        return [bytes(v if isinstance(v, str) else str(v), encoding) for v in ls]

    def get_tpe(self, ls, encoding='utf-8'):
        """Build a struct format string of '<byte-len>s' fields matching get_bts(ls)."""
        return ''.join('%ss' % len(v) for v in self.get_bts(ls, encoding))

    def write_hex(self, df):
        """Pack `df` into a binary .dat file (columns row, index row, then one
        packed record per data row) and log the struct formats in
        self.log_tpes so read_dat can decode the file.
        """
        first_row = df.iloc[0]  # .ix was removed from pandas; use positional access
        col_tpe = self.get_tpe(df.columns)
        idx_tpe = self.get_tpe(df.index)
        # Strings become fixed '<len>s' fields, numbers become doubles.
        # NOTE(review): assumes each string column has a constant byte length
        # across rows (holds here: 'code'/'report' repeat) — confirm if reused.
        dat_tpe = ''.join('%ss' % len(bytes(val, 'utf-8')) if isinstance(val, str) else 'd'
                          for val in first_row.values)
        # Pre-compiled packers for the three record shapes.
        col_sct = struct.Struct(col_tpe)
        idx_sct = struct.Struct(idx_tpe)
        dat_sct = struct.Struct(dat_tpe)
        # Reusable pack buffers.
        col_buf = ctypes.create_string_buffer(col_sct.size)
        idx_buf = ctypes.create_string_buffer(idx_sct.size)
        dat_buf = ctypes.create_string_buffer(dat_sct.size)
        col_sct.pack_into(col_buf, 0, *self.get_bts(df.columns))
        idx_sct.pack_into(idx_buf, 0, *self.get_bts(df.index))
        fname = '%s_%s' % (first_row['report'], first_row['code'])
        with open('%s/%s.dat' % (self.path, fname), 'wb') as f:
            f.write(col_buf)
            f.write(idx_buf)
            for ind in df.index:
                dat_sct.pack_into(dat_buf, 0,
                                  *[bytes(val, 'utf-8') if isinstance(val, str) else val
                                    for val in df.loc[ind].values])
                f.write(dat_buf)
        # Record the formats; flushed to log_tpe.txt by _flush_log.
        self.log_tpes.append({'fname': fname, 'col_tpe': col_tpe,
                              'idx_tpe': idx_tpe, 'dat_tpe': dat_tpe})

    def save_dat(self, code):
        """Crawl all four reports for `code` and store each as a packed .dat file."""
        for df in (self.crawl_zycwzb(code), self.crawl_zcfzb(code),
                   self.crawl_lrb(code), self.crawl_xjllb(code)):
            self.write_hex(df)
        print('num(%s) code(%s) is ok...' % (next(self.ct), code))

    # ---
    def action_crawl(self, tdc, method, *codes):
        """Fan the per-code save routine out over a pool of `tdc` threads.

        Raises ValueError for an unknown `method` (the original used a bare
        ``assert False``, which disappears under ``python -O``).
        """
        dispatch = {'csv': self.save_csv, 'sql': self.save_sql, 'dat': self.save_dat}
        try:
            callable_ = dispatch[method]
        except KeyError:
            raise ValueError('unknown method: %r' % (method,))
        if len(codes) == 0 or codes[0] is None or codes[0] == '':
            codes = self.get_codes()
        tuple_list = [((code,), None) for code in codes]
        pool = threadpool.ThreadPool(tdc)
        for task in threadpool.makeRequests(callable_=callable_, args_list=tuple_list):
            pool.putRequest(task)
        pool.wait()
        self._flush_log()  # explicit flush instead of calling __del__ by hand
class read_dat:
    """Decode a ``.dat`` file produced by crawlfinancial.save_dat and print it
    as a DataFrame, using the struct formats recorded in log_tpe.txt.
    """

    def __init__(self, fname):
        """fname -- file name inside ./down, e.g. 'lrb_000001.dat'."""
        self.path = './down'
        # Pass the path directly: the original wrapped it in open() and
        # leaked the file handle.
        self.log_tpe = pd.read_csv('%s/log_tpe.txt' % self.path, index_col=0,
                                   header=0, encoding='utf-8').set_index('fname')
        self.parse_fle(fname)

    def get_tpe(self, fname, tpe):
        """Return the logged struct format `tpe` for file `fname` (extension stripped)."""
        # .ix was removed from pandas; .loc is the label-based equivalent.
        return self.log_tpe.loc[fname.split('.')[0], tpe]

    def parse_fle(self, fname):
        """Unpack the columns row, index row and data rows of `fname`, then print."""
        path = '%s/%s' % (self.path, fname)
        # Struct format strings recorded at write time.
        col_tpe = self.get_tpe(fname, 'col_tpe')
        idx_tpe = self.get_tpe(fname, 'idx_tpe')
        dat_tpe = self.get_tpe(fname, 'dat_tpe')
        # Pre-compiled unpackers.
        col_sct = struct.Struct(col_tpe)
        idx_sct = struct.Struct(idx_tpe)
        dat_sct = struct.Struct(dat_tpe)
        rd_ls = []
        with open(path, 'rb') as f:
            col_row = col_sct.unpack(f.read(col_sct.size))
            rd_ls.append([v.decode('utf-8') for v in col_row])
            idx_row = idx_sct.unpack(f.read(idx_sct.size))
            rd_ls.append([v.decode('utf-8') for v in idx_row])
            # Read fixed-size records until EOF; a short read means we're done
            # (the original relied on catching struct.error for this).
            while True:
                chunk = f.read(dat_sct.size)
                if len(chunk) < dat_sct.size:
                    break
                dat_row = dat_sct.unpack(chunk)
                rd_ls.append([v.decode('utf-8') if isinstance(v, bytes) else v
                              for v in dat_row])
        rd = pd.DataFrame(rd_ls[2:], columns=rd_ls[0], index=rd_ls[1])
        print(rd)
if __name__ == '__main__':
    # Crawl reports for two sample codes with 2 worker threads, persist them
    # as packed .dat files, then decode one of them back for inspection.
    crawlfinancial(2, 'dat', '000001', '000002')
    read_dat('lrb_000001.dat')