一、创建项目并导入pom依赖
- <dependency>
- <groupId>org.springframework.kafkagroupId>
- <artifactId>spring-kafkaartifactId>
- dependency>
二、修改application.yml配置
1. producer 生产端的配置
- spring:
- #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
- kafka:
- bootstrap-servers: 192.168.168.160:9092
2. consumer 消费端的配置,需要给consumer配置一个group-id
- spring:
- #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
- kafka:
- bootstrap-servers: 192.168.168.160:9092
- #https://kafka.apache.org/documentation/#consumerconfigs
- consumer:
- group-id: auto-dev #消费者组
三、生产者生产消息,消费者消费消息
1. 简单消费
producer生产者中使用自动注入的方式创建KafkaTemplate 对象
- @Autowired
- private KafkaTemplate
kafkaTemplate; -
- @Test
- void sendMessage(){
- // 第一个参数为topic,第二个为消息体
- kafkaTemplate.send("ifun","hello");
- }
consumer消费消息,使用@KafkaListener注解监听topic为ifun中的消息,可以监听多个topic
- @Component
- @Slf4j
- public class ConsumerListener {
- // 消费监听
- @KafkaListener(topics = {"ifun"})
- public void onMessage(ConsumerRecord
record) { - // 消费的哪个topic、partition的消息,打印出消息内容
- log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());
- }
- }
2. 带回调的生产者,两种方式
- @Test
- void sendCallBackMessageOne(){
- kafkaTemplate.send("ifun","hello callback one").addCallback(success -> {
- // 消息发送到的topic
- String topic = success.getRecordMetadata().topic();
- // 消息发送到的分区
- int partition = success.getRecordMetadata().partition();
- // 消息在分区内的offset
- long offset = success.getRecordMetadata().offset();
- log.info("send success:topic:{} partition:{} offset:{}",topic,partition,offset);
- }, failure -> {
- log.info("send fail:message:{} ", failure.getMessage());
- });
- }
-
- @Test
- void sendCallBackMessageTwo(){
- kafkaTemplate.send("ifun", "hello callback two").addCallback(new ListenableFutureCallback
>() { - @Override
- public void onFailure(Throwable ex) {
- log.info("send fail:message:{} ", ex.getMessage());
- }
- @Override
- public void onSuccess(SendResult
result) { - String topic = result.getRecordMetadata().topic();
- int partition = result.getRecordMetadata().partition();
- long offset = result.getRecordMetadata().offset();
- log.info("send success:topic:{} partition:{} offset:{}",topic,partition,offset);
- }
- });
- }
回调补充,全局回调,需要继承ProducerListener,并重写onSuccess和onError方法
- @Component
- @Slf4j
- public class KafkaSendResultHandler implements ProducerListener {
-
- @Override
- public void onSuccess(ProducerRecord producerRecord,
- RecordMetadata recordMetadata) {
- String topic = recordMetadata.topic();
- int partition = recordMetadata.partition();
- long offset = recordMetadata.offset();
- log.info("send success:topic:{} partition:{} offset:{}",topic,partition,offset);
- }
-
- @Override
- public void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) {
- log.info("send fail : {}", exception.getMessage());
- }
- }
3. 配置自定义分区策略
application.yml中需要指定分区策略的class
- spring:
- #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
- kafka:
- bootstrap-servers: 192.168.168.160:9092
- #https://kafka.apache.org/documentation/#producerconfigs
- producer:
- properties:
- partitioner.class: com.ifun.kafka.producer.config.CustomPartitioner
分区类的实现
- @Component
- @Slf4j
- public class CustomPartitioner implements Partitioner {
-
- @Override
- public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
- // 自定义分区规则(这里假设全部发到0号分区)
- log.info("自定义分区策略 topic:{} key:{} value:{}",topic,key,value.toString());
- return 0;
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public void configure(Map
configs) { -
- }
- }
4. kafka事务提交
如果在发送消息的时候需要创建事务,可以使用KafkaTemplate的executeInTransaction方法来声明事务。
application.yml增加transaction配置
- java.lang.IllegalStateException: Producer factory does not support transactions
-
- org.apache.kafka.common.config.ConfigException: Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence.
-
- java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
-
- 第一个异常是你没有配置transactions
- 第二个异常是因为你配置的acks不为all
- 第三个是正常的send方法,但是抛异常了,需要加@Transactional 注解
-
- spring:
- #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
- kafka:
- bootstrap-servers: 192.168.168.160:9092
- #https://kafka.apache.org/documentation/#producerconfigs
- producer:
- properties:
- partitioner.class: com.ifun.kafka.producer.config.CustomPartitioner
- acks: all
- transaction-id-prefix: "IFUN_TX"
发送消息代码
- @Test
- @Transactional
- void sendWithException(){
- kafkaTemplate.send("ifun","不带事务提交!");
- kafkaTemplate.executeInTransaction(oper->{
- oper.send("ifun","带事务的提交");
- throw new RuntimeException("fail 1");
- });
- throw new RuntimeException("fail 2");
- }
带事务的提交消息发送失败
不带事务的消息被成功消费
5. 消费者配置更详细的配置
@KafkaListener注解说明:
- @KafkaListener(id = "ifun-001",groupId = "ifun-01", topicPartitions={
- @TopicPartition(topic = "ifun1",partitions = {"0"}),
- @TopicPartition(topic = "ifun2",
- partitions = {"0"},
- partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))
- }
- )
- public void onTopicsMessage(ConsumerRecord
record) { - log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());
- }
解释:这里定义了消费者id为ifun-001,消费者组id为ifun-01,同时监听两个topic,ifun1和ifun2,其中监听ifun1的0号分区,ifun2的0号和1号分区,其中1号分区开始的offset为8,也就是说如果next-offset大于8就会消费,小于8不会消费。
6. 消费者批量消费
需要在application.yml中开启批量消费,sping.kafka.listener.type: batch 监听类型为batch,spring.kafka.consumer.max-poll-records 批量消费每次最多消费多少条消息,接收消息的时候需要使用List来接收。
- spring:
- #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
- kafka:
- bootstrap-servers: 192.168.168.160:9092
- #https://kafka.apache.org/documentation/#consumerconfigs
- listener:
- #batch single
- type: batch
- consumer:
- group-id: auto-dev #消费者组
- max-poll-records: 3
- @KafkaListener(topics = {"ifun"})
- public void onBatchMessage(List
> records) { - log.info("批量消费");
- for (ConsumerRecord
record : records) { - log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());
- }
- }
7. 消费端手动ack
设置spring.kafka.consumer.enable-auto-commit 为false 的时候 spring.kafka.listener.ack-mode 才会生效,设置为手动的manual表示手动
- spring:
- #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
- kafka:
- bootstrap-servers: 192.168.168.160:9092
- #https://kafka.apache.org/documentation/#consumerconfigs
- listener:
- #batch single
- type: batch
- # 手动确认模式 RECORD, BATCH, TIME, COUNT, COUNT_TIME, MANUAL, MANUAL_IMMEDIATE;
- ack-mode: manual
- consumer:
- #消费者组 id
- group-id: auto-dev
- max-poll-records: 3
- #是否自动提交偏移量offset
- enable-auto-commit: false
消费代码
- @KafkaListener(topics = {"ifun"})
- public void onBatchMessage(List
> records, Acknowledgment ack) { - try{
- log.info("批量消费");
- for (ConsumerRecord
record : records) { - log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());
- }
- }finally {
- ack.acknowledge();
- }
- }
如果没有ack,那么会出现如下情况:
9. 消费异常捕获
配置ConsumerAwareListenerErrorHandler 处理类,在listener上设置errorHandler属性为ConsumerAwareListenerErrorHandler的BeanName
- @Bean
- public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
- return (message, exception, consumer) -> {
- System.out.println("消费异常:"+message.getPayload());
- return null;
- };
- }
- @KafkaListener(topics = {"ifun"}, errorHandler = "consumerAwareErrorHandler")
- public void onBatchMessage(List
> records, Acknowledgment ack) { - try{
- log.info("批量消费");
- for (ConsumerRecord
record : records) { - log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());
- throw new RuntimeException("消费异常");
- }
- }finally {
- ack.acknowledge();
- }
- }
显示结果如下:
10. 配置消息过滤器
消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。
需要为监听器工厂配置一个RecordFilterStrategy,返回true的时候消息将被抛弃,返回false会正常抵达监听器。
然后在监听器上设置containerFactory属性为配置的过滤器工厂类
- @Bean
- public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
- ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
- factory.setConsumerFactory(consumerFactory);
- // 被过滤的消息将被丢弃
- factory.setAckDiscarded(true);
- // 消息过滤策略
- factory.setRecordFilterStrategy(consumerRecord -> {
- if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {
- return false;
- }
- //返回true消息则被过滤
- return true;
- });
- return factory;
- }
- @KafkaListener(topics = {"ifun"},containerFactory = "filterContainerFactory")
- public void onMessage(ConsumerRecord
record) { - log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());
- }
过滤结果
11. 消息转发
Topic A 收到消息后将消息转发给Topic B,使用@SendTo注解即可
- @KafkaListener(topics = {"ifun"})
- @SendTo("ifun1")
- public String onMessage(ConsumerRecord
record) { - log.info("topic {} 收到需要转发的消息:{}",record.topic(), record.value());
- return record.value()+" 【forward message】";
- }
-
- @KafkaListener(topics = {"ifun1"})
- public void onIFun1Message(ConsumerRecord
record) { - log.info("topic:{},partition:{},消息:{}",record.topic(),record.partition(),record.value());
- }
结果如下,可以看到消息先记过第一个topic,然后转发给了第二个topic
12. 设置json序列化方式生产和消费消息
消费端配置如下
- spring:
- #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
- kafka:
- bootstrap-servers: 192.168.168.160:9092
- #https://kafka.apache.org/documentation/#consumerconfigs
- consumer:
- #消费者组 id
- group-id: auto-dev
- key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
- properties:
- spring:
- json:
- trusted:
- # 配置json反序列化信任的包
- packages: '*'
生产端配置如下
- spring:
- #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
- kafka:
- bootstrap-servers: 192.168.168.160:9092
- #https://kafka.apache.org/documentation/#producerconfigs
- producer:
- #key的编解码方法
- key-serializer: org.apache.kafka.common.serialization.StringSerializer
- #value的编解码方法
- value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
编写消息实体类
- @Data
- public class UserInfo implements Serializable {
- private Long id;
- private String name;
- private Integer age;
- }
发送消息
- @Test
- void sendMessage(){
- UserInfo userInfo = new UserInfo();
- userInfo.setAge(21);
- userInfo.setId(1L);
- userInfo.setName("Jack");
- kafkaTemplate.send("ifun",userInfo);
- }
消费消息
- @KafkaListener(topics = {"ifun"})
- public void onMessage(UserInfo userInfo){
- log.info("消息:{}",userInfo);
- }
消费结果如下:
注意:发送的类要和消费的类的全类名一致才行,不能是类名一样,字段一样,但是包名不一样,这样会抛异常。
四、kafka其他配置
- spring:
- #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
- kafka:
- bootstrap-servers: xx.xx.xx.xx:9092
- #https://kafka.apache.org/documentation/#producerconfigs
- #生产者配置
- producer:
- bootstrap-servers: xx.xx.xx.xx:9092
- #生产者发送消息失败重试次数
- retries: 1
- # 同一批次内存大小(默认16K)
- batch-size: 16384
- #生产者内存缓存区大小(300M = 300*1024*1024)
- buffer-memory: 314572800
- #acks=0:无论成功还是失败,只发送一次。无需确认
- #acks=1:即只需要确认leader收到消息
- #acks=all或-1:ISR + Leader都确定收到
- acks: 1
- #key的编解码方法
- key-serializer: org.apache.kafka.common.serialization.StringSerializer
- #value的编解码方法
- value-serializer: org.apache.kafka.common.serialization.StringSerializer
- #开启事务,但是要求ack为all,否则无法保证幂等性
- #transaction-id-prefix: "IFUN_TX"
- #额外的,没有直接有properties对应的参数,将存放到下面这个Map对象中,一并初始化
- properties:
- #自定义拦截器,注意,这里是classes(先于分区器)
- interceptor.classes: cn.com.controller.TimeInterceptor
- #自定义分区器
- #partitioner.class: com.alibaba.cola.kafka.test.customer.inteceptor.MyPartitioner
- #即使达不到batch-size设定的大小,只要超过这个毫秒的时间,一样会发送消息出去
- linger.ms: 1000
- #最大请求大小,200M = 200*1024*1024
- max.request.size: 209715200
- #Producer.send()方法的最大阻塞时间(115秒)
- max.block.ms: 115000
- #该配置控制客户端等待请求响应的最长时间。
- #如果超时之前仍未收到响应,则客户端将在必要时重新发送请求,如果重试次数(retries)已用尽,则会使请求失败。
- #此值应大于replica.lag.time.max.ms(broker配置),以减少由于不必要的生产者重试而导致消息重复的可能性。
- request.timeout.ms: 115000
- #等待send回调的最大时间。常用语重试,如果一定要发送,retries则配Integer.MAX
- #如果超过该时间:TimeoutException: Expiring 1 record(s) .. has passed since batch creation
- delivery.timeout.ms: 120000
-
-
- #https://kafka.apache.org/documentation/#consumerconfigs
- #消费者配置
- consumer:
- bootstrap-servers: xx.xx.xx.xx:9092
- #消费者组id
- group-id: default-group
- #消费方式: earliest:从头开始消费 latest:从最新的开始消费,默认latest
- auto-offset-reset: earliest
- #是否自动提交偏移量offset
- enable-auto-commit: false
- #前提是 enable-auto-commit=true。自动提交的频率
- auto-commit-interval: 1s
- #key 解码方式
- key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- #value 解码方式
- value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- #最大消费记录数
- max-poll-records: 2
- properties:
- #如果在这个时间内没有收到心跳,该消费者会被踢出组并触发{组再平衡 rebalance}
- session.timeout.ms: 120000
- #最大消费时间。此决定了获取消息后提交偏移量的最大时间,超过设定的时间(默认5分钟),服务端也会认为该消费者失效。踢出并再平衡
- max.poll.interval.ms: 300000
- #配置控制客户端等待请求响应的最长时间。
- #如果在超时之前没有收到响应,客户端将在必要时重新发送请求,
- #或者如果重试次数用尽,则请求失败。
- request.timeout.ms: 60000
- #服务器返回的最大数据量,不能超过admin的message.max.bytes单条数据最大大小
- max.partition.fetch.bytes: 1048576
- #订阅或分配主题时,允许自动创建主题。0.11之前,必须设置false
- allow.auto.create.topics: true
- # 如果设置的json解码器,需要配置所信任的包名
- spring:
- json:
- trusted:
- packages: '*'
-
- #监听器配置
- listener:
- #当enable.auto.commit的值设置为false时,该值会生效;为true时不会生效
- #manual_immediate:需要手动调用Acknowledgment.acknowledge()后立即提交
- ack-mode: manual_immediate
- #如果至少有一个topic不存在,true启动失败。false忽略
- missing-topics-fatal: true
- #单条消费 single 批量消费batch
- #批量消费需要配合 consumer.max-poll-records
- type: batch
- #配置多少,就为为每个消费者实例创建多少个线程。多出分区的线程空闲
- concurrency: 2
-
- template:
- default-topic: "default-topic"
五、总结
kafka的简单使用就到此结束了,和rabbitmq还是有挺大的区别的。大家快去试试吧。