https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/book.rdkafka.html
https://github.com/arnaud-lb/php-rdkafka
https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-PHP
https://pecl.php.net/package/rdkafka
将下载的拓展包中的 librdkafka.dll移到php安装根目录下,php_rdkafka.dll移到php/ext目录下
再php.ini文件中增加
extension=php_rdkafka.dll
pecl install rdkafka
extension=rdkafka.so
- git clone https://github.com/arnaud-lb/php-rdkafka.git
- cd php-rdkafka
- phpize
- ./configure
- make all -j 5
extension=rdkafka.so
安装过程中提示如下错误解决
安装librdkafka
- git clone https://github.com/edenhill/librdkafka.git
- cd librdkafka
- chmod 777 configure lds-gen.py
- ./configure
- make && make install
如果安装librdkafka过程中出现如下错误解决
将Makefile.config里面的WITH_LDS=y这一行注释掉即可
- $conf = new \RdKafka\Conf();
-
- $conf->set('sasl.mechanisms', 'PLAIN');
- //SASL认证账户
- $conf->set('sasl.username', 'xxx');
- //SASL认证密码
- $conf->set('sasl.password', 'xxx');
- //认证协议
- $conf->set('security.protocol', 'SASL_SSL');
- //证书地址
- $conf->set('ssl.ca.location', '/xxx/xxx.crt');
-
- // 分组ID
- $conf->set('group.id', 'xxx');
-
- // 消费地址
- $conf->set('metadata.broker.list', 'xxx.xxx.xxx.xxx:xxx');
-
- // 偏移量
- $conf->set('auto.offset.reset', 'earliest');
-
-
- $consumer = new \RdKafka\KafkaConsumer($conf);
-
- // 消费topic
- $consumer->subscribe(['XXX']);
-
- echo "Waiting for partition assignment... (make take some time when\n";
- echo "quickly re-joining the group after leaving it.)\n";
-
- while (true) {
- $message = $consumer->consume(30*1000);
- switch ($message->err) {
- case RD_KAFKA_RESP_ERR_NO_ERROR:
- var_dump($message);
- break;
- case RD_KAFKA_RESP_ERR__PARTITION_EOF:
- echo "No more messages; will wait for more\n";
- break;
- case RD_KAFKA_RESP_ERR__TIMED_OUT:
- echo "Timed out\n";
- break;
- default:
- throw new \Exception($message->errstr(), $message->err);
- break;
- }
- }