RabbitMQ实现了AMQP协议(高级消息队列协议)。
基于我对RabbitMQ的理解:
其基本组成结构有:publisher(生产者)、 exchange(交换机)、queue(消息队列)、consumer(消费者)
各个数据结构介绍:
publisher(生产者): 作为消息的产生者,产生消息存于消息队列中,生产的过程就是厂家将商品出货。
exchange(交换机): 消息的分配员,消息首先经过它,分配员按商品种类将放于不同的自动贩卖机中。
queue(消息队列): 消息的存储仓库,自动售卖机。
consumer(消费者): 消息的消费者,从存储仓库中取出消息,消费的过程就是人从自动贩卖机中购买商品。
常见的exchange类型有 topic, headers, direct, fanout。
补充一个概念:
消息确认机制: 就是交互双方,消息的发送者将消息发送出去后,接收者需要反馈给消费者消息已经接收到(这里无所谓消息的处理是否完成), 而amqp的交互双方也需要这个机制来保证消息确实被接受到了(所以接收者在接收到消息后需要返回ack确认包,因为交互永远是双向的不会再进行无意义的重复确认,即这里不再考虑网络延迟而导致发送者并没有接收到ack包),来保证消息确实已经被接收者接收到,可以将消息移除仓库。打个不巧当的比喻,就是你要付了钱,自动贩卖机才正真的将商品出售。
默认的exchange: 当生产者不指定exchange时,消息会发配到默认exchange,分配规则:默认exchange则会根据指定的队列queue将消息发送到queue中。
(我对MQ的生产和消费过程进行了封装 `但并没有实现高可用`,让使用者将注意转移到业务逻辑处理上)
以下是代码,目录结构:
/mq/abs 抽象层
/mq/obj 具体实现层
/mq/config 配置
/mq/web 应用层(demo)
/mq/abs/MqclientObject.php
<?php
namespace mq\abs;
use \PhpAmqpLib\Connection as MQC;
use \PhpAmqpLib\Exception\AMQPHeartbeatMissedException;
abstract class MqClientObject
{
public $channel;
private static $connect;
private static $config;
private static $lastConnectTime;
public function __construct(array $config)
{
if(!(self::$connect instanceof MQC\AMQPStreamConnection)) {
self::connect($config);
}
if(!self::$config) {
self::$config = $config;
}
$this->channel = self::$connect->channel();
}
private static function connect(array $config)
{
self::$connect = new MQC\AMQPStreamConnection(
$config['host'],
$config['port'],
$config['user'],
$config['password'],
isset($config['vhost']) && !empty($config['vhost']) ? $config['vhost'] : '/'
);
self::$lastConnectTime = time();
}
public static function close()
{
try{
self::$connect->close();
self::$connect = null;
self::$config = null;
} catch (\Exception $ex) {
return false;
}
return true;
}
public function createTempQueue()
{
list($queue, ) = $this->channel->queue_declare("", false, false, true, false);
return $queue;
}
public function ping()
{
try{
self::$connect->checkHeartBeat();
} catch (AMQPHeartbeatMissedException $MQheartEx) {
return false;
} catch(\Exception $ex) {
//todo log
exit(0);
}
return true;
}
public function reConnect()
{
self::connect(self::$config);
}
}
/mq/abs/MqconsumerObject.php
<?php
namespace mq\abs;
abstract class MqConsumerObject extends MqClientObject
{
public $queue;
private $blocking = true;
private $qos = true;
private $ack = false;
private $qosCount = 1;
public function setQueue($queue)
{
$this->queue = $queue;
return $this;
}
public function disableAck()
{
$this->ack = true;
}
public function disablePos()
{
$this->qos = false;
}
public function setQosCount($count)
{
$this->qosCount = $count;
}
public function disableBlocking()
{
$this->blocking = false;
}
//默认的消费方式
public function consume($failClose = false)
{
if($this->qos) {
$this->channel->basic_qos(null, $this->qosCount, null);
}
$this->channel->basic_consume(
$this->queue,
'',
false,
$this->ack,
false,
false,
[$this, 'call']
);
while(count($this->channel->callbacks)) {
$this->channel->wait();
if(!$this->blocking) {
$this->channel->close();
}
}
}
//回调函数
public function call($event)
{
$message = $event->body;
$channel = $event->delivery_info['channel'];
if($this->run($message)) {
$channel->basic_ack($event->delivery_info['delivery_tag']);
}
}
public function closeChannel()
{
try{
$this->channel->close();
$this->channel = null;
} catch (\Exception $ex) {
return false;
}
return true;
}
//业务处理
public abstract function run($message);
public function __destruct()
{
if($this->channel != null) {
$this->closeChannel();
}
}
}
/mq/abs/MqPublisherObject.php
<?php
namespace mq\abs;
abstract class MqPublisherObject extends MqClientObject
{
public $exchange;
public $bindKey;
public $queue;
public function setExchange($exchange)
{
$this->exchange = $exchange;
return $this;
}
public function setBindKey($bindKey)
{
$this->bindKey = $bindKey;
return $this;
}
public function setQueue($queue)
{
$this->queue = $queue;
return $this;
}
public abstract function bind();
public abstract function publish($message, $config = null);
}
/mq/config/mq_config
<?php
define('MQ_CONFIG', [
'host' => '127.0.0.1',
'port' => 5672,
'user' => 'dora',
'password' => 'hopeforyou',
'vhost' => '/dora'
]);
/mq/obj/DirectPublisherObject.php
<?php
namespace mq\obj;
use \mq\abs as MA;
use \PhpAmqpLib\Message as PM;
class DirectPublisherObject extends MA\MqPublisherObject
{
public function bind()
{
$this->channel->queue_bind($this->queue, $this->exchange, $this->bindKey);
}
public function publish($message, $config = null)
{
$objMessage = new PM\AMQPMessage($message);
$this->channel->basic_publish($objMessage, $this->exchange, $this->bindKey);
}
}
/mq/obj/FanoutPublisherObject.php
<?php
namespace mq\obj;
use \mq\abs as MA;
use \PhpAmqpLib\Message as PM;
class FanoutPublisherObject extends MA\MqPublisherObject
{
public function bind()
{
$this->channel->queue_bind($this->queue, $this->exchange);
}
public function publish($message, $config = null)
{
//暂不支持$config配置
$objMessage = new PM\AMQPMessage($message);
$this->channel->basic_publish($objMessage, $this->exchange);
}
}
/mq/obj/TopicsPublisherObject.php
<?php
namespace mq\obj;
use \mq\abs as MA;
use \PhpAmqpLib\Message as PM;
class TopicsPublisherObject extends MA\MqPublisherObject
{
public function bind()
{
$this->channel->bind_queue($this->queue, $this->exchange, $this->bindKey);
}
public function publish($message, $config = null)
{
$objMessage = new PM\AMQPMessage($message);
$this->channel->basic_publish($objMessage, $this->exchange, $this->bindKey);
}
}
/mq/obj/PayCallbackConsumer.php
<?php
namespace mq\obj;
use mq\abs\MqConsumerObject;
class PayCallbackConsumer extends MqConsumerObject
{
public function run($message)
{
//todo 业务逻辑代码
//消费者消费一次完毕后就不再消费
// $this->disableBlocking();
//return true 代表消息处理完毕
//return false 代表消息处理异常,不正真消费
}
}
/mq/web/index.php 测试代码
<?php
define('APP_PATH', dirname(__DIR__));
require APP_PATH.'/vendor/autoload.php'; //注册自动加载函数列表
require APP_PATH.'/config/mq_config.php'; //mq注册常量
//use \mq\obj as MO;
//将$queue绑定到exchange dora.fanout上
//$objMq = new MO\FanoutPublisherObject(MQ_CONFIG);
////交换机绑定队列
////$queue = 'dora.hope.for.you';
////$objMq->setExchange('dora.fanout')->setQueue($queue)->bind();
////向交换机发送消息
//$objMq->setExchange('dora.fanout');
//$objMq->publish('hope for you!');
//将$queue绑定到exchange dora.direct上
//$objMq = new MO\DirectPublisherObject(MQ_CONFIG);
//$bindKey = 'hope';
//交换机绑定队列
//$queue = 'dora.direct.hope.for';
//$objMq->setExchange('dora.direct')->setBindKey($bindKey)->setQueue($queue)->bind();
//向交换机发送消息
//$objMq->setExchange('dora.direct')->setBindKey($bindKey);
//$objMq->publish('hope for you!');
//将$queue绑定到exchange dora.topic上
//$objMq = new MO\DirectPublisherObject(MQ_CONFIG);
//$bindKey = 'hope.for';
//交换机绑定队列
//$queue = 'dora.topic.hope.for';
//$objMq->setExchange('dora.topic')->setBindKey($bindKey)->setQueue($queue)->bind();
//向交换机发送消息
//$objMq->setExchange('dora.topic')->setBindKey($bindKey);
//$objMq->publish('hope for you!');
//消费队列
//$objConsumer = new MO\PayCallbackConsumer(MQ_CONFIG);
//$queue = 'dora.hope.for.you';
//$objConsumer->setQueue($queue)->consume();
使用者在将消息上传到消息队列时,只需要关心用到的交换机exchange, 路由route。(实际项目中并不会由代码创建队列(临时队列除外)或者交换机);
使用者在消费队列时,也只需要继承MqconsumerObject类并实现run方法,即可对消息进行处理;
参考: