• 学习Redis这一篇就够了


    这里写目录标题

    本文脑图

    redis基本数据结构

    本文脑图

    前言

    Redis是基于c语言编写的开源非关系型内存数据库,可以用作数据库、缓存、消息中间件,这么优秀的东西客定要一点一点的吃透它。

    这是关于Redis五种数据结构详解,包括这五种的数据结构的底层原理实现。

    理论肯定是要用于实践的,因此最重要的还是实战部分,也就是这里还会讲解五种数据结构的应用场景。

    话不多说,我们直接进入主题,很多人都知道Redis的五种数据结构包括以下五种:

    1. String:字符串类型
    2. List:列表类型
    3. Set:无序集合类型
    4. ZSet:有序集合类型
    5. Hash:哈希表类型

    但是作为一名优秀的程序员可能不能只停留在只会用着五种类型进行crud工作,还是得深入了解这五种数据结构的底层原理。

    Redis核心对象

    在Redis中有一个核心的对象叫做redisObject ,是用来表示所有的key和value的,用redisObject结构体来表示String、Hash、List、Set、ZSet五种数据类型。

    redisObject的源代码在redis.h中,使用c语言写的,感兴趣的可以自行查看,关于redisObject我这里画了一张图,表示redisObject的结构如下所示:

    闪瞎人的五颜六色图

    在redisObject中type表示属于哪种数据类型,encoding表示该数据的存储方式,也就是底层的实现的该数据类型的数据结构。因此这篇文章具体介绍的也是encoding对应的部分。

    那么encoding中的存储类型又分别表示什么意思呢?具体数据类型所表示的含义,如下图所示:

    图片截图出自《Redis设计与实现第二版》

    可能看完这图,还是觉得一脸懵。不慌,会进行五种数据结构的详细介绍,这张图只是让你找到每种中数据结构对应的储存类型有哪些,大概脑子里有个印象。

    举一个简单的例子,你在Redis中设置一个字符串key 234,然后查看这个字符串的存储类型就会看到为int类型,非整数型的使用的是embstr储存类型,具体操作如下图所示:

    String类型

    String是Redis最基本的数据类型,上面的简介中也说到Redis是用c语言开发的。但是Redis中的字符串和c语言中的字符串类型却是有明显的区别。

    String类型的数据结构存储方式有三种int、raw、embstr。那么这三种存储方式有什么区别呢?

    int

    Redis中规定假如存储的是整数型值,比如set num 123这样的类型,就会使用 int的存储方式进行存储,在redisObject的ptr属性中就会保存该值。

    SDS

    假如存储的字符串是一个字符串值并且长度大于32个字节就会使用SDS(simple dynamic string)方式进行存储,并且encoding设置为raw;若是字符串长度小于等于32个字节就会将encoding改为embstr来保存字符串。

    SDS称为简单动态字符串,对于SDS中的定义在Redis的源码中有的三个属性int len、int free、char buf[]

    len保存了字符串的长度,free表示buf数组中未使用的字节数量,buf数组则是保存字符串的每一个字符元素。

    因此当你在Redsi中存储一个字符串Hello时,根据Redis的源代码的描述可以画出SDS的形式的redisObject结构图如下图所示:

    SDS与c语言字符串对比

    Redis使用SDS作为存储字符串的类型肯定是有自己的优势,SDS与c语言的字符串相比,SDS对c语言的字符串做了自己的设计和优化,具体优势有以下几点:

    (1)c语言中的字符串并不会记录自己的长度,因此每次获取字符串的长度都会遍历得到,时间的复杂度是O(n),而Redis中获取字符串只要读取len的值就可,时间复杂度变为O(1)。

    (2)c语言中两个字符串拼接,若是没有分配足够长度的内存空间就会出现缓冲区溢出的情况;而SDS会先根据len属性判断空间是否满足要求,若是空间不够,就会进行相应的空间扩展,所以不会出现缓冲区溢出的情况

    (3)SDS还提供空间预分配惰性空间释放两种策略。在为字符串分配空间时,分配的空间比实际要多,这样就能减少连续的执行字符串增长带来内存重新分配的次数

    当字符串被缩短的时候,SDS也不会立即回收不适用的空间,而是通过free属性将不使用的空间记录下来,等后面使用的时候再释放。

    具体的空间预分配原则是:当修改字符串后的长度len小于1MB,就会预分配和len一样长度的空间,即len=free;若是len大于1MB,free分配的空间大小就为1MB

    (4)SDS是二进制安全的,除了可以储存字符串以外还可以储存二进制文件(如图片、音频,视频等文件的二进制数据);而c语言中的字符串是以空字符串作为结束符,一些图片中含有结束符,因此不是二进制安全的。

    为了方便易懂,做了一个c语言的字符串和SDS进行对比的表格,如下所示:

    c语言字符串

    SDS

    获取长度的时间复杂度为O(n)

    获取长度的时间复杂度为O(1)

    不是二进制安全的

    是二进制安全的

    只能保存字符串

    还可以保存二进制数据

    n次增长字符串必然会带来n次的内存分配

    n次增长字符串内存分配的次数<=n

    String类型应用

    说到这里我相信很多人可以说已经精通Redis的String类型了,但是纯理论的精通,理论还是得应用实践,上面说到String可以用来存储图片,现在就以图片存储作为案例实现。

    (1)首先要把上传得图片进行编码,这里写了一个工具类把图片处理成了Base64得编码形式,具体得实现代码如下:

     /**
         * 将图片内容处理成Base64编码格式
         * @param file
         * @return
         */
        public static String encodeImg(MultipartFile file) {
            byte[] imgBytes = null;
            try {
                imgBytes = file.getBytes();
            } catch (IOException e) {
                e.printStackTrace();
            }
            BASE64Encoder encoder = new BASE64Encoder();
            return imgBytes==null?null:encoder.encode(imgBytes );
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    (2)第二步就是把处理后的图片字符串格式存储进Redis中,实现得代码如下所示:

        /**
         * Redis存储图片
         * @param file
         * @return
         */
        public void uploadImageServiceImpl(MultipartFile image) {
            String imgId = UUID.randomUUID().toString();
            String imgStr= ImageUtils.encodeImg(image);
            redisUtils.set(imgId , imgStr);
            // 后续操作可以把imgId存进数据库对应的字段,如果需要从redis中取出,只要获取到这个字段后从redis中取出即可。
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    这样就是实现了图片得二进制存储,当然String类型得数据结构得应用也还有常规计数:统计微博数、统计粉丝数等。

    Hash类型

    Hash对象的实现方式有两种分别是ziplist、hashtable,其中hashtable的存储方式key是String类型的,value也是以key value的形式进行存储。

    字典类型的底层就是hashtable实现的,明白了字典的底层实现原理也就是明白了hashtable的实现原理,hashtable的实现原理可以于HashMap的是底层原理相类比。

    字典

    两者在新增时都会通过key计算出数组下标,不同的是计算法方式不同,HashMap中是以hash函数的方式,而hashtable中计算出hash值后,还要通过sizemask 属性和哈希值再次得到数组下标。

    我们知道hash表最大的问题就是hash冲突,为了解决hash冲突,假如hashtable中不同的key通过计算得到同一个index,就会形成单向链表(链地址法),如下图所示:

    rehash

    在字典的底层实现中,value对象以每一个dictEntry的对象进行存储,当hash表中的存放的键值对不断的增加或者减少时,需要对hash表进行一个扩展或者收缩。

    这里就会和HashMap一样也会就进行rehash操作,进行重新散列排布。从上图中可以看到有ht[0]ht[1]两个对象,先来看看对象中的属性是干嘛用的。

    在hash表结构定义中有四个属性分别是dictEntry **table、unsigned long size、unsigned long sizemask、unsigned long used,分别表示的含义就是哈希表数组、hash表大小、用于计算索引值,总是等于size-1、hash表中已有的节点数

    ht[0]是用来最开始存储数据的,当要进行扩展或者收缩时,ht[0]的大小就决定了ht[1]的大小,ht[0]中的所有的键值对就会重新散列到ht[1]中。

    扩展操作:ht[1]扩展的大小是比当前 ht[0].used 值的二倍大的第一个 2 的整数幂;收缩操作:ht[0].used 的第一个大于等于的 2 的整数幂。

    当ht[0]上的所有的键值对都rehash到ht[1]中,会重新计算所有的数组下标值,当数据迁移完后ht[0]就会被释放,然后将ht[1]改为ht[0],并新创建ht[1],为下一次的扩展和收缩做准备。

    渐进式rehash

    假如在rehash的过程中数据量非常大,Redis不是一次性把全部数据rehash成功,这样会导致Redis对外服务停止,Redis内部为了处理这种情况采用渐进式的rehash

    Redis将所有的rehash的操作分成多步进行,直到都rehash完成,具体的实现与对象中的rehashindex属性相关,若是rehashindex 表示为-1表示没有rehash操作

    当rehash操作开始时会将该值改成0,在渐进式rehash的过程更新、删除、查询会在ht[0]和ht[1]中都进行,比如更新一个值先更新ht[0],然后再更新ht[1]。

    而新增操作直接就新增到ht[1]表中,ht[0]不会新增任何的数据,这样保证ht[0]只减不增,直到最后的某一个时刻变成空表,这样rehash操作完成。

    上面就是字典的底层hashtable的实现原理,说完了hashtable的实现原理,我们再来看看Hash数据结构的两一种存储方式ziplist(压缩列表)

    ziplist

    压缩列表(ziplist)是一组连续内存块组成的顺序的数据结构,压缩列表能够节省空间,压缩列表中使用多个节点来存储数据。

    压缩列表是列表键和哈希键底层实现的原理之一,压缩列表并不是以某种压缩算法进行压缩存储数据,而是它表示一组连续的内存空间的使用,节省空间,压缩列表的内存结构图如下:

    压缩列表中每一个节点表示的含义如下所示:

    1. zlbytes:4个字节的大小,记录压缩列表占用内存的字节数。
    2. zltail:4个字节大小,记录表尾节点距离起始地址的偏移量,用于快速定位到尾节点的地址。
    3. zllen:2个字节的大小,记录压缩列表中的节点数。
    4. entry:表示列表中的每一个节点。
    5. zlend:表示压缩列表的特殊结束符号'0xFF'

    再压缩列表中每一个entry节点又有三部分组成,包括previous_entry_ength、encoding、content

    1. previous_entry_ength表示前一个节点entry的长度,可用于计算前一个节点的其实地址,因为他们的地址是连续的。
    2. encoding:这里保存的是content的内容类型和长度。
    3. content:content保存的是每一个节点的内容。

    说到这里相信大家已经都hash这种数据结构已经非常了解,若是第一次接触Redis五种基本数据结构的底层实现的话,建议多看几遍,下面来说一说hash的应用场景。

    应用场景

    哈希表相对于String类型存储信息更加直观,擦欧总更加方便,经常会用来做用户数据的管理,存储用户的信息。

    hash也可以用作高并发场景下使用Redis生成唯一的id。下面我们就以这两种场景用作案例编码实现。

    存储用户数据

    第一个场景比如我们要储存用户信息,一般使用用户的ID作为key值,保持唯一性,用户的其他信息(地址、年龄、生日、电话号码等)作为value值存储。

    若是传统的实现就是将用户的信息封装成为一个对象,通过序列化存储数据,当需要获取用户信息的时候,就会通过反序列化得到用户信息。

    但是这样必然会造成序列化和反序列化的性能的开销,并且若是只修改其中的一个属性值,就需要把整个对象序列化出来,操作的动作太大,造成不必要的性能开销。

    若是使用Redis的hash来存储用户数据,就会将原来的value值又看成了一个k v形式的存储容器,这样就不会带来序列化的性能开销的问题。

    分布式生成唯一ID

    第二个场景就是生成分布式的唯一ID,这个场景下就是把redis封装成了一个工具类进行实现,实现的代码如下:

        // offset表示的是id的递增梯度值
        public Long getId(String key,String hashKey,Long offset) throws BusinessException{
            try {
                if (null == offset) {
                    offset=1L;
                }
                // 生成唯一id
                return redisUtil.increment(key, hashKey, offset);
            } catch (Exception e) {
                //若是出现异常就是用uuid来生成唯一的id值
                int randNo=UUID.randomUUID().toString().hashCode();
                if (randNo < 0) {
                    randNo=-randNo;
                }
                return Long.valueOf(String.format("%16d", randNo));
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    List类型

    Redis中的列表在3.2之前的版本是使用ziplistlinkedlist进行实现的。在3.2之后的版本就是引入了quicklist

    ziplist压缩列表上面已经讲过了,我们来看看linkedlist和quicklist的结构是怎么样的。

    linkedlist是一个双向链表,他和普通的链表一样都是由指向前后节点的指针。插入、修改、更新的时间复杂度尾O(1),但是查询的时间复杂度确实O(n)。

    linkedlist和quicklist的底层实现是采用链表进行实现,在c语言中并没有内置的链表这种数据结构,Redis实现了自己的链表结构。

    Redis中链表的特性:

    1. 每一个节点都有指向前一个节点和后一个节点的指针。
    2. 头节点和尾节点的prev和next指针指向为null,所以链表是无环的。
    3. 链表有自己长度的信息,获取长度的时间复杂度为O(1)。

    Redis中List的实现比较简单,下面我们就来看看它的应用场景。

    应用场景

    Redis中的列表可以实现阻塞队列,结合lpush和brpop命令就可以实现。生产者使用lupsh从列表的左侧插入元素,消费者使用brpop命令从队列的右侧获取元素进行消费。

    (1)首先配置redis的配置,为了方便我就直接放在application.yml配置文件中,实际中可以把redis的配置文件放在一个redis.properties文件单独放置,具体配置如下:

    spring
    	redis:
    		host: 127.0.0.1
    		port: 6379
    		password: user
    		timeout: 0
    		database: 2
    		pool:
    			max-active: 100
    			max-idle: 10
    			min-idle: 0
    			max-wait: 100000
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    (2)第二步创建redis的配置类,叫做RedisConfig,并标注上@Configuration注解,表明他是一个配置类。

    @Configuration
    public class RedisConfiguration {
    
    @Value("${spring.redis.host}")
    private String host;
    @Value("${spring.redis.port}")
    private int port;
    @Value("${spring.redis.password}")
    private String password;
    @Value("${spring.redis.pool.max-active}")
    private int maxActive;
    @Value("${spring.redis.pool.max-idle}")
    private int maxIdle;
    @Value("${spring.redis.pool.min-idle}")
    private int minIdle;
    @Value("${spring.redis.pool.max-wait}")
    private int maxWait;
    @Value("${spring.redis.database}")
    private int database;
    @Value("${spring.redis.timeout}")
    private int timeout;
    
    @Bean
    public JedisPoolConfig getRedisConfiguration(){
    	JedisPoolConfig jedisPoolConfig= new JedisPoolConfig();
    	jedisPoolConfig.setMaxTotal(maxActive);
    	jedisPoolConfig.setMaxIdle(maxIdle);
    	jedisPoolConfig.setMinIdle(minIdle);
    	jedisPoolConfig.setMaxWaitMillis(maxWait);
    	return jedisPoolConfig;
    }
    
    @Bean
    public JedisConnectionFactory getConnectionFactory() {
    	JedisConnectionFactory factory = new JedisConnectionFactory();
    	factory.setHostName(host);
    	factory.setPort(port);
    	factory.setPassword(password);
    	factory.setDatabase(database);
    	JedisPoolConfig jedisPoolConfig= getRedisConfiguration();
    	factory.setPoolConfig(jedisPoolConfig);
    	return factory;
    }
    
    @Bean
    public RedisTemplate getRedisTemplate() {
    	JedisConnectionFactory factory = getConnectionFactory();
    	RedisTemplate redisTemplate = new StringRedisTemplate(factory);
    	return redisTemplate;
    }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    (3)第三步就是创建Redis的工具类RedisUtil,自从学了面向对象后,就喜欢把一些通用的东西拆成工具类,好像一个一个零件,需要的时候,就把它组装起来。

    @Component
    public class RedisUtil {
    
    @Autowired
    private RedisTemplate redisTemplate;
    /**
    * 存消息到消息队列中
    * @param key 键
    * @param value 值
    * @return
    */
    public boolean lPushMessage(String key, Object value) {
    	try {
    			redisTemplate.opsForList().leftPush(key, value);
    			return true;
    	} catch (Exception e) {
    			e.printStackTrace();
    			return false;
    	}
    }
    
    /**
    * 从消息队列中弹出消息 - 
    * @param key 键
    * @return
    */
    public Object rPopMessage(String key) {
    	try {
    			return redisTemplate.opsForList().rightPop(key);
    	} catch (Exception e) {
    			e.printStackTrace();
    			return null;
    	}
    }
    
    /**
    * 查看消息
    * @param key 键
    * @param start 开始
    * @param end 结束 0 到 -1代表所有值
    * @return
    */
    public List getMessage(String key, long start, long end) {
    	try {
    			return redisTemplate.opsForList().range(key, start, end);
    	} catch (Exception e) {
    			e.printStackTrace();
    			return null;
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    这样就完成了Redis消息队列工具类的创建,在后面的代码中就可以直接使用。

    Set集合

    Redis中列表和集合都可以用来存储字符串,但是Set是不可重复的集合,而List列表可以存储相同的字符串,Set集合是无序的这个和后面讲的ZSet有序集合相对。

    Set的底层实现是ht和intset,ht(哈希表)前面已经详细了解过,下面我们来看看inset类型的存储结构。

    inset也叫做整数集合,用于保存整数值的数据结构类型,它可以保存int16_tint32_t 或者int64_t 的整数值。

    在整数集合中,有三个属性值encoding、length、contents[],分别表示编码方式、整数集合的长度、以及元素内容,length就是记录contents里面的大小。

    在整数集合新增元素的时候,若是超出了原集合的长度大小,就会对集合进行升级,具体的升级过程如下:

    1. 首先扩展底层数组的大小,并且数组的类型为新元素的类型。
    2. 然后将原来的数组中的元素转为新元素的类型,并放到扩展后数组对应的位置。
    3. 整数集合升级后就不会再降级,编码会一直保持升级后的状态。

    应用场景

    Set集合的应用场景可以用来去重、抽奖、共同好友、二度好友等业务类型。接下来模拟一个添加好友的案例实现:

    @RequestMapping(value = "/addFriend", method = RequestMethod.POST)
    public Long addFriend(User user, String friend) {
        String currentKey = null;
        // 判断是否是当前用户的好友
        if (AppContext.getCurrentUser().getId().equals(user.getId)) {
            currentKey = user.getId.toString();
        }
        //若是返回0则表示不是该用户好友
        return currentKey==null?0l:setOperations.add(currentKey, friend);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    假如两个用户A和B都是用上上面的这个接口添加了很多的自己的好友,那么有一个需求就是要实现获取A和B的共同好友,那么可以进行如下操作:

    public Set intersectFriend(User userA, User userB) {
        return setOperations.intersect(userA.getId.toString(), userB.getId.toString());
    }
    
    • 1
    • 2
    • 3

    举一反三,还可以实现A用户自己的好友,或者B用户自己的好友等,都可以进行实现。

    ZSet集合

    ZSet是有序集合,从上面的图中可以看到ZSet的底层实现是ziplistskiplist实现的,ziplist上面已经详细讲过,这里来讲解skiplist的结构实现。

    skiplist也叫做跳跃表,跳跃表是一种有序的数据结构,它通过每一个节点维持多个指向其它节点的指针,从而达到快速访问的目的。

    skiplist由如下几个特点:

    1. 有很多层组成,由上到下节点数逐渐密集,最上层的节点最稀疏,跨度也最大。
    2. 每一层都是一个有序链表,只扫包含两个节点,头节点和尾节点。
    3. 每一层的每一个每一个节点都含有指向同一层下一个节点和下一层同一个位置节点的指针。
    4. 如果一个节点在某一层出现,那么该以下的所有链表同一个位置都会出现该节点。

    具体实现的结构图如下所示:

    在跳跃表的结构中有head和tail表示指向头节点和尾节点的指针,能后快速的实现定位。level表示层数,len表示跳跃表的长度,BW表示后退指针,在从尾向前遍历的时候使用。

    BW下面还有两个值分别表示分值(score)和成员对象(各个节点保存的成员对象)。

    跳跃表的实现中,除了最底层的一层保存的是原始链表的完整数据,上层的节点数会越来越少,并且跨度会越来越大。

    跳跃表的上面层就相当于索引层,都是为了找到最后的数据而服务的,数据量越大,条表所体现的查询的效率就越高,和平衡树的查询效率相差无几。

    应用场景

    因为ZSet是有序的集合,因此ZSet在实现排序类型的业务是比较常见的,比如在首页推荐10个最热门的帖子,也就是阅读量由高到低,排行榜的实现等业务。

    下面就选用获取排行榜前前10名的选手作为案例实现,实现的代码如下所示:

    @Autowired
    private RedisTemplate redisTemplate;
    	/**
    	 * 获取前10排名
    	 * @return
    	 */
        public static List getZset(String key, long baseNum, LevelService levelService){
            ZSetOperations operations = redisTemplate.opsForZSet();
            // 根据score分数值获取前10名的数据
            Set> set = operations.reverseRangeWithScores(key,0,9);
            List list= new ArrayList();
            int i=1;
            for (ZSetOperations.TypedTuple o:set){
                int uid = (int) o.getValue();
                LevelCache levelCache = levelService.getLevelCache(uid);
                LevelVO levelVO = levelCache.getLevelVO();
                long score = (o.getScore().longValue() - baseNum + levelVO .getCtime())/CommonUtil.multiplier;
                levelVO .setScore(score);
                levelVO .setRank(i);
                list.add( levelVO );
                i++;
            }
            return list;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    以上的代码实现大致逻辑就是根据score分数值获取前10名的数据,然后封装成lawyerVO对象的列表进行返回。

    到这里我们已经精通Redis的五种基本数据类型了,又可以去和面试官扯皮了,扯不过就跑路吧,或者这篇文章多看几遍,相信对你总是有好处的。

    Redis内存分配策略

    概述

    今天就带来了一个面试常问的一个问题:假如你的Redis内存满了怎么办? 长期的把Redis作为缓存使用,总有一天会存满的时候对吧。

    这个面试题不慌呀,在Redis中有配置参数maxmemory可以设置Redis内存的大小

    在Redis的配置文件redis.conf文件中,配置maxmemory的大小参数如下所示:

    实际生产中肯定不是100mb的大小哈,不要给误导了,这里我只是让大家认识这个参数,一般小的公司都是设置为3G左右的大小。

    除了在配置文件中配置生效外,还可以通过命令行参数的形式,进行配置,具体的配置命令行如下所示:

    //获取maxmemory配置参数的大小
    127.0.0.1:6379> config get maxmemory
    //设置maxmemory参数为100mb
    127.0.0.1:6379> config set maxmemory 100mb
    
    • 1
    • 2
    • 3
    • 4

    倘若实际的存储中超出了Redis的配置参数的大小时,Redis中有淘汰策略,把需要淘汰的key给淘汰掉,整理出干净的一块内存给新的key值使用

    接下来我们就详细的聊一聊Redis中的淘汰策略,并且深入的理解每个淘汰策略的原理和应用的场景。

    淘汰策略

    Redis提供了6种的淘汰策略,其中默认的是noeviction,这6中淘汰策略如下:

    1. noeviction(默认策略):若是内存的大小达到阀值的时候,所有申请内存的指令都会报错。
    2. allkeys-lru:所有key都是使用LRU算法进行淘汰。
    3. volatile-lru:所有设置了过期时间的key使用LRU算法进行淘汰。
    4. allkeys-random:所有的key使用随机淘汰的方式进行淘汰。
    5. volatile-random:所有设置了过期时间的key使用随机淘汰的方式进行淘汰。
    6. volatile-ttl:所有设置了过期时间的key根据过期时间进行淘汰,越早过期就越快被淘汰

    假如在Redis中的数据有一部分是热点数据,而剩下的数据是冷门数据,或者我们不太清楚我们应用的缓存访问分布状况,这时可以使用allkeys-lru

    假如所有的数据访问的频率大概一样,就可以使用allkeys-random的淘汰策略。

    假如要配置具体的淘汰策略,可以在redis.conf配置文件中配置,具体配置如下所示:

    这只需要把注释给打开就可以,并且配置指定的策略方式,另一种的配置方式就是命令的方式进行配置,具体的执行命令如下所示:

    // 获取maxmemory-policy配置
    127.0.0.1:6379> config get maxmemory-policy
    // 设置maxmemory-policy配置为allkeys-lru
    127.0.0.1:6379> config set maxmemory-policy allkeys-lru
    
    • 1
    • 2
    • 3
    • 4

    在介绍6种的淘汰策略方式的时候,说到了LRU算法,那么什么是LRU算法呢?

    LRU算法

    LRU(Least Recently Used)即表示最近最少使用,也就是在最近的时间内最少被访问的key,算法根据数据的历史访问记录来进行淘汰数据。

    它的核心的思想就是:假如一个key值在最近很少被使用到,那么在将来也很少会被访问

    实际上Redis实现的LRU并不是真正的LRU算法,也就是名义上我们使用LRU算法淘汰键,但是实际上被淘汰的键并不一定是真正的最久没用的。

    Redis使用的是近似的LRU算法,通过随机采集法淘汰key,每次都会随机选出5个key,然后淘汰里面最近最少使用的key

    这里的5个key只是默认的个数,具体的个数也可以在配置文件中进行配置,在配置文件中的配置如下图所示:

    当近似LRU算法取值越大的时候就会越接近真实的LRU算法,可以这样理解,因为取值越大那么获取的数据就越全,淘汰中的数据的就越接近最近最少使用的数据

    那么为了实现根据时间实现LRU算法,Redis必须为每个key中额外的增加一个内存空间用于存储每个key的时间,大小是3字节。

    在Redis 3.0中对近似的LRU算法做了一些优化,Redis中会维护大小是16的一个候选池的内存。

    当第一次随机选取的采样数据,数据都会被放进候选池中,并且候选池中的数据会根据时间进行排序。

    当第二次以后选取的数据,只有小于候选池内的最小时间的才会被放进候选池中。

    当某一时刻候选池的数据满了,那么时间最大的key就会被挤出候选池。当执行淘汰时,直接从候选池中选取最近访问时间最小的key进行淘汰。

    这样做的目的就是选取出最近似符合最近最少被访问的key值,能够正确的淘汰key值,因为随机选取的样本中的最小时间可能不是真正意义上的最小时间。

    但是LRU算法有一个弊端:就是假如一个key值在以前都没有被访问到,然而最近一次被访问到了,那么就会认为它是热点数据,不会被淘汰。

    然而有些数据以前经常被访问到,只是最近的时间内没有被访问到,这样就导致这些数据很可能被淘汰掉,这样一来就会出现误判而淘汰热点数据。

    于是在Redis 4.0的时候除了LRU算法,新加了一种LFU算法,那么什么是LFU算法算法呢?

    LFU算法

    LFU(Least Frequently Used)即表示最近频繁被使用,也就是最近的时间段内,频繁被访问的key,它以最近的时间段的被访问次数的频率作为一种判断标准。

    它的核心思想就是:根据key最近被访问的频率进行淘汰,比较少被访问的key优先淘汰,反之则优先保留。

    LFU算法反映了一个key的热度情况,不会因为LRU算法的偶尔一次被访问被认为是热点数据。

    在LFU算法中支持volatile-lfu策略和allkeys-lfu策略。

    以上介绍了Redis的6种淘汰策略,这6种淘汰策略旨在告诉我们怎么做,但是什么时候做?这个还没说,下面我们就来详细的了解Redis什么时候执行淘汰策略。

    删除过期键策略

    在Redis种有三种删除的操作此策略,分别是:

    1. 定时删除:创建一个定时器,定时的执行对key的删除操作。
    2. 惰性删除:每次只有再访问key的时候,才会检查key的过期时间,若是已经过期了就执行删除。
    3. 定期删除:每隔一段时间,就会检查删除掉过期的key。

    定时删除对于内存来说是友好的,定时清理出干净的空间,但是对于cpu来说并不是友好的,程序需要维护一个定时器,这就会占用cpu资源。

    惰性的删除对于cpu来说是友好的,cpu不需要维护其它额外的操作,但是对于内存来说是不友好的,因为要是有些key一直没有被访问到,就会一直占用着内存。

    定期删除是上面两种方案的折中方案**,每隔一段时间删除过期的key,也就是根据具体的业务,合理的取一个时间定期的删除key**。

    通过最合理控制删除的时间间隔来删除key,减少对cpu的资源的占用消耗,使删除操作合理化。

    RDB和AOF 的淘汰处理

    在Redis中持久化的方式有两种RDBAOF,具体这两种详细的持久化介绍,可以参考这一篇文章[]。

    在RDB中是以快照的形式获取内存中某一时间点的数据副本,在创建RDB文件的时候可以通过savebgsave命令执行创建RDB文件。

    这两个命令都不会把过期的key保存到RDB文件中,这样也能达到删除过期key的效果。

    当在启动Redis载入RDB文件的时候,Master不会把过期的key载入,而Slave会把过期的key载入。

    在AOF模式下,Redis提供了Rewite的优化措施,执行的命令分别是REWRITEAOFBGREWRITEAOF这两个命令都不会把过期的key写入到AOF文件中,也能删除过期key

    Redis缓存三大问题

    前言

    日常的开发中,无不都是使用数据库来进行数据的存储,由于一般的系统任务中通常不会存在高并发的情况,所以这样看起来并没有什么问题。

    一旦涉及大数据量的需求,如一些商品抢购的情景,或者主页访问量瞬间较大的时候,单一使用数据库来保存数据的系统会因为面向磁盘磁盘读/写速度问题有严重的性能弊端,详细的磁盘读写原理请参考这一片[]。

    在这一瞬间成千上万的请求到来,需要系统在极短的时间内完成成千上万次的读/写操作,这个时候往往不是数据库能够承受的,极其容易造成数据库系统瘫痪,最终导致服务宕机的严重生产问题。

    为了克服上述的问题,项目通常会引入NoSQL技术,这是一种基于内存数据库,并且提供一定的持久化功能。

    Redis技术就是NoSQL技术中的一种。Redis缓存的使用,极大的提升了应用程序的性能和效率,特别是数据查询方面。

    但同时,它也带来了一些问题。其中,最要害的问题,就是数据的一致性问题,从严格意义上讲,这个问题无解。如果对数据的一致性要求很高,那么就不能使用缓存

    另外的一些典型问题就是,缓存穿透缓存击穿缓存雪崩。本篇文章从实际代码操作,来提出解决这三个缓存问题的方案,毕竟Redis的缓存问题是实际面试中高频问点,理论和实操要兼得。

    缓存穿透

    缓存穿透是指查询一条数据库和缓存都没有的一条数据,就会一直查询数据库,对数据库的访问压力就会增大,缓存穿透的解决方案,有以下两种:

    1. 缓存空对象:代码维护较简单,但是效果不好。
    2. 布隆过滤器:代码维护复杂,效果很好。

    缓存空对象

    缓存空对象是指当一个请求过来缓存中和数据库中都不存在该请求的数据,第一次请求就会跳过缓存进行数据库的访问,并且访问数据库后返回为空,此时也将该空对象进行缓存。

    若是再次进行访问该空对象的时候,就会直接击中缓存,而不是再次数据库,缓存空对象实现的原理图如下:

    缓存空对象的实现代码如下:

    public class UserServiceImpl {
         @Autowired
         UserDAO userDAO;
         @Autowired
         RedisCache redisCache;
     
         public User findUser(Integer id) {
              Object object = redisCache.get(Integer.toString(id));
              // 缓存中存在,直接返回
              if(object != null) {
                   // 检验该对象是否为缓存空对象,是则直接返回null
                   if(object instanceof NullValueResultDO) {
                        return null;
                   }
                   return (User)object;
              } else {  
                   // 缓存中不存在,查询数据库
                   User user = userDAO.getUser(id);
                   // 存入缓存
                   if(user != null) {
                        redisCache.put(Integer.toString(id),user);
                   } else {
                        // 将空对象存进缓存
                        redisCache.put(Integer.toString(id), new NullValueResultDO());
                   }
                   return user;
              }
         }          
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    缓存空对象的实现代码很简单,但是缓存空对象会带来比较大的问题,就是缓存中会存在很多空对象,占用内存的空间,浪费资源,一个解决的办法就是设置空对象的较短的过期时间,代码如下:

    // 再缓存的时候,添加多一个该空对象的过期时间60秒
    redisCache.put(Integer.toString(id), new NullValueResultDO(),60);
    
    • 1
    • 2

    布隆过滤器

    布隆过滤器是一种基于概率数据结构,主要用来判断某个元素是否在集合内,它具有运行速度快(时间效率),占用内存小的优点(空间效率),但是有一定的误识别率删除困难的问题。它只能告诉你某个元素一定不在集合内或可能在集合内。

    在计算机科学中有一种思想:空间换时间,时间换空间。一般两者是不可兼得,而布隆过滤器运行效率和空间大小都兼得,它是怎么做到的呢?

    在布隆过滤器中引用了一个误判率的概念,即它可能会把不属于这个集合的元素认为可能属于这个集合,但是不会把属于这个集合的认为不属于这个集合,布隆过滤器的特点如下:

    1. 一个非常大的二进制位数组 (数组里只有0和1)
    2. 若干个哈希函数
    3. 空间效率查询效率高
    4. 不存在漏报(False Negative):某个元素在某个集合中,肯定能报出来。
    5. 可能存在误报(False Positive):某个元素不在某个集合中,可能也被爆出来。
    6. 不提供删除方法,代码维护困难。
    7. 位数组初始化都为0,它不存元素的具体值,当元素经过哈希函数哈希后的值(也就是数组下标)对应的数组位置值改为1。

    实际布隆过滤器存储数据和查询数据的原理图如下:

    可能很多读者看完上面的特点和原理图,还是看不懂,别急下面通过图解一步一步的讲解布隆过滤器,总而言之一句简单的话概括就是布隆过滤器是一个很大二进制位数组,数组里面只存0和1

    初始化的布隆过滤器的结构图如下:

    以上只是画了布隆过滤器的很小很小的一部分,实际布隆过滤器是非常大的数组(这里的大是指它的长度大,并不是指它所占的内存空间大)。

    那么一个数据是怎么存进布隆过滤器的呢?

    当一个数据进行存入布隆过滤器的时候,会经过如干个哈希函数进行哈希(若是对哈希函数还不懂的请参考这一片[]),得到对应的哈希值作为数组的下标,然后将初始化的位数组对应的下标的值修改为1,结果图如下:

    当再次进行存入第二个值的时候,修改后的结果的原理图如下:

    所以每次存入一个数据,就会哈希函数的计算,计算的结果就会作为下标,在布隆过滤器中有多少个哈希函数就会计算出多少个下标,布隆过滤器插入的流程如下:

    1. 将要添加的元素给m个哈希函数
    2. 得到对应于位数组上的m个位置
    3. 将这m个位置设为1

    那么为什么会有误判率呢?

    假设在我们多次存入值后,在布隆过滤器中存在x、y、z这三个值,布隆过滤器的存储结构图如下所示:

    当我们要查询的时候,比如查询a这个数,实际中a这个数是不存在布隆过滤器中的,经过2哥哈希函数计算后得到a的哈希值分别为2和13,结构原理图如下:

    经过查询后,发现2和13位置所存储的值都为1,但是2和13的下标分别是x和z经过计算后的下标位置的修改,该布隆过滤器中实际不存在a,那么布隆过滤器就会误判改值可能存在,因为布隆过滤器不存元素值,所以存在误判率

    那么具体布隆过布隆过滤的判断的准确率和一下两个因素有关:

    1. 布隆过滤器大小:越大,误判率就越小,所以说布隆过滤器一般长度都是非常大的。
    2. 哈希函数的个数:哈希函数的个数越多,那么误判率就越小。

    那么为什么不能删除元素呢?

    原因很简单,因为删除元素后,将对应元素的下标设置为零,可能别的元素的下标也引用改下标,这样别的元素的判断就会收到影响,原理图如下:

    当你删除z元素之后,将对应的下标10和13设置为0,这样导致x和y元素的下标受到影响,导致数据的判断不准确,所以直接不提供删除元素的api。

    以上说的都是布隆过滤器的原理,只有理解了原理,在实际的运用才能如鱼得水,下面就来实操代码,手写一个简单的布隆过滤器。

    对于要手写一个布隆过滤器,首先要明确布隆过滤器的核心:

    • 若干哈希函数
    • 存值得Api
    • 判断值得Api

    实现得代码如下:

    public class MyBloomFilter {
        // 布隆过滤器长度
        private static final int SIZE = 2 << 10;
        // 模拟实现不同的哈希函数
        private static final int[] num= new int[] {5, 19, 23, 31,47, 71};   
        // 初始化位数组
        private BitSet bits = new BitSet(SIZE);
        // 用于存储哈希函数
        private MyHash[] function = new MyHash[num.length];
        
        // 初始化哈希函数
        public MyBloomFilter() {
            for (int i = 0; i < num.length; i++) {
                function [i] = new MyHash(SIZE, num[i]);
            }
        }
       
        // 存值Api 
        public void add(String value) {
            // 对存入得值进行哈希计算
            for (MyHash f: function) {
                // 将为数组对应的哈希下标得位置得值改为1
                bits.set(f.hash(value), true);
            }
        }
       
        // 判断是否存在该值得Api 
        public boolean contains(String value) {
            if (value == null) {
                return false;
            }
            boolean result= true;
            for (MyHash f : func) {
                result= result&& bits.get(f.hash(value));
            }
            return result;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    哈希函数代码如下:

    public static class MyHash {
            private int cap;
            private int seed;
            // 初始化数据
            public MyHash(int cap, int seed) {
                this.cap = cap;
                this.seed = seed;
            }
            // 哈希函数
            public int hash(String value) {
                int result = 0;
                int len = value.length();
                for (int i = 0; i < len; i++) {
                    result = seed * result + value.charAt(i);
                }
                return (cap - 1) & result;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    布隆过滤器测试代码如下:

        public static void test {
            String value = "4243212355312";
            MyBloomFilter filter = new MyBloomFilter();
            System.out.println(filter.contains(value));
            filter.add(value);
            System.out.println(filter.contains(value));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    以上就是手写了一个非常简单得布隆过滤器,但是实际项目中可能事由牛人或者大公司已经帮你写好的,如谷歌的Google Guava,只需要在项目中引入一下依赖:

    
        com.google.guava
        guava
        27.0.1-jre
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    实际项目中具体的操作代码如下:

    public static void MyBloomFilterSysConfig {
    
         @Autowired
         OrderMapper orderMapper
         
        // 1.创建布隆过滤器  第二个参数为预期数据量10000000,第三个参数为错误率0.00001
        BloomFilter bloomFilter =  BloomFilter.create(Funnels.stringFunnel(Charset.forName("utf-8")),10000000, 0.00001);
        // 2.获取所有的订单,并将订单的id放进布隆过滤器里面
        List orderList = orderMapper.findAll()
        for (Order order;orderList ) {
            Long id = order.getId();
            bloomFilter.put("" + id);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    在实际项目中会启动一个系统任务或者定时任务,来初始化布隆过滤器,将热点查询数据的id放进布隆过滤器里面,当用户再次请求的时候,使用布隆过滤器进行判断,改订单的id是否在布隆过滤器中存在,不存在直接返回null,具体操作代码:

    // 判断订单id是否在布隆过滤器中存在
    bloomFilter.mightContain("" + id)
    
    • 1
    • 2

    布隆过滤器的缺点就是要维持容器中的数据,因为订单数据肯定是频繁变化的,实时的要更新布隆过滤器中的数据为最新。

    缓存击穿

    缓存击穿是指一个key非常热点,在不停的扛着大并发,大并发集中对这一个点进行访问,当这个key在失效的瞬间,持续的大并发就穿破缓存,直接请求数据库,瞬间对数据库的访问压力增大。

    缓存击穿这里强调的是并发,造成缓存击穿的原因有以下两个:

    1. 该数据没有人查询过 ,第一次就大并发的访问。(冷门数据)
    2. 添加到了缓存,reids有设置数据失效的时间 ,这条数据刚好失效,大并发访问(热点数据)

    对于缓存击穿的解决方案就是加锁,具体实现的原理图如下:

    当用户出现大并发访问的时候,在查询缓存的时候和查询数据库的过程加锁,只能第一个进来的请求进行执行,当第一个请求把该数据放进缓存中,接下来的访问就会直接集中缓存,防止了缓存击穿

    业界比价普遍的一种做法,即根据key获取value值为空时,锁上,从数据库中load数据后再释放锁。若其它线程获取锁失败,则等待一段时间后重试。这里要注意,分布式环境中要使用分布式锁单机的话用普通的锁(synchronizedLock)就够了。

    下面以一个获取商品库存的案例进行代码的演示,单机版的锁实现具体实现的代码如下:

    // 获取库存数量
    public String getProduceNum(String key) {
        try {
            synchronized (this) {   //加锁
                // 缓存中取数据,并存入缓存中
                int num= Integer.parseInt(redisTemplate.opsForValue().get(key));
                
                if (num> 0) {
                    //没查一次库存-1
                    redisTemplate.opsForValue().set(key, (num- 1) + "");
                    System.out.println("剩余的库存为num:" + (num- 1));
                } else {
                    System.out.println("库存为0");
                }
            }
        } catch (NumberFormatException e) {
            e.printStackTrace();
        } finally {
        }
        return "OK";
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    分布式的锁实现具体实现的代码如下:

    public String getProduceNum(String key) {
        // 获取分布式锁
        RLock lock = redissonClient.getLock(key);
        try {
            // 获取库存数
            int num= Integer.parseInt(redisTemplate.opsForValue().get(key));  
            // 上锁           
            lock.lock();
            if (num> 0) {
                //减少库存,并存入缓存中
                redisTemplate.opsForValue().set(key, (num - 1) + "");
                System.out.println("剩余库存为num:" + (num- 1));
            } else {
                System.out.println("库存已经为0");
            }
        } catch (NumberFormatException e) {
            e.printStackTrace();
        } finally {
            //解锁
            lock.unlock();
        }
        return "OK";
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    缓存雪崩

    缓存雪崩 是指在某一个时间段,缓存集中过期失效。此刻无数的请求直接绕开缓存,直接请求数据库。

    造成缓存雪崩的原因,有以下两种:

    1. reids宕机
    2. 大部分数据失效

    比如天猫双11,马上就要到双11零点,很快就会迎来一波抢购,这波商品在23点集中的放入了缓存,假设缓存一个小时,那么到了凌晨24点的时候,这批商品的缓存就都过期了。

    而对这批商品的访问查询,都落到了数据库上,对于数据库而言,就会产生周期性的压力波峰,对数据库造成压力,甚至压垮数据库。

    缓存雪崩的原理图如下,当正常的情况下,key没有大量失效的用户访问原理图如下:

    当某一时间点,key大量失效,造成的缓存雪崩的原理图如下:

    对于缓存雪崩的解决方案有以下两种:

    1. 搭建高可用的集群,防止单机的redis宕机。
    2. 设置不同的过期时间,防止同意之间内大量的key失效。

    针对业务系统,永远都是具体情况具体分析,没有最好,只有最合适。于缓存其它问题,缓存满了和数据丢失等问题,我们后面继续深入的学习。最后也提一下三个词LRU、RDB、AOF,通常我们采用LRU策略处理溢出,Redis的RDB和AOF持久化策略来保证一定情况下的数据安全。

    redis持久化

    本文脑图

    Redis是一个基于内存的非关系型的数据库,数据保存在内存中,但是内存中的数据也容易发生丢失。这里Redis就为我们提供了持久化的机制,分别是RDB(Redis DataBase)AOF(Append Only File)

    Redis在以前的版本中是单线程的,而在6.0后对Redis的io模型做了优化,io Thread为多线程的,但是worker Thread仍然是单线程。

    在Redis启动的时候就会去加载持久化的文件,如果没有就直接启动,在启动后的某一时刻由继续持久化内存中产生的数据。

    接下来我们就来详细了解Redis的两种持久化机制RDB(Redis DataBase)AOF(Append Only File)

    RDB持久化机制

    什么是RDB持久化呢?RDB持久化就是将当前进程的数据以生成快照的形式持久化到磁盘中。对于快照的理解,我们可以理解为将当前线程的数据以拍照的形式保存下来。

    RDB持久化的时候会单独fork一个与当前进程一摸一样的子进程来进行持久化,因此RDB持久化有如下特点:

    1. 开机恢复数据快。
    2. 写入持久化文件快。

    RDB的持久化也是Redis默认的持久化机制,它会把内存中的数据以快照的形式写入默认文件名为dump.rdb中保存。

    在安装后的Redis中,Redis的配置都在redis.conf文件中,如下图所示,dbfilename就是配置RDB的持久化文件名。

    在这里插入图片描述

    持久化触发时机

    在RDB机制中触发内存中的数据进行持久化,有以下三种方式:

    (1)save命令:

    save命令不会fork子进程,通过阻塞当前Redis服务器,直到RDB完成为止,所以该命令在生产中一般不会使用。save命令执行原理图如下:

    在这里插入图片描述

    在redis.conf的配置中dir的配置就是RDB持久化后生成rdb二进制文件所在的位置,默认的位置是./,表示当前位置,哪里启动redis,就会在哪里生成持久化文件,如下图所示:

    在这里插入图片描述

    下面我们进行一下实操,演示一下二进制文件生成的过程,在我本机的电脑虚拟机中,我所在的位置如下,该文件夹是新创建的redis的数据存储文件夹。

    在这里插入图片描述

    然后我们直接在该位置启动我们的Redis服务,启动的命令如下:

    /root/redis-4.0.6/src/redis-server /root/redis-4.0.6/redis.conf
    
    • 1

    接着通过该命令:ps -aux | grep redis,查看我们的redis服务是否正常启动,若是显示如下图所示,则表示Redis是正常启动的:

    在这里插入图片描述

    正常启动后,直接登陆Redis,可以通过以下命令登陆Redis,如下图所示:

    在这里插入图片描述

    因为当前中Redis是新安装的,数据都是为空,什么都没有,然后通过下图的命令随意向Redis中输入几条命令,最后执行save命令,在该文件夹下就会出现dump.rdb持久化的数据文件。

    在这里插入图片描述

    当然上面说到,在新安装的Redis中默认的RDB数据持久化位置为./文件,一般我们会把它改成服务器自己的特定位置下,原理都是一样的,可以自己进行尝试,这里不再进行演示。

    (2)bgsave命令:

    bgsave命令会在后台fork一个与Redis主线程一摸一样的子线程,由子线程负责内存中的数据持久化。

    这样fork与主线程一样的子线程消耗了内存,但是不会阻塞主线程处理客户端请求,是以空间换时间的方式快照内存中的数据到到文件中。

    bgsave命令阻塞只会发生在fork子线程的时候,这段时间发生的非常短,可以忽略不计,如下图是 bgsave执行的流程图:

    在这里插入图片描述

    上面说到redis.conf中的dir配置是配置持久化文件生成的指定的目录,dbfilename是配置生成的文件名,也可以通过命令行使用命令来动态的设置这两个配置,命令如下:

    config set dir{newDir}
    config set dbfilename{newFileName}
    
    • 1
    • 2

    (3)自动化

    除了上面在命令行使用save和bgsave命令触发持久化,也可以在redis.conf配置文件中,完成配置,如下图所示:

    在这里插入图片描述

    在新安装的redis中由默认的以上三个save配置,save 900 1表示900秒内如果至少有1个key值变化,则进行持久化保存数据;

    save 300 10则表示300秒内如果至少有10个key值发生变化,则进行持久化,save 60 10000以此类推。

    通过以上的分析可以得出以下save和bgsave的对比区别:

    1. save是同步持久化数据,而bgsave是异步持久化数据。
    2. save不会fork子进程,通过主进程持久化数据,会阻塞处理客户端的请求,而bdsavefork子进程持久化数据,同时还可以处理客户端请求,高效。
    3. save不会消耗内存,而bgsave会消耗内存
    RDB的优缺点

    缺点: RDB持久化后的文件是紧凑的二进制文件,适合于备份、全量复制、大规模数据恢复的场景,对数据完整性和一致性要求不高,RDB会丢失最后一次快照的数据。

    优点: 开机的恢复数据快,写入持久化文件快。

    AOF持久化机制

    AOF持久化机制是以日志的形式记录Redis中的每一次的增删改操作,不会记录查询操作,以文本的形式记录,打开记录的日志文件就可以查看操作记录。

    AOF是默认不开启的,若是像开启AOF,在如下图的配置修改即可:

    在这里插入图片描述

    只需要把appendonly no修改为appendonly yes即可开启,在AOF中通过appendfilename配置生成的文件名,该文件名默认为appendonly.aof,路径也是通过dir配置的,这个于RDB的一样,具体的配置信息如下图所示:

    在这里插入图片描述

    AOF触发机制

    AOF带来的持久化更加安全可靠,默认提供三种触发机制,如下所示:

    1. no:表示等操作系统等数据缓存同步到磁盘中(快、持久化没保证)。
    2. always:同步持久化,每次发生数据变更时,就会立即记录到磁盘中(慢,安全)。
    3. everysec:表示每秒同步一次(默认值,很快,但是会丢失一秒内的数据)。

    AOF中每秒同步也是异步完成的,效率是非常高的,由于该机制对日志文件的写入操作是采用append的形式。

    因此在写入的过程即使宕机,也不会丢失已经存入日志文件的数据,数据的完整性是非常高的。

    在新安装的Redis的配置文件中,AOF的配置如下所示:

    在这里插入图片描述

    AOF重写机制

    但是,在写入所有的操作到日志文件中时,就会出现日志文件很多重复的操作,甚至是无效的操作,导致日志文件越来越大。

    所谓的无效的的操作,举个例子,比如某一时刻对一个k++,然后后面的某一时刻k–,这样k的值是保持不变的,那么这两次的操作就是无效的。

    如果像这样的无效操作很多,记录的文件臃肿,就浪费了资源空间,所以在Redis中出现了rewrite机制。

    redis提供了bgrewriteaof命令。将内存中的数据以命令的方式保存到临时文件中,同时会fork出一条新进程来将文件重写。

    重写AOF的日志文件不是读取旧的日志文件瘦身,而是将内存中的数据用命令的方式重写一个AOF文件,重新保存替换原来旧的日志文件,因此内存中的数据才是最新的。

    重写操作也会fork一个子进程来处理重写操作,重写以内存中的数据作为重写的源,避免了操作的冗余性,保证了数据的最新。

    在Redis以append的形式将修改的数据写入老的磁盘中 ,同时Redis也会创建一个新的文件用于记录此期间有哪些命令被执行。

    下面进行演示一下AOF的操作,首先先打开AOF机制,修改配置文件中的appendonly noappendonly yes,然后执行如下图的操作:

    在这里插入图片描述

    都显示执行成功,ls以下查看此时当前的文件夹终究会出现appendonly.aof
    ,AOF的数据持久化文件,通过cat命令查看内容:

    在这里插入图片描述

    从上面的存储的文件中可以看出,每一个命令是非常有规律的,比如第一次执行key *映射到该配置文件中的命令如下:

    *2 //表示该命令两组key 为一组 * 为一组
    $6 //表示SELECT有6字符
    SELECT
    $1 //表示下面的0一个字符
    0
    
    • 1
    • 2
    • 3
    • 4
    • 5

    然后执行set k1 1的命令,此命令映射到文件中的命令如下:

    *3 //表示该命令有三组set为一组 k1为一组 1为一组
    $3 // 表示set有三个字符
    set // 表示执行了set命令
    $2 // 表示k1有两个字符
    k1 // key值
    $1 // 便是value值的字符长度为1
    1  // value值
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    当AOF的日志文件增长到一定大小的时候Redis就能够bgrewriteaof对日志文件进行重写瘦身。当AOF配置文件大于改配置项时自动开启重写(这里指超过原大小的100%)。

    该配置可以通过如下的配置项进行配置:

    在这里插入图片描述

    AOF的优缺点

    优点: AOF更好保证数据不会被丢失,最多只丢失一秒内的数据,通过foek一个子进程处理持久化操作,保证了主进程不会进程io操作,能高效的处理客户端的请求。

    另外重写操作保证了数据的有效性,即使日志文件过大也会进行重写。

    AOF的日志文件的记录可读性非常的高,即使某一时刻有人执行flushall清空了所有数据,只需要拿到aof的日志文件,然后把最后一条的flushall给删除掉,就可以恢复数据。

    缺点: 对于相同数量的数据集而言,AOF文件通常要大于RDB文件。RDB 在恢复大数据集时的速度比 AOF 的恢复速度要快。AOF在运行效率上往往会慢于RDB。

    混合持久化

    在redis4.0后混合持久化(RDB+AOF)对重写的优化,4.0版本的混合持久化默认是关闭的,可以通过以下的配置开启混合持久化:

    在这里插入图片描述

    混合持久化也是通过bgrewriteaof来完成的,不同的是当开启混合持久化时,fork出的子进程先将共享内存的数据以RDB方式写入aof文件中,然后再将重写缓冲区的增量命令以AOF方式写入文件中。

    写入完成后通知主进程统计信息,并将新的含有RDB格式和AOF格式的AOF文件替换旧的AOF文件。简单的说:新的AOF文件前半段是以RDB格式的全量数据后半段是AOF格式的增量数据。

    优点: 混合持久化结合RDB持久化AOF持久化的优点,由于绝大部分的格式是RDB格式,加载速度快,增量数据以AOF方式保存,数据更少的丢失。

    RDB和AOF优势和劣势

    rdb适合大规模的数据恢复,由于rdb时异快照的形式持久化数据,恢复的数据快,在一定的时间备份一次,而aof的保证数据更加完整,损失的数据只在秒内。

    具体哪种更适合生产,在官方的建议中两种持久化机制同时开启,如果两种机制同时开启,优先使用aof持久化机制。

    redis事务

    前言

    前几天有读者说自己面试被问到Redis的事务,虽然不常用,但是面试竟然被问到,平时自己没有注意Redis的事务这一块,面试的时候被问到非常不好受。

    虽然,这位读者面试最后算是过了,但是薪资方面没有拿到自己理想的薪资。

    其实这个也是正常的,一般面试被问到烂大街的,谁还问你啊,专门挑一些不常见的来问你,就是为了压你的薪资。

    所以在这里写一篇文章对Redis的事务进行详细的讲解,估计对Redis事务从理解到原理深入这一篇就够了。

    以后面试都不用担心了再被问道Redis的事务了,这一篇主要讲解Redis事务原理和实操的演练,理解理论的同时也通过实操来证实理论。

    事务介绍

    Redis事务是一组命令的集合,将多个命令进行打包,然后这些命令会被顺序的添加到队列中,并且按顺序的执行这些命令。

    Redis事务中没有像Mysql关系型数据库事务隔离级别的概念,不能保证原子性操作,也没有像Mysql那样执行事务失败会进行回滚操作

    这个与Redis的特点:快速、高效有着密切的关联,因为一些列回滚操作、像事务隔离级别那这样加锁、解锁,是非常消耗性能的。所以,Redis中执行事务的流程只需要简单的下面三个步骤:

    1. 开始事务(MULTI)
    2. 命令入队
    3. 执行事务(EXEC)、撤销事务(DISCARD )

    在Redis中事务的实现主要是通过如下的命令实现的:

    命令

    功能描述

    MULTI

    事务开始的命令,执行该命令后,后面执行的对Redis数据类型的操作命令都会顺序的放进队列中,等待执行EXEC命令后队列中的命令才会被执行

    DISCARD

    放弃执行队列中的命令,你可以理解为Mysql的回滚操作,并且将当前的状态从事务状态改为非事务状态

    EXEC

    执行该命令后表示顺序执行队列中的命令,执行完后并将结果显示在客户端,将当前状态从事务状态改为非事务状态。若是执行该命令之前有key被执行WATCH命令并且又被其它客户端修改,那么就会放弃执行队列中的所有命令,在客户端显示报错信息,若是没有修改就会执行队列中的所有命令。

    WATCH key

    表示指定监视某个key,该命令只能在MULTI命令之前执行,如果监视的key被其他客户端修改,EXEC将会放弃执行队列中的所有命令

    UNWATCH

    取消监视之前通过WATCH 命令监视的key,通过执行EXEC 、DISCARD 两个命令之前监视的key也会被取消监视

    以上就是一个Redis事务的执行过程包含的命令,下面就来详细的围绕着这几个命令进行讲解。

    开始事务

    MULTI 命令表示事务的开始,当看到OK表示已经进入事务的状态:

    该命令执行后客户端会将当前的状态从非事务状态修改为事务状态,这一状态的切换是将客户端的flags属性中打开REDIS_MULTI来完成的,该命令可以理解关系型数据库Mysql的BEGIN TRANCATION语句:

    命令入队

    执行完MULTI命令后,后面执行的操作Redis五种类型的命令都会按顺序的进入命令队列中,该部分也是真正的业务逻辑的部分。

    Redis客户端的命令执行后若是当前状态处于事务状态命令就会进入队列中,并且返回QUEUED字符串,表示该命令已经进入了命令队列中,并且事务队列是以先进先出(FIFO)的方式保存入队的命令的。

    若是当前状态是非事务状态就会立即执行命令,并将结果返回客户端。在事务状态执行操作事务的命令就会被立即执行,如EXEC、DISCARD、UNWATCH

    结合上面的分析,Redis执行命令的流程如下图所示:

    事务的命令队列中有三个参数分别是:要执行的命令命令的参数参数的个数。例如:通过执行如下的命令:

    redis> MULTI
    OK
    redis> SET name "黎杜"
    QUEUED
    redis> GET name
    QUEUED
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    那么对应上面的队列中三个参数如下表格所示:

    执行的命令

    命令的参数

    参数的个数

    SET

    [“name”, “黎杜”]

    2

    GET

    [“name”]

    1

    执行事务

    当客户端执行EXEC命令的时候,上面的命令队列就会被按照先进先出的顺序被执行,当然执行的结果有成功有失败,这个后面分析。

    上面说到当客户端处于非事务的状态命令发送到服务端会被立即执行,若是客户端处于事务状态命令就会被放进命令队列。

    命令入队的时候,会按照顺序进入队列,队列以先进先出的特点来执行队列中的命令。

    若是客户端处于事务状态,执行的是EXEC、DISCARD、UNWATCH这些操作事务的命令,也会被立即执行。

    (1)正常执行

    还是上面的例子,执行如下的代码:

    redis> MULTI
    OK
    redis> SET name "黎杜"
    QUEUED
    redis> GET name
    QUEUED
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    所有的命令进入了队列,当最后执行EXEC,首先会执行SET命令,然后执行GET命令,并且执行后的结果也会进入一个队列中保存,最后返回给客户端:

    回复的类型

    回复的内容

    status code reply

    OK

    bulk reply

    “黎杜”

    所以最后你会在客户端看到OK、黎杜,这样的结果显示,这个也就是一个事务成功执行的过程。

    至此一个事务就完整的执行完成,并且此时客户端也从事务状态更改为非事务状态。

    (2)放弃事务

    当然你也可以放弃执行该事务,只要你再次执行DISCARD操作就会放弃执行此次的事务。具体代码如下所示:

    redis> MULTI
    OK
    redis> SET name "黎杜"
    QUEUED
    redis> GET name
    QUEUED
    redis> DISCARD    // 放弃执行事务
    OK
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    DISCARD命令取消一个事务的时候,就会将命令队列清空,并且将客户端的状态从事务状态修改为非事务的状态。

    Redis的事务是不可重复的,当客户端处于事务状态的时候,再次向服务端发送MULTI命令时,直接就会向客户端返回错误。

    WATCH 命令

    WATCH命令是在MULTI命令之前执行的,表示监视任意数量的key,与它对应的命令就是UNWATCH命令,取消监视的key。

    WATCH命令有点类似于乐观锁机制,在事务执行的时候,若是被监视的任意一个key被更改,则队列中的命令不会被执行,直接向客户端返回(nil)表示事务执行失败。

    下面我们来演示一下WATCH命令的操作流程,具体实现代码如下:

    redis> WATCH num
    OK
    redis> MULTI
    OK
    redis> incrby num 10
    QUEUED
    redis> decrby num 1
    QUEUED
    redis> EXEC   // 执行成功
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    这个是WATCH命令的正常的操作流程,若是在其它的客户端,修改了被监视的任意key,就会放弃执行该事务,如下图所示:

    客户端一

    客户端二

    WATCH num

    MULTI

    incrby num 10

    get num

    decrby num 1

    EXEC

    执行失败,返回(nil)

    WATCH命令的底层实现中保存了watched_keys 字典,字典的键保存的是监视的key,值是一个链表,链表中的每个节点值保存的是监视该key的客户端

    若是某个客户端不再监视某个key,该客户端就会从链表中脱离。如client3,通过执行UNWATCH命令,不再监视key1:

    错误处理

    上面说到Redis是没有回滚机制的,那么执行的过程,若是不小心敲错命令,Redis的命令发送到服务端没有被立即执行,所以是暂时发现不到该错误。

    那么在Redis中的错误处理主要分为两类:语法错误运行错误。下面主要来讲解一下这两类错误的区别。

    (1)语法错误

    比如执行命令的时候,命令的不存在或者错误的敲错命令、参数的个数不对等都会导致语法错误。

    下面来演示一下,执行下面的四个命令,前后的两个命令是正确的,中间的两个命令是错误的,如下所示:

    127.0.0.1:6379> multi
    OK
    127.0.0.1:6379> set num 1
    QUEUED
    127.0.0.1:6379> set num
    (error) ERR wrong number of arguments for 'set' command
    127.0.0.1:6379> ssset num 3
    (error) ERR unknown command 'ssset'
    127.0.0.1:6379> set num 2
    QUEUED
    127.0.0.1:6379> exec
    (error) EXECABORT Transaction discarded because of previous errors.
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    语法错误是在Redis语法检测的时候就能发现的,所以当你执行错误命令的时候,也会即使的返回错误的提示。

    最后,即使命令进入队列,只要存在语法错误,该队列中的命令都不会被执行,会直接向客户端返回事务执行失败的提示。

    (2)运行错误

    执行时使用不同类型的操作命令操作不同数据类型就会出现运行时错误,这种错误时Redis在不执行命令的情况下,是无法发现的。

    127.0.0.1:6379> multi
    OK
    127.0.0.1:6379> set num 3
    QUEUED
    127.0.0.1:6379> sadd num 4
    QUEUED
    127.0.0.1:6379> set num 6
    QUEUED
    127.0.0.1:6379> exec
    1) OK
    2) (error) WRONGTYPE Operation against a key holding the wrong kind of value
    3) OK
    127.0.0.1:6379> get key
    "6"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    这样就会导致,正确的命令被执行,而错误的命令不会不执行,这也显示出Redis的事务并不能保证数据的一致性,因为中间出现了错误,有些语句还是被执行了。

    这样的结果只能程序员自己根据之前执行的命令,自己一步一步正确的回退,所谓自己的烂摊子,自己收拾。

    Redis事务与Mysql事务

    我们知道关系性数据库Mysql中具有事务的四大特性:原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)、持久性(Durability)

    但是Redis的事务为了保证Redis除了客户端的请求高效,去除了传统关系型数据库的事务回滚、加锁、解锁这些消耗性能的操作,Redis的事务实现简单。

    原子性中Redis的事务只能保证单个命令的原子性,多个命令就无法保证,如上面索道的运行时错误,即使中间有运行时错误出现也会正确的执行后面正确的命令,不具有回滚操作。

    既然没有了原子性,数据的一致性也就无法保证,这些都需要程序员自己手动去实现。

    Reids在进行事务的时候,不会被中断知道事务的运行结束,也具有一定的隔离性,并且Redis也能持久化数据。

    集群

    集群概述

    Redis作为缓存的高效中间件,在我们日常的开发中被频繁的使用,今天就来说一说Redis的四种模式,分别是单机版、主从复制、哨兵、以及集群模式

    可能,在一般公司的程序员使用单机版基本都能解决问题,在Redis的官网给出的数据是10W QPS,这对于应付一般的公司绰绰有余了,再不行就来个主从模式,实现都写分离,性能又大大提高。

    但是,我们作为有抱负的程序员,仅限于单机版和主从模式的crud是不行的,至少也要了解哨兵集群模式的原理,这样面试的时候才能和面试官扯皮啊。

    单机

    单机版的Redis就比较简单了,基本90%的程序员都是用过,官网推荐操作Redis的第三方依赖库是Jedis,在SpringBoot项目中,引入下面依赖就可以直接使用了:

    
    	  redis.clients
    	  jedis
    	  ${jedis.version}
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    优点

    单机版的Redis也有很多优点,比如实现实现简单、维护简单、部署简单、维护成本非常低,不需要其它额外的开支。

    缺点

    但是,因为是单机版的Redis所以也存在很多的问题,比如最明显的单点故障问题,一个Redis挂了,所有的请求就会直接打在了DB上。

    并且一个Redis抗并发数量也是有限的,同时要兼顾读写两种请求,只要访问量一上来,Redis就受不了了,另一方面单机版的Redis数据量存储也是有限的,数据量一大,再重启Redis的时候,就会非常的慢,所以局限性也是比较大的。

    实操搭建

    单机版的搭建教程,在网上有非常多的全面的教程,基本就是傻瓜式操作,特别是在本地搭建的话,基本使用yum快捷方便,几句命令就搞定了,这里推荐一个搭建教程:https://www.cnblogs.com/
    zuidongfeng/p/8032505.html。

    上面这个教程讲的非常的详细,环境的搭建本来是运维的工作,但是作为程序员尝试自己去搭建环境还是有必要的,而且搭建环境这种东西,基本就是一劳永逸,搭建一次,可能下次换电脑或者重装虚拟机才会再次搭建。

    这里也放出redis常用的redis.conf的配置项,并且附带注释,看我是不是很暖男:

    daemonize yes  // 设置后台启动,一般设置yes
    pidfile /var/run/redis.pid // edis以守护进程方式运行时,redis默认会把pid写入/var/run/redis.pid文件
    port 6379 // 默认端口为6379
    bind 127.0.0.1 //主机地址,设置未0.0.0.0表示都可以访问。127.0.0.1表示只允许本机访问
    timeout 900  // 客户端闲置多长时间后关闭连接,如果指定为0,表示关闭该功能
    logfile stdout // 日志记录方式,默认为标准输出
    logfile "./redis7001.log"  # 指明日志文件名
    databases 16 // 设置数据库的数量,默认数据库为0
    save  //有多少次更新操作,就将数据同步到数据文件
    	Redis默认配置文件中提供了三个条件:
    	save 900 1 //900秒(15分钟)内有1个更改
    	save 300 10 //300秒(5分钟)内有10个更改
    	save 60 10000  // 60秒内有10000个更改
    rdbcompression yes // 指定存储至本地数据库时是否压缩数据
    dbfilename dump.rdb //指定本地数据库文件名
    dir ./    //指定本地数据库存放目录
    slaveof  // 主从同步设置,设置主数据库的ip和端口
    # 如果非零,则设置SO_KEEPALIVE选项来向空闲连接的客户端发送ACK
    tcp-keepalive 60
    # 默认如果开启RDB快照(至少一条save指令)并且最新的后台保存失败,Redis将会停止接受写操作
    # 这将使用户知道数据没有正确的持久化到硬盘,否则可能没人注意到并且造成一些灾难
    stop-writes-on-bgsave-error yes
    # 默认如果开启RDB快照(至少一条save指令)并且最新的后台保存失败,Redis将会停止接受写操作。
    stop-writes-on-bgsave-error yes
    # 当导出到 .rdb 数据库时是否用LZF压缩字符串对象
    rdbcompression yes
    # 版本5的RDB有一个CRC64算法的校验和放在了文件的最后。这将使文件格式更加可靠。
    rdbchecksum yes
    # 持久化数据库的文件名
    dbfilename dump-master.rdb
    # 工作目录
    dir /usr/local/redis-4.0.8/redis_master/
    # slav服务连接master的密码
    masterauth testmaster123
    # 当一个slave失去和master的连接,或者同步正在进行中,slave的行为可以有两种:
    #1) 如果 slave-serve-stale-data 设置为 "yes" (默认值),slave会继续响应客户端请求,可能是正常数据,或者是过时了的数据,也可能是还没获得值的空数据。
    # 2) 如果 slave-serve-stale-data 设置为 "no",slave会回复"正在从master同步
    # (SYNC with master in progress)"来处理各种请求,除了 INFO 和 SLAVEOF 命令。
    slave-serve-stale-data yes
    # 配置是否仅读
    slave-read-only yes
    # 如果你选择“yes”Redis将使用更少的TCP包和带宽来向slaves发送数据。但是这将使数据传输到slave上有延迟,Linux内核的默认配置会达到40毫秒
    # 如果你选择了 "no" 数据传输到salve的延迟将会减少但要使用更多的带宽
    repl-disable-tcp-nodelay no
    # slave的优先级,优先级数字小的salve会优先考虑提升为master
    slave-priority 100
    # 密码验证
    requirepass testmaster123
    # redis实例最大占用内存,一旦内存使用达到上限,Redis会根据选定的回收策略(参见:
    # maxmemmory-policy)删除key
    maxmemory 3gb
    # 最大内存策略:如果达到内存限制了,Redis如何选择删除key。
    # volatile-lru -> 根据LRU算法删除带有过期时间的key。
    # allkeys-lru -> 根据LRU算法删除任何key。
    # volatile-random -> 根据过期设置来随机删除key, 具备过期时间的key。 
    # allkeys->random -> 无差别随机删, 任何一个key。 
    # volatile-ttl -> 根据最近过期时间来删除(辅以TTL), 这是对于有过期时间的key 
    # noeviction -> 谁也不删,直接在写操作时返回错误。
    maxmemory-policy volatile-lru
    # AOF开启
    appendonly no
    # aof文件名
    appendfilename "appendonly.aof"
    # fsync() 系统调用告诉操作系统把数据写到磁盘上,而不是等更多的数据进入输出缓冲区。
    # 有些操作系统会真的把数据马上刷到磁盘上;有些则会尽快去尝试这么做。
    # Redis支持三种不同的模式:
    # no:不要立刻刷,只有在操作系统需要刷的时候再刷。比较快。
    # always:每次写操作都立刻写入到aof文件。慢,但是最安全。
    # everysec:每秒写一次。折中方案。 
    appendfsync everysec
    # 如果AOF的同步策略设置成 "always" 或者 "everysec",并且后台的存储进程(后台存储或写入AOF
    # 日志)会产生很多磁盘I/O开销。某些Linux的配置下会使Redis因为 fsync()系统调用而阻塞很久。
    # 注意,目前对这个情况还没有完美修正,甚至不同线程的 fsync() 会阻塞我们同步的write(2)调用。
    # 为了缓解这个问题,可以用下面这个选项。它可以在 BGSAVE 或 BGREWRITEAOF 处理时阻止主进程进行fsync()。
    # 这就意味着如果有子进程在进行保存操作,那么Redis就处于"不可同步"的状态。
    # 这实际上是说,在最差的情况下可能会丢掉30秒钟的日志数据。(默认Linux设定)
    # 如果你有延时问题把这个设置成"yes",否则就保持"no",这是保存持久数据的最安全的方式。
    no-appendfsync-on-rewrite yes
    # 自动重写AOF文件
    auto-aof-rewrite-percentage 100
    auto-aof-rewrite-min-size 64mb
    # AOF文件可能在尾部是不完整的(这跟system关闭有问题,尤其是mount ext4文件系统时
    # 没有加上data=ordered选项。只会发生在os死时,redis自己死不会不完整)。
    # 那redis重启时load进内存的时候就有问题了。
    # 发生的时候,可以选择redis启动报错,并且通知用户和写日志,或者load尽量多正常的数据。
    # 如果aof-load-truncated是yes,会自动发布一个log给客户端然后load(默认)。
    # 如果是no,用户必须手动redis-check-aof修复AOF文件才可以。
    # 注意,如果在读取的过程中,发现这个aof是损坏的,服务器也是会退出的,
    # 这个选项仅仅用于当服务器尝试读取更多的数据但又找不到相应的数据时。
    aof-load-truncated yes
    # Lua 脚本的最大执行时间,毫秒为单位
    lua-time-limit 5000
    # Redis慢查询日志可以记录超过指定时间的查询
    slowlog-log-slower-than 10000
    # 这个长度没有限制。只是要主要会消耗内存。你可以通过 SLOWLOG RESET 来回收内存。
    slowlog-max-len 128
    # 客户端的输出缓冲区的限制,可用于强制断开那些因为某种原因从服务器读取数据的速度不够快的客户端
    client-output-buffer-limit normal 0 0 0
    client-output-buffer-limit slave 256mb 64mb 60
    client-output-buffer-limit pubsub 32mb 8mb 60
    # 当一个子进程重写AOF文件时,文件每生成32M数据会被同步
    aof-rewrite-incremental-fsync yes
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102

    由于,单机版的Redis在并发量比较大的时候,并且需要较高性能和可靠性的时候,单机版基本就不适合了,于是就出现了主从模式

    主从模式

    原理

    主从的原理还算是比较简单的,一主多从,主数据库(master)可以读也可以写(read/write),从数据库仅读(only read)

    但是,主从模式一般实现读写分离主数据库仅写(only write),减轻主数据库的压力,下面一张图搞懂主从模式的原理:

    主从模式原理就是那么简单,那他执行的过程(工作机制)又是怎么样的呢?再来一张图:


    当开启主从模式的时候,他的具体工作机制如下:

    1. 当slave启动后会向master发送SYNC命令,master节后到从数据库的命令后通过bgsave保存快照(RDB持久化),并且期间的执行的些命令会被缓存起来。
    2. 然后master会将保存的快照发送给slave,并且继续缓存期间的写命令。
    3. slave收到主数据库发送过来的快照就会加载到自己的数据库中。
    4. 最后master讲缓存的命令同步给slave,slave收到命令后执行一遍,这样master与slave数据就保持一致了。

    优点

    之所以运用主从,是因为主从一定程度上解决了单机版并发量大,导致请求延迟或者redis宕机服务停止的问题。

    从数据库分担主数据库的读压力,若是主数据库是只写模式,那么实现读写分离,主数据库就没有了读压力了。

    另一方面解决了单机版单点故障的问题,若是主数据库挂了,那么从数据库可以随时顶上来,综上来说,主从模式一定程度上提高了系统的可用性和性能,是实现哨兵和集群的基础。

    主从同步以异步方式进行同步,期间Redis仍然可以响应客户端提交的查询和更新的请求。

    缺点

    主从模式好是好,他也有自己的缺点,比如数据的一致性问题,假如主数据库写操作完成,那么他的数据会被复制到从数据库,若是还没有即使复制到从数据库,读请求又来了,此时读取的数据就不是最新的数据。

    若是从主同步的过程网络出故障了,导致主从同步失败,也会出现问题数据一致性的问题。

    主从模式不具备自动容错和恢复的功能,一旦主数据库,从节点晋升未主数据库的过程需要人为操作,维护的成本就会升高,并且主节点的写能力、存储能力都会受到限制。

    实操搭建

    下面的我们来实操搭建一下主从模式,主从模式的搭建还是比较简单的,我这里一台centos 7虚拟机,使用开启redis多实例的方法搭建主从。

    redis中开启多实例的方法,首先创建一个文件夹,用于存放redis集群的配置文件:

    mkdir redis
    
    • 1

    然后粘贴复制redis.conf配置文件:

    cp /root/redis-4.0.6/redis.conf /root/redis/redis-6379.conf
    cp /root/redis-4.0.6/redis.conf /root/redis/redis-6380.conf
    cp /root/redis-4.0.6/redis.conf /root/redis/redis-6381.conf
    
    • 1
    • 2
    • 3

    复制三份配置文件,一主两从,6379端口作为主数据库(master),6380、6381作为从数据库(slave)。

    首先是配置主数据库的配置文件:vi redis-6379.conf

    bind 0.0.0.0 # 注释掉或配置成0.0.0.0表示任意IP均可访问。
    protected-mode no # 关闭保护模式,使用密码访问。
    port 6379  # 设置端口,6380、6381依次为6380、6381。
    timeout 30 # 客户端连接空闲多久后断开连接,单位秒,0表示禁用
    daemonize yes # 在后台运行
    pidfile /var/run/redis_6379.pid  # pid进程文件名,6380、6381依次为redis_6380.pid、redis_6381.pid
    logfile /root/reids/log/6379.log # 日志文件,6380、6381依次为6380.log、6381.log
    save 900 1 # 900s内至少一次写操作则执行bgsave进行RDB持久化
    save 300 10
    save 60 10000 
    rdbcompression yes #是否对RDB文件进行压缩,建议设置为no,以(磁盘)空间换(CPU)时间
    dbfilename dump.rdb # RDB文件名称
    dir /root/redis/datas # RDB文件保存路径,AOF文件也保存在这里
    appendonly yes # 表示使用AOF增量持久化的方式
    appendfsync everysec # 可选值 always, everysec,no,建议设置为everysec
    requirepass 123456 # 设置密码
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    然后,就是修改从数据库的配置文件,在从数据库的配置文件中假如以下的配置信息:

    slaveof 127.0.0.1 6379 # 配置master的ip,port
    masterauth 123456 # 配置访问master的密码
    slaveof-serve-stale-data no 
    
    • 1
    • 2
    • 3

    接下来就是启动三个redis实例,启动的命令,先cd到redis的src目录下,然后执行:

    ./redis-server /root/redis/6379.conf
    ./redis-server /root/redis/6380.conf
    ./redis-server /root/redis/6381.conf
    
    • 1
    • 2
    • 3

    通过命令ps -aux | grep redis,查看启动的redis进程:

    如上图所示,表示启动成功,下面就开始进入测试阶段。

    测试

    我这里使用SecureCRT作为redis连接的客户端,同时启动三个SecureCRT,分别连接redis1的三个实例,启动时指定端口以及密码:

    ./redis-cli -p 6379 -a 123456
    
    • 1


    启动后,在master(6379),输入:set name ‘ldc’,在slave中通过get name,可以查看:


    数据同步成功,这有几个坑一个是redis.conf中没有设置对bind,会导致非本机的ip被过滤掉,一般配置0.0.0.0就可以了。

    另一个是没有配置密码requirepass 123456,会导致IO一直连接异常,这个是我遇到的坑,后面配置密码后就成功了。

    还有,就是查看redis的启动日志可以发现有两个warning,虽然不影响搭建主从同步,看着挺烦人的,但是有些人会遇到,有些人不会遇到。

    但是,我这个人比较有强迫症,百度也是有解决方案的,这里就不讲了,交给你们自己解决,这里只是告诉你有这个问题,有些人看都不看日志的,看到启动成功就认为万事大吉了,也不看日志,这习惯并不好。

    哨兵模式

    原理

    哨兵模式是主从的升级版,因为主从的出现故障后,不会自动恢复,需要人为干预,这就很蛋疼啊。

    在主从的基础上,实现哨兵模式就是为了监控主从的运行状况,对主从的健壮进行监控,就好像哨兵一样,只要有异常就发出警告,对异常状况进行处理。


    所以,总的概括来说,哨兵模式有以下的优点(功能点):

    1. 监控:监控master和slave是否正常运行,以及哨兵之间也会相互监控
    2. 自动故障恢复:当master出现故障的时候,会自动选举一个slave作为master顶上去。

    哨兵模式的监控配置信息,是通过配置从数据库的sentinel monitor 来指定的,比如:

    // mymaster 表示给master数据库定义了一个名字,后面的是master的ip和端口,1表示至少需要一个Sentinel进程同意才能将master判断为失效,如果不满足这个条件,则自动故障转移(failover)不会执行
    sentinel monitor mymaster 127.0.0.1 6379 1
    
    • 1
    • 2

    节点通信

    当然还有其它的配置信息,其它配置信息,在环境搭建的时候再说。当哨兵启动后,会与master建立一条连接,用于订阅master的_sentinel_:hello频道。

    该频道用于获取监控该master的其它哨兵的信息。并且还会建立一条定时向master发送INFO命令获取master信息的连接。

    当哨兵与master建立连接后,定期会向(10秒一次)master和slave发送INFO命令,若是master被标记为主观下线,频率就会变为1秒一次。

    并且,定期向_sentinel_:hello频道发送自己的信息,以便其它的哨兵能够订阅获取自己的信息,发送的内容包含哨兵的ip和端口、运行id、配置版本、master名字、master的ip端口还有master的配置版本等信息。

    以及,定期的向master、slave和其它哨兵发送PING命令(每秒一次),以便检测对象是否存活,若是对方接收到了PING命令,无故障情况下,会回复PONG命令。

    所以,哨兵通过建立这两条连接、通过定期发送INFO、PING命令来实现哨兵与哨兵、哨兵与master之间的通信。

    这里涉及到一些概念需要理解,INFO、PING、PONG等命令,后面还会有MEET、FAIL命令,以及主观下线,当然还会有客观下线,这里主要说一下这几个概念的理解:

    1. INFO:该命令可以获取主从数据库的最新信息,可以实现新结点的发现
    2. PING:该命令被使用最频繁,该命令封装了自身节点和其它节点的状态数据。
    3. PONG:当节点收到MEET和PING,会回复PONG命令,也把自己的状态发送给对方。
    4. MEET:该命令在新结点加入集群的时候,会向老节点发送该命令,表示自己是个新人
    5. FAIL:当节点下线,会向集群中广播该消息。

    上线和下线

    当哨兵与master相同之后就会定期一直保持联系,若是某一时刻哨兵发送的PING在指定时间内没有收到回复(sentinel down-after-milliseconds master-name milliseconds 配置),那么发送PING命令的哨兵就会认为该master主观下线Subjectively Down)。

    因为有可能是哨兵与该master之间的网络问题造成的,而不是master本身的原因,所以哨兵同时会询问其它的哨兵是否也认为该master下线,若是认为该节点下线的哨兵达到一定的数量(前面的quorum字段配置),就会认为该节点客观下线Objectively Down)。

    若是没有足够数量的sentinel同意该master下线,则该master客观下线的标识会被移除;若是master重新向哨兵的PING命令回复了客观下线的标识也会被移除。

    选举算法

    当master被认为客观下线后,又是怎么进行故障恢复的呢?原来哨兵中首先选举出一个老大哨兵来进行故障恢复,选举老大哨兵的算法叫做Raft算法

    1. 发现master下线的哨兵(sentinelA)会向其它的哨兵发送命令进行拉票,要求选择自己为哨兵大佬。
    2. 若是目标哨兵没有选择其它的哨兵,就会选择该哨兵(sentinelA)为大佬。
    3. 若是选择sentinelA的哨兵超过半数(半数原则),该大佬非sentinelA莫属。
    4. 如果有多个哨兵同时竞选,并且可能存在票数一致的情况,就会等待下次的一个随机时间再次发起竞选请求,进行新的一轮投票,直到大佬被选出来。

    选出大佬哨兵后,大佬哨兵就会对故障进行自动回复,从slave中选出一名slave作为主数据库,选举的规则如下所示:

    1. 所有的slave中slave-priority优先级最高的会被选中。
    2. 若是优先级相同,会选择偏移量最大的,因为偏移量记录着数据的复制的增量,越大表示数据越完整。
    3. 若是以上两者都相同,选择ID最小的。

    通过以上的层层筛选最终实现故障恢复,当选的slave晋升为master,其它的slave会向新的master复制数据,若是down掉的master重新上线,会被当作slave角色运行。

    优点

    哨兵模式是主从模式的升级版,所以在系统层面提高了系统的可用性和性能、稳定性。当master宕机的时候,能够自动进行故障恢复,需不要人为的干预。

    哨兵于哨兵之间、哨兵与master之间能够进行及时的监控,心跳检测,及时发现系统的问题,这都是弥补了主从的缺点。

    缺点

    哨兵一主多从的模式同样也会遇到写的瓶颈,已经存储瓶颈,若是master宕机了,故障恢复的时间比较长,写的业务就会受到影响。

    增加了哨兵也增加了系统的复杂度,需要同时维护哨兵模式。

    实操搭建

    最后,我们进行一下哨兵模式的搭建,配置哨兵模式还是比较简单的,在上面配置的主从模式的基础上,同时创建一个文件夹用于存放三个哨兵的配置文件:

    mkdir /root/redis-4.0.6/sentinel.conf  /root/redis/sentinel/sentinel1.conf 
    mkdir /root/redis-4.0.6/sentinel.conf  /root/redis/sentinel/sentinel2.conf 
    mkdir /root/redis-4.0.6/sentinel.conf  /root/redis/sentinel/sentinel3.conf 
    
    • 1
    • 2
    • 3

    分别在这三个文件中添加如下配置:

    daemonize yes # 在后台运行
    sentinel monitor mymaster 127.0.0.1 6379 1 # 给master起一个名字mymaster,并且配置master的ip和端口
    sentinel auth-pass mymaster 123456 # master的密码
    port 26379 #另外两个配置36379,46379端口
    sentinel down-after-milliseconds mymaster 3000 # 3s未回复PING就认为master主观下线
    sentinel parallel-syncs mymaster 2  # 执行故障转移时,最多可以有2个slave实例在同步新的master实例
    sentinel failover-timeout mymaster 100000 # 如果在10s内未能完成故障转移操作认为故障转移失败
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    配置完后分别启动三台哨兵:

    ./redis-server sentinel1.conf --sentinel
    ./redis-server sentinel2.conf --sentinel
    ./redis-server sentinel3.conf --sentinel
    
    • 1
    • 2
    • 3

    然后通过:ps -aux|grep redis进行查看:

    可以看到三台redis实例以及三个哨兵都已经正常启动,现登陆6379,通过INFO Repliaction查看master信息:


    当前master为6379,然后我们来测试一下哨兵的自动故障恢复,直接kill掉6379进程,然后通过登陆6380再次查看master的信息:


    可以看到当前的6380角色是master,并且6380可读可写,而不是只读模式,这说明我们的哨兵是起作用了,搭建成功,感兴趣的可以自行搭建,也有可能你会踩一堆的坑。

    Cluster模式

    最后,Cluster是真正的集群模式了,哨兵解决和主从不能自动故障恢复的问题,但是同时也存在难以扩容以及单机存储、读写能力受限的问题,并且集群之前都是一台redis都是全量的数据,这样所有的redis都冗余一份,就会大大消耗内存空间。

    集群模式实现了Redis数据的分布式存储,实现数据的分片,每个redis节点存储不同的内容,并且解决了在线的节点收缩(下线)和扩容(上线)问题。

    集群模式真正意义上实现了系统的高可用和高性能,但是集群同时进一步使系统变得越来越复杂,接下来我们来详细的了解集群的运作原理。

    数据分区原理

    集群的原理图还是很好理解的,在Redis集群中采用的使虚拟槽分区算法,会把redis集群分成16384 个槽(0 -16383)。

    比如:下图所示三个master,会把0 -16383范围的槽可能分成三部分(0-5000)、(5001-11000)、(11001-16383)分别数据三个缓存节点的槽范围。


    当客户端请求过来,会首先通过对key进行CRC16 校验并对 16384 取模(CRC16(key)%16383)计算出key所在的槽,然后再到对应的槽上进行取数据或者存数据,这样就实现了数据的访问更新。

    之所以进行分槽存储,是将一整堆的数据进行分片,防止单台的redis数据量过大,影响性能的问题。

    节点通信

    节点之间实现了将数据进行分片存储,那么节点之间又是怎么通信的呢?这个和前面哨兵模式讲的命令基本一样。

    首先新上线的节点,会通过 Gossip 协议向老成员发送Meet消息,表示自己是新加入的成员。

    老成员收到Meet消息后,在没有故障的情况下会恢复PONG消息,表示欢迎新结点的加入,除了第一次发送Meet消息后,之后都会发送定期PING消息,实现节点之间的通信。

    通信的过程中会为每一个通信的节点开通一条tcp通道,之后就是定时任务,不断的向其它节点发送PING消息,这样做的目的就是为了了解节点之间的元数据存储情况,以及健康状况,以便即使发现问题。

    数据请求

    上面说到了槽信息,在Redis的底层维护了unsigned char myslots[CLUSTER_SLOTS/8] 一个数组存放每个节点的槽信息。

    因为他是一个二进制数组,只有存储0和1值,如下图所示:

    这样数组只表示自己是否存储对应的槽数据,若是1表示存在该数据,0表示不存在该数据,这样查询的效率就会非常的高,类似于布隆过滤器,二进制存储。

    比如:集群节点1负责存储0-5000的槽数据,但是此时只有0、1、2存储有数据,其它的槽还没有存数据,所以0、1、2对应的值为1。

    并且,每个redis底层还维护了一个clusterNode数组,大小也是16384,用于储存负责对应槽的节点的ip、端口等信息,这样每一个节点就维护了其它节点的元数据信息,便于及时的找到对应的节点。

    当新结点加入或者节点收缩,通过PING命令通信,及时的更新自己clusterNode数组中的元数据信息,这样有请求过来也就能及时的找到对应的节点。


    有两种其它的情况就是,若是请求过来发现,数据发生了迁移,比如新节点加入,会使旧的缓存节点数据迁移到新结点。

    请求过来发现旧节点已经发生了数据迁移并且数据被迁移到新结点,由于每个节点都有clusterNode信息,通过该信息的ip和端口。此时旧节点就会向客户端发一个MOVED 的重定向请求,表示数据已经迁移到新结点上,你要访问这个新结点的ip和端口就能拿到数据,这样就能重新获取到数据。

    倘若正在发正数据迁移呢?旧节点就会向客户端发送一个ASK 重定向请求,并返回给客户端迁移的目标节点的ip和端口,这样也能获取到数据。

    扩容和收缩

    扩容和收缩也就是节点的上线和下线,可能节点发生故障了,故障自动回复的过程(节点收缩)。

    节点的收缩和扩容时,会重新计算每一个节点负责的槽范围,并发根据虚拟槽算法,将对应的数据更新到对应的节点。

    还有前面的讲的新加入的节点会首先发送Meet消息,详细可以查看前面讲的内容,基本一样的模式。

    以及发生故障后,哨兵老大节点的选举,master节点的重新选举,slave怎样晋升为master节点,可以查看前面哨兵模式选举过程。

    优点

    集群模式时一个无中心的架构模式,将数据进行分片,分不到对应的槽中,每个节点存储不同的数据内容,通过路由能够找到对应的节点负责存储的槽,能够实现高效率的查询。

    并且集群模式增加了横向和纵向的扩展能力,实现节点加入和收缩,集群模式时哨兵的升级版,哨兵的优点集群都有。

    缺点

    缓存的最大问题就是带来数据一致性问题,在平衡数据一致性的问题时,兼顾性能与业务要求,大多数都是以最终一致性的方案进行解决,而不是强一致性。

    并且集群模式带来节点数量的剧增,一个集群模式最少要6台机,因为要满足半数原则的选举方式,所以也带来了架构的复杂性。

    slave只充当冷备,并不能缓解master的读的压力。

    实操搭建

    集群模式的部署比较简单,只要在redis.conf加入下面的配置信息即可:

    port 6379# 本示例6个节点端口分别为6379、6380、6381、6382、6383、6384
    daemonize yes # r后台运行 
    pidfile /var/run/redis_6379.pid # 分别对应6379、6380、6381、6382、6383、6384
    cluster-enabled yes # 开启集群模式 
    masterauth 123456# 如果设置了密码,需要指定master密码
    cluster-config-file nodes_6379.conf # 集群的配置文件,同样对应6379、6380、6381、6382、6383、6384六个节点
    cluster-node-timeout 10000 # 请求超时时间
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    同时开启这六个实例,通过下面的命令将这六个实例以集群的方式运行

    ./redis-cli --cluster create --cluster-replicas 1 127.0.0.1:6379 127.0.0.1:6380 127.0.0.1:6381  127.0.0.1:6382  127.0.0.1:6383  127.0.0.1:6384  -a 123456
    
    • 1

    这样就实现了集群的搭建,好了这一期就完成了,看了一下字数一共1.7W字,原创不易,看完点个在看和分享,不要白嫖我,传承中华民族的良好美德。

    应用

    1.订阅与发布简介

    Redis发布与发布功能(Pub/Sub)是基于事件座位基本的通信机制,是目前应用比较普遍的通信模型,它的目的主要是解除消息的发布者与订阅者之间的耦合关系

    Redis作为消息发布和订阅之间的服务器,起到桥梁的作用,在Redis里面有一个channel的概念,也就是频道,发布者通过指定发布到某个频道,然后只要有订阅者订阅了该频道,该消息就会发送给订阅者,原理图如下所示:

    Redis同时也可以使用list类型实现消息队列(消息队列的实现以及应用场景会在下一篇文章继续讲解)。

    Redis的发布与订阅的功能应用还是比较广泛的,它的应用场景有很多。比如:最常见的就是实现实时聊天的功能,还是有就是博客的粉丝文章的推送,当博主推送原创文章的时候,就会将文章实时推送给博主的粉丝。

    简介完Redis的发布于订阅功能,下面就要来实操一下,包括linux命令的实操和java代码的实现。

    命令实操

    这里就假设各位读者都已经安装好自己的虚拟机环境和Redis了,若是没有安装好的,可以参考这一篇博文:https://www.cnblogs.com/
    zuidongfeng/p/8032505.html

    我这里是已经安装好了Redis了,直接启动我们的Redis,我已经设置好了开机启动,上面的那篇博文有讲解怎么设置开机启动。

    发布消息

    Redis中发布消息的命令是publish,具体使用如下所示:


    PUBLISH test “haha”:test表示频道的名称,haha表示发布的内容,这样就完成了一个一个消息的发布,后面的返回(integer)0表示0人订阅。

    订阅频道

    于此同时再启动一个窗口,这个窗口作为订阅者,订阅者的命令subscribe,使用SUBSCRIBE test就表示订阅了test这个频道

    订阅后返回的结果中由三条信息,第一个表示类型、第二个表示订阅的频道,第三个表示订阅的数量。接着在第一个窗口进行发布消息:


    可以看到发布者发布的消息,订阅者都会实时的接收到,并发订阅者收到的信息中也会出现三条信息,分别表示:返回值的类型、频道名称、消息内容

    取消订阅

    若是想取消之前的订阅可以使用unsubscribe命令,格式为:

    unsubscribe  频道名称
    // 取消之前订阅的test频道
    unsubscribe  test
    
    • 1
    • 2
    • 3

    输入命令后,返回以下结果:

    [root@pinyoyougou-docker src]# ./redis-cli 
    127.0.0.1:6379> UNSUBSCRIBE test
    1) "unsubscribe"
    2) "test"
    3) (integer) 0
    
    • 1
    • 2
    • 3
    • 4
    • 5

    它分别表示:返回值的类型、频道的名称、该频道订阅的数量

    按模式订阅

    除了直接以特定的名城进行订阅,还可以按照模式进行订阅,模式的方式进行订阅可以一次订阅多个频道,按照模式进行订阅的命令为psubscribe,具体格式如下:

    psubscribe  模式
    // 表示订阅名称以ldc开头的频道
    psubscribe  ldc*
    
    • 1
    • 2
    • 3

    输入上面的命令后,返回如下结果:

    127.0.0.1:6379> PSUBSCRIBE ldc*
    Reading messages... (press Ctrl-C to quit)
    1) "psubscribe"
    2) "ldc*"
    3) (integer) 1
    
    • 1
    • 2
    • 3
    • 4
    • 5

    这个也是非常简单,分别表示:返回的类型(表示按模式订阅类型)、订阅的模式、订阅数

    取消按模式订阅

    假如你想取消之前的按模式订阅,可以使用punsubscribe来取消,具体格式:

    punsubscribe 模式
    // 取消频道名称按照ldc开头的频道
    punsubscribe ldc*
    
    • 1
    • 2
    • 3

    他的返回值,如下所示:

    127.0.0.1:6379> PUNSUBSCRIBE ldc*
    1) "punsubscribe"
    2) "ldc*"
    3) (integer) 0
    
    • 1
    • 2
    • 3
    • 4

    这个就不多说了,表示的意思和上面的一样,可以看到上面的命令都是有规律的订阅SUBSCRIBE,取消就是UNSUBSCRIBE,前面加前缀UN,按模式订阅也是。

    查看订阅消息

    (1)你想查看某一个模式下订阅数是大于零的频道,可以使用如下格式的命令进行操作:

    pubsub channels 模式
    // 查看频道名称以ldc模式开头的订阅数大于零的频道
    pubsub channels ldc*
    
    • 1
    • 2
    • 3

    (2)假如你想查看某一个频道的订阅数,可以使用如下命令:

    pubsub numsub 频道名称
    
    • 1

    (3)查看按照模式的订阅数,可以使用如下命令进行操作:

    pubsub numpat
    
    • 1

    到这里以上的命令操作就基本结束了,下面就来代码实战。

    代码实练

    (1)首先第一步想要操作Redis,再SpringBoot项目中引入jedis的依赖,毕竟jedis是官方推荐使用操作Redis的工具。

    
        redis.clients
        jedis
        2.9.0
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    (2)然后创建发布者Publisher,用于消息的发布,具体代码如下:

    package com.ldc.org.myproject.demo.redis;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    
    /**
     * 发布者
     * @author liduchang
     *
     */
    public class Publisher extends Thread{
    	// 连接池	
    	private final JedisPool jedisPool;
    	// 发布频道名称
    	private String name;
    	
    	public Publisher(JedisPool jedisPool, String name) {
    		super();
    		this.jedisPool = jedisPool;
    		this.name = name;
    	}
    	
    	@Override
    	public void run() {
    		// 获取要发布的消息
    		BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
    		// 获取连接
    		Jedis resource = jedisPool.getResource();
    		while (true) {
    			String message = null;
    			try {
    				message = reader.readLine();
    				if (!"exit".equals(message)) {
    					// 发布消息
    					resource.publish(name, "发布者:"+Thread.currentThread().getName()+"发布消息:"+message);
    				} else {
    					break;
    				}
    			} catch (IOException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    (3)接着创建订阅类Subscriber,并且继承JedisPubSub 类,重写onMessage、onSubscribe、onUnsubscribe三个方法,这三个方法的调用时机在注释上都有说明,具体的实现代码如下:

    package com.ldc.org.myproject.demo.redis;
    
    import com.fasterxml.jackson.core.sym.Name;
    import redis.clients.jedis.JedisPubSub;
    
    /**
     * 订阅者
     * @author liduchang
     */
    public class Subscriber extends JedisPubSub {
    	//订阅频道名称
    	private String name;
    	
    	public Subscriber(String name) {
    		this.name = name;
    	}
    
    	/**
    	 * 订阅者收到消息时会调用
    	 */
    	@Override
    	public void onMessage(String channel, String message) {
    		// TODO Auto-generated method stub
    		super.onMessage(channel, message);
    		System.out.println("频道:"+channel+"  接受的消息为:"+message);
    	}
    
    	/**
    	 * 订阅了频道会被调用
    	 */
    	@Override
    	public void onSubscribe(String channel, int subscribedChannels) {
    		System.out.println("订阅了频道:"+channel+"  订阅数为:"+subscribedChannels);
    	}
    
    	/**
    	 * 取消订阅频道会被调用
    	 */
    	@Override
    	public void onUnsubscribe(String channel, int subscribedChannels) {
    		System.out.println("取消订阅的频道:"+channel+"  订阅的频道数量为:"+subscribedChannels);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    (4)这次创建的才是真正的订阅者SubThread,上面的Subscriber是指为了测试实订阅的时候或者发布消息,能够有信息输出:

    package com.ldc.org.myproject.demo.redis;
    
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    
    /**
     * 订阅者线程
     * @author liduchang
     *
     */
    public class SubThread extends Thread {
    	
    	private final JedisPool jedisPool;
    	
    	private final Subscriber subscriber;
    	
    	private String name;
    	
    	public SubThread(JedisPool jedisPool,Subscriber subscriber,String name) {
    		super();
    		this.jedisPool = jedisPool;
    		this.subscriber = subscriber;
    		this.name = name;
    	}
    	
    	@Override
    	public void run() {
    		Jedis jedis = null;
    		try {
    			jedis = jedisPool.getResource();
    			// 订阅频道为name
    			jedis.subscribe(subscriber, name);
    		} catch (Exception e) {
    			System.err.println("订阅失败");
    		    e.printStackTrace();
    		} finally {
    			if (jedis!=null) {
    				 // jedis.close();
    				 //归还连接到redis池中
    				jedisPool.returnResource(jedis);
    			}
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    (5)后面就是测试了,分别测试发布与订阅的测试,发布者为TestPublisher,订阅者为TestSubscriber

    package com.ldc.org.myproject.demo.redis;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import redis.clients.jedis.JedisPool;
    
    public class TestPublisher {
    	
    	public static void main(String[] args) throws InterruptedException {
    		JedisPool jedisPool = new JedisPool("192.168.163.155");
    		// 向ldc频道发布消息
    		Publisher publisher = new Publisher(jedisPool, "ldc");
    		publisher.start();
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    订阅者

    package com.ldc.org.myproject.demo.redis;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    import redis.clients.jedis.JedisPool;
    
    public class TestSubscriber1 {
    	
    	public static void main(String[] args) throws InterruptedException {
    		JedisPool jedisPool = new JedisPool("192.168.163.155",6379);
    		Subscriber subscriber = new Subscriber("黎杜");
    		// 订阅ldc频道
    		SubThread thread= new SubThread(jedisPool, subscriber, "ldc");
    		thread.start();
    		Thread.sleep(600000);
    		// 取消订阅
    		subscriber.unsubscribe("ldc");
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    这里为了测试方便就直接创建线程的方式,更好的话可以使用线程池的方式通过线程池的submit方法来执行线程,若是不用了可以使用shutdown方式关闭。

    好了这一期的Redis的实现订阅与发布的讲解就说完了,我们下一期在讲解Redis的集群的知识,下期再见。

    2.Redis实现的分布式锁

    之前讲了一片Redis事务的文章,很多读者Redis事务有啥用,主要是因为Redis的事务并没有Mysql的事务那么强大,所以一般的公司一般确实是用不到。

    这里就来说一说Redis事务的一个实际用途,它可以用来实现一个简单的秒杀系统的库存扣减,下面我们就来进行代码的实现。

    (1)首先使用线程池初始化5000个客户端。

    public static void intitClients() {
    	ExecutorService threadPool= Executors.newCachedThreadPool();
    	for (int i = 0; i < 5000; i++) {
    		threadPool.execute(new Client(i));
    	}
    	threadPool.shutdown();
    	
    	while(true){ 
             if(threadPool.isTerminated()){  
                 break;  
             }  
         }  
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    (2)接着初始化商品的库存数为1000。

    public static void initPrductNum() {
    		Jedis jedis = RedisUtil.getInstance().getJedis();
    		jedisUtils.set("produce", "1000");// 初始化商品库存数
    		RedisUtil.returnResource(jedis);// 返还数据库连接
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    (3)最后是库存扣减的每条线程的处理逻辑。

    /**
     * 顾客线程
     * 
     * @author linbingwen
     *
     */
    class client implements Runnable {
    	Jedis jedis = null;
    	String key = "produce"; // 商品数量的主键
    	String name;
     
    	public ClientThread(int num) {
    		name= "编号=" + num;
    	}
     
    	public void run() {
    	
    		while (true) {
    			jedis = RedisUtil.getInstance().getJedis();
    			try {
    				jedis.watch(key);
    				int num= Integer.parseInt(jedis.get(key));// 当前商品个数
    				if (num> 0) {
    					Transaction ts= jedis.multi(); // 开始事务
    					ts.set(key, String.valueOf(num - 1)); // 库存扣减
    					List result = ts.exec(); // 执行事务
    					if (result == null || result.isEmpty()) {
    						System.out.println("抱歉,您抢购失败,请再次重试");
    					} else {
    						System.out.println("恭喜您,抢购成功");
    						break;
    					}
    				} else {
    					System.out.println("抱歉,商品已经卖完");
    					break;
    				}
    			} catch (Exception e) {
    				e.printStackTrace();
    			} finally {
    				jedis.unwatch(); // 解除被监视的key
    				RedisUtil.returnResource(jedis);
    			}
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    在代码的实现中有一个重要的点就是商品的数据量被watch了,当前的客户端只要发现数量被改变就会抢购失败,然后不断的自旋进行抢购。

    这个是基于Redis事务实现的简单的秒杀系统,Redis事务中的watch命令有点类似乐观锁的机制,只要发现商品数量被修改,就执行失败。

    Redis实现分布式锁的第二种方式,可以使用setnx、getset、expire、del这四个命令来实现。

    1. setnx:命令表示如果key不存在,就会执行set命令,若是key已经存在,不会执行任何操作。
    2. getset:将key设置为给定的value值,并返回原来的旧value值,若是key不存在就会返回返回nil 。
    3. expire:设置key生存时间,当当前时间超出了给定的时间,就会自动删除key。
    4. del:删除key,它可以删除多个key,语法如下:DEL key [key …],若是key不存在直接忽略。

    下面通过一个代码案例是实现以下这个命令的操作方式:

    public void redis(Produce produce) {
            long timeout= 10000L; // 超时时间
            Long result= RedisUtil.setnx(produce.getId(), String.valueOf(System.currentTimeMillis() + timeout));
            if (result!= null && result.intValue() == 1) { // 返回1表示成功获取到锁
    	        RedisUtil.expire(produce.getId(), 10);//有效期为5秒,防止死锁
    	        //执行业务操作
    	        ......
    	        //执行完业务后,释放锁
    	        RedisUtil.del(produce.getId());
            } else {
               System.println.out("没有获取到锁")
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    在线程A通过setnx方法尝试去获取到produce对象的锁,若是获取成功旧会返回1,获取不成功,说明当前对象的锁已经被其它线程锁持有。

    获取锁成功后并设置key的生存时间,能够有效的防止出现死锁,最后就是通过del来实现删除key,这样其它的线程就也可以获取到这个对象的锁。

    执行的逻辑很简单,但是简单的同时也会出现问题,比如你在执行完setnx成功后设置生存时间不生效,此时服务器宕机,那么key就会一直存在Redis中。

    当然解决的办法,你可以在服务器destroy函数里面再次执行:

    RedisUtil.del(produce.getId());
    
    • 1

    或者通过定时任务检查是否有设置生存时间,没有的话都会统一进行设置生存时间。

    还有比较好的解决方案就是,在上面的执行逻辑里面,若是没有获取到锁再次进行key的生存时间:

    public void redis(Produce produce) {
            long timeout= 10000L; // 超时时间
            Long result= RedisUtil.setnx(produce.getId(), String.valueOf(System.currentTimeMillis() + timeout));
            if (result!= null && result.intValue() == 1) { // 返回1表示成功获取到锁
    	        RedisUtil.expire(produce.getId(), 10);//有效期为10秒,防止死锁
    	        //执行业务操作
    	        ......
    	        //执行完业务后,释放锁
    	        RedisUtil.del(produce.getId());
            } else {
                String value= RedisUtil.get(produce.getId());
                // 存在该key,并且已经超时
                if (value!= null && System.currentTimeMillis() > Long.parseLong(value)) {
                    String result = RedisUtil.getSet(produce.getId(), String.valueOf(System.currentTimeMillis() + timeout)); 
                    if (result == null || (result != null && StringUtils.equals(value, result))) {
                         RedisUtil.expire(produce.getId(), 10);//有效期为10秒,防止死锁
    			        //执行业务操作
    			        ......
    			        //执行完业务后,释放锁
    			        RedisUtil.del(produce.getId());
                    } else {
                        System.println("没有获取到锁")
                    }
                } else {
                    System.println("没有获取到锁")
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    这里对上面的代码进行了改进,在获取setnx失败的时候,再次重新判断该key的锁时间是否失效或者不存在,并重新设置生存的时间,避免出现死锁的情况。

    第三种Redis实现分布式锁,可以使用Redisson来实现,它的实现简单,已经帮我们封装好了,屏蔽了底层复杂的实现逻辑。

    先来一个Redisson的原理图,后面回对这个原理图进行详细的介绍:
    图片来源于网络

    我们在实际的项目中要使用它,只需要引入它的依赖,然后执行下面的代码:

    RLock lock = redisson.getLock("lockName");
    lock.locl();
    lock.unlock();
    
    • 1
    • 2
    • 3

    并且它还支持Redis单实例、Redis哨兵、redis cluster、redis master-slave等各种部署架构,都给你完美的实现,不用自己再次拧螺丝。

    但是,crud的同时还是要学习一下它的底层的实现原理,下面我们来了解下一下,对于一个分布式的锁的框架主要的学习分为下面的5个点:

    1. 加锁机制
    2. 解锁机制
    3. 生存时间延长机制
    4. 可重入加锁机制
    5. 锁释放机制

    只要掌握一个框架的这五个大点,基本这个框架的核心思想就已经掌握了,若是要你去实现一个锁机制框架,就会有大体的一个思路。

    Redisson中的加锁机制是通过lua脚本进行实现,Redisson首先会通过hash算法,选择redis cluster集群中的一个节点,接着会把一个lua脚本发送到Redis中。

    它底层实现的lua脚本如下:

    returncommandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
    	"if (redis.call('exists', KEYS[1]) == 0) then " +
    	      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
    	      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
    	      "return nil; " +
    	  "end; " +
    	  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
    	      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
    	      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
    	      "return nil; " +
    	  "end; " +
    	  "return redis.call('pttl', KEYS[1]);",
    	    Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    redis.call()的第一个参数表示要执行的命令,KEYS[1]表示要加锁的key值,ARGV[1]表示key的生存时间,默认时30秒,ARGV[2]表示加锁的客户端的ID。

    比如第一行中redis.call('exists', KEYS[1]) == 0) 表示执行exists命令判断Redis中是否含有KEYS[1],这个还是比较好理解的。

    lua脚本中封装了要执行的业务逻辑代码,它能够保证执行业务代码的原子性,它通过hset lockName命令完成加锁。

    若是第一个客户端已经通过hset命令成功加锁,当第二个客户端继续执行lua脚本时,会发现锁已经被占用,就会通过pttl myLock返回第一个客户端的持锁生存时间。

    若是还有生存时间,表示第一个客户端会继续持有锁,那么第二个客户端就会不停的自旋尝试取获取锁。

    假如第一个客户端持有锁的时间快到期了,想继续持有锁,可以给它启动一个watch dog看门狗,他是一个后台线程会每隔10秒检查一次,可以不断的延长持有锁的时间。

    Redisson中可重入锁的实现是通过incrby lockName来实现,重入一个计数就会+1,释放一次锁计数就会-1

    最后,使用完锁后执行del lockName就可以直接释放锁,这样其它的客户端就可以争抢到该锁了。

    这就是分布式锁的开源Redisson框架底层锁机制的实现原理,我们可以在生产中实现该框架实现分布式锁的高效使用。

    下面通过一个多窗口抢票的例子代码来实现:

    public class SellTicket implements Runnable {
        private int ticketNum = 1000;
        RLock lock = getLock();
        // 获取锁 
        private RLock getLock() {
            Config config = new Config();
            config.useSingleServer().setAddress("redis://localhost:6379");
            Redisson redisson = (Redisson) Redisson.create(config);
            RLock lock = redisson.getLock("keyName");
            return lock;
        }
     
        @Override
        public void run() {
            while (ticketNum>0) {
                // 获取锁,并设置超时时间
                lock.lock(1, TimeUnit.MINUTES);
                try {
                    if (ticketNum> 0) {
                        System.out.println(Thread.currentThread().getName() + "出售第 " + ticketNum-- + " 张票");
                    }
                } finally {
                    lock.unlock(); // 释放锁
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    测试的代码如下:

    public class Test {
        public static void main(String[] args) {
            SellTicket sellTick= new SellTicket();
            // 开启5五条线程,模拟5个窗口
            for (int i=1; i<=5; i++) {
                new Thread(sellTick, "窗口" + i).start();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    是不是感觉很简单,因为多线程竞争共享资源的复杂的过程它在底层都帮你实现了,屏蔽了这些复杂的过程,而你也就成为了优秀的API调用者。

    上面就是Redis三种方式实现分布式锁的方式,基于Redis的实现方式基本都会选择Redisson的方式进行实现,因为简单命令,不用自己拧螺丝,开箱即用。

  • 相关阅读:
    超360万台MySQL服务器在“裸奔”
    npm 换源
    Java基础JDBC
    Java基础篇 数组
    LR脚本录制3——Fiddler生成LR脚本(推荐)
    腾讯会议瞩目会议开启共享屏幕时候出现卡顿时候,是显卡问题。
    React 与 TS 结合使用时组件传参总结
    .Net Framework、.Net Core和.Net Standard的区别
    【React Scheduler源码第三篇】React Scheduler原理及手写源码
    【2024最新精简版】Redis面试篇
  • 原文地址:https://blog.csdn.net/m0_52789121/article/details/126566099