我们知道处理常用的滑动窗口分配器,滚动窗口分配器,全局窗口分配器,会话窗口分配器外,我们可以实现自己的自定义窗口分配器,以实现我们的自己的窗口逻辑
package wikiedits.assigner;
import com.google.common.collect.Lists;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import java.util.Collection;
import java.util.Collections;
public class IntervalWindowAssigner
extends WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private long windowSize = 60 * 1000L;
private IntervalWindowAssigner() {}
@Override
public Collection<TimeWindow> assignWindows(
Object element, long timestamp, WindowAssignerContext context) {
long startTime = timestamp - (timestamp % windowSize);
long endTime = startTime + windowSize;
return Lists.newArrayList(new TimeWindow(startTime, endTime));
}
@Override
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return EventTimeTrigger.create();
}
@Override public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
}
@Override public boolean isEventTime() {
return true;
}
}
注意,TimeWindow时间窗口是左边右开的形式,参见下图所示
代码里面是以maxTimeStamp()为准的