GraphX 图并行计算 (三)

介绍完前面的图计算, 终于到了 GraphX, 那么它就如何实现的呢 ?

Graph 对象

假设有这么一个图, 描述的是大学的人际关系:

vertexTable 的数据我们用一个 RDD vertices 来保存, edgeTable 的数据我们同样也用一个 RDD edges 来保存, 这两个 RDD 可以从外部存储(如HDFS)加载.
有了这两个 RDD, 我们就可以构造一个 Graph 对象 :

val graph = Graph(vertices, edges)

这个 Graph 就是 GraphX 用来描述图的类, 定义如下:

class Graph[VD, ED] {
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED]
val triplets: RDD[EdgeTriplet[VD, ED]]
...
}

可以看到, 除了 vertices 和 edges, 还有一个 triplets, 这个 triplet 可以理解为是携带了 Vertex 数据的 Edge, 如下图:

所以在源码里 triplets 的实现定义是 lazy val triplets ... (GraphImpl#triplets), 用户只需提供 vertices 和 edges 即可.

需要说一下, GraphX 使用的是 VertexCut, 支持自定义 Partitioner.

Graph Operators

有个 graph 对象, 那就可以对这个图进行各种操作了.
graph 的 operators 支持这些:
https://spark.apache.org/docs/2.2.2/graphx-programming-guide.html#summary-list-of-operators

比较核心的是 aggregateMessages() :

def aggregateMessages[Msg: ClassTag](
sendMsg: EdgeContext[VD, ED, Msg] => Unit,
mergeMsg: (Msg, Msg) => Msg,
tripletFields: TripletFields = TripletFields.All)
: VertexRDD[A]

通过这个函数我们可以自定义 edge 的 sendMsg 函数和目标顶点的 mergeMsg 函数来聚合每个顶点的邻居顶点和边, 并返回得到一个 VertexRDD , 这个 VertexRDD 包括了发往各个顶点的聚合消息, 需要注意的是: 未收到消息的顶点不包括在其中.

还有一个比较重要的 API 就是 pregel().

def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId,A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED]

Pregel 的算法我们之前已经介绍过了, 这里 GraphX 提供了一个 Pregel API, 可谓非常实用. 我们用单源最短路径 sssp 的计算例子来感受一下, 只需定义几个简单函数完成任务 :

val sssp = initialGraph.pregel(Double.PositiveInfinity)(
(id, dist, newDist) => math.min(dist, newDist), // Vertex Program
triplet => { // Send Message
if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
} else {
Iterator.empty
}
},
(a,b) => math.min(a,b) // Merge Message
)
sssp.vertices.collect...

首先 Double.PositiveInfinity 作为初始信息 defaultMessage 传入各个顶点, 第一次迭代时, vprog 会在所有顶点上执行并以 defaultMessage 作为输入信息, 之后只会在受到信息的顶点上执行并计算新的顶点值.
接着这些收到信息的顶点(此时当前迭代作为源顶点), 其出度边的目标顶点, 将会应用 sendMsg 自定义函数计算来消息结果, 如此不断迭代直到图的顶点没有消息可以发送.
GraphX 多了一个 mergeMsg 自定义函数, 该函数主要用来合并两条消息, 当消息较多时, 提前合并可以提高性能.

因此我们计算 sssp, vprog 的业务逻辑是 math.min(dist, newDist) 取当前迭代中累积值较小的路径, sendMsg 的业务逻辑则是 如果源结点的路径累积值+其出度边的路径值<目标结点的累积值 (triplet.srcAttr + triplet.attr < triplet.dstAttr), 那么需要接收这个消息并进行计算出新的路径值发送到目标结点. mergeMsg 的业务逻辑则是 取路径累积值较小的消息即可( math.min(a,b) ).

这样, 我们就实现了 sssp 计算, 是不是很简单, 作为使用者, 主要编写 vprogsendMsg 里业务逻辑 (对应 Pregel 算法框架的 vertex_func 和 sendmessage 函数), 就可以实现复杂的算法.

Pregel API 实现

GraphX 的 pregel() API 很实用, 然而打开其源码会发现, 底层是基于 RDD 和 mapreduce 实现的, 更象是 GAS 而非 Pregel.

org.apache.spark.graphx.Pregel#apply :

def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
(graph: Graph[VD, ED],
initialMsg: A,
maxIterations: Int = Int.MaxValue,
activeDirection: EdgeDirection = EdgeDirection.Either)
(vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] =
{
require(maxIterations > 0, s"Maximum number of iterations must be greater than 0," +
s" but got ${maxIterations}")

var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
// compute the messages
var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)
var activeMessages = messages.count()
// Loop
var prevG: Graph[VD, ED] = null
var i = 0
while (activeMessages > 0 && i < maxIterations) {
// Receive the messages and update the vertices.
prevG = g
g = g.joinVertices(messages)(vprog).cache()

val oldMessages = messages
// Send new messages, skipping edges where neither side received a message. We must cache
// messages so it can be materialized on the next line, allowing us to uncache the previous
// iteration.
messages = GraphXUtils.mapReduceTriplets(
g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
// The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
// (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages
// and the vertices of g).
activeMessages = messages.count()

logInfo("Pregel finished iteration " + i)

// Unpersist the RDDs hidden by newly-materialized RDDs
oldMessages.unpersist(blocking = false)
prevG.unpersistVertices(blocking = false)
prevG.edges.unpersist(blocking = false)
// count the iteration
i += 1
}
messages.unpersist(blocking = false)
g
} // end of apply

流程简介:

  • 第一次 GraphXUtils.mapReduceTriplets 计算返回所有顶点的目标顶点接收 initialMsg 后的结果(返回VertexRDD) , 其底层也是调用前面提到的 aggregateMessages 实现.
  • 这个返回的 message (VertexRDD) 将会和 g 进行join, 然后调用 vprog 作为 map function 进行转换得到新的图 g'. (g.joinVertices(messages)(vprog))
  • 接着不断迭代这个计算过程, 直到活跃消息为0或达到最大迭代次数( activeMessages > 0 && i < maxIterations ).

Pregel + GAS

到这里, 我们也发现了, GraphX 的 Pregel 编程 API 是一个很有趣的东西.
GraphX 虽然提供了 Pregel 编程模型, 但我们也发现, 它的实现其实更像 GAS,或者说 GraphX 融合了 Pregel 和 GAS, 再以 triplet / mapreduce / RDD 实现了 spark 平台上的图计算.