本来承接上篇应该到 OSS 模块的初始化分析了,结果半路杀出个程咬金:在做分析的时候遇到了发布订阅的相关功能的一些问题,去请教了一下 狮子大佬 ,所以决定这篇先简单分析一下发布订阅功能,下一篇再回归 OSS 功能。
对于相关概念,我就不再在此重复了,感兴趣的朋友可以自行查看或者查找。
V4.3.0V3.17.5RedisPubSubController#pub

RedisPubSubController#sub


RedisPubSubController#sub
RedisUtils#subscribe
方法 topic.addListener 是为 RTopic 添加一个监听器,框架中对监听器的方法进行了重写,在后面发布的部分会详细说明。
RedissonTopic#addListener
RedissonTopic#addListenerAsync

PublishSubscribeService#subscribe


PublishSubscribeService#subscribeNoTimeout
第一次请求时没有 PubSubConnectionEntry 对象,继续执行下面的逻辑(后面存入容器对象 name2PubSubConnection 中),后续请求如果存在对象则直接返回。

PubSubConnectionEntry#subscribe
RedisPubSubConnection#subscribe
RedisPubSubConnection#async
断点到这里后面就不再深入了,是由io.netty.channel.AbstractChannelHandlerContext#safeExecute 这个方法完成底层订阅操作。

RedisPubSubController#pub
这里 System.out.println("发布通道 => " + key + ", 发送值 => " + value); 实际上就是上面第 3 步中重写的方法中的逻辑。
RedisUtils#publish
RedissonTopic#publish
RedissonTopic#publishAsync
CommandAsyncService#writeAsync
CommandAsyncService#async

RedisExecutor#execute
RedisExecutor#sendCommand
同样,断点到这里后面就不再深入了,最后发布操作同样是由io.netty.channel.AbstractChannelHandlerContext#safeExecute 这个方法完成。

特别说明:这里分析截图用的订阅通道是【test1】,发布消息内容是【test1】。
如果先进行订阅操作,再进行发布操作,那么在发布完成后订阅通道就能够直接接收到消息,因此实际上这一部分内容在时间上是紧接着【发布流程】的。
CommandPubSubDecoder#decodeResult这里解码并根据消息类型进行对应的操作。

这个方法前面还有一些操作步骤(参考以下序列图,只保留了关键方法):

RedisPubSubConnection#onMessage
此处的逻辑是遍历所有的监听器,执行 onMessage 方法。
PubSubMessageListener#onMessage
MessageListener 接口:

RedisUtils#subscribe重写 onMessage 方法:

换种写法比较容易理解:

RedisPubSubController#sub
