HttpRunner3源码阅读:8. 用例文件生成并格式化make
2021-08-09 14:57
741 查看
make
这个文件(make.py)主要实现了测试用例相关的文件生成和目录生成。
其实这个文件应该在 client.py 之前阅读。
可用资料
- jinja2[模板引擎,常用于前后端不分离的 Python Web 项目]: https://jinja.palletsprojects.com/en/3.0.x/intro/
- black[代码格式化]: https://black.readthedocs.io/en/stable/
- string 包下的 Template(from string import Template,也可实现字符串模板替换): https://docs.python.org/3/library/string.html#template-strings
- argparse[命令行选项]: https://docs.python.org/zh-cn/3/library/argparse.html
导包
import os import string import subprocess import sys from typing import Text, List, Tuple, Dict, Set, NoReturn # 模板库,写过flask 的应该很熟悉,其灵感来自django import jinja2 from loguru import logger from sentry_sdk import capture_exception from httprunner import exceptions, __version__ # v2 v3 版本兼容 from httprunner.compat import ( ensure_testcase_v3_api, ensure_testcase_v3, convert_variables, ensure_path_sep, ) # 导文件处理的 from httprunner.loader import ( load_folder_files, load_test_file, load_testcase, load_testsuite, load_project_meta, convert_relative_project_root_dir, ) # 验证器 from httprunner.response import uniform_validator # 变量合并,多进程处理 from httprunner.utils import merge_variables, is_support_multiprocessing
源码附注释
# Cache of pytest files generated so far, used to avoid converting the same
# YAML/JSON testcase twice.
# Maps: generated pytest file absolute path -> testcase class name
# (title case, without the "TestCase" prefix).
pytest_files_made_cache_mapping: Dict[Text, Text] = {}

# Generated pytest files that should be run. Testcases that are only
# referenced by another testcase are generated but not added to this set.
pytest_files_run_set: Set = set()

# Jinja2 template of a generated HttpRunner v3 pytest file (xxx_test.py).
__TEMPLATE__ = jinja2.Template(
    """# NOTE: Generated By HttpRunner v{{ version }}
# FROM: {{ testcase_path }}

{% if imports_list and diff_levels > 0 %}
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__){% for _ in range(diff_levels) %}.parent{% endfor %}))
{% endif %}

{% if parameters %}
import pytest
from httprunner import Parameters
{% endif %}

from httprunner import HttpRunner, Config, Step, RunRequest, RunTestCase
{% for import_str in imports_list %}
{{ import_str }}
{% endfor %}

class {{ class_name }}(HttpRunner):

    {% if parameters %}
    @pytest.mark.parametrize("param", Parameters({{parameters}}))
    def test_start(self, param):
        super().test_start(param)
    {% endif %}

    config = {{ config_chain_style }}

    teststeps = [
        {% for step_chain_style in teststeps_chain_style %}
            {{ step_chain_style }},
        {% endfor %}
    ]

if __name__ == "__main__":
    {{ class_name }}().test_start()

"""
)


def __ensure_absolute(path: Text) -> Text:
    """Return the absolute path of a testcase file, exiting if it is not a file.

    A leading "./" (Linux/Darwin) or ".\\" (Windows) is stripped, path
    separators are normalized with ``ensure_path_sep`` and a relative path is
    joined onto the project root directory.

    Args:
        path: testcase file path, absolute or relative to the project root.

    Returns:
        absolute path of an existing file.

    Raises:
        SystemExit: exit code 1 when the resolved path is not a file.

    """
    if path.startswith("./"):
        # Linux/Darwin, hrun ./test.yml
        path = path[len("./") :]
    elif path.startswith(".\\"):
        # Windows, hrun .\\test.yml
        path = path[len(".\\") :]

    path = ensure_path_sep(path)
    project_meta = load_project_meta(path)

    if os.path.isabs(path):
        absolute_path = path
    else:
        absolute_path = os.path.join(project_meta.RootDir, path)

    if not os.path.isfile(absolute_path):
        logger.error(f"Invalid testcase file path: {absolute_path}")
        sys.exit(1)

    return absolute_path


