• 2. Flink DataStream API Overview (Part 2)


    Code examples

    Maven dependencies

    <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java</artifactId>
                <version>1.19.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients</artifactId>
                <version>1.19.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.25</version>
            </dependency>
    
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.25</version>
            </dependency>
        </dependencies>
    

    log4j.properties (placed on the classpath, e.g. under src/main/resources)

    log4j.rootLogger=INFO, console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    

    1. _01_QuickStart

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.core.execution.JobClient;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.util.Collector;
    
    import java.time.Duration;
    
    public class _01_QuickStart {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStream<Tuple2<String, Integer>> dataStream = env
                    .socketTextStream("localhost", 8888)
                    .flatMap(new Splitter())
                    .keyBy(value -> value.f0)
                    .window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(5)))
                    .sum(1);
    
            dataStream.print();
    
    //        JobExecutionResult jobExecutionResult = env.execute("Window WordCount");
    
            // Printed when the program finishes:
            //JobExecutionResult=>Program execution finished
            //Job with JobID c01e1255752cbb34469a9a10177e637c has finished.
            //Job Runtime: 25596 ms
    //        System.out.println("JobExecutionResult=>"+jobExecutionResult.getJobExecutionResult());
    
            JobClient jobClient = env.executeAsync("Window WordCount");
    
            // A Java program can interact with the Flink job through the JobClient
            // jobID=>32e976f03ac7243c09a5cf07c0739921
            // jobStatus=>RUNNING
            System.out.println("jobID=>"+jobClient.getJobID());
            System.out.println("jobStatus=>"+jobClient.getJobStatus().get());
        }
    
        public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
            @Override
            public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
                for (String word : sentence.split(",")) {
                    out.collect(new Tuple2<String, Integer>(word, 1));
                }
            }
        }
    }
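
    The commented-out env.execute(...) variant blocks until the job finishes, while executeAsync(...) returns as soon as the job is submitted (start something like nc -lk 8888 first so the socket source can connect). The returned JobClient can also be used to wait for the final result. A minimal sketch that could be appended at the end of main above, assuming one extra import (org.apache.flink.api.common.JobExecutionResult); with this unbounded socket source it only completes once the job is cancelled:

        // Block until the job completes and fetch its result through the JobClient;
        // getJobExecutionResult() returns a CompletableFuture<JobExecutionResult>.
        JobExecutionResult result = jobClient.getJobExecutionResult().get();
        System.out.println("netRuntime(ms)=>" + result.getNetRuntime());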
    

    2. _02_ReadFileSource

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
    
    import java.io.File;
    
    public class _02_ReadFileSource {
        public static void main(String[] args) throws Exception {
            ExecutionConfig executionConfig = new ExecutionConfig();
            executionConfig.setAutoWatermarkInterval(1000);
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(executionConfig.toConfiguration());
            env.setBufferTimeout(1000);
            System.out.println("自动生成水位线间隔=>"+env.getConfig().getAutoWatermarkInterval());
            //第一次打印
            //1> a
            //5> h
            //6> i
            //4> f
            //2> c
            //3> e
            //1> b
            //2> d
            //4> g
            // After appending three lines of "insert" to the local file,
            // second round of output:
            //5> insert
            //2> i
            //2> insert
            //3> insert
            //1> g
            //1> h
            //8> d
            //7> a
            //8> e
            //7> b
            //8> f
            //7> c
            DataStreamSource<String> source = env.readFile(new TextInputFormat(Path.fromLocalFile(new File("word.txt")))
                    , "/Users/***/Desktop/word.txt"
                    , FileProcessingMode.PROCESS_CONTINUOUSLY
                    , 10000);
    
            source.print();
    
            env.execute();
        }
    }
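
    Note that readFile with FileProcessingMode has been deprecated in recent Flink releases in favor of the unified FileSource connector. A hedged sketch of an equivalent continuously-monitored source, assuming the flink-connector-files artifact (same Flink version) is added to the POM:

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.connector.file.src.FileSource;
    import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import java.time.Duration;

    public class _02_FileSourceSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Re-scan the path every 10 seconds, mirroring
            // FileProcessingMode.PROCESS_CONTINUOUSLY with the 10000 ms interval above.
            FileSource<String> fileSource = FileSource
                    .forRecordStreamFormat(new TextLineInputFormat(), new Path("word.txt"))
                    .monitorContinuously(Duration.ofSeconds(10))
                    .build();

            DataStream<String> source =
                    env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "file-source");

            source.print();

            env.execute();
        }
    }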
    

    3. _03_CollectAsync

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.CloseableIterator;
    
    public class _03_CollectAsync {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // Create a DataStream from a list of elements
            DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);
    
            CloseableIterator<Integer> iterator = myInts.collectAsync();
    
            env.execute();
    
            while (iterator.hasNext()){
                System.out.println("iterator=>"+iterator.next());
            }
        }
    }
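
    Note the ordering above: collectAsync() must be called before the job is submitted, and env.execute() returns here only because the source is bounded. For a long-running job the more typical shape is executeAsync plus try-with-resources, so the client-side collector is closed cleanly. A minimal sketch of that variant of main, under the same imports:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);

        // collectAsync() registers its collecting sink, so it must run before submission;
        // closing the iterator afterwards releases the client-side resources.
        try (CloseableIterator<Integer> iterator = myInts.collectAsync()) {
            env.executeAsync();
            while (iterator.hasNext()) {
                System.out.println("iterator=>" + iterator.next());
            }
        }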
    

    4. _04_JobClientStopWithSavepoint

    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.core.execution.JobClient;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
    import org.apache.flink.util.CloseableIterator;
    
    import java.io.File;
    
    // Results are only emitted when the program finishes or when a checkpoint fires
    public class _04_JobClientStopWithSavepoint {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            env.enableCheckpointing(2000);
    
            DataStreamSource<String> source = env.readFile(new TextInputFormat(Path.fromLocalFile(new File("word.txt")))
                    , "/Users/***/Desktop/word.txt"
                    , FileProcessingMode.PROCESS_CONTINUOUSLY
                    , 10000);
    
            CloseableIterator<String> iterator = source.collectAsync();
    
            JobClient jobClient = env.executeAsync();
    
    //        TimeUnit.SECONDS.sleep(5);
    //        jobClient.stopWithSavepoint(false,"/Users/hhx/Desktop/", SavepointFormatType.DEFAULT);
    
            while (iterator.hasNext()){
                System.out.println("iterator=>"+iterator.next());
            }
        }
    }
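
    Enabling the commented-out stop-with-savepoint lines requires two more imports (org.apache.flink.core.execution.SavepointFormatType and java.util.concurrent.TimeUnit) and a writable savepoint directory. A hedged sketch of that tail of main, reusing the directory from the comment above:

        TimeUnit.SECONDS.sleep(5);

        // stopWithSavepoint(advanceToEndOfEventTime, savepointDirectory, formatType)
        // drains the job and returns a CompletableFuture holding the savepoint path.
        String savepointPath = jobClient
                .stopWithSavepoint(false, "/Users/hhx/Desktop/", SavepointFormatType.DEFAULT)
                .get();
        System.out.println("savepointPath=>" + savepointPath);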
    
  • Original article: https://blog.csdn.net/m0_50186249/article/details/138162117