spring-cloud-starter-stream-kafka
,函数式编程socket.io
消息,使用kafka发送消息StreamBridge.send
发送消息报空指针异常,追踪代码报错在Message resultMessage = (Message)((Function)functionToInvoke).apply(data);
这一行,functionToInvoke
为null
@Component
public class ChannelParamHandler {
static final String OUTPUT_BINDING_NAME = "channel-out-0";
private StreamBridge streamBridge;
public ChannelParamHandler(StreamBridge streamBridge) {
this.streamBridge = streamBridge;
}
@OnEvent(EventName.CHANNEL_PARAM)
public void onEvent(SocketIOClient client, Object data, AckRequest ackSender) {
// ....其他逻辑省略
streamBridge.send(OUTPUT_BINDING_NAME, mapper.writeValueAsString(channelDTOList) + "-");
// ....其他逻辑省略
}
}
WARN 1 --- [ restartedMain] onConfiguration$FunctionBindingRegistrar : You have defined function definition that does not exist: streamBridge
2022-07-22 11:08:38.934 ERROR 18088 --- [ntLoopGroup-3-6] c.n.s.listener.DefaultExceptionListener : java.lang.NullPointerException
com.newatc.socketio.handler.SocketIOException: java.lang.NullPointerException
at com.newatc.socketio.annotation.OnEventScanner$2.onData(OnEventScanner.java:111)
at com.newatc.socketio.namespace.Namespace.onEvent(Namespace.java:127)
at com.newatc.socketio.handler.PacketListener.onPacket(PacketListener.java:73)
at com.newatc.socketio.handler.InPacketHandler.channelRead0(InPacketHandler.java:84)
at com.newatc.socketio.handler.InPacketHandler.channelRead0(InPacketHandler.java:37)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:305)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at com.newatc.socketio.handler.AuthorizeHandler.channelRead(AuthorizeHandler.java:132)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NullPointerException: null
at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:221)
at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:164)
at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:144)
at com.newatc.unit.agent.handler.PhaseStatusHandler.onEvent(PhaseStatusHandler.java:65)
at jdk.internal.reflect.GeneratedMethodAccessor292.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at com.newatc.socketio.annotation.OnEventScanner$2.onData(OnEventScanner.java:109)
... 42 common frames omitted
原因不难想到,项目启动时,还未创建好kafka连接,但是程序已经接收到socket消息,已经开始调用streamBridge发送消息了
这个时候streamBridge并未创建好或者初始化完成,就导致了空指针异常
一开始想着优先创建kafka连接相关bean完成初始化,再去允许程序调用,但是针对Spring Cloud Stream
,实在找不到可以提前初始化的bean,这条路失败
项目启动或重启过程中,接收到的一些socket消息,其实是可以丢弃的,项目启动完成后再接收处理也没关系。于是就对streamBridge加了非空判断,不为null时再去调用send方法发送消息,但是仍然遇到了空指针异常
继续debug代码,追踪出错时的情况和正常情况,进行对比,发现失败时streamBridge的初始化initialized为false,成功时为true
于是继续看源码找初始化方法,根据initialized搜索,果然找到了
判断非空后,先调用初始化方法,再调用send方法,果然没有空指针问题了
总结一下:空指针异常处理,增加非空判断,增加初始化方法
if (streamBridge != null) {
streamBridge.afterSingletonsInstantiated();
streamBridge.send(OUTPUT_BINDING_NAME, mapper.writeValueAsString(channelDTOList) + "-");
} else {
log.error("streamBridge is null-------------------!");
}