• 多线程设计模式


    两阶段终止

    线程T1需要终止线程T2时,如果使用stop方法直接终止线程T2,T2申请的锁得不到释放;因此使用两阶段终止模式更加优雅

    监控线程

    在这里插入图片描述

    @Slf4j
    public class TwoPhaseTermination{
        private Thread monitor;
    	private volatile boolean stop = false;
    	// 判断是否执行过start方法
    	private boolean starting = false;
    
        public void start(){
        	synchronized(this){
        		if(starting){
       				return;
       			}
       			starting = true;
        	}
            monitor = new Thread(()->{
                while(true){
                    Thread current = Thread.currentThread();
                    if(stop){
                        log.debug("料理后事");
                        break;
                    }
                    try{
                        Thread.sleep(1000);
                        log.debug("执行监控记录");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            monitor.start();
        }
    
        /**
         * 停止监控线程
         */
        public void stop(){
        	stop = true;
            monitor.interrupt();
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    保护性暂停

    保护性暂停用在一个线程等待另一个线程的执行结果时
    要点

    • 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个GuardedObject
    • 如果有结果不断从一个线程到另一个线程,那么可以使用消息队列
    • JDK中,join的实现、Future的实现采用的就是这个模式
    • 因为要等待另一方的结果,所以归类到同步模式
      在这里插入图片描述
    class GuardedObject{
        private Object response;
    
        /**
         * 获取结果
         * @return
         */
        public Object getResponse(long timeout) {
            synchronized (this){
                long begin = System.currentTimeMillis();
                long passedTime = 0;
                while(response==null){
                    if(passedTime>=timeout){
                        break;
                    }
                    try{
                        this.wait(timeout-passedTime);
                    }catch (InterruptedException e){
                        e.printStackTrace();
                    }
                    passedTime = System.currentTimeMillis()-begin;
                }
                return response;
            }
        }
    
        /**
         * 设置结果
         * @param response
         */
        public void complete(Object response){
            synchronized (this){
                this.response = response;
                this.notifyAll();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    join原理

    public final synchronized void join(long millis)
        throws InterruptedException {
        	// 刚开始进入等待的时间
            long base = System.currentTimeMillis();
            // 已经等待过的时间
            long now = 0;
    
            if (millis < 0) {
                throw new IllegalArgumentException("timeout value is negative");
            }
    		// 超时时间等于0,则一直等待
            if (millis == 0) {
                while (isAlive()) {
                    wait(0);
                }
            } else {
            	// 超时时间大于0
                while (isAlive()) {
                	// 剩余的可以等待的时间
                    long delay = millis - now;
                    // 如果剩余的可以用的时间小于等于0,表示没法再等待了,直接退出
                    if (delay <= 0) {
                        break;
                    }
                    wait(delay);
                    now = System.currentTimeMillis() - base;
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    异步模式之生产者/消费者

    • 与保护性暂停中的GuardObject不同,不需要产生结果和消费结果的线程一一对应
    • 消费队列可以用来平衡生产和消费的线程资源
    • 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
    • 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
    • JDK中各种阻塞队列,采用的就是这种模式
      在这里插入图片描述
    /***
     * 线程间通信队列
     */
    class MessageQueue{
        private LinkedList<Message> queue = new LinkedList<>();
        private int capacity;
    
        public MessageQueue(int capacity){
            this.capacity = capacity;
        }
    
        public void put(Message message){
            synchronized (queue){
                while(queue.size()==capacity){
                    try {
                        queue.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                // 从尾部加入消息
                queue.addLast(message);
                queue.notifyAll();
            }
    
        }
    
        public Message take(){
            synchronized (queue){
                while(queue.isEmpty()){
                    try {
                        queue.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                // 从头部取得消息
                Message message = queue.removeFirst();
                queue.notifyAll();
                return message;
            }
    
        }
    }
    
    class Message{
        private int id;
        private Object value;
    
        public Message(int id,Object value){
            this.id = id;
            this.value = value;
        }
    
        public int getId() {
            return id;
        }
    
        public Object getValue() {
            return value;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62

    同步模式之顺序控制

    固定运行顺序

    必须现有两个线程,一个打印1,一个打印2;先要求先打印2再打印1
    wait-notify

    @Slf4j
    public final class Demo{
        private static boolean hasPrintTwo = false;
        private static Object lock = new Object();
        public static void main(String[] args) throws InterruptedException {
            Object lock = new Object();
            Thread printOne = new Thread(()->{
                synchronized (lock){
                    while(!hasPrintTwo){
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    log.debug("{}",1);
                }
    
            },"t1");
    
            Thread printTwo = new Thread(()->{
                log.debug("{}",2);
                hasPrintTwo = true;
                synchronized (lock){
                    lock.notifyAll();
                }
    
            },"t2");
    
            printOne.start();
            Thread.sleep(1000);
            printTwo.start();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    park/unpark

    @Slf4j
    public final class Demo{
        private static boolean hasPrintTwo = false;
        public static void main(String[] args) throws InterruptedException {
            Object lock = new Object();
            Thread printOne = new Thread(()->{
                while(!hasPrintTwo){
                    LockSupport.park();
                }
                log.debug("{}",1);
    
            },"t1");
    
            Thread printTwo = new Thread(()->{
                log.debug("{}",2);
                hasPrintTwo = true;
                LockSupport.unpark(printOne);
    
            },"t2");
    
            printOne.start();
            Thread.sleep(1000);
            printTwo.start();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    交替输出

    三个线程交替输出a,b,c
    wait/notify

    @Slf4j
    public final class Demo{
        public static void main(String[] args) throws InterruptedException {
            WaitNotify waitNotify = new WaitNotify(1,5);
            Thread printA = new Thread(()->{
                waitNotify.print("A",1,2);
            },"t1");
    
            Thread printB = new Thread(()->{
                waitNotify.print("B",2,3);
            },"t2");
    
            Thread printC = new Thread(()->{
                waitNotify.print("C",3,1);
            },"t3");
    
            printA.start();
            printB.start();
            printC.start();
        }
    }
    
    class WaitNotify{
        /**
         * 等待标记,当等待标记和传入的标记一致才会退出等待
         */
        private int flag;
        /**
         * 循环次数
         */
        private int loopNumber;
    
        public WaitNotify(int flag,int loopNumber){
            this.flag = flag;
            this.loopNumber = loopNumber;
        }
    
        /**
         * 循环打印
         * @param str
         * @param waitFlag
         * @param nextFlag
         */
        public void print(String str,int waitFlag,int nextFlag){
            for (int i = 0; i < loopNumber; i++) {
                synchronized (this){
                    while(flag != waitFlag){
                        try {
                            this.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println(str);
                    flag = nextFlag;
                    this.notifyAll();
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    await/signal

    @Slf4j
    public final class Demo{
        public static void main(String[] args) throws InterruptedException {
            AwaitSignal awaitSignal = new AwaitSignal(5);
            Condition a = awaitSignal.newCondition();
            Condition b = awaitSignal.newCondition();
            Condition c = awaitSignal.newCondition();
            Thread printA = new Thread(()->{
                awaitSignal.print("A",a,b);
            },"t1");
    
            Thread printB = new Thread(()->{
                awaitSignal.print("B",b,c);
            },"t2");
    
            Thread printC = new Thread(()->{
                awaitSignal.print("C",c,a);
            },"t3");
    
            printA.start();
            printB.start();
            printC.start();
    
            // 唤醒第一个线程,接下来的线程会被循环睡眠并唤醒
            Thread.sleep(1000);
            awaitSignal.lock();
            try{
                a.signal();
            }finally {
                awaitSignal.unlock();
            }
        }
    }
    
    class AwaitSignal extends ReentrantLock{
        private int loopNumber;
    
    
        public AwaitSignal(int loopNumber){
            this.loopNumber = loopNumber;
        }
    
        public void print(String str,Condition current,Condition next){
            for (int i = 0; i < loopNumber; i++) {
                lock();
                try {
                    // 将当前条件进入等待
                    current.await();
                    // 退出当前条件的等待
                    System.out.println(str);
                    // 唤醒下一条件
                    next.signal();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    unlock();
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    park/unpark

    @Slf4j
    public final class Demo{
        static Thread printA;
        static Thread printB;
        static Thread printC;
        public static void main(String[] args) throws InterruptedException {
            ParkUnPark parkUnPark = new ParkUnPark(5);
            printA = new Thread(()->{
                parkUnPark.print("A",printB);
            },"t1");
    
            printB = new Thread(()->{
                parkUnPark.print("B",printC);
            },"t2");
    
            printC = new Thread(()->{
                parkUnPark.print("C",printA);
            },"t3");
    
            printA.start();
            printB.start();
            printC.start();
    
            // 唤醒第一个线程,接下来的线程会被循环睡眠并唤醒
            Thread.sleep(1000);
            LockSupport.unpark(printA);
        }
    }
    
    class ParkUnPark{
        private int loopNumber;
    
    
        public ParkUnPark(int loopNumber){
            this.loopNumber = loopNumber;
        }
    
        public void print(String str,Thread next){
            for (int i = 0; i < loopNumber; i++) {
                LockSupport.park();
                System.out.println(str);
                LockSupport.unpark(next);
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
  • 相关阅读:
    R语言时间序列数据提取:使用xts包的first函数提取时间序列中最前面一个月的数据(first 1 month)
    【VS插件】VS code上的Remote - SSH
    WebGPT VS WebGPU
    【shell】反引号的转义问题
    Leetcode 340. 至多包含 K 个不同字符的最长子串(滑动窗口)
    一个简单好用安全的开源交互审计系统、轻量级堡垒机系统
    学好Elasticsearch系列-索引的批量操作
    【Linux】CentOS 虚拟机
    ubuntu16.04下thrift安装及遇到的问题
    【Hadoop】-Apache Hive使用语法与概念原理[15]
  • 原文地址:https://blog.csdn.net/m0_48468380/article/details/127433597