• 并发-Java中的并发工具类


    Java中的并发工具类

    等待多线程完成的CountDownLatch

    • 允许一个或多个线程等待其他线程完成操作

    • 需求:需要解析一个Excel里多个sheet数据,考虑每个线程解析一个sheet的数据,等所有sheet都解析完程序需要提示解析完成

      • 实现方法:

      • join - 让当前执行线程等待join线程执行结束。

        • 原理:不停检查join线程是否存活,如果存活则让当前线程永远等待。

        • 直到join中止在之后,线程this.notifyAll方法会被调用

        • public class JoinCountDownLatchTest {
          
              public static void main(String[] args) throws InterruptedException {
                  Thread parser1 = new Thread(new Runnable() {
                      @Override
                      public void run() {
          
                      }
                  });
                  Thread parser2 = new Thread(new Runnable() {
                      @Override
                      public void run() {
                          System.out.println("parser2 finish");
                      }
                  });
                  parser1.start();
                  parser2.start();
                  parser1.join();
                  parser2.join();
                  System.out.println("all parser finish");
              }
          }
          
          • 1
          • 2
          • 3
          • 4
          • 5
          • 6
          • 7
          • 8
          • 9
          • 10
          • 11
          • 12
          • 13
          • 14
          • 15
          • 16
          • 17
          • 18
          • 19
          • 20
          • 21
          • 22
      • countDownLatch实现

        • 构造函数接收一个int类型的参数作为计数器。等待N个点完成(可以是N个线程,也可以是1个线程N个步骤)

        • 调用countDown方法,N减1,

        • await方法会阻塞当前线程,直到N变成0;

        • await(long time,TimeUnit unit) 等待特定时间,不会再阻塞当前线程。

        • public class CountDownLatchTest {
          
              static CountDownLatch c = new CountDownLatch(2);
          
              public static void main(String[] args) throws InterruptedException {
                  new Thread(new Runnable() {
                      @Override
                      public void run() {
                          System.out.println(1);
                          c.countDown();
                          System.out.println(2);
                          c.countDown();
                      }
                  }).start();
                  c.await();
                  System.out.println("3");
              }
          }
          
          • 1
          • 2
          • 3
          • 4
          • 5
          • 6
          • 7
          • 8
          • 9
          • 10
          • 11
          • 12
          • 13
          • 14
          • 15
          • 16
          • 17
          • 18

    同步屏障CyclicBarrier

    简介
    • 可循环使用的屏障,让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

    • 默认的构造方法CyclicBarrier(int parties)参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier已经到达屏障,然后当前线程被阻塞。

    • 一下代碼输入为1,2, 或2,1 如果数量为3,则主线程和子线程会永远等待,没有输出,因为没有第三个执行await方法。

    • public class CyclicBarrierTets {
      
          static CyclicBarrier c = new CyclicBarrier(2);
      
          public static void main(String[] args) {
              new Thread(new Runnable() {
                  @Override
                  public void run() {
                      try {
                          c.await();
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      } catch (BrokenBarrierException e) {
                          e.printStackTrace();
                      }
                      System.out.println(1);
                  }
              }).start();
              try{
                  c.await();
              } catch (InterruptedException e) {
                  e.printStackTrace();
              } catch (BrokenBarrierException e) {
                  e.printStackTrace();
              }
              System.out.println(2);
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
    • 构造函数CyclicBarrier(int parties, Runnable barrier-Action):用于在线程到达屏障时,优先执行barrierAction

    • 因为设置了拦截线程数量是2,所以必须等代码中的第一个线程和线程A都执行完后,才会继续执行主线程,然后输出2

    • //输出  3 1 2
      public class CyclicBarrierTets2 {
      
          static CyclicBarrier c = new CyclicBarrier(2, new A());
      
      
          static class A implements Runnable{
      
              @Override
              public void run() {
                  System.out.println(3);
              }
          }
      
          public static void main(String[] args) {
              new Thread(new Runnable() {
                  @Override
                  public void run() {
                      try{
                          c.await();
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      } catch (BrokenBarrierException e) {
                          e.printStackTrace();
                      }
                      System.out.println(1);
                  }
              }).start();
              try {
                  c.await();
              } catch (InterruptedException e) {
                  e.printStackTrace();
              } catch (BrokenBarrierException e) {
                  e.printStackTrace();
              }
              System.out.println(2);
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
    应用场景

    可以用于多线程计算数据,最后合并计算结果的场景。

    例如:使用线程池创建4个线程,分别计算每个sheet里的数据,每个sheet计算结果1

    import java.util.Map;
    import java.util.concurrent.*;
    
    public class BankWaterService implements Runnable {
    
        //创建四个屏障,处理完之后执行当前类的run方法
        private CyclicBarrier c = new CyclicBarrier(4, this);
        //假设只有4个sheet,所以只启动4个线程
        private Executor executor = Executors.newFixedThreadPool(4);
        //保存每个sheet计算出的银流结果
        private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<>();
    
        private void count(){
            for (int i = 0; i < 4; i++) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        //计算当前sheet银流数据,计算代码省略
                        sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
                        //银流计算完成,插入一个屏障
                        try {
                            c.await();
                        }catch (InterruptedException | BrokenBarrierException e){
                            e.printStackTrace();
                        }
                    }
                });
            }
        }
    
        @Override
        public void run() {
    
            int result = 0;
            //汇总每个sheet计算出的结果
            for (Map.Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) {
               result += sheet.getValue();
            }
            //将结果输出
            sheetBankWaterCount.put("result",result);
            System.out.println(result);
        }
    
        public static void main(String[] args) {
            BankWaterService bankWaterService = new BankWaterService();
            bankWaterService.count();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    CyclicBarrier和CountDownLatch区别
    • CountDownLatch计数器只能使用一次,CyclicBarrier计数器可以使用reset重置

    • CyclicBarrier还有更复杂的方法

      • getNumberWaiting:可以获得Cyclc-Barrier阻塞的线程数量。

      • isBroken方法用来了解阻塞的线程是否被中断

      • import java.util.concurrent.CyclicBarrier;
        
        
        public class CyclicBarrier4 {
            static CyclicBarrier c = new CyclicBarrier(2);
        
            public static void main(String[] args) {
                Thread thread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            c.await();
                        }catch (Exception e){
        
                        }
                    }
                });
                thread.start();
                thread.interrupt();
                try {
                    c.await();
                }catch (Exception e){
                    System.out.println(c.isBroken());
                }
            }
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
        • 22
        • 23
        • 24
        • 25
        • 26

    控制并发线程数的Semaphore

    • 用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用公共资源

    • 应用场景:

      • 可以用于流量控制,例如数据库连接

      • 需求:读取几万个文件的数据,因为都是IO密集型任务,可以启动几十个线程并发地读取,如果读到内存后,需要存储到数据库中,而数据库的连接数只有10个,必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。

      • public class SemaphoreTest {
        
            private static final int THREAD_COUNT = 30;
            private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
            private static Semaphore s = new Semaphore(10);
        
            public static void main(String[] args) {
                for (int i = 0; i < THREAD_COUNT; i++) {
                    threadPool.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                //获取许可证
                                s.acquire();
                                System.out.println("save data");
                                //归还许可证
                                s.release();
                            }catch (InterruptedException e){
        
                            }
                        }
                    });
                }
                threadPool.shutdown();
            }
        }
        
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
        • 22
        • 23
        • 24
        • 25
        • 26
        • 27
    • Semaphore其他方法

      • intavailablePermits():返回此信号量中当前可用的许可证数量
      • intgetQueueLength():返回正在等待获取许可证的线程数
      • booleanhasQueuedThreads():是否有线程正在等待获取许可证
      • void reducePermits(int reduction):减少reduction个许可证,是个protected方法
      • Collection getQueuedThreads():返回所有等待获取许可证的线程集合,是个protected方法

    线程间交换数据的Exchanger

    • 用于线程间协作的工具类,用于进行线程间的数据交换

    • 提供一个同步点,在这个同步点两个线程可以交换彼此的数据,第一个线程执行exchange()方法,会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,两个线程可以交换数据。

    • 应用场景:

      • 用于遗传算法:交换两个人的数据,并使用交叉规则得出2个交配结果,

      • 用于校对工作:AB岗两人进行录入,录入后对两个人数据进行校对

      • public class ExchangerTest {
        
            private static final Exchanger<String> exgr = new Exchanger<>();
            private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
        
            public static void main(String[] args) {
                threadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            String A = "aaaa";
                            exgr.exchange(A);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
                threadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            String B = "bbb";
                            String A = exgr.exchange(B);
                            System.out.println("是否一致" +A.equals(B) + "A:" + A + "B:" + B);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
                threadPool.shutdown();
            }
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
        • 22
        • 23
        • 24
        • 25
        • 26
        • 27
        • 28
        • 29
        • 30
        • 31
        • 32
    • 如果两个线程有一个没有执行exchange方法,则会一直等待,如果担心有特殊情况发生,避免一直等待,可以使用exchange(V x, long timeout, TimeUnit unit)设置最大等待时长

  • 相关阅读:
    【微信小程序】全局样式文件app.wxss、页面的根元素page、 app.json中的window配置项
    项目管理标杆和先驱——华为管理体系(PMP),一文看懂!
    macOS Ventura13.0.1解决office缺少“宋体”等问题。安装微软雅黑、宋体等字体。
    详解--Hash(中文也称散列、哈希)
    JavaWeb搭建学生管理系统(手把手)
    最新外链系统强势来袭
    C语言if语句 输入一个字符,判断是字母、数字、特殊字符
    【Linux】进程控制 (万字)
    redis底层都有哪些数据结构?带你了解redis是如何存储数据的
    记录一个Cortex-M23的一个重要问题
  • 原文地址:https://blog.csdn.net/weixin_39795049/article/details/132724507