基于ClickHouse的复杂查询实现与优化|社区征文

ClickHouse数据库

项目背景

ClickHouse的执行模式与Druid、ES等大数据引擎类似,其基本的查询模式可分为两个阶段。第一阶段,Coordinator在收到查询后,将请求发送给对应的Worker节点。第二阶段,Worker节点完成计算,Coordinator在收到各Worker节点的数据后进行汇聚和处理,并将处理后的结果返回。

image.png

两阶段的执行模式能够较为高效地支持目前许多常见的业务场景,例如各类大宽表单的查询,这也是ClickHouse最擅长的场景。ClickHouse的优点是简单、高效,通常来说,简单就意味着高效。但随着企业业务的持续发展,愈加复杂的业务场景对ClickHouse提出了以下三类挑战。

第一类,当一阶段返回的数据较多,且二阶段计算较为复杂时,Coordinator会承受较大压力,容易成为Query的瓶颈。 例如一些重计算的Agg算子,如Count Distinct,若采用哈希表的方式进行去重,第二阶段需在Coordinator单机上去合并各个Worker的哈希表。这个计算量会很重且无法并行。

第二类,由于目前ClickHouse模式并不支持Shuffle,因此对于Join而言,右表必须为全量数据。 无论是普通Join还是Global Join,当右表的数据量较大时,若将数据都放到内存中,会比较容易OOM。若将数据spill到磁盘,虽然可以解决内存问题,但由于有磁盘 IO 和数据序列化、反序列化的代价,因此查询的性能会受到影响。特别是当Join采用Hash Join时,如果右表是一张大表,构建也会比较慢。针对构建问题,近期社区也进行了一些右表并行构建的优化,数据按照Join key进行Split来并行地构建多个Hash Table,但额外的代价是左右表都需要增加一次Split操作。

第三类,则是关于复杂查询(如多表 Join、嵌套多个子查询、window function 等),ClickHouse对这类需求场景的支持并不是特别友好, 由于ClickHouse并不能通过Shuffle来分散数据增加执行并行度,并且其生成的Pipeline在一些case下并不能充分并行。因此在某些场景下,难以发挥集群的全部资源。

image.png

随着企业业务复杂度的不断提升,复杂查询,特别是有多轮的分布式Join,且有很多agg的计算的需求会越来越强烈。在这种情况下,业务并不希望所有的Query都按照ClickHouse擅长的模式进行,即通过上游数据 ETL 来产生大宽表。这样做对ETL的成本较大,并且可能会有一些数据冗余。企业的集群资源是有限的,但整体的数据量会持续增长,因此在这种情况下,我们希望能够充分地去利用机器的资源,来应对这种越来越复杂的业务场景和SQL。所以我们的目标是基于ClickHouse能够高效支持复杂查询。

技术方案

对于ClickHouse复杂查询的实现,我们采用了分Stage的执行方式,来替换掉目前ClickHouse的两阶段执行方式。类似于其他的分布式数据库引擎,例如Presto等,会将一个复杂的Query按数据交换情况切分成多个 Stage,各Stage之间则通过Exchange完成数据交换。

Stage之间的数据交换主要有以下三种形式。

  • 按照单个或者多个key进行Shuffle
  • 将单个或者多个节点的数据汇聚到一个节点上,称为Gather
  • 将同一份数据复制到多个节点上,称为Broadcast或广播

对于单个Stage执行,继续复用ClickHouse目前底层的执行方式。开发上按照不同功能切分不同模块。各个模块预定接口,减少彼此的依赖与耦合。即使模块发生变动或内部逻辑调整,也不会影响其他模块。其次,对模块采用插件架构,允许模块按照灵活配置支持不同的策略。这样便能够根据不同业务场景实现不同的策略。

image.png

首先,当Coordinator接受复杂的查询以后, 它会在当前的语法树的基础上,根据节点类型和数据分布情况,插入Exchange节点,并生成一个分布式Plan。其次,Coordinator节点会根据ExchangeNode类型切分Plan,并生成每个Stage执行计划片段。

接着,Coordinator节点会调用SegmentScheduler调度器, 将各Stage的PlanSegment发送给Worker节点。当Worker接收到PlanSegment后,InterpreterPlanSegment会完成数据的读取和执行,通过ExchangeManager完成数据的交互。最后,Coordinator从最后一轮Stage所对应的ExchangeManager中去读取数据,并返回给Client。

查询片段调度器SegmentScheduler负责调度查询不同的PlanSegment,根据上下游依赖关系和数据分布,以及Stage并行度和worker分布和状态信息,按照一定的调度策略,将PlanSemgent发给不同的 Worker 节点。

image.png

目前而言,我们在进行计划下发和调度时,主要实现了两种策略。

