今天以 LogisticRegression 为例, 梳理一下 Spark ML 的主体流程:
val lr = new LogisticRegression() |
fit(dataset: Dataset[_]) 里将会调用 train(), 核心的计算过程就在 train() 里.
train() 主体
数据输入
首先是转换格式:
val instances: RDD[Instance] = |
这里默认使用 StorageLevel.MEMORY_AND_DISK
输入数据格式: col($(labelCol)), w, col($(featuresCol))
转换成 Instance(label, weight, features)
,
label 是训练数据的标签, 比如: 0
, 1
等等,
features 是 SparseVector, 比如数据格式 :
indices : |
costFun
构造好 instances
RDD 后, 将定义 costFun
:
val getAggregatorFunc = new LogisticAggregator(bcFeaturesStd, numClasses, $(fitIntercept), |
costFun 是机器学习的核心, 训练的目的就是为了是 costFun 的代价达到最优.
迭代和收敛
有了 costFun 后, 剩下的就是训练迭代 :
val states = optimizer.iterations(new CachedDiffFunction(costFun), ...) |
optimizer
这里是 OWLQN
类, 父类: OWLQN -> LBFGS -> FirstOrderMinimizer,
optimizer.iterations() 对应 FirstOrderMinimizer.iterations() :
def iterations(f: DF, init: T): Iterator[State] = { |
这里将一直迭代到收敛条件为止, convergenceCheck 进行收敛检查,
实例是 SequenceConvergenceCheck, 由于我们设置了 maxIter
的条件,
所以 OWLQN
里边初始化时设置了基于 maxIter
的 ConvergenceCheck
,
通过 FirstOrderMinimizer.defaultConvergenceCheck(maxIter, tolerance)
.
其中的细节
上面说了主要流程, 其中有一些细节.
RDDLossFunction
org.apache.spark.ml.optim.loss.RDDLossFunction
RDDLossFunction.calculate()
override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = { |
BDV 是 DenseVector, import breeze.linalg.{DenseVector => BDV}
treeAggregate() 是 RDD.treeAggregate()
getAggregator 是 LogisticAggregator, LogisticAggregator 接收 x 并计算梯度和 Loss , 并且两个 LogisticAggregator 可以进行 merge, 在 RDD.treeAggregate() 中使用
Agg 是 LogisticAggregator
regularization 是 L2Regularization
RDD.treeAggregate()
def treeAggregate[U: ClassTag](zeroValue: U)( |
这是一个关键的聚合函数, 在 Spark ML 中被大量使用, 也是涉及到 RDD 的分布式计算部分, treeAggregate() 相比 RDD.aggregate() 可以降低 Driver 压力和提高性能, 通过在 Executor 端的提前合并和统计, 它有一个 depth 参数可以控制这个聚合操作树的深度 (默认为2).