您的位置:首页 > 编程语言 > Python开发

Python unittest实现接口自动化测试实战(经验分享)

2019-07-31 16:16 204 查看
版权声明:本文为博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/haiyi_guo/article/details/97792968

Python+requests+Excel+unittest+HTMLTestRunner

开始之前:

先来梳理一下我们手动进行接口测试的流程是什么样的?

理解接口文档 —> 编写测试用例 —> 准备测试环境 —> 选择测试工具(Postman)—> 执行测试 —> 检查接口返回数据 —> 最后生成测试报告 —> 将测试报告以邮件的方式发送给相关大佬

那么,何为自动化测试呢?

自动化测试是把以人为驱动的测试行为转化为机器执行的一种过程。

现在,我们来看看具体是怎么实现的吧!

(以下是我自己实现的方式,大家可以参考下,有许多不完善的地方,也遇到了许多的问题,希望大神们多多指教!)

一、环境搭建:

1、安装Python3(具体安装方法可以百度)

2、安装需要用到的相关库requests、openpyxl

pip3 install requests(HTTP请求库)

pip3 install openpyxl(操作Excel表格库)

pip3 install configparser(操作.ini配置文件库)

后续有一个安装所需库的package.py文件,执行安装即可

3、安装一个IDE,推荐使用PyCharm(具体安装方法可以百度)

