技术的分类:
Web1.0的时代,数据访问量很有限,用一夫当关的高性能的单点服务器可以解决大部分问题。

随着Web2.0的时代的到来,用户访问量大幅度提升,同时产生了大量的用户数据。加上后来的智能移动设备的普及,所有的互联网平台都面临了巨大的性能挑战。



NoSQL(NoSQL = Not Only SQL ),意即“不仅仅是SQL”,泛指非关系型的数据库。NoSQL 不依赖业务逻辑方式存储,而以简单的key-value模式存储。因此大大的增加了数据库的扩展能力。
a. 不遵循SQL标准。
b. 不支持ACID。
c. 远超于SQL的性能。






HBase是Hadoop项目中的数据库。它用于需要对大量的数据进行随机、实时的读写操作的场景中。
HBase的目标就是处理数据量非常庞大的表,可以用普通的计算机处理超过10亿行数据,还可处理有数百万列元素的数据表。

Apache Cassandra是一款免费的开源NoSQL数据库,其设计目的在于管理由大量商用服务器构建起来的庞大集群上的海量数据集(数据量通常达到PB级别)。在众多显著特性当中,Cassandra最为卓越的长处是对写入及读取操作进行规模调整,而且其不强调主集群的设计思路能够以相对直观的方式简化各集群的创建与扩展流程。
计算机存储单位 计算机存储单位一般用B,KB,MB,GB,TB,EB,ZB,YB,BB来表示,它们之间的关系是:
位 bit (比特)(Binary Digits):存放一位二进制数,即 0 或 1,最小的存储单位。
字节 byte:8个二进制位为一个字节(B),最常用的单位。
1KB (Kilobyte 千字节)=1024B,
1MB (Megabyte 兆字节 简称“兆”)=1024KB,
1GB (Gigabyte 吉字节 又称“千兆”)=1024MB,
1TB (Trillionbyte 万亿字节 太字节)=1024GB,其中1024=2^10 ( 2 的10次方),
1PB(Petabyte 千万亿字节 拍字节)=1024TB,
1EB(Exabyte 百亿亿字节 艾字节)=1024PB,
1ZB (Zettabyte 十万亿亿字节 泽字节)= 1024 EB,
1YB (Jottabyte 一亿亿亿字节 尧字节)= 1024 ZB,
1BB (Brontobyte 一千亿亿亿字节)= 1024 YB.
注:“兆”为百万级数量单位。

主要应用:社会关系,公共交通网络,地图及网络拓谱(n*(n-1)/2)

https://db-engines.com/en/ranking



| Redis官方网站 | Redis中文官方网站 |
|---|---|
| http://redis.io | http://redis.cn/ |

安装C 语言的编译环境:
yum install centos-release-scl scl-utils-build
yum install -y devtoolset-8-toolchain
scl enable devtoolset-8 bash
测试 gcc版本 :
gcc --version


cd /opt/
tar -zxvf redis-6.2.1.tar.gz

cd redis-6.2.1
ll

make

注:如果没有准备好C语言编译环境,make 会报错—Jemalloc/jemalloc.h:没有那个文件

解决方案:
make distclean
#在redis-6.2.1目录下再次执行make命令(只是编译好)
make
make install

查看默认安装目录:

前台启动,命令行窗口不能关闭,否则服务器停止。
在/usr/local/bin下执行redis-server
redis-server
新打开一个连接执行ps -ef | grep redis
ps -ef | grep redis
退出前台执行的命令窗口,再执行ps -ef | grep redis

拷贝一份redis.conf到其他目录;
mkdir /myRedis
cp /opt/redis-6.2.1/redis.conf /myRedis
修改redis.conf 文件将里面的daemonize no 改成 yes,让服务在后台启动。

redis-server /myRedis/redis.conf
redis-cli

ping

单实例关闭:redis-cli shutdown
redis-cli shutdown
也可以进入终端后再关闭:
shutdown

多实例关闭,指定端口关闭:redis-cli -p 6379 shutdown
redis-cli -p 6379 shutdown
Redis是单线程+多路IO复用技术
多路复用是指使用一个线程来检查多个文件描述符(Socket)的就绪状态,比如调用select和poll函数,传入多个文件描述符,如果有一个文件描述符就绪,则返回,否则阻塞直到超时。得到就绪状态后进行真正的操作可以在同一个线程里执行,也可以启动线程执行(比如使用线程池)。
串行 vs 多线程+锁(memcached) vs 单线程+多路IO复用(Redis);
(与Memcache三点不同: 支持多数据类型,支持持久化,单线程+多路IO复用) 。

哪里去获得redis常见数据类型操作命令:http://www.redis.cn/commands.html
| 命令 | 解释 |
|---|---|
| keys * | 查看当前库所有key (匹配:keys *1) |
| exists key | 判断某个key是否存在 |
| type key | 查看你的key是什么类型 |
| del key | 删除指定的key数据 |
| unlink key | 根据value选择非阻塞删除(仅将keys从keyspace元数据中删除,真正的删除会在后续异步操作。) |
| expire key 10 | 10秒钟:为给定的key设置过期时间 |
| ttl key | 查看还有多少秒过期,-1表示永不过期,-2表示已过期 |
| select | 命令切换数据库 |
| dbsize | 查看当前数据库的key的数量 |
| flushdb | 清空当前库 |
| flushall | 通杀全部库 |
String是Redis最基本的类型,你可以理解成与Memcached一模一样的类型,一个key对应一个value。
String类型是二进制安全的。意味着Redis的string可以包含任何数据。比如jpg图片或者序列化的对象。
String类型是Redis最基本的数据类型,一个Redis中字符串value最多可以是512M

set k1 v1
set k2 v2

get k1
get k2

append k1 34
get k1

strlen k1

setnx k2 v3
set k3 v3
keys *

incr k1
set k4 4
incr k4
get k4

decr k4
get k4

incrby k4 10


所谓原子操作是指不会被线程调度机制打断的操作;
这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)。
(1)在单线程中, 能够在单条指令中完成的操作都可以认为是"原子操作",因为中断只能发生于指令之间。
(2)在多线程中,不能被其它进程(线程)打断的操作就叫原子操作。
Redis单命令的原子性主要得益于Redis的单线程。
同时设置一个或多个 key-value对。
mset k5 v5 k6 v6
keys *

