说明:网上有好几种PHP操作kafka的扩展,有kafka-php和php-rdkafka两种是比较流行的。但其中kafka-php功能较全,但是使用composer安装,对于内网用户不能访问外网,因此,我们选择php-rdkafka。
php-rdkafka依赖librdkafka,先下载安装包librdkafka和php-rdkafka
1、新建一个目录用来存在扩展包
mkdir /home/kafka
2、将两个扩展包放到/home/kafka目录下
3、安装librdkafka
unzip librdkafka-master.zip #解压
cd librdkafka-master #进入安装目录
./configure #配置
make && make install
4、安装php-rdkafka
unzip php-rdkafka-master.zip
cd php-rdkafka-master
/usr/local/php/bin/phpize #安装php扩展
./configure --with-php-config=/usr/local/php/bin/php-config --with-rdkafka #配置
make all -j 5
make install
#装载扩展到配置
vim /usr/local/php/etc/php.ini
添加
extension = rdkafka.so
重启apache
重启php-fpm
5、打开phpinfo出现下图表示安装成功
6、测试一下
/usr/local/php/bin/php -c \a
/usr/local/php/etc/php.ini -r "new Rdkafka\Conf();"
不报错表示成功
7、生产者和消费者类
<?php
/**
* User: TCF_jingfeng
* Date: 18-8-27
* Time: 上午9:09
*/
class Kafka
{
// public $broker_list = 'localhost:9092';//配置kafka,可以用逗号隔开多个kafka
// public $topic = 'test';
public $broker_list = '你自己的地址:9092';//配置kafka,可以用逗号隔开多个kafka
public $topic='';//你自己的topic
public $partition = 0;
protected $producer = null;
protected $consumer = null;
public function __construct()
{
if (empty($this->broker_list)) {
throw new InvalidConfigException("broker not config");
}
$rk = new \RdKafka\Producer();
if (empty($rk)) {
throw new InvalidConfigException("producer error");
}
$rk->setLogLevel(LOG_DEBUG);
if (!$rk->addBrokers($this->broker_list)) {
throw new InvalidConfigException("producer error");
}
$this->producer = $rk;
}
public function setTopic($mytopic){
$this->topic = $mytopic;
}
/**
* 生产者
* @param array $messages
* @return mixed
*/
public function send($messages = [])
{
$topic = $this->producer->newTopic($this->topic);
return $topic->produce(RD_KAFKA_PARTITION_UA, $this->partition, json_encode($messages));
}
/**
* 消费者
*/
public function consumer($object, $callback)
{
$conf = new \RdKafka\Conf();
$conf->set('group.id', 0);
$conf->set('metadata.broker.list', $this->broker_list);
$topicConf = new \RdKafka\TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');
$conf->setDefaultTopicConf($topicConf);
$consumer = new \RdKafka\KafkaConsumer($conf);
$consumer->subscribe([$this->topic]);
echo "waiting for messages.....\n";
while (true) {
$message = $consumer->consume(120 * 1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
echo "message payload....";
$object->$callback($message->payload);
break;
}
sleep(1);
}
}
}
8、生产者代码
$producer = new Kafka();
$producer ->setTopic('errorlog');
$producer ->send("hello kafka");