scrapy作为一个专业的爬虫框架,不仅有完备的爬取工具,同样有完备的数据处理工作流。下面我们通过item和pipeline来介绍scrapy推荐的数据处理流程。
Item数据对象
英语字典中对item的解释是列表或群组中的一项。放在这里就是一条数据记录的对象。比如我们要抓取一个名单,那么item就是一个人。如果是商品列表,那么item就是一个商品。
item的主要作用是规范数据结构,简单粗暴的理解就是去掉多余的字段或补充缺失的字段,让一条数据记录保证一个一致的数据结构。它非常便于接下来存储到行列表的数据结构中(RDB或Excel)。下面定义了一个商品的item:
class ProductItem(scrapy.Item):
name = scrapy.Field()
price = scrapy.Field()
stock = scrapy.Field()
tags = scrapy.Field()
last_updated = scrapy.Field(serializer=str)
复制代码
我们可以看到定义过程非常简单,赋值的左侧直接写出我们要保留的字段即可。赋值的右边是清一色的scrapy.Field()即可,只有特殊情况需要定义一下序列化方法。
它的用法同样非常简单,假设我们在API中拿到的一条商品数据包含10个字段,但是我们只需要上面的5个字段,我们无须特殊的处理,直接把这条10个字段的数据传入构造函数即可。item会自动根据对应字段进行填充。
apidata = {
"name":"《scrapy教程》",
"price":"99.99",
"stock":"1",
"cover":"https://img.scrapy.xxx/xxxxxxx.jpg",
"summary":"这是一本scrapy教程……",
"url_buy":"https://shop.scrapy.xxx/xxx.html",
"seller":"西门老铁",
"created":"2020-02-02",
"last_updated":"2021-12-02",
"status":"0"
}
ProductItem(apidata)
复制代码
看上例的最后一行,只需要把所有的数据传入ProductItem的构造函数即可完成字段的选择和赋值。并且只保留class中定义的5个字段。
Pipeline流水线
流水线应该不难理解,就是用来处理爬虫获得的数据的,所有yield输出的数据都会进入流水线,而且一个爬虫可以定义若干个流水线,比如我抓到一组商品信息,我可以用一个流水线来做一层简单的数据处理(比如去重),然后再把他写入MySQL,同时输出一份Excel报表。我就可以定义三个流水线,①数据处理、②写入MySQL、③写Excel。
流水线可以定义优先级表示执行顺序,但是请注意,这里的顺序是单个item的顺序,而不是一个流水线处理完所有item再启动另一个流水线。
一个pipeline的定义如下
class PricePipeline:
vat_factor = 1.15
def process_item(self, item, spider):
adapter = ItemAdapter(item)
if adapter.get('price'):
if adapter.get('price_excludes_vat'):
adapter['price'] = adapter['price'] * self.vat_factor
return item
else:
raise DropItem(f"Missing price in {item}")
复制代码
一个pipeline的类里面最关键的方法便是process_item,用来处理每一个item。除此之外,还有open_spider和close_spider。注意这里open和close的后面是spider,也就是说在爬虫启动和关闭的时候调用的,而不是数据到来的时候。换句话说,在爬虫启动还没有开始抓数据的时候流水线就启动了。这里通常对应一些前置和收尾的动作,比如MySQL连接的open和close可以对应在这里执行,写入Excel文件的open和close。下面是一个MySQL的流水线示例
class WriteMysqlPipeline:
def open_spider(self,spider):
print('------pipline mysql writer------')
self.conn = pymysql.connect(host=spider.db_host,
user=spider.db_user,
password=spider.db_pwd,
database=spider.db_name,
charset='utf8mb4')
# 创建游标
self.cursor = self.conn.cursor()
def process_item(self, item, spider):
# sql语句
insert_sql = """
insert into ods_product(name,price,stock,tags,last_updated) VALUES(%s,%s,%s,%s,%s)
"""
# 执行插入数据到数据库操作
self.cursor.execute(insert_sql,(item['name'], item['price'], item['stock'],item['tags'],item['last_updated']))
return item
def close_spider(self,spider):
# 提交,不进行提交无法保存到数据库
self.conn.commit()
# 关闭游标和连接
self.cursor.close()
self.conn.close()
复制代码
多item多pipeline的协作
很多时候,我们的一个爬虫会得到不止一种数据,这就意味着可能会yield多种item,我们无法指定哪个item只进哪个流水线,只要定义在ITEM_PIPELINES的设置项中的流水线,所有item一定会经过。所以我们需要在process_item中判断我们要处理的item再做处理。但是请注意,通常来说,如果流水线不是过滤器的话,process_item中的每一个分支,我们都要以return item退出,这样item才会被后面的流水线接收到。就像下面这样:
def process_item(self, item, spider):
if not isinstance(item, ProductItem):
return item
#处理ProductItem
...
return item
复制代码
另外,我们的一个scrapy项目中往往也是有多个spider,每个sipder产生的item和对应的流水线都不同,但是settings.py中定义的ITEM_PIPELINES是全局生效的,这时候我们可以关闭settings.py中的配置,然后在对应的spider下面单独配置如下:
class ProductSpider(scrapy.Spider):
name = 'product'
custom_settings = {
'ITEM_PIPELINES': {
'shop.pipelines.ProductPipeline': 100,
'shop.pipelines.WriteMysqlPipeline': 200,
},
}
复制代码
如上配置就可以控制流水线只在ProductSpider生效,注意shop.pipelines.WriteMysqlPipeline是pipeline的完整类名,一般只需要替换最前面的shop为你的scrapy项目名称即可。