• 简单实现,在nodejs中简单使用kafka


    什么是 Kafka

    Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统

    Kafka 的基本术语

    消息:Kafka 中的数据单元被称为消息,也被称为记录,可以把它看作数据库表中某一行的记录。

    批次:为了提高效率, 消息会分批次写入 Kafka,批次就代指的是一组消息。

    主题:消息的种类称为 主题(Topic),可以说一个主题代表了一类消息。相当于是对消息进行分类。主题就像是数据库中的表。

    分区:主题可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,由此来实现 kafka 的伸缩性,单一主题中的分区有序,但是无法保证主题中所有的分区有序

    一、本地简单搭建kafka

    1.1 下载并解压2.8.1版本 或者其他版本(点击下载:Apache Kafka

    下载后,解压到指定文件夹,并创建两个文件夹以后使用

    1.2修改(如下图)配置文件

    1.config下的zookeeper.properties,修改为刚才创建data的真实路径

    2.config下的server.properties,修改为刚才创建kafka-log的真实路径

    1.3启动项目 顺序启动

    1.启动zookeeper

    到解压文件夹下执行:不要关闭窗口

    .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

    2.启动kafka

    到解压文件夹下执行:不要关闭窗口

    .\bin\windows\kafka-server-start.bat .\config\server.properties

    3.创建一个为test的topic

    不要关闭窗口

    .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1

    4.创建生产者

    到解压文件夹下执行:

    .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test1


    5.创建消费者

    .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test1 --from-beginning

    6.测试一下

    生产者发消息后消费者是否可以收到

    二.在nodejs中使用kafka

    2.1安装kafka-node
    命令 npm install kafka-node

    2.2 创建生产者
    import kafka  = require('kafka-node');
    var client = new kafka.KafkaClient({kafkaHost:'127.0.0.1:9092'});
    var Producer = kafka.Producer;
    var topic = 'test1';
    var payloads = [{  // 需要发送的一些配置信息
            topic: topic,
            messages: 'arrange'  // 需要生产的消息
        }];  // 此处必须要使用数组的形式,因为payloads的遍历采用的是foreach
    producer.on('ready', function () {
       producer.send(payloads, function (err, data) {
          console.log(payloads);
          console.log("=======");
          console.log(data);
       });
    })

    producer.on('error', function (err) {
      console.log('error', err);
    });

    2.3创建消费者

    import kafka  = require('kafka-node');
    var client = new kafka.KafkaClient({kafkaHost:'127.0.0.1:9092'});
    var topic = 'test1';
    var payloads = [{  // 需要发送的一些配置信息
            topic: topic,
            messages: 'arrange'  // 需要生产的消息
        }];  // 此处必须要使用数组的形式,因为payloads的遍历采用的是foreach
    var options = {  // 消费者的选择
        host: 'localhost:9092',
        sessionTimeout: 15000,
        autoCommit: true
    };
    var consumer = new kafka.Consumer(client, payloads, options);
    consumer.on('message', function (message) {
        console.log(message);
    });
     

  • 相关阅读:
    linux下常用命令
    WPF 设置全局静态参数
    elasticsearch实现mysql数据同步
    05、docker安装nginx
    MeterSphere 至学篇
    暑假加餐|有钱人和你想的不一样(第14天)+基于改进量子粒子群算法的电力系统经济调度(Matlab代码实现)
    SpringBoot-MQTT实现消息的发布订阅
    Android应用集成RabbitMQ消息处理指南
    AI伦理:科技发展中的人性之声
    【mybatis-plus进阶】多租户场景中多数据源自定义来源dynamic-datasource实现
  • 原文地址:https://blog.csdn.net/qqxinxi/article/details/134376772