• 【Flink实战】玩转Flink里面核心的Source Operator实战


    🚀 作者 :“大数据小禅”

    🚀 文章简介 :【Flink实战】玩转Flink里面核心的Source Operator实战

    🚀 欢迎小伙伴们 点赞👍、收藏⭐、留言💬


    Flink 的API层级介绍Source Operator速览

    • Flink的API层级 为流式/批式处理应用程序的开发提供了不同级别的抽象

      • 第一层是最底层的抽象为有状态实时流处理,抽象实现是 Process Function,用于底层处理

      • 第二层抽象是 Core APIs,许多应用程序不需要使用到上述最底层抽象的 API,而是使用 Core APIs 进行开发

        • 例如各种形式的用户自定义转换(transformations)、联接(joins)、聚合(aggregations)、窗口(windows)和状态(state)操作等,此层 API 中处理的数据类型在每种编程语言中都有其对应的类。
      • 第三层抽象是 Table API。 是以表Table为中心的声明式编程API,Table API 使用起来很简洁但是表达能力差

        • 类似数据库中关系模型中的操作,比如 select、project、join、group-by 和 aggregate 等
        • 允许用户在编写应用程序时将 Table API 与 DataStream/DataSet API 混合使用
      • 第四层最顶层抽象是 SQL,这层程序表达式上都类似于 Table API,但是其程序实现都是 SQL 查询表达式

        • SQL 抽象与 Table API 抽象之间的关联是非常紧密的
      • 注意:Table和SQL层变动多,还在持续发展中,大致知道即可,核心是第一和第二层
        在这里插入图片描述

    • Flink编程模型

    在这里插入图片描述

    • Source来源

      • 元素集合

        • env.fromElements
        • env.fromColletion
        • env.fromSequence(start,end);
      • 文件/文件系统

        • env.readTextFile(本地文件);
        • env.readTextFile(HDFS文件);
      • 基于Socket

        • env.socketTextStream(“ip”, 8888)
      • 自定义Source,实现接口自定义数据源,rich相关的api更丰富

        • 并行度为1

          • SourceFunction
          • RichSourceFunction
        • 并行度大于1

          • ParallelSourceFunction
          • RichParallelSourceFunction
    • Connectors与第三方系统进行对接(用于source或者sink都可以)

      • Flink本身提供Connector例如kafka、RabbitMQ、ES等
      • 注意:Flink程序打包一定要将相应的connetor相关类打包进去,不然就会失败
    • Apache Bahir连接器

      • 里面也有kafka、RabbitMQ、ES的连接器更多
    • 总结 和外部系统进行读取写入的

      • 第一种 Flink 里面预定义的 source 和 sink。
      • 第二种 Flink 内部也提供部分 Boundled connectors。
      • 第三种是第三方 Apache Bahir 项目中的连接器。
      • 第四种是通过异步 IO 方式
        • 异步I/O是Flink提供的非常底层的与外部系统交互

    Flink 预定义的Source 数据源 案例实战

    • Source来源
      • 元素集合
        • env.fromElements
        • env.fromColletion
        • env.fromSequence(start,end);
     public static void main(String [] args) throws Exception {
    
            //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            //相同类型元素的数据流 source
            DataStream<String> stringDS1 = env.fromElements("java,SpringBoot", "spring cloud,redis", "kafka,小滴课堂");
            stringDS1.print("stringDS1");
    
            DataStream<String> stringDS2 = env.fromCollection(Arrays.asList("微服务项目大课,java","alibabacloud,rabbitmq","hadoop,hbase"));
            stringDS2.print("stringDS2");
    
            DataStreamSource<Long> longDS3 = env.fromSequence(0,10);
            longDS3.print("longDS3");
    
            //DataStream需要调用execute,可以取个名称
            env.execute("xdclass job");
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 文件/文件系统
      • env.readTextFile(本地文件);
      • env.readTextFile(HDFS文件);
    public static void main(String [] args) throws Exception {
    
            //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            DataStream<String> textDS = env.readTextFile("/Users/xdclass/Desktop/xdclass_access.log");
            //DataStream textDS = env.readTextFile("hdfs://xdclass_node:8010/file/log/words.txt");
            textDS.print();
            env.execute("xdclass job");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 基于Socket
      • env.socketTextStream(“ip”, 8888)
       public static void main(String [] args) throws Exception {
    
            //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            DataStream<String> stringDataStream = env.socketTextStream("127.0.0.1",8888);
            stringDataStream.print();
            env.execute(" job");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    Flink自定义的Source 数据源案例-订单来源实战

    • 自定义Source,实现接口自定义数据源

      • 并行度为1

        • SourceFunction
        • RichSourceFunction
      • 并行度大于1

        • ParallelSourceFunction
        • RichParallelSourceFunction
      • Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等

    • 创建接口

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class VideoOrder {
        private String tradeNo;
        private String title;
        private int money;
        private int userId;
        private Date createTime;
    
    }
    
    
    public class VideoOrderSource extends RichParallelSourceFunction<VideoOrder> {
    
    	private volatile Boolean flag = true;
    
        private  Random random = new Random();
    
        private static List<String> list = new ArrayList<>();
        static {
            list.add("spring boot2.x课程");
            list.add("微服务SpringCloud课程");
            list.add("RabbitMQ消息队列");
            list.add("Kafka课程");
            list.add("Flink流式技术课程");
            list.add("工业级微服务项目大课训练营");
            list.add("Linux课程");
        }
    
        @Override
        public void run(SourceContext<VideoOrder> ctx) throws Exception {
            while (flag){
                Thread.sleep(1000);
                String id = UUID.randomUUID().toString();
                int userId = random.nextInt(10);
                int money = random.nextInt(100);
                int videoNum = random.nextInt(list.size());
                String title = list.get(videoNum);
                ctx.collect(new VideoOrder(id,title,money,userId,new Date()));
            }
        }
    
        /**
         * 取消任务
         */
        @Override
        public void cancel() {
            flag = false;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 案例
    public static void main(String [] args) throws Exception {
    
            //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<VideoOrder> videoOrderDataStream = env.addSource(new VideoOrderSource());
            videoOrderDataStream.print();
    
            //DataStream需要调用execute,可以取个名称
            env.execute("custom source job");
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    不断产生很多订单

    在这里插入图片描述

  • 相关阅读:
    wpf中的StaticResource和DynamicResource
    Centos7部署Python3环境
    DevOps|研发效能治理:进化史、规模化与治理复杂性
    输入输出错误重定向
    专家建议|8大措施加速你的创新职业规划和成长
    循序渐进介绍基于CommunityToolkit.Mvvm 和HandyControl的WPF应用端开发(8) -- 使用Converter类实现内容的转义处理
    转债列表筛选及与正股数据整合:qlib+fastapi
    Kube-OVN-安装配置参数选项
    实战型开发1/3--结果&业务导向
    Leetcode.486 预测赢家
  • 原文地址:https://blog.csdn.net/weixin_45574790/article/details/132854114