干货 | UniqueMergeTree:支持实时更新删除的ClickHouse表引擎

技术

picture.image

本文是分析型数据库团队高大月在CSDN数据库Meetup上的分享实录摘编。 关注字节跳动数据平台微信公众号,回复【PPT】阅读本次分享材料。

picture.image

文 | 高大月

来自字节跳动数据平台分析型数据库团队

picture.image

UniqueMergeTree开发的业务背景

首先,我们看一下哪些场景需要用到实时更新。我总结了三类场景。

第一类是业务需要对它的交易类数据进行实时分析, 需要把数据流同步到ClickHouse这类OLAP数据库中。大家知道,业务数据诸如订单数据天生是存在更新的,所以需要OLAP数据库去支持实时更新。

第二个场景和第一类比较类似,业务希望把TP数据库的表实时同步到ClickHouse,然后借助ClickHouse强大的分析能力进行实时分析, 这就需要支持实时的更新和删除。

最后一类场景的数据虽然不存在更新,但需要去重。 大家知道在开发实时数据的时候,很难保证数据流里没有重复数据,因此通常需要存储系统支持数据的幂等写入。

我们可以总结一下这三类场景的共同点:

从数据的新鲜度看

这三个场景其实都不需要亚秒级的新鲜度,往往做到秒级或者分钟级的数据新鲜度就可以了,因此可以采用mini-batch的实时同步方案。

从使用上看

这三类场景都可以通过提供基于唯一键的upsert功能来实现,不管是更新还是幂等处理的需求。

从读写要求上看

因为大家用OLAP数据库最核心的诉求是希望查询可以有一个非常低的延迟,所以对读的性能要求是非常高的。对于写,虽然也需要高吞吐,但更多关注Scalability,即能否通过加资源来提高数据流的写吞吐。

从高可用性上看

这三个场景都需要能支持多副本,来避免整个系统存在单点故障。

以上就是我们开发UniqueMergeTree的背景。

picture.image

常见的列存储实时更新方案

下面介绍下在列存储里支持实时更新的常见技术方案。

key-based merge on read

第一个方案叫key-based merge on read,它的整个思想比较类似LSMTree。对于写入,数据先根据key排序,然后生成对应的列存文件。每个Batch写入的文件对应一个版本号,版本号能用来表示数据的写入顺序。

同一批次的数据不包含重复key,但不同批次的数据包含重复key,这就需要在读的时候去做合并,对key相同的数据返回去最新版本的值,所以叫merge on read方案。ClickHouse的ReplacingMergeTree和Doris用的就是这种方案。

大家可以看到,它的写路径是非常简单的,是一个很典型的写优化方案。它的问题是读性能比较差,有几方面的原因。首先,key-based merge通常是单线程的,比较难并行。其次merge过程需要非常多的内存比较和内存拷贝。最后这种方案对谓词下推也会有一些限制。大家用过ReplacingMergeTree的话,应该对读性能问题深有体会。

这个方案也有一些变种,比如说可以维护一些index来加速merge过程,不用每次merge都去做key的比较。

mark-delete+insert

picture.image

Ref “Enhancements to SQLServer Column Stores”

下面以SQLServer的Column Stores为例介绍下这个方案。图中,每个RowGroup对应一个不可变的列存文件,并用Bitmap来记录每个RowGroup中被标记删除的行号,即DeleteBitmap。处理更新的时候,先查找key所属的RowGroup以及它在RowGroup中行号,更新RowGroup的DeleteBitmap,最后将更新后的数据写入Delta Store。查询的时候,不同RowGroup的扫描可以完全并行,只需要基于行号过滤掉属于DeleteBitmap的数据即可。

这个方案牺牲了写入性能。一方面写入时需要去定位key的具体位置,另一方面需要处理write-write冲突问题。

这个方案也有一些变种。比如说写入时先不去查找更新key的位置,而是先将这些key记录到一个buffer中,使用后台任务将这些key转成DeleteBitmap。然后在查询的时候通过merge on read的方式处理buffer中的增量key。

