• Kafka 生产者和消费者实例


    目录

    一、基于命令行使用Kafka

    二、创建一个名为“itcasttopic”的主题

    ①、创建生产者

    ②、创建消费者

    ③、测试发送数据

    三、基于Java API方式使用Kafka

    ①、创建工程添加依赖

    ②、编写生产者客户端

    ③ 、配置环境

    ④、编写消费者客户端

    ⑤、再运行KafkaConsumerTest程序 ​编辑​编辑

    ⑥、再回到KafkaProducerTest.java运行该程序


     

    一、基于命令行使用Kafka

        类似scala,mysql等,命令行是初学者操作Kafka的基本方式,kafka的模式是生产者消费者模式,他们之间通讯是通过,一个公共频道完成

    二、创建一个名为“itcasttopic”的主题

    kafka-topics.sh --create --topic itcasttopic  --partitions 3  --replication-factor 2  --zookeeper master:2181,slave1:2181,slave2:2181

     

    --create --topic itcasttopic:  创建主题名称是 itcasttopic

    --partitions 3  : 分区数是3

    --replication-factor 2:副本数是 2

    --zookeeper master:2181,slave1:2181,slave2:2181 : zookeeper:服务的IP地址和端口

    1b35e7041cd746459f7b9bcc011850f6.png

     

    ##删除主题##

    $ bin/kafka-topics.sh --delete -zookeeper master:2181,slave1:2181,slave2:2181 --topic itcasttopic

     

    ①、创建生产者

    kafka-console-producer.sh  --broker-list master:9092,slave1:9092,slave2:9092 --topic itcasttopic

    e1138514f25c4465a5111fd5a02611db.png

     

    (上面是等待输入光标在闪烁)

    83087331134e44b38cd72745454d087f.png

     

    转换到slave1

    、创建消费者

    kafka-console-consumer.sh  --from-beginning --topic itcasttopic --bootstrap-server master:90

    ce761842a2d84647976512dd17bd843f.png

     

    ③、测试发送数据

    生产发送数据

    ea666802b2d2474cacf525450d3c5205.png

    消费接收数据

    2e9816c7bdc142b3a8bfca89a486984b.png

     

    三、基于Java API方式使用Kafka

    2ab018b696754842a27c5e791c26f33d.png

    560f905f3548454a828ed3c4898d1f1f.png

     

    修改配置:

    7d791f05e2a849c38c3ee4b778d10131.png

     a09d2778cfbb47aead16d8bf74d02f3e.png

     

    ①、创建工程添加依赖

    在工程里面的pom.xml文件添加Kafka依赖

    (Kafka依赖需要与虚拟机安装的Kafka版本保持一致)

    1. 2.11.8
    2. 2.7.4
    3. 2.3.2
    4.     
    5.         
    6.             org.apache.maven.plugins
    7.             maven-compiler-plugin
    8.             
    9.                 1.8
    10.                 1.8
    11.             
    12.         
    13.     
    14.     org.apache.kafka
    15.     kafka-clients
    16.     2.0.0
    17.     org.apache.kafka
    18.     kafka-streams
    19.     2.0.0

    ②、编写生产者客户端

    在工程的java目录下创建KafkaProducerTest文件

    5e0dada88c894b37872116cb5afc4f84.png

     

    1. import org.apache.kafka.clients.producer.KafkaProducer;
    2. import org.apache.kafka.clients.producer.ProducerRecord;
    3. import java.util.Properties;
    4. public class KafkaProducerTest {
    5.     public static void main(String[] args){
    6.         Properties props = new Properties();
    7.         //
    8.         props.put("bootstrap.servers","master:9092,slave1:9092,slave2:9092");
    9.         //
    10.         props.put("acks","all");
    11.         //
    12.         props.put("retries",0);
    13.         //
    14.         props.put("batch.size",16384);
    15.         //
    16.         props.put("linger.ms",1);
    17.         //
    18.         props.put("buffer.memory",33554432);
    19.         //
    20.         props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
    21.         //
    22.         props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
    23.         //
    24.         KafkaProducer<String,String> producer=new KafkaProducer<String, String>(props);
    25.         for (int i=0; i<50; i++){
    26.             producer.send(new ProducerRecord<String, String>("itcasttopic",Integer.toString(i),"hello world [2] -"+i));
    27.         }
    28.         producer.close();
    29.     }


    }edcb457a6e69430d975b24ad6ddf82d3.png

     

    Slave1上出现的结果

    3920872aaecf49f2b6506b6b07e69ea1.png

     

    ③ 、配置环境

    c26e97e27cb14e5dad208c2ef1ccab15.png

    80160532d6a3427d8b1ec7a4464bd345.png

     

    ④、编写消费者客户端

    1. import org.apache.kafka.clients.consumer.ConsumerRecord;
    2. import org.apache.kafka.clients.consumer.ConsumerRecords;
    3. import org.apache.kafka.clients.consumer.KafkaConsumer;
    4. import org.apache.kafka.clients.producer.Callback;
    5. import org.apache.kafka.clients.producer.KafkaProducer;
    6. import org.apache.kafka.clients.producer.ProducerRecord;
    7. import org.apache.kafka.clients.producer.RecordMetadata;
    8. import java.util.Arrays;
    9. import java.util.Properties;
    10. public class KafkaConsumerTest {
    11.     public static void main(String[] args) {
    12.         // 1、准备配置文件
    13.         Properties props = new Properties();
    14.         // 2、指定Kafka集群主机名和端口号
    15.         props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
    16.         // 3、指定消费者组ID,在同一时刻同一消费组中只有一个线程可以去消费一个分区数据,不同的消费组可以去消费同一个分区的数据。
    17.         props.put("group.id", "itcasttopic");
    18.         // 4、自动提交偏移量
    19.         props.put("enable.auto.commit", "true");
    20.         // 5、自动提交时间间隔,每秒提交一次
    21.         props.put("auto.commit.interval.ms", "1000");
    22.         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    23.         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    24.         KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
    25.         // 6、订阅数据,这里的topic可以是多个
    26.         kafkaConsumer.subscribe(Arrays.asList("itcasttopic"));
    27.         // 7、获取数据
    28.         while (true) {
    29.             //每隔100ms就拉去一次
    30.             ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
    31.             for (ConsumerRecord<String, String> record : records) {
    32.                 System.out.printf("topic = %s,offset = %d, key = %s, value = %s%n", record.topic(), record.offset(), record.key(), record.value());
    33.             }
    34.         }
    35.     }
    36. }

    运行KafkaP roducerTest程序

    83ffa7dd0cb14c72b442a98b7f45f136.png

     

    ⑤、再运行KafkaConsumerTest程序 790caac612934c6eb173c4d117372979.png

    ⑥、再回到KafkaProducerTest.java运行该程序

    (查看KafkaConsumerTest的运行框)由以下图可以看出生产者生产消息成功被终端消费

    5b69e55e64c34dc69f6fccb1f8059e32.png

     

     

  • 相关阅读:
    为什么OpenCV计算的帧率是错误的?
    公共4G广播音柱有哪些用处
    四氯四碘荧光素二钾,CAS号: 632-68-8
    CSS3专题-[上篇]:过渡、2D转换、动画
    删除元素专题
    武汉信息系统建设和服务能力评估CS认证的重要性
    Jackson
    南洋才女,德艺双馨,孙燕姿本尊回应AI孙燕姿(基于Sadtalker/Python3.10)
    学信息系统项目管理师第4版系列30_信息系统管理
    十九、kafka消费者思考之partition leader切换会引起消费者Rebalance么?
  • 原文地址:https://blog.csdn.net/m0_57781407/article/details/126909919