• Kafka部署安装及简单使用


    一、环境准备

    1、jdk 8+

    2、zookeeper 

    3、kafka

    说明:在kafka较新版本中已经集成了zookeeper,所以不用单独安装zookeeper,只需要在kafka文件目录中启动zookeeper即可

    二、下载地址

    Apache Kafka

    三、部署

    1、启动zookeeper

    -- 启动
    ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
    
    -- 查看是否启动成功
    ps -ef | grep zoo

    2、进入解压的kafka目录,修改/config/kafka-server的配置文件

    vi config/server.properties
    -- 重点配置节点说明
    
    listeners=PLAINTEXT://localhost:9092   -- 将localhost修改为主机ip
    
    log.dirs=/bigdata/kafka/logs-1  -- 默认不修改也可

     3、使用kafka-server-start.sh,启动kafka服务

    ./bin/kafka-server-start.sh config/server.properties

    四、使用客户端kafka tools连接kafka

    客户端下载地址:Offset Explorer

    关于客户端如何使用可查看:kafka可视化客户端工具(Kafka Tool)的基本使用 - Frankdeng - 博客园

    五、kafka实战简单使用

    NuGet:Confluent.Kafka

     1、新建.Net Core控制台项目,代码如下:

    static void Main(string[] args)
            {
                // 发送消息
                var producerConfig = new ProducerConfig
                {
                    BootstrapServers = "192.168.140.131:9092",
                    MessageTimeoutMs = 50000
                };
                var builder = new ProducerBuilder(producerConfig);
                using (var producer = builder.Build())
                {
                    var data = new { key = "1", value = "001" };
                    var json = JsonConvert.SerializeObject(data);
                    var dr = producer.ProduceAsync("order", new Message { Key = "order", Value = json }).GetAwaiter().GetResult();
                    Console.WriteLine($"发送事件{dr.Value}到{dr.TopicPartitionOffset}成功");
                }
    
                // 消费消息
                var consumerConfig = new ConsumerConfig {
                    BootstrapServers = "192.168.140.131:9092",
                    AutoOffsetReset=AutoOffsetReset.Earliest,
                    GroupId="1111", // 自定义
                    EnableAutoCommit=true
                };
                var consumerBuilder = new ConsumerBuilder(consumerConfig);
    
    
                using (var consumer = consumerBuilder.Build())
                {
                    // 1、订阅
                    consumer.Subscribe("order");
                    while (true)
                    {
                        try
                        {
                            // 2、消费(自动确认)
                            var result = consumer.Consume();
    
                            // 3、业务逻辑
                            string key = result.Key;
                            string value = result.Value;
    
                            Console.WriteLine($"创建商品:Key:{key}");
                            Console.WriteLine($"创建商品:Order:{value}");
                            consumer.Commit(result);
    
                        }
                        catch (Exception e)
                        {
                            Console.WriteLine($"异常:Order:{e}");
                        }
                    }
                }
            }

    2、使用kafka客户端查看消息投递

  • 相关阅读:
    c语言 编程及答案
    MySQL奇偶数判断
    codesys TCP客户端程序
    聊聊 mysql 事务?(一)
    新知实验室TRTC 初体验
    【无标题】
    CSP-S2022 一轮游
    力扣刷题记录120.1-----718. 最长重复子数组
    打电话用蓝牙耳机什么牌子好?打电话清晰的蓝牙耳机推荐
    04-nginx静态资源部署实战
  • 原文地址:https://blog.csdn.net/jh035/article/details/128062117