不要关闭Zookeeper和Kafka的dos窗口
我们再csmall项目中编写一个简单的Demo学习Kafka的使用
在csmall-cart-webapi模块中
添加依赖
<dependency>
<groupId>com.google.code.gsongroupId>
<artifactId>gsonartifactId>
dependency>
<dependency>
<groupId>org.springframework.kafkagroupId>
<artifactId>spring-kafkaartifactId>
dependency>
修改yml文件配置
spring:
kafka:
# 定义kafka的位置
bootstrap-servers: localhost:9092
# 话题的分组名称,是必须配置的
# 为了区分当前项目和其他项目使用的,防止了不同项目相同话题的干扰或错乱
# 本质是在话题名称前添加项目名称为前缀来防止的
consumer:
group-id: csmall
SpringBoot启动类中添加注解
@SpringBootApplication
@EnableDubbo
// 启动kafka的功能
@EnableKafka
// 为了测试kafka,我们可以周期性的发送消息到消息队列
// 使用SpringBoot自带的调度工具即可
@EnableScheduling
public class CsmallCartWebapiApplication {
public static void main(String[] args) {
SpringApplication.run(CsmallCartWebapiApplication.class, args);
}
}
下面我们就可以实现周期性的向kafka发送消息并接收的操作了
编写消息的发送
cart-webapi包下创建kafka包
包中创建Producer类来发送消息
// 我们需要周期性的向Kafka发送消息
// 需要将具备SpringBoot调度功能的类保存到Spring容器才行
@Component
public class Producer {
// 能够实现将消息发送到Kafka的对象
// 只要Kafka配置正确,这个对象会自动保存到Spring容器中,我们直接装配即可
// KafkaTemplate<[话题名称的类型],[传递消息的类型]>
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
// 每隔10秒向Kafka发送信息
int i=1;
// fixedRate是周期运行,单位毫秒 10000ms就是10秒
@Scheduled(fixedRate = 10000)
// 这个方法只要启动SpringBoot项目就会按上面设置的时间运行
public void sendMessage(){
// 实例化一个Cart类型对象,用于发送消息
Cart cart=new Cart();
cart.setId(i++);
cart.setCommodityCode("PC100");
cart.setPrice(RandomUtils.nextInt(100)+200);
cart.setCount(RandomUtils.nextInt(5)+1);
cart.setUserId("UU100");
// 将cart对象转换为json格式字符串
Gson gson=new Gson();
// 执行转换
String json=gson.toJson(cart);
System.out.println("本次发送的消息为:"+json);
// 执行发送
// send([话题名称],[发送的消息]),需要遵循上面kafkaTemplate声明的泛型类型
kafkaTemplate.send("myCart",json);
}
}
创建一个叫Consumer的类来接收消息
// 因为Kafka接收消息是自动的,所以这个类也必须交由Spring容器管理0
@Component
public class Consumer {
// SpringKafka框架接收Kafka中的消息使用监听机制
// SpringKafka框架提供一个监听器,专门负责关注指定的话题名称
// 只要该话题名称中有消息,会自动获取该消息,并调用下面方法
@KafkaListener(topics = "myCart")
// 上面注解和下面方法关联,方法的参数就是接收到的消息
public void received(ConsumerRecord<String,String> record){
// 方法参数类型必须是ConsumerRecord
// ConsumerRecord<[话题名称类型],[消息类型]>
// 获取消息内容
String json=record.value();
// 要想在java中使用,需要转换为java对象
Gson gson=new Gson();
// 将json转换为java对象,需要提供转换目标类型的反射
Cart cart=gson.fromJson(json,Cart.class);
System.out.println("接收到对象为:"+cart);
}
}