• 使用Jedis监听Redis Stream 实现消息队列功能


    简介

    之前使用SpringBoot去监听Redis Stream实现了消息队列的功能,本次分享的是使用Jedis来实现同样的功能,而且还可以继续拓展功能,因为Jedis我觉得还是比之前那种方式要灵活。本次实现的监听可以使用多线程去监听。

    之前通过SpringBoot实现文章链接:
    SpringBoot 中使用Redis Stream 实现消息监听

    视频演示

    使用Jedis自己实现监听Redis Stream的功能达到消息队列的效果Demo

    实现原理

    这次实现监听我分为了通过群组和消费者监听和模式使用xread的原生监听,它们的区别就是如果使用的是通过群组和消费者这样的监听可以确保消息只会被同一个消费者消费一次,不会对消息进行重复消费,适合要求数据唯一性的场景,比如入库或者其他的操作。默认的xread实现方式监听有几个线程那么这几个线程会同时收到相同的插入的消息,可以理解为广播的方式去接受消息。

    这里主要基于的是Redis Stream中的一下几个命令对应Jedis的方法:

    • xadd:创建群组
    • xread:读取数据
    • xgroup: 创建群组
    • xreadgroup:读取群组消息

    读取时主要用到了它们的block属性,将block属性设置为0则表示一直阻塞直到收到新的消息,然后我将这一步骤放到一个轮询中,实现了阻塞收到消息后进入到下一次阻塞,从而实现了监听到效果。

    实现代码

    本次demo代码很简单,都放到了一个类中,并且只要有redis就可以在修改代码中的配置后直接进行运行,不需要手动创建stream或者群组等操作。

    pom.xml文件

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>vip.huhailong</groupId>
        <artifactId>JRedisMQ</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
            <jedis.version>4.2.3</jedis.version>
        </properties>
    
        <!--  jedis dependency  -->
        <dependencies>
            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>${jedis.version}</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.36</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic</artifactId>
                <version>1.2.11</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>2.0.7</version>
            </dependency>
        </dependencies>
    
    </project>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    实现代码

    package jredismq.test;
    
    import com.alibaba.fastjson.JSONObject;
    import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.StreamEntryID;
    import redis.clients.jedis.params.XAddParams;
    import redis.clients.jedis.params.XReadGroupParams;
    import redis.clients.jedis.params.XReadParams;
    import redis.clients.jedis.resps.StreamEntry;
    
    import java.time.LocalDateTime;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.IntStream;
    
    /**
     * 使用jedis实现监听stream消息
     */
    public class JedisStreamMQTest {
    
        private static final Logger logger = LoggerFactory.getLogger(JedisStreamMQTest.class);
    
        public static void main(String[] args) {
    		//以下内容根据自己的情况进行修改
            String host = "192.168.1.110";
            int port = 6379;
            int timeout = 1000;
            String password = "huhailong";
            int database = 0;
    
            String streamKeyName = "streamtest";
            String groupName = "testgroup";
    
            String[]consumerNames = {"huhailong", "xiaohu"};
    
            String listenerType = "DEFAULT";  //GROUP or DEFAULT
    
            //创建 redis 连接池实例
            JedisPool pool = new JedisPool(new GenericObjectPoolConfig<>(),host,port,timeout,password,database);
    
            JedisStreamMQTest test = new JedisStreamMQTest();
            test.createGroup(pool,streamKeyName,groupName); //创建群组
    
            if("GROUP".equals(listenerType)){
                test.listenerByGroup(pool,streamKeyName,groupName,consumerNames);   //使用群组和消费者监听
            }else{
                test.listenerDefault(pool,streamKeyName);
            }
    
            new Thread(()->{    //线程3:用于写入stream数据
                Jedis jedis = pool.getResource();
                while(true) {
                    try {
                        Thread.sleep(500L);
                        Map<String,String> map = new HashMap<>();
                        map.put("currentTime", LocalDateTime.now().toString());
                        jedis.xadd(streamKeyName,map, XAddParams.xAddParams());
                    } catch (Exception e) {
                        logger.error(e.getMessage());
                    }
                }
            }).start();
        }
    
        /**
         * 使用群组和消费者监听,该监听可以确保消息不会重复消费,因为每个组每个用户只会消费一次消息
         * @param keyName   stream 名称
         * @param groupName 群组名称
         * @param consumerNames 消费者名称集合
         */
        private void listenerByGroup(JedisPool pool, String keyName, String groupName, String...consumerNames){
            Map<String, StreamEntryID> entryIDMap = new HashMap<>();
            entryIDMap.put(keyName,StreamEntryID.UNRECEIVED_ENTRY);
            //以下为了演示简单就不用线程池了,直接创建两个线程说明问题
            IntStream.range(0,2).forEach(i->{
                Jedis jedis = pool.getResource();   //创建jedis实例
                new Thread(()->{
                    while(true){
                        try{
                            Thread.sleep(500L);
                            //下面的 xreadGroup 方法等同与redis中的xreadgroup命令,将block阻塞时间设置为0表示一直阻塞知道收到消息,然后上面StreamEntryID设置为接收最新值
                            List<Map.Entry<String, List<StreamEntry>>> entries = jedis.xreadGroup(groupName, consumerNames[i], XReadGroupParams.xReadGroupParams().block(0), entryIDMap);
                            logger.info("Thread:-{},result:{}",Thread.currentThread().getName(), JSONObject.toJSONString(entries.get(0).getValue().get(0).getFields()));
    //                        jedis.xack(keyName,groupName,entries.get(0).getValue().get(0).getID()); //确认消息
                            jedis.xdel(keyName,entries.get(0).getValue().get(0).getID());   //删除消息
                        } catch (Exception e){
                            logger.error(e.getMessage());
                        }
                    }
                }).start();
            });
    
        }
    
        /**
         * 不使用群组和消费者的概念读取,多个线程会重复消费数据
         * @param keyName stream 名称
         */
        private void listenerDefault(JedisPool pool, String keyName){
            Map<String, StreamEntryID> entryIDMap = new HashMap<>();
            entryIDMap.put(keyName,StreamEntryID.LAST_ENTRY);
            //以下为了演示简单就不用线程池了,直接创建两个线程说明问题
            IntStream.range(0,2).forEach(i->{
                new Thread(()->{
                    Jedis jedis = pool.getResource();   //创建jedis实例
                    while(true){
                        try{
                            Thread.sleep(500L);
                            List<Map.Entry<String, List<StreamEntry>>> entries = jedis.xread(XReadParams.xReadParams().block(0), entryIDMap);
                            logger.info("Thread:-{},result:{}",Thread.currentThread().getName(), JSONObject.toJSONString(entries.get(0).getValue().get(0).getFields()));
                            jedis.xdel(keyName,entries.get(0).getValue().get(0).getID());
                        } catch (Exception e){
                            logger.error(e.getMessage());
                        }
                    }
                }).start();
            });
        }
    
        private void createGroup(JedisPool pool, String keyName, String groupName){
            Jedis jedis = pool.getResource();
            try{
                //StreamEntryID 表示创建群组并接收新的消息,这里可以根据自己需要设置,0表示读取所有历史消息,后面的boolean值表示如果stream不存在是否创建stream
                jedis.xgroupCreate(keyName,groupName,StreamEntryID.LAST_ENTRY,true);
            } catch (Exception e){
                //这里捕获异常的原因是可能创建时群组已经存在
                logger.error(e.getMessage());
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136

    本demo的代码还是比较简单的,大家可以根据自己的需求修改和封装。我也在继续探索使用这种方式封装和完善成一个完整的项目,不依赖于第三方框架例如Spring的项目,这样就可以灵活的使用它了,觉得有用的小伙伴要点赞哟!

  • 相关阅读:
    Openssl数据安全传输平台008:业务数据分析+工厂方法
    在PG或HGDB上启用块校验checksum
    PID控制理论
    使用Jenkins加gitee自动构建部署SpringBoot项目
    虚拟化逻辑架构: 创建KVM中的VM与实现VNC远程登录
    基于VHDL的简易CPU设计
    Freeswitch操作基本配置
    元宇宙的宏观与微观趋势
    000-视频与网络应用篇-目录
    PyCharm运行python测试,报错“没有发现测试”/“空套件”
  • 原文地址:https://blog.csdn.net/hhl18730252820/article/details/125457406