我们可以知道一开始进来我们需要先获取Partition分区器对应的TopicPartiiton对应的对列,这样好进行放消息进去。
Deque dq = getOrCreateDeque(tp);
private Deque getOrCreateDeque(TopicPartition tp) {
/**
* CopyonWriteMap:
* get
* put
*
*/
//直接从batches里面获取当前分区对应的存储队列
Deque d = this.batches.get(tp);
//我们现在用到是场景驱动的方式,代码第一次执行到这儿的死活
//是获取不到队列的,也就是说d 这个变量的值为null
if (d != null)
return d;
//代码继续执行,创建出来一个新的空队列,
d = new ArrayDeque<>();
//把这个空的队列存入batches 这个数据结构里面
Deque previous = this.batches.putIfAbsent(tp, d);
if (previous == null)
return d;
else
//直接返回新的结果
return previous;
}
这里面一个很关键的数据结构就是batches, 也就是每个消息进来都要去执行一下这个方法找到对应的对列。假设我们业务的tqs高达10w/s,那么即使分配到多个线程,那么你的调用次数也有可能达到w/s,那么这种情况下怎么才能高效的读取呢? 考虑到线程安全的问题我们还需要加锁吗? 一旦加锁不就会影响整体的一个吞吐量了吗。 Kafka这里用了类似JDK 的CopyOnWriteArrayList这个的实现,他本质上是采用了读写分离,写入加锁,读取不加锁的思想。我们先回忆回忆CopyOnWriteArrayList的特性:
写入时复制,即写入时加速,copy原有的数据,然后生成新的数组,让底层指向这个新的数组。它的优劣:
因为我们的分区数是有限的,一般来讲我们使用32,64,128,256已经算是极大的分区数目了,那么也就是针对写我们最多也就是写入32,64,128,256次,这种场景即使我们加锁去copy写入也不会存在问题,但是我们却可以快速读取上万的tps流量。
看下Kafka CopyOnWriteMap 的实现
关键的几个方法:
public synchronized V put(K k, V v) {
//新的内存空间
//读写分离
//往新的内存空间里面插入
//读,读数据就老读空间里面去
Map<K, V> copy = new HashMap<K, V>(this.map);
//插入数据
V prev = copy.put(k, v);
//赋值给map
this.map = Collections.unmodifiableMap(copy);
return prev;
}
整个方法使用的是synchronized关键字去修饰的,说明这个方法是线程安全。即使加了锁,这段代码的性能依然很好,因为里面都是纯内存的操作, 而且操作的次数很有限。
这种设计方式,采用的是读写分离的设计思想。读操作和写操作 是相互不影响的。所以我们读数据的操作就是线程安全的。
最后把值赋给了map,map是用volatile关键字修饰的。说明这个map是具有可见性的,这样的话,如果get数据的时候,这儿的值发生了变化,也是能感知到的。
再看看读操作:
public V get(Object k) {
return map.get(k);
}
读没有加锁,读取数据的时候性能很高(高并发的场景下,肯定性能很高)并且是线程安全的。