Writing a Fixed Number of Files per Hive Partition, and a Pitfall

When writing dynamic partitions, Hive frequently produces large numbers of small files.

The usual recipe for controlling this is:

  1. Use distribute by to control how rows are routed to the reducers that write the files.
  2. To avoid data skew, distribute by is given a rand() random value.
  3. To control the file count, multiply that rand() by the desired number of files and cast the result to an integer.
  4. rand() has a subtle problem that can silently lose data; see the last section.
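The recipe above can be sketched as a small simulation (Python here rather than HiveQL, purely as an illustration; the row count and seed are mine):

```python
import random

# A minimal simulation of the recipe above: each row gets a key
# cast(rand() * num_files AS INT); rows with the same key hash to the
# same reducer, and each reducer writes one file per partition it sees.
def assign_buckets(num_rows, num_files, rng):
    """Return the distribute-by key for each row."""
    return [int(rng.random() * num_files) for _ in range(num_rows)]

rng = random.Random(42)  # seeded, mirroring rand(seed); see the last section
buckets = assign_buckets(100_000, 5, rng)

# Only 5 distinct keys exist, so at most 5 reducers receive data and
# each partition ends up with at most 5 files.
print(sorted(set(buckets)))  # -> [0, 1, 2, 3, 4]
```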

Walkthrough

Create a table:

CREATE TABLE test_1 (a BIGINT, b STRING)
PARTITIONED BY (part_1 STRING)
STORED AS PARQUET;

Generate some data:

set hive.exec.dynamic.partition.mode=nonstrict;

INSERT OVERWRITE TABLE test_1 PARTITION (part_1)
SELECT CAST(rand() * 3000000 AS BIGINT) AS a,
       CAST(rand() * 100 AS STRING) AS b,
       'data_input' AS part_1
FROM (SELECT 1) t
LATERAL VIEW posexplode(split(space(9000000), ' ')) e AS pos, val;
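The space/split/posexplode trick generates rows without needing a source table: space(n) builds a string of n spaces, splitting it on ' ' yields n + 1 empty fields, and posexplode turns each field into a row. Python's str.split(' ') follows the same counting rule, so the arithmetic can be sanity-checked outside Hive:

```python
# split(space(n), ' ') yields n + 1 elements (n spaces -> n + 1 empty
# fields), so space(9000000) in the query above produces 9,000,001 rows.
def hive_row_count(n):
    return len((' ' * n).split(' '))

print(hive_row_count(9))  # -> 10
print(hive_row_count(0))  # -> 1
```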

After estimating the total input size, you can set how much data (in bytes) each reducer should process:

set hive.exec.reducers.bytes.per.reducer=...;
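The resulting reducer count is, roughly, the input size divided by this setting, capped by hive.exec.reducers.max. The sketch below is only the back-of-the-envelope estimate you do before choosing the value; Hive's actual planner considers more inputs, and the cap of 1009 is an assumed default:

```python
import math

# Rough estimate of Hive's reduce-phase sizing: about one reducer per
# hive.exec.reducers.bytes.per.reducer bytes of input, capped by
# hive.exec.reducers.max (assumed 1009 here). Illustration only.
def estimate_reducers(input_bytes, bytes_per_reducer, max_reducers=1009):
    return min(max_reducers, max(1, math.ceil(input_bytes / bytes_per_reducer)))

# e.g. ~2 GB of input with 256 MB per reducer -> 8 reducers
print(estimate_reducers(2 * 1024**3, 256 * 1024**2))  # -> 8
```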

For the dynamic-partition insert, set the number of files per partition, for example 5:

INSERT OVERWRITE TABLE test_1 PARTITION (part_1)
SELECT a, b, CAST(CAST(rand() * 5 AS INT) AS STRING) AS part_1
FROM test_1
WHERE part_1 = 'data_input'
DISTRIBUTE BY CAST(rand() * 5 AS INT);

After the insert completes, you can see the number of files under each partition:

> hdfs dfs -ls /user/hive/warehouse/hive_niko_test.db/test_1/part_1=0 | awk '{print $1 " " $8;}' 
Found
-rw-r--r-- /user/hive/warehouse/hive_niko_test.db/test_1/part_1=0/000000_0
-rw-r--r-- /user/hive/warehouse/hive_niko_test.db/test_1/part_1=0/000001_0
-rw-r--r-- /user/hive/warehouse/hive_niko_test.db/test_1/part_1=0/000002_0
-rw-r--r-- /user/hive/warehouse/hive_niko_test.db/test_1/part_1=0/000003_0
-rw-r--r-- /user/hive/warehouse/hive_niko_test.db/test_1/part_1=0/000004_0

> hdfs dfs -ls /user/hive/warehouse/hive_niko_test.db/test_1/part_1=1 | awk '{print $1 " " $8;}'
Found
-rw-r--r-- /user/hive/warehouse/hive_niko_test.db/test_1/part_1=1/000000_0
-rw-r--r-- /user/hive/warehouse/hive_niko_test.db/test_1/part_1=1/000001_0
-rw-r--r-- /user/hive/warehouse/hive_niko_test.db/test_1/part_1=1/000002_0
-rw-r--r-- /user/hive/warehouse/hive_niko_test.db/test_1/part_1=1/000003_0
-rw-r--r-- /user/hive/warehouse/hive_niko_test.db/test_1/part_1=1/000004_0

...

At run time you can have Hive log the execution plan, for comparison:

set hive.log.explain.output=true;

  • Without distribute by:

    Stage-1 is a root stage [MAPRED]
    Stage-0 depends on stages: Stage-1 [MOVE]
    Stage-2 depends on stages: Stage-0 [STATS]
  • With distribute by:

    Stage-1 is a root stage [MAPRED] 
    Stage-0 depends on stages: Stage-1 [MOVE]
    Stage-3 depends on stages: Stage-1 [MAPRED]
    Stage-2 depends on stages: Stage-0, Stage-3 [STATS]

Note the extra Stage-3 that distribute by introduces.

The rand() data-loss pitfall

During a MapReduce job, intermediate tasks can fail, causing upstream tasks to be re-executed. On the retry, an unseeded rand() generates a different random sequence, so rows are routed to different downstream reducers than on the first attempt; reducers that have already fetched their input never see those rows, and data is silently lost. The fix is to call rand(int seed) with an explicit seed, e.g. distribute by cast(rand(123) * 5 as int), so that every re-execution of a task reproduces the same sequence.
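The failure mode can be illustrated with a small simulation (Python, not Hive's actual shuffle; the seed and row counts are mine): a re-executed map task must route every row to the same reducer as the failed attempt, and that only holds when the random source is seeded.

```python
import random

# Illustration of why rand() needs a seed: a re-executed map task must
# route each row to the same reducer key as the failed attempt, otherwise
# rows land on reducers that have already fetched their input -- and are
# lost. A seeded RNG makes the attempt reproducible.
def map_attempt(rows, num_buckets, seed=None):
    """One attempt of a map task: assign each row a distribute-by key."""
    rng = random.Random(seed) if seed is not None else random.Random()
    return [int(rng.random() * num_buckets) for _ in rows]

rows = list(range(20))

# Seeded, like rand(123): the retry reproduces the original routing.
first = map_attempt(rows, 5, seed=123)
retry = map_attempt(rows, 5, seed=123)
print(first == retry)  # -> True

# Unseeded, like rand(): the retry almost certainly routes rows differently.
print(map_attempt(rows, 5) == map_attempt(rows, 5))
```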