爬虫漫游指南
消息队列(2) — RabbitMQ
1. RabbitMQ简介
上一篇文章介绍了消息队列中的业余选手Redis,这位业余选手把消息pop出去之后就不管了,如何实现对消息的可靠消费就要靠我们自己来解决了。这在某些场景下是比较麻烦的,所以,本文带来了消息队列中的职业选手——RabbitMQ。
MQ(Message Queue)就是消息队列的缩写,因此光看名字,RabbitMQ的专业程度就领先了Redis一大截。RabbitMQ是Erlang编写的,AMQP协议的一个开源实现,提供可靠消费机制、队列监控平台、消费者负载均衡等等高级功能,这些都是redis所不具备的。所以除了爬虫之外,RabbitMQ在一些分布式架构场景下也有广泛的应用。
Tips:我选择RabbitMQ并不是因为他最牛逼,而是因为在celery中使用比较方便,消息队列的选择还是要根据实际应用场景来判断。
2. RabbitMQ安装
以Windows系统下的安装为例,首先去到官网安装教程,下载里面给出的rabbitmq-server-版本号.exe文件。除此之外,还需要安装Erlang,前文说过RabbitMQ是Erlang写的,所以安装一下环境也是合情合理的。这里需要注意一下,官方指定了需要64位的Erlang,不要去装32位的给自己找麻烦。另外Erlang那个安装包我这里下载极慢,可以考虑去百度网盘下载,放心,就算是限速的百度网盘也会比它快的。
然后安装两个exe即可,先装Erlang再装RabbitMQ,相信脑回路不是太清奇的人都不会把这个顺序弄错。安装步骤就不截图了,只要识字并且会使用鼠标,就能轻松完成安装。
RabbitMQ安装完成后会按照默认配置自动启动,并且会开机自动启动,不需要自己再去注册系统服务。修改配置需要重启的话可以去Windows开始菜单里找。安装完成后,个人建议把目录中的sbin文件路径添加到环境变量中,使用起来会方便很多。添加环境变量后,随手输一个rabbitmqctl status
看看,出来一大串东西,就没问题了。
3. 使用Web管理插件
RabbitMQ自带许多插件,其中有一个不得不提的,就是Web管理插件。首先来看一下它的界面
一看就有一种功能丰富,牛逼哄哄的感觉,牛逼的东西那当然要用起来。插件是不会默认启动的,需要运行命令rabbitmq-plugins enable rabbitmq_management
开启Web管理插件。
运行上述命令后,插件会以默认参数启动,默认参数难免有不符合需求的时候,所以这里讲一下如何修改它的配置参数。跟着刚才步骤走的同学一定找到了sbin文件夹,现在就去它的隔壁,找到它的邻居etc文件夹,etc文件中有一个rabbitmq.config文件,打开编辑。这个文件比较大,最好是CTRL+F一下,刚才启动的是rabbitmq_management,所以搜索这个名字,就可以看到如下这么一大堆信息
%% ----------------------------------------------------------------------------
%% RabbitMQ Management Plugin
%%
%% See http://www.rabbitmq.com/management.html for details
%% ----------------------------------------------------------------------------
{rabbitmq_management,
[%% Pre-Load schema definitions from the following JSON file. See
%% http://www.rabbitmq.com/management.html#load-definitions
%%
%% {load_definitions, "/path/to/schema.json"},
%% Log all requests to the management HTTP API to a file.
%%
%% {http_log_dir, "/path/to/access.log"},
%% Change the port on which the HTTP listener listens,
%% specifying an interface for the web server to bind to.
%% Also set the listener to use SSL and provide SSL options.
%%
%% {listener, [{port, 12345},
%% {ip, "127.0.0.1"},
%% {ssl, true},
%% {ssl_opts, [{cacertfile, "/path/to/cacert.pem"},
%% {certfile, "/path/to/cert.pem"},
%% {keyfile, "/path/to/key.pem"}]}]},
%% One of 'basic', 'detailed' or 'none'. See
%% http://www.rabbitmq.com/management.html#fine-stats for more details.
%% {rates_mode, basic},
%% Configure how long aggregated data (such as message rates and queue
%% lengths) is retained. Please read the plugin's documentation in
%% http://www.rabbitmq.com/management.html#configuration for more
%% details.
%%
%% {sample_retention_policies,
%% [{global, [{60, 5}, {3600, 60}, {86400, 1200}]},
%% {basic, [{60, 5}, {3600, 60}]},
%% {detailed, [{10, 5}]}]}
]},
可以说是非常详细了,一般改的比较多的是端口,在listener那里,RabbitMQ的默认TCP端口是5672,所以道上很多兄弟都会用15672来做HTTP的端口,ip就写127.0.0.1就行,ssl的话我是设置成false的,配个ssl证书有点麻烦。ssl配成false之后,那个ssl_opts就不用再配了。配完之后大概就是这个样子
{listener, [{port, 15672},
{ip, "127.0.0.1"},
{ssl, false}
]},
把这段代码复制到原先的listener底下就可以了,不要把原来的注释删掉,说不定哪天你就用上它了。
配置完成之后,重启rabbit,rabbitmq-server restart
,就能使用了。除了作为管理界面外,下文即将提到的http api也需要用到它。
4. python中的简单使用
其实在官网文档中使用样例都写的很清楚了,所以这里给它简单封装一下(来自CV文档工程师最后的倔强)。
1)建立连接
首先上代码,完整代码就不贴了,相信大家看到片段就能理解主要内容了
def __build_connection(self):
# 和rabbit server建立连接
while not self.connection:
try:
logging.debug("Building connection to rabbitmq...")
# 用户凭证
self.__credentials = pika.PlainCredentials(
self.CONFIG["user"],
self.CONFIG["password"]
)
# 配置连接参数
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=self.CONFIG["host"],
port=self.CONFIG["port"],
virtual_host=self.CONFIG["v_host"],
credentials=self.__credentials,
)
)
except Exception as _e:
logging.exception(_e)
time.sleep(1)
建立连接非常简单,唯一想提一下的就是这里的virtual_host。在网上看到好几篇文章里都把这个参数写成了“/”,不知道是不是互相CV出来的,这样写的话未来在调用http api的时候就比较坑了,我是强烈不推荐这么写的,哪怕随手写个“abc”也行。
2)生产者(发布消息)
不哔哔,上马
def publish(self, key: str, body: str, exchange: str = ""):
"""
生产者发布消息到指定队列
:param key: 对列名
:param body: 要推送的内容
:param exchange: 要发送到的交换机
:return: None
"""
# 检查连接
if self.connection.is_closed:
self.__build_connection()
channel = self.connection.channel()
channel.queue_declare(queue=key)
channel.basic_publish(exchange=exchange, routing_key=key, body=body)
连接建立后,首先需要声明一个队列,如果将消息直接发布到一个不存在的队列,RabbitMQ会把这条消息丢掉,declare之后,如果队列不存在,那么会创建一个。
在basic_publish函数中,有一个exchange参数,是交换机的意思。在RabbitMQ中,永远不会直接把一条消息直接丢进队列里,永远会通过一个exchange来进行转发。
3)消费者(收取消息)
上马
def consume_simple(self, key: str, callback_func: callable, prefetch_count: int = None):
"""
消费者读取消息
:param key: 队列名
:param callback_func: 指定回调函数
:param prefetch_count: 公平调度
:return:
"""
# 检查连接
if self.connection.is_closed:
self.__build_connection()
channel = self.connection.channel()
if prefetch_count:
channel.basic_qos(prefetch_count=prefetch_count)
channel.queue_declare(queue=key)
channel.basic_consume(
queue=key,
on_message_callback=callback_func
)
channel.start_consuming()
和生产者比较类似,一样的代码就不提了,看看不一样的部分。
- 首先,basic_qos。这里声明了一个prefetch_count参数,是预读取数据的条数。这里涉及到一个公平调度(Fair dispatch)的概念,不再展开讲,有兴趣的可以去搜一下。简单来说,比如我把prefetch_count设置为1,那么读取一条数据后,rabbit就会等待这条数据处理完成后,返回ACK了,才会去读取下一条数据,如果处理完不发ACK,那就一直卡在那儿不动了。
- 然后是basic_consume。basic_consume里面的参数较旧版发生了一些改动,所以一些比较老的文章里的调用方法可能不对。使用的时候点进去看一下源码就清楚了。我的pika1.1.0中,队列名和回调函数是必选参数,因此只填了这两个。回调函数中,读一下body然后发个ack就完事儿了,就这么简单。
- 最后是start_consuming,开始消费,不调用这个的话程序是不会对队列中的消息做任何操作的。进去之后就相当于一个while true,队列中没消息了也不会停,需要手动停止。
4)关闭连接
马
def close(self):
# 关闭连接
if not self.connection.is_closed:
self.connection.close()
就像上完厕所要冲水一样,连接用完了得关。
5)http api
本来我是想实现一个获取队列长度的方法,结果找了一圈没找到,只找到了这个叫做http api的东西,那没办法,简单封装一下用起来吧。
def get_count_message(self, key: str):
"""
获取指定队列的count信息
:param key: 队列名
:return: ready消息数量,unacked消息数量,total总量
"""
url = f'http://{self.CONFIG["host"]}:{self.CONFIG["http_port"]}/api/queues/{self.CONFIG["v_host"]}/{key}'
try:
retry = 3
while retry > 0:
retry -= 1
req = requests.get(url, timeout=10, auth=(self.CONFIG["user"], self.CONFIG["password"]))
if req.status_code != 200:
continue
result = req.json()
ready_count = result['messages_ready']
unacknowledged_count = result['messages_unacknowledged']
total_count = result['messages']
return ready_count, unacknowledged_count, total_count
except Exception as _e:
logging.exception(_e)
讲几点tips:
- 发请求时要添加auth,否则会返回401,一个名为Unauthorized的状态码。
- vhost,就是上文提到的virtual_host,如果用了斜杠那这边就没法用了,所以一定要随便写点什么。
- response中还有很多其它信息,不止这几个count。
讲完,收工。