• Flink系列之Flink中Broadcast和Counter整理和实战



    title: Flink系列


    六、Flink Broadcast 编程实战

    6.1 理论

    Flink 的批处理 和 Spark 的批处理,都支持两个非常好的特性: 广播变量 + 累加器

    广播变量允许编程人员在每台机器上保持1个只读的缓存变量,而不是传送变量的副本给tasks,广播变量创建后,它可以运行在集群中的任何function上,而不需要多次传递给集群节点。另外需要记住,不应该修改广播变量,这样才能确保每个节点获取到的值都是一致的

    一句话解释,可以理解为是一个公共的共享变量,我们可以把一个dataset 数据集广播出去,然后不同的task在节点上都能够获取到,这个数据在每个节点上只会存在一份。如果不使用broadcast,则在每个节点中的每个task中都需要拷贝一份dataset数据集,比较浪费内存(也就是一个节点中可能会存在多份dataset数据)。

    用法:

    // 1:初始化数据
    DataSet toBroadcast = env.fromElements(1, 2, 3)
    // 2:广播数据
    withBroadcastSet(toBroadcast, "broadcastSetName");
    // 3:获取数据
    Collection broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    注意:

    1:广播出去的变量存在于每个节点的内存中,所以这个数据集不能太大。因为广播出去的数据,会常驻内存,除非程序执行结束。
    
    2:广播变量在初始化广播出去以后不支持修改,这样才能保证每个节点的数据都是一致的。
    
    • 1
    • 2
    • 3

    6.2 案例

    package com.aa.flinkjava.broadcast;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    
    /**
     * @Author AA
     * @Date 2022/2/24 19:37
     * @Project bigdatapre
     * @Package com.aa.flinkjava.broadcast
     * Flink BroadCast 测试
     * 在这里做一个join的连接实现
     */
    public class FlinkBroadCastDemo {
        public static void main(String[] args) throws Exception {
            //1、获取运行环境
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
    
            //2、造数据
            ArrayList<Tuple2<String,Integer>> list = new ArrayList<>();
            list.add(new Tuple2<>("zhangsan",20));
            list.add(new Tuple2<>("lisi",21));
            list.add(new Tuple2<>("wangwu",22));
    
            //3、读取造的数据
            DataSource<Tuple2<String, Integer>> dataSource = executionEnvironment.fromCollection(list);
            dataSource.print("dataSource : ");
    
            //4、帮tuple2转化为hashmap 。 map中的key是用户姓名,value是用户年龄
            // DataSet>的数据类型可以直接修饰强制转换。
            DataSet<HashMap<String, Integer>> toBroadcast = dataSource.map(new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
                @Override
                public HashMap<String, Integer> map(Tuple2<String, Integer> tuple2) throws Exception {
                    HashMap<String, Integer> hashMap = new HashMap<>();
                    hashMap.put(tuple2.f0,tuple2.f1);
                    return hashMap;
                }
            });
    
            //5、再造一份 join 使用的数据
            DataSource<String> data2 = executionEnvironment.fromElements("zhangsan", "lisi", "wangwu");
            data2.print("data2 : ");
    
            //6、执行广播数据的一些操作
            // 下面这个DataSet类型也是强制转换的的
            DataSet<String> result = data2.map(new RichMapFunction<String, String>() {
    
                List<HashMap<String, Integer>> broadCastMap = new ArrayList<HashMap<String, Integer>>();
    
                HashMap<String, Integer> allMap = new HashMap<String, Integer>();
                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    this.broadCastMap = getRuntimeContext().getBroadcastVariable("bdMapName");
                    for (HashMap map : broadCastMap) {
                        allMap.putAll(map);
                    }
                }
    
                /**
                 * @param s  s是data2中间的一个一个的元素,其实就是"zhangsan", "lisi", "wangwu" 这些值
                 *           根据 name("zhangsan", "lisi", "wangwu") 去广播变量中匹配获取相应的年龄
                 * @return
                 * @throws Exception
                 */
                @Override
                public String map(String s) throws Exception {
                    Integer age = allMap.get(s);
                    return s + "," + age; //输出拼接的结果
                }
            }).withBroadcastSet(toBroadcast, "bdMapName");
    
            //7、打印输出
            result.print();
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87

    七、Flink Counter 编程实战

    7.1 理论

    ​ Accumulator 即累加器,与 Mapreduce Counter 的应用场景差不多,都能很好地观察 Task 在运行期间的数据变化。可以在 Flink job 任务中的算子函数中操作累加器,但是只能在任务执行结束之后才能获得累加器的最终结果。

    ​ Counter 是一个具体的累加器 (Accumulator) 实现:IntCounter, LongCounter 和 DoubleCounter

    用法:

    // 1、创建累加器
    private IntCounter numlines = new IntCounter();
    // 2、注册累加器
    getRuntimeContext().addAccumulator("num", this.numLines);
    // 3、使用累加器
    this.numlines.add(1);
    // 4、获取累加器的结果
    myJobExecutionResult.getAccumulatorResult("num")
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    7.2 案例

    package com.aa.flinkjava.counter;
    
    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.api.common.accumulators.IntCounter;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.operators.MapOperator;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    
    import java.util.ArrayList;
    
    /**
     * @Author AA
     * @Date 2022/2/25 14:27
     * @Project bigdatapre
     * @Package com.aa.flinkjava.counter
     * Flink 累加器 示例
     * 统计输入数据源的流入数据的次数。
     */
    public class FlinkCounterDemo {
        public static void main(String[] args) throws Exception {
            //1、获取运行环境
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(3);
    
            //2、读取造的数据
            DataSource<String> dataSource = executionEnvironment.fromElements("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
    
            //3、定义一点逻辑,给累加器放进去
            MapOperator<String, String> result = dataSource.map(new RichMapFunction<String, String>() {
    
                //3-1 创建累加器对象
                private IntCounter numlines = new IntCounter();
    
                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    //3-2 需要注册累加器
                    /*
                    在逻辑上来说,相当于在这个 application应用内部定义了一个变量 num 用来做统计。
                    但是,物理上,其实这个 num 变量是由分散在所有 Task 内部的 numlines 组成的。
                    一个 num 包含了很多个 numlines。其实最终拿到的结果,就是把所有 Task 中的 numlines 加起来,就是 num 的值。
                     */
                    this.getRuntimeContext().addAccumulator("num", this.numlines);
                }
    
                @Override
                public String map(String s) throws Exception {
                    //另外注意,可能有小伙伴觉得可以在这里定义普通变量统计也行,
                    // 注意:若并行度为1,使用普通的累加求和也可以,但是设置多个并行度,则普通的累加求和结果就不准啦。
    
                    //每运行一次就 向累加器中 添加1
                    this.numlines.add(1);
                    return s; //这里没有做什么逻辑,就是给来的数据原样输出了。但是上面统计了累加次数了。
                }
            });
    
            //4、给结果输出出去
            result.writeAsText("D:\\flinkcount3");
    
            //5、执行
            JobExecutionResult jobExecutionResult = executionEnvironment.execute();
    
            //6、看看累加器的结果
            Integer num = jobExecutionResult.getAccumulatorResult("num");
            System.out.println("累加器的输出的结果是: " + num);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71


    声明:
            文章中代码及相关语句为自己根据相应理解编写,文章中出现的相关图片为自己实践中的截图和相关技术对应的图片,若有相关异议,请联系删除。感谢。转载请注明出处,感谢。


    By luoyepiaoxue2014

    B站: https://space.bilibili.com/1523287361 点击打开链接
    微博地址: http://weibo.com/luoyepiaoxue2014 点击打开链接

  • 相关阅读:
    mysql中的各种日志
    css 写带三角形的对话框,空心的三角形边框
    python基础
    C++ - set 和 map 的实现(下篇)- set 和 map 的迭代器实现
    .net6项目模板搭建教程
    《财富》500 强企业要求 curl 开源工具作者提供免费及时的支持;基于Chromium的Edge浏览器正在整合文本预测功能 | 开源日报
    【LeetCode】【剑指offer】【二维数组中的查找】
    Tomcat监控指标判断应用“死了“吗
    B2C在线教育商城--前后端分离部署
    Vue2+summernote 编辑器
  • 原文地址:https://blog.csdn.net/luoyepiaoxue2014/article/details/128079216