Case Class Encoder
编写 Spark Job, 在从旧的 RDD API 到 DataSet API 的过程中,
必然碰到 Encoder 的问题, 比如使用 case class Person(name: String, age: Long)
:
import spark.implicits._ |
我们需要 import spark.implicits._
, 才能编译通过,
因为 Dataset
需要 Person 类的 Encoder :
def as[U : Encoder]: Dataset[U] = Dataset[U](sparkSession, logicalPlan) |
首先看 spark.implicits
, 它是 SQLImplicits
中, 其成员中有一个 implicit
转换 :
implicit def newProductEncoder[T <: Product : TypeTag]: Encoder[T] = Encoders.product[T] |
因为 case class
默认继承了 Product, TypeTag
是用来解决 runtime Type Erasure 问题 的.
主要看 Encoders.product[T]
, 它的定义对应的是 ExpressionEncoder()
实例, 如下 :
def product[T <: Product : TypeTag]: Encoder[T] = ExpressionEncoder() |
因此 Spark 通过 Scala 的 implicits 转换
获得了自动提供的 Encoder,
自定义的 Case Class 得到了对应的 Encoder, 我们 import spark.implicits._
就是这样发挥作用.
参考:
https://stackoverflow.com/questions/12218641/what-is-a-typetag-and-how-do-i-use-it