Hive 合并 input files

建设数据仓库时, 有些上游表是自己无法控制的, 可能是历史原因, 可能是其他业务组的表.
当这些上游表是由大量小文件组成时, 读取这些数据和启动对应的 mapper 数目会影响数据处理的效率.

打印相关参数:

set hive.input.format;
set mapreduce.input.fileinputformat.split.maxsize;
set mapreduce.input.fileinputformat.split.minsize;
set mapreduce.input.fileinputformat.split.minsize.per.node;
set mapreduce.input.fileinputformat.split.minsize.per.rack;
set hive.merge.mapfiles;
set hive.merge.mapredfiles;
set hive.merge.size.per.task;
set hive.merge.smallfiles.avgsize;

造数据

set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.reducers.bytes.per.reducer=512000;

CREATE TABLE test_1 (a bigint, b string)
partitioned by(part_1 string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

insert overwrite table test_1 partition(part_1)
SELECT CAST(rand() 30000000 AS bigint) AS a,
CAST(rand()
100 AS string) AS b,
'data_input' AS part_1
from (select 1) x
lateral view posexplode (split (space (90000),' ')) e as i,x
distribute by cast(rand()*5 as int);

写入后文件有 5 个, 每个文件 500KB 左右.

CombineHiveInputFormat

当不使用 CombineHiveInputFormat 时, 通过日志或 WebUI 可知,
Stage-1 的 number of splitsnumber of mappers 会有 5 个.

set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;

运行统计 Jobs :

select count(1) from (
select a, count(1) from test_1 where part_1='data_input' group by a
) a;

而切换到 CombineHiveInputFormat 之后, 重跑 SQL ,
可以发现 number of splitsnumber of mappers 降低到 5 个以下:

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

也就是其中有些文件被合并到 一个 inputSplit 中去了.

进一步合并

如果要再进一步合并, 可以设置 per.node 和 per.rack 上的 minsize,
让同一个 node 和同一个 rack 上的文件合并, 达到设置的 split 大小:

set mapreduce.input.fileinputformat.split.minsize.per.node=256000000;
set mapreduce.input.fileinputformat.split.minsize.per.rack=256000000;

重跑 SQL 后可以发现, number of splitsnumber of mappers 降低到 1 个.

这里用的 split 的大小是默认参数, 具体还可以加上以下两个参数进行控制:

set mapreduce.input.fileinputformat.split.maxsize;  
set mapreduce.input.fileinputformat.split.minsize;

了解这些以后, 设置合理的参数可以使 数据作业更稳定, 更好的应对不同的上游数据.

参考