Apache Kafka是分布式发布 - 订阅消息系统。
官网:http://kafka.apache.org
下载地址:https://archive.apache.org/dist/kafka
Kafka 的架构包括以下组件:
1、生产者(Producer):发布消息;
2、服务代理(Broker):已发布的消息保存在一组服务器中,它们被称为代理(Broker)或 Kafka 集群;
3、消费者(Consumer):可以订阅一个或多个话题,并从 Broker接收的消息。
第1步:POM文件引入:
<dependency>
<groupId>org.springframework.kafkagroupId>
<artifactId>spring-kafkaartifactId>
<version>2.9.2version>
dependency>
第2步:设置配置参数,修改application.properties文件:
# kafka连接接地址
spring.kafka.bootstrap-servers= localhost:9092
# 生产者配置
# 序列化key的类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# 反序列化value的类
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
第3步:定义ProducerService类,通过调用KafkaTemplate类来发送消息;
@Service
public class ProducerService {
public static final String topic = "test";
@Autowired
private KafkaTemplate<?,String> kafkaTemplate;
//向topic发送消息
public void sendMessage(String message){
JSONObject jsonObject = new JSONObject();
jsonObject.put("data",message);
kafkaTemplate.send(topic,jsonObject.toJSONString());
}
}
第4步:定义KafkaController类,通过http接口可以发送消息。
@RequestMapping(value = "/sendMessage", method = RequestMethod.GET)
public String sendMessage(@RequestParam(value = "message") String message) {
producerService.sendMessage(message);
return message;
}
第1步:POM文件引入:
<dependency>
<groupId>org.springframework.kafkagroupId>
<artifactId>spring-kafkaartifactId>
<version>2.9.2version>
dependency>
第2步:设置配置参数,修改application.properties文件:
# kafka连接接地址
spring.kafka.bootstrap-servers=localhost:9092
# 消费者所属消息组
spring.kafka.consumer.group-id=testGroup
# 反序列化key的类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 反序列化value的类
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
第3步:定义通过调用ConsumerService类,通过@KafkaListener来实现消息的监听。
@Service
@Slf4j
public class ConsumerService {
public static final String topic = "test";
@Autowired
private KafkaTemplate<?,String> kafkaTemplate;
//从topic接收消息
@KafkaListener(topics = "test", groupId = "testGroup", topicPartitions = {})
public void receiveMessage(String message) {
log.info("receive:" + message);
}
}
验证如下:Postman发送http get请求:
生产者consumer的工程控制输入log:
可见:kafka服务启动成功,生产者(producer)消费者(consumer)通信正常。
代码详见:
https://gitee.com/linghufeixia/springboot-simple
chapter6-2: 生产者(producer)项目
chapter6-3: 消费者(consumer)项目
教程列表:
springboot simple(0) springboot简介
springboot simple(1) springboot Helloworld
springboot simple(2) springboot Starter
springboot simple(3 )springboot Web开发
springboot simple(4)springboot 数据持久化
springboot simple (5) springboot Nosql
springboot simple (6) springboot mqtt
springboot simple (7) springboot thrift
springboot simple (8) springboot kafka
springboot simple (9) springboot jpa(Hibernate)
springboot simple (10) springboot protobuf
springboot simple (11) springboot protostuff
springboot simple (12) springboot RabbitMQ