Swoole整合ThinkPHP3.2系列教程二

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u013705066/article/details/77679233

swoole和ThinkPHP的整合

先上一份在我们系统内部的swoole整合的架构预览

├─Application                           应用目录
│  │
│  ├─Cli                                Cli模块
│  │  ├─Controller                      制器类
│  │  │  ├─StartController              TP框架加载时默认加载的控制器
│  │  │  ├─SwooleController             我们的业务逻辑写在这里面
│  │  └─ ...
├─Swoole
│  │
│  ├─log                                swoole运行日志
│  ├─Server.php                         swoole的服务代码
│  ├─swoole.php                         用于cli模式下启动和软重启swoole服务

最早测试的时候不是这样搭建的,而是把Server.php里的关于服务的东西放在了TP控制器里,在cli模式下调用

php index.php(入口文件) Swoole/start (控制器/方法)

这种模式是把swoole套在了TP里运行,也是可行的,但是总觉得启动个swoole服务为毛还要告诉TP一声?

我们想要的一种模式是业务服务器独立运行,swoole服务作为守护进程常驻内存,当浏览器需要运行比较耗时的操作时,需要跟swoole服务进程建立长连接,当耗时的任务执行完毕时会通知浏览器已经完毕。

整个过程下来,swoole服务和业务服务不应该耦合在一起的。最完美的状态是swoole独立运行(使用swoole框架重新写耗时操作的业务逻辑代码),独立连接数据库。当浏览器执行这些任务时,就不再找业务服务器了,而是直接跟swoole服务打交道。

可是我们没有那么多的时间和精力,我们只能在swoole服务的进程里调用TP框架的东西,来执行我们在TP里写的代码。最后参考了网上的一些方案,选择了这样与TP结合。


代码逻辑

/Swoole/Server.php

  • 关于swoole服务的一些配置,比如监听地址,端口号(默认为9501),和一些基础配置

  • 一些回调函数,里面的代码注释很完整,可以直接看代码

