• Kafka RecordAccumulator 二


    在这里插入图片描述

    我们可以知道一开始进来我们需要先获取Partition分区器对应的TopicPartiiton对应的对列,这样好进行放消息进去。

    Deque dq = getOrCreateDeque(tp);
    private Deque getOrCreateDeque(TopicPartition tp) {
    
            /**
             * CopyonWriteMap:
             *      get
             *      put
             *
             */
            //直接从batches里面获取当前分区对应的存储队列
    
            Deque d = this.batches.get(tp);
            //我们现在用到是场景驱动的方式,代码第一次执行到这儿的死活
            //是获取不到队列的,也就是说d 这个变量的值为null
            if (d != null)
                return d;
            //代码继续执行,创建出来一个新的空队列,
            d = new ArrayDeque<>();
            //把这个空的队列存入batches 这个数据结构里面
            Deque previous = this.batches.putIfAbsent(tp, d);
            if (previous == null)
                return d;
            else
                //直接返回新的结果
                return previous;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    这里面一个很关键的数据结构就是batches, 也就是每个消息进来都要去执行一下这个方法找到对应的对列。假设我们业务的tqs高达10w/s,那么即使分配到多个线程,那么你的调用次数也有可能达到w/s,那么这种情况下怎么才能高效的读取呢? 考虑到线程安全的问题我们还需要加锁吗? 一旦加锁不就会影响整体的一个吞吐量了吗。 Kafka这里用了类似JDK 的CopyOnWriteArrayList这个的实现,他本质上是采用了读写分离,写入加锁,读取不加锁的思想。我们先回忆回忆CopyOnWriteArrayList的特性:

    写入时复制,即写入时加速,copy原有的数据,然后生成新的数组,让底层指向这个新的数组。它的优劣:

    • 因为每次写入都会加锁并copy数组,所以内存开销会大,性能会有影响,最好就是写的场景较少。
      即它保证不了有时序要求的读写(无法保证读取最新的),但是可以保证最终一致性。比较适用于读多写少的场景。

    因为我们的分区数是有限的,一般来讲我们使用32,64,128,256已经算是极大的分区数目了,那么也就是针对写我们最多也就是写入32,64,128,256次,这种场景即使我们加锁去copy写入也不会存在问题,但是我们却可以快速读取上万的tps流量。

    看下Kafka CopyOnWriteMap 的实现
    关键的几个方法:

     public synchronized V put(K k, V v) {
            //新的内存空间
            //读写分离
            //往新的内存空间里面插入
            //读,读数据就老读空间里面去
            Map<K, V> copy = new HashMap<K, V>(this.map);
            //插入数据
            V prev = copy.put(k, v);
            //赋值给map
            this.map = Collections.unmodifiableMap(copy);
            return prev;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    整个方法使用的是synchronized关键字去修饰的,说明这个方法是线程安全。即使加了锁,这段代码的性能依然很好,因为里面都是纯内存的操作, 而且操作的次数很有限。

    这种设计方式,采用的是读写分离的设计思想。读操作和写操作 是相互不影响的。所以我们读数据的操作就是线程安全的。

    最后把值赋给了map,map是用volatile关键字修饰的。说明这个map是具有可见性的,这样的话,如果get数据的时候,这儿的值发生了变化,也是能感知到的。

    再看看读操作:

    public V get(Object k) {
            return map.get(k);
        }
    
    • 1
    • 2
    • 3

    读没有加锁,读取数据的时候性能很高(高并发的场景下,肯定性能很高)并且是线程安全的。

  • 相关阅读:
    Dubbo(四):Spring 整合 Dubbo 源码分析
    计算机网络---TCPIP网络编程实验
    数据治理:为什么不见BI作关联分析
    LeetCode75——Day10
    Dnsmasq的使用
    安装配置SPDK
    WPViewPDF Delphi 和 .NET 的 PDF 查看组件
    SpringBoot异步任务、邮件发送、定时任务
    数据转换成json格式
    【王道操作系统】ch1计算机系统概述-04操作系统结构
  • 原文地址:https://blog.csdn.net/zhangkai1992/article/details/127779512