tp5使用workerman实现异步任务

问题描述:

采集数据时过程很慢,导致无法继续进行其他任务,,避免主业务被长时间阻塞,故而将其提交给异步任务,当任务完成通知客户端即可

流程

前端业务:

由于本系统采用iframe结构,为避免点击其他页面业务中断,所以业务在父页面执行,

1.用户在子页面点击采集按钮调用父级方法

function to_collect(ids) {
        window.parent.startCollect(ids);
    }

2.父级页面进行socket链接,当收到服务器处理完任务消息时关闭socket并通知用户结果

function startCollect(ids)
    {
        var wsServer = 'ws://127.0.0.1:5432';
        var websocket = new WebSocket(wsServer);
        var inter_val = 0;
        websocket.onopen = function (evt) {
            console.log("Connected to WebSocket server.");
            var data = {ids:ids};
            data = JSON.stringify(data);
            websocket.send(data);

            //设置心跳,避免服务器断开
            inter_val = setInterval(function () {
                websocket.send('hello');
            }, 50000)
        };

        websocket.onclose = function (evt) {
            console.log("Disconnected");
        };

        websocket.onmessage = function (evt) {
            console.log('Retrieved data from server: ' + evt.data);
            if (isJson(evt.data)) {
                var res = JSON.parse(evt.data);
                if(res.code == 0){
                    alert("采集条数:"+res.msg)
                    websocket.close();
                    clearInterval(inter_val);//关闭定时器
                }
            }



        };

        websocket.onerror = function (evt, e) {
            console.log('Error occured: ' + evt.data);
        };
    }


    /**
     * 判断是否json
     * @param $string
     * @returns {boolean}
     */
    function isJson($string)
    {
        try {
            if(typeof JSON.parse($string) == 'object')
                return true;
            return false;
        } catch (e) {
            console.log(e);
            return false;

        }
    }

服务端

1. 收到前端发来的数据,调用model进行业务处理,然后通知客户端

<?php
namespace app\http;

use app\common\model\Collect;
use think\worker\Server;
use Workerman\Lib\Timer;
use Workerman\Worker as W;

class Worker2 extends Server
{
    protected $socket = 'websocket://0.0.0.0:5432';
    protected $option = [
        'count'=> 4,
    ];

    /**
     * 每个进程启动
     * @param $worker
     */


    public function onWorkerStart($worker)
    {


        // 心跳间隔55秒
        define('HEARTBEAT_TIME', 55);
        Timer::add(1, function()use($worker){
            $time_now = time();
            foreach($worker->connections as $connection) {
                // 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间
                if (empty($connection->lastMessageTime)) {
                    $connection->lastMessageTime = $time_now;
                    continue;
                }
                // 上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接
                if ($time_now - $connection->lastMessageTime > HEARTBEAT_TIME) {
                    $connection->close();
                }
            }
        });
    }

    public function onMessage($connection,$data)
    {
        global $worker;
        // 判断当前客户端是否已经验证,即是否设置了uid
        if(!isset($connection->uid))
        {
            // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
            $connection->uid = ip2long($connection->getRemoteIp()).time().rand(1,9999);
            /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
             * 实现针对特定uid推送数据
             */
            $worker->uidConnections[$connection->uid] = $connection;
            $connection->send('login success, your uid is ' . $connection->uid);
        }
        $ids = json_decode($data,true)['ids'] ?? 0;
        if ($ids)
        {
            $collect_model = new Collect();
            $res = $collect_model->getNewestArticle($ids);
            $res = json_encode(['code' => 0,'msg' =>$res]);
            $connection->send($res);
        }

        // 给connection临时设置一个lastMessageTime属性,用来记录上次收到消息的时间
        $connection->lastMessageTime = time();
        //$connection->send('receive success');
        echo $data;
        echo "\n";
    }

    public function onConnect($connection)
    {

    }

    /**
     * 当连接断开时触发的回调函数
     * @param $connection
     */
    public function onClose($connection)
    {
        global $worker;
        if(isset($connection->uid))
        {
            // 连接断开时删除映射
            unset($worker->uidConnections[$connection->uid]);
        }
    }
    /**
     * 当客户端的连接上发生错误时触发
     * @param $connection
     * @param $code
     * @param $msg
     */
    public function onError($connection, $code, $msg)
    {
        echo "error $code $msg\n";
    }


    // 针对uid推送数据
    public function sendMessageByUid($uid, $message)
    {
        global $worker;
        if(isset($worker->uidConnections[$uid]))
        {
            $connection = $worker->uidConnections[$uid];
            $connection->send($message);
            return true;
        }
        return false;
    }


}

启动  workerman

执行结果

thinkphp5中如何开启workman请参照https://blog.csdn.net/flysnownet/article/details/96475927

发布了85 篇原创文章 · 获赞 45 · 访问量 95万+

猜你喜欢

转载自blog.csdn.net/flysnownet/article/details/99825601