• 【Kafka】SpringBoot整合Kafka


    1.引入依赖

    1. <dependency>
    2. <groupId>org.springframework.kafkagroupId>
    3. <artifactId>spring-kafkaartifactId>
    4. dependency>

    2.配置参数

    1. server:
    2. port: 8080
    3. spring:
    4. kafka:
    5. bootstrap-servers: 101.34.251.168:9092
    6. producer: # ⽣产者
    7. retries: 3 # 设置⼤于0的值,则客户端会将发送失败的记录重新发送
    8. batch-size: 16384
    9. buffer-memory: 33554432
    10. acks: 1
    11. # 指定消息key和消息体的编解码⽅式
    12. key-serializer: org.apache.kafka.common.serialization.StringSerializer
    13. value-serializer: org.apache.kafka.common.serialization.StringSerializer
    14. consumer:
    15. group-id: default-group
    16. enable-auto-commit: false
    17. auto-offset-reset: earliest
    18. key-deserializer:
    19. org.apache.kafka.common.serialization.StringDeserializer
    20. value-deserializer:
    21. org.apache.kafka.common.serialization.StringDeserializer
    22. max-poll-records: 500
    23. listener:
    24. # 当每⼀条记录被消费者监听器(ListenerConsumer)处理之后提交
    25. # RECORD
    26. # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
    27. # BATCH
    28. # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间⼤于TIME时提交
    29. # TIME
    30. # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量⼤于等于COUNT时提交
    31. # COUNT
    32. # TIME | COUNT 有⼀个条件满⾜时提交
    33. # COUNT_TIME
    34. # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后, ⼿动调⽤Acknowledgment.acknowledge()后提交
    35. # MANUAL
    36. # ⼿动调⽤Acknowledgment.acknowledge()后⽴即提交,⼀般使⽤这种
    37. # MANUAL_IMMEDIATE
    38. ack-mode: MANUAL_IMMEDIATE
    39. # redis:
    40. # host: 172.16.253.21

    3.生产者搭建

    1. package com.wen.kafka.controller;
    2. import org.springframework.beans.factory.annotation.Autowired;
    3. import org.springframework.kafka.core.KafkaTemplate;
    4. import org.springframework.web.bind.annotation.RequestMapping;
    5. import org.springframework.web.bind.annotation.RestController;
    6. @RestController
    7. @RequestMapping("msg")
    8. public class ProducerController {
    9. @Autowired
    10. private KafkaTemplate kafkaTemplate;
    11. @RequestMapping("/send")
    12. public String sendMessage(){
    13. kafkaTemplate.send("test", "key", "msg2");
    14. return "Send Success";
    15. }
    16. }

    4.消费者搭建

    1. package com.wen.kafka.consumer;
    2. import org.apache.kafka.clients.consumer.ConsumerRecord;
    3. import org.springframework.kafka.annotation.KafkaListener;
    4. import org.springframework.stereotype.Component;
    5. @Component
    6. public class Consumer {
    7. @KafkaListener(topics = "test", groupId = "GroupOne")
    8. public void listenGroup(ConsumerRecord record){
    9. System.out.println(record);
    10. }
    11. }

  • 相关阅读:
    windows service项目里关于定时器的用法
    【无标题】
    7-91 统计字符出现次数
    85智慧楼宇建设解决方案
    vue-2
    面试官:Redis中集合数据类型的内部实现方式是什么?
    LogbackMDC 2022年有变动?
    HTTPS 协议的加密
    PG数据库字符截取
    Docker - 镜像
  • 原文地址:https://blog.csdn.net/qq_58137891/article/details/134533875