第五章我写了个两个方法read_extract_yaml、write_extract_yaml,用来关联接口数据。
今天我们来进一步封装,把要读写的字段都写到yaml文件里,这样测试多个接口我都只需要更改yaml文件,不需要更改代码了。
1、yaml文件新建extract参数,里面填写我需要从当前接口的出参里提取的字段。提取字段有两种方法,第一种是正则提取,第二种是json数据提取,我这个例子里正好一样提取一个。
- name: 获取token鉴权码 request: method: get url: https://api.weixin.qq.com/cgi-bin/token params: grant_type: client_credential appid: wx8e8b67ced3c4b88 secret: XXX validate: none extract: access_token: '"access_token":"(.*?)"' expires_in: $.expires_in
2、写文件的代码比较简单,判断yaml文件里是否有extract参数,接着判断提取的字段是正则提取还是json提取。将字段获取之后用write_extract_yaml方法写入到extract.yaml文件即可。
# 提取值并写入extract.yaml文件 if "extract" in caseinfo_keys: for key, value in caseinfo["extract"].items(): if "(.*?)" in value or "(.+?)" in value: zz_value = re.search(value, res.text) if zz_value: extract_value = {key: zz_value.group(1)} write_extract_yaml(extract_value) else: json_value = jsonpath.jsonpath(return_json, value) if json_value: extract_value = {key: json_value[0]} write_extract_yaml(extract_value)
3、保存数据后的extract.yaml文件

1、新建一个pm_select_flag.yaml,该测试用例主要是读取第一个接口输出的access_token,并查询小程序标签。如何去读取extract.yaml的字段呢?这里用了${read_extract_data(xxx)}
- name: 查询小程序标签 request: method: get url: https://api.weixin.qq.com/cgi-bin/tags/get?access_token=${read_extract_data(access_token)} validate: none
2、读取代码要稍微复杂一点,这里先学习一个小知识,python的getattr,该方法可以直接调用某一个类里的方法:
import random
class DebugTalk: #获取随机数 def get_random_number(self,min,max): rm = random.randint(int(min),int(max)) return str(rm)
if __name__ == '__main__': a = DebugTalk().get_random_number(1,20) print(a)
b = getattr(DebugTalk(), "get_random_number") print(b(1,20))
可以看到这两个方法都能调取DebugTalk里的方法。