由于ClickHouse的ReplacingMergeTree已经实现了方案一,所以我们希望UniqueMergeTree能实现读优化的方案。

picture.image

UniqueMergeTree使用与实现

下面介绍UniqueMergeTree的具体使用。我们先介绍一下它的特性。

UniqueMergeTree表引擎特性

首先UniqueMergeTree支持通过UNIQUE KEY关键词来指定这张表的唯一键,引擎会实现唯一约束。对于UNIQUE表的写入,我们会采用upsert的语义,即如果写入的是新key,那就直接插入数据;如果写入的key已经存在,那就更新对应的数据。

然后我们也支持,指定UNIQUE KEY的value来删除数据,满足实时行删除的需求。然后和ReplacingMergeTree一样,也支持指定一个版本字段来解决回溯场景可能出现的低版本数据覆盖高版本数据的问题。最后我们也支持数据在多副本的同步。

下面是一个使用示例。首先我们建了一张UniqueMergeTree的表,表引擎的参数和ReplacingMergeTree是一样的,不同点是可以通过UNIQUE KEY关键词来指定这张表的唯一键,它可以是多个字段,可以包含表达式等等。

picture.image

下面对这张表做写入操作就会用到upsert的语义,比如说第6行写了四条数据,但只包含1和2两个key,所以对于第7行的select,每个key只会返回最高版本的数据。对于第11行的写入,key 2是一个已经存在的key,所以会把key 2对应的name更新成B3; key 3是新key,所以直接插入。最后对于行删除操作,我们增加了一个delete flag的虚拟列,用户可以通过这个虚拟列标记Batch中哪些是要删除,哪些是要upsert。

示例展示的是单shard的写入,而生产环境通常包含多个shard,。多个shard写入的时候就涉及到了你要解决数据分片的问题,其实它的主要目的就是我们需要把相同的key的数据写到同一个shard里,不然如果你的key可能存在多个shard的话,你的去重开销就非常大。

分布式表写入:分片方案选择

上面的示例展示了单shard的写入,然而生产环境通常包含多个shard,如何实现相同key的数据写往同一个shard呢?这里有两种方案。

  • internal sharding: 即由引擎本身来实现数据的分片。具体来说,可以直接把数据写到ClickHouse的分布式表,它会根据sharding key实现数据的分片和路由。Internal sharding的优点是分片方式对用户透明,不容易出错;另外不同表的分片算法是一致的,在做多表关联的时候,可以利用数据的分片特征来优化查询。这是ByteHouse云数仓版使用的方式。

external sharding: 由用户或者SDK负责数据的分片和路由,这是ByteHouse企业版使用的方式。Internal sharding有个问题是,在实时写入场景,每个微批本身就不大,如果再对它进行分片会产生更多的小文件,影响写入吞吐。External sharding在外部实现数据的攒批,每个微批只写一个shard,这样batch size更大,整体的写吞吐会更高。它的问题是需要由用户端保证分片的正确性,比较容易出错。External sharding比较适合kafka导入等单一写入场景。如果表有多个写入通道,用户需要保证多个通道采用一致的分片方式,成本更高。

单机版实现:UniqueMerge Tree读写链路

下面介绍下UniqueMergeTree在单节点的读写链路。

写链路: 首先要判断写入key所属的part以及它在part中的行号,接着去更新对应part的delete bitmap,将写入key从原来的part里标记删除掉,最后将新数据写入新part里。为了实现上面的逻辑,我们为每个part新增了一个key index,用于加速从唯一键值到行号的查找。另外每个part包含多个delete file,每个delete file对应一个特定版本的delete bitmap。

读链路: 先获取所有part的delete bitmap快照,然后读取每个part的时候使用对应的delete bitmap过滤掉标记删除的行。这样就保证了整体的唯一性约束。

此外,还需要考虑 并发场景的两种冲突: write-write conflict和write-merge conflict。

picture.image

先介绍write-write conflict。产生该冲突的原因是write使用了upsert语义,因此当两个并发事务更新同一行的时候会产生冲突。比如左图中的两个并发事务同时更新P1的Key A行,如果不做并发控制,两个事务可能都去标记删除P1中的Key A行,然后写出P2和P3,最终P2和P3就同时包含了Key A。

