本文已参加「新人创造礼」活动,一同开启创造之路。

前言

一般,主动化测验用例在履行完成后,都会发送一个成果告诉,以提示测验人员或测验leader测验用例的履行成果。如有测验失利的情况,测验人员再去检查详细的测验陈述,检查是哪个场景没有测验经过。当时较为盛行的提示办法有:

  • 邮件
  • 企业微信、钉钉等push音讯

因为我们公司所运用的工作软件是企业微信,因而,在完成测验成果告诉提示的功用时,选用的是企业微信。当时较为盛行的完成办法有两种形式:

  • 企业微信应用告诉:需要在企业微信中创建一个应用,再获取Secret
  • 一般群音讯推送:需要在群中增加一个群机器人(会主动生成webhook_url,以供后续接口调用)

因为办法一需要在企业微信中创建应用(需要管理员操作权限),整体完成起来较为繁琐,因而我选用的是第二种群机器人的完成办法。

一、完成原理及完成作用

1.外部链路流程

利用pytest hook函数实现自动化测试结果推送企业微信

2.内部调用原理及进程

利用pytest hook函数实现自动化测试结果推送企业微信

1)各模块&办法功用:

  • RedisHandler基类:用于初始化redis衔接、查询数据、写入数据
  • CaseCount基类:用于初始化用例核算、获取成功&失利&越过&报错的用例数以及核算用例经过率
  • EnterpriseWechatNotification基类:用于界说发送企业微信音讯的内容模板、界说调用hook_url(群机器人)发送音讯办法
  • hook办法pytest_runtest_makereport:用于获取pytest履行后的测验成果、将成果写入缓存、生成控制台测验陈述
  • hook办法pytest_terminal_summary:用于获取履行成果、调用发送音讯办法发送微信音讯

2)详细调用原理、流程:

① 前提:

  • 已增加企业微信群机器人,并记住hook地址;

利用pytest hook函数实现自动化测试结果推送企业微信

  • python+pytest已编写测验用例;

② pytest运转测验用例,RedisHandler衔接redis,pytest_runtest_makereport获取用例履行成果,并:

  • 调用RedisHandler中的写入缓存办法,将成果写入缓存;
  • 调用CaseCount中的核算用例经过率办法获取用例经过率;
  • 将获取到的各条测验成果分输出到控制台进行展现:↓(Windows本地运转作用)

利用pytest hook函数实现自动化测试结果推送企业微信

③ pytest_terminal_summary办法:

  • 别离调用CaseCount中的获取经过、失利、越过、报错的用例条数的办法(此办法调用RedisHandler中的get_key办法),获取到各个(经过、失利、越过、报错)履行成果核算;
  • 调用CaseCount中的核算用例经过率办法获取用例经过率;
  • 调用EnterpriseWechatNotification中的发送企业微信音讯办法,将获取到的各个(经过、失利、越过、报错)履行成果的数量核算与EnterpriseWechatNotification中预界说的模板进行拼接,发送到企业微信;
  • 将获取到的各个(经过、失利、越过、报错)履行成果与用例经过率一同,输出到控制台展现:↓(Windows本地运转作用)

利用pytest hook函数实现自动化测试结果推送企业微信

二、编码完成

1.各个基类

1)RedisHandler基类

用于初始化redis衔接、查询数据、写入数据

import redis
class RedisHandler:
    def __init__(self, host, port=6379, db=0):
        # 生成客户端衔接,StrictRedis()默认运用衔接池,不用再独自运用ConnectPool
        self.client = redis.StrictRedis(host=host, port=port, db=db)
    def set_string(self, name: str, value, ex=None, px=None, nx=False, xx=False) -> None:
        """
        缓存中写入str(单个)
        :param name: 缓存称号
        :param value: 缓存值
        :param ex: 过期时刻(秒)
        :param px: 过期时刻(毫秒)
        :param nx: 假如设置为True,则只要name不存在时,当时set操作才履行(新增)
        :param xx: 假如设置为True,则只要name不存在时,当时set操作才履行(修正)
        :return:
        """
        self.client.set(name, value=value, ex=ex, px=px, nx=nx, xx=xx)
    def incr(self, key):
        """
        运用incr办法,处理并发问题
        当key不存在时,会先初始为0,每次调用,则会+1
        :param key:
        :return:
        """
        self.client.incr(key)
    def get_key(self, name):
        """读取缓存"""
        result = self.client.get(name)
        return result

