• SpringBoot配置kafka


    1. server:
    2. port: 8080
    3. spring:
    4. kafka:
    5. bootstrap-servers: 192.168.79.104:9092
    6. producer: # 生产者
    7. retries: 3 # 设置大于 0 的值,则客户端会将发送失败的记录重新发送
    8. batch-size: 16384
    9. buffer-memory: 33554432
    10. acks: 1
    11. # 指定消息key和消息体的编解码方式
    12. key-serializer: org.apache.kafka.common.serialization.StringSerializer
    13. value-serializer: org.apache.kafka.common.serialization.StringSerializer
    14. consumer:
    15. group-id: default-group
    16. enable-auto-commit: false
    17. auto-offset-reset: earliest
    18. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    19. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    20. max-poll-records: 500
    21. listener:
    22. # 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
    23. # RECORD
    24. # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
    25. # BATCH
    26. # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
    27. # TIME
    28. # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
    29. # COUNT
    30. # TIME | COUNT 有一个条件满足时提交
    31. # COUNT_TIME
    32. # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
    33. # MANUAL
    34. # 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
    35. # MANUAL_IMMEDIATE
    36. ack-mode: MANUAL_IMMEDIATE
    37. redis:
    38. host: 192.168.79.104
    39. port: 6379
    40. password: 123321
    41. lettuce:
    42. pool:
    43. max-active: 10
    44. max-idle: 10
    45. min-idle: 1
    46. time-between-eviction-runs: 10s
    1. @Configuration
    2. public class KafkaProducerConfig {
    3. @Value("${spring.kafka.bootstrap-servers}")
    4. private String bootstrapServers;
    5. @Bean
    6. public Map<String, Object> producerConfigs() {
    7. Map<String, Object> props = new HashMap<>();
    8. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    9. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    10. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    11. return props;
    12. }
    13. @Bean
    14. public ProducerFactory<String, String> producerFactory() {
    15. return new DefaultKafkaProducerFactory<>(producerConfigs());
    16. }
    17. @Bean
    18. public KafkaTemplate<String, String> kafkaTemplate() {
    19. return new KafkaTemplate<>(producerFactory());
    20. }
    21. }

     

    1. @RestController
    2. public class KafkaController {
    3. @Autowired
    4. private KafkaTemplate kafkaTemplate;
    5. @PostMapping("/send")
    6. public void sendMessage(@RequestBody String message) {
    7. kafkaTemplate.send("my-topic", message);
    8. }
    9. }

    1. @Configuration
    2. @EnableKafka
    3. public class KafkaConsumerConfig {
    4. @Value("${spring.kafka.bootstrap-servers}")
    5. private String bootstrapServers;
    6. @Value("${spring.kafka.consumer.group-id}")
    7. private String groupId;
    8. @Bean
    9. public Map<String, Object> consumerConfigs() {
    10. Map<String, Object> props = new HashMap<>();
    11. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    12. props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    13. props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    14. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    15. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    16. return props;
    17. }
    18. @Bean
    19. public ConsumerFactory<String, String> consumerFactory() {
    20. return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    21. }
    22. @Bean
    23. public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    24. ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    25. factory.setConsumerFactory(consumerFactory());
    26. return factory;
    27. }
    28. }
    1. @Service
    2. public class KafkaConsumer {
    3. @KafkaListener(topics = "my-topic", groupId = "default-group")
    4. public void consume(String message) {
    5. System.out.println("Received message: " + message);
    6. }
    7. }

     在上面的代码中,我们使用 @KafkaListener 注解声明了一个消费者方法,用于接收从 my-topic 主题中读取的消息。在这里,我们将消费者组 ID 设置为default-group。
    现在,我们已经完成了 Kafka 生产者和消费者的设置。我们可以使用 mvn spring-boot:run 命令启动应用程序,并使用 curl 命令发送 POST 请求到 http://localhost:8080/send 端点,以将消息发送到 Kafka。然后,我们可以在控制台上查看消费者接收到的消息。
    这就是使用 Spring Boot 和 Kafka 的基本设置。我们可以根据需要进行更改和扩展,以满足特定的需求。
     

     

  • 相关阅读:
    面试官:如何实现多级缓存?
    【Transformers】第 7 章:文本表示
    LeetCode301:删除无效的括号
    【日志技术——Logback日志框架】
    【机器学习13】生成对抗网络
    【C语言入门】ZZULIOJ 1031-1035
    UE5.4新功能 - MotionDesign上手简介
    基于ISO14229协议的安全访问系列_1
    c# 导出Excel文件的几种方式 简单试验
    【linux】awk常用处理文件命令示例-替换/格式化/条件选择/求和/正则/子字符串
  • 原文地址:https://blog.csdn.net/weixin_53150299/article/details/133620270