【黑马程序员SpringBoot2全套视频教程,springboot零基础到项目实战(spring boot2完整版)】
之前我们已经完成了Kafka 的安装与测试

没啥问题,接下来就是使用SpringBoot 去整合了
【导入坐标】
<dependency>
<groupId>org.springframework.kafkagroupId>
<artifactId>spring-kafkaartifactId>
dependency>

OK, 记得刷一下
【配置】
kafka:
bootstrap-servers: localhost:9092
spring 的下一级

OK
整合
先把之前的bean 都注掉

基于kafka 的实现类
package com.dingjiaxiong.service.impl.kafka;
import com.dingjiaxiong.service.MessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
/**
* ClassName: MessageServiceKafkaImpl
* date: 2022/10/23 18:54
*
* @author DingJiaxiong
*/
@Service
public class MessageServiceKafkaImpl implements MessageService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Override
public void sendMessage(String id) {
System.out.println("待发送短信的订单已纳入处理队列(kafka),id " + id);
kafkaTemplate.send("dingjiaxiong",id);
}
@Override
public String doMessage() {
return null;
}
}
现在就可以完成消息的发送了
直接启动服务器看看

使用postman 发送消息

可以看到,消息已经进去了,而且控制台输出了不少信息
看看我们之前开启的消费者客户端

没错,已经消费了,现在就相当于是我通过编程的形式模拟了一个发送消息的客户端

现在做一个listener ,来消费消息
package com.dingjiaxiong.service.impl.kafka.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* ClassName: MessageListener
* date: 2022/10/23 19:02
*
* @author DingJiaxiong
*/
@Component
public class MessageListener {
@KafkaListener(topics = "dingjiaxiong")
public void onMessage(ConsumerRecord<String, String> record) {
System.out.println("已完成短信发送业务(kafka),id:" + record.value());
}
}
还要加个组id 的配置,不然启动会报错
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: order

OK,直接重启服务器测试

没毛病,自动消费了

cmd 客户端也监听到了
【大致整合就是这样了】
回顾一下




【以上就是SpringBoot 整合各种消息中间件的全部内容了,当然只是皮毛,还需要加深学习】
【我们一定能够成为我们想要成为的人!!】