• 并发编程进阶学习篇


    相关说明

    该文章相关的学习笔记及练习代码将会放在个人 github, 博主也会尽量进行github的日更新,博客将会不定期进行更新,在更新的过程可能存在诸多不足的地方,愿大家指出,可博客私信/评论,也可QQ :674619459 联系我,一起讨论,还可以在GitHub上面提issue 。欢迎大家关注我的github,一起学习Java,一起进步!!。

    一、学习目标

    • 学习并发编程思维(OS的进程、线程管理)及海量业务处理
    • 熟悉Java并发编程API的使用(可结合JDK源码进行分析)
    • 结合nacos等开源框架分析并发编程的设计,提升设计能力

    二、相关书籍

    1. 操作系统内功
    • 见计算机基础部分
    1. Java并发书籍
    • java 并发编程从入门到精通:从基础概述到线程安全、API及线程池框架等使用
    • java 并发编程的艺术:比较贴近实操,API及JVM的内存模型等
    • java 并发编程实战:和并发编程的艺术类似
    • Java并发实现原理:JDK源码剖析 ,深入地进行剖析讲解。
    1. 相关官网推荐

    一、多线程之间的交互

    阻塞队列

    1. 基本概念

    对于队列的概念肯定是比较熟悉,即FIFO的一种受限线性表,在Java中有Queue的顶级接口。对应队列的主要操作有 插入、删除,在不考虑多线程的环境下;若队列为满继续添加或为空继续删除,一般的设计面向该失败的策略有抛异常、扩容、返回特殊值,必须严格按照顺序不停留的执行。而在多线程或者单线程考虑阻塞的情况下,则可进行放缓,可以进行等待满足要求再进行操作,这个等待的过程即为阻塞的来源,对应典型的消费者和生产者模型。

    在这里插入图片描述

    1. 基本分类

    在Java中提供的阻塞队列有以下:
    在这里插入图片描述

    1. 如何设计
    • 阻塞队列 ,再普通队列的基础上 添加take(),poll()两种关键性操作,实现的机制主要通过ReetrantLock锁机制进行在相应的接口判断是否为空/满,在该情况下则进行阻塞。

    • 阻塞队列的设计代码

      public E take() throws InterruptedException {
          final E x;
          final int c;
          final AtomicInteger count = this.count;
          final ReentrantLock takeLock = this.takeLock;
          takeLock.lockInterruptibly();
          try {
              while (count.get() == 0) {
                  notEmpty.await();
              }
              x = dequeue();
              c = count.getAndDecrement();
              if (c > 1)
                  notEmpty.signal();
          } finally {
              takeLock.unlock();
          }
          if (c == capacity)
              signalNotFull();
          return x;
      }
      
    • 关于锁的设计

    • ArrayBlockingQueue 与LinkedBlockingQueue(采用独立锁,性能更优)的锁设计对比 代码

      
      ArrayBlockingQueue/** Main lock guarding all access */
          final ReentrantLock lock;
      
          /** Condition for waiting takes */
          @SuppressWarnings("serial")  // Classes implementing Condition may be serializable.
          private final Condition notEmpty;
      
          /** Condition for waiting puts */
          @SuppressWarnings("serial")  // Classes implementing Condition may be serializable.
          private final Condition notFull;
      
      LinkedBlockingQueue: 采用独立锁,性能更高,
      
      /** Lock held by take, poll, etc */
      private final ReentrantLock takeLock = new ReentrantLock();
      
      /** Wait queue for waiting takes */
      @SuppressWarnings("serial")// Classes implementing Condition may be serializable.
      private final Condition notEmpty = takeLock.newCondition();
      
      /** Lock held by put, offer, etc */
      private final ReentrantLock putLock = new ReentrantLock();
      
      /** Wait queue for waiting puts */
      @SuppressWarnings("serial")// Classes implementing Condition may be serializable.
      private final Condition notFull = putLock.newCondition();
      

    主要工具类

    • Semaphore
    • CountDownLatch
    • CyclicBarrier

    抽象队列化同步器AQS

    二、线程池

    基本概念

    1. 目标
    • 合理管理资源、线程,防止资源耗尽宕机
    • 更加有效地使用线程
    • 设计相关类图

    在这里插入图片描述

    基本原理

    分析源码:ThreadPoolExecutor

    在这里插入图片描述

    自定义线程池

    在知道关于线程池的七大核心参数、四大拒绝策略和线程池的工作流程之后,进行自定义的线程池设计将会是顺手拈来。接下来,我们来简单尝试一下。

    • 创建线程工厂
    public class DefaultThreadFactory implements ThreadFactory {
        
        private final String DOT = ".";
        
        private volatile AtomicInteger threadId = new AtomicInteger(0);
        
        private String name;
        
        public DefaultThreadFactory(String name) {
            if(!name.endsWith(DOT)){
                name = name + DOT;
            }
            this.name = name;
        }
        
        @Override
        public Thread newThread(Runnable r) {
            int id  = threadId.getAndIncrement();
            String threadName = name + id;
            Thread thread = new Thread(r,threadName);
            thread.setDaemon(true);// 守护线程,--可结合docker容器运用交互式、守式
            return thread;
        }
    }
    
    • 配置参数的常量
    public class ExecutorConfig {
        public static final StringDEFAULT_THREAD_FACTORY_NAME="demo.thread.factory.test";
        public static final IntegerTEST_EXECUTOR_CORE_SIZE= 2;
        public static final IntegerTEST_EXECUTOR_MAX_SIZE= 9;
        public static final IntegerTEST_EXECUTOR_QUEUE_SIZE= 5;
    }
    
    • 定义线程池
    public class TestExecutor {
    
    //定义线程池
    //阻塞队列若不设置,则无限大
    private final static Executorexecutor= new ThreadPoolExecutor(ExecutorConfig.TEST_EXECUTOR_CORE_SIZE,ExecutorConfig.TEST_EXECUTOR_MAX_SIZE,1000L,
                TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(ExecutorConfig.TEST_EXECUTOR_QUEUE_SIZE),new DefaultThreadFactory(ExecutorConfig.DEFAULT_THREAD_FACTORY_NAME),
                new ThreadPoolExecutor.DiscardOldestPolicy());
    //定义延时线程池
    private final static ExecutorscheduleExecutor= new ScheduledThreadPoolExecutor(ExecutorConfig.TEST_EXECUTOR_CORE_SIZE,
                new DefaultThreadFactory(ExecutorConfig.DEFAULT_THREAD_FACTORY_NAME));
    
        public static Executor getExecutor(){
               returnexecutor;
        }
    
        public static Executor getScheduleExecutor(){
               returnscheduleExecutor;
        }
    }
    
    • 定义任务
    • 测试
    public class ExecutorTests {
        @Test
        public void test(){
            final int taskNum = 100;
            Executor executor =  TestExecutor.getExecutor();
            for(int i=0;i

    简单应用分析

    • Tomcat的请求线程池处理
    • NGINX的并发请求处理
    • Nacos的线程池框架分析

    nacos中,由于是分布式部署,Server之间需要进行通信保持数据或服务的一致性。因此,在配置变更时需要进行通知其他的server,为了保证通信通知的高效与服务的高可用,此处可采用线程池进行异步化的处理。在nacos中的通知机制设计的关键类为AsyncNotifyService 。关于通知机制的大致图如下:

    在这里插入图片描述

    对于通知机制,此篇文章不去探讨,我们主要来看看通知机制的底层多线程的设计。

    在这里插入图片描述

    ConfigExecutor 入手,进行点击快捷键 ctrl+alt(进入实现),ctrl (点击方法,查看实现),进行查看分析。我们主要去分析一下线程池的执行任务类型(延时定时任务、立刻提交执行任务)、执行任务的工厂及命名、资源分配大小等管理。简化版的大致逻辑如下:

    在这里插入图片描述

    在ConfigExecutor进行统一定义了所有Config配置所需要的线程池,因为是常量,地址不变,线程池是共享。设计为常用类、线程池变量为静态共享常量:

    final class ConfigExecutor

    private static final ExecutorDUMP_EXECUTOR= ExecutorFactory.Managed
            .newSingleExecutorService(ClassUtils.getCanonicalName(Config.class),
                    new NameThreadFactory("com.alibaba.nacos.config.embedded.dump"));
    

    并暴露了调用任务、延时任务等两种主要类型的方法供外部类调用(对于上层通知机制或其他应用层即可直接调用):

    public static void executeAsyncNotify(Runnable runnable) {
    ASYNC_NOTIFY_EXECUTOR.execute(runnable);
    }
    
    public static void scheduleAsyncNotify(Runnable command, long delay, TimeUnit unit) {
    ASYNC_NOTIFY_EXECUTOR.schedule(command, delay, unit);
    }
    

    在进行分配线程池时,通过 ExecutorFactory.Managed 进行统一分配管理,而线程工厂则通过new NameThreadFactory()进行创建;

    1. NameThreadFactory中进行线程的命名管理及线程设置(例如守护线程的设置)
    2. ExecutorFactory.Managed 进行资源分配、组命名等管理。此处重点关注private static final ThreadPoolManager *THREAD_POOL_MANAGER* = ThreadPoolManager.*getInstance*(); 中的ThreadPoolManager

    该处采用单例设计模式,进行保证线程池管理类的单例,资源不被随意消耗。所有的线程池创建来源需要向此处进行注册。

    1. ThreadPoolManager 中主要关注resourcesManagerlockers 的设计,此处有锁机制,因为在创建线程池的同时存在并发写(向其中注册线程池资源)的问题,因此需要进行并发锁机制控制,设置如以下代码(lock设置为的map结构,对线程池进行命名空间分组管理,对应的锁也是锁对应的命名空间组,有点类似ConcurrentHashMap中的Segement分段锁的感觉)。

    private Map>> resourcesManager;

    private Map lockers = new ConcurrentHashMap<>(8);

    public void register(String namespace, String group, ExecutorService executor) {
        if (!resourcesManager.containsKey(namespace)) {
            lockers.putIfAbsent(namespace, new Object());
        }
        final Object monitor = lockers.get(namespace);
        synchronized (monitor) {
            Map> map = resourcesManager.get(namespace);
            if (map == null) {
                map = new HashMap<>(8);
                map.computeIfAbsent(group, key -> new HashSet<>()).add(executor);
                resourcesManager.put(namespace, map);
                return;
            }
            map.computeIfAbsent(group, key -> new HashSet<>()).add(executor);
        }
    }
    
  • 相关阅读:
    Linux--进程概念
    数据结构初阶--二叉树介绍(基本性质+堆实现顺序结构)
    【深入理解kafka系列】第一章 初识kafka
    《从0开始写一个微内核操作系统》0-环境准备
    左神:中级提升班5
    WebSocket is already in CLOSING or CLOSED state
    LCR 122.路径加密
    C++之继承、派生
    PHP会话技术session我不允许还有人不会!
    【2】c++11新特性(稳定性和兼容性)—>超长整型 long long
  • 原文地址:https://blog.csdn.net/qq_44654974/article/details/127106752