• Spring Boot Basics (51): Integrating Kafka


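    Preface

    This post shows how to integrate Kafka into a Spring Boot application.

    Environment setup

    Kafka must be installed first. For installation steps, see: Spring Boot Basics (50): Installing Kafka on Linux

    Integrating Kafka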

    1. Add the dependency
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    
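    2. Write a REST endpoint that sends messages
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    @RestController
    @RequestMapping("/msg")
    public class MessageController {

        @Autowired
        private MessageService messageService;

        // GET /msg/{id}: hands the id to the service and returns immediately.
        @GetMapping("{id}")
        public String sendMessage(@PathVariable String id) {
            messageService.sendMessage(id);
            return "success";
        }
    }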
    
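    public interface MessageService {

        // Queues the given id for asynchronous processing.
        void sendMessage(String id);

        // Not used in this example; the implementation below returns null.
        String doMessage();
    }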
    
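    3. Implement the service (the producer side, called from the front end)
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Service;

    @Service
    public class MessageKafkatmqServiceImpl implements MessageService {

        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;

        @Override
        public void sendMessage(String id) {
            System.out.println("Message queued for sending (Kafka), id: " + id);
            // "hello" is the topic created earlier
            kafkaTemplate.send("hello", id);
        }

        // Unused here: consumption is handled by the @KafkaListener.
        @Override
        public String doMessage() {
            return null;
        }
    }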
    
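    4. Define a listener that performs the actual message sending asynchronously
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    /**
     * Listener: consumes records from the "hello" topic.
     */
    @Component
    public class KafkaMessageListener {

        @KafkaListener(topics = "hello")
        public void onMessage222(ConsumerRecord<String, String> consumerRecord) {
            System.out.println("SMS sent, id: " + consumerRecord.value());
        }
    }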
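
    The controller, service, and listener above form a producer/consumer hand-off: the request thread only enqueues the id, and a separate thread does the work later. The sketch below imitates that flow with plain JDK classes so it can be run without a broker. It is an in-memory analogy, not Kafka: the BlockingQueue stands in for the "hello" topic, and the class name, the run method, and the stop marker are all hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory analogy only: a BlockingQueue stands in for the "hello" topic.
// None of these names come from Spring or Kafka.
class AsyncFlowSketch {

    private static final String STOP = "__stop__"; // hypothetical shutdown marker

    // Enqueues every id, waits for the listener thread, and returns what it handled.
    static List<String> run(List<String> ids) {
        BlockingQueue<String> topic = new LinkedBlockingQueue<>();
        List<String> handled = new CopyOnWriteArrayList<>();

        // Plays the role of the @KafkaListener method: runs on its own thread.
        Thread listener = new Thread(() -> {
            try {
                for (String id = topic.take(); !STOP.equals(id); id = topic.take()) {
                    handled.add(id);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        listener.start();

        try {
            // Plays the role of sendMessage(id): enqueue and move on.
            for (String id : ids) {
                topic.put(id);
            }
            topic.put(STOP);
            listener.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the listener", e);
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("1", "2", "3")));
    }
}
```

    Unlike this sketch, Kafka persists the records on the broker, so the producer and the consumer can live in different processes and the consumer can be restarted without losing queued ids.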
    
    5. Configuration file
    spring:
      kafka:
        bootstrap-servers: 106.13.2.249:9092
        consumer:
          group-id: abc # consumer group id
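
    For orientation, the two settings above correspond to the standard Kafka client keys bootstrap.servers and group.id. The sketch below only builds a java.util.Properties object with those keys to make the mapping explicit; the class and method names are hypothetical, and nothing here connects to a broker.

```java
import java.util.Properties;

// Sketch: the plain Kafka client keys that the two YAML settings correspond to.
// KafkaClientProps and consumerProps are illustrative names, not library API.
class KafkaClientProps {

    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers); // spring.kafka.bootstrap-servers
        props.setProperty("group.id", groupId);                   // spring.kafka.consumer.group-id
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("106.13.2.249:9092", "abc");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```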
    
    6. Start the application and test sending a message

    Problems encountered

    java.net.UnknownHostException: xxxx

    2022-07-20 15:48:28.701  INFO 15924 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-abc-1, groupId=abc] Cluster ID: LFbHxG8qSSu7PyPKXoDD4g
    2022-07-20 15:48:28.703  INFO 15924 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-abc-1, groupId=abc] Discovered group coordinator ls-bptysztw:9092 (id: 2147483647 rack: null)
    2022-07-20 15:48:30.990  WARN 15924 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-abc-1, groupId=abc] Error connecting to node ls-bptysztw:9092 (id: 2147483647 rack: null)
    
    java.net.UnknownHostException: ls-bptysztw
    	at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method) ~[na:1.8.0_144]
    	at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928) ~[na:1.8.0_144]
    	at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323) ~[na:1.8.0_144]
    	at java.net.InetAddress.getAllByName0(InetAddress.java:1276) ~[na:1.8.0_144]
    	at java.net.InetAddress.getAllByName(InetAddress.java:1192) ~[na:1.8.0_144]
    	at java.net.InetAddress.getAllByName(InetAddress.java:1126) ~[na:1.8.0_144]
    	at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27) ~[kafka-clients-3.1.1.jar:na]
    	at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110) ~[kafka-clients-3.1.1.jar:na]
    	at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:511) ~[kafka-clients-3.1.1.jar:na]
    	at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:468) ~[kafka-clients-3.1.1.jar:na]
    	at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173) ~[kafka-clients-3.1.1.jar:na]
    	at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:988) [kafka-clients-3.1.1.jar:na]
    	at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:301) [kafka-clients-3.1.1.jar:na]
    	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:575) [kafka-clients-3.1.1.jar:na]
    	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:854) [kafka-clients-3.1.1.jar:na]
    	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:830) [kafka-clients-3.1.1.jar:na]
    	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) [kafka-clients-3.1.1.jar:na]
    	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169) [kafka-clients-3.1.1.jar:na]
    	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129) [kafka-clients-3.1.1.jar:na]
    	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602) [kafka-clients-3.1.1.jar:na]
    	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412) [kafka-clients-3.1.1.jar:na]
    	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) [kafka-clients-3.1.1.jar:na]
    	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) [kafka-clients-3.1.1.jar:na]
    	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) [kafka-clients-3.1.1.jar:na]
    	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:246) [kafka-clients-3.1.1.jar:na]
    	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.coordinatorUnknownAndUnready(ConsumerCoordinator.java:459) [kafka-clients-3.1.1.jar:na]
    	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:487) [kafka-clients-3.1.1.jar:na]
    	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1262) [kafka-clients-3.1.1.jar:na]
    	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) [kafka-clients-3.1.1.jar:na]
    	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) [kafka-clients-3.1.1.jar:na]
    	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1522) [spring-kafka-2.8.7.jar:2.8.7]
    	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1512) [spring-kafka-2.8.7.jar:2.8.7]
    	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1340) [spring-kafka-2.8.7.jar:2.8.7]
    	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252) [spring-kafka-2.8.7.jar:2.8.7]
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_144]
    	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_144]
    	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
    

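    Solution
    The broker advertises itself under its hostname (ls-bptysztw, as the log above shows), which the client machine cannot resolve. Add an entry to C:\Windows\System32\drivers\etc\hosts that maps this hostname to the broker's IP address (substitute your own values):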

    123.123.123.123       ls-bptysztw
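
    To confirm that the hosts entry took effect, the name can be resolved from Java with the same InetAddress lookup that appears in the stack trace. This is a minimal sketch; the class name HostCheck and its resolve helper are hypothetical, and the default hostname is simply the one from the log.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Diagnostic sketch: checks whether a hostname resolves on this machine.
class HostCheck {

    // Returns the resolved IP address, or null when the name cannot be resolved.
    static String resolve(String host) {
        try {
            return InetAddress.getByName(host).getHostAddress();
        } catch (UnknownHostException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // Pass a hostname as the first argument, or fall back to the one from the log.
        String host = args.length > 0 ? args[0] : "ls-bptysztw";
        String ip = resolve(host);
        System.out.println(ip == null ? host + " does not resolve" : host + " -> " + ip);
    }
}
```

    If the name still does not resolve after editing the hosts file, check that the file was saved with administrator rights and restart the application before retrying.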
    
  • Original article: https://blog.csdn.net/u011628753/article/details/125888442