Python的queue介绍
Python的队列,内置的有两种,一种是线程queue,另一种是进程queue,但是这两种queue都是只能在同一个进程下的线程间或者父进程与子进程之间进行队列通讯,并不能进行程序与程序之间的信息交换。这种情况下,就要引用一个中间件,来实现程序之间的通讯。可实现的工具有 Redis\httpsqs\RabbitMQ等,以RabbitMQ为例。
RabbitMQ
MQ并不是python内置的模块,而是一个需要你额外安装的程序,安装完毕后可通过python中内置的pika模块来调用MQ发送或接收队列请求。接下来我们就看几种python调用MQ的模式与方法。
Centos 7 进行RabbitMQ 安装流程,如下:
Centos7安装RabbitMQ 先安装Erlang #rpm -Uvh http://www.rabbitmq.com/releases/erlang/erlang-18.1-1.el7.centos.x86_64.rpm 安装rabbitmq-server #rpm -Uvh http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.6/rabbitmq-server-3.5.6-1.noarch.rpm 查看rabbitmq-server有没有安装好,能查到说明已经安装成功了 #rpm -qa|grep rabbitmq 用以下命令安装维护插件: #rabbitmq-plugins enable rabbitmq_management 开启rabbit-server用以下命令: #service rabbitmq-server restart 用以下命令查看rabbit-server当前状态 #rabbitmqctl status 创建一个账户,赋予管理员权限 # rabbitmqctl add_user chen1203 chen1203. (账号 密码) # rabbitmqctl set_user_tags chen1203 administrator 命令查看创建完的账号 #rabbitmqctl list_users 删除用户 #rabbitmqctl delete_user username 修改密码 #rabbitmqctl oldPassword Username newPassword 浏览器:http://外网ip:15672/ 登录
重要概念理解:
Broker:简单来说就是消息队列服务器实体。 Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。 Queue:消息队列载体,每个消息都会被投入到一个或多个队列。 Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。 Routing Key:路由关键字,exchange根据这个关键字进行消息投递。 vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。 producer:消息生产者,就是投递消息的程序。 consumer:消息消费者,就是接受消息的程序。 channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务
将使用Python编写两个小程序:
发送单个消息的生产者(发送者),以及接收消息并将其打印出来的消费者(接收者)。这是一个消息传递的“Hello World”。
图中,“P”是我们的生产者,“C”是我们的消费者。中间的盒子是一个队列 - RabbitMQ代表消费者保存的消息缓冲区。
整体设计将如下所示:
P:生产者简称,一个发送消息的程序是一个生产者。尽管消息流经RabbitMQ或者您的应用程序,但它们只能存储在队列中。一个队列只受主机内存和磁盘限制的约束,它本质上是一个很大的消息缓冲区。许多生产者可以发送进入一个队列的消息,并且许多消费者可以尝试从一个队列接收数据。
hello:queue_name 队列名称。这里命名为hello 。生产者发送信息到队列,消费者从队列中取数据。
C:消费与接受有类似的意义。一个消费者是一个程序,主要是等待接收信息。
举例:
生产者:RabbitMQ_producers.py
import pika rabbitmq_passwd = pika.PlainCredentials('chen12034','chen12033..') connection = pika.BlockingConnection(pika.ConnectionParameters('203.66.8.41',5672,'/',rabbitmq_passwd)) channel = connection.channel() #声明一个管道 #声明queue队列 channel.queue_declare(queue='hello') channel.basic_publish(exchange='', routing_key='hello', #队列名字 body='Hello World!') #消息内容 print(" [x] Sent 'Hello World!'") connection.close() #队列关闭
消费者:RabbitMQ_client.py
import pika rabbitmq_passswd = pika.PlainCredentials('chen12034','chen12033..') connection = pika.BlockingConnection(pika.ConnectionParameters('203.66.8.41',5672,'/',rabbitmq_passswd)) channel = connection.channel() #声明一个管道 #声明queue队列, channel.queue_declare(queue='hello') #ch 管道的内存对象地址 def callback(ch, method, properties, body): print(" [x] Received %r" % body) #开始消费消息 声明语法 channel.basic_consume(callback, #调用函数,如果收到消息就调用函数来处理消息 queue='hello', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() #开始收队列
打印信息如下:
生产者: [x] Sent 'Hello World!' 消费者: [*] Waiting for messages. To exit press CTRL+C [x] Received b'Hello World!' [x] Received b'Hello World!' [x] Received b'Hello World!'
结论:消费者只要一启动,就会启动在那里,不停的收取信息。除非CTRL+c终止,进程。这里模拟生产者,每次只发送一次信息。可以多执行几次。
插播报错信息处理流程:
远程调用RabbitMQ端口报错,报错如下:
/Library/Frameworks/Python.framework/Versions/3.5/bin/python3.5 /Users/mac/PycharmProjects/untitled2/51CTO/7days/rabbitmq_producers.py Traceback (most recent call last): File "/Users/mac/PycharmProjects/untitled2/51CTO/7days/rabbitmq_producers.py", line 4, in <module> connection = pika.BlockingConnection(pika.ConnectionParameters('203.66.5.4',5672,'/',rabbitmq_passwd)) File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/pika-0.11.2-py3.5.egg/pika/adapters/blocking_connection.py", line 374, in __init__ File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/pika-0.11.2-py3.5.egg/pika/adapters/blocking_connection.py", line 414, in _process_io_for_connection_setup File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/pika-0.11.2-py3.5.egg/pika/adapters/blocking_connection.py", line 466, in _flush_output pika.exceptions.ProbableAccessDeniedError: (-1, 'EOF')
修改 RabbitMQ用户权限,流程如下:
调整RabbitMQ 用户访问权限:把NO access 改为 /。
可点击用户名设置:
或者使用命令调整:#rabbitmqctl set_permissions -p '/' chen1203 ".*" ".*" ".*"Setting permissions for user "chen1203" in vhost "/" ...
效果如下:
整体设计将如下所示:
启动三个消费者程序,多次执行生产程序发送消息。观察发现消费者会先启动优先接收的原则,进行获取。模拟过程中,可以加入time.sleep()参数。
举例如下:
生产者:
import pika rabbitmq_passwd = pika.PlainCredentials('chen1203','chen1203..') connection = pika.BlockingConnection(pika.ConnectionParameters('203.66.5.4',5672,'/',rabbitmq_passwd)) channel = connection.channel() #声明一个管道 #声明queue队列 channel.queue_declare(queue='hello') body = "hello word11" channel.basic_publish(exchange='', routing_key='hello', #队列名字 body=body) #消息内容 print(" [x] Sent %s" % body) connection.close() #队列关闭
消费者:
#no_ack=True 信息处理完成或没处理完成,都不会给服务器端确认。当队列信息不关心的情况下,可以设置no_ack=True(简单说可以允许信息丢失)。一般默认情况下不加,让其他消费者客户端继承接收,信息接收完成,需要手动确认一下,保证消息被完整处理。
import pika import time rabbitmq_passswd = pika.PlainCredentials('chen1203','chen1203..') connection = pika.BlockingConnection(pika.ConnectionParameters('203.66.5.4',5672,'/',rabbitmq_passswd)) channel = connection.channel() #声明一个管道 #声明queue队列, channel.queue_declare(queue='hello') #ch 管道的内存对象地址 def callback(ch, method, properties, body): time.sleep(10) print(" [x] Received %r" % body) #开始消费消息 声明语法 channel.basic_consume(callback, #调用函数,如果收到消息就调用函数来处理消息 queue='hello', #no_ack=True ) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() #开始收队列
信息输出:
/Library/Frameworks/Python.framework/Versions/3.5/bin/python3.5 /Users/mac/PycharmProjects/untitled2/51CTO/7days/rabbitmq_client.py [*] Waiting for messages. To exit press CTRL+C [x] Received b'hello word5' [x] Received b'hello word11' [x] Received b'hello word' [x] Received b'hello word2' [x] Received b'hello word5' [x] Received b'hello word10' [x] Received b'hello word1' [x] Received b'hello word6' [x] Received b'Hello World!' [x] Received b'Hello World1122!' [x] Received b'hello word' [x] Received b'hello word3' [x] Received b'hello word4' [x] Received b'hello word6' [x] Received b'hello word7' [x] Received b'hello word9'
结论:当把消费者C1 、C2 慢慢关掉之后,只保留C3 一个的时候。C3会把C1、C2的信息进行接管。可以理解为,只要RabbitMQ 服务器不挂,信息没有丢失。