• 【2023】redis-stream配合spring的data-redis详细使用(包括广播和组接收)


    背景:
    使用该方式实现,主要是因为项目中有个地方刚好需要异步来实现,而项目又没有配置专业的消息中间件,并且使用的也不是太频繁,就觉得没必要专门安装一个MQ服务了,直接通过现有的redis的stream来实现异步消息接收直接具体的业务逻辑。

    一、简介

    1、介绍

    Redis Stream(Redis Streams)是Redis 5.0版本引入的一种数据结构,用于处理时间序列数据、消息队列和日志流。它提供了高吞吐量、持久性、有序、可扩展的消息传递解决方案。Redis Stream 结构是对传统发布/订阅模式的增强,使你能够更灵活地处理数据流,并提供了以下主要特性:

    1. 多生产者和多消费者:多个生产者可以同时向 Stream 中写入消息,而多个消费者可以独立订阅并消费消息。每个消费者可以有不同的消费速率。

    2. 消费组:Redis Stream引入了消费者组的概念,多个消费者可以加入同一个消费者组并共同消费消息,这确保了消息在消费时不会被多次处理。

    3. 消费者阻塞:消费者可以使用 XREADGROUP 命令以阻塞方式获取消息,只有当有新消息到达时才会被推送给消费者。

    4. 消费者自动确认:Redis Stream 支持自动确认消息,消费者可以告诉 Redis 何时确认已经成功处理了一条消息。

    5. 多 Stream 支持:你可以创建多个 Stream 来存储不同种类的数据,并分别处理它们。

    6. 有序性:消息在 Stream 中按照消息的时间戳有序存储,因此你可以按照消息的顺序读取数据。

    7. 持久性存储:Redis Stream 使用内存数据结构,但也支持将数据异步保存到磁盘,以确保数据不会丢失。

    2、对比

    对比redis的其他几种实现方式来说功能更加全面,支持可持久化和通过ack确认的方式基本实现了消息丢失的问题,当然对比专业的消息队列中间件来说还是有些不足的。
    需要看详细对比可以看 🔗redis队列对比 这篇文章
    在这里插入图片描述

    二、整合spring的data-redis实现

    1、使用依赖

    <dependencies>
      <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-data-redisartifactId>
      dependency>
      <dependency>
        <groupId>org.apache.commonsgroupId>
        <artifactId>commons-pool2artifactId>
        <version>2.11.1version>
      dependency>
    		<dependency>
    			<groupId>org.springframework.bootgroupId>
    			<artifactId>spring-boot-starter-webartifactId>
    		dependency>
    		<dependency>
    			<groupId>org.projectlombokgroupId>
    			<artifactId>lombokartifactId>
    			<optional>trueoptional>
    		dependency>
    		  dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    2、配置类

    2.1、配置RedisTemplate bean

    重点是下面这一句,不能用json的序列化类,否则会序列化失败
    redisTemplate.setHashValueSerializer(RedisSerializer.string());

    	@Bean
    	public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
    		RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
    		redisTemplate.setConnectionFactory(connectionFactory);
    		redisTemplate.setKeySerializer(new StringRedisSerializer());
    		redisTemplate.setValueSerializer(new StringRedisSerializer());
    		redisTemplate.setHashKeySerializer(new StringRedisSerializer());
    		// 这个地方不可使用 json 序列化,如果使用的是ObjectRecord传输对象时,可能会有问题,会出现一个 java.lang.IllegalArgumentException: Value must not be null! 错误
    		redisTemplate.setHashValueSerializer(RedisSerializer.string());
    		return redisTemplate;
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    2.2、异常类

    @Slf4j
    public class CustomErrorHandler implements ErrorHandler {
        @Override
        public void handleError(Throwable throwable) {
            log.error("发生了异常",throwable);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    3、实体类

    该地方使用了两个实体类,主要是用于测试,如果不是指定的同一个类型时,指定的是父类的类型,是否可以正常反序列化接收消息

    3.1、User

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @ToString
    public class Book extends User{
        private String title;
        private String author;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    3.2、Book

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @ToString
    public class User implements Serializable {
        private String name;
        private Integer age;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    4、发送消息

    发送消息主要是通过redisTemplate.opsForStream().add(record);进行发送到redis中(接收时会分两种方式接收,看后续!)

    4.1、RedisStreamUtil工具类

    用于实现消息发送、初始化组、key绑定组、清除消费了的消息等方法

    • 在第一次发送消息时需要先绑定接收的组和可key,否则在接收时会报不存在该组的异常
    • 发送消息后,需要把该条消费了的消息清除掉,否则会一直保持在stream中
    @Component
    @Slf4j
    public class RedisStreamUtil {
        public static final String STREAM_KEY_001 = "stream-001";
        @Resource
        private RedisTemplate<String,Object> redisTemplate;
    
    
        /**
         * 添加记录到流中
         * @param streamKey
         * @param t
         * @param 
         */
        public <T> RecordId add(String streamKey,T t){
            ObjectRecord<String, T> record = StreamRecords.newRecord()
                    .in(streamKey)  //key
                    .ofObject(t) //消息数据
                    .withId(RecordId.autoGenerate());
    //        发送消息
            RecordId recordId = redisTemplate.opsForStream().add(record);
    
            log.info("添加成功,返回的record-id[{}]",recordId);
    
    
            return recordId;
        }
    
        /**
         * 用来创建绑定流和组
         */
        public void addGroup(String key, String groupName){
            redisTemplate.opsForStream().createGroup(key,groupName);
        }
        /**
         * 用来判断key是否存在
         */
        public boolean hasKey(String key){
            if(key==null){
                return false;
            }else{
                return redisTemplate.hasKey(key);
            }
    
        }
        /**
         * 用来删除掉消费了的消息
         */
        public void delField(String key,RecordId recordIds){
            redisTemplate.opsForStream().delete(key,recordIds);
        }
    
        /**
         * 用来初始化 实现绑定key和消费组
         */
        public void initStream(String key, String group){
            //判断key是否存在,如果不存在则创建
            boolean hasKey = hasKey(key);
            if(!hasKey){
                Map<String,Object> map = new HashMap<>();
                map.put("key","value");
                RecordId recordId = add(key, map);
                addGroup(key,group);   //第一次初始化时需要把Stream和group绑定,
                delField(key,recordId);  //清除掉该条无用数据
                log.info("stream:{}-group:{} initialize success",key,group);
            }
        }
    
    
        public String getStreamKey001(){
            return STREAM_KEY_001;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    4.2、通过延时队列线程池模拟发送消息

    • 该方法里通过模拟延时5秒后,每隔3秒发送一条数据,发送10条后关闭线程池
    /**
     * 在spring初始化时执行,定时发送消息到stream中,用于模拟发送消息
     */
    //@Component
    public class StreamMessageRunner implements ApplicationRunner {
    
        @Resource
        private RedisStreamUtil redisStreamUtil;
    
    
        @Override
        public void run(ApplicationArguments args) throws Exception {
            ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
    
            AtomicInteger integer = new AtomicInteger(0);
            //使用延时队列线程池模拟发送数据消息
            pool.scheduleAtFixedRate(()->{
                User zhangsan = new User("zhangsan"+integer.get(), 1 + integer.get());
                RecordId recordId = redisStreamUtil.add(redisStreamUtil.getStreamKey001(), zhangsan);
                integer.getAndIncrement();
    //需要把消费了的消息清除掉,否则会一直保持在stream中,会被重复消费
                redisStreamUtil.delField(redisStreamUtil.getStreamKey001(),recordId);
                if (integer.get()>10){
                    System.out.println("---------退出发送消息--------");
                    pool.shutdown();
                }
            },5,3, TimeUnit.SECONDS);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    4.3、通过http主动发送消息

    • 通过分别发送父类和子类比对查看不同效果
    @RestController
    @RequestMapping("/index")
    public class index {
        @Resource
        private RedisStreamUtil redisStreamUtil;
    
       /**
        * 父类
        */
        @GetMapping("/login")
        public String login(User user){
    
            RecordId recordId = redisStreamUtil.add(redisStreamUtil.getStreamKey001(), user);
            redisStreamUtil.delField(redisStreamUtil.getStreamKey001(),recordId);
    
            return "成功!";
        }
        /**
        * 子类
        */
        @GetMapping("/login2")
        public String login(Book book){
    
            RecordId recordId = redisStreamUtil.add(redisStreamUtil.getStreamKey001(), book);
    
            redisStreamUtil.delField(redisStreamUtil.getStreamKey001(),recordId);
            return "成功!";
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    5、🌟消息接收

    5.1、不绑定消费组—可以实现广播📢效果

    节点消费者不绑定消费组,直接和stream进行绑定,即可实现广播的效果,每次有消息发送到该指定节点的stream,都可以接收到。

    如下图:有消息发送到redis Stream 里面绑定的A0到B2全部可以接收到这条消息
    在这里插入图片描述

    方式1:主动读取

    通过redisTemplate.opsForStream().read()方法主动去stream中读取消息

    /**
     * 独立消费者---可以读取到该key的全部消息
     */
    @Component
    @Slf4j
    public class XreadNonBlockConsumer01 implements InitializingBean, DisposableBean {
        private ThreadPoolExecutor threadPoolExecutor;
        @Resource
        private RedisTemplate<String,Object> redisTemplate;
        private volatile boolean stop = false;
    
    
        /**
         * 初始化bean时执行,以轮询的方式主动去stream的指定key里读取消息
         * @throws Exception
         */
        @Override
        public void afterPropertiesSet() throws Exception {
            // 初始化线程池
            threadPoolExecutor = new ThreadPoolExecutor(3, 5, 0, TimeUnit.SECONDS,
                    new LinkedBlockingDeque<>(), r -> {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("xread-nonblock-01");
                return thread;
            });
    
            StreamReadOptions options = StreamReadOptions.empty()
    //                如果没有数据,则阻塞1s,阻塞时间需要小于·spring.redis.timeout·
                    .block(Duration.ofMillis(1000))
    //                一直阻塞直到获取数据,可能会报超时异常
    //                .block(Duration.ofMillis(0))
    //                一次获取10条数据
                    .count(10);
    
            StringBuilder readBuilder = new StringBuilder("0-0");
    
            threadPoolExecutor.execute(()->{
                while (!stop){
                    //主动到redis的stream中去读取,options设置了每读取一次阻塞一秒
                    List<ObjectRecord<String, User>> objectRecords = redisTemplate.opsForStream()
                            .read(User.class, options,
                                    StreamOffset.create(RedisStreamUtil.STREAM_KEY_001, ReadOffset.from(readBuilder.toString())));
    
                    if (CollectionUtils.isEmpty(objectRecords)){
                        log.warn("没有读取到数据");
                        continue;
                    }
                    objectRecords.stream().forEach(objectRecord->{
                        log.info("获取到的数据信息 id:[{}] book:[{}]", objectRecord.getId(), objectRecord.getValue());
                        readBuilder.setLength(0);
                        readBuilder.append(objectRecord.getId());
    
                    });
                }
            });
    
    
        }
    
        /**
         * 在销毁bean时把线程池关闭
         * @throws Exception
         */
        @Override
        public void destroy() throws Exception {
            stop = true;
            threadPoolExecutor.shutdown();
            threadPoolExecutor.awaitTermination(3,TimeUnit.SECONDS);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    测试日志

    在这里插入图片描述

    🍉方式2:通过监听器监听是否有新消息

    具体代码和分组的是一样的,只不过不指定组而已,就合并在下面写了
    主要通过StreamMessageListenerContainer这个监听器类实现。

    主要通过下面这一句:

    container.receive(StreamOffset.fromStart(RedisStreamUtil.STREAM_KEY_001), new MonitorStreamListener("独立消费", null, null));

    5.2、指定消费组----实现一个组内只有一个成员可以接收到

    进行分组之后,一个组内,只会有一个成员可以读到消息,具体如下图,当然在使用时也可以绑定多个组,每个组接收不听的消息。下面方式就相当于mq了,交换机,队列和路由键的关系
    在这里插入图片描述

    5.2.1、配置类

    下面代码具体流程时先创建一个线程池;然后在配置消息监听容器,最后在把用于接收消息的监听器放入到监听容器中去,最后把这个侦听容器注入到bean去

    @Configuration
    public class RedisStreamConfiguration {
    
        @Resource
        private RedisStreamUtil redisStreamUtil;
        @Resource
        private RedisConnectionFactory redisConnectionFactory;
    
        @Bean(initMethod = "start",destroyMethod = "stop")
        public StreamMessageListenerContainer<String, ObjectRecord<String,User>> streamMessageListenerContainer(){
            AtomicInteger index = new AtomicInteger(1);
    //        获取本机线程数
            int processors = Runtime.getRuntime().availableProcessors();
            ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(50)
                    , (r) -> {
                Thread thread = new Thread(r);
                thread.setName("async-stream-consumer-" + index.getAndIncrement());
                thread.setDaemon(true);
                return thread;
            }, new ThreadPoolExecutor.CallerRunsPolicy());
    
    //        消息监听容器,不能在外部实现。创建后,StreamMessageListenerContainer可以订阅Redis流并使用传入的消息
            StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String,ObjectRecord<String,User>> options =
                    StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                            .builder()
                            // 一次最多获取多少条消息
                            .batchSize(10)
                            // 运行 Stream 的 poll task
                            .executor(executor)
                            // Stream 中没有消息时,阻塞多长时间,需要比 `spring.redis.timeout` 的时间小
                            .pollTimeout(Duration.ofSeconds(1))
                            // ObjectRecord 时,将 对象的 filed 和 value 转换成一个 Map 比如:将Book对象转换成map
    //                        .objectMapper(new ObjectHashMapper())
                            // 获取消息的过程或获取到消息给具体的消息者处理的过程中,发生了异常的处理
                            .errorHandler(new CustomErrorHandler())
                            // 将发送到Stream中的Record转换成ObjectRecord,转换成具体的类型是这个地方指定的类型
                            .targetType(User.class)
                            .build();
    
    
            StreamMessageListenerContainer<String, ObjectRecord<String, User>> container = StreamMessageListenerContainer.create(redisConnectionFactory, options);
    
    
            //        初始化-绑定key和消费组
            redisStreamUtil.initStream(RedisStreamUtil.STREAM_KEY_001,"group-a");
    
    //        不绑定消费组,独立消费
            container.receive(StreamOffset.fromStart(RedisStreamUtil.STREAM_KEY_001), new MonitorStreamListener("独立消费", null, null));
    
            // 消费组A,不自动ack
            // 从消费组中没有分配给消费者的消息开始消费
    //        container.receive(Consumer.from("group-a","consumer-a"),
    //                StreamOffset.create(RedisStreamUtil.STREAM_KEY_001, ReadOffset.lastConsumed()),new MonitorStreamListener("消费者组A","group-a", "consumer-a"));
    
    //        自动ack
            container.receiveAutoAck(Consumer.from("group-a","consumer-b"),
                    StreamOffset.create(RedisStreamUtil.STREAM_KEY_001, ReadOffset.lastConsumed()),new MonitorStreamListener("消费者组B","group-a", "consumer-b"));
    
            return container;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    重要代码解析:

    1. .targetType(User.class) :在配置监听容器时,用于指定类型,不指定时默认是string类型,如果你传入的不是string机需要指定;如果配置的是父类的,也可以接收子类的消息,进行转换。但如果是配置的Object类型,接收时就会为路径,不能正常得到传入的对象(不知道为什么,有研究懂的可以解答一下)
    2. redisStreamUtil.initStream(RedisStreamUtil.STREAM_KEY_001,"group-a"):在第一次生成时,需要把消费组绑定该stream的key,否则会报错,具体内部执行逻辑可以看initStream()方法(或者自己手动通过命令到redis去绑定:xgroup create stream-001 group-a $)stream-001(key) group-a(消费组)
    3. container.receive(StreamOffset.fromStart(RedisStreamUtil.STREAM_KEY_001), new MonitorStreamListener("独立消费", null, null)) :该句是不绑定消费组,也就是广播的方式监听该key中的所有消息(和上面的区别是,该方式是被动的监听消息
    4. 🌟container.receiveAutoAck(Consumer.from("group-a","consumer-b") ,StreamOffset.create(RedisStreamUtil.STREAM_KEY_001, ReadOffset.lastConsumed()),new MonitorStreamListener("消费者组B","group-a", "consumer-b"))就是通过该句代码实现分组监听消息的,绑定了消费组和消费者的名字,以及监听器类。然后使用的自动ack的方式回复stream确认接收到了消息(或者通过手动ack的方式返回stream接收到了消息,否则会重复发送),
    5.2.2、监听器

    用于接收消息然后实现具体业务代码

    @Slf4j
    public class MonitorStreamListener <T> implements StreamListener<String, ObjectRecord<String,T>> {
    
        /**
         * 消费者类型:独立消费、消费组消费
         */
        private String consumerType;
        /**
         * 消费组
         */
        private String group;
        /**
         * 消费组中的某个消费者
         */
        private String consumerName;
    
        public MonitorStreamListener(String consumerType, String group, String consumerName) {
            this.consumerType = consumerType;
            this.group = group;
            this.consumerName = consumerName;
        }
        
        @Override
        public void onMessage(ObjectRecord<String, T> message) {
            log.info("接受到来自redis的消息");
    
            String stream = message.getStream();
            RecordId id = message.getId();
            User value = (User) message.getValue();
            value.getName();
    
    //        执行具体的接收到消息的业务逻辑
            if (StringUtils.isEmpty(group)) {
                log.info("[{}]: 接收到一个消息 stream:[{}],id:[{}],value:[{}]", consumerType, stream, id, value);
    
            } else {
                log.info("[{}] group:[{}] consumerName:[{}] 接收到一个消息 stream:[{}],id:[{}],value:[{}]", consumerType,
                        group, consumerName, stream, id, value);
            }
            
            // 当是消费组消费时,如果不是自动ack,则需要在这个地方手动ack
    //        redisTemplate.opsForStream()
    //                 .acknowledge("key","group","recordId");
            
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    通过延时队列发送到消息—测试结果:

    在这里插入图片描述

    通过http发送User到子类Book对象数据测试结果

    在这里插入图片描述
    结果:也是可以正常接受到的
    在这里插入图片描述

    三、完整代码

    🪟完整代码仓库地址

    四、引用

    https://juejin.cn/post/7029302992364896270#heading-0
    https://juejin.cn/post/6844904125822435341?searchId=202310141054532F9807A1000F6680C0DF#heading-1

  • 相关阅读:
    XTU-OJ 1191-Magic
    Google Analytics Service account 认证指南
    python web框架django面试题收藏
    sklearn快速入门教程:缺失值
    vs 2022无法安装 vc_runtimeMinmum_x86错误
    【华为机试真题 Python】按身高和体重排队
    说说用户线程和守护线程
    npm install:Could not resolve dependency:peer...
    PHP知识点
    隐藏层节点数对网络分类行为的影响
  • 原文地址:https://blog.csdn.net/weixin_52315708/article/details/133844633