一、框架结构:

工程目录

二、Case文件设计

三、基础包 base3.1 封装get/post请求(runmethon.py)
-
- 1 import requests
- 2 import json
- 3 class RunMethod:
- 4 def post_main(self,url,data,header=None):
- 5 res = None
- 6 if header !=None:
- 7 res = requests.post(url=url,data=data,headers=header)
- 8 else:
- 9 res = requests.post(url=url,data=data)
- 10 return res.json()
- 11
- 12 def get_main(self,url,data=None,header=None):
- 13 res = None
- 14 if header !=None:
- 15 res = requests.get(url=url,data=data,headers=header,verify=False)
- 16 else:
- 17 res = requests.get(url=url,data=data,verify=False)
- 18 return res.json()
- 19
- 20 def run_main(self,method,url,data=None,header=None):
- 21 res = None
- 22 if method == 'Post':
- 23 res = self.post_main(url,data,header)
- 24 else:
- 25 res = self.get_main(url,data,header)
- 26 return json.dumps(res,ensure_ascii=False,sort_keys=True,indent=2)
3.2 封装mock(mock.py)
- 1 from mock import mock
- 2 #模拟mock 封装
- 3 def mock_test(mock_method,request_data,url,method,response_data):
- 4 mock_method = mock.Mock(return_value=response_data)
- 5 res = mock_method(url,method,request_data)
- 6 return res
四、数据操作包 operation_data4.1 获取excel单元格中的内容(get_data.py)
-
- 1 #coding:utf-8
- 2 from tool.operation_excel import OperationExcel
- 3 import data_config
- 4 from tool.operation_json import OperetionJson
- 5 from tool.connect_db import OperationMysql
- 6 class GetData:
- 7 def __init__(self):
- 8 self.opera_excel = OperationExcel()
- 9
- 10 #去获取excel行数,就是case的个数
- 11 def get_case_lines(self):
- 12 return self.opera_excel.get_lines()
- 13
- 14 #获取是否执行
- 15 def get_is_run(self,row):
- 16 flag = None
- 17 col = int(data_config.get_run())
- 18 run_model = self.opera_excel.get_cell_value(row,col)
- 19 if run_model == 'yes':
- 20 flag = True
- 21 else:
- 22 flag = False
- 23 return flag
- 24
- 25 #是否携带header
- 26 def is_header(self,row):
- 27 col = int(data_config.get_header())
- 28 header = self.opera_excel.get_cell_value(row,col)
- 29 if header != '':
- 30 return header
- 31 else:
- 32 return None
- 33
- 34 #获取请求方式
- 35 def get_request_method(self,row):
- 36 col = int(data_config.get_run_way())
- 37 request_method = self.opera_excel.get_cell_value(row,col)
- 38 return request_method
- 39
- 40 #获取url
- 41 def get_request_url(self,row):
- 42 col = int(data_config.get_url())
- 43 url = self.opera_excel.get_cell_value(row,col)
- 44 return url
- 45
- 46 #获取请求数据
- 47 def get_request_data(self,row):
- 48 col = int(data_config.get_data())
- 49 data = self.opera_excel.get_cell_value(row,col)
- 50 if data == '':
- 51 return None
- 52 return data
- 53
- 54 #通过获取关键字拿到data数据
- 55 def get_data_for_json(self,row):
- 56 opera_json = OperetionJson()
- 57 request_data = opera_json.get_data(self.get_request_data(row))
- 58 return request_data
- 59
- 60 #获取预期结果
- 61 def get_expcet_data(self,row):
- 62 col = int(data_config.get_expect())
- 63 expect = self.opera_excel.get_cell_value(row,col)
- 64 if expect == '':
- 65 return None
- 66 return expect
- 67
- 68 #通过sql获取预期结果
- 69 def get_expcet_data_for_mysql(self,row):
- 70 op_mysql = OperationMysql()
- 71 sql = self.get_expcet_data(row)
- 72 res = op_mysql.search_one(sql)
- 73 return res.decode('unicode-escape')
- 74
- 75 def write_result(self,row,value):
- 76 col = int(data_config.get_result())
- 77 self.opera_excel.write_value(row,col,value)
- 78
- 79 #获取依赖数据的key
- 80 def get_depend_key(self,row):
- 81 col = int(data_config.get_data_depend())
- 82 depent_key = self.opera_excel.get_cell_value(row,col)
- 83 if depent_key == "":
- 84 return None
- 85 else:
- 86 return depent_key
- 87
- 88 #判断是否有case依赖
- 89 def is_depend(self,row):
- 90 col = int(data_config.get_case_depend())
- 91 depend_case_id = self.opera_excel.get_cell_value(row,col)
- 92 if depend_case_id == "":
- 93 return None
- 94 else:
- 95 return depend_case_id
- 96
- 97 #获取数据依赖字段
- 98 def get_depend_field(self,row):
- 99 col = int(data_config.get_field_depend())
- 100 data = self.opera_excel.get_cell_value(row,col)
- 101 if data == "":
- 102 return None
- 103 else:
- 104 return data
4.2 获取excel中每个列(data_config.py)
-
- 1 #coding:utf-8
- 2 class global_var:
- 3 #case_id
- 4 Id = '0'
- 5 request_name = '1'
- 6 url = '2'
- 7 run = '3'
- 8 request_way = '4'
- 9 header = '5'
- 10 case_depend = '6'
- 11 data_depend = '7'
- 12 field_depend = '8'
- 13 data = '9'
- 14 expect = '10'
- 15 result = '11'
- 16 #获取caseid
- 17 def get_id():
- 18 return global_var.Id
- 19
- 20 #获取url
- 21 def get_url():
- 22 return global_var.url
- 23
- 24 def get_run():
- 25 return global_var.run
- 26
- 27 def get_run_way():
- 28 return global_var.request_way
- 29
- 30 def get_header():
- 31 return global_var.header
- 32
- 33 def get_case_depend():
- 34 return global_var.case_depend
- 35
- 36 def get_data_depend():
- 37 return global_var.data_depend
- 38
- 39 def get_field_depend():
- 40 return global_var.field_depend
- 41
- 42 def get_data():
- 43 return global_var.data
- 44
- 45 def get_expect():
- 46 return global_var.expect
- 47
- 48 def get_result():
- 49 return global_var.result
- 50
- 51 def get_header_value():
- 52 return global_var.header
4.3 解决数据依赖(dependent.py )
-
- 1 #coding:utf-8
- 2 import sys
- 3 import json
- 4 sys.path.append('C:/Users/lxz/Desktop/InterFace_JIA')
- 5 from tool.operation_excel import OperationExcel
- 6 from base.runmethod import RunMethod
- 7 from operation_data.get_data import GetData
- 8 from jsonpath_rw import jsonpath,parse
- 9 class DependdentData:
- 10 def __init__(self,case_id):
- 11 self.case_id = case_id
- 12 self.opera_excel = OperationExcel()
- 13 self.data = GetData()
- 14
- 15 #通过case_id去获取该case_id的整行数据
- 16 def get_case_line_data(self):
- 17 rows_data = self.opera_excel.get_rows_data(self.case_id)
- 18 return rows_data
- 19
- 20 #执行依赖测试,获取结果
- 21 def run_dependent(self):
- 22 run_method = RunMethod()
- 23 row_num = self.opera_excel.get_row_num(self.case_id)
- 24 request_data = self.data.get_data_for_json(row_num)
- 25 #header = self.data.is_header(row_num)
- 26 method = self.data.get_request_method(row_num)
- 27 url = self.data.get_request_url(row_num)
- 28 res = run_method.run_main(method,url,request_data)
- 29 return json.loads(res)
- 30
- 31 #根据依赖的key去获取执行依赖测试case的响应,然后返回
- 32 def get_data_for_key(self,row):
- 33 depend_data = self.data.get_depend_key(row)
- 34 response_data = self.run_dependent()
- 35 json_exe = parse(depend_data)
- 36 madle = json_exe.find(response_data)
- 37 return [math.value for math in madle][0]
- 38
- 39 if __name__ == '__main__':
- 40 order = {
- 41 "data": {
- 42 "_input_charset": "utf-8",
- 43 "body": "京东订单-1710141907182334",
- 44 "it_b_pay": "1d",
- 45 "notify_url": "http://order.imooc.com/pay/notifyalipay",
- 46 "out_trade_no": "1710141907182334",
- 47 "partner": "2088002966755334",
- 48 "payment_type": "1",
- 49 "seller_id": "yangyan01@tcl.com",
- 50 "service": "mobile.securitypay.pay",
- 51 "sign": "kZBV53KuiUf5HIrVLBCcBpWDg%2FnzO%2BtyEnBqgVYwwBtDU66Xk8VQUTbVOqDjrNymCupkVhlI%2BkFZq1jOr8C554KsZ7Gk7orC9dDbQl
- pr%2BaMmdjO30JBgjqjj4mmM%2Flphy9Xwr0Xrv46uSkDKdlQqLDdGAOP7YwOM2dSLyUQX%2Bo4%3D",
- 52 "sign_type": "RSA",
- 53 "string": "_input_charset=utf-8&body=京东订单-1710141907182334&it_b_pay=1d¬ify_url=http://order.imooc.com/pay/
- notifyalipay&out_trade_no=1710141907182334&partner=2088002966755334&payment_type=1&seller_id=yangyan01@
- tcl.com&service=mobile.securitypay.pay&subject=京东订单-1710141907182334&total_fee=299&sign=kZBV53KuiUf5H
- IrVLBCcBpWDg%2FnzO%2BtyEnBqgVYwwBtDU66Xk8VQUTbVOqDjrNymCupkVhlI%2BkFZq1jOr8C554KsZ7Gk7orC9dDbQlpr%2BaMmdjO30
- JBgjqjj4mmM%2Flphy9Xwr0Xrv46uSkDKdlQqLDdGAOP7YwOM2dSLyUQX%2Bo4%3D&sign_type=RSA",
- 54 "subject": "京东订单-1710141907182334",
- 55 "total_fee": 299
- 56 },
- 57 "errorCode": 1000,
- 58 "errorDesc": "成功",
- 59 "status": 1,
- 60 "timestamp": 1507979239100
- 61 }
- 62 res = "data.out_trade_no"
- 63 json_exe = parse(res)
- 64 madle = json_exe.find(order)
- 65 print [math.value for math in madle][0]
五、工具类包 tool5.1 操作excel (operation_excel.py)
-
- 1 #coding:utf-8
- 2 import xlrd
- 3 from xlutils.copy import copy
- 4 class OperationExcel:
- 5 def __init__(self,file_name=None,sheet_id=None):
- 6 if file_name:
- 7 self.file_name = file_name
- 8 self.sheet_id = sheet_id
- 9 else:
- 10 self.file_name = '../dataconfig/case1.xls'
- 11 self.sheet_id = 0
- 12 self.data = self.get_data()
- 13
- 14 #获取sheets的内容
- 15 def get_data(self):
- 16 data = xlrd.open_workbook(self.file_name)
- 17 tables = data.sheets()[self.sheet_id]
- 18 return tables
- 19
- 20 #获取单元格的行数
- 21 def get_lines(self):
- 22 tables = self.data
- 23 return tables.nrows
- 24
- 25 #获取某一个单元格的内容
- 26 def get_cell_value(self,row,col):
- 27 return self.data.cell_value(row,col)
- 28
- 29 #写入数据
- 30 def write_value(self,row,col,value):
- 31 '''
- 32 写入excel数据
- 33 row,col,value
- 34 '''
- 35 read_data = xlrd.open_workbook(self.file_name)
- 36 write_data = copy(read_data)
- 37 sheet_data = write_data.get_sheet(0)
- 38 sheet_data.write(row,col,value)
- 39 write_data.save(self.file_name)
- 40
- 41 #根据对应的caseid 找到对应行的内容
- 42 def get_rows_data(self,case_id):
- 43 row_num = self.get_row_num(case_id)
- 44 rows_data = self.get_row_values(row_num)
- 45 return rows_data
- 46
- 47 #根据对应的caseid找到对应的行号
- 48 def get_row_num(self,case_id):
- 49 num = 0
- 50 clols_data = self.get_cols_data()
- 51 for col_data in clols_data:
- 52 if case_id in col_data:
- 53 return num
- 54 num = num+1
- 55
- 56
- 57 #根据行号,找到该行的内容
- 58 def get_row_values(self,row):
- 59 tables = self.data
- 60 row_data = tables.row_values(row)
- 61 return row_data
- 62
- 63 #获取某一列的内容
- 64 def get_cols_data(self,col_id=None):
- 65 if col_id != None:
- 66 cols = self.data.col_values(col_id)
- 67 else:
- 68 cols = self.data.col_values(0)
- 69 return cols
- 70
- 71
- 72 if __name__ == '__main__':
- 73 opers = OperationExcel()
- 74 print opers.get_cell_value(1,2)
5.2判断字符串包含,判断字典是否相等(common_util.py)
-
- 1 #coding:utf-8
- 2 import json
- 3 class CommonUtil:
- 4 def is_contain(self,str_one,str_two):
- 5 '''
- 6 判断一个字符串是否再另外一个字符串中
- 7 str_one:查找的字符串
- 8 str_two:被查找的字符串
- 9 '''
- 10 flag = None
- 11 if isinstance(str_one,unicode):
- 12 str_one = str_one.encode('unicode-escape').decode('string_escape')
- 13 return cmp(str_one,str_two)
- 14 if str_one in str_two:
- 15 flag = True
- 16 else:
- 17 flag = False
- 18 return flag
- 19
- 20
- 21 def is_equal_dict(self,dict_one,dict_two):
- 22 '''
- 23 判断两个字典是否相等
- 24 '''
- 25 if isinstance(dict_one,str):
- 26 dict_one = json.loads(dict_one)
- 27 if isinstance(dict_two,str):
- 28 dict_two = json.loads(dict_two)
- 29 return cmp(dict_one,dict_two)
5.3 操作header(operation_herder.py)
-
- 1 #coding:utf-8
- 2 import requests
- 3 import json
- 4 from operation_json import OperetionJson
- 5
- 6
- 7 class OperationHeader:
- 8
- 9 def __init__(self,response):
- 10 self.response = json.loads(response)
- 11
- 12 def get_response_url(self):
- 13 '''
- 14 获取登录返回的token的url
- 15 '''
- 16 url = self.response['data']['url'][0]
- 17 return url
- 18
- 19 def get_cookie(self):
- 20 '''
- 21 获取cookie的jar文件
- 22 '''
- 23 url = self.get_response_url()+"&callback=jQuery21008240514814031887_1508666806688&_=1508666806689"
- 24 cookie = requests.get(url).cookies
- 25 return cookie
- 26
- 27 def write_cookie(self):
- 28 cookie = requests.utils.dict_from_cookiejar(self.get_cookie())
- 29 op_json = OperetionJson()
- 30 op_json.write_data(cookie)
- 31
- 32 if __name__ == '__main__':
- 33
- 34 url = "http://www.jd.com/passport/user/login"
- 35 data = {
- 36 "username":"18513199586",
- 37 "password":"111111",
- 38 "verify":"",
- 39 "referer":"https://www.jd.com"
- 40 }
- 41 res = json.dumps(requests.post(url,data).json())
- 42 op_header = OperationHeader(res)
- 43 op_header.write_cookie()
5.4 操作json文件(operation_json.py)
-
- 1 #coding:utf-8
- 2 import json
- 3 class OperetionJson:
- 4
- 5 def __init__(self,file_path=None):
- 6 if file_path == None:
- 7 self.file_path = '../dataconfig/user.json'
- 8 else:
- 9 self.file_path = file_path
- 10 self.data = self.read_data()
- 11
- 12 #读取json文件
- 13 def read_data(self):
- 14 with open(self.file_path) as fp:
- 15 data = json.load(fp)
- 16 return data
- 17
- 18 #根据关键字获取数据
- 19 def get_data(self,id):
- 20 print type(self.data)
- 21 return self.data[id]
- 22
- 23 #写json
- 24 def write_data(self,data):
- 25 with open('../dataconfig/cookie.json','w') as fp:
- 26 fp.write(json.dumps(data))
- 27
- 28
- 29
- 30 if __name__ == '__main__':
- 31 opjson = OperetionJson()
- 32 print opjson.get_data('shop')
5.5 操作数据库(connect_db.py)
-
- 1 #coding:utf-8
- 2 import MySQLdb.cursors
- 3 import json
- 4 class OperationMysql:
- 5 def __init__(self):
- 6 self.conn = MySQLdb.connect(
- 7 host='localhost',
- 8 port=3306,
- 9 user='root',
- 10 passwd='123456',
- 11 db='le_study',
- 12 charset='utf8',
- 13 cursorclass=MySQLdb.cursors.DictCursor
- 14 )
- 15 self.cur = self.conn.cursor()
- 16
- 17 #查询一条数据
- 18 def search_one(self,sql):
- 19 self.cur.execute(sql)
- 20 result = self.cur.fetchone()
- 21 result = json.dumps(result)
- 22 return result
- 23
- 24 if __name__ == '__main__':
- 25 op_mysql = OperationMysql()
- 26 res = op_mysql.search_one("select * from web_user where Name='ailiailan'")
- 27 print res
5.6 发送报告邮件(send_email.py)
-
- 1 #coding:utf-8
- 2 import smtplib
- 3 from email.mime.text import MIMEText
- 4 class SendEmail:
- 5 global send_user
- 6 global email_host
- 7 global password
- 8 email_host = "smtp.163.com"
- 9 send_user = "jiaxiaonan666@163.com"
- 10 password = "jia_668"
- 11 def send_mail(self,user_list,sub,content):
- 12 user = "jiaxiaonan"+"<"+send_user+">"
- 13 message = MIMEText(content,_subtype='plain',_charset='utf-8')
- 14 message['Subject'] = sub
- 15 message['From'] = user
- 16 message['To'] = ";".join(user_list)
- 17 server = smtplib.SMTP()
- 18 server.connect(email_host)
- 19 server.login(send_user,password)
- 20 server.sendmail(user,user_list,message.as_string())
- 21 server.close()
- 22
- 23 def send_main(self,pass_list,fail_list):
- 24 pass_num = float(len(pass_list))
- 25 fail_num = float(len(fail_list))
- 26 count_num = pass_num+fail_num
- 27 #90%
- 28 pass_result = "%.2f%%" %(pass_num/count_num*100)
- 29 fail_result = "%.2f%%" %(fail_num/count_num*100)
- 30
- 31
- 32 user_list = ['609037724@qq.com']
- 33 sub = "接口自动化测试报告"
- 34 content = "此次一共运行接口个数为%s个,通过个数为%s个,失败个数为%s,通过率为%s,失败率为%s" %(count_num,pass_num,fail_num,pass_result,fail_result )
- 35 self.send_mail(user_list,sub,content)
- 36
- 37 if __name__ == '__main__':
- 38 sen = SendEmail()
- 39 sen.send_main([1,2,3,4],[2,3,4,5,6,7])
六、主函数run_test.py
-
- 1 #coding:utf-8
- 2 import sys
- 3 sys.path.append("C:/Users/lxz/Desktop/InterFace_JIA")
- 4 from base.runmethod import RunMethod
- 5 from operation_data.get_data import GetData
- 6 from tool.common_util import CommonUtil
- 7 from operation_data.dependent_data import DependdentData
- 8 from tool.send_email import SendEmail
- 9 from tool.operation_header import OperationHeader
- 10 from tool.operation_json import OperetionJson
- 11 class RunTest:
- 12 def __init__(self):
- 13 self.run_method = RunMethod()
- 14 self.data = GetData()
- 15 self.com_util = CommonUtil()
- 16 self.send_mai = SendEmail()
- 17
- 18 #程序执行的
- 19 def go_on_run(self):
- 20 res = None
- 21 pass_count = []
- 22 fail_count = []
- 23 #10 0,1,2,3
- 24 rows_count = self.data.get_case_lines()
- 25 for i in range(1,rows_count):
- 26 is_run = self.data.get_is_run(i)
- 27 if is_run:
- 28 url = self.data.get_request_url(i)
- 29 method = self.data.get_request_method(i)
- 30 request_data = self.data.get_data_for_json(i)
- 31 expect = self.data.get_expcet_data_for_mysql(i)
- 32 header = self.data.is_header(i)
- 33 depend_case = self.data.is_depend(i)
- 34 if depend_case != None:
- 35 self.depend_data = DependdentData(depend_case)
- 36 #获取的依赖响应数据
- 37 depend_response_data = self.depend_data.get_data_for_key(i)
- 38 #获取依赖的key
- 39 depend_key = self.data.get_depend_field(i)
- 40 request_data[depend_key] = depend_response_data
- 41 if header == 'write':
- 42 res = self.run_method.run_main(method,url,request_data)
- 43 op_header = OperationHeader(res)
- 44 op_header.write_cookie()
- 45
- 46 elif header == 'yes':
- 47 op_json = OperetionJson('../dataconfig/cookie.json')
- 48 cookie = op_json.get_data('apsid')
- 49 cookies = {
- 50 'apsid':cookie
- 51 }
- 52 res = self.run_method.run_main(method,url,request_data,cookies)
- 53 else:
- 54 res = self.run_method.run_main(method,url,request_data)
- 55
- 56 if self.com_util.is_equal_dict(expect,res) == 0:
- 57 self.data.write_result(i,'pass')
- 58 pass_count.append(i)
- 59 else:
- 60 self.data.write_result(i,res)
- 61 fail_count.append(i)
- 62 self.send_mai.send_main(pass_count,fail_count)
- 63
- 64 #将执行判断封装
- 65 #def get_cookie_run(self,header):
- 66
- 67
- 68 if __name__ == '__main__':
- 69 run = RunTest()
- 70 run.go_on_run()
最后感谢每一个认真阅读我文章的人,礼尚往来总是要有的,虽然不是什么很值钱的东西,如果你用得到的话可以直接拿走:

这些资料,对于【软件测试】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴上万个测试工程师们走过最艰难的路程,希望也能帮助到你!
