大家好~我是
米洛
!
我正在从0到1打造一个开源的接口测试平台, 也在编写一套与之对应的完整教程
,希望大家多多支持。
欢迎关注我的公众号测试开发坑货
,获取最新文章教程!
上节我们打了个野,解决了一下APScheduler
的重复执行问题。在更上一节,我们编写好了Redis的在线执行功能。
那这一节我们就得把它运用到前置条件
里面来了。
由于博主都是先编码,再写文。所以是能展示成果的,今日份变化如下:
前置条件要想支持Redis,思路其实很简单,模式也和SQL
差不多。所以我们可以照搬SQL的代码。
之前我们的Executor都放在utils包下面,其实很不友好
。为了显示专业性,我决心把Executor从utils抽离出来,放到新目录core
下面。
这个core目录打算放一些用例执行的核心方法,咱们拭目以待。
这个就需要伟大的Pycharm了,我们其实新建一个core目录以后,直接把executor.py移动到core目录即可。
通过pycharm移动文件,他不仅仅只是移动文件那么简单,它还会帮你处理这个文件引入了哪些包,如果路径发生变化了,他也会自动帮忙更新,如果有其他文件引用了executor.py,它也会顺道帮助更新。
有如此强大的ide作为支撑,所以我玩起来也是随心所欲
。
随着case执行的逻辑越来越多,我们的executor.py有点hold不住,倒不是说一个py文件不能放1000行代码,只是我们没有必要让它那么臃肿
。
我们最能拆除的就是执行构造方法的函数了,首先我们的构造方法有固定的几种,先看看只支持sql和用例的时候,我们是怎么写的:
async def execute_constructor(self, env, index, path, params, req_params, constructor: Constructor): if not constructor.enable: self.append(f"当前路径: {path}, 构造方法: {constructor.name} 已关闭, 不继续执行") return if constructor.type == 0: try: data = json.loads(constructor.constructor_json) case_id = data.get("case_id") testcase, _ = await TestCaseDao.async_query_test_case(case_id) self.append(f"当前路径: {path}, 第{index + 1}条构造方法") # 说明是case executor = Executor(self.logger) new_param = data.get("params") if new_param: temp = json.loads(new_param) req_params.update(temp) result, err = await executor.run(env, case_id, params, req_params, f"{path}->{testcase.name}") if err: raise Exception(err) if not result["status"]: raise Exception(f"断言失败, 断言数据: {result.get('asserts', 'unknown')}") params[constructor.value] = result # await self.parse_params(testcase, params) except Exception as e: raise Exception(f"{path}->{constructor.name} 第{index + 1}个构造方法执行失败: {e}") elif constructor.type == 1: # 说明是sql语句 try: self.append(f"当前路径: {path}, 第{index + 1}条构造方法") data = json.loads(constructor.constructor_json) database = data.get("database") sql = data.get("sql") self.append(f"当前构造方法类型为sql, 数据库名: {database}\nsql: {sql}\n") sql_data = await DbConfigDao.execute_sql(env, database, sql) params[constructor.value] = sql_data self.append(f"当前构造方法返回变量: {constructor.value}\n返回值:\n {sql_data}\n") except Exception as e: raise Exception(f"{path}->{constructor.name} 第{index + 1}个构造方法执行失败: {e}")
可以看到,才2种方法,就已经很臃肿
了。其实更可读一点的写法,需要把几种构造方法拆开,拆开后分别有一个run方法,这样我们只需要判断类型是啥,执行对应的run方法即可。
from abc import ABC from app.models.constructor import Constructor class ConstructorAbstract(ABC): @staticmethod def run(executor, env, index, path, params, req_params, constructor: Constructor, **kwargs): pass
基类方法类似Java的abstract类,定义一个类,不实现里面的方法则无法实例化
。
测试用例这儿有些复杂的变动:
一般来说,我们是case调用前置条件的run方法,但由于测试用例需要再次调用executor,所以导致了什么呢?
executor引用testcase引用executor,也就是引用循环,这在python里面是禁止的,毕竟你套娃
了!
所以这里采取的土方法是把我的Executor类当参数传递进来 =。=(够土)
redis实现
其实执行redis的时候我们已经编写过类似的方法了,但因为之前是根据id执行redis,现在要改改,需要根据name执行。
根据name执行有些迫不得已,主要是因为我们case编写是适配多套环境,如果传入id的话,那么redis对应的都是固定环境的配置,无法适配多环境。
和sql一样,我们采取 env+name确定redis连接(name不变,env切换的时候,redis连接也变成动态了)
redis执行命令还需改动一下唉:
这里传入的参数不只是id了,当你传入id=xx的时候则根据id查询,传入name=xxx的时候就根据name查询,兼容了在线执行redis的接口
。
看起来一目了然,其实有点像interface和impl。
今天的内容就到这里啦~下一波做啥还没想好,可能会完善后置条件或者在线执行case也或者七牛云oss吧。前路漫漫,都已经83节了。
在线体验: http://test.pity.fun
喜欢的话可以给pity点个star哦,你们的star是我的动力= =