Unittest的运行模式可以参考柠檬班的这个视频:点击戳这里
记录一下:unittestreport第三方库使用文档 :好看的自动生成的自动化测试报告
,戳这里
正文:首先:新建名字叫做Api_Tools项目:
common
interface
interface_auto_lib
models
env.info.cfg
main.spec
modlue_v0.1.xlsx
InterfaceTools.py
base.py的内容:
import re
import datetime
import time
from common.data_process import RandomInfo,DataProcess,FileOperate
from common.data_process import FileOperate
from common.log import Log
from common.config_reader import ConfigReader
import requests
import urllib3
import json
from shlex import join
import jsonpath
from common.dosqlserver import ExceteSqlserver
urllib3.disable_warnings() # 屏蔽https证书警告
class ExecuteCase:
total=0
success=0
fail=0
def __init__(self):
self.publicdict=RandomInfo().publicdict
self.dataprocess=DataProcess()
self.randominfo = RandomInfo()
self.fileoperate=FileOperate()
self.log = Log()
self.runstatus_file_path = ConfigReader().get_value("file", "runstatus_file_path")
self.relevantparamstag = True # 替换参数是否存在
self.asserttag = True # 判断断言成功或者失败
self.savetag = True # 判断token 是否存在 保存是否成功
# self.autotag = True # 随机是否生成 生成数据 例如 手机号 邮编
self.beforesqltag = True # 判断 ExecuteSql栏位 接口之前 提取sql中的列名和参数加入到全局变量是否成功 失败为False
self.executesqltag = True # 判断 ExpectSqlResult栏位 接口之后的sql断言 提取sql中的列名和参数加入到全局变量是否成功 失败为False
self.expectsqlresulttag = True # 判断 ExpectSqlResult栏位 接口之后的sql断言 executesqltag=False expectsqlresulttag一定为False
self.excetesqlserver=None # DB对象
def get_auto_data(self,caseid):
""" 随机参数数据准备 """
self.log.logMsg(4, f"execut auto generate data ...") # 执行自动生成数据...
if caseid.AutoData:
auto_data = caseid.AutoData.split(",")
try:
self.randominfo.auto_data_in(auto_data)
except Exception as e:
self.autotag = False
if self.caseid.IgnoreErr == 1:
raise Exception(self.log.logMsg(2, "auto generate data error"))
else:
self.log.logMsg(3, "auto generate data error")
else:
self.log.logMsg(4, f"auto generate data is null")
def get_headers(self, headers):
try:
data = DataProcess().str_replace(str(headers))
header = str(data).split(",")
if len(header) == 1:
headers_data = str(data).split(",")[0]
if headers_data != 'xx':
style = headers_data.lower()
self.request_header = self.randominfo.get_request_style(style) # 获取请求头格式 {
"WmrAuthorization":"${access_token}"}
# print(f"{'*'*30,self.request_header}")
return eval(self.request_header)
else:
raise Exception(self.log.logMsg(2, f'headers input error'))
else:
headers_data = str(data).split(",")[0]
if headers_data != 'xx':
style = headers_data.lower()
request_headers_style = self.randominfo.get_request_style(style)
self.request_header = json.loads(request_headers_style)
headers_parameter = str(data).split(",")[1] # 请求token等其他信息
assert_split = headers_parameter.split(";")
for asserts in assert_split:
header_key, header_value = asserts.split(":") # actual_value : json 表达式
value = self.ec.relevantParams(header_value, "Headers_Params") if re.findall(r'\${([\s\S]+?)\}',header_value) != [] else header_value
request_header_dic = {
}
request_header_dic.update({
header_key: value})
if headers_data != 'xx':
self.request_header.update({
header_key: value})
return self.request_header
else:
return request_header_dic
except Exception as e:
raise Exception(self.log.logMsg(2, f'headers fail'))
def data_repare(self,executecase_class,caseid):
""" url header boby 来源 """
self.ec = executecase_class
self.caseid = caseid
self.log.logMsg(4, f'global dictionary: {self.publicdict}')
self.log.logMsg(4, f'step 1--->data prepare before sending the request') # 发送请求前的数据准备
# executecase_class.get_auto_data(caseid) # 处理 AutoData 栏位数据 引入全局变量
executecase_class.get_beforsql(caseid) # 处理 ExecuteSql 栏位的sql数据以及sql操作 接口之前准备
try:
url = executecase_class.relevantParams(caseid.InerfacePath, "InerfacePath") if re.findall(r'\${([\s\S]+?)\}', caseid.InerfacePath) != [] else caseid.InerfacePath
body = executecase_class.relevantParams(caseid.Body, "Body") if re.findall(r'\${([\s\S]+?)\}',caseid.Body) != [] else caseid.Body
# header = executecase_class.relevantParams(caseid.Headers, "Headers") if re.findall(r'\${([\s\S]+?)\}',caseid.Headers) != [] else caseid.Headers
header = ConfigReader().get_value("style", "json") if caseid.Headers == '' else executecase_class.get_headers(caseid.Headers)
url,header,body = self.dataprocess.str_replace(url),self.dataprocess.str_replace(header),self.dataprocess.str_replace(body)
# 开打文件时候 不需要 Headers 中的 "Content-Type":"application/json;charset=UTF-8"
# 针对于打开文件时候的处理
if ":open(" in body and 'Content-Type' in header: # files = {
"file":open("data\\totegrp_20220602.csv","rb")}
header = json.loads(header) # 将Json字符串解码成python对象
del header['Content-Type']
self.log.logMsg(4, f'request headers:{header}') # 将python对象编码成Json字符串 显示为 {
}
else:
self.log.logMsg(4, f'request headers:{header}')
self.log.logMsg(4, f'request url:{url}')
if '"file":open("' in caseid.Body: # files = {
"file":open("data\\totegrp_20220602.csv","rb")}
body = {
"file": open(body.split('("')[1].replace('","rb")}',""), "rb")}
self.log.logMsg(4, f'request body:{body}')
else:
self.log.logMsg(4, f'request body:{body}')
return header,url,body
except Exception as e:
self.log.logMsg(2, f'data repare fail')
def case_process(self,response_result,executecase_class, caseid, count):
self.response_result=response_result # 接口返回值
try:
if self.response_result.status_code == 200:
self.log.logMsg(4, f'request send success')
self.log.logMsg(4, f'step 2--->check if the case is depended on by other interfaces')
executecase_class.exce_save_data(caseid, count) # 处理 token 保存到公共参数
self.log.logMsg(4, f'step 3--->response result assert')
executecase_class.get_assert_result(caseid.Assert) # 断言参数 接口断言 预期结果=$.实际结果
self.log.logMsg(4, f'step 4--->sql statement assert')
executecase_class.get_executesql(caseid) # 执行 接口之后的 ExecuteSql 栏位 执行断言sql
executecase_class.get_expectsqlresult(caseid) # 执行 接口之后的 ExecuteSql Result 栏位 断言
ec_init = [self.savetag, self.asserttag, self.relevantparamstag, self.beforesqltag,self.executesqltag, self.expectsqlresulttag]
self.log.logMsg(2, f'case execute fail') if False in ec_init else self.log.logMsg(4,f'case execute success')
return 0
else:
self.log.logMsg(3, f'internal server error') if response_result.status_code==500 else self.log.logMsg(3, f'please check requeste information if is correct')
self.log.logMsg(2, f'case execute fail')
return 1
except Exception as e:
return 1
def get_beforsql(self,caseid):
""" 接口执行之前的 caseid.BeforeSql 栏位"""
self.log.logMsg(4, f"execut beforesql statement ...") # 执行sql语句
if caseid.BeforeSql == "":
self.log.logMsg(4, f"beforesql statement is null")
else:
database = str(caseid.BeforeSql).split(":")[0] # 获取服务器地址
beforesql = str(caseid.BeforeSql).split(":")[1] # 获取sql
self.excetesqlserver = ExceteSqlserver(database) # 在cfg文本获取DB的对象
self.db_connect,self.cursor=self.excetesqlserver.get_db_connet(database) # 获取sql查询对象 游标
# self.beforesqltag 默认是状态值 True self.sqltage 正常为 True 否则为 False
self.sqltage = self.excetesqlserver.parse_sql(caseid,database,self.cursor,beforesql,self.beforesqltag)
# self.excetesqlserver.close_db(self.cursor,self.db_connect)
def get_executesql(self,caseid):
""" 接口执行之后的 caseid.ExecuteSql 栏位"""
self.log.logMsg(4, f"execut executesql statement ...")
if caseid.ExecuteSql == "":
self.log.logMsg(4, f"executesql statement is null")
else:
database = str(caseid.ExecuteSql).split(":")[0]
executesql = str(caseid.ExecuteSql).split(":")[1]
if self.excetesqlserver is None: # 在cfg文本获取DB的对象不存在
self.excetesqlserver = ExceteSqlserver(database) # 获取DB对象
self.db_connect,self.cursor=self.excetesqlserver.get_db_connet(database) # 重新获取DB连接
self.excetesqlserver.parse_sql(caseid,database,self.cursor,executesql,self.executesqltag) # 执行SQL
# self.excetesqlserver.close_db(self.cursor,self.db_connect)
else:
self.db_connect,self.cursor=self.excetesqlserver.get_db_connet(database)
self.excetesqlserver.parse_sql(caseid,database,self.cursor,executesql,self.executesqltag)
# self.excetesqlserver.close_db(self.cursor,self.db_connect)
def get_expectsqlresult(self, caseid):
""" 执行 ExpectSql Result 结果 """
self.log.logMsg(4, f"execut expectsqlresult ...")
if caseid.ExpectSqlResult=="":
self.log.logMsg(4, f"execut expectsqlresult is null")
else:
if caseid.ExpectSqlResult in self.publicdict.values():
self.log.logMsg(4,"expectsqlresult success")
else:
if self.caseid.IgnoreErr == 1:
raise Exception(self.log.logMsg(2, f"expectsqlresult fail,please check the expectsqlresult if is correct"))
else:
self.expectsqlresulttag = False
self.log.logMsg(3, "expectsqlresult fail,please check the expectsqlresult if is correct")
def get_assert_result(self,data:str):
""" 断言结果处理 caseid.Assert 例如 :bearer=$.token_type , 预期 == 实际(json提取表达式) """
self.log.logMsg(4,"execut assert...") # 开始断言
if data:
assert_split = data.split(";")
count = 0
if len(assert_split) > 1: # 断言条件有2个 甚至以上
for asserts in assert_split:
expect_value, actual_value = asserts.split("=") # actual_value = json表达式提取参数
expect_value = self.dataprocess.str_replace(expect_value)
jsonpath_value = jsonpath.jsonpath(self.response_result.json(),actual_value) # 返回的实际结果是一个列表
if jsonpath_value:
if "" + join(jsonpath_value) == expect_value: # ['password'] 提取为 password 实际 == 预期
self.log.logMsg(4, f'[{actual_value}] assert success')
count += 1
else:
jsonpath_value = "" + join(jsonpath_value)
self.log.logMsg(4, f'assert fail, expect != actual [{expect_value}] != [{jsonpath_value}]')
else:
if self.caseid.IgnoreErr == 1:
raise Exception(self.log.logMsg(2, f"please check if the jsonpath expression is correct"))
else:
self.asserttag=False
self.log.logMsg(3, "please check if the jsonpath expression is correct")
if count==len(assert_split):
pass
# self.log.logMsg(4, "assert all success")
else:
if self.caseid.IgnoreErr == 1:
self.runstatus_tofile(self.runstatus_file_path,1)
self.runreslut_count()
raise Exception(self.log.logMsg(2, f'assert not all success'))
else:
self.asserttag = False
self.log.logMsg(3, f'assert not all success')
else: # 断言条件只有1个
expect_value, actual_value = data.split("=")
jsonpath_value = jsonpath.jsonpath(self.response_result.json(), actual_value)
for jv in jsonpath_value:
if expect_value == "" + join(jv):
self.log.logMsg(4, f'[{actual_value}] assert success')
else:
self.log.logMsg(4, f'assert fail, expect != actual [{expect_value}] != [{actual_value}]')
if self.caseid.IgnoreErr == 1:
self.runstatus_tofile(self.runstatus_file_path, 1)
self.runreslut_count()
raise Exception(self.log.logMsg(2, f'assert fail'))
else:
self.log.logMsg(3, f'assert fail')
else:
self.log.logMsg(4, "assert is null")
def relevantParams(self,data:str,remarks=None):
""" 提取${}参数 存入公共参数 self.publicdict """
self.log.logMsg(4,f"execut {remarks} parameter replace...")
RelevantParams=re.findall(r'\${([\s\S]+?)\}',data)
if RelevantParams == []:
self.log.logMsg(4,f"{remarks} parameter replace is null")
return data
else:
for relevantparam in RelevantParams:
if relevantparam in self.publicdict.keys():
strData="${%s}" % relevantparam
publicRelevanValue=self.publicdict[relevantparam]
data=data.replace(strData,str(publicRelevanValue))
self.log.logMsg(4,f"{relevantparam} parameter replace success")
else:
if self.caseid.IgnoreErr == 1:
self.runstatus_tofile(self.runstatus_file_path, 1)
self.runreslut_count()
raise Exception(self.log.logMsg(2,f"{relevantparam} parameter replace fail"))
else:
self.relevantparamstag = False
self.log.logMsg(3,f"{relevantparam} parameter replace fail")
return data
def exce_save_data(self, caseid, count):
""" 保存 提取token """
if caseid.Save:
expect_value, actual_value = caseid.Save.split("=") # access_token=$.access_token
expect = expect_value + '_' + str(count)
jsonkey = jsonpath.jsonpath(json.loads(self.response_result.text), actual_value) # 提取返回值 token
if jsonkey:
for jv in jsonkey:
self.publicdict[expect] = jv
if self.publicdict[expect] in self.publicdict.values():
self.log.logMsg(4, f"'{expect}' token save success")
else:
if self.caseid.IgnoreErr == 1:
self.runstatus_tofile(self.runstatus_file_path, 1)
self.runreslut_count()
raise Exception(self.log.logMsg(2,"save data fail"))
else:
self.savetag = False
self.log.logMsg(3, "token save fail")
else:
if self.caseid.IgnoreErr == 1:
raise Exception(self.log.logMsg(2,"please check if the jsonpath expression is correct or variable name if writte correct"))
else:
self.savetag = False
self.log.logMsg(3, "please check if the jsonpath expression is correct or variable name if writte correct")
else:
self.log.logMsg(4,"save data is null")
def runstatus_tofile(self,runstatus_file_path,status):
time_runstatus,runstatus_title = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S"),f'RunStatus,Caseid'
self.fileoperate.writefile(runstatus_file_path + time_runstatus + ".csv",