3、在RequestUtil里新建一个replace_value的方法,该方法的作用是看到yaml文件里有${AAA(xxx)},就去执行DebugTalk的AAA方法。这里的${read_extract_data(xxx)}就是去执行DebugTalk的read_extract_data方法。
# 替换值的方法def replace_value(self, data): # 1保存数据类型 data_type = type(data)
# 2判断数据类型 if isinstance(data, dict) or isinstance(data, list): str_data = json.dumps(data) else: str_data = str(data) # 替换 for cs in range(1, str_data.count('${') + 1): if "${" in str_data and "}" in str_data: start_index = str_data.index("${") end_index = str_data.index("}", start_index) old_value = str_data[start_index:end_index + 1] # print(f"old_value:{old_value}")
# 反射:通过类的对象和方法字符串去调用方法 function_name = old_value[2:old_value.index('(')] args_value1 = old_value[old_value.index('(') + 1:old_value.index(')')] if args_value1 != "": args_value2 = args_value1.split(',') # print(function_name, args_value2) new_value = getattr(DebugTalk(), function_name)(*args_value2) else: new_value = getattr(DebugTalk(), function_name)()
# print(f"new_value:{new_value}") # 判断替换的新参数的类型 if isinstance(new_value, int) or isinstance(new_value, float): str_data = str_data.replace('"' + old_value + '"', str(new_value)) else: str_data = str_data.replace(old_value, str(new_value))
# 3还原数据类型 if isinstance(data, dict) or isinstance(data, list): data = json.loads(str_data) else: data = data_type(str_data) return data
4、更改统一请求的封装,请求之前将请求头和参数里的${AAA(xxx)}全部替换成实际值。
def send_request(self, method, url, **kwargs): method = str(method).lower() url = self.replace_value(url)
# 请求头和参数替换 for key, value in kwargs.items(): if key in ['params', 'data', 'json', 'headers']: kwargs[key] = self.replace_value(value) print(f"kwargs[key]:{kwargs[key]}") elif key == "files": for file_key, file_path in value.items(): value[file_key] = open(file_path, 'rb')
res = RequestUtil.session.request(method,url,**kwargs) return res
5、完整的request_util.py 读写代码:
import jsonimport reimport requestsimport jsonpath
from test7.commons.debug_util import DebugTalkfrom test7.commons.yaml_util import write_extract_yaml
class RequestUtil: session = requests.session()
# 替换值的方法 def replace_value(self, data): # 1保存数据类型 data_type = type(data)
# 2判断数据类型 if isinstance(data, dict) or isinstance(data, list): str_data = json.dumps(data) else: str_data = str(data) # 替换 for cs in range(1, str_data.count('${') + 1): if "${" in str_data and "}" in str_data: start_index = str_data.index("${") end_index = str_data.index("}", start_index) old_value = str_data[start_index:end_index + 1] # print(f"old_value:{old_value}")
# 反射:通过类的对象和方法字符串去调用方法 function_name = old_value[2:old_value.index('(')] args_value1 = old_value[old_value.index('(') + 1:old_value.index(')')] if args_value1 != "": args_value2 = args_value1.split(',') # print(function_name, args_value2) new_value = getattr(DebugTalk(), function_name)(*args_value2) else: new_value = getattr(DebugTalk(), function_name)()
# print(f"new_value:{new_value}") # 判断替换的新参数的类型 if isinstance(new_value, int) or isinstance(new_value, float): str_data = str_data.replace('"' + old_value + '"', str(new_value)) else: str_data = str_data.replace(old_value, str(new_value))
# 3还原数据类型 if isinstance(data, dict) or isinstance(data, list): data = json.loads(str_data) else: data = data_type(str_data) return data
#规范YAML测试用例 def standard_yaml(self,caseinfo): caseinfo_keys = caseinfo.keys() # 判断关键词是否完整 if "name" in caseinfo_keys and "request" in caseinfo_keys and "validate" in caseinfo_keys: cs_request = caseinfo['request'] cs_request_keys = cs_request.keys() if "method" in cs_request_keys and "url" in cs_request_keys: method = cs_request.pop("method") #pop-删除列表里最后一个并且返回这个值 url = cs_request.pop("url") res = self.send_request(method,url,**cs_request) print(res.text) # 写入yaml文件 return_json = "" try: return_json = res.json() except Exception as e: print("返回的结果不是json格式,不能使用jsonpath提取")
# 提取值并写入extract.yaml文件 if "extract" in caseinfo_keys: for key, value in caseinfo["extract"].items(): if "(.*?)" in value or "(.+?)" in value: zz_value = re.search(value, res.text) if zz_value: extract_value = {key: zz_value.group(1)} write_extract_yaml(extract_value) else: json_value = jsonpath.jsonpath(return_json, value) if json_value: extract_value = {key: json_value[0]} write_extract_yaml(extract_value)
return res else: print("二级关键字必须包含:method,url") else: print("一级关键字必须包含:name,request,validate")
#统一请求封装 def send_request(self, method, url, **kwargs): method = str(method).lower() url = self.replace_value(url)
# 请求头和参数替换 for key, value in kwargs.items(): if key in ['params', 'data', 'json', 'headers']: kwargs[key] = self.replace_value(value) print(f"kwargs[key]:{kwargs[key]}") elif key == "files": for file_key, file_path in value.items(): value[file_key] = open(file_path, 'rb')
res = RequestUtil.session.request(method,url,**kwargs) return res
第一个接口生成的access_token给第二个接口使用,使用成功。


最后感谢每一个认真阅读我文章的人,看着粉丝一路的上涨和关注,礼尚往来总是要有的,虽然不是什么很值钱的东西,如果你用得到的话可以直接拿走

这些资料,对于做【软件测试】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴我走过了最艰难的路程,希望也能帮助到你!凡事要趁早,特别是技术行业,一定要提升技术功底。希望对大家有所帮助…….
如果你不想再体验一次自学时找不到资料,没人解答问题,坚持几天便放弃的感受的话,可以加入下方我的qq群大家一起讨论交流,里面也有各种软件测试资料和技术交流。