• Java多线程同步工具类:Semaphore、CountDownLatch 、CyclicBarrier、Exchanger原理剖析


    Java多线程同步工具类:Semaphore、CountDownLatch 、CyclicBarrier、Exchanger原理剖析

    前驱知识准备:AbstractQueuedSynchronizer队列同步器

    [Java多线程之:队列同步器AbstractQueuedSynchronizer原理剖析]

    一、Semaphore

    原理剖析

    Semaphore也就是信号量,提供了资源数量的并发访问控制,可以用于限制访问某些资源(物理或逻辑的)的线程数目。Semaphore是AQS队列同步器中对共享锁的子类实现。Semaphore维护了一个许可证集合,有多少资源需要限制就维护多少许可证集合,假如这里有N个资源,那就对应于N个许可证,同一时刻也只能有N个线程访问。一个线程获取许可证就调用acquire方法,用完了释放资源就调用release方法。

    Semaphore核心代码功能使用如下所示:

    // ⼀开始有5份共享资源。第⼆个参数表示是否是公平
    Semaphore myResources = new Semaphore(5, true);
    
    
    // 工作线程每获取⼀份资源,就在该对象上记下来
    // 在获取的时候是按照公平的方式还是非公平的方式,就要看上⼀行代码的第二个参数了。
    // ⼀般⾮公平抢占效率较高。
    myResources.acquire();
    
    // 工作线程每归还⼀份资源,就在该对象上记下来
    // 此时资源可以被其他线程使⽤
    myResources.release();
    
    /*
    释放指定数目的许可,并将它们归还给信标。
    可用许可数加上该指定数目。
    如果线程需要获取N个许可,在有N个许可可用之前,该线程阻塞。
    如果线程获取了N个许可,还有可用的许可,则依次将这些许可赋予等待获取许可的其他线程。
    */
    semaphore.release(2);
    
    /*
    从信标获取指定数⽬的许可。如果可用许可数目不够,则线程阻塞,直到被中断。
    
    该⽅法效果与循环相同,
    for (int i = 0; i < permits; i++) acquire();
    只不过该方法是原⼦操作。
    
    如果可用许可数不够,则当前线程阻塞,直到:(⼆选⼀)
    1. 如果其他线程释放了许可,并且可用的许可数满足当前线程的请求数字;
    2. 其他线程中断了当前线程。
    
    permits – 要获取的许可数
    */
    semaphore.acquire(3);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    Semaphore在争抢资源时的示意图如下图所示,假设有n个线程来获取Semaphore⾥⾯的10份资源(n > 10), n个线程中只有10个线程能获取到,其他线程都会阻塞。直到有线程释放了资源,其他线程才能获取到。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ro65uLPv-1670164010648)(E:\笔记\截图\SemaPhore争抢资源示意图.png)]

    当初始的资源个数为1的时候, Semaphore退化为排他锁。正因为如此, Semaphone的实现原理和锁十分类似,是基于AQS,有公平和非公平之分

    public void acquire() throws InterruptedException {
    	sync.acquireSharedInterruptibly(1);
    }
    public void release() {
    	sync.releaseShared(1);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    SemaPhore也是使用了队列同步器AbstractQueuedSynchronizer来实现多线程间的同步操作,abstract static class Sync extends AbstractQueuedSynchronizer

    acquire()方法通过判断剩余资源数和线程所需资源数的差值是否小于0,如果小于0,则当前线程进行阻塞,如果大于0,对state变量(state变量在AQS中表示同步状态)进行CAS减操作,减到0之后,线程阻塞。在release里对state变量进行CAS加操作。

    public class Semaphore {
    	protected final boolean tryReleaseShared(int releases) {
    	for (;;) {
    		int current = getState();
    		int next = current + releases;
    		if (next < current) // overflow
    			throw new Error("Maximum permit count exceeded");
    		if (compareAndSetState(current, next))
    			return true;
    		}
    	}
    	static final class FairSync extends Sync {
    		// ...
    		FairSync(int permits) {
    		super(permits);
    	}
    	protected int tryAcquireShared(int acquires) {
    		for (;;) {
    			if (hasQueuedPredecessors())
    				return -1;
    			int available = getState();
                // 判断剩余资源数,如果<0,说明资源数不够了,获取资源失败
    			int remaining = available - acquires;
    			if (remaining < 0 || compareAndSetState(available, remaining))
    				return remaining;
    			}
    		}
    	}
    }
    
    /**
         * Acquires in shared interruptible mode.
         * @param arg the acquire argument
         */
        private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            // addWaiter将当前线程封装成共享节点,放在等待队列尾部
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    // 当前线程前驱节点
                    final Node p = node.predecessor();
                    if (p == head) {
                        // 尝试获取args数量的资源数,如果资源数不够,则返回<0
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            // 向后传播唤醒当前节点后的节点
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    关于该方法的更多详细原理剖析,可以看AQS介绍的相关博客:[Java多线程之:队列同步器AbstractQueuedSynchronizer原理剖析](https://blog.csdn.net/Urbanears/article/details/128177063?spm=1001.2014.3001.5502)

    实战案例

    下面给出一个案例来测试Semaphore的使用:

    case: 自习室抢座,写作业:

    抢座位的线程:

    import java.util.Random;
    import java.util.concurrent.Semaphore;
    
    public class MyThread extends Thread {
        private final Semaphore semaphore;
        private final Random random = new Random();
        public MyThread(String name, Semaphore semaphore) {
            super(name);
            this.semaphore = semaphore;
        }
        @Override
        public void run() {
            try {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName() + " - 抢座成功,开始写作业");
                Thread.sleep(random.nextInt(1000));
                System.out.println(Thread.currentThread().getName() + " - 作业完成,腾出座位");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            semaphore.release();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    主方法:

    import java.util.concurrent.Semaphore;
    public class Demo {
        public static void main(String[] args) throws InterruptedException {
            Semaphore semaphore = new Semaphore(2);
            for (int i = 0; i < 5; i++) {
                new MyThread("学⽣-" + (i + 1), semaphore).start();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    上面主方法中,调用new Semaphore(2)定义了2份共享资源,或者两份许可证,也就是同一时刻最多只允许2个线程执行。输出结果:

    学⽣-1 - 抢座成功,开始写作业
    学⽣-2 - 抢座成功,开始写作业
    学⽣-2 - 作业完成,腾出座位
    学⽣-3 - 抢座成功,开始写作业
    学⽣-1 - 作业完成,腾出座位
    学⽣-4 - 抢座成功,开始写作业
    学⽣-3 - 作业完成,腾出座位
    学⽣-5 - 抢座成功,开始写作业
    学⽣-5 - 作业完成,腾出座位
    学⽣-4 - 作业完成,腾出座位
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    二、CountDownLatch

    原理剖析

    CountDownLoatch类常被用于计数递减的多线程同步中。在构造CountDownLatch的时候需要传入一个整数n,在这个整数“倒数”到0之前,主线程需要等待,而这个“倒数”过程则是由各个执行线程驱动的,每个线程执行完一个任务“倒数”一次。

    下图为CountDownLatch相关类的继承层次, CountDownLatch原理和Semaphore原理类似,同样是基于AQS,不过没有公平和非公平之分。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ThWA3Ee0-1670164010650)(E:\笔记\截图\CountDownLatch1.png)]

    await()实现分析

    public void await() throws InterruptedException {
    	// AQS的模板⽅法
    	sync.acquireSharedInterruptibly(1);
    }
    
    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    	if (Thread.interrupted())
    		throw new InterruptedException();
    	// 被CountDownLatch.Sync实现
    	if (tryAcquireShared(arg) < 0)
    		doAcquireSharedInterruptibly(arg);
    }
    
    protected int tryAcquireShared(int acquires) {
    	return (getState() == 0) ? 1 : -1;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    从tryAcquireShared(…)⽅法的实现来看,只要state != 0,调⽤await()方法的线程便会被放⼊AQS的阻塞队列,进⼊阻塞状态。

    countDown()实现分析

    public void countDown() {
    	sync.releaseShared(1);
    }
    
    // AQS的模板⽅法
    public final boolean releaseShared(int arg) {
    	// 由CountDownLatch.Sync实现
    	if (tryReleaseShared(arg)) {
    		doReleaseShared();
    		return true;
    	}
    	return false;
    }
    
    protected boolean tryReleaseShared(int releases) {
    	// Decrement count; signal when transition to zero
    	for (;;) {
    		int c = getState();
    		if (c == 0)
    			return false;
    		int nextc = c - 1;
    		if (compareAndSetState(c, nextc))
    			return nextc == 0;
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    countDown()调用的AQS的模板方法releaseShared(),里面的tryReleaseShared()由CountDownLatch.Sync实现。从上面的代码可以看出,只有state=0, tryReleaseShared()才会返回true,然后执行doReleaseShared(…),⼀次性唤醒队列中所有阻塞的线程。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5dDPy7p4-1670164010650)(E:\笔记\截图\CountDownLatch2.png)]

    总结

    由于是基于AQS阻塞队列来实现的,所以可以让多个线程都阻塞在state=0条件上,通过countDown()⼀直减state,减到0后⼀次性唤醒所有线程。如上图所示,假设初始总数为M, N个线程await(), M个线程countDown(),减到0之后, N个线程被唤醒。

    实战案例

    假设⼀个主线程要等待5个 Worker 线程执行完才能退出,可以使用CountDownLatch来实现:

    import java.util.Random;
    import java.util.concurrent.CountDownLatch;
    public class MyThread extends Thread {
    	private final CountDownLatch latch;
    	private final Random random = new Random();
    	public MyThread(String name, CountDownLatch latch) {
    		super(name);
    		this.latch = latch;
    	}
        
    	@Override
    	public void run() {
    		try {
    			Thread.sleep(random.nextInt(2000));
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		System.out.println(Thread.currentThread().getName() + "运⾏结束");
    		latch.countDown();
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    主方法:

    import java.util.concurrent.CountDownLatch;
    public class Main {
    	public static void main(String[] args) throws InterruptedException {
    		CountDownLatch latch = new CountDownLatch(5);
    		new MyThread("线程1", latch).start();
    		new MyThread("线程2", latch).start();
    		new MyThread("线程3", latch).start();
    		new MyThread("线程4", latch).start();
            
    		// new MyThread("线程5", latch).start();
    		// 主线程等待,直到计数为0
    		latch.await();
    		System.out.println("程序运⾏结束");
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    三、CyclicBarrier

    原理剖析

    CyclicBarrier使用方式比较简单:

    CyclicBarrier barrier = new CyclicBarrier(5);
    barrier.await();
    
    • 1
    • 2

    该类用于协调多个线程同步执行操作的场合。

    CyclicBarrier基于ReentrantLock+Condition实现 。

    public class CyclicBarrier {
    	private final ReentrantLock lock = new ReentrantLock();
    	// ⽤于线程之间相互唤醒
    	private final Condition trip = lock.newCondition();
    	// 线程总数
    	private final int parties;
    	private int count;
    	private Generation generation = new Generation();
    	// ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    下面详细介绍 CyclicBarrier 的实现原理。先看构造方法:

    public CyclicBarrier(int parties, Runnable barrierAction) {
    	if (parties <= 0) throw new IllegalArgumentException();
    	// 参与⽅数量
    	this.parties = parties;
    	this.count = parties;
    	// 当所有线程被唤醒时,执⾏barrierCommand表示的Runnable。
    	this.barrierCommand = barrierAction;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    接下来看⼀下await()方法的实现过程。

        public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }
    
    	/**
         * Main barrier code, covering the various policies.
         */
        private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                final Generation g = generation;
    
                if (g.broken)
                    throw new BrokenBarrierException();
    			
                // 响应中断
                if (Thread.interrupted()) {
                    // 唤醒所有阻塞的线程
                    breakBarrier();
                    throw new InterruptedException();
                }
    			
                // 每个线程调用一次await()
                int index = --count;
                // 当count减成0的时候,此线程唤醒其他所有线程
                if (index == 0) {  // tripped
                    boolean ranAction = false;
                    try {
                        final Runnable command = barrierCommand;
                        if (command != null)
                            command.run();
                        ranAction = true;
                        nextGeneration();
                        return 0;
                    } finally {
                        if (!ranAction)
                            breakBarrier();
                    }
                }
    
                // loop until tripped, broken, interrupted, or timed out
                for (;;) {
                    try {
                        if (!timed)
                            trip.await();
                        else if (nanos > 0L)
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            // We're about to finish waiting even if we had not
                            // been interrupted, so this interrupt is deemed to
                            // "belong" to subsequent execution.
                            Thread.currentThread().interrupt();
                        }
                    }
    
                    if (g.broken)
                        throw new BrokenBarrierException();
    
                    if (g != generation)
                        return index;
    
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    	
    	private void nextGeneration() {
            // signal completion of last generation
            trip.signalAll();
            // set up next generation
            count = parties;
            generation = new Generation();
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90

    关于上面的方法,有几点说明:

    1. **CyclicBarrier是可以被重用的。**比如对于10个线程,这10个线程互相等待,到齐后⼀起被唤醒,各自执行接下来的逻辑。然后,这10个线程继续互相等待,到齐后再⼀起被唤醒。每⼀轮被称为⼀个Generation,就是⼀次同步点。
    2. CyclicBarrier 会响应中断。 10 个线程没有到齐,如果有线程收到了中断信号,所有阻塞的线程也会被唤醒,就是上⾯的breakBarrier()方法。然后count被重置为初始值(parties),重新开始。
    3. 上面的回调方法, barrierAction只会被第10个线程执行1次(在唤醒其他9个线程之前),而不是10个线程每个都执行1次。

    实战案例

    使用场景: 10个工程师⼀起来公司应聘,招聘方式分为笔试和面试。首先,要等⼈到齐后,开始笔试。笔试结束之后,再⼀起参加面试。把10个⼈看作10个线程, 10个线程之间的同步过程如下图所示:

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vsxIf3Nv-1670164010650)(E:\笔记\截图\CyclicBarrier.png)]

    Main类:

    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    
    public class Main {
    	public static void main(String[] args) throws BrokenBarrierException,
    	InterruptedException {
    		CyclicBarrier barrier = new CyclicBarrier(5);
    		for (int i = 0; i < 5; i++) {
    			new MyThread("线程-" + (i + 1), barrier).start();
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    MyThread类:

    import java.util.Random;
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    public class MyThread extends Thread {
    	private final CyclicBarrier barrier;
    	private final Random random = new Random();
    	public MyThread(String name, CyclicBarrier barrier) {
    		super(name);
            this.barrier = barrier;
    	}
        
        
    	@Override
    	public void run() {
    		try {
    			Thread.sleep(random.nextInt(2000));
    			System.out.println(Thread.currentThread().getName() + " - 已经到达公司");
    			barrier.await();
                
    			Thread.sleep(random.nextInt(2000));
    			System.out.println(Thread.currentThread().getName() + " - 已经笔试结束");
    			barrier.await();
                
    			Thread.sleep(random.nextInt(2000));
    			System.out.println(Thread.currentThread().getName() + " - 已经⾯试结束");
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		} catch (BrokenBarrierException e) {
    			e.printStackTrace();
    		}
    		super.run();
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    在整个过程中,有2个同步点:第1个同步点,要等所有应聘者都到达公司,再⼀起开始笔试;第2个同步点,要等所有应聘者都结束笔试,之后⼀起进⼊面试环节。

    输出结果:

    线程-2 - 已经到达公司
    线程-1 - 已经到达公司
    线程-5 - 已经到达公司
    线程-3 - 已经到达公司
    线程-4 - 已经到达公司
    线程-3 - 已经笔试结束
    线程-2 - 已经笔试结束
    线程-1 - 已经笔试结束
    线程-4 - 已经笔试结束
    线程-5 - 已经笔试结束
    线程-1 - 已经面试结束
    线程-4 - 已经面试结束
    线程-2 - 已经面试结束
    线程-5 - 已经面试结束
    线程-3 - 已经面试结束
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    可以看到,每次5个线程结束后,才会执行barrier.await();下面的代码。

    四、Exchanger

    原理剖析

    Exchanger用于于线程之间交换数据,其核心机制和Lock⼀样,也是CAS+park/unpark。

    首先,在Exchanger内部,有两个内部类: Participant和Node,代码如下:

    public class Exchanger<V> {
    	// ...
    	// 添加了Contended注解,表示伪共享与缓存⾏填充
    	@jdk.internal.vm.annotation.Contended static final class Node {
    		int index; // Arena index
    		int bound; // Last recorded value of Exchanger.bound
    		int collides; // 本次绑定中, CAS操作失败次数
    		int hash; // ⾃旋伪随机
    		Object item; // 本线程要交换的数据
    		volatile Object match; // 对⽅线程交换来的数据
    		// 当前线程
    		volatile Thread parked; // 当前线程阻塞的时候设置该属性,不阻塞为null。
    	}
        
    	static final class Participant extends ThreadLocal<Node> {
    		public Node initialValue() { return new Node(); }
    	}
    	// ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    每个线程在调用exchange(...)方法交换数据的时候,会先创建⼀个Node对象。
    这个Node对象就是对该线程的包装,里面包含了3个重要字段:第⼀个是该线程要交互的数据,第⼆个是对方线程交换来的数据,最后⼀个是该线程自身。
    ⼀个Node只能⽀持2个线程之间交换数据,要实现多个线程并行地交换数据,需要多个Node,因此在Exchanger里面定义了Node数组:

    	/**
         * Elimination array; null until enabled (within slotExchange).
         * Element accesses use emulation of volatile gets and CAS.
         */
        private volatile Node[] arena;
    
    • 1
    • 2
    • 3
    • 4
    • 5

    exchange(V x)实现分析

    明白了⼤致思路,下面来看exchange(V x)⽅法的详细实现:

    	@SuppressWarnings("unchecked")
        public V exchange(V x) throws InterruptedException {
            Object v;
            Object item = (x == null) ? NULL_ITEM : x; // translate null args
            if ((arena != null ||
                 (v = slotExchange(item, false, 0L)) == null) &&
                ((Thread.interrupted() || // disambiguates null return
                  (v = arenaExchange(item, false, 0L)) == null)))
                throw new InterruptedException();
            return (v == NULL_ITEM) ? null : (V)v;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    上面方法中

    • 如果arena不是null,表示启⽤了arena方式交换数据。如果arena不是null,并且线程被中断,则抛异常
    • 如果arena不是null,并且arenaExchange的返回值为null,则抛异常。对⽅线程交换来的null值是封装为NULL_ITEM对象的,而不是null。
    • 如果slotExchange的返回值是null,并且线程被中断,则抛异常。
    • 如果slotExchange的返回值是null,并且areaExchange的返回值是null,则抛异常。

    slotExchange的实现

    public class Exchanger<V> {
    	// ...
    	/**
    	* 如果不启⽤arenas,则使⽤该⽅法进⾏线程间数据交换。
        *
    	* @param item 需要交换的数据
    	* @param timed 是否是计时等待, true表示是计时等待
    	* @param ns 如果是计时等待,该值表示最⼤等待的时⻓。
    	* @return 对⽅线程交换来的数据;如果等待超时或线程中断,或者启⽤了arena,则返回null。
    	*/
    	private final Object slotExchange(Object item, boolean timed, long ns) {
    		// participant在初始化的时候设置初始值为new Node()
    		// 获取本线程要交换的数据节点
    		Node p = participant.get();
    		// 获取当前线程
    		Thread t = Thread.currentThread();
    		// 如果线程被中断,则返回null。
    		if (t.isInterrupted())
    			return null;
    		for (Node q;;) {
                // 如果slot⾮空,表明有其他线程在等待该线程交换数据
    			if ((q = slot) != null) {
    				// CAS操作,将当前线程的slot由slot设置为null
    				// 如果操作成功,则执⾏if中的语句
    				if (SLOT.compareAndSet(this, q, null)) {
    					// 获取对⽅线程交换来的数据
    					Object v = q.item;
    					// 设置要交换的数据
    					q.match = item;
    					// 获取q中阻塞的线程对象
    					Thread w = q.parked;
    					if (w != null)
    						// 如果对⽅阻塞的线程⾮空,则唤醒阻塞的线程
    						LockSupport.unpark(w);
    					return v;
    				}
    				// create arena on contention, but continue until slot null
    				// 创建arena⽤于处理多个线程需要交换数据的场合,防⽌slot冲突
    				if (NCPU > 1 && bound == 0 &&
    					BOUND.compareAndSet(this, 0, SEQ)) {
    						arena = new Node[(FULL + 2) << ASHIFT];
    					}
    			}
    			// 如果arena不是null,需要调⽤者调⽤arenaExchange⽅法接着获取对⽅线程交换来的数据
    			else if (arena != null)
    				return null;
    			else {
    				// 如果slot为null,表示对⽅没有线程等待该线程交换数据
    				// 设置要交换的本⽅数据
    				p.item = item;
    				// 设置当前线程要交换的数据到slot
    				// CAS操作,如果设置失败,则进⼊下⼀轮for循环
    				if (SLOT.compareAndSet(this, null, p))
    					break;
    				p.item = null;
    			}
    		}
            
    		// 没有对⽅线程等待交换数据,将当前线程要交换的数据放到slot中,是⼀个Node对象
    		// 然后阻塞,等待唤醒
    		int h = p.hash;
    		// 如果是计时等待交换,则计算超时时间;否则设置为0。
    		long end = timed ? System.nanoTime() + ns : 0L;
    		// 如果CPU核⼼数⼤于1,则使⽤SPINS数,⾃旋;否则为1,没必要⾃旋。
    		int spins = (NCPU > 1) ? SPINS : 1;
    		// 记录对⽅线程交换来的数据
    		Object v;
    		// 如果p.match==null,表示还没有线程交换来数据
    		while ((v = p.match) == null) {
    			// 如果⾃旋次数⼤于0,计算hash随机
                if (spins > 0) {
                	// 生成随机数,⽤于⾃旋次数控制
    				h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
    				if (h == 0)
    					h = SPINS | (int)t.getId();
    				else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
    					Thread.yield();
    			// p是ThreadLocal记录的当前线程的Node。
    			// 如果slot不是p表示slot是别的线程放进去的
    			} else if (slot != p) {
    				spins = SPINS;
    			} else if (!t.isInterrupted() && arena == null &&
    					(!timed || (ns = end - System.nanoTime()) > 0L)) {
    				p.parked = t;
    				if (slot == p) {
    					if (ns == 0L)
    						// 阻塞当前线程
    						LockSupport.park(this);
    					else
    						// 如果是计时等待,则阻塞当前线程指定时间
    						LockSupport.parkNanos(this, ns);
    				}
    				p.parked = null;
    			} else if (SLOT.compareAndSet(this, p, null)) {
    				// 没有被中断但是超时了,返回TIMED_OUT,否则返回null
    				v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
    				break;
    			}
    		}
    		// match设置为null值 CAS
    		MATCH.setRelease(p, null);
    		p.item = null;
    		p.hash = h;
    		// 返回获取的对⽅线程交换来的数据
    		return v;
    	}
    	// ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108

    arenaExchange的实现

    public class Exchanger<V> {
    	// ...
    	/**
    	* 当启⽤arenas的时候,使⽤该⽅法进⾏线程间的数据交换。
    	*
    	* @param item 本线程要交换的⾮null数据。
    	* @param timed 如果需要计时等待,则设置为true。
    	* @param ns 表示计时等待的最⼤时⻓。
    	* @return 对⽅线程交换来的数据。如果线程被中断,或者等待超时,则返回null。
    	*/
    	private final Object arenaExchange(Object item, boolean timed, long ns) {
    		Node[] a = arena;
    		int alen = a.length;
    		Node p = participant.get();
    		// 访问下标为i处的slot数据
    		for (int i = p.index;;) { // access slot at i
    			int b, m, c;
    			int j = (i << ASHIFT) + ((1 << ASHIFT) - 1);
    			if (j < 0 || j >= alen)
    				j = alen - 1;
    			
                // 取出arena数组的第j个Node元素
    			Node q = (Node)AA.getAcquire(a, j);
    			// 如果q不是null,则将数组的第j个元素由q设置为null
    			if (q != null && AA.compareAndSet(a, j, q, null)) {
                    // 获取对⽅线程交换来的数据
                    Object v = q.item; // release
                    // 设置本⽅线程交换的数据
                    q.match = item;
                    // 获取对⽅线程对象
                    Thread w = q.parked;
                    if (w != null)
                    // 如果对⽅线程⾮空,则唤醒对⽅线程
                    	LockSupport.unpark(w);
                    return v;
    			}
    			// 如果⾃旋次数没达到边界,且q为null
    			else if (i <= (m = (b = bound) & MMASK) && q == null) {
                    // 提供本⽅数据
                    p.item = item; // offer
                    // 将arena的第j个元素由null设置为p
                    if (AA.compareAndSet(a, j, null, p)) {
                    	long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
                    	Thread t = Thread.currentThread(); // wait
                    	// ⾃旋等待
                    	for (int h = p.hash, spins = SPINS;;) {
                    		// 获取对⽅交换来的数据
                    		Object v = p.match;
                    		// 如果对⽅交换来的数据⾮空
                    		if (v != null) {
                    			// 将p设置为null, CAS操作
                    			MATCH.setRelease(p, null);
                                // 清空
                                p.item = null; // clear for next use
                                p.hash = h;
                                // 返回交换来的数据
                                return v;
                        	}
                            // 产⽣随机数,⽤于限制⾃旋次数
                            else if (spins > 0) {
                            	h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
                            	if (h == 0) // initialize hash
                           			 h = SPINS | (int)t.getId();
                            	else if (h < 0 && // approx 50% true
                            				(--spins & ((SPINS >>> 1) - 1)) == 0)
                           			 Thread.yield(); // two yields per wait
                            }
                            // 如果arena的第j个元素不是p
                            else if (AA.getAcquire(a, j) != p)
                            	spins = SPINS; // releaser hasn't set match yet
                            else if (!t.isInterrupted() && m == 0 &&
                            			(!timed ||
                            			(ns = end - System.nanoTime()) > 0L)) {
                            	p.parked = t; // minimize window
                            	if (AA.getAcquire(a, j) == p) {
                            		if (ns == 0L)
                            			// 当前线程阻塞,等待交换数据
                            			LockSupport.park(this);
                            		else
                            			LockSupport.parkNanos(this, ns);
                            	}
                            	p.parked = null;
                            }
                            // arena的第j个元素是p并且CAS设置arena的第j个元素由p设置为null成功
                            else if (AA.getAcquire(a, j) == p &&
                            			AA.compareAndSet(a, j, p, null)) {
                            	if (m != 0) // try to shrink
                            		BOUND.compareAndSet(this, b, b + SEQ - 1);
                            	p.item = null;
                            	p.hash = h;
                            	i = p.index >>>= 1; // descend
                            	// 如果线程被中断,则返回null值
                            	if (Thread.interrupted())
                            		return null;
                            	if (timed && m == 0 && ns <= 0L)
                            		// 如果超时,返回TIMED_OUT。
                            		return TIMED_OUT;
                            	break; // expired; restart
                            }
    					}
    				}
    				else
    					p.item = null; // clear offer
    			}
    			//
    			else {
    				if (p.bound != b) { // stale; reset
    					p.bound = b;
        				p.collides = 0;
    					i = (i != m || m == 0) ? m : m - 1;
    				}
    				else if ((c = p.collides) < m || m == FULL ||
    							!BOUND.compareAndSet(this, b, b + SEQ + 1)) {
    					p.collides = c + 1;
    					i = (i == 0) ? m : i - 1; // cyclically traverse
    				}
    				else
    					i = m + 1; // grow
    				p.index = i;
    			}
    		}
    	}
    	// ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124

    实战案例

    Exchanger用于于线程之间交换数据,其使用代码很简单,是⼀个exchange(…)方法,使用示例如下:

    import java.util.Random;
    import java.util.concurrent.Exchanger;
    
    public class ExchangerTest {
        private static final Random random = new Random();
        public static void main(String[] args) {
    		// 建⼀个多线程共⽤的exchange对象
    		// 把exchange对象传给3个线程对象。每个线程在自己的run⽅法中调⽤exchange,把自己的数据作为参数
    		// 传递进去,返回值是另外⼀个线程调⽤exchange传进去的参数
            Exchanger<String> exchanger = new Exchanger<>();
            new Thread("线程1") {
                @Override
                public void run() {
                    while (true) {
                        try {
    						// 如果没有其他线程调⽤exchange,线程阻塞,直到有其他线程调⽤exchange为⽌。
                            String otherData = exchanger.exchange("交换数据1");
                            System.out.println(Thread.currentThread().getName() + "得到 <==" + otherData);
                                    Thread.sleep(random.nextInt(2000));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }.start();
            new Thread("线程2") {
                @Override
                public void run() {
                    while (true) {
                        try {
                            String otherData = exchanger.exchange("交换数据2");
                            System.out.println(Thread.currentThread().getName() + "得到 <==" + otherData);
                                    Thread.sleep(random.nextInt(2000));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }.start();
            new Thread("线程3") {
                @Override
                public void run() {
                    while (true) {
                        try {
                            String otherData = exchanger.exchange("交换数据3");
                            System.out.println(Thread.currentThread().getName() + "得到 <==" + otherData);
                                    Thread.sleep(random.nextInt(2000));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }.start();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    输出结果:

    线程2得到 <==交换数据1
    线程1得到 <==交换数据2
    线程2得到 <==交换数据3
    线程3得到 <==交换数据2
    线程1得到 <==交换数据2
    线程2得到 <==交换数据1
    线程3得到 <==交换数据1
    线程1得到 <==交换数据3
    线程2得到 <==交换数据1
    线程1得到 <==交换数据2
    线程1得到 <==交换数据2
    线程2得到 <==交换数据1
    ...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    可以看到,三个线程间彼此交换自己的数据。

  • 相关阅读:
    一文彻底解析数据库设计思路
    C# + Oracel 批量插入数据(List,Array等)
    BOM的常用操作和有关获取页面/窗口高度、宽度及滚动的兼容性写法
    Pytorch详细教程——12.Tensors For Deep Learning
    基于Python和mysql开发的智慧校园答题考试系统(源码+数据库+程序配置说明书+程序使用说明书)
    抖音实战~取关博主
    docker基于alpine基础镜像合集(java、python)集成chrome
    Pr:基本图形(文本)
    Lesson1:酶预测大赛1
    windbg的时间旅行实现对 C# 程序的终极调试!
  • 原文地址:https://blog.csdn.net/Urbanears/article/details/128178633