list是Redis的一种数据结构,可以把它理解成双向链表
可以从头部插入数据然后从尾部取出数据,从而实现消息队列的效果
利用命令 LPUSH和RPOP (从左边插入数据从右边取出数据)
lpush l1 e1 e2
rpop l1
或者 RPUSH和LPOP (从左边插入数据从右边取出数据)从而实现消息队列
rpush l1 e1 e2
lpop l1
但是使用list作为消息队列也有弊端 - 不能实现广播功能,只能单对单的进行消息队列
PubSub是Redis引入的一种消息传递的模型。消费者可以订阅一个或者多个Channel,从Channel中获取数据,当生产者向Channel发送数据的时候,所有的消费者都可以接收数据
(XXX是Channel xxx是消息)
发送消息
publish order.XXX xxx
接收消息
subscribe order.XXX
但是使用PubSub作为消息队列也有弊端 - 当消息堆积以后会造成消息的丢失
stream是Redis引入的新的消息队列,是功能比较完善的消息队列
使用XADD用于添加信息
具体做法可以参考下面这张图
编写一段命令
XADD l1 20 * name jack age 21
使用XREAD来接收命令
编写一段命令来接收刚刚发送的消息
XREAD Count 1 BLOCK 20 Stream l1 0
我们也可以$来表示获取最新的消息
XREAD Count 1 BLOCK 20 Stream l1 $
当然我们也可以创建一个消息组,利用消息组来处理消息
XGROUP CREATE l1 g1 $
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
当消息没有被确认的时候就会被放进PendingList里面等待处理
下面编写一段命令来创建两个消费者
XREADGROUP GROUP g1 c1 BLOCK 2000 STREAM l1
XREADGROUP GROUP g1 c2 BLOCK 2000 STREAM l1
下面来演示一下怎么使用
public class voucherOrderHandler implements Runnable{
@Orrvide
public void run(){
while(true){
try{
//接收信息
List<MapRecord<String,Object,Object>> list =stringRedisTemplate.opsFotStream().read(
Consumer.from("g1","c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create("stream.orders",ReadOffset.lastConsumed())
);
//省略一堆业务代码
//确认机制
stringRedisTemplate.opsForStream.acknowledge("s1","g1",record.getId();
}catch(Exception e){
log.erroe("消息处理异常",e);
handlePendingList();
}
}
}
//当消息没有被确认就会出现异常,那么我们就从PendingList里面尝试取出数据
public void handlePendingList(){
//几乎同样的逻辑再来一遍
while(true){
try{
//接收信息 从PendingList中读取消息不需要阻塞
List<MapRecord<String,Object,Object>> list = stringRedisTemplate.opsFotStream().read(
Consumer.from("g1","c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create("stream.orders",ReadOffset.from("0"))
)
//省略一堆业务代码(当处理完消息的时候记得退出循环)
//确认机制
stringRedisTemplate.opsForStream.acknowledge("s1","g1",record.getId();
//当出现异常的时候由于我们已经设置类while(true)所以会自动循环
}catch(Exception e){
log.erroe("消息处理异常",e);
try{
Thread.sleep(20);
}catch(Exception e){
e.printStackTrace();
}
}
}
}
}