def ensure_file_abs_path_valid(file_abs_path: Text) -> Text:
    """Ensure a file path is valid for pytest.

    Handles the cases where a directory/file name starts with a digit or
    includes dot/hyphen/space.

    Args:
        file_abs_path: absolute file path

    Returns:
        ensured valid absolute file path

    """
    project_meta = load_project_meta(file_abs_path)
    raw_abs_file_name, file_suffix = os.path.splitext(file_abs_path)
    file_suffix = file_suffix.lower()

    raw_file_relative_name = convert_relative_project_root_dir(raw_abs_file_name)
    if raw_file_relative_name == "":
        return file_abs_path

    path_names = []
    for name in raw_file_relative_name.rstrip(os.sep).split(os.sep):
        # `name` may be empty (e.g. doubled separators); guard before indexing
        if name and name[0] in string.digits:
            # ensure file name not startswith digit
            # 19 => T19, 2C => T2C
            name = f"T{name}"

        if name.startswith("."):
            # avoid ".csv" been converted to "_csv"
            pass
        else:
            # handle cases when directory name includes dot/hyphen/space
            name = name.replace(" ", "_").replace(".", "_").replace("-", "_")

        path_names.append(name)

    new_file_path = os.path.join(
        project_meta.RootDir, f"{os.sep.join(path_names)}{file_suffix}"
    )
    return new_file_path


def __ensure_testcase_module(path: Text) -> None:
    """Ensure pytest files are in a python module.

    Generates ``__init__.py`` next to ``path`` on demand; an existing
    ``__init__.py`` is left untouched.

    Args:
        path: path of a generated pytest file.

    """
    init_file = os.path.join(os.path.dirname(path), "__init__.py")
    if os.path.isfile(init_file):
        return

    with open(init_file, "w", encoding="utf-8") as f:
        f.write("# NOTICE: Generated By HttpRunner. DO NOT EDIT!\n")


def convert_testcase_path(testcase_abs_path: Text) -> Tuple[Text, Text]:
    """Convert single YAML/JSON testcase path to python file.

    Args:
        testcase_abs_path: absolute path of the YAML/JSON testcase.

    Returns:
        tuple of (absolute path of the ``<name>_test.py`` file to generate,
        file name in title case with underscores removed).

    """
    testcase_new_path = ensure_file_abs_path_valid(testcase_abs_path)

    dir_path = os.path.dirname(testcase_new_path)
    file_name, _ = os.path.splitext(os.path.basename(testcase_new_path))
    testcase_python_abs_path = os.path.join(dir_path, f"{file_name}_test.py")

    # convert title case, e.g. request_with_variables => RequestWithVariables
    name_in_title_case = file_name.title().replace("_", "")

    return testcase_python_abs_path, name_in_title_case


def format_pytest_with_black(*python_paths: Text) -> None:
    """Format the generated pytest files in place with ``black``.

    Args:
        python_paths: paths of the python files to format.

    Raises:
        SystemExit: exit code 1 when black is not installed or when black
            exits with a non-zero status.

    """
    logger.info("format pytest cases with black ...")
    try:
        # check=True is required for subprocess.run to raise
        # CalledProcessError on a non-zero exit status; without it the
        # handler below can never fire and black failures go unnoticed.
        if is_support_multiprocessing() or len(python_paths) <= 1:
            subprocess.run(["black", *python_paths], check=True)
        else:
            logger.warning(
                "this system does not support multiprocessing well, format files one by one ..."
            )
            for path in python_paths:
                subprocess.run(["black", path], check=True)
    except subprocess.CalledProcessError as ex:
        capture_exception(ex)
        logger.error(ex)
        sys.exit(1)
    except FileNotFoundError:
        err_msg = """
missing dependency tool: black
install black manually and try again:
$ pip install black
"""
        logger.error(err_msg)
        sys.exit(1)


