• Spark 组件 GraphX、Streaming


    Spark 组件 GraphX、Streaming

    Spark GraphX 用于处理图和图并行计算。GraphX 将图的表达和操作嵌入到 Spark 的数据流 API 中,允许用户在图上执行高效的并行计算。GraphX 结合了图计算和数据流计算的功能,使得它能够处理复杂的数据分析任务。

    Spark Streaming 用于处理实时数据流。它允许开发者以微批处理(Micro-batch)的方式处理实时数据,提供了一个高层次的 API,可以轻松地将批处理操作应用于实时数据流

    一、Spark GraphX

    1.1 GraphX 的主要概念

    1. 顶点 (Vertex)
      • 图中的节点,表示实体或对象。每个顶点都有一个唯一的标识符(ID)和属性。
    2. 边 (Edge)
      • 图中的连接,表示顶点之间的关系。每条边连接两个顶点,并且也可以有属性。
    3. 图 (Graph)
      • 由顶点和边组成的结构。GraphX 使用 Graph 类来表示图,顶点和边的集合分别由 RDD[VertexId, VD]RDD[Edge[ED]] 表示,其中 VDED 是顶点和边的属性类型。
    4. Triplet
      • GraphX 中的 EdgeTriplet 代表一条边及其连接的两个顶点的信息,允许同时访问顶点和边的属性。

    1.2 GraphX 的核心操作

    1. 图构造 (Graph Construction)
      • 通过顶点和边的 RDD 来构建图。例如,使用 Graph(vertices, edges) 构造一个图。
    2. 图转换 (Graph Transformation)
      • 对图进行操作,例如过滤顶点和边 (subgraph),或将顶点和边的属性映射到新属性 (mapVerticesmapEdges)。
    3. 聚合消息 (Aggregate Messages)
      • 用于从邻接顶点或边聚合信息。这在实现图算法(如 PageRank)时特别有用。
    4. 图算法 (Graph Algorithms)
      • GraphX 提供了一些预定义的图算法,如 PageRank、Connected Components、Shortest Paths 和 Triangle Counting。

    1.3 示例代码

    1.3.1 简单的 GraphX 示例,创建一个图并运行 PageRank 算法:
    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession
    
    object GraphXExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("graphx example")
          .setMaster("local[*]")
        
        // 创建 SparkSession
        val spark = SparkSession
          .builder
          .config(conf)
          .getOrCreate()
    
        // 创建顶点 RDD
        val vertices: RDD[(VertexId, String)] = spark.sparkContext.parallelize(Seq(
          (1L, "Alice"),
          (2L, "Bob"),
          (3L, "Charlie"),
          (4L, "David")
        ))
    
        // 创建边 RDD
        val edges: RDD[Edge[Int]] = spark.sparkContext.parallelize(Seq(
          Edge(1L, 2L, 1),
          Edge(2L, 3L, 1),
          Edge(3L, 4L, 1),
          Edge(4L, 1L, 1)
        ))
    
        // 创建图
        val graph = Graph(vertices, edges)
    
        // 运行 PageRank 算法
        // PageRank 算法最初是用于搜索引擎中对网页进行排序的基础算法,通过分析网络中的链接结构来评估每个网页的相对重要性。
        // 这里0.0001为收敛阈值
        val ranks = graph.pageRank(0.0001).vertices
    
        // 打印结果
        ranks.collect().foreach { case (id, rank) => println(s"Vertex $id has rank $rank") }
    
        // 停止 SparkSession
        spark.stop()
      }
    }
    
    
    1.3.2 Spark GraphX 的一些基本操作和概念
    // 创建图
    val graph = Graph(vertices, edges)
    
    // vertices:获取图中的所有顶点。
    graph.vertices.collect.foreach(println)
    
    // mapVertices:对顶点的属性进行变换。
    val newGraph = graph.mapVertices((id, attr) => attr.toUpperCase)
    
    // edges:获取图中的所有边。
    graph.edges.collect.foreach(println)
    
    // mapEdges:对边的属性进行变换。
    val newGraph = graph.mapEdges(e => e.attr * 2)
    
    // 查看所有的边(将点带入)【完整】
    graph.triplets.foreach(println)
    // 入度:指向自己的个数
    graph.inDegrees.foreach(println) // (人id,入度)
    
    // 出度:指向别人的个数
    graph.outDegrees.foreach(println) // (人id,出度)
    
    // 将入度和出度一起显示
    graph
        .inDegrees
        .join(graph.outDegrees)
        .foreach(println) // (人id,(入度,出度))
    
    // 度:入度+出度
    graph.degrees.foreach(println)
    
    // subgraph:根据条件创建一个子图,保留满足条件的顶点和边。
    val subGraph = graph.subgraph(vpred = (id, attr) => attr != "Bob")
    
    // joinVertices:将图的顶点属性与一个新的 RDD 进行连接,并更新顶点属性。
    val newAttrs: RDD[(Long, String)] = sc.parallelize(Seq(
      (1L, "Alice_new"),
      (4L, "David_new")
    ))
    
    val joinedGraph = graph.joinVertices(newAttrs) {
      case (id, oldAttr, newAttr) => newAttr
    }
    
    
    1.3.3 图算法
    // PageRank 算法用于计算图中每个顶点的重要性。
    val ranks = graph.pageRank(0.0001).vertices
    ranks.collect.foreach(println)
    
    // 连接组件: 连通组件算法用于查找图中的连通子图。
    val connectedComponents = graph.connectedComponents().vertices
    connectedComponents.collect.foreach(println)
    
    // 三角计数: 三角计数算法用于计算每个顶点所属的三角形数量。
    val triangleCounts = graph.triangleCount().vertices
    triangleCounts.collect.foreach(println)
    
    // 图的持久化与加载: 图可以通过将顶点和边的 RDD 存储在 HDFS 或其他文件系统中进行持久化。
    // 保存图的顶点和边
    graph.vertices.saveAsTextFile("hdfs://path/to/vertices")
    graph.edges.saveAsTextFile("hdfs://path/to/edges")
    
    // 从文件中加载图
    val loadedVertices: RDD[(Long, String)] = sc.textFile("hdfs://path/to/vertices").map { line =>
      val parts = line.split(",")
      (parts(0).toLong, parts(1))
    }
    
    val loadedEdges: RDD[Edge[Int]] = sc.textFile("hdfs://path/to/edges").map { line =>
      val parts = line.split(",")
      Edge(parts(0).toLong, parts(1).toLong, parts(2).toInt)
    }
    
    val loadedGraph: Graph[String, Int] = Graph(loadedVertices, loadedEdges)
    
    1.3.4 Pregel 模型

    Pregel 算法是一种迭代、消息传递的计算模型,特别适用于处理图的遍历和递归问题,如最短路径计算、PageRank、连通组件检测等。
    Pregel 的核心思想是将图计算任务表示为一个超级步(superstep)序列,每个超级步由以下几个阶段组成:

    1. 消息传递:每个顶点可以向相邻的顶点发送消息。
    2. 消息处理:每个顶点接收来自相邻顶点的消息,并更新自己的状态。
    3. 顶点计算:顶点在处理完消息后可以决定是否继续活跃或停止计算(halt)。

    这个过程会不断迭代,直到所有顶点都停止计算或达到指定的迭代次数。

    // Pregel 模型: 
    val initialMsg = ... // 定义初始消息
    val maxIterations = 10 // 最大迭代次数
    
    val resultGraph = graph.pregel(initialMsg, maxIterations)(
      // 顶点程序,处理接收到的消息并更新顶点属性
      vprog = (id, attr, msg) => {
        // 根据顶点的属性和收到的消息,更新顶点属性
      },
      // 发送消息,定义如何从一个顶点向相邻的顶点发送消息
      sendMsg = triplet => {
        // 根据边的属性和顶点的状态,决定发送的消息
      },
      // 聚合消息,定义如何合并一个顶点接收到的所有消息
      mergeMsg = (msg1, msg2) => {
        // 合并接收到的多个消息
      }
    )
    

    使用 Pregel 模型可以计算图中某个起点到其他所有顶点的最短路径。

    import org.apache.spark.SparkConf
    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession
    
    object GraphxTest {
      def main(args: Array[String]): Unit = {
    
        val conf = new SparkConf()
          .setAppName("graphx example")
          .setMaster("local[*]")
    
        // 创建 SparkSession
        val spark = SparkSession
          .builder
          .config(conf)
          .getOrCreate()
    
        val vertices: RDD[(VertexId, String)] = spark.sparkContext.parallelize(Seq(
          (1L, "Alice"),
          (2L, "Bob"),
          (3L, "Charlie"),
          (4L, "David")
        ))
    
        // 创建边 RDD
        val edges: RDD[Edge[Int]] = spark.sparkContext.parallelize(Seq(
          Edge(1L, 2L, 1),
          Edge(2L, 3L, 1),
          Edge(2L, 4L, 5),
          Edge(3L, 4L, 2),
          Edge(4L, 1L, 1)
        ))
    
        // 创建图
        val graph: Graph[String, PartitionID] = Graph(vertices, edges)
    
        // 设置源顶点 ID
        val sourceId: VertexId = 1L
    
        // 初始化图的顶点属性,将源顶点的距离设为 0.0,其他顶点设为无穷大
        val initialGraph = graph.mapVertices((id, name) =>
          if (id == sourceId) (name, 0.0) else (name, Double.PositiveInfinity)
        )
    
        // 运行 Pregel 算法计算单源最短路径 (SSSP)
        val sssp = initialGraph.pregel(Double.PositiveInfinity)(
          // 顶点程序:更新顶点属性(即距离)为当前距离与新接收到的距离中的较小值
          vprog = (id, attr, msg) => (attr._1, Math.min(attr._2, msg)),
    
          // 消息发送函数:计算从源顶点到目标顶点的距离,如果新计算的距离小于目标顶点当前的距离,则发送消息
          sendMsg = triplet => {
            if (triplet.srcAttr._2 + triplet.attr < triplet.dstAttr._2) {
              Iterator((triplet.dstId, triplet.srcAttr._2 + triplet.attr))
            } else {
              Iterator.empty
            }
          },
          // 消息合并函数:在目标顶点收到多个消息时,取最短的距离
          mergeMsg = (a, b) => math.min(a, b)
        )
    
        // 打印最终的最短路径结果
        sssp.vertices.collect.foreach { case (id, (name, dist)) =>
          println(s"Distance from $sourceId to $id ($name) is $dist")
        }
    
        // 停止 SparkSession
        spark.stop()
      }
    }
    
    

    1.4 GraphX 的应用场景

    • 社交网络分析:分析用户之间的关系,如好友推荐、社交影响力分析。
    • 路径分析:计算最短路径、页面排名等。
    • 社区检测:识别图中连接紧密的子图或社区。
    • 网络优化:通过图分析网络拓扑结构,优化数据流路由等。

    二、Spark Streaming

    2.1 Spark Streaming 的主要概念

    Spark Streaming 提供了一个强大且易用的 API,使开发者能够轻松地构建实时数据处理应用,特别适合需要低延迟、高吞吐量的场景。

    1. DStream (Discretized Stream)
      • DStream 是 Spark Streaming 中的核心抽象,表示一个连续的数据流。它可以被视为一系列 RDD(Resilient Distributed Datasets)的集合,每个 RDD 都包含某个时间间隔内的数据。
      • DStream 可以从各种输入源(如 Kafka、Flume、TCP 套接字等)创建,也可以通过对现有 DStream 的转换来创建。
    2. Transformations (转换操作)
      • 类似于 Spark 的 RDD 转换操作,DStream 支持各种转换操作,如 mapfilterreduceByKey 等。每个转换操作都会应用于 DStream 中的每个 RDD,生成一个新的 DStream。
    3. Output Operations (输出操作)
      • DStream 提供多种输出操作,将处理后的数据输出到外部系统。例如,printsaveAsTextFilessaveAsHadoopFilesforeachRDD 等。
    4. Window Operations (窗口操作)
      • Spark Streaming 支持窗口操作,允许对一段时间范围内的数据进行聚合。例如,可以计算过去 10 秒钟内的每 2 秒的数据。

    2.2 示例代码

    一个简单的 Spark Streaming 示例,读取 TCP 套接字数据,并进行词频统计:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object SparkStream {
      def main(args: Array[String]): Unit = {
        // 创建 Spark 配置对象,并设置应用名称和运行模式
        val conf = new SparkConf()
          .setMaster("local[*]")  // 使用本地模式运行,[*] 表示使用所有可用的 CPU 核心
          .setAppName("spark-streaming")  // 设置应用程序名称
    
        // 创建 StreamingContext,设置批处理间隔为 5 秒
        val scc = new StreamingContext(conf, Seconds(5))
    
        // 监听本地端口 9999,接收流式数据
        scc.socketTextStream("localhost", 9999)
          .mapPartitions(
            _.flatMap(
              _.replaceAll("[^a-zA-Z ]+", "")  
                .split("\\s+") 
                .map((_, 1))
            )
          )
          // 按单词进行归约,计算每个单词的出现次数
          .reduceByKey(_ + _)
          // 打印结果到控制台
          .print()
    
        // 启动 Spark Streaming 计算
        scc.start()
        // 等待应用程序终止
        scc.awaitTermination()
      }
    }
    

    2.3 Spark Streaming 的集成

    • 与 Kafka 集成:Spark Streaming 可以从 Kafka 中读取数据流,用于实时日志处理、监控等场景。
    • 与 Flume 集成:结合 Flume 进行分布式日志收集,然后使用 Spark Streaming 实时处理和分析日志。
    • 与 HDFS、S3 等集成:将处理后的数据输出到 HDFS、S3 等分布式文件系统进行持久化存储。
    • 与 SQL 和 MLlib 集成:Spark Streaming 可以与 Spark SQL 和 MLlib 集成,进行实时的数据分析和机器学习任务。

    2.4 Spark Streaming 的应用场景

    • 实时日志分析:监控服务器日志、应用日志,检测异常情况。
    • 实时 ETL (Extract, Transform, Load):对流式数据进行清洗、转换,并写入到数据仓库。
    • 实时监控与报警:对实时数据流进行分析,当检测到特定条件时触发报警。
    • 在线推荐系统:基于实时用户行为数据进行推荐,例如在线广告推荐。
  • 相关阅读:
    ViTPose+:迈向通用身体姿态估计的视觉Transformer基础模型
    Java方法重载
    大容量中间继电器 RXMH2 RK223 067 DC110V JOSEF约瑟
    排序——交换排序
    GSCoolink GSV6182 带嵌入式MCU的MIPI D-PHY转HDMI 2.0
    【SwiftUI项目】0009、SwiftUI项目-费用跟踪-记账App项项目-第1/3部分 - 本地数据
    mac废纸篓的如何还原?
    成都瀚网科技有限公司:抖店怎么开通直播?
    鸿蒙开发系列教程(二十三)--List 列表操作(2)
    Hive集群高可用配置与impala集群高可用配置
  • 原文地址:https://blog.csdn.net/weixin_74292291/article/details/141096718