Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。
发送者(发布者)并不是直接发送它们的消息给指定的接收者(订阅者),而是将消息发布到特定的消息通道,并且不需要知道订阅者的任何信息。订阅者可以订阅一个或多个感兴趣的消息通道,同时也只会收到他们感兴趣通道的信息,而不用去关心是谁发布的。这种发布者与订阅者的解耦,使其具备更强的扩展性并得到一个更加动态的网络拓扑。
订阅:客户端订阅喜欢的频道。
发布:消息发送给指定的频道,由频道发送给订阅它的客户端。
subscribe channel [channel ...]
示例:订阅a、b、c频道
subscribe a b c
publish channel message
示例:给a频道发送hello
publish a hello
127.0.0.1:6379> pubsub channels
1) "a"
2) "e"
3) "c"
4) "b"
127.0.0.1:6379>
unsubscribe channel [channel ...]
示例:退订a频道
unsubscribe a
tips:redis-cli执行订阅命令后会进入订阅状态并一直占用当前客户端窗口,此时无法再执行其他redis命令(包括退订命令);因此退订频道命令一般在程序客户端中使用(如Java客户端、C客户端、PHP客户端等)
psubscribe pattern [pattern ...]
示例:订阅所有以a结尾的频道
psubscribe *a
punsubscribe pattern [pattern ...]
示例:退订所有以a、b或c结尾的频道
punsubscribe *a *b *c
案例:
client1:
127.0.0.1:6379> psubscribe goods.*
client2:
127.0.0.1:6379> psubscribe order.*
client3:
127.0.0.1:6379> publish goods.save hello~
(integer) 1
127.0.0.1:6379> publish ace hi~
(integer) 0
127.0.0.1:6379>
观测变化:
在Jedis中提供有JedisPubSub类,该类用于监听发布订阅相关的事件;当发生订阅、退订或者收到消息时,就会回调JedisPubSub中对应的方法;
方法 | 触发时机 |
---|---|
onSubscribe | 当有频道订阅时触发(subscribe) |
onMessage | 当有频道收到消息时触发(publish) |
onUnsubscribe | 当有频道退订时触发(unsubscribe) |
onPSubscribe | 当使用psubscribe命令订阅一批频道时触发 |
onPUnsubscribe | 当使用punsubscribe命令退订一批频道时触发 |
onPMessage | 当通过psubscribe模式订阅的频道收到消息时触发(publish) |
package com.dfbz.listener;
import redis.clients.jedis.JedisPubSub;
/**
 * Jedis pub/sub listener that prints every subscription event and every received
 * message to standard output.
 *
 * <p>When the literal message {@code close} arrives on a channel, the listener
 * unsubscribes from that channel.
 *
 * @author lscl
 * @version 1.0
 */
public class PubSubListener extends JedisPubSub {

