php多进程消费kafka消息封装公共类

版权声明:转载请注明来源 https://blog.csdn.net/u013702678/article/details/88234881

PHP操作Kafka需要安装rdkafka扩展;下面的代码还依赖swoole扩展(用于多进程管理、定时器和信号处理)以及posix、pcntl扩展,具体安装方法请自行在网上查找。

下面的代码是对低阶和高阶两种消费方式进行封装的公共类库。

/**
 * Base class for a multi-process Kafka consumer supervised by a master process.
 *
 * The master (the process that constructs the object and calls exec()):
 *   - starts one "partition worker" child per partition of the topic; each child
 *     receives its partition id through its pipe and then runs consumer();
 *   - optionally (when $worker_process_num > 0) starts a pool of "queue worker"
 *     children that pop messages from the swoole message queue and hand them to
 *     handleLogic();
 *   - restarts children that exit while the status is 'running' (SIGCHLD);
 *   - kills all children and exits on SIGTERM;
 *   - kills all children on SIGUSR1 (they are then restarted by the SIGCHLD handler);
 *   - periodically checks worker counts and the local queue backlog and raises
 *     an alarm when they look wrong.
 *
 * Subclasses implement handleLogic() and consumer().
 *
 * Requires the swoole, pcntl, posix and rdkafka extensions and must run as root
 * (enforced by check()).
 */
abstract class KafkaConsumerBaseService
{
    // Kafka topic name; also used as prefix of the child process names.
    protected $topic;
    // Broker list passed to \RdKafka\Consumer::addBrokers().
    protected $brokers;
    // Requested size of the queue worker pool; 0 disables the pool.
    protected $worker_process_num = 0;
    // Partition workers: child pid => partition id.
    private $partion_work_pid = [];
    // Queue workers: child pid => 1.
    private $queue_work_pid = [];
    // Most recently started queue worker process; used to query queue statistics.
    private $current_process = null;
    // 'running' until quit() switches it to 'stop'.
    private $status = 'running';
    // Number of partitions found when exec() started the partition workers.
    private $partions_count = 0;

    const LOG_MODULE = 'kafka-comsuer';
    // Interval of the local queue backlog check, in milliseconds (10000 ms = 10 s).
    const CHECK_QUEUE_LEN_TIME = 10000;
    // Interval of the partition worker count check, in milliseconds (10000 ms = 10 s).
    const CHECK_PARTION_WORKER_TIME = 10000;
    // Interval of the queue worker count check, in milliseconds (10000 ms = 10 s).
    const CHECK_QUEUE_WORKER_TIME = 10000;
    // Timeout for fetching topic metadata, in milliseconds (passed to getMetadata()).
    const GET_META_DATA_TIMEOUT = 60000;
    // Backlog threshold (number of messages in the local queue) that triggers an alarm.
    const CHECK_QUEUE_LEN_LIMIT = 100;
    // Backlog threshold (bytes in the local queue) that triggers an alarm.
    const CHECK_QUEUE_MAX_BYTES = 8192;
    // Timeout for reading a message when consuming. Not used inside this class;
    // presumably meant for subclasses (unit looks like milliseconds - confirm).
    const CONSUMER_TIME_OUT = 1000;
    // Offset auto-commit interval. Not used inside this class; presumably meant
    // for subclasses (unit looks like milliseconds - confirm).
    const CONSUMER_OFFSET_COMMIT_TIME = 100;

    /**
     * Stores the configuration, verifies the runtime environment and registers
     * the master's signal handlers. Exits the process if the environment check fails.
     *
     * @param string $topic              Kafka topic to consume.
     * @param string $brokers            Broker list for rdkafka.
     * @param int    $worker_process_num Size of the queue worker pool (0 = no pool).
     */
    public function __construct($topic,$brokers,$worker_process_num=0)
    {
        $this->topic = $topic;
        $this->brokers = $brokers;
        $this->worker_process_num = $worker_process_num;
        $this->init();
    }