def make_config_chain_style(config: Dict) -> Text:
    """Render a testcase config dict as ``Config(...)`` chain-style source code.

    Args:
        config: testcase config; "name" is required, "variables", "base_url",
            "verify", "export" and "weight" are rendered when present.

    Returns:
        python source text of the chained ``Config`` expression.

    """
    config_chain_style = f'Config("{config["name"]}")'

    # tolerate a config without "variables", like the other optional keys
    variables = config.get("variables")
    if variables:
        config_chain_style += f".variables(**{variables})"

    if "base_url" in config:
        config_chain_style += f'.base_url("{config["base_url"]}")'

    if "verify" in config:
        config_chain_style += f'.verify({config["verify"]})'

    if "export" in config:
        config_chain_style += f'.export(*{config["export"]})'

    if "weight" in config:
        config_chain_style += f'.locust_weight({config["weight"]})'

    return config_chain_style


def make_request_chain_style(request: Dict) -> Text:
    """Render a request dict as the chain-style calls appended to ``RunRequest``.

    Args:
        request: request description; "method" and "url" are required, the
            other supported keys are rendered when present.

    Returns:
        python source text starting with ``.<method>("<url>")``.

    """
    method = request["method"].lower()
    url = request["url"]
    request_chain_style = f'.{method}("{url}")'

    if "params" in request:
        params = request["params"]
        request_chain_style += f".with_params(**{params})"

    if "headers" in request:
        headers = request["headers"]
        request_chain_style += f".with_headers(**{headers})"

    if "cookies" in request:
        cookies = request["cookies"]
        request_chain_style += f".with_cookies(**{cookies})"

    if "data" in request:
        data = request["data"]
        if isinstance(data, Text):
            # a string body has to be emitted as a quoted literal
            data = f'"{data}"'
        request_chain_style += f".with_data({data})"

    if "json" in request:
        req_json = request["json"]
        if isinstance(req_json, Text):
            req_json = f'"{req_json}"'
        request_chain_style += f".with_json({req_json})"

    if "timeout" in request:
        timeout = request["timeout"]
        request_chain_style += f".set_timeout({timeout})"

    if "verify" in request:
        verify = request["verify"]
        request_chain_style += f".set_verify({verify})"

    if "allow_redirects" in request:
        allow_redirects = request["allow_redirects"]
        request_chain_style += f".set_allow_redirects({allow_redirects})"

    if "upload" in request:
        upload = request["upload"]
        request_chain_style += f".upload(**{upload})"

    return request_chain_style


def _make_hooks_chain_style(hooks: List, hook_kind: Text) -> Text:
    """Render setup/teardown hooks as chained ``.<hook_kind>_hook(...)`` calls.

    Args:
        hooks: each hook is either a string, or a one-item dict mapping the
            variable name to assign onto the hook content.
        hook_kind: "setup" or "teardown".

    Returns:
        python source text of the chained hook calls ("" for no hooks).

    Raises:
        exceptions.TestCaseFormatError: a hook is neither a string nor a
            one-item dict.

    """
    hooks_chain_style = ""
    for hook in hooks:
        if isinstance(hook, Text):
            hooks_chain_style += f'.{hook_kind}_hook("{hook}")'
        elif isinstance(hook, Dict) and len(hook) == 1:
            assign_var_name, hook_content = list(hook.items())[0]
            hooks_chain_style += (
                f'.{hook_kind}_hook("{hook_content}", "{assign_var_name}")'
            )
        else:
            raise exceptions.TestCaseFormatError(f"Invalid {hook_kind} hook: {hook}")

    return hooks_chain_style


