目前,字节跳动数据质量平台对于批处理数据的质量管理能力已经十分丰富,提供了包括表行数、空值、异常值、重复值、异常指标等多种模板的数据质量监控能力,也提供了基于spark的自定义监控能力。另外,该平台还提供了数据对比和数据探查功能,为用户在数据开发阶段及时发现数据质量问题提供了便利的手段。
本文分上下两次连载,作者系 字节跳动数据平台开发套件团队高级研发工程师于啸雨 。 点击阅读上篇-技术调研与思考
为更好地满足流式数据用户的数据质量监控需求,同时填补数据质量平台在流式数据源方面的空白,字节跳动数据质量平台团队于2020年下半年,以Kafka数据写入延迟监控为切入点,陆续调研、开发、上线了一系列基于Flink StreamSQL的流式数据质量监控。
DataLeap
流式数据质量监控的技术架构
以Kafka数据源为例,流式数据质量监控的技术架构及流程图如下所示:
每个流式监控规则从创建到执行和触发报警的 主要流程如下 :
1、用户在数据质量平台上创建监控规则。
2、数据质量平台收到规则创建请求后,会做以下三件事:
- 将规则元数据保存到DB。
- 根据规则的报警指标定义,在数据开发平台上创建对应的Flink SQL任务。
- 将报警条件映射为报警平台的触发规则。
3、Flink SQL作业将消费Kafka的数据,计算监控指标,并写到TSDB中。
4、报警平台将基于TSDB中的时序数据,周期性地检测是否触发报警。若触发报警,将回调数据质量平台。
5、数据质量平台根据报警平台的回调请求,处理后续报警发送逻辑。
Flink SQL作业的执行逻辑
用户在数据质量平台上配置kafka数据的监控规则时,有可能会为一个topic配置多个监控规则,为节约资源,便于统一管理,数据质量平台将相同topic的所有监控规则放在同一个Flink SQL作业中计算。Flink SQL作业的构成如下图所示。
从逻辑上,该Flink SQL作业分为source、rule、sink三个阶段:
source阶段: 本阶段的主要目标是根据Kafka topic等配置,创建Kafka Source Table,Flink底层执行逻辑是消费Kafka的数据,并根据Source Table的schema进行解析、提取相应的字段,供后续Rule阶段使用。根据需求,创建的Kafka Source Table支持从消息header中取字段,支持json和pb格式,也支持按百分比取样消费数据(针对数据量较大的topic的采样监控策略)。
rule阶段: 本阶段执行所有监控规则的实际计算逻辑,每个监控规则对应一个或多个rule sql,由各个rule sql计算相应的监控指标,insert到Sink Table表中。
sink阶段: 本阶段的主要目的是将计算结果存到TSDB中。
上述三个阶段,在Flink SQL作业中,分别用创建Kafka Source Table的DDL,计算指标结果的Rule Sqls、创建TSDB Sink Table的DDL三类SQL来描述。以下的SQL示例,即展示了某个topic配置了3个监控规则,其Flink作业所包含的SQL逻辑:
-- SOURCE TABLE DDL:定义了Kafka数据源及消费策略
create table data_source (
app_id BIGINT,
name VARCHAR,
_meta_timestamp BIGINT,
time_formatted VARCHAR,
price DOUBLE,
type VARCHAR
) with (
'format.type' = 'json',
'update-mode' = 'append',
'metadata.fields.mapping' = 'timestamp=_meta_timestamp',
'connector.version' = '0.10',
'format.derive-schema' = 'true',
'connector.topic' = 'kafka_topic',
'connector.group.id' = 'kafka_group_id',
'connector.startup-mode' = 'latest-offset',
'connector.type' = 'kafka',
'format.skip-dirty' = 'true',
...
);
-- SINK TABLE DDL:定义了TSDB Sink
create table data_sink(
type varchar,
name varchar,
val double,
tags varchar
) with (
'connector.type' = 'tsdb',
'connector.tsdb.table' = 'tsdb_table',
...
);
-- RULE SQL 1:计算字段app_id的空值率
INSERT INTO data_sink
SELECT
'COUNTER' AS type,
'r0' AS name,
CAST(1.0 AS DOUBLE) AS val,
CONCAT('{', IF(`app_id` is null, 'app_id=nit', 'app_id=normal') , ',' , IF(`name` is null, 'name=nit', 'name=normal') ,'}') AS tags
FROM data_source;
-- RULE SQL 2:以time_formatted为事件时间,计算数据写入延迟
INSERT INTO data_sink
SELECT
'TIMER' AS type,
'r1' AS name,
CAST((_meta_timestamp - UNIX_TIMESTAMP(time_formatted, 'yyyy-MM-dd HH:mm:ss') * 1000) AS DOUBLE) AS val,
'{}' AS tags
FROM data_source;
-- RULE SQL 3:计算指标app_id的取值范围
INSERT INTO data_sink
SELECT
'TIMER' AS type,
'r2.app_id' AS name,
CAST(app_id AS DOUBLE) AS val,
'{}' AS tags
FROM data_source
WHERE app_id IS NOT NULL;
Flink SQL作业的管理
流式监控的Flink SQL作业被托管到Dataleap数据开发平台上。数据开发平台对流式作业有较完善的运维和管理机制。因此,在作业管理方面,数据质量平台仅需要处理以下几点:
1、规则的新建、删除、关闭等操作时,向数据开发平台提交Flink SQL作业的创建或更新
2、借助数据开发平台的能力,实现作业的启停、重启、监控等运维操作。
3、另外,用户可在数据质量平台上对监控作业的资源进行调整(如下图所示),对资源配置的调整会实时更新到对应的Flink SQL作业中。
报警的诊断和管理
在创建规则时,数据质量平台会将报警条件映射为报警平台的报警规则。报警平台将周期性地根据报警规则查询TSDB,诊断报警。若满足报警条件,将回调数据质量平台。由数据质量平台负责后续的报警内容生成和发送,并记录报警时间和结果。
采用这种方式,流式监控的报警卡片内容、报警发送方式、报警级别、报警接收人等配置均由数据质量平台维护。这部分与已有的batch监控基本相同,直接复用了已有能力。
另外,用户在ACK(报警屏蔽)时,数据质量平台将记录ACK信息,但不将ACK信息报给报警平台。在后续处理报警平台的回调时,会加一层过滤,不向用户发送已ACK(处于屏蔽状态)的报警,但仍保存报警信息,供用户查看屏蔽期间内的报警结果。
监控结果展示
监控的Flink SQL作业将计算结果sink到了TSDB,因此在用户查看历史监控指标结果时,数据质量平台可实时从TSDB拉取结果进行展示。
此外,数据质量平台也在页面上将TSDB的表名、metric名透传给用户,便于用户更灵活地查看监控结果,或将监控结果配置到自己的Grafana看板中。
DataLeap
流式监控现状
支持的监控类型
目前,在字节跳动,数据质量平台已上线支持了时间字段延迟、空值监控、字段监控和自定义指标监控四种监控类型。
1、时间字段延迟
时间字段延迟实际是指数据的生产延迟或者写入延迟,是指数据从产生到写入kafka的延迟情况。实际的计算方式是数据写入kafka的时间(从消息header中获取的kafka LogAppendTime)减去用户配置的代表事件时间的字段时间。如用户配置的字段时间是11:50:00,写入kafka的LogAppendTime是11:50:01,则认为写入延迟为1秒。
2、空值监控
空值监控用于检查用户指定字段的空值率,超过阈值即报警。其计算方式也很直接,直接计算特定时间段内,指定字段空值的数量/总的数据条数。
3、字段监控
字段监控的处理逻辑更为简单,直接将字段的值(30秒的聚合结果)写入TSDB。主要使用场景是用来监控字段的值是否在预期的范围内。
4、自定义指标监控
自定义指标监控是基于用户自定义的SQL计算结果进行的监控。用户可根据Flink SQL的语法,将需要监控的指标用SQL的形式表达出,然后对计算结果配置报警条件。
问题与挑战
基于Flink SQL的流式数据质量监控,目前面临的问题与挑战主要有:
1、使用FlInk SQL时,要求数据能够序列化成指定schema的流式表,因此其前提是数据的schema已知且符合预期。若用户需要对数据的schema进行监控,比如数据能否成功解析、或某些字段的类型是否正确,则很难用Flink SQL实现。
2、流式监控作业本身是一个Flink流式作业,因此其也存在着其它流式作业共有的问题。如计算资源如何合理分配、消费延迟等。若监控作业本身消费延迟,将会影响流式数据质量监控报警的时效性。目前我们对监控作业消费延迟、资源使用等指标配置了监控报警。在资源不足、消费延迟时仍需要人工干预处理。
除上述问题外,目前产品和技术上也存在着一些细节问题,这里不再赘述了。
从整体上看,该方案架构较为简洁,充分利用了Flink SQL以及一些其它基础组件和平台的能力,也复用了数据质量平台batch监控的一些能力,以较低的技术成本提供了较为丰富的流式数据监控功能。
本文中的技术方案和能力现在已经通过火山引擎大数据研发治理套件DataLeap面向企业开放。
点击
阅读原文
了解产品详情
产品介绍
火山引擎大数据研发治理套件DataLeap
一站式数据中台套件,帮助用户快速完成数据集成、开发、运维、治理、资产、安全等全套数据中台建设,帮助数据团队有效的降低工作成本和数据维护成本、挖掘数据价值、为企业决策提供数据支撑。 后台回复数字“2”了解产品
附-参考资料
http://griffin.apache.org/docs/profiling.html
How to Monitor Data Stream Quality Using Spark Streaming and Delta Lake
https://github.com/awslabs/deequ
- End -