• Kafka生产者消息异步发送并返回发送信息api编写教程


    1.引入依赖(pox.xml文件)

           

                org.apache.kafka

                kafka-clients

                3.6.2

           

    2.创建java类

    3.配置运行属性

    //连接的服务器

    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");

    //指定对应的key和value的序列化类型

    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

    //关联自定义分区器

    //properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.ljr.kafka.producer.MyPartitioner");

    4.创建生产者对象

    键入new KafkaProducer<>(),光标置于括号内CTRL+P可以显示需要对象为properties;

    键入new Properties().var 回车,键入new KafkaProducer<>(properties).var 回车,选择变量名

    5.发送消息并返回发送结果

    键入KafkaProducer.send(),提示需要对象ProducerRecord;键入topic名(order)和要发送的信息(“0000”+i),new Callback()回车会弹出需要重写的抽象类,补全返回条件、需要返回的信息即可实现抽象类;

    e == null 表示消息全部发送完毕;

    6.关闭资源

    KafkaProducer.close();

    7.运行查看结果

    运行:

    可以看到有返回信息;

    另开窗口查看发送结果

    kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092 --topic order

    信息发送成功;

    8.完整代码

    1. package com.ljr.kafka.producer;
    2. import org.apache.kafka.clients.producer.*;
    3. import org.apache.kafka.common.serialization.StringSerializer;
    4. import java.util.Properties;
    5. public class CustomProducerCallback {
    6. public static void main(String[] args) {
    7. Properties properties = new Properties();
    8. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");
    9. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    10. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    11. /关联自定义分区器
    12. // properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.ljr.kafka.producer.MyPartitioner");
    13. KafkaProducer kafkaProducer = new KafkaProducer<>(properties);
    14. for(int i =0; i < 3; i++){
    15. kafkaProducer.send(new ProducerRecord<>("customers", "LiSi" + i), new Callback() {
    16. @Override
    17. public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    18. if (e == null) {
    19. System.out.println("topic:" + recordMetadata.topic() + " partition:" + recordMetadata.partition());
    20. }
    21. }
    22. });
    23. }
    24. kafkaProducer.close();
    25. }
    26. }

  • 相关阅读:
    国产域控TSN协议栈首发量产,理想汽车+映驰科技「抢先」
    【校招VIP】前端布局模块之Flex弹性布局
    I/O设备的概念和分类,I/O控制器
    使用 Web HID API 在浏览器中进行HID设备交互(纯前端)
    js【详解】数据类型原理(含变量赋值详解-浅拷贝)
    Android Jetpack之LifeCycle
    考虑源荷不确定性的热电联供微网优化(Matlab代码实现)
    springboot banner.txt文件
    集合幂级数相关
    第九章 软件BUG和管理
  • 原文地址:https://blog.csdn.net/v15220/article/details/139309338