ProcessFunction 与前文所述的 map、flatMap、filter 等算子相比,最大的区别在于它让开发人员对数据处理逻辑拥有更大的自由度(可以通过 Context 访问时间戳、输出侧输出流,在 KeyedStream 上还可以使用定时器);同时,ProcessFunction 继承了 AbstractRichFunction(即实现了 RichFunction 接口),因而具备 getRuntimeContext()、open()、close() 等方法。
在不同类型的 DataStream 上(比如 KeyedStream、WindowedStream、ConnectedStreams 等)调用 process 时,Flink 提供了多种不同类型的 ProcessFunction,使其针对不同的 DataStream 具备更有针对性的功能:
- ProcessFunction (普通DataStream上调process时)
- KeyedProcessFunction (KeyedStream上调process时)
- ProcessWindowFunction(WindowedStream上调process时)
- ProcessAllWindowFunction(AllWindowedStream上调process时)
- CoProcessFunction (ConnectedStreams上调process时)
- ProcessJoinFunction (KeyedStream 做 intervalJoin(...).between(...) 后,在 IntervalJoined 上调process时;JoinedStreams 上使用的是 apply + JoinFunction)
- BroadcastProcessFunction (普通 DataStream connect BroadcastStream 得到的 BroadcastConnectedStream 上调process时)
- KeyedBroadcastProcessFunction(KeyedStream connect BroadcastStream 得到的 BroadcastConnectedStream 上调process时)
各种算子运算后所生成的 DataStream 类型,以及各类 DataStream 之间的相互转换关系
在不同类型的数据流上调用 process 算子时,需要传入的 ProcessFunction 类型也不同。
在源头 DataStream 上调用 process 方法
- package com.blok;
-
- import com.mysql.cj.jdbc.MysqlXADataSource;
- import com.pojo.BigYY;
- import com.yao.yao.base01.MySource;
- import com.yao.yao.base01.beans.DaYao;
- import org.apache.flink.api.common.functions.RuntimeContext;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.connector.jdbc.*;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.ProcessFunction;
- import org.apache.flink.streaming.api.functions.sink.SinkFunction;
- import org.apache.flink.util.Collector;
- import org.apache.flink.util.function.SerializableSupplier;
-
- import javax.sql.XADataSource;
- import java.sql.PreparedStatement;
- import java.sql.SQLException;
-
- /**
- * @Date: 22.11.11
- * @Author: Hang.Nian.YY
- * @qq: 598196583
- * @Tips: 学大数据 ,到多易教育
- * @Description:
- */
- public class _19Base_API_Process_Function {
-
- public static void main(String[] args) throws Exception {
- Configuration conf = new Configuration();
- conf.setInteger("rest.port", 8888);
- StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
- see.setParallelism(1);
-
- DataStreamSource
ds = see.socketTextStream("linux01", 8899); - /**
- * 壹
- * 源头数据 调用process方法
- */
- ds.process(new ProcessFunction
() { // 泛型1输入数据类型 泛型2输出数据类型 - /**
- *每来一条数据 方法被调用一次
- * ctx 上下文对象 可以操作测流
- * out 用于输出
- *
- */
- @Override
- public void processElement(String value, ProcessFunction
.Context ctx, Collector out) throws Exception { - String[] arr = value.split(",");
- out.collect(new BigYY(arr[0], arr[1], arr[2], Double.parseDouble(arr[3])));
- }
-
- /**
- * 生命周期方法
- * @param parameters The configuration containing the parameters attached to the contract.
- * @throws Exception
- */
- @Override
- public void open(Configuration parameters) throws Exception {
- RuntimeContext runtimeContext = getRuntimeContext();
- // 获取程序运行的task信息
- runtimeContext.getTaskNameWithSubtasks();
- runtimeContext.getTaskName();
- runtimeContext.getIndexOfThisSubtask();
- // 获取状态对象 获取累加器等
- // runtimeContext.getBroadcastVariable() ;
- super.open(parameters);
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- }
- });
-
-
- }
- }
keyBy 之后在 KeyedStream 上调用 process 方法
- package com.blok;
-
- import com.mysql.cj.jdbc.MysqlXADataSource;
- import com.pojo.BigYY;
- import com.yao.yao.base01.MySource;
- import com.yao.yao.base01.beans.DaYao;
- import org.apache.flink.api.common.functions.RuntimeContext;
- import org.apache.flink.api.java.functions.KeySelector;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.connector.jdbc.*;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.KeyedStream;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
- import org.apache.flink.streaming.api.functions.ProcessFunction;
- import org.apache.flink.streaming.api.functions.sink.SinkFunction;
- import org.apache.flink.util.Collector;
- import org.apache.flink.util.function.SerializableSupplier;
-
- import javax.sql.XADataSource;
- import java.sql.PreparedStatement;
- import java.sql.SQLException;
-
- /**
- * @Date: 22.11.11
- * @Author: Hang.Nian.YY
- * @qq: 598196583
- * @Tips: 学大数据 ,到多易教育
- * @Description:
- */
- public class _19Base_API_Process_Function {
-
- public static void main(String[] args) throws Exception {
- Configuration conf = new Configuration();
- conf.setInteger("rest.port", 8888);
- StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
- see.setParallelism(1);
-
- DataStreamSource
ds = see.socketTextStream("linux01", 8899); - /**
- * 壹
- * 源头数据 调用process方法
- */
- SingleOutputStreamOperator
ds2 = ds.process(new ProcessFunction() { // 泛型1输入数据类型 泛型2输出数据类型 - /**
- *每来一条数据 方法被调用一次
- * ctx 上下文对象 可以操作测流
- * out 用于输出
- *
- */
- @Override
- public void processElement(String value, ProcessFunction
.Context ctx, Collector out) throws Exception { - String[] arr = value.split(",");
- out.collect(new BigYY(arr[0], arr[1], arr[2], Double.parseDouble(arr[3])));
- }
-
- /**
- * 生命周期方法
- * @param parameters The configuration containing the parameters attached to the contract.
- * @throws Exception
- */
- @Override
- public void open(Configuration parameters) throws Exception {
- RuntimeContext runtimeContext = getRuntimeContext();
- // 获取程序运行的task信息
- runtimeContext.getTaskNameWithSubtasks();
- runtimeContext.getTaskName();
- runtimeContext.getIndexOfThisSubtask();
- // 获取状态对象 获取累加器等
- // runtimeContext.getBroadcastVariable() ;
- super.open(parameters);
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- }
- });
-
- /**
- * 贰
- * 处理数据源 , 将数据封装成指定的数据格式 然后对数据进行分组
- * 在keyBy后使用process方法处理数据
- */
- // 将数据按照城市分组
- ds2.keyBy(BigYY::getCity);
- KeyedStream
keyed = ds2.keyBy(new KeySelector() { - @Override
- public String getKey(BigYY value) throws Exception {
- return value.getCity();
- }
- });
-
- // KeyedProcessFunction
- // 泛型1 流的key的数据类型 泛型2 输入的数据类型 泛型3 输出的数据类型
- keyed.process(new KeyedProcessFunction
() { -
- // 生命周期方法
- @Override
- public void open(Configuration parameters) throws Exception {
- RuntimeContext runtimeContext = getRuntimeContext();
- super.open(parameters);
- }
-
- // 数据处理的方法
- @Override
- public void processElement(BigYY value, KeyedProcessFunction
.Context ctx, Collector out) throws Exception { -
- }
-
- // 生命周期方法
- @Override
- public void close() throws Exception {
- super.close();
- }
- });
-
- // ProcessFunction 这方法已经过期
- keyed.process(new ProcessFunction
() { - @Override
- public void processElement(BigYY value, ProcessFunction
.Context ctx, Collector out) throws Exception { -
- }
- });
-
-
- }
- }
开窗之后调用 process 方法
- package com.blok;
-
- import com.mysql.cj.jdbc.MysqlXADataSource;
- import com.pojo.BigYY;
- import com.yao.yao.base01.MySource;
- import com.yao.yao.base01.beans.DaYao;
- import org.apache.flink.api.common.functions.RuntimeContext;
- import org.apache.flink.api.java.functions.KeySelector;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.connector.jdbc.*;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.KeyedStream;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
- import org.apache.flink.streaming.api.functions.ProcessFunction;
- import org.apache.flink.streaming.api.functions.sink.SinkFunction;
- import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
- import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction;
- import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
- import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
- import org.apache.flink.util.Collector;
- import org.apache.flink.util.function.SerializableSupplier;
-
- import javax.sql.XADataSource;
- import java.sql.PreparedStatement;
- import java.sql.SQLException;
-
- /**
- * @Date: 22.11.11
- * @Author: Hang.Nian.YY
- * @qq: 598196583
- * @Tips: 学大数据 ,到多易教育
- * @Description:
- */
- public class _19Base_API_Process_Function {
-
- public static void main(String[] args) throws Exception {
- Configuration conf = new Configuration();
- conf.setInteger("rest.port", 8888);
- StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
- see.setParallelism(1);
-
- DataStreamSource
ds = see.socketTextStream("linux01", 8899); - /**
- * 壹
- * 源头数据 调用process方法
- */
- SingleOutputStreamOperator
ds2 = ds.process(new ProcessFunction() { // 泛型1输入数据类型 泛型2输出数据类型 - /**
- *每来一条数据 方法被调用一次
- * ctx 上下文对象 可以操作测流
- * out 用于输出
- *
- */
- @Override
- public void processElement(String value, ProcessFunction
.Context ctx, Collector out) throws Exception { - String[] arr = value.split(",");
- out.collect(new BigYY(arr[0], arr[1], arr[2], Double.parseDouble(arr[3])));
- }
-
- /**
- * 生命周期方法
- * @param parameters The configuration containing the parameters attached to the contract.
- * @throws Exception
- */
- @Override
- public void open(Configuration parameters) throws Exception {
- RuntimeContext runtimeContext = getRuntimeContext();
- // 获取程序运行的task信息
- runtimeContext.getTaskNameWithSubtasks();
- runtimeContext.getTaskName();
- runtimeContext.getIndexOfThisSubtask();
- // 获取状态对象 获取累加器等
- // runtimeContext.getBroadcastVariable() ;
- super.open(parameters);
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- }
- });
-
- /**
- * 叁
- * 开窗后调用 process方法
- * 依然后生命周期方法
- * 获取运行时环境的方法
- */
- ds2
- .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))) // 滚动窗口
- .process(new ProcessAllWindowFunction
() { - @Override
- public void process(ProcessAllWindowFunction
.Context context, Iterable elements, Collector out) throws Exception { -
- }
- });
-
- ds2.countWindowAll(10).process(new ProcessAllWindowFunction
() { - @Override
- public void process(ProcessAllWindowFunction
.Context context, Iterable elements, Collector out) throws Exception { -
- }
- });
-
-
- }
- }
开窗之后才有的 apply 方法
- /**
- * @Date: 22.11.11
- * @Author: Hang.Nian.YY
- * @qq: 598196583
- * @Tips: 学大数据 ,到多易教育
- * @Description:
- */
- public class _19Base_API_Process_Function {
-
- public static void main(String[] args) throws Exception {
- Configuration conf = new Configuration();
- conf.setInteger("rest.port", 8888);
- StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
- see.setParallelism(1);
-
- DataStreamSource
ds = see.socketTextStream("linux01", 8899); - /**
- * 壹
- * 源头数据 调用process方法
- */
- SingleOutputStreamOperator
ds2 = ds.process(new ProcessFunction() { // 泛型1输入数据类型 泛型2输出数据类型 - /**
- *每来一条数据 方法被调用一次
- * ctx 上下文对象 可以操作测流
- * out 用于输出
- *
- */
- @Override
- public void processElement(String value, ProcessFunction
.Context ctx, Collector out) throws Exception { - String[] arr = value.split(",");
- out.collect(new BigYY(arr[0], arr[1], arr[2], Double.parseDouble(arr[3])));
- }
-
- /**
- * 生命周期方法
- * @param parameters The configuration containing the parameters attached to the contract.
- * @throws Exception
- */
- @Override
- public void open(Configuration parameters) throws Exception {
- RuntimeContext runtimeContext = getRuntimeContext();
- // 获取程序运行的task信息
- runtimeContext.getTaskNameWithSubtasks();
- runtimeContext.getTaskName();
- runtimeContext.getIndexOfThisSubtask();
- // 获取状态对象 获取累加器等
- // runtimeContext.getBroadcastVariable() ;
- super.open(parameters);
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- }
- });
-
- /**
- * 叁
- * 开窗后调用 apply
- * 没有生命周期方法
- * 不能运行时环境的方法
- */
- ds2
- .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))) // 滚动窗口
- .apply(new AllWindowFunction
() { - //窗口触发一次 调用一次这个方法
- //values 迭代本窗口中所有数据的迭代器
- @Override
- public void apply(TimeWindow window, Iterable
values, Collector out) throws Exception { - double sum = 0d;
- for (BigYY by : values) {
-
- double appearance = by.getAppearance();
- sum += appearance;
- }
- out.collect(sum);
- }
- });
-
-
- }
- }