• Flink Series Docs (YY11): How Watermarks Work


    1 How Watermark Generation Works

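     A look at the source code of BoundedOutOfOrdernessWatermarks, the generator that WatermarkStrategy.forBoundedOutOfOrderness creates: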

    package org.apache.flink.api.common.eventtime;

    import java.time.Duration;

    import static org.apache.flink.util.Preconditions.checkArgument;
    import static org.apache.flink.util.Preconditions.checkNotNull;

    public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {
        /** The maximum timestamp encountered so far. */
        private long maxTimestamp;
        /** The maximum out-of-orderness that this watermark generator assumes. */
        private final long outOfOrdernessMillis;

        /**
         * Creates a new watermark generator with the given out-of-orderness bound.
         *
         * @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
         */
        public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
            checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
            checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
            this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
            // start so that our lowest watermark would be Long.MIN_VALUE.
            this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
        }

        // ------------------------------------------------------------------------

        @Override
        public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
            // Move maxTimestamp forward if this event's timestamp is larger; otherwise leave it unchanged
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // Emit (largest timestamp seen so far - out-of-orderness bound - 1) as the watermark
            output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
        }
    }
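
    To try the arithmetic without a Flink cluster, here is a minimal, dependency-free sketch that mirrors the logic above. The class BoundedOutOfOrdernessSketch and its methods are hypothetical, not part of Flink; onPeriodicEmit is replaced by a method that returns the watermark instead of emitting it.

```java
// Hypothetical, dependency-free re-implementation of the logic in
// Flink's BoundedOutOfOrdernessWatermarks, for experimentation only.
class BoundedOutOfOrdernessSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(long outOfOrdernessMillis) {
        if (outOfOrdernessMillis < 0) {
            throw new IllegalArgumentException("maxOutOfOrderness cannot be negative");
        }
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Same trick as the Flink source: the initial watermark evaluates to exactly Long.MIN_VALUE
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // Mirrors onEvent: maxTimestamp only ever moves forward
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Mirrors onPeriodicEmit, but returns the watermark instead of emitting it
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch gen = new BoundedOutOfOrdernessSketch(1000);
        System.out.println(gen.currentWatermark()); // -9223372036854775808
        for (long ts : new long[] {10000, 20000, 30000, 10000}) {
            gen.onEvent(ts);
            System.out.println("event " + ts + " -> watermark " + gen.currentWatermark());
        }
    }
}
```

    With a 1000 ms bound the watermarks come out as 8999, 18999, 28999 and then stay at 28999, because the out-of-order event 10000 cannot pull maxTimestamp back.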

    2 Observing Watermark Generation

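    Build two single-parallelism streams, union them into one single-parallelism stream, and observe the watermarks, focusing on how the watermark advances in an operator that receives data from multiple upstream partitions.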
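
    The rule for such a multi-input operator can be modeled roughly as follows: its watermark is the minimum of the latest watermarks of its active inputs, an input marked idle is left out of that minimum, and the result never moves backwards. The sketch below is a simplified model under those assumptions, not Flink's runtime code; MultiInputWatermarkSketch, onWatermark and markIdle are hypothetical names.

```java
import java.util.Arrays;

// Hypothetical, simplified model of how an operator with several input channels
// (e.g. the union in the example below) derives its own watermark.
class MultiInputWatermarkSketch {
    private final long[] inputWatermarks;
    private final boolean[] idle;
    private long currentWatermark = Long.MIN_VALUE;

    MultiInputWatermarkSketch(int numInputs) {
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
        idle = new boolean[numInputs];
    }

    // A watermark arrives on one input; receiving it makes that input active again
    void onWatermark(int input, long watermark) {
        idle[input] = false;
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        recompute();
    }

    // The input was declared idle (what withIdleness triggers after its timeout)
    void markIdle(int input) {
        idle[input] = true;
        recompute();
    }

    long currentWatermark() {
        return currentWatermark;
    }

    private void recompute() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < inputWatermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, inputWatermarks[i]);
            }
        }
        // Minimum over active inputs; the operator's watermark never decreases
        if (anyActive && min > currentWatermark) {
            currentWatermark = min;
        }
    }

    public static void main(String[] args) {
        MultiInputWatermarkSketch union = new MultiInputWatermarkSketch(2);
        union.onWatermark(0, 8999);
        System.out.println(union.currentWatermark()); // Long.MIN_VALUE: input 1 has sent nothing yet
        union.markIdle(1);
        System.out.println(union.currentWatermark()); // 8999: idle input 1 is ignored
        union.onWatermark(1, 3999);
        System.out.println(union.currentWatermark()); // still 8999: never moves backwards
    }
}
```

    The first println shows why idleness matters: without it, a single silent input keeps the union's watermark at Long.MIN_VALUE indefinitely.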

    package com.blok2;

    import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    import java.time.Duration;

    /**
     * @Date: 22.12.4
     * @Author: Hang.Nian.YY
     * @qq: 598196583
     * @Tips: Learn big data at Duoyi Education (多易教育)
     * @Description: Observes how watermarks advance after two socket streams are unioned
     */
    public class _03_测试水位线生成 {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Port of the local Web UI
            conf.setInteger("rest.port", 8888);
            StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
            see.setParallelism(1);

            // Two data streams, each with parallelism 1; records look like "a,10000" (key,timestampMillis)
            DataStreamSource<String> ds1 = see.socketTextStream("linux01", 8888);
            DataStreamSource<String> ds2 = see.socketTextStream("linux01", 9999);

            // Watermark strategy for both source streams: 1 s out-of-orderness bound,
            // event time taken from the second comma-separated field
            WatermarkStrategy<String> wm = WatermarkStrategy
                    .<String>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                    .withTimestampAssigner(
                            new SerializableTimestampAssigner<String>() {
                                @Override
                                public long extractTimestamp(String element, long recordTimestamp) {
                                    return Long.parseLong(element.split(",")[1]);
                                }
                            }
                    // Mark a source idle after 1 s without data, so that a silent input
                    // does not hold back the watermark of the downstream union
                    ).withIdleness(Duration.ofSeconds(1));

            SingleOutputStreamOperator<String> sso1 = ds1.assignTimestampsAndWatermarks(wm);
            SingleOutputStreamOperator<String> sso2 = ds2.assignTimestampsAndWatermarks(wm);
            DataStream<String> res = sso1.union(sso2);

            res.process(new ProcessFunction<String, String>() {
                @Override
                public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
                    // Watermark currently held by this operator
                    long wmTime = ctx.timerService().currentWatermark();
                    // Event timestamp assigned to this record by the timestamp assigner
                    Long currentTime = ctx.timestamp();
                    System.out.println(value + " waterMaker: " + wmTime + " currentTime: " + currentTime);
                    out.collect(value.toUpperCase());
                }
            }).print();

            see.execute("Test watermark strategy");
        }
    }

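     Watermark = largest event timestamp seen so far - allowed out-of-orderness - 1. With the 1 s (1000 ms) bound, a largest timestamp of 10000 gives 10000 - 1000 - 1 = 8999. In the output below, each record prints the watermark the operator held before that record took effect: the first record sees Long.MIN_VALUE (-9223372036854775808) because no real watermark has been emitted yet, and the final a,10000 arrives when the watermark is already 28999, so it is late; ProcessFunction still processes it because nothing in this job drops late data. The union takes the minimum watermark over its non-idle inputs, and withIdleness excludes an input after 1 s of silence, so the watermark keeps advancing even if only one socket receives data. The uppercase lines are the output of print().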

    a,10000   waterMaker: -9223372036854775808   currentTime: 10000
    A,10000
    1,20000   waterMaker: 8999   currentTime: 20000
    1,20000
    a,30000   waterMaker: 18999   currentTime: 30000
    A,30000
    a,10000   waterMaker: 28999   currentTime: 10000
    A,10000
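
    The printed sequence can be replayed offline. This sketch assumes a single active input and that at least one periodic emit (every 200 ms by default) happens between two records, so each record observes the watermark derived only from the records before it. WatermarkReplaySketch and replay are hypothetical names; a record is flagged late when its timestamp is not greater than the watermark it observes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical replay of the sample session: each record sees the watermark emitted
// (periodically) after the records before it, not the one derived from itself.
class WatermarkReplaySketch {
    static List<String> replay(List<String> lines, long outOfOrdernessMillis) {
        List<String> out = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
        for (String line : lines) {
            // Same parsing as the example's timestamp assigner: "key,timestampMillis"
            long ts = Long.parseLong(line.split(",")[1]);
            long seenWatermark = maxTimestamp - outOfOrdernessMillis - 1;
            boolean late = ts <= seenWatermark;
            out.add(line + " waterMaker: " + seenWatermark + " currentTime: " + ts + (late ? " (late)" : ""));
            // This record only influences the watermark seen by later records
            maxTimestamp = Math.max(maxTimestamp, ts);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = List.of("a,10000", "1,20000", "a,30000", "a,10000");
        replay(input, 1000).forEach(System.out::println);
    }
}
```

    Running main prints the same waterMaker values as the session above (-9223372036854775808, 8999, 18999, 28999), with only the last a,10000 marked late.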

  • Original article: https://blog.csdn.net/qq_37933018/article/details/128149851