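For data push we use WebSocket to push from the server to the clients. Because a WebSocket is a long-lived connection, each connection is pinned to a single server. When new data is produced, it therefore has to be broadcast across the cluster so that every server can push it to the users connected to it. This broadcast can be implemented with Redis or with a message queue (MQ).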
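In our case the producing side imposes a constraint: it publishes the data to Kafka, so the consuming side has no choice but to consume from Kafka as well.
The problem is that Kafka has no built-in broadcast mode, so we have to build one ourselves.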
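In Kafka, consumers in the same consumer group compete for messages: each message is delivered to only one consumer in the group.
From that angle, putting the consumer of every service instance in the cluster into its own group gives us broadcast within the cluster.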
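The delivery rule can be illustrated with a toy in-memory model. This is only a sketch, not Kafka code: `ToyTopic`, `ToyConsumer`, and the group name `ws-push` are hypothetical, and real Kafka distributes work per partition rather than round-robin per message. It only shows that a message is delivered once per group.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of consumer-group delivery; not real Kafka code. */
public class GroupDeliveryDemo {

    /** A hypothetical consumer that only remembers what it received. */
    static class ToyConsumer {
        final String groupId;
        final List<String> received = new ArrayList<>();

        ToyConsumer(String groupId) {
            this.groupId = groupId;
        }
    }

    /** Delivers each message once per group, round-robin among the group's members. */
    static class ToyTopic {
        private final Map<String, List<ToyConsumer>> groups = new HashMap<>();
        private final Map<String, Integer> counters = new HashMap<>();

        void join(ToyConsumer consumer) {
            groups.computeIfAbsent(consumer.groupId, g -> new ArrayList<>()).add(consumer);
        }

        void publish(String message) {
            for (Map.Entry<String, List<ToyConsumer>> entry : groups.entrySet()) {
                List<ToyConsumer> members = entry.getValue();
                // merge returns the updated count, so subtract 1 to get a 0-based index
                int index = counters.merge(entry.getKey(), 1, Integer::sum) - 1;
                members.get(index % members.size()).received.add(message);
            }
        }
    }

    /** Publishes one message to n consumers and returns how many copies were delivered. */
    static int deliveries(boolean sharedGroup, int n) {
        ToyTopic topic = new ToyTopic();
        List<ToyConsumer> consumers = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            ToyConsumer consumer = new ToyConsumer(sharedGroup ? "ws-push" : "ws-push-" + i);
            consumers.add(consumer);
            topic.join(consumer);
        }
        topic.publish("hello");
        int count = 0;
        for (ToyConsumer consumer : consumers) {
            count += consumer.received.size();
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println("shared group: " + deliveries(true, 3));
        System.out.println("distinct groups: " + deliveries(false, 3));
    }
}
```

With three consumers in one shared group the message is delivered once. With three distinct groups it is delivered three times, which is the broadcast behaviour we are after.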
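If these consumer groups are random, however, monitoring becomes difficult.
On some cloud platforms, Alibaba Cloud for example, consumer groups must be created in advance, which does not suit random groups. Even where a platform accepts random consumer groups, consumption cannot be monitored or aggregated, which clearly falls short of the project's operational requirements.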
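In Kafka, assign mode means giving up consumer-group coordination and assigning partitions to the consumer directly. If every consumer is assigned the same partitions, each of them receives every message in those partitions, which also yields broadcast consumption.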
![WeCom screenshot](https://img-blog.csdnimg.cn/img_convert/62235f34bbacd8a38fa0e0c28bbb05a0.png)
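Since Kafka's support for broadcast is not that friendly, forwarding the messages to another message queue, such as RocketMQ, is also a workable option.
The forwarding can be coded by hand or delegated to a cloud product. At the time of writing, Alibaba Cloud's Connector, which is in public beta, supports data synchronization between Kafka and other cloud services.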
Dependencies (pom.xml):
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.2</version>
        <relativePath/>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>kafka-broadcast</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-broadcast</name>
    <description>kafka-broadcast</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
At startup, just make sure the groupId of every listener is unique and random.
A reference implementation:
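// The SpEL expression is evaluated at startup, so every instance gets its own group id.
@KafkaListener(topics = "broadcast_test", groupId = "broadcast_test_group" + "#{T(java.util.UUID).randomUUID()}")
public void consume(String data) {
    System.out.println("Consumer 1 received message: " + data);
}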
There is little material online about assign mode. With the native kafka-client you can enumerate the partitions of a topic and then call
consumer.assign(partitions);
and that is all.
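A minimal sketch of that native kafka-client approach follows. The broker address `localhost:9092`, the topic name `broadcast_test`, and the helper `assignAll` are assumptions for illustration, and the kafka-clients library must be on the classpath (spring-kafka pulls it in). No `group.id` is set here, so offsets are not committed and each instance simply starts from the end of every partition.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AssignAllPartitions {

    /** Looks up every partition of the topic and assigns all of them to this consumer. */
    static List<TopicPartition> assignAll(Consumer<?, ?> consumer, String topic) {
        List<TopicPartition> partitions = new ArrayList<>();
        List<PartitionInfo> infos = consumer.partitionsFor(topic);
        if (infos != null) { // guard in case no metadata is returned for the topic
            for (PartitionInfo info : infos) {
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
        }
        consumer.assign(partitions); // manual assignment: no group coordination, no rebalance
        return partitions;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Without a group.id offsets cannot be committed, so auto-commit is switched off.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = assignAll(consumer, "broadcast_test");
            consumer.seekToEnd(partitions); // only messages produced from now on
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    System.out.println("received: " + record.value());
                }
            }
        }
    }
}
```

Because every instance assigns itself all partitions, each one sees every message. The trade-off is that partitions added to the topic later are not picked up until `assignAll` is called again.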
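With spring-kafka we usually just use @KafkaListener.
Can assign mode be used through this annotation?
After some digging through the source, it turns out that it can.
The annotation has the following attribute: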
/**
* The topicPartitions for this listener when using manual topic/partition
* assignment.
*
* Mutually exclusive with {@link #topicPattern()} and {@link #topics()}.
* @return the topic names or expressions (SpEL) to listen to.
*/
TopicPartition[] topicPartitions() default {};
In other words, this attribute sets the partitions to consume in assign mode. Is configuring it enough?
Let's look at the code in the class org.springframework.kafka.listener.KafkaMessageListenerContainer:
@Override
protected void doStart() {
if (isRunning()) {
return;
}
if (this.clientIdSuffix == null) { // stand-alone container
checkTopics();
}
ContainerProperties containerProperties = getContainerProperties();
checkAckMode(containerProperties);
Object messageListener = containerProperties.getMessageListener();
AsyncListenableTaskExecutor consumerExecutor = containerProperties.getConsumerTaskExecutor();
if (consumerExecutor == null) {
consumerExecutor = new SimpleAsyncTaskExecutor(
(getBeanName() == null ? "" : getBeanName()) + "-C-");
containerProperties.setConsumerTaskExecutor(consumerExecutor);
}
GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
ListenerType listenerType = determineListenerType(listener);
// create the consumer
this.listenerConsumer = new ListenerConsumer(listener, listenerType);
setRunning(true);
this.startLatch = new CountDownLatch(1);
this.listenerConsumerFuture = consumerExecutor
.submitListenable(this.listenerConsumer);
try {
if (!this.startLatch.await(containerProperties.getConsumerStartTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
this.logger.error("Consumer thread failed to start - does the configured task executor "
+ "have enough threads to support all containers and concurrency?");
publishConsumerFailedToStart();
}
}
catch (@SuppressWarnings(UNUSED) InterruptedException e) {
Thread.currentThread().interrupt();
}
}
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
Properties consumerProperties = propertiesFromProperties();
checkGroupInstance(consumerProperties, KafkaMessageListenerContainer.this.consumerFactory);
this.autoCommit = determineAutoCommit(consumerProperties);
this.consumer =
KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
this.consumerGroupId,
this.containerProperties.getClientId(),
KafkaMessageListenerContainer.this.clientIdSuffix,
consumerProperties);
this.clientId = determineClientId();
this.transactionTemplate = determineTransactionTemplate();
this.genericListener = listener;
this.consumerSeekAwareListener = checkConsumerSeekAware(listener);
this.commitCurrentOnAssignment = determineCommitCurrent(consumerProperties,
KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties());
// decide whether to subscribe or to assign
subscribeOrAssignTopics(this.consumer);
if (listener instanceof BatchMessageListener) {
this.listener = null;
this.batchListener = (BatchMessageListener<K, V>) listener;
this.isBatchListener = true;
this.wantsFullRecords = this.batchListener.wantsPollResult();
this.pollThreadStateProcessor = setUpPollProcessor(true);
}
else if (listener instanceof MessageListener) {
this.listener = (MessageListener<K, V>) listener;
this.batchListener = null;
this.isBatchListener = false;
this.wantsFullRecords = false;
this.pollThreadStateProcessor = setUpPollProcessor(false);
}
else {
throw new IllegalArgumentException("Listener must be one of 'MessageListener', "
+ "'BatchMessageListener', or the variants that are consumer aware and/or "
+ "Acknowledging"
+ " not " + listener.getClass().getName());
}
this.listenerType = listenerType;
this.isConsumerAwareListener = listenerType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
|| listenerType.equals(ListenerType.CONSUMER_AWARE);
this.commonErrorHandler = determineCommonErrorHandler();
Assert.state(!this.isBatchListener || !this.isRecordAck,
"Cannot use AckMode.RECORD with a batch listener");
if (this.containerProperties.getScheduler() != null) {
this.taskScheduler = this.containerProperties.getScheduler();
this.taskSchedulerExplicitlySet = true;
}
else {
ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.initialize();
this.taskScheduler = threadPoolTaskScheduler;
}
this.monitorTask = this.taskScheduler.scheduleAtFixedRate(this::checkConsumer, // NOSONAR
Duration.ofSeconds(this.containerProperties.getMonitorInterval()));
if (this.containerProperties.isLogContainerConfig()) {
this.logger.info(toString());
}
Map<String, Object> props = KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties();
this.checkNullKeyForExceptions = this.containerProperties.isCheckDeserExWhenKeyNull()
|| checkDeserializer(findDeserializerClass(props, consumerProperties, false));
this.checkNullValueForExceptions = this.containerProperties.isCheckDeserExWhenValueNull()
|| checkDeserializer(findDeserializerClass(props, consumerProperties, true));
this.syncCommitTimeout = determineSyncCommitTimeout();
if (this.containerProperties.getSyncCommitTimeout() == null) {
// update the property so we can use it directly from code elsewhere
this.containerProperties.setSyncCommitTimeout(this.syncCommitTimeout);
if (KafkaMessageListenerContainer.this.thisOrParentContainer != null) {
KafkaMessageListenerContainer.this.thisOrParentContainer
.getContainerProperties()
.setSyncCommitTimeout(this.syncCommitTimeout);
}
}
this.maxPollInterval = obtainMaxPollInterval(consumerProperties);
this.micrometerHolder = obtainMicrometerHolder();
this.deliveryAttemptAware = setupDeliveryAttemptAware();
this.subBatchPerPartition = setupSubBatchPerPartition();
this.lastReceivePartition = new HashMap<>();
this.lastAlertPartition = new HashMap<>();
this.wasIdlePartition = new HashMap<>();
}
private void subscribeOrAssignTopics(final Consumer<? super K, ? super V> subscribingConsumer) {
// if the topicPartitions attribute is not configured
if (KafkaMessageListenerContainer.this.topicPartitions == null) {
ConsumerRebalanceListener rebalanceListener = new ListenerConsumerRebalanceListener();
Pattern topicPattern = this.containerProperties.getTopicPattern();
if (topicPattern != null) {
subscribingConsumer.subscribe(topicPattern, rebalanceListener);
}
else {
subscribingConsumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), // NOSONAR
rebalanceListener);
}
}
else {
List<TopicPartitionOffset> topicPartitionsToAssign =
Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
this.definedPartitions = new LinkedHashMap<>(topicPartitionsToAssign.size());
for (TopicPartitionOffset topicPartition : topicPartitionsToAssign) {
this.definedPartitions.put(topicPartition.getTopicPartition(),
new OffsetMetadata(topicPartition.getOffset(), topicPartition.isRelativeToCurrent(),
topicPartition.getPosition()));
}
// assign the specified partitions
subscribingConsumer.assign(new ArrayList<>(this.definedPartitions.keySet()));
}
}
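These snippets show that when the topicPartitions attribute of @KafkaListener is set, assign mode is used.
Note that topicPartitions is mutually exclusive with topics, and that only the listed partitions are consumed, so every partition of the topic must be listed to receive all messages.
The implementation is therefore: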
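// @TopicPartition is org.springframework.kafka.annotation.TopicPartition.
// topics is omitted because it is mutually exclusive with topicPartitions.
@KafkaListener(groupId = "broadcast_test_group",
        topicPartitions = {@TopicPartition(topic = "broadcast_test_sanjiu", partitions = "0")})
public void consume(String data) {
    System.out.println("Consumer 1 received message: " + data);
}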
Slicenfer
2022/07/29