• 从JVM角度理解Java并发(下)


    前言

    本文内容主要参考黑马JUC并发编程,《Java并发编程的艺术》,所有代码可在github我的主页下载。

    一、Java内存模型(JMM)

    在第二章我们学习了锁的相关知识,认识到了Moniter,实际上,正是它保证了临界区代码的原子性(即使线程切换,它也能占据临界区的锁,不会被干扰),JMM主要体现在以下三个方面,本章我们继续介绍后续的可见性和有序性。

    • 原子性-保证指令不会受线程上下文切换的影响
    • 可见性-保证指令不会受cpu缓存影响
    • 有序性-保证指令不会受cpu指令并行优化影响

    前面学习的synchronized能有效实现原子性,可见性,以及能从逻辑上实现有序性(毕竟代码块内相当于单线程,即使无序也不会出现线程干扰),但synchronized始终是锁,有没有更轻量的方式去实现可见性和有序性呢?我们先从这二者的问题出发。

    1.1 可见性

    1.1.1 问题表述

    请看下面代码,我们想通过修改flag的值,让线程t运行1s之后停下来,但是却不能如意。

    public class VisibleQuestion {
        static boolean flag = true;
    
        public static void main(String[] args) throws InterruptedException {
            Thread t = new Thread(()->{
                while (flag) {
                    //..
                }
            }, "t");
            t.start();
            TimeUnit.SECONDS.sleep(1);
            // 想让线程停下来  
            flag = false;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    1.1.2 解决办法

    实际上,JMM定义了线程与主存之间的抽象关系,线程之间的共享变量存储在主内存中,每个线程都有一个私有的本地内存,本地内存中存储了该线程以读/写共享变量的副本。
    在这里插入图片描述
    如果线程A要与线程B之间要通信,那么需要经历两个步骤:

    1. 线程A把本地内存A中更新过的共享变量刷新到主存。
    2. 线程B到主存中去读取A更新过的共享变量。

    以我们提到的问题为例,也就是线程t内部存在了flag的副本,当主线程更改了flag的值时,没有经历以上的两个步骤。
    那么,我们让线程每次都从主存读取变量就好了,Java提供了关键字volatile实现这一功能

    static volatile boolean flag = true;
    
    • 1

    注意,volatile并不能保证原子性,只能保证能看到最新值,并不能解决指令交错,所以仅用在一个写线程,多个读线程的情况。若存在指令交错,还得用到synchronized,它既能保证原子性又能保证可见性(清空线程工作内存)。

    1.2 有序性

    1.2.1 问题表述

    为了提高性能,编译器和处理器会对指令做重排序,重排序分为3种:

    1. 编译器优化重排序。
    2. 指令级并行的重排序。
    3. 内存系统的重排序。

    Java代码到最终实际执行的指令序列,会分别经历上面三种重排序。
    如下测试类所示,假设线程1执行actor1,线程2执行actor2。

    class Test {
        int num = 0;
        int r = 0;
        boolean ready = false;
    
        // 线程1
        public void actor1() {
            if (ready) {
                r = num + num;
            } else {
                r = 1;
            }
        }
    
        // 线程2
        public void actor2() {
            num = 2;
            ready = true;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    假如线程1在先,那么r最终的结果应该是1,如果线程2在先,r则为4,然而实际情况,在重复多次的情况下却会出现r=0。

    	public static void main(String[] args) {
            Test test = new Test();
    
            Thread t1 = new Thread(()->test.actor1(), "t1");
            Thread t2 = new Thread(()->test.actor2(), "t2");
    
            t1.start();
            t2.start();
    
            try {
                t1.join();
                t2.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            System.out.println(test.r);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    这是由于actor2或许会被编译器重排序

    	public void actor2() {
            ready = true;
            num = 2;
        }
    
    • 1
    • 2
    • 3
    • 4

    1.2.2 解决办法

    可以使用volatile禁止指令重排

    volatile boolean ready = false;
    
    • 1

    它会让使用到ready变量之前的代码都禁止重排(详见5.3节volatile原理)

    1.3 volatile原理

    既然可见性和有序性都能用volatile解决,那么就非常有必要探究一下它的原理了,volatile的底层实现原理是内存屏障

    • 对volatile变量的写指令后会加入写屏障
    • 对volatile变量的读指令前会加入读屏障

    1.3.1 保证可见性

    写屏障保证在该屏障之前,对共享变量的改动,都同步到主存中。

    	public void actor2() {
            num = 2;
            ready = true; // ready是volatile带写屏障
            // 写屏障
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    也就是说,num, ready两个变量都会及时同步到主存。
    读屏障保证在该屏障之后,对共享变量的读取,加载的是主存中最新的数据。

    	public void actor1() {
    		// 读屏障
            if (ready) { // ready是volatile带读屏障
                r = num + num;
            } else {
                r = 1;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    这里对ready,num的读取都是加载的最新数据。

    1.3.2 保证有序性

    读写屏障还有一个功能,就是任何代码不能逃离屏障,如

    	public void actor2() {
            num = 2;
            ready = true; // ready是volatile带写屏障
            // 写屏障
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    这里num = 2;这行代码不能被重排到ready=true;的后面,因为ready后是写屏障。故能在一定程度上禁止指令重排。

    1.4 happen-before规则

    happen-before规定了对共享变量的写操作对其他线程的读操作可见,它是可见性与有序性的一套规则总结,抛开以下规则,JMM并不能保证一个线程对共享变量的写,对于其他线程对该共享变量的读可见。

    1. 程序顺序规则。同一个线程的每个操作,对它之后的操作都可见。
    2. 监视器锁规则。线程解锁m之前对变量的写,对接下来对m加锁的其他线程对该变量的读可见,也就是说,synchronized能保证线程之间的可见性。
    3. volatile变量规则。线程对volatile变量的写,对接下来其他线程对该变量的读可见。
    4. 传递性。A h-b B,且B h-b C,那么A h-b C。
    5. 线程start前对变量的写,对该线程开始后对该变量的读可见。
    6. 线程结束前对该变量的写,对其他线程得知它结束后的读可见(其他线程调用t.join()等待它结束)。

    1.5 final原理

    1.5.1 写final

    对final对象的赋值会在指令后加入写屏障,该屏障保证1)有序性。写屏障之前的代码不会重排序到屏障后面,2)可见性。写屏障之前的所有赋值操作会被同步到主存。

    public class TestFinal{
    	final int a = 20;
    }
    
    • 1
    • 2
    • 3

    字节码
    在这里插入图片描述

    1.5.2 读final

    读final时,从目标类直接复制该值到本类,而非final,我们知道是去拿目标值。

    public class TestFinalDemo {
        public static void main(String[] args) {
            System.out.println(TestFinal.A);
        }
    }
    
    class TestFinal{
        static final int A = 10;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    二、线程池

    juc里面的线程池类
    在这里插入图片描述

    2.1 线程池的状态

    线程池存在5种状态,ThreadPoolExecutor使用int的高3位来表示线程池的状态,低29位表示线程数量

    状态名高3位接收新任务处理阻塞队列任务说明
    RUNNING111YY接收新任务,处理队列任务
    SHUTDOWN000NY不接收新任务,但会处理队列剩余任务
    STOP001NN中断正在执行的任务,抛弃队列任务
    TIDYING010--任务全部执行完毕,活动线程为0,即将进入终结
    TERMINATED011--终结状态

    将这些信息全部存储在一个原子变量ctl中,将线程状态和线程数合二为一,这样可以用一次cas操作赋值。

    2.2 ThreadPoolExecutor构造方法

    	public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
                                  //...
                                  }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • corePoolSize 核心线程数
    • maximumPoolSize 最大线程数
    • keepAliveTime 生存时间(针对救急线程)
    • unit 时间单位
    • workQueue 任务阻塞队列
    • threadFactory 线程工厂
    • handler 拒绝策略

    线程池的任务调度逻辑如下图所示
    在这里插入图片描述
    超过核心线程数那部分叫做“救急线程”,这些线程具有存活时间keepAliveTime,当任务执行完毕后超过生存时间,它们都会被销毁。
    如果队列满了,线程也达到最大线程数了,我们就使用拒绝策略。

    jdk提供了以下几种阻塞队列,BlockingQueue接口继承了Queue接口,与普通队列的区别在于阻塞队列存在线程的等待与唤醒,且依赖于重入锁的Condition。
    在这里插入图片描述

    • ArrayBlockingQueue 有界的任务队列
    • LinkedBlockingDeque 无界的任务队列,除非资源耗尽,不会存在任务入队失败的情况,有耗尽内存的风险
    • SynchronousQueue 没有容量,每一个插入操作都要等待一个删除,反之一样,提交的任务不会保存,总是交给线程执行,如果没有空闲线程,则尝试创建新的线程,通常要设置很大的maximumPoolSize,不然很容易执行拒绝策略
    • PriorityBlockingQueue 可以控制任务执行先后顺序的无界队列
    • DelayedWorkQueue 延时队列,用于ScheduledThreadPoolExecutor

    jdk提供了以下几种拒绝策略
    在这里插入图片描述

    • AbortPolicy 直接抛出异常(默认)
    • CallerRunsPolicy 调用者线程执行任务
    • DiscardOldestPolicy 丢弃最老的一个请求,尝试执行当前任务
    • DiscardPolicy 丢弃该任务

    线程工厂是线程池生产线程的地方,主要用于线程的统一命名,默认使用Executors.defaultThreadFactory()产生,它的源代码如下

    static class DefaultThreadFactory implements ThreadFactory {
    		// 线程池号
            private static final AtomicInteger poolNumber = new AtomicInteger(1);
            private final ThreadGroup group;
            // 线程号
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            private final String namePrefix;
    
            DefaultThreadFactory() {
                SecurityManager s = System.getSecurityManager();
                group = (s != null) ? s.getThreadGroup() :
                                      Thread.currentThread().getThreadGroup();
                namePrefix = "pool-" +
                              poolNumber.getAndIncrement() +
                             "-thread-";
            }
    
    		// 实现ThreadFactory接口的方法
            public Thread newThread(Runnable r) {
            	// 创建线程,命名
                Thread t = new Thread(group, r,
                                      namePrefix + threadNumber.getAndIncrement(),
                                      0);
                if (t.isDaemon())
                    t.setDaemon(false);
                if (t.getPriority() != Thread.NORM_PRIORITY)
                    t.setPriority(Thread.NORM_PRIORITY);
                return t;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    2.3 线程池提交

    ThreadPoolExecutor提供了以下提交的方法(有些是由它的抽象父类提供的)

    2.3.1 支持单个任务提交

    • void execute(Runnable command) 提交实现Runnable接口的任务,没有返回值
    • Future submit(Runnable task) 提交实现Runnable接口的任务,并有一个获取返回值的凭证Future,但由于Runnable接口并没有返回值,所以get会获得null
    • Future submit(Runnable task, T result) 暂时不太清楚result做什么
    • Future submit(Callable task) 提交实现Callable接口的任务,并有一个获取返回值的凭证Future,通过get可以阻塞获取该任务的返回值

    总体来说,就两种方法execute和submit,execute仅处理Runnable接口,不能获取返回值,且遇到异常直接抛出,submit二者都能处理,能获取返回值,遇到异常不会直接抛出,而是通过get方法捕获异常。

    2.3.2 支持批量提交

    • List> invokeAll(Collection> tasks)
    • List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) 一定时间内,任务没有执行完,会把后续的取消掉

    2.3.3 支持批量提交但只执行一个

    哪个任务先执行成功,返回此任务的执行结果,其他任务取消

    • T invokeAny(Collection> tasks)
    • T invokeAny(Collection> tasks, long timeout, TimeUnit unit)
    public class ThreadPoolExecutorDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // 自定义线程池
            ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 5, 1000,
                    TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(50));
    
            // 提交单个
            Future<String> future = pool.submit(() -> {
                TimeUnit.SECONDS.sleep(2);
                return "ok";
            });
            System.out.println(future.get());
    
            // 批量提交
            List<Future<String>> futures = pool.invokeAll(Arrays.asList(
                    () -> {
                        TimeUnit.SECONDS.sleep(1);
                        return "1";
                    },
                    () -> {
                        TimeUnit.SECONDS.sleep(2);
                        return "2";
                    },
                    () -> {
                        TimeUnit.SECONDS.sleep(3);
                        return "3";
                    }
            ));
            futures.forEach(f-> {
                try {
                    System.out.println(f.get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            });
    
            // 批量竞争提交
            String result = pool.invokeAny(Arrays.asList(
                    () -> {
                        TimeUnit.SECONDS.sleep(1);
                        return "a";
                    },
                    () -> {
                        TimeUnit.SECONDS.sleep(2);
                        return "b";
                    },
                    () -> {
                        TimeUnit.SECONDS.sleep(3);
                        return "c";
                    }
            ));
            System.out.println(result);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    输出:

    ok
    1
    2
    3
    a
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2.4 关闭线程池

    2.4.1 shutdown

    将线程池状态变为SHUTDOWN,不会接收新任务,但已提交的任务会执行完。shutdown并不会阻塞主线程的执行,要想等待shutdown执行完毕,主线程需要调用awaitTermination(long timeout, TimeUnit unit)方法,等待线程池真正终结。

    public void shutdown() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();
                // 修改线程池状态
                advanceRunState(SHUTDOWN);
                // 仅会打断空闲线程
                interruptIdleWorkers();
                onShutdown(); // hook for ScheduledThreadPoolExecutor
            } finally {
                mainLock.unlock();
            }
            // 尝试终结(没有运行的线程可以终结)
            tryTerminate();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    2.4.2 shotdownNow

    不会接收新任务,会将队列中的任务返回,并用interrupt的方式中断正在执行的任务。

    public List<Runnable> shutdownNow() {
            List<Runnable> tasks;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();
                // 线程状态改为STOP
                advanceRunState(STOP);
                // 打断所有线程
                interruptWorkers();
                // 获取队列中剩余任务
                tasks = drainQueue();
            } finally {
                mainLock.unlock();
            }
            // 尝试终结
            tryTerminate();
            return tasks;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    2.5 线程池创建须知

    2.5.1 线程池的个数

    一般来说,不同的任务类型应该使用不同的线程池,这样能够避免饥饿,提升效率。
    试想,如果存在这样一个场景,在餐厅我们有点餐和做菜两种动作,如果我们创建一个线程池(容量为2),我们让线程池里面的线程无差别的执行这两个动作,假如点餐过后需要等待菜做好才能继续,在这种情况下,就会出现饥饿。现在同时来了两位客人,我们使用两个线程异步执行点餐任务,这时就没有线程去执行做菜任务了,系统就无法推进,所以,我们需要创建两个线程池,让这两个线程池分别去处理不同的任务。

    2.5.2 线程池的大小

    线程池的大小到底设置为多少比较合适呢?如果线程池过小会导致程序不能重复利用系统资源,导致饥饿,如果过大导致线程频繁上下文切换,影响cpu吞吐量。
    CPU密集运算
    通常采用CPU核数+1能实现最优CPU利用率,+1是保证页缺失导致线程暂停时,额外的这个线程就能顶上去,保证CPU时钟不被浪费。

    I/O密集运算
    对于I/O密集运算的服务(数据库操作等),CPU不总是繁忙,常常空闲下来,这种情况可以参考一下经验公式

    线程数 = 核数 \* 期望CPU利用率 * 总时间(CPU计算时间+等待时间)/ CPU计算时间
    
    • 1

    例如4核CPU计算时间占10%,等待时间为90%,期望CPU被100%利用,套用公式 4 * 100% * 100% / 10% = 40,可以创建大小为40的线程池。

    2.6 任务调度线程池

    在前面我们介绍了普通线程池ThreadPoolExecutor,JDK还提供了一种线程池,可以延时/周期进行线程调度。

    ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(2);
    
    • 1

    从本章开始的类图中,可以看到ScheduledThreadPoolExecutor继承了ScheduledThreadPoolExecutor并且实现了ScheduledExecutorService接口,这个接口定义了schedule,scheduleAtFixedRate,scheduleWithFixedDelay方法。
    先从构造器来认识ScheduledThreadPoolExecutor吧

    	public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue());
        }
    
    • 1
    • 2
    • 3
    • 4

    可以看到,它依然是调用父类的构造方法,不同的是,采用了延时队列。
    而它所实现的ScheduledExecutorService接口的方法,功能分别如下

    • schedule(Runnable command, long delay, TimeUnit unit) 延时delay后执行任务
    • schedule(Callable callable,long delay, TimeUnit unit) 延时delay后执行任务,并能获得返回值
    • scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) 延时intialDelay后开始,以线程开始时间为基准,每隔period执行一次任务,如果周期<任务执行时间,那么会在任务执行结束后,立刻进行下一次调用
    • scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) 延时intialDelay后开始,以线程结束时间为基准,每隔period执行一次任务
    public class ScheduledThreadPoolExecutorDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(2);
    
            // 延时执行 两个接口
            pool.schedule(()-> System.out.println("hello"), 1, TimeUnit.SECONDS);
            ScheduledFuture<String> future = pool.schedule(() -> {
                System.out.println("hello");
                return "s";
            }, 1, TimeUnit.SECONDS);
            System.out.println(future.get());
    
            // 周期执行,以上个任务开始计算 总共等待2秒
            pool.scheduleAtFixedRate(()->{
                System.out.println("running1");
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, 0, 2, TimeUnit.SECONDS);
    
    
            // 周期执行,以上个任务结束计算 总共等待3秒
            pool.scheduleWithFixedDelay(()->{
                        System.out.println("running1");
                        try {
                            TimeUnit.SECONDS.sleep(1);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }, 0, 2, TimeUnit.SECONDS
            );
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    2.7 Fork/Join

    Fork/Join体现的是一种分治思想,适用于能够进行任务拆分的cpu密集计算。
    它在分治算法的基础上加入了多线程,可以把每个子任务交由不同线程来完成,默认会创建与cpu核心数大小相同的线程池。
    在这里插入图片描述
    比如,我们要计算1-10的累加和,可以采用如下方法

    public class ForkJoinDemo {
        public static void main(String[] args) {
            ForkJoinPool pool = new ForkJoinPool(4);
            System.out.println(pool.invoke(new MyTask(1, 10)));
        }
    }
    
    class MyTask extends RecursiveTask<Integer> {
    
        private int begin;
        private int end;
    
        public MyTask(int begin, int end) {
            this.begin = begin;
            this.end = end;
        }
    
        @Override
        protected Integer compute() {
            if (begin == end) {
                return begin;
            }
    
            int mid = (begin + end) / 2;
            MyTask t1 = new MyTask(begin, mid);
            MyTask t2 = new MyTask(mid + 1, end);
            // 让其他线程去执行任务
            t1.fork();
            t2.fork();
            System.out.println(Thread.currentThread().getName() + ",fork");
    
            int res = t1.join() + t2.join();
            return res;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    输出:

    ForkJoinPool-1-worker-1,fork
    ForkJoinPool-1-worker-3,fork
    ForkJoinPool-1-worker-1,fork
    ForkJoinPool-1-worker-2,fork
    ForkJoinPool-1-worker-3,fork
    ForkJoinPool-1-worker-0,fork
    ForkJoinPool-1-worker-1,fork
    ForkJoinPool-1-worker-3,fork
    ForkJoinPool-1-worker-0,fork
    55
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    可以看到,该运算使用了4个线程分别去执行任务。

    三、JUC并发包

    3.1 LockSupport

    3.1.1 基本使用

    线程阻塞工具,使用park()阻塞当前线程,unpark()让线程继续执行(对比suspend()和resume()),特别的,它还支持限时阻塞(parkNanos()/parkUntil()),优点:

    1. 阻塞的线程处于waiting/timed-waiting状态;
    2. 底层实现类似于信号量机制,即使unpark()在前,也不会影响代码逻辑;
    3. 支持中断影响;
    4. 支持精确唤醒。
    Thread t = new Thread("t") {
    	@Override
        public void run() {
        	System.out.println("t park...");
            LockSupport.park();
            System.out.println("t unpark...");
        }
    };
    t.start();
    LockSupport.unpark(t);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    可以看到,park功能与wait类似,但park不需要锁,能精确唤醒线程,且能不考虑暂停与唤醒的时序。

    3.1.2 底层原理

    先来看看park方法
    在这里插入图片描述

    • 当前线程调用Unsafe.park()方法
    • 检查_counter,本情况为0,这时,获得_mutex互斥锁
    • 线程进入_cond条件变量阻塞
    • 设置_counter=0

    下面来看一下正常情况调用unpark方法
    在这里插入图片描述

    • 调用Unsafe.unpark(Thread_0)方法,设置_counter为1
    • 唤醒_cond变量中的Thread_0
    • Thread_0恢复运行
    • 设置_counter=0

    最后看一下先调用unpark再调用park时
    在这里插入图片描述

    • 调用Unsafe.unpark(Thread_0)方法,设置_counter为1
    • 当前线程调用Unsafe.park()方法
    • 检查_counter,本情况为1,线程无需阻塞,继续运行
    • 设置_counter=0

    3.2 ReentrantLock

    相对于synchronized,它具有以下特点:

    • 可中断阻塞
    • 可以设置超时时间
    • 可以设置为公平锁
    • 支持多个条件变量

    与synchronized一样,都支持可重入。

    public class ReentrantLockDemo {
        private static ReentrantLock lock = new ReentrantLock();
    
        public static void main(String[] args) {
            lock.lock();
            try {
                // 临界区代码
            } finally {
                lock.unlock();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    3.2.1 中断阻塞

    可以在线程阻塞的时候,将该线程中断。

    	private static void interruptTest() {
            Thread t1 = new Thread("t1") {
                @Override
                public void run() {
                    try {
                        System.out.println("t1 尝试获取锁");
                        lock.lockInterruptibly();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        System.out.println("t1 被中断");
                        return;
                    }
    
                    try {
                        System.out.println("t1 获取到锁");
                    } finally {
                        lock.unlock();
                    }
                }
            };
    
            lock.lock();
            t1.start();
            t1.interrupt();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    输出:

    t1 尝试获取锁
    t1 被中断
    java.lang.InterruptedException
    	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1220)
    	at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
    	at com.example.thread.jdkconcurrency.ReentrantLockDemo$1.run(ReentrantLockDemo.java:31)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    3.2.2 锁超时

    ReentrantLock支持有限等待锁资源,避免死锁产生(如哲学家进餐问题,使用synchronized就会出现死锁,若使用ReentrantLock,可以分别尝试获得左边筷子、右边筷子,若右边筷子获取失败,则释放掉左边的筷子)。

    	private static void tryLockTest() {
            Thread t1 = new Thread("t1") {
                @Override
                public void run() {
                    try {
                        System.out.println("t1 尝试去获得锁");
                        if (!lock.tryLock(1, TimeUnit.SECONDS)) {
                            System.out.println("t1 没有获取到锁 时间到了退出");
                            return;
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    try {
                        System.out.println("t1 获取到锁了");
                    } finally {
                        lock.unlock();
                    }
                }
            };
    
            lock.lock();
            t1.start();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    3.2.3 公平锁

    ReentrantLock默认不公平,即唤醒阻塞线程让它们自己去竞争锁,而不考虑线程阻塞的顺序。
    设置方法:

    private static ReentrantLock lock = new ReentrantLock(true);
    
    • 1

    虽然公平锁能减轻饥饿问题,但公平锁会降低并发度。

    3.2.4 条件变量

    我们知道,进入synchronized后,若发现还有条件未满足,可以进入WaitSet进行等待,但是synchronized只有一个WaitSet,也就是说,只能判断一个条件,而ReentrantLock可以判断多个条件,相当于程序员可以自己创建多个WaitSet。
    假如存在t1, t2, t3, t4四个线程,t1线程要等待有烟才能工作,t2线程要等待有咖啡才能工作,t3线程负责送烟,t4线程负责送咖啡,代码实现如下。

    public class ReentrantLockDemo {
        private static ReentrantLock lock = new ReentrantLock(true);
        private static boolean cigerate = false;
        private static boolean coffee = false;
        private static Condition waitCigerate = lock.newCondition();
        private static Condition waitCoffee = lock.newCondition();
    
        public static void main(String[] args) {
            conditionTest();
        }
    
        private static void conditionTest() {
    
            Thread t1 = new Thread("t1") {
                @Override
                public void run() {
                    lock.lock();
                    try {
                        while (!cigerate) {
                            System.out.println("t1 香烟还没到 等会儿");
                            waitCigerate.await();
                        }
    
                        System.out.println("t1 香烟到了 工作");
                        TimeUnit.SECONDS.sleep(1);
    
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        lock.unlock();
                    }
                }
            };
    
            Thread t2 = new Thread("t2") {
                @Override
                public void run() {
                    lock.lock();
                    try {
                        while (!coffee) {
                            System.out.println("t2 咖啡还没到 等会儿");
                            waitCoffee.await();
                        }
    
                        System.out.println("t2 咖啡到了 工作");
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        lock.unlock();
                    }
                }
            };
    
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Thread t3 = new Thread("t3") {
                @Override
                public void run() {
                    try {
                        lock.lock();
                        System.out.println("t3 开始送烟");
                        cigerate = true;
                        waitCigerate.signal();
                    } finally {
                        lock.unlock();
                    }
                }
            };
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Thread t4 = new Thread("t4") {
                @Override
                public void run() {
                    try {
                        lock.lock();
                        System.out.println("t4 开始送咖啡");
                        coffee = true;
                        waitCoffee.signal();
                    } finally {
                        lock.unlock();
                    }
                }
            };
    
            t1.start();
            t2.start();
            t3.start();
            t4.start();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97

    输出:

    t1 香烟还没到 等会儿
    t2 咖啡还没到 等会儿
    t3 开始送烟
    t4 开始送咖啡
    t1 香烟到了 工作
    t2 咖啡到了 工作
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    可以看到,四个线程同时进入临界区工作,且线程1和线程2分别在等待不同的条件。

    3.3 AQS

    3.3.1 自定义锁

    队列同步器AbstractQueuedSynchronizer(AQS)用来构建锁或者其他同步组件的基础框架,它使用一个int类型的state表示同步状态,通过内置的FIFO队列完成资源获取线程的排队工作。
    AQS基于模板方法模式,开发者仅需继承同步器并重写指定方法,并调用同步器提供的模板方法即可,我们先来简单实现一个基于AQS的自定义锁(独占锁)。
    首先我们的自定义锁得实现Lock接口,在类内部定义一个内部类,并继承同步器,同时,必须重写tryAcquire,tryRelease,isHeldExclusively方法,这三个方法若不重写直接调用会抛异常。

    class MyLock implements Lock {
    
        class MySync extends AbstractQueuedSynchronizer {
    
            @Override
            protected boolean tryAcquire(int arg) {
                if (compareAndSetState(0, 1)) {
                    // 加锁,并设置owner为当前线程
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            @Override
            protected boolean tryRelease(int arg) {
                setExclusiveOwnerThread(null);
                // 解锁,此时只有自己有锁
                setState(0);
                return true;
            }
    
            @Override
            protected boolean isHeldExclusively() {
                return getState() == 1;
            }
    
            public Condition newCondition() {
                return new ConditionObject();
            }
        }
    
        private MySync sync = new MySync();
    
        @Override
        public void lock() {}
    
        @Override
        public void lockInterruptibly() throws InterruptedException {}
    
        @Override
        public boolean tryLock() {}
    
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {}
    
        @Override
        public void unlock() {}
    
        @Override
        public Condition newCondition() {}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    队列同步器提供了compareAndSetState,setExclusiveOwnerThread,getState方法获取和修改同步状态,state为0表示无锁状态,不为0表示有锁状态,并且同步器存有指向当前占有锁的线程。
    当线程需要获得锁时,它会尝试将state由0改为1,若修改成功,则获得锁,若失败,则进入队列。
    Lock的实现也就是调用我们所实现的这几个方法

    	@Override
        public void lock() {
            // 加锁,不成功进入等待队列
            sync.acquire(1);
        }
    
        @Override
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }
    
        @Override
        public boolean tryLock() {
            return sync.tryAcquire(1);
        }
    
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(time));
        }
    
        @Override
        public void unlock() {
            sync.release(1);
        }
    
        @Override
        public Condition newCondition() {
            return sync.newCondition();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    以上例子是独占式的获取同步状态的情况,队列同步器也支持共享式的获取同步状态。JUC有很多类都是基于队列同步器,包括ReentrantLock,ReentrantReadWriteLock和CountDownLatch等,synchronized的同步借助于操作系统监视器,而队列同步器则是纯Java来做的。

    3.3.2 同步队列

    由上一节我们知道AQS的大致实现逻辑为:当线程需要获得锁时,它会尝试将state由0改为1,若修改成功,则获得锁,若失败,则进入队列。AQS是怎么做的呢?它又是在何时调用我们重写的tryAcquire和tryRelease方法?
    首先,我们需要了解AQS的队列结构,如下图所示,
    在这里插入图片描述
    队列采用双向链表实现,并有头尾指针,当线程获取同步状态失败时,同步器会为其构造一个节点并加入同步队列,同时阻塞当前线程,当同步状态释放时,会把首节点的线程唤醒,使其再次尝试获得同步状态。
    节点具有的属性

    • waitStatus:值为0表示初始状态,值为-1表示后继节点处于等待状态,当前节点释放同步状态时,将会通知后继节点,值为-2表示节点在Condition上
    • prev:前驱
    • next:后继

    队列的加入过程需要保证线程安全(CAS),设置头节点则不用。

    3.3.3 独占式同步状态获取与释放

    由上一节我们知道,我们通过调用sync.acquire(1)方法加锁,该方法是AQS提供的模板方法,代码如下所示

    	public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    首先,线程会尝试加锁(tryAcquire),如果加锁成功,则获得锁,如果加锁失败,则构造节点(addWaiter)加入队列,加入队列后,若线程的前驱节点为头节点,则线程尝试自旋获得锁(acquireQueued),当自旋尝试一段时间还是未获得同步状态后,进入阻塞状态,并将前驱节点的waitStatus设置为-1,若自旋获得了锁,则将队头指向当前节点。

    	final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                	// 前驱节点
                    final Node p = node.predecessor();
                    // 如果前驱节点为头结点,自旋获得锁
                    if (p == head && tryAcquire(arg)) {
                    	// 将队头指向当前节点
                        setHead(node);
                        // 前驱节点释放
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    // 自旋一段时间进入阻塞
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    注:队头是持有同步状态的线程。
    在这里插入图片描述

    下面来看锁的释放,上一节我们调用的是sync.release()解锁,我们看看AQS的模板方法

    	public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                // 如果waitStatus不为0,则需要唤醒后继节点
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    释放锁后,若head不为空(存在队列),则需要检查head节点的waitStatus,若其不为0,则需要唤醒后继节点。

    3.3.4 Condition接口

    与Synchronized类似,AQS也支持等待/通知,且又有不同,其中最大的不同在于AQS支持多个等待队列,而监视器只支持一个,我们看看AQS是怎么实现的。
    由我们的自定义锁可以看到,newCondition()方法调用的是new ConditionObject(),而ConditionObject是AQS的内部类,这个类实现了Condition接口,和我们前面的见过的同步队列结构非常相似,每个ConditionObject都包含一个队列(称为等待队列),且和同步队列使用相同的节点结构,即AQS内部定义的Node。
    在这里插入图片描述
    而AQS拥有一个同步队列和多个等待队列,
    在这里插入图片描述
    await()方法
    调用Condition的await()方法时(当前线程一定获取了锁),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态,从代码层面来看,该方法会将当前线程构造成节点并加入等待队列,然后释放同步状态,唤醒同步队列中的后继节点,当前线程进入等待状态。

    		public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                // 当前线程加入等待队列
                Node node = addConditionWaiter();
                // 释放锁 从同步队列移除
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                // 当线程被唤醒时,检查是否获得了同步锁
                while (!isOnSyncQueue(node)) {
                	// 阻塞当前线程
                    LockSupport.park(this);
                    // 如果是中断唤醒 抛出中断异常
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    signal()方法
    该方法会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移动到同步队列中。
    从代码层面上看,首先进行检查,当前线程是否获得了锁,接着获取等待队列的首节点,将其移动到同步队列并使用LockSupport唤醒节点中的线程。

    		public final void signal() {
    			// 检查当前线程是否获得了锁
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                // 拿到首节点
                Node first = firstWaiter;
                if (first != null)
                    doSignal(first);
            }
    
    		private void doSignal(Node first) {
                do {
                	// 把首节点从等待队列移除
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&
                         (first = firstWaiter) != null);
            }
    
    		final boolean transferForSignal(Node node) {
    
    			// 将节点的状态由-2设为0
            	if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                	return false;
     			// 通过CAS移动到同步队列 返回node的前驱
            	Node p = enq(node);
            	int ws = p.waitStatus;
            	if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            		// 唤醒node的线程
                	LockSupport.unpark(node.thread);
            	return true;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    而singleAll()则是相当于对每一个节点执行一次signal()方法,也就是将等待队列中的所有节点全部移动到同步队列中,并唤醒每个节点的线程。

    3.4 ReentrantLock原理

    在这里插入图片描述
    ReentrantLock的大致框架与我们再上一节自定义的锁非常类似,都实现了Lock,且在内部继承了AQS(Sync),不同的是,它还从Sync衍生出两个子类,分别实现公平与非公平锁。

    3.4.1 实现可重入

    我们在前面所做的自定义锁,当同一个线程尝试再次进入临界区时,会被锁阻塞,我们获得锁时,将state从0改为了1,如果再次进入时,检测当前占用锁的线程是否为当前线程,若为当前线程,则将state加一,不就实现重入了吗。

    		final boolean nonfairTryAcquire(int acquires) {
    			// 当前线程
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                // 如果为锁重入
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    // 增加state
                    setState(nextc);
                    return true;
                }
                return false;
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    那么在释放的时候,就不能像之前那样直接把state置为0,而需要不断减少。

    		protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                // 当state为0才真正释放
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    3.4.2 实现公平与非公平

    默认情况下,为非公平锁,即只要线程设置同步状态成功,它就获得锁,而公平锁保证锁的获取顺序符合请求的绝对时间顺序,它在获取锁的时候会去检查当前节点是否具有前驱节点,如果有,则表明有线程在当前线程的前面请求锁,获取失败。

    	protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    如果判断它是否有前驱节点呢?

    1. 如果头尾相等,表明队列为空,返回false
    2. 我们知道,队头是当前获得同步状态的线程,所以我们需要检查队头的下一个线程是否为当前线程,若不是当前线程,则表明有线程在当前线程之前请求,返回true
    	public final boolean hasQueuedPredecessors() {
            // The correctness of this depends on head being initialized
            // before tail and on head.next being accurate if the current
            // thread is first in queue.
            Node t = tail; // Read fields in reverse initialization order
            Node h = head;
            Node s;
            return h != t &&
                ((s = h.next) == null || s.thread != Thread.currentThread());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    为什么要将非公平锁设置成默认呢?
    这是由于公平锁需要严格按照FIFO的顺序,会造成大量线程的切换,而非公平锁虽然可能会造成“饥饿”,但极少的线程切换,保证了更大的吞吐量。

    3.5 ReentrantReadWriteLock

    3.5.1 读写锁的应用之缓存一致性

    我们在使用redis的时候,难免会遇到与数据库的一致性问题,更新的时候,到底是先删缓存再更库,还是先更库再删缓存?
    首先,我们已先删缓存再更新这种情况,看看数据不一致是怎么产生的。
    一般来说,缓存的使用为查询和更新操作

    • 查询操作:若缓存有,则从缓存读,若缓存没有,则从数据库读,并放入缓存。
    • 更新操作:删缓存,再更库

    现假设线程1做查询操作,线程2做更新操作,如果线程2先删除了缓存,此时线程1查库并放入缓存,线程2又修改了数据库,就会造成数据不一致(数据库已被修改,但缓存还是原来的)。

    其实,造成缓存不一致的原因无非就是更新那两步(更库和删缓存),只要把这两步捆绑起来,做成原子操作就可以了,这就需要加锁,synchronized和reetrantLock二者都行,但是会使查询之间互斥,这就需要读写锁出马了。

    class CacheReadWriteLock {
    
        @Autowired
        private RedisTemplate redisTemplate;
    
        private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    
        // 查询
        public Object query() {
    
            /**
             为什么要加读锁?
             假如线程1要去读,线程2要去改
             线程1先进行if判断,进入if内,此时线程2删除了redis
             线程1get的时候就会为null
    
             读写锁比synchronized和ReentrantLock更加高效,读之间并不互斥
             **/
            // 读锁
            lock.readLock().lock();
            // 如果redis有,直接返回
            try {
                if (redisTemplate.hasKey("test")) {
                    return redisTemplate.opsForValue().get("test");
                }
            } finally {
                lock.readLock().unlock();
            }
    
            // 写锁
            lock.writeLock().lock();
            try {
                // 双重检查 或许会有多个线程
                if (redisTemplate.hasKey("test")) {
                    return redisTemplate.opsForValue().get("test");
                }
                // 如果redis没有,则查询数据库
                Object data = sqlOpt();
                // 放入redis
                redisTemplate.opsForValue().set("test", data);
    
                return data;
            } finally {
                lock.writeLock().unlock();
            }
    
        }
    
        // 修改
        public void update() {
            // 写锁
            lock.writeLock().lock();
            try {
                // 删除redis缓存
                redisTemplate.delete("test");
                // 修改数据库
                sqlUpdate();
            } finally {
                lock.writeLock().unlock();
            }
    
        }
    
        // 查询数据库
        private Object sqlOpt() {
            return new Object();
        }
    
        // 修改数据库
        private void sqlUpdate() {
    
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    当然,这种方式仅仅只从并发角度,假设redis的每一次操作都成功来考虑,若考虑redis本身删除失败的情况,则更为复杂,或许会订阅mysql的binlog。

    3.5.2 写锁的获取和释放

    读写状态设计:读写锁的state同时维护了写锁和读锁状态,高16位表示读,低16位表示写。
    知道这个设计后,我们来看看写锁的获取和释放,与ReentrantLock的区别在于,需要分别讨论写读的情况。

    protected final boolean tryAcquire(int acquires) {
                Thread current = Thread.currentThread();
                int c = getState();
                // w = c & 0x0000FFFF
                int w = exclusiveCount(c);
                if (c != 0) {
                    // 存在读锁或者当前获取线程不是已经获取写锁的线程
                    if (w == 0 || current != getExclusiveOwnerThread())
                        return false;
                    if (w + exclusiveCount(acquires) > MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    // 重入+1
                    setState(c + acquires);
                    return true;
                }
                if (writerShouldBlock() ||
                    !compareAndSetState(c, c + acquires))
                    return false;
                setExclusiveOwnerThread(current);
                return true;
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    写锁的释放,当写锁状态为0时释放掉锁,并且唤醒后继节点。

    		protected final boolean tryRelease(int releases) {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                int nextc = getState() - releases;
                // 写锁状态是否为0
                boolean free = exclusiveCount(nextc) == 0;
                if (free)
                    setExclusiveOwnerThread(null);
                setState(nextc);
                return free;
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    3.5.3 读锁的获取与释放

    若当前线程获取了写锁或者写锁未被获取,则当前线程增加读状态,成功获取读锁;若其他线程已经获取了写锁,则当前线程以SHARED加入同步队列,并进入自旋,若自旋获得锁成功,则通知后继第一个写锁之前的所有读锁线程,若自旋几次都失败,则进入阻塞状态。

    tryAcquireShared返回值情况

    • -1表示失败
    • 0表示成功,但后继节点不会被唤醒
    • 正数表示成功,且数值是后继还有几个节点需要唤醒,读锁返回1
    		protected final int tryAcquireShared(int unused) { 
                Thread current = Thread.currentThread();
                int c = getState();
                // 如果写锁状态不为0且当前加锁的线程不是自己
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return -1;
                int r = sharedCount(c);
                if (!readerShouldBlock() &&
                    r < MAX_COUNT &&
                    compareAndSetState(c, c + SHARED_UNIT)) {
                    // 此处省略设置当前线程获取读锁的次数
                    
                    // 获取锁成功
                    return 1;
                }
                return fullTryAcquireShared(current);
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    若获取锁失败(-1),则执行doAcquireShared方法,

    	public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
    
    • 1
    • 2
    • 3
    • 4
    	private void doAcquireShared(int arg) {
    		// 往同步队列添加SHARED的节点
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                // 自旋
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        // 获取锁成功
                        if (r >= 0) {
                        	// 设置头并检查后继是否有读锁,把后继的读锁全部唤醒
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    读锁的释放与写锁的释放基本一致。

    3.6 semaphore

    3.6.1 基本使用

    信号量,用来限制能同时访问共享资源的线程上限,通常用来限流,在访问高峰期时,让请求线程阻塞,高峰期过去再释放许可(仅仅控制线程数,而不是限制资源数)。

    public class SemaphoreDemo {
        public static void main(String[] args) {
            // 高峰线程数为3
            Semaphore semaphore = new Semaphore(3);
    
            for (int i = 0; i < 10; i++) {
                new Thread(() -> {
                    try {
                        semaphore.acquire();
                        System.out.println("running...");
                        TimeUnit.SECONDS.sleep(1);
                        System.out.println("end...");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        semaphore.release();
                    }
    
                }).start();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    3.6.2 acquire原理

    信号量的原理与读写锁非常类似,如果懂了读写锁,我们稍稍看一下也就能明白信号量了(信号量的初始化赋值也就是给state赋值)。

    	public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    tryAcquireShared这个方法我们在读写锁的读锁见过,返回-1表示获取失败,大于等于0表示获取成功。
    我们看看非公平的实现方法,

    		final int nonfairTryAcquireShared(int acquires) {
                for (;;) {
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    非常简单,获取当前state,然后减一,返回剩余的信号量值。
    如果获取失败,则执行doAcquireSharedInterruptibly方法,

    	private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            // 节点类型为shared
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    非常熟悉的一段代码,排行老二自旋获取信号量,获取不到就阻塞。
    若是公平信号量,区别仅仅在于tryAcquireShared的时候需要检查是否具有前驱节点,如果有,则立马返回-1。

    3.6.3 release原理

    	public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    首先将信号量加回去(tryReleaseShared),

    	protected final boolean tryReleaseShared(int releases) {
                for (;;) {
                    int current = getState();
                    int next = current + releases;
                    if (next < current) // overflow
                        throw new Error("Maximum permit count exceeded");
                    if (compareAndSetState(current, next))
                        return true;
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    然后唤醒后继节点(doReleaseShared),由于节点类型为shared,这一操作会将后继所有节点都唤醒,由它们竞争信号量。

    	private void doReleaseShared() {
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    四、设计模式

    4.1 保护性暂停

    保护性暂停(GuardedSuspension),用在一个线程等待另一个线程的执行结果

    • 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个GuardedObject
    • 如果结果不断的从一个线程到另一个线程,可以使用消息队列(见生产者/消费者模式)
    • join、future的实现都是用的GuardedSuspension
    • 因为要等待另一方的结果,该模式属于同步模式
      在这里插入图片描述

    具体示例如下:t1需要等待t2的结果,它们都借助了GuardedObject对象。

    public class GuardedObject {
        // 结果
        private String res;
    
        public String get() {
            synchronized (this) {
                // 没有结果 等待结果
                while (res == null) {
                    try {
                        this.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                return res;
            }
        }
    
        // 产生结果
        public void set(String s) {
            synchronized (this) {
                res = s;
                this.notifyAll();
            }
        }
    
        public static void main(String[] args) {
            GuardedObject guardedObject = new GuardedObject();
            Thread t1 = new Thread("t1") {
                @Override
                public void run() {
                    // 获取结果
                    System.out.println("t1 等待结果");
                    String res = guardedObject.get();
                    System.out.println("t1 拿到结果啦:" + res);
                }
            };
    
            Thread t2 = new Thread("t2") {
                @Override
                public void run() {
                    System.out.println("t2 正在产生结果");
                    // 产生结果
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                    String res = "nihao";
                    guardedObject.set(res);
                }
            };
    
            t1.start();
            t2.start();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    也可以加入超时get做进一步优化,由于引入了while,所以每次都要计算经历的时间。

    // 超时等待
    	public String get(long timeout) {
            synchronized (this) {
                // 没有结果 等待结果
                // 开始时间
                long begin = System.currentTimeMillis();
                // 经历的时间
                long passTime = 0;
                while (res == null) {
                    // 这一轮循环应该等待的时间
                    long waitTime = timeout - passTime;
                    if (waitTime <= 0) {
                        break;
                    }
                    try {
                        this.wait(waitTime);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    passTime = System.currentTimeMillis() - begin;
                }
                return res;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    4.2 join()原理

    学习了前面的保护性暂停后,join()的原理就很好理解了,源代码如下

    public final synchronized void join(long millis) throws InterruptedException {
            long base = System.currentTimeMillis();
            long now = 0;
    
            if (millis < 0) {
                throw new IllegalArgumentException("timeout value is negative");
            }
    
            if (millis == 0) {
            	// 无限等待
                while (isAlive()) {
                    wait(0);
                }
            } else {
            	// 有限等待
                while (isAlive()) {
                    long delay = millis - now;
                    if (delay <= 0) {
                        break;
                    }
                    wait(delay);
                    now = System.currentTimeMillis() - base;
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    当一个线程调用t.join()时,它就要等待 t 运行结束,这里synchronized锁的是t这个对象。
    这种等待什么时候被唤醒呢?在java中,Thread类线程执行完run()方法后,一定会自动执行notifyAll()方法,所以当线程执行完后,它就会被唤醒。

    4.3 生产者/消费者

    前面介绍的保护性暂停对结果的拿和取都是一一对应的,对于多对多的这种情况,我们来看看优秀的生产者/消费者模式。
    在这里插入图片描述
    首先我们来定义消息类型,为了保证消息的安全,一旦创建就不能更改,使用final修饰。

    final class Msg {
        private int id;
        private Object val;
    
        public Msg(int id, Object val) {
            this.id = id;
            this.val = val;
        }
    
        public int getId() {
            return id;
        }
    
        public Object getVal() {
            return val;
        }
    
        @Override
        public String toString() {
            return "Msg{" +
                    "id=" + id +
                    ", val=" + val +
                    '}';
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    接下来定义队列,队列主要包含take()put()两个方法,容器使用双端队列,由于存在对list的并发操作,所以对list加锁,同时在take时判断队列是否为空,若为空则进入阻塞等待,若不为空则取出数据消费并通知所有在等待的线程,put操作同理。

    public class MessageQueue {
    
        private LinkedList<Msg> list = new LinkedList<>();
        // 容量
        private int cap;
    
        public MessageQueue(int cap) {
            this.cap = cap;
        }
    
        // 获取消息
        public Msg take() {
            synchronized (list) {
                while (list.isEmpty()) {
                    try {
                        System.out.println("队列为空,消费者线程等待");
                        list.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
    
                Msg msg = list.removeFirst();
                System.out.println("已消费消息:" + msg);
                list.notifyAll();
                return msg;
            }
        }
    
        // 存入消息
        public void put(Msg msg) {
            synchronized (list) {
                while (list.size() == cap) {
                    try {
                        System.out.println("队列已满,生产者线程等待");
                        list.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                list.addLast(msg);
                System.out.println("已生产消息:" + msg);
                list.notifyAll();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    测试:这里我们有三个生产者生产数据,只有一个消费者不断的从中取出数据

    public static void main(String[] args) {
            MessageQueue queue = new MessageQueue(2);
    
            // 模拟生产者
            for (int i = 0; i < 3; i++) {
                int id = i;
                new Thread(() -> queue.put(new Msg(id, "值"+id)), "生产者" + i).start();
            }
    
            // 模拟消费者
            new Thread(() -> {
                while (true) {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    Msg take = queue.take();
                }
    
            }).start();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    输出:

    已生产消息:Msg{id=2, val=值2}
    已生产消息:Msg{id=1, val=值1}
    队列已满,生产者线程等待
    已消费消息:Msg{id=2, val=值2}
    已生产消息:Msg{id=0, val=值0}
    已消费消息:Msg{id=1, val=值1}
    已消费消息:Msg{id=0, val=值0}
    队列为空,消费者线程等待
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    4.4 同步模式

    4.4.1 固定运行顺序

    有t1和t2两个线程,它们分别打印1和2,现要求先打印2再打印1,怎么做能保证?
    方法一:wait/notify

    public class OrderPrint {
    
        private static final Object o = new Object();
        private static boolean runFlag = false; // 即使t2先运行 也不影响t1
    
        public static void main(String[] args) {
            method1();
        }
    
        private static void method1() {
            Thread t1 = new Thread("t1") {
                @Override
                public void run() {
                    synchronized (o) {
                        try {
                            while(!runFlag) {
                                o.wait();
                                System.out.println(1);
                            }
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            };
    
            Thread t2 = new Thread("t2") {
                @Override
                public void run() {
                    synchronized (o) {
                        System.out.println(2);
                        runFlag = true;
                        o.notify();
                    }
                }
            };
    
            t1.start();
            t2.start();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    方法二:LockSupport

    	private static void method2() {
            Thread t1 = new Thread("t1") {
                @Override
                public void run() {
                    LockSupport.park();
                    System.out.println(1);
                }
            };
    
            Thread t2 = new Thread("t2") {
                @Override
                public void run() {
                    System.out.println(2);
                    LockSupport.unpark(t1);
                }
            };
    
            t1.start();
            t2.start();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    4.4.2 交替输出

    下面来看线程1输出a5次,线程2输出b5次,线程3输出c5次,现要求输出abcabcabcabcabc,如何实现?
    方法一:wait/notify
    根据面向对象思想,我们把交替打印这个情景抽象出来为一个类,线程通过调用这个类的方法,可以实现打印/阻塞,交替打印需要关注

    1. 当前该打印的线程
    2. 下一个该打印的线程
    class WaitNotify{
        // 当前标记 1 2 3, 表示当前该那个线程执行
        private int flag;
        // 循环次数
        private int loopNum;
    
        public WaitNotify(int flag, int loopNum) {
            this.flag = flag;
            this.loopNum = loopNum;
        }
    
        public void print(String str, int waitFlag, int nextFlag) {
            for (int i = 0; i < loopNum; i++) {
                synchronized (this) {
                    // 当前标记和进入执行线程的标记不一致
                    while(waitFlag != flag) {
                        try {
                            this.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
    
                    System.out.print(str);
                    flag = nextFlag;
                    this.notifyAll();
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    那么线程的写法就非常简单了。

    	private static void method1() {
            WaitNotify waitNotify = new WaitNotify(1, 5);
            Thread t1 = new Thread("t1") {
                @Override
                public void run() {
                	// 打印"a", 标记1, 下一个标记2
                    waitNotify.print("a", 1, 2);
                }
            };
            Thread t2 = new Thread("t2") {
                @Override
                public void run() {
                    waitNotify.print("b", 2, 3);
                }
            };
            Thread t3 = new Thread("t3") {
                @Override
                public void run() {
                    waitNotify.print("c", 3, 1);
                }
            };
    
            t1.start();
            t2.start();
            t3.start();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    方法二:ReentrantLock
    ReentrantLock提供了多个”休息室“(WaitSet),可以考虑给三个线程分别分配一间”休息室“,起初三个线程都在休息等待,然后由主线程唤醒线程1,再由线程1做后续的工作。

    class AwaitSignal extends ReentrantLock {
        private int loopNum;
    
        public AwaitSignal(int loopNum) {
            this.loopNum = loopNum;
        }
    
        public void print(String str, Condition cur, Condition next) {
            for (int i = 0; i < loopNum; i++) {
                lock();
                try {
                    cur.await();
                    System.out.print(str);
                    next.signal();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    unlock();
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    同样,单独抽离一个类用做实现交替打印。

    	private static void method2() throws InterruptedException {
    
            AwaitSignal awaitSignal = new AwaitSignal(5);
            Condition a = awaitSignal.newCondition();
            Condition b = awaitSignal.newCondition();
            Condition c = awaitSignal.newCondition();
    
            Thread t1 = new Thread("t1") {
                @Override
                public void run() {
                    awaitSignal.print("a", a, b);
                }
            };
            Thread t2 = new Thread("t2") {
                @Override
                public void run() {
                    awaitSignal.print("b", b, c);
                }
            };
            Thread t3 = new Thread("t3") {
                @Override
                public void run() {
                    awaitSignal.print("c", c, a);
                }
            };
    
            t1.start();
            t2.start();
            t3.start();
    
            TimeUnit.SECONDS.sleep(1);
            System.out.println("begin...");
            awaitSignal.lock();
            try {
                a.signal();
            } finally {
                awaitSignal.unlock();
            }
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    方法三:LockSupport

    class Park {
        private int loopNum;
    
        public Park(int loopNum) {
            this.loopNum = loopNum;
        }
    
        public void print(String str, Thread next) {
            for (int i = 0; i < loopNum; i++) {
                // 当前线程等待
                LockSupport.park();
                // 线程被唤醒
                System.out.print(str);
                // 通知下一个线程
                LockSupport.unpark(next);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    先让所有线程等待,并设置好每个线程需要唤醒的下一个线程。

    	static Thread t1;
        static Thread t2;
        static Thread t3;
        
    	private static void method3() {
            Park park = new Park(5);
            t1 = new Thread(() -> park.print("a", t2), "t1");
            t2 = new Thread(() -> park.print("b", t3), "t2");
            t3 = new Thread(() -> park.print("c", t1), "t3");
            t1.start();
            t2.start();
            t3.start();
    
            LockSupport.unpark(t1);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    就像多米诺骨牌一样,推倒一个,后续相继都被前一个所推倒。

    4.5 单例模式

    相信双重检查单例模式大家再熟悉不过了,但是有必要再一次重温一下该模式的精妙之处,我们很容易想到以下单例的代码

    public class Singleton {
        private static Singleton uniqueInstance;
    
        private Singleton() {}
        
    	public static synchronized Singleton getInstance() {
            if (uniqueInstance == null) {
                uniqueInstance = new Singleton();
            }
            return uniqueInstance;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    改进1:双重检查
    虽然能实现单例,但由于对象只创建一次,创建过后锁就成为了不必要的开销,所以引入双重检查。

    	//同步代码块 双重检查加锁
        public static Singleton getInstance() {
            if (uniqueInstance == null){
                synchronized (Singleton.class){
                    if (uniqueInstance == null)
                        uniqueInstance = new Singleton();
                }
            }
            return uniqueInstance;
        }	
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    改进2:volatile禁止指令重排
    加入双重检查后,由于jvm指令重排,在uniqueInstance = new Singleton();这行代码可能会出现

    1. 创建Singleton对象
    2. 给uniqueInstance赋予Singleton对象地址
    3. 调用构造器初始化对象

    这样的情况,假如在2之后发生了线程切换,另一线程过来就会拿到未初始化的对象。
    所以这里需要使用volatile禁止指令重排。

    private volatile static Singleton uniqueInstance;
    
    • 1

    加入volatile过后,在对uniqueInstance = new Singleton();这行赋值时就不会出现2在3之前的情况了,

    1. 创建Singleton对象
    2. 调用构造器初始化对象
    3. 给uniqueInstance赋予Singleton对象地址(其后跟随写屏障)

    由于第三步末尾具有写屏障,所以能保证对象在创建初始化完成后才赋值给uniqueInstance。
    有了两步优化,单例模式为

    public class Singleton {
        private volatile static Singleton uniqueInstance;
    
        private Singleton() {}  
    
        //同步代码块 双重检查加锁
        public static Singleton getInstance() {
            if (uniqueInstance == null){
                synchronized (Singleton.class){
                    if (uniqueInstance == null)
                        uniqueInstance = new Singleton();
                }
            }
            return uniqueInstance;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    4.6 享元模式

    4.6.1 应用场景

    定义:重用数量有限的同一类对象,减少内存空间占用。(前提:final对象)
    我们知道,final修饰的对象它的值一般不可更改,如String的内部值value就是一个final修饰的char[],对它的各种更改都会重新创建对象,这样的话不就会产生很多对象占用内存了吗,享元模式提前创建好对象,能够一定程度的重用,减少内存空间占用。
    比如Boolean, Byte, Short, Integer, Long, Character等包装类提供了valueOf()方法,以Long为例,我们看看源代码

    	public static Long valueOf(long l) {
            final int offset = 128;
            if (l >= -128 && l <= 127) { // will cache
                return LongCache.cache[(int)l + offset];
            }
            return new Long(l);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    它会去判断 l 是否在-128-127之间,如果在此之间,那就不用创建对象,直接从缓存取,当调用cache时,就会初始化静态内部类LongCache,从而缓存-128-127之间的对象。

    	private static class LongCache {
            private LongCache(){}
    
            static final Long cache[] = new Long[-(-128) + 127 + 1];
    
            static {
                for(int i = 0; i < cache.length; i++)
                    cache[i] = new Long(i - 128);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    一般来说

    • Byte, Short, Long缓存的范围都是-128-127
    • Character的缓存范围是0-127
    • Integer的默认缓存的范围是-128-127,最小值不变,最大能调整
    • Boolean缓存了TRUE和FALSE

    4.6.2 自定义连接池

    场景:一个线上商城应用,每秒访问量达到数千,如果每次请求都创建和关闭数据库连接,性能会收到极大影响。这时预先创建好一批连接,放入连接池,一次请求到达后,从连接池获取连接,使用完毕后还回连接池,这样既节约了连接的创建和关闭时间,也实现了连接的重用,不至于让庞大的连接数压垮数据库。
    为了简单起见,对于连接,我们仅仅实现Connection接口,而不做具体的工作。

    class MockConnection implements Connection {
    
        private String name;
    
        public MockConnection(String name) {
            this.name = name;
        }
    
        @Override
        public String toString() {
            return "MockConnection{" +
                    "name='" + name + '\'' +
                    '}';
        }
    
        @Override
        public Statement createStatement() throws SQLException {
            return null;
        }
    	//...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    接下来自定义我们的连接池,由它来管理连接,该连接池一旦构造,就生成n个连接,并定义了“借”和“还”的操作。在“借”操作里,我们使用了cas来保证借的原子性,并对没有借到的线程做等待处理。

    class Pool {
        // 1.连接池大小
        private final int poolSize;
    
        // 2.连接对象数组
        private Connection[] connections;
    
        // 3.连接状态数组 0表示空闲,1表示繁忙
        private AtomicIntegerArray status;
    
        // 4.构造方法初始化
        Pool(int poolSize) {
            this.poolSize = poolSize;
            // 一开始就创建了poolSize个连接 享元模式
            this.connections = new Connection[poolSize];
            this.status = new AtomicIntegerArray(new int[poolSize]);
            for (int i = 0; i < poolSize; i++) {
                connections[i] = new MockConnection("连接" + (i + 1));
            }
        }
    
        // 5.借连接
        public Connection borrow() {
            while (true) {
                for (int i = 0; i < poolSize; i++) {
                    // 获取空闲连接
                    if (status.get(i) == 0) {
                        if (status.compareAndSet(i, 0, 1)) {
                            System.out.println(Thread.currentThread().getName() + " borrow " + connections[i]);
                            return connections[i];
                        }
                    }
                }
                // 如果没有空闲连接 等待
                synchronized (this) {
                    try {
                        System.out.println(Thread.currentThread().getName() + " wait...");
                        this.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        // 6.还连接
        public void free(Connection conn) {
            for (int i = 0; i < poolSize; i++) {
                if (connections[i] == conn) {
                    status.set(i, 0);
                    synchronized (this) {
                        System.out.println(Thread.currentThread().getName() + " free " + conn);
                        this.notifyAll();
                    }
                    break;
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    测试,我们将线程池的大小设置为2,并且有五个线程去竞争获取。

    public class DiyConnectionPool {
        public static void main(String[] args) {
            Pool pool = new Pool(2);
            for (int i = 0; i < 5; i++) {
                new Thread(() -> {
                    Connection conn = pool.borrow();
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        pool.free(conn);
                    }
                }, "t" + i).start();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    输出:

    t1 borrow MockConnection{name='连接2'}
    t3 wait...
    t0 borrow MockConnection{name='连接1'}
    t4 wait...
    t2 wait...
    t1 free MockConnection{name='连接2'}
    t2 borrow MockConnection{name='连接1'}
    t3 wait...
    t4 borrow MockConnection{name='连接2'}
    t0 free MockConnection{name='连接1'}
    t3 wait...
    t2 free MockConnection{name='连接1'}
    t3 borrow MockConnection{name='连接1'}
    t4 free MockConnection{name='连接2'}
    t3 free MockConnection{name='连接1'}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    4.6.3 自定义线程池

    假想存在这样一个场景,我们的服务器一瞬间来了一万个任务,为了做到高并发,我们肯定需要创建线程,如果为每一个任务创建一个线程,那么系统的负载将会非常大,所以需要一个管理线程的工具,这个工具能有效的应对并发情况,管理线程,并分配它们去执行任务,这个工具就叫做线程池。
    首先,我们不可能创建一万个线程吧,我们只能创建有限个线程,这样任务来的速度就会大于线程处理任务的速度,这种速度不匹配的情况,就要用到队列,如果线程都在忙,任务就进入队列等待,当线程处理完它手头的工作后,就去队列里面取,流程图如下所示。
    在这里插入图片描述
    我们先来定义这个队列,当队列满的时候,入队应该被阻塞,等待线程消费任务,当队列空的时候,出队应该被阻塞,等待放入任务,由于存在两个条件,所以我们使用Condition来完成等待和通知,具体实现如下所示

    class BlockingQueue<T> {
        // 1.任务队列
        private Deque<T> queue = new LinkedList<>();
    
        // 2.锁
        private ReentrantLock lock = new ReentrantLock();
    
        // 3.生产者条件变量
        private Condition fullWaitSet = lock.newCondition();
    
        // 4.消费者条件变量
        private Condition emptyWaitSet = lock.newCondition();
    
        // 5.容量
        private int capcity;
    
        public BlockingQueue(int capcity) {
            this.capcity = capcity;
        }
    
        // 带超时的阻塞获取
        public T poll(long timeout, TimeUnit unit) {
            lock.lock();
            try {
                // 统一转换为纳秒
                long nanos = unit.toNanos(timeout);
                while (queue.isEmpty()) {
                    try {
                        // 返回的是剩余等待时间
                        if (nanos <= 0) {
                            return  null;
                        }
                        nanos = emptyWaitSet.awaitNanos(nanos);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                T t = queue.removeFirst();
                fullWaitSet.signal();
                return t;
            } finally {
                lock.unlock();
            }
        }
    
        // 阻塞获取
        public T take() {
            lock.lock();
            try {
                while (queue.isEmpty()) {
                    try {
                        emptyWaitSet.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                T t = queue.removeFirst();
                fullWaitSet.signal();
                return t;
            } finally {
                lock.unlock();
            }
        }
    
        // 阻塞添加
        public void put(T task) {
            lock.lock();
            try {
                while (queue.size() == capcity) {
                    try {
                        System.out.println("等待加入任务队列{" + task + "}");
                        fullWaitSet.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("加入任务队列{" + task + "}");
                queue.addLast(task);
                emptyWaitSet.signal();
            } finally {
                lock.unlock();
            }
        }
    
        // 带超时时间的阻塞添加
        public boolean offer(T task, long timeout, TimeUnit timeUnit) {
            lock.lock();
            try {
                long nanos = timeUnit.toNanos(timeout);
                while (queue.size() == capcity) {
                    try {
                        System.out.println("等待加入任务队列{" + task + "}");
                        if (nanos <= 0) {
                            return false;
                        }
                        nanos = fullWaitSet.awaitNanos(nanos);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("加入任务队列{" + task + "}");
                queue.addLast(task);
                emptyWaitSet.signal();
                return true;
            } finally {
                lock.unlock();
            }
        }
    
        // 获取大小
        public int size() {
            lock.lock();
            try {
                return queue.size();
            } finally {
                lock.unlock();
            }
        }
    
        public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
            lock.lock();
            try {
                // 队列是否满
                if (queue.size() == capcity) {
                    rejectPolicy.reject(this, task);
                } else {
                    System.out.println("加入任务队列{" + task + "}");
                    queue.addLast(task);
                    emptyWaitSet.signal();
                }
            } finally {
                lock.unlock();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135

    可以看到,队列定义了如下几个方法:

    • poll(long timeout, TimeUnit unit) ,带超时的阻塞获取
    • take(),阻塞获取
    • put(T task),阻塞添加
    • tryPut(RejectPolicy rejectPolicy, T task),带拒绝策略的添加
    • size(),队列目前的任务数

    队列定义好后,就要定义我们的线程池,首先,我们需要一个集合来存储线程,还需要具体的执行方法,当任务数没有超过核心线程数时,创建线程,当任务数超过核心线程数后,就加入队列中。

    class ThreadPool {
        // 任务队列
        private BlockingQueue<Runnable> taskQueue;
    
        // 线程集合
        private HashSet<Worker> workers = new HashSet<>();
    
        // 核心线程数
        private int coreSize;
    
        // 任务的超时时间
        private long timeout;
    
        private TimeUnit timeUnit;
    
        private RejectPolicy<Runnable> rejectPolicy;
    
        public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) {
            this.coreSize = coreSize;
            this.timeout = timeout;
            this.timeUnit = timeUnit;
            this.rejectPolicy = rejectPolicy;
            this.taskQueue = new BlockingQueue<>(queueCapcity);
        }
    
        // 执行任务
        public synchronized void execute(Runnable task) {
            // 当任务数没有超过核心线程数时,交由worker对象执行
            if (workers.size() < coreSize) {
                Worker worker = new Worker(task);
                System.out.println("新增 worker{" + worker + "},{" + task + "}");
                workers.add(worker);
                worker.start();
            } else {
                // 当超过核心线程数时,加入任务队列
                //taskQueue.put(task);
                // 1.死等
                // 2.带超时等待
                // 3.放弃任务
                // 4.抛出异常
                // 5.调用者自己执行
                taskQueue.tryPut(rejectPolicy, task);
            }
        }
    
        class Worker extends Thread{
            private Runnable task;
    
            public Worker(Runnable task) {
                this.task = task;
            }
    
            @Override
            public void run() {
                // 1.当task不空,执行任务
                // 2.task执行完毕,接着从任务队列获取任务执行
                while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
                    try {
                        System.out.println(Thread.currentThread().getName() + "正在执行{" + task + "}");
                        task.run();
                    } catch (Exception e) {
    
                    } finally {
                        task = null;
                    }
                }
                synchronized (workers) {
                    System.out.println("worker 被移除{" + this + "}");
                    workers.remove(this);
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73

    而线程的任务则是完成自己的任务,当队列还有任务时,去队列取任务执行。
    当超过核心线程数,也超过队列的最大容量时,可以由调用者决定执行哪种拒绝策略,我们使用接口把采用具体策略的权力交由调用者。

    public static void main(String[] args) {
    
            ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10, ((queue, task) -> {
                // 1.死等
                //queue.put(task);
                // 2.带超时等待
                //queue.offer(task, 500, TimeUnit.MILLISECONDS);
                // 3.放弃任务
                //System.out.println("我放弃");
                // 4.抛出异常
                //throw new RuntimeException("任务执行失败" + task);
                // 5.调用者自己执行
                task.run();
            }));
            for (int i = 0; i < 15; i++) {
                int j = i;
                threadPool.execute(()-> {
                    try {
                        Thread.sleep(100000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(j);
                });
            }
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
  • 相关阅读:
    怎么恢复永久删除的文件?这3个方法很实用!
    使用 Verilog 做一个可编程数字延迟定时器 LS7211-7212
    大数据必学Java基础(六):程序中常见问题和编译方式
    【Linux】权限不足的情况下在指定环境运行command
    sql16(Leetcode1251平均售价)
    7.21 - 每日一题 - 408
    计算机毕业设计hadoop+spark知识图谱课程推荐系统 课程预测系统 课程大数据 课程数据分析 课程大屏 mooc慕课推荐系统 大数据毕业设计
    天龙八部科举答题问题和答案(全5/8)
    Java:单例模式探究
    自定义注解实现日志打印时屏蔽特定字段不打印
  • 原文地址:https://blog.csdn.net/Yungang_Young/article/details/125695663