安装框架和扩展
think-queue
composer create-project topthink/think=5.0.* tp50 --prefer-dist
cd tp50
composer require topthink/think-queue
队列设置
# 配置使用redis, tp50/application/extra/queue.php
<?php
return [
// 'connector' => 'Database', // 数据库驱动
// 'expire' => null, // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
// 'default' => 'default', // 默认的队列名称
// 'table' => 'prefix_jobs', // 存储消息的表名,不带前缀
// 'dsn' => [],
// --------------------
'connector' => 'Redis', // Redis 驱动
'expire' => null, // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
'default' => 'default2', // 默认的队列名称
'host' => '127.0.0.1', // redis 主机ip
'port' => 6379, // redis 端口
'password' => '', // redis 密码
'select' => 0, // 使用哪一个 db,默认为 db0
'timeout' => 0, // redis连接的超时时间
'persistent' => false, // 是否是长连接s
];
# 队列job文件,执行任务,tp50/application/index/controller/Job.php
<?php
namespace app\index\controller;
use think\queue\Job as QueueJob;
class Job
{
public function fire(QueueJob $job, $data)
{
$pieces = json_encode($data);
$this->add_db($pieces);
if ($job->attempts() > 3) {
//通过这个方法可以检查这个任务已经重试了几次了
$job->delete();
}
//如果任务执行成功后 记得删除任务,不然这个任务会重复执行,直到达到最大重试次数后失败后,执行failed方法
$job->delete();
// 也可以重新发布这个任务
// $job->release($delay); //$delay为延迟时间
}
public function failed($data)
{
// ...任务达到最大重试次数后,失败了
}
public function add_db($data = [])
{
$data = (array) json_decode(json_decode($data));
// $count = Db::table('info')->where('address', $data['address'])->count();
// if ($count == 0) {
// Db::table('info')->insert($data);
// }
dump($data);
}
}
# 业务逻辑文件,do方法,tp50/application/index/controller/Index.php
<?php
namespace app\index\controller;
use think\Queue;
class Index
{
public function doo() {
for ($i = 0; $i < 100; $i++) {
$data = [
'name' => $i,
'address' => 2,
'mobile' => 2,
'p' => 2,
];
$this->push($data);
}
}
/**
* 推送列队
* @param array $data [description]
* @return [type] [description]
*/
public function push($data = [])
{
$jobData = json_encode($data);
$jobHandlerClassName = 'app\index\controller\Job';
$jobQueueName = "PaChongShuJu";
$isPushed = Queue::push($jobHandlerClassName, $jobData, $jobQueueName);
if ($isPushed) {
echo "ok";
} else {
var_dump($isPushed);
}
}
}
php使用cli模式执行业务逻辑
# 配置tp50/application/command.php
<?php
return [
'app\index\controller\Cmd',
'think\queue\command\Listen',
'think\queue\command\Restart',
'think\queue\command\Subscribe',
'think\queue\command\Work',
];
# 新建do命令tp50/application/index/controller/Cmd.php
<?php
namespace app\index\controller;
use think\console\Command;
use think\console\Input;
use think\console\Output;
class Cmd extends Command
{
protected function configure()
{
$this->setName('do')->setDescription('Here is the remark ');
}
protected function execute(Input $input, Output $output)
{
$do = new Index();
$do->doo();
}
}
执行命令
cd /tp50
# 执行业务逻辑
sudo php think do
# 监听队列
sudo php think queue:listen --queue PaChongShuJu
# 执行队列(不加--daemon为执行单个任务)
sudo php think queue:work --queue PaChongShuJu --daemon