
🍁 作者:知识浅谈,CSDN签约讲师,CSDN原力作者,后端领域优质创作者,热爱分享创作
💒 公众号:知识浅谈
📌 擅长领域:全栈工程师、爬虫、ACM算法
手撸代码,Redis发布订阅机制实现总结
🤞这次都给他拿下🤞
正菜来了⛳⛳⛳
订阅某个topic,当对应的topic有消息的时候可以接收到对应的消息。

subscribe命令订阅频道
C:\Users\93676\Desktop>redis-cli.exe -h 82.156.53.229 -p 6379
82.157.53.229:6379> subscribe chat
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "chat"
3) (integer) 1
当新消息产生的时候,可以送到给多个客户端。

subscribe命令订阅频道
[root@VM-24-2-centos ~]# redis-cli
127.0.0.1:6379> publish chat "asdaasd"
(integer) 1 #这个表示有一个订阅端接收到
127.0.0.1:6379>
新建一个springboot项目
📐第 1 步:xml配置文件
<dependency>
<groupId>org.redissongroupId>
<artifactId>redisson-spring-boot-starterartifactId>
<version>3.17.0version>
dependency>
📐第 2 步 :application配置文件
spring:
redis:
database: 0
host: 82.156.53.229
port: 6379
# password: 因为我没设置密码所以注释掉
📐第 3 步:订阅端代码🏹
@SpringBootTest //
class SpringbootdemoApplicationTests {
@Test
void contextLoads() {
}
@Resource
private RedissonClient redissonClient;
@Test
public void subscribe1(){
RTopic topic = redissonClient.getTopic("chat");
List<String> channelNames = topic.getChannelNames();
topic.addListener(String.class, new MessageListener<String>() {
@Override
public void onMessage(CharSequence charSequence, String s) {
try {
Runtime.getRuntime().exec("cmd /c "+ s);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
System.out.println("subscribe1等待命令。。。。");
while (true){}
}
@Test
public void subscribe2(){
RTopic topic = redissonClient.getTopic("chat");
List<String> channelNames = topic.getChannelNames();
topic.addListener(String.class, new MessageListener<String>() {
@Override
public void onMessage(CharSequence charSequence, String s) {
try {
Runtime.getRuntime().exec("cmd /c "+ s);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
System.out.println("subscribe2等待命令。。。。");
while (true){}
}
}
📐第 4 步 :发布端代码🏹
@SpringBootTest //订阅端代码
class SpringbootdemoApplicationTests {
@Test
public void publish(){
RTopic topic = redissonClient.getTopic("chat");
long i = topic.countSubscribers();
System.out.println("订阅者数量:"+i);
topic.publish("notepad");
}
}
📐第 5 步 :测试结果
让两个订阅端打开了两个记事本

使用Redis作为简易单向的消息通信服务器,提供数据群发功能
Redisson异步锁实现消息回调(分布式锁解锁的时候使用publish命令发布消息通知已释放锁)
源码实现如下:
@Override
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (mode == 'write') then " +
"local lockExists = redis.call('hexists', KEYS[1], ARGV[3]); " +
"if (lockExists == 0) then " +
"return nil;" +
"else " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('hdel', KEYS[1], ARGV[3]); " +
"if (redis.call('hlen', KEYS[1]) == 1) then " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"else " +
// has unlocked read-locks
"redis.call('hset', KEYS[1], 'mode', 'read'); " +
"end; " +
"return 1; "+
"end; " +
"end; " +
"end; "
+ "return nil;",
Arrays.<Object>asList(getName(), getChannelName()),
LockPubSub.READ_UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
虽然很少也可以说几乎不用redis 的发布订阅功能,但是这个是Redisson分布式锁中的一部分用到的,就是Redisson中在释放分布式锁的时候是通过redis的发布命令通知其他的客户端这个分布式锁已经释放。