二、结构的划分

  • common:存放一些公共的方法
  • config: config.ini 配置文件
  • readConfig.py 读取配置文件信息
  • logs:存放日志,不创建也可以,在执行脚本过程中会自动生成的
  • report:存放执行脚本过程中生成的测试报告
  • shellFile:存放在测试执行的过程需要调用的shell脚本
  • source:存放在测试执行的过程需要用的源文件。比如图片、视频等
  • testCase:存放测试用例代码
  • testDataFile:存放测试数据文件(包括Excel、JSON文件)
      case_list.txt 控制该条测试用例的执行或不执行
    • installEnv.sh 如果要在Liux系统下执行脚本,只需要运行一下该shell脚本即可
    • package.py 安装运行该脚本所需要用到的python相关库
    • reset_env.py 重置测试环境(每次启动自动化测试之前,进行测试环境的清理,保持环境的干净)
    • runcase.py 启动自动化测试系统入口

    二、配置文件目录

    1、配置文件config.ini

    就是用来存放基本不变的东西,比如测试环境、发送邮件的信息,所有一成不变东西都可以放在这个文件下

    [code][HTTP]
    protocol = http
    
    # 开发环境
    # ip = 192.168.0.128
    # port = 3002
    
    # 测试环境
    ip = 192.168.0.118
    port = 3002
    
    # 生成环境
    basics = www.xxx.com
    
    [EMAIL]
    is_send = no
    title = xxx Test Report
    mail_host = smtp.163.com
    mail_user = xxxxxx
    mail_pass = xxxxxx
    sender = xxxxxx@163.com
    receive_user = xxxxxx1@qq.com/xxxxxx2@qq.com
    
    [EXCEL]
    module = A
    api_name = B
    case_id = C
    case_name = D
    url = E
    premise = F
    header = G
    method = H
    data = I
    param = J
    check = K
    expected = L
    return = M
    rely = N

    [EXCEL] 是Excel测试用例表格每一列对应的字母常量,用于后面的读取Excel表格数据使用,比如:用例标题case_name = D

     2、读取配置文件readConfig.py

    该文件是用来读取config.ini文件的,用到configparser第三方库

    [code]#!/usr/bin/python3
    # coding=utf-8
    import configparser
    import os
    
    proDir = os.path.split(os.path.realpath(__file__))[0]
    configPath = os.path.join(proDir, "config.ini")
    
    class ReadConfig:
    def __init__(self):
    self.cf = configparser.ConfigParser()
    self.cf.read(configPath)
    
    def get_base_url(self):
    protocol = self.cf.get("HTTP", "protocol")
    # 测试环境
    ip = self.cf.get("HTTP", "ip")
    port = self.cf.get("HTTP", "port")
    base_url = protocol + '://' + ip + ':' + port
    # 生成环境
    # basics = self.cf.get("HTTP", "basics")
    # base_url = protocol + '://' + basics
    return base_url
    
    def get_email(self, mail_key):
    email_value = self.cf.get("EMAIL", mail_key)
    return email_value

    三、测试数据设计

    1、Excel测试用例,所有的测试用例都写在Excel表格里,然后读取Excel,进行自动测试

    请求头部和请求数据都是写的关键字,关键字对应的数据放在JSON文件

    三、公共方法common目录

    这里就介绍一下几个主要的方法

    1、myLog.py

    ——主要用到logging模块可以替代print函数的功能,并能将标准输出输入到日志文件保存起来(logging模块是Python标准库,不需要安装)

    ——我主要对其进行了一下封装定义成自己想要的输出log的格式、控制输出到控制台的等级、需要打印的日志等级,直接看源码吧!

    [code]#!/usr/bin/python3
    # coding=utf-8
    import logging
    import os
    import time
    from datetime import datetime
    
    logger = logging.getLogger(__name__)
    path_dir = str(os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)))
    logpath = path_dir + '/logs'
    
    def create_file(file_name):
    path = file_name[:-7]
    if not os.path.isdir(path):
    os.makedirs(path)
    
    if not os.path.isfile(file_name):
    fb = open(file_name, mode='w', encoding='utf-8')
    fb.close()
    else:
    # print("不需要创建")
    pass
    
    def set_handler(level):
    if level == 'error':
    logger.addHandler(MyLog.err_handler)
    logger.addHandler(MyLog.handler)
    logger.addHandler(MyLog.console)
    
    def remove_handler(level):
    if level == 'error':
    logger.removeHandler(MyLog.err_handler)
    logger.removeHandler(MyLog.handler)
    logger.removeHandler(MyLog.console)
    
    def set_now_time():
    now_time = time.strftime(MyLog.time_format, time.localtime(time.time()))
    return now_time
    
    class MyLog:
    # 基本设置
    path = os.path.join(logpath, str(datetime.now().strftime("%Y%m%d")))
    now = time.strftime("%H-%M-%S", time.localtime(time.time()))  # 获取当前时间
    log_file = path + "/log.log"
    err_file = path + "/err.log"
    create_file(log_file)
    create_file(err_file)
    time_format = "%Y-%m-%d %H:%M:%S"
    
    # 设置需要输出到控制台的等级
    console = logging.StreamHandler()
    # console.setLevel(logging.WARNING)
    console.setLevel(logging.INFO)
    
    # 设置需要打印的日志等级
    logger.setLevel(logging.DEBUG)
    handler = logging.FileHandler(log_file, encoding='utf-8')
    err_handler = logging.FileHandler(err_file, encoding='utf-8')
    
    @staticmethod
    def debug(message, name='', line=''):
    set_handler('debug')
    if name != '' and line != '':
    logger.debug(
    "[" + set_now_time() + "]" + " - " + name + ', ' + 'Line %s' % line + "] " + " - " + "DEBUG:" + " " + message)
    remove_handler('debug')
    else:
    logger.debug(
    "[" + set_now_time() + "]" + " - " + "DEBUG:" + " " + message)
    remove_handler('debug')
    
    @staticmethod
    def info(message, name='', line=''):
    set_handler('info')
    if name != '' and line != '':
    logger.info(
    "[" + set_now_time() + "]" + " - " + name + ', ' + 'Line %s' % line + "] " + " - " + "INFO:" + " " + message)
    remove_handler('info')
    else:
    logger.info(
    "[" + set_now_time() + "]" + " - " + "INFO:" + " " + message)
    remove_handler('info')
    
    @staticmethod
    def warning(message, name='', line=''):
    set_handler('warning')
    if name != '' and line != '':
    logger.warning(
    "[" + set_now_time() + "]" + " - " + name + ', ' + 'Line %s' % line + "] " + " - " + "WARNING:" + " " + message)
    remove_handler('warning')
    else:
    logger.warning(
    "[" + set_now_time() + "]" + " - " + "WARNING:" + " " + message)
    remove_handler('warning')
    
    @staticmethod
    def error(message, name='', line=''):
    set_handler('error')
    if name != '' and line != '':
    logger.error(
    "[" + set_now_time() + "]" + " - " + name + ', ' + 'Line %s' % line + "] " + " - " + "ERROR:" + " " + message)
    remove_handler('error')
    else:
    logger.error(
    "[" + set_now_time() + "]" + " - " + "ERROR:" + " " + message)
    remove_handler('error')

    2、httpSet.yp

    ——requests是python http 库,它是第三方库,所以使用前需要先安装(具体使用可以查阅官方文档http://cn.python-requests.org/zh_CN/latest/

    ——主要是对requests的一些请求方式进行了一下封装修改,就不多介绍了,直接看源码吧!

    [code]#!/usr/bin/python3
    # coding=utf-8
    import json
    import requests
    from common.myLog import MyLog
    
    false = False
    true = True
    
    class HttpMethod:
    def __init__(self):
    self.log = MyLog()
    
    def get_method(self, url, data=None, headers=None):
    try:
    res = requests.get(url=url, params=data, headers=headers)
    status_code = res.status_code
    res_json = res.json()
    return status_code, res_json  # 返回响应码,响应内容
    except Exception as e:
    self.log.error("Error:%s" % e)
    
    def post_method(self, url, files=None, data=None, headers=None):
    try:
    if files:
    res = requests.post(url=url, files=files, data=data, headers=headers)
    else:
    res = requests.post(url=url, data=json.dumps(data), headers=headers)
    status_code = res.status_code
    res_json = res.json()
    return status_code, res_json  # 返回响应码,响应内容
    except Exception as e:
    self.log.error("Error:%s" % e)
    
    def put_method(self, url, data=None, headers=None):
    try:
    res = requests.put(url=url, data=json.dumps(data), headers=headers)
    status_code = res.status_code
    res_json = res.json()
    return status_code, res_json  # 返回响应码,响应内容
    except Exception as e:
    self.log.error("Error:%s" % e)
    
    def delete_method(self, url, data=None, headers=None):
    try:
    res = requests.delete(url=url, data=json.dumps(data), headers=headers)
    status_code = res.status_code
    res_json = res.json()
    return status_code, res_json  # 返回响应码,响应内容
    except Exception as e:
    self.log.error("Error:%s" % e)
    
    def http_method(self, method, url, files=None, data=None, headers=None):
    """判断请求方法
    :param method: 请求方法
    :param url: 接口路径
    :param data: 请求数据
    :param headers: 请求头
    :return:
    """
    if method == 'get':
    status_code, res_json = self.get_method(url, data, headers)
    elif method == 'post':
    status_code, res_json = self.post_method(url, files, data, headers)
    elif method == 'put':
    status_code, res_json = self.put_method(url, data, headers)
    else:
    status_code, res_json = self.delete_method(url, data, headers)
    return status_code, json.dumps(res_json, ensure_ascii=False, sort_keys=False, indent=2)  # 对json数据进行格式化输出

    3、operationExcelSheet.py

    ——openpyxl是python第三方库,使用前需要先安装(具体使用可以百度一下)

    ——主要是对Excel表格里面的不同sheet进行读写操作,就不啰嗦啦,看源码吧!

    [code]#!/usr/bin/python3
    # coding=utf-8
    # Excel有多个sheet
    
    import os
    from openpyxl import load_workbook
    
    proDir = os.path.split(os.path.realpath(__file__))[0]
    excelPath = os.path.join(proDir, "../testDataFile/TestCase.xlsx")
    
    class OperationExcel:
    def __init__(self):
    self.open_excel = load_workbook(excelPath)  # 打开Excel表格
    
    def open_excel_sheet(self, sheet_name):
    """设置需要操作的sheet
    :param sheet_name: 表名
    :return:
    """
    return self.open_excel[sheet_name]
    
    def from_ab_get_data(self, sheet_name, cell, row):
    """通过单元格获取数据,例如:A2
    :param sheet_name: 表名
    :param cell: 所在列A, B, ...
    :param row: 所在行1, 2, ...
    :return: 对应单元格的值
    """
    open_sheet = self.open_excel_sheet(sheet_name)
    value = open_sheet[cell + str(row)].value
    return value
    
    def from_xy_get_data(self, sheet_name, x, y):
    """ 通过单元格坐标获取数据,例如:(1, 2)
    :param sheet_name: 表名
    :param x: 横坐标x
    :param y: 纵坐标y
    :return:返回该坐标(x, y)对应的数据
    """
    open_sheet = self.open_excel_sheet(sheet_name)
    value = open_sheet.cell(x, y).value
    return value
    
    def write_data(self, sheet_name, cell, row, write_value):
    """写入数据
    :param sheet_name: 表名
    :param cell: 所在列A, B, ...
    :param row: 所在行1, 2, ...
    :param write_value: 写入的值
    :return:
    """
    wb = load_workbook(filename=excelPath)
    ws = wb[sheet_name]
    ws[cell + str(row)] = write_value
    wb.save(filename=excelPath)

    4、operationJson.py

    ——json是python标准库,不需要安装

    ——主要是对json数据文件进行读写操作,这里默认对data.json这个文件进行操作,如果要指定操作某个json文件只需要在调用该方法的时候传入json文件路径即可,其它也没啥好说的,看源码吧!

    [code]#!/usr/bin/python3
    # coding=utf-8
    import json
    import os
    
    proDir = os.path.split(os.path.realpath(__file__))[0]
    jsonPath = os.path.join(proDir, "../testDataFile/data.json")
    
    class OperationJson:
    def __init__(self, file_name=None):
    if file_name:
    self.file_name = file_name
    else:
    self.file_name = jsonPath
    
    def open_json(self):
    """打开json文件
    :return:返回json文件数据
    """
    with open(self.file_name, 'r') as fp:
    data = json.load(fp)
    return data
    fp.close()
    
    def key_get_data(self, key):
    """通过key值获取数据
    :param key: 需要获取的值对应的key
    :return:
    """
    data = self.open_json()[key]
    return data
    
    def write_data(self, w_data, key1, key2=None):
    """修改json数据
    :param w_data: 修改后的数据
    :param key1: 要修改的键值1
    :param key2: 要修改的键值2
    :return:
    """
    data_dict = self.open_json()
    if key2 == None:
    data_dict[key1] = w_data
    else:
    data_dict[key1][key2] = w_data
    with open(self.file_name, 'w') as fp:
    fp.write(json.dumps(data_dict, ensure_ascii=False, sort_keys=False, indent=2))  # 对写入的json数据进行格式化
    fp.close()

    5、readTestData.py

    ——该文件是读取测试数据的,这个比较简单了,就是结合operationExcelSheet.py和operationJson.py一起使用,看具体源码吧!

    ——因为我在执行测试的时候有遇到python不能识别true和false,所以我在开头定义全局变量false = False 和 true = True

    [code]#!/usr/bin/python3
    # coding=utf-8
    from common.operationExcelSheet import OperationExcel
    from common.operationJson import OperationJson
    from config.readConfig import ReadConfig
    
    false = False
    true = True
    
    class ReadTestData:
    def __init__(self, file_name=None):
    self.open_excel = OperationExcel()
    self.set_excel = ReadConfig()
    if file_name:
    self.open_json = OperationJson(file_name)
    else:
    self.open_json = OperationJson()
    
    def get_module(self, sheet_name, row):
    cell = self.set_excel.get_excel('module')
    module = self.open_excel.from_ab_get_data(sheet_name, cell, row)
    return module
    
    def get_api_name(self, sheet_name, row):
    cell = self.set_excel.get_excel('api_name')
    case_id = self.open_excel.from_ab_get_data(sheet_name, cell, row)
    return case_id
    
    def get_case_id(self, sheet_name, row):
    cell = self.set_excel.get_excel('case_id')
    case_id = self.open_excel.from_ab_get_data(sheet_name, cell, row)
    return case_id
    
    def get_case_name(self, sheet_name, row):
    cell = self.set_excel.get_excel('case_name')
    case_title = self.open_excel.from_ab_get_data(sheet_name, cell, row)
    return case_title
    
    def get_url(self, sheet_name, row):
    cell = self.set_excel.get_excel('url')
    url = self.open_excel.from_ab_get_data(sheet_name, cell, row)
    return url
    
    def get_premise(self, sheet_name, row):
    cell = self.set_excel.get_excel('premise')
    premise = self.open_excel.from_ab_get_data(sheet_name, cell, row)
    return premise
    
    def get_header(self, sheet_name, row):
    cell = self.set_excel.get_excel('header')
    headers_key = self.open_excel.from_ab_get_data(sheet_name, cell, row)
    headers = self.open_json.key_get_data(headers_key)
    return headers
    
    def get_method(self, sheet_name, row):
    cell = self.set_excel.get_excel('method')
    method = self.open_excel.from_ab_get_data(sheet_name, cell, row)
    return method
    
    def get_request_data(self, sheet_name, row):
    cell = self.set_excel.get_excel('data')
    request_key = self.open_excel.from_ab_get_data(sheet_name, cell, row)
    # request_list = request_str.split(',')
    # print(request_list)
    request_data = self.open_json.key_get_data(request_key)
    return request_data
    
    def get_param(self, sheet_name, row):
    cell = self.set_excel.get_excel('param')
    param = self.open_excel.from_ab_get_data(sheet_name, cell, row)
    request_param = self.open_json.key_get_data(param)
    return request_param
    
    def get_check(self, sheet_name, row):
    cell = self.set_excel.get_excel('check')
    check = self.open_excel.from_ab_get_data(sheet_name, cell, row)
    return check
    
    def get_expect_result(self, sheet_name, row):
    cell = self.set_excel.get_excel('expected')
    expect_result = self.open_excel.from_ab_get_data(sheet_name, cell, row)
    expect_result_dict = eval(expect_result)
    return expect_result_dict
    
    def get_return_data(self, sheet_name, row):
    cell = self.set_excel.get_excel('return')
    return_data = self.open_excel.from_ab_get_data(sheet_name, cell, row)
    return return_data
    
    def get_rely_data(self, sheet_name, row):
    cell = self.set_excel.get_excel('rely')
    data = self.open_excel.from_ab_get_data(sheet_name, cell, row)
    rely_data = self.open_json.key_get_data(data)
    return rely_data

    6、sendEmail.py

    ——该文件是构造发送邮件的方法,用到python标准库smtplib和email

    ——可以获取最新的测试报告,把最新的测试报告以文本和附件的形式发送,看源码吧~

    [code]#!/usr/bin/python3
    # coding=utf-8
    import smtplib
    import os
    from common.myLog import MyLog
    from email.mime.text import MIMEText
    from email.mime.multipart import MIMEMultipart
    from email.header import Header
    from config.readConfig import ReadConfig
    
    # 路径
    path_dir = str(os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)))
    reportpath = path_dir + '/report'
    
    local_readConfig = ReadConfig()
    
    class SendEmail:
    def __init__(self):
    global host, user, password, sender, title
    host = local_readConfig.get_email('mail_host')  # 邮箱服务器
    user = local_readConfig.get_email('mail_user')  # 发件人用户名
    password = local_readConfig.get_email('mail_pass')  # 发件人邮箱授权码,非登录密码
    sender = local_readConfig.get_email('sender')  # 发件人邮箱
    title = local_readConfig.get_email('title')  # 邮件标题
    self.logger = MyLog()
    self.receive_user = local_readConfig.get_email('receive_user')  # 收件人邮箱
    self.receive_user_list = []
    for i in str(self.receive_user).split('/'):
    self.receive_user_list.append(i)
    
    def send_email(self):
    """把最新的测试报告以邮件的方式发送"""
    # 构造邮件
    file_new = self.get_new_report()
    f = open(file_new, 'rb')
    content = f.read()
    message = MIMEMultipart()
    message['From'] = "{}".format(sender)  # 发件人
    message['To'] = ",".join(self.receive_user_list)  # 收件人
    message['Subject'] = Header(title, 'utf-8')  # 标题
    message.attach(MIMEText(content, 'html', 'utf-8'))
    
    # 添加附件
    filename = file_new[-31:]
    att = MIMEText(content, 'base64', 'utf-8')
    att["Content-Type"] = 'application/octet-stream'
    att["Content-Disposition"] = 'attachment; filename=%s' % filename
    message.attach(att)
    
    # 发送邮件
    try:
    server = smtplib.SMTP()
    server.connect(host)
    server.login(user, password)  # 登录验证
    server.sendmail(sender, self.receive_user_list, message.as_string())  # 发送
    server.quit()  # 关闭
    self.logger.info("邮件发送成功!")
    except smtplib.SMTPException as e:
    # print("邮件发送失败!")
    self.logger.error("邮件发送失败!请检查邮件配置%s" % e)
    
    def get_new_report(self):
    """获取最新的测试报告"""
    lists = os.listdir(reportpath)
    if lists:
    lists.sort(key=lambda fn: os.path.getmtime(reportpath + '\\' + fn))
    file_new = os.path.join(reportpath, lists[-1])
    return file_new

    四、测试用例代码

    测试用例代码文件名一定要以test开头命名,用的是unittest框架进行编写的,测试过程分三步:

    • 第一步:获取请求数据
    • 第二步:发送请求,获取返回结果数据
    • 第三步:对结果数据进行断言

    最好把每一步的操作都打印到log里面,方便测试结束后查看,嗯...看源码吧!

    [code]#!/usr/bin/python3
    # coding=utf-8
    import json
    import os
    import unittest
    
    from common.httpSet import HttpMethod
    from common.myLog import MyLog
    from common.operationJson import OperationJson
    from common.readTestData import ReadTestData
    from config.readConfig import ReadConfig
    from common.getRunLine import get_run_line
    
    proDir = os.path.split(os.path.realpath(__file__))[0]
    file_name = os.path.join(proDir, "../../testDataFile/orchestrator_account.json")
    print('file_name:%s' % file_name)
    
    class LoginTest(unittest.TestCase):
    def setUp(self):
    self.data = ReadTestData(file_name)
    self.hea_data = ReadTestData()
    self.http = HttpMethod()
    self.config = ReadConfig()
    self.log = MyLog()
    self.json = OperationJson()
    self.sheet = 'app_test_case'
    self.row = list(range(2, 7))
    self.log.info(message="----------测试开始----------", name="test01_OrcLogin.py")
    
    def tearDown(self):
    self.log.info(message="----------测试结束----------", name="test01_OrcLogin.py")
    
    def test_login01(self):
    """orc admin正常登录"""
    self.log.info(message="test_login01", name="test01_OrcLogin.py", line=get_run_line())
    # 获取测试数据
    method = self.data.get_method(self.sheet, self.row[0])
    url = self.config.get_base_url() + self.data.get_url(self.sheet, self.row[0])
    headers = self.hea_data.get_header(self.sheet, self.row[0])
    data = self.data.get_request_data(self.sheet, self.row[0])
    self.log.info(message="第一步: 获取请求数据")
    self.log.info(message="请求方法:%s" % method)
    self.log.info(message="请求接口:%s" % url)
    self.log.info(message="请求数据:%s" % data)
    
    # 发送请求
    status_code, res_json = self.http.http_method(method=method, url=url, data=data, headers=headers)
    dict_json = json.loads(res_json)  # 把json数据转换成字典对象
    # print("dict_json:%s" % dict_json)
    # print("dict_json type:%s" % type(dict_json))
    # print("dict_json type:%s" % type(res_json))
    self.log.info(message="第二步:发送请求,获取返回数据:")
    self.log.info(message="%s" % res_json)
    if dict_json["status"]:
    orc_token = dict_json["orchestrator_admin_token"]  # 提取orc_token
    self.log.info(message="提取orc_token", name="test_login01")
    self.log.info(message="%s" % orc_token, name="test_login01")
    authorization = "Bearer " + orc_token
    self.json.write_data(authorization, "orc_token_header", "Authorization")  # 把orc_token写入json文件
    
    # 断言
    self.log.info(message="第三步:断言")
    self.assertEqual(status_code, 200, msg=">>>接口请求失败")
    self.assertTrue(dict_json["status"], msg=">>>断言失败,实际返回结果:%s" % dict_json)
    self.assertEqual(dict_json["username"], "orc_admin",
    msg=">>>断言失败,实际返回值是:%s" % dict_json["username"])
    
    def test_login02(self):
    """登录失败,密码错误"""
    self.log.info(message="test_login02", name="test01_OrcLogin.py", line=get_run_line())
    # 获取测试数据
    method = self.data.get_method(self.sheet, self.row[1])
    url = self.config.get_base_url() + self.data.get_url(self.sheet, self.row[1])
    headers = self.hea_data.get_header(self.sheet, self.row[1])
    data = self.data.get_request_data(self.sheet, self.row[1])
    expect = self.data.get_expect_result(self.sheet, self.row[1])
    self.log.info(message="第一步: 获取请求数据")
    self.log.info(message="请求方法:%s" % method)
    self.log.info(message="请求接口:%s" % url)
    self.log.info(message="请求数据:%s" % data)
    self.log.info(message="期望结果:%s" % expect)
    
    # 发送请求
    status_code, res_json = self.http.http_method(method=method, url=url, data=data, headers=headers)
    dict_json = json.loads(res_json)  # 把json数据转换成字典对象
    self.log.info(message="第二步:发送请求,获取返回数据:")
    self.log.info(message="%s" % res_json)
    self.log.info(message="第三步:断言")
    # 断言
    self.assertEqual(status_code, 200, msg=">>>接口请求失败")
    self.assertFalse(dict_json["status"], msg=">>>断言失败,实际返回结果:%s" % dict_json)
    self.assertEqual(dict_json["err"]["code"], expect["err"]["code"],
    msg=">>>断言失败,实际返回结果:%s" % dict_json["err"]["code"])
    self.assertEqual(dict_json["err"]["message"], expect["err"]["message"],
    msg=">>>断言失败,实际返回结果:%s" % dict_json["err"]["message"])
    
    def test_login03(self):
    """登录失败,账户不存在"""
    self.log.info(message="test_login03", name="test01_OrcLogin.py", line=get_run_line())
    # 获取测试数据
    method = self.data.get_method(self.sheet, self.row[2])
    url = self.config.get_base_url() + self.data.get_url(self.sheet, self.row[2])
    headers = self.hea_data.get_header(self.sheet, self.row[2])
    data = self.data.get_request_data(self.sheet, self.row[2])
    expect = self.data.get_expect_result(self.sheet, self.row[2])
    self.log.info(message="第一步: 获取请求数据")
    self.log.info(message="请求方法:%s" % method)
    self.log.info(message="请求接口:%s" % url)
    self.log.info(message="请求数据:%s" % data)
    self.log.info(message="期望结果:%s" % expect)
    
    # 发送请求
    status_code, res_json = self.http.http_method(method=method, url=url, data=data, headers=headers)
    dict_json = json.loads(res_json)  # 把json数据转换成字典对象
    self.log.info(message="第二步:发送请求,获取返回数据:")
    self.log.info(message="%s" % res_json)
    self.log.info(message="第三步:断言")
    
    # 断言
    self.assertEqual(status_code, 200, msg=">>>接口请求失败")
    self.assertFalse(dict_json["status"], msg=">>>断言失败,实际返回结果:%s" % dict_json)
    self.assertEqual(dict_json["err"]["code"], expect["err"]["code"],
    msg=">>>断言失败,实际返回结果:%s" % dict_json["err"]["code"])
    self.assertEqual(dict_json["err"]["message"], expect["err"]["message"],
    msg=">>>断言失败,实际返回结果:%s" % dict_json["err"]["message"])
    
    # @unittest.skip("跳过测试")
    def test_login04(self):
    """登录失败,缺少username字段"""
    self.log.info(message="test_login04", name="test01_OrcLogin.py", line=get_run_line())
    # 获取测试数据
    method = self.data.get_method(self.sheet, self.row[3])
    url = self.config.get_base_url() + self.data.get_url(self.sheet, self.row[3])
    headers = self.hea_data.get_header(self.sheet, self.row[3])
    data = self.data.get_request_data(self.sheet, self.row[3])
    expect = self.data.get_expect_result(self.sheet, self.row[3])
    self.log.info(message="第一步: 获取请求数据")
    self.log.info(message="请求方法:%s" % method)
    self.log.info(message="请求接口:%s" % url)
    self.log.info(message="请求数据:%s" % data)
    self.log.info(message="期望结果:%s" % expect)
    
    # 发送请求
    status_code, res_json = self.http.http_method(method=method, url=url, data=data, headers=headers)
    dict_json = json.loads(res_json)  # 把json数据转换成字典对象
    self.log.info(message="第二步:发送请求,获取返回数据:")
    self.log.info(message="%s" % res_json)
    self.log.info(message="第三步:断言")
    
    # 断言
    self.assertEqual(status_code, 200, msg=">>>接口请求失败")
    self.assertFalse(dict_json["status"], msg=">>>实际返回结果:%s" % dict_json)
    self.assertEqual(dict_json["err"]["code"], expect["err"]["code"],
    msg=">>>断言失败,实际返回结果:%s" % dict_json["err"]["code"])
    self.assertEqual(dict_json["err"]["message"], expect["err"]["message"],
    msg=">>>断言失败,实际返回结果:%s" % dict_json["err"]["message"])
    
    def test_login05(self):
    """登录失败,缺少password字段"""
    self.log.info(message="test_login05", name="test01_OrcLogin.py", line=get_run_line())
    # 获取测试数据
    method = self.data.get_method(self.sheet, self.row[4])
    url = self.config.get_base_url() + self.data.get_url(self.sheet, self.row[4])
    headers = self.hea_data.get_header(self.sheet, self.row[4])
    data = self.data.get_request_data(self.sheet, self.row[4])
    expect = self.data.get_expect_result(self.sheet, self.row[4])
    self.log.info(message="第一步: 获取请求数据")
    self.log.info(message="请求方法:%s" % method)
    self.log.info(message="请求接口:%s" % url)
    self.log.info(message="请求数据:%s" % data)
    self.log.info(message="期望结果:%s" % expect)
    
    # 发送请求
    status_code, res_json = self.http.http_method(method=method, url=url, data=data, headers=headers)
    dict_json = json.loads(res_json)  # 把json数据转换成字典对象
    self.log.info(message="第二步:发送请求,获取返回数据:")
    self.log.info(message="%s" % res_json)
    self.log.info(message="第三步:断言")
    
    # 断言
    self.assertEqual(status_code, 200, msg=">>>接口请求失败")
    self.assertFalse(dict_json["status"], msg=">>>断言失败,实际返回结果:%s" % dict_json)
    self.assertEqual(dict_json["err"]["code"], expect["err"]["code"],
    msg=">>>断言失败,实际返回结果:%s" % dict_json["err"]["code"])
    self.assertEqual(dict_json["err"]["message"], expect["err"]["message"],
    msg=">>>断言失败,实际返回结果:%s" % dict_json["err"]["message"])
    
    if __name__ == "__main__":
    unittest.main()

    五、执行文件

    测试用例代码写完之后,现在就要写一个执行文件,只需要执行该文件就可以跑所有测试用例了,它就是runcase.py文件

    1、添加测试套件

    把所有要执行的测试用例写到case_list.txt文件,如果是有些用例想暂时不执行,就可以在开头添加"#"

    2、执行测试

    在使用之前需要下载HTMLTestRunner包(可以官方下载pip install html-testRunner),也可以在网上找一个别的大神做过优化的HTMLTestRunner包下载下来直接用(我也是这么干的~~)

    用到测试报告模块HTMLTestRunner来执行测试套件并生成html测试报告,废话就不多说了,看源码吧!

    [code]#!/usr/bin/python3
    # coding=utf-8
    import os
    import sys
    import time
    import unittest
    
    from common.HTMLTestRunnerNew import HTMLTestRunner
    from common.myLog import MyLog
    from common.sendEmail import SendEmail
    from config.readConfig import ReadConfig
    from reset_env import ResetEnv
    
    proDir = os.path.split(os.path.realpath(__file__))[0]
    case_list_path = os.path.join(proDir, "case_list.txt")
    test_case_path = os.path.join(proDir, "testCase")
    
    class RunTest:
    
    def __init__(self):
    self.logger = MyLog()
    self.readconfig = ReadConfig()
    self.send_mail = SendEmail()
    self.env = ResetEnv()
    self.is_send = self.readconfig.get_email("is_send")
    
    # 测试报告基本信息
    self.testers = "HaiYi"
    self.title = "XXX接口测试报告"
    self.description = "测试环境:Develop,IP地址:%s" % self.readconfig.get_base_url()
    
    # 导入TestCase目录下的全部测试用例
    # self.discover = unittest.defaultTestLoader.discover(test_case_path, pattern='test*.py')
    
    # 导入指定测试用例列表文件
    self.case_list_file = case_list_path
    self.case_list_list = []
    
    # 重置测试环境
    self.is_env = self.env.delete_db()
    
    def get_case_list(self):
    """获取需要进行运行的测试用例列表"""
    fb = open(self.case_list_file)
    for i in fb.readlines():
    data = str(i)
    if data != '' and not data.startswith('#'):
    self.case_list_list.append(data.replace('\n', ''))
    fb.close()
    print(self.case_list_list)
    
    def set_test_suite(self):
    """设置添加测试套件"""
    self.get_case_list()
    test_suite = unittest.TestSuite()
    suite_module = []
    for case in self.case_list_list:
    case_name = case.split('/')[-1]
    print(case_name + '.py')
    discover = unittest.defaultTestLoader.discover(test_case_path, pattern=case_name + '.py')
    suite_module.append(discover)
    
    if len(suite_module) > 0:
    for suite in suite_module:
    for test_name in suite:
    test_suite.addTest(test_name)
    else:
    return None
    return test_suite
    
    def run_test(self):
    """执行测试"""
    if self.is_env:
    try:
    test_suite = self.set_test_suite()  # 获取测试套件
    now = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime(time.time()))  # 获取当前日期时间
    public_path = os.path.dirname(os.path.abspath(sys.argv[0]))
    # filename = public_path + "/report/" + now + "_report.html"  # 保存的报告路径和名称
    filename = public_path + "/report/" + "index.html"  # 保存的报告路径和名称
    print("测试报告目录:%s" % filename)
    fp = open(filename, 'wb')
    runner = HTMLTestRunner(stream=fp,
    tester=self.testers,
    title=self.title,
    description=self.description
    )
    if test_suite is not None:
    runner.run(test_suite)  # 执行指定添加的测试用例套件
    # runner.run(self.discover) # 执行TestCase目录下的全部测试用例
    else:
    self.logger.info("Have no case to test.")
    except Exception as e:
    self.logger.error(str(e))
    finally:
    self.logger.warning("---------------All Test End---------------")
    fp.close()
    # 发送电子邮件
    if self.is_send == 'yes':
    self.send_mail.send_email()
    self.logger.warning("测试报告已发送电子邮件!")
    elif self.is_send == 'no':
    self.logger.warning("测试报告不发送电子邮件!")
    else:
    self.logger.error("测试报告发送电子邮件为未知状态,请检查配置!")
    else:
    self.logger.warning("测试环境清理失败的,无法继续执行测试!!!")
    
    if __name__ == "__main__":
    run = RunTest()
    run.run_test()

    到这里就全部介绍完成了,我知道有许多不完善的地方,在开发过程中也遇到了许多的问题,希望大神们多多指教,谢谢!

    下面奉上两张测试报告的截图,一个是HTML的,一个邮箱的

    HTML文件测试报告:

    邮箱测试报告:

  • 内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: