• [Big Data Analytics] Source Code Analysis of shortestpath in GraphX


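    Introduction

    GraphX ships a shortest-path algorithm in the library org.apache.spark.graphx.lib.ShortestPaths. It is implemented on top of Pregel message passing rather than classic Dijkstra: every edge counts as one hop, and distances are relaxed iteratively until no vertex can improve any further. The algorithm computes the distance from every vertex in the graph to a target vertex (or a set of target vertices).

    Basic usage of shortestpath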

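    import org.apache.spark.graphx.lib.ShortestPaths
    // graph is assumed to be an existing org.apache.spark.graphx.Graph
    val landmarks = Seq(1, 4).map(_.toLong)
    val results = ShortestPaths.run(graph, landmarks)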
    

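    From the code above we can see that:
    (1) landmarks holds the target vertices: the distance from every vertex in graph to each of them is computed.
    (2) The vertices in landmarks are themselves vertices of graph; only their ids have been pulled out.
    (3) Seq indicates that landmarks is a collection of vertex ids.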

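    Algorithm Walkthrough

    GraphX graph-processing basics: Pregel

    Pregel is a large-scale distributed graph-computing platform proposed by Google, built for the large-scale distributed graph problems found in practical applications such as web link analysis and social data mining. Most current graph-computing models follow the BSP (Bulk Synchronous Parallel) computing model.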

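    (1) Message passing. Graph algorithms built on Pregel are essentially implemented by passing messages between vertices. Overall, an algorithm can be divided into three phases: initialization, sending messages, and receiving messages.
    (2) Vertex state. A vertex is either inactive (dormant) or active; in each round Pregel passes messages only for vertices in the active state.
    (3) Each iteration of the algorithm processes all "active" data.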
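
    The three points above can be sketched as a small in-memory superstep loop. This is only an illustration in plain Scala collections, not GraphX's Pregel implementation: the object BspSketch, its run signature, and the toy maxValue algorithm are all hypothetical names introduced here. A vertex counts as active when it received a message in the previous round, and an edge is examined only if one of its endpoints is active.

    ```scala
    object BspSketch {
      type VertexId = Long

      /** Hypothetical BSP loop: runs supersteps until no message is produced. */
      def run[V, M](
          vertices: Map[VertexId, V],
          edges: Seq[(VertexId, VertexId)],
          initialMsg: M
      )(
          vprog: (VertexId, V, M) => V,
          sendMsg: (VertexId, V, VertexId, V) => Iterator[(VertexId, M)],
          mergeMsg: (M, M) => M
      ): Map[VertexId, V] = {
        // Initialization: every vertex receives the initial message and becomes active.
        var state  = vertices.map { case (id, v) => id -> vprog(id, v, initialMsg) }
        var active = state.keySet
        while (active.nonEmpty) {
          // Sending: only edges that touch an active vertex may produce messages.
          val inbox = edges.iterator
            .filter { case (s, d) => active(s) || active(d) }
            .flatMap { case (s, d) => sendMsg(s, state(s), d, state(d)) }
            .toList
            .groupBy(_._1)
            .map { case (id, msgs) => id -> msgs.map(_._2).reduce(mergeMsg) }
          // Receiving: each receiver applies the vertex program and stays active.
          state = state ++ inbox.map { case (id, m) => id -> vprog(id, state(id), m) }
          active = inbox.keySet
        }
        state
      }

      /** Toy algorithm: push the largest value forward along the edge direction. */
      def maxValue(vertices: Map[VertexId, Int], edges: Seq[(VertexId, VertexId)]): Map[VertexId, Int] =
        run[Int, Int](vertices, edges, Int.MinValue)(
          (_, value, msg) => math.max(value, msg),
          (_, srcVal, dstId, dstVal) =>
            if (srcVal > dstVal) Iterator((dstId, srcVal)) else Iterator.empty,
          (a, b) => math.max(a, b)
        )
    }
    ```

    On the chain 1->2->3->4 with values 3, 6, 2, 1, the value 6 reaches vertices 3 and 4 while vertex 1 keeps 3, and the loop stops as soon as a round produces no messages.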

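    Overview of the ShortestPaths flow

    [Figure: example graph G with LANDMARKS = {1, 4}]
    Suppose we have graph data G whose vertex set is V, and a set of target vertices LANDMARKS; in the figure above, LANDMARKS={1,4}.
    Following the three phases of a graph algorithm, ShortestPaths is implemented as follows: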
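    I. Initialization phase:
    (1) Each vertex in V is given a Map that stores its distance to each vertex T in LANDMARKS. If the vertex itself is in LANDMARKS, its Map starts with the single entry (itself -> 0); otherwise it starts as an empty Map.
    (2) All vertices in G are activated (shown in red in the figure above). Every vertex v then simultaneously acts as a starting point and explores G to fill in its own Map.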
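
    The initialization step can be sketched without Spark as follows. The names InitSketch, DistMap, and initialize are hypothetical and exist only for this illustration; in GraphX the same idea is expressed with graph.mapVertices.

    ```scala
    object InitSketch {
      type VertexId = Long
      type DistMap  = Map[VertexId, Int]

      /** Landmarks start with distance 0 to themselves; every other vertex starts empty. */
      def initialize(vertexIds: Seq[VertexId], landmarks: Seq[VertexId]): Map[VertexId, DistMap] = {
        val landmarkSet = landmarks.toSet
        vertexIds.map { vid =>
          vid -> (if (landmarkSet(vid)) Map(vid -> 0) else Map.empty[VertexId, Int])
        }.toMap
      }
    }
    ```

    For vertices 1 to 4 and LANDMARKS = {1, 4}, vertices 1 and 4 each hold one entry with distance 0, and vertices 2 and 3 hold empty Maps.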

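    II. What happens within one iteration
    (1) All active vertices are processed; in practice this means every S->T triplet (vertex-edge->vertex) in which S or T is active.
    (2) For each S->T triplet, take T's Map (written T(Map)), add 1 to every value in it, and merge the result with S's Map (written S(Map)), keeping the smaller value for each key. If the merged result differs from S(Map), the incremented T(Map) is sent to S as a message. If the two are equal, S has nothing to learn and no message is sent.
    (3) When a message is sent, S receives it and merges its content with S(Map). The result becomes the new S(Map), and S is now active. The next iteration applies (2) to the triplets that involve S.
    (4) When no message is sent, that S->T triplet goes dormant. For example, in a chain A->B->C where a message was passed on A->B but not on B->C, the triplet A->B is examined again in the next iteration because A received a message, while B->C is examined again only if B or C received a message through another triplet.
    (5) Repeat (1), (2), (3), (4) until no active vertex remains.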
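
    The iteration described above can be simulated on an in-memory edge list. This is a minimal sketch under stated assumptions: HopDistanceSketch, increment, merge, and run are hypothetical names, the graph is a plain Seq of (source, target) pairs, and every edge counts as one hop. It mirrors the send condition of the source code analysed in this article but is not the GraphX implementation.

    ```scala
    object HopDistanceSketch {
      type VertexId = Long
      type DistMap  = Map[VertexId, Int]

      // Add one hop to every known distance.
      private def increment(m: DistMap): DistMap = m.map { case (v, d) => v -> (d + 1) }

      // Union of keys, keeping the smaller distance for each landmark.
      private def merge(a: DistMap, b: DistMap): DistMap =
        (a.keySet ++ b.keySet).iterator.map { k =>
          k -> math.min(a.getOrElse(k, Int.MaxValue), b.getOrElse(k, Int.MaxValue))
        }.toMap

      def run(
          vertexIds: Seq[VertexId],
          edges: Seq[(VertexId, VertexId)],
          landmarks: Seq[VertexId]
      ): Map[VertexId, DistMap] = {
        // Phase I: initialize the Maps and activate every vertex.
        var state: Map[VertexId, DistMap] = vertexIds.map { v =>
          v -> (if (landmarks.contains(v)) Map(v -> 0) else Map.empty[VertexId, Int])
        }.toMap
        var active: Set[VertexId] = state.keySet
        while (active.nonEmpty) {
          // Step (2): T's incremented Map is sent to S only if it would change S's Map.
          val messages = for {
            (src, dst) <- edges
            if active(src) || active(dst)
            candidate = increment(state(dst))
            if state(src) != merge(candidate, state(src))
          } yield src -> candidate
          val inbox = messages.groupBy(_._1).map { case (v, ms) =>
            v -> ms.map(_._2).reduce((a, b) => merge(a, b))
          }
          // Step (3): receivers merge the message and form the next active set.
          state = state ++ inbox.map { case (v, m) => v -> merge(state(v), m) }
          active = inbox.keySet
        }
        state
      }
    }
    ```

    Because a message always travels from T back to S, each vertex ends up with its hop distance to the landmarks it can reach by following edge directions. A vertex that cannot reach a landmark simply has no entry for it.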

    ShortestPath source code

    package com.edata.bigdata.algorithm.networks
    
    import org.apache.spark.graphx.{EdgeTriplet, Graph, Pregel, VertexId}
    
    import scala.reflect.ClassTag
    
    /**
     * @author: Alan Sword
     * @description: Compute the shortest hop distance from every vertex to a set of landmark vertices.
     */
    
    object ShortestPath extends Serializable {
    
      /**
       * @description: the Map type stored as the attribute of each vertex (landmark id -> hop distance)
       */
      type APNCMap = Map[VertexId, Int]
    
      /**
       * @param x : (vertex id, distance) pairs used to create an APNCMap
       * @description: build an APNCMap from the given pairs
       * @return : APNCMap type object
       */
      private def makeAPNCMap(x: (VertexId, Int)*) = Map(x: _*)
    
      /**
       * @param apncmap : the map attribute that needs to be updated
       * @description: update the apncmap by adding 1 to each distance
       * @return the updated apncmap
       */
      private def updateAPNCMap(apncmap: APNCMap): APNCMap = apncmap.map { case (v, d) => v -> (d + 1) }
    
      /**
       * @param apncmap1 : the first APNCMap to merge
       * @param apncmap2 : the second APNCMap to merge
       * @description: take the union of the two key sets, then merge the two APNCMaps by keeping the smaller of the two values for each key
       * @return the merged APNCMap
       */
      private def mergeAPNCMap(apncmap1: APNCMap, apncmap2: APNCMap): APNCMap = {
        // iterator + toMap builds the result Map directly and compiles on Scala 2.12 and 2.13
        // (collection.breakOut no longer exists in Scala 2.13)
        (apncmap1.keySet ++ apncmap2.keySet).iterator.map {
          k => k -> math.min(apncmap1.getOrElse(k, Int.MaxValue), apncmap2.getOrElse(k, Int.MaxValue))
        }.toMap
      }
    
      /**
       * @param id   : vertex id
       * @param attr : the APNCMap attribute of the vertex
       * @param msg  : the message received by the vertex
       * @description: called when a vertex receives a message; merges the vertex's current attribute with the APNCMap-type message
       * @return the merged APNCMap
       */
      def vertexProgram(id: VertexId, attr: APNCMap, msg: APNCMap): APNCMap = {
        mergeAPNCMap(attr, msg)
      }
    
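      /**
       * @param edge : the triplet ( S->T ) in the graph
       * @description: call updateAPNCMap with T's attributes as the argument, then call mergeAPNCMap with its result and S's attributes as arguments; a message is sent to S only if the merge would change S's attributes
       * @return an iterator holding the message for S, or an empty iterator
       */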
      def sendMessage(edge: EdgeTriplet[APNCMap, _]): Iterator[(VertexId, APNCMap)] = {
        val newAttr = updateAPNCMap(edge.dstAttr)
        if (edge.srcAttr != mergeAPNCMap(newAttr, edge.srcAttr))
          Iterator((edge.srcId, newAttr))
        else
          Iterator.empty
      }
    
      /**
       * @param graph     : every vertex in graph is treated as a starting vertex
       * @param landmarks : every vertex in landmarks is treated as a target vertex
       * @tparam VD : the type of the vertex attributes
       * @tparam ED : the type of the edge attributes
       * @description: The main entry point, consisting of several steps:
       *               1. Initialization: initialize the APNCMap attribute of each vertex and activate all vertices.
       *               2. Sending messages: for each triplet that contains an active vertex, determine whether a message needs to be sent; if not, the triplet goes inactive.
       *               3. Receiving messages: receive the message and activate the receiving vertex.
       * @return the graph whose vertex attributes are the APNCMaps of hop distances to the landmarks
       */
      def run[VD, ED: ClassTag](graph: Graph[VD, ED], landmarks: Seq[VertexId]): Graph[APNCMap, ED] = {
        // Initialization: build the APNCMap attribute of each vertex
        val APNCGraph = graph.mapVertices { (vid, attr) =>
          if (landmarks.contains(vid)) makeAPNCMap(vid -> 0) else makeAPNCMap()
        }
        // Every vertex receives this (empty) message and is thereby activated
        val initialMessage = makeAPNCMap()
        // For each triplet that contains an active vertex, decide whether a message needs to be sent;
        // a vertex that receives a message merges it and is active in the next iteration
        Pregel(APNCGraph, initialMessage)(vertexProgram, sendMessage, mergeAPNCMap)
      }
    }
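
    To see how sendMessage decides on a single S->T triplet, the helper logic can be exercised on concrete Maps without Spark. The sketch below re-implements update and merge in plain Scala, using Long for VertexId (which is how GraphX defines that alias); TripletDecisionSketch and messageFor are hypothetical names that are not part of the listing above.

    ```scala
    object TripletDecisionSketch {
      type VertexId = Long
      type APNCMap  = Map[VertexId, Int]

      // Same idea as updateAPNCMap: one more hop for every known landmark.
      def update(m: APNCMap): APNCMap = m.map { case (v, d) => v -> (d + 1) }

      // Same idea as mergeAPNCMap: union of keys, smaller distance wins.
      def merge(a: APNCMap, b: APNCMap): APNCMap =
        (a.keySet ++ b.keySet).iterator.map { k =>
          k -> math.min(a.getOrElse(k, Int.MaxValue), b.getOrElse(k, Int.MaxValue))
        }.toMap

      /** Some(message) when S would learn something from T, None otherwise. */
      def messageFor(srcAttr: APNCMap, dstAttr: APNCMap): Option[APNCMap] = {
        val newAttr = update(dstAttr)
        if (srcAttr != merge(newAttr, srcAttr)) Some(newAttr) else None
      }
    }
    ```

    If S knows nothing and T is landmark 4, S is sent Map(4 -> 1). If S already holds Map(4 -> 1), nothing is sent. Int.MaxValue is only used as a default inside merge and is never incremented, since update touches only entries that already exist, so the + 1 cannot overflow here.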
    
    
  • Original article: https://blog.csdn.net/sword_csdn/article/details/126461322