1. Deploying Kafka:
(1) First, create a Docker network:
docker network create app-tier --driver bridge
(2) Install ZooKeeper. Kafka depends on ZooKeeper, so ZooKeeper must be installed first:
docker run -d --name zookeeper-server --network app-tier -p 2181:2181 -e ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:latest
(3) Install Kafka:
docker run -d --name kafka-server --network app-tier -p 9092:9092 -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.202.211:9092 bitnami/kafka:latest
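Parameter explanation:
ALLOW_PLAINTEXT_LISTENER: allows an unencrypted, unauthenticated PLAINTEXT listener; suitable for development and testing only.
KAFKA_CFG_ZOOKEEPER_CONNECT: the ZooKeeper address Kafka connects to; here, the zookeeper-server container on port 2181.
KAFKA_CFG_ADVERTISED_LISTENERS: the address the broker advertises to clients; it must be reachable from the clients, so use the host machine's IP (192.168.202.211 in this example).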
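To make the format of the KAFKA_CFG_ADVERTISED_LISTENERS value concrete, the following minimal sketch splits an entry such as PLAINTEXT://192.168.202.211:9092 into its listener name, host, and port using java.net.URI. The class AdvertisedListener and its parse method are hypothetical helpers for illustration, not part of Kafka. The sketch assumes a single entry whose listener name is a valid URI scheme, so a name containing an underscore (for example SASL_PLAINTEXT) would be rejected by this approach.

```java
import java.net.URI;

// Hypothetical helper (not a Kafka API): splits one advertised-listener entry
// of the form NAME://host:port into its three parts.
final class AdvertisedListener {
    final String name;
    final String host;
    final int port;

    private AdvertisedListener(String name, String host, int port) {
        this.name = name;
        this.host = host;
        this.port = port;
    }

    static AdvertisedListener parse(String entry) {
        // URI.create throws IllegalArgumentException on malformed input.
        URI uri = URI.create(entry.trim());
        if (uri.getScheme() == null || uri.getHost() == null || uri.getPort() < 0) {
            throw new IllegalArgumentException("Expected NAME://host:port but got: " + entry);
        }
        return new AdvertisedListener(uri.getScheme(), uri.getHost(), uri.getPort());
    }
}
```

The host and port obtained this way are what a client outside the Docker network must be able to reach, which is why the command above uses the host machine's IP.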
(4) Deploy a graphical management tool for Kafka; choose either kafka-map or kafka-manager:
kafka-map (recommended)
docker run -d --name kafka-map --network app-tier -p 9090:8080 -v /root/kafka-map/data:/usr/local/kafka-map/data -e DEFAULT_USERNAME=admin -e DEFAULT_PASSWORD=admin --restart always dushixiang/kafka-map:latest
URL: 192.168.202.211:9090
Username/password: admin/admin
kafka-manager (hard to use, not recommended)
docker run --name kafka-manager -d --network app-tier -p 9091:9000 -e ZK_HOSTS="zookeeper-server:2181" sheepkiller/kafka-manager
URL: 192.168.202.211:9091
2. Integrating Kafka with Spring Boot:
(1) Add the Kafka dependency to the pom file:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.1</version>
</dependency>
(2) The application.yml configuration file:
spring:
  kafka:
    bootstrap-servers: 192.168.202.211:9092
    consumer:
      group-id: my-group-id
      auto-offset-reset: earliest
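For readers more familiar with the plain Kafka client, the Spring Boot settings above correspond to the standard consumer configuration keys bootstrap.servers, group.id, and auto.offset.reset. The sketch below builds the equivalent java.util.Properties by hand using only the JDK. The class name ConsumerSettings is hypothetical and exists only for illustration; Spring Boot performs this mapping itself, so this code is not needed in the project.

```java
import java.util.Properties;

// Hypothetical helper: shows the Kafka client keys that the
// spring.kafka.* settings in application.yml correspond to.
final class ConsumerSettings {
    static Properties fromYamlValues() {
        Properties props = new Properties();
        // spring.kafka.bootstrap-servers
        props.setProperty("bootstrap.servers", "192.168.202.211:9092");
        // spring.kafka.consumer.group-id
        props.setProperty("group.id", "my-group-id");
        // spring.kafka.consumer.auto-offset-reset: start from the earliest
        // available offset when the group has no committed offset yet
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }
}
```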
(3) Create the following file structure in your project:
biz/src/java/com/heurd
|______BizApplication.java
|______biz
|_______|____controller
|_______|______|____KafkaController.java
|_______|____kafka
|_______|______|____producer
|_______|______|________|____KafkaProducer.java
|_______|______|____consumer
|_______|______|________|____KafkaConsumer.java
Producer. The contents of KafkaProducer.java are as follows:
package com.heurd.intellidigital.service.modular.data.kafka.producer;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaProducer {

    // Provided by Spring Boot's Kafka auto-configuration.
    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Logs the message, then sends it to the given topic asynchronously.
    public void sendMessage(String topicName, String message) {
        System.out.printf("topicName: %s, message: %s%n", topicName, message);
        kafkaTemplate.send(topicName, message);
    }
}
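Because sendMessage accepts any string as the topic name, a caller may want to check the name before sending. The sketch below is a hypothetical helper (TopicNames is not part of Kafka or Spring) that mirrors Kafka's topic naming rules as I understand them: a non-empty name of at most 249 characters drawn from ASCII letters, digits, '.', '_', and '-', and not equal to "." or "..".

```java
import java.util.regex.Pattern;

// Hypothetical helper: checks a topic name against Kafka's naming rules
// before it is passed to a method such as sendMessage.
final class TopicNames {
    private static final int MAX_LENGTH = 249;
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");

    static boolean isValid(String name) {
        if (name == null || name.isEmpty() || name.length() > MAX_LENGTH) {
            return false;
        }
        if (name.equals(".") || name.equals("..")) {
            return false;
        }
        return LEGAL.matcher(name).matches();
    }
}
```

With this helper, TopicNames.isValid("my-topic") accepts the topic used in this article, while a name containing a space is rejected.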
Consumer. The contents of KafkaConsumer.java are as follows:
package com.heurd.intellidigital.service.modular.data.kafka.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    // Invoked for each message that arrives on "my-topic" for this consumer group.
    @KafkaListener(topics = "my-topic", groupId = "my-group-id")
    public void listen(String message) {
        System.out.println("Received message: " + message);
    }
}
The contents of the controller file are as follows:
package com.heurd.intellidigital.service.modular.data.controller;

import com.heurd.intellidigital.service.modular.data.kafka.producer.KafkaProducer;
import com.heurd.intellimes.core.pojo.response.ResponseData;
import com.heurd.intellimes.core.pojo.response.SuccessResponseData;
import io.swagger.annotations.ApiOperation;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
@RequestMapping("/kafka")
public class KafkaController {

    @Resource
    private KafkaProducer kafkaProducer;

    // Sends a fixed test message to "my-topic"; KafkaConsumer picks it up through its listener.
    @ApiOperation("Kafka test")
    @GetMapping(value = "/test")
    public ResponseData<Object> kafka() {
        String message = "Hello, Kafka!";
        kafkaProducer.sendMessage("my-topic", message);
        return new SuccessResponseData<>(message);
    }
}
(4) Call the /kafka/test endpoint. The printed output:
Producer: topicName: my-topic, message: Hello, Kafka!
Consumer: Received message: Hello, Kafka!