师姐想下载亿些CMIP6数据,全球日数据太大了硬盘不够,而且只想要中国范围的月平均数据。她推给我一篇介绍在shell脚本中用wget+cdo实现类似需求的方式,而我并不懂shell和cdo,大概看了一下后觉得用Python应该可以做得更好一些。
在我理解中,shell脚本中用wget+cdo的实现方法为:循环使用wget下载原始数据到本地硬盘上,每下载完一条后通过cdo命令将数据处理成所需要的另存到硬盘,再将原始数据删除。这样做思路清晰,但有两个地方我认为可以改进:
- 多线程下载
- 原始数据直接在内存中就处理,省去硬盘写入、读取、删除的开销,硬盘上只保存最终所需要的数据
思路为:urllib
多线程下载,下载数据用io.BytesIO
保存在内存,xarray
读取下载数据并处理保存到硬盘。
from urllib.request import Request, urlopen
from io import BytesIO
from multiprocessing.dummy import Pool
from os.path import exists
import json
from datetime import datetime
import xarray
from get_url import nc_urls
threads = 10
lon_w, lon_e = 70, 140
lat_s, lat_n = 15, 55
save_to = r'./cmip6_china_monthmean'
base_headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/74.0.3729.131 Safari/537.36',
'Connection': 'Keep-Alive'
}
pool = Pool(threads)
lon_slice = slice(lon_w, lon_e)
lat_slice = slice(lat_s, lat_n)
def worker(info):
nc_url, pos_start, pos_end, n, cache = info
headers = {
**base_headers, 'range': f'bytes={pos_start}-{pos_end}'}
while True:
try:
cache[n] = urlopen(Request(nc_url, headers=headers), timeout=5).read()
return
except Exception as e:
continue
done_json = f'{save_to}/done.json'
if exists(done_json):
with open(done_json) as f:
done = json.load(f)
else:
done = {
}
for nc_name, nc_url in nc_urls.items():
if nc_name in done:
print(f'{nc_name}\n已处理过,跳过')
continue
cache = [None] * threads
print(f'{nc_name}\n{datetime.now()}开始下载')
while True:
try:
nc_bytes = int(urlopen(Request(nc_url, headers=base_headers), timeout=5).headers['content-length'])
break
except Exception as e:
continue
subsize = nc_bytes // threads
worker_info = [[nc_url, x*subsize, (x+1)*subsize-1, x, cache] for x in range(threads)]
worker_info[-1][2] = nc_bytes - 1
pool.map(worker, worker_info)
print(f'{datetime.now()}下载完成')
f = BytesIO(b''.join(cache))
del cache
f.seek(0)
ds = xarray.open_dataset(f).sel(lon=lon_slice, lat=lat_slice)
ds = ds.groupby(ds.time.astype('datetime64[M]')).mean()
ds.to_netcdf(f'{save_to}/{nc_name[:-3]}_china_monthmean.nc')
print(f'{datetime.now()}处理完成')
del ds, f
done[nc_name] = nc_url
with open(done_json, 'w') as f:
json.dump(done, f, indent=4)
pool.close()
pool.join()
其中get_url
是用来从网站提供的sh脚本中提取下载链接的,内容如下:
from os.path import basename, exists
from glob import glob
import re
import json
wget_sh_glob = './pr_ssp245/wget*.sh'
nc_urls_json = './pr_ssp245/nc_urls.json'
if exists(nc_urls_json):
with open(nc_urls_json) as f:
nc_urls = json.load(f)
else:
nc_urls = {
}
for sh_path in glob(wget_sh_glob):
with open(sh_path) as f:
txt = f.read()
for nc_url in re.findall(r'(http://\S+?.nc)\'', txt):
nc_name = basename(nc_url)
if nc_name in nc_urls:
print(f'{nc_name}')
else:
nc_urls[nc_name] = nc_url
with open(nc_urls_json, 'w') as f:
json.dump(nc_urls, f, indent=4)
效果如图:
收获如下:
- 虽然不太优雅,但第一次成功尝试多线程,以后有的是进步空间
- 第一次尝试直接读取内存中的二进制nc文件(需要安装
h5netcdf
包)
存在的问题:
- 有些数据从如1850年开始计时,所以需要安装
cftime
包 - 下载过程中没有失败跳过判断(需求多的话我再加上吧)
- 第一次用
io.BytesIO
,尝试过多线程直接按照各自位置向io.BytesIO
中存,但失败了,所以妥协了先用字节串列表存数据,再拼接到io.BytesIO
中,希望擅长多线程或者相关“内存临时文件”编程的前辈指教一下