def make_teststep_chain_style(teststep: Dict) -> Text:
    """Render a teststep dict as ``Step(...)`` chain-style source code.

    Args:
        teststep: step description holding either a "request" (rendered as
            ``RunRequest``) or a "testcase" (rendered as ``RunTestCase``).

    Returns:
        python source text of the ``Step(...)`` expression.

    Raises:
        exceptions.TestCaseFormatError: the step has neither "request" nor
            "testcase", or contains an invalid setup/teardown hook.

    """
    if teststep.get("request"):
        # step sends a request of its own
        step_info = f'RunRequest("{teststep["name"]}")'
    elif teststep.get("testcase"):
        # step calls another testcase
        step_info = f'RunTestCase("{teststep["name"]}")'
    else:
        raise exceptions.TestCaseFormatError(f"Invalid teststep: {teststep}")

    if "variables" in teststep:
        variables = teststep["variables"]
        step_info += f".with_variables(**{variables})"

    if "setup_hooks" in teststep:
        step_info += _make_hooks_chain_style(teststep["setup_hooks"], "setup")

    if teststep.get("request"):
        step_info += make_request_chain_style(teststep["request"])
    elif teststep.get("testcase"):
        testcase = teststep["testcase"]
        call_ref_testcase = f".call({testcase})"
        step_info += call_ref_testcase

    if "teardown_hooks" in teststep:
        step_info += _make_hooks_chain_style(teststep["teardown_hooks"], "teardown")

    if "extract" in teststep:
        # request step
        step_info += ".extract()"
        for extract_name, extract_path in teststep["extract"].items():
            step_info += f""".with_jmespath('{extract_path}', '{extract_name}')"""

    if "export" in teststep:
        # reference testcase step
        export: List[Text] = teststep["export"]
        step_info += f".export(*{export})"

    if "validate" in teststep:
        step_info += ".validate()"

        for v in teststep["validate"]:
            validator = uniform_validator(v)
            assert_method = validator["assert"]
            check = validator["check"]
            if '"' in check:
                # e.g. body."user-agent" => 'body."user-agent"'
                check = f"'{check}'"
            else:
                check = f'"{check}"'
            expect = validator["expect"]
            if isinstance(expect, Text):
                expect = f'"{expect}"'

            message = validator["message"]
            if message:
                step_info += f".assert_{assert_method}({check}, {expect}, '{message}')"
            else:
                step_info += f".assert_{assert_method}({check}, {expect})"

    return f"Step({step_info})"


