• 走进JUC的世界


    概念

    同步锁:synchronized、Lock区别

    1、synchronized是不需要进行手动解锁

    2、synchronized可以锁方法、锁同步代码块

    3、synchronized是Java自带关键字

    4、Lock锁是一个类且它拥有synchronized的所有功能还具备扩展

    5、Lock锁的实现类ReentrantLock可以实现公平和非公平锁

    6、Lock锁需要手动加锁和手动解锁

    7、synchronized不可中断而Lock锁可以实现中断

    • synchronized

      • 当修饰方法时:锁的是方法调用者(this)
      • 当使用static synchronized修饰方法时,锁的是Class对象(类名.class)
      • 也可以使用代码块方式来锁取Class对象(类名.class)
    • Lock : 主要使用到的实现类ReentrantLock(可重入锁)

      • ReentrantLock() -> 非公平锁(默认)(所谓非公平锁既是可以进行插队操作)
      • ReentrantLock(true) -> 公平锁(所谓公平锁就是需要排队,不可以进行插队操作)

    集合的线程不安全情况和解决方案

    List : ArrayList不安全List,但是在单线程情况下是高效的!

    多线程下错误案例

    List<Integer> list = new ArrayList<>();
    
    for (int i = 0; i < 30; i++) {
          final Integer temp = i;
            new Thread(()->{
                list.add(temp);
                System.out.println(list);
           }, String.valueOf(temp)).start();
      }
    
    //结果出现并发修改异常ConcurrentModificationException
    [0, 1, 2, 3, 5, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 19, 18, 20, 21, 23, 22, 24, 25, 26, 27, 29, 28]
    //Exception in thread "11" Exception in thread "15" Exception in thread "19" java.util.ConcurrentModificationException
    

    解决方案

    //1.使用集合安全类进行转换
    List<Integer> list = Collections.synchronizedList(new ArrayList<>());
            for (int i = 0; i < 30; i++) {
                final Integer temp = i;
                new Thread(()->{
                    list.add(temp);
                    System.out.println(list);
                }, String.valueOf(temp)).start();
            }
    //2.使用List对应的Vector
     List<Integer> list = new Vector<>();
            for (int i = 0; i < 30; i++) {
                final Integer temp = i;
                new Thread(()->{
                    list.add(temp);
                    System.out.println(list);
                }, String.valueOf(temp)).start();
            }
    //3.使用CopyOnWriteArrayList
    List<Integer> list = new CopyOnWriteArrayList<>();
            for (int i = 0; i < 30; i++) {
                final Integer temp = i;
                new Thread(()->{
                    list.add(temp);
                    System.out.println(list);
                }, String.valueOf(temp)).start();
            }
    

    Set集合:HashSet

    多线程下错误案例

    Set<Integer> set = new HashSet<>();
            for (int i = 0; i < 30; i++) {
                final Integer temp = i;
                new Thread(()->{
                    set.add(temp);
                    System.out.println(set);
                }, String.valueOf(temp)).start();
            }
    // 结果:抛出ConcurrentModificationException并发修改异常!
    

    解决方案:

    // 1.使用Collections.synchronizedSet安全集合包装
     Set<Integer> set = Collections.synchronizedSet(new HashSet<>());
    // 2.使用CopyOnWriteArraySet
    Set<Integer> set = new CopyOnWriteArraySet<>();
    

    map集合:HashMap

    Map<String, Object> map = new HashMap<>();
            for (int i = 0; i < 30; i++) {
                final Integer temp = i;
                new Thread(()->{
                   map.put(String.valueOf(temp), temp);
                    System.out.println(map);
                }, String.valueOf(temp)).start();
            }
    //结果:Exception in thread "6" java.util.ConcurrentModificationException并发修改异常
    

    解决方案:

    // 1.ConcurrentHashMap
    Map<String, Object> map = new ConcurrentHashMap<>();
    for (int i = 0; i < 70; i++) {
        final Integer temp = i;
        new Thread(()->{
           map.put(String.valueOf(temp), temp);
            System.out.println(map);
        }, String.valueOf(temp)).start();
    }
    // 2.Hashtable(效率低)
    
    

    常用线程辅助类

    CountDownLatch(减法计数器)

    CountDownLatch countDownLatch = new CountDownLatch(10); // 传入一个数字,要执行多少次
    countDownLatch.countDown();  // 每次执行完一个任务后,进行减1操作
    
    countDownLatch.await();  // 等待计数器归零,只有等上面执行次数完毕后,才能执行后面的操作
    

    CyclicBarrier(加法计数器)

    CyclicBarrier cyclicBarrier = new CyclicBarrier(10);  // 初始化计数器容量,默认构造Runnable为null
    
    
    // 当计数器到达10的时候,就执行Runnable里面的具体操作
    CyclicBarrier cyclicBarrier = new CyclicBarrier(10, new Runnable() {
                @Override
                public void run() {
                    System.out.println("executor other thing!");
                }
            });
    
    cyclicBarrier.await();  // 等待计数器到达初始化计数器值,然后才能执行下面操作!
    
    

    栗子:

    //初始化cyclicBarrier加法计数器
    CyclicBarrier cyclicBarrier = new CyclicBarrier(10, new Runnable() {
         @Override
          public void run() {
                System.out.println("执行到第十个啦,完结!");
            }
      });
    
            for (int i = 1; i <= 10; i++) {
                int u = i;
                new Thread(()->{
                    try {
                        System.out.println("执行到第" + u +"个了, 还剩" + (10 - u) + "个");
                        //每执行完一个线程就进行加一操作!当执行完第十个就触发cyclicBarrier初始化中的Runnable接口实现
                        cyclicBarrier.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }).start();
            }
    

    Semaphore(信号量)

    Semaphore : 一般用于限流情况

    semaphore.acquire():获得线程使用权限

    semaphore.release():释放线程使用权限

            Semaphore semaphore = new Semaphore(2);
    
            for (int i = 1; i <= 4; i++) {
                new Thread(() -> {
                    try {
                        // 得到线程执行权限,当线程数到达了信号量初始化容量,其他线程就会等待(阻塞)当前线程执行完毕并释放执行权限才可继续执行!
                        semaphore.acquire(); 
                        System.out.println("当前线程:" + Thread.currentThread().getName() + "开始执行...");
                        TimeUnit.SECONDS.sleep(2);
                        System.out.println("当前线程:" + Thread.currentThread().getName() + "执行完毕");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        semaphore.release(); //释放线程执行权限
                    }
    
                }, String.valueOf(i)).start();
            }
        }
    
    // 结果
    当前线程:1开始执行...
    当前线程:2开始执行...
    当前线程:1执行完毕
    当前线程:2执行完毕
    // 到达信号量最大容量,其他线程就进行等待(阻塞)
    当前线程:3开始执行...
    当前线程:4开始执行...
    当前线程:3执行完毕
    当前线程:4执行完毕
    

    读写锁

    ReadWriteLock

    主要使用到:ReentrantReadWriteLock(实现类)

    概念

    • 读写锁共存

      • 读 -> 读 可以共存
      • 读 -> 写 不能共存(不能边修改边读取,就会出现读取的数据不正确情况)
      • 写 -> 写 不能共存(可能出现一个线程正在修改原来的值,另一个线程也在修改原来的值,出现两个线程修改后,最后读取的数据不是自己修改的数据)
    • 独占/共享锁

      • 独占锁:也就是写锁,同一时刻只能有一个线程可以对数据进行写的操作
      • 共享锁:也就是读锁,同一时刻可以出现多个线程对数据进行读取的操作,且读取的数据都是同一份数据
    //开启两个读写线程,分别进行写和读操作        
    for (int i = 0; i < 5; i++) {
                final Integer temp = i;
                new Thread(()->{
                    mapDemo.put(String.valueOf(temp), temp + 10000);
                }, "线程->" + String.valueOf(temp)).start();
            }
    
            for (int i = 5; i < 10; i++) {
                final Integer temp = i;
                new Thread(()->{
                    mapDemo.get(String.valueOf(temp));
                }, "线程->" + String.valueOf(temp)).start();
           }
    
    // 初始化读写锁
      private volatile Map<String, Object> map = new ConcurrentHashMap<>();
        private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    
    
        public void put(String key, Object value) {
            //写入加锁
            readWriteLock.writeLock().lock();
            try {
                System.out.println(Thread.currentThread().getName() + "开始写入.....");
                map.put(key, value);
                System.out.println(Thread.currentThread().getName() + "写入完毕.....");
            }finally {
                //写完释放锁
                readWriteLock.writeLock().unlock();
            }
        }
    
        public Object get(String key) {
            //读取加锁
            readWriteLock.readLock().lock();
    
            Object object = null;
            try {
                System.out.println(Thread.currentThread().getName() + "开始读取----------->");
                object = map.get(key);
                System.out.println(Thread.currentThread().getName() + "读取完成----------->");
            }finally {
    		//读取解锁
                readWriteLock.readLock().unlock();
            }
            return object;
        }
    
    // 运行结果:发现写入的时候,总是只有一个线程可以在同一时间进行写入,而读取可以多个线程同时读取
    线程->1开始写入.....
    线程->1写入完毕.....
    线程->0开始写入.....
    线程->0写入完毕.....
    线程->3开始写入.....
    线程->3写入完毕.....
    线程->2开始写入.....
    线程->2写入完毕.....
    线程->4开始写入.....
    线程->4写入完毕.....
    线程->5开始读取----------->
    线程->5读取完成----------->
    线程->7开始读取----------->
    线程->8开始读取----------->
    线程->8读取完成----------->
    线程->6开始读取----------->
    线程->9开始读取----------->
    线程->9读取完成----------->
    线程->7读取完成----------->
    线程->6读取完成----------->
    

    阻塞队列

    • ArrayBlockingQueue
      • add()与offer()区别:add在超出容量时会抛出异常,而offer则不会抛出异常,而是拒绝添加到队列中!
      • 移除区别(remove()与poll()区别):当队列中无元素时,remove会抛出异常,而poll则是返回null
      • 查看队首(element()与peek()区别):当队列为空时,element会抛出异常,而peek
      ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(4);   
    
            arrayBlockingQueue.add("A");
            arrayBlockingQueue.add("B");
            arrayBlockingQueue.add("C");
            arrayBlockingQueue.add("D");
    //        arrayBlockingQueue.add("E");
            System.out.println(arrayBlockingQueue);
    //结果:
    [A, B, C, D]
    
    /**
    注意:
    1. 当元素超过队列的容量时,就会抛出异常java.lang.IllegalStateException: Queue full
    2. 当添加null时,抛出空指针异常 java.lang.NullPointerException
    3.使用offer代替add使用
    */
    //1.错误案例:容量为4,但是添加了五个元素
            ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(4);
    
            arrayBlockingQueue.add("A");
            arrayBlockingQueue.add("B");
            arrayBlockingQueue.add("C");
            arrayBlockingQueue.add("D");
            arrayBlockingQueue.add("E");
            System.out.println(arrayBlockingQueue);
    //结果:
    Exception in thread "main" java.lang.IllegalStateException: Queue full
    
    //2.错误案例:添加null数据,抛出空指针异常
         ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(4);
    
            arrayBlockingQueue.add("A");
            arrayBlockingQueue.add("B");
            arrayBlockingQueue.add("C");
            arrayBlockingQueue.add(null);
            System.out.println(arrayBlockingQueue);
    //结果:
    Exception in thread "main" java.lang.NullPointerException
        
    //3.使用offer代替add添加元素
    ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(4);
            arrayBlockingQueue.offer("A");
            arrayBlockingQueue.offer("B");
            arrayBlockingQueue.offer("C");
            arrayBlockingQueue.offer("D");
            arrayBlockingQueue.offer("E");
            System.out.println(arrayBlockingQueue);
    
    //add与offer区别:add在超出容量时会抛出异常,而offer则不会抛出异常,而是拒绝添加到队列中!
    //结果:
    [A, B, C, D]
    
    
    • ArrayBlockingQueue 延迟等待

      //延迟添加等待----------------> offer()
      ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(4);
      
      System.out.println(arrayBlockingQueue.offer("A"));
      System.out.println(arrayBlockingQueue.offer("B"));
      System.out.println(arrayBlockingQueue.offer("C"));
      System.out.println(arrayBlockingQueue.offer("D"));
      //延迟12秒添加,如果队列已满就返回false(表示添加失败)
      System.out.println(arrayBlockingQueue.offer("E", 12, TimeUnit.SECONDS));
      
      System.out.println(arrayBlockingQueue);
      //结果:
      true
      true
      true
      true
      false
      [A, B, C, D]
          
      // 延迟取出等待-------------> poll()
              ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(4);
      
              arrayBlockingQueue.offer("A");
              arrayBlockingQueue.offer("B");
              arrayBlockingQueue.offer("C");
              arrayBlockingQueue.offer("D");
              arrayBlockingQueue.offer("E", 2, TimeUnit.SECONDS);
      
              System.out.println(arrayBlockingQueue.poll());
              System.out.println(arrayBlockingQueue.poll());
              System.out.println(arrayBlockingQueue.poll());
              System.out.println(arrayBlockingQueue.poll());
              System.out.println(arrayBlockingQueue.poll(2, TimeUnit.SECONDS));
      // 结果:
      A
      B
      C
      D
      //延迟等待2秒钟再弹出,如果队列为空,就返回null
      null
      
    • 同步队列(SynchronousQueue)

      • 特性:只能存储一个对象/值,当存入之后必须等待取出之后才能进行再次存入

    栗子:

    new Thread(()->{
        try {
            synchronousQueue.put(1);
            System.out.println(Thread.currentThread().getName() + ":put  " + 1);
            synchronousQueue.put(2);
            System.out.println(Thread.currentThread().getName() + ":put  " + 2);
            synchronousQueue.put(3);
            System.out.println(Thread.currentThread().getName() + ":put  " + 3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } }
            , "put线程:").start();
    
    new Thread(()->{
        try {
            System.out.println(Thread.currentThread().getName() + ":take -> " + synchronousQueue.take());
            System.out.println(Thread.currentThread().getName() + ":take -> " + synchronousQueue.take());
            System.out.println(Thread.currentThread().getName() + ":take -> " + synchronousQueue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } }
            , "take线程:").start();
    
    //结果:
    put线程::put  1
    take线程::take -> 1
    put线程::put  2
    take线程::take -> 2
    put线程::put  3
    take线程::take -> 3
    

    线程池

    • 线程复用(节约了系统资源)

    • 控制最大并发数(当达到线程池容量,就需要等待其他线程完成,才能继续进入)

    • 管理线程

    • Executors线程池

    ExecutorService executorService = Executors.newSingleThreadExecutor(); // 单个线程的池子
    ExecutorService executorService = Executors.newFixedThreadPool(10); //开启十个固定线程的池子
    ExecutorService executorService = Executors.newCachedThreadPool(); //可伸缩线程池, 如果线程池中线程已全被使用就创建新的线程池
    

    newSingleThreadExecutor

    ExecutorService executorService = Executors.newSingleThreadExecutor(); // 单个线程的池子
    try {
        for (int i1 = 0; i1 < 10; i1++) {
            //执行线程
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " 执行了线程..");
                }
            });
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        //关闭线程池
        executorService.shutdown();
    }
    //结果:只有一个线程在重复利用执行
    pool-1-thread-1 执行了线程..
    pool-1-thread-1 执行了线程..
    pool-1-thread-1 执行了线程..
    pool-1-thread-1 执行了线程..
    pool-1-thread-1 执行了线程..
    pool-1-thread-1 执行了线程..
    pool-1-thread-1 执行了线程..
    pool-1-thread-1 执行了线程..
    pool-1-thread-1 执行了线程..
    pool-1-thread-1 执行了线程..
    

    newFixedThreadPool

    ExecutorService executorService = Executors.newFixedThreadPool(5); //开启十个固定线程的池子
    try {
        for (int i = 0; i < 10; i++) {
            //执行线程
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " 执行了线程..");
                }
            });
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        //关闭线程池
        executorService.shutdown();
    }
    //结果:五个不同的线程重复使用
    pool-1-thread-4 执行了线程..
    pool-1-thread-1 执行了线程..
    pool-1-thread-4 执行了线程..
    pool-1-thread-3 执行了线程..
    pool-1-thread-2 执行了线程..
    pool-1-thread-5 执行了线程..
    pool-1-thread-2 执行了线程..
    pool-1-thread-3 执行了线程..
    pool-1-thread-4 执行了线程..
    pool-1-thread-1 执行了线程..
    

    newCachedThreadPool

    ExecutorService executorService = Executors.newCachedThreadPool(); //可伸缩线程池, 如果线程池中线程已全被使用就创建新的线程池
    
    try {
        for (int i = 0; i < 10; i++) {
            //开启线程
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " 执行了线程..");
                }
            });
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        //关闭线程池
        executorService.shutdown();
    }
    //结果:开启新线程,当已开启的线程执行完毕,放入池子中又可以进行使用,如果开启的线程都还在执行中,就创建新的线程
    pool-1-thread-1 执行了线程..
    pool-1-thread-6 执行了线程..
    pool-1-thread-5 执行了线程..
    pool-1-thread-3 执行了线程..
    pool-1-thread-4 执行了线程..
    pool-1-thread-2 执行了线程..
    pool-1-thread-8 执行了线程..
    pool-1-thread-7 执行了线程..
    pool-1-thread-9 执行了线程..
    pool-1-thread-10 执行了线程..
    

    线程池参数

    • 7大参数
    public ThreadPoolExecutor(int corePoolSize,   //核心线程数
                              int maximumPoolSize, //最大线程数
                              long keepAliveTime,  //线程存活时间
                              TimeUnit unit,      //线程时间单元
                              BlockingQueue<Runnable> workQueue) {  //阻塞队列
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory()   //默认线程工厂
             , defaultHandler);   //线程拒绝策略(当达到了最大线程数时,采用线程拒绝策略)
    }
    

    Spring自带的任务执行器线程池

    @Bean("scheduledTaskExecutor")
    public ThreadPoolTaskExecutor scheduledTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();  //创建任务执行器线程池
        executor.setCorePoolSize(3);//设置核心线程数
        executor.setMaxPoolSize(5); //设置最大线程数
        executor.setQueueCapacity(1024*100);  //设置一个队列容量
        executor.setThreadNamePrefix("parking-index-task"); //线程名称
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝策略
        executor.initialize(); //初始化线程池
        return executor;
    }
    

    线程四大拒绝策略应用场景

    AbortPolicy: 当队列中线程已满,就抛出异常
    DiscardPolicy:当队列满了,就丢弃任务,不会抛出异常
    CallerRunsPolicy: 队列已满时,就使用调用者的线程去执行,当处理器关闭就丢弃此线程需求
    DiscardOldestPolicy:当队列满了,去尝试和较早的线程竞争,当最早的线程即将执行完成就把当前任务使用即将完成的线程执行
    
    源码解释:
    AbortPolicy:
    public AbortPolicy() { }
    
            /**
             * Always throws RejectedExecutionException.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             * @throws RejectedExecutionException always
             解释:总是把RejectedExecutionException。Params: r—请求执行的可运行任务e—尝试执行该任务的执行器抛出:RejectedExecutionException—always
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException("Task " + r.toString() +
                                                     " rejected from " +
                                                     e.toString());
            }
    
    DiscardPolicy:
        public DiscardPolicy() { }
    
            /**
             * Does nothing, which has the effect of discarding task r.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             解释:什么都不做,这有丢弃任务r的效果。参数:r -请求被执行的可运行任务e -试图执行该任务的执行程序
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            }
    CallerRunsPolicy:
        public CallerRunsPolicy() { }
    
            /**
             * Executes task r in the caller's thread, unless the executor
             * has been shut down, in which case the task is discarded.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             解释:在调用者的线程中执行任务r,除非执行器已经关闭,在这种情况下,任务将被丢弃。参数:r—请求执行的可运行任务e—尝试执行该任务的执行程序
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    r.run();
                }
    DiscardOldestPolicy:
        public DiscardOldestPolicy() { }
    
            /**
             * Obtains and ignores the next task that the executor
             * would otherwise execute, if one is immediately available,
             * and then retries execution of task r, unless the executor
             * is shut down, in which case task r is instead discarded.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             解释:获取并忽略执行器将执行的下一个任务(如果有一个任务立即可用),然后重试执行任务r,除非执行器被关闭,在这种情况下,任务r将被丢弃。参数:r—请求执行的可运行任务e—尝试执行该任务的执行程序
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    e.getQueue().poll();
                    e.execute(r);
                }
    
    

    Cpu密集型

    int availableProcessors = Runtime.getRuntime().availableProcessors();  //获取Cpu核数,适合设置核心线程池的大小
    

    IO密集型

    int availableProcessors = Runtime.getRuntime().availableProcessors();   //
    int maximumPoolSize = availableProcessors * 2;   // Io密集型一般设置为Cpu核数的两倍,防止
    

    ForkJoin

    • 任务拆分
    public class DoMain extends RecursiveTask<Long> {
    
        private Long start;
        private Long end;
        private final Long threshold = 10_0000_0000L;
    
        public DoMain(Long start, Long end) {
            this.start = start;
            this.end = end;
        }
    
        /**
        	递归分解大数据,每次进行两段两段操作
        */
        @Override
        protected Long compute() {
            Long res = 0L;
            if ((end - start) > threshold) {
                Long middle = (end + start) / 2;
                //分两次进行计算
                ForkJoinTask<Long> fork1 = new DoMain(start, middle).fork();
                Long res1 = fork1.join();
                ForkJoinTask<Long> fork2 = new DoMain(middle, end).fork();
                Long res2 = fork2.join();
                res = res1 + res2;
            } else {
                for (Long i = start; i < end; i++) {
                    res += i;
                }
            }
    
            return res;
        }
    }
    
      //这样创建线程不规范,这里只是简易操作!
            new Thread(() -> {
                long l = System.currentTimeMillis();
                DoMain doMain = new DoMain(0L, 500_0000_0000L);
                ForkJoinTask<Long> submit = new ForkJoinPool().submit(doMain);
                try {
                    System.out.println("forkJoin输出结果:" + submit.get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
                System.out.println("forkJoin所用时间: " + (System.currentTimeMillis() - l));
            }).start();
    
            //这样创建线程不规范,这里只是简易操作!
            new Thread(() -> {
                long start = System.currentTimeMillis();
                Long res = 0L;
                for (Long i = 0L; i < 500_0000_0000L; i++) {
                    res += i;
                }
                System.out.println("普通循环输出结果:" + res);
                System.out.println("普通所用时间" + (System.currentTimeMillis() - start));
            }).start();
    

    计算对比

    stream环输出结果: 124999999750000000
    stream所用时间2304
    普通循环输出结果:124999999750000000
    普通所用时间14116
    forkJoin输出结果:124999999750000000
    forkJoin所用时间: 14468

    stream流计算

            new Thread(() -> {
                long start = System.currentTimeMillis();
                long longStream = LongStream.range(0L, 5_0000_0000L).parallel().reduce(0L, Long::sum);
                System.out.println("stream环输出结果: " + longStream);
                System.out.println("stream所用时间" + (System.currentTimeMillis() - start));
            }).start();
    

    volatile

    • 保证了可见性
    • 不保证原子性(也就是多线程情况下,无法保证同一个值被多个线程修改)
    • 保证了禁止指令重排(当程序启动时,它可能并不是按照我们代码的顺序执行,比如初始化,可能就不是按照我们写的代码步骤来的,这就是指令重排,保证指令不重排就可以使用volatile关键字进行声明)
    /**
     * 1.使用volatile禁止指令重排
     * 2. 使用AtomicInteger原子类保证是原子操作
     */
    public static volatile AtomicInteger num = new AtomicInteger();
    
    public static void main(String[] args) {
    
        for (int i = 0; i < 20; i++) {
            new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    //进行加一操作
                    num.getAndIncrement();
                }
            }).start();
        }
    
        //当线程数大于2时,暂停main线程,让给其他线程执行
        while (Thread.activeCount() > 2) {
            Thread.yield();
        }
        System.out.println(num);
    

    原子类操作源码

    public final int getAndIncrement() {
        return unsafe.getAndAddInt(this, valueOffset, 1);
    }
    
    	//原子类底层代码,使用了CAS(比较替换算法,也就是自旋锁)
    	// compareAndSwapInt底层是调用c++操作内存,对应的是native关键字
        public final int getAndAddInt(Object var1, long var2, int var4) {
            int var5;
            do {
                var5 = this.getIntVolatile(var1, var2);
            } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
    
            return var5;
        }
    

    CAS简单实现

    栗子:

    @SneakyThrows
    public static void main(String[] args) {
    
        CasLock casLock = new CasLock();
    
        new Thread(()->{
            try {
                casLock.lock();
            }catch (Exception e) {
                e.printStackTrace();
            }finally {
                casLock.unLock();
            }
    
        }, "Thread1").start();
    
        TimeUnit.SECONDS.sleep(2);
    
        new Thread(()->{
            try {
                casLock.lock();
            }catch (Exception e) {
                e.printStackTrace();
            }finally {
                casLock.unLock();
            }
    
        }, "Thread2").start();
    
    }
    
    
    public static class CasLock {
        AtomicReference<Thread> lock = new AtomicReference<>();
    
        public void lock() {
            Thread thread = Thread.currentThread();
            if (lock.get() == null) { //拿到泛型中Thread的值进行比较
                System.out.println(thread.getName() + "----> 开始自旋...");
            }
    
            while (lock.compareAndSet(null, thread)) {
    
            }
        }
    
        public void unLock() {
            Thread thread = Thread.currentThread();
            System.out.println(thread.getName() + "----> 解锁成功!");
            //解锁
            lock.compareAndSet(thread, null);
        }
    
    }
    

    __EOF__

  • 本文作者: 阿辉ya
  • 本文链接: https://www.cnblogs.com/curryAhui/p/16099200.html
  • 关于博主: 评论和私信会在第一时间回复。或者直接私信我。
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
  • 声援博主: 如果您觉得文章对您有帮助,可以点击文章右下角推荐一下。
  • 相关阅读:
    linux日志审计常用命令
    crmeb知识付费2.1免授权版本,包含PC端,可包更新
    Cisco简单配置(十六)—EIGRP
    字节迎来新 CFO,或重启上市;马斯克以 440 亿美元收购 Twitter;FFmpeg 支持 JPEG-XL|极客头条
    matplotlib从起点出发(8)_Tutorial_8_Legend
    Redis
    vue3面试题
    2016-2023全国MEM国家A类线趋势图:浙大MEM要高多少?
    数组拆分-力扣561-C++
    PostgreSQL教程(三十一):服务器管理(十三)之监控数据库活动
  • 原文地址:https://www.cnblogs.com/curryAhui/p/16099200.html