• kafka消息队列简单使用


    下面是使用Spring Boot和Kafka实现消息队列的简单例子:

    1. 引入依赖

    在pom.xml中添加以下依赖:

    1. <dependency>
    2. <groupId>org.springframework.kafkagroupId>
    3. <artifactId>spring-kafkaartifactId>
    4. <version>2.7.5version>
    5. dependency>
    1. 配置Kafka

    在application.properties中添加Kafka的相关配置:

    1. spring.kafka.bootstrap-servers=localhost:9092
    2. spring.kafka.consumer.group-id=myGroup
    3. spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    4. spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    5. spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    6. spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    1. 发送消息

    创建一个生产者类,使用KafkaTemplate发送消息:

    1. @Service
    2. public class KafkaProducerService {
    3. @Autowired
    4. private KafkaTemplate kafkaTemplate;
    5. public void sendMessage(String topic, String message) {
    6. kafkaTemplate.send(topic, message);
    7. }
    8. }
    1. 接收消息

    创建一个消费者类,使用@KafkaListener注解监听指定的主题,处理消息:

    1. @Service
    2. public class KafkaConsumerService {
    3. @KafkaListener(topics = "myTopic", groupId = "myGroup")
    4. public void onMessage(String message) {
    5. System.out.println("Received message: " + message);
    6. }
    7. }
    1. 测试

    在Controller中调用生产者发送消息,然后在控制台中可以看到消费者接收到的消息:

    1. @RestController
    2. public class KafkaController {
    3. @Autowired
    4. private KafkaProducerService kafkaProducerService;
    5. @GetMapping("/send")
    6. public String sendMessage() {
    7. kafkaProducerService.sendMessage("myTopic", "Hello, Kafka!");
    8. return "Message sent successfully";
    9. }
    10. }

    以上就是一个简单的使用Spring Boot和Kafka实现消息队列的例子

    分区

    1. 编写Kafka生产者代码,使用KafkaTemplate发送消息,并指定分区号。如下所示:
    1. @Autowired
    2. private KafkaTemplate<String, String> kafkaTemplate;
    3. public void sendMessage(String message, int partition) {
    4. kafkaTemplate.send("my-topic", partition, null, message);

    2.编写Kafka消费者代码,使用@KafkaListener注解监听指定的主题,并在方法参数中获取分区号。如下所示:

    1. @KafkaListener(topics = "my-topic", groupId = "my-group")
    2. public void listen(ConsumerRecord record, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
    3. System.out.println("Received message: " + record.value() + ", partition: " + partition);
  • 相关阅读:
    使用HTML制作静态宠物网站——蓝色版爱宠之家(HTML+CSS)
    LLM 系列 | 21 : Code Llama实战(上篇) : 模型简介与评测
    深入分析Spring的IoC容器:从底层源码探索
    神经网络是一种算法吗,神经网络包括哪些算法
    MAE
    关于原型交互设计文档的一些建议
    【IMX6ULL学习笔记之Linux系统移植03】——Linux系统移植
    记一下pyinstaller打包的坑
    痞子衡嵌入式:借助i.MXRT10xx系列INIT_VTOR功能可以缩短程序热重启时间
    Variations-of-SFANet-for-Crowd-Counting记录
  • 原文地址:https://blog.csdn.net/qq_56921846/article/details/133378735