• Redis从入门到精通


    1 NoSQL

    1、NoSQL非关系型的数据库

    1.不依赖业务逻辑数据库存储,以简单key-value存储,数据库的扩展能力强
    2.不遵循SQL标准
    3.不支持ACID

    适用于

    • 高并发读写
    • 海量数据读写
    • 数据可扩展

    不适用于

    • 事务存储
    • 复杂数据库

    2、NoSQL优点

    1.缓存数据库,完全在内存中,速度快,数据结构简单

    2.减少io操作,数据库和表拆分,虽然破坏业务逻辑,即外加一个缓存数据库,提高数据库速度,也可以用专门的存储方式,以及针对不同的数据结构存储

    3、其他数据库

    • Memcache:不持久化;类型单一:key-value;作为缓存数据库
    • Redis:数据都在内存中,支持持久化;支持key-value、及多种数据结构;作为缓存数据库
    • MongoDB:文档型数据库;key-value,且value可为json,并提供丰富查询功能; 支持二进制数据及大型对象;特点替代RDBMS ,成为独立的数据库

    2 redis概述与安装

    1、特点

    • Redis是一个开源的key-value存储系统
    • 支持多种数据结构,包括5种基本数据结构和3种复杂数据结构
    • Redis支持各种不同方式的排序
    • 数据都是缓存在内存中,周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件
    • 实现了master-slave(主从)同步

    2、应用场景

    • 高速缓存

      • 高频次,热门访问的数据,降低数据库IO
      • 分布式架构,做session共享
    • 持久化数据

    在这里插入图片描述

    3、安装

    1、安装gcc:C语言编译环境

    yum install gcc
    gcc -version 
    
    • 1
    • 2

    2、安装redis

    安装包目录:cd /usr/local/src
    下载:wget https://download.redis.io/releases/redis-6.2.1.tar.gz
    解压:tar -zxvf redis-6.2.1.tar.gz
    进入目录:cd redis-6.2.1
    编译:make
    安装:make install
    
    默认位置:cd /usr/local/bin  安装完成
    redis-benchmark:性能测试工具
    redis-check-aof:修复有问题的AOF文件,rdb和aof后面讲
    redis-check-dump:修复有问题的dump.rdb文件
    redis-sentinel:Redis集群使用
    redis-server:Redis服务器启动命令
    redis-cli:客户端,操作入口
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    3、后台启动

    备份redis.conf:
    	cd /usr/local/src/redis-6.2.1
    	cp redis.conf /etc/redis.conf
    修改配置文件:
        cd /ect
        vi redis.conf
        修改:daemonize yes
        修改密码:requirepass xxxxxx【重要】
    启动
    	cd /usr/local/bin
    	redis-server /etc/redis.conf
    客户端连接
    	redis-cli -a xxxxxx【密码】
    	查看设置的密码:config get requirepass
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    配置文件

    修改redis配置文件,表示支持远程访问
    注释:# bind 127.0.0.1 
    修改protected-mode no
    
    • 1
    • 2
    • 3

    4、关闭

    查看redis的端口号:ps -ef | grep redis
    kill -9 xxx(端口号)
    redis-clli shutdown
    
    • 1
    • 2
    • 3

    5、连接redis可能遇到的问题

    关闭linux防火墙(坑)

    需要打开防火墙,开放对应的端口 6379

    systemctl start firewalld # 开启防火墙
    systemctl status firewalld # 查看防火墙的状态
    firewall-cmd --zone=public --add-port=6379/tcp --permanent  # 开放端口 6379
    firewall-cmd --reload # 重新加载对应的防火墙策略
    
    • 1
    • 2
    • 3
    • 4

    4、相关知识点

    Redis是单线程+多路IO复用技术

    多路复用是指使用一个线程来检查多个文件描述符(Socket)的就绪状态,比如调用select和poll函数

    3 常用数据结构

    1、 key值键位

    key值的操作:

    • keys * 查看当前库所有key

    • set key value 设置key值与value

    • exists key 判断key是否存在

    • type key 查看key是什么类型

    • del key 删除指定的key数据

    • unlink key 根据value选择非阻塞删除

      ------仅将keys从keyspace元数据中删除,真正的删除会在后续异步操作

    • expire key 10 10秒钟:为给定的key设置过期时间

    • ttl key 查看还有多少秒过期,-1表示永不过期,-2表示已过期

    库的选择:

    • select 命令切换数据库
    • dbsize 查看当前数据库的key数量
    • flushdb 清空当前库
    • flushall 通杀全部库

    2、string字符串

    • 一个key对应一个value
    • 二进制安全的,即可包含任何数据
    • value最多可以是512m

    参数设置:

    • set key value 设置key值
    • get key 查询key值
    • append key value 将给定的value追加到原值末尾
    • strlen key 获取值的长度
    • setnx key value 只有在key不存在的时候,设置key值
    • incr key 将key值存储的数字增1,只对数字值操作,如果为空,新增值为1
    • decr key 将key值存储的数字减1,只对数字值操作,如果为空,新增值为1
    • incrby/decrby key <步长> 将key值存储的数字增减如步长

    补充:

    • 原子操作,不会被打断,从开始到结束
    • 单线程不会被打断
    • 多线程很难说,被打断的就不是原子操作

    补充额外的字符串参数:

    • mset key value key value..同时设置一个或者多个key-value
    • mget key key...同时获取一个或多个value
    • msetnx key value key value..同时设置一个或者多个key-value.当且仅当所有给定key都不存在
    • getrange key <起始位置> <结束位置> 获取key的起始位置和结束位置的值
    • setrange key <起始位置> value 将value的值覆盖起始位置开始
    • setex key <> value 设置键值的同时,设置过期时间
    • getset key value 用新值换旧值

    3、list列表

    常用命令:

    • lpush/rpush key value value...从左或者右插入一个或者多个值(头插与尾插)
    • lpop/rpop key 从左或者右吐出一个或者多个值(值在键在,值都没,键都没)
    • rpoplpush key1 key2 从key1列表右边吐出一个值,插入到key2的左边
    • lrange key start stop 按照索引下标获取元素(从左到右)
    • lrange key 0 -1 获取所有值
    • lindex key index 按照索引下标获得元素
    • llen key 获取列表长度
    • linsert key before/after value newvalue 在value的前面插入一个新值
    • lrem key n value 从左边删除n个value值
    • lset key index value 在列表key中的下标index中修改值value

    4、set集合

    字典,哈希表

    自动排重且为无序的

    常用命令:

    • sadd key value value... 将一个或者多个member元素加入集合key中,已经存在的member元素被忽略
    • smembers key 取出该集合的所有值
    • sismember key value 判断该集合key是否含有改值
    • scard key 返回该集合的元素个数
    • srem key value value 删除集合中的某个元素
    • spop key 随机从集合中取出一个元素
    • srandmember key n 随即从该集合中取出n个值,不会从集合中删除
    • smove <一个集合a><一个集合b> value 将一个集合a的某个value移动到另一个集合b
    • sinter key1 key2 返回两个集合的交集元素
    • sunion key1 key2 返回两个集合的并集元素
    • sdiff key1 key2 返回两个集合的差集元素(key1有的,key2没有)

    5、hash哈希
    键值对集合,特别适合用于存储对象类型

    常用命令:

    • hset key field value 给key集合中的filed键赋值value
    • hget key1 field 集合field取出value
    • hmset key1 field1 value1 field2 value2 批量设置hash的值
    • hexists key1 field 查看哈希表key中,给定域field是否存在
    • hkeys key 列出该hash集合的所有field
    • hvals key 列出该hash集合的所有value
    • hincrby key field increment 为哈希表key中的域field的值加上增量1 -1
    • hsetnx key field value 将哈希表key中的域field的值设置为value,当且仅当域field不存在
      例如 hset user:1000 id 1

    6、Zset有序集合

    没有重复元素的字符串集合,按照相关的分数进行排名,排名从低到高,排名可重复

    有序集合 zset 与普通集合 set 非常相似,是一个没有重复元素的字符串集合

    常用命令:

    • zadd key score1 value1 score2 value2 将一个或多个member元素及其score值加入到有序key中
    • zrange key start stop (withscores) 返回有序集key,下标在start与stop之间的元素,带withscores,可以让分数一起和值返回到结果集。
    • zrangebyscore key min max(withscores) 返回有序集key,所有score值介于min和max之间(包括等于min或max)的成员。有序集成员按score的值递增次序排列
    • zrevrangebyscore key max min (withscores)同上,改为从大到小排列
    • zincrby key increment value为元素的score加上增量
    • zrem key value 删除该集合下,指定值的元素
    • zcount key min max 统计该集合,分数区间内的元素个数
    • zrank key value 返回该值在集合中的排名,从0开始

    4 配置文件

    • bind
      • 默认情况bind=127.0.0.1只能接受本机的访问请求
      • 不写:无限制接受任何ip地址的访问
      • 生产环境:需要远程访问,要将其注释掉
      • 如果开启了protected-mode,那么在没有设定bind ip且没有设密码的情况下,Redis只允许接受本机的响应
    • protected-mode:将本机访问保护模式设置no
    • Port:默认6379
    • tcp-backlog
      • backlog是一个连接队列,backlog队列总和=未完成三次握手队列 + 已经完成三次握手队列
      • 在高并发环境下,需要一个高backlog值来避免慢客户端连接问题
    • timeout:一个空闲的客户端维持多少秒会关闭,0表示关闭该功能。即永不关闭
    • tcp-keepalive
      • 对访问客户端的一种心跳检测,每个n秒检测一次。
      • 单位为秒,如果设置为0,则不会进行Keepalive检测,建议设置成60
    • daemonize:是否诶后台进程,设置为yes
    • pidfile:每个实例会产生一个不同的pid文件
    • loglevel:指定日志记录级别,Redis总共支持四个级别:debug、verbose、notice、warning,默认为notice。生产环境选择notice 或者warning
    • logfile:日志文件名称
    • database 16:设定库的数量默认16,默认数据库为0,可以使用SELECT 命令在连接上指定数据库id
    • requirepass:设置密码
    • maxclients:设置redis同时可以与多少个客户端进行连接,默认1000
    • maxmemory
      • 设置redis可以使用的内存量。一旦到达内存使用上限,redis将会试图移除内部数据
      • 建议必须设置,否则将内存占满,造成服务器宕机
    • maxmemory-policy:设置移除规则
      • volatile-lru:使用LRU算法移除key,只对设置了过期时间的键;(最近最少使用)
      • allkeys-lru:在所有集合key中,使用LRU算法移除key
      • volatile-random:在过期集合中移除随机的key,只对设置了过期时间的键
      • allkeys-random:在所有集合key中,移除随机的key
      • volatile-ttl:移除那些TTL值最小的key,即那些最近要过期的key
      • noeviction:不进行移除。针对写操作,只是返回错误信息
    • maxmemory-samples:设置样本数量

    5 Redis的发布与订阅

    发送者 (pub) 发送消息,订阅者 (sub) 接收消息

    Redis 客户端可以订阅任意数量的频道

    1、 打开一个客户端订阅channel1:SUBSCRIBE channel1

    2、打开另一个客户端,给channel1发布消息hello:publish channel1 hello

    3、打开第一个客户端可以看到发送的消息

    6 新数据结构

    1、Bitmaps

    • 合理使用操作位可以有效地提高内存使用率和开发使用率
    • 本身是一个字符串,不是数据类型,数组的每个单元只能存放0和1,数组的下标在Bitmaps叫做偏移量
    • 节省空间,一般存储活跃用户

    命令参数:

    • setbit key offset value设置值

    • getbit key offset 获取某个偏移量的值

    • bitcount 统计数值

    • bitcount key (start end)统计字符串从start 到end字节比特值为1的数量

      redis的setbit设置或清除的是bit位置,而bitcount计算的是byte的位

    • bitop and(or/not/xor)destkey key复合操作,交并非异或,结果保存在destkey

    2、HyperLogLog

    • 统计网页中页面访问量
    • 只会根据输入元素来计算基数,而不会储存输入元素本身,不能像集合那样,返回输入的各个元素
    • 基数估计是在误差可接受的范围内,快速计算(不重复元素的结算)

    命令参数:

    • pfadd key element添加指定的元素到hyperloglog中,成功则返回1,不成功返回0
    • pfcount key 计算key的近似基数,即添加了多少个不重复元素
    • pfmerge destkey sourcekey sourcekey 一个或多个key合并后的结果存在另一个key

    3、Geographic

    提供经纬度设置,查询范围,距离查询

    命令参数:

    • geoadd key longitude latitude member添加地理位置(经度纬度名称)
      当坐标超出指定的范围,命令会返回一个错误,已经添加的数据,无法再添加
    • geopos key member 获取指定地区的坐标值
    • geodist key member1 member2 (m km ft mi)获取两个位置之间的直线距离
    • georadius key longitude latitude radius (m km ft mi)以给定的经纬度为中心,找出某一半径的内元素

    7 Jedis

    配置

    <dependencies>
        <dependency>
            <groupId>redis.clientsgroupId>
            <artifactId>jedisartifactId>
            <version>3.6.1version>
        dependency>
    dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    测试常用操作命令

    @Test
        public void fun() {
            Jedis jedis = new Jedis("192.168.55.129", 6379);
            jedis.auth("xxxxxx"); // 设置密码
            
            jedis.set("k1", "v1");
            String k1 = jedis.get("k1");
            System.out.println(k1);
            
            Set<String> keys = jedis.keys("*");
            for (String key : keys) {
                System.out.println(key);
            }
        }
    jedis.close();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    注意:连接失败检查

    关闭linux防火墙:systemctl stop firewalld.service

    redis.conf中注释掉bind 127.0.0.1 ,然后 protected-mode no

    案例:手机验证码功能

    1、输入手机号,点击发送后随机生成6位数字码,2分钟有效

    ​ Random随机生成6位数;设置过期时间120s

    2、输入验证码,点击验证,返回成功或失败

    ​ 比较redis中的验证码和输入的验证码

    3、每个手机号每天只能输入3次

    ​ incr每次发送之后+1,大于2时提示不能发送

    package com.mys.jedis;
    
    import redis.clients.jedis.Jedis;
    
    import java.util.Random;
    
    /**
     * @author mys
     * @date 2022/11/13 16:18
     */
    public class PhoneCode {
        public static void main(String[] args) {
            // 模拟验证码发送
            verifyCode("13811111111");
    //        getRedisCode("13811111111", "614493");
        }
    
        // 1. 生成6位随机数
        public static String getCode() {
            Random random = new Random();
            String code = "";
            for (int i = 0; i < 6; i ++) {
                int rand = random.nextInt(10);
                code += rand;
            }
            return code;
        }
    
        // 2.每个手机每天只能发送三次,验证码放到redis中并设置过期时间
        public static void verifyCode(String phone) {
            // 连接redis
            Jedis jedis = new Jedis("192.168.55.129", 6379);
            jedis.auth("xiaoqian");
    
            // 手机发送次数key
            String countKey = "VerifyCode" + phone + ":count";
            // 验证码key
            String codeKey = "VerifyCode" + phone + ":code";
    
            // 每个手机每天只能发送3次
            String count = jedis.get(countKey);
            if (count == null) {
                jedis.setex(codeKey, 24*60*60, "1");
            } else if(Integer.parseInt(count) <= 2) {
                jedis.incr(countKey);
            } else {
                System.out.println("发送此处已经超过3次了");
                jedis.close();
                return;
            }
    
            // 发送验证码到redis
            String vcode = getCode();
            jedis.setex(codeKey, 120, vcode);
            jedis.close();
        }
    
        // 3.验证码校验
        public static void getRedisCode(String phone, String code) {
            // 连接redis
            Jedis jedis = new Jedis("192.168.55.129", 6379);
            jedis.auth("xiaoqian");
            // 验证码key
            String codeKey = "VerifyCode" + phone + ":code";
            String redisCode = jedis.get(codeKey);
            if (redisCode.equals(code)) {
                System.out.println("success");
            } else {
                System.out.println("fail");
            }
            jedis.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73

    8 SpringBoot整合Redis

    1、整合依赖文件
    springboot和连接池

    
    <dependency>
    <groupId>org.springframework.bootgroupId>
    <artifactId>spring-boot-starter-data-redisartifactId>
    dependency>
    
    
    <dependency>
    <groupId>org.apache.commonsgroupId>
    <artifactId>commons-pool2artifactId>
    <version>2.6.0version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    2、application.properties配置redis配置

    #Redis服务器地址
    spring.redis.host=192.168.55.129
    spring.redis.password=xiaoqian
    #Redis服务器连接端口
    spring.redis.port=6379
    #Redis数据库索引(默认为0)
    spring.redis.database= 0
    #连接超时时间(毫秒)
    spring.redis.timeout=1800000
    #连接池最大连接数(使用负值表示没有限制)
    spring.redis.lettuce.pool.max-active=20
    #最大阻塞等待时间(负数表示没限制)
    spring.redis.lettuce.pool.max-wait=-1
    #连接池中的最大空闲连接
    spring.redis.lettuce.pool.max-idle=5
    #连接池中的最小空闲连接
    spring.redis.lettuce.pool.min-idle=0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    3、添加redis配置类

    @EnableCaching // 开启缓存
    @Configuration
    public class RedisConfig extends CachingConfigurerSupport {
    
        @Bean
        public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
            RedisTemplate<String, Object> template = new RedisTemplate<>();
            RedisSerializer<String> redisSerializer = new StringRedisSerializer();
            Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
            ObjectMapper om = new ObjectMapper();
    //        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            jackson2JsonRedisSerializer.setObjectMapper(om);
            template.setConnectionFactory(factory);
    //key序列化方式
            template.setKeySerializer(redisSerializer);
    //value序列化
            template.setValueSerializer(jackson2JsonRedisSerializer);
    //value hashmap序列化
            template.setHashValueSerializer(jackson2JsonRedisSerializer);
            return template;
        }
    
        @Bean
        public CacheManager cacheManager(RedisConnectionFactory factory) {
            RedisSerializer<String> redisSerializer = new StringRedisSerializer();
            Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
    //解决查询缓存转换异常的问题
            ObjectMapper om = new ObjectMapper();
    //        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            jackson2JsonRedisSerializer.setObjectMapper(om);
    // 配置序列化(解决乱码的问题),过期时间600秒
            RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
                    .entryTtl(Duration.ofSeconds(600))
                    .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
                    .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
                    .disableCachingNullValues();
            RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
                    .cacheDefaults(config)
                    .build();
            return cacheManager;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    4、测试类

    @RestController
    @RequestMapping("/redisTest")
    public class RedisTestController {
    
        @Autowired
        private RedisTemplate redisTemplate;
    
        @GetMapping
        public String testRedis() {
            redisTemplate.opsForValue().set("name", "mys");
            String name = (String) redisTemplate.opsForValue().get("name");
            return name;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    9 Redis事务-锁机制-秒杀

    1、事务

    作用:串联多个命令防止别的命令插队

    命令:

    • muti-组队阶段
    • exec-执行阶段
    • discard-回滚阶段

    组队中某个命令出现了错误,执行时整个的所有队列都会被取消

    执行中某个命令出现了错误,则只有报错的命令不会被执行,而其他的命令都会执行,不会回滚

    悲观锁:每次操作都上锁

    乐观锁:即给数据加上版本号,同步更新数据以及加上版本号

    • watch:在执行multi之前,先执行watch key1 [key2],可以监视key ,如果在事务执行之前,key 被其他命令所改动,那么事务将被打断

    • unwatch:取消 WATCH 命令对所有 key 的监视

    事务的三特性

    • 单独的隔离操作(不会被打断)
    • 没有隔离级别
    • 不保证原子性

    对比一下sql的事务特性,为acid,一致性,隔离性,原子性,持久性

    而redis的事务特性没有隔离,因为都是单独的隔离而不会被打断,而且也不保证原子性,错了等待不到消息不会进行回滚

    2、事务的秒杀案例

    在redis存入商品数,设定秒杀时间,提供用户秒杀窗口,用户秒杀成功,redis中商品数-1,用户信息也存入redis中(为了相同用户只能秒杀一次)

    基本实现

    public class SecKill {
        public static boolean doSecKill(String uid, String prodid) throws Exception{
            // 1.uid和prodid非空判断
            if (uid == null || prodid == null) {
                return false;
            }
            // 2.连接redis
            Jedis jedis = new Jedis("192.168.55.129", 6379);
            jedis.auth("xiaoqian");
            // 3.拼接key
            // 3.1 库存key
            String kcKey = "sk:" + prodid + ":qt";
            // 3.2 秒杀成功用户key
            String userKey = "sk:" + prodid + ":user";
            // 4.获取库存,如果库存null,秒杀还没开始
            String kc = jedis.get(kcKey);
            if (kc == null) {
                System.out.println("秒杀还没开始,请等待");
                jedis.close();
                return false;
            }
            // 5.判断用户是否重复秒杀操作
            if (jedis.sismember(userKey, uid)) {
                System.out.println("已经秒杀成功,不能重复秒杀");
            }
            // 6.判断商品数量,如果库存小于1,秒杀结束
            if (Integer.parseInt(kc) <= 0) {
                System.out.println("秒杀已经结束");
                jedis.close();
                return false;
            }
            // 7.秒杀过程
            // 7.1 库存-1
            jedis.decr(kcKey);
            // 7.2 把秒杀成功用户添加到清单里面
            jedis.sadd(userKey, uid);
            System.out.println("秒杀成功了");
            jedis.close();
    
            return true;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    高并发测试

    下载工具:yum install httpd-tools
    
    -n:请求次数
    -c:并发次数
    -T:设计的类型,可以是post,get
    -p:提交的参数
    
    ab -n 2000 -c 200 -k -p ~/postfile -T application/x-www-form-urlencoded http://172.22.xxx:8081/Seckill/doseckill
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    问题:超卖;连接超时

    1、通过连接池,解决超时问题

    // 通过连接池得到jedis对象
    JedisPool jedisPoolInstance = JedisPoolUtil.getJedisPoolInstance();
    Jedis jedis = jedisPoolInstance.getResource();
    
    • 1
    • 2
    • 3

    2、通过乐观锁,解决超卖问题

    // (1)监视库存
    jedis.watch(kcKey);
    // 4.获取库存,如果库存null,秒杀还没开始
    
    // (2)使用事务
            Transaction multi = jedis.multi();
            // 组队
            multi.decr(kcKey);
            multi.sadd(userKey, uid);
            // 执行
            List<Object> res = multi.exec();
            if (res == null || res.size() == 0) {
                System.out.println("秒杀失败");
                jedis.close();
                return false;
            }
    // 7.秒杀过程
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    3、库存遗留问题

    乐观锁造成库存遗留问题

    解决:引入lua脚本

    通过lua脚本解决争抢问题,实际上是redis 利用其单线程的特性,用任务队列的方式解决多任务并发问题

    10 持久化

    1、RDB

    在指定的时间间隔内将内存中的数据集快照写入磁盘

    备份流程:

    • 单独创建(fork)一个子进程来进行持久化
    • 持久化过程结束后,用临时文件替换上次持久化好的文件

    Fork:复制一个与当前进程一样的进程,并作为原进程的子进程,一般情况父进程和子进程会共用同一段物理内存

    数据如果有变化的,会在/usr/local/bin目录下生成一个dump.rdb的文件

    配置文件:

    • save :只管保存
    • bgsave:在后台异步进行快照操作, 同时还可以响应客户端请求
    • lastsave :获取最后一次成功执行快照的时间
    • flushall:产生dump.rdb文件,但里面是空的
    • stop-writes-on-bgsave-error:当Redis无法写入磁盘的话,直接关掉Redis的写操作
    • rdbcompression:压缩文件,LZF算法压缩
    • rdbchecksum:检查完整性,在存储快照后,使用CRC64算法来进行数据校验

    备份与恢复

    • 备份:拷贝一份*.rdb文件
    • 恢复:关闭redis,将拷贝的*.rdb文件复制到工作目录,启动redis,备份数据会直接加载

    优点:

    • 适合大规模的数据恢复
    • 对数据完整性和一致性要求不高更适合使用
    • 节省磁盘空间
    • 恢复速度快

    缺点:

    • Fork的时内存中的数据被克隆了一份
    • 在fork时使用了写时拷贝技术,如果数据庞大时还是比较消耗性能
    • 在备份周期在一定间隔时间做一次备份,所以如果Redis意外down掉的话,就会丢失最后一次快照后的所有修改

    停止RDB:redis-cli config set save ""

    2、AOF

    日志的形式来记录每个写操作,只许追加文件但不可以改写文件

    持久化流程:

    • 客户端的请求写命令会被append追加到AOF缓冲区内
    • AOF缓冲区根据AOF持久化策略[always,everysec,no]将操作sync同步到磁盘的AOF文件中;
    • AOF文件大小超过重写策略或手动重写时,会对AOF文件rewrite重写,压缩AOF文件容量;
    • Redis服务重启时,会重新load加载AOF文件中的写操作达到数据恢复的目的;

    特点:

    • AOF默认不开启

    • AOF和RDB同时开启,默认读取AOF数据

    • 备份恢复流程和RDB一样,都是拷贝文件,恢复时再使用

    • 正常恢复:修改默认的appendonly no,改为yes;拷贝aof文件,重启redis

    • 异常恢复:如果遇到AOF文件损坏,通过/usr/local/bin/redis-check-aof--fix appendonly.aof进行恢复

    • 同步频率:

      • appendfsync always:始终同步,立刻写入日志
      • appendfsync everysec:每秒记入日志一次,如果宕机,本秒的数据可能丢失
      • appendfsync no:不主动进行同步,把同步时机交给操作系统

    11 主从复制

    master/slaver机制,Master以写为主,Slave以读为主

    搭建一主两从流程:

    • 创建/myredis文件夹

    • 复制redis.conf配置文件

    • 一主两从,创建三个配置文件

      • redis6379.conf
      • redis6380.conf
      • redis6381.conf
      include /myredis/redis.conf
      pidfile /var/run/redis_6379.pid
      port 6379
      dbfilename dump6379.rdb
      
      • 1
      • 2
      • 3
      • 4
    • 启动三个服务

      redis-server redis6379.conf
      redis-server redis6380.conf
      redis-server redis6381.conf
      
      redis-cli -p 6379 -a xxx
      redis-cli -p 6380 -a xxx
      redis-cli -p 6381-a xxx
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
    • 查看当前主机的运行状态:info replication

    • 在从机上执行slaveof 主机ip 端口号

    命令:

    info replication查看信息

    slaveof IP 端口9可以设置从服务器

    redis-server redis.conf启动服务器端

    redis-cli -p 端口 启动客户端

    ps -ef | grep redis:查看redis进程

    特点:

    • 从服务器挂掉后,重启变成了主服务器
    • 重新设置为从服务器,能看到挂掉那段时间主服务器添加的数据
    • 主服务器挂掉后,从服务器还是从服务器,也知道主服务器挂掉了
    • 主服务器重新启动后,还是主服务器,数据不丢失

    原理:

    • 当从服务器连接上主服务器之后,从服务器向主服务器发送进行数据同步的消息。
    • 主服务器接到从服务器发送过来的同步消息,把主服务器数据进行持久化到rdb文件,把rdb文件发送给从服务器,从服务器拿到rdb进行读取。
    • 每次主服务器进行写操作之后,和从服务器进行数据同步。(主服务器主动做的)

    薪火相传

    主——> 从1——>从2

    反客为主

    slaveof no one从机变成主机

    非自动

    哨兵模式

    反客为主的自动版

    主要是为了监控主机宕机之后,从机可以立马变为主机,就和上面的反客为主一样,但不用手动设置。能够后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库。

    流程:

    • 创建文件: /myredis/sentinel.conf
    • 写入:sentinel monitor mymaster 127.0.0.1 6379 1
    • 启动哨兵:redis-sentinel /myredis/sentinel.conf

    选择从服务器优先级:

    • slave-priority 100,值越小优先级越高

    • 选择偏移量最大的

    • 选择runid最小的从服务

    12 集群

    1、问题:

    • 容量不够,redis如何进行扩容?
    • 并发写操作,redis如何进行分摊?

    2、方法:

    • 代理主机

    • 无中心化集群(推荐)

      任何一台服务器都可以作为集群入口,并且之间互相连通

    在这里插入图片描述

    3、集群:

    • redis集群实现了对Redis的水平扩容,即启动N个redis节点,将整个数据库分布存储在这N个节点中,每个节点存储总数据的1/N
    • redis集群通过分区(partition)来提供一定程度的可用性(availability):即使集群中有一部分节点失效或者无法进行通讯,集群也可以继续处理命令请求

    4、搭建集群流程:

    • 删除rdb aof文件

    • 创建6个实例 6379 6380 6381 6389 6390 6391

    • 分别修改配置文件

      include /myredis/redis.conf
      pidfile /var/run/redis_6379.pid
      port 6379
      dbfilename dump6379.rdb
      cluster-enabled yes
      cluster-config-file nodes-6379.conf
      cluster-node-timeout 15000
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
    • 启动6个服务

    • 将6个节点合成一个集群

      需要进入到/usr/local/src/redis-6.2.1/src目录下执行

      redis-cli --cluster create --cluster-replicas 1 192.168.55.129:6379 192.168.55.129:6380 192.168.55.129:6381 192.168.55.129:6389 192.168.55.129:6390 192.168.55.129:6391 -a xxx
      
      • 1
    • redis-cli -c -p 6379用集群的方式连接

    • cluster nodes查看集群信息

    • 节点分配原则:尽量保证每个主数据库运行在不同的IP地址,每个从库和主库不在一个IP地址上

    5、slots:插槽

    • 一个 Redis 集群包含 16384 个插槽(hash slot), 数据库中的每个键都属于这 16384 个插槽的其中一个

    • 集群使用公式 CRC16(key) % 16384 来计算键 key 属于哪个槽

    • 不能使用mget,mset等多键操作,通过{}来定义组的概念

      mset k1 v1 k2 v2 =>×

      mset k1{user} v1 k2{user} v2 => √

    • 查询集群中的值 cluster keyslot k1

      cluster countkeysinslot 4478

      cluster get keysinslot 4478 10

    6、故障恢复

    • 主机挂掉,从机升为master,主机恢复后,主机变为从机
    • 主机、从机都挂掉
      • cluster-require-full-coverage 为yes ,整个集群都挂掉
      • cluster-require-full-coverage 为no ,该插槽数据全都不能使用,也无法存储

    7、优点:

    • 实现扩容
    • 分摊压力
    • 无中心配置相对简单

    8、缺点:

    • 多键操作是不被支持的
    • 多键的Redis事务是不被支持的,lua脚本不被支持

    13 应用问题解决

    1、缓存穿透

    特点:

    • 应用服务器压力变大
    • redis命中率降低
    • 一直查询数据库

    原因:redis查不到数据,并且出现很多非正常url访问

    解决方案:

    • 对空值缓存,设置过期时间变短
    • 设置可访问的名单,bitmaps定义一个可访问名单
    • 采用布隆过滤器
    • 进行实时监控

    2、缓存击穿

    特点:

    • 数据库访问压力shunjianzengjia
    • redis里面没有出现大量key过期
    • redis正常运行

    原因:redis中某个key过期,大量访问使用这个key

    解决方案:

    • 预先设置热门数据,加大key的过期时长
    • 实时调整
    • 使用锁

    3、缓存雪崩

    特点:数据库压力变大,服务器崩溃

    原因:在极少时间段,查询大量key集中过期

    解决方案:

    • 构建多级缓存架构
    • 使用锁或队列
    • 设置过期标志,更新缓存
    • 将缓存失效时间分散开

    4、分布式锁

    实现方案:

    • 基于数据库实现分布式锁
    • 基于缓存Redis
    • 基于Zookeeper

    使用Redis实现分布式锁:

    • setnx 设置锁,del 释放锁

    • exprie设置key的过期时间,自动释放

    • 上锁之后,突然出现异常,无法设置过期时间

      set users 10 nx ex 12 既上锁又设置过期时间UUID防误删

    @GetMapping("testLock")
    public void testLock(){
        //1获取锁,setne ,顺便设置过期时间
        Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111",3,TimeUnit.SECONDS);
        //2获取锁成功、查询num的值
        if(lock){
            Object value = redisTemplate.opsForValue().get("num");
            //2.1判断num为空return
            if(StringUtils.isEmpty(value)){
                return;
            }
            //2.2有值就转成成int
            int num = Integer.parseInt(value+"");
            //2.3把redis的num加1
            redisTemplate.opsForValue().set("num", ++num);
            //2.4释放锁,del
            redisTemplate.delete("lock");
    
        }else{
            //3获取锁失败、每隔0.1秒再获取
            try {
                Thread.sleep(100);
                testLock();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    UUID防误删:

    多一个判断是否是你的锁,虽然是共享锁,都是一样的,但是可以上锁之后在设置时间,还要给每个用户的这把锁都来一个uuid

    @GetMapping("testLock")
    public void testLock(){
    	String uuid = UUID.randomUUID().toString();
        //1获取锁,setne ,顺便设置过期时间
        Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid,3,TimeUnit.SECONDS);
        //2获取锁成功、查询num的值
        if(lock){
           ...
            String lockUuid = (String)redisTemplate.opsForValue().get("lock");
            if(uuid.equals(lockUuid)){
                 //2.4释放锁,del
            	redisTemplate.delete("lock");
            }
        }else{
           ...
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    lua脚本保证原子性

    在判断它的uuid相等之后,正准备解锁,发现又误解他人锁,所以引入lua脚本保证它的原子性
    在这里插入图片描述

  • 相关阅读:
    基于图像识别的迁移学习之二
    算法分析与设计CH6:堆排序详解(文末附优先队列完整代码)
    飞机大战Java版完整版
    力扣刷题记录120.1-----718. 最长重复子数组
    从零开始Blazor Server(4)--登录系统
    毕业搬砖后,依然躲不过考试,分享30道过华为机考的LeetCode高效刷题经验
    Github上前十大开源Rust项目
    HttpContext.TraceIdentifier那严谨的设计
    员工如何通过自助方式重置AD密码
    编译buildroot出错,这个怎么解决呢,感谢
  • 原文地址:https://blog.csdn.net/mys_mys/article/details/127849040