第一种是依赖调度, 根据Stage依赖关系定义拓扑结构,产生DAG图,并根据DAG图调度Stage。依赖调度要等到依赖Stage启动以后,才会调度对应的Stage。例如两表Join,会先调度左右表读取Stage,之后再调度Join这个Stage,因为Join的Stage依赖于左右表的Stage。

第二种是AllAtOnce策略, 先计算每个Stage的相关信息,后一次性调度所有Stage。

相比而言,这两种策略是在容错、资源使用和延时上去做取舍。

第一种策略依赖调度,可以实现更好的容错。由于ClickHouse数据可以有多个副本,读数据时,如部分节点连接失败,可以尝试它的副本节点。对后续依赖的节点的Stage来说,并不需要感知到前面 Stage 的执行情况。非Source Stage,本身没有对数据的依赖,所以容错能力会更强,只要保证Stage并行度的节点存活即可。甚至极端情况下,如需保证Query正常执行,也可以降低Stage的并行度。但调度存在依赖关系,并不能完全并行,会增加调度的时长。Stage较多的情况下,调度延时可能会占据SQL整体不小的比例。针对上述问题的可做如下优化:对于一些没有依赖关系的,尽可能支持并行。例如同一个Stage的不同节点,可以并行。没有依赖关系的Stage,也可以并行。

第二种调度策略是AllAtOnce,通过并行可以极大降低调度延时。为防止出现大量网络IO线程,可以通过异步化手段控制线程数目。AllAtOnce策略的缺点是容错性没有依赖调度好,每一个Stage的Worker在调度前就已经确定了,调度过程中有一个Worker出现连接异常,则整个Query都会失败。另一类情况,Stage在上游数据还没有ready,就被调度起来了,则需要较长时间等数据。例如Final的agg Stage,要等Partial agg完成以后才能够拿到对应的数据。虽然我们也对此进行了一些优化,并不会长时间空跑,浪费CPU资源。但是其实也消耗了一部分资源,例如需要去创建这些执行的线程。

ClickHouse的查询节点执行主要是以SQL形式在节点间互相交互。在切分Stage后,我们需要支持能够执行一个单独的PlanSegment的执行计划。因此,InterpreterPlanSegment主要的作用就是接受一个序列化后的PlanSegment,能够在Worker节点上去运行整个PlanSegment的逻辑。此外,我们也进行了功能和性能上的增强,例如支持一个Stage处理多个Join,这样便可以减少Stage的数目和一些不必要的传输,用一个Stage就可以完成整个Join的过程。InterpreterPlanSegment的执行会上报对应的状态信息,如出现执行异常,会将异常信息报告给查询片段调度器,调度器会取消Query其他的Stage的Worker执行。

ExchangeManager是PlanSegment数据交换的媒介,能平衡数据上下游处理的能力。整体而言,我们的设计采用Push与队列的方式,当上游的数据ready时,主动推送给下游,并在这个基础上支持了反压的能力。

image.png

在整个流程中,上下游都会通过队列来优化发送和读取,上游与下游会有一个自己的队列。当队列饱和的时候,会通过类似反压的机制来控制上游这个执行速度,若上游计算快,下游处理能力比较慢,出现下游处理不过来的情况,则会通过反压的方式来控制上游执行的速度。

由于采用push和队列,因此要考虑一个相对比较特殊的场景,在某些case的情况下,下游的Stage并不需要读取全部的上游的数据。例如Limit100,下游只需读取100条数据,而上游可能会产生非常大规模的数据。因此在这种情况下,当下游的Stage读取到足够的数据后,它需要能够主动取消上游Stage的执行,并且清空队列。

ExchangeManager考虑的优化点较多, 例如细粒度的内存控制,能够按照实例、Query、Segment等多个层次进行内存控制,避免OOM。更长期的考虑是在一些对延迟要求不高、数据量大的场景,通过将数据 Spill 到磁盘,降低内存的使用

image.png

第二,为了提升传输效率,小数据要做Merge,大数据要做Split。 同时,在网络传输和处理某些场景的时候,需要做一种有序性的保证。例如在Sort的场景,Partial Sort和Merge Sort的网络传输过程必须要保证是有序的,传输数据不能出现乱序的情况,否则进行Merge Sort时数据就会出问题,并影响最终结果。

第三,连接的复用和网络的优化, 包括上下游在同一个节点,尽可能走内存交换,而不走网络。这样可以减少网络开销以及数据的序列化和反序列化的代价。此外,ClickHouse在计算上做了非常充足的优化,因此其在某些场景中,内存带宽会成为瓶颈,在ExchangeManager的一些场景中,可以用一些零拷贝和其他优化,尽量减少内存的拷贝。

第四,异常处理和监控。 相比于单机,分布式情况下异常情况会更加复杂,且更加难以感知。通过重试能够避免一些节点短时性的高负载或者异常对查询的影响。做好监控,在出问题的时候,能快速感知,并进行排查,也能够针对性地去做优化。