    /**
     * Called when a channel has been subscribed (subscribe).
     *
     * @param channel            the channel that was subscribed
     * @param subscribedChannels number of channels the current Jedis connection is subscribed to
     */
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.println("有频道【" + channel + "】订阅了: " + subscribedChannels);
    }

    /**
     * Called when a subscribed channel receives a message (publish).
     *
     * @param channel the channel the message was published to
     * @param message the message payload
     */
    @Override
    public void onMessage(String channel, String message) {
        System.out.println("接收到来自【" + channel + "】的信息【" + message + "】");
        // The message "close" acts as a control signal: stop listening on this channel.
        if ("close".equals(message)) {
            this.unsubscribe(channel);
        }
    }

    /**
     * Called when a channel has been unsubscribed (unsubscribe).
     *
     * @param channel            the channel that was unsubscribed
     * @param subscribedChannels number of channels the current Jedis connection is still subscribed to
     */
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        System.out.println("有频道【" + channel + "】取消订阅了: " + subscribedChannels);
    }

    /**
     * Called when a pattern has been subscribed with the psubscribe command.
     *
     * @param pattern            the pattern that was subscribed
     * @param subscribedChannels subscription count reported with the event
     */
    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
        System.out.println("使用【" + pattern + "】表达式订阅了频道: " + subscribedChannels);
    }

    /**
     * Called when a pattern has been unsubscribed with the punsubscribe command.
     *
     * @param pattern            the pattern that was unsubscribed
     * @param subscribedChannels subscription count reported with the event
     */
    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        System.out.println("使用【" + pattern + "】表达式退订了频道: " + subscribedChannels);
    }

    /**
     * Called when a message is published to a channel that matches a pattern
     * subscribed with psubscribe.
     *
     * @param pattern the subscribed pattern that matched
     * @param channel the concrete channel the message was published to
     * @param message the message payload
     */
    @Override
    public void onPMessage(String pattern, String channel, String message) {
        System.out.println("订阅表达式为【" + pattern + "】,订阅的频道是【" + channel + "】,消息【" + message + "】");
    }
}
package com.dfbz.demo01;
import com.dfbz.listener.PubSubListener;
import org.junit.Before;
import org.junit.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
/**
* @author lscl
* @version 1.0
* @intro:
*/
public class Demo01 {
private JedisPool jedisPool;
@Before
public void before() {
jedisPool = new JedisPool("localhost", 6379);
}
@Test
public void test1() throws Exception {
PubSubListener pubSubListener = new PubSubListener();
// 从连接池获取一个连接
Jedis jedis = jedisPool.getResource();
jedis.subscribe(pubSubListener, "a", "b", "c");
}
@Test
public void test2() throws Exception {
Jedis jedis = jedisPool.getResource();
jedis.publish("a", "a say hello~");
jedis.publish("b", "b say hello~");
jedis.publish("c", "c say hello~");
// 退订a频道
jedis.publish("a", "close");
// 退订是比较耗时的操作
Thread.sleep(10);
jedis.publish("a", "a say hello~");
jedis.publish("b", "b say hello~");
jedis.publish("c", "c say hello~");
}
}
测试pattern模式:
package com.dfbz.demo01;
import com.dfbz.listener.PubSubListener;
import org.junit.Before;
import org.junit.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
/**
* @author lscl
* @version 1.0
* @intro:
*/
public class Demo02 {
private JedisPool jedisPool;
@Before
public void before() {
jedisPool = new JedisPool("localhost", 6379);
}
@Test
public void test1() throws Exception {
PubSubListener pubSubListener = new PubSubListener();
// 从连接池获取一个连接
Jedis jedis = jedisPool.getResource();
jedis.psubscribe(pubSubListener, "goods.*", "order.*");
}
@Test
public void test2() throws Exception {
Jedis jedis = jedisPool.getResource();
jedis.publish("goods.save","goods.save hello~");
jedis.publish("goods.delete","goods.delete hello~");
jedis.publish("order.query","order.query hello~");
jedis.publish("order.update","order.update hello~");
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.dfbz</groupId>
    <artifactId>02_Redis_SpringBoot</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Applications inherit from spring-boot-starter-parent; spring-boot-parent is
         Spring Boot's own internal build parent and is not meant for application use. -->
    <parent>
        <artifactId>spring-boot-starter-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>2.0.1.RELEASE</version>
    </parent>

    <dependencies>
        <!-- Web starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Spring Data Redis: RedisTemplate and the message listener container -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!-- JUnit + Spring test support -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>
</project>
CRMListener:
package com.dfbz.listener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;
/**
 * Listens for messages related to the CRM business and prints each one to standard output.
 *
 * @author lscl
 * @version 1.0
 */
@Component
public class CRMListener implements MessageListener {

    @Autowired
    private RedisTemplate<Object, Object> redisTemplate;

    /**
     * Decodes the received message and prints its channel and payload.
     *
     * @param message the received message; channel and body are raw bytes
     * @param pattern the second argument of the listener callback (not used here)
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        // Decode the channel with the template's string serializer instead of the
        // platform default charset, so the result does not depend on the JVM locale.
        RedisSerializer<String> channelSerializer = redisTemplate.getStringSerializer();
        String channel = channelSerializer.deserialize(message.getChannel());

        // Decode the body with the template's value serializer.
        RedisSerializer<?> valueSerializer = redisTemplate.getValueSerializer();
        Object payload = valueSerializer.deserialize(message.getBody());

        System.out.println("我是CRMListener--来自【" + channel + "】频道的消息:【" + payload + "】");
    }
}
OAListener:
package com.dfbz.listener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;
/**
 * Listens for messages related to the OA business and prints each one to standard output.
 *
 * @author lscl
 * @version 1.0
 */
@Component
public class OAListener implements MessageListener {

    @Autowired
    private RedisTemplate<Object, Object> redisTemplate;

    /**
     * Decodes the received message and prints its channel and payload.
     *
     * @param message the received message; channel and body are raw bytes
     * @param pattern the second argument of the listener callback (not used here)
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        // Decode the channel with the template's string serializer instead of the
        // platform default charset, so the result does not depend on the JVM locale.
        RedisSerializer<String> channelSerializer = redisTemplate.getStringSerializer();
        String channel = channelSerializer.deserialize(message.getChannel());

        // Decode the body with the template's value serializer. The payload is
        // concatenated directly (no toString() call) so a null result prints as
        // "null" instead of throwing a NullPointerException.
        RedisSerializer<?> valueSerializer = redisTemplate.getValueSerializer();
        Object payload = valueSerializer.deserialize(message.getBody());

        System.out.println("我是OAListener--来自【" + channel + "】频道的消息:【" + payload + "】");
    }
}
package com.dfbz;
import com.dfbz.listener.CRMListener;
import com.dfbz.listener.OAListener;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import java.util.ArrayList;
import java.util.List;
/**
 * Spring Boot entry point that also wires the Redis pub/sub listeners:
 * {@link OAListener} receives "dept" and "user.*", {@link CRMListener} receives
 * "examine" and "report.*".
 *
 * @author lscl
 * @version 1.0
 */
@SpringBootApplication
public class RedisApplication {

    public static void main(String[] args) {
        // Forward the command-line arguments so they are not dropped on startup.
        SpringApplication.run(RedisApplication.class, args);
    }

    /**
     * Wraps the CRM listener in a listener adapter.
     *
     * @param crmListener the CRM listener bean
     * @return adapter delegating to {@code crmListener}
     */
    @Bean
    public MessageListenerAdapter CRMListenerAdapter(CRMListener crmListener) {
        return new MessageListenerAdapter(crmListener);
    }

    /**
     * Wraps the OA listener in a listener adapter.
     *
     * @param oaListener the OA listener bean
     * @return adapter delegating to {@code oaListener}
     */
    @Bean
    public MessageListenerAdapter OAListenerAdapter(OAListener oaListener) {
        return new MessageListenerAdapter(oaListener);
    }

    /**
     * Registers both listener adapters with a message listener container.
     *
     * <p>The two adapter parameters are deliberately named after the {@code @Bean}
     * methods above: both beans have the same type, so the names are what tell
     * them apart. Rename them only together.
     *
     * @param redisConnectionFactory connection factory used by the container
     * @param CRMListenerAdapter     adapter for the CRM listener
     * @param OAListenerAdapter      adapter for the OA listener
     * @return the configured listener container
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(
            RedisConnectionFactory redisConnectionFactory,
            MessageListenerAdapter CRMListenerAdapter,
            MessageListenerAdapter OAListenerAdapter
    ) {
        // Topics for the OA listener: one exact channel plus one pattern.
        List<Topic> oaList = new ArrayList<>();
        oaList.add(new ChannelTopic("dept"));
        // '*' is the wildcard in a pattern subscription.
        oaList.add(new PatternTopic("user.*"));

        // Topics for the CRM listener: one exact channel plus one pattern.
        List<Topic> crmList = new ArrayList<>();
        crmList.add(new ChannelTopic("examine"));
        crmList.add(new PatternTopic("report.*"));

        // Listener container
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        container.addMessageListener(CRMListenerAdapter, crmList);
        container.addMessageListener(OAListenerAdapter, oaList);
        return container;
    }
}
package com.dfbz.demo01;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Spring Boot test that publishes one message to each channel the application's
 * listeners are registered for.
 *
 * @author lscl
 * @version 1.0
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class Demo01 {

    // Parameterized instead of raw so the compiler can type-check template calls.
    @Autowired
    private RedisTemplate<Object, Object> redisTemplate;

    /** Publishes to the exact channels and to channels matching the subscribed patterns. */
    @Test
    public void test1() {
        // Channels registered for OAListener: "dept" and "user.*"
        redisTemplate.convertAndSend("dept", "dept hello~");
        redisTemplate.convertAndSend("user.save", "user.save hello~");
        redisTemplate.convertAndSend("user.delete", "user.delete hello~");

        // Channels registered for CRMListener: "examine" and "report.*"
        redisTemplate.convertAndSend("examine", "examine hello~");
        redisTemplate.convertAndSend("report.query", "report.query hello~");
        redisTemplate.convertAndSend("report.update", "report.update hello~");
    }
}