flink的processFunction是实现高级功能的重要模块,所以flink也是单独列出了一节来讲解,不仅如此市面上的flink开发的书籍基本上也是会把processFunction单独列出一个小节来讲解。
另外理解本小节的代码,需要读者理解状态,以及flink三种时间:event time,ingestion time,processing time.下面的例子用的是process time. 本节内容不适合新手小白,适合对flink有一定了解以及有一定编程经验的人。
process函数其实和普通的map flatMap没什么本质上的区别,都是接收一条数据,然后发送出去。 但是。。。但是仅仅如此还不能体现出它的重要性以及特别之处。 它核心的特殊的几点如下:
说明:其中3,和4 结合可以实现一些复杂的功能,是processFunction的核心。比如基于状态注册一些定时触发器,然后再触发器中发出特定的结果。 这个功能常用于监控数据流的特性,实战中很有用。后面我会留一个这方面的例子,这里暂时不多说。
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class processFunctionSideOut {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//定义两条流
DataStream> stream1 = env.fromElements(
Tuple2.of("a", 1L),
Tuple2.of("b", 2L),
Tuple2.of("a", 2000L),
Tuple2.of("a", 3000L),
Tuple2.of("b", 2000L),
Tuple2.of("c", 2001L)
);
OutputTag> outputTag = new OutputTag>("sideout"){};
SingleOutputStreamOperator> ds = stream1.process(new ProcessFunction, Tuple2>() {
@Override
public void processElement(Tuple2 value, Context ctx, Collector> out) throws Exception {
if(value.f0.equals("a")){
out.collect(value);
}else {
ctx.output(outputTag,value);
}
}
});
ds.print();//输出所有的 a的数据
// ds.getSideOutput(outputTag).print();//输出所有不为a的数据
env.execute();
}
}
理解此代码需要读者掌握状态的概念,之后其他的文章会讲解状态,这里暂且不提。大家只需要知道状态就是说存储了一个全局变量我可以无障碍访问即可,状态中的数据是原子性的。
此小节只是告诉大家KeyedprocessFunction的重要性就在于我们可以自己维护状态,可以自己控制数据的发出。
思路:
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
public class processFunctionAverage {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//定义两条流
DataStream> ds = env.fromElements(
Tuple2.of("b", 1L),
Tuple2.of("b", 1L),
Tuple2.of("b", 2L),
Tuple2.of("b", 2L),
Tuple2.of("b", 3L),
Tuple2.of("b", 5L),
Tuple2.of("a", 8L),
Tuple2.of("a", 7L)
);
ds.keyBy((KeySelector, String>) value -> value.f0).process(new KeyedProcessFunction, Tuple2>() {
private ValueState counter;//存储数据条数
private ValueState> element;//存储临时数据
@Override
public void processElement(Tuple2 value, Context ctx, Collector> out) throws Exception {
if (counter.value() == null) {
counter.update(1);//遇见第一条数据的时候,计数器为1
} else {
counter.update(counter.value() + 1);
}
if (element.value() == null) {
element.update(value);//element只存储上一次到来的数据
}
if (counter.value() == 2) {
out.collect(Tuple2.of(element.value().f0, (double) ((element.value().f1 + value.f1) / 2)));
//发出结果之后清楚状态
counter.clear();
element.clear();
}
}
@Override
public void open(Configuration parameters) throws Exception {
counter = getRuntimeContext().getState(new ValueStateDescriptor("counter", Types.INT));
element = getRuntimeContext().getState(new ValueStateDescriptor<>("element", Types.TUPLE(Types.STRING, Types.LONG)));
}
}).print();
env.execute();
}
}
结果:
6> (a,7.0)
2> (b,1.0)
2> (b,2.0)
2> (b,4.0)
注意:想多说一句open方法,所有的process方法都继承了RichFucntion,
RichFunction是顶层的接口,核心方法有两个:
定时器也不是很难理解的东西,比如这么一个场景,我们的数据流是温度传感器的数据, 如果某个传感器在未来的一分钟温度持续升高,那么我们就发出一个警告,这个警告数据的发出就可以借助定时器功能呢。 这是一个真实开发中常见的功能。我们上面也说了processFunction两个核心(1.访问状态 2.定时器功能), 定时器常用来做报警功能。
我们可以把定时器理解为闹钟,用法很简单 -> 1.注册一个定时器 2.如果定时器触发则调用onTimer函数做处理。
ProcessFunction有两个常用的方法,上面的demo中我们已经见到了processElement方法,七四还有一个onTimer方法哦。
源码如下:
@PublicEvolving
public abstract class KeyedProcessFunction extends AbstractRichFunction {
//每来一个流数据就会调用一次哦
public abstract void processElement(I value, Context ctx, Collector out) throws Exception;
//定时器触发的时候的回调函数,只有在定时器闹钟响起之后才会被触发哦
public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {}
}
所谓的定时器,其实就是一个时间处理器,在指定的时间运行自定义的逻辑,其实现主要分为以下两步:
下面是代码:Device是设备数据,其属性包含设备Id和属性
要求对十秒内温度持续上升的设备发出报警。
public class Device {
String deviceID;
Double temperature;
public Device() {
}
public String getDeviceID() {
return deviceID;
}
public Double getTemperature() {
return temperature;
}
public void setDeviceID(String deviceID) {
this.deviceID = deviceID;
}
public void setTemperature(Double temperature) {
this.temperature = temperature;
}
@Override
public String toString() {
return "Device{" +
"deviceID='" + deviceID + '\'' +
", temperature=" + temperature +
'}';
}
}
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;
public class DeviceSource implements SourceFunction {
private boolean flag = true;
@Override
public void run(SourceContext ctx) throws Exception {
double tem = 1.1;
int i = 1;
while (flag) {
Device device = new Device();
String tmpID = String.valueOf(new Random().nextInt(2));
device.setDeviceID("device_" + tmpID);
if (tmpID.contains("0")) {
device.setTemperature(1.1);
} else {
device.setTemperature(tem);
}
ctx.collect(device);
tem += 0.1;//模拟温度递增的设备数据
Thread.sleep(1000);//每一秒产生一条数据
i++;
if (i == 30) {
cancel();
}
}
}
@Override
public void cancel() {
flag = false;
}
}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
public class KeyedprocessFunctionAlarm {
//为了方便叙述,下面代码中我们把 定时器的触发时间说成了闹钟时间。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DeviceSource deviceSource = new DeviceSource();
DataStreamSource source = env.addSource(deviceSource);
source.keyBy((KeySelector) value -> value.getDeviceID()).process(new MyKeyedProcessFunction()).print();
env.execute("test Timer alarm.");
}
}
class MyKeyedProcessFunction extends KeyedProcessFunction {
private ValueState clockTime;//存储闹钟的时间戳
private ValueState deviceTemperature;//存储当前device的温度
@Override
public void open(Configuration parameters) throws Exception {
//初始化闹钟状态, 初始化温度状态
clockTime = getRuntimeContext().getState(new ValueStateDescriptor("timer", Types.LONG));
deviceTemperature = getRuntimeContext().getState(new ValueStateDescriptor("temperature", Types.DOUBLE));
}
@Override
public void processElement(Device device, Context ctx, Collector out) throws Exception {
if (deviceTemperature.value() == null) {
// 第一次获取到温度,则不做任何处理,直接存入状态
} else {
//说明已经流入了数据
double preTemperature = deviceTemperature.value();//获取缓存的温度
if (device.getTemperature() < preTemperature) {
//温度下降,则删除定时器
if (clockTime.value() != null) {//排除第二次就下降,第二次下降的时候根本还未注册定时器,何谈删除?所以加上这个判断段
System.out.println(String.format("processElement:[key=%s,清除定时器]", device.getDeviceID()));
ctx.timerService().deleteEventTimeTimer(clockTime.value());
clockTime.clear();
}
} else if (device.getTemperature() > preTemperature && clockTime.value() == null) {
System.out.println(String.format("设置定时器:[key = %s]", device.getDeviceID()));
//只有不存在 闹钟时间的时候才会注册定时器
long timerTS = ctx.timerService().currentProcessingTime() + 2000;//未来闹钟的设定时间, 比现在多2秒
ctx.timerService().registerProcessingTimeTimer(timerTS);//注册定时器
clockTime.update(timerTS);
}
}
deviceTemperature.update(device.getTemperature());//更新当前温度
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
out.collect(String.format("报警数据:[deviceID=%s,currenTempreture=%s]", ctx.getCurrentKey(), deviceTemperature.value()));
//发出报警之后,要清除状态以及清除定时器闹钟
System.out.println(String.format("闹钟被触发:[key=%s,删除定时器且清空闹钟时间]",ctx.getCurrentKey() ));
ctx.timerService().deleteProcessingTimeTimer(clockTime.value());
clockTime.clear();
}
}
结果如下:
设置定时器:[key = device_1]
报警数据:[deviceID=device_1,currenTempreture=1.3000000000000003]
闹钟被触发:[key=device_1,删除定时器且清空闹钟时间]
设置定时器:[key = device_1]
报警数据:[deviceID=device_1,currenTempreture=1.7000000000000006]
闹钟被触发:[key=device_1,删除定时器且清空闹钟时间]
设置定时器:[key = device_1]
报警数据:[deviceID=device_1,currenTempreture=2.000000000000001]
闹钟被触发:[key=device_1,删除定时器且清空闹钟时间]
设置定时器:[key = device_1]
报警数据:[deviceID=device_1,currenTempreture=2.4000000000000012]
闹钟被触发:[key=device_1,删除定时器且清空闹钟时间]
设置定时器:[key = device_1]
报警数据:[deviceID=device_1,currenTempreture=2.6000000000000014]
闹钟被触发:[key=device_1,删除定时器且清空闹钟时间]
设置定时器:[key = device_1]
报警数据:[deviceID=device_1,currenTempreture=2.7000000000000015]
闹钟被触发:[key=device_1,删除定时器且清空闹钟时间]
设置定时器:[key = device_1]
报警数据:[deviceID=device_1,currenTempreture=3.300000000000002]
闹钟被触发:[key=device_1,删除定时器且清空闹钟时间]
设置定时器:[key = device_1]
说明:结果中根本没有device_0的数据,全都是device_1的报警数据,和我们的预期是符合的,因为产生的device数据中,device_0的温度是不变的,而device_1的温度是递增的。而我们通过processFunction对一秒内温度持续增加的数据发出的报警,所以上述结果是没任何问题的。
顾名思义这是对connect的流数据进行处理,类似于coMap
注意:CoProcessFunction不可以使用定时器,因为定时器只能在keyedStream使用
connectedStreams.process(new CoProcessFunction, Tuple2, String>() {
@Override
public void processElement1(Tuple3 value, Context ctx, Collector out) throws Exception {
//处理第一个流数据
}
@Override
public void processElement2(Tuple2 value, Context ctx, Collector out) throws Exception {
//处理第二个流数据
}
})
顾名思义这是对掉用A.connect(B).keyBy(KeySelector,KeySelector)的流数据进行处理, 可以使用定时器,因为是KeyedStream
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class connectDemo {
private static final Logger logger = LoggerFactory.getLogger(connectDemo.class);
public static void main(String[] args) throws Exception {
logger.info("程序开始运行....");
// 获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 读取数据
DataStreamSource> input01 = env.fromElements(ONE);
DataStreamSource> input02 = env.fromElements(TWO);
ConnectedStreams, Tuple2> connectedStreams = input01.connect(input02);
connectedStreams.keyBy((KeySelector, String>) value -> value.f0,
(KeySelector, String>) value -> value.f0).process(new KeyedCoProcessFunction, Tuple2, String>() {
@Override
public void processElement1(Tuple3 value, Context ctx, Collector out) throws Exception {
//处理第一个流
}
@Override
public void processElement2(Tuple2 value, Context ctx, Collector out) throws Exception {
//处理第二个流
}
})
ds.addSink(new PrintSinkFunction<>("这是我的自定义输出:", false));
env.execute("TestAggFunctionOnWindow");
}
public static final Tuple3[] ONE = new Tuple3[] {
Tuple3.of("一班", "张三", 1L),
Tuple3.of("一班", "李四", 2L),
Tuple3.of("一班", "王五", 3L)
};
public static final Tuple2[] TWO = new Tuple2[]{
Tuple2.of("二班", "赵六"),
Tuple2.of("二班", "小七"),
Tuple2.of("二班", "小八")
};
}
这里谈一下定时器吧,我们已经知道定时器功能是通过: ctx.timerService().registerProcessingTimeTimer(timerTS);//注册定时器
ctx.timerService().deleteEventTimeTimer(clockTime.value());//删除定时器
定时器的使用仅限于keysedStream,所以要额外注意。
另外定时器一般要结合状态来使用, 而涉及到状态大家一定要特别谨慎, 因为状态如果你不手动删除的话会一直保存的,默认是保存在内存中, 如果我们配置了状态后端(即状态的存储方式) 则大多数会配置成RocksDB中,也就是保存在磁盘中了。 换句话说,我们自定义的状态需要在触发定时器之后手动删除,否则可能会带来灾难性的后果。 以后的小节中我会专门来讲解状态的知识,此小节读者需要有谨慎的意识即可。
最后:flink的基础知识我会一直分享下去,帅哥们,点个关注吧。