同时获取一个或多个 value 。
mget k1 k2 k3

同时设置一个或多个 key-value 对,当且仅当所有给定 key 都不存在。
msetnx k1 v1 k7 v7
k7 v7 k8 v8
keys *

获得值的范围,类似java中的substring,前包,后包。
getrange k1 0 -1
getrange k1 0 0

用 < value> 覆写< key>所储存的字符串值,从<起始位置>开始(索引从0开始)。
setrange k1 0 a
get k1

设置键值的同时,设置过期时间,单位秒。
setex k9 10 v9
#十秒后
get k9

以新换旧,设置了新值同时获得旧值。
getset k1 v11

String的数据结构为简单动态字符串(Simple Dynamic String,缩写SDS)。是可以修改的字符串,内部结构实现上类似于Java的ArrayList,采用预分配冗余空间的方式来减少内存的频繁分配。

如图中所示,内部为当前字符串实际分配的空间capacity一般要高于实际字符串长度len。当字符串长度小于1M时,扩容都是加倍现有的空间,如果超过1M,扩容时一次只会多扩1M的空间。需要注意的是字符串最大长度为512M。
单键多值:
Redis 列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边)。
它的底层实际是个双向链表,对两端的操作性能很高,通过索引下标的操作中间的节点性能会较差。

从左边/右边插入一个或多个值。
lpush k1 v1 v11 v111
rpush k2 v2 v22 v222
keys *

从左边/右边吐出一个值。值在键在,值光键亡。
lpop k1
lpop k1
lpop k1
lpop k1

从< key1>列表右边吐出一个值,插到< key2>列表左边。
lpush k1 v1 v11 v111
rpoplpush k1 k2
lpop k2

按照索引下标获得元素(从左到右)
左边第一个,-1右边第一个,(0-1表示获取所有)
lrange k1 0 -1

按照索引下标获得元素(从左到右)
lindex k1 0

获得列表长度
llen k1

在< value>的后面插入< newvalue>插入值
linsert k1 before v1 v1111
lrange k1 0 -1

从左边删除n个value(从左到右)
lrange k1 0 -1
lrem k1 1 v11
lrange k1 0 -1

将列表key下标为index的值替换成value
lset k1 0 v11111
lrange k1 0 -1


Redis set对外提供的功能与list类似是一个列表的功能,特殊之处在于set是可以自动排重的,当你需要存储一个列表数据,又不希望出现重复数据时,set是一个很好的选择,并且set提供了判断某个成员是否在一个set集合内的重要接口,这个也是list所不能提供的。
Redis的Set是string类型的无序集合。它底层其实是一个value为null的hash表,所以添加,删除,查找的复杂度都是O(1)。
一个算法,随着数据的增加,执行时间的长短,如果是O(1),数据增加,查找数据的时间不变。
将一个或多个 member 元素加入到集合 key 中,已经存在的 member 元素将被忽略
sadd k1 v1 v11 v111
sadd k2 v2 v22 v222

取出该集合的所有值。
smembers k1
smembers k2

判断集合< key>是否为含有该值,有1,没有0
sismember k1 v11
sismember k1 v22

返回该集合的元素个数。
scard k1

删除集合中的某个元素。
srem k1 v1
smembers k1

随机从该集合中吐出一个值。
spop k1
spop k1

随机从该集合中取出n个值。不会从集合中删除 。
srandmember k2 2
srandmember k2 2

把集合中一个值从一个集合移动到另一个集合
smembers k1
smembers k2
smove k1 k1 v1
smembers k1
smembers k2

返回两个集合的交集元素。
sadd k1 v2 v22
sinter k1 k2

返回两个集合的并集元素。
sunion k1 k2

返回两个集合的差集元素(key1中的,不包含key2中的)。
sdiff k1 k2




给< key>集合中的 < field>键赋值< value>
hset k1 name jack age 12
hset k2 name lucy age 16

从集合取出 value
hget k1 name
hget k1 age

批量设置hash的值
hmset k4 name gorry age 18
查看哈希表 key 中,给定域 field 是否存在。
hexists k1 name

列出该hash集合的所有field
hkeys k1

列出该hash集合的所有value
hvals k1

为哈希表 key 中的域 field 的值加上增量 increment

将哈希表 key 中的域 field 的值设置为 value ,当且仅当域 field 不存在 .
hsetnx k1 addr beiJing
hget k1 addr

Hash类型对应的数据结构是两种:ziplist(压缩列表),hashtable(哈希表)。当field-value长度较短且个数较少时,使用ziplist,否则使用hashtable。
将一个或多个 member 元素及其 score 值加入到有序集 key 当中。
zadd topn 200 java 300 c++ 500 python 50 javaScript

返回有序集 key 中,下标在
带WITHSCORES,可以让分数一起和值返回到结果集。
zrange topn 0 -1
zrange topn 0 -1 WITHSCORES


返回有序集 key 中,所有 score 值介于 min 和 max 之间(包括等于 min 或 max )的成员。有序集成员按 score 值递增(从小到大)次序排列。
zrangebyscore topn 200 500
zrangebyscore topn 200 500 withscores


返回有序集 key 中,所有 score 值介于 min 和 max 之间(包括等于 min 或 max )的成员。有序集成员按 score 值递增(从大到小)次序排列。
zrevrangebyscore topn 500 200
zrevrangebyscore topn 500 200 withscores


zincrby topn c# 255
zrange topn 0 -1


删除该集合下,指定值的元素
zrange topn 0 -1 withscores
zrem topn java
zrange topn 0 -1


返回该值在集合中的排名,从0开始
zrank topn python