def make_testcase(testcase: Dict, dir_path: Text = None) -> Text:
    """Convert valid testcase dict to pytest file path.

    Testcases referenced by the steps are generated recursively and imported
    by the generated file. The generated file is recorded in
    ``pytest_files_made_cache_mapping``; a path already present there is
    returned without being regenerated.

    Args:
        testcase: testcase content; ``testcase["config"]["path"]`` must hold
            the path of the source YAML/JSON file.
        dir_path: optional directory to put the generated file in, instead of
            the location derived from the source file path.

    Returns:
        absolute path of the generated pytest file.

    Raises:
        exceptions.TestCaseFormatError: a referenced testcase file does not
            contain a dict.

    """
    # ensure compatibility with testcase format v2
    testcase = ensure_testcase_v3(testcase)

    # validate testcase format
    load_testcase(testcase)

    testcase_abs_path = __ensure_absolute(testcase["config"]["path"])
    logger.info(f"start to make testcase: {testcase_abs_path}")

    testcase_python_abs_path, testcase_cls_name = convert_testcase_path(
        testcase_abs_path
    )
    if dir_path:
        testcase_python_abs_path = os.path.join(
            dir_path, os.path.basename(testcase_python_abs_path)
        )

    if testcase_python_abs_path in pytest_files_made_cache_mapping:
        return testcase_python_abs_path

    config = testcase["config"]
    config["path"] = convert_relative_project_root_dir(testcase_python_abs_path)
    config["variables"] = convert_variables(
        config.get("variables", {}), testcase_abs_path
    )

    # prepare reference testcase
    imports_list = []
    teststeps = testcase["teststeps"]
    for teststep in teststeps:
        if not teststep.get("testcase"):
            continue

        # make ref testcase pytest file
        ref_testcase_path = __ensure_absolute(teststep["testcase"])
        test_content = load_test_file(ref_testcase_path)

        if not isinstance(test_content, Dict):
            raise exceptions.TestCaseFormatError(f"Invalid teststep: {teststep}")

        # api in v2 format, convert to v3 testcase
        if "request" in test_content and "name" in test_content:
            test_content = ensure_testcase_v3_api(test_content)

        test_content.setdefault("config", {})["path"] = ref_testcase_path
        ref_testcase_python_abs_path = make_testcase(test_content)

        # override testcase export
        ref_testcase_export: List = test_content["config"].get("export", [])
        if ref_testcase_export:
            step_export: List = teststep.setdefault("export", [])
            step_export.extend(ref_testcase_export)
            # de-duplicate while keeping first-seen order, so the generated
            # file is identical from one run to the next
            teststep["export"] = list(dict.fromkeys(step_export))

        # prepare ref testcase class name
        ref_testcase_cls_name = pytest_files_made_cache_mapping[
            ref_testcase_python_abs_path
        ]
        teststep["testcase"] = ref_testcase_cls_name

        # prepare import ref testcase
        ref_testcase_python_relative_path = convert_relative_project_root_dir(
            ref_testcase_python_abs_path
        )
        ref_module_name, _ = os.path.splitext(ref_testcase_python_relative_path)
        ref_module_name = ref_module_name.replace(os.sep, ".")
        import_expr = f"from {ref_module_name} import TestCase{ref_testcase_cls_name} as {ref_testcase_cls_name}"
        if import_expr not in imports_list:
            imports_list.append(import_expr)

    testcase_path = convert_relative_project_root_dir(testcase_abs_path)
    # current file compared to ProjectRootDir
    diff_levels = len(testcase_path.split(os.sep))

    data = {
        "version": __version__,
        "testcase_path": testcase_path,
        "diff_levels": diff_levels,
        "class_name": f"TestCase{testcase_cls_name}",
        "imports_list": imports_list,
        "config_chain_style": make_config_chain_style(config),
        "parameters": config.get("parameters"),
        "teststeps_chain_style": [
            make_teststep_chain_style(step) for step in teststeps
        ],
    }
    # fill the template variables with the collected data
    content = __TEMPLATE__.render(data)

    # ensure new file's directory exists
    output_dir = os.path.dirname(testcase_python_abs_path)
    os.makedirs(output_dir, exist_ok=True)

    with open(testcase_python_abs_path, "w", encoding="utf-8") as f:
        f.write(content)

    pytest_files_made_cache_mapping[testcase_python_abs_path] = testcase_cls_name
    __ensure_testcase_module(testcase_python_abs_path)

    logger.info(f"generated testcase: {testcase_python_abs_path}")

    return testcase_python_abs_path