优化与诊断

首先是Join的多种实现和优化。根据数据的规模和分布,可以根据不同的场景去选择合适的Join的实现方式:

  • Shuffle Join,是目前使用方式最多,也是最常见的。
  • Broadcast Join,大表Join小表场景,将右表广播到左表的所有Worker节点上面,这样可以避免左表大表的数据传输。
  • Colocate Join,如果左右表都已按照Join key分布,并且它们是相通的分布的话,其实不需要去做数据的exchange,可以将数据的传输减到最小。

其次是网络连接的优化,核心本质是减少连接的建立和使用,特别是在数据需要Shuffle时,下一轮Stage中的每一个节点都要从上游的Stage中的每个节点去拉取数据。若集群整体的节点数较多,且存在很多较复杂的Query,就会建立非常多的连接。

image.png

目前在字节内部,ClickHouse集群的规模非常大,在当前 ClickHouse 二阶段执行的高并发情况下,单机最大可能会建立几万个连接。因此必须要进行网络连接的优化,特别是支持连接的复用,每个连接上可以跑多个Stage查询。通过尽可能去复用连接,在不同的节点之间,能够建立固定数目的连接,不同的Query、Stage都会复用这些连接,连接数并不会随着Query和Stage的规模的增长而增长。

网络传输优化,在数据中心内,远程的直接的内存访问,通常指RDMA,是一种能够超过远程主机操作系统的内核,去访问内存里的数据的技术。由于这种技术不需要经过操作系统,所以不仅节省了大量的CPU资源,同样也提升了系统吞吐量,降低了系统的网络通信延迟,尤其适合大规模并行的计算机集群。由于 ClickHouse 在计算层面做了很多优化,而网络带宽相比于内存带宽要小不少,在一些数据量传输特别大的场景,网络传输会成为一定的瓶颈。为了提升网络传输的效率和提升数据 exchange 的吞吐,一方面可以引入压缩来降低传输数据量,另一方面可以引入 RDMA 来减少一定的开销。经过测试,在一些数据传输量大的场景,有不小的收益。

利用Runtime Filter的优化在不少数据库也有使用。 Join的算子通常是OLAP引擎里最耗时的算子,优化Join算子有两种思路。一种思路是可以提升Join算子的性能。比如对于 HashJoin,可以优化 HashTable 实现,也可以实现更好的哈希算法,包括做一些更好的并行的方式。

image.png

另一种思路是,如果本身算子耗时比较重,可以减少参与算子计算的数据。 Runtime Filter是在一些场景下特别是事实表Join多张维度表的星型模型场景有比较好的效果。在此类场景下,通常事实表的规模会非常大,而大部分的过滤条件都是在维度表上面。
Runtime Filter的作用,是通过在Join的Probe端,提前过滤掉并不会命中Join条件的输入数据,从而大幅减少Join中的数据传输和计算。通过这种方式,能够减少整体的执行时间。因此我们在复杂查询上也支持了Runtime Filter,目前主要支持Min Max和Bloom Filter。如果 runtime filter 的列(join column)构建了索引(主键、skip index…),是需要重新生成 pipeline 的。因为命中索引后,可能会减少数据的读取,pipeline 并行度和对应数据的处理 range 都可能发生变化。如果 runtime filter 的列跟索引无关,可以在计划生成的时候预先带上过滤条件,一开始为空,只是占位,runtime filter 下发的时候把占位信息改成真正的过滤条件即可。这样即使 runtime filter 下发超时了,查询片段已经开始执行,只要查询片段没有执行完,之后的数据仍然可以进行过滤。

但需要注意的是,Runtime Filter是一种特殊场景下的优化,针对场景是右表数据量不大,并且构建的Runtime Filter对左表有比较好的过滤效果。 若右表数据量较大,构建的Runtime Filter的时间比较久,或对左表的数据过滤没有效果。Runtime Filter反而会增加查询的耗时和计算的开销。因此要根据数据的特征和规模来决定是否开启优化。

性能诊断和分析对复杂查询很关键,由于引入了复杂查询的多Stage模型,SQL执行的模式会变得复杂。对此的优化首先是尽可能完善各类Metrics, 包括Query执行时间、不同Stage执行时间、起始时间、结束时间、处理的IO数据量、算子处理的数据、执行情况,以及各类的算子Metrics和一些Profile Events(例如Runtime Filter会有构建时间、过滤数据量等Metrics)。其次,我们记录了反压信息与上下游的队列长度, 以此推断Stage的执行情况和瓶颈。通常可以有如下判断:

  • 输入和输出队列数目同为低或同为高分别表明当前 stage 处理正常或处于被下游反压,此时可以通过反压信息来进一步判断。
  • 当输入和输出队列数目不一样,这可能是出于反压传导的中间状态或者该 stage 就是反压的根源。
  • 如果一个 stage 的输出队列数目很多,且经常被反压,通常是被下游 stage 所影响,所以可以排除它本身是反压根源的可能性,更多关注它的下游。
  • 如果一个 stage 的输出队列数目很少,但其输入队列的数目很高,则表明它有可能是反压的根源。优化目标是提升这个 stage 的处理能力。