    /**
     * Entry point of a partition worker child process.
     *
     * Waits for the partition id that the master wrote to the pipe and then
     * calls consumer() with it. Invalid pipe content (failed read, non-numeric
     * or negative value) is logged and ignored instead of being passed on.
     *
     * @param \swoole_process $worker The child process object.
     */
    public function callback_function_kafka_consumer(\swoole_process $worker)
    {
        \swoole_set_process_name($this->topic."_partition_consumser_worker");
        // Kept only for backward compatibility with code that may still read the global.
        $GLOBALS['worker'] = $worker;
        \swoole_event_add($worker->pipe, function ($pipe) use ($worker) {
            $partion_id = $worker->read();
            // A bare ">= 0" check would also accept false and non-numeric strings.
            if ($partion_id === false || !is_numeric($partion_id) || $partion_id < 0) {
                $this->error("partition worker read invalid partition id from pipe");
                return;
            }

            $this->consumer($partion_id, $worker);
        });
    }

    /**
     * Entry point of a queue worker child process.
     *
     * Pops messages from the queue and passes each one to handleLogic() until
     * pop() reports failure (false/null). Empty-string messages are delivered
     * to handleLogic() rather than terminating the worker.
     *
     * @param \swoole_process $worker The child process object.
     */
    public function callback_local_queue(\swoole_process $worker)
    {
        \swoole_set_process_name($this->topic."_logic_consumser_worker");
        while (true) {
            $msg = $worker->pop();
            if ($msg === false || $msg === null) {
                break;
            }

            $this->handleLogic($msg);
        }
    }

    /**
     * Starts a child process that consumes one partition and registers its pid.
     *
     * @param int $partion_id Partition id handed to the child through the pipe.
     * @return int|false Pid of the new child, or false if the process could not be started.
     */
    public function start_partion_worker($partion_id)
    {
        // Third argument 2 selects the pipe type (a datagram pipe per the swoole docs).
        $process = new \swoole_process([$this,"callback_function_kafka_consumer"], false, 2);
        $process->useQueue();
        // Written before start(); the child reads it in callback_function_kafka_consumer().
        $process->write($partion_id);
        $child_pid = $process->start();
        if ($child_pid === false) {
            // Do not register a bogus entry (false would become array key 0).
            $this->error("start partion worker failed,partion id:".$partion_id);
            return false;
        }

        $this->partion_work_pid[$child_pid] = $partion_id;

        return $child_pid;
    }

    /**
     * Starts a child process that handles messages from the local queue and
     * registers its pid.
     *
     * @return int|false Pid of the new child, or false if the process could not be started.
     */
    public function start_queue_worker()
    {
        $process = new \swoole_process([$this,"callback_local_queue"], false, false);
        $process->useQueue();
        $child_pid = $process->start();
        if ($child_pid === false) {
            // Do not register a bogus entry (false would become array key 0).
            $this->error("start queue worker failed");
            return false;
        }

        $this->queue_work_pid[$child_pid] = 1;
        $this->current_process = $process;

        return $child_pid;
    }

    /**
     * Writes an informational message to standard output.
     *
     * @param string $message
     */
    protected function info($message)
    {
        echo $message.PHP_EOL;
    }

    /**
     * Writes an error message to standard output.
     *
     * @param string $message
     */
    protected function error($message)
    {
        echo $message.PHP_EOL;
    }

    /**
     * Business handler for one message popped from the local queue
     * (called from the queue worker processes).
     *
     * @param mixed $msg Message as returned by swoole_process::pop().
     */
    abstract public function handleLogic($msg);

    /**
     * Consumes the given partition (called inside a partition worker process).
     *
     * @param mixed           $partion_id Partition id as read from the pipe.
     * @param \swoole_process $worker     The partition worker's process object.
     */
    abstract public function consumer($partion_id,$worker);

    /**
     * Verifies the runtime environment and exits with status 1 when a
     * requirement is not met.
     */
    private function check()
    {
        // Required extensions: swoole (processes, timers, signals), pcntl (SIG* constants),
        // posix (posix_getuid) and rdkafka (topic metadata).
        foreach (['swoole', 'pcntl', 'posix', 'rdkafka'] as $ext) {
            if (!extension_loaded($ext)) {
                echo "please install {$ext} extension.".PHP_EOL;
                exit(1);
            }
        }

        // Must run as root.
        if (0 !== posix_getuid()) {
            echo "please run as root user.".PHP_EOL;
            exit(1);
        }

        // PHP 7 or newer is required.
        if (!version_compare(phpversion(), '7.0.1', '>=')) {
            echo "need to run in php7 version and higher.".PHP_EOL;
            exit(1);
        }
    }

