• 宝塔下 php7.4 使用kafka


    先安装 rdkafka

    GitHub - edenhill/librdkafka: The Apache Kafka C/C++ library

    下载压缩包,丢到服务器上

    cd 到压缩包目录,进行安装

    1. cd librdkafka
    2. ./configure
    3. make && make install

    安装php-rdkafka扩展

    1. wget https://github.com/arnaud-lb/php-rdkafka/archive/4.0.2.tar.gz
    2. cd php-rdkafka-4.0.2/
    3. /www/server/php/74/bin/phpize
    4. ./configure --with-php-config=/www/server/php/74/bin/php-config
    5. make && make install

    添加extension=rdkafka.so 到php.ini文件 放到文件最后一行即可

    宝塔php7.4文件位置:

    /www/server/php/74/etc/php.ini

    /www/server/php/74/etc/php-cli.ini

    然后重载一下php即可。

    tp6的使用方法

    composer安装kafka,命令:

    composer require nmred/kafka-php

    nmred/kafka-php - Packagist

    代码:使用cli的方式调用消费者会502,调用消费者一般使用命令行方式

    1. //生产者
    2. public function index()
    3. {
    4. $config = \Kafka\ProducerConfig::getInstance();
    5. $config->setMetadataRefreshIntervalMs(10000);
    6. $config->setMetadataBrokerList('localhost:9092');
    7. $config->setBrokerVersion('1.0.0');
    8. $config->setRequiredAck(1);
    9. $config->setIsAsyn(false);
    10. $config->setProduceInterval(500);
    11. $producer = new \Kafka\Producer(
    12. function() {
    13. return [
    14. [
    15. 'topic' => 'test',
    16. 'value' => 'test....message.',
    17. 'key' => 'testkey',
    18. ],
    19. ];
    20. }
    21. );
    22. $producer->success(function($result) {
    23. var_dump($result);
    24. });
    25. $producer->error(function($errorCode) {
    26. var_dump($errorCode);
    27. });
    28. $producer->send(true);
    29. }
    30. //消费者
    31. public function xiaofei()
    32. {
    33. $config = \Kafka\ConsumerConfig::getInstance();
    34. $config->setMetadataRefreshIntervalMs(10000);
    35. $config->setMetadataBrokerList('localhost:9092');
    36. $config->setGroupId('test');
    37. $config->setBrokerVersion('1.0.0');
    38. $config->setTopics(['topicA']);
    39. //$config->setOffsetReset('earliest');
    40. $consumer = new \Kafka\Consumer();
    41. $consumer->start(function($topic, $part, $message) {
    42. var_dump($message);
    43. });
    44. }

    原生php调用:

    也是使用composer包,命令:composer require nmred/kafka-php

    目录结构:

     

    生产者文件:

    1. require './vendor/autoload.php';
    2. date_default_timezone_set('PRC');
    3. $config = \Kafka\ProducerConfig::getInstance();
    4. $config->setMetadataRefreshIntervalMs(10000);
    5. $config->setMetadataBrokerList('localhost:9092');
    6. $config->setBrokerVersion('1.0.0');
    7. $config->setRequiredAck(1);
    8. $config->setIsAsyn(false);
    9. $config->setProduceInterval(500);
    10. $producer = new \Kafka\Producer();
    11. for($i = 0; $i < 10; $i++) {
    12. $result = $producer->send([
    13. [
    14. 'topic' => 'topic',
    15. 'value' => '今晚军训甄姬 '.$i.' 次',
    16. 'key' => '',
    17. ],
    18. ]);
    19. var_dump($result);
    20. }

    消费者文件:

    1. require './vendor/autoload.php';
    2. date_default_timezone_set('PRC');
    3. $config = \Kafka\ConsumerConfig::getInstance();
    4. $config->setMetadataRefreshIntervalMs(10000);
    5. $config->setMetadataBrokerList('localhost:9092');
    6. $config->setGroupId('test');
    7. $config->setBrokerVersion('1.0.0');
    8. $config->setTopics(array('topic'));
    9. //$config->setOffsetReset('earliest');
    10. $consumer = new \Kafka\Consumer();
    11. $consumer->start(function ($topic, $part, $message) {
    12. var_dump($message);
    13. });

    运行生产者:php producer.php

    运行 消费者:php demo/consumer.php

     

  • 相关阅读:
    javascript生成ics、日程、日历
    SpringBoot集成腾讯云云点播服务/视频上传
    【vue2高德地图api】03-完善展示页,并且调用poi搜索接口
    Java之SpringCloud Alibaba【六】【Alibaba微服务分布式事务组件—Seata】
    vue3前端excel导出;组件表格,自定义表格导出;Vue3 + xlsx + xlsx-style
    纯html项目配置babel,报错Uncaught ReferenceError: require is not defined
    原型设计模式
    redis 高可用
    Pyecharts | 《白蛇2:青蛇劫起》20000+数据分析可视化
    Qt网络编程之搭建Udp通信【单播、组播、广播】
  • 原文地址:https://blog.csdn.net/weixin_42592326/article/details/126075557