• Java线程的并发工具类


    Fork-Join

    核心思想

    Fork/Join框架是Java 7提供的一个用于并行执行任务的框架, 核心思想就是把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果

    Fork就是把一个大任务切分为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。比如计算1+2+…+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇总这10个子任务的结果。

    运行流程图

    fork-join运行流程图

    工作密取

    当前线程的 Task 已经全被执行完毕,则自动取到其他线程的 Task 池中取出 Task 继续执行。
    ForkJoinPool 中维护着多个线程(一般为 CPU 核数)在不断地执行 Task,每个线程除了执行自己职务内的 Task之外,还会根据自己工作线程的闲置情况去获取其他繁忙的工作线程的Task,如此一来就能能够减少线程阻塞或是闲置的时间,提高CPU利用率
    fork-join工作密取

    Fork-Join标准范式

    我们要使用 ForkJoin 框架,必须首先

    1. 创建 ForkJoinPool
    2. 创建一个 ForkJoinTask 任务, 提供在任务中执行 fork 和 join 的操作机制。 通常我们不直接继承 ForkjoinTask 类,只需要直接继承其子类 RecursiveTask, RecursiveAction;
      RecursiveAction,用于没有返回结果的任务
      RecursiveTask,用于有返回值的任务
      ForkJoinTask继承关系
    3. ForkJoinTask要通过 ForkJoinPool 来执行,使用 submit 或 invoke 提交。两者的区别是: invoke 是同步执行,调用之后需要等待任务完成,才能执行后面的代码; submit 是异步执行。
    4. 通过 ForkJoinTask 的 join() 和 get() 方法当任务完成的时候返回计算结果。 ForkJoinTask工作流程
      注意:在我们自己实现的 compute 方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务在调用 invokeAll 方法时,又会进入 compute 方法,看看当前子任务是否需要继续分割成孙任务,如果不需要继续分割,则执行当前子任务并返回结果。使用 join 方法会等待子任务执行完并得到其结果。

    CountDownLatch

    概述

    CountDownLatch是在jdk1.5的时候引入的,位于java.util.concurrent并发包中,CountDownLatch叫做闭锁。
    CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其它线程执行完后再执行。

    工作原理

    CountDownLatch是通过一个计数器来实现的,每当一个线程完成了自己的任务后,计数器的值相应的减1。当计数器的值减到0时,表示所有的线程都已完成任务,然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务。
    CountDownLatch工作流程图

    常用方法

    1.public CountDownLatch(int count)
    CountDownLatch接收一个int型的参数,表示要调用CountDown()方法的次数;
    2.public void await()
    使当前线程进入同步队列等待,直到计数器的值减到0或者当前线程被中断,当前线程就会被唤醒;
    3.public boolean await(long timeout,TimeUnit unit)
    带超时时间的await();
    4.public void CountDown()
    使计数器的值减1,如果减到了0,则会唤醒所有等待在这个CountDownLatch上的线程;
    5.public long getCount()
    获取CountDownLatch的数值,也就是计数器的值;

    实现原理

    CountDownLatch的实现原理主要是通过内部类Sync来实现的,内部类Sync是AQS的子类,主要是通过重写AQS的共享式获取和释放同步状态方法来实现的。源码如下:

    CountDownLatch初始化时需要定义调用count的次数,然后每调用一次countDown方法都会计数减一,源码如下:

       private final Sync sync;
    
        public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
        
        public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    
        public void countDown() {
            sync.releaseShared(1);
        }
    
     ### CountDownLatch.Sync
     private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
    
            Sync(int count) {
                setState(count);
            }
    
            int getCount() {
                return getState();
            }
    
            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    
            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    总结:CountDownLatch实际完全依靠AQS的共享式获取和释放同步状态来实现,初始化时定义AQS的state值,每调用countDown实际就是释放一次AQS的共享式同步状态,await方法实际就是尝试获取AQS的同步状态,只有当同步状态值为0时才能获取成功。

    使用场景

    1.实现最大的并行性:
    有时我们想同时启动多个线程,实现最大程度的并行性.如果我们创建一个初始计数器为1的CountDownLatch,多个线程在开始执行任务前首先CountDownLatch.await()在这个锁上等待,只需要主线程调用一次CountDown()方法就可以让其它所有等待的线程同时恢复执行.类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑;

    2.开始执行前等待N个线程完成各自的任务后,进行汇总合并:
    例如,我们需要使用多线程统计完所有的数据之后,做一个汇总,就可以使用CountDownLatch。

    缺点

    CountDownLatch是一次性的,计数器的值只能在构造方法中初始化一次,之后不能再次对其设置值,当CountDownLatch使用完毕后,它不能再次被使用。

    CyclicBarrier

    概述

    其作用是让一组线程到达公共屏障点时阻塞,直到最后一个线程也到达公共屏障点,屏障才会打开,而且该屏障(barrier)在这一组线程释放后可以重用,所以称为循环(Cyclic)的屏障(Barrier)。

    常用方法

    1. await(),使当前线程进入同步队列进行等待,并且值减1;
    2. await(long timeout, TimeUnit unit),带超时时间的await();
    3. getParties(),返回需要跳过栅栏的线程数量,它就是初始化构造函数的parties值;
    4. getNumberWaiting(),返回目前在栅栏边等待的线程数量,它是一个变化的值;
    5. reset(),重置初始状态的障碍。如果有线程在栅栏边等待会抛出异常BrokenBarrierException;

    与CountDownLatch对比

    CountDownLatchCyclicBarrier
    减计数方式减计数方式
    计数为0时释放所有等待的线程计数为0时,优先执行一个barrierAction,然后释放所有等待线程
    计数为0时,无法重置计数为0时,可以重置
    调用countDown()方法计数减1,调用await()方法只进行阻塞(主线程),对计数没任何影响调用await()方法计数减1,若减1后的值不等于0,则线程阻塞(子线程
    不可重复利用可重复利用

    实现原理

    先看下CyclicBarrier的构造方法:

        public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties; //同步屏障总需线程数
            this.count = parties; // 当前剩余需要达到的线程数
            this.barrierCommand = barrierAction;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    构造方法中主要是给两个属性parties(总线程数)、count(当前剩余线程数)进行赋值,这里需要两个值的原因是CyclicBarrier提供了重置的功能,当调用reset方法重置时就需要将count值再赋值成parties的值。

    再看下await方法的实现逻辑:

       public int await() throws InterruptedException, BrokenBarrierException {
            try {
            //调用dowait
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }
    
    private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
             //使用ReentrantLock
            final ReentrantLock lock = this.lock;
            //加锁操作
            lock.lock();
            try {
                final Generation g = generation;
    
                if (g.broken)
                    throw new BrokenBarrierException();
    			//响应线程中断
                if (Thread.interrupted()) {
                    breakBarrier();
                    throw new InterruptedException();
                }
    			// count自减操作
                int index = --count;
                //判断当前还需达到同步屏障的线程数是否为0
                if (index == 0) {  // tripped
                    boolean ranAction = false;
                    try {
                    	//barrierCommand是同步屏障打开后需要执行的Runnable对象
                        final Runnable command = barrierCommand;
                        if (command != null)
                        	//如果Runnable对象不为空直接执行Runnable线程任务
                            command.run();
                        ranAction = true;
                        //本次同步屏障全部达成,唤醒所有线程并开始下一次同步屏障
                        nextGeneration();
                        return 0;
                    } finally {
                        if (!ranAction)
                            breakBarrier();
                    }
                }
    
                // 一直轮询,直到中断、超时或异常
                for (;;) {
                    try {
                        if (!timed)
                        	//调用Condition对象的await方法使当前线程进入等待状态
                            trip.await();
                        else if (nanos > 0L)
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            // We're about to finish waiting even if we had not
                            // been interrupted, so this interrupt is deemed to
                            // "belong" to subsequent execution.
                            Thread.currentThread().interrupt();
                        }
                    }
    
                    if (g.broken)
                        throw new BrokenBarrierException();
    
                    if (g != generation)
                        return index;
    
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    
        private void nextGeneration() {
            // 唤醒所有线程
            trip.signalAll();
            // 重新赋值
            count = parties;
            generation = new Generation();
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91

    可以看出CyclicBarrier的实现原理主要是通过ReentrantLock和Condition来实现的,主要实现流程如下:

    1. 创建CyclicBarrier时定义了CyclicBarrier对象需要达到的线程数count;
    2. 每当一个线程执行了await方法时,需要先通过ReentrantLock进行加锁操作,然后对count进行自减操作,操作成功则判断当前count是否为0;
    3. 如果当前count不为0则调用Condition的await方法使当前线程进入等待状态;
    4. 如果当前count为0则表示同步屏障已经完全,此时调用Condition的signalAll方法唤醒之前所有等待的线程,并开启循环的下一次同步屏障功能;
    5. 唤醒其他线程之后,其他线程继续执行剩余的逻辑。
      CyclicBarrier工作流程图

    Semaphore

    概述

    Semaphore 通常我们叫它信号量, 可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。

    使用场景

    通常用于那些资源有明确访问数量限制的场景,常用于限流 。

    常用方法

    1. acquire() ,获取一个令牌,在获取到令牌、或者被其他线程调用中断之前线程一直处于阻塞状态。
    2. acquire(int permits) ,获取一个令牌,在获取到令牌、或者被其他线程调用中断、或超时之前线程一直处于阻塞状态。
    3. acquireUninterruptibly() ,获取一个令牌,在获取到令牌之前线程一直处于阻塞状态(忽略中断)。
    4. tryAcquire(),尝试获得令牌,返回获取令牌成功或失败,不阻塞线程。
    5. tryAcquire(long timeout, TimeUnit unit),尝试获得令牌,在超时时间内循环尝试获取,直到尝试获取成功或超时返回,不阻塞线程。
    6. release(),释放一个令牌,唤醒一个获取令牌不成功的阻塞线程。
    7. hasQueuedThreads(),等待队列里是否还存在等待线程。
    8. getQueueLength(),获取等待队列里阻塞的线程数。
    9. drainPermits(),清空令牌把可用令牌数置为0,返回清空令牌的数量。
      10.availablePermits(),返回可用的令牌数量。

    实现原理

    可参考:Semaphore实现原理

    Exchange

    概述

    Exchanger是一个用于两个线程间数据交换的工具类,它提供一个公共点,在这个公共点,两个线程可以交换彼此的数据。当一个线程调用exchange方法后将进入等待状态,直到另外一个线程调用exchange方法,双方完成数据交换后继续执行。

    常用方法

    1. exchange(V x),阻塞当前线程,直到另外一个线程调用exchange方法或者当前线程被中断。x : 需要交换的对象。
    2. exchange(V x, long timeout, TimeUnit unit),阻塞当前线程,直到另外一个线程调用exchange方法或者当前线程被中断或者等待超时。

    工作流程

    Exchange工作流程

    Callable、Future和FutureTask

    Callable与Runnable

    Runnable

    public interface Runnable {
        public abstract void run();
    }
    
    • 1
    • 2
    • 3

    Callable

    public interface Callable<V> {
        /**
         * Computes a result, or throws an exception if unable to do so.
         *
         * @return computed result
         * @throws Exception if unable to compute a result
         */
        V call() throws Exception;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    Runnable执行run方法结束后没有返回值,而Callable执行call方法结束后有返回值

    Future

    概述
    Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果

    public interface Future<V> {
    	//cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。
    	//参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务;
    	//如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;
    	//如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;
    	//如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。
        boolean cancel(boolean mayInterruptIfRunning);
    
    	//isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
        boolean isCancelled();
    	
    	//isDone方法表示任务是否已经完成,若任务完成,则返回true;
        boolean isDone();
    		
    	//get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
        V get() throws InterruptedException, ExecutionException;
    
    	//用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。(在超时的时候不是返回null,而是抛TimeoutException);
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    也就是说Future提供了三种功能:

    1. 判断任务是否完成;
    2. 能够中断任务;
    3. 能够获取任务执行结果。

    因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的FutureTask。

    FutureTask

    我们先来看一下FutureTask的实现:

    public class FutureTask<V> implements RunnableFuture<V>{
    }
    
    • 1
    • 2

    FutureTask类实现了RunnableFuture接口,我们看一下RunnableFuture接口的实现:

    public interface RunnableFuture<V> extends Runnable, Future<V> {
        void run();
    }
    
    • 1
    • 2
    • 3

    可以看出RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

    FutureTask提供了2个构造器:

    public FutureTask(Callable<V> callable) {
    }
    public FutureTask(Runnable runnable, V result) {
    }
    
    • 1
    • 2
    • 3
    • 4
  • 相关阅读:
    面试面经|java面试异常面试题
    能用就行——玄学问题:Compile with TORCH_USE_CUDA_DSA to enable device-side assertions
    贪心算法-金条切割问题
    Swagger使用
    C++容器适配器操作总结(代码+示例)
    【Java】线程的同步和互斥锁
    浅谈基于QT的截图工具的设计与实现
    【推荐】javaweb JAVA JSP企业财务管理系统记账管理系统jsp财务管理网站(理财系统财务管理系统)源码
    2023服务端测试开发必备技能:Mock测试
    知识图谱:知识融合
  • 原文地址:https://blog.csdn.net/a734474820/article/details/126934154