以爬取douban小说为例
首先启动Redis,新建文件crawl_douban.py
import json
import time
from configparser import ConfigParser

import requests
from bs4 import BeautifulSoup
from celery import Celery
import redis
cp=ConfigParser()
cp.read('config')
#获取配置信息
db_host=cp.get(section='redis',option='db_host')
db_port=cp.getint('redis','db_port')
db_pwd=cp['redis']['db_pwd']
#redis连接
pool = redis.ConnectionPool(host=db_host, port=db_port, db=15, password=db_pwd)
r = redis.StrictRedis(connection_pool=pool)
set_name='crawl:douban'
app = Celery('crawl', include=['task'], broker='redis://:{}@{}:{}/12'.format(db_pwd,db_host,db_port), backend='redis://:{}@{}:{}/13'.format(db_pwd,db_host,db_port))
# 官方推荐使用json作为消息序列化方式
app.conf.update(
CELERY_TIMEZONE='Asia/Shanghai',
CELERY_ENABLE_UTC=True,
CELERY_ACCEPT_CONTENT=['json'],
CELERY_TASK_SERIALIZER='json',
CELERY_RESULT_SERIALIZER='json',
)
headers={
'User-Agent':'',
}
@app.task
def crawl(url):
res=requests.get(url,headers=headers)
#延迟2秒
time.sleep(2)
soup=BeautifulSoup(res.text,'lxml')
items=soup.select('.subject-list .subject-item .info h2 a')
titles=[item['title'] for item in items]
#将小说的title存入redis数据库
r.sadd(set_name,(url,titles,time.time()))
print(titles)
return (url,titles)
将上面的脚本部署到两台主机A和B,然后各自运行下面的命令:
celery -A crawl_douban worker -l info
在本机C新建文件task_dispatcher.py用于异步分发任务,代码如下:
from crawl_douban import app
from crawl_douban import crawl
def manage_crawl(urls):
for url in urls:
app.send_task('crawl_douban.crawl', args=(url,))
#上句也可以写成 crawl.apply_async(args=(url,)) 或 crawl.delay(url)
if __name__ == '__main__':
start_url = 'https://book.douban.com/tag/小说'
#爬去10页,每页20本书
url_list = ['{}?start={}&type=T'.format(start_url, page * 20) for page in range(10)]
manage_crawl(url_list)
运行task_dispatcher.py,跑完用时2.8s
celery -A tasks worker --loglevel=info --concurrency=5
- 参数"-A"指定了Celery实例的位置
- 参数"--loglevel"指定了日志等级,也可以不加,默认为warning。
- 参数"--concurrency"指定最大并发数,默认为CPU核数。
[program:celery]
command=celery -A tasks worker --loglevel=info --concurrency=5
directory=/home/user_00/learn
stdout_logfile=/home/user_00/learn/logs/celery.log
autorestart=true
redirect_stderr=true