2)CaseCount基类

用于初始化用例核算、获取成功&失利&越过&报错的用例数以及核算用例经过率

from api_test.common.redis_handler import RedisHandler
from api_test.config.db_config import DBConfig
class CaseCountName:
    ERROR: str = "error_count"
    FAILED: str = "failed_count"
    PASSED: str = "passed_count"
    SKIP: str = "skip_count"
    TOTAL: str = "total_count"
class CaseCount:
    """
    redis 缓存核算用例履行情况
    """
    def __init__(self):
        self.redis = RedisHandler(host=DBConfig.redis_config.get('host'))  # redis主机地址能够写死在这里,也能够从装备类中获取
    def init_process(self):
        """
        初始化进展、总数、成功数、失利数
        """
        self.redis.set_string(CaseCountName.TOTAL, 0)
        self.redis.set_string(CaseCountName.SKIP, 0)
        self.redis.set_string(CaseCountName.PASSED, 0)
        self.redis.set_string(CaseCountName.FAILED, 0)
        self.redis.set_string(CaseCountName.ERROR, 0)
    def failed_count(self):
        """获取失利用例数"""
        return int(self.redis.get_key(CaseCountName.FAILED))
    def passed_count(self):
        """获取经过用例总数"""
        return int(self.redis.get_key(CaseCountName.PASSED))
    def skip_count(self):
        """获取越过用例数"""
        return int(self.redis.get_key(CaseCountName.SKIP))
    def error_count(self):
        """报错用例数"""
        return int(self.redis.get_key(CaseCountName.ERROR))
    def total_count(self):
        """用例总数"""
        return int(self.redis.get_key(CaseCountName.TOTAL))
    def pass_rate(self):
        """核算用例成功率"""
        try:
            rate = round((self.passed_count() + self.skip_count()) / self.total_count() * 100, 2)
            return rate
        except ZeroDivisionError:
            raise Exception("履行失利,未检测到用例履行数量")

3)EnterpriseWechatNotification基类

用于界说发送企业微信音讯的内容模板、界说调用hook_url(群机器人)发送音讯办法

import os
import json
import requests
import platform
def get_env_from_jenkins(name):
    """从Jenkins中获取大局环境变量"""
    return os.getenv(name) and os.getenv(name).strip()
ProjectName = get_env_from_jenkins("JOB_NAME")  # Jenkins构建项目称号
BUILD_URL = get_env_from_jenkins("BUILD_URL")  # Jenkins构建项目URL
BUILD_NUMBER = get_env_from_jenkins("BUILD_NUMBER")  # Jenkins构建编号
class EnterpriseWechatNotification:
    def __init__(self, hook: list):
        # 企业微信群机器人的hook地址,一个机器人就一个,多个就界说多个,能够写死,也能够写在装备类中
        self.hook_url_list = [f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={i}" for i in hook]
        # allure生成陈述的地址,Jenkins履行时会用到,Windows暂未装备allure地址
        self.allure_url = f"http://192.168.1.122:8088/jenkins/job/{ProjectName}/{BUILD_NUMBER}/allure/"
        self.header = {'Content-Type': 'application/json'}
    def send_msg(self, result=''):
        """发送企业微信音讯告诉"""
        global payload
        linux_content = f"""** 【{ProjectName}】**
> 项目称号:{ProjectName}
> 构件编号:#{BUILD_NUMBER}
> 测验环境:{platform.system()}
> [陈述链接]({self.allure_url})
> [控制台链接]({BUILD_URL})
{result}"""
        windows_content = f"""** 【auto_test_project】**
> 测验环境:{platform.system()}
{result}"""
        if platform.system() == "Linux":
            payload = {
                "msgtype": "markdown",
                "markdown": {
                    "content": linux_content
                }
            }
        elif platform.system() == "Windows":
            payload = {
                "msgtype": "markdown",
                "markdown": {
                    "content": windows_content
                }
            }
        for hook_url in self.hook_url_list:
            requests.post(url=hook_url, headers=self.header, data=json.dumps(payload))

注意事项: get_env_from_jenkins办法为从Jenkins获取大局变量,检查大局变量的路径为:Jenkins流水线语法-大局变量-env,见下图:

利用pytest hook函数实现自动化测试结果推送企业微信

2.pytest的hook办法,界说在conftest.py中

1)pytest_runtest_makereport

