• Kafka - 04 Java客户端实现消息发送和订阅


    1. Kafka测试命令行操作

    1. 主题命令行操作

    在上一节中我们安装了Kafka单机环境和集群环境,这一节来测试下Linux环境安装Kafka后的命令行操作。

    我们之前在用Windows环境安装Kafka Kafka应用场景|基础架构|Windows安装|命令行操作 和命令行操作时,讲到主题命令行参数如下:

    在这里插入图片描述

    1. 创建主题

    [root@localhost kafka-01]# bin/kafka-topics.sh --bootstrape-server localhost:9092 --create --topic test1 --partitions 3 --replication-factor 3
    
    • 1

    在这里插入图片描述

    注意:这里之所以无法识别 --bootstrape-server 参数是因为kafka的版本低于2.2,我安装的kafka版本为kafka_2.12-2.2.1.tgz,应该使用 --zookeeper localhost:2181参数:

    [root@localhost kafka-01]# bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 3 --replication-factor 3
    
    • 1

    在这里插入图片描述

    –zookeeper:指定了Kafka所连接的Zookeeper服务地址
    –topic:指定了所要创建主题的名称
    –partitions:指定了分区个数
    –replication-factor:指定了副本因子
    –create:创建主题的动作指令

    2. 查看主题详情

    [root@localhost kafka-01]# bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test
    
    • 1

    在这里插入图片描述

    3. 查看所有主题

    [root@localhost kafka-01]# bin/kafka-topics.sh --zookeeper localhost:2181 --list
    
    • 1

    在这里插入图片描述

    2. 消费者命令行操作

    在这里插入图片描述

    [root@localhost kafka-01]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    
    • 1

    –bootstrap-server 指定了连接Kafka集群的地址
    –topic 指定了消费端订阅的主题

    在这里插入图片描述

    3. 生产者命令行操作

    在这里插入图片描述

    [root@localhost kafka-01]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    
    • 1

    –broker-list 指定了连接的Kafka集群的地址
    –topic 指定了发送消息时的主题

    生产者发送消息:

    在这里插入图片描述

    消费者接收消息:

    在这里插入图片描述

    2. Java程序调用Kafka

    ① 创建kafka项目并引入依赖:

    
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0modelVersion>
        <parent>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starter-parentartifactId>
            <version>2.2.4.RELEASEversion>
            <relativePath/>
        parent>
    
        <groupId>com.hhgroupId>
        <artifactId>kafkaartifactId>
        <version>1.0-SNAPSHOTversion>
    
        <properties>
            <maven.compiler.source>8maven.compiler.source>
            <maven.compiler.target>8maven.compiler.target>
        properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.kafkagroupId>
                <artifactId>kafka-clientsartifactId>
                <version>3.0.0version>
            dependency>
    
            <dependency>
                <groupId>org.projectlombokgroupId>
                <artifactId>lombokartifactId>
                <optional>trueoptional>
            dependency>
    
            <dependency>
                <groupId>cn.hutoolgroupId>
                <artifactId>hutool-allartifactId>
                <version>5.7.20version>
            dependency>
    
            <dependency>
                <groupId>com.alibabagroupId>
                <artifactId>fastjsonartifactId>
                <version>1.2.83version>
            dependency>
    
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-webartifactId>
            dependency>
    
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-testartifactId>
                <scope>testscope>
            dependency>
        dependencies>
    
    project>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    ② kafka生产者发送消息:

    public class CustomProducer01 {
        public static void main(String[] args) {
            // kafka生产者属性配置
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.22:9092");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    
            // kafka生产者发送消息,默认是异步发送方式
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", "hello,kafka");
            try{
                // 发送消息
                kafkaProducer.send(producerRecord);
            }catch (Exception e){
                e.printStackTrace();
            }
            // 关闭资源
            kafkaProducer.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    ③ 查看kafka消费者有没有消费消息:

    在这里插入图片描述

    ④ kafka消费者消费消息:

    查看kafka安装目录config/consumer.properties文件中的group.id:

    在这里插入图片描述

    public class CustomConsumer01 {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.22:9092");
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test-consumer-group");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
            ArrayList<String> topics = new ArrayList<>();
            topics.add("test");
            consumer.subscribe(topics);
    
            while (true){
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    ⑤ 启动消费者程序后,再启动生产者程序发送消息,查看消费者控制台:

    在这里插入图片描述

  • 相关阅读:
    Python的Django部署uwsgi后自签名实现的HTTPS
    快来看,数据分析BI软件居然也能完成基金变迁大数据分析?
    商业银行如何构建一体化监控
    【Git】安装和常用命令的使用与讲解及项目搭建和团队开发的出现的问题并且给予解决
    大数据下一代变革之必研究数据湖技术Hudi原理实战双管齐下-后续
    学透阿里P8总结最新Java面试宝典,大厂offer任你挑选
    分享一个Redis自带的压测工具:redis-benchmark
    用css画一个半圆弧(以小程序为例)
    【Vue2.0学习】— 列表排序(四十一)
    新的iLeakage攻击从Apple Safari窃取电子邮件和密码
  • 原文地址:https://blog.csdn.net/qq_42764468/article/details/128051988