• Spring Cloud Stream kafka项目启动时报错


    报错信息

    • 使用的是spring-cloud-starter-stream-kafka,函数式编程
    • 程序接收socket.io消息,使用kafka发送消息
    • 程序启动时,StreamBridge.send 发送消息报空指针异常,追踪代码报错在Message resultMessage = (Message)((Function)functionToInvoke).apply(data);这一行,functionToInvokenull
    • 具体代码如下:
    @Component
    public class ChannelParamHandler {
    
        static final String OUTPUT_BINDING_NAME = "channel-out-0";
        private StreamBridge streamBridge;
    
        public ChannelParamHandler(StreamBridge streamBridge) {
            this.streamBridge = streamBridge;
        }
         @OnEvent(EventName.CHANNEL_PARAM)
        public void onEvent(SocketIOClient client, Object data, AckRequest ackSender) {
    		// ....其他逻辑省略
    		streamBridge.send(OUTPUT_BINDING_NAME, mapper.writeValueAsString(channelDTOList) + "-");
    		// ....其他逻辑省略
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 启动时有一个warn日志 WARN 1 --- [ restartedMain] onConfiguration$FunctionBindingRegistrar : You have defined function definition that does not exist: streamBridge
    • 具体报错信息如下:
    2022-07-22 11:08:38.934 ERROR 18088 --- [ntLoopGroup-3-6] c.n.s.listener.DefaultExceptionListener  : java.lang.NullPointerException
    
    com.newatc.socketio.handler.SocketIOException: java.lang.NullPointerException
    	at com.newatc.socketio.annotation.OnEventScanner$2.onData(OnEventScanner.java:111)
    	at com.newatc.socketio.namespace.Namespace.onEvent(Namespace.java:127)
    	at com.newatc.socketio.handler.PacketListener.onPacket(PacketListener.java:73)
    	at com.newatc.socketio.handler.InPacketHandler.channelRead0(InPacketHandler.java:84)
    	at com.newatc.socketio.handler.InPacketHandler.channelRead0(InPacketHandler.java:37)
    	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:305)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    	at com.newatc.socketio.handler.AuthorizeHandler.channelRead(AuthorizeHandler.java:132)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:102)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
    	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
    	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
    	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
    	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
    	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
    	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    	at java.base/java.lang.Thread.run(Thread.java:834)
    Caused by: java.lang.NullPointerException: null
    	at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:221)
    	at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:164)
    	at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:144)
    	at com.newatc.unit.agent.handler.PhaseStatusHandler.onEvent(PhaseStatusHandler.java:65)
    	at jdk.internal.reflect.GeneratedMethodAccessor292.invoke(Unknown Source)
    	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    	at com.newatc.socketio.annotation.OnEventScanner$2.onData(OnEventScanner.java:109)
    	... 42 common frames omitted
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • streamBridge为null,空指针异常
    • streamBridge不为null时,functionToInvoke 为null,空指针异常

    解决

    • 原因不难想到,项目启动时,还未创建好kafka连接,但是程序已经接收到socket消息,已经开始调用streamBridge发送消息了

    • 这个时候streamBridge并未创建好或者初始化完成,就导致了空指针异常

    • 一开始想着优先创建kafka连接相关bean完成初始化,再去允许程序调用,但是针对Spring Cloud Stream,实在找不到可以提前初始化的bean,这条路失败

    • 项目启动或重启过程中,接收到的一些socket消息,其实是可以丢弃的,项目启动完成后再接收处理也没关系。于是就对streamBridge加了非空判断,不为null时再去调用send方法发送消息,但是仍然遇到了空指针异常

    • 继续debug代码,追踪出错时的情况和正常情况,进行对比,发现失败时streamBridge的初始化initialized为false,成功时为true
      在这里插入图片描述

    • 于是继续看源码找初始化方法,根据initialized搜索,果然找到了
      在这里插入图片描述

    • 判断非空后,先调用初始化方法,再调用send方法,果然没有空指针问题了

    • 总结一下:空指针异常处理,增加非空判断,增加初始化方法

    			if (streamBridge != null) {
                    streamBridge.afterSingletonsInstantiated();
                    streamBridge.send(OUTPUT_BINDING_NAME, mapper.writeValueAsString(channelDTOList) + "-");
                } else {
                    log.error("streamBridge is null-------------------!");
                }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
  • 相关阅读:
    【全国大学生loT设计竞赛】安谋科技&灵动赛题国二分享:MagicDog—仿生狗四足机器人
    webpack5学习笔记
    省赛真题求和
    Spring Data JPA 之事务与连接池之间的关系与配置
    Git 不识别文件名字母大小写变化
    人工智能学习:载入MNIST数据集(1)
    数据结构 —— 双向链表(超详细图解 & 接口函数实现)
    C语言—贪吃蛇(链表)超详解
    聊聊并发编程的12种业务场景
    IDEA 配置 云服务器远程部署
  • 原文地址:https://blog.csdn.net/u010882234/article/details/126475416