1. 开篇
2023年即将过去,又到了一年一度的技术总结时刻,在这一年,参与了多个大数据项目的开发建设工作,也参与了几个数仓项目的治理优化工作,在这么多的项目中,让我印象比较深刻的就是在使用Spark引擎执行任务出现的报错现象,接下来就回顾复盘下这次任务报错现象及具体的解决方案。
2. 问题描述
因为现在大多数的批量任务都是使用Spark去执行,所以Spark的地位在公司是举足轻重,那么对于Spark的深入理解和优化显得尤为重要,部门人员都在深入学习Spark的执行过程,底层原理等,以期待遇到问题之后能够快速解决。
下面对于某次Spark任务执行过程中报错原因描述。
目前公司DWD层及之后的表都是Iceberg表,因为我们的业务特性,需要对数据进行行级更新和删除,传统的Hive表不支持行级数据操作,粒度都是表级的,如果采用传统Hive表形式,每次对数据进行更新的成本是非常高的,需要全表数据参与,后面经过调研,发现Iceberg是支持行级更新,并且和Spark结合的比较好,经过测试之后发现没有问题,后面数仓整体就迁到了Iceberg中。
这次任务的执行语句描述:将ODS层的表按照主键去重后插入到DWD层中,表为分区表,DWD层表格式是iceberg格式。
insert overwrite table hive_prod.dwd_xml.dwd_xml_order_cn
select
pid
,app_date_o
,app_date_s
,app_docnumber_o
,app_docnumber_s
,app_number
,filename
...
,from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:dd:ss') as update_time
,cleandate
,etldate
from (
select t1.*
,row_number() over(partition by pid order by etldate desc,filename desc) as rn
from hive_prod.ods_xml.ods_xml_order_cn t1) t2 where rn=1
order by cleandate,etldate;
iceberg格式的表可以不显示的指定表分区字段,但是要求在写入分区表之前根据每个任务(Spark 分区)的分区规范对分区字段进行排序,上述sql中cleandate,etldate是分区字段。
等待几分钟,报错:
查看Spark UI:
发现task id是445的任务处理的数据量远远大于其他的任务。考虑到时数据倾斜问题。
查看此任务的日志:
出现内存溢出。
多次测试上述sql,在集群资源空闲很多时,偶尔可以执行成功,但是执行时间超过25分钟。
3. 分析推断
初步分析Spark的每个task任务处理的数量和每个分区的数量有关。
以下是统计的表中每个分区的数据量:
下图是Spark处理时的task任务:
发现:表中的分区数量和Task任务数量是对应的,也就是一个分区Spark只起了一个Task任务。
表中的分区数据分布不均匀,20221213这个分区的数量是8000多万,一个Task处理肯定会出现数据倾斜。
但即使分区数据分布均衡,但是每个分区数据量很大也会有问题,假设表就10个分区,如果每个分区数据量都是1亿数据,那么最终一个Task处理1亿数据,还是会有内存溢出风险。
4. 调查原因
点击进入Spark UI界面,找到SQL一栏,进入我们执行的SQL语句中:
调查发现,在Spark中执行iceberg分区表时,如果有全局排序操作,那么会使得同一分区的数据进入到一个task任务中进行排序,如果某个分区数据量比较大,就会导致任务执行非常慢,或者报错,如下图,溢写到磁盘中的数据已经43.9G了。
5. 解决方案
只要找到原因,解决就比较简单了
解决方案一:
还是采用这种动态分区方式,但将全局排序order by改为局部排序sort by,因为iceberg不要求全局排序,只保证每个reduce内的数据有序即可。
改造代码如下:
insert overwrite table hive_prod.dwd_xml.dwd_xml_order_cn
select
pid
,app_date_o
,app_date_s
,app_docnumber_o
,app_docnumber_s
,app_number
,filename
...
,from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:dd:ss') as update_time
,cleandate
,etldate
from (
select t1.*
,row_number() over(partition by pid order by etldate desc,filename desc) as rn
from hive_prod.ods_xml.ods_xml_order_cn t1) t2 where rn=1
sort by cleandate,etldate;
上述代码只把order by改为了sort by。
再次执行,在Spark UI上查看执行过程
Task数量变多了,并且每个Task的数量分布很均衡:
整个跑批时间缩短至不到4分钟:
解决方案二:
既然是排序操作导致的,那就不要排序了,但是如果不要排序,就不能采用动态分区,将出现数据倾斜的分区采用静态分区方式导入,代码如下:
insert overwrite table hive_prod.dwd_xml.dwd_xml_order_cn
partition (cleandate='20221213',etldate='20221205')
select
pid
,app_date_o
,app_date_s
,app_docnumber_o
,app_docnumber_s
,app_number
,filename
...
,from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:dd:ss') as update_time
,cleandate
,etldate
from (
select t1.*
,row_number() over(partition by pid order by etldate desc,filename desc) as rn
from hive_prod.ods_xml.ods_xml_order_cn t1) t2 where rn=1
and cleandate='20221213' and etldate = '20221205'
其余分区可以采用动态分区。
6. 总结
对于数据倾斜的问题,在大数据中是很容易出现的,数据倾斜其实就是部分数据分区的数据量远大于其他分区,导致计算任务无法充分利用集群资源,从而影响严重整体性能。
对于数据倾斜问题,结合上述案例做一个总结:
首先第一步就是要进行原因分析:
-
数据分布不均匀:数据本身存在不均匀的特点,例如某些键值对的数量远远超过其他键值对。
-
数据倾斜引起的操作:某些操作(例如groupByKey、reduceByKey等)可能会导致数据倾斜,特别是在数据经过多次shuffle的情况下。
第二步就是找解决方案:
在预处理阶段:
-
均匀分布数据:可以通过一些预处理方法来尽量使数据分布均匀,例如使用salting技术给键添加随机前缀。
-
数据重分区:通过重新分区数据,将数据均匀地分布到多个分区中,减少数据倾斜的可能性。
在运行时阶段:
-
增加分区数:通过增加分区数来提高并行度,从而减轻数据倾斜的影响。
-
使用聚合操作代替groupByKey:groupByKey操作容易导致数据倾斜,可以尝试使用聚合操作(如reduceByKey、combineByKey)来替代。
-
使用自定义分区器:根据数据的特点,编写自定义分区器,将数据均匀地分布到多个分区中。
-
增加缓存:对于一些频繁使用的数据,可以将其缓存到内存中,减少重复计算和shuffle操作。
第三步就是监控与调优:
-
监控任务进度:实时监控任务的进度,发现倾斜问题及时采取相应措施。
-
数据采样与分析:对倾斜的数据进行采样和分析,找出导致倾斜的原因,有针对性地解决问题。
-
动态调整资源:根据倾斜程度,动态调整集群资源分配,增加倾斜数据所在分区的计算资源。
本文首发于 InfoQ 写作平台,原文链接:在大数据量中Spark数据倾斜问题定位排查及解决_大数据_五分钟学大数据_InfoQ写作社区