• 21 Spring Boot integration with Kafka


    1. Configuration file (application.properties)
    spring.kafka.bootstrap-servers=CentOSA:9092,CentOSB:9092,CentOSC:9092
    
    spring.kafka.producer.retries=5
    spring.kafka.producer.acks=all
    spring.kafka.producer.batch-size=16384
    spring.kafka.producer.buffer-memory=33554432
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.properties.enable.idempotence=true
    spring.kafka.producer.transaction-id-prefix=transaction-id-
    
    
    spring.kafka.consumer.group-id=group1
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.enable-auto-commit=true
    spring.kafka.consumer.auto-commit-interval=100
    spring.kafka.consumer.properties.isolation.level=read_committed
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    
    spring.kafka.streams.application-id=wordcount_id
    spring.kafka.streams.client-id=app1
    spring.kafka.streams.auto-startup=true
    spring.kafka.streams.state-dir=/Users/admin/Desktop/checkpoint
    spring.kafka.streams.replication-factor=1
    spring.kafka.streams.properties.processing.guarantee=exactly_once
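
    For reference, the sketch below shows roughly what Spring Boot's auto-configuration builds from the spring.kafka.producer.* properties above. It is illustrative only (the bean definitions are an assumed Java-config equivalent for readers who prefer it; they are not needed when the properties file is present):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;

    @Configuration
    public class ProducerConfigSketch {

        @Bean
        public ProducerFactory<String, String> producerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
            props.put(ProducerConfig.RETRIES_CONFIG, 5);
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(props);
            // A non-empty transaction-id prefix is what switches KafkaTemplate into transactional mode.
            factory.setTransactionIdPrefix("transaction-id-");
            return factory;
        }

        @Bean
        public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
            return new KafkaTemplate<>(producerFactory);
        }
    }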
            
    
    2. logback.xml
    
    <configuration>
        <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
            <encoder>
                <pattern>%p %d{yyyy-MM-dd HH:mm:ss} - %m%n</pattern>
                <charset>UTF-8</charset>
            </encoder>
        </appender>

        <root level="ERROR">
            <appender-ref ref="STDOUT" />
        </root>

        <logger name="org.springframework.kafka" level="INFO" additivity="false">
            <appender-ref ref="STDOUT" />
        </logger>
        <logger name="org.springframework.kafka.transaction" level="DEBUG" additivity="false">
            <appender-ref ref="STDOUT" />
        </logger>
    </configuration>
    
    3. Application code
    package com.baizhi;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.*;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.annotation.EnableKafkaStreams;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.annotation.KafkaListeners;
    import org.springframework.messaging.handler.annotation.SendTo;
    
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.stream.Collectors;
    
    
    
    @SpringBootApplication
    @EnableKafkaStreams
    @EnableKafka
    public class KafkaSpringBootApplication {
        public static void main(String[] args) throws IOException {
            SpringApplication.run(KafkaSpringBootApplication.class,args);
            System.in.read();
        }
    
    
    
        @KafkaListeners(value = {@KafkaListener(topics = {"topic04"})})
        @SendTo(value = {"topic05"})
    public String listener(ConsumerRecord<?, ?> cr) {
    
            return cr.value()+" mashibing edu";
        }
    
        @Bean
        public KStream<String, String> kStream(StreamsBuilder kStreamBuilder) {
    
            KStream<String, String> stream = kStreamBuilder.stream(
                    "topic02",
                    Consumed.with(Serdes.String(),
                    Serdes.String()));
    
            stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
                @Override
                public Iterable<String> apply(String s) {
                    return Arrays.stream(s.split(" ")).collect(Collectors.toList());
                }
            })
            .selectKey((k,v)->v)
            .groupByKey(Serialized.with(Serdes.String(),Serdes.String()))
            .count(Materialized.<String,Long, KeyValueStore<Bytes, byte[]>>as("wordcount"))
            .toStream()
            .print(Printed.toSysOut());
    
            return stream;
        }
    
    }
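
    To see the @SendTo forwarding in action, a second listener can subscribe to topic05. The class below is an illustrative sketch (the class name Topic05Listener is an assumption, not part of the original project); it simply prints whatever the topic04 listener returned and forwarded:

    package com.baizhi;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    @Component
    public class Topic05Listener {

        // Records returned by the topic04 listener are re-published to topic05
        // via @SendTo and end up here.
        @KafkaListener(topics = {"topic05"})
        public void onForwarded(ConsumerRecord<String, String> record) {
            System.out.println("topic05 -> " + record.key() + " : " + record.value());
        }
    }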
    
    
    package com.baizhi.service;
    
    public interface IOrderService {
        public void saveOrder(String id,Object message);
    }
    
    package com.baizhi.service.impl;
    
    import com.baizhi.service.IOrderService;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;
    
    @Transactional
    @Service
    public class OrderService implements IOrderService {
    
        @Autowired
        private KafkaTemplate kafkaTemplate;
    
        @Override
        public void saveOrder(String id,Object message) {
            kafkaTemplate.send(new ProducerRecord("topic04",id,message));
        }
    }
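
    Because the service method runs under @Transactional and the producer has a transaction-id prefix configured, the send participates in a Spring-managed Kafka transaction. A minimal sketch of the rollback side of that behavior is shown below (FailingOrderService and saveOrderThenFail are hypothetical names used only for illustration):

    package com.baizhi.service.impl;

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;

    @Transactional
    @Service
    public class FailingOrderService {

        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;

        public void saveOrderThenFail(String id, String message) {
            kafkaTemplate.send(new ProducerRecord<>("topic04", id, message));
            // The exception rolls back the Spring transaction, which aborts the
            // underlying Kafka transaction, so a consumer configured with
            // isolation.level=read_committed never sees the record above.
            throw new IllegalStateException("simulated failure after send");
        }
    }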
    
    
    package com.baizhi.tests;
    
    import com.baizhi.KafkaSpringBootApplication;
    import com.baizhi.service.IOrderService;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.kafka.core.KafkaOperations;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.test.context.junit4.SpringRunner;

    @SpringBootTest(classes = {KafkaSpringBootApplication.class})
    @RunWith(SpringRunner.class)
    public class KafkaTemplateTests {
        @Autowired
        private KafkaTemplate kafkaTemplate;
        @Autowired
        private IOrderService orderService;
    
        @Test
        public void testOrderService(){
            orderService.saveOrder("001","baizhi edu ");
        }
        @Test
        public void testKafkaTemplate(){
            kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback() {
                @Override
                public Object doInOperations(KafkaOperations kafkaOperations) {
                    return kafkaOperations.send(new ProducerRecord("topic01","002","this is a demo"));
                }
            });
        }
    
    }
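
    As a side note, once spring.kafka.producer.transaction-id-prefix is set, the auto-configured KafkaTemplate is transactional, so a plain send() outside any transaction is expected to be rejected. The test sketch below (hypothetical, a method one could add to the test class above, not part of the original article) illustrates why the examples always go through either a @Transactional service method or executeInTransaction:

    @Test
    public void testSendOutsideTransactionFails() {
        try {
            // With a transactional producer factory this call is expected to fail
            // because no Kafka transaction is in progress on this thread.
            kafkaTemplate.send(new ProducerRecord("topic01", "003", "no transaction"));
        } catch (IllegalStateException expected) {
            // Expected: a transactional KafkaTemplate requires an active transaction.
        }
    }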
    
    
  • Original article: https://blog.csdn.net/weixin_39563769/article/details/136395927