经常听到小伙伴说,当前学习了多线程、并发、设计模式等一些知识后,仍然不知道换成其他业务场景后该怎么使用和集成,对于这个问题,其实我们还是要多学习下开源系统/框架是如何应用的,思考并理解他们的内在,他们那么设计是为了解决什么样的技术场景与问题。
Nacos是一个Spring Cloud Alibaba微服务注册与配置中心,本身是通过SpringBoot作为启动环境的开发的,其内部大量使用了多线程、并发、线程池、任务调度、一致性协议等等,所以我们完全可以把他当成一个业务系统去理解和学习,包括一些基础功能的CRUD,和核心功能的流程和代码。
CAS的全称为Compare-And-Swap,它是一条CPU并发原语。 它的功能是判断内存某个位置的值是否为预期值,如果是则更改为新的值,这个过程是原子的。他会比较当前工作内存中的值和主内存中的值,如果相同则执行规定操作,否则继续比较直到主内存和工作内存中的值一致为止。
在 Java 中,Java 并没有直接实现 CAS,CAS 相关的实现是通过 C++ 内联汇编的形式实现的。Java 代码需通过 JNI 才能调用。
CAS 是一条 CPU 的原子指令(cmpxchg指令),不会造成所谓的数据不一致问题,Unsafe 提供的 CAS 方法(如compareAndSwapXXX)底层实现即为 CPU 指令 cmpxchg。
先来看一下Java中关于CAS经常使用的一些类,位于 java.util.concurrent.atomic包下面,常用的如下:
- AtomicBoolean
- AtomicInteger
- AtomicLong
- AtomicStampedReference
- 。。。。。
在看看Nacos中的CAS并发这些类都怎么使用的,用在哪些场景里面了。
我们知道,当我们使用线程池的时候,需要自定义工厂,来标识线程名称和作用,在自定义线程工厂的时候都会指定线程的名字,以便于排查问题Nacos中的NameThreadFactory使用AtomicInteger来维护名字中的自增顺序维护,代码如下:
- public class NameThreadFactory implements ThreadFactory {
-
- private final AtomicInteger id = new AtomicInteger(0);
-
- private String name;
-
- public NameThreadFactory(String name) {
- if (!name.endsWith(StringUtils.DOT)) {
- name += StringUtils.DOT;
- }
- this.name = name;
- }
-
- @Override
- public Thread newThread(Runnable r) {
- String threadName = name + id.getAndIncrement();
- Thread thread = new Thread(r, threadName);
- thread.setDaemon(true);
- return thread;
- }
- }
以后当我们使用线程池的时候,可以仿照这个代码片段,自定义线程的名称和序号。
客户端模块中的ServerListManager类AtomicInteger类存储当前服务节点,核心代码如下所示:
- package com.alibaba.nacos.client.naming.core;
- public class ServerListManager implements ServerListFactory, Closeable {
-
- private final AtomicInteger currentIndex = new AtomicInteger();
-
- private final List
serverList = new ArrayList<>(); -
- public ServerListManager(Properties properties, String namespace) {
- this.namespace = namespace;
- initServerAddr(properties);
- if (!serverList.isEmpty()) {
- currentIndex.set(new Random().nextInt(serverList.size()));
- } else {
- throw new NacosLoadException("serverList is empty,please check configuration");
- }
- }
-
- @Override
- public String genNextServer() {
- int index = currentIndex.incrementAndGet() % getServerList().size();
- return getServerList().get(index);
- }
-
- @Override
- public String getCurrentServer() {
- return getServerList().get(currentIndex.get() % getServerList().size());
- }
-
- }
可以看到基于AtomicInteger类可以实现一个简单的轮训服务器的算法。
TaskExecuteWorker类的设计和使用方式很值得我们学习,尤其在任务类的开发模式中,我们的日常开发中比较常用,优雅的进行了线程的关闭处理。核心代码如下:
- public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable {
-
- /**
- * Max task queue size 32768.
- */
- private static final int QUEUE_CAPACITY = 1 << 15;
-
- private final Logger log;
-
- private final String name;
-
- private final BlockingQueue
queue; -
- private final AtomicBoolean closed;
-
- public TaskExecuteWorker(final String name, final int mod, final int total) {
- this(name, mod, total, null);
- }
-
- public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {
- this.name = name + "_" + mod + "%" + total;
- this.queue = new ArrayBlockingQueue
(QUEUE_CAPACITY); - this.closed = new AtomicBoolean(false);
- this.log = null == logger ? LoggerFactory.getLogger(TaskExecuteWorker.class) : logger;
- new InnerWorker(name).start();
- }
-
- public String getName() {
- return name;
- }
-
- @Override
- public boolean process(NacosTask task) {
- if (task instanceof AbstractExecuteTask) {
- putTask((Runnable) task);
- }
- return true;
- }
-
- private void putTask(Runnable task) {
- try {
- queue.put(task);
- } catch (InterruptedException ire) {
- log.error(ire.toString(), ire);
- }
- }
-
- public int pendingTaskCount() {
- return queue.size();
- }
-
- /**
- * Worker status.
- */
- public String status() {
- return name + ", pending tasks: " + pendingTaskCount();
- }
-
- @Override
- public void shutdown() throws NacosException {
- queue.clear();
- closed.compareAndSet(false, true);
- }
-
- /**
- * Inner execute worker.
- */
- private class InnerWorker extends Thread {
-
- InnerWorker(String name) {
- setDaemon(false);
- setName(name);
- }
-
- @Override
- public void run() {
- while (!closed.get()) {
- try {
- Runnable task = queue.take();
- long begin = System.currentTimeMillis();
- task.run();
- long duration = System.currentTimeMillis() - begin;
- if (duration > 1000L) {
- log.warn("task {} takes {}ms", task, duration);
- }
- } catch (Throwable e) {
- log.error("[TASK-FAILED] " + e.toString(), e);
- }
- }
- }
- }
- }
可以从源代码中看到使用了AtomicBoolean closed类完成一个比较优雅停止线程的方式。这个类的设计,完全可以作为我们日常开发中的一个提高系统并发处理能力的一个模板。
有的时候,我们会有一些全局的类,去控制一些资源或线程的状态,那么就可以使用Java中那些基于CAS实现的原子类。
相关还有一些其他模块,这里就不一一列举了。感兴趣的可以去下载Nacos的源代码并进行学习和思考。
本篇文章,来源于我的最近公开的Github项目: nacos-guide(基于SpringBoot开发的Nacos风头正盛,专门为Java开发者分享Nacos源码分析、Nacos好的代码设计),喜欢的欢迎star。
仓库地址为:https://github.com/wuchubuzai2018/nacos-guide
项目的发布预览地址为:https://wuchubuzai2018.github.io/nacos-guide/
本篇文章首发于我的知识星球,喜欢的可以加入,免费。

你的喜欢是我坚持的动力,虽然过程会很消耗时间,希望可以坚持输出。