wget 有点慢,不想等的可以再网盘下载
wget https://archive.apache.org/dist/kafka/3.1.0/kafka_2.12-3.1.0.tgz
网盘提取
链接: https://pan.baidu.com/s/1zxbZHht7u6F-hBNRBiDnFw?pwd=jb5d
提取码: jb5d
得到安装包之后,解压
tar -zxvf kafka_2.12-3.1.0.tgz
在解压的目录下找到 config 文件夹
vi server.properties
编辑kafka的配置文件(我主要是配置了远程访问)
- ############################# Socket Server Settings #############################
- #远程访问配置
- listeners = PLAINTEXT://内网IP:9092
- advertised.listeners=PLAINTEXT://公网IP:9092
启动zookeeper
回到 bin 目录
./zookeeper-server-start.sh ../config/zookeeper.properties &
启动 kafka
./kafka-server-start.sh server.properties &
创建topic(bin 目录下执行)
kafka 3 不需要指定zookeeper
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test1
创建生产者
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
输入要发送的消息

创建消费者
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
pom.xml
- <dependency>
- <groupId>org.springframework.kafkagroupId>
- <artifactId>spring-kafkaartifactId>
- <version>2.8.3version>
- dependency>
2.8.3 可以搭配 上面安装的kafka 版本,太低的驱动版本会有兼容问题,比如2.4.1。
其他版本没有进行测试
bootstrap.yml
- spring:
- kafka:
- bootstrap-servers: 公网IP:9092
- producer: # producer 生产者
- retries: 0 # 重试次数
- acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
- batch-size: 16384 # 批量大小
- buffer-memory: 33554432 # 生产端缓冲区大小
-
- consumer: # consumer消费者
- group-id: javagroup # 默认的消费组ID
- enable-auto-commit: true # 是否自动提交offset
- auto-commit-interval: 100 # 提交offset延时(接收到消息后多久提交offset)
生产者模拟代码
- @RestController
- public class KafkaProducer {
- @Resource
- private KafkaTemplate
kafkaTemplate; -
- @GetMapping("/kafka/test/{msg}")
- public void sendMessage(@PathVariable("msg") String msg) {
- kafkaTemplate.send("test", JSON.toJSONString(m));
- }
- }
消费者模拟代码
- @Slf4j
- @Component
- public class KafkaConsumer {
-
- //不指定group,默认取yml里配置的
- @KafkaListener(topics = {"test"})
- public void onMessage1(ConsumerRecord, ?> consumerRecord) {
- Optional> optional = Optional.ofNullable(consumerRecord.value());
- if (optional.isPresent()) {
- Object msg = optional.get();
- log.info("message:{}", msg);
- }
- }
- }