滴水穿石不是水的力量,而是坚持的力量
1 为什么需要远程登录Linux
说明: 公司开发时候, 具体的情况是这样的
linux 服务器是开发小组共享的.
正式上线的项目是运行在公网的.
因此程序员需要远程登录到 centos 进行项目管理或者开发.
画出简单的网络拓扑示意图(帮助理解)
远程登录客户端有 Xshell5, Xftp5 , 我们学习使用 Xshell5 和 Xftp , 其它的远程工具大同小异.
2 远程登录Linux-Xshell5
特别说明:如果希望安装好 XShell 5 就可以远程访问 Linux 系统的话,需要有一个前提,就是Linux 启用了 SSHD 服务,该服务会监听 22 号端口
。
- XShell7 下载
xshell7和xftp7下载地址:https://www.xshell.com/zh/free-for-home-school/
然后你就会看到:
最后一步:
下载完成:
4 . XShell5 安装
然后傻瓜式安装,狂点下一步
5.XShell使用前提
有一个前提:必须要知道linux的ip地址 & XShell所在的windows电脑必须ping通linux
在linux终端里面要用管理员权限
去操作
先以普通用户登进去,然后在终端里输入:
sudo su root # 以管理员权限切换至root用户,不可以可以用下一行
切换到管理员权限
注:ctrl+alt+T打开终端
去获取linux的ip:使用命令:ifconfig
cmd里面ping一下
ok
6.报错:网络连接激活失败
连不上网那更连不上XShell了!!!
解决方案:
首先我声明昂~错误种类很多,所以解决方案也很多
7 .XShell连接linux
打开XShell,点击新建
填信息,点击确定
,不是连接昂
点击关闭
`不是点关闭昂!!!!!是点击连接`先选中上面的会话,然后点击连接!!!
双击
在Windows和linux之间上传下载文件
1 .下载
下载方法同XShell
xshell7和xftp7下载地址:https://www.xshell.com/zh/free-for-home-school/
2.使用前提
有一个前提:必须要知道linux的ip地址 & XShell所在的windows电脑必须ping通linux
3.XFTP连接linux
SFTP
22
ifconfig的那个
点击连接!!!
如果点击关闭,那之前做的一切就没有意义了!!!
问题来了,没有连接上
解决:
4 .使用
效果:
技术的分类
1、解决功能性的问题
:Java、Jsp、RDBMS、Tomcat、HTML、Linux、JDBC
2、解决扩展性的问题
:Struts、Spring、SpringMVC、Hibernate、Mybatis
3、解决性能的问题
:NoSQL、Java线程、Hadoop、Nginx、MQ、ElasticSearch
NoSQL是为了解决性能问题而产生的一门技术,而Redis是典型的NoSQL数据库
Web1.0的时代,数据访问量很有限,用一夫当关的高性能的单点服务器可以解决大部分问题
随着Web2.0的时代的到来,用户访问量大幅度提升,同时产生了大量的用户数据。加上后来的智能移动设备的普及,所有的互联网平台都面临了巨大的性能挑战。
Nginx
用分布式或者集群方式来做架构,产生最典型的问题:session共享问题
session共享问题解决方案
NoSQL(NoSQL = Not Only SQL ),意即“不仅仅是SQL”,泛指非关系型的数据库。
NoSQL 不依赖业务逻辑方式存储,而以简单的key-value模式存储。因此大大的增加了数据库的扩展能力。
(用不着sql的和用了sql也不行的情况,请考虑用NoSql)
HBase是Hadoop项目中的数据库。它用于需要对大量的数据进行随机、实时的读写操作的场景中。
HBase的目标就是处理数据量非常庞大的表,可以用普通的计算机处理超过10亿行数据,还可处理有数百万列元素的数据表。
计算机存储单位 计算机存储单位一般用B,KB,MB,GB,TB,EB,ZB,YB,BB来表示,它们之间的关系是:
位 bit (比特)(Binary Digits):存放一位二进制数,即 0 或 1,最小的存储单位。
字节 byte:8个二进制位为一个字节(B),最常用的单位。
1KB (Kilobyte 千字节)=1024B,
1MB (Megabyte 兆字节 简称“兆”)=1024KB,
1GB (Gigabyte 吉字节 又称“千兆”)=1024MB,
1TB (Trillionbyte 万亿字节 太字节)=1024GB,其中1024=2^10 ( 2 的10次方),
1PB(Petabyte 千万亿字节 拍字节)=1024TB,
1EB(Exabyte 百亿亿字节 艾字节)=1024PB,
1ZB (Zettabyte 十万亿亿字节 泽字节)= 1024 EB,
1YB (Jottabyte 一亿亿亿字节 尧字节)= 1024 ZB,
1BB (Brontobyte 一千亿亿亿字节)= 1024 YB.
注:“兆”为百万级数量单位。
http://db-engines.com/en/ranking
我是新建了一个文件夹Redis6,然后把压缩包传到里面去了
产生错误原因:
最后一步点击了关闭,而不是连接
改正:
cd redis6不要加"/" 路径不对
安装位置:
在放redis6的那个压缩包所在的文件夹中执行项下面命令
安装C 语言的编译环境
apt install gcc
ok,安装成功了
tar -zxvf
make install
前台启动redis后,就不能再在输入框中输入命令,否则就会将服务端停止。
前台启动,命令行窗口不能关闭,否则服务器停止===>窗口不能再做其他操作了
前台启动redis关闭:直接Ctrl+C关闭。
关窗口还可以继续使用
2.进入redis6压缩包的解压文件夹中
3.将redis6压缩包的解压文件夹中的redis.conf复制一份到/etc
查看一下:
4 .将
etc下
的文件redis.conf(128行)里面的daemonize no 改成 yes,让服务在后台启动
vi redis.conf
然后就是vim编辑器界面了,我说一下怎么操作:
5 redis启动
进bin目录下启动
:/usr/local/bin
redis-server /etc/redis.conf
6.查看
ps -ef | grep redis
端口号:637
7 通过客户端连接redis
bin目录下
redis-cli
有ip;有端口号==>连上了
8 测试验证:ping命令
9 redis关闭
如果是后台启动,即上面客户端连接成功后,首先输入
exit
退出redis客户端
直接叉掉也可以关闭
配合关系型数据库做高速缓存
多样的数据结构存储持久化数据
Redis是单线程+多路IO复用技术
多路复用是指使用一个线程来检查多个文件描述符(Socket)的就绪状态,比如调用select和poll函数,传入多个文件描述符,如果有一个文件描述符就绪,则返回,否则阻塞直到超时。得到就绪状态后进行真正的操作可以在同一个线程里执行,也可以启动线程执行(比如使用线程池)
串行 vs 多线程+锁(memcached) vs 单线程+多路IO复用(Redis)
(与Memcache三点不同: 支持多数据类型,支持持久化,单线程+多路IO复用)
获得redis常见数据类型操作命令http://www.redis.cn/commands.html
keys *
:查看当前库所有key (匹配:keys *1)
刚开始是空的,为了演示效果,先认为加几条
放了三个接下来演示:
exists key
:判断某个key是否存在
返回1表示存在
返回0表示不存在
type key
: 查看你的key是什么类型
del key
: 删除指定的key数据
unlink key
: 根据value选择非阻塞删除
仅将keys从keyspace元数据中删除,真正的删除会在后续异步操作。就是它告诉你删了,其实它没删,后续它再删掉
expire key time
time秒钟:为给定的key设置过期时间
ttl key
: 查看还有多少秒过期,-1表示永不过期,-2表示已过期
select命令切换数据库
在redis中默认有十几个数据库
select 15===>切换到15号数据库
dbsize查看当前数据库的key的数量
flushdb清空当前库
flushall通杀全部库
String是Redis最基本的类型,你可以理解成与Memcached一模一样的类型,一个key对应一个value。
String类型是二进制安全的
。意味着Redis的string可以包含任何数据。比如jpg图片或者序列化的对象。
只要能存成字符串,都能放到里面去
String类型是Redis最基本的数据类型,一个Redis中字符串value最多可以是512M
set 添加键值对
相同的key,后面的会覆盖前面的
*NX:当数据库中key不存在时,可以将key-value添加数据库
*XX:当数据库中key存在时,可以将key-value添加数据库,与NX参数互斥
*EX:key的超时秒数
*PX:key的超时毫秒数,与EX互斥
get 查询对应键值
append 将给定的 追加到原值的末尾
若key不存在,则会新建key
strlen 获得值的长度
setnx 只有在
key 不存在时
设置 key 的值
incr
将 key 中储存的数字值增1
只能对数字值
操作,如果为空,新增值为1
decr
将 key 中储存的数字值减1
只能对数字值
操作,如果为空,新增值为-1
incrby / decrby <步长>将 key 中储存的数字值增减。自定义步长。
原子性
所谓原子操作是指不会被线程调度机制打断的操作;
这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)。
(1)在单线程中, 能够在单条指令中完成的操作都可以认为是"原子操作",因为中断只能发生于指令之间。
(2)在多线程中,不能被其它进程(线程)打断的操作就叫原子操作。 Redis单命令的原子性主要得益于Redis的单线程。
案例: java中的i++是否是原子操作?不是
i=0;两个线程分别对i进行++100次,值是多少?
mset …
同时设置一个或多个 key-value对
mget …
同时获取一个或多个 value
msetnx …
同时设置一个或多个 key-value 对,当且仅当所有给定 key 都不存在
。
如果有key之前存在了,那就不会成功
原子性,有一个失败则都失败
getrange <起始位置><结束位置>
获得值的范围,类似java中的substring,前包,后包
setrange <起始位置>
用 覆写
所储存的字符串值,从<起始位置>开始(索引从0开始)。
setex <过期时间>
设置键值的同时,设置过期时间,单位秒。
getset
以新换旧,设置了新值同时获得旧值。
String的数据结构为简单动态字符串(Simple Dynamic String,缩写SDS)。是可以修改的字符串,内部结构实现上类似于Java的ArrayList,采用预分配冗余空间的方式来减少内存的频繁分配.
如图中所示,内部为当前字符串实际分配的空间capacity一般要高于实际字符串长度len。当字符串长度小于1M时,扩容都是加倍现有的空间,如果超过1M,扩容时一次只会多扩1M的空间。需要注意的是字符串最大长度为512M
单键多值
Redis 列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边)。
它的底层实际是个双向链表
,对两端的操作性能很高,通过索引下标的操作中间的节点性能会较差。
lpush … 从左边插入一个或多个值。
放的时候是v1 v2 v3但是取的时候是相反的
从左边开始放,后放的会把先前放的往右挤
取得时候从左边开始取
rpush … 从右边插入一个或多个值。
放的时候是v1 v2 v3但是取的时候是相反的
从右边开始放,后放的会把先前放的往左挤
取得时候从左边开始取
lpop/rpop 从左边/右边吐出一个值。值在键在,值光键亡。
rpoplpush 从 列表右边吐出一个值,插到列表左边。
从k1右边取个值放到k2左边来
lrange
按照索引下标获得元素(从左到右)
lrange mylist 0 -1 0左边第一个,-1右边第一个,(0到-1表示获取所有)
lindex 按照索引下标获得元素(从左到右)
llen 获得列表长度
linsert before 在的后面插入插入值
lrem 从左边删除n个value(从左到右)
lset 将列表key下标为index的值替换成value
从零开始
List的数据结构为快速链表quickList。
首先在列表元素较少的情况下会使用一块连续的内存存储,这个结构是ziplist,也即是压缩列表。
它将所有的元素紧挨着一起存储,分配的是一块连续的内存。
当数据量比较多的时候才会改成quicklist。
因为普通的链表需要的附加指针空间太大,会比较浪费空间。比如这个列表里存的只是int类型的数据,结构上还需要两个额外的指针prev和next。
Redis将链表和ziplist结合起来组成了quicklist。也就是将多个ziplist使用双向指针串起来使用。这样既满足了快速的插入删除性能,又不会出现太大的空间冗余。
Redis set对外提供的功能与list类似是一个列表的功能,特殊之处在于set是可以**自动排重
**的,当你需要存储一个列表数据,又不希望出现重复数据时,set是一个很好的选择,并且set提供了判断某个成员是否在一个set集合内的重要接口,这个也是list所不能提供的。
Redis的Set是string类型的无序集合。它底层其实是一个value为null的hash表,所以添加,删除,查找的复杂度都是O(1)。
一个算法,随着数据的增加,执行时间的长短,如果是O(1),数据增加,查找数据的时间不变
sadd …
将一个或多个 member 元素加入到集合 key 中,已经存在的 member 元素将被忽略
smembers 取出该集合的所有值。
sismember 判断集合是否为含有该值,有1,没有0
scard返回该集合的元素个数。
srem
… 删除集合中的某个元素。
spop 随机从该集合中吐出一个值。
取出之后就删掉了
srandmember
随机
从该集合中取出n个值。不会从集合中删除 。
smove value把集合中一个值从一个集合移动到另一个集合
sinter 返回两个集合的交集元素。
sunion 返回两个集合的并集元素。
sdiff 返回两个集合的差集元素(key1中的,不包含key2中的)
Set数据结构是dict字典,字典是用哈希表实现的。
Java中HashSet的内部实现使用的是HashMap,只不过所有的value都指向同一个对象。Redis的set结构也是一样,它的内部也使用hash结构,所有的value都指向同一个内部值
Redis hash 是一个键值对集合。
Redis hash是一个string类型的field和value的映射表,hash特别适合用于存储对象
。
类似Java里面的Map
用户ID为查找的key,存储的value用户对象包含姓名,年龄,生日等信息,如果用普通的key/value结构来存储
主要有以下2种存储方式:
hset 给集合中的 键赋值
hget 从集合取出 value
hmset … 批量设置hash的值
hexists查看哈希表 key 中,给定域 field 是否存在。
hkeys 列出该hash集合的所有field
hvals 列出该hash集合的所有value
hincrby 为哈希表 key 中的域 field 的值加上增量 1 -1
hsetnx 将哈希表 key 中的域 field 的值设置为 value ,
当且仅当域 field 不存在
.
Hash类型对应的数据结构是两种:ziplist(压缩列表),hashtable(哈希表)。当field-value长度较短且个数较少时,使用ziplist,否则使用hashtable
Redis有序集合zset与普通集合set非常相似,是一个没有重复元素
的字符串集合。
不同之处是有序集合的每个成员都关联了一个**评分
(**score),这个评分(score)被用来按照从最低分到最高分的方式排序集合中的成员。集合的成员是唯一的,但是评分可以是重复了
。
因为元素是有序的, 所以你也可以很快的根据评分(score)或者次序(position)来获取一个范围的元素。
访问有序集合的中间元素也是非常快的,因此你能够使用有序集合作为一个没有重复成员的智能列表。
zadd …
将一个或多个 member 元素及其 score 值加入到有序集 key 当中。
zrange
[WITHSCORES] 从小到大
返回有序
集 key 中,下标在
带WITHSCORES,可以让分数一起和值返回到结果集。
zrangebyscore key minmax [withscores] [limit offset count]
返回有序集 key 中,所有 score 值介于 min 和 max 之间(包括等于 min 或 max )的成员。有序集成员按 score 值递增(从小到大)次序排列。
zrevrangebyscore key maxmin [withscores] [limit offset count]
同上,改为从大到小排列。
zincrby 为元素的score加上增量
zrem 删除该集合下,指定值的元素
zcount 统计该集合,分数区间内的元素个数
zrank 返回该值在集合中的排名,
从0开始
。
案例:如何利用zset实现一个文章访问量的排行榜?
SortedSet(zset)是Redis提供的一个非常特别的数据结构,一方面它等价于Java的数据结构Map
zset底层使用了两个数据结构
(1)hash,hash的作用就是关联元素value和权重score,保障元素value的唯一性,可以通过元素value找到相应的score值。
(2)跳跃表,跳跃表的目的在于给元素value排序,根据score的范围获取元素列表。
有序集合在生活中比较常见,例如根据成绩对学生排名,根据得分对玩家排名等。对于有序集合的底层实现,可以用数组、平衡树、链表等。数组不便元素的插入、删除;平衡树或红黑树虽然效率高但结构复杂;链表查询需要遍历所有效率低。Redis采用的是跳跃表。跳跃表效率堪比红黑树,实现远比红黑树简单。
对比有序链表和跳跃表,从链表中查询出51
有序链表
要查找值为51的元素,需要从第一个元素开始依次查找、比较才能找到。共需要6次比较
跳跃表
从第2层开始,1节点比51节点小,向后比较。
21节点比51节点小,继续向后比较,后面就是NULL了,所以从21节点向下到第1层
在第1层,41节点比51节点小,继续向后,61节点比51节点大,所以从41向下
在第0层,51节点为要查找的节点,节点被找到,共查找4次。
从此可以看出跳跃表比有序链表效率要高
配置大小单位,开头定义了一些基本的度量单位,只支持bytes,不支持bit
大小写不敏感
类似jsp中的include,多实例的情况可以把公用的配置文件提取出来
默认情况bind=127.0.0.1只能接受本机
的访问请求
不写的情况下,无限制接受任何ip地址的访问
生产环境肯定要写你应用服务器的地址;服务器是需要远程访问的,所以需要将其注释掉
如果开启了protected-mode,那么在没有设定bind ip且没有设密码的情况下,Redis只允许接受本机的响应
把76行注掉(前面加#)
保存配置,停止服务,重启启动查看进程,不再是本机访问了。
将本机访问保护模式设置no
将下面95行的yes改成no
端口号,默认 6379
设置tcp的backlog,backlog其实是一个连接队列,backlog队列总和=未完成三次握手队列 + 已经完成三次握手队列。
在高并发环境下你需要一个高backlog值来避免慢客户端连接问题。
注意Linux内核会将这个值减小到/proc/sys/net/core/somaxconn的值(128),所以需要确认增大/proc/sys/net/core/somaxconn和/proc/sys/net/ipv4/tcp_max_syn_backlog(128)两个值来达到想要的效果
一个空闲的客户端维持多少秒会关闭,0表示关闭该功能。即
永不关闭
。
对访问客户端的一种
心跳检测
,每隔n秒检测一次。
单位为秒,如果设置为0,则不会进行Keepalive检测,建议设置成60
是否为后台进程,设置为yes
守护进程,后台启动
将248行的no改成yes
,这样就可以后台启动了
存放pid文件的位置,每个实例会产生一个不同的pid文件
指定日志记录级别,Redis总共支持四个级别:debug、verbose、notice、warning,默认为notice
四个级别根据使用阶段来选择,生产环境选择notice 或者warning
日志文件名称
设定库的数量 默认16
,默认数据库为0,可以使用SELECT 命令在连接上指定数据库id
访问密码的查看、设置和取消
在命令中设置密码,只是临时的。重启redis服务器,密码就还原了。
永久设置,需要再配置文件中进行设置。
设置redis同时可以与多少个客户端进行连接。
默认情况下为10000个客户端。
如果达到了此限制,redis则会拒绝新的连接请求,并且向这些连接请求方发出“max number of clients reached”以作回应。
建议必须设置,否则,将内存占满,造成服务器宕机
设置redis可以使用的内存量。一旦到达内存使用上限,redis将会试图移除内部数据,移除规则可以通过axmemory-policy来指定。
如果redis无法根据移除规则来移除内存中的数据,或者设置了“不允许移除”,那么redis则会针对那些需要申请内存的指令返回错误信息,比如SET、LPUSH等。
但是对于无内存申请的指令,仍然会正常响应,比如GET等。如果你的redis是主redis(说明你的redis有从redis),那么在设置内存使用上限时,需要在系统中留出一些内存空间给同步队列缓存,只有在你设置的是“不移除”的情况下,才不用考虑这个因素。
volatile-lru:使用LRU算法移除key,只对设置了过期时间的键;(最近最少使用)
allkeys-lru:在所有集合key中,使用LRU算法移除key
volatile-random:在过期集合中移除随机的key,只对设置了过期时间的键
allkeys-random:在所有集合key中,移除随机的key
volatile-ttl:移除那些TTL值最小的key,即那些最近要过期的key
noeviction:不进行移除。针对写操作,只是返回错误信息
设置样本数量,LRU算法和最小TTL算法都并非是精确的算法,而是估算值,所以你可以设置样本的大小,redis默认会检查这么多个key并选择其中LRU的那个。
一般设置3到7的数字,数值越小样本越不准确,但性能消耗越小。
Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。
Redis 客户端可以订阅任意数量的频道
① 打开一个客户端订阅channel1
SUBSCRIBE channel1
② 打开另一个客户端,给channel1发布消息hello
publish channel1 hello
返回的1是订阅者数量
③ 打开第一个客户端可以看到发送的消息
注:发布的消息没有持久化,如果在订阅的客户端收不到hello,只能收到订阅后发布的消息
现代计算机用二进制(位) 作为信息的基础单位, 1个字节等于8位
, 例如“abc”字符串是由3个字节组成, 但实际在计算机存储时将其用二进制表示, “abc”分别对应的ASCII码分别是97、 98、 99, 对应的二进制分别是01100001、 01100010和01100011,如下图
合
理地使用操作位能够有效地提高内存使用率和开发效率。
Redis提供了Bitmaps这个“数据类型”可以实现对位的操作
:
(1) Bitmaps本身不是一种数据类型
, 实际上它就是字符串(key-value)
, 但是它可以对字符串的位进行操作
。
(2) Bitmaps单独提供了一套命令, 所以在Redis中使用Bitmaps和使用字符串的方法不太相同。 可以把Bitmaps想象成一个以位为单位的数组
, 数组的每个单元只能存储0和1
, 数组的下标在Bitmaps中叫做偏移量
。
1、setbit
(1)格式
setbit
设置Bitmaps中某个偏移量的值(0或1)
*offset:偏移量(下标)从0开始
(2)实例
每个独立用户是否访问过网站存放在Bitmaps中, 将访问的用户记做1, 没有访问的用户记做0, 用偏移量作为用户的id
。
设置键的第offset个位的值(从0算起) , 假设现在有20个用户,userid=1, 6, 11, 15, 19的用户对网站进行了访问, 那么当前Bitmaps初始化结果如图
unique:users:20201106代表2020-11-06这天的独立访问用户的Bitmaps
注:
很多应用的用户id以一个指定数字(例如10000) 开头, 直接将用户id和Bitmaps的偏移量对应势必会造成一定的浪费, 通常的做法是每次做setbit操作时将用户id减去这个指定数字
。
在第一次初始化Bitmaps时, 假如偏移量非常大, 那么整个初始化过程执行会比较慢, 可能会造成Redis的阻塞。
2、getbit
(1)格式
getbit
获取键的第offset位的值(从0开始算
)
(2)实例
获取id=8的用户是否在2020-11-06这天访问过, 返回0说明没有访问过:
注:因为100根本不存在,所以也是返回0
3、bitcount
统计字符串被设置为1的bit数
。一般情况下,给定的整个字符串都会被进行计数,通过指定额外的 start 或 end 参数,可以让计数只在特定的位上进行。start 和 end 参数的设置,都可以使用负数值:比如 -1 表示最后一个位,而 -2 表示倒数第二个位,start、end 是指bit组的字节的下标数,二者皆包含。
(1)格式
bitcount
统计字符串从start字节到end字节比特值为1的数量
(2)实例
id在第1个字节到第3个字节之间的独立访问用户数
, 对应的用户id是11, 15, 19。举例: K1 【01000001 01000000 00000000 00100001】,对应【0,1,2,3】
bitcount K1 1 2 : 统计下标1、2字节组中bit=1的个数,即01000000 00000000
–》bitcount K1 1 2 --》1
bitcount K1 1 3 : 统计下标1、2字节组中bit=1的个数,即01000000 00000000 00100001
–》bitcount K1 1 3 --》3
bitcount K1 0 -2 : 统计下标0到下标倒数第2,字节组中bit=1的个数,即01000001 01000000 00000000
–》bitcount K1 0 -2 --》3
注意:redis的setbit设置或清除的是bit位置,而bitcount计算的是byte位置。
4、bitop
(1)格式
bitop and(or/not/xor) [key…]
bitop and unique:users:and:20201104_03
unique:users:20201103unique:users:2020110
bitop是一个复合操作, 它可以做多个Bitmaps的and(交集) 、 or(并集) 、 not(非) 、 xor(异或) 操作并将结果保存在destkey中
。
(2)实例
2020-11-04 日访问网站的userid=1,2,5,9
setbit unique:users:20201104 1 1
setbit unique:users:20201104 2 1
setbit unique:users:20201104 5 1
setbit unique:users:20201104 9 1
2020-11-03 日访问网站的userid=0,1,4,9。
setbit unique:users:20201103 0 1
setbit unique:users:20201103 1 1
setbit unique:users:20201103 4 1
setbit unique:users:20201103 9 1
计算出两天都访问过网站的用户数量
bitop and unique:users:and:20201104_03
unique:users:20201103unique:users:20201104
计算出任意一天都访问过网站的用户数量(例如月活跃就是类似这种) , 可以使用or求并集
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-oO0Zl0E7-1666246435052)(C:\Users\郁孤台下清江水\AppData\Roaming\Typora\typora-user-images\image-20220326221012183.png)]
在工作当中,我们经常会遇到与统计相关的功能需求
,比如统计网站PV(PageView页面访问量),可以使用Redis的incr、incrby轻松实现。
但像UV(UniqueVisitor,独立访客)、独立IP数、搜索记录数等需要去重和计数的问题
如何解决?这种求集合中不重复元素个数的问题称为基数问题
。
解决基数问题有很多种方案:
(1)数据存储在MySQL表中,使用distinct count
计算不重复个数
(2)使用Redis提供的hash、set、bitmaps
等数据结构来处理
以上的方案结果精确,但随着数据不断增加,导致占用空间越来越大,对于非常大 的数据集是不切实际的
。
能否能够降低一定的精度来平衡存储空间
?Redis推出了HyperLogLog
Redis HyperLogLog 是用来做基数统计的算法
,HyperLogLog 的优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的空间总是固定的、并且是很小的
。
在 Redis 里面,每个 HyperLogLog 键只需要花费 12 KB 内存,就可以计算接近 2^64 个不同元素的基数。这和计算基数时,元素越多耗费内存就越多的集合形成鲜明对比。
但是,因为 HyperLogLog 只会根据输入元素来计算基数,而不会储存输入元素本身,所以 HyperLogLog 不能像集合那样,返回输入的各个元素。
什么是基数?
比如数据集 {1, 3, 5, 7, 5, 7, 8}, 那么这个数据集的基数集为 {1, 3, 5 ,7, 8}, 基数(不重复元素)为5。 基数估计就是在误差可接受的范围内,快速计算基数
。
1、pfadd
自动去重
2、pfcount
3、pfmerge
Redis 3.2 中增加了对GEO类型的支持。GEO,Geographic,地理信息的缩写
。该类型,就是元素的2维坐标,在地图上就是经纬度
。redis基于该类型,提供了经纬度设置,查询,范围查询,距离查询,经纬度Hash等常见操作
。
1、geoadd
2、geopos
3、geodist
4、georadius
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RQLcRpbb-1666246435059)(C:\Users\郁孤台下清江水\AppData\Roaming\Typora\typora-user-images\image-20220326221731085.png)]
jedis是redis的一个客户端工具,就是通过java来操作redis
jdbc==>通过java来操作数据库
jedis==>通过java来操作redis,是一个客户端工具,支持java语言对redis进行各种操作
===>之前演示的各种命令,都可以通过jedis来完成
没有前端页面===>quickstar就行
<dependency>
<groupId>redis.clientsgroupId>
<artifactId>jedisartifactId>
<version>3.2.0version>
dependency>
public class JedisDemo1 {
public static void main(String[] args) {
//创建Jedis对象
Jedis jedis = new Jedis("192.168.88.3", 6379);
//测试
String value = jedis.ping();
System.out.println(value);
}
}
- 在redis.conf中将bind注掉,否则只能本机访问
- 在redis.conf中将本机访问保护模式
protected-mode
设置为no
- 连接超时错误
这是因为没有关掉虚拟机linux里面的防火墙
Ubuntu 平台的防火墙关闭命令
sudo ufw status
sudo ufw disable
sudo ufw enable
1)key不要太长,最好不要操作1024个字节,这不仅会消耗内存还会降低查找 效率
2)key不要太短,如果太短会降低key的可读性
3)在项目中,key最好有一个统一的命名规范(根据企业的需求)
1)字符串型 string
2)字符串列表 lists
3)字符串集合sets
4)有序字符串集合 sorted sets
5)哈希类型 hashs
jedis 就是 java redis 的简写
Jedis jedis = new Jedis(String ip , String port);
1 .jedis中对键通用的操作
方法 | 描述 | 返回值 /补充说明 |
---|---|---|
jedis.flush | ||
jedis.flushDB | 清空数据 | |
boolean jedis.exists(String key) | 判断某个键是否存在 | true = 存在,false= 不存在 |
jedis.set(String key,String value) | 新增键值对(key,value) | 返回String类型的OK代表成功 |
Set | 获取所有key | 返回set 无序集合 |
jedis.del(String key) | 删除键为key的数据项 | |
jedis.expire(String key,int i) | 设置键为key的过期时间为i秒 | |
int jedis.ttl(String key) | 获取建委key数据项的剩余时间(秒) | |
jedis.persist(String key) | 移除键为key属性项的生存时间限制 | |
jedis.type(String key) | 查看键为key所对应value的数据类型 |
2 .jedis中 字符串的操作
字符串类型是Redis中最为基础的数据存储类型,它在Redis中是二进制安全的,这 便意味着该类型可以接受任何格式的数据,如JPEG图像数据或Json对象描述信息等。 在Redis中字符串类型的Value最多可以容纳的数据长度是512M。
语法 | 描述 |
---|---|
jedis.set(String key,String value) | 增加(或覆盖)数据项 |
jedis.setnx(String key,String value) | 不覆盖增加数据项(重复的不插入) |
jedis.setex(String ,int t,String value) | 增加数据项并设置有效时间 |
jedis.del(String key) | 删除键为key的数据项 |
jedis.get(String key) | 获取键为key对应的value |
jedis.append(String key, String s) | 在key对应value 后边扩展字符串 s |
jedis.mset(String k1,String V1,String K2,String V2,…) | 增加多个键值对 |
String[] jedis.mget(String K1,String K2,…) | 获取多个key对应的value |
jedis.del(new String[](String K1,String K2,… )) | 删除多个key对应的数据项 |
String jedis.getSet(String key,String value) | 获取key对应value并更新value |
String jedis.getrang(String key , int i, int j) | 获取key对应value第i到j字符 ,从0开始,包头包尾 |
3 . jedis中对整数和浮点数操作
语法 | 描述 |
---|---|
jedis.incr(String key) | 将key对应的value 加1 |
jedis.incrBy(String key,int n) | 将key对应的value 加 n |
jedis.decr(String key) | 将key对应的value 减1 |
jedis.decrBy(String key , int n) | 将key对应的value 减 n |
4 .jedis中对列表(list)操作
在Redis中,List类型是按照插入顺序排序的字符串链表。和数据结构中的普通链表 一样,我们可以在其头部(left)和尾部(right)添加新的元素。在插入时,如果该键并不存在,Redis将为该键创建一个新的链表。如果链表中所有的元素均被移除,那么该键也将会被从数据库中删除。List中可以包含的最大元素数量是 4294967295。
从元素插入和删除的效率视角来看,如果我们是在链表的两头插入或删除元素,这将 会是非常高效的操作,即使链表中已经存储了百万条记录,该操作也可以在常量时间内完成。然而需要说明的是,如果元素插入或删除操作是作用于链表中间,那将会是非常低效的。
list 元素的下表从0开始
语法 | 描述 |
---|---|
jedis.lpush(String key, String v1, String v2,…) | 添加一个List , 注意:如果已经有该List对应的key, 则按顺序在左边追加 一个或多个 |
jedis.rpush(String key , String vn) | key对应list右边插入元素 |
jedis.lrange(String key,int i,int j) | 获取key对应list区间[i,j]的元素,注:从左边0开始,包头包尾 |
jedis.lrem(String key,int n , String val) | 删除list中 n个元素val |
jedis.ltrim(String key,int i,int j) | 删除list区间[i,j] 之外的元素 |
jedis.lpop(String key) | key对应list ,左弹出栈一个元素 |
jedis.rpop(String key) | key对应list ,右弹出栈一个元素 |
jedis.lset(String key,int index,String val) | 修改key对应的list指定下标index的元素 |
jedis.llen(String key) | 获取key对应list的长度 |
jedis.lindex(String key,int index) | 获取key对应list下标为index的元素 |
jedis.sort(String key) | 把key对应list里边的元素从小到大排序 (后边详细介绍) |
5 .jedis 集合set 操作
在Redis中,我们可以将Set类型看作为没有排序的字符集合,和List类型一样,也可以在该类型的数据值上执行添加、删除或判断某一元素是否存在等操作。需要 说明的是,这些操作的时间是常量时间。Set可包含的最大元素数是4294967295。
和List类型不同的是,Set集合中不允许出现重复的元素。和List类型相比,Set类型在功能上还存在着一个非常重要的特性,即在服务器端完成多个Sets之间的聚合计 算操作,如unions、intersections和differences(就是交集并集那些了)。由于这些操作均在服务端完成, 因此效率极高,而且也节省了大量的网络IO开销
set 的方法都以s开头
语法 | 操作 |
---|---|
jedis.sadd(String key,String v1,String v2,…) | 添加一个set |
jedis.smenbers(String key) | 获取key对应set的所有元素 |
jedis.srem(String key,String val) | 删除集合key中值为val的元素 |
jedis.srem(String key, Sting v1, String v2,…) | 删除值为v1, v2 , …的元素 |
jedis.spop(String key) | 随机弹出栈set里的一个元素 |
jedis.scared(String key) | 获取set元素个数 |
jedis.smove(String key1, String key2, String val) | 将元素val从集合key1中移到key2中 |
jedis.sinter(String key1, String key2) | 获取集合key1和集合key2的交集 |
jedis.sunion(String key1, String key2) | 获取集合key1和集合key2的并集 |
jedis.sdiff(String key1, String key2) | 获取集合key1和集合key2的差集 |
- jedis中 有序集合Zsort
Sorted-Sets和Sets类型极为相似,它们都是字符串的集合,都不允许重复的成员出现在一个Set中。它们之间的主要差别是Sorted-Sets中的每一个成员都会有一个分数(score)与之关联,Redis正是通过分数来为集合中的成员进行从小到大的排序。然 而需要额外指出的是,尽管Sorted-Sets中的成员必须是唯一的,但是分数(score) 却是可以重复的。
在Sorted-Set中添加、删除或更新一个成员都是非常快速的操作,其时间复杂度为集合中成员数量的对数。由于Sorted-Sets中的成员在集合中的位置是有序的,因此,即便是访问位于集合中部的成员也仍然是非常高效的。事实上,Redis所具有的这一特征在很多其它类型的数据库中是很难实现的,换句话说,在该点上要想达到和Redis同样的高效,在其它数据库中进行建模是非常困难的。
例如:游戏排名、微博热点话题等使用场景。
语法 | 描述 |
---|---|
jedis.zadd(String key,Map map) | 添加一个ZSet |
jedis.hset(String key,int score , int val) | 往 ZSet插入一个元素(Score-Val) |
jedis.zrange(String key, int i , int j) | 获取ZSet 里下表[i,j] 区间元素Val |
jedis. zrangeWithScore(String key,int i , int j) | 获取ZSet 里下表[i,j] 区间元素Score - Val |
jedis.zrangeByScore(String , int i , int j) | 获取ZSet里score[i,j]分数区间的元素(Score-Val) |
jeids.zscore(String key,String value) | 获取ZSet里value元素的Score |
jedis.zrank(String key,String value) | 获取ZSet里value元素的score的排名 |
jedis.zrem(String key,String value) | 删除ZSet里的value元素 |
jedis.zcard(String key) | 获取ZSet的元素个数 |
jedis.zcount(String key , int i ,int j) | 获取ZSet总score在[i,j]区间的元素个数 |
jedis.zincrby(String key,int n , String value) | 把ZSet中value元素的score+=n |
7 .jedis中 哈希(Hash)操作
Redis中的Hashes类型可以看成具有String Key和String Value的map容器。所以该类型非常适合于存储值对象的信息。如Username、Password和Age等。如果Hash中包含很少的字段,那么该类型的数据也将仅占用很少的磁盘空间。每一个Hash可以存储4294967295个键值对。
规律: 哈希的 方法 都以 h 开头,含有m字符的一般是多个的, (multiple: 多个的)
语法 | 描述 |
---|---|
jedis.hmset(String key,Map map) | 添加一个Hash |
jedis.hset(String key , String key, String value) | 向Hash中插入一个元素(K-V) |
jedis.hgetAll(String key) | 获取Hash的所有(K-V) 元素 |
jedis.hkeys(String key) | 获取Hash所有元素的key |
jedis.hvals(String key) | 获取Hash所有元素 的value |
jedis.hincrBy(String key , String k, int i) | 把Hash中对应的k元素的值 val+=i |
jedis.hdecrBy(String key,String k, int i) | 把Hash中对应的k元素的值 val-=i |
jedis.hdel(String key , String k1, String k2,…) | 从Hash中删除一个或多个元素 |
jedis.hlen(String key) | 获取Hash中元素的个数 |
jedis.hexists(String key,String K1) | 判断Hash中是否存在K1对应的元素 |
jedis.hmget(String key,String K1,String K2) | 获取Hash中一个或多个元素value |
8 .排序操作
使用排序, 首先需要生成一个排序对象
SortingParams sortingParams = new SortingParams();
语法 | 描述 |
---|---|
jedis.sort(String key,sortingParams.alpha()) | 队列按首字母a-z 排序 |
jedis.sort(String key, sortingParams.asc() ) | 队列按数字升序排列 |
jedis.sort(String key , sortingParams.desc()) | 队列按数字降序排列 |
使用示例:
Jedis jedis = JedisPoolUtils.getJedis();
SortingParams sortingParams = new SortingParams();
List<String> sort = jedis.sort("list02", sortingParams.desc());
这里排序指的是返回的sort是有序的,而之前的list02 依然是以前的顺序。
jedis.set("k1", "v1");
jedis.set("k2", "v2");
jedis.set("k3", "v3");
Set<String> keys = jedis.keys("*");
System.out.println(keys.size());
for (String key : keys) {
System.out.println(key);
}
System.out.println(jedis.exists("k1"));
System.out.println(jedis.ttl("k1"));
System.out.println(jedis.get("k1"));
jedis.mset("str1","v1","str2","v2","str3","v3");
System.out.println(jedis.mget("str1","str2","str3"));
List<String> list = jedis.lrange("mylist",0,-1);
for (String element : list) {
System.out.println(element);
}
jedis.sadd("orders", "order01");
jedis.sadd("orders", "order02");
jedis.sadd("orders", "order03");
jedis.sadd("orders", "order04");
Set<String> smembers = jedis.smembers("orders");
for (String order : smembers) {
System.out.println(order);
}
jedis.srem("orders", "order02");
jedis.hset("hash1","userName","lisi");
System.out.println(jedis.hget("hash1","userName"));
Map<String,String> map = new HashMap<String,String>();
map.put("telphone","13810169999");
map.put("address","atguigu");
map.put("email","abc@163.com");
jedis.hmset("hash2",map);
List<String> result = jedis.hmget("hash2", "telphone","email");
for (String element : result) {
System.out.println(element);
}
jedis.zadd("zset01", 100d, "z3");
jedis.zadd("zset01", 90d, "l4");
jedis.zadd("zset01", 80d, "w5");
jedis.zadd("zset01", 70d, "z6");
Set<String> zrange = jedis.zrange("zset01", 0, -1);
for (String e : zrange) {
System.out.println(e);
}
分析:
代码:
import redis.clients.jedis.Jedis;
import java.util.Random;
public class PhoneCode {
public static void main(String[] args) {
//模拟验证码发送
verifyCode("137000000");
//校验发送的验证码
//getRedisCode("137000000","验证码");
}
//1.随即生成一个六位数的验证码
public static String getCode(){
Random random = new Random();
String code = "";
for (int i = 0; i < 6; i++) {
int rand = random.nextInt(10);
code += rand;
}
return code;
}
//2.每个手机每天只能发送三次,验证码放到redis中,设置过期时间
public static void verifyCode(String phone){
//连接redis
Jedis jedis = new Jedis("192.168.50.128", 6379);
//输入redis连接密码
jedis.auth("root");
//拼接key
//手机发送次数key
String countKey = "VerifyCode" + phone + ":count";
//验证码key
String codeKey = "VerifyCode" + phone + ":code";
//每个手机每天只能发送3次
String count = jedis.get(countKey);
if (count == null){
//没有发送次数,第一次发送
//设置发送次数是1
jedis.setex(countKey,24 * 60 * 60,"1");
}else if (Integer.parseInt(count) <= 2){
//发送次数+1
jedis.incr(countKey);
}else if (Integer.parseInt(count) > 2){
//发送三次了,不能再发送了
System.out.println("今天的发送次数已经超过3次了");
//关闭连接
jedis.close();
}
//发送的验证码要放到redis中,设置过期时间为120s
String vcode = getCode();
jedis.setex(codeKey,120,vcode);
jedis.close();
}
//3.验证码校验
public static void getRedisCode(String phone,String code){
//从redis中获取验证码
Jedis jedis = new Jedis("192.168.50.128", 6379);
jedis.auth("root");
//验证码key
String codeKey = "VerifyCode" + phone + ":code";
String redisCode = jedis.get(codeKey);
//判断
if (redisCode.equals(code)){
System.out.println("成功");
}else {
System.out.println("失败");
}
jedis.close();
}
}
Spring Boot Data(数据) redis提供了RedisTemplate和StringRedisTemplate
其中StringRedisTemplate是RedisTemplate的子类,两个方法基本一致,不同之处主要体现在操作的数据类型不同:
- RedisTemplate中的两个泛型都是Object,意味着存储的key和value都可以是一个对象
- StringRedisTemplate的两个泛型都是String,意味着StringRedisTemplate的key和value都只能是字符串
RedisTemplate
:类似于一个redis的模板,封装操作redis里面各种 各样的类型,如果是关系型数据库,基本上就叫jdbcT emplate等等
redis是一个key value==>value类型有五种:String,list,set,zset,hash
>日后操作的时候,通过java操作时:
key是字符串>value也是字符串
key是list===>value也是放的一个String数组
==>java里面默认放的都是字符串
如果日后想让redis做更复杂处理==>把一个对象放到redis中===>对象序列化
RedisTemplate的key和value都允许是对象
==>默认自动序列化和自动反序列化
==>放入的对象必须实现对象序列化接口:Serializable
可能有一种需求:日后key和value放的都是字符串,不放对象==>StringRedisTemplate
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-data-redisartifactId>
dependency>
<dependency>
<groupId>org.apache.commonsgroupId>
<artifactId>commons-pool2artifactId>
<version>2.6.0version>
dependency>
springboot 2.0以后redis的客户端已经默认不使用jedis了,而是Lettuce
而且spring-boot-starter-data-redis依赖里面已经带了Lettuce,不用引入了
#Redis服务器地址
spring.redis.host=192.168.88.3
#Redis服务器连接端口
spring.redis.port=6379
#Redis数据库索引(默认为0)
spring.redis.database= 0
#连接超时时间(毫秒)
spring.redis.timeout=1800000
#连接池最大连接数(使用负值表示没有限制)
spring.redis.lettuce.pool.max-active=20
#最大阻塞等待时间(负数表示没限制)
spring.redis.lettuce.pool.max-wait=-1
#连接池中的最大空闲连接
spring.redis.lettuce.pool.max-idle=5
#连接池中的最小空闲连接
spring.redis.lettuce.pool.min-idle=0
配置redisTemplate
默认情况下的模板只能支持 RedisTemplate
==>只能存入字符串 很多时候,我们需要自定义 RedisTemplate ,设置序列化器
根据springboot知识,得出这个配置类创建了两个对象:
RedisTemplate
CacheManager cacheManager
@EnableCaching //处理缓存
@Configuration
public class RedisConfig extends CachingConfigurerSupport {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
template.setConnectionFactory(factory);
//key序列化方式
template.setKeySerializer(redisSerializer);
//value序列化
template.setValueSerializer(jackson2JsonRedisSerializer);
//value hashmap序列化
template.setHashValueSerializer(jackson2JsonRedisSerializer);
return template;
}
@Bean
public CacheManager cacheManager(RedisConnectionFactory factory) {
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
//解决查询缓存转换异常的问题
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
// 配置序列化(解决乱码的问题),过期时间600秒
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofSeconds(600))
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
.disableCachingNullValues();
RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
.cacheDefaults(config)
.build();
return cacheManager;
}
}
什么是ObjectMapper
ObjectMapper怎么使用
由于Spring Boot的自动配置JacksonAutoConfiguration中有依赖引入和配置,所以不需要我们额外配置
Jackson是spring-boot-starter-json的一部分,spring-boot-starter-web中包含spring-boot-starter-json。也就是说,当项目中引入spring-boot-starter-web后会自动引入spring-boot-starter-json
RedisTestController中添加测试方法
@RestController
@RequestMapping("/redisTest")
public class RedisTestController {
@Autowired
private RedisTemplate redisTemplate;
@GetMapping
public String testRedis() {
//设置值到redis
redisTemplate.opsForValue().set("name","lucy");
//从redis获取值
String name = (String)redisTemplate.opsForValue().get("name");
return name;
}
}
首先使用@Autowired注入RedisTemplate(后面直接使用,就不特殊说明)
@Autowired
private RedisTemplate redisTemplate;
1、删除单个key
// 删除key
public void delete(String key){
redisTemplate.delete(key);
}
2、删除多个key
// 删除多个key
public void deleteKey (String ...keys){
redisTemplate.delete(keys);
}
3、指定key的失效时间
// 指定key的失效时间
public void expire(String key,long time){
redisTemplate.expire(key,time,TimeUnit.MINUTES);
}
4、根据key获取过期时间
// 根据key获取过期时间
public long getExpire(String key){
Long expire = redisTemplate.getExpire(key);
return expire;
}
5、判断key是否存在
// 判断key是否存在
public boolean hasKey(String key){
return redisTemplate.hasKey(key);
}
1 .添加缓存(2/3是1的递进值)
//1、通过redisTemplate设置值
redisTemplate.boundValueOps("StringKey").set("StringValue");
redisTemplate.boundValueOps("StringKey").set("StringValue",1, TimeUnit.MINUTES);
//2、通过BoundValueOperations设置值
BoundValueOperations stringKey = redisTemplate.boundValueOps("StringKey");
stringKey.set("StringVaule");
stringKey.set("StringValue",1, TimeUnit.MINUTES);
//3、通过ValueOperations设置值
ValueOperations ops = redisTemplate.opsForValue();
ops.set("StringKey", "StringVaule");
ops.set("StringValue","StringVaule",1, TimeUnit.MINUTES);
2 .设置过期时间(单独设置)
redisTemplate.boundValueOps("StringKey").expire(1,TimeUnit.MINUTES);
redisTemplate.expire("StringKey",1,TimeUnit.MINUTES);
3 .获取缓存值(2/3是1的递进值)
//1、通过redisTemplate设置值
String str1 = (String) redisTemplate.boundValueOps("StringKey").get();
//2、通过BoundValueOperations获取值
BoundValueOperations stringKey = redisTemplate.boundValueOps("StringKey");
String str2 = (String) stringKey.get();
//3、通过ValueOperations获取值
ValueOperations ops = redisTemplate.opsForValue();
String str3 = (String) ops.get("StringKey");
4 .删除key
Boolean result = redisTemplate.delete("StringKey");
5 .顺序递增
redisTemplate.boundValueOps("StringKey").increment(3L);
6 .顺序递减
redisTemplate.boundValueOps("StringKey").increment(-3L);
1 .添加缓存(2/3是1的递进值)
//1、通过redisTemplate设置值
redisTemplate.boundHashOps("HashKey").put("SmallKey", "HashVaue");
//2、通过BoundValueOperations设置值
BoundHashOperations hashKey = redisTemplate.boundHashOps("HashKey");
hashKey.put("SmallKey", "HashVaue");
//3、通过ValueOperations设置值
HashOperations hashOps = redisTemplate.opsForHash();
hashOps.put("HashKey", "SmallKey", "HashVaue");
2 .设置过期时间(单独设置)
redisTemplate.boundValueOps("HashKey").expire(1,TimeUnit.MINUTES);
redisTemplate.expire("HashKey",1,TimeUnit.MINUTES);
3 .添加一个Map集合
HashMap<String, String> hashMap = new HashMap<>();
redisTemplate.boundHashOps("HashKey").putAll(hashMap );
4 .设置过期时间(单独设置)
redisTemplate.boundValueOps("HashKey").expire(1,TimeUnit.MINUTES);
redisTemplate.expire("HashKey",1,TimeUnit.MINUTES);
5 .提取所有的小key
//1、通过redisTemplate获取值
Set keys1 = redisTemplate.boundHashOps("HashKey").keys();
//2、通过BoundValueOperations获取值
BoundHashOperations hashKey = redisTemplate.boundHashOps("HashKey");
Set keys2 = hashKey.keys();
//3、通过ValueOperations获取值
HashOperations hashOps = redisTemplate.opsForHash();
Set keys3 = hashOps.keys("HashKey");
6 .提取所有的value值
//1、通过redisTemplate获取值
List values1 = redisTemplate.boundHashOps("HashKey").values();
//2、通过BoundValueOperations获取值
BoundHashOperations hashKey = redisTemplate.boundHashOps("HashKey");
List values2 = hashKey.values();
//3、通过ValueOperations获取值
HashOperations hashOps = redisTemplate.opsForHash();
List values3 = hashOps.values("HashKey");
7 .根据key提取value值
//1、通过redisTemplate获取
String value1 = (String) redisTemplate.boundHashOps("HashKey").get("SmallKey");
//2、通过BoundValueOperations获取值
BoundHashOperations hashKey = redisTemplate.boundHashOps("HashKey");
String value2 = (String) hashKey.get("SmallKey");
//3、通过ValueOperations获取值
HashOperations hashOps = redisTemplate.opsForHash();
String value3 = (String) hashOps.get("HashKey", "SmallKey");
8 .获取所有的键值对集合
//1、通过redisTemplate获取
Map entries = redisTemplate.boundHashOps("HashKey").entries();
//2、通过BoundValueOperations获取值
BoundHashOperations hashKey = redisTemplate.boundHashOps("HashKey");
Map entries1 = hashKey.entries();
//3、通过ValueOperations获取值
HashOperations hashOps = redisTemplate.opsForHash();
Map entries2 = hashOps.entries("HashKey");
9 .删除
//删除小key
redisTemplate.boundHashOps("HashKey").delete("SmallKey");
//删除大key
redisTemplate.delete("HashKey");
10 .判断Hash中是否含有该值
Boolean isEmpty = redisTemplate.boundHashOps("HashKey").hasKey("SmallKey");
1 .添加Set缓存(值可以是一个,也可是多个)(2/3是1的递进值)
//1、通过redisTemplate设置值
redisTemplate.boundSetOps("setKey").add("setValue1", "setValue2", "setValue3");
//2、通过BoundValueOperations设置值
BoundSetOperations setKey = redisTemplate.boundSetOps("setKey");
setKey.add("setValue1", "setValue2", "setValue3");
//3、通过ValueOperations设置值
SetOperations setOps = redisTemplate.opsForSet();
setOps.add("setKey", "SetValue1", "setValue2", "setValue3");
2 .设置过期时间(单独设置)
redisTemplate.boundValueOps("setKey").expire(1,TimeUnit.MINUTES);
redisTemplate.expire("setKey",1,TimeUnit.MINUTES);
12
3 .根据key获取Set中的所有值
//1、通过redisTemplate获取值
Set set1 = redisTemplate.boundSetOps("setKey").members();
//2、通过BoundValueOperations获取值
BoundSetOperations setKey = redisTemplate.boundSetOps("setKey");
Set set2 = setKey.members();
//3、通过ValueOperations获取值
SetOperations setOps = redisTemplate.opsForSet();
Set set3 = setOps.members("setKey");
4 .根据value从一个set中查询,是否存在
Boolean isEmpty = redisTemplate.boundSetOps("setKey").isMember("setValue2");
5 .获取Set缓存的长度
Long size = redisTemplate.boundSetOps("setKey").size();
6 .移除指定的元素
Long result1 = redisTemplate.boundSetOps("setKey").remove("setValue1");
7 .移除指定的key
Boolean result2 = redisTemplate.delete("setKey");
添加缓存(2/3是1的递进值)
//1、通过redisTemplate设置值
redisTemplate.boundListOps("listKey").leftPush("listLeftValue1");
redisTemplate.boundListOps("listKey").rightPush("listRightValue2");
//2、通过BoundValueOperations设置值
BoundListOperations listKey = redisTemplate.boundListOps("listKey");
listKey.leftPush("listLeftValue3");
listKey.rightPush("listRightValue4");
//3、通过ValueOperations设置值
ListOperations opsList = redisTemplate.opsForList();
opsList.leftPush("listKey", "listLeftValue5");
opsList.rightPush("listKey", "listRightValue6");
将List放入缓存
ArrayList<String> list = new ArrayList<>();
redisTemplate.boundListOps("listKey").rightPushAll(list);
redisTemplate.boundListOps("listKey").leftPushAll(list);
设置过期时间(单独设置)
redisTemplate.boundValueOps("listKey").expire(1,TimeUnit.MINUTES);
redisTemplate.expire("listKey",1,TimeUnit.MINUTES);
获取List缓存全部内容(起始索引,结束索引)
List listKey1 = redisTemplate.boundListOps("listKey").range(0, 10);
从左或从右弹出一个元素
String listKey2 = (String) redisTemplate.boundListOps("listKey").leftPop(); //从左侧弹出一个元素
String listKey3 = (String) redisTemplate.boundListOps("listKey").rightPop(); //从右侧弹出一个元素
根据索引查询元素
String listKey4 = (String) redisTemplate.boundListOps("listKey").index(1);
获取List缓存的长度
Long size = redisTemplate.boundListOps("listKey").size();
根据索引修改List中的某条数据(key,索引,值)
redisTemplate.boundListOps("listKey").set(3L,"listLeftValue3");
移除N个值为value(key,移除个数,值)
redisTemplate.boundListOps("listKey").remove(3L,"value");
向集合中插入元素,并设置分数
//1、通过redisTemplate设置值
redisTemplate.boundZSetOps("zSetKey").add("zSetVaule", 100D);
//2、通过BoundValueOperations设置值
BoundZSetOperations zSetKey = redisTemplate.boundZSetOps("zSetKey");
zSetKey.add("zSetVaule", 100D);
//3、通过ValueOperations设置值
ZSetOperations zSetOps = redisTemplate.opsForZSet();
zSetOps.add("zSetKey", "zSetVaule", 100D);
向集合中插入多个元素,并设置分数
DefaultTypedTuple<String> p1 = new DefaultTypedTuple<>("zSetVaule1", 2.1D);
DefaultTypedTuple<String> p2 = new DefaultTypedTuple<>("zSetVaule2", 3.3D);
redisTemplate.boundZSetOps("zSetKey").add(new HashSet<>(Arrays.asList(p1,p2)));
按照排名先后(从小到大)打印指定区间内的元素, -1为打印全部
Set<String> range = redisTemplate.boundZSetOps("zSetKey").range(0, -1);
获得指定元素的分数
Double score = redisTemplate.boundZSetOps("zSetKey").score("zSetVaule");
返回集合内的成员个数
Long size = redisTemplate.boundZSetOps("zSetKey").size();
返回集合内指定分数范围的成员个数(Double类型)
Long COUNT = redisTemplate.boundZSetOps("zSetKey").count(0D, 2.2D);
返回集合内元素在指定分数范围内的排名(从小到大)
Set byScore = redisTemplate.boundZSetOps("zSetKey").rangeByScore(0D, 2.2D);
带偏移量和个数,(key,起始分数,最大分数,偏移量,个数)
Set<String> ranking2 = redisTemplate.opsForZSet().rangeByScore("zSetKey", 0D, 2.2D 1, 3);
返回集合内元素的排名,以及分数(从小到大)
Set<TypedTuple<String>> tuples = redisTemplate.boundZSetOps("zSetKey").rangeWithScores(0L, 3L);
for (TypedTuple<String> tuple : tuples) {
System.out.println(tuple.getValue() + " : " + tuple.getScore());
}ss
返回指定成员的排名
//从小到大
Long startRank = redisTemplate.boundZSetOps("zSetKey").rank("zSetVaule");
//从大到小
Long endRank = redisTemplate.boundZSetOps("zSetKey").reverseRank("zSetVaule");
从集合中删除指定元素
redisTemplate.boundZSetOps("zSetKey").remove("zSetVaule");
删除指定索引范围的元素(Long类型)
redisTemplate.boundZSetOps("zSetKey").removeRange(0L,3L);
删除指定分数范围内的元素(Double类型)
redisTemplate.boundZSetOps("zSetKey").removeRangeByScorssse(0D,2.2D);
为指定元素加分(Double类型)
Double score = redisTemplate.boundZSetOps("zSetKey").incrementScore("zSetVaule",1.1D);
Redis事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断
。
Redis事务的主要作用就是串联多个命令
,防止别的命令插队
。
从输入Multi命令开始,输入的命令都会依次进入命令队列中,但不会执行,直到输入Exec后,Redis会将之前的命令队列中的命令依次执行
。
组队的过程中可以通过discard来放弃组队
。
案例:
组队中某个命令出现了报告错误,执行时整个的所有队列都会被取消。
如果执行阶段某个命令报出了错误,则只有报错的命令不会被执行,而其他的命令都会执行,不会回滚。
想想一个场景:有很多人有你的账户,同时去参加双十一抢购
悲观锁(Pessimistic Lock), 顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁
,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。
乐观锁(Optimistic Lock), 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁
,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量。Redis就是利用这种check-and-set机制实现事务的。
在执行multi之前,先执行watch key1 [key2]
,可以监视一个(或多个) key
,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。
取消 WATCH 命令对所有 key 的监视。
如果在执行 WATCH 命令之后,EXEC 命令或DISCARD 命令先被执行了的话,那么就不需要再执行UNWATCH 了。
http://doc.redisfans.com/transaction/exec.html
单独的隔离操作
没有隔离级别的概念
队列中的命令没有提交之前都不会实际被执行,因为事务提交前任何指令都不会被实际执行
不保证原子性
事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚
使用工具ab模拟测试
CentOS6 默认安装
CentOS7需要手动安装
yum install httpd-tools
(1) 进入cd /run/media/root/CentOS 7 x86_64/Packages(路径跟centos6不同)
(2) 顺序安装
apr-1.4.8-3.el7.x86_64.rpm
apr-util-1.5.2-6.el7.x86_64.rpm
httpd-tools-2.4.6-67.el7.centos.x86_64.rpm
vim postfile 模拟表单提交参数,以&符号结尾;存放当前目录。
内容:prodid=0101&
ab -n 2000 -c 200 -k -p ~/postfile -T application/x-www-form-urlencoded http://192.168.2.115:8081/Seckill/doseckill
ab -n 2000 -c 200 -k -p postfile -T ‘application/x-www-form-urlencoded’ http://192.168.140.1:8080/seckill/doseckill
增加-r参数,-r Don’t exit on socket receive errors.
ab -n 2000 -c 100 -r -p postfile -T ‘application/x-www-form-urlencoded’ http://192.168.140.1:8080/seckill/doseckill
ab -n 2000 -c 100 -p postfile -T ‘application/x-www-form-urlencoded’ http://192.168.137.1:8080/seckill/doseckill
已经秒光,可是还有库存。原因,就是乐观锁导致很多请求都失败。先点的没秒到,后点的可能秒到了。
节省每次连接redis服务带来的消耗,把连接好的实例反复利用。
通过参数管理连接的行为
代码见项目中
Lua 是一个小巧的脚本语言,Lua脚本可以很容易的被C/C++ 代码调用,也可以反过来调用C/C++的函数,Lua并没有提供强大的库,一个完整的Lua解释器不过200k,所以Lua不适合作为开发独立应用程序的语言,而是作为嵌入式脚本语言。
很多应用程序、游戏使用LUA作为自己的嵌入式脚本语言,以此来实现可配置性、可扩展性。
这其中包括魔兽争霸地图、魔兽世界、博德之门、愤怒的小鸟等众多游戏插件或外挂。
https://www.w3cschool.cn/lua/
将复杂的或者多步的redis操作,写为一个脚本,一次提交给redis执行,减少反复连接redis的次数。提升性能。
LUA脚本是类似redis事务,有一定的原子性,不会被其他命令插队,可以完成一些redis事务性的操作。
但是注意redis的lua脚本功能,只有在Redis 2.6以上的版本才可以使用。
利用lua脚本淘汰用户,解决超卖问题。
redis 2.6版本以后,通过lua脚本解决争抢问题,实际上是redis 利用其单线程的特性,用任务队列的方式解决多任务并发问题。
官网介绍:http://www.redis.io
Redis 提供了2个不同形式的持久化方式。
①用哪个好
官方推荐两个都启用。
如果对数据不敏感,可以选单独用RDB。
不建议单独用 AOF,因为可能会出现Bug。
如果只是做纯内存缓存,可以都不用
⑤官网建议
RDB持久化方式能够在指定的时间间隔能对你的数据进行快照存储
AOF持久化方式记录每次对服务器写的操作,当服务器重启的时候会重新执行这些命令来恢复原始的数据,AOF命令以redis协议追加保存每次写的操作到文件末尾.
Redis还能对AOF文件进行后台重写,使得AOF文件的体积不至于过大
只做缓存:如果你只希望你的数据在服务器运行的时候存在,你也可以不使用任何持久化方式.
同时开启两种持久化方式
在这种情况下,当redis重启的时候会优先载入AOF文件来恢复原始的数据, 因为在通常情况下AOF文件保存的数据集要比RDB文件保存的数据集要完整.
RDB的数据不实时,同时使用两者时服务器重启也只会找AOF文件。那要不要只使用AOF呢?
建议不要,因为RDB更适合用于备份数据库(AOF在不断变化不好备份), 快速重启,而且不会有AOF可能潜在的bug,留着作为一个万一的手段。
性能建议
因为RDB文件只用作后备用途,建议只在Slave上持久化RDB文件,而且只要15分钟备份一次就够了,只保留save 900 1这条规则。
如果使用AOF,好处是在最恶劣情况下也只会丢失不超过两秒数据,启动脚本较简单只load自己的AOF文件就可以了。
代价,一是带来了持续的IO,二是AOF rewrite的最后将rewrite过程中产生的新数据写到新文件造成的阻塞几乎是不可避免的。
只要硬盘许可,应该尽量减少AOF rewrite的频率,AOF重写的基础大小默认值64M太小了,可以设到5G以上。
默认超过原大小100%大小时重写可以改到适当的数值。
在指定的时间间隔内将内存中的数据集快照写入磁盘
, 也就是行话讲的Snapshot快照
,它恢复时是将快照文件直接读到内存里
时间间隔:每隔多长时间
数据集快照:略
Redis会单独创建一个子进程(fork)
来进行持久化,会先将数据写入到 一个临时文件
中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件
。 整个过程中,主进程是不进行任何IO操作的,这就确保了极高的性能 如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。
RDB的缺点是最后一次持久化后的数据可能丢失
。
Fork的作用是复制一个与当前进程一样的进程
。新进程的所有数据
(变量、环境变量、程序计数器等) 数值都和原进程一致
,但是是一个全新的进程,并作为原进程的子进程
在Linux程序中,fork()会产生一个和父进程完全相同的子进程
,但子进程在此后多会exec系统调用,出于效率考虑,Linux中引入了“写时复制技术
”
一般情况父进程和子进程会共用同一段物理内存,只有进程空间的各段的内容要发生变化时,才会将父进程的内容复制一份给子进程。
在redis.conf中配置文件名称,默认为dump.rdb
rdb文件的保存路径,也可以修改。默认为Redis启动时命令行所在的目录下
./<=>当前路径
dir “/myredis/”
save :save时只管保存,其它不管,全部阻塞。手动保存
。不建议。
bgsave:Redis会在后台异步进行快照操作,快照同时还可以响应客户端请求。
可以通过lastsave 命令获取最后一次成功执行快照的时间
执行flushall命令,也会产生dump.rdb文件,但里面是空的,无意义
格式:save 秒钟 写操作次数
RDB是整个内存的压缩过的Snapshot,RDB的数据结构,可以配置复合的快照触发条件,
默认是1分钟内改了1万次,或5分钟内改了10次,或15分钟内改了1次。
禁用
不设置save指令,或者给save传入空字符串
当Redis无法写入磁盘的话,直接关掉Redis的写操作。推荐yes.
yes===>关闭写操作
对于存储到磁盘中的快照,可以设置是否进行压缩存储。如果是的话,redis会采用LZF算法进行压缩。
如果你不想消耗CPU来进行压缩的话,可以设置为关闭此功能。推荐yes.
在存储快照后,还可以让redis使用CRC64算法来进行数据校验,
但是这样做会增加大约10%的性能消耗,如果希望获取到最大的性能提升,可以关闭此功能
推荐yes.
先通过config get dir 查询rdb文件的目录
将*.rdb的文件拷贝到别的地方
rdb的恢复
然后把redis停掉,把dump.rgb删掉
然后做恢复,把新加的那个改名回原先那个,然后启动redis==>自动恢复
数据大小发生了变化==>做了持久化
适合大规模的数据恢复
对数据完整性和一致性要求不高更适合使用
节省磁盘空间
恢复速度快
Fork的时候,内存中的数据被克隆了一份,大致2倍的膨胀性需要考虑
虽然Redis在fork时使用了写时拷贝技术,但是如果数据庞大时还是比较消耗性能。
在备份周期在一定间隔时间做一次备份,所以如果Redis意外down掉的话,就会丢失最后一次快照后的所有修改。
动态停止RDB:redis-cli config set save “”#save后给空值,表示禁用保存策略
以日志的形式来记录每个写操作
(增量保存),将Redis执行过的所有写指令记录下来(读操作不记录)
, 只许追加文件但不可以改写文件
,redis启动之初会读取该文件重新构建数据,换言之,redis 重启的话就根据日志文件的内容将写指令从前到后执行一次 以完成数据的恢复工作
(1)客户端的请求写命令会被append追加到AOF缓冲区内;
(2)AOF缓冲区根据AOF持久化策略[always,everysec,no]将操作sync同步到磁盘的AOF文件中;
(3)AOF文件大小超过重写策略或手动重写时,会对AOF文件rewrite重写,压缩AOF文件容量;
(4)Redis服务重启时,会重新load加载AOF文件中的写操作达到数据恢复的目的;
可以在redis.conf中配置文件名称,默认为 appendonly.aof
AOF文件的保存路径,同RDB的路径一致
。
AOF和RDB同时开启,系统默认取AOF的数据(数据不会存在丢失)
AOF的备份机制和性能虽然和RDB不同, 但是备份和恢复的操作同RDB一样,都是拷贝备份文件,需要恢复时再拷贝到Redis工作目录下,启动系统即加载。
正常恢复
异常恢复
appendfsync always
始终同步,每次Redis的写入都会立刻记入日志;性能较差但数据完整性比较好
appendfsync everysec
每秒同步,每秒记入日志一次,如果宕机,本秒的数据可能丢失。
appendfsync no
redis不主动进行同步,把同步时机交给操作系统。
1是什么:
AOF采用文件追加方式,文件会越来越大为避免出现此种情况,新增了重写机制, 当AOF文件的大小超过所设定的阈值时,Redis就会启动AOF文件的内容压缩, 只保留可以恢复数据的最小指令集.可以使用命令bgrewriteaof
2重写原理,如何实现重写
AOF文件持续增长而过大时,会fork出一条新进程来将文件重写(也是先写临时文件最后再rename),redis4.0版本后的重写,是指上就是把rdb 的快照,以二级制的形式附在新的aof头部,作为已有的历史数据,替换掉原来的流水账操作。
no-appendfsync-on-rewrite:
如果 no-appendfsync-on-rewrite=yes ,不写入aof文件只写入缓存,用户请求不会阻塞,但是在这段时间如果宕机会丢失这段时间的缓存数据。(降低数据安全性,提高性能)
如果 no-appendfsync-on-rewrite=no, 还是会把数据往磁盘里刷,但是遇到重写操作,可能会发生阻塞。(数据安全,但是性能降低)
触发机制,何时重写
Redis会记录上次重写时的AOF大小,默认配置是当AOF文件大小是上次rewrite后大小的一倍且文件大于64M时触发
重写虽然可以节约大量磁盘空间,减少恢复时间。但是每次重写还是有一定的负担的,因此设定Redis要满足一定条件才会进行重写。
auto-aof-rewrite-percentage:设置重写的基准值,文件达到100%时开始重写(文件是原来重写后文件的2倍时触发)
auto-aof-rewrite-min-size:设置重写的基准值,最小文件64MB。达到这个值开始重写。
例如:文件达到70MB开始重写,降到50MB,下次什么时候开始重写?100MB
系统载入时或者上次重写完毕时,Redis会记录此时AOF大小,设为base_size,
如果Redis的AOF当前大小>= base_size +base_size*100% (默认)且当前大小>=64mb(默认)的情况下,Redis会对AOF进行重写。
3、重写流程
(1)bgrewriteaof触发重写,判断是否当前有bgsave或bgrewriteaof在运行,如果有,则等待该命令结束后再继续执行。
(2)主进程fork出子进程执行重写操作,保证主进程不会阻塞。
(3)子进程遍历redis内存中数据到临时文件,客户端的写请求同时写入aof_buf缓冲区和aof_rewrite_buf重写缓冲区保证原AOF文件完整以及新AOF文件生成期间的新的数据修改动作不会丢失。
(4)1).子进程写完新的AOF文件后,向主进程发信号,父进程更新统计信息。2).主进程把aof_rewrite_buf中的数据写入到新的AOF文件。
(5)使用新的AOF文件覆盖旧的AOF文件,完成AOF重写。
备份机制更稳健,丢失数据概率更低。
可读的日志文本,通过操作AOF稳健,可以处理误操作
比起RDB占用更多的磁盘空间。
恢复备份速度要慢。
每次读 写都同步的话,有一定的性能压力。
存在个别Bug,造成恢复不能。
主机数据更新后根据配置和策略, 自动同步到备机的master/slaver机制
,Master
以写为主
,Slave以读为主
读写分离,性能扩展
;主里面做写操作,从里面做读操作
容灾快速恢复
:当你一台从挂掉了,快速去找其他从
>一主多从
>必须只能有一台主服务器
>那么主机要是挂了怎么办==>集群
cp /etc/redis.conf /myredis/redis.conf
首先:关闭AOF
改为no
,保存,退出
一主两从,要用不同端口启动,木的办法,因为我们是在同一个电脑里面
redis6379.conf
使用vim创建配置文件
vi redis6379.conf
使用vim编写配置文件,通过include形式引入基础部分,其他的我们自己配
include /myredis/redis.conf
#通过include引入公共文件
pidfile /var/run/redis_6379.pid
#配置pid的一个位置
port 6379
# 端口
dbfilename dump6379.rdb
#rdb文件的名称
保存退出
根据这个配置文件复制出另外两个
cp redis6379.conf redis6380.conf
cp redis6379.conf redis6381.conf
然后去两个文件中更改内容
改成和文件名里数字相对应的
这三个服务虽然启动了,但是并没有主从的效果,这三个是独立的
分别连上这三个服务进行查看
redis-cli -p 6379
redis-cli -p 6380
redis-cli -p 6381
info replication
上面这个命令可以查看主从
===>都是相互独立的,没有主从效果,都是主机
弄出主从效果来
成为某个实例的从服务器:
slaveof <ip><port>
ip是主机
ip 这个ip不是linux的,而是windows的127.0.0.1
port是主机
端口号
1、在从机上(6380和6381)执行: slaveof 127.0.0.1 6379
去主机里面看看
主机:
从机
:
强调:从机里面只能做读操作!!主机读写都可以,但是一般只用来写,读从机来分担服务器压力
主机挂掉,重启就行,一切如初
从机重启需重设:slaveof 127.0.0.1 6379
可以将配置增加到文件中。永久生效。
切入点问题?slave1、slave2是从头开始复制还是从切入点开始复制?比如从k4进来,那之前的k1,k2,k3是否也可以复制?
从机是否可以写?set可否?
主机shutdown后情况如何?从机是上位还是原地待命?
主机又回来了后,主机新增记录,从机还能否顺利复制?
其中一台从机down后情况如何?依照原有它能跟上大部队吗?
把一个从机挂掉,主机进行写操作
把挂掉的从机重启
重启之后又变成了主服务器
把挂掉又重启的服务器变成从机,然后查询
把主服务器挂掉
去从服务器里面看看内容
从机是忠心耿耿啊
把主服务器重启启动起来
重启之后还是主服务器
上一个Slave可以是下一个slave的Master,Slave同样可以接收其他 slaves的连接和同步请求,那么该slave作为了链条中下一个的master, 可以有效减轻master的写压力,去中心化降低风险。
用 slaveof
中途变更转向:会清除之前的数据,重新建立拷贝最新的
风险是一旦某个slave宕机,后面的slave都没法备份
主机挂了,从机还是从机,无法写数据了
把6380变成6381的从服务器
特点和一主二仆类似:
主服务器挂掉了,从服务器还是从服务器,主服务器重启之后依旧是主服务器
当一个master宕机后,后面的slave可以立刻升为master,其后面的slave不用做任何修改。
用
slaveof no one
将从机变为主机。
缺点:这是手动完成的
自动完成场景:
在运行状态,服务器主机挂掉了,运营人员能够马上知道并且重启==>太难了,很麻烦
>自动完成
>哨兵模式
Slave启动成功连接到master后会发送一个sync命令
Master接到命令启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令, 在后台进程执行完毕之后,master将传送整个数据文件到slave,以完成一次完全同步
全量复制:而slave服务在接收到数据库文件数据后,将其存盘并加载到内存中。
增量复制:Master继续将新的所有收集到的修改命令依次传给slave,完成同步
但是只要是重新连接master,一次完全同步(全量复制)将被自动执行
反客为主的自动版,能够后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库
1 .调整为一主二仆模式,6379带着6380、6381
2 .自定义的/myredis目录下新建sentinel.conf文件,名字绝不能错
为了明显,再打开一个客户端的连接,在里面创建文件
cd /myredis
vi sentinel.conf
3 .配置哨兵,填写内容
sentinel monitor mymaster 127.0.0.1 6379 1
sentinel monitor 给监控对象起的名称 主机 端口号 1
其中mymaster为监控对象起的服务器名称
1 为至少有多少个哨兵同意迁移的数量
例如本例一主二仆,1就表示主机挂了,一个从机同意就可以切换
4 .启动哨兵
redis-sentinel sentinel.conf
5 .当主机挂掉,从机制选举中产生新的主机
(大概10秒左右可以看到哨兵窗口日志,切换了新的主机)
哪个从机会被选举为主机呢?
根据优先级别:slave-priority
原主机重启后会变为从机。
6 .复制延时
由于所有的写操作都是先在Master上操作,然后同步更新到Slave上,所以从Master同步到Slave机器有一定的延迟,当系统很繁忙的时候,延迟问题会更加 严重,Slave机器数量的增加也会使这个问题更加严重。
优先级在redis.conf中默认:slave-priority 100,值越小优先级越高
偏移量是指获得原主机数据最全的
每个redis实例启动后都会随机生成一个40位的runid
写java代码识别哪个是主哪个是从?==>改一下连接池
sentinelSet.add(“192.168.11.103:26379”);
private static JedisSentinelPool jedisSentinelPool=null;
public static Jedis getJedisFromSentinel(){
if(jedisSentinelPool==null){
Set<String> sentinelSet=new HashSet<>();
sentinelSet.add("192.168.11.103:26379");
JedisPoolConfig jedisPoolConfig =new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(10); //最大可用连接数
jedisPoolConfig.setMaxIdle(5); //最大闲置连接数
jedisPoolConfig.setMinIdle(5); //最小闲置连接数
jedisPoolConfig.setBlockWhenExhausted(true); //连接耗尽是否等待
jedisPoolConfig.setMaxWaitMillis(2000); //等待时间
jedisPoolConfig.setTestOnBorrow(true); //取连接的时候进行一下测试 ping pong
jedisSentinelPool=new JedisSentinelPool("mymaster",sentinelSet,jedisPoolConfig);
return jedisSentinelPool.getResource();
}else{
return jedisSentinelPool.getResource();
}
}
就是让它挂掉就ok了
⭐️⭐️
1.redis-cli -p 端口号
进去
2.shutdown exit
直接×掉是不会关闭主从复制的,再次打开依旧是主从复制状态
和关闭redis一样
容量不够,redis如何进行扩容?
并发写操作, redis如何分摊?
另外,主从模式,薪火相传模式,主机宕机,导致ip地址发生变化,应用程序中配置需要修改对应的主机地址、端口等信息。
之前通过代理主机来解决
但是redis3.0中提供了解决方案。就是**无中心化集群配置
**。
任何一台服务器都可以作为集群的入口,他们之间互相连通
Redis 集群实现了对Redis的水平扩容,即启动N个redis节点,将整个数据库分布存储在这N个节点中,每个节点存储总数据的1/N。
Redis 集群通过分区(partition)来提供一定程度的可用性(availability): 即使集群中有一部分节点失效或者无法进行通讯, 集群也可以继续处理命令请求。
用六台服务器搭建出来一个redis的集群
将rdb,aof文件都删除掉。
rm -rf dump63*
用vim打开配置文件在里面修改
cluster-enabled yes #打开集群模式
cluster-config-file nodes-6379.conf #设定节点配置文件名
cluster-node-timeout 15000 #设定节点失联时间,超过该时间(毫秒),集群自动进行主从切换。
这两个是在学习主从复制的时候创建的
rm -rf redis6380.conf
rm -rf redis6381.conf
通过上面几步,已经创建好一个配置文件了,再复制出5个来
cp redis6379.conf redis6380.conf
cp redis6379.conf redis6381.conf
cp redis6379.conf redis6389.conf
cp redis6379.conf redis6390.conf
cp redis6379.conf redis6391.conf
去复制出来的文件里面修改端口号,
使用vim打开配置文件,使用命令替换
先输冒号进入…模式,然后再输入下面内容
%s/6379/6380
#把6379替换成6380
以相同方式替换掉另外四个
保证启动都是成功的,通过ps命令查看进程
ps -ef | grep redis
没成功,79,80,81没有cluster和nodes文件,是主从复制的时候配置的主从关系还在,先解除主从关系
果然是因为这个,关闭
然后重启,ok
组合之前,请确保所有redis实例启动后,nodes-xxxx.conf文件都生成正常。
合体:
第一步,进入src目录下
cd /home/gy/redis6/redis-6.2.1/src
如果用的是比较老旧的版本,需要额外装一个环境,但是我们用的是最新版6.2.1,就不需要装了,他帮我们装好了
第二步,输入一下命令
redis-cli --cluster create --cluster-replicas 1 ip1:端口号1 ip2:端口号2 ip3:端口号3 ip4:端口号4 ip5:端口号5
redis-cli --cluster create --cluster-replicas 1 192.168.88.3:6379 192.168.88.3:6380 192.168.88.3:6381 192.168.88.3:6389 192.168.88.3:6390 192.168.88.3:6391
此处不要用127.0.0.1, 请用真实IP地址
我自己的电脑的linux是192.168.88.3
–replicas 1 采用最简单的方式配置集群,一台主机,一台从机,6台服务器,正好三组。
1代表一个主机有一个从机,如果是2就代表一个主机有两个从机
ok
16384 slots covered
普通方式登录
可能直接进入读主机,存储数据时,会出现MOVED重定向操作。所以,应该以集群方式登录(在src目录下)
。
进入src目录下,执行以下命令
cd /home/gy/redis6/redis-6.2.1/src
redis-cli -c -p 端口号
通过以上命令就可以进行集群连接,连接的话用任何结点/端口号都是可以的
前提是进到主机中
集群关闭直接将各个节点的进程kill掉即可
kill -9 PID # PID是进程号,查看进程时会显示
一个集群至少要有三个主节点。
选项 --cluster-replicas 1 表示我们希望为集群中的每个主节点创建一个从节点。
分配原则尽量保证每个主数据库运行在不同的ip地址,每个从库和主库不在一个ip地址上
其实是强调集群中的每个redis服务尽量不要在同一个服务器下,不是强调ip不同
一个 Redis 集群包含 16384 个插槽(hash slot), 数据库中的每个键都属于这 16384 个插槽的其中一个
集群使用公式 CRC16(key) % 16384 来计算键 key 属于哪个槽, 其中 CRC16(key) 语句用于计算键 key 的 CRC16 校验和 集群中的每个节点负责处理一部分插槽。
举个例子, 如果一个集群可以有主节点, 其中:
节点 A 负责处理 0 号至 5460 号插槽。
节点 B 负责处理 5461 号至 10922 号插槽。
节点 C 负责处理 10923 号至 16383 号插槽。
在redis-cli每次录入、查询键值,redis都会计算出该key应该送往的插槽,如果不是该客户端对应服务器的插槽,redis会报错,并告知应前往的redis实例地址和端口。
redis-cli客户端提供了 –c 参数实现自动重定向。
如 redis-cli -c –p 6379 登入后,再录入、查询键值对可以自动重定向。
不在一个slot下的键值,是不能使用mget,mset等多键操作。
可以通过{}来定义组的概念,从而使key中{}内相同内容的键值对放到一个slot中去
。
CLUSTER GETKEYSINSLOT
返回 count 个 slot 槽中的键。
如果主节点下线?从节点能否自动升为主节点?注意:15秒超时
主节点恢复后,主从关系会如何?主节点回来变成从机。
①把主机挂掉
②再连上,然后查看
6379挂了,主机是6389
6379再启动起来,会成为6389的小弟
如果所有某一段插槽的主从节点都宕掉,redis服务是否还能继续?
如果某一段插槽的主从都挂掉,而cluster-require-full-coverage 为yes ,那么 ,整个集群都挂掉
如果某一段插槽的主从都挂掉,而cluster-require-full-coverage 为no ,那么,该插槽数据全都不能使用,也无法存储。
只是该段,其他插槽依旧可以使用
redis.conf中的参数 cluster-require-full-coverage
即使连接的不是主机,集群会自动切换主机存储。主机写,从机读。
无中心化主从集群。无论从哪台主机写的数据,其他主机上都能读到数据。
实现扩容
分摊压力
无中心配置相对简单
多键操作是不被支持的
多键的Redis事务是不被支持的。lua脚本不被支持
由于集群方案出现较晚,很多公司已经采用了其他的集群方案,而代理或者客户端分片的方案想要迁移至redis cluster,需要整体迁移而不是逐步过渡,复杂度较大。
key对应的数据在数据源并不存在,每次针对此key的请求从缓存获取不到,请求都会压到数据源,从而可能压垮数据源。比如用一个不存在的用户id获取用户信息,不论缓存还是数据库都没有,若黑客利用此漏洞进行攻击可能压垮数据库。
一个一定不存在缓存及查询不到的数据,由于缓存是不命中时被动写的,并且出于容错考虑,如果从存储层查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。
解决方案:
(1) **对空值缓存
:**如果一个查询返回的数据为空(不管是数据是否不存在),我们仍然把这个空结果(null)进行缓存,设置空结果的过期时间会很短,最长不超过五分钟
(2) 设置可访问的名单(白名单)
:
使用bitmaps类型
定义一个可以访问的名单,名单id作为bitmaps的偏移量,每次访问和bitmap里面的id进行比较,如果访问id不在bitmaps里面,进行拦截,不允许访问
。
(3) 采用布隆过滤器
:(布隆过滤器(Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量(位图)和一系列随机映射函数(哈希函数)。
布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。)
将所有可能存在的数据哈希到一个足够大的bitmaps中,一个一定不存在的数据会被 这个bitmaps拦截掉,从而避免了对底层存储系统的查询压力。
(4) **进行实时监控
:**当发现Redis的命中率开始急速降低,需要排查访问对象和访问的数据,和运维人员配合,可以设置黑名单限制服务
一般是黑客搞得==>报警,交给网警
key对应的数据存在,但在redis中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。
key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。这个时候,需要考虑一个问题:缓存被“击穿”的问题。
解决问题:
(1)预先设置热门数据
:在redis高峰访问之前,把一些热门数据提前存入到redis里面,加大这些热门数据key的时长
(2)实时调整
:现场监控哪些数据热门,实时调整key的过期时长
(3)使用锁
:
(1) 就是在缓存失效的时候(判断拿出来的值为空),不是立即去load db。
(2) 先使用缓存工具的某些带成功操作返回值的操作(比如Redis的SETNX)去set一个mutex key
(3) 当操作返回成功时,再进行load db的操作,并回设缓存,最后删除mutex key;
(4) 当操作返回失败,证明有线程在load db,当前线程睡眠一段时间再重试整个get缓存的方法。
key对应的数据存在,但在redis中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。
缓存雪崩与缓存击穿的区别在于这里针对很多key缓存,前者则是某一个key
正常访问
缓存失效瞬间
缓存失效时的雪崩效应对底层系统的冲击非常可怕!
解决方案:
(1) **构建多级缓存架构:**nginx缓存 + redis缓存 +其他缓存(ehcache等)
(2) 使用锁或队列:
用加锁或者队列的方式保证来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。不适用高并发情况
(3) 设置过期标志更新缓存:
记录缓存数据是否过期(设置提前量),如果过期会触发通知另外的线程在后台去更新实际key的缓存。
(4) 将缓存失效时间分散开:
比如我们可以在原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。
随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统
后,由于分布式系统多线程、多进程并且分布在不同机器上
,这将使原单机部署情况下的并发控制锁策略失效
,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就**需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!
**
分布式锁主流的实现方案:
基于数据库实现分布式锁
基于缓存(Redis等)
基于Zookeeper
每一种分布式锁解决方案都有各自的优缺点:
性能:redis最高
可靠性:zookeeper最高
这里,我们就基于redis实现分布式锁。
setnx
setnx相当于加了一把锁,只有当上一个释放了,下一个才能继续
怎么释放?
==>删掉
设置key过期时间,自动释放
缺点:
如果上一个一直不释放,那下一个就进不去了
==>设置过期时间
上锁同时设置过期时间
为了防止上锁之后,设置过期时间之前断电了或者说服务器挂了等等导致设置过期时间没有成功
NX :只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value 。
EX second :设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value 。
PX millisecond :设置键的过期时间为 millisecond 毫秒。 SET key value PX millisecond 效果等同于 PSETEX key millisecond value 。
XX :只在键已经存在时,才对键进行设置操作。
多个客户端同时获取锁(setnx)
获取成功,执行业务逻辑{从db获取数据,放入缓存},执行完成释放锁(del)
其他客户端等待重试
设置key三秒以后过期
@RestController
@RequestMapping("/redisTest")
public class RedisTestController {
@Autowired
private RedisTemplate redisTemplate;
@GetMapping("testLock")
public void testLock(){
//1获取锁,setne
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111",3,TimeUnit.SECONDS);
//2获取锁成功、查询num的值
if(lock){
Object value = redisTemplate.opsForValue().get("num");
//2.1判断num为空return
if(StringUtils.isEmpty(value)){
return;
}
//2.2有值就转成成int
int num = Integer.parseInt(value+"");
//2.3把redis的num加1
redisTemplate.opsForValue().set("num", ++num);
//2.4释放锁,del
redisTemplate.delete("lock");
}else{
//3获取锁失败、每隔0.1秒再获取
try {
Thread.sleep(100);
testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
接下来把代码做个测试,这个测试直接访问就可以了
ab工具简介
ab全称为:apache bench
ab是Apache超文本传输协议(HTTP)的性能测试工具。其设计意图是描绘当前所安装的Apache的执行性能,主要是显示你安装的Apache每秒可以处理多少个请求。
ab是apache自带的压力测试工具。ab非常实用,它不仅可以对apache服务器进行网站访问压力测试,也可以对或其它类型的服务器进行压力测试。比如nginx、tomcat、IIS等。
ab命令
ab -n 1000 -c 100 http链接地址
其中-n表示请求数,-c表示并发数
eg: 上面java代码的ab命令
ab -n 1000 -c 100 http://192.168.88.3:8080/redisTest/testLock
①设置num为0: set num 0;通过代码每次加1
上述代码是针对num的,先去看看num
②ab测试
ab -n 1000 -c 100 http://192.168.88.3:8080/redisTest/testLock
③再去get一下num
因为上面ab命令是1000次请求,每次请求执行的是加一,所以结束后num应该是1000
问题:a释放了b的锁
解决方案:使用uuid防止误删
@RestController
@RequestMapping("/redisTest")
public class RedisTestController {
@Autowired
private RedisTemplate redisTemplate;
@GetMapping("testLock")
public void testLock(){
String uuid=UUID.randomUUID().toString();
//1获取锁,setne
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid,3,TimeUnit.SECONDS);
//2获取锁成功、查询num的值
if(lock){
Object value = redisTemplate.opsForValue().get("num");
//2.1判断num为空return
if(StringUtils.isEmpty(value)){
return;
}
//2.2有值就转成成int
int num = Integer.parseInt(value+"");
//2.3把redis的num加1
redisTemplate.opsForValue().set("num", ++num);
//2.4释放锁,del
//判断比较uuid值是否一样
String lockUuid=(String) redisTemplate.opsForValue().get("lock");
if(lockUuid.equals(uuid)){
redisTemplate.delete("lock");
}
}else{
//3获取锁失败、每隔0.1秒再获取
try {
Thread.sleep(100);
testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
问题:删除操作缺乏原子性
===>使用lua脚本
Lua 是一个小巧的脚本语言,Lua脚本可以很容易的被C/C++ 代码调用,也可以反过来调用C/C++的函数,Lua并没有提供强大的库,一个完整的Lua解释器不过200k,所以Lua不适合作为开发独立应用程序的语言,而是作为嵌入式脚本语言。
很多应用程序、游戏使用LUA作为自己的嵌入式脚本语言,以此来实现可配置性、可扩展性。
这其中包括魔兽争霸地图、魔兽世界、博德之门、愤怒的小鸟等众多游戏插件或外挂。
https://www.w3cschool.cn/lua/
将复杂的或者多步的redis操作,写为一个脚本,一次提交给redis执行,减少反复连接redis的次数。提升性能。
LUA脚本是类似redis事务,有一定的原子性,不会被其他命令插队,可以完成一些redis事务性的操作。
但是注意redis的lua脚本功能,只有在Redis 2.6以上的版本才可以使用
。
redis 2.6版本以后,通过lua脚本解决争抢问题,实际上是redis 利用其单线程的特性,用任务队列的方式解决多任务并发问题。⭐️⭐️
@RestController
@RequestMapping("/redisTest")
public class RedisTestController {
@Autowired
private RedisTemplate redisTemplate;
@GetMapping("testLockLua")
public void testLockLua() {
//1 声明一个uuid ,将做为一个value 放入我们的key所对应的值中
String uuid = UUID.randomUUID().toString();
//2 定义一个锁:lua 脚本可以使用同一把锁,来实现删除!
String skuId = "25"; // 访问skuId 为25号的商品 100008348542
String locKey = "lock:" + skuId; // 锁住的是每个商品的数据
// 3 获取锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent(locKey, uuid, 3, TimeUnit.SECONDS);
// 第一种: lock 与过期时间中间不写任何的代码。
// redisTemplate.expire("lock",10, TimeUnit.SECONDS);//设置过期时间
// 如果true
if (lock) {
// 执行的业务逻辑开始
// 获取缓存中的num 数据
Object value = redisTemplate.opsForValue().get("num");
// 如果是空直接返回
if (StringUtils.isEmpty(value)) {
return;
}
// 不是空 如果说在这出现了异常! 那么delete 就删除失败! 也就是说锁永远存在!
int num = Integer.parseInt(value + "");
// 使num 每次+1 放入缓存
redisTemplate.opsForValue().set("num", String.valueOf(++num));
/*使用lua脚本来锁*/
// 定义lua 脚本
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 使用redis执行lua执行
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
// 设置一下返回值类型 为Long
// 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型,
// 那么返回字符串与0 会有发生错误。
redisScript.setResultType(Long.class);
// 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。
redisTemplate.execute(redisScript, Arrays.asList(locKey), uuid);
} else {
// 其他线程等待
try {
// 睡眠
Thread.sleep(1000);
// 睡醒了之后,调用方法。
testLockLua();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
1、加锁
// 1. 从redis中获取锁,set k1 v1 px 20000 nx
String uuid = UUID.randomUUID().toString();
Boolean lock = this.redisTemplate.opsForValue()
.setIfAbsent("lock", uuid, 2, TimeUnit.SECONDS);
2、使用lua释放锁
// 2. 释放锁 del
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 设置lua脚本返回的数据类型
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
// 设置lua脚本返回类型为Long
redisScript.setResultType(Long.class);
redisScript.setScriptText(script);
redisTemplate.execute(redisScript, Arrays.asList("lock"),uuid);
3、重试
Thread.sleep(500);
testLock();
为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:
Redis Desktop Manager是一款简单快速、跨平台的Redis桌面管理工具,也被称作Redis可视化工具;支持命令控制台操作,以及常用,查询key,rename,delete等操作。
傻瓜式安装
过lua脚本解决争抢问题,实际上是redis 利用其单线程的特性,用任务队列的方式解决多任务并发问题。⭐️⭐️
@RestController
@RequestMapping("/redisTest")
public class RedisTestController {
@Autowired
private RedisTemplate redisTemplate;
@GetMapping("testLockLua")
public void testLockLua() {
//1 声明一个uuid ,将做为一个value 放入我们的key所对应的值中
String uuid = UUID.randomUUID().toString();
//2 定义一个锁:lua 脚本可以使用同一把锁,来实现删除!
String skuId = "25"; // 访问skuId 为25号的商品 100008348542
String locKey = "lock:" + skuId; // 锁住的是每个商品的数据
// 3 获取锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent(locKey, uuid, 3, TimeUnit.SECONDS);
// 第一种: lock 与过期时间中间不写任何的代码。
// redisTemplate.expire("lock",10, TimeUnit.SECONDS);//设置过期时间
// 如果true
if (lock) {
// 执行的业务逻辑开始
// 获取缓存中的num 数据
Object value = redisTemplate.opsForValue().get("num");
// 如果是空直接返回
if (StringUtils.isEmpty(value)) {
return;
}
// 不是空 如果说在这出现了异常! 那么delete 就删除失败! 也就是说锁永远存在!
int num = Integer.parseInt(value + "");
// 使num 每次+1 放入缓存
redisTemplate.opsForValue().set("num", String.valueOf(++num));
/*使用lua脚本来锁*/
// 定义lua 脚本
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 使用redis执行lua执行
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
// 设置一下返回值类型 为Long
// 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型,
// 那么返回字符串与0 会有发生错误。
redisScript.setResultType(Long.class);
// 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。
redisTemplate.execute(redisScript, Arrays.asList(locKey), uuid);
} else {
// 其他线程等待
try {
// 睡眠
Thread.sleep(1000);
// 睡醒了之后,调用方法。
testLockLua();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
1、加锁
// 1. 从redis中获取锁,set k1 v1 px 20000 nx
String uuid = UUID.randomUUID().toString();
Boolean lock = this.redisTemplate.opsForValue()
.setIfAbsent("lock", uuid, 2, TimeUnit.SECONDS);
2、使用lua释放锁
// 2. 释放锁 del
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 设置lua脚本返回的数据类型
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
// 设置lua脚本返回类型为Long
redisScript.setResultType(Long.class);
redisScript.setScriptText(script);
redisTemplate.execute(redisScript, Arrays.asList("lock"),uuid);
3、重试
Thread.sleep(500);
testLock();
为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:
[外链图片转存中…(img-j7zE5jho-1666246435127)]
Redis Desktop Manager是一款简单快速、跨平台的Redis桌面管理工具,也被称作Redis可视化工具;支持命令控制台操作,以及常用,查询key,rename,delete等操作。
傻瓜式安装
[外链图片转存中…(img-pmtCerXp-1666246435127)]
[外链图片转存中…(img-C0TDwF2j-1666246435128)]
[外链图片转存中…(img-jRrQvPXJ-1666246435129)]
[外链图片转存中…(img-eCLbg9HE-1666246435129)]