• 发送消息(二)RoutingKafkaTemplate,DefaultKafkaProducerFactory和 ReplyingKafkaTemplate


    一、RoutingKafkaTemplate

    1.1、RoutingKafkaTemplate 能做什么

    RoutingKafkaTemplate可以根据目标topic名称在运行时选择生产者。

    RoutingKafkaTemplate 不支持事务、execute、flush或metrics操作,因为这些操作的主题未知。

    1.2、使用前提

    RoutingKafkaTemplate 和 KafkaTemplate使用前要做的工作差不多,都需要一个前期的配置。
    下面是一个例子:

    @SpringBootApplication
    public class Application {
    
        public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
        }
    
        @Bean
        public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
                ProducerFactory<Object, Object> pf) {
    
            // 使用不同的序列化程序克隆生产者工厂,向Spring注册以关闭
            Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
            configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
            DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
            context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF);
    
            Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
            map.put(Pattern.compile("two"), bytesPF);
            map.put(Pattern.compile(".+"), pf); // 带StringSerializer的默认生产者工厂
            return new RoutingKafkaTemplate(map);
        }
    
        @Bean
        public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
            return args -> {
                routingTemplate.send("one", "thing1");
                routingTemplate.send("two", "thing2".getBytes());
            };
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    二、使用 DefaultKafkaProducerFactory

    2.1、创建

    在KafkaTemplate配置的时候,有这么一段代码:

    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
    
    • 1
    • 2
    • 3
    • 4

    它用于创建生产者,在不使用事务的前提下 DefaultKafkaProducerFactory 会创建一个由所有客户端使用的单例生产者,
    这个状态下如果调用flush()模板,可能会导致使用同一生产者的其他线程出现延迟。要避免这种情况可以将属性producerPerThread设置为true,这个时候工厂将为每个线程创建(并缓存)一个单独的生产者。

    当producerPerThread是true且不再需要生产者时,用户代码必须调用 closeThreadBoundProducer()从物理上关闭生产者并将其从 ThreadLocal 中删除. 调用reset()或destroy()不会清理这些生产者。

    创建 DefaultKafkaProducerFactory 时,可以通过调用仅接受属性映射的构造函数从配置中获取键和值,
    或者可以将 Serializer 实例传递给 DefaultKafkaProducerFactory 构造函数(在 在这种情况下,所有 Producer 共享相同的实例)。
    或者,提供Supplier,它将用于为每个生产者获取单独的Serializer实例:

    @Bean
    public ProducerFactory<Integer, CustomValue> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
    }
    
    @Bean
    public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
        return new KafkaTemplate<Integer, CustomValue>(producerFactory());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    2.2、更新

    在创建工厂后更新生产者属性可以调用:

    void updateConfigs(Map<String, Object> updates);
    
    void removeConfig(String configKey);
    
    • 1
    • 2
    • 3

    这两个方法不会影响现有的生产者实例,调用reset()可以关闭任何现有的生产者,之后使用新属性创建新的生产者。

    不能将事务性生产者工厂更改为非事务性工厂,或者颠倒更改

    三、使用 ReplyingKafkaTemplate

    3.1、创建和简单使用

    ReplyingKafkaTemplate是KafkaTemplate的子类,用来提供请求/回复。

    @SpringBootApplication
    public class KRequestingApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(KRequestingApplication.class, args).close();
        }
    
        @Bean
        public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
            return args -> {
                ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
                RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
                SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
                System.out.println("Sent ok: " + sendResult.getRecordMetadata());
                ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
                System.out.println("Return value: " + consumerRecord.value());
            };
        }
    
        @Bean
        public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
                ProducerFactory<String, String> pf,
                ConcurrentMessageListenerContainer<String, String> repliesContainer) {
    
            return new ReplyingKafkaTemplate<>(pf, repliesContainer);
        }
    
        @Bean
        public ConcurrentMessageListenerContainer<String, String> repliesContainer(
                ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
    
            ConcurrentMessageListenerContainer<String, String> repliesContainer =
                    containerFactory.createContainer("replies");
            repliesContainer.getContainerProperties().setGroupId("repliesGroup");
            repliesContainer.setAutoStartup(false);
            return repliesContainer;
        }
    
        @Bean
        public NewTopic kRequests() {
            return TopicBuilder.name("kRequests")
                .partitions(10)
                .replicas(2)
                .build();
        }
    
        @Bean
        public NewTopic kReplies() {
            return TopicBuilder.name("kReplies")
                .partitions(10)
                .replicas(2)
                .build();
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    可以使用 Boot 的自动配置容器工厂来创建回复容器,比如配置回复容器并使用相同共享回复主题

    @Bean
    public ConcurrentMessageListenerContainer<String, String> replyContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
    
        ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
        container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // 唯一的
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // 不会收到旧回复
        container.getContainerProperties().setKafkaConsumerProperties(props);
        return container;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    3.2、使用 Message 回复/请求

    Message是 ReplyingKafkaTemplate 的一个抽象方法。

    @KafkaListener(id = "requestor", topics = "request")
    @SendTo
    public Message<?> messageReturn(String in) {
        return MessageBuilder.withPayload(in.toUpperCase())
                .setHeader(KafkaHeaders.TOPIC, replyTo)
                .setHeader(KafkaHeaders.MESSAGE_KEY, 42)
                .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
                .build();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
  • 相关阅读:
    deepspeed 训练多机多卡报错 ncclSystemError Last error
    论文学习记录随笔 多标签之LIFT
    基于JAVA SpringBoot和HTML美食网站博客程序设计
    webpack:自定义plugin插件开发
    golang 协程的实现原理
    【JAVA项目实战】【图书管理系统】登录模块【Servlet】+【Session】+【Filter】+【JSP】
    PG FULL_PAGE_WRITES MYSQL DOUBLE WRITE LOG
    中国三氧化二砷行业研究与未来预测报告(2022版)
    SpringBoot如何自定义启动Banner 以及自定义启动项目控制台输出信息 类似于若依启动大佛 制作教程
    什么是无损检测设备?
  • 原文地址:https://blog.csdn.net/qq_35241329/article/details/132875536