SortedSet(zset)是Redis提供的一个非常特别的数据结构,一方面它等价于Java的数据结构Map
zset底层使用了两个数据结构:
(1)hash,hash的作用就是关联元素value和权重score,保障元素value的唯一性,可以通过元素value找到相应的score值。
(2)跳跃表,跳跃表的目的在于给元素value排序,根据score的范围获取元素列表。
有序集合在生活中比较常见,例如根据成绩对学生排名,根据得分对玩家排名等。对于有序集合的底层实现,可以用数组、平衡树、链表等。数组不便元素的插入、删除;平衡树或红黑树虽然效率高但结构复杂;链表查询需要遍历所有效率低。Redis采用的是跳跃表。跳跃表效率堪比红黑树,实现远比红黑树简单。
对比有序链表和跳跃表,从链表中查询出51
(1) 有序链表
要查找值为51的元素,需要从第一个元素开始依次查找、比较才能找到。共需要6次比较。
(2) 跳跃表
从第2层开始,1节点比51节点小,向后比较。
21节点比51节点小,继续向后比较,后面就是NULL了,所以从21节点向下到第1层
在第1层,41节点比51节点小,继续向后,61节点比51节点大,所以从41向下
在第0层,51节点为要查找的节点,节点被找到,共查找4次。
从此可以看出跳跃表比有序链表效率要高。
自定义目录:/myredis/redis.conf
配置大小单位,开头定义了一些基本的度量单位,只支持bytes,不支持bit;
大小写不敏感;

类似jsp中的include,多实例的情况可以把公用的配置文件提取出来

默认情况bind=127.0.0.1只能接受本机的访问请求;
不写的情况下,无限制接受任何ip地址的访问;
生产环境肯定要写你应用服务器的地址;服务器是需要远程访问的,所以需要将其注释掉;
注:如果开启了protected-mode,那么在没有设定bind ip且没有设密码的情况下,Redis只允许接受本机的响应;

保存配置,停止服务,重启启动查看进程,不再是本机访问了。

将本机访问保护模式设置no

端口号,默认 6379

设置tcp的backlog,backlog其实是一个连接队列,backlog队列总和=未完成三次握手队列 + 已经完成三次握手队列。
在高并发环境下你需要一个高backlog值来避免慢客户端连接问题。
注意Linux内核会将这个值减小到/proc/sys/net/core/somaxconn的值(128),所以需要确认增大/proc/sys/net/core/somaxconn和/proc/sys/net/ipv4/tcp_max_syn_backlog(128)两个值来达到想要的效果;

一个空闲的客户端维持多少秒会关闭,0表示关闭该功能。即永不关闭。

对访问客户端的一种心跳检测,每个n秒检测一次。
单位为秒,如果设置为0,则不会进行Keepalive检测,建议设置成60 ;

是否为后台进程,设置为yes;
守护进程,后台启动;

存放pid文件的位置,每个实例会产生一个不同的pid文件;

指定日志记录级别,Redis总共支持四个级别:debug、verbose、notice、warning,默认为notice;
四个级别根据使用阶段来选择,生产环境选择notice 或者warning;

日志文件名称

设定库的数量 默认16,默认数据库为0,可以使用SELECT 命令在连接上指定数据库id;

访问密码的查看、设置和取消;
在命令中设置密码,只是临时的。重启redis服务器,密码就还原了。
永久设置,需要再配置文件中进行设置。
config get requirepass
config set requirepass "123456"
auth 123456
config get requirepass

重启后:
shutdown
redis-server /myRedis/redis.conf
redis-cli
config get requirepass





Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。
Redis 客户端可以订阅任意数量的频道。


SUBSCRIBE channel1

publish channel1 hello
返回的1是订阅者数量

现代计算机用二进制(位) 作为信息的基础单位, 1个字节等于8位, 例如“abc”字符串是由3个字节组成, 但实际在计算机存储时将其用二进制表示, “abc”分别对应的ASCII码分别是97、 98、 99, 对应的二进制分别是01100001、 01100010和01100011,如下图:
合理地使用操作位能够有效地提高内存使用率和开发效率。
Redis提供了Bitmaps这个“数据类型”可以实现对位的操作:
(1) Bitmaps本身不是一种数据类型, 实际上它就是字符串(key-value) , 但是它可以对字符串的位进行操作。
(2) Bitmaps单独提供了一套命令, 所以在Redis中使用Bitmaps和使用字符串的方法不太相同。 可以把Bitmaps想象成一个以位为单位的数组, 数组的每个单元只能存储0和1, 数组的下标在Bitmaps中叫做偏移量。

(1)格式
setbit< key>< offset>< value>设置Bitmaps中某个偏移量的值(0或1)
*offset:偏移量从0开始
(2)实例
每个独立用户是否访问过网站存放在Bitmaps中, 将访问的用户记做1, 没有访问的用户记做0, 用偏移量作为用户的id。
设置键的第offset个位的值(从0算起) , 假设现在有20个用户,userid=1, 6, 11, 15, 19的用户对网站进行了访问, 那么当前Bitmaps初始化结果如图。

unique:users:20220821代表2022-08-21这天的独立访问用户的Bitmaps
setbit unique:users:20220821 1 1
setbit unique:users:20220821 6 1
setbit unique:users:20220821 11 1
setbit unique:users:20220821 15 1
setbit unique:users:20220821 19 1

注:
(1)格式
getbit< key>< offset>获取Bitmaps中某个偏移量的值;
获取键的第offset位的值(从0开始算);
(2)实例
获取id=8的用户是否在2020-11-06这天访问过, 返回0说明没有访问过:
getbit unique:users:20220821 8
getbit unique:users:20220821 11
getbit unique:users:20220821 1

统计字符串被设置为1的bit数。一般情况下,给定的整个字符串都会被进行计数,通过指定额外的 start 或 end 参数,可以让计数只在特定的位上进行。start 和 end 参数的设置,都可以使用负数值:比如 -1 表示最后一个位,而 -2 表示倒数第二个位,start、end 是指bit组的字节的下标数,二者皆包含。
(1)格式
bitcount< key>[start end] 统计字符串从start字节到end字节比特值为1的数量;
(2)实例
计算2022-11-06这天的独立访问用户数量
bitcount unique:users:20220821
start和end代表起始和结束字节数, 下面操作计算用户id在第1个字节到第3个字节之间的独立访问用户数, 对应的用户id是11, 15, 19。
BITCOUNT unique:users:20220821 1 3

