• 接口自动化测试实操


    实现思路

    使用excel管理用例用例信息,requests模块发送http请求,实现了记录日志,邮件发送测试报告的功能

    目录结构如下:

    下面直接上代码:

    统筹脚本

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    # -*- coding:utf-8 -*-

    import os

    from interface import Interface

    from testcase_get import Get_testcase

    from result_save import Save_test_result

    from result_send import Send_report

    from config.config import Config

    from logging_save import logger

    if __name__ == "__main__":

        cur_path = os.path.split(os.path.realpath(__file__))[0]  # 获取当前文件绝对路径

        case_path = os.path.join(cur_path, "test_case""20170602.xls")

        test_case = Get_testcase(case_path).readExcel()  # 获取用例

        if not isinstance(test_case, list):  # 判断用例是否获取成功

            logger.info("Test_case get failed... \n Done!")

        else:

            logger.info("获取用例成功")

            # 调用接口

            test_result = Interface().interfaceTest(test_case)

            # 获取执行结果,用于发邮件

            count_success = test_result[3]

            count_failure = test_result[4]

            failed_case_detail = test_result[5]

            # 保存测试结果

            Save_test_result().save_result(case_path, test_result[0], test_result[1], test_result[2])

            logger.info("保存测试结果成功")

            # 获取邮件配置信息

            mail_config = Config(os.path.join(cur_path, "config""mail.conf")).get_mail_config()

            logger.info("获取邮箱配置成功")

            login_user = mail_config[0]

            login_pwd = mail_config[1]

            from_addr = mail_config[2]

            to_addrs = mail_config[3]

            smtp_server = mail_config[4]

            mail_send = Send_report(count_success, count_failure, failed_case_detail)

            # 获取最新测试报告

            last_report = mail_send.newest_report()

            logger.info("邮件发送结果")

            mail_send.send_result(login_user, login_pwd,from_addr, to_addrs,smtp_server,last_report)

            logger.info("DONE!")

    请求封装

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    43

    44

    45

    46

    47

    48

    49

    50

    51

    52

    53

    54

    55

    56

    57

    58

    59

    60

    61

    62

    63

    64

    65

    66

    67

    68

    69

    70

    71

    72

    73

    74

    75

    76

    77

    78

    79

    80

    # coding:utf-8

    import json

    import requests

    from logging_save import logger

    from result_check import Result_check

    from url_transform import urltransform

    class Interface:

        def __init__(self, ):

            pass

        def interfaceTest(self, case_list):

            """

            接口调用主函数

            """

            # 用于存结果

            res_flags = []

            # 用于存请求报文

            request_urls = []

            # 用于存返回报文

            responses = []

            # 用户存失败的用例

            failed_case = []

            # 统计成功失败的用例数

            count_success = 0

            count_failure = 0

            for case in case_list:

                try:

                    # 模块

                    product = case[0]

                    # 用例id

                    case_id = case[1]

                    # 用例标题

                    interface_name = case[2].strip('\n')

                    # 用例描述

                    case_detail = case[3]

                    # 请求方式

                    method = case[4]

                    # 请求url

                    url = case[5]

                    # 入参

                    param = case[6]

                    # 预期结果

                    res_check = case[7]

                except Exception as e:

                    return '测试用例格式不正确!%s' % e

                # 定义消息头信息

                headers = {'content-type''application/json',

                           'User-Agent''Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:22.0) Gecko/20100101 Firefox/22.0'}

                # 对url进行封装

                new_url = urltransform().urltransform(url, method, param)

                if method.upper() == 'GET':

                    results = requests.get(new_url).text

                    logger.info(u'正在调用接口: %s' % interface_name)

                    # print results

                    responses.append(results)

                    # 用于存储预期结果与实际结果的比较结果

                    res = Result_check().interface_result_check(results, res_check)

                    request_urls.append(new_url)

                else:

                    request_urls.append(new_url)

                    if param == '':

                        pass

                    else:

                        data = json.loads(param)  # 将参数转化为json格式

                    results = requests.post(new_url, data=json.dumps(data), headers=headers).text

                    responses.append(results)

                    res = Result_check().interface_result_check(results, res_check)

                if 'pass' in res:

                    res_flags.append('pass')

                    count_success += 1

                else:

                    logger.warning(u'接口返回结果与预期结果不一致!失败URL: %s METHOD :%s' % (url, method))

                    res_flags.append('fail')

                    count_failure += 1

                    failed_case.append((interface_name, method, url))

            logger.info(u'共执行 %s 条用例,PASS: %s,FAILED: %s' % (len(case_list), count_success, count_failure))

            return res_flags, request_urls, responses, count_success, count_failure, failed_case

    日志封装

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    43

    44

    45

    46

    47

    48

    49

    50

    51

    52

    53

    54

    55

    56

    57

    58

    59

    60

    61

    62

    63

    64

    65

    66

    67

    68

    69

    70

    71

    72

    73

    74

    75

    76

    77

    78

    79

    80

    81

    82

    83

    # coding=utf-8

    import logging

    import sys

    import traceback

    import time

    class LoggingUtils:

        '''

        ===========封装日志工具类的基本操作=============

        '''

        def __init__(self,logfile):

            '''

            :param logfile:

            '''

            self.logger = logging.getLogger(logfile)

            self.hdlr = logging.FileHandler(logfile)

            formatter = logging.Formatter('%(asctime)s %(levelname)s - %(message)s')

            self.ch = logging.StreamHandler()

            self.ch.setLevel(logging.INFO)

            self.ch.setFormatter(formatter)

            self.hdlr.setFormatter(formatter)

            self.logger.addHandler(self.hdlr)

            self.logger.addHandler(self.ch)

            self.logger.setLevel(logging.DEBUG)

        def debug(self, msg):

            '''

            :param msg:

            :return:

            '''

            self.logger.debug(msg)

            self.hdlr.flush()

        def info(self, msg):

            '''

             

            :param msg:

            :return:

            '''

            self.logger.info(msg)

            self.hdlr.flush()

        def warning(self,msg):

            self.logger.warning(msg)

            self.hdlr.flush()

        def error(self, msg):

            '''

            :param msg:

            :return:

            '''

            self.logger.error(msg)

            # self.logger.removeHandler(logging.StreamHandler())

            self.logger.removeHandler(self.ch)

            self.hdlr.flush()

        def error_sys(self, limit=None):

            '''

            :param limit:

            :return:

            '''

            exceptionType, exceptionValue, exceptionTraceback = sys.exc_info()

            if limit is None:

                if hasattr(sys, 'tracebacklimit'):

                    limit = sys.tracebacklimit

            = 0

            eline = '\n'

            while exceptionTraceback is not None and (limit is None or n < limit):

                = exceptionTraceback.tb_frame

                lineno = exceptionTraceback.tb_lineno

                co = f.f_code

                filename = co.co_filename

                name = co.co_name

                eline += ' File "%s", line %d, in %s \n ' % (filename, lineno, name)

                exceptionTraceback = exceptionTraceback.tb_next

                = + 1

            eline += "\n".join(traceback.format_exception_only(exceptionType, exceptionValue))

            self.logger.error(eline)

            self.hdlr.flush()

    timer = time.strftime('%Y-%m-%d',time.localtime())

    logger = LoggingUtils('%s.log'%timer)

    结果比对

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    43

    44

    45

    46

    47

    48

    49

    #coding:utf-8

    class result_check():

        def __init__(self):

            pass

        def result_check(self,results,res_check):

            '''

            结果对比函数

            '''

            #返回结果,将结果中的json数据转化为可以和预期结果比较的数据

            res = results.replace('":"','=').replace('" : "','=')

            #预期结果,是xx=11;xx=22

            res_check = res_check.split(';')

            for in res_check:

                if in res:

                    pass

                else:

                    return '结果不匹配 '+ str(s)

            return 'pass'

     result_save.py   保存测试结果的模块,复制原有的用例,保存为新的excel

    #coding:utf-8

    from xlutils import copy

    import xlrd

    import time

    import os

    class Save_test_result():

        def __init__(self):

            pass

        def save_result(self,file_path,res_flags,request_urls,responses):

            '''

            :return:

            '''

            book = xlrd.open_workbook(file_path)

            new_book = copy.copy(book)

            sheet = new_book.get_sheet(0)

            = 1

            for request_url, response, flag in zip(request_urls, responses, res_flags):

                sheet.write(i, 8, u'%s' % request_url)

                sheet.write(i, 9, u'%s' % response)

                sheet.write(i, 10, u'%s' % flag)

                += 1

            report_path = os.path.abspath(os.path.join('report'))

            if not os.path.exists(report_path):

                os.makedirs(report_path)

            new_book.save(os.path.abspath(os.path.join(report_path, 'Report@%s.xls' % time.strftime('%Y.%m.%d@%H%M%S'))))

    结果邮件

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    43

    44

    45

    46

    47

    48

    49

    50

    51

    52

    53

    54

    55

    56

    57

    58

    59

    60

    61

    62

    63

    64

    65

    66

    67

    68

    69

    70

    71

    72

    73

    74

    75

    76

    77

    78

    79

    80

    81

    82

    83

    84

    85

    86

    87

    88

    89

    90

    91

    #coding:utf-8

    import smtplib

    from email.mime.text import MIMEText

    from email.header import Header

    from email.mime.multipart import MIMEMultipart

    import os

    from logging_save import  logger

    class Send_report(object):

        def __init__(self,count_success,count_failure,failed_case):

            '''

            :param count_success:

            :param count_failure:

            :param failed_case:

            '''

            self.count_success = count_success

            self.count_failure = count_failure

            self.failed_case = failed_case

        def newest_report(self,testreport='report'):

            '''

            获取最新的测试报告

            :param testreport:

            :return:

            '''

            lists = os.listdir(testreport)

            lists.sort(key=lambda fn: os.path.getmtime(os.path.join(testreport,fn)))

            file_new = os.path.join(testreport, lists[-1])

            logger.info('获取最新附件报告成功')

            return file_new

        def send_result(self,username,passwd,from_addr,to_addrs,smtpserver,*args):

            '''

            :param username:

            :param passwd:

            :param from_addr:

            :param to_addrs:

            :param smtpserver:

            :param args:

            :return:

            '''

            sender = from_addr

            subject = '财富港接口测试结果'

            username = username

            passwd = passwd

            '''邮件内容'''

            tille = (u'用例名称', u'请求方式', u'url')

            details = (u'成功: ' + str(self.count_success) + u'失败: ' + str(self.count_failure)) + '\n' + u'失败的用例如下 :' + \

                      '\n' + '\n'.join(str(zip(tille, i)) for in self.failed_case).decode('unicode-escape')

            logger.info('邮件附件为: %s' %(args[0].split('\\')[1]))

            if args != None#判断是否添加附件

                msg = MIMEMultipart()

                msg.attach(MIMEText(details, 'plain''utf-8'))

                = 0

                while i < len(args): #可以添加多个附件

                    part = MIMEText(open(args[i], 'rb').read(), 'base64''utf-8')

                    part["Content-Type"= 'application/octet-stream'

                    part["Content-Disposition"= 'attachment; filename="%s"'%args[i]

                    msg.attach(part) #添加附件

                    += 1

                msg['subject'= Header(subject, 'utf-8')

                msg['From'= from_addr

                msg['To'= ','.join(eval(to_addrs)) #兼容多个收件人

                smtp = smtplib.SMTP()

                try:

                    smtp.connect(smtpserver)

                    smtp.login(username, passwd)

                    smtp.sendmail(sender, eval(to_addrs), msg.as_string())

                    smtp.close()

                    logger.info('带附件测试报告发送成功!')

                except smtplib.SMTPAuthenticationError,e:

                    logger.error('邮箱账户或密码错误: '+ str(e))

            else:

                msg = MIMEText(details, 'plain''utf-8')

                msg['subject'= Header(subject, 'utf-8')

                msg['From'= from_addr

                msg['To'=  ','.join(eval(to_addrs))

                smtp = smtplib.SMTP()

                try:

                    smtp.connect(smtpserver)

                    smtp.login(username, passwd)

                    smtp.sendmail(sender, eval(to_addrs), msg.as_string())

                    logger.info('测试报告发送成功!')

                    smtp.close()

                except smtplib.SMTPAuthenticationError,e:

                    logger.error('邮箱账户或密码错误 : '+str(e))

    用例获取及数据格式化

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    #coding:utf-8

    import xlrd

    from logging_save import logger

    class Get_testcase(object):

        def __init__(self, file_path):

            '''

            :param file_path: 用例文件路径

            '''

            self.file_path = file_path

        def readExcel(self):

            '''

            读取用例函数

            :return: 测试用例列表

            '''

            try:

                book = xlrd.open_workbook(self.file_path)  # 打开excel

            except Exception, error:

                logger.error('路径不在或者excel不正确 : ' + str(error))

                return error

            else:

                sheet = book.sheet_by_index(0)  # 取第一个sheet页

                rows = sheet.nrows  # 取这个sheet页的所有行数

                case_list = []  # 用于保存用例信息

                for in range(rows):

                    if i != 0:

                        case_list.append(sheet.row_values(i)) # 把每一条测试用例添加到case_list中

                return case_list

    请求url转换

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    #coding:utf-8

    class urltransform(object):

        def __init__(self):

            pass

        def urltransform(self, url, method, param):

            '''

            :return:

            '''

            if param == '':

                new_url = url

            else:

                if method.upper() == 'GET':

                    new_url = url + '?' + param.replace(';''&')  #如果有参数,且为GET方法则组装url

                else:

                    new_url = url

            return new_url

    测试用例excel结构

    config目录下,config.py   获取配置文件信息的模块

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    #conding:utf-8

    import ConfigParser

    class Config(object):

        def __init__(self,file_path):

            self.config = ConfigParser.ConfigParser()

            self.config.read(file_path)

        def get_mail_config(self):

            login_user = self.config.get('SMTP''login_user')

            login_pwd = self.config.get('SMTP''login_pwd')

            from_addr = self.config.get('SMTP''from_addr')

            to_addrs = self.config.get('SMTP''to_addrs')

            smtp_server = self.config.get('SMTP''smtp_server')

            port = self.config.get('SMTP''port')

            return login_user, login_pwd , from_addr, to_addrs,smtp_server, port

        def report_save_config(self):

            pass

    mail.conf

    1

    2

    3

    4

    5

    6

    7

    8

    [SMTP]

    login_user = 18******@163.com

    login_pwd = ******

    from_addr =  BI<18******@163.com>

    to_addrs = ['18******@163.com']

    #to_addrs = ['1******@qq.com','******.com']

    smtp_server = smtp.163.com

    port = 25

    测试报告

    邮件接收结果

    Python接口自动化测试零基础入门到精通(2023最新版)

  • 相关阅读:
    【每日一题】 Dijkstra求最短路 II,堆优化算法
    企微SCRM在私域流量运营中的痛点解决之道
    java-php-python--移动公司crm客户关系管理系统开发与实现-计算机毕业设计
    从底层分析并详解SpringAOP底层实现
    SQL查询中的小技巧:SELECT 1 和 LIMIT 1 替代 count(*)
    【Docker】Docker-compose及Consul多容器编排工具
    SpringMVC之JSON返回&异常处理机制
    模板特化--类和函数
    Go --- go-elasticsearch介绍及简单使用
    数据库系统原理与应用教程(007)—— 数据库相关概念
  • 原文地址:https://blog.csdn.net/m0_68405758/article/details/134083508