• kafka入门教程,介绍全面


    1、官网下载最新版本的kafka,里面已经集成zookeeper。直接解压到D盘

    2、配置文件修改,config目录下面的zookeeper.properties.   设置zookeeper数据目录

    dataDir=D:/kafka_2.12-3.6.0/tmp/zookeeper

    fe331be6fb6a4e7898696c2c0e768cb3.png

    3、修改kafka的配置文件server.properties.   主要修改内容如下:

    zookeeper.connect=localhost:2181

    log.dirs=D:\\kafka_2.12-3.6.0\\logs

    listeners=PLAINTEXT://localhost:9092

    其他默认即可。

    4、修改完成后进入bin目录:启动zookeeper和kafka,命令如下

    zookeeper-server-start.bat ../../config/zookeeper.properties

    kafka-server-start.bat ../../config/server.properties

    5、命令行创建topic,命令如下:

    kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic hello

     

    6、创建生产者和消费者,测试。生产者输入消息,消费者就会收到相应的消息了 

    kafka-console-producer.bat --broker-list localhost:9092 --topic hello

    667368c7b8434bb6a9e522a1bb94f624.png

    kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic hello--from-beginning

    381fe833b5a1440e8629f2e1638458e2.png

    7、创建springboot工程,测试

    引入依赖:


    org.springframework.boot
    spring-boot-starter-web



    org.springframework.boot
    spring-boot-starter-test
    test



    org.springframework.kafka
    spring-kafka

     

    8、yml文件配置kafka

    spring:
    kafka:
    bootstrap-servers: localhost:9092
    producer:
    acks: 1
    retries: 3
    batch-size: 16384
    properties:
    linger:
    ms: 0
    buffer-memory: 33554432
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
    group-id: helloGroup
    enable-auto-commit: false
    auto-commit-interval: 1000
    auto-offset-reset: latest
    properties:
    request:
    timeout:
    ms: 18000
    session:
    timeout:
    ms: 12000
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

     

    9、使用springboot    KafkaTemplate发送消息

    @RequestMapping(value = "/sendMessage", method = RequestMethod.GET)
    public String sendMessage(String message) {
    kafkaTemplate.send("hello", message);

    return "发送成功~";
    }

    10、消息消费,


    @KafkaListener(topics = "hello")
    public void receiveMessage(ConsumerRecord record) {
    String topic = record.topic();
    long offset = record.offset();
    int partition = record.partition();
    String message = record.value();

    System.out.println("topic = " + topic);
    System.out.println("offset = " + offset);
    System.out.println("partition = " + partition);
    System.out.println("message = " + message);
    }

     

     

     

     

  • 相关阅读:
    [附源码]计算机毕业设计中小学课后延时服务管理系统Springboot程序
    flink教程(2)-source- sink
    Spring系列15:Environment抽象
    查找的三种常用算法
    R语言绘制环状柱状堆积图+分组+显著性
    Ceph
    JupyterNotebook的快捷键
    Error: impossible constraint in ‘asm‘
    Part2_扩展MATSIM_Subpart3_个人汽车交通_第13章 停车
    Coursera自然语言处理专项课程04:Natural Language Processing with Attention Models笔记 Week02
  • 原文地址:https://blog.csdn.net/u013558123/article/details/134155237