注意:redis的setbit设置或清除的是bit位置,而bitcount计算的是byte位置。
(1)格式
bitop and(or/not/xor) < destkey> [key…]
bitop是一个复合操作, 它可以做多个Bitmaps的and(交集) 、 or(并集) 、 not(非) 、 xor(异或) 操作并将结果保存在destkey中。
(2)实例
2020-08-22 日访问网站的userid=1,2,5,9。
setbit unique:users:20220822 1 1
setbit unique:users:20220822 2 1
setbit unique:users:20220822 5 1
setbit unique:users:20220822 9 1
2020-08-23 日访问网站的userid=0,1,4,9。
setbit unique:users:20220823 0 1
setbit unique:users:20220823 1 1
setbit unique:users:20220823 4 1
setbit unique:users:20220823 9 1
计算出两天都访问过网站的用户数量
bitop and unique:users:and:20220822_23 unique:users:20220822 unique:users:20220823
bitcount unique:users:and:20220822_23


计算出任意一天都访问过网站的用户数量(例如月活跃就是类似这种) , 可以使用or求并集;
bitop or unique:users:or:20220822_23 unique:users:20220822 unique:users:20220823
bitcount unique:users:or:20220822_23

假设网站有1亿用户, 每天独立访问的用户有5千万, 如果每天用集合类型和Bitmaps分别存储活跃用户可以得到表;
很明显, 这种情况下使用Bitmaps能节省很多的内存空间, 尤其是随着时间推移节省的内存还是非常可观的;

但Bitmaps并不是万金油, 假如该网站每天的独立访问用户很少, 例如只有10万(大量的僵尸用户) , 那么两者的对比如下表所示, 很显然, 这时候使用Bitmaps就不太合适了, 因为基本上大部分位都是0。

(1)格式
pfadd < key>< element> [element …] 添加指定元素到 HyperLogLog 中
(2)实例
pfadd hlll "redis"
pfadd hlll "mysql"
pfadd hlll "redis"

将所有元素添加到指定HyperLogLog数据结构中。如果执行命令后HLL估计的近似基数发生变化,则返回1,否则返回0。
(1)格式
pfcount< key> [key …] 计算HLL的近似基数,可以计算多个HLL,比如用HLL存储每天的UV,计算一周的UV可以使用7天的UV合并计算即可。
(2)实例
pfcount hlll
pfcount hlll redis

(1)格式
pfmerge< destkey>< sourcekey> [sourcekey …]
将一个或多个HLL合并后的结果存储在另一个HLL中,比如每月活跃用户可以使用每天的活跃用户来合并计算可得
(2)实例
pfcount hlll
PFMERGE hll2 hlll
pfcount hll2

Redis 3.2 中增加了对GEO类型的支持。GEO,Geographic,地理信息的缩写。该类型,就是元素的2维坐标,在地图上就是经纬度。redis基于该类型,提供了经纬度设置,查询,范围查询,距离查询,经纬度Hash等常见操作。
(1)格式
geoadd< key>< longitude>< latitude>< member> [longitude latitude member…]
添加地理位置(经度,纬度,名称)
(2)实例
geoadd china:city 121.47 31.23 hsnaghai
geoadd china:city 106.50 29.53 chongqing
geoadd china:city 114.05 22.52 shenzhen 116.38 39.90 beijing
两极无法直接添加,一般会下载城市数据,直接通过 Java 程序一次性导入。
有效的经度从 -180 度到 180 度。有效的纬度从 -85.05112878 度到 85.05112878 度。
当坐标位置超出指定范围时,该命令将会返回一个错误。
已经添加的数据,是无法再次往里面添加的。
(1)格式
geopos < key>< member> [member…] 获得指定地区的坐标值
(2)实例
geopos china:city chongqing

(1)格式
geodist< key>< member1>< member2> [m|km|ft|mi ] 获取两个位置之间的直线距离
(2)实例
获取两个位置之间的直线距离
geodist china:city chongqing shenzhen km
单位:
m 表示单位为米[默认值]。
km 表示单位为千米。
mi 表示单位为英里。
ft 表示单位为英尺。
如果用户没有显式地指定单位参数, 那么 GEODIST 默认使用米作为单位
(1)格式
georadius< key>< longitude>< latitude>radius m|km|ft|mi
以给定的经纬度为中心,找出某一半径内的元素(经度 纬度 距离 单位)
(2)实例
georadius china:city 110 30 1000 km

redis.clients
jedis
3.2.0
systemctl stop/disable firewalld.service

@Test
public void testJedis01(){
Jedis jedis = new Jedis("192.168.153.129", 6379);
String pong = jedis.ping();
if("PONG".equalsIgnoreCase(pong)){
System.out.println("连接成功"+pong);
}else{
System.out.println("连接失败!!!!");
}
jedis.close();
}

@Test
public void testJedis01(){
Jedis jedis = new Jedis("192.168.153.129", 6379);
String pong = jedis.ping();
if("PONG".equalsIgnoreCase(pong)){
System.out.println("连接成功"+pong);//连接成功PONG
jedis.flushDB();
jedis.set("k1","v1");
jedis.set("k2","v2");
jedis.set("k3","v3");
Set keys = jedis.keys("*");
System.out.println(keys.size());//3
for(String key:keys){
System.out.println(key);//k3 k1 k2
}
System.out.println(jedis.exists("k1"));//true
System.out.println(jedis.ttl("k1"));//-1
System.out.println(jedis.get("k1"));//v1
}else{
System.out.println("连接失败!!!!");
}
jedis.close();
}

@Test
public void testJedis02(){
Jedis jedis = new Jedis("192.168.153.129", 6379);
String pong = jedis.ping();
if("PONG".equalsIgnoreCase(pong)){
System.out.println("连接成功"+pong);//连接成功PONG
jedis.flushDB();
jedis.mset("str1","v1","str2","v2","str3","v3");
System.out.println(jedis.mget("str1","str2","str3"));
}else{
System.out.println("连接失败!!!!");
}
jedis.close();
}

