我们可以使用 ReentrantLock 来替代 Synchronized锁,实现方法为:
- Lock lock = new ReentrantLock();
- void m2 (){
- try{
- // 加锁
- lock.lock();
- for(int i=0;i<10000;i++){
- count.incrementAndGet(); // 相当于线程安全的 count++
- }
- }catch (Exception e){
- e.printStackTrace();
- }finally {
- // 释放锁
- lock.unlock();
- }
- }
使用Synchronized是自动解锁的。但是使用lock锁,必须使用try-catch-finally包裹,在try中加锁,在finally中释放锁。
允许同一时刻多个读线程访问,但所有写线程均被阻塞。读写分离,并发性提升。
java中实现类的读写锁为 ReentrantReadWriteLock 类。
- import java.util.Random;
- import java.util.concurrent.locks.Lock;
- import java.util.concurrent.locks.ReadWriteLock;
- import java.util.concurrent.locks.ReentrantLock;
- import java.util.concurrent.locks.ReentrantReadWriteLock;
-
- public class ReadWriteLockTest {
- static Lock lock = new ReentrantLock();
- private static int value;
-
- static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- static Lock readLock = readWriteLock.readLock();
- static Lock writeLock = readWriteLock.writeLock();
-
- public static void read (Lock lock){
- try {
- lock.lock();
- // 模拟读操作
- Thread.sleep(1000);
- System.out.println("read over!");
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- lock.unlock();
- }
- }
-
- public static void write (Lock lock,int v){
- try {
- lock.lock();
- // 模拟写操作
- Thread.sleep(1000);
- value = v;
- System.out.println("write over!");
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- lock.unlock();
- }
- }
-
- public static void main(String[] args) {
- // 使用 互斥锁(排他锁) lock,需要一个一个执行线程
- Runnable readR = () -> read(lock);
- Runnable writeR = () -> write(lock,new Random().nextInt());
- for(int i=0;i<18;i++) new Thread(readR).start();
- for(int i=0;i<2;i++) new Thread(writeR).start();
- }
- }
这里我们使用 互斥锁 lock 来去模拟读写操作时会发现,每个线程一个一个执行,只有当前线程执行完毕,下一个线程抢到资源才继续执行。
如果我们使用读写锁进行模拟操作,会发现,读操作几乎是在一瞬间全部执行完毕,没有等待。
- public static void main(String[] args) {
- // 使用 读写锁 readWriteLock,所有的读写操作并发执行,读写不需要等待
- Runnable readLockR = () -> read(readLock);
- Runnable writeLockR = () -> write(writeLock,new Random().nextInt());
- for(int i=0;i<18;i++) new Thread(readLockR).start();
- for(int i=0;i<2;i++) new Thread(writeLockR).start();
- }
(1)我们可以使用 tryLock 进行尝试锁定,不管锁定与否,方法都将继续执行可以根据 tryLock 的返回值来判定是否锁定,也可以指定tryLock的时间,由于tryLock(time)抛出异常,所以要注意unclock的释放。
(2)使用 lock.lockInterruptibly(); 方法,可以将当前获得锁的线程中断,抛出异常并释放锁,需要配合 ti.interrupt(); 方法使用。
- Thread ti = new Thread(() -> {
- try {
- lock.lockInterruptibly();
- index.compareAndSet(10, 11);
- index.compareAndSet(11, 10);
- System.out.println(Thread.currentThread().getName()+": 10->11->10");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
- ti.interrupt();
(3)初始化一个公平锁
static Lock lock = new ReentrantLock(true);
参数为 true表示为公平锁。当前所有线程在等待队列中,这个时候来了一个新的线程,如果是公平锁,则它会去查看是否有队列在等待,如果有,则让这个新的线程也加入队列等待。如果不是公平锁,这个线程会去抢占资源。
CountDownLath:允许其他多个线程等待当前latch线程,只有当前线程的latch计数器为0时,才放行,让其他线程执行。
latch.countDown():每执行一次 latch.countDown,计算器减一,直到为0。
latch.await(): latch.await相当于一道门,只有 latch 的计数器为0的时候才放行。
- import java.util.concurrent.CountDownLatch;
-
- public class CountDownLathTest {
- private static void usingCountDownLatch() {
- Thread[] threads = new Thread[100];
- // 定义一个CountDownLatch,给定计数器为 threads.length
- CountDownLatch latch = new CountDownLatch(threads.length);
- for (int i = 0; i < threads.length; i++) {
- threads[i] = new Thread(() -> {
- int result = 0;
- for (int j = 0; j < 100; j++) result += j;
- // 每执行一次 latch.countDown,计算器减一,直到为0
- latch.countDown();
- System.out.println(Thread.currentThread().getName());
- });
- }
- for (int i = 0; i < threads.length; i++) {
- threads[i].start();
- }
- try {
- // latch.await相当于一道门,只有 latch 的计数器为0的时候才放行
- latch.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("latch计数器为0,放行了") ;
- }
-
- public static void main(String[] args) {
- usingCountDownLatch();
- }
- }
CountDownLath 的作用和 join() 作用一样,都是让其他线程等待,先执行指定线程。
- private static void usingJoin() {
- Thread[] threads = new Thread[100];
- for (int i = 0; i < threads.length; i++) {
- threads[i] = new Thread(() -> {
- int result = 0;
- for (int j = 0; j < 100; j++) result += j;
- System.out.println(Thread.currentThread().getName());
- });
- }
- for (int i = 0; i < threads.length; i++) {
- threads[i].start();
- }
- for (int i = 0; i < threads.length; i++) {
- try {
- // 将线程循环放入,确保执行顺序
- threads[i].join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- System.out.println("join顺序线程执行完毕,到我了") ;
- }
CyclicBarrier:可循环使用屏障,定义一个指定瓶颈的CyclicBarrier,当调用 await方法到达这个瓶颈后,就去执行相应的瓶颈方法。
cyclicBarrier.await():如果没有达到计数器瓶颈,就等待
cyclicBarrier.reset():使计数器重置
- public static void main(String[] args) {
- // CyclicBarrier 第二个参数作用是,当达到瓶颈 20 后,去做一些事情
- CyclicBarrier cyclicBarrier = new CyclicBarrier(20, new Runnable() {
- @Override
- public void run() {
- System.out.println("达到了瓶颈,重置");
- }
- });
- for (int i=0;i<100;i++){
- int finalI = i;
- new Thread(() -> {
- try {
- // 每执行一次cyclicBarrier.await(),相当与 cyclicBarrier的计数器+1,直到等于20,执行瓶颈方法
- cyclicBarrier.await();
- } catch (BrokenBarrierException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }).start();
- }
-
- }
CyclicBarrier使用场景:
1. 限流,当请求数量达到一定数量时,进行限流。
2. 复杂操作数据库、网络、文件
3. 并发执行线程-操作线程-操作
Semaphore :用来控制同时访问资源的线程数量。
可以指定是否为公平锁。
公平锁:获取锁时,如果获取的顺序符合请求的绝对时间顺序,则为公平锁,FIFO。
- // 允许 n 个线程同时执行
- Semaphore semaphore = new Semaphore(1);
- // 可以指定是否为公平锁
- Semaphore semaphore2 = new Semaphore(2,true);
- new Thread(() -> {
- try {
- semaphore.acquire();
- System.out.println("T1 running ...");
- Thread.sleep(200);
- System.out.println("T1 running ...");
- semaphore.release();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }).start();
- new Thread(() -> {
- try {
- semaphore.acquire();
- System.out.println("T2 running ...");
- Thread.sleep(200);
- System.out.println("T2 running ...");
- semaphore.release();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }).start();
- }
T1 running ...
T1 running ...
T2 running ...
T2 running ...
如果我们把 new Semaphore(1); 改为 new Semaphore(2); 执行结果:
T1 running ...
T2 running ...
T1 running ...
T2 running ...
可以看到,为1时,T2只能等 T1执行完毕,才开始执行。为2时,T1、T2可以同时执行。
主要使用场景为限流,类比高速收费站,5个车道2个收费站。
Exchanger:用于线程通信(线程交换数据)的工具类,提供一个同步点,当第一个线程执行了 exchanger()方法,它会一直等待下一个线程也执行 exchanger()方法,然后进行交换数据。可以设置最大等待时间。
- import java.util.concurrent.Exchanger;
-
- public class ExchangerTest {
- static Exchanger
exchanger = new Exchanger<>(); - public static void main(String[] args) {
- new Thread(() ->{
- String s = "T1";
- try {
- // 设置多大等待时间
- // s = exchanger.exchange(s,2000, TimeUnit.SECONDS);
- s = exchanger.exchange(s);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println(Thread.currentThread().getName()+" "+s);
- },"t1").start();
-
- new Thread(() ->{
- String s = "T2";
- try {
- s = exchanger.exchange(s);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println(Thread.currentThread().getName()+" "+s);
- },"t2").start();
- }
- }
t2 T1
t1 T2
使用场景如:游戏中俩个人交换装备。
LockSupport:定义了一组静态方法,提供了最基本的线程阻塞和唤醒功能。
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.locks.LockSupport;
-
- public class LockSupportTest {
- public static void main(String[] args) {
- Thread t = new Thread(() ->{
- for(int i =0;i<10;i++){
- System.out.println(i);
- if(i == 5){
- // 阻塞当前线程
- LockSupport.park();
- }
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- t.start();
- // 唤醒指定线程
- // LockSupport.unpark(t);
- }
- }
0
1
2
3
4
5
由于 LockSupport.park(); 当循环到5的时候,阻塞当前线程。
将 LockSupport.unpark(t); 打开后,唤醒线程,完整打印。
0
1
2
3
4
5
6
7
8
9
当我们查看 part()方法的实现后发现,这个方法是 Unsafe 类实现的。还记得之前 CAS 中说到的这个类吗?
所有以 AtomXXX开头的类,底层都是使用cas方法,并是通过 Unsafe类实现的。
Unsafe类的出现等于c/c++的指针,给 java语言赋予了原来 C/C++实现的指针方法。
- /**
- * Disables the current thread for thread scheduling purposes unless the
- * permit is available.
- *
- *
If the permit is available then it is consumed and the call
- * returns immediately; otherwise the current thread becomes disabled
- * for thread scheduling purposes and lies dormant until one of three
- * things happens:
- *
- *
- *
- *
- Some other thread invokes {@link #unpark unpark} with the
- * current thread as the target; or
- *
- *
- Some other thread {@linkplain Thread#interrupt interrupts}
- * the current thread; or
- *
- *
- The call spuriously (that is, for no reason) returns.
- *
- *
- *
This method does not report which of these caused the
- * method to return. Callers should re-check the conditions which caused
- * the thread to park in the first place. Callers may also determine,
- * for example, the interrupt status of the thread upon return.
- */
- public static void park() {
- UNSAFE.park(false, 0L);
- }
翻译:
出于线程调度目的禁用当前线程,除非允许可用。
如果许可证可用,则将其使用,呼叫立即返回;否则,当前线程将出于线程调度目的而被禁用并处于休眠状态,直到发生以下三种情况之一:
其他一些线程以当前线程作为目标进行调用 unpark ;或者
其他线程 中断 当前线程;或
虚假调用(即无缘无故)返回。
此方法 不会 报告哪些导致该方法返回。调用方应重新检查导致线程首先停放的条件。例如,调用方还可以确定线程在返回时的中断状态