最近接了个需求,从某站点爬取30万数据回来,并对每条数据在获取明细。
那么如何半个小时内让你的程序完成这个工作呢?这是个很有挑战性的工作。
开始着手分析,此数据是分页得,每页10
条,每条数据有30
个字段,明细又有10
个字段。明细接口需传当前数据得id
标识获取。提取回来得数据为json
格式。
分析完数据,就开始着手准备,首先先获取一页数据,获取回来为个json
数组,那么拿这个json
数组在Idea
中使用GsonFormat
(Idea
插件)生成实体类,并获取一条详情数据,同样生成一个实体类。
接下来引入hutool
工具包,使用它的HttpUtil比较方便快捷
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.3.2</version>
</dependency>
接着花一点时间构建下逻辑伪代码如下
main(){
for(i=0;i<30000;i++){
getPage(i);
}
}
getDetail(){
}
getPage(int page){
log.info("当前页:{}",page)
Map<String, String> heads = new HashMap<>();
heads.put("Authorization", ACCESS_TOKEN);
String params = "{\"page\":" + page+"}");
HttpResponse res = HttpUtil.createPost(String.format(PAGE_URL, page, 15)).body(params).execute();
String body = res.body();
Result result = JSONUtil.toBean(body, Result.class);
List<Order> list = result.getOrders().getList();
List<Detail> details=new ArrayList<>();
for (Order order : list) {
Detail detail = getDetail(order.getId);
if (detail != null) {
details.add(detail);
}
}
writeToFile();
}
writeToFile(){
}
以上为主要伪代码,总体逻辑没问题,跑起来调试下获取一个页面数据,一切正常。
接下来如何将上面代码优化到30分钟内能爬到所有数据呢?这才是挑战。
考虑到高性能,这种情况除了多线程没有别得方案。
- 那么首先先着手分析下平均一个请求得耗时从网页分析结果来看一个页面请求平均耗时在
200ms
左右,一个详情获取在140ms
左右。 - 自身得带宽
100MB
,下行10MB/S
左右 - 分页数据大小
40kb
- 详情数据大小
10kb
- 上行请求大小可以忽略不计
有了这些前提接下来考虑以下两个点:
Ⅰ、基于以上数据可以计算出带宽可以支撑200
个左右下行并发流量,实际可能更多(因为以上计算使用的是50kb
来平均计算),姑且先算这么多,那么假如我每秒发起200
个请求,相当于大约一次获取20
个分页数据。我总共有3
万个页面除以20
就是1500
秒大约在25
分钟左右能跑完整个程序。(当然此处直接忽略服务器带宽情况)。
Ⅱ、启动一个20
个线程得线程池获取分页,启动一个200
个线程的线程池获取详情。这里陈述一个误区,很多人涉及到多线程编程得时候会想到网络上得最佳线程数计算方式,但是这个在此不适用。为什么呢?且听我细细道来:
- 首先排除带宽性能而言,此处得性能瓶颈是在
http
请求得响应时间上,并不在cpu上下文调度上。有人跟我说开200
个线程没有用,这个是个很大得误区,假如我只开10
个线程,那么一个请求100ms
,这个时候我后续进来得请求只能在队列中等待,为什么呢?因为cpu
得调度是微秒级别,并且一个cpu
时钟分配给一个进程执行时间在10ms
左右。那么我请求没有完成这段时间cpu
处于空闲状态,它完全由能力处理更多得请求,所以不要把书读死。
所以对以上代码进行逻辑重组,重组之后如下
自定义线程池
public class ThreadPool {
public static ThreadPoolExecutor pagePoolExecutor = new ThreadPoolExecutor(// 自定义一个线程池
20, // coreSize
20, // maxSize
60, // 60s
TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024) // 有界队列
, Executors.defaultThreadFactory()
, new ThreadPoolExecutor.CallerRunsPolicy()//队列溢出策略,回退防止数据丢失
);
public static ThreadPoolExecutor detailPoolExecutor = new ThreadPoolExecutor(// 自定义一个线程池
200, // coreSize
200, // maxSize
60, // 60s
TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024) // 有界队列
, Executors.defaultThreadFactory()
, new ThreadPoolExecutor.CallerRunsPolicy()
);
}
代码改造,因为使用了多线程,所以改造代码的时候就需要注意存储数据的list得使用Collections.synchronizedList
main(){
for(i=0;i<30000;i++){
ThreadPool.pagePoolExecutor.execute(() -> {
getPage(i);
}
}
//防止线程执行完成直接退出
while (true) {
if (ThreadPool.pagePoolExecutor.getActiveCount() == 0) {
break;
}
}
}
getDetail(){
}
//改成全局容器
List<Detail> details = Collections.synchronizedList(Detail);
getPage(int page){
log.info("当前页:{}",page)
Map<String, String> heads = new HashMap<>();
heads.put("Authorization", ACCESS_TOKEN);
String params = "{\"page\":" + page+"}");
HttpResponse res = HttpUtil.createPost(String.format(PAGE_URL, page, 15)).body(params).execute();
String body = res.body();
Result result = JSONUtil.toBean(body, Result.class);
List<Order> list = result.getOrders().getList();
for (Order order : list) {
ThreadPool.detailPoolExecutor.execute(() -> {
Detail detail = getDetail(order.getId);
if (detail != null) {
details.add(detail);
}
}
}
//防止线程执行完成直接退出
while (true) {
if (ThreadPool.detailPoolExecutor.getActiveCount() == 0) {
writeToFile(details);
break;
}
}
}
writeToFile(){
}
按照以上逻辑改完之后和预期效果差不多,实际执行了30分钟左右。但是以上程序可能健壮度不够,例如:中间网络请求超时,或者写入磁盘出错等情况都未考虑在内。所以如果你也有同样的需求,请考虑程序健壮性,此处只给出一个核心范例。