用于获取pytest履行后的测验成果、将成果写入缓存、生成控制台测验陈述

2)pytest_terminal_summary

用于获取履行成果、调用发送音讯办法发送音讯告诉到企业微信

import time
import pytest
from api_test.config.config import HookUrlConf
from api_test.common.send_enterprise_wechat import EnterpriseWechatNotification
from api_test.common.redis_handler import RedisHandler
from api_test.config.db_config import DBConfig
from api_test.common.case_count_control import CaseCountName, CaseCount
redis = RedisHandler(host=DBConfig.redis_config.get('host'))
case_count = CaseCount()
case_count.init_process()  # 初始化Redis中的用例核算缓存数据
@pytest.hookimpl(hookwrapper=True, tryfirst=True)
def pytest_runtest_makereport(item, call):
    """获取测验成果、生成测验陈述"""
    print('------------------------------------')
    out = yield
    report = out.get_result()
    if report.when == 'call':
        # print(f"测验陈述:{report}")
        # print(f"步骤:{report.when}")
        print(f"用例id:{report.nodeid}")
        print(f"用例描述:{str(item.function.__doc__)}")
        print(f"运转成果:{report.outcome}")
        """将用例履行成果写入缓存"""
        if report.outcome == 'passed':
            redis.incr(CaseCountName.PASSED)
        if report.outcome == 'failed':
            redis.incr(CaseCountName.FAILED)
    if report.when == 'setup':
        if report.outcome == 'skipped':
            redis.incr(CaseCountName.SKIP)
    CaseCount().total_run_count()
def pytest_terminal_summary(terminalreporter, exitstatus, config):
    """收集测验成果,从Redis缓存数据中获取"""
    total_case = case_count.total_count()
    pass_case = case_count.passed_count()
    fail_case = case_count.failed_count()
    skip_case = case_count.skip_count()
    error_case = case_count.error_count()
    pass_rate = case_count.pass_rate()
    run_time = round((time.time() - terminalreporter._sessionstarttime), 2)
    print("******用例履行成果核算******")
    print(f"总用例数:{total_case}条")
    print(f"经过:{pass_case}条")
    print(f"失利:{fail_case}条")
    print(f"越过:{skip_case}条")
    print(f"报错:{error_case}条")
    print(f"用例经过率:{pass_rate}%")
    print(f"用时:{run_time}s")
    desc = """
        本次履行情况如下:
        总用例数为:{}
        经过用例数:<font>{}条</font>
        失利用例数:<font>{}条</font>
        错误用例数:{}
        越过用例数:{}
        经过率为:{} %
        用时:{}s
        """.format(total_case, pass_case, fail_case, error_case, skip_case, pass_rate, run_time)
    EnterpriseWechatNotification(hook=HookUrlConf.HOOK_URL.value).send_msg(desc)  # 履行成果发送企业微信

上述两个hook函数中的print都是为了将履行成果打印在控制台

三、运转进程与运转作用

1.运转进程

  • Windows本地运转

此处为语雀视频卡片,点击链接检查:Windows运转.mp4

  • Jenkins触发运转

此处为语雀视频卡片,点击链接检查:Jenkins运转作用.mp4

2.企业微信音讯告诉

1)经过Jenkins触发运转的告诉作用:↓

利用pytest hook函数实现自动化测试结果推送企业微信

2)Windows本地手动触发运转的告诉作用:↓

利用pytest hook函数实现自动化测试结果推送企业微信

小结


以上就是利用pytest的hook函数:pytest_runtest_makereport、pytest_terminal_summary‍+redis,完成主动收集测验成果并发送音讯告诉到企业微信的原理及进程:

  • 不管是接口主动化测验仍是UI主动化测验都能够经过这种办法来完成音讯告诉;
  • 除了在代码中调用pytest hook函数完成音讯告诉外,Jenkins也能够经过装置插件到达邮件告诉、履行Python脚本到达企微音讯告诉的意图;
  • 测验成果的存储纷歧定要用到redis,也能够写在本地文件等,多一层调用,就多一层处理和或许面对的调试报错,别的redis所在服务器衔接出错也会影响用例的正常运转;
  • 发送音讯的内容样式支撑Markdown,发送内容还能够持续优化,比如:告诉哪条用例报错等等;
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。