    /**
     * Starts all workers and the periodic health checks.
     *
     * One partition worker is started per partition of the topic. If a worker
     * pool is enabled, max($worker_process_num, partition count) queue workers
     * are started as well.
     */
    public function exec()
    {
        $partitions = $this->get_partitions();
        if (!empty($partitions) && is_object($partitions)) {
            foreach ($partitions as $partition) {
                $this->start_partion_worker($partition->getId());
            }

            $this->info("start partion worker finished,count:".count($this->partion_work_pid));
            $this->partions_count = count($partitions);

            // Periodically compare the number of partition workers with the
            // current number of partitions.
            \swoole_timer_tick(self::CHECK_PARTION_WORKER_TIME, function() {
                try {
                    // Fetch once per tick: every call is a blocking metadata request.
                    $partitions_count = $this->get_partitions_count();
                } catch (\Exception $e) {
                    // A metadata failure must not take the master process down.
                    $msg = "get partitions count failed:".$e->getMessage();
                    $this->error($msg);
                    $this->alarm($msg);
                    return;
                }

                $worker_count = count($this->partion_work_pid);
                if ($worker_count != $partitions_count) {
                    $msg = "partion worker count invalid,current parent worker:".$worker_count.",partitions count:".$partitions_count;
                    $this->error($msg);
                    $this->alarm($msg);
                }
            });
        }

        if ($this->worker_process_num > 0) {
            // When the pool is enabled it must have at least as many workers as partitions.
            $this->worker_process_num = max($this->worker_process_num,$this->partions_count);
            for ($i = 0; $i < $this->worker_process_num; $i++) {
                $this->start_queue_worker();
            }

            $this->info("start queue worker finished,count:".count($this->queue_work_pid));

            // Periodically check the backlog of the local queue.
            \swoole_timer_tick(self::CHECK_QUEUE_LEN_TIME, function() {
                if (!$this->current_process) {
                    return;
                }

                $ret = $this->current_process->statQueue();
                if (empty($ret) || !is_array($ret) || !isset($ret['queue_num']) || !isset($ret['queue_bytes'])) {
                    return;
                }

                if ($ret['queue_num'] > self::CHECK_QUEUE_LEN_LIMIT || $ret['queue_bytes'] > self::CHECK_QUEUE_MAX_BYTES) {
                    $msg = 'local queue invalid';
                    $this->error($msg);
                    $this->alarm($msg);
                }
            });

            // Periodically compare the number of queue workers with the expected pool size.
            \swoole_timer_tick(self::CHECK_QUEUE_WORKER_TIME, function() {
                $worker_count = count($this->queue_work_pid);
                if ($worker_count != $this->worker_process_num) {
                    $msg = "queue worker count invalid,current parent worker:".$worker_count.",expected worker count:".$this->worker_process_num;
                    $this->error($msg);
                    $this->alarm($msg);
                }
            });
        }
    }

    /**
     * Registers the master's signal handlers:
     * SIGTERM => quit(), SIGUSR1 => restart(), SIGCHLD => reap and respawn children.
     */
    private function registerSignal()
    {
        // Master received a termination request.
        \swoole_process::signal(SIGTERM, function ($signo) {
            $this->quit();
        });

        // Restart all worker processes.
        \swoole_process::signal(SIGUSR1, function ($signo) {
            $this->restart();
        });

        // A child exited. Several children may have exited for one signal,
        // so keep reaping until wait() reports that nothing is left.
        \swoole_process::signal(SIGCHLD, function ($sig) {
            while (true) {
                $ret = \swoole_process::wait(false);
                if (!$ret) {
                    // No more exited children: this is the normal end of the loop.
                    break;
                }

                if ($this->status !== 'running') {
                    $this->info("ALL Worker Exist, Partion Worker Num:".count($this->partion_work_pid).",Queue Worker Num:".count($this->queue_work_pid));
                    continue;
                }

                if (!isset($ret['pid'])) {
                    $this->error("wait ret invalid,ret non't set pid!!");
                    break;
                }

                $this->respawn_worker($ret['pid']);
            }
        });
    }

