• 【周阳-JUC入门】【02】Callable_ForkJoin_异步回调



    持续学习&持续更新中…

    学习态度:守破离


    8锁问题

    class Phone {
    /*    public static synchronized void sendEmail() {
            try {
                TimeUnit.SECONDS.sleep(4); // 暂停4秒
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "----sendEmail");
        }
    
        public synchronized void sendSMS() {
            System.out.println(Thread.currentThread().getName() + "----sendSMS");
        }*/
    /*    public static synchronized void sendEmail() {
            try {
                TimeUnit.SECONDS.sleep(4); // 暂停4秒
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "----sendEmail");
        }
    
        public static synchronized void sendSMS() {
            System.out.println(Thread.currentThread().getName() + "----sendSMS");
        }*/
        public synchronized void sendEmail() {
            try {
                TimeUnit.SECONDS.sleep(4); // 暂停4秒
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "----sendEmail");
        }
    
        public synchronized void sendSMS() {
            System.out.println(Thread.currentThread().getName() + "----sendSMS");
        }
    
        public void hello() {
            System.out.println(Thread.currentThread().getName() + "----hello");
        }
    }
    
    /* 多线程8锁问题:
        1 标准访问,先打印短信还是邮件?
        2 邮件方法暂停4秒,先打印短信还是邮件?
        3 新增一个普通(无synchronized)的hello方法,是先打邮件还是hello?
        4 现在有两部手机,一个调用短信方法,一个调用邮件方法,先打印短信还是邮件?
        5 两个静态同步方法,同一部手机,先打印短信还是邮件?
        6 两个静态同步方法,两部手机,一个调用短信方法,一个调用邮件方法,先打印短信还是邮件?
        7 1个静态同步方法,1个普通同步方法,1部手机,先打印短信还是邮件?
        8 1个静态同步方法,1个普通同步方法,2部手机,先打印短信还是邮件?
     */
    public class Lock8 {
        public static void main(String[] args) throws Exception {
            Phone phone = new Phone();
            Phone phone2 = new Phone();
    //        new Thread(() -> phone.sendEmail(), "A").start();
    //        Thread.sleep(200); // 确保sendEmail方法先被执行
    //        new Thread(() -> phone.sendSMS(), "B").start();
    
    //        new Thread(() -> phone.hello(), "C").start();
    //        new Thread(() -> phone2.sendSMS(), "B").start();
    //        new Thread(() -> phone.sendEmail(), "A").start();
    //        Thread.sleep(200); // 确保sendEmail方法先被执行
    //        new Thread(() -> phone2.sendSMS(), "B").start();
        }
    }
    
    /*
    一个对象里面如果有多个synchronized方法,
    某一个时刻内,只要一个线程去调用其中的一个synchronized方法了,其它的线程都只能等待,
    换句话说,某一个时刻内,只能有唯一一个线程去访问这些synchronized方法
    一个对象里面的多个synchronized实例方法锁的都是当前对象this,
    被锁定后,其它的线程都不能进入到当前对象的其它的synchronized方法
    
    当一个线程试图访问同步代码块时,它首先必须得到锁,退出或抛出异常时必须释放锁。
    
    加个普通方法后和同步锁无关
    
    换成两个对象后,不是同一把锁了
    
    synchronized实现同步的基础:Java中的每一个对象都可以作为锁。
    具体表现为以下3种形式。
    对于普通同步方法,锁是当前实例对象。
    对于静态同步方法,锁是当前类的Class对象。
    对于同步方法块,锁是synchronized括号里配置的对象
    
    也就是说如果一个实例对象的非静态同步方法获取锁后,
    该实例对象的其他非静态同步方法必须等待获取锁的方法释放锁后才能获取锁,
    可是别的实例对象的非静态同步方法因为跟该实例对象的非静态同步方法用的是不同的锁,
    所以无须等待该实例对象已获取锁的非静态同步方法释放锁就可以获取他们自己的锁。
    
    所有的静态同步方法用的也是同一把锁——类对象本身,
    这两把锁是两个不同的对象(一个是this,一个是class),所以静态同步方法与非静态同步方法之间是不会竞争锁的。
    但是一旦一个静态同步方法获取锁后,其他的静态同步方法都必须等待该方法释放锁后才能获取锁
     */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97

    容器线程不安全

    List不安全

    /*写时复制:
    CopyOnWrite容器即写时复制的容器。往一个容器添加元素的时候,不直接往当前容器Object[]添加,
    而是先将当前容器Object[]进行Copy,复制出一个新的容器Object[] newElements,
    然后向新的容器Object[] newElements里添加元素。
    添加元素后,再将原容器的引用指向新的容器setArray(newElements)。
    这样做的好处是可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。
    所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。
    */
    
    public class NotSafeDemo {
        public static void main(String[] args) {
    //        List list = new ArrayList<>();
    
    //        解决方案
    //        List list = new Vector<>();
    //        List list = Collections.synchronizedList(new ArrayList<>());
            List<String> list = new CopyOnWriteArrayList<>();
            for (int i = 1; i <= 30; i++) {
                new Thread(() -> { // 让多个线程同时对容器进行读、写操作
                    list.add(UUID.randomUUID().toString());
                    System.out.println(list);
                }, String.valueOf(i)).start();
            }
        }
    
        public static void main1(String[] args) {
            List<String> list = new ArrayList<>();
            list.add("A");
            list.add("B");
            list.add("C");
            for (String s : list) {
                System.out.println(s);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    Set、Map不安全

    public class NotSafeDemo2 {
        public static void main(String[] args) {
    //        Set set = new HashSet<>();
    //        解决方案:
    //        Set set = Collections.synchronizedSet(new HashSet<>());
            Set<String> set = new CopyOnWriteArraySet<>();
            for (int i = 1; i <= 30; i++) {
                new Thread(() -> {
                    set.add(UUID.randomUUID().toString());
                    System.out.println(set);
                }, String.valueOf(i)).start();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    public class NotSafeDemo3 {
        public static void main(String[] args) {
    //        Map map = new HashMap<>();
    //        解决方案:
    //        Map map = Collections.synchronizedMap(new HashMap<>());
            Map<String, String> map = new ConcurrentHashMap<>();
            for (int i = 1; i <= 30; i++) {
                new Thread(() -> { // 让多个线程同时对容器进行读、写操作
                    map.put(Thread.currentThread().getName(), UUID.randomUUID().toString());
                    System.out.println(map);
                }, String.valueOf(i)).start();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    Callable

    class MyCallable implements Callable<String> {
        @Override
        public String call() throws InterruptedException {
            Thread.sleep(1000);
            System.out.println("异步任务执行中...");
            TimeUnit.SECONDS.sleep(1);
            System.out.println("异步任务执行完毕");
            return "执行成功!执行结果:" + Thread.currentThread().getName();
        }
    }
    
    /*
    FutureTask:未来的任务,可以看做是一个异步任务
    
    主线程中需要执行比较耗时的操作,但又不想阻塞主线程时,可以把这些任务交给FutureTask对象让其在后台完成
    
    当主线程将来需要时,就可以通过Future对象获得后台作业的计算结果或者执行状态。
    
    get方法可以执行多次,也即任务的执行结果可以获取多次,但,任务只会执行一次
    
    第一次调用get方法会阻塞等待FutureTask执行完任务,也就是说,仅在计算完成后才能检索结果,如果计算尚未完成,则阻塞 get 方法。
    
    因此一般将耗时任务丢给FutureTask即可,主线程(工作线程)之后想要使用执行结果时再去get即可
     */
    
    public class Three {
        public static void main(String[] args) {
            MyCallable myCallable = new MyCallable();
            FutureTask<String> futureTask = new FutureTask<>(myCallable);
            new Thread(futureTask).start(); // 一个异步任务只会执行一次
            new Thread(futureTask).start();
    
            // 这是两个异步任务
    //        new Thread(new FutureTask<>(myCallable)).start();
    //        new Thread(new FutureTask<>(myCallable)).start();
        }
    
        public static void main1(String[] args) throws Exception {
            FutureTask<String> futureTask = new FutureTask<>(new MyCallable()); // 创建一个异步任务
            new Thread(futureTask, "A").start(); // 让该异步任务去后台执行,不要妨碍主工作线程
    
            { // 主工作线程的工作
                System.out.println("main thread begin work 1.");
                TimeUnit.SECONDS.sleep(5);
                System.out.println("main thread end work 1.");
            }
    
            // 主工作线程需要用到异步任务的计算结果了
            System.out.println(futureTask.get()); // 想要使用异步任务的计算结果时就去调用它的get方法,get方法在执行完成之前会阻塞
    
            { // 主工作线程的工作
                System.out.println("main thread begin work 2.");
                System.out.println("main thread end work 2.");
            }
    
            System.out.println(futureTask.get()); // 异步任务的计算结果可以通过get方法获取多次
            System.out.println(futureTask.get());
            System.out.println(futureTask.get());
        }
    
        /*
        public static void main(String[] args) throws Exception {
            final ExecutorService executorService = Executors.newFixedThreadPool(10);
            final Future result = executorService.submit(new MyCallable());
            System.out.println(result.get());
            executorService.shutdown();
        }
         */
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69

    FutureTask的一个小应用:

    class A implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            TimeUnit.SECONDS.sleep(1);
            return 10;
        }
    }
    
    class B implements Callable<Integer> {
    
        @Override
        public Integer call() throws Exception {
            TimeUnit.SECONDS.sleep(2);
            return 20;
        }
    }
    
    class C implements Callable<Integer> {
    
        @Override
        public Integer call() throws Exception {
            TimeUnit.SECONDS.sleep(3);
            return 30;
        }
    }
    
    public class ApplyCallable {
    
        public static void main(String[] args) throws Exception {
            int i = 100;
            int result = 0;
            final FutureTask<Integer> a = new FutureTask<>(new A());
            final FutureTask<Integer> b = new FutureTask<>(new B());
            final FutureTask<Integer> c = new FutureTask<>(new C());
            new Thread(a).start();
            new Thread(b).start();
            new Thread(c).start();
            result = i + a.get() + b.get() + c.get();
            System.out.println(result);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    ExecutorService和FutureTask的使用:

        public static void main(String[] args) throws Exception {
            final ExecutorService executorService = Executors.newSingleThreadExecutor();
            final FutureTask<String> future = (FutureTask<String>) executorService.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    System.out.println("被调用");
                    return Thread.currentThread().getName() + "\t 哈哈哈哈";
                }
            });
    //        System.out.println(future.get());
    //        System.out.println(future.get());
    //        System.out.println(future.get());
    //        System.out.println(future.get());
    //        executorService.shutdown();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    三个辅助类

    CountDownLatch(做减法)

    import java.util.concurrent.CountDownLatch;
    
    /**** 做减法 ****/
    
    /*
        让6个线程先执行完,再执行第7个线程
        (7个人在教室上晚自习,需要让班长最后走,因为他得关窗锁门)
    
        ========= 类似于倒计时计数器 ========
     */
    
    /*
     * CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞。
     * 其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞),
     * 当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。
     */
    
    /**
     * @Description:
     *  *让一些线程阻塞直到另一些线程完成一系列操作后才被唤醒。
     *
     * CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞。
     * 其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞),
     * 当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。
     *
     * 解释:6个同学陆续离开教室后值班同学才可以关门。
     *       main主线程必须要等前面6个线程完成全部工作后,自己才能开干
     */
    public class CountDownLatchDemo {
        public static void main(String[] args) throws InterruptedException {
            int members = 6;
            CountDownLatch countDownLatch = new CountDownLatch(members);
    
            for (int i = 1; i <= members; i++) {
                new Thread(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t离开了教室");
    
                    countDownLatch.countDown();
    
                }, String.valueOf(i)).start();
            }
    
            countDownLatch.await();
            System.out.println("班长" + Thread.currentThread().getName() + "\t关窗锁门并离开了教室");
        }
        public static void main1(String[] args) {
            for (int i = 1; i <= 6; i++) { //6个上自习的同学,各自离开教室的时间不一致
                new Thread(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t离开了教室");
                }, String.valueOf(i)).start();
            }
            System.out.println("班长" + Thread.currentThread().getName() + "\t关窗锁门并离开了教室");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    CyclicBarrier(做加法)

    import java.util.concurrent.CyclicBarrier;
    
    /**** 做加法 ****/
    
    /*
     * CyclicBarrier
     * 的字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是,
     * 让一组线程到达一个屏障(也可以叫同步点)时被阻塞,
     * 直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。
     * 线程进入屏障通过CyclicBarrier的await()方法。
     */
    
    // 集齐七龙珠方可召唤神龙、人到齐再开会
    public class CyclicBarrierDemo {
        public static void main(String[] args) {
            int num = 7;
    
            //CyclicBarrier(int parties, Runnable barrierAction) 
            CyclicBarrier cyclicBarrier = new CyclicBarrier(num, () -> System.out.println(Thread.currentThread().getName() + "召唤神龙"));
    
            for (int i = 1; i <= num; i++) {
                final int finalI = i;
                new Thread(() -> {
                    try {
                        System.out.println(Thread.currentThread().getName() + "收集到了第" + finalI + "颗龙珠");
                        cyclicBarrier.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }, String.valueOf(i)).start();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    Semaphore(信号量)

    /*
        Semaphore:信号量、信号灯
     */
    
    /*
    在信号量上我们定义两种操作:
      acquire(获取) 当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1),
             要么一直等下去,直到有线程释放信号量,或超时。
      release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。
    信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。
     */
    
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    
    /*
        模拟:7个人要上厕所,但只有3个坑位 【多个线程,多个资源】
         
        思考一下,假设 Semaphore semaphore = new Semaphore(1); 呢?也就是只有1个坑位呢? 【多个线程,1个资源】
     */
    public class SemaphoreDemo {
        public static void main(String[] args) {
            int ren = 7; // 7个人,7个线程
            int kengwei = 3; // 3个坑位
            Semaphore semaphore = new Semaphore(kengwei);
    
            for (int i = 1; i <= ren; i++) {
                new Thread(() -> {
                    try {
                        semaphore.acquire();
    
                        System.out.println(Thread.currentThread().getName() + "正在用");
                        TimeUnit.SECONDS.sleep(3);
                        System.out.println(Thread.currentThread().getName() + "用完了");
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        semaphore.release();
                    }
                }, String.valueOf(i)).start();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    读写锁ReentrantReadWriteLock

    class MyCache {
        private volatile Map<String, Object> map = new HashMap<>();
        private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
        private final ReentrantReadWriteLock.WriteLock writeLock =
                (ReentrantReadWriteLock.WriteLock) rwLock.writeLock();
        private final ReentrantReadWriteLock.ReadLock readLock =
                (ReentrantReadWriteLock.ReadLock) rwLock.readLock();
    
        public void put(String key, Object value) {
    //        rwLock.writeLock().lock();
            writeLock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + "\t 正在写" + key);
                //暂停一会儿线程
                try {
                    TimeUnit.MILLISECONDS.sleep(300);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                map.put(key, value);
                System.out.println(Thread.currentThread().getName() + "\t 写完了" + key);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
    //            rwLock.writeLock().unlock();
                writeLock.unlock();
            }
    
        }
    
        public Object get(String key) {
    //        rwLock.readLock().lock();
            readLock.lock();
            Object result = null;
            try {
                System.out.println(Thread.currentThread().getName() + "\t 正在读" + key);
                try {
                    TimeUnit.MILLISECONDS.sleep(300);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                result = map.get(key);
                System.out.println(Thread.currentThread().getName() + "\t 读完了" + result);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
    //        rwLock.readLock().unlock();
                readLock.unlock();
            }
            return result;
        }
    }
    
    
    /*
    写时加锁
    读时不用加锁
     */
    
    /*
    多个线程同时读一个资源类没有任何问题,所以为了满足并发量,读取共享资源时,多个线程应该可以同时进行。
    但是
    如果有一个线程想去写共享资源类,就不应该再有其他线程可以对该资源进行读写操作
    
    小总结:
            读——读能共存
            读——写不能共存
            写——写不能共存
     */
    public class ReadWriteLockDemo {
    
        public static void main(String[] args) {
            MyCache myCache = new MyCache();
    
            for (int i = 1; i <= 5; i++) {
                final int num = i;
                new Thread(() -> {
                    myCache.put(num + "", num + "");
                }, String.valueOf(i)).start();
            }
    
            for (int i = 1; i <= 5; i++) {
                final int num = i;
                new Thread(() -> {
                    myCache.get(num + "");
                }, String.valueOf(i)).start();
            }
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90

    Java8之流式计算复习

    JUF:java.util.function

    在这里插入图片描述

    Java内置核心四大函数式接口

    在这里插入图片描述

    //R apply(T t);函数型接口,一个参数,一个返回值
    Function<String,Integer> function = t ->{return t.length();};
    System.out.println(function.apply("abcd"));
    
    //boolean test(T t);断定型接口,一个参数,返回boolean
    Predicate<String> predicate = t->{return t.startsWith("a");};
    System.out.println(predicate.test("a"));
    
    // void accept(T t);消费型接口,一个参数,没有返回值
    Consumer<String> consumer = t->{
        System.out.println(t);
    };
    consumer.accept("javaXXXX");
    
    //T get(); 供给型接口,无参数,有返回值
    Supplier<String> supplier =()->{return UUID.randomUUID().toString();};
    System.out.println(supplier.get());
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    Stream流

    是什么

    • 流(Stream) 是数据渠道,用于操作数据源(集合、数组等)所生成的元素序列。
    • 集合讲的是数据,流讲的是计算!

    啥特点

    • Stream 自己不会存储元素
    • Stream 不会改变源对象。相反,他们会返回一个持有结果的新Stream。
    • Stream 操作是延迟执行的。这意味着他们会等到需要结果的时候才执行。

    怎么用

    • 源头=>中间流水线=>结果

      • 创建一个Stream:一个数据源(数组、集合)
      • 中间操作:一个中间操作,处理数据源数据
      • 终止操作:一个终止操作,执行中间操作链,产生结果
      @NoArgsConstructor
      @AllArgsConstructor
      @Data
      @Accessors(chain = true) // 可以链式调用
      class User {
          private Integer id;
          private Integer age;
          private String name;
      }
      
      public class StreamDemo {
          /*
              找出:
                  偶数ID + age > 24 + 用户名转为大写 + 用户名倒着排序 + 只输出用户名 + 只输出一个用户名
           */
          public static void main(String[] args) {
      //        List list = new ArrayList<>();
      //        String[] strs = new String[10];
      //        // list ——> array
      //        final String[] strings1 = list.toArray(new String[list.size()]);
      //        // array ——> list
      //        final List list1 = Arrays.asList(strs);
      
              Arrays.asList(
                      new User(11, 23, "a"),
                      new User(12, 24, "b"),
                      new User(13, 22, "c"),
                      new User(14, 28, "d"),
                      new User(16, 26, "e")
              )
                      .stream()
      //                .filter(user -> user.getAge() > 24 && (user.getId() % 2 == 0))
                      .filter(user -> user.getAge() > 24)
                      .filter(user -> user.getId() % 2 == 0)
                      .map(user -> user.getName().toUpperCase())
                      .sorted((name1, name2) -> name2.compareTo(name1))
                      .limit(1)
                      .forEach(System.out::println);
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40

    分支合并ForkJoin

    原理

    • Fork:把一个复杂任务进行分拆,大事化小
    • Join:把分拆任务的结果进行合并在这里插入图片描述在这里插入图片描述

    相关类

    ForkJoinPool:分支合并池,类比 => 线程池

    在这里插入图片描述

    ForkJoinTask:ForkJoinTask,类比 => FutureTask

    在这里插入图片描述

    RecursiveTask:递归任务:继承后可以实现递归(自己调自己)调用的任务

    在这里插入图片描述

    实例

    // 从1加到100
    // 从0加到100
    
    class ExecuteSumTask extends RecursiveTask<Integer> {
        private static final int JUDGE_VALUE = 10;
        private final int begin;
        private final int end;
        private int result;
    
        public ExecuteSumTask(int begin, int end) {
            this.begin = begin;
            this.end = end;
        }
        
        /*
            @Override
            protected Integer compute() {
                if((end - begin)<=JUDGE_VALUE){
                   for(int i =begin;i <= end;i++){
                        result = result + i;
                   }
                }else{
                    int middle = (begin + end)/2;
                    ExecuteSumTask task01 = new ExecuteSumTask(begin,middle);
                    ExecuteSumTask task02 = new ExecuteSumTask(middle+1,end);
                    task01.fork();
                    task02.fork();
                    result =  task01.join() + task02.join();
                }
                return result;
            }
         */
    
        @Override
        protected Integer compute() {
            if ((end - begin) <= JUDGE_VALUE) {
                for (int i = begin; i <= end; i++) {
                    result += i;
                }
                return result;
            }
    
            int middle = (end + begin) / 2;
            final ForkJoinTask<Integer> task1 = new ExecuteSumTask(begin, middle).fork();
            final ForkJoinTask<Integer> task2 = new ExecuteSumTask(middle + 1, end).fork();
            return task1.join() + task2.join();
        }
    }
    
    /**
     * 分支合并例子
     * ForkJoinPool
     * ForkJoinTask
     * RecursiveTask
     */
    public class ForkJoinDemo {
        public static void main(String[] args) throws Exception {
            final ForkJoinPool pool = new ForkJoinPool();
    
            final ForkJoinTask<Integer> task = pool.submit(new ExecuteSumTask(1, 100));
    
            System.out.println(task.get());
    
            pool.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66

    异步回调

    CompletableFuture.runAsync

        public static void main1(String[] args) throws Exception {
            // 去执行一个异步任务,相当于启动一个线程,一般不使用
            CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "\tcompletableFuture1");
            });
            completableFuture1.get();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    CompletableFuture.supplyAsync

        public static void main2(String[] args) throws Exception {
            //异步回调
            CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "\tcompletableFuture2");
    //            int i = 10/0;
                return 1024;
            });
    
            final Integer result = completableFuture2
                    .whenComplete((t, u) -> { // 只要任务执行完就会回调
    
                        // whenCompleteAsync与whenComplete的区别在于这儿的线程环境
                        System.out.println(Thread.currentThread().getName() + "\twhenComplete");
    
                        // 任务执行成功,t就是任务成功执行的结果
                        // 任务执行失败,t就是null
                        System.out.println("-------t=" + t);
                        // 任务执行成功,u就是null
                        // 任务执行失败,u就是异常信息
                        System.out.println("-------u=" + u);
                    }).exceptionally(f -> { // 任务执行失败才回调
                        System.out.println("-------exception:" + f.getMessage());
                        return 4444;
                    }).get();
    
            System.out.println(result); // 不管任务是否执行成功都可以拿到一个返回值
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    whenComplete回调与whenCompleteAsync回调方法的对比:

        public static void main3(String[] args) throws Exception {
            //异步回调
            CompletableFuture<Integer> completableFuture3 = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "\tcompletableFuture3");
    //            int i = 10/0;
                return 1024;
            });
    
            final Integer result = completableFuture3
                    .whenCompleteAsync((t, u) -> { // 只要任务执行完就会回调
    
                        // whenCompleteAsync与whenComplete的区别在于这儿的线程环境
                        System.out.println(Thread.currentThread().getName() + "\twhenCompleteAsync");
    
                        // 任务执行成功,t就是任务成功执行的结果
                        // 任务执行失败,t就是null
                        System.out.println("-------t=" + t);
                        // 任务执行成功,u就是null
                        // 任务执行失败,u就是异常信息
                        System.out.println("-------u=" + u);
                    }).exceptionally(f -> { // 任务执行失败才回调
                        System.out.println("-------exception:" + f.getMessage());
                        return 4444;
                    }).get();
    
            System.out.println(result); // 不管任务是否执行成功都可以拿到一个返回值
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    再来看一个例子对比一下:

        public static void main(String[] args) throws Exception {
            CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "\completableFuture");
                return 1024;
            });
            completableFuture
                    .whenComplete((t, u) -> {
                        System.out.println(Thread.currentThread().getName() + "\twhenComplete");
                    })
                    .exceptionally(f -> {
                        return 4444;
                    }).get();
    
            Thread.sleep(2000);
    
            new Thread(() -> {
    
                CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                    System.out.println(Thread.currentThread().getName() + "\tfuture");
                    return 1024;
                });
                try {
                    future
                            .whenComplete((t, u) -> {
                                System.out.println(Thread.currentThread().getName() + "\twhenComplete");
                            })
                            .exceptionally(f -> {
                                return 4444;
                            }).get();
                } catch (Exception e) {
                    e.printStackTrace();
                }
    
            }, "线程a").start();
    
            Thread.sleep(2000);
    
            new Thread(() -> {
    
                CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                    System.out.println(Thread.currentThread().getName() + "\tfuture");
                    return 1024;
                });
                try {
                    future
                            .whenCompleteAsync((t, u) -> {
                                System.out.println(Thread.currentThread().getName() + "\twhenCompleteAsync");
                            })
                            .exceptionally(f -> {
                                return 4444;
                            }).get();
                } catch (Exception e) {
                    e.printStackTrace();
                }
    
            }, "线程b").start();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    CompletableFuture总结

    • runXxx 都是没有返回结果的;supplyXxxx都是可以获取返回结果的
    • 可以传入自定义的线程池,否则就是用默认的线程池
    • whenComplete :是执行当前任务的线程继续执行 whencomplete 的任务
    • whenCompleteAsync: 是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行
    • 方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)

    参考

    周阳: JUC入门.


    本文完,感谢您的关注支持!


  • 相关阅读:
    kafka怎么实现零拷贝(Zero-Copy)的?
    41、基于深度学习的自适应线性预测(matlab)
    IceRPC之服务器地址与TLS的安全性->快乐的RPC
    hadoop 3.x 伪集群安装
    深入了解RTMP推流技术:视频汇聚EasyCVR低延迟与高稳定性分析
    数据挖掘与机器学习:数据变换
    【面试经典150 | 哈希表】赎金信
    【C】atoi和offsetof的介绍和模拟实现
    SAP UI5 应用开发教程之一百零三 - 如何在 SAP UI5 应用中消费第三方库试读版
    [附源码]Python计算机毕业设计Django房屋租赁信息系统
  • 原文地址:https://blog.csdn.net/weixin_44018671/article/details/125468634