• Flink入门系列02-编程基础


    编程模板

    1. 获取一个编程、执行入口环境env
    2. 通过数据源组件,加载、创建 datastream
    3. 对 datastream 调用各种算子表达计算逻辑
    4. 通过 sink 算子指定计算结果的输出方式
    5. 在 env 上触发程序提交运行

    创建代码模板

    使用 maven 命令
    mvn archetype:generate                \
    	-DarchetypeGroupId=org.apache.flink   \
    	-DarchetypeArtifactId=flink-quickstart-java \
    	-DarchetypeVersion=1.15.1
    
    • 1
    • 2
    • 3
    • 4
    使用 shell 命令
    curl https://flink.apache.org/q/quickstart.sh | bash -s 1.15.1
    
    • 1

    编程入口

     // 创建一个编程入口环境
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();   // 批处理的入口环境
    
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  // 流批一体的入口环境
    ExecutionConfig config = env.getConfig();
    config.setExecutionMode(ExecutionMode.BATCH); // 指定批 或 流处理
    
    // 显式声明为本地运行环境,且带webUI
    Configuration configuration = new Configuration();
    configuration.setInteger("rest.port", 12345);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    并行概念

    1. 每一个算子都可以成为一个独立 task
    2. 上下游算子间数据分发规则、并行度、共享槽位设置,可组成算子链成为一个task
    3. 每个任务在运行时都可拥有多个并行的运行实例
    4. 每个算子任务的并行度都可以在代码中显式设置

    source 算子

    source是用来获取外部数据的算子,按照获取数据的方式,可分为:

    基于集合的 Source
    DataStreamSource<Integer> fromElements = env.fromElements(1, 2, 3, 4, 5);
    
    • 1
    基于 Socket 网络端口的 Source
    SingleOutputStreamOperator<String> source = env.socketTextStream("localhost", 9999)
    
    • 1
    基于文件的 Source
    DataStreamSource<String> fileSource = env.readTextFile("data/input/wc.txt", "utf-8");
    
    • 1
    第三方 Connector Source
    KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
           // 设置订阅的目标主题
           .setTopics("tp01")
           // 设置消费者组id
           .setGroupId("gp01")
           // 设置kafka服务器地址
           .setBootstrapServers("hdp01:9092")
           // 起始消费位移的指定:
           //    OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST) 消费起始位移选择之前所提交的偏移量(如果没有,则重置为LATEST)
           //    OffsetsInitializer.earliest()  消费起始位移直接选择为 “最早”
           //    OffsetsInitializer.latest()  消费起始位移直接选择为 “最新”
           //    OffsetsInitializer.offsets(Map)  消费起始位移选择为:方法所传入的每个分区和对应的起始偏移量
           .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
           // 设置value数据的反序列化器
           .setValueOnlyDeserializer(new SimpleStringSchema())
           // 开启kafka底层消费者的自动位移提交机制
           //    它会把最新的消费位移提交到kafka的consumer_offsets中
           //    就算把自动位移提交机制开启,KafkaSource依然不依赖自动位移提交机制
           //    (宕机重启时,优先从flink自己的状态中去获取偏移量<更可靠>)
           .setProperty("auto.offset.commit", "true")
           // 把本source算子设置成  BOUNDED属性(有界流)
           //     将来本source去读取数据的时候,读到指定的位置,就停止读取并退出
           //     常用于补数或者重跑某一段历史数据
           // .setBounded(OffsetsInitializer.committedOffsets())
    
           // 把本source算子设置成  UNBOUNDED属性(无界流)
           //     但是并不会一直读数据,而是达到指定位置就停止读取,但程序不退出
           //     主要应用场景:需要从kafka中读取某一段固定长度的数据,然后拿着这段数据去跟另外一个真正的无界流联合处理
           //.setUnbounded(OffsetsInitializer.latest())
           .build();
    
    // env.addSource();  //  接收的是  SourceFunction接口的 实现类
    DataStreamSource<String> streamSource = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kfk-source");//  接收的是 Source 接口的实现类
    streamSource.print();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    自定义 Source
    DataStreamSource<EventLog> dataStreamSource = env.addSource(new MySourceFunction());
    
    • 1
    // 自定义类
    class MySourceFunction implements SourceFunction<EventLog>{
       private volatile boolean flag = true;
    
       @Override
       public void run(SourceContext<EventLog> ctx) throws Exception {
           EventLog eventLog = new EventLog();
           String[] events = {"appLaunch","pageLoad","adShow","adClick","itemShare","itemCollect","putBack","wakeUp","appClose"};
           HashMap<String, String> eventInfoMap = new HashMap<>();
    
           while(flag){
               eventLog.setGuid(RandomUtils.nextLong(1,1000));
               eventLog.setSessionId(RandomStringUtils.randomAlphabetic(12).toUpperCase());
               eventLog.setTimeStamp(System.currentTimeMillis());
               eventLog.setEventId(events[RandomUtils.nextInt(0,events.length)]);
               eventInfoMap.put(RandomStringUtils.randomAlphabetic(1),RandomStringUtils.randomAlphabetic(2));
               eventLog.setEventInfo(eventInfoMap);
               
               ctx.collect(eventLog);
               eventInfoMap.clear();
               Thread.sleep(RandomUtils.nextInt(500,1500));
           }
       }
    
       @Override
       public void cancel() {
           flag = false;
       }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    从并行度的角度,Source 又可分为并行 Source 和 非并行 Source:

    • 非并行 Source 的并行度只能为1,读取大量数据时效率较低,如 Socket Source。
    • 并行 Source 的并行度可以是1或多个,资源充足的情况下,并行度越大,效率越高,如 kafka source。

    其中自定义source

    • 实现 SourceFunction 或者 RichSourceFunction 接口, 这两者都是非并行的source算子
    • 实现 ParallelSourceFunction 或者 RichParallelSourceFunction 接口, 这两者都是可并行的source算子

    带 Rich 的,都拥有 open(), close(), getRuntimeContext() 方法
    带 Parallel 的,都可多实例并行执行

    Transformation 算子

    map算子
    SingleOutputStreamOperator<UserInfo> beanStream = streamSource.map(json -> JSON.parseObject(json, UserInfo.class));
    
    • 1
    filter算子
    SingleOutputStreamOperator<UserInfo> filtered = beanStream.filter(bean -> bean.getFriends().size() <= 3);
    
    • 1
    flatmap算子
    SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = upperCased.flatMap((String s, Collector<Tuple2<String, Integer>> collector) -> {
                String[] words = s.split("\\s+");
                for (String word : words) {
                    collector.collect(Tuple2.of(word, 1));
                }
            }).returns(new TypeHint<Tuple2<String, Integer>>() {});   // 通过 TypeHint 传达返回数据类型
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    keyBy算子
    // 各性别用户的好友总数
    KeyedStream<Tuple2<String, Integer>, String> keyedStream = flatten
            .map(bean -> Tuple2.of(bean.getGender(), 1))
            .returns(new TypeHint<Tuple2<String, Integer>>() {
            })
            .keyBy(tp -> tp.f0);
    SingleOutputStreamOperator<Tuple2<String, Integer>> genderFriendCount = keyedStream.sum(1);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    max、maxby算子
    //max、maxBy 区别: 更新状态的逻辑!  max只更新要求最大值的字段;  而 maxBy 会更新整条数据;
    tuple4Stream.keyBy(tp -> tp.f2)
            /*.max(3); */   // 各性别中,用户好友数量最大值
            .maxBy(3);  // 求各性别中,好友数量最大的那个人
    
    • 1
    • 2
    • 3
    • 4
    reduce算子
    //需求: 求各性别中,好友数量最大的那个人,而且如果前后两个人的好友数量相同,则输出的结果中,也需要将uid/name等信息更新成后面一条数据的值
    SingleOutputStreamOperator<Tuple4<Integer, String, String, Integer>> reduceResult = tuple4Stream.keyBy(tp -> tp.f2)
            .reduce(new ReduceFunction<Tuple4<Integer, String, String, Integer>>() {
                /**
                 *
                 * @param value1  是此前的聚合结果
                 * @param value2  是本次的新数据
                 * @return 更新后的聚合结果
                 * @throws Exception
                 */
                @Override
                public Tuple4<Integer, String, String, Integer> reduce(Tuple4<Integer, String, String, Integer> value1, Tuple4<Integer, String, String, Integer> value2) throws Exception {
                    if (value1 == null || value2.f3 >= value1.f3) {
                        return value2;
                    } else {
                        return value1;
                    }
                }
            });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    Sink 算子

    输出到控制台
    streamSource.print();
    
    • 1
    输出到文件系统
    • 方式一:输出为行格式文件
    // 构造一个FileSink对象
    
    FileSink<String> rowSink = FileSink
        .forRowFormat(new Path("out/filesink/"), new SimpleStringEncoder<String>("utf-8"))
        // 文件的滚动策略 (间隔时长10s,或文件大小达到 5M,就进行文件切换
        .withRollingPolicy(DefaultRollingPolicy.builder()
             .withRolloverInterval(Duration.ofMillis(10000)) // flink-1.15.0版本 时间控制不生效
             .withMaxPartSize(MemorySize.ofMebiBytes(5))
             .build())
        // 分桶的策略(划分子文件夹的策略)
        .withBucketAssigner(new DateTimeBucketAssigner<>())
        .withBucketCheckInterval(5)
        // 输出文件的文件名相关配置
        .withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("doitedu").withPartSuffix(".txt").build())
        .build();
    streamSource.map(JSON::toJSONString)
    	.sinkTo(rowSink) /*Sink 的实现类对象,用 sinkTo()来添加  */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 方式二:输出为列格式文件
    ParquetWriterFactory<EventLog> writerFactory = AvroParquetWriters.forReflectRecord(EventLog.class);// 该方法,传入一个普通的JavaBean类,就可以自动通过反射来生成 Schema
    
    FileSink<EventLog> parquetSink = FileSink
            .forBulkFormat(new Path("out/bulksink2"), writerFactory) // 核心点: 要一个parquet文件输出器 writerFactory
            .withBucketAssigner(new DateTimeBucketAssigner<>()) // 分桶策略
            .withBucketCheckInterval(5) //分桶检查间隔
            .withRollingPolicy(OnCheckpointRollingPolicy.build())  // 当 checkpoint发生时,进行文件滚动
            .withOutputFileConfig(OutputFileConfig.builder().withPartSuffix(".parquet").withPartPrefix("log").build())
            .build();
    streamSource.sinkTo(parquetSink);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    输出到 kafka
    // 1. 构造一个kafka的sink算子
    KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
        .setBootstrapServers("hdp01:9092,hdp02:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.<String>builder()
        	.setTopic("event-log")
       		.setValueSerializationSchema(new SimpleStringSchema())
        	.build()
        )
    //  .setKafkaProducerConfig()
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();
    
    // 2. 把数据流输出到构造好的sink算子
    streamSource.map(JSON::toJSONString)
    	.sinkTo(kafkaSink);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    输出到 jdbc
    • 不保证 EOS语义的方式,所谓EOS语义指 Exactly Once
    SinkFunction<EventLog> jdbcSink = JdbcSink.sink(
            "insert into t_eventlog values (?,?,?,?,?) on duplicate key update guid=?,sessionId=?,eventId=?,ts=?,eventInfo=? ",
            new JdbcStatementBuilder<EventLog>() {
                @Override
                public void accept(PreparedStatement preparedStatement, EventLog eventLog) throws SQLException {
                    preparedStatement.setLong(1, eventLog.getGuid());
                    preparedStatement.setString(2, eventLog.getSessionId());
                    preparedStatement.setString(3, eventLog.getEventId());
                    preparedStatement.setLong(4, eventLog.getTimeStamp());
                    preparedStatement.setString(5, JSON.toJSONString(eventLog.getEventInfo()));
    
                    preparedStatement.setString(6, eventLog.getSessionId());
                    preparedStatement.setString(7, eventLog.getEventId());
                    preparedStatement.setLong(8, eventLog.getTimeStamp());
                    preparedStatement.setString(9, JSON.toJSONString(eventLog.getEventInfo()));
                }
            },
            JdbcExecutionOptions.builder()
                    .withMaxRetries(3)
                    .withBatchSize(1)
                    .build(),
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withUrl("jdbc:mysql://hdp01:3306/abc?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
                    .withUsername("root")
                    .withPassword("123456")
                    .build()
    );
    
    // 输出数据
    streamSource.addSink(jdbcSink);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 可以提供 EOS 语义保证的 sink
    SinkFunction<EventLog> exactlyOnceSink = JdbcSink.exactlyOnceSink(
            "insert into t_eventlog values (?,?,?,?,?) on duplicate key update guid=?,sessionId=?,eventId=?,ts=?,eventInfo=? ",
            new JdbcStatementBuilder<EventLog>() {
                @Override
                public void accept(PreparedStatement preparedStatement, EventLog eventLog) throws SQLException {
                    preparedStatement.setLong(1, eventLog.getGuid());
                    preparedStatement.setString(2, eventLog.getSessionId());
                    preparedStatement.setString(3, eventLog.getEventId());
                    preparedStatement.setLong(4, eventLog.getTimeStamp());
                    preparedStatement.setString(5, JSON.toJSONString(eventLog.getEventInfo()));
    
                    preparedStatement.setString(6, eventLog.getSessionId());
                    preparedStatement.setString(7, eventLog.getEventId());
                    preparedStatement.setLong(8, eventLog.getTimeStamp());
                    preparedStatement.setString(9, JSON.toJSONString(eventLog.getEventInfo()));
                }
            },
            JdbcExecutionOptions.builder()
                    .withMaxRetries(3)
                    .withBatchSize(1)
                    .build(),
            JdbcExactlyOnceOptions.builder()
                    // mysql不支持同一个连接上存在并行的多个事务,必须把该参数设置为true
                    .withTransactionPerConnection(true)
                    .build(),
            new SerializableSupplier<XADataSource>() {
                @Override
                public XADataSource get() {
                    // XADataSource就是jdbc连接,不过它是支持分布式事务的连接
                    // 而且它的构造方法,不同的数据库构造方法不同
                    MysqlXADataSource xaDataSource = new MysqlXADataSource();
                    xaDataSource.setUrl("jdbc:mysql://hdp01:3306/abc?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8");
                    xaDataSource.setUser("root");
                    xaDataSource.setPassword("123456");
                    return xaDataSource;
                }
            }
    );
    
    // 输出数据
    streamSource.addSink(exactlyOnceSink);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
  • 相关阅读:
    picGo图床搭建gitee和smms(建议使用)
    测试工程师们需要认真思考的几个问题
    微信小程序——后台交互
    Codeforces Round 894 (Div. 3) E. Kolya and Movie Theatre
    【华为OD机试真题 JS】找单词
    基于ASP.NET+MYSQL的医院信息管理系统
    大数据培训技术Phoenix
    内存函数介绍
    深度学习【使用seq2seq实现聊天机器人】
    vue watch监听vuex中的数据变化
  • 原文地址:https://blog.csdn.net/qq_17310871/article/details/126489133