无论是kafka,还是RocketMQ,rabbitMQ等,与springboot的结合得益于spring的强大,使得变的非常easy,但依然知识简单的使用变的非常容易,如果要达到理想的结果,不仅需要他们对原理熟悉一点,还要对spring提供的sdk熟悉,下面就看一下kafka的使用,以及需要解决的一些问题。
<!--kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.2</version>
</dependency>
/**
* 消费者监听.
*
* @param message 消息内容
* @param ack ack
*/
@KafkaListener(topics = {"test_topic"})
public void listener(String message) {
//消费落库
}
spring:
kafka:
bootstrap-servers: localhost:9092
通过以上三步,基本上就可以成功消费到。
如果你在公司写这样的代码,肯定要被吐槽的,因为这段代码和配置,只能简单的消费,并不能解决消息丢失,重复消费,并发消费,消费能力不足或者浪费资源等问题。
接下来一一改造成理想的样子。
生产者层面,Kafka消息发送有两种方式:
同步(sync)和异步(async),默认是同步方式,可通过 producer.type)属性进行配置。Kafka通过配置request.required.acks属性来确认消息的生产:
这是发送消息阶段需要根据需要去配置的。可以配置-1.但是效率是最低的。
当然还有另一种情况就属于与业务层面,消费后kafka的offset被自动提交了,但实际上业务并没有成功消费。
针对这种情况,可以设置手动提交,配置enable.auto.commit为false。手动 去提交offset,代码改造为:
@KafkaListener(topics = {"test_topic"})
public void listener(final String message, final Acknowledgment ack) {
//消费业务代码
//...
//提交offset
ack.acknowledge();
}
先看一下设置为手动提交offset后,产生的三种情况:
接下来根据情况来解决
手动提交offset,如果消费的时候业务代码没有完全执行结束,导致偏移量没有提交;
经过测试,如果消费业务代码出现异常导致ack.acknowledge()没有执行,kakaf会重试多次进行消费。
此时我们的业务代码就要处理这种插入数据的场景产生的重复数据落到数据库里;
第一种解决办法就是在insert的时候使用INSERT INTO ...ON DUPLICATE KEY UPDATE语法,不存在时插入,存在时更新,是天然支持幂等性的。
第二种解决办法,就是通过redis,根据业务的唯一键来存储到redis,每次消费时判断是否消费过,但一定要设置一个过期时间。
第三种情况是,如果你的消费者的concurrency设置的是1,没有并发的情况,那你可以先查库判断库里面是否有,再进行插入。(concurrency相当于消费线程,也相当于消费者,机器数量*concurrency <= 分区数)
消费端重复发送了
此时也可以用上面第一种所描述的方案
其实不管那种情况导致的重复消息,解决方案在业务里是一成不变的。
本文由 mdnice 多平台发布