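First, install librdkafka
GitHub - edenhill/librdkafka: The Apache Kafka C/C++ library
Download the source archive and upload it to the server.
Extract it, cd into the extracted directory, then build and install: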
- cd librdkafka
- ./configure
- make && make install
Install the php-rdkafka extension
- wget https://github.com/arnaud-lb/php-rdkafka/archive/4.0.2.tar.gz
- tar -zxvf 4.0.2.tar.gz
- cd php-rdkafka-4.0.2/
- /www/server/php/74/bin/phpize
- ./configure --with-php-config=/www/server/php/74/bin/php-config
- make && make install
Add extension=rdkafka.so to php.ini; appending it as the last line of the file is enough.
php.ini locations for PHP 7.4 under BT Panel (宝塔):
/www/server/php/74/etc/php.ini
/www/server/php/74/etc/php-cli.ini
Then reload PHP.
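The php.ini step can also be scripted so that repeating it does not add the line twice. The following is a minimal sketch, not part of the original setup: `enable_rdkafka` is a hypothetical helper name, and it assumes the two ini paths listed above.

```shell
# Hypothetical helper: append "extension=rdkafka.so" to an ini file
# only when no rdkafka extension line is present yet.
enable_rdkafka() {
    ini_file="$1"
    if ! grep -qE '^[[:space:]]*extension[[:space:]]*=[[:space:]]*rdkafka(\.so)?[[:space:]]*$' "$ini_file"; then
        # The leading newline guards against a file that lacks a final newline.
        printf '\nextension=rdkafka.so\n' >> "$ini_file"
    fi
}

# Example usage (paths taken from the list above):
#   enable_rdkafka /www/server/php/74/etc/php.ini
#   enable_rdkafka /www/server/php/74/etc/php-cli.ini
# After reloading PHP, check that the CLI sees the extension:
#   /www/server/php/74/bin/php --ri rdkafka
```

Running the helper a second time leaves the file unchanged, so it is safe to keep in a provisioning script.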
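Usage in ThinkPHP 6 (tp6)
Install the Kafka client with Composer:
composer require nmred/kafka-php
Code: calling the consumer through a web request ends in a 502, because the consumer loop never returns; run the consumer from the command line instead.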
- // Producer
- public function index()
- {
-     $config = \Kafka\ProducerConfig::getInstance();
-     $config->setMetadataRefreshIntervalMs(10000);
-     $config->setMetadataBrokerList('localhost:9092');
-     $config->setBrokerVersion('1.0.0');
-     $config->setRequiredAck(1);
-     $config->setIsAsyn(false);
-     $config->setProduceInterval(500);
-     // The callback returns the batch of messages to send.
-     $producer = new \Kafka\Producer(
-         function() {
-             return [
-                 [
-                     'topic' => 'test',
-                     'value' => 'test....message.',
-                     'key' => 'testkey',
-                 ],
-             ];
-         }
-     );
-     // Called with the broker response when a send succeeds.
-     $producer->success(function($result) {
-         var_dump($result);
-     });
-     // Called with the error code when a send fails.
-     $producer->error(function($errorCode) {
-         var_dump($errorCode);
-     });
-     $producer->send(true);
- }
-
- // Consumer
- public function xiaofei()
- {
-     $config = \Kafka\ConsumerConfig::getInstance();
-     $config->setMetadataRefreshIntervalMs(10000);
-     $config->setMetadataBrokerList('localhost:9092');
-     $config->setGroupId('test');
-     $config->setBrokerVersion('1.0.0');
-     // Subscribe to the topic the producer above writes to.
-     $config->setTopics(['test']);
-     //$config->setOffsetReset('earliest');
-     $consumer = new \Kafka\Consumer();
-     // start() blocks and invokes the callback for every message received.
-     $consumer->start(function($topic, $part, $message) {
-         var_dump($message);
-     });
- }
Plain PHP usage (without a framework):
This also uses the Composer package: composer require nmred/kafka-php
Producer file (producer.php):
- <?php
- require './vendor/autoload.php';
- date_default_timezone_set('PRC');
-
- $config = \Kafka\ProducerConfig::getInstance();
- $config->setMetadataRefreshIntervalMs(10000);
- $config->setMetadataBrokerList('localhost:9092');
- $config->setBrokerVersion('1.0.0');
- $config->setRequiredAck(1);
- $config->setIsAsyn(false);
- $config->setProduceInterval(500);
- // No callback is passed, so messages are handed to send() directly.
- $producer = new \Kafka\Producer();
-
- // Send ten messages, one per call, and dump each result.
- for ($i = 0; $i < 10; $i++) {
-     $result = $producer->send([
-         [
-             'topic' => 'topic',
-             'value' => 'test message ' . $i,
-             'key' => '',
-         ],
-     ]);
-     var_dump($result);
- }
Consumer file:
- <?php
- require './vendor/autoload.php';
- date_default_timezone_set('PRC');
-
- $config = \Kafka\ConsumerConfig::getInstance();
- $config->setMetadataRefreshIntervalMs(10000);
- $config->setMetadataBrokerList('localhost:9092');
- $config->setGroupId('test');
- $config->setBrokerVersion('1.0.0');
- // Same topic name the producer file sends to.
- $config->setTopics(array('topic'));
- //$config->setOffsetReset('earliest');
- $consumer = new \Kafka\Consumer();
-
- // start() blocks and invokes the callback for every message received.
- $consumer->start(function ($topic, $part, $message) {
-     var_dump($message);
- });
Run the producer: php producer.php

Run the consumer: php demo/consumer.php
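Because the consumer keeps running until it is stopped, it is often convenient to start it in the background and keep its output in a log file. The following is one possible sketch, assuming `nohup` is available; the function names and the log and PID file names are hypothetical and not part of the original setup.

```shell
# Hypothetical helper: start a long-running command in the background,
# append its output to a log file, and record its PID.
start_in_background() {
    log_file="$1"
    pid_file="$2"
    shift 2
    nohup "$@" >> "$log_file" 2>&1 &
    echo $! > "$pid_file"
}

# Hypothetical helper: stop the process recorded in the PID file
# (if it is still running) and remove the PID file.
stop_from_pid_file() {
    pid_file="$1"
    if [ -f "$pid_file" ] && kill -0 "$(cat "$pid_file")" 2>/dev/null; then
        kill "$(cat "$pid_file")"
    fi
    rm -f "$pid_file"
}

# Example usage (file names are illustrative):
#   start_in_background consumer.log consumer.pid php demo/consumer.php
#   tail -f consumer.log
#   stop_from_pid_file consumer.pid
```

For anything beyond a quick test, a process supervisor is usually a better fit than a hand-rolled PID file, since it also restarts the consumer after a crash.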
