接口自动化测试已成为保证软件质量和稳定性的重要手段。而Redis作为一个高性能的缓存数据库,具备快速读写、多种数据结构等特点,为接口自动化测试提供了强大的支持。勇哥这里粗略介绍如何结合Python操作Redis,并将其应用于接口自动化测试框架中,以提升测试效率和数据管理能力。
(1)Redis的安装和配置
在开始之前,首先需要安装Redis并进行相应的配置,
redis官网:Redis
redis中文网:Redis中文网
安装完成后,确保Redis服务已成功启动,并正确配置了连接信息(如主机地址、端口号、密码等),这块信息就不过多介绍了哟!
(2)Redis与接口自动化测试框架的集成
使用Python操作Redis需要导入相应的客户端库,例如:
- pip install redis
- import redis
(3)初始化Redis连接
在接口自动化测试框架的初始化过程中,可以添加连接Redis的代码,确保测试过程中能够与Redis建立连接。
- class TestFramework:
- def __init__(self):
- self.redis = redis.Redis(host='localhost', port=6379, password='your_password')
(4)字符串操作
- # 设置键为"key1"的字符串值为"Hello, Redis!"
- r.set('key1', 'Hello, Redis!')
-
- # 获取键为"key1"的字符串值
- value = r.get('key1')
- print(value) # 输出: b'Hello, Redis!'
(5)列表操作
- # 向名为"list1"的列表左侧插入元素
- r.lpush('list1', 'item1')
- r.lpush('list1', 'item2')
- r.lpush('list1', 'item3')
-
- # 获取名为"list1"的列表所有元素
- items = r.lrange('list1', 0, -1)
- print(items) # 输出: [b'item3', b'item2', b'item1']
(6)哈希表操作
- # 设置名为"hash1"的哈希表字段和值
- r.hset('hash1', 'field1', 'value1')
- r.hset('hash1', 'field2', 'value2')
-
- # 获取名为"hash1"的哈希表字段和值
- value1 = r.hget('hash1', 'field1')
- value2 = r.hget('hash1', 'field2')
- print(value1, value2) # 输出: b'value1' b'value2'
(7)集合操作
- # 向名为"set1"的集合添加元素
- r.sadd('set1', 'item1')
- r.sadd('set1', 'item2')
- r.sadd('set1', 'item3')
-
- # 获取名为"set1"的集合所有元素
- items = r.smembers('set1')
- print(items) # 输出: {b'item1', b'item2', b'item3'}
以上就是 redis 的常见操作,是不是比写 `sql` 语句简单,是不是 `so easy!!`
封装Redis操作方法
为了方便接口自动化测试框架使用,又要开始封装了,简单封装代码如下:
- class RedisClient:
- def __init__(self):
- self.redis = redis.Redis(host='localhost', port=6379, password='your_password')
-
- def set_data(self, key, value, expire_time=None):
- self.redis.set(key, value)
- if expire_time is not None:
- self.redis.expire(key, expire_time)
-
- def get_data(self, key):
- return self.redis.get(key)
-
- def delete_data(self, key):
- self.redis.delete(key)
-
- def hash_set_field(self, key, field, value):
- self.redis.hset(key, field, value)
-
- def hash_get_field(self, key, field):
- return self.redis.hget(key, field)
-
- def hash_delete_field(self, key, field):
- self.redis.hdel(key, field)
接口自动化中比较常用的是字符串了,为了满足更多场景的需求,我们价格哈希数据结构的封装操作方法。
(1)测试数据管理
接口自动化测试中,将测试数据存储在Redis中,如用户信息、配置参数等。通过使用封装的Redis操作方法,可以方便地进行数据的增、删、改、查。
- redis_client= RedisClient()
- redis_client.set_data('user:1', '{"name": "kira", "age": 18}')
- user = redis_client.get_data('user:1')
- print(user.decode()) # 输出:{"name": "kira", "age": 18}
(2)处理接口依赖数据
一般步骤如下:
上代码:
- redis_client = RedisClient()
- # 第一个接口,设置依赖数据
- def first_api():
- response = requests.get('https://api.example.com/first')
- data = response.json()
- redis_client.set_data('key', data['value'])
- def second_api():
- # 获取依赖数据
- dependency_data = redis_client.get_data('key')
- response = requests.post('https://api.example.com/second', data={'data': dependency_data})
- result = response.json()
- # 处理接口响应结果
- if __name__ == '__main__':
- first_api()
- second_api()
(3)缓存管理
遇到需要频繁访问的接口,怎么半?
为了减少接口调用的开销和提高测试效率,可以使用Redis作为缓存工具,将接口的响应结果缓存起来,以便后续的测试用例重复使用。
- redis_client= RedisClient()
- def get_user_info(user_id):
- cache_key = f'user:{user_id}'
- user_info = redis_client.get_data(cache_key)
- if not user_info:
- # 调用接口获取用户信息
- user_info = api.get_user_info(user_id)
- redis_client.set_data(cache_key, user_info, expire_time=3600)
- return user_info
咱们首先检查Redis缓存中是否已存在对应的用户信息,如果不存在,则调用接口获取用户信息并将其存储到Redis缓存中,以备后续使用。同时,通过设置expire_time
参数,可以为缓存数据设置过期时间,避免过期数据的使用。
(4)并发测试
在自动化测试中,针对并发场景的测试很重要,我们可以并发模拟一些实际场景,比如:利用redis的原子性和分布式锁,为每个用户创建一个唯一的标识,存到redis中,这样不同用户请求就可以通过检查和比对redis的结果来模拟并发访问了,例如:
- # 创建 Redis 客户端
- redis_client = RedisClient()
-
- def get_user_info(user_id):
- cache_key = f'user:{user_id}'
- user_info = redis_client.get_data(cache_key)
- if not user_info:
- # 调用接口获取用户信息
- response = requests.get(f'http://127.0.0.1:5000/?user_id={user_id}')
- if response.status_code == 200:
- user_info = response.text
- print(user_info)
- redis_client.set_data(cache_key, user_info, expire_time=3600)
- else:
- print(f"Failed to retrieve user info for user_id: {user_id}. Status code: {response.status_code}")
- return user_info
-
- # 并发测试函数
- def run_concurrent_test(user_ids):
- with concurrent.futures.ThreadPoolExecutor() as executor:
- # 提交任务到线程池
- future_to_user_id = {executor.submit(get_user_info, user_id): user_id for user_id in user_ids}
- # 处理返回结果
- for future in concurrent.futures.as_completed(future_to_user_id):
- user_id = future_to_user_id[future]
- try:
- user_info = future.result()
- print(f"user_id: {user_id}; user_info: {user_info}")
- except Exception as e:
- print(f"Error occurred for user_id: {user_id}, Error: {str(e)}")
-
- if __name__ == '__main__':
- u_ids = [i for i in range(10, 99)]
- run_concurrent_test(u_ids)
我们创建线程池,使用submit 将任务(get_user_info)提交到线程池,每个任务一个 user_id,这里简单打印每个用户id,对于的信息,通过并发执行多任务,可以同一时间内获取多个用户信息,提高测试效率。
感谢每一个认真阅读我文章的人,礼尚往来总是要有的,虽然不是什么很值钱的东西,如果你用得到的话可以直接拿走:
这些资料,对于【软件测试】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴上万个测试工程师们走过最艰难的路程,希望也能帮助到你!有需要的小伙伴可以点击下方小卡片领取