数据倾斜是 Spark 任务中要处理的常见问题, 其对应的解决方案总结了一下.
快速尝试 提高并行度
假设发现并行的 task 或 partition 数目较小, 可以直接修改 并行度,
试图降低每个 task 处理的数据量, 缓解数据倾斜和加速计算, 快速测试任务结果.
但是, 如果数据倾斜发生在某个 key 上时, 这个方法是无用的, 需要使用其他方法.
这个方法只是处理问题时变动最小的方法, 只能够缓解和减小数据倾斜的影响.
预处理上游 Hive 表数据
假设 Spark 读取的上游 Hive 表存在数据倾斜, 并在进行数据的 shuffle 操作时耗费了许多时间.
那么可以考虑在 Hive 表产出时进行预处理
, 提前进行 shuffle 操作,
这样只需要处理一次, 下游用户的任务不需要再进行 shuffle 操作, 避免了重复计算和不必要的等待时间.
虽然但这不是根本的解决方法, 因为预处理时仍然会有数据倾斜, 但是在数据链路和架构上来说是优化,
是更加优雅与合理的, Spark 等等下游任务不需要重复浪费这个计算时间, 而且后续只需要解决这个预处理程序的数据倾斜即可.
可以直接处理的 少数导致倾斜的 key
假设在少数 key 上存在数据倾斜, 例如这些 key 上都对应着成百上千万条的记录,
比如像 null、-1、0、等等一些异常的日志 key, 这些如果在当前业务上没有意义,
那么可以直接在 where 或 filter 中过滤掉, 不参与后续的计算, 自然这些数据倾斜就不会产生了.
key 增加前缀 + 数据膨胀
现实中数据倾斜发生的 key 往往不会仅仅发生在无用的 key 上, 因此上面的方法是不够用的.
这些 key 可能是有用的, 是与当前业务计算相关的, 比如热门商品 key 、热门视频 key 等等,
这些 key 注定了其数据就是会比其他 key 多出许多, 意味着我们需要处理这些数据倾斜的 key.
一个常用方法是: 对 key 增加随机前缀, 搭配数据膨胀的方法, 使得倾斜数据得以切分.
这里存在几种情况:
- 聚合类的 shuffle, 这种情况下只需要对 key 加上随机前缀,
先进行第一阶段的局部聚合, 然后(去掉前缀)进行第二阶段的全局聚合. - join 类型的 shuffle, 这种情况则需要进行数据膨胀了.
join 类型的 shuffle 又存在几种情况:
- 如果倾斜的 key 是少数, 那么可以考虑先进行采样, 得到这些 key 后, 对这些 key 特殊处理,
首先加上一个 n 以内的随机前缀, 再对 join 的另一侧的数据膨胀 n 倍,
依此加上 0 到 (n-1) 的前缀, 进行数据膨胀后再 join. - 如果倾斜的 key 比较多, 此时只能统一处理了.
这个方法可以根本解决数据倾斜问题, 虽然增加了数据量, 但是最终的任务时间得以大大的缩短.
map join 小表
还有一种情况, 可以避免数据倾斜, 那就是和 小表 join 的时候.
当 join 操作的其中一个表或 RDD 数据量比较小时(比如一两GB), 那么可以
使用 Broadcast 变量和 map() 算子实现 join 操作, 从而避免掉 shuffle 操作.
这是一种很实用的方法, 我之前也在工作中使用到, 小表往往是维表类型.