• springboot集成kafka详解


    1、kafka部署:
    (1)先创建一个网络:
    docker network create app-tier --driver bridge
    
    
    • 1
    • 2
    (2)安装zookeeper,kafka依赖zookeeper所以需要先安装zookeeper:
    docker run -d --name zookeeper-server --network app-tier -p 2181:2181 -e ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:latest
    
    
    • 1
    • 2
    (3)安装Kafka:
    docker run -d --name kafka-server --network app-tier -p 9092:9092 -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.202.211:9092 bitnami/kafka:latest
    
    
    • 1
    • 2
    参数解释:
    ALLOW_PLAINTEXT_LISTENER		# 任何人可以访问
    KAFKA_CFG_ZOOKEEPER_CONNECT		# 链接的zookeeper
    KAFKA_CFG_ADVERTISED_LISTENERS		# 当前主机IP或地址(我的主机IP为:192.168.202.211)
    
    
    • 1
    • 2
    • 3
    • 4
    (4)部署kafka图形化管理工具,选择kafka-map或kafka-manager:
    kafka-map(推荐)
    docker run -d --name kafka-map --network app-tier -p 9090:8080 -v /root/kafka-map/data:/usr/local/kafka-map/data -e DEFAULT_USERNAME=admin -e DEFAULT_PASSWORD=admin --restart always dushixiang/kafka-map:latest
    
    访问地址:192.168.202.211:9090
    账号密码:admin/admin
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    kafka-manager(不好用,不推荐)
    docker run --name kafka-manager -d --network app-tier -p 9091:9000 -e ZK_HOSTS="zookeeper-server:2181" sheepkiller/kafka-manager
    
    访问地址:192.168.202.211:9091
    
    
    • 1
    • 2
    • 3
    • 4
    2、springboot集成kafka:
    (1)pom文件里引入kafka依赖
    
    <dependency>
        <groupId>org.springframework.kafkagroupId>
        <artifactId>spring-kafkaartifactId>
        <version>2.8.1version>
    dependency>
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    (2)application.yml配置文件:
    spring:
      kafka:
        bootstrap-servers: 192.168.202.211:9092          # Kafka服务器地址和端口
        consumer:
          group-id: my-group-id                          # 消费者组ID,与部署的Kafka服务中的组ID保持一致
          auto-offset-reset: earliest                    # 从最早的消息开始消费,可选值:earliest, latest, none
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    (3)在你的项目里新建文件结构如下:
    biz/src/java/com/heurd
        		     |______BizApplication.java		# 启动类
                     |______biz
        		     |_______|____controller
        		     |_______|______|____KafkaController.java
        		     |_______|____kafka
       		         |_______|______|____producer
        		     |_______|______|________|____KafkaProducer.java	# 生产者
        		     |_______|______|____consumer
        		     |_______|______|________|____KafkaConsumer.java	# 消费者
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    生产者,KafkaProducer.java文件内容如下:
    package com.heurd.intellidigital.service.modular.data.kafka.producer;
    
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    
    /**
     * @Author: goggle
     * @Date: 2023-11-10 14:19
     * @Desperation: kafka生产者
     */
    @Component
    public class KafkaProducer {
    
        private final KafkaTemplate<String, String> kafkaTemplate;
    
        public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
            this.kafkaTemplate = kafkaTemplate;
        }
    
        public void sendMessage(String topicName, String message) {
            System.out.printf("topicName: %s, message: %s%n", topicName, message);
            kafkaTemplate.send(topicName, message);
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    消费者,KafkaConsumer.java文件内容如下:
    package com.heurd.intellidigital.service.modular.data.kafka.consumer;
    
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    /**
     * @Author: goggle
     * @Date: 2023-11-10 14:19
     * @Desperation: kafka消费者
     */
    @Component
    public class KafkaConsumer {
    
        @KafkaListener(topics = "my-topic", groupId = "my-group-id") // 指定监听的主题和组ID
        public void listen(String message) {
            System.out.println("Received message: " + message);
            // 处理接收到的消息
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    controller文件内容如下:
    package com.heurd.intellidigital.service.modular.data.controller;
    
    import com.heurd.intellidigital.service.modular.data.kafka.consumer.KafkaConsumer;
    import com.heurd.intellidigital.service.modular.data.kafka.producer.KafkaProducer;
    import com.heurd.intellimes.core.annotion.BusinessLog;
    import com.heurd.intellimes.core.enums.LogAnnotionOpTypeEnum;
    import com.heurd.intellimes.core.pojo.response.ResponseData;
    import com.heurd.intellimes.core.pojo.response.SuccessResponseData;
    import io.swagger.annotations.Api;
    import io.swagger.annotations.ApiOperation;
    import org.springframework.web.bind.annotation.*;
    
    import javax.annotation.Resource;
    
    /**
     * @Author: goggle
     * @Date: 2023-11-10 14:19
     * @Desperation: kafka控制器
     */
    @RestController
    @RequestMapping("/kafka")
    public class KafkaController {
    
        @Resource
        private KafkaProducer kafkaProducer;
    
        @Resource
        private KafkaConsumer kafkaConsumer;
    
        /**
         * kafka测试
         *
         * @author heurd
         * @date 2023-11-10 15:29:19
         */
        @ApiOperation("kafka测试")
        @GetMapping(value = "/test")
        public ResponseData<Object> kafka() {
    
            String message = "Hello, Kafka!"; 			// 要发送的消息内容
            kafkaProducer.sendMessage("my-topic", message); 	// 发送消息到指定主题(这里为"my-topic")
            // 此时,Kafka消费者会自动监听到该消息并调用listen()方法进行处理。注意:消费者的监听是异步的,因此不会立即收到响应。如果您需要等待响应,请根据实际情况进行修改。
    
            return new SuccessResponseData<>(message);
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    (4)调用 /kafka/test 接口,打印结果:
    生产者:topicName: my-topic, message: Hello, Kafka!
    
    消费者:Received message: Hello, Kafka!
    
    
    • 1
    • 2
    • 3
    • 4
  • 相关阅读:
    现货白银入门技巧:心理线
    【微信小程序入门到精通】— 轮播图你会了么?快速拿下 swiper 和 swiper-item
    计算机毕业设计JAVA网上商城购物系统mybatis+源码+调试部署+系统+数据库+lw
    深入理解XGBoost:集成学习与堆叠模型
    基于匹配追踪和最大重叠离散小波变换的ECG心电信号R波检测(MATLAB 2018a)
    NLP模型笔记2022-33:Sentence-BERT STS模型列表与预训练方法
    D365: 压缩
    LabVIEW如何获取波形图上游标所在位置的数值
    【业务功能109】微服务-springcloud-springboot-Skywalking-链路追踪-监控
    [附源码]java毕业设计毕业设计管理系统
  • 原文地址:https://blog.csdn.net/qq_33867131/article/details/134393928