• InheritableThreadLocal 在线程池中进行父子线程间消息传递出现消息丢失的解析


    在日常研发过程中,我们经常面临着需要在线程内,线程间进行消息传递,比如在修改一些开源组件源码的过程中,需要将外部参数透传到内部,如果进行方法参数重载,则涉及到的改动量过大,这样,我们可以依赖ThreadLocal 来进行消息传递。

    ThreadLocal 是 存储在线程栈帧中的一块数据存储区域,其可以做到线程与线程之间的读写隔离。

    但是在我们的日常场景中,经常会出现 父线程 需要向子线程中传递消息,而 ThreadLocal  仅能在当前线程上进行数据缓存,因此 我们需要使用 InheritableThreadLocal  来实现 父子线程间的消息传递

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    // 定义消息<br>public class ThreadLocalMessage {
     
     
        private final InheritableThreadLocal<Msg> msg;
     
        private ThreadLocalMessage() {
            msg = new InheritableThreadLocal<>();
        }
     
        public Msg getMsg() {
            return this.msg.get();
        }
     
        public void setMsg(Msg msg) {
            this.msg.set(msg);
        }
     
        public void clear() {
            msg.remove();
        }
     
        private static final ThreadLocalMessage threadLocalMessage = new ThreadLocalMessage();
     
        public static ThreadLocalMessage getInstance() {
            return threadLocalMessage;
        }
     
        /**
         * 获取线程中的消息
         *
         * @return
         */
        public static Msg getOrCreateMsg() {
            Msg msg = ThreadLocalMessage.getInstance().getMsg();
            if (msg == null) {
                msg = new Msg();
            }
            return msg;
        }
     
        public static class Msg {
     
            /**
             * taskId
             */
            private String taskId;
     
            private Map<String, Object> others;
     
            private int retCode;
     
            public Msg() {
            }
     
            public String getTaskId() {
                return taskId;
            }
     
            public void setTaskId(String taskId) {
                this.taskId = taskId;
            }
     
            @Override
            public String toString() {
                return "Msg{" +
                        "taskId='" + taskId + '\'' +
                        ", others=" + others +
                        ", retCode=" + retCode +
                        '}';
            }
        }
     
    }

      

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    // 定义线程池<br>@EnableAsync
    @Configuration
    public class ExecutorConfig {
     
        private final Logger log = LoggerFactory.getLogger(getClass());
     
        @Value("${executor.corePool:2}")
        private Integer corePool;
        @Value("${executor.maxPool:10}")
        private Integer maxPool;
        @Value("${executor.queue:2}")
        private Integer queue;
     
     
        @Bean("cdl-executor")
        public Executor executor() {
            log.info("start async Executor");
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            //配置核心线程数
            executor.setCorePoolSize(corePool);
            //配置最大线程数
            executor.setMaxPoolSize(maxPool);
            //配置队列大小
            executor.setQueueCapacity(queue);
            //配置线程池中的线程的名称前缀
            executor.setThreadNamePrefix("async-executor-");
     
            // 设置拒绝策略
            executor.setRejectedExecutionHandler((r, e) -> {
                // .....
            });
     
            // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            //执行初始化
            executor.initialize();
            return executor;<br>        // 使用TTL 初始化 executor
            //return TtlExecutors.getTtlExecutor(executor);
        }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // 创建子线程进行消息传递并打印<br>public String test() throws Exception{
            for (int i = 0 ; i < 20; i++){
                ThreadLocalMessage.Msg msg = ThreadLocalMessage.getOrCreateMsg();
                msg.setTaskId("task_id_"+i);
                ThreadLocalMessage.getInstance().setMsg(msg);
                myService.testThread(i);
                ThreadLocalMessage.getInstance().clear();
            }
            return "ok";
        }

      

    经过代码测试,我们创建了一个池子大小为10 的线程,并发启动了20个线程去进行父子线程消息传递,结果如下:

     

     

     

    经过测试,我们发现 只有10个线程 的消息传递成功了,其余10个线程的消息均丢失了,这是什么原因呢。。。

    遇到这个问题,我们首先得弄清楚 InheritableThreadLocal 是如何在父子线程间进行消息传递的

    InheritableThreadLocal 在父线程创建子线程的时候,会将父线程中InheritableThreadLocal  中存储的数据 拷贝一份 存储到子线程的 InheritableThreadLocal  中

    而我们使用的 线程池,线程池是会反复利用线程的,当线程池没有被创建满,每次都是新创建线程,直到线程池创建满了,再需要使用线程就会从线程池中拿已经创建好的线程。

    问题就出在这里,由于后面的线程 是从线程池中去捞已经创建好的线程,不会走创建逻辑,也就无法触发 InheritableThreadLocal 中向子线程 拷贝,这也就是为什么  InheritableThreadLocal  合并线程池 使用时,出现了 消息丢失的原因

     

    如何解决????

    阿里巴巴开源的TTL ,用于解决线程池中的父子线程复用,线程数据传递,可以完美解决这个问题

    1
    2
    3
    4
    5
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>transmittable-thread-local</artifactId>
        <version>2.0.0</version>
    </dependency>

      

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    @EnableAsync
    @Configuration
    public class ExecutorConfig {
     
        private final Logger log = LoggerFactory.getLogger(getClass());
     
        @Value("${executor.corePool:2}")
        private Integer corePool;
        @Value("${executor.maxPool:10}")
        private Integer maxPool;
        @Value("${executor.queue:2}")
        private Integer queue;
     
     
        @Bean("cdl-executor")
        public Executor executor() {
            log.info("start async Executor");
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            //配置核心线程数
            executor.setCorePoolSize(corePool);
            //配置最大线程数
            executor.setMaxPoolSize(maxPool);
            //配置队列大小
            executor.setQueueCapacity(queue);
            //配置线程池中的线程的名称前缀
            executor.setThreadNamePrefix("async-executor-");
     
            // 设置拒绝策略
            executor.setRejectedExecutionHandler((r, e) -> {
                // .....
            });
     
            // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            //执行初始化
            executor.initialize();
            // 使用TTL 的 executor
            return TtlExecutors.getTtlExecutor(executor);
            //return executor;
        }
    }

      

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    public class ThreadLocalMessage {
     
        private final TransmittableThreadLocal<Msg> msg;
     
        private ThreadLocalMessage() {
            msg = new TransmittableThreadLocal<>();
        }
     
        public Msg getMsg() {
            return this.msg.get();
        }
     
        public void setMsg(Msg msg) {
            this.msg.set(msg);
        }
     
        public void clear() {
            msg.remove();
        }
     
        private static final ThreadLocalMessage threadLocalMessage = new ThreadLocalMessage();
     
        public static ThreadLocalMessage getInstance() {
            return threadLocalMessage;
        }
     
        /**
         * 获取线程中的消息
         *
         * @return
         */
        public static Msg getOrCreateMsg() {
            Msg msg = ThreadLocalMessage.getInstance().getMsg();
            if (msg == null) {
                msg = new Msg();
            }
            return msg;
        }
     
        public static class Msg {
     
            /**
             * taskId
             */
            private String taskId;
     
     
            public Msg() {
            }
     
            public String getTaskId() {
                return taskId;
            }
     
            public void setTaskId(String taskId) {
                this.taskId = taskId;
            }
     
            @Override
            public String toString() {
                return "Msg{" +
                        "taskId='" + taskId + '\'' +
                        '}';
            }
        }
     
    }

      

    按照之前的调用方法再试一次,结果如下:

     

     可以发现未出现数据丢失的情况

     

  • 相关阅读:
    Python常见设计模式库之python-patterns使用详解
    为什么推荐使用SSL付费证书?
    太阳能发电与蓄电池研究(Matlab代码实现)
    Pytorch实现Bert模型
    Go语言入门【4】循环语句
    牛客网:设计LRU缓存结构 设计LFU缓存结构
    使用pytorch搭建MobileNetV2并基于迁移学习训练
    【UVA No. 442】 矩阵链乘 Matrix Chain Multiplication
    基于神经网络匹配度的模拟电路故障诊断
    好用的方法记录
  • 原文地址:https://www.cnblogs.com/hujunhui530/p/16423851.html