• Spring Boot 整合 Kafka


    Kafka 环境搭建

    kafka 安装、配置、启动、测试说明:
    
    1. 安装:直接官网下载安装包,解压到指定位置即可(kafka 依赖的 Zookeeper 在文件中已包含)
    下载地址:https://kafka.apache.org/downloads
    示例版本:kafka_2.13-2.8.0.tgz
    下载后可本地解压安装,解压位置自选,如 D:\Java 下
    解压命令:tar -zxvf kafka_2.13-2.8.0.tgz
    PS:可在 idea 命令行窗口或 git 提供的命令窗口中进行命令操作
    使用 git 提供的命令窗口:空白文件夹中右键——》Git Bash Here 即可打开
    
    2. 添加地址配置
    在 D:\Java\kafka_2.13-2.8.0\config\server.properties 中搜索添加以下两行配置:
    listeners=PLAINTEXT://localhost:9092
    advertised.listeners=PLAINTEXT://localhost:9092
    说明:以上配置默认是注释掉的,可搜索找到,根据需求进行自定义地址配置
    
    重要说明:以下命令操作默认都是在 D:\Java\kafka_2.13-2.8.0\ 即 kafaka 根目录下进行!
    
    3. 使用配置文件方式后台启动/关闭 Zookeeper 服务
    启动:bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
    关闭【自选】:bin/zookeeper-server-stop.sh -daemon config/zookeeper.properties
    
    4. 使用配置文件方式后台启动/关闭 kafka 服务
    启动:bin/kafka-server-start.sh -daemon config/server.properties
    关闭【自选】:bin/kafka-server-stop.sh -daemon config/server.properties 
    
    5. 服务测试
    
    5.1 创建主题
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Hello-Kafka
    
    5.2 查看主题(可能需要查一会儿)
    bin/kafka-topics.sh --list --zookeeper localhost:2181
    
    说明:发送消息和监听消息需要打开两个窗口进行测试!
    
    5.3 发送消息(kafka 根目录下新建窗口)
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka
    输入以上命令回车后,可继续输入内容测试消息发送
    
    5.4 监听消息(kafka 根目录下新建窗口)
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Hello-Kafka --from-beginning
    输入以上命令后,可观察消息接收情况,并且可在消息发送窗口继续发送消息测试此监听窗口的接收情况,正常接收,则服务环境搭建成功。
    

    Spring Boot 整合 Kafka

    环境:自行创建 Spring Boot 项目,添加测试依赖,并启动 Zookeeper 和 kafka 服务。

    注意:Zookeeper 默认好像占用 8080 端口,自己注意端口占用问题。

    1. 添加依赖

    
    <dependency>
        <groupId>org.springframework.kafkagroupId>
        <artifactId>spring-kafkaartifactId>
    dependency>
    

    2. 添加配置

    # kafka 配置
    spring:
      kafka:
        bootstrap-servers: localhost:9092
        producer:
          # 发生错误后,消息重发的次数。
          retries: 1
          #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
          batch-size: 16384
          # 设置生产者内存缓冲区的大小。
          buffer-memory: 33554432
          # 键的序列化方式
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          # 值的序列化方式
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
          # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
          # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
          # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
          acks: 1
        consumer:
          # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
          auto-commit-interval: 1S
          # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
          # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
          # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
          auto-offset-reset: earliest
          # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
          enable-auto-commit: false
          # 键的反序列化方式
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          # 值的反序列化方式
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        listener:
          # 在侦听器容器中运行的线程数。
          concurrency: 5
          # listner负责ack,每调用一次,就立即commit
          ack-mode: manual_immediate
          missing-topics-fatal: false
    

    3. 创建消息生产者

    @Component
    public class KafkaProducer {
    
        private Logger logger = LoggerFactory.getLogger(KafkaProducer.class);
    
        @Resource
        private KafkaTemplate kafkaTemplate;
    
        public static final String TOPIC_TEST = "Hello-Kafka";
    
        public static final String TOPIC_GROUP = "test-consumer-group";
    
        public void send(Object obj) {
            String obj2String = JSON.toJSONString(obj);
            logger.info("准备发送消息为:{}", obj2String);
    
            // 发送消息
            ListenableFuture> future = kafkaTemplate.send(TOPIC_TEST, obj);
            future.addCallback(new ListenableFutureCallback>() {
                @Override
                public void onFailure(Throwable throwable) {
                    //发送失败的处理
                    logger.info(TOPIC_TEST + " - 生产者 发送消息失败:" + throwable.getMessage());
                }
    
                @Override
                public void onSuccess(SendResult stringObjectSendResult) {
                    //成功的处理
                    logger.info(TOPIC_TEST + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
                }
            });
        }
    
    }
    

    4. 创建消息消费者

    @Component
    public class KafkaConsumer {
    
        private Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
    
        @KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP)
        public void topicTest(ConsumerRecord record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
            Optional message = Optional.ofNullable(record.value());
            if (message.isPresent()) { // 包含非空值,则执行
                Object msg = message.get();
                logger.info("topic_test 消费了: Topic:" + topic + ",Message:" + msg);
                ack.acknowledge(); // 确认成功消费一个消息
            }
        }
    
    }
    

    5. 消息发送测试

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class KafkaProducerTest {
    
        private Logger logger = LoggerFactory.getLogger(KafkaProducerTest.class);
    
        @Resource
        private KafkaProducer kafkaProducer; // 注意使用自己创建的,看清楚!
    
        /*
          测试之前需要开启 Kafka 服务
          启动 Zookeeper:bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
          启动 Kafka:bin/kafka-server-start.sh -daemon config/server.properties
    
          测试结果数据:
    
          准备发送消息为:"你好,我是Lottery 001"
          Hello-Kafka - 生产者 发送消息成功:SendResult [producerRecord=ProducerRecord(topic=Hello-Kafka, partition=null,
          headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=你好,我是Lottery 001, timestamp=null),
          recordMetadata=Hello-Kafka-0@47]
    
          topic_test 消费了: Topic:Hello-Kafka,Message:你好,我是Lottery 001
         */
        @Test
        public void test_send() throws InterruptedException {
            // 循环发送消息
            while (true) {
                kafkaProducer.send("你好,我是Lottery 001");
                Thread.sleep(3500);
            }
        }
    
    }
    

    __EOF__

  • 本文作者: Luis
  • 本文链接: https://www.cnblogs.com/luisblog/p/17307292.html
  • 关于博主: 评论和私信会在第一时间回复。或者直接私信我。
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
  • 声援博主: 如果您觉得文章对您有帮助,可以点击文章右下角推荐一下。
  • 相关阅读:
    《设计模式:可复用面向对象软件的基础》——行为模式(2)(笔记)
    手写一个自己的mystrstr
    提升UI设计能力的几种套路 优漫动游
    【图论】SPFA求负环
    【数据结构:线性表——2.1 向量】
    【python 学习】如何将Python脚本打包成可执行的exe文件#520表白代码
    【Vivado HLS Bug】Ubuntu环境下Vivado HLS导出IP报错:HLS ERROR: [IMPL 213-28]
    为什么使用Golang而非Rust开发桌面应用?
    java web核心 HTTP 请求数据 响应数据 Tomcat基本使用 Servlet
    基于模态凝聚算法的特征系统实现算法的自然激励技术(Matlab代码实现)
  • 原文地址:https://www.cnblogs.com/luisblog/p/17307292.html