MR Engine: COUNT(DISTINCT) SQL Optimization

Data warehouse engineers do much of their day-to-day work in SQL, for example checking data statistics with GROUP BY and COUNT.
Sometimes, for convenience, a query is written directly as count(distinct foo_field). When the table is large, this causes problems.

Example


For example, counting the number of users:

SELECT 
COUNT(DISTINCT uid)
FROM log_table

On a large table, this query can take a very long time to return a result, or never finish at all.

Cause


The logs and the Job Server show that the job has only one stage, and that stage has only one reducer.

Running EXPLAIN on the query shows the same thing (only the key parts of the output are kept below):

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            Select Operator
              expressions: uid (type: bigint)
              outputColumnNames: _col0
              Group By Operator
                aggregations: count(DISTINCT _col0)
                keys: _col0 (type: bigint)
                mode: hash
                outputColumnNames: _col0, _col1
                Reduce Output Operator
                  key expressions: _col0 (type: bigint)
                  sort order: +
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(DISTINCT KEY._col0:0._col0)
          mode: mergepartial
          outputColumnNames: _col0
          File Output Operator

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

As the plan shows, apart from the final Fetch there is only one stage: the map side performs the partial count, and then a single reducer aggregates everything to produce the total count.
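That single-reducer data flow can be sketched with a toy simulation. This is plain Python with made-up names, purely an illustration of how the rows move, not Hive code:

```python
# Toy data-flow sketch of the one-stage COUNT(DISTINCT uid) plan.
# Function names, split logic, and mapper count are illustrative only.

def count_distinct_one_reducer(rows, n_mappers=4):
    # Map side: each mapper dedupes its own split (the analog of the
    # hash-mode Group By Operator that hive.map.aggr enables).
    mapper_outputs = [set(rows[m::n_mappers]) for m in range(n_mappers)]
    # Reduce side: with no GROUP BY key, every mapper's output is routed
    # to a SINGLE reducer, which must merge and dedupe the entire key
    # space by itself -- this is the bottleneck.
    reducer_input = set()
    for out in mapper_outputs:
        reducer_input |= out
    return len(reducer_input)

rows = [i % 1000 for i in range(10_000)]  # 10k rows, 1000 distinct uids
print(count_distinct_one_reducer(rows))   # -> 1000
```

No matter how many mappers run in parallel, the final dedup work lands on one process, which is why adding mappers does not help this query.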

Fix


SELECT 
COUNT(uid)
FROM (SELECT uid FROM log_table GROUP BY uid) a

Running EXPLAIN again shows that there is now one more stage:

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-2 depends on stages: Stage-1
  Stage-0 depends on stages: Stage-2

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            Select Operator
              expressions: uid (type: bigint)
              outputColumnNames: uid
              Group By Operator
                keys: uid (type: bigint)
                mode: hash
                outputColumnNames: _col0
                Reduce Output Operator
                  key expressions: _col0 (type: bigint)
                  sort order: +
                  Map-reduce partition columns: _col0 (type: bigint)
      Reduce Operator Tree:
        Group By Operator
          keys: KEY._col0 (type: bigint)
          mode: mergepartial
          outputColumnNames: _col0
          Group By Operator
            aggregations: count(_col0)
            mode: hash
            outputColumnNames: _col0
            File Output Operator

  Stage: Stage-2
    Map Reduce
      Map Operator Tree:
          TableScan
            Reduce Output Operator
              sort order:
              value expressions: _col0 (type: bigint)
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0)
          mode: mergepartial
          outputColumnNames: _col0
          File Output Operator

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

As the plan shows, because hive.map.aggr defaults to true, the mappers perform a hash-based partial GROUP BY on uid; the reducers merge those partials and begin counting; a second MapReduce job then merges the partial counts into the final total.

Because the data is grouped first, the first job can use many reducers; it is no longer one reducer facing the output of all those mappers. Especially when uid has a wide value range and the data volume is large, this version runs faster than count(distinct), even though it adds an extra stage and some network transfer.
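The two-job plan can be sketched the same way. Again this is a toy Python simulation with made-up names, and the reducer count is illustrative:

```python
# Toy data-flow sketch of the rewritten two-job plan.
from collections import defaultdict

def stage1_group_by_uid(rows, n_reducers=8):
    # Job 1: GROUP BY uid. Rows are hash-partitioned on uid across many
    # reducers (the "Map-reduce partition columns" line in the plan);
    # each reducer dedupes only its own shard and emits one partial count.
    shards = defaultdict(set)
    for uid in rows:
        shards[hash(uid) % n_reducers].add(uid)
    return [len(shard) for shard in shards.values()]

def stage2_count(partial_counts):
    # Job 2: COUNT over the tiny Job-1 output. A single reducer is fine
    # here because its input is just one number per Job-1 reducer.
    return sum(partial_counts)

rows = [i % 1000 for i in range(10_000)]
print(stage2_count(stage1_group_by_uid(rows)))  # -> 1000
```

The expensive dedup work is spread across the Job-1 reducers, and the serial portion that remains (summing a handful of partial counts) is trivially small.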

Note that the Tez engine does optimize count(distinct), and newer Hive versions can enable a count(distinct) rewrite as well (HIVE-16654; the hive.optimize.countdistinct property). Whether the manual rewrite is needed depends on the Hive version in use, the execution plan, and the job that actually runs on the Job Server.

References