安装
win版
最新5.x版本 https://github.com/tporadowski/redis/releases/
最新3.x版本 https://github.com/microsoftarchive/redis/releases
安装
pip install redis
from redis import Redis
conn=Redis()
conn.set('name','xxx')
print(conn.get('name'))
conn.close()
Redis 支持的数据类型概述
Redis 是一个数据结构服务器。 Redis 的核心是提供一系列本机数据类型,可帮助您解决从缓存到队列再到事件处理的各种问题。
Redis 字符串是最基本的 Redis数据类型,表示字节序列。
set()
def set(
self,
name: KeyT,
value: EncodableT,
ex: Union[ExpiryT, None] = None, # 过期时间(秒)
px: Union[ExpiryT, None] = None, # 过期时间(毫秒)
nx: bool = False, # 如果设置为True,则只有name不存在时,当前set操作才执行,值存在,就修改不了,执行没效果。
xx: bool = False, # 如果设置为True,则只有name存在时,当前set操作才执行,值存在才能修改,值不存在,不会设置新值。
keepttl: bool = False, # 如果为True,保留与该键关联的生存时间。
get: bool = False, # 如果为True,将键name处的值设置为value并返回旧值存储在key中,如果key不存在则为None。
exat: Union[AbsExpiryT, None] = None, # 设置指定的Unix时间,该键将在该时间到期,单位为秒。
pxat: Union[AbsExpiryT, None] = None, # 设置指定的Unix时间,该键将在该时间到期,单位为毫秒。
import redis
conn = redis.Redis()
conn.set('name', 'xuxiaoxu')
conn.set('age', 18)
conn.close()
setnx()
# 相当于set(..., nx=True)
def setnx(self, name: KeyT, value: EncodableT) -> ResponseT:
conn.setnx('name','xxx') # 存在不修改,执行没效果
conn.setnx('hobby','pingpang')
psetex()
# 本质就是 set(..., px)设置时间(毫秒)
def psetex(self, name: KeyT, time_ms: ExpiryT, value: EncodableT):
conn.psetex('name', 3000, 'jasper')
mset()
# 批量设置
def mset(self, mapping: Mapping[AnyKeyT, EncodableT]) -> ResponseT:
conn.mset({'name': 'xuxiaoxu', 'age': 20, 'xxx': 'yyy'})
get()
def get(self, name: KeyT) -> ResponseT:
# 获取值,取到是bytes格式
# 可以在实例化redis对象时指定参数decode_responses=True
res = conn.get('name')
print(res) # b'xuxiaoxu'
mget()
# 批量获取
def mget(self, keys: KeysT, *args: EncodableT) -> ResponseT:
res = conn.mget('name', 'age')
print(res) # [b'xuxiaoxu', b'20']
getset()
# 先获取在设置
def getset(self, name: KeyT, value: EncodableT) -> ResponseT:
res = conn.getset('age', 30)
print(res) # b'20'
getrange()
# 取的是字节,左闭右闭区间。
def getrange(self, key: KeyT, start: int, end: int) -> ResponseT:
res = conn.getrange('name',0,3)
print(res) # b'xuxi'
setrange()
'''
从offset开始重新设置值,如果``offset``加上``value``的长度超过原始值的长
度,新值将比以前大。如果``offset``超过了原始值的长度,则用null填充。
'''
def setrange(self, name: KeyT, offset: int, value: EncodableT) -> ResponseT:
res = conn.setrange('name',10, 'ishandsome')
print(res) # 20
setbit()
# 设置或清除存储在键的字符串值中的偏移位。
# 根据值设置或清除位,该值可以是 0 或 1.
def setbit(self, name: KeyT, offset: int, value: int) -> ResponseT:
bitcount()
# 返回` ' key ' `的值中设置的位数。可选``start``和``end``参数表示要考虑哪些字节
def bitcount(
self,
key: KeyT,
start: Union[int, None] = None,
end: Union[int, None] = None,
mode: Optional[str] = None,
) -> ResponseT:
res = conn.bitcount('hobby', 0, 6)
print(res) # 28
strlen()
# 返回` ' name ' `的值中存储的字节数
def strlen(self, name: KeyT) -> ResponseT:
print(conn.strlen('hobby')) # 8
incr()
def incr(self, name: _Key, amount: int = ...) -> int: ...
res = conn.incr('age', amount=2)
print(res)
incrbyfloat()
def incrbyfloat(self, name: KeyT, amount: float = 1.0) -> ResponseT:
decr()
def decr(self, name, amount: int = ...) -> int: ...
print(conn.decr('age'))
append()
将字符串’ ’ value ’ ‘附加到' key '
的值上。如果“键”,不存在value
创建它。返回值’ ’ key ’ '的新长度。
def append(self, key: KeyT, value: EncodableT) -> ResponseT:
print(conn.append('name', 'hahaha')) # 14
hset()
'''
在hash `name`中设置`key`为`value`,
``mapping``接受一个由键值对组成的字典
添加到hash ``name``中。
``items``接受一个键值对列表作为参数
添加到hash ``name``中。
返回添加的字段数。
'''
def hset(
self,
name: str,
key: Optional[str] = None,
value: Optional[str] = None,
mapping: Optional[dict] = None,
items: Optional[list] = None,
) -> Union[Awaitable[int], int]:
res = conn.hset('info', 'name', 'xuxiaoxu', mapping={'age': 18, 'xx': 'yy'})
print(res) # 3
hmset()
将key设置为对应哈希name
中的值,mapping
字典中的键和值。
def hmset(self, name: str, mapping: dict) -> Union[Awaitable[str], str]:
res = conn.hmset('info',{'xx':'xx', 'yy':'yy'})
print(res)
# DeprecationWarning: Redis.hmset() is deprecated. Use Redis.hset() instead.
# res = conn.hmset('info',{'xx':'xx', 'yy':'yy'})
# 已被弃用,使用hset()即可。
hget()
返回哈希值key
中的值name
def hget(
self, name: str, key: str
) -> Union[Awaitable[Optional[str]], Optional[str]]:
res = conn.hget('info', 'xx')
print(res) # b'xx'
hmget()
返回与keys
顺序相同的值的列表。
def hmget(self, name: str, keys: List, *args: List) -> Union[Awaitable[List], List]:
res = conn.hmget('info', ('xx', 'yy'))
print(res) # [b'xx', b'yy']
hgetall()
返回一个Python字典,由散列的名称/值对组成
def hgetall(self, name: str) -> Union[Awaitable[dict], dict]:
res = conn.hgetall('info')
print(res) # {b'name': b'xuxiaoxu', b'age': b'18', b'xx': b'xx', b'yy': b'yy'}
hlen()
返回哈希值name
中的元素个数
def hlen(self, name: str) -> Union[Awaitable[int], int]:
res = conn.hlen('info')
print(res) # 4
hkeys()
返回hash name
中的键列表
def hkeys(self, name: str) -> Union[Awaitable[List], List]:
res = conn.hkeys('info')
print(res) # [b'name', b'age', b'xx', b'yy']
hvals()
返回hash name
中的值列表
def hvals(self, name: str) -> Union[Awaitable[List], List]:
res = conn.hvals('info')
print(res) # [b'xuxiaoxu', b'18', b'xx', b'yy']
hexists()
返回一个布尔值,表示key
是否存在于哈希name
中。
def hexists(self, name: str, key: str) -> Union[Awaitable[bool], bool]:
res = conn.hexists('info','xx')
print(res) # True
hdel()
从hash name
中删除keys
def hdel(self, name: str, *keys: List) -> Union[Awaitable[int], int]:
res = conn.hdel('info','xx', 'yy')
print(res) # 2
hincrby()
将hash name
中的key
的值增加amount
def hincrby(
self, name: str, key: str, amount: int = 1
) -> Union[Awaitable[int], int]:
conn.hset('ingo','age',18)
res = conn.hincrby('info','age',2)
print(res) # 20
hincrbyfloat()
将hash name
中的key
的值增加amount
def hincrby(
self, name: str, key: str, amount: int = 1
) -> Union[Awaitable[int], int]:
hscan()
在哈希表中递增地返回键值对切片,同样返回一个游标。
表示扫描位置。
match
允许按模式过滤键
count
允许提示最小返回次数
def hscan(
self,
name: KeyT,
cursor: int = 0,
match: Union[PatternT, None] = None,
count: Union[int, None] = None,
) -> ResponseT:
# count不准确,有点上下浮动。
res = conn.hscan('test', cursor=0, count=7)
print(res)
print(len(res[1]))
print(res[0])
res = conn.hscan('test', cursor=res[0], count=7)
print(res)
print(len(res[1]))
print(res[0])
hscan_iter()
使用HSCAN命令创建一个迭代器,这样客户端就不需要迭代,需要记住光标的位置。
match
允许按模式过滤键
count
允许提示最小返回次数
def hscan_iter(
self,
name: str,
match: Union[PatternT, None] = None,
count: Union[int, None] = None,
) -> Iterator:
cursor = "0"
while cursor != 0:
cursor, data = self.hscan(name, cursor=cursor, match=match, count=count)
yield from data.items()
res = conn.hscan_iter('test')
print(res) # 生成器
for i in res:
print(i)
conn.close()
lpush()
将values
推到列表的头部name
def lpush(self, name: str, *values: FieldT) -> Union[Awaitable[int], int]:
res = conn.lpush('redis_list', 1, 2, 3, 4, 5)
print(res) # 5
lpushx()
如果name
存在,将value
推到name
的头部
def lpushx(self, name: str, *values: FieldT) -> Union[Awaitable[int], int]:
res = conn.lpushx('redis_list', 6, 7, 8)
print(res) # 8
rpush()和rpushx()
与上面两个相反,推到列表的尾部。
llen()
返回列表的长度name
def llen(self, name: str) -> Union[Awaitable[int], int]:
res = conn.llen('redis_list')
print(res) # 8
linsert()
在列表中插入元素,该元素存储在基准值基准值之前或之后。
如果key不存在,则认为它是一个空列表,不会执行任何操作。
如果key存在但没有保存列表值,则返回错误。
def linsert(
self, name: str, where: str, refvalue: str, value: str
) -> Union[Awaitable[int], int]:
res = conn.linsert('redis_list', 'before', '3', 'new')
print(res) # 9
lset()
将list name
的index
元素设置为value
def lset(self, name: str, index: int, value: str) -> Union[Awaitable[str], str]:
res = conn.lset('redis_list', 0, 'lset')
print(res) # True
lrem()
删除第一次count
等于value
的元素从存储在name
中的列表中。
def lrem(self, name: str, count: int, value: str) -> Union[Awaitable[int], int]:
res = conn.lrem('redis_list', 1, '5')
print(res) # 1
lpop()
删除并返回列表name
的第一个元素。
默认情况下,该命令从开头弹出一个元素从列表中。当提供可选的count
参数时,响应将由多数量的元素组成,这取决于列表的长度。
从 Redis 版本 6.2.0 开始:添加了参数count。
def lpop(self, name: str, count: Optional[int] = None) -> Union[str, List, None]:
res = conn.lpop('redis_list')
print(res) # b'lset'
lindex()
从列表name
中返回位置index
支持负索引,从列表末尾开始
def lindex(
self, name: str, index: int
) -> Union[Awaitable[Optional[str]], Optional[str]]:
res = conn.lindex('redis_list', 0)
res1 = conn.lindex('redis_list', -1)
lrange()
返回列表name
之间的一个切片
定位start
和end
start
和end
也可以是负数
Python切片表示法
def lrange(self, name: str, start: int, end: int) -> Union[Awaitable[list], list]:
res = conn.lrange('redis_list',0,-1)
print(res)
ltrim()
修剪列表name
,删除所有不在切片内的
在start
和end
之间
start
和end
也可以是负数
Python切片表示法
def ltrim(self, name: str, start: int, end: int) -> Union[Awaitable[str], str]:
res = conn.ltrim('redis_list',1,-2)
rpoplpush()
从src
列表中取出一个值RPOP,然后在dst
列表上LPUSH。并返回rpop的值。
def rpoplpush(self, src: str, dst: str) -> Union[Awaitable[str], str]:
conn.lpush('new_list', '1')
res = conn.rpoplpush('redis_list', 'new_list')
print(res) # b'new'
blpop()
从第一个非空列表中取出一个值
在 keys
列表中命名。
如果keys
中的列表都没有值可以LPOP,则阻塞timeout
秒,或者直到一个值被推送给一个值名单的。
如果timeout为0,则无限期阻塞。
def blpop(
self, keys: List, timeout: Optional[int] = 0
) -> Union[Awaitable[list], list]:
res = conn.blpop('redis_list')
print(res)
brpoplpush()
BRPOPLPUSH是RPOPLPUSH 的阻塞变体。 当包含元素时,此命令的行为与RPOPLPUSH 完全相同。 当在MULTI/EXEC块中使用时,此命令的行为与RPOPLPUSH 完全相同。 当为空时,Redis 将阻止连接,直到另一个客户端 推到它或直到到达。 零点可用于无限期阻止。
def brpoplpush(
self, src: str, dst: str, timeout: Optional[int] = 0
) -> Union[Awaitable[Optional[str]], Optional[str]]:
conn.lpush('redis_list', 1, 2, 3, 4)
res = conn.brpoplpush('redis_list', 'new_list')
print(res)
五大数据类型都支持
delete()
删除一个或多个由names
指定的键。
def delete(self, *names: KeyT) -> ResponseT:
res = conn.delete('test')
print(res) # 1
exists()
返回存在的names
的数量
def exists(self, *names: KeyT) -> ResponseT:
res = conn.exists('new_list', 'redis_list', 'xxx')
print(res) # 2
res = conn.exists('xxx')
print(res) # 0
keys()
返回一个匹配pattern
的键列表
def keys(self, pattern: PatternT = "*", **kwargs) -> ResponseT:
res = conn.keys('*list*')
print(res) # [b'redis_list', b'new_list']
expire()
在键name
上为time
秒设置一个过期标志选项。time
可以用整数或Python的timedelta对象表示。
def expire(
self,
name: KeyT,
time: ExpiryT,
nx: bool = False,
xx: bool = False,
gt: bool = False,
lt: bool = False,
) -> ResponseT: