• 消息队列 随笔 6-spring-kafka


    0. 写的比较乱还不是最难受,最难受的是有一些问题遗留了下来😥

    SpringBoot集成kafka全面实战
    spring kafka 事务整合
    spring kafka 消息转发 、 延迟监听


    以下代码,详见 Gitee

    1. 配置类

    除了配置类是公用的,其余的配置文件和代码都按生产者、消费者分开描述,因为我是直接在测试类上运行的

    package com.weng.cloud.service8881.kafka;
    
    import com.google.common.collect.Maps;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Primary;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    import org.springframework.kafka.listener.AbstractMessageListenerContainer;
    import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
    import org.springframework.kafka.listener.ContainerProperties;
    import org.springframework.kafka.listener.KafkaMessageListenerContainer;
    import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
    import org.springframework.scheduling.annotation.EnableScheduling;
    
    import java.util.Map;
    
    /**
     * @Author: weng
     * @Date: 2022/8/7
     * @Description: com.weng.cloud.service8881.kafka
     * 

    * 生产者 配置事务 有两种方式: * 1.使用默认的ProducerFactory,需要指定 spring.kafka.producer.transaction-id-prefix 配置 * 2.自定义ProducerFactory *

    * 代码中的 调用方式 也分为两种,与配置方式无关: * 1.@Transactional * 2.kafkaTemplate.executeInTransaction() */ @Configuration @EnableKafka // 之前不加这个@EnableScheduling KafkaListenerEndpointRegistry 的注入无效,不知道啥问题 @EnableScheduling public class KafkaConfig { @Bean public NewTopic initTopic() { return new NewTopic("weng-test-topic", 8, (short) 2); } // 如果要修改分区数,只需修改配置值重启项目即可 // 修改分区数并不会导致数据的丢失,但是分区数只能增大不能减小 // @Bean // public NewTopic updateTopic() { // return new NewTopic("weng-test-topic", 10, (short) 2); // } /* 这里配置自定义的 ProducerFactory 以及 KafkaTemplate */ @Bean("kafkaTemplate") @Primary public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory producerFactory) { return new KafkaTemplate<>(producerFactory); } @Value("${spring.kafka.bootstrap-servers}") private String servers; @Bean("txKafkaTemplate") public KafkaTemplate<String, Object> txKafkaTemplate() { return new KafkaTemplate<>(this.producerFactory()); } private ProducerFactory<String, Object> producerFactory() { DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(producerConfigs()); producerFactory.setTransactionIdPrefix("tx"); return producerFactory; } private Map<String, Object> producerConfigs() { Map<String, Object> props = Maps.newHashMap(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.servers); props.put(ProducerConfig.RETRIES_CONFIG, 1); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 配置生产者拦截器 // props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.example.springbootkafka.interceptor.CustomProducerInterceptor"); // 配置拦截器消息处理类 // SendMessageInterceptorUtil sendMessageInterceptorUtil = new SendMessageInterceptorUtil(); return props; } @Bean public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() { return (message, exception, consumer) -> { System.err.println("突然演变成了,阳光的夏天 >>> " + exception.getMessage() + "消费异常:" + message.getPayload()); return null; 
}; } @Bean public ConcurrentKafkaListenerContainerFactory filterOddContainerFactory(ConsumerFactory consumerFactory) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory); // 被过滤的消息是否丢弃 factory.setAckDiscarded(true); // 自定义过滤策略 factory.setRecordFilterStrategy(consumerRecord -> { if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) { return false; } //返回true消息则被过滤 return true; }); return factory; } // 监听器容器工厂(设置禁止KafkaListener自启动) @Bean public ConcurrentKafkaListenerContainerFactory delayContainerFactory(ConsumerFactory consumerFactory) { ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory(); container.setConsumerFactory(consumerFactory); //禁止KafkaListener自启动 //设置提交偏移量的方式(禁止 自启动 需要手工提交) // container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); // 实际测试过程中发现禁止自启动并没有成功 container.setAutoStartup(false); return container; } @Bean public ConcurrentKafkaListenerContainerFactory sendToContainerFactory(ConsumerFactory consumerFactory, KafkaTemplate kafkaTemplate) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); factory.setReplyTemplate(kafkaTemplate); return factory; } @Bean public ReplyingKafkaTemplate replyingKafkaTemplate(ProducerFactory producerFactory, KafkaMessageListenerContainer replyContainer) { ReplyingKafkaTemplate replyingKafkaTemplate = new ReplyingKafkaTemplate<>(producerFactory, replyContainer); return replyingKafkaTemplate; } @Bean public KafkaMessageListenerContainer replyContainer(ConsumerFactory consumerFactory) { ContainerProperties containerProperties = new ContainerProperties("topic-weng-reply"); // 这里初始化的时候,报错:No group.Id xxx fail to start ,故这里会加一个消费者的组id 
containerProperties.setGroupId("defaultConsumerGroup"); return new KafkaMessageListenerContainer(consumerFactory, containerProperties); } }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155

    2. 生产者

    2.1 yml

    spring:
      kafka:
        # Kafka cluster brokers
        bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
        producer:
          retries: 0
          # Producer acknowledgement level
          acks: -1
          # The producer submits messages to Kafka once the accumulated messages reach batch-size,
          # or once linger.ms has passed after a message was received
          # Batch size
          batch-size: 16384
          # Submit delay
          properties:
            linger:
              # linger.ms = 0 means every message is submitted as soon as it is received, so batch-size is effectively unused
              ms: 0
              # Custom partitioner (used as the default partitioner)
    #        partitioner:
    #          class: com.weng.cloud.service8881.kafka.CustomizePartitioner
          # Producer buffer size
          buffer-memory: 33554432
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
          # The setting below enables transactions through configuration; follow the error messages one by one and it works
          # Without it, calling executeInTransaction fails with: Producer factory does not support transactions
          # Annoyingly, retries must also be set, otherwise: Must set retries to non-zero when using the idempotent producer.
          # The ack level must be set too, otherwise: Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence.
          # The sending method also needs @Transactional, otherwise: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
          # A ReplyTemplate is required, otherwise: a KafkaTemplate is required to support replies
    #      transaction-id-prefix: tx
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    2.2 test-code

    package com.weng.cloud.service8881.kafka;
    
    import com.cloud.api.mock.MockArgDto;
    import com.weng.cloud.commons.base.JsonUtil;
    import com.weng.cloud.commons.base.TestUtil;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.header.internals.RecordHeader;
    import org.junit.jupiter.api.DisplayName;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.extension.ExtendWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
    import org.springframework.kafka.requestreply.RequestReplyFuture;
    import org.springframework.kafka.support.KafkaHeaders;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.messaging.MessageHeaders;
    import org.springframework.messaging.support.GenericMessage;
    import org.springframework.test.context.ActiveProfiles;
    import org.springframework.test.context.junit.jupiter.SpringExtension;
    import org.springframework.transaction.annotation.Transactional;
    import org.springframework.util.concurrent.ListenableFuture;
    
    import javax.annotation.Resource;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @Author: weng
     * @Date: 2022/8/13
     * @Description: com.weng.cloud.service8881.kafka
     *
     * @see spring kafka 实战
     * @see spring kafka 事务整合
     * @see spring kafka 消息转发 、 延迟监听
     */
    @Slf4j
    @ExtendWith(SpringExtension.class)
    @SpringBootTest
    @ActiveProfiles("kafka-producer")
    public class ProducerTest {
    
        @Resource
        KafkaTemplate<String, Object> kafkaTemplate;
    
        @Resource
        KafkaTemplate<String, Object> txKafkaTemplate;
    
        @DisplayName("简单生产者")
        @Test
        void t1() {
            String json = JsonUtil.toText(MockArgDto.builder().content("简单生产者").date(new Date()).build());
            kafkaTemplate.send("topic-simple", json);
        }
    
        @DisplayName("使用Message发送消息")
        @Test
        void t2() {
            //使用Message发送消息
            Map map = new HashMap();
            map.put(KafkaHeaders.TOPIC, "topic-simple");
            map.put(KafkaHeaders.PARTITION_ID, 0);
            map.put(KafkaHeaders.MESSAGE_KEY, "0");
            String json = JsonUtil.toText(MockArgDto.builder().content("使用Message发送消息").date(new Date()).build());
            GenericMessage message = new GenericMessage(json, new MessageHeaders(map));
            kafkaTemplate.send(message);
        }
    
        @DisplayName("record")
        @Test
        void t3() {
            String json = JsonUtil.toText(MockArgDto.builder().content("record").date(new Date()).build());
            ProducerRecord<String, Object> record = new ProducerRecord<>("topic-simple", json);
            kafkaTemplate.send(record);
        }
    
        @DisplayName("成功的回调")
        @Test
        void
        t4() throws InterruptedException, ExecutionException {
            String json = JsonUtil.toText(MockArgDto.builder().content("成功的回调").date(new Date()).build());
            // 生产者回调
            // public ListenableFuture> send(String topic, @Nullable V data)
            // 可以看到send方法返回的是future,是异步的
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("topic-simple", json);
            future.addCallback(success -> {
                ProducerRecord<String, Object> producerRecord = success.getProducerRecord();
                RecordMetadata recordMetadata = success.getRecordMetadata();
                log.info("producerRecord >>> {}", JsonUtil.toText(producerRecord));
                log.info("recordMetadata >>> {}", JsonUtil.toText(recordMetadata));
            }, failure -> {
    //            failure.getStackTrace();
    //            failure.getCause();
                log.error("failure >>> {}", JsonUtil.toText(failure.getMessage()));
            });
            System.err.println(TestUtil.format("future.get >>> {}", JsonUtil.toText(future.get())));
        }
    
        @DisplayName("失败的回调")
        @Test
        void
        t5() throws InterruptedException, ExecutionException, TimeoutException {
            String json = JsonUtil.toText(MockArgDto.builder().content("失败的回调").date(new Date()).build());
            // 生产者回调
            // public ListenableFuture> send(String topic, @Nullable V data)
            // 可以看到send方法返回的是future,是异步的
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("topic-simple", json);
            future.addCallback(success -> {
                ProducerRecord<String, Object> producerRecord = success.getProducerRecord();
                RecordMetadata recordMetadata = success.getRecordMetadata();
                log.info("producerRecord >>> {}", JsonUtil.toText(producerRecord));
                log.info("recordMetadata >>> {}", JsonUtil.toText(recordMetadata));
            }, failure -> {
    //            failure.getStackTrace();
    //            failure.getCause();
                log.error("failure >>> {}", JsonUtil.toText(failure.getMessage()));
            });
            System.err.println(TestUtil.format("future.get >>> {}", JsonUtil.toText(
                    // 给一个极小的返回等待时间,制造一起人为的生产者异常
                    future.get(1, TimeUnit.NANOSECONDS))));
        }
    
        @DisplayName("指定分区(发送时指定)")
        @Test
        void t6() throws ExecutionException, InterruptedException {
            for (int i = 0; i < 4; i++) {
                System.out.println(kafkaTemplate.send("weng-test-topic", 7,
                        // 顺便 赋值 时间戳
                        Long.valueOf(System.currentTimeMillis()), "test-key",
                        JsonUtil.toText(MockArgDto.builder().content("指定分区(发送时指定) 【" + i + "】")
                                .date(new Date()).build())).get());
            }
        }
    
        @DisplayName("指定分区(配置&配置类)")
        @Test
        void t7() throws ExecutionException, InterruptedException {
            for (int i = 0; i < 7; i++) {
                System.out.println(kafkaTemplate.send("weng-test-topic",
                        JsonUtil.toText(MockArgDto.builder().content("指定分区(配置&配置类) 【" + -i + "】")
                                .date(new Date()).build())).get());
            }
        }
    
    /*
        @DisplayName("生产者的事务(一旦回滚,消息不会发出去)")
    //    @Transactional
        @Test
        void t8() {
            // ReplyTemplate
            ConcurrentKafkaListenerContainerFactory containerFactory = new ConcurrentKafkaListenerContainerFactory();
            containerFactory.setReplyTemplate(kafkaTemplate);
            String json = JsonUtil.toText(MockArgDto.builder()
                    .content("生产者的事务(一旦回滚,消息不会发出去)").date(new Date()).build());
            kafkaTemplate.executeInTransaction(operations -> {
                operations.send("tx-topic", json);
                throw new RuntimeException("我自然而然的关怀");
            });
    
            // 这个是 消费者手动提交偏移量 使用的
    //        kafkaTemplate.sendOffsetsToTransaction();
        }
    */
    
    /*
        @DisplayName("测试事务(一旦回滚,消息不会发出去)")
        @Transactional
        @Test
        void t9() {
            String json = JsonUtil.toText(MockArgDto.builder()
                    .content("测试事务(一旦回滚,消息不会发出去)").date(new Date()).build());
            kafkaTemplate.send("tx-topic", json);
            throw new RuntimeException("My Life 一直在等待");
        }
    */
    
        @DisplayName("俩kafkaTemplate")
        @Test
        void t10() {
            KafkaTemplate<String, Object> kafkaTemplate = this.kafkaTemplate;
            KafkaTemplate<String, Object> txKafkaTemplate = this.txKafkaTemplate;
            System.err.println();
        }
    
        @DisplayName("事务代码调用方式1.@Transactional")
        @Test
        @Transactional
        void t11() {
            // 好像把kafka_home/log下面的事务topic干掉的话,可能会报这个(搞的我每次都要重启kafka,并清理log文件)
            // java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation
            // within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional
            // before invoking the template method, run in a transaction started by a listener container when consuming a record
            String json = JsonUtil.toText(MockArgDto.builder()
                    .content("事务代码调用方式1.@Transactional").date(new Date()).build());
            this.txKafkaTemplate.send("tx-topic-weng-0", json);
            throw new RuntimeException("豁咖啡的生活应该会从容一些吧...");
        }
    
        @DisplayName("事务代码调用方式2.kafkaTemplate.executeInTransaction()")
        @Test
        void t12() {
            String json = JsonUtil.toText(MockArgDto.builder()
                    .content("事务代码调用方式2.kafkaTemplate.executeInTransaction()").date(new Date()).build());
            this.txKafkaTemplate.executeInTransaction(operations -> {
                operations.send("tx-topic-weng-0", json);
                throw new RuntimeException("蓝色的思念...");
            });
        }
    
        @DisplayName("使用默认的ProducerFactory x 事务代码调用方式1.@Transactional")
        @Transactional
        @Test
        void t13() {
            String json = JsonUtil.toText(MockArgDto.builder()
                    .content("使用默认的ProducerFactory x 事务代码调用方式1.@Transactional").date(new Date()).build());
            this.kafkaTemplate.send("tx-topic-weng-0", json);
            throw new RuntimeException("期待你发现我的爱~~~");
        }
    
        @DisplayName("使用默认的ProducerFactory x 事务代码调用方式2.kafkaTemplate.executeInTransaction()")
        @Test
        void t14() {
            String json = JsonUtil.toText(MockArgDto.builder()
                    .content("使用默认的ProducerFactory x 事务代码调用方式2.kafkaTemplate.executeInTransaction()").date(new Date()).build());
            this.kafkaTemplate.executeInTransaction(operations -> {
                operations.send("tx-topic-weng-0", json);
                throw new RuntimeException("就像...你的香味悬在半空");
            });
        }
    
        @DisplayName("测试消费者批量消费")
        @Test
        void t15() {
            for (int i = 0; i < 6; i++) {
                String json = JsonUtil.toText(MockArgDto.builder()
                        .content(TestUtil.format("测试消费者批量消费 第{}条消息,第{}批次",i,(i/2)+1)).date(new Date()).build());
                this.kafkaTemplate.send("topic-batch", json);
            }
        }
    
        @DisplayName("测试消费者过滤奇数")
        @Test
        void t16() {
            for (int i = 0; i < 6; i++) {
                ProducerRecord<String, Object> record = new ProducerRecord<>("topic-filtered", ""+i);
                kafkaTemplate.send(record);
            }
        }
    
        @DisplayName("测试消费者监听器延迟启动")
        @Test
        void t17() {
            String json = JsonUtil.toText(MockArgDto.builder().content("测试消费者监听器延迟启动").date(new Date()).build());
            kafkaTemplate.send("topic-delay", json);
            System.err.println(json);
        }
    
        @Autowired
        ReplyingKafkaTemplate replyingKafkaTemplate;
    
        @DisplayName("测试消费者基于ReplyTemplate方式的消息转发")
        @Test
        void t18() throws ExecutionException, InterruptedException {
            ProducerRecord<String, Object> record = new ProducerRecord<>("topic-filtered","this is a message");
            record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "topic-weng-reply".getBytes()));
            RequestReplyFuture<String, String, Object> future = replyingKafkaTemplate.sendAndReceive(record);
            SendResult<String, String> sendResult = future.getSendFuture().get();
            log.info("record metaData >>> {}", sendResult.getRecordMetadata());
            ConsumerRecord<String, Object> consumerRecord = future.get();
            log.info("return value >>> {}", consumerRecord.value());
            TimeUnit.MINUTES.sleep(1);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265
    • 266
    • 267
    • 268
    • 269
    • 270
    • 271
    • 272
    • 273
    • 274
    • 275
    • 276
    • 277
    • 278
    • 279
    • 280
    • 281

    3. 消费者

    3.1 yml

    spring:
      kafka:
        # Kafka cluster brokers
        bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
        consumer:
          properties:
            group:
              # Default consumer group id
              id: defaultConsumerGroup
            session:
              # Consumer session timeout (if the consumer sends no heartbeat within this time, a rebalance is triggered)
              timeout:
                ms: 120000
            request:
              # Consumer request timeout
              timeout:
                ms: 180000
          # Whether offsets are committed automatically
          # Author's note: "turned off here because the no-auto-start listener test needs manual commits"
          # NOTE(review): the value below is true, which contradicts that note; the delayed listener
          # overrides it per listener with enable.auto.commit:false - confirm which is intended
          enable-auto-commit: true
          # Offset commit delay (how long after receiving a message the offset is committed)
          auto-commit-interval: 1000
          # Offset reset policy when Kafka has no initial offset or the offset is out of range
          # earliest: reset to the smallest offset in the partition;
          # latest: reset to the latest offset in the partition (consume only newly produced data);
          # none: throw an exception if any partition has no committed offset;
          auto-offset-reset: latest
          # Serializer/deserializer classes provided by Kafka
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          # Maximum number of messages per batch when consuming in batches
    #      max-poll-records: 2
        listener:
          # Whether startup fails when a listened topic does not exist (turned off; the author notes false is the default anyway)
          missing-topics-fatal: false
          # Single-record processing is the default; this would switch to batch consumption
    #      type: batch
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    3.2 test-code

    package com.weng.cloud.service8881.kafka;
    
    import com.weng.cloud.commons.base.JsonUtil;
    import com.weng.cloud.commons.base.TestUtil;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.junit.jupiter.api.DisplayName;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.extension.ExtendWith;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.listener.ContainerProperties;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.messaging.handler.annotation.SendTo;
    import org.springframework.test.context.ActiveProfiles;
    import org.springframework.test.context.junit.jupiter.SpringExtension;
    
    import javax.annotation.Resource;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @Author: weng
     * @Date: 2022/8/13
     * @Description: com.weng.cloud.service8881.kafka
     */
    @Slf4j
    @ExtendWith(SpringExtension.class)
    @SpringBootTest
    @ActiveProfiles("kafka-consumer")
    public class ConsumerTest {
    
        @Resource
        KafkaTemplate<String, Object> kafkaTemplate;
    
        @DisplayName("简单消费者")
        @Test
        public void t1() throws InterruptedException {
            TimeUnit.MINUTES.sleep(3);
        }
    
        // topicPattern = "a.*.b" 不能同时使用 topics
        // containerFactory 支持 传入工厂(比如说 指定线程数、拉取频率、批读取 等等局部属性)
        // @TopicPartition 可以指定分区
        @KafkaListener(topics = {"topic-simple", "weng-test-topic"})
        public void listen(ConsumerRecord<?, ?> record) {
            System.err.println(TestUtil.format("一年前的我们过的那么快乐 >>> {}", JsonUtil.toText(record)));
        }
    
        @KafkaListener(topicPattern = "tx-topic-weng-*")
        public void listenTx(ConsumerRecord<?, ?> record) {
            System.err.println(TestUtil.format("我猜你早已发现我的爱 >>> {}", JsonUtil.toText(record)));
        }
    
        @KafkaListener(topics = "topic-batch", errorHandler = "consumerAwareErrorHandler")
        public void listenBatch(List<ConsumerRecord<?, ?>> records) throws Exception {
            System.err.println(TestUtil.format("天空好像下雨 >>> {}", JsonUtil.toText(records)));
            throw new Exception("还记得昨天,那个夏天");
        }
    
        @KafkaListener(topics = {"topic-filtered"}, containerFactory = "filterOddContainerFactory")
        public void listenFiltered(ConsumerRecord<?, ?> record) {
            System.err.println(TestUtil.format("心中的热却不退 >>> {}", JsonUtil.toText(record)));
        }
    
        // ReplyTemplate是我们用来转发消息所使用的类。@SendTo注解本质其实就是利用这个ReplyTemplate转发监听方法的返回值到对应的Topic中
        @KafkaListener(topics = "topic-filtered", containerFactory = "sendToContainerFactory")
        // 消息转发 方式1.将需要转发的消息 直接放入到 指定的目标topic中
        @SendTo("topic-simple")
        public String listenReplied(ConsumerRecord<?, ?> record) {
            Integer i = Integer.valueOf(record.value().toString());
            System.err.println(TestUtil.format("想要找回来 >>> {}", i));
            i = i * 10;
            return i.toString();
        }
    
        // 使用ReplyTemplate方式不同于@SendTo方式,@SendTo是直接将监听方法的返回值转发对应的Topic中
        // 而ReplyTemplate也是将监听方法的返回值转发Topic中,但转发Topic成功后,会被请求者消费。
        @KafkaListener(id = "listenReply2", topics = {"topic-filtered"}, containerFactory = "sendToContainerFactory")
        // 消息转发 方式2.将 需要转发的消息 放入到 ContainerFactory自定义的内部的一个topic, 消息生产者将从这个topic中异步的poll这个处理之后的消息
        @SendTo
        public String listenReply2(ConsumerRecord<?, ?> record) {
            System.err.println(TestUtil.format("想要找回来 >>> {}", JsonUtil.toText(record)));
            return TestUtil.format("已处理消息 >>> {}", record.value());
        }
    
        // @KafkaListener这个注解所标注的方法并没有在IOC容器中注册为Bean,而是会被注册在KafkaListenerEndpointRegistry中
        @Resource
        private KafkaListenerEndpointRegistry registry;
    
        @DisplayName("不马上启动监听")
        @Test
        public void t2() throws InterruptedException {
            TimeUnit.MINUTES.sleep(3);
            startListener();
            TimeUnit.MINUTES.sleep(2);
            shutDownListener();
            TimeUnit.MINUTES.sleep(1);
        }
    
        // TODO 真的尝试了很多招,问题还是 —— 监听器还是会自启动....
    //    @KafkaListener(id = "timingConsumer",topics = {"topic-delay"}, autoStartup = "false")
        @KafkaListener(id = "timingConsumer",topics = {"topic-delay"}, containerFactory = "delayContainerFactory", autoStartup = "false", properties={
            "enable.auto.commit:false"
        })
        public void listenDelay(ConsumerRecord<?, ?> record, Acknowledgment ack) {
            System.err.println(TestUtil.format("冬天仿佛不再留念 >>> {}", JsonUtil.toText(record)));
            // 手动提交
            ack.acknowledge();
            // 手工失败
    //        ack.nack();
        }
    
        // 定时启动监听器
        private void startListener() {
            System.out.println("启动监听器...");
            // "timingConsumer"是@KafkaListener注解后面设置的监听器ID,标识这个监听器
            if (!registry.getListenerContainer("timingConsumer").isRunning()) {
                registry.getListenerContainer("timingConsumer").start();
            }
            registry.getListenerContainer("timingConsumer").resume();
        }
    
        // 定时停止监听器
        private void shutDownListener() {
            System.out.println("关闭监听器...");
            registry.getListenerContainer("timingConsumer").pause();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
  • 相关阅读:
    Facebook的创新实验室:人工智能与新技术探索
    【数学建模】图论模型(基础理论+最大流与最小费用流问题)
    第二章 算法
    java计算机毕业设计学生宿舍管理系统源程序+mysql+系统+lw文档+远程调试
    设计模式-12-策略模式
    MFC 更改控件的大小和位置
    优化算法 - 学习率调度器
    按计划进行
    Linux安装Jenkins
    使用ffmpeg将一个目录下的mkv格式的视频文件转换成mp4格式
  • 原文地址:https://blog.csdn.net/weixin_43638238/article/details/126337917