• Redis实现消息队列


    使用Redis中的list实现消息队列

    list是Redis的一种数据结构,可以把它理解成双向链表

    可以从头部插入数据然后从尾部取出数据,从而实现消息队列的效果

    利用命令 LPUSH和RPOP (从左边插入数据从右边取出数据)

    lpush l1 e1 e2
    
    • 1
    rpop l1
    
    • 1

    或者 RPUSH和LPOP (从左边插入数据从右边取出数据)从而实现消息队列

    rpush l1 e1 e2
    
    • 1
    lpop l1
    
    • 1

    但是使用list作为消息队列也有弊端 - 不能实现广播功能,只能单对单的进行消息队列

    使用Redis中的pubsub实现消息队列

    PubSub是Redis引入的一种消息传递的模型。消费者可以订阅一个或者多个Channel,从Channel中获取数据,当生产者向Channel发送数据的时候,所有的消费者都可以接收数据

    (XXX是Channel xxx是消息)

    发送消息

    publish order.XXX xxx 
    
    • 1

    接收消息

    subscribe order.XXX
    
    • 1

    但是使用PubSub作为消息队列也有弊端 - 当消息堆积以后会造成消息的丢失

    使用Redis中的stream实现消息队列

    stream是Redis引入的新的消息队列,是功能比较完善的消息队列

    使用XADD用于添加信息

    具体做法可以参考下面这张图
    在这里插入图片描述

    编写一段命令

    XADD l1 20 * name jack age 21
    
    • 1

    使用XREAD来接收命令
    在这里插入图片描述
    编写一段命令来接收刚刚发送的消息

    XREAD Count 1 BLOCK 20 Stream l1 0
    
    • 1

    我们也可以$来表示获取最新的消息

    XREAD Count 1 BLOCK 20 Stream l1 $
    
    • 1

    当然我们也可以创建一个消息组,利用消息组来处理消息

    • 创建消息组
    XGROUP CREATE l1 g1 $
    
    • 1
    • 利用消息组读取消息
      • XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
      • group:消费组名称
      • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
      • count:本次查询的最大数量
      • BLOCK milliseconds:当没有消息时最长等待时间
      • NOACK:无需手动ACK,获取到消息后自动确认
      • STREAMS key:指定队列名称
      • ID:获取消息的起始ID

    当消息没有被确认的时候就会被放进PendingList里面等待处理

    下面编写一段命令来创建两个消费者

    XREADGROUP GROUP g1 c1 BLOCK 2000 STREAM l1
    
    XREADGROUP GROUP g1 c2 BLOCK 2000 STREAM l1
    
    • 1
    • 2
    • 3

    下面来演示一下怎么使用

    public class voucherOrderHandler implements Runnable{
    	
    	@Orrvide
    	public void run(){
    		while(true){
    			try{
    
    				//接收信息
    				List<MapRecord<String,Object,Object>> list 		=stringRedisTemplate.opsFotStream().read(
    					Consumer.from("g1","c1"),
    					StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
    					StreamOffset.create("stream.orders",ReadOffset.lastConsumed())
    				);
    
    				//省略一堆业务代码
    
    
    				//确认机制
    				stringRedisTemplate.opsForStream.acknowledge("s1","g1",record.getId();			
    			}catch(Exception e){
    				log.erroe("消息处理异常",e);
    				handlePendingList();
    			}
    		}
    	}
    	//当消息没有被确认就会出现异常,那么我们就从PendingList里面尝试取出数据
    	public void handlePendingList(){
    		//几乎同样的逻辑再来一遍
    		while(true){
    			try{
    
    				//接收信息 从PendingList中读取消息不需要阻塞
    				List<MapRecord<String,Object,Object>> list = stringRedisTemplate.opsFotStream().read(
    					Consumer.from("g1","c1"),
    					StreamReadOptions.empty().count(1)StreamOffset.create("stream.orders",ReadOffset.from("0"))
    				)
    
    				//省略一堆业务代码(当处理完消息的时候记得退出循环)
    
    
    				//确认机制
    				stringRedisTemplate.opsForStream.acknowledge("s1","g1",record.getId();	
    				
    				//当出现异常的时候由于我们已经设置类while(true)所以会自动循环		
    			}catch(Exception e){
    				log.erroe("消息处理异常",e);
    				try{
    					Thread.sleep(20);
    				}catch(Exception e){
    					e.printStackTrace();
    				}
    			}
    		}
    	}
    } 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
  • 相关阅读:
    UNIAPP实战项目笔记36 购物车的编辑商品数量完成的功能和数量价格计算
    Java使用volatile关键字进行同步,结果不对
    索引构建磁盘IO太高,巧用tmpfs让内存来帮忙
    stereo-inertial-gnss-lidar device
    IDEA生成带参数和返回值注解
    二叉搜索树
    FlashDuty Changelog 2023-09-21 | 自定义字段和开发者中心
    Java实现二十三种设计模式(二)—— 七种结构型模式 (上)——适配器模式、桥接模式、组合模式
    [思维]Sum Plus Product 2022杭电多校第9场 1010
    说一说ajax的请求过程?
  • 原文地址:https://blog.csdn.net/Superkom666/article/details/134017745