Gitee repository:
https://gitee.com/liushanshan126/kafka-test
Baidu Netdisk link:
Link: https://pan.baidu.com/s/1vsFCu6vQTuktq6QTnh9dlg
Access code: 4h45

Kafka's three classic use cases: buffering/peak shaving, decoupling, and asynchronous communication.




Summary: 100 TB of data will not fit into a topic on a single server, so Kafka introduces the concept of partitions: one topic is split into multiple partitions, which are stored on different servers.
On partitions, see the Zhihu answer below. In short: a topic is divided into multiple partitions; a partition can be consumed by multiple consumers; and when data is written to a topic, it is routed to a partition according to certain rules:
https://zhuanlan.zhihu.com/p/371886710#:~:text=Kafka%20%E4%B8%AD%20Topic%20%E8%A2%AB%E5%88%86%E6%88%90%E5%A4%9A%E4%B8%AA%20Partition%20%E5%88%86%E5%8C%BA%E3%80%82%20Topic%20%E6%98%AF%E4%B8%80%E4%B8%AA,%EF%BC%8C%E6%8E%8C%E6%8F%A1%E7%9D%80%E4%B8%80%E4%B8%AA%20Topic%20%E7%9A%84%E9%83%A8%E5%88%86%E6%95%B0%E6%8D%AE%E3%80%82%20%E6%AF%8F%E4%B8%AA%20Partition%20%E9%83%BD%E6%98%AF%E4%B8%80%E4%B8%AA%E5%8D%95%E7%8B%AC%E7%9A%84%20log%20%E6%96%87%E4%BB%B6%EF%BC%8C%E6%AF%8F%E6%9D%A1%E8%AE%B0%E5%BD%95%E9%83%BD%E4%BB%A5%E8%BF%BD%E5%8A%A0%E7%9A%84%E5%BD%A2%E5%BC%8F%E5%86%99%E5%85%A5%E3%80%82
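The routing rule mentioned above works roughly like this: Kafka's default partitioner hashes the record key (it actually uses murmur2) modulo the partition count, so records with the same key always land in the same partition. A simplified plain-Java sketch of the idea, using `String.hashCode()` instead of Kafka's real hash (the class and method names here are illustrative, not Kafka API):

```java
import java.util.HashMap;
import java.util.Map;

public class PartitionSketch {
    // Simplified stand-in for Kafka's default partitioner:
    // hash the key, take it modulo the number of partitions.
    static int partitionFor(String key, int numPartitions) {
        return Math.abs(key.hashCode() % numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        Map<Integer, Integer> counts = new HashMap<>();
        for (int i = 0; i < 9; i++) {
            int p = partitionFor("key-" + i, numPartitions);
            counts.merge(p, 1, Integer::sum); // how many records each partition got
        }
        // The same key is always routed to the same partition.
        System.out.println(partitionFor("order-42", numPartitions)
                == partitionFor("order-42", numPartitions));
        System.out.println(counts);
    }
}
```

This is why choosing a good key matters: all records for one key go to one partition, which preserves their order but can skew load if one key dominates.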


Change into the Kafka installation directory; the admin scripts live under bin/:

bin/kafka-server-stop.sh
bin/kafka-topics.sh


Create a topic: bin/kafka-topics.sh --bootstrap-server 192.168.75.128:9092 --create --partitions 1 --replication-factor 3 --topic siv


Start a console producer: bin/kafka-console-producer.sh --bootstrap-server 192.168.75.128:9092 --topic first


Start a console consumer (reading from the beginning of the topic): bin/kafka-console-consumer.sh --bootstrap-server 192.168.75.128:9092 --topic topic1 --from-beginning




Uncomment listeners=PLAINTEXT://192.168.75.128:9092 in the broker configuration (the IP address here is this server's own IP).
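For reference, the relevant lines in config/kraft/server.properties look roughly like this (the IPs are this document's example addresses; the advertised.listeners line is an assumption, needed when clients such as the Spring Boot app connect from another machine):

```properties
# Uncomment and set to this server's own IP
listeners=PLAINTEXT://192.168.75.128:9092
# Address that external clients use to reach this broker
advertised.listeners=PLAINTEXT://192.168.75.128:9092
```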







Implementation in code:






Note:


The producer side has serializers and interceptors.
The consumer side has deserializers and interceptors.
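Kafka only ever transmits bytes, which is why the producer needs a serializer and the consumer the matching deserializer. A minimal plain-Java sketch of what StringSerializer/StringDeserializer effectively do (UTF-8 encode and decode; the class here is illustrative, not the Kafka API):

```java
import java.nio.charset.StandardCharsets;

public class SerdeSketch {
    // Producer side: turn the String value into bytes for the wire
    static byte[] serialize(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: turn the received bytes back into a String
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = serialize("hello kafka"); // what actually travels to the broker
        System.out.println(deserialize(wire));  // round-trips back to "hello kafka"
    }
}
```

The serializer configured on the producer must be compatible with the deserializer configured on the consumer, otherwise the consumer will read garbage or fail.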






Start Kafka (KRaft mode, as a daemon):
bin/kafka-server-start.sh -daemon config/kraft/server.properties

Stop Kafka:
bin/kafka-server-stop.sh

pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.1</version>
        <relativePath/>
    </parent>
    <groupId>org.example</groupId>
    <artifactId>kafka-test</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
application.properties
# Application name
spring.application.name=kafka-test
# Kafka broker addresses
spring.kafka.bootstrap-servers=192.168.75.128:9092,192.168.75.129:9092,192.168.75.130:9092
# Serializers for the producer's key and value
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
controller
package com.bear.springbootkafka.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author LiuShanshan
 * @version V1.0
 * @Description Sends the request parameter to the "first" topic.
 */
@RestController
public class ProducerController {
    @Autowired
    KafkaTemplate<String, String> kafka;

    @RequestMapping("/kafkaProducerTest")
    public String data(String msg) {
        kafka.send("first", msg);
        return "ok";
    }
}

The class is instantiated as a bean in the Spring container; the @KafkaListener method continuously listens for new records arriving in Kafka.
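The consumer counterpart in the same Spring Boot app might look like the sketch below (not runnable on its own: it needs a running broker and spring.kafka.consumer.* deserializer settings in application.properties; the package, class name, and group id "test-group" are assumptions, while the topics first and siv come from the test above):

```java
package com.bear.springbootkafka.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Registered as a Spring bean; @KafkaListener keeps polling the topics
// and invokes this method for every new record.
@Component
public class ConsumerListener {
    @KafkaListener(topics = {"first", "siv"}, groupId = "test-group")
    public void onMessage(String msg) {
        System.out.println("received: " + msg);
    }
}
```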

Test result after listening to the topics first and siv:
