环境:windows、jdk1.8、springboot2
Apache KafkaApache Kafka: A Distributed Streaming Platform.
https://kafka.apache.org/
Kafka 是一种高性能、分布式的消息队列系统,最初由 LinkedIn 公司开发,并于2011年成为 Apache 顶级项目。它设计用于处理大规模的实时数据流,具有高吞吐量、低延迟、持久性等特点,被广泛应用于构建实时数据管道、日志收集、事件驱动架构等场景。
详细概述见Kafka概述:
topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic 就是Rabbitmq中的queue)
producer:发布消息的对象称之为主题生产者(Kafka topic producer)
consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。
Apache KafkaApache Kafka: A Distributed Streaming Platform.
https://kafka.apache.org/downloads 选择最新版就可以

解压下载的文件,修改 config 文件夹下的 zookeeper.properties

修改 config 文件夹下的 server.properties

当需要外网访问时要配置advertised.listeners(比如连云服务器的kafka)
advertised.listeners=PLAINTEXT://xxx.xxx.xxx.xxx:9092

Zookeeper 在 Kafka 中充当了分布式协调服务的角色,帮助 Kafka 实现了集群管理、元数据存储、故障恢复、领导者选举等功能,是 Kafka 高可用性、可靠性和分布式特性的重要支撑。
kafka_2.13-3.7.0\bin\windows文件夹中输入命令:
zookeeper-server-start.bat ../../config/zookeeper.properties
可以本地访问看一下:http://localhost:2181/
kafka_2.13-3.7.0\bin\windows文件夹中输入命令:
kafka-server-start.sh ../../config/server.properties
访问路径: http://localhost:9092/
两个脚本放到Kafka的目录(kafka_2.13-3.7.0)中
cd bin\windows
zookeeper-server-start.bat ../../config/zookeeper.properties
cd bin\windows
kafka-server-start.bat ../../config/server.properties
(1)添加pom依赖
-
org.springframework.boot -
spring-boot-starter-parent -
2.2.8.RELEASE -
1.2.58 -
-
org.springframework.boot -
spring-boot-starter-web -
-
-
-
org.springframework.kafka -
spring-kafka -
-
-
com.alibaba -
fastjson -
${fastjson.version} -
-
-
org.springframework.boot -
spring-boot-starter-test -
(2)配置类application.yml
生产者:
- spring:
- kafka:
- bootstrap-servers: xxx.xxx.xxx.xxx:9092
- producer:
- retries: 0
- key-serializer: org.apache.kafka.common.serialization.StringSerializer
- value-serializer: org.apache.kafka.common.serialization.StringSerializer
消费者:
- spring:
- kafka:
- bootstrap-servers: xxx.xxx.xxx.xxx:9092
- consumer:
- group-id: kafka-demo-kafka-group
- key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
(3)启动类
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
-
- @SpringBootApplication
- public class KafkaApp {
- public static void main(String[] args) {
- SpringApplication.run(KafkaApp.class, args);
- }
- }
junit测试,新建消息发送方
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.test.context.junit4.SpringRunner;
-
-
- @RunWith(SpringRunner.class)
- @SpringBootTest
- public class KafkaSendTest {
- @Autowired
- private KafkaTemplate
kafkaTemplate; //如果这里有红色波浪线,那是假错误 -
- @Test
- public void sendMsg(){
- String topic = "spring_test";
- kafkaTemplate.send(topic,"hello spring boot kafka!");
- System.out.println("发送成功.");
- while (true){ //保存加载ioc容器
-
- }
- }
- }
新建监听类:
-
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.stereotype.Component;
-
- @Component
- public class MyKafkaListener {
-
- // 以下两种方法都行
-
- // 指定监听的主题
- // @KafkaListener(topics = "spring_test")
- // public void receiveMsg(String message){
- // System.out.println("接收到的消息:"+message);
- // }
-
-
- @KafkaListener(topics = "spring_test")
- public void handleMessage(ConsumerRecord
record) { - System.out.println("接收到消息,偏移量为: " + record.offset() + " 消息为: " + record.value());
- }
- }