<?php
// +----------------------------------------------------------------------
// 2017-8-23 09:03:40
// 此次修改为只作为websocket的服务端
// +----------------------------------------------------------------------
class Server{
    protected $swoole;
    // 监听所有地址
    protected $host = '0.0.0.0';
    // 监听 9501 端口
    protected $port = 9501;
    // 配置项
    protected $option = [
        //设置启动的worker进程数
        'worker_num' => 2,
        //task进程的数量
        'task_worker_num' => 4,
        //指定task任务使用消息队列模式,3表示完全争抢模式,task进程会争抢队列,无法使用定向投递
        'task_ipc_mode' => 3,
        //task进程的最大任务数
        'task_max_request' => 1000,
        // 守护进程化
        'daemonize'  => false,
        // 监听队列的长度
        'backlog'    => 128,
        //绑定uid时用
        'dispatch_mode' => 5,
        //设置日志路径
        'log_file' => SWOOLE_LOG_PATH,
    ];
    protected function init(){
        //异步非阻塞多进程的websocket
        $this->swoole = new swoole_websocket_server($this->host, $this->port);
        $eventList    = ['Open', 'Message', 'Close', 'HandShake' , 'Task', 'Finish' , 'WorkerStart' , 'Receive'];

        // 设置参数
        if (!empty($this->option)) {
            $this->swoole->set($this->option);
        }
        // 设置回调
        foreach ($eventList as $event) {
            if (method_exists($this, 'on' . $event)) {
                $this->swoole->on($event, [$this, 'on' . $event]);
            }
        }
    }
    public function start(){
        $this->init();
        $this->swoole->start();
    }
    public function getHost(){
        return $this->host;
    }
    public function getPort(){
        return $this->port;
    }
    /**
     * [onOpen 建立连接时的回调函数]
     * @method onOpen
     * @param  swoole_server       $serv [description]
     * @param  swoole_http_request $req    [description]
     * @return [type]                      [description]
     */
    public function onOpen(swoole_server $serv, swoole_http_request $req){
        //将连接绑定到uid上面
        if(!empty($req->get)&&$req->get['uid']){
            $serv->bind($req->fd , $req->get['uid']);
        }
    }
    /**
     * [onMessage 接收到socket客户端发送数据的回调函数]
     * @method onMessage
     * @param  swoole_server          $serv [description]
     * @param  swoole_websocket_frame $frame  [description]
     * @return [type]                         [description]
     */
    public function onMessage(swoole_server $serv,  swoole_websocket_frame $frame){
        //收到数据时处理数据
        //根据收到的cmd名字去调用指定的方法
        $receive = json_decode($frame->data,true);
        //为了避免数据处理量过大阻塞当前进程,导致服务响应变慢,我们把耗时的操作扔到TaskWorker进程池中执行
        //当要执行的方法存在并且已经在swoole_log表里备案过的可以丢到task进程池
        $swooleLog = D('SwooleLog');
        $swooleController = A('Swoole');
        //$receive['args']['id']是业务服务器那边数据库SwooleLog表里的id
        if (method_exists($swooleController, $receive['cmd']) && $receive['args']['id']) {
            $task_id = $serv->task($receive);
            //记录task_id信息
            //...其他你想要做的精细化处理
        }else{
            if($receive['cmd'] === 'reload'){
                //利用Swoole提供的柔性终止/重启的机制
                $rs = $serv -> reload();
                $serv->push($frame->fd , $rs);
            }elseif($receive['args']['id']){
                $returnData = $this->_returnStr('submit_error');
                $serv->push($frame->fd , $returnData);
            }elseif(method_exists($this, $receive['cmd'])){
                $this->{$receive['cmd']}($serv , $frame);
            }
        }
    }
    /**
     * [onTask 在task_worker进程池内被调用时的回调函数]
     * @method onTask
     * @param  swoole_server $serv          [description]
     * @param  int           $task_id       [description]
     * @param  int           $src_worker_id [description]
     * @param  mixed         $data          [description]
     * @return [type]                       [description]
     */
    public function onTask(swoole_server $serv, $task_id, $src_worker_id, $data){
        //记录任务开始执行的时间
        //...自己发挥  我们项目的业务逻辑部分已经删除
        //
        try{
            $swooleController = A('Swoole');
            $rs = $swooleController->{$data['cmd']}($data['args']);
            return json_encode(['id'=>$data['args']['id'] , 'status' => true , 'other' => $rs]);
        }catch(\Exception $e){
            return json_encode(['id'=>$data['args']['id'] , 'status' => false , 'other' => $e->getMessage()]);
        }
    }
    /**
     * [onFinish 任务执行完毕时的回调函数]
     * @method onFinish
     * @param  swoole_server $serv    [description]
     * @param  int           $task_id [description]
     * @param  string        $data    [description]
     * @return [type]                 [description]
     */
    public function onFinish(swoole_server $serv, $task_id, string $data){
        $rs = json_decode($data , true);
        $swooleLog = D('SwooleLog');
        //...
        //通知用户该任务已经执行完毕了 可以来查看数据了
        //检查客户端链接是否存在 如果存在的话发送消息
        $returnData = $this->_returnStr('task_excute_success');
        foreach($serv->connections as $fd){
            $conn = $serv->connection_info($fd);
            //根据uid判断当前连接是活跃的
            //这里是业务服务器那边能取到的用户信息,这里已经删除,保护我们的项目
            $serv->push($fd , $returnData);
        }
    }
    /**
     * [onWorkerStart 为了应用能热加载 把框架的东西放到worker进程启动]
     * @method onWorkerStart
     * @param  swoole_server $server    [description]
     * @param  [type]        $worker_id [description]
     * @return [type]                   [description]
     */
    public function onWorkerStart(swoole_server $server, $worker_id){
        // 定义应用目录
        define('APP_PATH', '../Application/');
        // 定义应用模式
        define('APP_Mode', 'cli');
        define('BIND_MODULE','Cli');
        // 引入ThinkPHP入口文件
        require '../ThinkPHP/ThinkPHP.php';
    }
    /**
     * [_returnStr 返回给客户端的数据]
     * @method _returnStr
     * @param  [type]     $type [description]
     * @return [type]           [description]
     */
    private function _returnStr($type){
        switch ($type) {
            case 'submit_success':
                $returnData = [
                    'code' => 202,
                    'info' => '任务已经提交,请等待服务器计算数据!'
                ];
                break;
            case 'submit_error':
                $returnData = [
                    'code' => 404,
                    'info' => '任务提交失败,请联系管理员进行处理!'
                ];
                break;
            case 'task_excute_success':
                $returnData = [
                    'code' => 200,
                    'info' => '任务执行完毕!'
                ];
                break;
        }
        return json_encode($returnData);
    }
    /**
     * [clients 获取所有的客户端连接信息,返回我们的uid]
     * @method clients
     * @return [type] [description]
     */
    public function clients(swoole_server $serv , swoole_websocket_frame $frame){
        //...
        $serv->push($frame->fd , json_encode($serv->connections));
    }
    public function __call($method, $args){
        call_user_func_array([$this->swoole, $method], $args);
    }
}

