• 【JavaEE多线程】深入解析Java并发工具类与应用实践



    JUC(java.util.concurrent)的常见类

    concurrent:并发(多线程)

    Callable 接口

    Callable 是一个 interface。也是一种创建多线程的方式,相当于把线程封装了一个 “返回值”。方便程序员借助多线程的方式计算结果。

    Runnable能表示一个任务(run方法),返回void

    Callable也能表示一个任务(call方法),返回一个具体的值,类型可以通过泛型参数来指定(Object)

    如果进行多线程操作,如果你关心多线程执行的过程,使用Runnable,比如线程池,定时器,就是用的Runnable只关心过程

    如果进行多线程操作,如果你关心多线程的计算结果,使用Callable,比如通过多线程的方式计算一个公式,计算1+2+…+1000

    代码示例: 创建线程计算 1 + 2 + 3 + … + 1000

    不使用 Callable 版本

    • 创建一个类 Result , 包含一个 sum 表示最终结果, lock 表示线程同步使用的锁对象.
    • main 方法中先创建 Result 实例, 然后创建一个线程 t. 在线程内部计算 1 + 2 + 3 + … + 1000.
    • 主线程同时使用 wait 等待线程 t 计算结束. (注意, 如果执行到 wait 之前, 线程 t 已经计算完了, 就不必等待了).
    • 当线程 t 计算完毕后, 通过 notify 唤醒主线程, 主线程再打印结果.
    static class Result {
        public int sum = 0;
        public Object lock = new Object();
    }
    public static void main(String[] args) throws InterruptedException {
        Result result = new Result();
        Thread t = new Thread() {
            @Override
            public void run() {
                int sum = 0;
                for (int i = 1; i <= 1000; i++) {
                    sum += i;
               }
                synchronized (result.lock) {
                    result.sum = sum;
                    result.lock.notify();
               }
           }
       };
        t.start();
        synchronized (result.lock) {
            while (result.sum == 0) {
                result.lock.wait();
           }
            System.out.println(result.sum);
       }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    可以看到, 上述代码需要一个辅助类 Result, 还需要使用一系列的加锁和 wait notify 操作, 代码复杂, 容易出错.

    使用 Callable 版本

    • 创建一个匿名内部类, 实现 Callable 接口. Callable 带有泛型参数. 泛型参数表示返回值的类型.
    • 重写 Callable 的 call 方法, 完成累加的过程. 直接通过返回值返回计算结果.
    • 把 callable 实例使用 FutureTask 包装一下.
    • 创建线程, 线程的构造方法传入 FutureTask . 此时新线程就会执行 FutureTask 内部的 Callable 的call 方法, 完成计算. 计算结果就放到了 FutureTask 对象中.
    • 在主线程中调用 futureTask.get() 能够阻塞等待新线程计算完毕. 并获取到 FutureTask 中的结果.
    Callable<Integer> callable = new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            int sum = 0;
            for (int i = 1; i <= 1000; i++) {
                sum += i;
           }
            return sum;
       }
    };
    FutureTask<Integer> futureTask = new FutureTask<>(callable);
    Thread t = new Thread(futureTask);//使用Callable不能作为Thread的构造方法参数,是借助FutureTask
    t.start();
    int result = futureTask.get();//通过FutureTask获取Callable的call方法的结果,get类似join一样,如果call方法没算完会阻塞等待
    System.out.println(result);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    可以看到, 使用 Callable 和 FutureTask 之后, 代码简化了很多, 也不必手动写线程同步代码了.

    理解Callable

    Callable 和 Runnable 相对, 都是描述一个 “任务”. Callable 描述的是带有返回值的任务, Runnable 描述的是不带返回值的任务.

    Callable 通常需要搭配 FutureTask 来使用. FutureTask 用来保存 Callable 的返回结果. 因为Callable 往往是在另一个线程中执行的, 啥时候执行完并不确定.

    FutureTask 就可以负责这个等待结果出来的工作.

    理解FutureTask

    想象去吃麻辣烫. 当餐点好后, 后厨就开始做了. 同时前台会给你一张 “小票” . 这个小票就是FutureTask. 后面我们可以随时凭这张小票去查看自己的这份麻辣烫做出来了没

    目前为止学到的创建线程的方式:

    1. 直接继承Thread
    2. 实现Runnable
    3. 使用lambda
    4. 使用线程池
    5. 使用Callable

    其中1、2、5搭配匿名内部类使用

    ReentrantLock

    可重入互斥锁,和 synchronized 定位类似,都是用来实现互斥效果,保证线程安全

    ReentrantLock的用法:

    • lock():加锁,如果获取不到锁就死等
    • trylock(超时时间):加锁,如果获取不到锁,等待一定的时间之后就放弃加锁
    • unlock():解锁

    ReentrantLock具有一些特点,是synchronized 不具备的功能:

    1. 提供了一个tryLock方法进行加锁:

    ​ 对于lock操作,如果加锁不成功,就会阻塞等待(死等)

    ​ 对于tryLock,如果加锁失败,直接返回false/也可以设定等待时间

    1. ReentrantLock有两种模式,可以在工作在公平锁状态下,也可以工作在非公平锁状态下。构造方法中通过参数设定的公平/非公平模式
    2. ReentrantLock也有等待通知机制,搭配Condition这样的类来完成,这里的等待通知要比wait、notify功能更强
    3. 但是ReentrantLock也可能容易遗漏unlock(),通常使用finally来执行

    ReentrantLock 和 synchronized 的区别:

    • synchronized 是一个关键字,是 JVM 内部实现的(大概率是基于 C++ 实现)。ReentrantLock 是标准库的一个类,在 JVM 外实现的(基于 Java 实现)。
    • synchronized 使用时不需要手动释放锁。ReentrantLock 使用时需要手动释放。使用起来更灵活,但是也容易遗漏 unlock.
    • synchronized 在申请锁失败时,会死等. ReentrantLock 可以通过 trylock 的方式等待一段时间就放弃.
    • synchronized 是非公平锁。ReentrantLock 默认是非公平锁,可以通过构造方法传入一个 true 开启公平锁模式.。

    如何选择使用哪个锁?

    • 锁竞争不激烈的时候,使用 synchronized,效率更高,自动释放更方便(实际开发也是用synchronized多)
    • 锁竞争激烈的时候,使用 ReentrantLock,搭配 trylock 更灵活控制加锁的行为,而不是死等
    • 如果需要使用公平锁,使用 ReentrantLock

    原子类

    原子类内部用的是 CAS 实现,所以性能要比加锁实现 i++ 高很多。原子类有以下几个

    • AtomicBoolean
    • AtomicInteger
    • AtomicIntegerArray
    • AtomicLong
    • AtomicReference
    • AtomicStampedReference

    以 AtomicInteger 举例,常见方法有

    addAndGet(int delta);   i += delta;
    decrementAndGet(); --i;
    getAndDecrement(); i--;
    incrementAndGet(); ++i;
    getAndIncrement(); i++;
    
    • 1
    • 2
    • 3
    • 4
    • 5

    应用场景:

    1. 计数需求

    ​ 播放量、点赞量…

    1. 统计效果

    ​ 统计出现错误的请求数目、统计收到的请求数目(衡量服务器的压力)、统计每个请求的响应时间=>平均响应时间(衡量服务器的运行效率)…

    ​ 通过上述统计内容,实现监控服务器,获取/统计/展示/报警

    线程池

    虽然创建销毁线程比创建销毁进程更轻量,但是在频繁创建销毁线程的时候还是会比较低效.

    线程池就是为了解决这个问题。如果某个线程不再使用了,并不是真正把线程释放,而是放到一个 “池子” 中,下次如果需要用到线程就直接从池子中取,不必通过系统来创建了。

    ExecutorService 和 Executors

    代码示例

    • ExecutorService 表示一个线程池实例.
    • Executors 是一个工厂类,能够创建出几种不同风格的线程池.
    • ExecutorService 的 submit 方法能够向线程池中提交若干个任务.
    ExecutorService pool = Executors.newFixedThreadPool(10);
    pool.submit(new Runnable() {
        @Override
        public void run() {
            System.out.println("hello");
       }
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    Executors 创建线程池的几种方式

    • newFixedThreadPool:创建固定线程数的线程池
    • newCachedThreadPool:创建线程数目动态增长的线程池.
    • newSingleThreadExecutor:创建只包含单个线程的线程池.
    • newScheduledThreadPool:设定 延迟时间后执行命令,或者定期执行命令。是进阶版的 Timer.

    Executors 本质上是 ThreadPoolExecutor 类的封装.

    ThreadPoolExecutor

    ThreadPoolExecutor 提供了更多的可选参数,可以进一步细化线程池行为的设定.

    ThreadPoolExecutor 的构造方法

    信号量 Semaphore

    信号量, 用来表示 “可用资源的个数”. 本质上就是一个计数器,描述的是当前这个线程,是否“有临界资源可以用”

    临界资源:多个线程/进程等并发执行的实体可以公共使用到的资源(多个线程修改同一个资源,这个变量就可以认为是临界资源)

    理解信号量

    可以把信号量想象成是停车场的展示牌: 当前有车位 100 个. 表示有 100 个可用资源.

    当有车开进去的时候, 就相当于申请一个可用资源, 可用车位就 -1 (这个称为信号量的 P 操作)

    当有车开出来的时候, 就相当于释放一个可用资源, 可用车位就 +1 (这个称为信号量的 V 操作)

    如果计数器的值已经为 0 了, 还尝试申请资源, 就会阻塞等待, 直到有其他线程释放资源.

    这个阻塞等待的过程有点像锁是吧。

    锁,本质上就是一个特殊的信号量(里面的数值,非0即1,二元信号量)

    信号量要比锁更广义,不仅可以描述一个资源,还可以描述N个资源。

    但还是锁用的更多一些

    Semaphore 的 PV 操作中的加减计数器操作都是原子的, 可以在多线程环境下直接使用.

    CountDownLatch

    针对特定场景一个组件

    同时等待 N 个任务执行结束

    好像跑步比赛,10个选手依次就位,哨声响才同时出发;所有选手都通过终点,才能公布成绩。

    • 构造 CountDownLatch 实例,初始化 10 表示有 10 个任务需要完成.
    • 每个任务执行完毕, 都调用 latch.countDown(),在 CountDownLatch 内部的计数器同时自减.
    • 主线程中使用 latch.await(),阻塞等待所有任务执行完毕,相当于计数器为 0 了。(这里await的a是all的意思)

    集合类

    原来的集合类,大部分都不是线程安全的

    Vector,Stack,HashTable,是线程安全的(不建议用),其他的集合类不是线程安全的

    Vector、HashTable是上古时期搞出来的集合类

    加了锁不一定就线程安全了,不加锁也不一定就线程不安全了

    多线程环境使用 ArrayList

    1. 自己使用同步机制 (synchronized 或者 ReentrantLock)
    2. Collections.synchronizedList(new ArrayList)

    synchronizedList 是标准库提供的一个基于 synchronized 进行线程同步的 List.

    synchronizedList 的关键操作上都带有 synchronized

    相当于让ArrayList像Vector一样使用

    1. 使用 CopyOnWriteArrayList

    CopyOnWrite容器即写时复制的容器。

    • 当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,
    • 添加完元素之后,再将原容器的引用指向新的容器。

    这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会

    添加任何元素。

    所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。

    优点

    在读多写少的场景下, 性能很高, 不需要加锁竞争.

    缺点

    1. 占用内存较多.
    2. 新写的数据不能被第一时间读取到.

    多线程环境使用队列

    1. ArrayBlockingQueue

    基于数组实现的阻塞队列

    1. LinkedBlockingQueue

    基于链表实现的阻塞队列

    1. PriorityBlockingQueue

    基于堆实现的带优先级的阻塞队列

    1. TransferQueue

    最多只包含一个元素的阻塞队列

    多线程环境使用哈希表

    HashMap 本身不是线程安全的

    在多线程环境下使用哈希表可以使用:

    • Hashtable
    • ConcurrentHashMap
    1. Hashtable

    只是简单的把关键方法加上了 synchronized 关键字

    public synchronized V put(K key, V value) {
    
    • 1
    public synchronized V get(Object key) {
    
    • 1

    这相当于直接针对 Hashtable 对象本身加锁

    • 如果多线程访问同一个 Hashtable 就会直接造成锁冲突
    • size 属性也是通过 synchronized 来控制同步,也是比较慢的
    • 一旦触发扩容, 就由该线程完成整个扩容过程。这个过程会涉及到大量的元素拷贝,效率会非常低
    1. ConcurrentHashMap

    相比于 Hashtable 做出了一系列的改进和优化. 以 Java1.8 为例

    • 读操作没有加锁(但是使用了 volatile 保证从内存读取结果), 只对写操作进行加锁. 加锁的方式仍然是用 synchronized, 但是不是锁整个对象, 而是 “锁桶” (用每个链表的头结点作为锁对象), 大大降低了锁冲突的概率.
    • 充分利用 CAS 特性. 比如 size 属性通过 CAS 来更新. 避免出现重量级锁的情况.
    • 优化了扩容方式: 化整为零
      • 发现需要扩容的线程, 只需要创建一个新的数组, 同时只搬几个元素过去.
      • 扩容期间, 新老数组同时存在.
      • 后续每个来操作 ConcurrentHashMap 的线程, 都会参与搬家的过程. 每个操作负责搬运一小
      • 部分元素.
      • 搬完最后一个元素再把老数组删掉.
      • 这个期间, 插入只往新数组加.
      • 这个期间, 查找需要同时查新数组和老数组
  • 相关阅读:
    SkyWalking全景解析:从原理到实现的分布式追踪之旅
    「Verilog学习笔记」优先编码器Ⅰ
    为DuiLib的Edit控件增加PlaceHolderText
    JavaEE初阶:网络原理之TCP/IP
    ECC有关DER文件的解析(Java)
    ExtJS - UI组件 - Buttion
    Ubuntu Seata开机自启动服务
    xcode15出现大量Duplicate symbols
    【微信小程序开发(三)】实现卡片的层叠滑动
    Jmeter插件PerfMon Metrics Collector安装使用及报错解决
  • 原文地址:https://blog.csdn.net/Hsusan/article/details/137942504