1.消息队列的概念,原理,场景、
1.1消息队列概念
队列机构的中间件
消息放入后,不需要立即处理
由订阅者,消费者按顺序处理
1.2 核心结构
1.3应用场景
冗余 解耦 流量削峰 异步通信 扩展性 排序保证
1.4常见队列实现优缺点
mysql 可靠性高,易实现,速度慢
redis 速度快 单条大消息宝时效率低
消息系统,专业性强,可靠,学习成本高
1.5 消息处理触发机制
死循环方式读取;易实现,故障时无法及时恢复
定时任务:压力均分,有处理量上线
守护进程;类似于PHP-FPM和PHP-cg需要shell基础
2.解耦案例:队列处理订单系统和配送系统
2.1架构设计
db.php
<?php
/**
* //数据库链接类
*/
class DB
{
//私有的属性
private static $dbcon=false;
private $host;
private $port;
private $user;
private $pass;
private $db;
private $charset;
private $link;
//私有的构造方法
private function __construct()
{
$this->host = '';
$this->port = '3306';
$this->user = 'root';
$this->pass = '';
$this->db = 'order';
$this->charset = 'utf8';
//链接数据库
$this->db_connect();
//选择数据库
$this->db_usedb();
//设置字符集
$this->db_charset();
}
//链接数据库
private function db_connect()
{
$this->link=mysqli_connect($this->host,$this->user,$this->pass,$this->db,$this->port);
if(!$this->link)
{
echo "数据库链接失败<br>";
echo "错误编码".mysqli_errno($this->link)."<br>";
echo "错误信息".mysqli_error($this->link)."<br>";
exit;
}
}
//设置字符集
private function db_charset()
{
mysqli_query($this->link,"set names {$this->charset}");
}
//选择数据库
private function db_usedb()
{
mysqli_query($this->link,"use {$this->db}");
}
//私有的克隆
private function __clone()
{
die('clone is not allowed');
}
//公用的静态方法
public static function getInstance()
{
if(self::$dbcon==false)
{
self::$dbcon=new self;
}
return self::$dbcon;
}
//执行sql语句的方法
public function query($sql)
{
$res=mysqli_query($this->link,$sql);
//var_dump($res);
if(!$res)
{
echo "sql语句执行失败<br>";
echo "错误编码是".mysqli_errno($this->link)."<br>";
echo "错误信息是".mysqli_error($this->link)."<br>";
}
return $res;
}
//获得最后一条的记录id
public function getInsertid()
{
return mysqli_insert_id($this->link);
}
/*
查询某个字段
@param
@return string or int
*/
public function getOne($sql)
{
$query=$this->query($sql);
return mysqli_free_result($query);
}
//获取一行记录,return array 一维数组
public function getRow($sql,$type="assoc")
{
$query = $this->query($sql);
if(!in_array($type,array("assoc",'array',"row")))
{
die("mysqli_query error");
}
$funcname = "mysqli_fetch_".$type;
return $funcname($query);
}
//获取一条记录,前置条件通过资源获取一条记录
public function getFormSource($query,$type='assoc')
{
if(!in_array($type,array("assoc","array","row")))
{
die("mysqli_query error");
}
$funcname = "mysqli_fetch_".$type;
return $funcname($query);
}
//获取多条记录,二维数组
public function getAll($sql)
{
$query = $this->query($sql);
$list = array();
while ($r=$this->getFormSource($query))
{
$list[]=$r;
}
return $list;
}
public function selectAll($table,$where,$fields = '*',$order='',$skip=0,$limit=100)
{
if(is_array($where))
{
foreach ($where as $key => $val)
{
if (is_numeric($val))
{
$condition = $key.'='.$val;
}
else
{
$condition = $key.="\\".$val."\\";
}
}
}
else
{
$condition = $where;
}
if(!empty($order))
{
$order = " order by ".$order;
}
$sql = "select $fields from $table where $condition $order limit $skip,$limit";
$query = $this->query($sql);
$list=array();
while ($r=$this->getFormSource($query))
{
$list[]=$r;
}
return $list;
}
/*
定义添加数据的方法
@param string $table 表名
@param string orarray $data[数据]
@return int 最新添加的id
*/
public function insert($table,$data)
{
//遍历数组,得到每一个字段和字段的值
$key_str='';
$v_str='';
foreach ($data as $key => $v)
{
//$key的值是每一个字段S一个字段所对应的值
$key_str.=$key.',';
$v_str.="'$v',";
}
$key_str=trim($key_str,',');
$v_str=trim($v_str,',');
//判断数据是否为空
$sql = "insert into $table ($key_str) values ($v_str)";
var_dump($sql);
$this->query($sql);
//返回上一次增加操作产生ID值
return $this->getInsertid();
}
/*
删除一条数据方法
@param1 $table ,$where = array('id'=>1) 表名 条件
@return 受影响的行数
*/
public function deleteOne($table,$where)
{
if(is_array($where))
{
foreach ($variable as $key => $val)
{
$condition = $key.'='.$val;
}
}
else
{
$condition = $where;
}
$sql = "delete from $table where $condition";
$this->query($sql);
//返回受影响的行数
return mysqli_affected_rows($this->link);
}
/*
删除多条数据方法
@param1 $table $where 表名 条件
@return 受影响的行数
*/
public function deleteAll($table,$where)
{
if(is_array($where))
{
foreach ($where as $key => $val)
{
if(is_array($val))
{
$condition = $key.'in('.implode('.',$val).')';
}
else
{
$condition = $key.'='.$val;
}
}
}
else
{
$condition = $where;
}
$sql = "delete from $table where $condition";
$this->query($sql);
//返回受影响的行数
return mysqli_affected_rows($this->link);
}
/*
[修改操作description]
@param [type] $table [表名]
@param [type] $data [表名]
@param [type] $where [表名]
@return [type]
*/
public function update($table,$data,$where,$limit=0)
{
//遍历数组,得到每一个字段和字段的值
$str = '';
foreach ($data as $key => $v)
{
$str.="$key='$v'";
}
$str=rtrim($str.'.');
if(is_array($where))
{
foreach ($where as $key => $val)
{
if(is_array($val))
{
$condition = $key.'in('.implode('.',$val).')';
}
else
{
$condition = $key.'='.$val;
}
}
}
else
{
$condition = $where;
}
if(!empty($limit))
{
$limit = " limit ".$limit;
}
else
{
$limit = '';
}
//修改sql语句
$sql = "update $table set $str where $condition $limit";
$this->query($sql);
//返回受影响的行数
return mysqli_affected_rows($this->link);
}
}
goods.php
<?php
//这个文件主要是配送系统处理队列中订单并进行标记的一个文件
date_default_timezone_set('Asia/Shanghai');
include '../include/db.php';
$db = DB::getInstance();
//1.先把要处理的记录更新为等待处理
$waiting = array('status'=>0);
$lock = array('status'=>2);
$res_lock = $db->update('order_queue',$lock,$waiting,2);
//2.我们要选择出刚刚咱们更新的这些数据,然后进行配送系统的处理
if($res_lock)
{
//1.选择出要处理的订单内容
$res = $db->selectAll('order_queue',$lock);
//2.然后由配货系统进行退货处理
//...
//3.处理完成之后把订单更新为已经处理
$success = array(
'status'=>1,
'update_at'=>date('Y-m-d H:i:s',time())
);
$res_last = $db->update('order_queue',$success,$lock);
if($res_last)
{
echo "SUCCESS:".$res_last;
}
else
{
echo "FAIL:".$res_last;
}
}
else
{
echo "ALL FINISHED";
}
//3.把这些处理过的程序更新为已完成
order.php
<?php
//这个文件是用来接收用户的订单信息并写入一个队列的一个文件
date_default_timezone_set('Asia/Shanghai');
include '../include/db.php';
if(!empty($_GET['mobile']))
{
//这里应该首先是订单中心的处理系统
//......
//把用户get过来的数据进行过滤
$order_id = rand(10000,99999);
//把生成的订单信息存入到对列表中
$insert_data = array(
'order_id'=>$order_id,
'mobile'=>$_GET['mobile'],
'created_at'=>date('Y-m-d H:i:s',time()),
'address'=>"很远的地方",
'`status`'=>0,
);
//把数据存入数据表中
$db = DB::getInstance();
$res = $db->insert('order_queue',$insert_data);
if($res)
{
echo $insert_data['order_id']."保存成功";
}
else
{
echo "保存失败";
}
}
goods.sh
#!/bin/bash
date "+%G-%m-%d %H:%M:S"
cd /webdata/queue/queue_mysql/
php goods.php
crontab
SHELL=/bin/bash
PATH=/sbin:/bin:/usr/sbin:/usr/bin
MAILTO=root
# For details see man 4 crontabs
# Example of job definition:
# .---------------- minute (0 - 59)
# | .------------- hour (0 - 23)
# | | .---------- day of month (1 - 31)
# | | | .------- month (1 - 12) OR jan,feb,mar,apr ...
# | | | | .---- day of week (0 - 6) (Sunday=0 or 7) OR sun,mon,tue,wed,thu,fri,sat
# | | | | |
# * * * * * user-name command to be executed
#每秒钟执行一次,输出到日志文件
*/1 * * * * root /webdata/queue/queue_mysql/goods.sh >> /webdata/queue/queue_mysql/log.log 2>&1
order.queue.sql
CREATE TABLE `order_queue` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '订单的id号',
`order_id` int(11) NOT NULL,
`mobile` varchar(20) NOT NULL COMMENT '用户的手机号',
`address` varchar(100) NOT NULL COMMENT '用户地址',
`created_at` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '订单的创建时间',
`updated_at` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '处理完成的时间',
`status` tinyint(2) NOT NULL COMMENT '当前状态,0未处理,1已处理, 2处理中',
PRIMARY KEY(`id`)
) ENGINE=InnoDB DEFAULT CHARSET=UTF8;
实时查看输出日志:tail -f log.log 查看最后十条:tail log.log
在windows下面编写bash文件的传到服务器之后要对该文件类型进行修改.
查看当前文件类型命令: :set ff
将当前文件类型转换为unix。 :set ff=unix 一般类型为dos何unix
3.流量削峰案例;redis的list类型实现秒杀
redis_queue.sql
CREATE TABLE `redis_queue`(
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`UID` INT(11) NOT NULL DEFAULT '0',
`time_stamp` varchar(24) NOT NULL,
PRIMARY KEY(`id`)
)ENGINE=InnoDB DEFAULT CHARSET=UTF8;
user.php
<?php
//首先,加载redis组件
$redis = new Redis();
$redis->connect('127.0.0.1',6379);
$redis_name = "miaosha";
for ($i=0; $i <100 ; $i++) //只能模仿高压,不能模仿高并发
{
$uid = rand(100000,999999);
}
//接受用户id
//$uid = $_GET['uid'];
//获取一下redis里面已有的数量
$num = 10;
//如果当天人数少于十的时候,则加入这个队列
if($redis->lLen($redis_name)<10)
{
$redis->rPush($redis_name,$uid.'%'.microtime());
}
else
{
//如果当天人数已经达到了十个人,则返回秒杀结束
echo "秒杀已结束";
}
$redis->close();
savetodb.php
<?php
include '../include/db.php';
//首先,加载redis组件
$redis = new Redis();
$redis->connect('127.0.0.1',6379);
$redis_name = "miaosha";
$db=DB::getInstance();
//死循环
while (1)
{
//从队列最左侧取出一个值
$user = $redis->lPop($redis_name);
//然后判断这个值是否存在
if(!$user || $user =='null')
{
sleep(2);
continue;
}
//切割出时间,uid
$user_arr = explode('%',$user);
$insert_data = array(
'uid'=>$user[0],
'time_stamp'=>$user[1]
);
//保存到数据库中
$res = $db->insert('redis_queue',$insert_data);
//数据库插入的失败的时候的回滚机智
if(!$redis)
{
$redis->rPush($redis_name,$user);
}
sleep(2);
}
$redis->close();
4.rabbitmq 更专业的消息系统实现方案
源码地址:
链接:https://pan.baidu.com/s/1vnqlaFyPZ8opFiZ8qPwiuw
提取码:tf80