/Swoole/swoole.php

是一个面向cli编程的代码,主要提供了start和reload两个命令

#!/bin/env php
<?php
/**
 * 定义项目根目录&swoole-task pid
 */
define('SWOOLE_PATH', __DIR__);
define('SWOOLE_LOG_PATH', SWOOLE_PATH . DIRECTORY_SEPARATOR . 'log' . DIRECTORY_SEPARATOR . 'Swoole' . '.log');

/**
 * 加载 swoole server
 */
include SWOOLE_PATH .  DIRECTORY_SEPARATOR . 'Server.php';

//提示帮助信息
if ($argc != 2 || in_array($argv[1], array('--help', '-help', '-h', '-?'))) {
    echo <<<HELP
用法:php swoole.php 选项 ... 可选的命令[start|reload|list]

--help|-help|-h|-?  显示本帮助说明

选项说明
    start,  启动swoole服务[默认监测9501端口]
    reload, 柔性重启所有workder进程
    list,   查看当前所有连接的客户端数

HELP;
    exit;
}

//执行命令行选项
switch($argv[1]){
    case 'start':
        start();
        break;
    case 'reload':
        reload();
        break;
    case 'list':
        clients();
        break;
    default:
        exit("输入命令有误 : {$argv[1]}, 请查看帮助文档\n");
        break;
}

//启动swoole服务
function start(){
    $server = new Server();
    $server->start();
}

//柔性重启swoole服务
function reload(){
    echo "正在柔性重启swoole的worker进程..." . PHP_EOL;
    try {
        $server = new Server();
        $port = $server->getPort();
        $host = '127.0.0.1';
        $cli = new swoole_http_client($host, $port);
        $cli->set(['websocket_mask' => true]);
        $cli->on('message', function ($_cli, $frame) {
            if($frame->data){
                exit('swoole重启成功!'.PHP_EOL);
            }
        });
        $cli->upgrade('/', function ($cli) {
            $cli->push(json_encode(['cmd' => 'reload']));
        });
    } catch (Exception $e) {
        exit($e->getMessage() . PHP_EOL . $e->getTraceAsString());
    }
}

function clients(){
    try {
        $server = new Server();
        $port = $server->getPort();
        $host = '127.0.0.1';
        $cli = new swoole_http_client($host, $port);
        $cli->set(['websocket_mask' => true]);
        $cli->on('message', function ($_cli, $frame) {
            $users = json_decode($frame->data , true);
            if(empty($users)){
                echo '当前没有客户端连接'.PHP_EOL;
            }else{
                foreach($users as $user){
                    echo '---'.$user.PHP_EOL;
                }
            }
            exit();
        });
        $cli->upgrade('/', function ($cli) {
            $cli->push(json_encode(['cmd' => 'clients']));

        });
    } catch (Exception $e) {
        exit($e->getMessage() . PHP_EOL . $e->getTraceAsString());
    }
}

存在的问题

0.以上的代码是从项目里摘出来的,大体思路还在,业务部分代码删除了。不保证正常运行,但是问题不大,如果报错了聪明的你一定一看就知道是什么错误。

1.由于我们将TP框架的东西放在workstart回调函数里启动,这里只要启动worker进程和task进程,都会加载一次TP框架的东西到内存里,对内存的占用问题还需要仔细研究。

2.目前还不支持那些不支持HTML5 WebSocket的浏览器 我会尽快想办法解决。用flash模拟socket请求的方法没有试成功,近期重新再搞一波。

猜你喜欢

转载自blog.csdn.net/u013705066/article/details/77679233