server:
  port: 8080

spring:
  kafka:
    bootstrap-servers: 192.168.79.104:9092
    producer:
      # retries > 0: the client re-sends any record whose send failed transiently
      retries: 3
      batch-size: 16384
      buffer-memory: 33554432
      # acks=1: the leader writes the record to its local log and responds
      # without waiting for acknowledgement from all in-sync replicas
      acks: 1
      # Serializers for the message key and the message value
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 500
    listener:
      # Offset-commit modes (ContainerProperties.AckMode):
      #   RECORD           - commit after each record is processed by the listener
      #   BATCH            - commit after each poll() batch is processed
      #   TIME             - commit after a batch once TIME has elapsed since the last commit
      #   COUNT            - commit after a batch once at least COUNT records were processed
      #   COUNT_TIME       - commit when either the TIME or the COUNT condition is met
      #   MANUAL           - commit (queued) after Acknowledgment.acknowledge() is called
      #   MANUAL_IMMEDIATE - commit immediately when Acknowledgment.acknowledge() is called;
      #                      this is the manual mode most commonly used
      ack-mode: MANUAL_IMMEDIATE
  # NOTE(review): nested under `spring` per Spring Boot 2.x (`spring.redis.*`);
  # Spring Boot 3.x moved these keys to `spring.data.redis.*` — confirm the Boot version.
  redis:
    host: 192.168.79.104
    port: 6379
    password: 123321
    lettuce:
      pool:
        max-active: 10
        max-idle: 10
        min-idle: 1
        time-between-eviction-runs: 10s
-
- @Configuration
- public class KafkaProducerConfig {
-
- @Value("${spring.kafka.bootstrap-servers}")
- private String bootstrapServers;
-
- @Bean
- public Map<String, Object> producerConfigs() {
- Map<String, Object> props = new HashMap<>();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- return props;
- }
-
- @Bean
- public ProducerFactory<String, String> producerFactory() {
- return new DefaultKafkaProducerFactory<>(producerConfigs());
- }
-
- @Bean
- public KafkaTemplate<String, String> kafkaTemplate() {
- return new KafkaTemplate<>(producerFactory());
- }
-
- }
- @RestController
- public class KafkaController {
-
- @Autowired
- private KafkaTemplate
kafkaTemplate; -
- @PostMapping("/send")
- public void sendMessage(@RequestBody String message) {
- kafkaTemplate.send("my-topic", message);
- }
-
- }
- @Configuration
- @EnableKafka
- public class KafkaConsumerConfig {
-
- @Value("${spring.kafka.bootstrap-servers}")
- private String bootstrapServers;
-
- @Value("${spring.kafka.consumer.group-id}")
- private String groupId;
-
- @Bean
- public Map<String, Object> consumerConfigs() {
- Map<String, Object> props = new HashMap<>();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- return props;
- }
-
- @Bean
- public ConsumerFactory<String, String> consumerFactory() {
- return new DefaultKafkaConsumerFactory<>(consumerConfigs());
- }
-
- @Bean
- public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
- ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
- factory.setConsumerFactory(consumerFactory());
- return factory;
- }
-
- }
- @Service
- public class KafkaConsumer {
-
- @KafkaListener(topics = "my-topic", groupId = "default-group")
- public void consume(String message) {
- System.out.println("Received message: " + message);
- }
-
- }
在上面的代码中,我们使用 @KafkaListener 注解声明了一个消费者方法,用于接收从 my-topic 主题中读取的消息。在这里,我们将消费者组 ID 设置为default-group。
现在,我们已经完成了 Kafka 生产者和消费者的设置。我们可以使用 mvn spring-boot:run 命令启动应用程序,并使用 curl 命令发送 POST 请求到 http://localhost:8080/send 端点,以将消息发送到 Kafka。然后,我们可以在控制台上查看消费者接收到的消息。
这就是使用 Spring Boot 和 Kafka 的基本设置。我们可以根据需要进行更改和扩展,以满足特定的需求。