@Test
public void testJedis03(){
Jedis jedis = new Jedis("192.168.153.129", 6379);
String pong = jedis.ping();
if("PONG".equalsIgnoreCase(pong)){
System.out.println("连接成功"+pong);//连接成功PONG
jedis.flushDB();
jedis.rpush("k1", "v1", "v2", "v3");
List list = jedis.lrange("k1", 0, -1);
for(String li:list){
System.out.println(li);
}
}else{
System.out.println("连接失败!!!!");
}
jedis.close();
}

@Test
public void testJedis04(){
Jedis jedis = new Jedis("192.168.153.129", 6379);
String pong = jedis.ping();
if("PONG".equalsIgnoreCase(pong)){
System.out.println("连接成功"+pong);//连接成功PONG
jedis.flushDB();
jedis.sadd("set","set01");
jedis.sadd("set","set02");
jedis.sadd("set","set03");
jedis.sadd("set","set04");
Set sets = jedis.smembers("set");
for(String set:sets){
System.out.println(set);
}
}else{
System.out.println("连接失败!!!!");
}
jedis.close();
}

@Test
public void testJedis05(){
Jedis jedis = new Jedis("192.168.153.129", 6379);
String pong = jedis.ping();
if("PONG".equalsIgnoreCase(pong)){
System.out.println("连接成功"+pong);//连接成功PONG
jedis.flushDB();
jedis.hset("hash","userName","lisi");
System.out.println(jedis.hget("hash","userName"));
Map map = new HashMap<>();
map.put("telephone","123456789");
map.put("address","beijing");
map.put("email","zyy@qq.com");
jedis.hmset("hash1", map);
List result = jedis.hmget("hash1", "telephone", "address", "email");
for(String hash:result){
System.out.println(hash);
}
}else{
System.out.println("连接失败!!!!");
}
jedis.close();
}

@Test
public void testJedis06(){
Jedis jedis = new Jedis("192.168.153.129", 6379);
String pong = jedis.ping();
if("PONG".equalsIgnoreCase(pong)){
System.out.println("连接成功"+pong);//连接成功PONG
jedis.flushDB();
jedis.zadd("zSet",100,"z3");
jedis.zadd("zSet",90,"z4");
jedis.zadd("zSet",80,"z5");
jedis.zadd("zSet",70,"z6");
Set zSet = jedis.zrange("zSet", 0, -1);
for(String set:zSet){
System.out.println(set);
}
}else{
System.out.println("连接失败!!!!");
}
jedis.close();
}

要求:
package com.tedu.java;
import org.junit.Test;
import redis.clients.jedis.Jedis;
import java.util.Random;
/**
* @author: zyy
* @date: 2022/8/21 10:25
* @description: TODO
* @version: 1.0
* @描述:
**/
public class PhoneCode {
@Test
public void test01(){
//模拟验证码发送 获取验证码
verifyCode("13678765435");
//将获取的验证码,调用模拟验证码校验
//getRedisCode("13678765435","123767");
}
//1.生成6位数字验证码
public static String getRandomCode(){
Random random = new Random();
String code="";
for (int i=0;i<6;i++){
int nextInt = random.nextInt(10);
code+=nextInt;
}
return code;
}
//2.每个手机每天只能发送三次,验证码放到redis中,设置过期时间120
public static void verifyCode(String phone){
//连接redis
Jedis jedis = new Jedis("192.168.153.129", 6379);
String pong = jedis.ping();
if("PONG".equalsIgnoreCase(pong)){
//拼接key以及手机发送次数key
String countKey = "VerifyCode"+phone+"count";
//验证码key
String codeKey = "VerifyCode"+phone+"code";
//每个手机每天只能发送三次
String count = jedis.get(countKey);
if(count==null){
//没有发送次数,第一次发送
//设置发送次数是1
jedis.setex(countKey,24*60*60,"1");
}else if(Integer.parseInt(count)<=2){
//发送次数+1
jedis.incr(countKey);
}else{
//发送三次或者三次以上,不能再发送
System.out.println("今天发送次数已经超过三次");
jedis.close();
}
//发送验证码放到redis里面
String vCode = getRandomCode();
jedis.setex(codeKey,120,vCode);
}else{
System.out.println("连接失败!!!!");
}
jedis.close();
}
//3 验证码校验
public static void getRedisCode(String phone, String code) {
//从redis获取验证码
Jedis jedis = new Jedis("192.168.153.129", 6379);
String pong = jedis.ping();
if("PONG".equalsIgnoreCase(pong)){
//验证码key
String codeKey = "VerifyCode"+phone+"code";
String redisCode = jedis.get(codeKey);
if(redisCode.equalsIgnoreCase(redisCode)){
System.out.println("验证成功");
}else{
System.out.println("验证失败");
}
}else{
System.out.println("连接失败!!!!");
}
jedis.close();
}
}

org.springframework.boot
spring-boot-starter-data-redis
org.apache.commons
commons-pool2
2.6.0
#Redis服务器地址
spring.redis.host=192.168.140.136
#Redis服务器连接端口
spring.redis.port=6379
#Redis数据库索引(默认为0)
spring.redis.database= 0
#连接超时时间(毫秒)
spring.redis.timeout=1800000
#连接池最大连接数(使用负值表示没有限制)
spring.redis.lettuce.pool.max-active=20
#最大阻塞等待时间(负数表示没限制)
spring.redis.lettuce.pool.max-wait=-1
#连接池中的最大空闲连接
spring.redis.lettuce.pool.max-idle=5
#连接池中的最小空闲连接
spring.redis.lettuce.pool.min-idle=0
@EnableCaching
@Configuration
public class RedisConfig extends CachingConfigurerSupport {
@Bean
public RedisTemplate redisTemplate(RedisConnectionFactory factory) {
RedisTemplate template = new RedisTemplate<>();
RedisSerializer redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
template.setConnectionFactory(factory);
//key序列化方式
template.setKeySerializer(redisSerializer);
//value序列化
template.setValueSerializer(jackson2JsonRedisSerializer);
//value hashmap序列化
template.setHashValueSerializer(jackson2JsonRedisSerializer);
return template;
}
@Bean
public CacheManager cacheManager(RedisConnectionFactory factory) {
RedisSerializer redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
//解决查询缓存转换异常的问题
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
// 配置序列化(解决乱码的问题),过期时间600秒
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofSeconds(600))
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
.disableCachingNullValues();
RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
.cacheDefaults(config)
.build();
return cacheManager;
}
}
RedisTestController中添加测试方法
@RestController
@RequestMapping("/redisTest")
public class RedisTestController {
@Autowired
private RedisTemplate redisTemplate;
@GetMapping
public String testRedis() {
//设置值到redis
redisTemplate.opsForValue().set("name","lucy");
//从redis获取值
String name = (String)redisTemplate.opsForValue().get("name");
return name;
}
}
Redis事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。
Redis事务的主要作用就是串联多个命令防止别的命令插队。
从输入Multi命令开始,输入的命令都会依次进入命令队列中,但不会执行,直到输入Exec后,Redis会将之前的命令队列中的命令依次执行。
组队的过程中可以通过discard来放弃组队。
案例:
multi
set k1 v1
set k2 v2
set k3 v3
exec
组队成功,提交成功

