• Kafka-Java一:Spring实现kafka消息的简单发送


    目录

    写在前面

    一、创建maven项目

    二、引入依赖

    2.1、maven项目创建完成后,需要引入以下依赖

    2.2、创建工程目录

    三、创建生产者

    3.1、创建生产者,同步发送消息

    3.2、创建生产者,异步发送消息

    四、同步发送消息和异步发送消息的区别

    五、报错处理思路


    写在前面

            该文章通过spring只实现消息的简单发送,不实现消息的监听。

    一、创建maven项目

            创建maven过程不再赘述。

    二、引入依赖

    2.1、maven项目创建完成后,需要引入以下依赖

    1. // kafka 依赖
    2. org.apache.kafka
    3. kafka-clients
    4. 3.4.0
    5. // json依赖,demo中可能会用到该依赖,与kafka依赖无关
    6. com.alibaba
    7. fastjson
    8. 2.0.10

    2.2、创建工程目录

    三、创建生产者

    3.1、创建生产者,同步发送消息

            3.1.1、在MyProducer中实现如下代码

    1. package com.demo.lxb.kafka;
    2. import com.alibaba.fastjson.JSON;
    3. import com.demo.lxb.entiry.Order;
    4. import org.apache.kafka.clients.producer.*;
    5. import org.apache.kafka.common.serialization.StringSerializer;
    6. import java.util.Properties;
    7. import java.util.concurrent.CountDownLatch;
    8. import java.util.concurrent.ExecutionException;
    9. /**
    10. * @Description:
    11. * @Author: lvxiaobu
    12. * @Date: 2023-10-23 17:06
    13. **/
    14. public class MyProducer {
    15. private final static String TOPIC_NAME = "topic0921";
    16. public static void main(String[] args) throws ExecutionException, InterruptedException {
    17. Properties props = new Properties();
    18. // 一、设置参数
    19. // 配置kafka地址
    20. //props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.151.28:9092"); // 单机配置
    21. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
    22. "192.168.151.28:9092,192.168.151.28:9092,192.168.151.28:9092"); // 集群配置
    23. // 配置消息 键值的序列化规则
    24. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    25. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    26. // 二、声明消息对象
    27. // 未指定发送分区,具体撒送分区的计算公式: hash(key)%PartitionNum
    28. // 创建发送的消息: producerRecord
    29. // 参数1: 要发送的主题
    30. // 参数2: 消息的key值,可有可无,如果存在的话,该字段用来带入分区的计算公式
    31. // 参数3: value,具体的消息的内容,json格式的字符串
    32. ProducerRecord producerRecord = new ProducerRecord
    33. (TOPIC_NAME,
    34. "mykey",
    35. "hello-kafka");
    36. // 三、声明消息发送者
    37. Producer producer = new KafkaProducer(props);
    38. // 开发发送,并返回结果和元数据
    39. RecordMetadata recordMetadata = producer.send(producerRecord).get();
    40. System.out.println("发送消息结果: " + "topic-" + recordMetadata.topic() + " | partition-"
    41. + recordMetadata.partition() + " | offset-" + recordMetadata.offset());
    42. }
    43. }

            执行main方法,结果如下:

            如果多次执行main方法,会发现offset偏移量的数字会发生变化。 

    3.2、创建生产者,异步发送消息

            3.2.1、在MyProducer2中实现如下代码

    1. package com.demo.lxb.kafka;
    2. import org.apache.kafka.clients.producer.*;
    3. import org.apache.kafka.common.serialization.StringSerializer;
    4. import java.util.Properties;
    5. import java.util.concurrent.ExecutionException;
    6. /**
    7. * @Description: kafka 异步发送消息
    8. * @Author: lvxiaobu
    9. * @Date: 2023-10-23 17:06
    10. **/
    11. public class MyProducer2 {
    12. private final static String TOPIC_NAME = "topic0921";
    13. public static void main(String[] args) throws ExecutionException, InterruptedException {
    14. Properties props = new Properties();
    15. // 一、设置参数
    16. // 配置kafka地址
    17. // props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
    18. // "192.168.151.28:9092"); // 单机配置
    19. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
    20. "192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094"); // 集群配置
    21. // 配置消息 键值的序列化规则
    22. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    23. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    24. // 二、声明消息对象
    25. // 未指定发送分区,具体撒送分区的计算公式: hash(key)%PartitionNum
    26. // 创建发送的消息: producerRecord
    27. // 参数1: 要发送的主题
    28. // 参数2: 消息的key值,可有可无,如果存在的话,该字段用来带入分区的计算公式
    29. // 参数3: value,具体的消息的内容,json格式的字符串
    30. ProducerRecord producerRecord = new ProducerRecord
    31. (TOPIC_NAME,
    32. "mykey",
    33. "hello-kafka2");
    34. // 三、声明消息发送者
    35. Producer producer = new KafkaProducer(props);
    36. // 异步发送消息,通过callback回调函数获取发送结果
    37. producer.send(producerRecord, new Callback() {
    38. public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    39. if(e != null){
    40. System.out.println("消息发送失败:" + e);
    41. }
    42. if(recordMetadata != null){
    43. System.out.println("发送消息结果: " + "topic-" + recordMetadata.topic() + " | partition-"
    44. + recordMetadata.partition() + " | offset-" + recordMetadata.offset());
    45. }
    46. }
    47. });
    48. Thread.sleep(50000L);
    49. }
    50. }

    执行 Main方法,会产生和同步发送消息一样的结果。

    说明:Thread.sleep(50000L)是让主线程休眠50s,否则主线程在异步发送了消息以后就直接结束了,不会再输出callback中的输出语句

    四、同步发送消息和异步发送消息的区别

    消息的同步发送
      如果生产者发送的消息没有收到kafka的ack通知,生产者会产生阻塞,如果阻塞了3s仍然没有收到消息反馈,会进行消息发送的重试操作,重试的次数是3次。如果三次以后还不行,代码将抛出异常
    消息的异步发送
      生产者发送消息后,会提供一个callback的回调方法,callback会获取消息是否发送成功的结果。但是需要注意,异步发送消息会出现消息的丢失。

    五、报错处理思路

            3.2.1、检查Props配置Kafka地址是否正确

            3.2.2、检查Linux是否关闭防火墙

  • 相关阅读:
    【涨薪技术】0到1学会性能测试 —— LR录制回放&事务&检查点
    MIKE水动力笔记12_数字化海图1之提取确值水深点
    SQL中的case then的使用(select、update、insert、delete中各自使用)
    【推搜】embedding评估 | faiss的topK向量检索
    再来一次,新技术搞定老业务「GitHub 热点速览 v.22.44」
    【机器学习3】有监督学习经典分类算法
    蓝牙助听模块场景分析之一
    网速、宽带、带宽、流量三者之间的关系是什么?
    ubuntu x86_64 源码编译 rust 1.48.0
    国庆10.01
  • 原文地址:https://blog.csdn.net/qq_36769100/article/details/133995494