简析pyspider

pyspider优势所在

pyspider非常适合那种很小很杂的爬虫的管理,比如有100个小网站,规则又各不相同,我要获取他的一些很简单的内容,如标题,所有的图片,正文内容。他分为几个模块:scheduler,fetcher,processor,resultworker以及一个ui,前三者各自分离,用消息队列连接,因此很容易做成分布式(或者说设计之初就是为了分布式的)。

scheduler

了解scheduler之前,先了解两个概念,一个是project,代表着一个项目,如百度爬虫项目;一个是task,代表一个爬取任务,如爬取百度首页,爬取某一个新闻业,都是一个task。

与scheduler相关的队列有三个

  • scheduler2fetcher 也就是scheduler中的out queue,用于发送task给fetcher
  • status_queue 用于从processor中获取已经爬取的task的状态并做相应处理
  • newtask_queue 新产生的task

scheduler负责调度,与scrapy或者其他的爬虫框架类似,调度器负责调度需要爬取的内容,决定哪些内容在哪些时候进行爬取。我们从代码入手看下pyspider的调度器做了啥。

def run(self):
    while not self._quit:
        self.run_once()
        
def run_once(self):
    self._update_projects()
    self._check_task_done()
    self._check_request()
    while self._check_cronjob():
        pass
    self._check_select()
    self._check_delete()
    self._try_dump_cnt()

入口为run函数,真正有用的是run_once函数。我们可以看到,每一轮调度都会依次调用几个方法。

_update_projects

该方法会从projectdb中读取是有有新的project更新,如果更新了就得处理这个project

_check_task_done ?

该方法会消费status queue,爬取失败的task,检查下要不要重新爬,标记一下,存起来。爬取成功的task,看下是否要再爬一次,标记一下,存起来。

_check_request

消费newtask_queue,该队列为待爬取的队列,任务取出来,处理处理,标记一下,存起来。

_check_cronjob

看下有没有什么定时任务触发了,有的话,丢到out queue(scheduler2fetcher)给fetcher爬去。

_check_select

之前不是标记并存了好多要爬取的任务咩,取出来,丢给out queue给fetcher爬去。

_check_delete

处理一些被标记为删除的project

_try_dump_cnt

本轮结束,记个数。

scheduler逻辑相当清晰,分工也很明确:找到需要爬取的任务给fetcher。

fetcher

fetcher的职责更为清晰:下载。

与他相关的有两个队列

  • scheduler2fetcher 也是fetcher中的inqueue,调度器传给fetcher的任务
  • fetcher2processor 也是fetcher中的outqueue,fetcher传给processor的任务

fetcher的入口也是run方法,会从inqueue中读取任务去爬取。整个fetcher是基于tornado实现的(说真,tornado在py3 async的时代看起来显得好丑..)并提供了几种爬取的方式。这部分代码很简单,不细说了,就是下载下来,爬取结束之后发送到outqueue中。

processor

涉及到四个队列

  • fetcher2processor 也是inqueue,为fetcher的输入
  • status_queue 把fetcher爬到的内容输出给scheduler
  • newtask_queue 新任务队列,一个task可能会产生多个新的task,传递给scheduler
  • processor2result 也是result_queue,输出获取到的需要的数据,为最终的输出

程序的入口同样为run,核心方法只有一个,就是on_task,处理唯一的输入inqueue中获取到的task,主要做了这么几件事

  • 处理下task,该找外链的找外链,该获取格式化数据的获取数据,并发送到result_queue中。(这部分在ProjectManager这个类的on_result方法中完成)

  • 把task的内容做一些处理,形成一个新的dict,包含爬取状态,时间等信息,发到status_queue

  • 处理找到的外链(如果有需要的话,即在回调中有调用self.crawl)包装一下,发送给newtask_queue

result worker

result worker只涉及到一个队列,就是processor中输出的result queue。

这部分我觉得是pyspider比较弱的一部分,类似于scrapy中的Pipeline,对输出的数据进行一些处理,如保存数据库等。需要继承实现一个ResultWorker类。默认的这个类会把数据保存到resultdb中,但我们实际需要的肯定不止如此,可以重写on_result方法做一些处理。

不过因为所有的输出都在一个队列,所以result worker也只能有一类(并不是一个,可以做分布式处理),处理一个类似的逻辑,比如统统都保存到mongo。或者在一个result worker中写判断语句,进行不同的逻辑处理。但这样就不够优雅了。

总结

pyspider应该算是一个相当不错的框架,代码很清晰,很适合去读。不过适合的场景还是比较有限,着重于调度,分布式爬取,弱化了对数据的处理部分(当然,这部分也可以很方便的扩展)。

猜你喜欢

转载自blog.csdn.net/weixin_34010566/article/details/87522604