组队阶段报错,提交失败
组队成功,提交有成功有失败情况
组队中某个命令出现了报告错误,执行时整个的所有队列都会被取消;

如果执行阶段某个命令报出了错误,则只有报错的命令不会被执行,而其他的命令都会执行,不会回滚。

一个请求想给金额减8000;
一个请求想给金额减5000;
一个请求想给金额减1000;


悲观锁(Pessimistic Lock), 顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。

乐观锁(Optimistic Lock), 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量。Redis就是利用这种check-and-set机制实现事务的。
在执行multi之前,先执行watch key1 [key2],可以监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。
取消 WATCH 命令对所有 key 的监视。
如果在执行 WATCH 命令之后,EXEC 命令或DISCARD 命令先被执行了的话,那么就不需要再执行UNWATCH 了。
http://doc.redisfans.com/transaction/exec.html
官网介绍:http://www.redis.io
Redis 提供了2个不同形式的持久化方式。

在指定的时间间隔内将内存中的数据集快照写入磁盘, 也就是行话讲的Snapshot快照,它恢复时是将快照文件直接读到内存里。
Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入到 一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。 整个过程中,主进程是不进行任何IO操作的,这就确保了极高的性能 如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化后的数据可能丢失。

在redis.conf中配置文件名称,默认为dump.rdb

db文件的保存路径,也可以修改。默认为Redis启动时命令行所在的目录下
dir “/myredis/”

执行flushall命令,也会产生dump.rdb文件,但里面是空的,无意义
格式:save 秒钟 写操作次数
RDB是整个内存的压缩过的Snapshot,RDB的数据结构,可以配置复合的快照触发条件。
默认是1分钟内改了1万次,或5分钟内改了10次,或15分钟内改了1次。
禁用:不设置save指令,或者给save传入空字符串。
当Redis无法写入磁盘的话,直接关掉Redis的写操作。推荐yes.
对于存储到磁盘中的快照,可以设置是否进行压缩存储。如果是的话,redis会采用LZF算法进行压缩。
如果你不想消耗CPU来进行压缩的话,可以设置为关闭此功能。推荐yes。
在存储快照后,还可以让redis使用CRC64算法来进行数据校验,但是这样做会增加大约10%的性能消耗,如果希望获取到最大的性能提升,可以关闭此功能,推荐yes。
先通过config get dir 查询rdb文件的目录 ,将*.rdb的文件拷贝到别的地方。
rdb的恢复:
1. 关闭Redis
2. 先把备份的文件拷贝到工作目录下 cp dump2.rdb dump.rdb
3.启动Redis, 备份数据会直接加载

动态停止RDB:redis-cli config set save “”#save后给空值,表示禁用保存策略。

以日志的形式来记录每个写操作(增量保存),将Redis执行过的所有写指令记录下来(读操作不记录), 只许追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis 重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作。
(1)客户端的请求写命令会被append追加到AOF缓冲区内;
(2)AOF缓冲区根据AOF持久化策略[always,everysec,no]将操作sync同步到磁盘的AOF文件中;
(3)AOF文件大小超过重写策略或手动重写时,会对AOF文件rewrite重写,压缩AOF文件容量;
(4)Redis服务重启时,会重新load加载AOF文件中的写操作达到数据恢复的目的;

可以在redis.conf中配置文件名称,默认为 appendonly.aof。
AOF文件的保存路径,同RDB的路径一致。
AOF和RDB同时开启,系统默认取AOF的数据(数据不会存在丢失)。
修改默认的appendonly no,改为yes。
将有数据的aof文件复制一份保存到对应目录(查看目录:config get dir).
恢复:重启redis然后重新加载。

修改默认的appendonly no,改为yes。
如遇到AOF文件损坏,通过/usr/local/bin/redis-check-aof--fix appendonly.aof进行恢复。
备份被写坏的AOF文件。
恢复:重启redis,然后重新加载。
AOF采用文件追加方式,文件会越来越大为避免出现此种情况,新增了重写机制, 当AOF文件的大小超过所设定的阈值时,Redis就会启动AOF文件的内容压缩, 只保留可以恢复数据的最小指令集.可以使用命令bgrewriteaof。
系统载入时或者上次重写完毕时,Redis会记录此时AOF大小,设为base_size,
如果Redis的AOF当前大小>= base_size +base_size*100% (默认)且当前大小>=64mb(默认)的情况下,Redis会对AOF进行重写。


1. 备份机制更稳健,丢失数据概率更低。
2. 可读的日志文本,通过操作AOF稳健,可以处理误操作。

官方推荐两个都启用。

建议不要,因为RDB更适合用于备份数据库(AOF在不断变化不好备份), 快速重启,而且不会有AOF可能潜在的bug,留着作为一个万一的手段。
1. 因为RDB文件只用作后备用途,建议只在Slave上持久化RDB文件,而且只要15分钟备份一次就够了,只保留save 900 1这条规则。
2.如果使用AOF,好处是在最恶劣情况下也只会丢失不超过两秒数据,启动脚本较简单只load自己的AOF文件就可以了。
代价:
一是带来了持续的IO,
二是AOF rewrite的最后将rewrite过程中产生的新数据写到新文件造成的阻塞几乎是不可避免的。
只要硬盘许可,应该尽量减少AOF rewrite的频率,AOF重写的基础大小默认值64M太小了,可以设到5G以上。
默认超过原大小100%大小时重写可以改到适当的数值。
主机数据更新后根据配置和策略, 自动同步到备机的master/slaver机制,Master以写为主,Slave以读为主。

