• spring boot + kafka 简单配置


    kafka 安装及配置

    wget 有点慢,不想等的可以再网盘下载

    wget https://archive.apache.org/dist/kafka/3.1.0/kafka_2.12-3.1.0.tgz

    网盘提取

    链接: https://pan.baidu.com/s/1zxbZHht7u6F-hBNRBiDnFw?pwd=jb5d

    提取码: jb5d 

    得到安装包之后,解压

    tar -zxvf kafka_2.12-3.1.0.tgz

     在解压的目录下找到 config 文件夹

    vi server.properties

    编辑kafka的配置文件(我主要是配置了远程访问)

    1. ############################# Socket Server Settings #############################
    2. #远程访问配置
    3. listeners = PLAINTEXT://内网IP:9092
    4. advertised.listeners=PLAINTEXT://公网IP:9092

    启动zookeeper

    回到 bin 目录

    ./zookeeper-server-start.sh ../config/zookeeper.properties &

    启动 kafka

    ./kafka-server-start.sh server.properties &

    验证环节

    创建topic(bin 目录下执行)

    kafka 3 不需要指定zookeeper

    ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test1

    创建生产者

    ./kafka-console-producer.sh --broker-list localhost:9092 --topic test

     输入要发送的消息

    创建消费者

    ./kafka-console-consumer.sh --bootstrap-server localhost:9092  --topic test --from-beginning

    spring boot 配置

    pom.xml 

    1. <dependency>
    2. <groupId>org.springframework.kafkagroupId>
    3. <artifactId>spring-kafkaartifactId>
    4. <version>2.8.3version>
    5. dependency>

    2.8.3 可以搭配 上面安装的kafka 版本,太低的驱动版本会有兼容问题,比如2.4.1。

    其他版本没有进行测试 

    bootstrap.yml

    1. spring:
    2. kafka:
    3. bootstrap-servers: 公网IP:9092
    4. producer: # producer 生产者
    5. retries: 0 # 重试次数
    6. acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选01、all/-1)
    7. batch-size: 16384 # 批量大小
    8. buffer-memory: 33554432 # 生产端缓冲区大小
    9. consumer: # consumer消费者
    10. group-id: javagroup # 默认的消费组ID
    11. enable-auto-commit: true # 是否自动提交offset
    12. auto-commit-interval: 100 # 提交offset延时(接收到消息后多久提交offset)

    生产者模拟代码

    1. @RestController
    2. public class KafkaProducer {
    3. @Resource
    4. private KafkaTemplate kafkaTemplate;
    5. @GetMapping("/kafka/test/{msg}")
    6. public void sendMessage(@PathVariable("msg") String msg) {
    7. kafkaTemplate.send("test", JSON.toJSONString(m));
    8. }
    9. }

    消费者模拟代码

    1. @Slf4j
    2. @Component
    3. public class KafkaConsumer {
    4. //不指定group,默认取yml里配置的
    5. @KafkaListener(topics = {"test"})
    6. public void onMessage1(ConsumerRecord consumerRecord) {
    7. Optional optional = Optional.ofNullable(consumerRecord.value());
    8. if (optional.isPresent()) {
    9. Object msg = optional.get();
    10. log.info("message:{}", msg);
    11. }
    12. }
    13. }

  • 相关阅读:
    Rust中FFI编程知识点整理总结
    Face AE调试及其合理性分析
    改进YOLOv5系列:结合ShuffleNet V2主干网络,高效CNN架构设计的实用指南
    软件测试工程师如何对算法做测试?
    C语言基础篇 —— 4.1 管理内存:栈(stack)、堆(heap)、数据区(.data)
    vscode离线安装ssh插件(本机和服务器都离线)
    自定义通用分页标签01
    WPF 图片头像自由剪切器实时截图细节放大器
    开发者,10分钟学会Tomcat ?
    串口协议、I2C协议、SPI协议总结
  • 原文地址:https://blog.csdn.net/Amy126/article/details/126008395