该文章相关的学习笔记及练习代码将会放在个人 github, 博主也会尽量进行github的日更新,博客将会不定期进行更新,在更新的过程可能存在诸多不足的地方,愿大家指出,可博客私信/评论,也可QQ :674619459 联系我,一起讨论,还可以在GitHub上面提issue 。欢迎大家关注我的github,一起学习Java,一起进步!!。
对于队列的概念肯定是比较熟悉,即FIFO的一种受限线性表,在Java中有Queue的顶级接口。对应队列的主要操作有 插入、删除,在不考虑多线程的环境下;若队列为满继续添加或为空继续删除,一般的设计面向该失败的策略有抛异常、扩容、返回特殊值,必须严格按照顺序不停留的执行。而在多线程或者单线程考虑阻塞的情况下,则可进行放缓,可以进行等待满足要求再进行操作,这个等待的过程即为阻塞的来源,对应典型的消费者和生产者模型。
在Java中提供的阻塞队列有以下:
阻塞队列 ,再普通队列的基础上 添加take(),poll()两种关键性操作,实现的机制主要通过ReetrantLock锁机制进行在相应的接口判断是否为空/满,在该情况下则进行阻塞。
阻塞队列的设计代码
public E take() throws InterruptedException {
final E x;
final int c;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
关于锁的设计
ArrayBlockingQueue 与LinkedBlockingQueue(采用独立锁,性能更优)的锁设计对比 代码
ArrayBlockingQueue:
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
@SuppressWarnings("serial") // Classes implementing Condition may be serializable.
private final Condition notEmpty;
/** Condition for waiting puts */
@SuppressWarnings("serial") // Classes implementing Condition may be serializable.
private final Condition notFull;
LinkedBlockingQueue: 采用独立锁,性能更高,
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
@SuppressWarnings("serial")// Classes implementing Condition may be serializable.
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
@SuppressWarnings("serial")// Classes implementing Condition may be serializable.
private final Condition notFull = putLock.newCondition();
分析源码:ThreadPoolExecutor
在知道关于线程池的七大核心参数、四大拒绝策略和线程池的工作流程之后,进行自定义的线程池设计将会是顺手拈来。接下来,我们来简单尝试一下。
public class DefaultThreadFactory implements ThreadFactory {
private final String DOT = ".";
private volatile AtomicInteger threadId = new AtomicInteger(0);
private String name;
public DefaultThreadFactory(String name) {
if(!name.endsWith(DOT)){
name = name + DOT;
}
this.name = name;
}
@Override
public Thread newThread(Runnable r) {
int id = threadId.getAndIncrement();
String threadName = name + id;
Thread thread = new Thread(r,threadName);
thread.setDaemon(true);// 守护线程,--可结合docker容器运用交互式、守式
return thread;
}
}
public class ExecutorConfig {
public static final StringDEFAULT_THREAD_FACTORY_NAME="demo.thread.factory.test";
public static final IntegerTEST_EXECUTOR_CORE_SIZE= 2;
public static final IntegerTEST_EXECUTOR_MAX_SIZE= 9;
public static final IntegerTEST_EXECUTOR_QUEUE_SIZE= 5;
}
public class TestExecutor {
//定义线程池
//阻塞队列若不设置,则无限大
private final static Executorexecutor= new ThreadPoolExecutor(ExecutorConfig.TEST_EXECUTOR_CORE_SIZE,ExecutorConfig.TEST_EXECUTOR_MAX_SIZE,1000L,
TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(ExecutorConfig.TEST_EXECUTOR_QUEUE_SIZE),new DefaultThreadFactory(ExecutorConfig.DEFAULT_THREAD_FACTORY_NAME),
new ThreadPoolExecutor.DiscardOldestPolicy());
//定义延时线程池
private final static ExecutorscheduleExecutor= new ScheduledThreadPoolExecutor(ExecutorConfig.TEST_EXECUTOR_CORE_SIZE,
new DefaultThreadFactory(ExecutorConfig.DEFAULT_THREAD_FACTORY_NAME));
public static Executor getExecutor(){
returnexecutor;
}
public static Executor getScheduleExecutor(){
returnscheduleExecutor;
}
}
public class ExecutorTests {
@Test
public void test(){
final int taskNum = 100;
Executor executor = TestExecutor.getExecutor();
for(int i=0;i
在nacos中,由于是分布式部署,Server之间需要进行通信保持数据或服务的一致性。因此,在配置变更时需要进行通知其他的server,为了保证通信通知的高效与服务的高可用,此处可采用线程池进行异步化的处理。在nacos中的通知机制设计的关键类为AsyncNotifyService
。关于通知机制的大致图如下:
对于通知机制,此篇文章不去探讨,我们主要来看看通知机制的底层多线程的设计。
从ConfigExecutor
入手,进行点击快捷键 ctrl+alt(进入实现),ctrl (点击方法,查看实现),进行查看分析。我们主要去分析一下线程池的执行任务类型(延时定时任务、立刻提交执行任务)、执行任务的工厂及命名、资源分配大小等管理。简化版的大致逻辑如下:
在ConfigExecutor进行统一定义了所有Config配置所需要的线程池,因为是常量,地址不变,线程池是共享。设计为常用类、线程池变量为静态共享常量:
final class ConfigExecutor
private static final ExecutorDUMP_EXECUTOR= ExecutorFactory.Managed
.newSingleExecutorService(ClassUtils.getCanonicalName(Config.class),
new NameThreadFactory("com.alibaba.nacos.config.embedded.dump"));
并暴露了调用任务、延时任务等两种主要类型的方法供外部类调用(对于上层通知机制或其他应用层即可直接调用):
public static void executeAsyncNotify(Runnable runnable) {
ASYNC_NOTIFY_EXECUTOR.execute(runnable);
}
public static void scheduleAsyncNotify(Runnable command, long delay, TimeUnit unit) {
ASYNC_NOTIFY_EXECUTOR.schedule(command, delay, unit);
}
在进行分配线程池时,通过 ExecutorFactory.Managed
进行统一分配管理,而线程工厂则通过new NameThreadFactory()
进行创建;
NameThreadFactory
中进行线程的命名管理及线程设置(例如守护线程的设置)ExecutorFactory.Managed
进行资源分配、组命名等管理。此处重点关注private static final ThreadPoolManager *THREAD_POOL_MANAGER* = ThreadPoolManager.*getInstance*();
中的ThreadPoolManager
该处采用单例设计模式,进行保证线程池管理类的单例,资源不被随意消耗。所有的线程池创建来源需要向此处进行注册。
ThreadPoolManager
中主要关注resourcesManager
和lockers
的设计,此处有锁机制,因为在创建线程池的同时存在并发写(向其中注册线程池资源)的问题,因此需要进行并发锁机制控制,设置如以下代码(lock设置为的map结构,对线程池进行命名空间分组管理,对应的锁也是锁对应的命名空间组,有点类似ConcurrentHashMap中的Segement分段锁的感觉)。private Map
private Map
public void register(String namespace, String group, ExecutorService executor) {
if (!resourcesManager.containsKey(namespace)) {
lockers.putIfAbsent(namespace, new Object());
}
final Object monitor = lockers.get(namespace);
synchronized (monitor) {
Map> map = resourcesManager.get(namespace);
if (map == null) {
map = new HashMap<>(8);
map.computeIfAbsent(group, key -> new HashSet<>()).add(executor);
resourcesManager.put(namespace, map);
return;
}
map.computeIfAbsent(group, key -> new HashSet<>()).add(executor);
}
}