Redis 的发布订阅(Pub/Sub)模式是一种消息传递机制,它允许在发送者和接收者之间建立松耦合的通信关系。在这种模式中,发送者(发布者)将消息发布到一个指定的频道或模式,而接收者(订阅者)可以订阅一个或多个频道,以便接收发布的消息。
以下是Redis发布订阅模式的主要组件:
可以看下图,Publisher 和 Subscriber、Channel 的关系很清晰:
发布者往 “Channel A” 通道发布消息:Hello World!,消息的所有订阅者就会收到这个消息
Redis实现 发布/订阅 一共有两种模式:
Redis 可以支持多个数据库,每个数据库都有自己的命名空间和数据。通过使用多个数据库,可以实现数据隔离、分区和组织
但是值得注意的是:这种发布订阅机制与 数据分区空间无关,比如在 db 0 发布消息, 其他区的订阅者都会收到消息
Redis 使用以下命令操作 Pub/Sub 工作:
SUBSCRIBE
:订阅一个或多个频道
SUBSCRIBE channel [channel ...]
UNSUBSCRIBE
:取消订阅一个或多个频道
UNSUBSCRIBE [channel [channel ...]]
PSUBSCRIBE
:订阅一个或多个模式
PSUBSCRIBE pattern [pattern ...]
PUNSUBSCRIBE
取消订阅一个或多个模式
PUNSUBSCRIBE [pattern [pattern ...]]
PUBSUB CHANNELS [pattern]
:列出活跃的 channelPUBSUB NUMSUB [channel-1 ... channel-N]
:列出 channel 的订阅者个数通过频道(Channel)进行发布订阅过程如下:
订阅后:
使用客户端 [subscriber A] 订阅 Channel [mychannel] 来接收消息。从上面可以看出响应的信息:
进入订阅后的客户端可以收到 3 种枚举类型的消息:
发布消息:
发布的消息并不会持久化存储下来,所以消息发布之后被某个 Subcriber 订阅到的话,消息生命周期基本就完成了
想要收到上面 发布者发布的消息,我们的客户端首先需要关注了 [mychannel] 频道,才能收到 “Hello, World!” 这条消息
如果你不想收到某个频道的消息了,你可以取消预订
来看看另一种实现发布订阅的方案 ,就是模式匹配的方式:除了直接订阅的客户端之外,还会检查是否有与我们模式相匹配的 Channel,如果有,消息也会发布到对应匹配的频道上,订阅这个 Channel 的客户端也会收到消息
如下图:
当 Message.Queue.Area1 频道接收到消息之后,除了订阅自身频道的 Actor A 和 Actor B 能收到消息之外。因为频道与模式匹配成功,消息还会发送给订阅 Message.Queue.* 模式的所有人员。
因为使用匹配模式,PUBLISH 消息发布到 Message.Queue.Area2 之外,还会将该 Channel 与匹配模式的Channel进行对比,如果 Channel 与某个模式匹配的话,也将这个消息发布到订阅这个模式的客户端。
所以图中红色线条部分,包括 Actor C、Actor D、Actor E 都接受到了消息
Client A 订阅 Message.Queue.Area1:
Client B 订阅 Message.Queue.Area2:
Client C 订阅 Message.Queue.*:
对应频道的订阅者收到消息(Client A ):
匹配模式的订阅者收到消息(Client C):
因为没有筛重策略,所以如果你既订阅了匹配模式(如 Message.Queue.* ),又订阅了对应的频道(如 Message.Queue.Area2),那么你的客户端会收到两条同样的消息,一条消息类型是message,一条类型是 pmessage
订阅消息就是接收消息,这个比较复杂。既有对 Redis 连接的管理,也有对消费消息的线程池的管理。不过 Spring 已经把这个“重活”给干了。
Spring 提供了一个全套的解决方案,这里面包括:
由于 Spring 已经全权代理,用户只需要提供要消费的 topic 以及对应的消费回调代码即可。
我们需要了解Spring提供的几个接口和类,才可以很好的使用:
Topic
接口,表示一个订阅对象:它有两个实现类,ChannelTopic
和 PatternTopic
,前者对应 redis的 channel,后者对应 redis 的 patternMessageListener
接口,回调接口,通过它来执行业务代码Message
接口,表示从 redis 接收到的消息RedisMessageListenerContainer
类,这个核心类,相当于一个代理,就是它负责接收 redis 的消息,并分发给 MessageListenerRedisConnectionFactory
:RedisMessageListenerContainer
需要此类 RedisConnectionFactory
:redis连接工厂,用来获取一个redis连接,由于这个连接用于接收消息,所以它是一直阻塞着的SpringBoot集成Redis Messaging (Pub/Sub)
先说下在 springboot 中使用 redis 的发布订阅的步骤:
MessageListener
接口,重写 onMessage() 方法)。RedisMessageListenerContainer
)。添加一个订单消息监听器:
@Component
public class OrderSubscriber implements MessageListener {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Override
public void onMessage(Message message, byte[] pattern) {
// 获取消息
byte[] messageBody = message.getBody();
// 使用值序列化器转换
Object msg = redisTemplate.getValueSerializer().deserialize(messageBody);
// 获取监听的频道
byte[] channelByte = message.getChannel();
// 使用字符串序列化器转换
Object channel = redisTemplate.getStringSerializer().deserialize(channelByte);
// 渠道名称转换
String patternStr = new String(pattern);
System.out.println(patternStr);
System.out.println("---频道---: " + channel);
System.out.println("---消息内容---: " + msg);
}
}
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
// json 序列化配置
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
jackson2JsonRedisSerializer.setObjectMapper(om);
// String 序列化
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
// 所有的 key 采用 string 的序列化
template.setKeySerializer(stringRedisSerializer);
// 所有的 value 采用 jackson 的序列化
template.setValueSerializer(jackson2JsonRedisSerializer);
// hash 的 key 采用 string 的序列化
template.setHashKeySerializer(stringRedisSerializer);
// hash 的 value 采用 jackson 的序列化
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory, OrderSubscriber orderSubscriber) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
// 设置连接工厂
container.setConnectionFactory(redisConnectionFactory);
// 监听器订阅频道
container.addMessageListener(orderSubscriber, new ChannelTopic("order"));
container.addMessageListener(orderSubscriber, new ChannelTopic("sms"));
// 序列化
Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
seria.setObjectMapper(objectMapper);
container.setTopicSerializer(seria);
return container;
}
}
添加一个短信监听器:
@Component
public class SmsSubscriber implements MessageListener {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Override
public void onMessage(Message message, byte[] pattern) {
// 获取消息
byte[] messageBody = message.getBody();
// 使用值序列化器转换
}
}
修改配置:
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory, OrderSubscriber orderSubscriber, SmsSubscriber smsSubscriber) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
// 设置连接工厂
container.setConnectionFactory(redisConnectionFactory);
// 监听器订阅频道
container.addMessageListener(orderSubscriber, Arrays.asList(new ChannelTopic("order"), new ChannelTopic("sms")));
container.addMessageListener(smsSubscriber, new ChannelTopic("sms"));
// 序列化
Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
seria.setObjectMapper(objectMapper);
container.setTopicSerializer(seria);
return container;
}
container.addMessageListener(smsSubscriber, new PatternTopic("redis.*"));
@RestController
@RequestMapping("/pub")
public class PubController {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@GetMapping("/publish")
public String publish() {
redisTemplate.convertAndSend("order", "该订单已过期");
redisTemplate.convertAndSend("sms", "该短信已发送");
return "publish";
}
}
1、定义 一个消息接受类
@Component
public class OrderMessageReceiver {
public void receiveMessage(String message, String channel){
System.out.println("---频道---: " + channel);
System.out.println("---消息内容---: " + message);
}
}
2、配置一个 MessageListenerAdapter
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter adapter, SmsSubscriber smsSubscriber) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
// 设置连接工厂
container.setConnectionFactory(redisConnectionFactory);
// 监听器订阅频道
container.addMessageListener(adapter, Arrays.asList(new ChannelTopic("order"), new ChannelTopic("sms")));
container.addMessageListener(smsSubscriber, new PatternTopic("redis.*"));
// 序列化
Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
seria.setObjectMapper(objectMapper);
container.setTopicSerializer(seria);
return container;
}
@Bean
public MessageListenerAdapter smsExpirationListener(OrderMessageReceiver messageListener) {
MessageListenerAdapter receiveMessage = new MessageListenerAdapter(messageListener, "receiveMessage");
// 序列化
Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
seria.setObjectMapper(objectMapper);
receiveMessage.setSerializer(seria);
return receiveMessage;
}
Spring boot整合Redis实现发布订阅(超详细)
springboot中使用redis发布订阅
当使用 Pattern 进行发布订阅的时候。如果有消息发布出来,除了订阅该 Channel 的 Client 之外,所有订阅了与 Channel 匹配的模式的 Client 同样会收到消息。
另外,Redis 发布订阅的消息不会被持久化,所以无历史消息,也不支持 ACK 机制,