• 异步模式之工作线程


    工作线程模式

    基本定义

    例如,海底捞的服务员(线程),轮流处理每位客人的点餐(任务),如果为每位客人都配一名专属的服务员,那么成本就太高了(对比另一种多线程设计模式:Thread-Per-Message)

    • 注意,不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率

    例如,如果一个餐馆的工人既要招呼客人(任务类型A),又要到后厨做菜(任务类型B)显然效率不咋地,分成服务员(线程池A)与厨师(线程池B)更为合理

    饥饿

    固定大小线程池会有饥饿现象 :

    • 两个工人是同一个线程池中的两个线程

    • 他们要做的事情是:为客人点餐和到后厨做菜,这是两个阶段的工作 :

      • 客人点餐:必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待
      • 后厨做菜:直接做
    • 比如工人A 处理了点餐任务,接下来它要等着 工人B 把菜做好,然后上菜,他俩也配合的蛮好

    • 但现在同时来了两个客人,这个时候工人A 和工人B 都去处理点餐了,这时没人做饭了,饥饿

    饥饿的解决办法

    饥饿现象的演示
    package cn.knightzz.pattern;
    
    import lombok.extern.slf4j.Slf4j;
    import org.junit.Test;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    
    @SuppressWarnings("all")
    @Slf4j(topic = "c.DeadLock")
    public class DeadLockTest {
    
        List<String> MENU = Arrays.asList("地三鲜", "辣子鸡", "宫保鸡丁", "红烧肉");
        Random RANDOM = new Random();
    
        String cooking(){
            return MENU.get(RANDOM.nextInt(MENU.size()));
        }
    
        @Test
        public void deadLock01() throws IOException {
    
            ExecutorService executorService = Executors.newFixedThreadPool(2);
    
            executorService.execute(() ->{
                // 处理点餐
                log.debug("处理点餐!");
                // 新创建线程做菜
                Future<String> future = executorService.submit(() -> {
                    log.debug("做菜..");
                    return cooking();
                });
                try {
                    log.debug("上菜 : {} ", future.get());
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            });
            executorService.execute(() ->{
                // 处理点餐
                log.debug("处理点餐!");
                // 新创建线程做菜
                Future<String> future = executorService.submit(() -> {
                    log.debug("做菜..");
                    return cooking();
                });
                try {
                    log.debug("上菜 : {} ", future.get());
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            });
            System.in.read();
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    上面的代码可以看到, 服务员先处理点餐, 然后新创建线程处理做菜, 当只有一位客人的时候可以可以正常处理的, 但是如果有两位客人, 但是线程池大小只有2, 就会饥饿线程, 导致死锁image-20220902095503274

    解决办法

    增加线程池的核心线程数也是一种办法, 但是我们不能一直使用这种方法, 这样会减少CPU的利用率, 因为当客人少的时候, 核心线程还是这么多

    解决办法也很简单, 不同的任务使用不同的线程池去处理 :

    • 点餐线程使用点餐线程池去处理
    • 做饭线程使用做饭线程池去处理
     ExecutorService waiterPool = Executors.newFixedThreadPool(2);
            ExecutorService cookiePool = Executors.newFixedThreadPool(2);
    
            waiterPool.execute(() ->{
                // 处理点餐
                log.debug("处理点餐!");
                // 新创建线程做菜
                Future<String> future = cookiePool.submit(() -> {
                    log.debug("做菜..");
                    return cooking();
                });
                try {
                    log.debug("上菜 : {} ", future.get());
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            });
            waiterPool.execute(() ->{
                // 处理点餐
                log.debug("处理点餐!");
                // 新创建线程做菜
                Future<String> future = cookiePool.submit(() -> {
                    log.debug("做菜..");
                    return cooking();
                });
                try {
                    log.debug("上菜 : {} ", future.get());
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            });
            System.in.read();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    线程池大小设置

    创建多少线程池合适 ?

    • 过小会导致程序不能充分地利用系统资源、容易导致饥饿

    • 过大会导致更多的线程上下文切换,占用更多内存

    通常根据不同类型 : CPU密集型运算 和 IO密集型运算

    CPU 密集型运算

    通常采用 cpu 核数 + 1 能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因

    导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费

    IO密集型运算

    CPU 不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用 CPU 资源,但当你执行 I/O 操作时、远程

    RPC 调用时,包括进行数据库操作时,这时候 CPU 就闲下来了,你可以利用多线程提高它的利用率。

    经验公式如下 :

    • 线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间

    例如 4 核 CPU 计算时间是 50% ,其它等待时间是 50%,期望 cpu 被 100% 利用,套用公式 :

    • 4 * 100% * 100% / 50% = 8

    例如 4 核 CPU 计算时间是 10% ,其它等待时间是 90%,期望 cpu 被 100% 利用,套用公式 :

    • 4 * 100% * 100% / 10% = 40
  • 相关阅读:
    【微信小程序】利用MPFlutter开发微信小程序
    GBASE 8A v953报错集锦44--sftp 加载大文件报错和tmp 目录权限改变导致加载失败
    sqlserver查询多个分类的最新时间数据
    Latex安装与使用
    2E服务-WriteDataByIdentifier
    Centos7虚拟机硬盘扩容 + 修改Docker默认存储位置
    当你离开现在的公司,你的百万业务量还能保持吗?
    Vue+Electron打包桌面应用(从零到一完整教程)
    拖延症:关于如何停止拖延的科学指南
    【RabbitMQ实战】01 3分钟在Linux上安装RabbitMQ
  • 原文地址:https://blog.csdn.net/weixin_40040107/article/details/126680470