def make_testsuite(testsuite: Dict) -> None:
    """Convert valid testsuite dict to pytest folder with testcases.

    Every testcase referenced by the testsuite is generated into a directory
    named after the testsuite file and added to ``pytest_files_run_set``.

    Args:
        testsuite: testsuite content; ``testsuite["config"]["path"]`` must
            hold the path of the source YAML/JSON file.

    """
    # validate testsuite format
    load_testsuite(testsuite)

    testsuite_config = testsuite["config"]
    testsuite_path = testsuite_config["path"]
    testsuite_variables = convert_variables(
        testsuite_config.get("variables", {}), testsuite_path
    )

    logger.info(f"start to make testsuite: {testsuite_path}")

    # create directory with testsuite file name, put its testcases under this directory
    testsuite_path = ensure_file_abs_path_valid(testsuite_path)
    testsuite_dir, file_suffix = os.path.splitext(testsuite_path)
    # demo_testsuite.yml => demo_testsuite_yml
    testsuite_dir = f"{testsuite_dir}_{file_suffix.lstrip('.')}"

    for testcase in testsuite["testcases"]:
        # get referenced testcase content
        testcase_file = testcase["testcase"]
        testcase_path = __ensure_absolute(testcase_file)
        testcase_dict = load_test_file(testcase_path)
        testcase_dict.setdefault("config", {})
        testcase_dict["config"]["path"] = testcase_path

        # override testcase name
        testcase_dict["config"]["name"] = testcase["name"]

        # override base_url
        base_url = testsuite_config.get("base_url") or testcase.get("base_url")
        if base_url:
            testcase_dict["config"]["base_url"] = base_url

        # override verify
        if "verify" in testsuite_config:
            testcase_dict["config"]["verify"] = testsuite_config["verify"]

        # override variables
        # testsuite testcase variables > testsuite config variables
        testcase_variables = convert_variables(
            testcase.get("variables", {}), testcase_path
        )
        testcase_variables = merge_variables(testcase_variables, testsuite_variables)
        # testsuite testcase variables > testcase config variables
        testcase_dict["config"]["variables"] = convert_variables(
            testcase_dict["config"].get("variables", {}), testcase_path
        )
        testcase_dict["config"]["variables"].update(testcase_variables)

        # override weight
        if "weight" in testcase:
            testcase_dict["config"]["weight"] = testcase["weight"]

        # make testcase
        testcase_pytest_path = make_testcase(testcase_dict, testsuite_dir)
        pytest_files_run_set.add(testcase_pytest_path)


def __make(tests_path: Text) -> None:
    """Make testcase(s) with testcase/testsuite/folder absolute path.

    Generated pytest file paths are cached in
    ``pytest_files_made_cache_mapping``; files to run are collected in
    ``pytest_files_run_set``. Invalid files are skipped with a warning.

    Args:
        tests_path: should be in absolute path

    Raises:
        exceptions.TestcaseNotFound: ``tests_path`` is neither a directory
            nor a file.

    """
    logger.info(f"make path: {tests_path}")
    test_files = []
    if os.path.isdir(tests_path):
        files_list = load_folder_files(tests_path)
        test_files.extend(files_list)
    elif os.path.isfile(tests_path):
        test_files.append(tests_path)
    else:
        raise exceptions.TestcaseNotFound(f"Invalid tests path: {tests_path}")

    for test_file in test_files:
        if test_file.lower().endswith("_test.py"):
            # already a pytest file: run it as is
            pytest_files_run_set.add(test_file)
            continue

        try:
            test_content = load_test_file(test_file)
        except (exceptions.FileNotFound, exceptions.FileFormatError) as ex:
            logger.warning(f"Invalid test file: {test_file}\n{type(ex).__name__}: {ex}")
            continue

        if not isinstance(test_content, Dict):
            logger.warning(
                f"Invalid test file: {test_file}\n"
                f"reason: test content not in dict format."
            )
            continue

        # api in v2 format, convert to v3 testcase
        if "request" in test_content and "name" in test_content:
            test_content = ensure_testcase_v3_api(test_content)

        if "config" not in test_content:
            logger.warning(
                f"Invalid testcase/testsuite file: {test_file}\n"
                f"reason: missing config part."
            )
            continue
        elif not isinstance(test_content["config"], Dict):
            logger.warning(
                f"Invalid testcase/testsuite file: {test_file}\n"
                f"reason: config should be dict type, got {test_content['config']}"
            )
            continue

        # ensure path absolute
        test_content.setdefault("config", {})["path"] = test_file

        # testcase
        if "teststeps" in test_content:
            try:
                testcase_pytest_path = make_testcase(test_content)
                pytest_files_run_set.add(testcase_pytest_path)
            except exceptions.TestCaseFormatError as ex:
                logger.warning(
                    f"Invalid testcase file: {test_file}\n{type(ex).__name__}: {ex}"
                )
                continue

        # testsuite
        elif "testcases" in test_content:
            try:
                make_testsuite(test_content)
            except exceptions.TestSuiteFormatError as ex:
                logger.warning(
                    f"Invalid testsuite file: {test_file}\n{type(ex).__name__}: {ex}"
                )
                continue

        # invalid format
        else:
            logger.warning(
                f"Invalid test file: {test_file}\n"
                f"reason: file content is neither testcase nor testsuite"
            )


