• 【Kafka】SpringBoot整合Kafka


    1.引入依赖

    1. <dependency>
    2. <groupId>org.springframework.kafkagroupId>
    3. <artifactId>spring-kafkaartifactId>
    4. dependency>

    2.配置参数

    1. server:
    2. port: 8080
    3. spring:
    4. kafka:
    5. bootstrap-servers: 101.34.251.168:9092
    6. producer: # ⽣产者
    7. retries: 3 # 设置⼤于0的值,则客户端会将发送失败的记录重新发送
    8. batch-size: 16384
    9. buffer-memory: 33554432
    10. acks: 1
    11. # 指定消息key和消息体的编解码⽅式
    12. key-serializer: org.apache.kafka.common.serialization.StringSerializer
    13. value-serializer: org.apache.kafka.common.serialization.StringSerializer
    14. consumer:
    15. group-id: default-group
    16. enable-auto-commit: false
    17. auto-offset-reset: earliest
    18. key-deserializer:
    19. org.apache.kafka.common.serialization.StringDeserializer
    20. value-deserializer:
    21. org.apache.kafka.common.serialization.StringDeserializer
    22. max-poll-records: 500
    23. listener:
    24. # 当每⼀条记录被消费者监听器(ListenerConsumer)处理之后提交
    25. # RECORD
    26. # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
    27. # BATCH
    28. # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间⼤于TIME时提交
    29. # TIME
    30. # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量⼤于等于COUNT时提交
    31. # COUNT
    32. # TIME | COUNT 有⼀个条件满⾜时提交
    33. # COUNT_TIME
    34. # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后, ⼿动调⽤Acknowledgment.acknowledge()后提交
    35. # MANUAL
    36. # ⼿动调⽤Acknowledgment.acknowledge()后⽴即提交,⼀般使⽤这种
    37. # MANUAL_IMMEDIATE
    38. ack-mode: MANUAL_IMMEDIATE
    39. # redis:
    40. # host: 172.16.253.21

    3.生产者搭建

    1. package com.wen.kafka.controller;
    2. import org.springframework.beans.factory.annotation.Autowired;
    3. import org.springframework.kafka.core.KafkaTemplate;
    4. import org.springframework.web.bind.annotation.RequestMapping;
    5. import org.springframework.web.bind.annotation.RestController;
    6. @RestController
    7. @RequestMapping("msg")
    8. public class ProducerController {
    9. @Autowired
    10. private KafkaTemplate kafkaTemplate;
    11. @RequestMapping("/send")
    12. public String sendMessage(){
    13. kafkaTemplate.send("test", "key", "msg2");
    14. return "Send Success";
    15. }
    16. }

    4.消费者搭建

    1. package com.wen.kafka.consumer;
    2. import org.apache.kafka.clients.consumer.ConsumerRecord;
    3. import org.springframework.kafka.annotation.KafkaListener;
    4. import org.springframework.stereotype.Component;
    5. @Component
    6. public class Consumer {
    7. @KafkaListener(topics = "test", groupId = "GroupOne")
    8. public void listenGroup(ConsumerRecord record){
    9. System.out.println(record);
    10. }
    11. }

  • 相关阅读:
    在线文本实体抽取能力,助力应用解析海量文本数据
    2023最新SSM计算机毕业设计选题大全(附源码+LW)之java校内考研吧hd450
    分析每一段的代码的代码及代码运行的结果
    Springboot乐动健身房管理系统6xl64计算机毕业设计-课程设计-期末作业-毕设程序代做
    如何配置Header Editor
    R语言dplyr中的Select函数变量列名
    新版首途影视视频网站源码/22套带后台版全开源+无加密源码(全新二开完整版)
    新能源汽车行业资讯-2022-9-15
    关于账号安全的一些思考
    Java的UI
  • 原文地址:https://blog.csdn.net/qq_58137891/article/details/134533875