• 5-3:Spring整合Kafka


    引入依赖

    spring-kafka

    在这里插入图片描述

    
    <dependency>
        <groupId>org.springframework.kafkagroupId>
        <artifactId>spring-kafkaartifactId>
        /*可以注释掉,利用父版本所声明的版本即可*/<version>3.0.0version>
    dependency>
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    配置Kafka

    配置server、consumer

    在这里插入图片描述在这里插入图片描述
    在application.propertities中配置

    # KafkaProperties
    # 配置服务,消费者:消费者的分组ID:在consumer的配置文件里有
    # 是否自动提交消费者的偏移量。消费者读取消息时按偏移量读取,一般是移动提交
    # 自动提交的频率,多长时间提交一次:3000ms
    spring.kafka.bootstrap-servers=localhost:9092 
    spring.kafka.consumer.group-id=community-consumer-group
    spring.kafka.consumer.enable-auto-commit=true
    spring.kafka.consumer.auto-commit-interval=3000
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    访问Kafka
    • 生产者
      kafkaTemplate.send (topic, data) ;
    • 消费者
      @KafkaListener (topics = { “test” })
      public void handleMessage (ConsumerRecord record){}

    利用方法,监听主题,一旦主题上有消息,就会调用方法去处理消息。会把消息包装成record,对record做出处理

    测试代码
    package com.nowcoder.community;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    @ContextConfiguration(classes = CommunityApplication.class)
    public class KafkaTests {
    
        @Autowired
        private KafkaProducer kafkaProducer;
    
        @Test
        public void testKafka() {//用生产者发送消息,然后用消费者处理消息(这里是打印)
            kafkaProducer.sendMessage("test", "你好");
            kafkaProducer.sendMessage("test", "在吗");
            //发完消息后,不能立刻结束,否则就看不到消费者的输出,
            //生产者是主动的,消费者是被动的,可能会有一点延迟
    
            try {
                Thread.sleep(1000 * 10);//1000ms*10
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    }
    
    @Component
    class KafkaProducer {//一般会将生产者或消费者的代码封装,希望用spring的容器来管理
    
        @Autowired
        private KafkaTemplate kafkaTemplate;//被spring整合的kafka工具
    
        public void sendMessage(String topic, String content) {//传入消息的主题,消息的内容
            kafkaTemplate.send(topic, content);
        }
    
    }
    
    @Component
    class KafkaConsumer {//封装消费者的bean
        //指定要监听的主题,不需要依赖template,服务启动了后,spring就会自动监听
        //消费者就会有一个线程阻塞,试图读取test主题的消息,若有消息,就会读取,若没有,就会阻塞
        @KafkaListener(topics = {"test"})
        public void handleMessage(ConsumerRecord record) {
            //处理一个消息
            System.out.println(record.value());//读取消息
        }
    
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
  • 相关阅读:
    猿创征文|HCIE-Security Day58:内容过滤技术
    数学建模如何创新
    Superset (三) --------- Superset 使用
    干货|供应链办理体系的特色有哪些?
    HDMI之EDID
    文本处理三剑客之 sed 流编辑器(高级部分)
    【web前端开发】后台PHP
    软件测试面试题【2023最新合集】
    DevOps2023现状报告|注重文化、以用户为中心是成功的关键
    华为od德科面试数据算法解析 2022-7-2 相对开音节
  • 原文地址:https://blog.csdn.net/qq_41026725/article/details/128189142