Spark implicits - 【Case Class Encoder】

Case Class Encoder

编写 Spark Job, 在从旧的 RDD API 到 DataSet API 的过程中,
必然碰到 Encoder 的问题, 比如使用 case class Person(name: String, age: Long) :

import spark.implicits._
val ds = spark.read.json(Constants.FP_EMPLOYEE).as[Person]

我们需要 import spark.implicits._, 才能编译通过,
因为 Dataset 需要 Person 类的 Encoder :

def as[U : Encoder]: Dataset[U] = Dataset[U](sparkSession, logicalPlan)

首先看 spark.implicits, 它是 SQLImplicits 中, 其成员中有一个 implicit 转换 :

implicit def newProductEncoder[T <: Product : TypeTag]: Encoder[T] = Encoders.product[T]

因为 case class 默认继承了 Product, TypeTag 是用来解决 runtime Type Erasure 问题 的.
主要看 Encoders.product[T], 它的定义对应的是 ExpressionEncoder() 实例, 如下 :

def product[T <: Product : TypeTag]: Encoder[T] = ExpressionEncoder()

因此 Spark 通过 Scala 的 implicits 转换 获得了自动提供的 Encoder,
自定义的 Case Class 得到了对应的 Encoder, 我们 import spark.implicits._ 就是这样发挥作用.

参考:
https://stackoverflow.com/questions/12218641/what-is-a-typetag-and-how-do-i-use-it