克隆多台虚拟机,分别为:
| 端口号 | 主/从 |
|---|---|
| 192.168.153.129 | 主机 |
| 192.168.153.128 | 从机 |
| 192.168.153.130 | 从机 |
修改:/myRedis/redis.conf中:




redis-server /myRedis/redis.config
ps -ef | grep redis
info replication
打印主从复制的相关信息

slaveof 192.168.153.129 6379
成为192.168.153.129实例的从服务器。
2. 再次查看:info replication
128:

129:

130:



次从机联通后,都会给主机发送sync指令主机立刻进行存盘操作,发送RDB文件,给从机从机收到RDB文件后,进行全盘加载之后每次主机的写操作,都会立刻发送给从机,从机执行相同的命令。

上一个Slave可以是下一个slave的Master,Slave同样可以接收其他 slaves的连接和同步请求,那么该slave作为了链条中下一个的master, 可以有效减轻master的写压力,去中心化降低风险。
用 slaveof :中途变更转向:会清除之前的数据,重新建立拷贝最新的。风险是一旦某个slave宕机,后面的slave都没法备份。主机挂了,从机还是从机,无法写数据了。


当一个master宕机后,后面的slave可以立刻升为master,其后面的slave不用做任何修改。
用 slaveof no one 将从机变为主机。

反客为主的自动版,能够后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库。


sentinel monitor mymaster 192.168.153.128 6379 1
其中mymaster为监控对象起的服务器名称, 1 为至少有多少个哨兵同意迁移的数量。
cd /usr/local/bin
#redis做压测可以用自带的redis-benchmark工具
redis-sentinel /myredis/sentinel.conf

由于所有的写操作都是先在Master上操作,然后同步更新到Slave上,所以从Master同步到Slave机器有一定的延迟,当系统很繁忙的时候,延迟问题会更加严重,Slave机器数量的增加也会使这个问题更加严重。

private static JedisSentinelPool jedisSentinelPool=null;
public static Jedis getJedisFromSentinel(){
if(jedisSentinelPool==null){
Set sentinelSet=new HashSet<>();
sentinelSet.add("192.168.153.128:26379");
JedisPoolConfig jedisPoolConfig =new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(10); //最大可用连接数
jedisPoolConfig.setMaxIdle(5); //最大闲置连接数
jedisPoolConfig.setMinIdle(5); //最小闲置连接数
jedisPoolConfig.setBlockWhenExhausted(true); //连接耗尽是否等待
jedisPoolConfig.setMaxWaitMillis(2000); //等待时间
jedisPoolConfig.setTestOnBorrow(true); //取连接的时候进行一下测试 ping pong
jedisSentinelPool=new JedisSentinelPool("mymaster",sentinelSet,jedisPoolConfig);
return jedisSentinelPool.getResource();
}else{
return jedisSentinelPool.getResource();
}
}
容量不够,redis如何进行扩容?
并发写操作, redis如何分摊?
另外,主从模式,薪火相传模式,主机宕机,导致ip地址发生变化,应用程序中配置需要修改对应的主机地址、端口等信息。
之前通过代理主机来解决,但是redis3.0中提供了解决方案。就是无中心化集群配置。
将rdb,aof文件都删除掉。
开启daemonize yes
Log文件名字
Dump.rdb名字
appendonly 关掉或者换名字
cluster-enabled yes 打开集群模式
cluster-config-file nodes-6379.conf 设定节点配置文件名
cluster-node-timeout 15000 设定节点失联时间,超过该时间(毫秒),集群自动进行主从切换。
redis-server /myRedis/redis.config
组合之前,请确保所有redis实例启动后,nodes-xxxx.conf文件都生成正常。
cd /opt/redis-6.2.1/src
redis-cli --cluster create --cluster-replicas 1 192.168.153.128:6379
192.168.153.129:6379 192.168.153.130:6379 192.168.153.131:6379
192.168.153.132:6379 192.168.153.133:6379
此处不要用127.0.0.1, 请用真实IP地址!!!
–replicas 1 采用最简单的方式配置集群,一台主机,一台从机,正好三组。
组合之前,请确保所有redis实例启动后,nodes-xxxx.conf文件都生成正常。
redis-cli -c
cluster nodes
一个集群至少要有三个主节点。
选项 --cluster-replicas 1 表示我们希望为集群中的每个主节点创建一个从节点。
分配原则尽量保证每个主数据库运行在不同的IP地址,每个从库和主库不在一个IP地址上。
一个 Redis 集群包含 16384 个插槽(hash slot), 数据库中的每个键都属于这 16384 个插槽的其中一个,集群使用公式 CRC16(key) % 16384 来计算键 key 属于哪个槽, 其中 CRC16(key) 语句用于计算键 key 的 CRC16 校验和 。
集群中的每个节点负责处理一部分插槽。 举个例子, 如果一个集群可以有主节点, 其中:
节点 A 负责处理 0 号至 5460 号插槽。
节点 B 负责处理 5461 号至 10922 号插槽。
节点 C 负责处理 10923 号至 16383 号插槽。
如 redis-cli -c –p 6379 登入后,再录入、查询键值对可以自动重定向。


CLUSTER GETKEYSINSLOT < slot>< count> 返回 count 个 slot 槽中的键。

如果主节点下线?从节点能否自动升为主节点?注意:15秒超时

主节点恢复后,主从关系会如何?主节点回来变成从机。