总的来说,SQL的场景包罗万象,非常复杂的场景有时还是需要对引擎有一定了解的同学去诊断和分析,给出优化建议。 字节目前也在不断完善这些经验,希望能够通过不断完善Metrics和分析的路径,持续减轻Oncall的负担,在某些场景下能够更加准确地给出优化建议。

效果与展望

根据上述所提,目前执行模型存在三个缺点,我们进行了复杂查询的优化,因此需要验证这种新的模式是否能够解决发现的问题,测试场景如下:

  • 第二阶段计算较复杂,且第一阶段数据较多
  • Hash Join右表是大表
  • 多表Join,模拟复杂Query

以SSB 1T数据作为数据集,环境则是构建了8个节点的集群。

Case1——二阶段计算复杂。 我们看到有一个比较重的计算算子UniqExact,就是count distinct的计算方式,通过Hash表做去重。count distinct默认采用这种算法,当我们使用复杂查询后,Query的执行时间从8.5秒减少到2.198秒。第二阶段 agg uniqExact 算子的合并原本由coordinator单点合并,现在通过按照group by key shuffle后可以由多个节点并行完成。因此通过shuffle减轻了coordinator的 merge agg 压力。
图片

image.png Case2——右表为大表。 由于 ClickHouse 对多表的优化做的还不是很到位。这里采用子查询来下推过滤的条件。在这个case中,Lineorder是一张大表,采用复杂查询的模式以后,Query执行时间从17秒优化到了1.7秒。由于Lineorder是一张大表,通过Shuffle可以将数据按照Join key Shuffle到各Worker节点上,这样就减少了右表构建的压力。

image.png

Case3——多表Join。 开启复杂查询后,Query的执行时间从8.58秒优化到4.464秒,所有的右表都可以同时开始数据的处理和构建。为了和现有模式做对比,复杂查询这里并没有开启 runtime filter,开启 runtime filter 后效果会更好。

image.png

事实上,优化器对复杂查询的性能提升也非常大,通过一些RBO的规则,例如常见的谓词下推、相关子查询的处理等,可以极大提升SQL的执行效率。在复杂查询的模式下,由于有优化器的存在,用户甚至不需要写得非常复杂,优化器自动去完成这些下推和RBO规则优化。

此外,选择用哪一种Join的实现,也会对Join的性能影响较大。若能够满足Join Key分布,使用Colocate Join可以减少左右表Shuffle的传输代价。在多表Join的情况下,Join的顺序和Join的实现方式对执行的时长影响,会比两表Join更大。借助这种数据的统计信息,通过一些CBO的优化,可以得到一个比较好的执行模式。有了优化器,业务同学可以按照业务逻辑来写任何的 SQL,引擎自动计算出相对最优的 SQL 计划并执行,加速查询的执行。

总结一下,ClickHouse目前的执行模式在很多单表的场景下表现非常优异,我们主要针对复杂场景做优化, 通过实现多Stage的模式,实现了Stage之间的数据的传输,从工程实践上做了较多尝试和优化,去提升执行和网络传输的性能。并希望通过完善Metrics和智能诊断来降低SQL分析和调优的门槛。

目前已经实现了第一步,未来字节仍有很多努力的方向。首先,是要继续去提升执行和Exchange的性能。 这里不谈论引擎执行通用的优化,比如更好的索引或者算子的优化,主要是跟复杂查询模式有关。举一个例子,比如 Stage 复用,在 SQL 出现子查询结果被反复使用的场景,比如一些多表 join 和 CTE 场景可能有帮助。通过 Stage 复用可以减少相同数据的多次读取。Stage 复用我们之前就已经支持,但是用的场景比较少,未来准备更灵活和通用。其次,Metrics和智能诊断加强。 SQL的灵活度很高,因此一些复杂查询如果没有Metrics其实几乎很难去做诊断和调优。以上都是字节跳动数据平台在未来会长期的持续去发力的方向。

0
0
0
0
关于作者
相关资源
字节跳动 NoSQL 的实践与探索
随着 NoSQL 的蓬勃发展越来越多的数据存储在了 NoSQL 系统中,并且 NoSQL 和 RDBMS 的界限越来越模糊,各种不同的专用 NoSQL 系统不停涌现,各具特色,形态不一。本次主要分享字节跳动内部和火山引擎 NoSQL 的实践,希望能够给大家一定的启发。
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论