Spark ML 主要计算过程 (LR)

今天以 LogisticRegression 为例, 梳理一下 Spark ML 的主体流程:

val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8)
val lrModel = lr.fit(training)

fit(dataset: Dataset[_]) 里将会调用 train(), 核心的计算过程就在 train() 里.

train() 主体

数据输入

首先是转换格式:

   val instances: RDD[Instance] =
dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd.map {
case Row(label: Double, weight: Double, features: Vector) =>
Instance(label, weight, features)
}

instances.persist(StorageLevel.MEMORY_AND_DISK)

这里默认使用 StorageLevel.MEMORY_AND_DISK

输入数据格式: col($(labelCol)), w, col($(featuresCol))
转换成 Instance(label, weight, features),

label 是训练数据的标签, 比如: 0, 1 等等,

features 是 SparseVector, 比如数据格式 :

indices :
[127, 128, ..]
values :
[51.0, 159.0, ..]

costFun

构造好 instances RDD 后, 将定义 costFun :

val getAggregatorFunc = new LogisticAggregator(bcFeaturesStd, numClasses, $(fitIntercept),
multinomial = isMultinomial)(_)

val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization,
$(aggregationDepth))

costFun 是机器学习的核心, 训练的目的就是为了是 costFun 的代价达到最优.

迭代和收敛

有了 costFun 后, 剩下的就是训练迭代 :

val states = optimizer.iterations(new CachedDiffFunction(costFun), ...)

optimizer 这里是 OWLQN 类, 父类: OWLQN -> LBFGS -> FirstOrderMinimizer,

optimizer.iterations() 对应 FirstOrderMinimizer.iterations() :

def iterations(f: DF, init: T): Iterator[State] = {
val adjustedFun = adjustFunction(f)
infiniteIterations(f, initialState(adjustedFun, init)).takeUpToWhere{s =>
convergenceCheck.apply(s, s.convergenceInfo) match {
case Some(converged) =>
logger.info(s"Converged because ${converged.reason}")
true
case None =>
false
}
}
}

这里将一直迭代到收敛条件为止, convergenceCheck 进行收敛检查,
实例是 SequenceConvergenceCheck, 由于我们设置了 maxIter 的条件,
所以 OWLQN 里边初始化时设置了基于 maxIterConvergenceCheck,
通过 FirstOrderMinimizer.defaultConvergenceCheck(maxIter, tolerance).

其中的细节

上面说了主要流程, 其中有一些细节.

RDDLossFunction

org.apache.spark.ml.optim.loss.RDDLossFunction
RDDLossFunction.calculate()

override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
val bcCoefficients = instances.context.broadcast(Vectors.fromBreeze(coefficients))
val thisAgg = getAggregator(bcCoefficients)
val seqOp = (agg: Agg, x: T) => agg.add(x)
val combOp = (agg1: Agg, agg2: Agg) => agg1.merge(agg2)
val newAgg = instances.treeAggregate(thisAgg)(seqOp, combOp, aggregationDepth)
val gradient = newAgg.gradient
val regLoss = regularization.map { regFun =>
val (regLoss, regGradient) = regFun.calculate(Vectors.fromBreeze(coefficients))
BLAS.axpy(1.0, regGradient, gradient)
regLoss
}.getOrElse(0.0)
bcCoefficients.destroy(blocking = false)
(newAgg.loss + regLoss, gradient.asBreeze.toDenseVector)
}

BDV 是 DenseVector, import breeze.linalg.{DenseVector => BDV}
treeAggregate() 是 RDD.treeAggregate()
getAggregator 是 LogisticAggregator, LogisticAggregator 接收 x 并计算梯度和 Loss , 并且两个 LogisticAggregator 可以进行 merge, 在 RDD.treeAggregate() 中使用
Agg 是 LogisticAggregator
regularization 是 L2Regularization

RDD.treeAggregate()

def treeAggregate[U: ClassTag](zeroValue: U)(
seqOp: (U, T) => U,
combOp: (U, U) => U,
depth: Int = 2): U

这是一个关键的聚合函数, 在 Spark ML 中被大量使用, 也是涉及到 RDD 的分布式计算部分, treeAggregate() 相比 RDD.aggregate() 可以降低 Driver 压力和提高性能, 通过在 Executor 端的提前合并和统计, 它有一个 depth 参数可以控制这个聚合操作树的深度 (默认为2).