• kafka消息队列简单使用


    下面是使用Spring Boot和Kafka实现消息队列的简单例子:

    1. 引入依赖

    在pom.xml中添加以下依赖:

    1. <dependency>
    2. <groupId>org.springframework.kafkagroupId>
    3. <artifactId>spring-kafkaartifactId>
    4. <version>2.7.5version>
    5. dependency>
    1. 配置Kafka

    在application.properties中添加Kafka的相关配置:

    1. spring.kafka.bootstrap-servers=localhost:9092
    2. spring.kafka.consumer.group-id=myGroup
    3. spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    4. spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    5. spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    6. spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    1. 发送消息

    创建一个生产者类,使用KafkaTemplate发送消息:

    1. @Service
    2. public class KafkaProducerService {
    3. @Autowired
    4. private KafkaTemplate kafkaTemplate;
    5. public void sendMessage(String topic, String message) {
    6. kafkaTemplate.send(topic, message);
    7. }
    8. }
    1. 接收消息

    创建一个消费者类,使用@KafkaListener注解监听指定的主题,处理消息:

    1. @Service
    2. public class KafkaConsumerService {
    3. @KafkaListener(topics = "myTopic", groupId = "myGroup")
    4. public void onMessage(String message) {
    5. System.out.println("Received message: " + message);
    6. }
    7. }
    1. 测试

    在Controller中调用生产者发送消息,然后在控制台中可以看到消费者接收到的消息:

    1. @RestController
    2. public class KafkaController {
    3. @Autowired
    4. private KafkaProducerService kafkaProducerService;
    5. @GetMapping("/send")
    6. public String sendMessage() {
    7. kafkaProducerService.sendMessage("myTopic", "Hello, Kafka!");
    8. return "Message sent successfully";
    9. }
    10. }

    以上就是一个简单的使用Spring Boot和Kafka实现消息队列的例子

    分区

    1. 编写Kafka生产者代码,使用KafkaTemplate发送消息,并指定分区号。如下所示:
    1. @Autowired
    2. private KafkaTemplate<String, String> kafkaTemplate;
    3. public void sendMessage(String message, int partition) {
    4. kafkaTemplate.send("my-topic", partition, null, message);

    2.编写Kafka消费者代码,使用@KafkaListener注解监听指定的主题,并在方法参数中获取分区号。如下所示:

    1. @KafkaListener(topics = "my-topic", groupId = "my-group")
    2. public void listen(ConsumerRecord record, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
    3. System.out.println("Received message: " + record.value() + ", partition: " + partition);
  • 相关阅读:
    Java学习笔记——并发编程(三)
    (二)初识Vue
    Postman的简单使用
    【分析思路】测试数据分析思路
    Mysql用户管理
    简单对比一下 C 与 Go 两种语言
    使用高德开放平台显示指定的坐标点和线
    【Android TV 开发】-->一些优秀 TV 开发相关框架 & 文章
    点云学习笔记14——ModuleNotFoundError: No module named ‘rospy
    vue基于element树形控件实现上下拖拽
  • 原文地址:https://blog.csdn.net/qq_56921846/article/details/133378735