• Flink系列文档-(YY03)-Flink编程基础API


    1 Flink编程入口

    首先获取flink编程的核心入口对象  

    1. /**
    2. * 获取批处理入口对象
    3. */
    4. // 1) 普通的批处理对象
    5. ExecutionEnvironment environment1 = ExecutionEnvironment.getExecutionEnvironment();
    6. Configuration configuration = new Configuration();
    7. configuration.setInteger("rest.port" , 80820);
    8. // 2) 带本地界面的批处理入口对象
    9. ExecutionEnvironment environment2 = ExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
    10. /**
    11. * 获取流处理入口对象
    12. */
    13. StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    14. StreamExecutionEnvironment see2 = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
    15. /**
    16. * 获取流处理入口对象
    17. * 可以处理批次数据
    18. * 也可以处理流式数据
    19. */
    20. see.setRuntimeMode(RuntimeExecutionMode.BATCH) ; // 批处理模式
    21. see.setRuntimeMode(RuntimeExecutionMode.STREAMING) ; // 流处理模式
    22. see.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC) ; // 自动处理模式

     注意:上面代码中开启WEBUI本地运行模式时 需要添加如下依赖

    1. <dependency>
    2.    <groupId>org.apache.flinkgroupId>
    3.    <artifactId>flink-runtime-web_2.12artifactId>
    4.    <version>${flink.version}version>
    5. dependency>

    2 基本Source算子

    source是用来获取外部数据的算子,按照获取数据的方式,可以分为:

    1. 基于集合的Source
    2. 基于Socket网络端口的Source
    3. 基于文件的Source
    4. 第三方Connector Source
    5. 自定义Source

    从并行度的角度,source又可以分为非并行的source和并行的source。

    1. 非并行source:并行度只能为1,即只有一个运行时实例,在读取大量数据时效率比较低,通常是用来做一些实验或测试,例如Socket Source;
    2. 并行Source:并行度可以是1到多个,在计算资源足够的前提下,并行度越大,效率越高。例如Kafka Source;

    2.1 基于集合的Source(常用于测试) 

    可将一个普通的Java集合、迭代器或者可变参数转换成一个分布式数据流DataStream;

    1. /**
    2. * @Date: 22.11.7
    3. * @Author: Hang.Nian.YY
    4. * @qq: 598196583
    5. * @Tips: 学大数据 ,到多易教育
    6. * @Description: SOURCE: 基于集合的source算子
    7. */
    8. public class Base_API_Source_Demo01 {
    9. public static void main(String[] args) throws Exception {
    10. Configuration configuration = new Configuration();
    11. configuration.setInteger("rest.port" , 80820);
    12. StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    13. // 设置处理数据的并行度
    14. see.setParallelism(2) ;
    15. /**
    16. * 基于集合的source算子
    17. */
    18. //1)
    19. DataStreamSource ds1 = see.fromSequence(1, 10);
    20. //2)
    21. DataStreamSource ds2 = see.fromElements("big", "data", "hang", "yy", "dy");
    22. //3)
    23. List list = Arrays.asList("big", "data", "hang", "yy", "dy");
    24. DataStreamSource ds3 = see.fromCollection(list);
    25. //4)
    26. // 可以并行化处理的数据流 , 指定数据并行化分发的规则
    27. // 参数一 使用flink自己现成的具有数据分发规则的迭代器的实现类 参数二 返回数据类型
    28. DataStreamSource ds4 = see.fromParallelCollection(new NumberSequenceIterator(1L, 10L), Long.class);
    29. // 将数据流出到控制台 ,print可以指定参数 : 输出标记
    30. ds1.print("输出数据: ") ;
    31. // 触发程序提交
    32. see.execute() ;
    33. }
    34. }

    2.2 基于Socket的Source(常用于测试) 

    非并行的Source,通过socket通信来获取数据得到数据流;

    该方法还有多个重载的方法,如:socketTextStream(String hostname, int port, String delimiter, long maxRetry)可以指定行分隔符和最大重新连接次数。

    1. //调用env的socketTextStream方法,从指定的Socket地址和端口创建DataStream
    2. DataStreamSource lines = env.socketTextStream("localhost", 8888);

    提示:socketSource是一个非并行source,如果使用socketTextStream读取数据,在启动Flink程序之前,必须先启动一个Socket服务,为了方便,Mac或Linux用户可以在命令行终端输入nc -lk 8888启动一个Socket服务并在命令行中向该Socket服务发送数据。

    2.3 基于文件的Source

    基于文件的Source,本质上就是使用指定的FileInputFormat组件读取数据,可以指定TextInputFormat、CsvInputFormat、BinaryInputFormat等格式;

    底层都是ContinuousFileMonitoringFunction(连续的文件监控),这个类继承了RichSourceFunction,RichSourceFunction产生的Source都是非并行的Source;

      readFile

    readFile(FileInputFormat inputFormat, String filePath) 方法可以指定读取文件的FileInputFormat 格式,参数FileProcessingMode,可取值:

    1. PROCESS_ONCE,只读取文件中的数据一次,读取完成后,程序退出
    2. PROCESS_CONTINUOUSLY,会一直监听指定的文件,文件的内容发生变化后,会将以前的内容和新的内容全部都读取出来,进而造成数据重复读取。
    1. String path = "file:///Users/xing/Desktop/a.txt";
    2. //PROCESS_CONTINUOUSLY模式是一直监听指定的文件或目录,2秒钟检测一次文件是否发生变化
    3. DataStreamSource lines = env.readFile(new TextInputFormat(null), path,
    4.        FileProcessingMode.PROCESS_CONTINUOUSLY, 2000);

      readTextFile

    readTextFile(String filePath) 可以从指定的目录或文件读取数据,默认使用的是TextInputFormat格式读取数据,还有一个重载的方法readTextFile(String filePath, String charsetName)可以传入读取文件指定的字符集,默认是UTF-8编码。该方法是一个有限的数据源,数据读完后,程序就会退出,不能一直运行。该方法底层调用的是readFile方法,FileProcessingMode为PROCESS_ONCE

    DataStreamSource lines = env.readTextFile(path);

     2.4 Kafka Source (生产中常用)

    在实际生产环境中,为了保证flink可以高效地读取数据源中的数据,通常是跟一些分布式消息中件结合使用,例如Apache Kafka。Kafka的特点是分布式、多副本、高可用、高吞吐、可以记录偏移量等。Flink和Kafka整合可以高效的读取数据,并且可以保证Exactly Once(精确一次性语义)。

    首先在maven项目的pom.xml文件中导入Flink跟Kafka整合的依赖

    1. <dependency>
    2.    <groupId>org.apache.flinkgroupId>
    3.    <artifactId>flink-connector-kafka_2.12artifactId>
    4.    <version>1.14.4version>
    5. dependency>

    代码示例

    1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    2. KafkaSource kafkaSource = KafkaSource.builder()
    3.       .setBootstrapServers("doitedu01:9092")
    4.       .setValueOnlyDeserializer(new SimpleStringSchema())
    5.       .setStartingOffsets(OffsetsInitializer.latest())
    6.       .setTopics("flink-01")
    7.       .setGroupId("fk03")
    8.       .build();
    9. WatermarkStrategy strategy = WatermarkStrategy
    10.       .forBoundedOutOfOrderness(Duration.ZERO)
    11.       .withTimestampAssigner(new SerializableTimestampAssigner() {
    12.            @Override
    13.            public long extractTimestamp(String element, long recordTimestamp) {
    14.                String[] arr = element.split(",");
    15.                return Long.parseLong(arr[3]);
    16.           }
    17.       });
    18. // 1,a,100,1646784001000
    19. DataStreamSource stream1 = env.fromSource(kafkaSource,strategy,"").setParallelism(2);

    新版本API中,flink会把kafka消费者的消费位移记录在算子状态中,这样就实现了消费位移状态的容错,从而可以支持端到端的exactly-once;

    2.5 自定义Source​​​​​​​ 

    Flink的DataStream API可以让开发者根据实际需要,灵活的自定义Source,本质上就是定义一个类,实现SourceFunction或继承RichParallelSourceFunction,实现run方法和cancel方法。

    自定义source

       可以实现   SourceFunction  或者 RichSourceFunction , 这两者都是非并行的source算子

       也可实现   ParallelSourceFunction  或者 RichParallelSourceFunction , 这两者都是可并行的source算子

    -- 带 Rich的,都拥有 open() ,close()生命周期方法 ,getRuntimeContext() 方法

    -- 带 Parallel的,都可多实例并行执行

    创建非并行 Source 数据源

    1. class MySource0 implements SourceFunction{
    2. @Override
    3. public void run(SourceContext ctx) throws Exception {
    4. }
    5. @Override
    6. public void cancel() {
    7. }
    8. }
    9. /**
    10. * RichXXX 方法中具有
    11. * 1) getRuntimeContext 运行时上下文
    12. * 2) open()
    13. * 3) close() 等生命周期方法
    14. */
    15. class MySource02 extends RichSourceFunction{
    16. ArrayList list = new ArrayList<>() ;
    17. @Override
    18. public void open(Configuration parameters) throws Exception {
    19. BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("data/yy.txt")));
    20. String line = null ;
    21. while ((line = br.readLine())!=null){
    22. String[] arr = line.split(",");
    23. DaYao daYao = new DaYao(arr[0], arr[1], arr[2], Double.parseDouble(arr[3]));
    24. list.add(daYao) ;
    25. }
    26. }
    27. @Override
    28. public void run(SourceContext ctx) throws Exception {
    29. while (true){
    30. int index = RandomUtils.nextInt(list.size());
    31. DaYao daYao = list.get(index);
    32. ctx.collect(daYao);
    33. Thread.sleep(2000);
    34. }
    35. }
    36. @Override
    37. public void cancel() {
    38. }
    39. @Override
    40. public void close() throws Exception {
    41. super.close();
    42. }
    43. }

    创建并行的source 数据源

    1. class MySource03 implements ParallelSourceFunction{
    2. @Override
    3. public void run(SourceContext ctx) throws Exception {
    4. }
    5. @Override
    6. public void cancel() {
    7. }
    8. }

    ​​​​​​​

    1. /**
    2. * @Date: 22.11.07
    3. * @Author: Hang.Nian.YY
    4. * @qq: 598196583
    5. * @Tips: 学大数据 ,到多易教育
    6. * @Description:
    7. */
    8. public class MySource extends RichParallelSourceFunction {
    9. ArrayList list = new ArrayList<>() ;
    10. @Override
    11. public void open(Configuration parameters) throws Exception {
    12. BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("data/dayao.txt")));
    13. String line = null ;
    14. while ((line = br.readLine())!=null){
    15. String[] arr = line.split(",");
    16. DaYao daYao = new DaYao(arr[0], arr[1], arr[2], Double.parseDouble(arr[3]));
    17. list.add(daYao) ;
    18. }
    19. }
    20. @Override
    21. public void run(SourceContext ctx) throws Exception {
    22. while (true){
    23. int index = RandomUtils.nextInt(list.size());
    24. DaYao daYao = list.get(index);
    25. ctx.collect(daYao);
    26. Thread.sleep(2000);
    27. }
    28. }
    29. @Override
    30. public void cancel() {
    31. }
    32. }

  • 相关阅读:
    美团面试官:高并发、任务执行时间短的业务怎样使用线程池?
    【架构】常见技术点--服务治理
    探画系统探画系统开发源码分享
    Redis 定长队列的探索和实践
    【RabbitMQ】什么是RabbitMQ?RabbitMQ有什么用?应用场景有那些?
    Python复习笔记5——常用模块
    运用精益管理思想提升MES管理系统建设水平
    个人实现的可任意折叠QToolBox——AdvancedToolBox
    python学习003——enumerate() 函数
    给Git仓库添加.gitignore:清理、删除、排除被Git误添加的临时文件
  • 原文地址:https://blog.csdn.net/qq_37933018/article/details/127739852