    /**
     * Replaces an exited child with a new worker of the same kind.
     * Pids that belong to neither worker map are ignored.
     *
     * @param int $dead_pid Pid of the child that exited.
     */
    private function respawn_worker($dead_pid)
    {
        if (array_key_exists($dead_pid, $this->partion_work_pid)) {
            $partion_id = $this->partion_work_pid[$dead_pid];
            // Drop the dead pid first so a reused pid or a failed start cannot
            // leave a stale entry behind.
            unset($this->partion_work_pid[$dead_pid]);
            if ($partion_id >= 0) {
                $new_pid = $this->start_partion_worker($partion_id);
                if ($new_pid === false) {
                    $this->alarm("restart partion worker failed,partion id:".$partion_id);
                } else {
                    $this->info("$new_pid now start!!");
                }
            }
            return;
        }

        if (array_key_exists($dead_pid, $this->queue_work_pid)) {
            unset($this->queue_work_pid[$dead_pid]);
            $new_pid = $this->start_queue_worker();
            if ($new_pid === false) {
                $this->alarm("restart queue worker failed");
            } else {
                $this->info("$new_pid now start!!");
            }
        }
    }

    /**
     * Stops the service: marks the status as 'stop', signals every child to
     * terminate and exits the master process.
     */
    private function quit() {
        $this->info("Master Receive Quit Signo.");
        // Switch to the stopped state so exited children are not respawned.
        $this->status = 'stop';

        $this->info('Start To Kill QUEUE Process.');
        foreach ($this->queue_work_pid as $k => $v) {
            // kill() is called without a signal number, so swoole's default signal is sent.
            \swoole_process::kill($k);
            unset($this->queue_work_pid[$k]);
            $this->info("QUEUE Child[{$k}] Quit. Worker Num: " . count($this->queue_work_pid));
        }
        $this->info("All QUEUE Child Have Kill");

        $this->info('Start To Kill Partion Child Process.');
        foreach ($this->partion_work_pid as $k => $v) {
            \swoole_process::kill($k);
            unset($this->partion_work_pid[$k]);
            $this->info("Partion Child[{$k}] Quit. Worker Num: " . count($this->partion_work_pid));
        }
        $this->info("All Partion Child Have Kill. Master Quit!");

        // Terminate the master process.
        exit;
    }

    /**
     * Restarts all workers: signals every child to terminate. The pid maps are
     * left untouched on purpose; the SIGCHLD handler removes each exited child
     * and starts its replacement.
     */
    private function restart() {
        $this->info("Master Receive SIGUSER1 Signo,Now Resart Worker");

        $this->info('Start To Kill QUEUE Child Process.');
        foreach ($this->queue_work_pid as $k => $v) {
            \swoole_process::kill($k);
            $this->info("Child[{$k}] Quit. Worker Num: " . count($this->queue_work_pid));
        }
        $this->info("All QUEUE Child Have Kill!");

        $this->info('Start To Kill Partion Child Process.');
        foreach ($this->partion_work_pid as $k => $v) {
            \swoole_process::kill($k);
            $this->info("Child[{$k}] Quit. Worker Num: " . count($this->partion_work_pid));
        }
        $this->info("All Partion Child Have Kill!");
    }

    /**
     * Raises an alarm. This implementation only writes the message to standard output.
     *
     * @param string $msg
     */
    private function alarm($msg)
    {
        echo $msg.PHP_EOL;
    }

    /**
     * Runs the environment check and installs the signal handlers.
     */
    private function init()
    {
        $this->check();
        $this->registerSignal();
    }

    /**
     * Fetches the partition list of the configured topic from the brokers.
     *
     * @return mixed Partitions of the first topic in the metadata response,
     *               or null when the response contains no topic.
     */
    private function get_partitions()
    {
        $consumer = new \RdKafka\Consumer();
        $consumer->addBrokers($this->brokers);
        $topic   = $consumer->newTopic($this->topic);
        // false = do not request all topics; only the given topic is queried.
        $allInfo = $consumer->getMetadata(false, $topic, self::GET_META_DATA_TIMEOUT);
        $partitions = null;
        foreach ($allInfo->getTopics() as $tp) {
            // Only one topic was requested, so the first entry is the one we want.
            $partitions = $tp->getPartitions();
            break;
        }

        return $partitions;
    }

    /**
     * Fetches the current number of partitions of the configured topic.
     *
     * @return int Partition count, 0 when no partition information is available.
     */
    private function get_partitions_count()
    {
        $partitions = $this->get_partitions();

        return ($partitions === null) ? 0 : count($partitions);
    }
}

猜你喜欢

转载自blog.csdn.net/u013702678/article/details/88234881