• Flink中序列化RoaringBitmap不同方式的对比


    背景

    flink中,我们有时候会使用到RoaringBitmap进行统计计数等操作,而当使用RoaringBitmap时,这就涉及到了最重要的问题,如何序列化?序列化的目的是为了进行网络通信或者状态序列化的目的,本文的重点是比较kryo使用默认的序列化器序列化RoaringBitmap和自定义序列化器序列化RoaringBitmap的性能对比

    性能对比

    当在flink中使用RoaringBitmap时,flink自身携带的序列化器是没法处理这个类的序列化的,只能交给kryo进行序列化,而kryo都是使用FieldSerializer来对对象进行序列化,当kryo对RoaringBitmap类进行序列化时,他会对里面的在这里插入图片描述
    的每个字段分别调用对应的序列化器进行序列化/反序列化,但是其实这样的性能不高,因为其实不是这里面数组里面的每个元素都需要序列化,而是可以根据情况来决定的,所以RoaringBitmap本身提供了serializer/deserializer方法,这相比于直接序列化每个字段有极大的性能提升,所以我们这里需要实现自己的kryo序列化器来直接使用RoaringBitmap提供的serializer/deserializer方法,以下是得到的性能测试结果:
    在这里插入图片描述
    附测试代码:

        @Benchmark @OperationsPerInvocation(value = SerializationFrameworkMiniBenchmarks.RECORDS_PER_INVOCATION_SER)
        public void serializerKryoBitmap(FlinkEnvironmentContext context) throws Exception {
            StreamExecutionEnvironment env = context.env;
            env.setParallelism(4);
            ExecutionConfig executionConfig = env.getConfig();
            executionConfig.enableForceKryo();
    
            env.addSource(new BitMapWrapperSource(RECORDS_PER_INVOCATION_SER, 10)).rebalance()
                    .addSink(new SelfBitMapSink<BitMapWrapper>());
            env.execute();
        }
    
        @Benchmark @OperationsPerInvocation(value = SerializationFrameworkMiniBenchmarks.RECORDS_PER_INVOCATION_SER)
        public void serializerKryoBitmapSerializer(FlinkEnvironmentContext context) throws Exception {
            StreamExecutionEnvironment env = context.env;
            env.setParallelism(4);
            ExecutionConfig executionConfig = env.getConfig();
            executionConfig.enableForceKryo();
            executionConfig.registerTypeWithKryoSerializer(RoaringBitmap.class, BitMapSerializer.class);
    
            env.addSource(new BitMapWrapperSource(RECORDS_PER_INVOCATION_SER, 10)).rebalance()
                    .addSink(new SelfBitMapSink<BitMapWrapper>());
            env.execute();
        }
      /**
         * RoaringBitmapSource
         */
        public static class BitMapWrapperSource extends BaseSourceWithKeyRange<BitMapWrapper> {
            private static final long serialVersionUID = 2941333602938145599L;
    
            private transient BitMapWrapper template;
    
            public BitMapWrapperSource(int numEvents, int numKeys) {
                super(numEvents, numKeys);
            }
    
            @Override protected void init() {
                super.init();
                template = initNewBitMapWrapper(0);
            }
    
    
            @Override protected BitMapWrapper getElement(int keyId) {
                template.setId(keyId);
                return template;
            }
    
            private BitMapWrapper initNewBitMapWrapper(int keyId) {
                BitMapWrapper template = new BitMapWrapper();
                RoaringBitmap r32 = new RoaringBitmap();
                for (int i = 0; i < BITMAP_INIT_NUM / 2; i++) {
                    r32.add(1000000 + i);
                }
                for (int i = BITMAP_INIT_NUM / 2; i < BITMAP_INIT_NUM; i++) {
                    r32.add(9000000 + i);
                }
                r32.runOptimize();
                template.setR1(r32);
                template.setId(keyId);
                return template;
            }
        }
        public class SelfBitMapSink<BitMapWrapper> implements SinkFunction<org.apache.flink.benchmark.selfbitmap.BitMapWrapper>, SupportsConcurrentExecutionAttempts {
    
        private static final long serialVersionUID = 1L;
    
        public SelfBitMapSink() {
        }
    
        public void invoke(org.apache.flink.benchmark.selfbitmap.BitMapWrapper value) throws Exception{
            if (value.getId() > 10 || value.getId() < 0) {
                throw new Exception("id is illegal" + value.getId());
            }
            if(value.getR1().getCardinality() != SerializationFrameworkMiniBenchmarks.BITMAP_INIT_NUM){
                throw new Exception("bitmap error" + value.getR1().getCardinality());
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
  • 相关阅读:
    从零实现深度学习框架——重构计算图的实现
    VS2019报错:应用程序无法正常启动(0xc000007b),请单击确定关闭应用程序;VCRUNTIME140.dll报错
    智慧矿山AI算法助力煤矿安全:人员越界识别精准迅速
    [spark]action算子
    go gorm select * 优化
    uniapp vue2 vuex 持久化
    基于C语言实现图书借阅管理系统
    Elasticsearch基础篇(六):es映射和常用的字段类型
    力扣:第 304 场周赛
    LeetCode刷题day23||669. 修剪二叉搜索树&&108.将有序数组转换为二叉搜索树&&538.把二叉搜索树转换为累加树--二叉树
  • 原文地址:https://blog.csdn.net/lixia0417mul2/article/details/133458787