日常工作中,数据开发上线完一个任务后并不是就可以高枕无忧,时常因上游链路数据异常或者自身处理逻辑的 BUG 导致产出的数据结果不可信。而问题发现可经历较长周期(尤其离线场景),往往是业务方通过上层数据报表发现数据异常后 push 数据方去定位问题(对于一个较冷的报表,这个周期可能会更长)。
由于数据加工链路较长,需借助数据血缘关系逐个任务排查,也会导致问题定位难度增大,严重影响开发效率。如数据问题未及时发现,可能导致业务方作出错误决策。此类问题可统一归属为大数据领域数据质量问题。本文将向大家介绍伴鱼基础架构数据团队在应对该类问题时推出的平台化产品-数据质量中心的设计与实现。
业内数据质量平台化产品介绍不多,主要对两个开源产品和一个云平台产品进行调研。
Apache Griffin,eBay 开源基于 Apache Hadoop 和 Apache Spark 的数据质量服务平台。
数据质量平台的核心流程:
平台对数据质检规则进行了分类(业内普遍认可的数据质量的六大标准):
该项目仅在 Accuracy 类的规则上实现。Griffin是完全闭环的平台化产品。质检任务执行依赖内置定时调度器的调度,调度执行时间由用户在 UI 上设定。任务将通过 Apache Livy 组件提交至配置的 Spark 集群。即质检实时性难保,无法强行阻断产出异常数据的任务,二者不是在同一调度平台被调度,时序也不能保持串行。
Qualitis,微众银行开源的一款数据质量管理系统。同样,它提供了一整套统一的流程来定义和检测数据集的质量并及时报告问题。从整个流程上看我们依然可以用 Define、Measure 和 Analyze 描述。它是基于其开源的另一款组件 Linkis 进行计算任务的代理分发,底层依赖 Spark 引擎,同时可以与其开源的 DataSphereStudio 任务开发平台无缝衔接,也就实现了在任务执行的工作流中嵌入质检任务,满足质检时效性的要求。可见,Qualitis 需借助微众银行开源一系列产品才好用。
DataWorks,阿里云提供一站式大数据工场,包括数据质量在内的产品解决方案。实现依赖阿里云其他产品组件支持。DataWorks 数据质量部分的使用介绍从产品形态上给了我们很大的帮助,对我们产品设计有指导性作用。
离线调度开发平台基于 Apache Dolphinscheduler(简称DS)实现,分布式去中心化,易扩展的可视化 DAG 调度系统,支持包括 Shell、Python、Spark、Flink 等多种类型的 Task 任务,并具有很好的扩展性。
Master 节点负责任务的监听和调度,Worker 节点则负责任务的执行。值得注意的是,每一个需要被调度的任务必然需要设置一个调度时间的表达式(cron 表达式),由 Quartz 定时为任务生成待执行的 DAG Command,有且仅有一个 Master 节点获得执行权,掌管该 DAG 各任务节点的调度执行。
平台整体架构图:
各模块设计权衡。
业界数据质量有六大标准,但:
可将这些问题统一归类为:平台在规则设定上是否需要和业界数据质量标准所抽象出来的概念进行绑定。很遗憾没找到有关数据质量标准更细化和指导性的描述,作为一个开发,这些概念比较费解,更贴近程序员视角是「show me the code」,因此我们决定将这一层概念弱化。未来实践过程后再细思。
如何对规则提供一种通用描述(or DSL)?
跳脱出前文所描述一切背景和概念,仔细思考数据质检过程,本质就是通过一次真实的任务执行产出结果,对比输出结果与期望是否满足,以验证任务逻辑正确性。和 Unit Testing 类比:
据此,可用 Unit Testing 概念从以下深入:
数据任务执行产出结果是一张 Hive 表,要对这张 Hive 表数据加工、提取以获得需要Actual Value。涉及 Hive 表加工,就想到以 SQL 实现,通过 Query 和 一系列 Aggregation 拿到结果,此结果结构又可分为:
显然单行且单列标量是期望,因为易于结果比较(就目前能想到的规则,都可通过 SQL 提取为一个标量结果)。因此,规则设计中,需要规则创建者输入一段用于结果提取的 SQL,该段 SQL 执行结果需要为一个标量。
既然 Actual Value 是标量,Expected Value 也是标量,需要规则创建者在平台输入。
上述标量的类型决定断言比较方式。目前只支持数值型标量的比较方式,包含「大于」、「等于」及「小于」三种比较算子。
三要素即可完整的描述规则想要表达的核心逻辑。如表述「字段为空异常」规则(潜在含义:字段为空的行数大于 0 时判定异常):
为了规则复用抽象出的一个概念,模板中包含规则的 SQL 定义、规则的比较方式、参数定义(注:SQL 中包含一些占位符,这些占位符将以参数的形式被定义,在规则实体定义时需要用户明确具体含义)以及其他的一些元信息。
「字段空值的行数」模板示例:
基于规则模板构建,是规则的具象表达。
在规则实体中将明确规则的 Expected Value、比较方式中具体的比较算子、参数的含义以及其他的一些元信息。基于同一个规则模板,可构造多个规则实体。
「某表 user_id 唯一性校验」规则示例:
规则可能不仅针对单表校验,多表case,这套规则模板同样适用,只要能将逻辑用 SQL 表达。
在 DS 的前端交互上支持为任务直接绑定校验规则,规则列表通过 API 从 DQC 获取,这种方式在用户的使用体验上存在一定的割裂(规则创建和绑定在两个平台完成)。同时,在 DQC 的前端亦可以直接设置关联调度,为已有任务绑定质检规则,任务列表通过 API 从 DS 获取。同一个任务可绑定多个质检规则,这些信息将存储至 DS 的 DAG 元信息中。但是:
主要有两种方式:
最终选型后者,ID List可使对 DS 侵入最低。
规则的强弱性质由用户为任务绑定规则时设定,此性质决定了规则执行的方式。
强规则
和当前所执行的任务节点同步执行,一旦规则检测失败整个任务节点将置为执行失败的状态,后续任务节点的执行会被阻断。对应 DS 中的执行过程:
弱规则
和当前所执行的任务节点异步执行,规则检测结果对于原有的任务执行状态无影响,从而也就不能阻断后续任务的执行。对应 DS 中的执行过程:
可以看出在强弱规则的执行方式上,对 DS 调度部分的代码有一定的侵入,但这个改动不大,成本是可以接受的。
上文提及到一个 Job Task 绑定的规则(可能有多个)将被转换为一个 DQC Task 被 DS 调度执行,接下来我们就讨论下 DQC Task 的实现细节以及由此引出的 DQC SDK 的设计和实现。
DQC Task 继承自 DS 中的抽象类 AbstractTask,只需要实现抽象方法 handle(任务执行的具体实现)即可。那么对于我们的质检任务,实际上执行逻辑可以拆分成以下几步:
最核心的步骤为 Query 的执行。Query 的实现方式又可分为两种:
Spark 实现
Presto SQL 实现
选择后者,最易实现,离线场景计算耗时也可接受。同时由于一个 DQC Task 包含多条规则,在拼接 SQL 时将同表的规则聚合以减少 IO 次数。不同的 SQL 交由不同的线程并行执行。
上述执行逻辑其实是一个完整且闭环的功能模块,因此我们想到将其作为一个单独的 SDK 对外提供,并以 Jar 包的形式被 DS 依赖,后续即便是更换调度引擎,这部分的逻辑可直接迁移使用(当然概率很低)。那么 DS 中 DQC Task 的 handle 逻辑也就变得异常简单,直接以 Shell 形式调用 SDK ,进一步降低对 DS 代码的侵入。
单条规则的质检结果将在平台上直接展现,目前我们还未对任务级的规则进行聚合汇总,这是接下来需要完善的。对于质检失败的任务将向报警接收人发送报警。
平台解决了规则创建、规则执行的问题,而在实践过程中,对用户而言更关心:
这些是很难通过平台自动实现的,因为平台理解不了业务的信息,平台能做的只能是通过质量检测报告给与用户反馈。因此这个事情需要具体的开发人员对核心场景进行梳理,在充分理解业务场景后根据实际情况进行设定。话又说回来,平台只是工具,每一个数据开发人员应当提升保证数据质量的意识,这又涉及到组织内规范落地的问题了。
数据质量管理是一个长期的过程,未来在平台化方向我们还有几个关键的部分有待继续推进:
关注我,紧跟本系列专栏文章,咱们下篇再续!
作者简介:魔都技术专家兼架构,多家大厂后端一线研发经验,各大技术社区头部专家博主,编程严选网创始人。具有丰富的引领团队经验,深厚业务架构和解决方案的积累。
负责:
- 中央/分销预订系统性能优化
- 活动&优惠券等营销中台建设
- 交易平台及数据中台等架构和开发设计
目前主攻降低软件复杂性设计、构建高可用系统方向。
参考: