• Kafka部署安装及简单使用


    一、环境准备

    1、jdk 8+

    2、zookeeper 

    3、kafka

    说明:在kafka较新版本中已经集成了zookeeper,所以不用单独安装zookeeper,只需要在kafka文件目录中启动zookeeper即可

    二、下载地址

    Apache Kafka

    三、部署

    1、启动zookeeper

    -- 启动
    ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
    
    -- 查看是否启动成功
    ps -ef | grep zoo

    2、进入解压的kafka目录,修改/config/kafka-server的配置文件

    vi config/server.properties
    -- 重点配置节点说明
    
    listeners=PLAINTEXT://localhost:9092   -- 将localhost修改为主机ip
    
    log.dirs=/bigdata/kafka/logs-1  -- 默认不修改也可

     3、使用kafka-server-start.sh,启动kafka服务

    ./bin/kafka-server-start.sh config/server.properties

    四、使用客户端kafka tools连接kafka

    客户端下载地址:Offset Explorer

    关于客户端如何使用可查看:kafka可视化客户端工具(Kafka Tool)的基本使用 - Frankdeng - 博客园

    五、kafka实战简单使用

    NuGet:Confluent.Kafka

     1、新建.Net Core控制台项目,代码如下:

    static void Main(string[] args)
            {
                // 发送消息
                var producerConfig = new ProducerConfig
                {
                    BootstrapServers = "192.168.140.131:9092",
                    MessageTimeoutMs = 50000
                };
                var builder = new ProducerBuilder(producerConfig);
                using (var producer = builder.Build())
                {
                    var data = new { key = "1", value = "001" };
                    var json = JsonConvert.SerializeObject(data);
                    var dr = producer.ProduceAsync("order", new Message { Key = "order", Value = json }).GetAwaiter().GetResult();
                    Console.WriteLine($"发送事件{dr.Value}到{dr.TopicPartitionOffset}成功");
                }
    
                // 消费消息
                var consumerConfig = new ConsumerConfig {
                    BootstrapServers = "192.168.140.131:9092",
                    AutoOffsetReset=AutoOffsetReset.Earliest,
                    GroupId="1111", // 自定义
                    EnableAutoCommit=true
                };
                var consumerBuilder = new ConsumerBuilder(consumerConfig);
    
    
                using (var consumer = consumerBuilder.Build())
                {
                    // 1、订阅
                    consumer.Subscribe("order");
                    while (true)
                    {
                        try
                        {
                            // 2、消费(自动确认)
                            var result = consumer.Consume();
    
                            // 3、业务逻辑
                            string key = result.Key;
                            string value = result.Value;
    
                            Console.WriteLine($"创建商品:Key:{key}");
                            Console.WriteLine($"创建商品:Order:{value}");
                            consumer.Commit(result);
    
                        }
                        catch (Exception e)
                        {
                            Console.WriteLine($"异常:Order:{e}");
                        }
                    }
                }
            }

    2、使用kafka客户端查看消息投递

  • 相关阅读:
    P1441 砝码称重 (状压
    【Fusion360】常用快捷键和技巧
    【大话云原生】煮饺子与docker、kubernetes之间的关系
    MySQL基础知识总结
    【LeetCode】No.75. Sort Colors -- Java Version
    虚拟局域网
    「废话少说,放码过来」:博客园2024夏季T恤上架预售
    「Django秘境探险:揭开Web开发的神秘面纱」
    ESP8266如何连接两个服务器
    文件系统之程序是怎么打开文件进行操作的
  • 原文地址:https://blog.csdn.net/jh035/article/details/128062117