• Flink系列文档-(YY07)-Flink编程API-process function


    1 概述

    process function相对于前文所述的map、flatmap、filter算子来说,最大的区别是其让开发人员对数据的处理逻辑拥有更大的自由度;同时,ProcessFunction 继承了RichFunction,因而具备了getRuntimeContext() ,open() ,close()等方法;

    在不同类型的datastream上,(比如keyed stream、windowedStream、ConnectedStream等),应用process function时,flink提供了大量不同类型的process function,让其针对不同的datastream拥有更具针对性的功能;

    1. ProcessFunction (普通DataStream上调process时)
    2. KeyedProcessFunction (KeyedStream上调process时)
    3. ProcessWindowFunction(WindowedStream上调process时)
    4. ProcessAllWindowFunction(AllWindowedStream上调process时)
    5. CoProcessFunction (ConnectedStreams上调process时)
    6. ProcessJoinFunction (KeyedStream经intervalJoin得到的IntervalJoined流上调process时)
    7. BroadcastProcessFunction (非keyed流connect广播流后得到的BroadcastConnectedStream上调process时)
    8. KeyedBroadcastProcessFunction(KeyedStream connect广播流后得到的BroadcastConnectedStream上调process时)

      各种算子运算后所生成的datastream类型,及各种datastream类型之间的互相转换关系 

     

    在不同类型的 数据流上,调用process算子时,所需要传入的ProcessFunction也会有不同 

    2 示例代码

    数据源头调用方法 

    1. package com.blok;
    2. import com.mysql.cj.jdbc.MysqlXADataSource;
    3. import com.pojo.BigYY;
    4. import com.yao.yao.base01.MySource;
    5. import com.yao.yao.base01.beans.DaYao;
    6. import org.apache.flink.api.common.functions.RuntimeContext;
    7. import org.apache.flink.configuration.Configuration;
    8. import org.apache.flink.connector.jdbc.*;
    9. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    10. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    11. import org.apache.flink.streaming.api.functions.ProcessFunction;
    12. import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    13. import org.apache.flink.util.Collector;
    14. import org.apache.flink.util.function.SerializableSupplier;
    15. import javax.sql.XADataSource;
    16. import java.sql.PreparedStatement;
    17. import java.sql.SQLException;
    18. /**
    19. * @Date: 22.11.11
    20. * @Author: Hang.Nian.YY
    21. * @qq: 598196583
    22. * @Tips: 学大数据 ,到多易教育
    23. * @Description:
    24. */
    25. public class _19Base_API_Process_Function {
    26.     /**
    27.      * Demo 1: call process() directly on the source stream.
    28.      * Reads text lines from the socket linux01:8899 and converts each line into a BigYY.
    29.      *
    30.      * @param args command-line arguments (unused)
    31.      * @throws Exception if building or running the job fails
    32.      */
    33.     public static void main(String[] args) throws Exception {
    34.         Configuration conf = new Configuration();
    35.         conf.setInteger("rest.port", 8888);
    36.         StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    37.         see.setParallelism(1);
    38.         DataStreamSource<String> ds = see.socketTextStream("linux01", 8899);
    39.         // Part 1: process() on the source stream.
    40.         // ProcessFunction<I, O>: I = input element type, O = output element type.
    41.         ds.process(new ProcessFunction<String, BigYY>() {
    42.             /**
    43.              * Called once for every incoming element.
    44.              *
    45.              * @param value one input line; needs at least four comma-separated fields, the
    46.              *              fourth parseable as a double, otherwise an exception is thrown
    47.              * @param ctx   context object; can be used to emit to side outputs
    48.              * @param out   collector that receives the output records
    49.              */
    50.             @Override
    51.             public void processElement(String value, Context ctx, Collector<BigYY> out) throws Exception {
    52.                 String[] arr = value.split(",");
    53.                 out.collect(new BigYY(arr[0], arr[1], arr[2], Double.parseDouble(arr[3])));
    54.             }
    55.             /**
    56.              * Lifecycle method.
    57.              *
    58.              * @param parameters The configuration containing the parameters attached to the contract.
    59.              * @throws Exception if initialization fails
    60.              */
    61.             @Override
    62.             public void open(Configuration parameters) throws Exception {
    63.                 RuntimeContext runtimeContext = getRuntimeContext();
    64.                 // Task information exposed by the runtime context (return values unused; shown for reference).
    65.                 runtimeContext.getTaskNameWithSubtasks();
    66.                 runtimeContext.getTaskName();
    67.                 runtimeContext.getIndexOfThisSubtask();
    68.                 // State handles, accumulators, etc. are obtained from the runtime context as well.
    69.                 super.open(parameters);
    70.             }
    71.             @Override
    72.             public void close() throws Exception {
    73.                 super.close();
    74.             }
    75.         });
    76.         // A Flink program only describes the dataflow; the job starts when execute() is called.
    77.         see.execute();
    78.     }
    79. }

    keyBy后调用方法

    1. package com.blok;
    2. import com.mysql.cj.jdbc.MysqlXADataSource;
    3. import com.pojo.BigYY;
    4. import com.yao.yao.base01.MySource;
    5. import com.yao.yao.base01.beans.DaYao;
    6. import org.apache.flink.api.common.functions.RuntimeContext;
    7. import org.apache.flink.api.java.functions.KeySelector;
    8. import org.apache.flink.configuration.Configuration;
    9. import org.apache.flink.connector.jdbc.*;
    10. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    11. import org.apache.flink.streaming.api.datastream.KeyedStream;
    12. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    13. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    14. import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    15. import org.apache.flink.streaming.api.functions.ProcessFunction;
    16. import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    17. import org.apache.flink.util.Collector;
    18. import org.apache.flink.util.function.SerializableSupplier;
    19. import javax.sql.XADataSource;
    20. import java.sql.PreparedStatement;
    21. import java.sql.SQLException;
    22. /**
    23. * @Date: 22.11.11
    24. * @Author: Hang.Nian.YY
    25. * @qq: 598196583
    26. * @Tips: 学大数据 ,到多易教育
    27. * @Description:
    28. */
    29. public class _19Base_API_Process_Function {
    30. public static void main(String[] args) throws Exception {
    31. Configuration conf = new Configuration();
    32. conf.setInteger("rest.port", 8888);
    33. StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    34. see.setParallelism(1);
    35. DataStreamSource ds = see.socketTextStream("linux01", 8899);
    36. /**
    37. * 壹
    38. * 源头数据 调用process方法
    39. */
    40. SingleOutputStreamOperator ds2 = ds.process(new ProcessFunction() { // 泛型1输入数据类型 泛型2输出数据类型
    41. /**
    42. *每来一条数据 方法被调用一次
    43. * ctx 上下文对象 可以操作测流
    44. * out 用于输出
    45. *
    46. */
    47. @Override
    48. public void processElement(String value, ProcessFunction.Context ctx, Collector out) throws Exception {
    49. String[] arr = value.split(",");
    50. out.collect(new BigYY(arr[0], arr[1], arr[2], Double.parseDouble(arr[3])));
    51. }
    52. /**
    53. * 生命周期方法
    54. * @param parameters The configuration containing the parameters attached to the contract.
    55. * @throws Exception
    56. */
    57. @Override
    58. public void open(Configuration parameters) throws Exception {
    59. RuntimeContext runtimeContext = getRuntimeContext();
    60. // 获取程序运行的task信息
    61. runtimeContext.getTaskNameWithSubtasks();
    62. runtimeContext.getTaskName();
    63. runtimeContext.getIndexOfThisSubtask();
    64. // 获取状态对象 获取累加器等
    65. // runtimeContext.getBroadcastVariable() ;
    66. super.open(parameters);
    67. }
    68. @Override
    69. public void close() throws Exception {
    70. super.close();
    71. }
    72. });
    73. /**
    74. * 贰
    75. * 处理数据源 , 将数据封装成指定的数据格式 然后对数据进行分组
    76. * 在keyBy后使用process方法处理数据
    77. */
    78. // 将数据按照城市分组
    79. ds2.keyBy(BigYY::getCity);
    80. KeyedStream keyed = ds2.keyBy(new KeySelector() {
    81. @Override
    82. public String getKey(BigYY value) throws Exception {
    83. return value.getCity();
    84. }
    85. });
    86. // KeyedProcessFunction
    87. // 泛型1 流的key的数据类型 泛型2 输入的数据类型 泛型3 输出的数据类型
    88. keyed.process(new KeyedProcessFunction() {
    89. // 生命周期方法
    90. @Override
    91. public void open(Configuration parameters) throws Exception {
    92. RuntimeContext runtimeContext = getRuntimeContext();
    93. super.open(parameters);
    94. }
    95. // 数据处理的方法
    96. @Override
    97. public void processElement(BigYY value, KeyedProcessFunction.Context ctx, Collector out) throws Exception {
    98. }
    99. // 生命周期方法
    100. @Override
    101. public void close() throws Exception {
    102. super.close();
    103. }
    104. });
    105. // ProcessFunction 这方法已经过期
    106. keyed.process(new ProcessFunction() {
    107. @Override
    108. public void processElement(BigYY value, ProcessFunction.Context ctx, Collector out) throws Exception {
    109. }
    110. });
    111. }
    112. }

    开窗后调用方法

    1. package com.blok;
    2. import com.mysql.cj.jdbc.MysqlXADataSource;
    3. import com.pojo.BigYY;
    4. import com.yao.yao.base01.MySource;
    5. import com.yao.yao.base01.beans.DaYao;
    6. import org.apache.flink.api.common.functions.RuntimeContext;
    7. import org.apache.flink.api.java.functions.KeySelector;
    8. import org.apache.flink.configuration.Configuration;
    9. import org.apache.flink.connector.jdbc.*;
    10. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    11. import org.apache.flink.streaming.api.datastream.KeyedStream;
    12. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    13. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    14. import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    15. import org.apache.flink.streaming.api.functions.ProcessFunction;
    16. import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    17. import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
    18. import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction;
    19. import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    20. import org.apache.flink.streaming.api.windowing.time.Time;
    21. import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
    22. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    23. import org.apache.flink.util.Collector;
    24. import org.apache.flink.util.function.SerializableSupplier;
    25. import javax.sql.XADataSource;
    26. import java.sql.PreparedStatement;
    27. import java.sql.SQLException;
    28. /**
    29. * @Date: 22.11.11
    30. * @Author: Hang.Nian.YY
    31. * @qq: 598196583
    32. * @Tips: 学大数据 ,到多易教育
    33. * @Description:
    34. */
    35. public class _19Base_API_Process_Function {
    36.     /**
    37.      * Demo 3: call process() on a windowed (non-keyed) stream.
    38.      * Parses socket lines into BigYY records, then applies ProcessAllWindowFunction on a
    39.      * 10-second tumbling processing-time window and on a 10-element count window.
    40.      *
    41.      * @param args command-line arguments (unused)
    42.      * @throws Exception if building or running the job fails
    43.      */
    44.     public static void main(String[] args) throws Exception {
    45.         Configuration conf = new Configuration();
    46.         conf.setInteger("rest.port", 8888);
    47.         StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    48.         see.setParallelism(1);
    49.         DataStreamSource<String> ds = see.socketTextStream("linux01", 8899);
    50.         // Part 1: process() on the source stream.
    51.         // ProcessFunction<I, O>: I = input element type, O = output element type.
    52.         SingleOutputStreamOperator<BigYY> ds2 = ds.process(new ProcessFunction<String, BigYY>() {
    53.             /**
    54.              * Called once for every incoming element.
    55.              *
    56.              * @param value one input line; needs at least four comma-separated fields, the
    57.              *              fourth parseable as a double, otherwise an exception is thrown
    58.              * @param ctx   context object; can be used to emit to side outputs
    59.              * @param out   collector that receives the output records
    60.              */
    61.             @Override
    62.             public void processElement(String value, Context ctx, Collector<BigYY> out) throws Exception {
    63.                 String[] arr = value.split(",");
    64.                 out.collect(new BigYY(arr[0], arr[1], arr[2], Double.parseDouble(arr[3])));
    65.             }
    66.             /**
    67.              * Lifecycle method.
    68.              *
    69.              * @param parameters The configuration containing the parameters attached to the contract.
    70.              * @throws Exception if initialization fails
    71.              */
    72.             @Override
    73.             public void open(Configuration parameters) throws Exception {
    74.                 RuntimeContext runtimeContext = getRuntimeContext();
    75.                 // Task information exposed by the runtime context (return values unused; shown for reference).
    76.                 runtimeContext.getTaskNameWithSubtasks();
    77.                 runtimeContext.getTaskName();
    78.                 runtimeContext.getIndexOfThisSubtask();
    79.                 // State handles, accumulators, etc. are obtained from the runtime context as well.
    80.                 super.open(parameters);
    81.             }
    82.             @Override
    83.             public void close() throws Exception {
    84.                 super.close();
    85.             }
    86.         });
    87.         // Part 3: process() after opening a window.
    88.         // ProcessAllWindowFunction still offers the lifecycle methods and access to the runtime context.
    89.         // ProcessAllWindowFunction<IN, OUT, W>: IN = element type, OUT = output type, W = window type.
    90.         // OUT is chosen as Double for this demo; the empty bodies below emit nothing.
    91.         ds2
    92.                 .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))) // tumbling window
    93.                 .process(new ProcessAllWindowFunction<BigYY, Double, TimeWindow>() {
    94.                     @Override
    95.                     public void process(Context context, Iterable<BigYY> elements, Collector<Double> out) throws Exception {
    96.                     }
    97.                 });
    98.         // Count window of 10 elements; the window type of a count window is GlobalWindow.
    99.         ds2.countWindowAll(10).process(new ProcessAllWindowFunction<BigYY, Double, GlobalWindow>() {
    100.             @Override
    101.             public void process(Context context, Iterable<BigYY> elements, Collector<Double> out) throws Exception {
    102.             }
    103.         });
    104.         // A Flink program only describes the dataflow; the job starts when execute() is called.
    105.         see.execute();
    106.     }
    107. }

    开窗后才有的apply方法

    1. /**
    2. * @Date: 22.11.11
    3. * @Author: Hang.Nian.YY
    4. * @qq: 598196583
    5. * @Tips: 学大数据 ,到多易教育
    6. * @Description:
    7. */
    8. public class _19Base_API_Process_Function {
    9. public static void main(String[] args) throws Exception {
    10. Configuration conf = new Configuration();
    11. conf.setInteger("rest.port", 8888);
    12. StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    13. see.setParallelism(1);
    14. DataStreamSource ds = see.socketTextStream("linux01", 8899);
    15. /**
    16. * 壹
    17. * 源头数据 调用process方法
    18. */
    19. SingleOutputStreamOperator ds2 = ds.process(new ProcessFunction() { // 泛型1输入数据类型 泛型2输出数据类型
    20. /**
    21. *每来一条数据 方法被调用一次
    22. * ctx 上下文对象 可以操作测流
    23. * out 用于输出
    24. *
    25. */
    26. @Override
    27. public void processElement(String value, ProcessFunction.Context ctx, Collector out) throws Exception {
    28. String[] arr = value.split(",");
    29. out.collect(new BigYY(arr[0], arr[1], arr[2], Double.parseDouble(arr[3])));
    30. }
    31. /**
    32. * 生命周期方法
    33. * @param parameters The configuration containing the parameters attached to the contract.
    34. * @throws Exception
    35. */
    36. @Override
    37. public void open(Configuration parameters) throws Exception {
    38. RuntimeContext runtimeContext = getRuntimeContext();
    39. // 获取程序运行的task信息
    40. runtimeContext.getTaskNameWithSubtasks();
    41. runtimeContext.getTaskName();
    42. runtimeContext.getIndexOfThisSubtask();
    43. // 获取状态对象 获取累加器等
    44. // runtimeContext.getBroadcastVariable() ;
    45. super.open(parameters);
    46. }
    47. @Override
    48. public void close() throws Exception {
    49. super.close();
    50. }
    51. });
    52. /**
    53. * 叁
    54. * 开窗后调用 apply
    55. * 没有生命周期方法
    56. * 不能运行时环境的方法
    57. */
    58. ds2
    59. .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))) // 滚动窗口
    60. .apply(new AllWindowFunction() {
    61. //窗口触发一次 调用一次这个方法
    62. //values 迭代本窗口中所有数据的迭代器
    63. @Override
    64. public void apply(TimeWindow window, Iterable values, Collector out) throws Exception {
    65. double sum = 0d;
    66. for (BigYY by : values) {
    67. double appearance = by.getAppearance();
    68. sum += appearance;
    69. }
    70. out.collect(sum);
    71. }
    72. });
    73. }
    74. }

  • 相关阅读:
    excel高级绘图技巧100讲(二十二)-如何对不规则数据进行分列
    学习笔记-接口测试(postman、jmeter)
    统计学习方法-感知机
    动作捕捉系统用于室内组合定位技术研究
    详细介绍Linux环境的搭建以及相关问题出现的解决
    编译原理 —— 编译器
    申请国外博士后如何写好推荐信
    C++模板
    python 为什么这么受欢迎?python的优势到底在哪里?
    C++数据结构与算法:布隆过滤器(Bloom Filter)原理与实现
  • 原文地址:https://blog.csdn.net/qq_37933018/article/details/127813366