Below is a simple example of implementing a message queue with Spring Boot and Kafka.
Add the following dependency to pom.xml:
- <dependency>
-     <groupId>org.springframework.kafka</groupId>
-     <artifactId>spring-kafka</artifactId>
-     <version>2.7.5</version>
- </dependency>
Add the Kafka configuration to application.properties:
- spring.kafka.bootstrap-servers=localhost:9092
- spring.kafka.consumer.group-id=myGroup
- spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
- spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
- spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
- spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
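To make the properties above more concrete, the following is a minimal sketch of how each spring.kafka.* key corresponds to a plain Kafka client property (bootstrap.servers, group.id, key.serializer and so on). The class KafkaConfigSketch and its methods are hypothetical helpers written for illustration; Spring Boot does this mapping internally through its own auto-configuration, not through code like this.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper, not part of Spring: translates the spring.kafka.* keys
// used in application.properties into plain Kafka client property names.
final class KafkaConfigSketch {

    // Spring Boot key -> Kafka client key, for the producer and the consumer.
    private static final Map<String, String> PRODUCER_KEYS = new LinkedHashMap<>();
    private static final Map<String, String> CONSUMER_KEYS = new LinkedHashMap<>();

    static {
        PRODUCER_KEYS.put("spring.kafka.bootstrap-servers", "bootstrap.servers");
        PRODUCER_KEYS.put("spring.kafka.producer.key-serializer", "key.serializer");
        PRODUCER_KEYS.put("spring.kafka.producer.value-serializer", "value.serializer");

        CONSUMER_KEYS.put("spring.kafka.bootstrap-servers", "bootstrap.servers");
        CONSUMER_KEYS.put("spring.kafka.consumer.group-id", "group.id");
        CONSUMER_KEYS.put("spring.kafka.consumer.key-deserializer", "key.deserializer");
        CONSUMER_KEYS.put("spring.kafka.consumer.value-deserializer", "value.deserializer");
    }

    // Parses properties-file text such as the contents of application.properties.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    static Map<String, String> producerConfig(Properties app) {
        return translate(app, PRODUCER_KEYS);
    }

    static Map<String, String> consumerConfig(Properties app) {
        return translate(app, CONSUMER_KEYS);
    }

    // Copies each known key under its client name; fails fast if one is missing.
    private static Map<String, String> translate(Properties app, Map<String, String> keys) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : keys.entrySet()) {
            String value = app.getProperty(entry.getKey());
            if (value == null) {
                throw new IllegalArgumentException("Missing property: " + entry.getKey());
            }
            out.put(entry.getValue(), value);
        }
        return out;
    }
}
```

Calling KafkaConfigSketch.consumerConfig on the six properties above yields a map whose group.id entry is myGroup, which is the same group the listener later joins.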
Create a producer class that sends messages with KafkaTemplate:
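- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.stereotype.Service;
-
- @Service
- public class KafkaProducerService {
-     // Spring Boot auto-configures this template from the spring.kafka.* properties.
-     @Autowired
-     private KafkaTemplate<String, String> kafkaTemplate;
-
-     // Sends the message to the given topic without a key or an explicit partition.
-     public void sendMessage(String topic, String message) {
-         kafkaTemplate.send(topic, message);
-     }
- }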
Create a consumer class that uses the @KafkaListener annotation to listen on a given topic and handle its messages:
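- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.stereotype.Service;
-
- @Service
- public class KafkaConsumerService {
-     // Invoked for each record received from myTopic as a member of consumer group myGroup.
-     @KafkaListener(topics = "myTopic", groupId = "myGroup")
-     public void onMessage(String message) {
-         System.out.println("Received message: " + message);
-     }
- }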
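Call the producer from a controller to send a message; the message received by the consumer then appears in the console. The controller assumes spring-boot-starter-web is also on the classpath: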
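- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- @RestController
- public class KafkaController {
-     @Autowired
-     private KafkaProducerService kafkaProducerService;
-
-     // GET /send publishes a fixed test message to myTopic.
-     @GetMapping("/send")
-     public String sendMessage() {
-         kafkaProducerService.sendMessage("myTopic", "Hello, Kafka!");
-         return "Message sent successfully";
-     }
- }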
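One way to trigger the /send endpoint without a browser is a small client built on the JDK's java.net.http package (Java 11 or later). This is only a sketch: the class name SendEndpointClient is hypothetical, and the base URL http://localhost:8080 assumes the application runs locally on Spring Boot's default port, which the original configuration does not state.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client for the /send endpoint exposed by KafkaController.
final class SendEndpointClient {

    // Builds a GET request for <baseUrl>/send without sending it.
    static HttpRequest buildSendRequest(String baseUrl) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/send"))
                .GET()
                .build();
    }

    // Sends the request and returns the response body as text.
    // Requires the Spring Boot application to be running at baseUrl.
    static String callSend(String baseUrl) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response =
                client.send(buildSendRequest(baseUrl), HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}
```

With the application and a Kafka broker running, SendEndpointClient.callSend("http://localhost:8080") should return the controller's "Message sent successfully" string, and the consumer's "Received message" line should show up in the application console.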
That completes a simple example of implementing a message queue with Spring Boot and Kafka.
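1. Write the Kafka producer code, passing the target partition number to KafkaTemplate.send. For example:
- @Autowired
- private KafkaTemplate<String, String> kafkaTemplate;
-
- // Sends the message to the given partition of my-topic; the record key is null.
- public void sendMessage(String message, int partition) {
-     kafkaTemplate.send("my-topic", partition, null, message);
- }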
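2. Write the Kafka consumer code: use the @KafkaListener annotation to listen on the given topic and read the partition number from a method parameter, as shown below: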
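- // KafkaHeaders.RECEIVED_PARTITION_ID is the spring-kafka 2.x name; 3.x renames it to RECEIVED_PARTITION.
- @KafkaListener(topics = "my-topic", groupId = "my-group")
- public void listen(ConsumerRecord<String, String> record, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
-     System.out.println("Received message: " + record.value() + ", partition: " + partition);
- }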
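The relationship between the partition chosen by the producer and the partition number seen by the listener can be illustrated with a toy in-memory model. This is a sketch for intuition only and not how Kafka is implemented: the class PartitionedTopicSketch, its Record type and its methods are all hypothetical, there is no broker, and rejecting an unknown partition with an exception is a simplification.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model of a topic split into partitions; each partition is an append-only log.
final class PartitionedTopicSketch {

    // A stored record: its value plus the partition and offset it was written to.
    static final class Record {
        final int partition;
        final long offset;
        final String value;

        Record(int partition, long offset, String value) {
            this.partition = partition;
            this.offset = offset;
            this.value = value;
        }
    }

    private final List<List<Record>> partitions = new ArrayList<>();

    PartitionedTopicSketch(int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    // Appends the value to the explicitly chosen partition and returns the stored record.
    Record send(int partition, String value) {
        if (partition < 0 || partition >= partitions.size()) {
            throw new IllegalArgumentException("No such partition: " + partition);
        }
        List<Record> log = partitions.get(partition);
        Record record = new Record(partition, log.size(), value);
        log.add(record);
        return record;
    }

    // Returns a read-only view of everything written to one partition, in order.
    List<Record> read(int partition) {
        return Collections.unmodifiableList(partitions.get(partition));
    }
}
```

After two calls to send(1, ...) on a three-partition topic, read(1) holds two records with offsets 0 and 1 and partition 1, while read(0) stays empty. The partition carried by each record plays the role of the value the listener above receives through the partition header.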