Hudi / HBase / Hadoop 3 version compatibility issue

Environment:

Hudi: 0.14
Spark: 3.4
HBase: 2.4.9
Hadoop: 3

When creating a Hudi table and inserting data into HDFS, the following error occurred:

Caused by: java.lang.NoSuchMethodError: org.apache.hadoop.hdfs.client.HdfsDataInputStream.getReadStatistics()Lorg/apache/hadoop/hdfs/DFSInputStream$ReadStatistics;

At first glance this looks like a version-compatibility problem, and other users have run into it before (see the reference at the end): the HBase that Hudi bundles is built against Hadoop 2, and Hadoop 3 changed the return type of HdfsDataInputStream.getReadStatistics(), so the Hadoop-2-compiled code fails with a NoSuchMethodError on a Hadoop 3 cluster. The fix is to rebuild HBase (and then Hudi) from source against Hadoop 3.
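To see that the bundle really carries its own HBase classes, a quick sanity check (a minimal sketch, assuming the bundle jar name used in the Spark test below) is to list its contents:

# List HBase classes packed inside the Hudi Spark bundle; these were compiled
# against Hadoop 2, which is why they break on the Hadoop 3 HDFS client API.
jar tf hudi-spark3.4-bundle_2.12-0.14.1.jar | grep -i hbase | head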

The following sections walk through recompiling and repackaging.

Download the Hudi source

git clone https://github.com/apache/hudi.git
cd hudi
view pom.xml

The dependency versions in use:

<hadoop.version>2.10.2</hadoop.version>
<hbase.version>2.4.9</hbase.version>
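To double-check which HBase/Hadoop artifacts a module actually resolves, Maven's dependency tree helps (a sketch; hudi-common is just one example module, and resolving sibling modules may require a prior install of the tree):

# Print the resolved HBase/Hadoop artifacts for the hudi-common module.
mvn dependency:tree -pl hudi-common | grep -Ei 'hbase|hadoop'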

Download the HBase source and recompile

Rebuild and install into the local Maven repository:

git clone https://github.com/apache/hbase
cd hbase
git checkout rel/2.4.9

mvn clean install -Denforcer.skip -DskipTests -Dhadoop.profile=3.0 -Psite-install-step
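If the install succeeded, the rebuilt artifacts now sit in the local Maven repository (assuming the default ~/.m2 location), where the Hudi build below will pick them up instead of the Maven Central binaries:

# The Hadoop-3 build of HBase 2.4.9 should now be installed locally.
ls ~/.m2/repository/org/apache/hbase/hbase-server/2.4.9/
ls ~/.m2/repository/org/apache/hbase/hbase-client/2.4.9/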

Recompile Hudi

mvn clean package -Dflink1.17 -Dspark3.4 -DskipTests

Compilation error:

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile (default-compile) on project hudi-common: Compilation failure: Compilation failure: 
[ERROR] ./hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:[261,9] no suitable method found for collect(java.util.stream.Collector<org.apache.hudi.common.model.HoodieColumnRangeMetadata<java.lang.Comparable>,capture#1 of ?,java.util.Map<java.lang.String,org.apache.hudi.common.model.HoodieColumnRangeMetadata<java.lang.Comparable>>>)

This error is caused by compiling with a Java version newer than 1.8; switch the build back to JDK 1.8 and it goes away.

Note: it is best to build HBase with JDK 1.8 as well, otherwise you may hit errors like:
java.lang.NoSuchMethodError: java.nio.ByteBuffer.position(I)Ljava/nio/ByteBuffer;
(classes compiled on JDK 9+ pick up the covariant ByteBuffer return types, which do not exist on a Java 8 runtime).
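One way to pin both builds to JDK 8 (a sketch; the JAVA_HOME path is an example, point it at your own JDK 8 install):

export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64   # example path
export PATH=$JAVA_HOME/bin:$PATH
mvn -version    # the reported Java version should be 1.8.x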

Once the build and package step finishes, you get hudi-spark3.4-bundle_2.12-0.14.1.jar, which can be copied and used directly.
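The bundle is usually produced under the packaging module; the path below is what a 0.14.x build typically yields, so adjust it to your tree (/foo/ is the placeholder directory used in the spark-shell command below):

ls packaging/hudi-spark-bundle/target/hudi-spark3.4-bundle_2.12-*.jar
cp packaging/hudi-spark-bundle/target/hudi-spark3.4-bundle_2.12-*.jar /foo/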

Spark test

Start the Spark shell (the --jars option passes in the Hudi bundle we rebuilt above):

export SPARK_VERSION=3.4
bin/spark-shell --master foo --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar' --jars /foo/hudi-spark3.4-bundle_2.12-0.14.1.jar

Insert data:

import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import spark.implicits._

val tableName = "trips_table"
val basePath = "hdfs:///foo/trips_table"

val columns = Seq("ts","uuid","rider","driver","fare","city")
val data =
  Seq((1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
      (1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70,"san_francisco"),
      (1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90,"san_francisco"),
      (1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"),
      (1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"))

var inserts = spark.createDataFrame(data).toDF(columns:_*)
inserts.write.format("hudi").
option(PARTITIONPATH_FIELD_NAME.key(), "city").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)

Query the data:

val tripsDF = spark.read.format("hudi").load(basePath)
tripsDF.createOrReplaceTempView("trips_table")
spark.sql("SELECT uuid, fare, ts, rider, driver, city FROM trips_table WHERE fare > 20.0").show()

+--------------------+-----+-------------+-------+--------+-------------+
| uuid| fare| ts| rider| driver| city|
+--------------------+-----+-------------+-------+--------+-------------+
|9909a8b1-2d15-4d3...| 33.9|1695046462179|rider-D|driver-L|san_francisco|
|e96c4396-3fad-413...| 27.7|1695091554788|rider-C|driver-M|san_francisco|
|e3cf430c-889d-401...|34.15|1695516137016|rider-F|driver-P| sao_paulo|
+--------------------+-----+-------------+-------+--------+-------------+

spark.sql("SELECT _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare FROM trips_table").show()

OK, Hudi now writes to Hadoop 3 HDFS successfully.

References

https://www.jianshu.com/p/9622ba2509a2