第五章我写了个两个方法read_extract_yaml、write_extract_yaml,用来关联接口数据。
今天我们来进一步封装,把要读写的字段都写到yaml文件里,这样测试多个接口我都只需要更改yaml文件,不需要更改代码了。
1、yaml文件新建extract参数,里面填写我需要从当前接口的出参里提取的字段。提取字段有两种方法,第一种是正则提取,第二种是json数据提取,我这个例子里正好一样提取一个。
-
name: 获取token鉴权码
request:
method: get
url: https://api.weixin.qq.com/cgi-bin/token
params:
grant_type: client_credential
appid: wx8e8b67ced3c4b88
secret: XXX
validate: none
extract:
access_token: '"access_token":"(.*?)"'
expires_in: $.expires_in
2、写文件的代码比较简单,判断yaml文件里是否有extract参数,接着判断提取的字段是正则提取还是json提取。将字段获取之后用write_extract_yaml方法写入到extract.yaml文件即可。
# 提取值并写入extract.yaml文件
if "extract" in caseinfo_keys:
for key, value in caseinfo["extract"].items():
if "(.*?)" in value or "(.+?)" in value:
zz_value = re.search(value, res.text)
if zz_value:
extract_value = {key: zz_value.group(1)}
write_extract_yaml(extract_value)
else:
json_value = jsonpath.jsonpath(return_json, value)
if json_value:
extract_value = {key: json_value[0]}
write_extract_yaml(extract_value)
3、保存数据后的extract.yaml文件
1、新建一个pm_select_flag.yaml,该测试用例主要是读取第一个接口输出的access_token,并查询小程序标签。如何去读取extract.yaml的字段呢?这里用了${read_extract_data(xxx)}
-
name: 查询小程序标签
request:
method: get
url: https://api.weixin.qq.com/cgi-bin/tags/get?access_token=${read_extract_data(access_token)}
validate: none
2、读取代码要稍微复杂一点,这里先学习一个小知识,python的getattr,该方法可以直接调用某一个类里的方法:
import random
class DebugTalk:
#获取随机数
def get_random_number(self,min,max):
rm = random.randint(int(min),int(max))
return str(rm)
if __name__ == '__main__':
a = DebugTalk().get_random_number(1,20)
print(a)
b = getattr(DebugTalk(), "get_random_number")
print(b(1,20))
可以看到这两个方法都能调取DebugTalk里的方法。
3、在RequestUtil里新建一个replace_value的方法,该方法的作用是看到yaml文件里有${AAA(xxx)},就去执行DebugTalk的AAA方法。这里的${read_extract_data(xxx)}就是去执行DebugTalk的read_extract_data方法。
# 替换值的方法
def replace_value(self, data):
# 1保存数据类型
data_type = type(data)
# 2判断数据类型
if isinstance(data, dict) or isinstance(data, list):
str_data = json.dumps(data)
else:
str_data = str(data)
# 替换
for cs in range(1, str_data.count('${') + 1):
if "${" in str_data and "}" in str_data:
start_index = str_data.index("${")
end_index = str_data.index("}", start_index)
old_value = str_data[start_index:end_index + 1]
# print(f"old_value:{old_value}")
# 反射:通过类的对象和方法字符串去调用方法
function_name = old_value[2:old_value.index('(')]
args_value1 = old_value[old_value.index('(') + 1:old_value.index(')')]
if args_value1 != "":
args_value2 = args_value1.split(',')
# print(function_name, args_value2)
new_value = getattr(DebugTalk(), function_name)(*args_value2)
else:
new_value = getattr(DebugTalk(), function_name)()
# print(f"new_value:{new_value}")
# 判断替换的新参数的类型
if isinstance(new_value, int) or isinstance(new_value, float):
str_data = str_data.replace('"' + old_value + '"', str(new_value))
else:
str_data = str_data.replace(old_value, str(new_value))
# 3还原数据类型
if isinstance(data, dict) or isinstance(data, list):
data = json.loads(str_data)
else:
data = data_type(str_data)
return data
4、更改统一请求的封装,请求之前将请求头和参数里的${AAA(xxx)}全部替换成实际值。
def send_request(self, method, url, **kwargs):
method = str(method).lower()
url = self.replace_value(url)
# 请求头和参数替换
for key, value in kwargs.items():
if key in ['params', 'data', 'json', 'headers']:
kwargs[key] = self.replace_value(value)
print(f"kwargs[key]:{kwargs[key]}")
elif key == "files":
for file_key, file_path in value.items():
value[file_key] = open(file_path, 'rb')
res = RequestUtil.session.request(method,url,**kwargs)
return res
5、完整的request_util.py 读写代码:
import json
import re
import requests
import jsonpath
from test7.commons.debug_util import DebugTalk
from test7.commons.yaml_util import write_extract_yaml
class RequestUtil:
session = requests.session()
# 替换值的方法
def replace_value(self, data):
# 1保存数据类型
data_type = type(data)
# 2判断数据类型
if isinstance(data, dict) or isinstance(data, list):
str_data = json.dumps(data)
else:
str_data = str(data)
# 替换
for cs in range(1, str_data.count('${') + 1):
if "${" in str_data and "}" in str_data:
start_index = str_data.index("${")
end_index = str_data.index("}", start_index)
old_value = str_data[start_index:end_index + 1]
# print(f"old_value:{old_value}")
# 反射:通过类的对象和方法字符串去调用方法
function_name = old_value[2:old_value.index('(')]
args_value1 = old_value[old_value.index('(') + 1:old_value.index(')')]
if args_value1 != "":
args_value2 = args_value1.split(',')
# print(function_name, args_value2)
new_value = getattr(DebugTalk(), function_name)(*args_value2)
else:
new_value = getattr(DebugTalk(), function_name)()
# print(f"new_value:{new_value}")
# 判断替换的新参数的类型
if isinstance(new_value, int) or isinstance(new_value, float):
str_data = str_data.replace('"' + old_value + '"', str(new_value))
else:
str_data = str_data.replace(old_value, str(new_value))
# 3还原数据类型
if isinstance(data, dict) or isinstance(data, list):
data = json.loads(str_data)
else:
data = data_type(str_data)
return data
#规范YAML测试用例
def standard_yaml(self,caseinfo):
caseinfo_keys = caseinfo.keys()
# 判断关键词是否完整
if "name" in caseinfo_keys and "request" in caseinfo_keys and "validate" in caseinfo_keys:
cs_request = caseinfo['request']
cs_request_keys = cs_request.keys()
if "method" in cs_request_keys and "url" in cs_request_keys:
method = cs_request.pop("method") #pop-删除列表里最后一个并且返回这个值
url = cs_request.pop("url")
res = self.send_request(method,url,**cs_request)
print(res.text)
# 写入yaml文件
return_json = ""
try:
return_json = res.json()
except Exception as e:
print("返回的结果不是json格式,不能使用jsonpath提取")
# 提取值并写入extract.yaml文件
if "extract" in caseinfo_keys:
for key, value in caseinfo["extract"].items():
if "(.*?)" in value or "(.+?)" in value:
zz_value = re.search(value, res.text)
if zz_value:
extract_value = {key: zz_value.group(1)}
write_extract_yaml(extract_value)
else:
json_value = jsonpath.jsonpath(return_json, value)
if json_value:
extract_value = {key: json_value[0]}
write_extract_yaml(extract_value)
return res
else:
print("二级关键字必须包含:method,url")
else:
print("一级关键字必须包含:name,request,validate")
#统一请求封装
def send_request(self, method, url, **kwargs):
method = str(method).lower()
url = self.replace_value(url)
# 请求头和参数替换
for key, value in kwargs.items():
if key in ['params', 'data', 'json', 'headers']:
kwargs[key] = self.replace_value(value)
print(f"kwargs[key]:{kwargs[key]}")
elif key == "files":
for file_key, file_path in value.items():
value[file_key] = open(file_path, 'rb')
res = RequestUtil.session.request(method,url,**kwargs)
return res
第一个接口生成的access_token给第二个接口使用,使用成功。
最后感谢每一个认真阅读我文章的人,看着粉丝一路的上涨和关注,礼尚往来总是要有的,虽然不是什么很值钱的东西,如果你用得到的话可以直接拿走
这些资料,对于做【软件测试】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴我走过了最艰难的路程,希望也能帮助到你!凡事要趁早,特别是技术行业,一定要提升技术功底。希望对大家有所帮助…….
如果你不想再体验一次自学时找不到资料,没人解答问题,坚持几天便放弃的感受的话,可以加入下方我的qq群大家一起讨论交流,里面也有各种软件测试资料和技术交流。