• 4. Broadcast Variables


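    I. Partitioning Rule (DataStream Broadcast) and Broadcast Variables (Flink Broadcast)

    1.1 DataStream Broadcast (partitioning rule)

    The broadcast partitioning rule sends every element to all downstream partitions, so each element is processed once per partition rather than once overall.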

    DataStream.broadcast()
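
A minimal sketch of the partitioning rule in the Scala DataStream API, assuming a Flink release that still ships `org.apache.flink.streaming.api.scala`. The object name, the job name, the sample elements, and the parallelism of 3 are illustrative assumptions, not taken from the original post. In the Scala API `broadcast` is written without parentheses.

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical example object; not part of the original post.
object BroadcastPartitionSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(3) // assumed value: three parallel subtasks downstream

    val source: DataStream[Int] = env.fromElements(1, 2, 3)

    // broadcast sends every element to every downstream subtask.
    // The explicit type picks the parameterless broadcast, not the
    // broadcast-state overload that takes MapStateDescriptors.
    val replicated: DataStream[Int] = source.broadcast

    // Each of the three subtasks receives and prints all three elements.
    replicated.map(x => x * 10).print()

    env.execute("broadcast-partition-sketch")
  }
}
```

With this setup every input element should show up once per subtask in the printed output, which is the repeated processing described above.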
    

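    1.2 Flink Broadcast (broadcast variable)

    Similar to a Spark broadcast variable: the data being broadcast is a DataSet, and the operator that receives it also works on a DataSet.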

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.scala._ // DataSet, ExecutionEnvironment and the implicit TypeInformation
    import org.apache.flink.configuration.Configuration

    object BroadCastTest1 {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment // create the batch execution environment

        // Build the DataSet to broadcast: one single-entry Map per (name, age) pair
        val broadcastData = List(("zs", 18), ("ls", 28), ("ww", 38))
        val tupleData = env.fromCollection(broadcastData)
        val toBroadcastData = tupleData.map(tup => Map(tup._1 -> tup._2))

        // The DataSet that receives the broadcast
        val text: DataSet[String] = env.fromElements("zs", "ls", "ww")
        val result = text.map(new RichMapFunction[String, String] {
          var allMap = Map[String, Int]()

          // open() runs once per parallel task instance, before any call to map()
          override def open(parameters: Configuration): Unit = {
            // Fetch the broadcast set by the name registered in withBroadcastSet
            val listData = getRuntimeContext.getBroadcastVariable[Map[String, Int]]("bd")
            val it = listData.iterator()
            while (it.hasNext) {
              allMap = allMap ++ it.next() // merge the single-entry maps into one lookup table
            }
          }

          override def map(value: String): String = {
            val age = allMap.getOrElse(value, 1) // falls back to 1 when the name is missing
            value + ", " + age
          }
        }).withBroadcastSet(toBroadcastData, "bd")

        // In the DataSet API print() triggers execution itself;
        // a following env.execute() would fail because no new sink has been defined.
        result.print()
      }
    }
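
The merge step inside `open()` and the lookup inside `map()` can be pulled out into plain functions, which makes them easy to check without starting a Flink job. This is only a sketch: `BroadcastLookup`, `merge`, and `describe` are hypothetical names introduced here, and it assumes Scala 2.11 to 2.13, where `scala.collection.JavaConverters` is available.

```scala
import scala.collection.JavaConverters._

// Hypothetical helper; not part of the original post.
object BroadcastLookup {
  // Fold the list returned by getBroadcastVariable into one lookup table.
  // Later entries overwrite earlier ones on duplicate keys, as ++ does.
  def merge(parts: java.util.List[Map[String, Int]]): Map[String, Int] =
    parts.asScala.foldLeft(Map.empty[String, Int])(_ ++ _)

  // Same formatting as map() in the example above, including the default of 1.
  def describe(lookup: Map[String, Int], name: String): String =
    name + ", " + lookup.getOrElse(name, 1)
}
```

Inside the rich function, `open()` could then assign `allMap = BroadcastLookup.merge(listData)` and `map()` could return `BroadcastLookup.describe(allMap, value)`.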
    
  • Original article: https://blog.csdn.net/weixin_43124279/article/details/132776105