• springBoot异步线程,父子线程数据传递的5种姿势


    在实际开发过程中我们需要父子之间传递一些数据,比如用户信息等。该文章从5种解决方案解决父子之间数据传递困扰

    姿势1:ThreadLocal+TaskDecorator

    用户工具类 UserUtils

    1. /**
    2. *使用ThreadLocal存储共享的数据变量,如登录的用户信息
    3. */
    4. public class UserUtils {
    5. private static final ThreadLocal userLocal=new ThreadLocal<>();
    6. public static String getUserId(){
    7. return userLocal.get();
    8. }
    9. public static void setUserId(String userId){
    10. userLocal.set(userId);
    11. }
    12. public static void clear(){
    13. userLocal.remove();
    14. }
    15. }
    16. 复制代码

    自定义CustomTaskDecorator

    1. /**
    2. * 线程池修饰类
    3. */
    4. public class CustomTaskDecorator implements TaskDecorator {
    5. @Override
    6. public Runnable decorate(Runnable runnable) {
    7. // 获取主线程中的请求信息(我们的用户信息也放在里面)
    8. String robotId = UserUtils.getUserId();
    9. System.out.println(robotId);
    10. return () -> {
    11. try {
    12. // 将主线程的请求信息,设置到子线程中
    13. UserUtils.setUserId(robotId);
    14. // 执行子线程,这一步不要忘了
    15. runnable.run();
    16. } finally {
    17. // 线程结束,清空这些信息,否则可能造成内存泄漏
    18. UserUtils.clear();
    19. }
    20. };
    21. }
    22. }
    23. 复制代码

    ExecutorConfig

    在原来的基础上增加 executor.setTaskDecorator(new CustomTaskDecorator());

    1. @Bean(name = "asyncServiceExecutor")
    2. public Executor asyncServiceExecutor() {
    3. log.info("start asyncServiceExecutor----------------");
    4. //ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    5. //使用可视化运行状态的线程池
    6. ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
    7. //配置核心线程数
    8. executor.setCorePoolSize(corePoolSize);
    9. //配置最大线程数
    10. executor.setMaxPoolSize(maxPoolSize);
    11. //配置队列大小
    12. executor.setQueueCapacity(queueCapacity);
    13. //配置线程池中的线程的名称前缀
    14. executor.setThreadNamePrefix(namePrefix);
    15. // rejection-policy:当pool已经达到max size的时候,如何处理新任务
    16. // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
    17. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    18. //增加线程池修饰类
    19. executor.setTaskDecorator(new CustomTaskDecorator());
    20. //增加MDC的线程池修饰类
    21. //executor.setTaskDecorator(new MDCTaskDecorator());
    22. //执行初始化
    23. executor.initialize();
    24. log.info("end asyncServiceExecutor------------");
    25. return executor;
    26. }
    27. 复制代码

    AsyncServiceImpl

    1. /**
    2. * 使用ThreadLocal方式传递
    3. * 带有返回值
    4. * @throws InterruptedException
    5. */
    6. @Async("asyncServiceExecutor")
    7. public CompletableFuture<String> executeValueAsync2() throws InterruptedException {
    8. log.info("start executeValueAsync");
    9. System.out.println("异步线程执行返回结果......+");
    10. log.info("end executeValueAsync");
    11. return CompletableFuture.completedFuture(UserUtils.getUserId());
    12. }
    13. 复制代码

    Test2Controller

    1. /**
    2. * 使用ThreadLocal+TaskDecorator的方式
    3. * @return
    4. * @throws InterruptedException
    5. * @throws ExecutionException
    6. */
    7. @GetMapping("/test2")
    8. public String test2() throws InterruptedException, ExecutionException {
    9. UserUtils.setUserId("123456");
    10. CompletableFuture<String> completableFuture = asyncService.executeValueAsync2();
    11. String s = completableFuture.get();
    12. return s;
    13. }
    14. 复制代码

    姿势2:RequestContextHolder+TaskDecorator

    自定义CustomTaskDecorator

    1. /**
    2. * 线程池修饰类
    3. */
    4. public class CustomTaskDecorator implements TaskDecorator {
    5. @Override
    6. public Runnable decorate(Runnable runnable) {
    7. // 获取主线程中的请求信息(我们的用户信息也放在里面)
    8. RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
    9. return () -> {
    10. try {
    11. // 将主线程的请求信息,设置到子线程中
    12. RequestContextHolder.setRequestAttributes(attributes);
    13. // 执行子线程,这一步不要忘了
    14. runnable.run();
    15. } finally {
    16. // 线程结束,清空这些信息,否则可能造成内存泄漏
    17. RequestContextHolder.resetRequestAttributes();
    18. }
    19. };
    20. }
    21. }
    22. 复制代码

    ExecutorConfig

    在原来的基础上增加 executor.setTaskDecorator(new CustomTaskDecorator());

    1. @Bean(name = "asyncServiceExecutor")
    2. public Executor asyncServiceExecutor() {
    3. log.info("start asyncServiceExecutor----------------");
    4. //ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    5. //使用可视化运行状态的线程池
    6. ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
    7. //配置核心线程数
    8. executor.setCorePoolSize(corePoolSize);
    9. //配置最大线程数
    10. executor.setMaxPoolSize(maxPoolSize);
    11. //配置队列大小
    12. executor.setQueueCapacity(queueCapacity);
    13. //配置线程池中的线程的名称前缀
    14. executor.setThreadNamePrefix(namePrefix);
    15. // rejection-policy:当pool已经达到max size的时候,如何处理新任务
    16. // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
    17. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    18. //增加线程池修饰类
    19. executor.setTaskDecorator(new CustomTaskDecorator());
    20. //增加MDC的线程池修饰类
    21. //executor.setTaskDecorator(new MDCTaskDecorator());
    22. //执行初始化
    23. executor.initialize();
    24. log.info("end asyncServiceExecutor------------");
    25. return executor;
    26. }
    27. 复制代码

    AsyncServiceImpl

    1. /**
    2. * 使用RequestAttributes获取主线程传递的数据
    3. * @return
    4. * @throws InterruptedException
    5. */
    6. @Async("asyncServiceExecutor")
    7. public CompletableFuture<String> executeValueAsync3() throws InterruptedException {
    8. log.info("start executeValueAsync");
    9. System.out.println("异步线程执行返回结果......+");
    10. RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
    11. Object userId = attributes.getAttribute("userId", 0);
    12. log.info("end executeValueAsync");
    13. return CompletableFuture.completedFuture(userId.toString());
    14. }
    15. 复制代码

    Test2Controller

    1. /**
    2. * RequestContextHolder+TaskDecorator的方式
    3. * @return
    4. * @throws InterruptedException
    5. * @throws ExecutionException
    6. */
    7. @GetMapping("/test3")
    8. public String test3() throws InterruptedException, ExecutionException {
    9. RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
    10. attributes.setAttribute("userId","123456",0);
    11. CompletableFuture<String> completableFuture = asyncService.executeValueAsync3();
    12. String s = completableFuture.get();
    13. return s;
    14. }
    15. 复制代码

    姿势3:MDC+TaskDecorator

    自定义MDCTaskDecorator

    1. /**
    2. * 线程池修饰类
    3. */
    4. public class MDCTaskDecorator implements TaskDecorator {
    5. @Override
    6. public Runnable decorate(Runnable runnable) {
    7. // 获取主线程中的请求信息(我们的用户信息也放在里面)
    8. String userId = MDC.get("userId");
    9. Map<String, String> copyOfContextMap = MDC.getCopyOfContextMap();
    10. System.out.println(copyOfContextMap);
    11. return () -> {
    12. try {
    13. // 将主线程的请求信息,设置到子线程中
    14. MDC.put("userId",userId);
    15. // 执行子线程,这一步不要忘了
    16. runnable.run();
    17. } finally {
    18. // 线程结束,清空这些信息,否则可能造成内存泄漏
    19. MDC.clear();
    20. }
    21. };
    22. }
    23. }
    24. 复制代码

    ExecutorConfig

    在原来的基础上增加 executor.setTaskDecorator(new MDCTaskDecorator());

    1. @Bean(name = "asyncServiceExecutor")
    2. public Executor asyncServiceExecutor() {
    3. log.info("start asyncServiceExecutor----------------");
    4. //ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    5. //使用可视化运行状态的线程池
    6. ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
    7. //配置核心线程数
    8. executor.setCorePoolSize(corePoolSize);
    9. //配置最大线程数
    10. executor.setMaxPoolSize(maxPoolSize);
    11. //配置队列大小
    12. executor.setQueueCapacity(queueCapacity);
    13. //配置线程池中的线程的名称前缀
    14. executor.setThreadNamePrefix(namePrefix);
    15. // rejection-policy:当pool已经达到max size的时候,如何处理新任务
    16. // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
    17. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    18. //增加MDC的线程池修饰类
    19. executor.setTaskDecorator(new MDCTaskDecorator());
    20. //执行初始化
    21. executor.initialize();
    22. log.info("end asyncServiceExecutor------------");
    23. return executor;
    24. }
    25. 复制代码

    AsyncServiceImpl

    1. /**
    2. * 使用MDC获取主线程传递的数据
    3. * @return
    4. * @throws InterruptedException
    5. */
    6. @Async("asyncServiceExecutor")
    7. public CompletableFuture<String> executeValueAsync5() throws InterruptedException {
    8. log.info("start executeValueAsync");
    9. System.out.println("异步线程执行返回结果......+");
    10. log.info("end executeValueAsync");
    11. return CompletableFuture.completedFuture(MDC.get("userId"));
    12. }
    13. 复制代码

    Test2Controller

    1. /**
    2. * 使用MDC+TaskDecorator方式
    3. * 本质也是ThreadLocal+TaskDecorator方式
    4. * @return
    5. * @throws InterruptedException
    6. * @throws ExecutionException
    7. */
    8. @GetMapping("/test5")
    9. public String test5() throws InterruptedException, ExecutionException {
    10. MDC.put("userId","123456");
    11. CompletableFuture<String> completableFuture = asyncService.executeValueAsync5();
    12. String s = completableFuture.get();
    13. return s;
    14. }
    15. 复制代码

    姿势4:InheritableThreadLocal

    用户工具类 UserInheritableUtils

    1. //**
    2. *使用InheritableThreadLocal存储线程之间共享的数据变量,如登录的用户信息
    3. */
    4. public class UserInheritableUtils {
    5. private static final InheritableThreadLocal<String> userLocal=new InheritableThreadLocal<>();
    6. public static String getUserId(){
    7. return userLocal.get();
    8. }
    9. public static void setUserId(String userId){
    10. userLocal.set(userId);
    11. }
    12. public static void clear(){
    13. userLocal.remove();
    14. }
    15. }
    16. 复制代码

    AsyncServiceImpl

    1. /**
    2. * 使用InheritableThreadLocal获取主线程传递的数据
    3. * @return
    4. * @throws InterruptedException
    5. */
    6. @Async("asyncServiceExecutor")
    7. public CompletableFuture<String> executeValueAsync4() throws InterruptedException {
    8. log.info("start executeValueAsync");
    9. System.out.println("异步线程执行返回结果......+");
    10. log.info("end executeValueAsync");
    11. return CompletableFuture.completedFuture(UserInheritableUtils.getUserId());
    12. }
    13. 复制代码

    Test2Controller

    1. /**
    2. * 使用InheritableThreadLocal方式
    3. * @return
    4. * @throws InterruptedException
    5. * @throws ExecutionException
    6. */
    7. @GetMapping("/test4")
    8. public String test4(@RequestParam("userId") String userId) throws InterruptedException, ExecutionException {
    9. UserInheritableUtils.setUserId(userId);
    10. CompletableFuture<String> completableFuture = asyncService.executeValueAsync4();
    11. String s = completableFuture.get();
    12. return s;
    13. }
    14. 复制代码

    姿势5:TransmittableThreadLocal

    用户工具类 UserTransmittableUtils

    1. /**
    2. *使用TransmittableThreadLocal存储线程之间共享的数据变量,如登录的用户信息
    3. */
    4. public class UserTransmittableUtils {
    5. private static final TransmittableThreadLocal userLocal=new TransmittableThreadLocal<>();
    6. public static String getUserId(){
    7. return userLocal.get();
    8. }
    9. public static void setUserId(String userId){
    10. userLocal.set(userId);
    11. }
    12. public static void clear(){
    13. userLocal.remove();
    14. }
    15. }
    16. }
    17. 复制代码

    AsyncServiceImpl

    1. /**
    2. * 使用TransmittableThreadLocal获取主线程传递的数据
    3. * @return
    4. * @throws InterruptedException
    5. */
    6. @Async("asyncServiceExecutor")
    7. public CompletableFuture<String> executeValueAsync6() throws InterruptedException {
    8. log.info("start executeValueAsync");
    9. System.out.println("异步线程执行返回结果......+");
    10. log.info("end executeValueAsync");
    11. return CompletableFuture.completedFuture(UserTransmittableUtils.getUserId());
    12. }
    13. 复制代码

    Test2Controller

    1. /**
    2. * 使用TransmittableThreadLocal方式
    3. * @return
    4. * @throws InterruptedException
    5. * @throws ExecutionException
    6. */
    7. @GetMapping("/test6")
    8. public String test6() throws InterruptedException, ExecutionException {
    9. UserTransmittableUtils.setUserId("123456");
    10. CompletableFuture<String> completableFuture = asyncService.executeValueAsync6();
    11. String s = completableFuture.get();
    12. return s;
    13. }
    14. 复制代码

    maven依赖

    1. <dependency>
    2. <groupId>com.alibaba</groupId>
    3. <artifactId>transmittable-thread-local</artifactId>
    4. <version>2.12.1</version>
    5. </dependency>
    6. 复制代码

    方案对比

    方案1,方案2,方案3主要是借助TaskDecorator进行父子线程之间传递数据。其中MDC方案主要借鉴于MDC的日志跟踪的思想来实现,关于MDC相关的日志跟踪后续会学习分享

    方案4和方案5使用InheritableThreadLocal和TransmittableThreadLocal来实现,其中TransmittableThreadLocal是阿里InheritableThreadLocal进行优化封装。

    本人推荐使用方案5,哈哈。

    简答说一下InheritableThreadLocal

    1. public static void main(String[] args) {
    2. ThreadPoolExecutor executor = new ThreadPoolExecutor(1,1,1,
    3. TimeUnit.MINUTES,new ArrayBlockingQueue<>(1));
    4. ThreadLocal local = new InheritableThreadLocal();
    5. local.set(1);
    6. executor.execute(()->{
    7. System.out.println("打印1:"+local.get());
    8. });
    9. local.set(2);
    10. System.out.println("打印2:"+local.get());
    11. executor.execute(()->{
    12. System.out.println("打印3:"+local.get());
    13. });
    14. new Thread(new Runnable() {
    15. @Override
    16. public void run() {
    17. System.out.println("打印4:"+local.get());
    18. }
    19. }).start();
    20. }
    21. 复制代码

    运行结果如下

    1. 打印2:2
    2. 打印1:1
    3. 打印3:1
    4. 打印4:2
    5. 复制代码

    分析: 分析打印3为什么是1,InheritableThreadLocal的继承性是在new Thread创建子线程时候在构造函数内把父线程内线程变量拷贝到子线程内部的。 为了不在创建新线程耗费资源,我们一般会用线程池,线程池的线程会复用,那么线程中的ThreadLocal便不对了,可能是旧的,因为线程是旧的。

    总结

    上面的的方案你学会了么

  • 相关阅读:
    [思维]Shortest Path in GCD Graph 2022杭电多校第9场 1008
    力扣26:删除有序数组中的重复项
    关于foreach标签传值为list的拼接条件问题,得用size来判断非空
    从头开始机器学习:逻辑回归
    计算机毕业设计之交互式大学英语学习平台
    Scrum敏捷认证CSM(ScrumMaster)官方课程
    Idea工具中,使用Mapper对象有红线
    了解事件冒泡
    04-css浮动&边距
    docker docker-compose安装(centos7)
  • 原文地址:https://blog.csdn.net/m0_73311735/article/details/126890403