在我们的项目开发过程中,我们有时会有时候有两个或者多个程序交互的情况,当然就会使用到这里的消息队列来实现。现在比较火的就是RabbitMQ,还有一些ZeroMQ ,ActiveMQ 等等,著名的openstack默认用的RabbitMQ来实现的。
python中我们使用pika模块来操作消息队列,当然Celery也是python中比较火的做分布式消息队列的模块。
1,RabbitMQ的安装
参考链接https://www.cnblogs.com/zzqit/p/10158923.html
2,最简单的发收事例
发送端(producer)
1 import pika 2 3 # 建立一个实例 4 connection = pika.BlockingConnection( 5 pika.ConnectionParameters('localhost') # 默认端口5672,可不写 6 ) 7 8 # 声明一个管道,在管道里发消息 9 channel = connection.channel() 10 11 # 在管道里声明queue名字为test 12 channel.queue_declare(queue='test') 13 14 # 指明发送队列的名字跟内容 15 channel.basic_publish(exchange='', 16 routing_key='test', # queue名字 17 body='Hello World!' # 消息内容 18 ) 19 20 print(" [x] Sent 'Hello World!'") 21 22 connection.close() # 队列关闭
消费端(consumer)
1 import pika 2 3 # 建立实例 4 connection = pika.BlockingConnection( 5 pika.ConnectionParameters('localhost') 6 ) 7 8 # 声明管道 9 channel = connection.channel() 10 11 # 这里声明queue的名字防止消费端先跑,服务端还没开启报错 12 channel.queue_declare(queue='test') 13 14 # 消费端的回调函数 15 def callback(ch, method, properties, body): 16 print(" [x] Received %r" % body) 17 ch.basic_ack(delivery_tag = method.delivery_tag) # 告诉生成者,消息处理完成 18 19 channel.basic_consume( # 消费消息 20 callback, # 如果收到消息,就调用callback函数来处理消息 21 queue='hello', # 你要从那个队列里收消息 22 # no_ack=True # 为True不管消费者消费的时候是否处理完成,这条消息在队列中就没有了 23 ) 24 25 print(' [*] Waiting for messages. To exit press CTRL+C') 26 27 channel.start_consuming() # 开始消费消息
当然这里的连接方式是针对在本地连接,如果需要连接远程就采用下面的方式进行连接
credentials = pika.PlainCredentials('test', '123456') # rabbitmq登录账号 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.12.12',5672,'/',credentials)) # 前面为ip地址,5672为端口号
3,消息持久化及公平分发
我们可能会遇到生产者服务器意外宕机了,这样我们生成的消息队列跟里面的消息就会全部没有,当然这样肯定是不行的,所以我们可以通过下面的设置消息持久化跟队列持久化来达到最终的目的。
上面的问题解决了以后,在我们实际的生产过程中,每个消费者服务器的性能是不均等的,所以我们需要根据不同服务器的性能做负载均衡,实现公平分发。当然下面已经解决了提到的这两个问题。
发送端(producer)
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection( 5 pika.ConnectionParameters(host='localhost') 6 ) 7 8 channel = connection.channel() 9 10 channel.queue_declare(queue='test', durable=True) # 队列持久化 11 12 message = ' '.join(sys.argv[1:]) or "Hello World!" # sys.argv获取输入命令后面的参数 13 14 channel.basic_publish(exchange='', 15 routing_key='test', # 队列名称 16 body=message, # 消息体 17 properties=pika.BasicProperties( 18 delivery_mode=2, # 消息持久化 19 )) 20 21 print(" [x] Sent %r" % message) 22 23 connection.close()
消费端(consumer)
1 import pika 2 3 connection = pika.BlockingConnection( 4 pika.ConnectionParameters(host='localhost') 5 ) 6 7 channel = connection.channel() 8 9 channel.queue_declare(queue='test', durable=True) # 消费端也要写durable=True 10 11 print(' [*] Waiting for messages. To exit press CTRL+C') 12 13 14 def callback(ch, method, properties, body): 15 print(" [x] Received %r" % body) 16 ch.basic_ack(delivery_tag=method.delivery_tag) # 告诉生产者处理完成 17 18 channel.basic_qos(prefetch_count=1) # 做公平分发,如果有一个消息就不给你发了 19 20 channel.basic_consume(callback, 21 queue='test') 22 23 channel.start_consuming()
4,广播模式
前面的效果都是生产者生产了消息以后把消息放入队列中然后等消费者来消费,当然也可以做成广播的形式,发送端发的同时,绑定的消费端就实时接受到数据,当然如果发消息的时候没绑定就不会把这个消息存下来。下面会简单介绍三种模式:fanout,direct,topic
(1)消息发布订阅(fanout)
发送端
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection( 5 pika.ConnectionParameters(host='localhost') 6 ) 7 8 channel = connection.channel() 9 10 channel.exchange_declare(exchange='logs', # 声明广播管道 11 type='fanout') # 类型为fanout只要是绑定了exchange为logs的都可以接受 12 13 message = ' '.join(sys.argv[1:]) or "info: Hello World!" 14 15 channel.basic_publish(exchange='logs', # 发送的广播管道 16 routing_key='', # 需要等消费者接入,为空,必须有 17 body=message) 18 19 print(" [x] Sent %r" % message) 20 21 connection.close()
接收端
1 import pika 2 3 connection = pika.BlockingConnection( 4 pika.ConnectionParameters(host='localhost') 5 ) 6 7 channel = connection.channel() 8 9 channel.exchange_declare(exchange='logs', # 声明和生产者一样 10 type='fanout') 11 12 res = channel.queue_declare(exclusive=True) # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除 13 queue_name = res.method.queue # 分配的queue名字 14 15 channel.queue_bind(exchange='logs', 16 queue=queue_name) 17 18 19 def callback(ch, method, properties, body): 20 print(" [x] %r" % body) 21 22 channel.basic_consume(callback, 23 queue=queue_name, 24 no_ack=True) # 广播是实时的,消息不保存 25 26 channel.start_consuming()
(2)有选择的接收(direct)
前面的fanout消费者是默认接收所有的数据,但是有时候需要筛选一下,只接收自己想要的数据,就需要用到这里的direct了。
发送端
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection( 5 pika.ConnectionParameters(host='localhost') 6 ) 7 8 channel = connection.channel() 9 10 channel.exchange_declare(exchange='direct_logs', # 跟前面的fanout差不多 11 type='direct') 12 13 severity = sys.argv[1] if len(sys.argv) > 1 else 'info' # 发送消息级别,默认为info 14 15 message = ' '.join(sys.argv[2:]) or 'Hello World!' # 发送消息内容 16 17 channel.basic_publish(exchange='direct_logs', 18 routing_key=severity, # 发送消息的级别 19 body=message) 20 21 print(" [x] Sent %r:%r" % (severity, message)) 22 23 connection.close()
接收端
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection( 5 pika.ConnectionParameters(host='localhost') 6 ) 7 8 channel = connection.channel() 9 10 channel.exchange_declare(exchange='direct_logs', 11 type='direct') 12 13 result = channel.queue_declare(exclusive=True) # #不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除 14 queue_name = result.method.queue 15 16 severities = sys.argv[1:] # 获取运行脚本的参数 17 if not severities: 18 sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) 19 sys.exit(1) 20 21 for severity in severities: # 循环绑定所有想接收的级别 22 channel.queue_bind(exchange='direct_logs', 23 queue=queue_name, 24 routing_key=severity) 25 26 def callback(ch, method, properties, body): 27 print(" [x] %r:%r" % (method.routing_key, body)) 28 29 channel.basic_consume(callback, 30 queue=queue_name, 31 no_ack=True) 32 33 channel.start_consuming()
运行代码示例
python producer.py info hello # 运行发送端,级别为info,内容为hello python consumer.py info error # 接收端,接收级别为info跟error python consumer.py info warning # 接收端,接收级别为info跟warning python consumer.py error warning # 接收端,接收级别为error跟warning # 最终收到消息的有1,2两个,第三个收不到
(3)更加细致的过滤(topic)
发送端
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection( 5 pika.ConnectionParameters(host='localhost') 6 ) 7 8 channel = connection.channel() 9 10 channel.exchange_declare(exchange='topic_logs', 11 type='topic') 12 13 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' 14 15 message = ' '.join(sys.argv[2:]) or 'Hello World!' 16 17 channel.basic_publish(exchange='topic_logs', 18 routing_key=routing_key, 19 body=message) 20 21 print(" [x] Sent %r:%r" % (routing_key, message)) 22 23 connection.close()
接收端
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection( 5 pika.ConnectionParameters(host='localhost') 6 ) 7 8 channel = connection.channel() 9 10 channel.exchange_declare(exchange='topic_logs', 11 type='topic') 12 13 result = channel.queue_declare(exclusive=True) # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除 14 queue_name = result.method.queue 15 16 binding_keys = sys.argv[1:] 17 if not binding_keys: 18 sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) 19 sys.exit(1) 20 21 for binding_key in binding_keys: 22 channel.queue_bind(exchange='topic_logs', 23 queue=queue_name, 24 routing_key=binding_key) 25 26 27 def callback(ch, method, properties, body): 28 print(" [x] %r:%r" % (method.routing_key, body)) 29 30 channel.basic_consume(callback, 31 queue=queue_name, 32 no_ack=True) 33 34 channel.start_consuming()
在使用topic的情况下,接收端可以通过下面几种写法筛选自己想要接收的数据
python consumer.py *.info # 所有info结尾的 python consumer.py *.info nginx.* # 所有info结尾的和nginx开始的 python consumer.py '#' # 接收所有