- 如果某一段插槽的主从都挂掉,而***cluster-require-full-coverage 为yes*** ,那么 ,整个集群都挂掉;
- 如果某一段插槽的主从都挂掉,***而cluster-require-full-coverage 为no*** ,那么,该插槽数据全都不能使用,也无法存储。
redis.conf中的参数 cluster-require-full-coverage
即使连接的不是主机,集群会自动切换主机存储。主机写,从机读。
无中心化主从集群。无论从哪台主机写的数据,其他主机上都能读到数据。
public class JedisClusterTest {
public static void main(String[] args) {
Setset =new HashSet();
set.add(new HostAndPort("192.168.153.128",6379));
JedisCluster jedisCluster=new JedisCluster(set);
jedisCluster.set("k1", "v1");
System.out.println(jedisCluster.get("k1"));
}
}
key对应的数据在数据源并不存在,每次针对此key的请求从缓存获取不到,请求都会压到数据源,从而可能压垮数据源。比如用一个不存在的用户id获取用户信息,不论缓存还是数据库都没有,若黑客利用此漏洞进行攻击可能压垮数据库。

一个一定不存在缓存及查询不到的数据,由于缓存是不命中时被动写的,并且出于容错考虑,如果从存储层查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。
解决方案:
(1) 对空值缓存:如果一个查询返回的数据为空(不管是数据是否不存在),我们仍然把这个空结果(null)进行缓存,设置空结果的过期时间会很短,最长不超过五分钟。
(2) 设置可访问的名单(白名单):
使用bitmaps类型定义一个可以访问的名单,名单id作为bitmaps的偏移量,每次访问和bitmap里面的id进行比较,如果访问id不在bitmaps里面,进行拦截,不允许访问。
(3) 采用布隆过滤器:(布隆过滤器(Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量(位图)和一系列随机映射函数(哈希函数)。
布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。)将所有可能存在的数据哈希到一个足够大的bitmaps中,一个一定不存在的数据会被 这个bitmaps拦截掉,从而避免了对底层存储系统的查询压力。
(4) 进行实时监控:当发现Redis的命中率开始急速降低,需要排查访问对象和访问的数据,和运维人员配合,可以设置黑名单限制服务。
key对应的数据存在,但在redis中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。

key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。这个时候,需要考虑一个问题:缓存被“击穿”的问题。
解决问题:
(1)预先设置热门数据:在redis高峰访问之前,把一些热门数据提前存入到redis里面,加大这些热门数据key的时长。
(2)实时调整:现场监控哪些数据热门,实时调整key的过期时长。
(3)使用锁:
a. 就是在缓存失效的时候(判断拿出来的值为空),不是立即去load db;
b. 先使用缓存工具的某些带成功操作返回值的操作(比如Redis的SETNX)去set一个mutex key;
c. 当操作返回成功时,再进行load db的操作,并回设缓存,最后删除mutex key;
d. 当操作返回失败,证明有线程在load db,当前线程睡眠一段时间再重试整个get缓存的方法。

key对应的数据存在,但在redis中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。
缓存雪崩与缓存击穿的区别在于这里针对很多key缓存,前者则是某一个key正常访问。


缓存失效时的雪崩效应对底层系统的冲击非常可怕!
解决方案:
(1)构建多级缓存架构:nginx缓存 + redis缓存 +其他缓存(ehcache等);
(2)使用锁或队列:用加锁或者队列的方式保证来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。不适用高并发情况。
(3)设置过期标志更新缓存:记录缓存数据是否过期(设置提前量),如果过期会触发通知另外的线程在后台去更新实际key的缓存。
(4)将缓存失效时间分散开:比如我们可以在原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。
Redis ACL是Access Control List(访问控制列表)的缩写,该功能允许根据可以执行的命令和可以访问的键来限制某些连接。
在Redis 5版本之前,Redis 安全规则只有密码控制 还有通过rename 来调整高危命令比如 flushdb , KEYS* , shutdown 等。Redis 6 则提供ACL的功能对用户进行更细粒度的权限控制 :
(1)接入权限:用户名和密码 ;
(2)可以执行的命令 ;
(3)可以操作的 KEY;
参考官网:https://redis.io/topics/acl
acl list

acl cat
(2) 加参数类型名可以查看类型下具体命令
acl cat string

acl whoami

A. 启动和禁用用户:
on 激活某用户账号
off 禁用某用户账号。注意,已验证的连接仍然可以工作。如果默认用户被标记为off,则新连接将在未进行身份验证的情况下启动,并要求用户使用AUTH选项发送AUTH或HELLO,以便以某种方式进行身份验证。
B. 权限的添加删除:
+:将指令添加到用户可以调用的指令列表中。
-:从用户可执行指令列表移除指令。
+@:添加该类别中用户要调用的所有指令,有效类别为@admin、@set、@sortedset…等,通过调用ACL CAT命令查看完整列表。特殊类别@all表示所有命令,包括当前存在于服务器中的命令,以及将来将通过模块加载的命令。
-@:从用户可调用指令中移除类别。
allcommands:+@all的别名。
nocommand:-@all的别名。
C.可操作键的添加或删除:
~:添加可作为用户可操作的键的模式。例如~*允许所有的键。
(2) 通过命令创建新用户默认权限
acl setuser user1
在上面的示例中,我根本没有指定任何规则。如果用户不存在,这将使用just created的默认属性来创建用户。如果用户已经存在,则上面的命令将不执行任何操作。
(3)设置有用户名、密码、ACL权限、并启用的用户
acl setuser user2 on >password ~cached:* +get
(4) 切换用户,验证权限

Redis6终于支撑多线程了,告别单线程了吗?
IO多线程其实指客户端交互部分的网络IO交互处理模块多线程,而非执行命令多线程。Redis6执行命令依然是单线程。
Redis 6 加入多线程,但跟 Memcached 这种从 IO处理到数据访问多线程的实现模式有些差异。Redis 的多线程部分只是用来处理网络数据的读写和协议解析,执行命令仍然是单线程。之所以这么设计是不想因为多线程而变得复杂,需要去控制 key、lua、事务,LPUSH/LPOP 等等的并发问题。整体的设计大体如下:

另外,多线程IO默认也是不开启的,需要再配置文件中配置
io-threads-do-reads yes
io-threads 4
之前老版Redis想要搭集群需要单独安装ruby环境,Redis 5 将 redis-trib.rb 的功能集成到 redis-cli 。另外官方 redis-benchmark 工具开始支持 cluster 模式了,通过多线程的方式对多个分片进行压测。
redis-benchmark --help

Redis6新功能还有: