5.4. 图操作符3

相邻聚合(Neighborhood Aggregation)

图分析任务的一个关键步骤是汇总每个顶点附近的信息。例如我们可能想知道每个用户的追随者的数量或者每个用户的追随者的平均年龄。许多迭代图算法(如PageRank,最短路径和连通体) 多次聚合相邻顶点的属性。

为了提高性能,主要的聚合操作从graph.mapReduceTriplets改为了新的graph.AggregateMessages。虽然API的改变相对较小,但是我们仍然提供了过渡的指南。

聚合消息(aggregateMessages)

GraphX中的核心聚合操作是aggregateMessages。 这个操作将用户定义的sendMsg函数应用到图的每个边三元组(edge triplet),然后应用mergeMsg函数在其目的顶点聚合这些消息。

class Graph[VD, ED] {
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[Msg]
}

用户自定义的sendMsg函数是一个EdgeContext类型。它暴露源和目的属性以及边缘属性 以及发送消息给源和目的属性的函数(sendToSrcsendToDst)。 可将sendMsg函数看做map-reduce过程中的map函数。用户自定义的mergeMsg函数指定两个消息到相同的顶点并保存为一个消息。可以将mergeMsg函数看做map-reduce过程中的reduce函数。 aggregateMessages 操作返回一个包含聚合消息(目的地为每个顶点)的VertexRDD[Msg]。没有接收到消息的顶点不包含在返回的VertexRDD中。

另外,aggregateMessages 有一个可选的tripletFields参数,它指出在EdgeContext中,哪些数据被访问(如源顶点特征而不是目的顶点特征)。tripletsFields可能的选项定义在TripletFields中。 tripletFields参数用来通知GraphX仅仅只需要EdgeContext的一部分允许GraphX选择一个优化的连接策略。例如,如果我们想计算每个用户的追随者的平均年龄,我们仅仅只需要源字段。 所以我们用TripletFields.Src表示我们仅仅只需要源字段。

在下面的例子中,我们用aggregateMessages操作计算每个用户更年长的追随者的年龄。

// Import random graph generation library
import org.apache.spark.graphx.util.GraphGenerators
// Create a graph with "age" as the vertex property.  Here we use a random graph for simplicity.
val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
// Compute the number of older followers and their total age
val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
  triplet => { // Map Function
    if (triplet.srcAttr > triplet.dstAttr) {
      // Send message to destination vertex containing counter and age
      triplet.sendToDst(1, triplet.srcAttr)
    }
  },
  // Add counter and age
  (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)
// Divide total age by number of older followers to get average age of older followers
val avgAgeOfOlderFollowers: VertexRDD[Double] =
  olderFollowers.mapValues( (id, value) => value match { case (count, totalAge) => totalAge / count } )
// Display the results
avgAgeOfOlderFollowers.collect.foreach(println(_))

当消息(以及消息的总数)是常量大小(列表和连接替换为浮点数和添加)时,aggregateMessages操作的效果最好。

Map Reduce三元组过渡指南

在之前版本的GraphX中,利用[mapReduceTriplets]操作完成相邻聚合。

class Graph[VD, ED] {
  def mapReduceTriplets[Msg](
      map: EdgeTriplet[VD, ED] => Iterator[(VertexId, Msg)],
      reduce: (Msg, Msg) => Msg)
    : VertexRDD[Msg]
}

mapReduceTriplets操作在每个三元组上应用用户定义的map函数,然后保存用用户定义的reduce函数聚合的消息。然而,我们发现用户返回的迭代器是昂贵的,它抑制了我们添加额外优化(例如本地顶点的重新编号)的能力。 aggregateMessages 暴露三元组字段和函数显示的发送消息到源和目的顶点。并且,我们删除了字节码检测转而需要用户指明三元组的哪些字段实际需要。

下面的代码用到了mapReduceTriplets

val graph: Graph[Int, Float] = ...
def msgFun(triplet: Triplet[Int, Float]): Iterator[(Int, String)] = {
  Iterator((triplet.dstId, "Hi"))
}
def reduceFun(a: Int, b: Int): Int = a + b
val result = graph.mapReduceTriplets[String](msgFun, reduceFun)

下面的代码用到了aggregateMessages

val graph: Graph[Int, Float] = ...
def msgFun(triplet: EdgeContext[Int, Float, String]) {
  triplet.sendToDst("Hi")
}
def reduceFun(a: Int, b: Int): Int = a + b
val result = graph.aggregateMessages[String](msgFun, reduceFun)

计算度信息

最一般的聚合任务就是计算顶点的度,即每个顶点相邻边的数量。在有向图中,经常需要知道顶点的入度、出度以及总共的度。GraphOps 类包含一个操作集合用来计算每个顶点的度。例如,下面的例子计算最大的入度、出度和总度。

// Define a reduce operation to compute the highest degree vertex
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
  if (a._2 > b._2) a else b
}
// Compute the max degrees
val maxInDegree: (VertexId, Int)  = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int)   = graph.degrees.reduce(max)

Collecting Neighbors

在某些情况下,通过收集每个顶点相邻的顶点及它们的属性来表达计算可能更容易。这可以通过collectNeighborIdscollectNeighbors操作来简单的完成

class GraphOps[VD, ED] {
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexId, VD)] ]
}

这些操作是非常昂贵的,因为它们需要重复的信息和大量的通信。如果可能,尽量用aggregateMessages操作直接表达相同的计算。

缓存和不缓存

在Spark中,RDDs默认是不缓存的。为了避免重复计算,当需要多次利用它们时,我们必须显示地缓存它们。GraphX中的图也有相同的方式。当利用到图多次时,确保首先访问Graph.cache()方法。

在迭代计算中,为了获得最佳的性能,不缓存可能是必须的。默认情况下,缓存的RDDs和图会一直保留在内存中直到因为内存压力迫使它们以LRU的顺序删除。对于迭代计算,先前的迭代的中间结果将填充到缓存 中。虽然它们最终会被删除,但是保存在内存中的不需要的数据将会减慢垃圾回收。只有中间结果不需要,不缓存它们是更高效的。这涉及到在每次迭代中物化一个图或者RDD而不缓存所有其它的数据集。 在将来的迭代中仅用物化的数据集。然而,因为图是由多个RDD组成的,正确的不持久化它们是困难的。对于迭代计算,我们建议使用Pregel API,它可以正确的不持久化中间结果。