【Spark】 aggregate 和 treeAggregate 的区别

aggregate()


def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
// Clone the zero value since we will also be serializing it as part of tasks
var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
val cleanSeqOp = sc.clean(seqOp)
val cleanCombOp = sc.clean(combOp)
val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
sc.runJob(this, aggregatePartition, mergeResult)
jobResult
}

有 3 个参数:
zeroValue 是初始值.
seqOp 是在一个 partition 中进行 accumulate results 的函数.
combOp 是用来 combine results 的函数.

如上可知 :
aggregatePartition 是在每个 RDD 的 partition 数据上 apply 的函数.
跟踪 combOp , 它被封装为 mergeResult 函数作为 resultHandler 传入 runJob() ,
可就是说 aggregate() 最终是在 Driver 端合并所有分区的 results.

treeAggregate()

再来看 treeAggregate():

def treeAggregate[U: ClassTag](zeroValue: U)(
seqOp: (U, T) => U,
combOp: (U, U) => U,
depth: Int = 2): U = withScope {
require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
if (partitions.length == 0) {
Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
} else {
val cleanSeqOp = context.clean(seqOp)
val cleanCombOp = context.clean(combOp)
val aggregatePartition =
(it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
var partiallyAggregated: RDD[U] = mapPartitions(it => Iterator(aggregatePartition(it)))
var numPartitions = partiallyAggregated.partitions.length
val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
// If creating an extra level doesn't help reduce
// the wall-clock time, we stop tree aggregation.

// Don't trigger TreeAggregation when it doesn't save wall-clock time
while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
numPartitions /= scale
val curNumPartitions = numPartitions
partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
(i, iter) => iter.map((i % curNumPartitions, _))
}.foldByKey(zeroValue, new HashPartitioner(curNumPartitions))(cleanCombOp).values
}
val copiedZeroValue = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
partiallyAggregated.fold(copiedZeroValue)(cleanCombOp)
}
}

var partiallyAggregated: RDD[U] = mapPartitions(it => Iterator(aggregatePartition(it)))
进行第一次聚合, 由 partiallyAggregated 的变量名也可以知道,
这是部分聚合的结果, 接下来还会进行多次聚合.

变量 depth 是合并树的深度 , 默认是 2 , 根据 depth 计算 scale, 得到需要循环的次数:

val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)

接下来进入循环中, 每次会 combine 当前所在树层级的分区 Aggregate 结果, 并缩小下一层的分区数目 :

while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
numPartitions /= scale
val curNumPartitions = numPartitions
partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
(i, iter) => iter.map((i % curNumPartitions, _))
}.foldByKey(zeroValue, new HashPartitioner(curNumPartitions))(cleanCombOp).values
}

最后在树的根结点, 得到最终的聚合结果 :

partiallyAggregated.fold(copiedZeroValue)(cleanCombOp)

fold() 中会触发 runJob().

区别

对比上面的 aggregate() 和 treeAggregate() 可以发现,
由于 treeAggregate() 可以在最终的 combine 前, 进行一定次数的提前合并,
因此可以避免所有初始分区的结果直接发送的 Driver 机器上.

如下图所示 (图片来源) :

参考

https://umbertogriffo.gitbook.io/apache-spark-best-practices-and-tuning/rdd/treereduce_and_treeaggregate_demystified