• 【实战-08】flink DataStream 如何实现去重


    摘要

    假设我们有一批订单数据实时接入kafka, flink需要对订单数据做处理,值得注意的是订单数据 要求绝对不可以重复处理。 考虑到订单数据上报到kafka的时候存在重复上报的可能性,因此需要我们flink处理的时候 避免进行重复处理。在flinksql 中我们有去重的方式,请参考flinksql 去重 。 但是我们本小结来讨论DataStream Api如何去重。

    分析

    我们很容易想到:假设订单的唯一主键就是order_id, 要想达到去重的效果应该可以想到用State 存储已经处理过的订单,新的订单来临的时候判断是否存在于State中,如果不存在则处理,存在则视为重复订单,需要放弃当前订单。
    上面的思想理论上是没有问题的,但是实际上却会产生不小的问题。 上面的额分析中,state会缓存所有已经处理过的订单id, 要知道kakfa的数据是源源不断的, 那么也就意味着我们需要缓存的state 会越来越大, 没错这就像一个不断膨胀的炸弹,总有一天会炸掉。因此我们需要在分析下。 也就是说Datastream 的缓存(状态)不能一直存在,否则总会内存溢出。 如果是你你会怎么做? 没错给缓存增加一个过期时间。 而这个时间就要结合业务来确定。
    加入我有订单数据, 正常来说即便是重复订单,一定会在一个小时之内过来(严格贴合业务),如何做去重代码呢。 直接看代码。

    补充一点

    flink的状态(用于失败重启恢复), 默认的一些source源都内置的状态的实现,而同时我们也可以再算子内部自定义一些状态。这些状态也就是上文说的缓存。我为什么总是提 缓存这两个字? 那么缓存和状态有什么区别? 答案就是状态的本质就是缓存,但是必须按照flink的要求写, 不能自己随便定义一个Map List 来当缓存。 因为flink需要失败重启的时候可以实现缓存的重新读取,重新恢复失败前的缓存。 那也就意味着失败的时候缓存需要落盘。 这 落盘的代码flink帮我们做好了,我们只需要按照flink的要求定义 状态, 状态会自动 松柏落盘,恢复的时候重新读取。 所以说, 我的理解是:flink的状态就是 可以自动落盘的 缓存。

    上代码

    状态存储用的是Rocksdb,这部分需要maven依赖,请自行查询资料。

    
    import com.boke.realtime.ads.bean.TrackDirtyEvent;
    import com.boke.realtime.ads.bean.TrackDirtyReason;
    import com.boke.realtime.ads.bean.TrackEvent;
    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;
    
    /**
     * @author penggan
     */
    public class KeyedStateDeduplicateProcess extends KeyedProcessFunction<Long, Order, Order> {
        private ValueState<Boolean> isExist;
    
        private OutputTag<Order> dirtyOutput;
    
        public KeyedStateDeduplicateProcess(OutputTag<Order> dirtyOutput) {
            this.dirtyOutput = dirtyOutput;
        }
    
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            ValueStateDescriptor<Boolean> keyedStateDuplicated =
                    new ValueStateDescriptor<>("KeyedStateDeduplication",
                            TypeInformation.of(new TypeHint<Boolean>() {
                            }));
            // 过期时间设定为 2 小时
            StateTtlConfig ttlConfig = StateTtlConfig
                    //过期时间
                    .newBuilder(org.apache.flink.api.common.time.Time.hours(1))
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                    //在未清理状态之前是否可以访问过期的状态
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    //压缩清除之前数据量
                    .cleanupInRocksdbCompactFilter(50000000L)
                    .build();
            //keyedStateDuplicated 中的值保留两个小时
            keyedStateDuplicated.enableTimeToLive(ttlConfig);
            isExist = getRuntimeContext().getState(keyedStateDuplicated);
        }
    
        @Override
        public void processElement(Order value, Context ctx, Collector<Order> out) throws Exception {
            //isExist中的值保留两个小时,这就意味着,在两个小时内同一个appid+logId 的数据只有第一条数据会被collect到下游算子,其他的数据被反配到脏数据通道
            if (null == isExist.value()) {
                out.collect(value);
                isExist.update(true);
    
            } else {
               
                Order order = Order.fromEvent(value,'重复数据');
                ctx.output(dirtyOutput, order);
            }
        }
    }
    
    public class Main{
    public static void main(String[] args) throws Exception {
    ...省略其它
     OutputTag<TrackDirtyEvent> dirtyOutput = new OutputTag<TrackDirtyEvent>("dirty_tag") {};//脏数据通道
     SingleOutputStreamOperator<Order> deduplicatedStream = orderStream.keyBy((KeySelector<Order, Long>) o
                String uiqKey = o.orderId;
                return Hashing.murmur3_128(5).hashUnencodedChars(uiqKey).asLong();
            })
       deduplicatedStream
       				  .process(new KeyedStateDeduplicateProcess(dirtyOutput))
                      .uid("deduplicate")
                      .name("deduplicate");     
    }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77

    分析: 代码整体逻辑,根据orderId KeyBy, 然后KeyedStateDeduplicateProcess 中的逻辑就是再指定时间内,同一个
    orderId 的数据只有第一条会发送到下游,其他的则会流入脏数据通道。
    上面代码整体逻辑清晰,需要的话简单改改可以直接用的。不懂可以留言。

    关于状态(即缓存)

    flink内置的缓存(状态)只有四种,且只能用于KeyBy之后的数据流,未调用KeyBy则不能用,会报错的。

    1. ValueState getState(ValueStateDescriptor)
      支持的函数:update(T) T value()
    2. ReducingState getReducingState(ReducingStateDescriptor)
      支持的函数: add(T) addAll(List) update(List) Iterable get()
    3. ListState getListState(ListStateDescriptor)
      支持的函数: add(T)
    4. AggregatingState getAggregatingState(AggregatingStateDescriptor)
      支持的函数:add(IN)
    5. MapState getMapState(MapStateDescriptor)
      支持的函数:put(UK, UV) putAll(Map) get(UK) entries() keys() values() isEmpty()
      具体细节本章不再涉及,大家主要的重点在于 keyBy orderId 然后再process函数中对orderId 的去重。巧妙地用的Boolean 而不是用的ListState, 你也可以用ListState 将处理后的id放入list, 然后给ListState设置过期时间(也就是对某个key需要进行去重的时间), 然后判断在集合中就不发送到下游,不在集合中就发送到下游。 这种方案比较灵活,就不需要keyBy的时候用oderId了。 千言万语不能完全说明白。 这篇文章小白读了估计费劲,想多说,但是说多了也确实不适合初入门的朋友们。 大抵是这么个意思了。
  • 相关阅读:
    算法——动态规划(新)
    PHP-Composer包开发、发布流程
    Redis-命令操作Redis
    轻量应用服务器和云服务器有哪些区别?该如何选择?
    JTAG调试结构
    正则匹配绕过总计之[极客大挑战 2019]RCE ME
    第07章 循环神经网络
    树的前序遍历非递归思路
    【lwip】10-ICMP协议&源码分析
    Pandas中的Series对象详解(含Python代码)
  • 原文地址:https://blog.csdn.net/qq_36066039/article/details/134313825