• SpringBoot整合Kafka并实现广播模式


    环境说明

    • IDEA 2022.2
    • JDK 8.0.312
    • Mac OS 13 beta 4
    • SpringBoot 2.7.2

    需求背景

    在进行数据推送的时候,使用到了WebSocket技术实现从服务端向客户端推送的机制,然而因为长连接的机制原因,连接会固定在一台服务器上,这时候数据产出后,需要在集群中广播以实现将数据推送给所有需要的用户。这个广播可以使用Redis以及MQ来实现。
    这里因为数据产出侧的限制,会将数据发送到kafka,所以接入方也只好使用Kafka来消费数据。
    但是有一个问题就是,kafka不支持广播模式,需要自己想办法实现。

    方案分析

    随机消费组

    在Kafka中,同一个消费者组内的消费者是竞争消费,一个消息只能被一个消费者消费。
    从这个角度出发,我们就可以将集群中的每个服务的消费者都在不同的组内,即可实现集群内的广播。
    而这个消费者组随机的话,则会导致监控困难。
    在一些云平台,比如阿里云,消费者组需要提前创建,这对于随机消费组来说就不太友好。即使云平台上支持随机消费者组,那样就无法监控统计到消费情况,这显然也不符合项目的运维要求。

    assign模式

    在Kafka中,assign模式是指放弃消费者集群,直接订阅分区,所有消费者都订阅指定分区,也可以实现分区内消息的广播消费。
    ![企业微信截图_16469847773808.png](https://img-blog.csdnimg.cn/img_convert/62235f34bbacd8a38fa0e0c28bbb05a0.png#clientId=u3532dcd9-9025-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=334&id=ua84935e3&margin=[object Object]&name=企业微信截图_16469847773808.png&originHeight=667&originWidth=576&originalType=binary&ratio=1&rotation=0&showTitle=false&size=231219&status=done&style=none&taskId=ub4e19171-6e81-4706-b48c-beedec921f4&title=&width=288)

    转发到支持广播的消息队列

    既然Kafka对于广播的支持不那么友好,那么将消息转发到其他其他消息队列,比如RocketMQ也不失为一种方案。
    这种转发可以自己编码实现,也可以借由云平台的产品。目前阿里云公测的Connector支持Kafka和其他云服务之间数据同步。

    代码实现

    依赖pom.xml

    
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0modelVersion>
        <parent>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starter-parentartifactId>
            <version>2.7.2version>
            <relativePath/> 
        parent>
        <groupId>com.examplegroupId>
        <artifactId>kafka-broadcastartifactId>
        <version>0.0.1-SNAPSHOTversion>
        <name>kafka-broadcastname>
        <description>kafka-broadcastdescription>
        <properties>
            <java.version>1.8java.version>
        properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starterartifactId>
            dependency>
            <dependency>
                <groupId>org.springframework.kafkagroupId>
                <artifactId>spring-kafkaartifactId>
            dependency>
    
            <dependency>
                <groupId>org.projectlombokgroupId>
                <artifactId>lombokartifactId>
                <optional>trueoptional>
            dependency>
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-testartifactId>
                <scope>testscope>
            dependency>
        dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.bootgroupId>
                    <artifactId>spring-boot-maven-pluginartifactId>
                    <configuration>
                        <excludes>
                            <exclude>
                                <groupId>org.projectlombokgroupId>
                                <artifactId>lombokartifactId>
                            exclude>
                        excludes>
                    configuration>
                plugin>
            plugins>
        build>
    
    project>
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    随机消费者组

    启动时,保证每个Listener的groupId是唯一且随机的即可。
    代码实现可参考:

        @KafkaListener(topics = "broadcast_test",groupId = "broadcast_test_group" + "#{T(java.util.UUID).randomUUID()})")
        public void consume(String data) {
            System.out.println("消费者1消费到消息:" + data);
        }
    
    • 1
    • 2
    • 3
    • 4

    assign模式

    assign模式在网上的资料偏少,原生kafka-client中可以遍历topic下的partition,然后

    consume.assign(partition);
    
    • 1

    即可。
    在spring-kafka下,我们一般都直接使用@KafkaListener
    那么在这个注解里,可不可以使用assign呢?
    经过一番代码查找,发现是可行的
    在这个注解里有这么一个属性

    	/**
    	 * The topicPartitions for this listener when using manual topic/partition
    	 * assignment.
    	 * 

    * Mutually exclusive with {@link #topicPattern()} and {@link #topics()}. * @return the topic names or expressions (SpEL) to listen to. */ TopicPartition[] topicPartitions() default {};

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    意思是这个属性设置了assign模式需要订阅的分区,那么是不是配了这个就可以了呢?
    翻看代码
    org.springframework.kafka.listener.KafkaMessageListenerContainer类中

    @Override
    	protected void doStart() {
    		if (isRunning()) {
    			return;
    		}
    		if (this.clientIdSuffix == null) { // stand-alone container
    			checkTopics();
    		}
    		ContainerProperties containerProperties = getContainerProperties();
    		checkAckMode(containerProperties);
    
    		Object messageListener = containerProperties.getMessageListener();
    		AsyncListenableTaskExecutor consumerExecutor = containerProperties.getConsumerTaskExecutor();
    		if (consumerExecutor == null) {
    			consumerExecutor = new SimpleAsyncTaskExecutor(
    					(getBeanName() == null ? "" : getBeanName()) + "-C-");
    			containerProperties.setConsumerTaskExecutor(consumerExecutor);
    		}
    		GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
    		ListenerType listenerType = determineListenerType(listener);
            // 新建消费者
    		this.listenerConsumer = new ListenerConsumer(listener, listenerType);
    		setRunning(true);
    		this.startLatch = new CountDownLatch(1);
    		this.listenerConsumerFuture = consumerExecutor
    				.submitListenable(this.listenerConsumer);
    		try {
    			if (!this.startLatch.await(containerProperties.getConsumerStartTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
    				this.logger.error("Consumer thread failed to start - does the configured task executor "
    						+ "have enough threads to support all containers and concurrency?");
    				publishConsumerFailedToStart();
    			}
    		}
    		catch (@SuppressWarnings(UNUSED) InterruptedException e) {
    			Thread.currentThread().interrupt();
    		}
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
    			Properties consumerProperties = propertiesFromProperties();
    			checkGroupInstance(consumerProperties, KafkaMessageListenerContainer.this.consumerFactory);
    			this.autoCommit = determineAutoCommit(consumerProperties);
    			this.consumer =
    					KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
    							this.consumerGroupId,
    							this.containerProperties.getClientId(),
    							KafkaMessageListenerContainer.this.clientIdSuffix,
    							consumerProperties);
    
    			this.clientId = determineClientId();
    			this.transactionTemplate = determineTransactionTemplate();
    			this.genericListener = listener;
    			this.consumerSeekAwareListener = checkConsumerSeekAware(listener);
    			this.commitCurrentOnAssignment = determineCommitCurrent(consumerProperties,
    					KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties());
                // 判断是订阅还是监听
    			subscribeOrAssignTopics(this.consumer);
    			if (listener instanceof BatchMessageListener) {
    				this.listener = null;
    				this.batchListener = (BatchMessageListener<K, V>) listener;
    				this.isBatchListener = true;
    				this.wantsFullRecords = this.batchListener.wantsPollResult();
    				this.pollThreadStateProcessor = setUpPollProcessor(true);
    			}
    			else if (listener instanceof MessageListener) {
    				this.listener = (MessageListener<K, V>) listener;
    				this.batchListener = null;
    				this.isBatchListener = false;
    				this.wantsFullRecords = false;
    				this.pollThreadStateProcessor = setUpPollProcessor(false);
    			}
    			else {
    				throw new IllegalArgumentException("Listener must be one of 'MessageListener', "
    						+ "'BatchMessageListener', or the variants that are consumer aware and/or "
    						+ "Acknowledging"
    						+ " not " + listener.getClass().getName());
    			}
    			this.listenerType = listenerType;
    			this.isConsumerAwareListener = listenerType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
    					|| listenerType.equals(ListenerType.CONSUMER_AWARE);
    			this.commonErrorHandler = determineCommonErrorHandler();
    			Assert.state(!this.isBatchListener || !this.isRecordAck,
    					"Cannot use AckMode.RECORD with a batch listener");
    			if (this.containerProperties.getScheduler() != null) {
    				this.taskScheduler = this.containerProperties.getScheduler();
    				this.taskSchedulerExplicitlySet = true;
    			}
    			else {
    				ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
    				threadPoolTaskScheduler.initialize();
    				this.taskScheduler = threadPoolTaskScheduler;
    			}
    			this.monitorTask = this.taskScheduler.scheduleAtFixedRate(this::checkConsumer, // NOSONAR
    					Duration.ofSeconds(this.containerProperties.getMonitorInterval()));
    			if (this.containerProperties.isLogContainerConfig()) {
    				this.logger.info(toString());
    			}
    			Map<String, Object> props = KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties();
    			this.checkNullKeyForExceptions = this.containerProperties.isCheckDeserExWhenKeyNull()
    					|| checkDeserializer(findDeserializerClass(props, consumerProperties, false));
    			this.checkNullValueForExceptions = this.containerProperties.isCheckDeserExWhenValueNull()
    					|| checkDeserializer(findDeserializerClass(props, consumerProperties, true));
    			this.syncCommitTimeout = determineSyncCommitTimeout();
    			if (this.containerProperties.getSyncCommitTimeout() == null) {
    				// update the property so we can use it directly from code elsewhere
    				this.containerProperties.setSyncCommitTimeout(this.syncCommitTimeout);
    				if (KafkaMessageListenerContainer.this.thisOrParentContainer != null) {
    					KafkaMessageListenerContainer.this.thisOrParentContainer
    							.getContainerProperties()
    							.setSyncCommitTimeout(this.syncCommitTimeout);
    				}
    			}
    			this.maxPollInterval = obtainMaxPollInterval(consumerProperties);
    			this.micrometerHolder = obtainMicrometerHolder();
    			this.deliveryAttemptAware = setupDeliveryAttemptAware();
    			this.subBatchPerPartition = setupSubBatchPerPartition();
    			this.lastReceivePartition = new HashMap<>();
    			this.lastAlertPartition = new HashMap<>();
    			this.wasIdlePartition = new HashMap<>();
    		}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    private void subscribeOrAssignTopics(final Consumer<? super K, ? super V> subscribingConsumer) {
                // 如果没有配置topicPartitions属性
    			if (KafkaMessageListenerContainer.this.topicPartitions == null) {
    				ConsumerRebalanceListener rebalanceListener = new ListenerConsumerRebalanceListener();
    				Pattern topicPattern = this.containerProperties.getTopicPattern();
    				if (topicPattern != null) {
    					subscribingConsumer.subscribe(topicPattern, rebalanceListener);
    				}
    				else {
    					subscribingConsumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), // NOSONAR
    							rebalanceListener);
    				}
    			}
    			else {
    				List<TopicPartitionOffset> topicPartitionsToAssign =
    						Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
    				this.definedPartitions = new LinkedHashMap<>(topicPartitionsToAssign.size());
    				for (TopicPartitionOffset topicPartition : topicPartitionsToAssign) {
    					this.definedPartitions.put(topicPartition.getTopicPartition(),
    							new OffsetMetadata(topicPartition.getOffset(), topicPartition.isRelativeToCurrent(),
    									topicPartition.getPosition()));
    				}
                    // 监听指定分区
    				subscribingConsumer.assign(new ArrayList<>(this.definedPartitions.keySet()));
    			}
    		}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    从这几块代码可以看出,如果@KafkaListener中的属性topicPartitions有值,则使用assign模式。
    所以代码实现为

     @KafkaListener(topics = "broadcat_test",groupId = "broadcat_test_group",
                topicPartitions = {@TopicPartition(topic = "broadcat_test_sanjiu", partitions = "0")})
        public void consume(String data) {
            System.out.println("消费者1消费到消息:" + data);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    总结

    1. 随机消费者组

      1. 优点:简单
      2. 缺点:后期监控统计困难
    2. assign模式

      1. 优点:简单
      2. 缺点:放弃集群特性,topic分区动态扩容特性等,对于kafka的使用意义大大降低
    3. 转发到其他消息队列

      1. 优点:可以使用支持广播的消息队列完成
      2. 缺点:引入新的中间件,增加架构复杂度以及项目风险

      Slicenfer
      2022/07/29

  • 相关阅读:
    APS高级排产助取暖器企业实现生产计划管理效率的提升
    甲方需求被公司明确指示不能做,身为公司项目经理,怎么处理?
    c++ 正则表达式的若干难查问题
    IdentityServer4实战详解(HyBrid混合模式篇)
    二维数组的最小路径和问题
    Redis缓存序列化配置
    常用数据结构 ——— 队列(环形队列和顺序队列)
    d应扩展__traits(parameters)的使用范围
    【cv】图像预处理技术——从特征检测讲述图像预处理理论、实践、应用|01
    通过空间占用和执行计划了解SQL Server的行存储索引
  • 原文地址:https://blog.csdn.net/cslucifer/article/details/126057304