def main_make(tests_paths: List[Text]) -> List[Text]:
    """Generate pytest files for the given paths and format them with black.

    Args:
        tests_paths: testcase/testsuite file or folder paths; relative paths
            are resolved against the current working directory.

    Returns:
        list of the pytest file paths to run ([] when no path is given).

    Raises:
        SystemExit: exit code 1 when making a path raises
            ``exceptions.MyBaseError``.

    """
    if not tests_paths:
        return []

    for tests_path in tests_paths:
        tests_path = ensure_path_sep(tests_path)
        if not os.path.isabs(tests_path):
            # relative path: resolve against the current working directory
            tests_path = os.path.join(os.getcwd(), tests_path)

        try:
            __make(tests_path)
        except exceptions.MyBaseError as ex:
            logger.error(ex)
            sys.exit(1)

    # format pytest files
    pytest_files_format_list = pytest_files_made_cache_mapping.keys()
    format_pytest_with_black(*pytest_files_format_list)

    return list(pytest_files_run_set)


def init_make_parser(subparsers):
    """Register the ``make`` sub-command on an argparse subparsers object.

    Args:
        subparsers: object providing ``add_parser``, e.g. the special action
            object returned by ``ArgumentParser.add_subparsers()``.

    Returns:
        the parser created for the ``make`` sub-command; it accepts zero or
        more ``testcase_path`` positional arguments.

    """
    parser = subparsers.add_parser(
        "make", help="Convert YAML/JSON testcases to pytest cases.",
    )
    parser.add_argument(
        "testcase_path", nargs="*", help="Specify YAML/JSON testcase file/folder path"
    )

    return parser
相关文章推荐
- HttpRunner3源码阅读:4. loader项目路径加载,用例文件转换、方法字典生成
- HttpRunner3源码阅读:3.工具文件
- HttpRunner3源码阅读:9. 测试用例中的类定义testcase
- httprunner(4)录制生成测试用例
- 根据测试用例的java源码自动生成TestNG的XML文件
- 根据测试用例的java源码自动生成TestNG的XML文件
- 导入android源码有错,R.java文件不能自动生成解决方法 http://caizi12.iteye.com/blog/975125
- Httprunner项目文件和测试用例组织
- 2019-03-18HttpRunnerManager用例配置-03:自定义辅助函数生成随机数(debugtalk.py )
- BT源码阅读兼移植三:种子文件的生成(1)
- gitea 源码阅读笔记 002 生成无依赖单文件可执行包
- HttpRunner3源码阅读:10.测试执行的处理 runner
- Python之使用unittest框架和HTMLTestRunner.py文件实现多个测试用例执行与测试报告生成
- 2019-03-18HttpRunnerManager用例配置-03:自定义辅助函数生成随机数(debugtalk.py )
- HttpRunner用例生成
- 根据测试用例的java源码自动生成TestNG的XML文件
- vivado sdk生成elf文件出错:make: Interrupt/Exception caught (code = 0xc00000fd, addr = 0x4227d3)
- 阅读Sofia-SIP源码五 源码文件文档化的注释
- Url地址重写,利用HttpHander手工编译页面并按需生成静态HTML文件
- spatialhadoop2.3源码阅读(五) grid 索引生成方法(一)