• springboot:整合redis之消息队列


    springboot:整合redis之消息队列

    一、项目准备

    依赖

            <!-- RedisTemplate -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>
            <!-- Redis-Jedis -->
            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>2.9.0</version>
            </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    application.yaml配置文件

    spring:
      redis:
        host: 127.0.0.1
        port: 6379
        database: 0
        timeout: 4000
        jedis:
          pool:
            max-wait: -1
            max-active: -1
            max-idle: 20
            min-idle: 10
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    二、配置类

    public class ObjectMapperConfig {
    
        public static final ObjectMapper objectMapper;
        private static final String PATTERN = "yyyy-MM-dd HH:mm:ss";
    
        static {
            JavaTimeModule javaTimeModule = new JavaTimeModule();
            javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer());
            javaTimeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer());
            objectMapper = new ObjectMapper()
                    // 转换为格式化的json(控制台打印时,自动格式化规范)
                    //.enable(SerializationFeature.INDENT_OUTPUT)
                    // Include.ALWAYS  是序列化对像所有属性(默认)
                    // Include.NON_NULL 只有不为null的字段才被序列化,属性为NULL 不序列化
                    // Include.NON_EMPTY 如果为null或者 空字符串和空集合都不会被序列化
                    // Include.NON_DEFAULT 属性为默认值不序列化
                    .setSerializationInclusion(JsonInclude.Include.NON_NULL)
                    // 如果是空对象的时候,不抛异常
                    .configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false)
                    // 反序列化的时候如果多了其他属性,不抛出异常
                    .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
                    // 取消时间的转化格式,默认是时间戳,可以取消,同时需要设置要表现的时间格式
                    .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
                    .setDateFormat(new SimpleDateFormat(PATTERN))
                    // 对LocalDateTime序列化跟反序列化
                    .registerModule(javaTimeModule)
    
                    .setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY)
                    // 此项必须配置,否则会报java.lang.ClassCastException: java.util.LinkedHashMap cannot be cast to XXX
                    .enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY)
            ;
        }
    
        static class LocalDateTimeSerializer extends JsonSerializer<LocalDateTime> {
            @Override
            public void serialize(LocalDateTime value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
                gen.writeString(value.format(DateTimeFormatter.ofPattern(PATTERN)));
            }
        }
    
        static class LocalDateTimeDeserializer extends JsonDeserializer<LocalDateTime> {
            @Override
            public LocalDateTime deserialize(JsonParser p, DeserializationContext deserializationContext) throws IOException {
                return LocalDateTime.parse(p.getValueAsString(), DateTimeFormatter.ofPattern(PATTERN));
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    @Configuration
    public class RedisConfig {
    
        /**
         * redisTemplate配置
         */
        @Bean
        public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
            RedisTemplate<String, Object> template = new RedisTemplate<>();
            // 配置连接工厂
            template.setConnectionFactory(factory);
    
            //使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值(默认使用JDK的序列化方式)
            Jackson2JsonRedisSerializer<Object> jacksonSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
            jacksonSerializer.setObjectMapper(ObjectMapperConfig.objectMapper);
            StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
    
            // 使用StringRedisSerializer来序列化和反序列化redis的key,value采用json序列化
            template.setKeySerializer(stringRedisSerializer);
            template.setValueSerializer(jacksonSerializer);
    
            // 设置hash key 和value序列化模式
            template.setHashKeySerializer(stringRedisSerializer);
            template.setHashValueSerializer(jacksonSerializer);
            template.afterPropertiesSet();
    
            return template;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    三、redis中list数据类型

    在Redis中,List类型是按照插入顺序排序的字符串链表。和数据结构中的普通链表一样,我们可以在其头部和尾部添加新的元素

    优势:

    1. 顺序排序,保证先进先出
    2. 队列为空时,自动从Redis数据库删除
    3. 在队列的两头插入或删除元素,效率极高,即使队列中元素达到百万级
    4. List中可以包含的最大元素数量是4294967295

    定时器监听队列

    生产者

    @Slf4j
    @Component
    public class MessageProducer {
    
        public static final String MESSAGE_KEY = "message:queue";
    
        @Autowired
        private RedisTemplate<String,Object> redisTemplate;
    
        public void lPush() {
            for (int i = 0; i < 10; i++) {
                new Thread(() -> {
                    Long size = redisTemplate.opsForList().leftPush(MESSAGE_KEY, Thread.currentThread().getName() + ":hello world");
                    log.info(Thread.currentThread().getName() + ":put message size = " + size);
                }).start();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    消费者:消费消息,定时器以达到监听队列功能

    @Slf4j
    @Component
    @EnableScheduling
    public class MessageConsumer {
    
        public static final String MESSAGE_KEY = "message:queue";
    
        @Autowired
        private RedisTemplate<String,Object> redisTemplate;
    
        @Scheduled(initialDelay = 5 * 1000, fixedRate = 2 * 1000)
        public void rPop() {
            String message = (String) redisTemplate.opsForList().rightPop(MESSAGE_KEY);
            log.info(message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    @RestController
    public class RedisController {
    
        @Autowired
        private MessageProducer messageProducer;
    
        @GetMapping("/lPush")
        public void lPush() {
            messageProducer.lPush();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    测试

    http://localhost:8080/lPush

    在这里插入图片描述

    可能出现的问题:

    1.通过定时器监听List中是否有待处理消息,每执行一次都会发起一次连接,这会造成不必要的浪费。

    2.生产速度大于消费速度,队列堆积,消息时效性差,占用内存。

    运行即监控队列

    修改消息消费者代码。

    当队列没有元素时,会阻塞10秒,然后再次监听队列,
    需要注意的是,阻塞时间必须小于连接超时时间

    @Slf4j
    @Component
    @EnableScheduling
    public class MessageConsumer {
    
        public static final String MESSAGE_KEY = "message:queue";
    
        @Autowired
        private RedisTemplate<String,Object> redisTemplate;
    
        //@Scheduled(initialDelay = 5 * 1000, fixedRate = 2 * 1000)
        public void rPop() {
            String message = (String) redisTemplate.opsForList().rightPop(MESSAGE_KEY);
            log.info(message);
        }
    
        @PostConstruct
        public void brPop() {
            new Thread(() -> {
                while (true) {
                    String message = (String) redisTemplate.opsForList().rightPop(MESSAGE_KEY, 10, TimeUnit.SECONDS);
                    log.info(message);
                }
            }).start();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    在这里插入图片描述

    阻塞时间不能为负,直接报错超时为负
    阻塞时间为零,此时阻塞时间等于超时时间,最后报错连接超时
    阻塞时间大于超时时间,报错连接超时

    测试:

    在这里插入图片描述

    消息不可重复消费,因为消息从队列POP之后就被移除了,即不支持多个消费者消费同一批数据

    消息丢失,消费期间发生异常,消息未能正常消费

    四、发布/订阅模式

    消息可以重复消费,多个消费者订阅同一频道即可

    一个消费者根据匹配规则订阅多个频道

    消费者只能消费订阅之后发布的消息,这意味着,消费者下线再上线这期间发布的消息将会丢失

    数据不具有持久化。同样Redis宕机也会数据丢失

    消息发布后,是推送到一个缓冲区(内存),消费者从缓冲区拉取消息,当消息堆积,缓冲区溢出,消费者就会被迫下线,同时释放对应的缓冲区

    RedisConfig中添加监听器

        /**
         * redis消息监听器容器
         */
        @Bean
        public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
    
            //订阅频道,通配符*表示任意多个占位符
            container.addMessageListener(new MySubscribe(), new PatternTopic("channel*"));
    
            return container;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    订阅者

    package com.yzm.redis08.message;
    
    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.connection.MessageListener;
    
    public class MySubscribe implements MessageListener {
    
        @Override
        public void onMessage(Message message, byte[] bytes) {
            System.out.println("订阅频道:" + new String(message.getChannel()));
            System.out.println("接收数据:" + new String(message.getBody()));
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    消息发布

        @GetMapping("/publish")
        public void publish() {
            redisTemplate.convertAndSend("channel_first", "hello world");
        }
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    另一种发布方式

        /**
         * redis消息监听器容器
         */
        @Bean
        public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
    
            //订阅频道,通配符*表示任意多个占位符
            container.addMessageListener(new MySubscribe(), new PatternTopic("channel*"));
            // 通配符?:表示一个占位符
            MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MySubscribe2(), "getMessage");
            listenerAdapter.afterPropertiesSet();
            container.addMessageListener(listenerAdapter, new PatternTopic("channel?"));
    
            return container;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    public class MySubscribe2 {
    
        public void getMessage(Object message, String channel) {
            System.out.println("订阅频道2:" + channel);
            System.out.println("接收数据2:" + message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
        @GetMapping("/publish2")
        public void publish2() {
            redisTemplate.convertAndSend("channel2", "hello world");
        }
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    消息是实体对象,进行转换

    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    public class User implements Serializable {
        private static final long serialVersionUID = 5250232737975907491L;
        private Integer id;
        private String username;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    public class MySubscribe3 implements MessageListener {
    
        @Override
        public void onMessage(Message message, byte[] bytes) {
            Jackson2JsonRedisSerializer<User> jacksonSerializer = new Jackson2JsonRedisSerializer<>(User.class);
            jacksonSerializer.setObjectMapper(ObjectMapperConfig.objectMapper);
            User user = jacksonSerializer.deserialize(message.getBody());
            
            System.out.println("订阅频道3:" + new String(message.getChannel()));
            System.out.println("接收数据3:" + user);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
        /**
         * redis消息监听器容器
         */
        @Bean
        public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
    
            //订阅频道,通配符*:表示任意多个占位符
            container.addMessageListener(new MySubscribe(), new PatternTopic("channel*"));
            // 通配符?:表示一个占位符
            MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MySubscribe2(), "getMessage");
            listenerAdapter.afterPropertiesSet();
            container.addMessageListener(listenerAdapter, new PatternTopic("channel?"));
    
            container.addMessageListener(new MySubscribe3(), new PatternTopic("user"));
    
            return container;
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
        @GetMapping("/publish3")
        public void publish3() {
            User user = User.builder().id(1).username("yzm").build();
            redisTemplate.convertAndSend("user", user);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在这里插入图片描述

    五、ZSet实现延迟队列

    生产消息,score = 时间搓+60s随机数

        public static final String MESSAGE_ZKEY = "message:ZSetqueue";
        public volatile AtomicInteger count =  new AtomicInteger();
        public void zAdd() {
            for (int i = 0; i < 10; i++) {
                new Thread(() -> {
                    int increment = count.getAndIncrement();
                    log.info(Thread.currentThread().getName() + ":put message to zset = " + increment);
                    double score = System.currentTimeMillis() + new Random().nextInt(60 * 1000);
                    redisTemplate.opsForZSet().add(MESSAGE_ZKEY, Thread.currentThread().getName() + " hello zset:" + increment, score);
                }).start();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    消费者:定时任务,每秒执行一次

        public static final String MESSAGE_ZKEY = "message:ZSetqueue";
        public SimpleDateFormat simpleDateFormat = new SimpleDateFormat();
        @Scheduled(initialDelay = 5 * 1000, fixedRate = 1000)
        public void zrangebysocre() {
            log.info("延时队列消费。。。");
            // 拉取score小于当前时间戳的消息
            Set<Object> messages = redisTemplate.opsForZSet().rangeByScore(MESSAGE_ZKEY, 0, System.currentTimeMillis());
            if (messages != null) {
                for (Object message : messages) {
                    Double score = redisTemplate.opsForZSet().score(MESSAGE_ZKEY, message);
                    log.info("消费了:" + message + "消费时间为:" + simpleDateFormat.format(score));
                    redisTemplate.opsForZSet().remove(MESSAGE_ZKEY, message);
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
        @GetMapping("/zadd")
        public void zadd() {
            messageProducer.zAdd();
        }
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

  • 相关阅读:
    爬虫(2) - Requests(1) | Requests模块的深度解析
    C/C++学习 -- RSA算法
    前端项目:小程序电商管理平台难点整理
    15 个面向深度学习爱好者的开放数据集
    《吐血整理》高级系列教程-吃透Fiddler抓包教程(24)-Fiddler如何优雅地在正式和测试环境之间来回切换-中篇
    HTTP图解基础知识
    API(八)cosocket常用SDK
    STM32CubeMX教程9 USART/UART 异步通信
    Android 发布Library 到远程maven 私服仓库(Nexus)
    跨境电商去哪做好?东南亚六国电商情况一览
  • 原文地址:https://blog.csdn.net/weixin_43296313/article/details/125444514