在大数据量中 Spark 数据倾斜问题定位排查及解决|社区征文

2023总结大数据

1. 开篇

2023年即将过去,又到了一年一度的技术总结时刻,在这一年,参与了多个大数据项目的开发建设工作,也参与了几个数仓项目的治理优化工作,在这么多的项目中,让我印象比较深刻的就是在使用Spark引擎执行任务出现的报错现象,接下来就回顾复盘下这次任务报错现象及具体的解决方案。

2. 问题描述

因为现在大多数的批量任务都是使用Spark去执行,所以Spark的地位在公司是举足轻重,那么对于Spark的深入理解和优化显得尤为重要,部门人员都在深入学习Spark的执行过程,底层原理等,以期待遇到问题之后能够快速解决。

下面对于某次Spark任务执行过程中报错原因描述。

目前公司DWD层及之后的表都是Iceberg表,因为我们的业务特性,需要对数据进行行级更新和删除,传统的Hive表不支持行级数据操作,粒度都是表级的,如果采用传统Hive表形式,每次对数据进行更新的成本是非常高的,需要全表数据参与,后面经过调研,发现Iceberg是支持行级更新,并且和Spark结合的比较好,经过测试之后发现没有问题,后面数仓整体就迁到了Iceberg中。

这次任务的执行语句描述:将ODS层的表按照主键去重后插入到DWD层中,表为分区表,DWD层表格式是iceberg格式。

insert overwrite table hive_prod.dwd_xml.dwd_xml_order_cn

select 

pid

,app_date_o

,app_date_s

,app_docnumber_o

,app_docnumber_s

,app_number

,filename

...

,from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:dd:ss'as update_time

,cleandate

,etldate

from (

select t1.*

,row_number() over(partition by pid order by etldate desc,filename descas rn

from hive_prod.ods_xml.ods_xml_order_cn t1) t2 where rn=1

order by cleandate,etldate;

iceberg格式的表可以不显示的指定表分区字段,但是要求在写入分区表之前根据每个任务(Spark 分区)的分区规范对分区字段进行排序,上述sql中cleandate,etldate是分区字段。

等待几分钟,报错:

picture.image

查看Spark UI:

picture.image

发现task id是445的任务处理的数据量远远大于其他的任务。考虑到时数据倾斜问题。

查看此任务的日志:

picture.image

出现内存溢出。

多次测试上述sql,在集群资源空闲很多时,偶尔可以执行成功,但是执行时间超过25分钟。

picture.image

3. 分析推断

初步分析Spark的每个task任务处理的数量和每个分区的数量有关。

以下是统计的表中每个分区的数据量:

picture.image

下图是Spark处理时的task任务:

picture.image

发现:表中的分区数量和Task任务数量是对应的,也就是一个分区Spark只起了一个Task任务。

表中的分区数据分布不均匀,20221213这个分区的数量是8000多万,一个Task处理肯定会出现数据倾斜。

但即使分区数据分布均衡,但是每个分区数据量很大也会有问题,假设表就10个分区,如果每个分区数据量都是1亿数据,那么最终一个Task处理1亿数据,还是会有内存溢出风险。

4. 调查原因

点击进入Spark UI界面,找到SQL一栏,进入我们执行的SQL语句中:

picture.image

调查发现,在Spark中执行iceberg分区表时,如果有全局排序操作,那么会使得同一分区的数据进入到一个task任务中进行排序,如果某个分区数据量比较大,就会导致任务执行非常慢,或者报错,如下图,溢写到磁盘中的数据已经43.9G了。

picture.image

5. 解决方案

只要找到原因,解决就比较简单了

解决方案一:

还是采用这种动态分区方式,但将全局排序order by改为局部排序sort by,因为iceberg不要求全局排序,只保证每个reduce内的数据有序即可。

改造代码如下:

insert overwrite table hive_prod.dwd_xml.dwd_xml_order_cn

select 

pid

,app_date_o

,app_date_s

,app_docnumber_o

,app_docnumber_s

,app_number

,filename

...

,from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:dd:ss'as update_time

,cleandate

,etldate

from (

select t1.*

,row_number() over(partition by pid order by etldate desc,filename descas rn

from hive_prod.ods_xml.ods_xml_order_cn t1) t2 where rn=1

sort by cleandate,etldate;

上述代码只把order by改为了sort by。

再次执行,在Spark UI上查看执行过程

Task数量变多了,并且每个Task的数量分布很均衡:

picture.image

整个跑批时间缩短至不到4分钟:

picture.image

解决方案二:

既然是排序操作导致的,那就不要排序了,但是如果不要排序,就不能采用动态分区,将出现数据倾斜的分区采用静态分区方式导入,代码如下:


insert overwrite table hive_prod.dwd_xml.dwd_xml_order_cn

partition (cleandate='20221213',etldate='20221205')

select 

pid

,app_date_o

,app_date_s

,app_docnumber_o

,app_docnumber_s

,app_number

,filename

...

,from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:dd:ss'as update_time

,cleandate

,etldate

from (

select t1.*

,row_number() over(partition by pid order by etldate desc,filename descas rn

from hive_prod.ods_xml.ods_xml_order_cn t1) t2 where rn=1

and cleandate='20221213' and  etldate = '20221205' 

其余分区可以采用动态分区。

6. 总结

对于数据倾斜的问题,在大数据中是很容易出现的,数据倾斜其实就是部分数据分区的数据量远大于其他分区,导致计算任务无法充分利用集群资源,从而影响严重整体性能。

对于数据倾斜问题,结合上述案例做一个总结:

首先第一步就是要进行原因分析

  1. 数据分布不均匀:数据本身存在不均匀的特点,例如某些键值对的数量远远超过其他键值对。

  2. 数据倾斜引起的操作:某些操作(例如groupByKey、reduceByKey等)可能会导致数据倾斜,特别是在数据经过多次shuffle的情况下。

第二步就是找解决方案

在预处理阶段:

  1. 均匀分布数据:可以通过一些预处理方法来尽量使数据分布均匀,例如使用salting技术给键添加随机前缀。

  2. 数据重分区:通过重新分区数据,将数据均匀地分布到多个分区中,减少数据倾斜的可能性。

在运行时阶段:

  1. 增加分区数:通过增加分区数来提高并行度,从而减轻数据倾斜的影响。

  2. 使用聚合操作代替groupByKey:groupByKey操作容易导致数据倾斜,可以尝试使用聚合操作(如reduceByKey、combineByKey)来替代。

  3. 使用自定义分区器:根据数据的特点,编写自定义分区器,将数据均匀地分布到多个分区中。

  4. 增加缓存:对于一些频繁使用的数据,可以将其缓存到内存中,减少重复计算和shuffle操作。

第三步就是监控与调优

  1. 监控任务进度:实时监控任务的进度,发现倾斜问题及时采取相应措施。

  2. 数据采样与分析:对倾斜的数据进行采样和分析,找出导致倾斜的原因,有针对性地解决问题。

  3. 动态调整资源:根据倾斜程度,动态调整集群资源分配,增加倾斜数据所在分区的计算资源。

本文首发于 InfoQ 写作平台,原文链接:在大数据量中Spark数据倾斜问题定位排查及解决_大数据_五分钟学大数据_InfoQ写作社区

0
0
0
0
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论