TP数据库一般通过锁或者OCC的方式处理write-write conflict,但在AP场景中用行锁或者行级冲突检测的代价是比较高的。考虑到AP场景数据都是批量写入,我们采用了更简单的表锁来实现单表的写入串行化。

再来看右图的write-merge conflict。多个part在后台合并过程中,并发的前台写入事务可能会更新part的delete bitmap。如果不做并发控制,就会发生写入事务标记删除的行在part合并后“复活”的现象。要解决这个问题,后台合并任务需要感知到合并过程中,前台写入事务更新了哪些key。

处理Write-Merge Conflict

picture.image

我们给每个merge task新增了一个DeleteBuffer,用于缓存merge过程中前台写入任务删除的key。

Merge task开始时,先获取表锁创建DeleteBuffer,并获取input part的delete bitmap快照。接着读取input part,过滤掉标记删除的行,生成合并后的临时part。这个过程中,并发的写入事务如果发现要更新delete bitmap的part正在被合并,就会将要删除的key记录到merge task的DeleteBuffer。Merge task在提交前会再次获取表锁,将DeleteBuffer中的key转成新part的delete bitmap。

那么如何限制DeleteBuffer的内存使用呢? 一种简单有效的方式是,写入事务如果发现DeleteBuffer的大小超过了阈值,就直接abort对应的merge任务,等待下次合并。 因为DeleteBuffer比较大说明在合并过程中input part有很多增量的删除,重试可以减小merge后的part大小。

性能评估

picture.image

我们使用YCSB对UniqueMergeTree的写入和查询性能做了性能测试,结果如上图。可以看到,与ReplacingMergeTree相比,UniqueMergeTree的写入性能虽然会有40%到50%的下降,但在查询性能上取得了数量级的提升。我们进一步对比了UniqueMergeTree和普通MergeTree的查询性能,发现两者是非常接近的。 查询性能的提升主要归功于以下几点:

  • 避免了单线程的merge-on-read,流水线完全并行化
  • DeleteBitmap的最新版本常驻内存
  • 标记删除的Mark可以直接跳过
  • Combine pre-where filter & delete filter,减少IColumn::filter次数

picture.image

总结:经验与后续规划

我们在2020年初上线了UniqueMergeTree,目前线上应用的表数量超过了1000,还是非常受业务欢迎的。整个过程中,我认为做的比较对的决策有两点:

  • 在读写权衡方面, 牺牲一部分写性能来换取更高的读性能。我们发现很多业务场景的痛点是查询性能。虽然UniqueMergeTree的写吞吐不如MergeTree,但通过增加shard横向扩展,已经能满足大部分业务的需求。
  • 设计上没有对表的数据量做太多限制。 例如KeyIndex,一种做法是假设KeyIndex可以完全存储在内存中,但我们认为这会限制UniqueMergeTree的应用场景。因此虽然我们第一版实现的也是in-meomry index,但后来比较顺利地演进到了disk-based index。

对于后续规划,我们会重点尝试两个方向:

  • 部分列更新: 有些场景需要多个数据流更新同一张表的不同字段,因此需要部分列更新的能力。

  • 写吞吐优化: 写吞吐会直接影响每个集群能接入的实时数据规模。我们在表锁粒度和KeyIndex两方面都看到了进一步优化的空间。

关注字节跳动数据平台微信公众号,

回复【PPT】阅读本次分享材料。

产品介绍

火山引擎ByteHouse

统一的大数据分析平台。目前提供企业版和云数仓两种版本,企业版是基于开源的企业级分析型数据库,支持用户交互式分析PB级别数据,通过多种自研表引擎,灵活支持各类数据分析和应用;云数仓版作为云原生的数据分析平台,实现统一的离线和实时数据分析,并通过弹性扩展的计算层和分布式存储层,有效降低 企业大数据分析。后台回复数字“6”了解产品。

picture.image

分析型数据库团队

正在招人,

点击 阅读原文

了解

96
0
0
0
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论