Graphx中的三角计数

在网络中计算节点的重要性有着比较多的应用, 比如找到一些大点进行信息传播或者找到一些异常点进行风险判定等。


有几种常用的方法:

  1. betweeness(界数):首先找到网络中的所有最短路径,然后对每个最短路径中的节点计数,值越大越重要。这种方法找到的是网络中“媒介”或者说“桥接”属性比较强的节点,但是有个关键的问题是时间复杂度太大。
  2. pagerank:还可以用pagerank计算网络中比较“权威”的点,或者其它的传播类算法应该也是可以的。

这里介绍一种基于三角计数的方法。网络中的三角形指的是三个节点中的任意两个节点都存在边,可以认为是弱化版的“团(Clique)”。如果一个节点能够频繁地和多个不同节点之间形成这种三角形结构,那么这个点的周围一定是比较稠密的,并且这个点也是其中有重要影响力的。以社交网络举例会比较好理解:三个人之间都存在好友关系的话基本可以认为是一个小群,然后一个人又和大量的不同人都在一个小群中的话,那TA一定是个社交达人了。



Graphx中已经封装好了现成的算法可以调用,Triangle-Counting的实现思路也很简单。具体来说有以下三步:

  1. 计算出每个节点的邻居节点集合
  2. 对于每个边计算出两个节点的交集,并将交集大小传递给两个节点
  3. 对于每个节点计算接收到的累加值并除以2(这个点在三角形有两条边)

The algorithm is relatively straightforward and can be computed in three steps:

  • Compute the set of neighbors for each vertex.
  • For each edge compute the intersection of the sets and send the count to both vertices.
  • Compute the sum at each vertex and divide by two since each triangle is counted twice.

Spark在执行这个算法之前会对网络进行一些标准化的处理,如果输入已经是标准化的话,那么可以直接调用runPreCanonicalized方法来减少开销

There are two implementations. The default TriangleCount.run implementation first removes self cycles and canonicalizes the graph to ensure that the following conditions hold:

  • There are no self edges
  • All edges are oriented (src is greater than dst) 确保两个点之间只有一条边
  • There are no duplicate edges

However, the canonicalization procedure is costly as it requires repartitioning the graph.

If the input data is already in "canonical form" with self cycles removed then the TriangleCount.runPreCanonicalized should be used instead. val canonicalGraph = graph.mapEdges(e => 1).removeSelfEdges().canonicalizeEdges() val counts = TriangleCount.runPreCanonicalized(canonicalGraph).vertices


源代码:

object TriangleCount {

  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = {
    // Transform the edge data something cheap to shuffle and then canonicalize
    val canonicalGraph = graph.mapEdges(e => true).removeSelfEdges().convertToCanonicalEdges()
    // Get the triangle counts
    val counters = runPreCanonicalized(canonicalGraph).vertices
    // Join them bath with the original graph
    graph.outerJoinVertices(counters) { (vid, _, optCounter: Option[Int]) =>
      optCounter.getOrElse(0)
    }
  }


  def runPreCanonicalized[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = {
    // Construct set representations of the neighborhoods
    val nbrSets: VertexRDD[VertexSet] =
      graph.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) =>
        val set = new VertexSet(nbrs.length)
        var i = 0
        while (i < nbrs.length) {
          // prevent self cycle
          if (nbrs(i) != vid) {
            set.add(nbrs(i))
          }
          i += 1
        }
        set
      }

    // join the sets with the graph
    val setGraph: Graph[VertexSet, ED] = graph.outerJoinVertices(nbrSets) {
      (vid, _, optSet) => optSet.getOrElse(null)
    }

    // Edge function computes intersection of smaller vertex with larger vertex
    def edgeFunc(ctx: EdgeContext[VertexSet, ED, Int]): Unit = {
      val (smallSet, largeSet) = if (ctx.srcAttr.size < ctx.dstAttr.size) {
        (ctx.srcAttr, ctx.dstAttr)
      } else {
        (ctx.dstAttr, ctx.srcAttr)
      }
      val iter = smallSet.iterator
      var counter: Int = 0
      while (iter.hasNext) {
        val vid = iter.next()
        if (vid != ctx.srcId && vid != ctx.dstId && largeSet.contains(vid)) {
          counter += 1
        }
      }
      ctx.sendToSrc(counter)
      ctx.sendToDst(counter)
    }

    // compute the intersection along edges
    val counters: VertexRDD[Int] = setGraph.aggregateMessages(edgeFunc, _ + _)
    // Merge counters with the graph and divide by two since each triangle is counted twice
    graph.outerJoinVertices(counters) { (_, _, optCounter: Option[Int]) =>
      val dblCount = optCounter.getOrElse(0)
      // This algorithm double counts each triangle so the final count should be even
      require(dblCount % 2 == 0, "Triangle count resulted in an invalid number of triangles.")
      dblCount / 2
    }
  }
}
复制代码

关于clique的信息可以参考:codelibrary.tech/ml/communit…


原文链接: codelibrary.tech/scala/spark…

猜你喜欢

转载自juejin.im/post/7082522082134736926