• 分布式链路追踪如何跨线程


    背景

    我们希望实现全链路信息,但是代码中一般都会异步的线程处理

    解决思路

    我们可以对以前的 Runable 和 Callable 进行增强。

    可以使用 ali 已经存在的实现方式。

    TransmittableThreadLocal (TTL) 解决异步执行时上下文传递的问题

    核心的实现思路如下:

    1)异步执行前,把当前线程的 MDC 信息放入执行对象中。

    2)异步执行时,把执行对象中的信息放入 MDC 等信息。

    3) 异步执行后,清空执行对象。

    问题

    Runable 和 Callable 只是接口,没有额外信息,所以需要进行增强。

    实现方式

    接口定义

    1. package com.github.houbb.heaven.support.concurrent.context;
    2. import java.util.Map;
    3. /**
    4. * 跨线程处理类
    5. *
    6. * @since 0.3.0
    7. */
    8. public interface CrossThreadProcessor {
    9. /**
    10. * 初始化上下文
    11. * @param contextMap 上下文
    12. */
    13. void initContext(Map<String, Object> contextMap);
    14. /**
    15. * 执行之前
    16. * @param contextMap 上下文
    17. */
    18. void beforeExecute(Map<String, Object> contextMap);
    19. /**
    20. * 执行之后
    21. * @param contextMap 上下文
    22. */
    23. void afterExecute(Map<String, Object> contextMap);
    24. }

    对可执行接口进行增强

    1. package com.github.houbb.heaven.support.concurrent.context;
    2. import com.github.houbb.heaven.util.lang.SpiUtil;
    3. import java.util.ArrayList;
    4. import java.util.HashMap;
    5. import java.util.List;
    6. import java.util.Map;
    7. import java.util.concurrent.Callable;
    8. /**
    9. * 跨线程处理
    10. *
    11. * 作用:用来跨线程处理传递信息,比如 async,线程池等。
    12. *
    13. * 比如在 aop 中,直接处理。
    14. *
    15. *
    16. * Object[] args = point.args();
    17. * Object arg0 = args[0];
    18. *
    19. * // 直接转换为当前的对象
    20. * if(arg0 instanceOf Runnable) {
    21. * args[0] = new CrossThreadWrapper((Runnable)arg0);
    22. * } else if(arg0 instanceOf Callable) {
    23. * args[0] = new CrossThreadWrapper((Callable)arg0);
    24. * }
    25. *
    26. * // 继续处理
    27. *
  • * @param 泛型
  • * @since 0.3.0
  • */
  • public class CrossThreadWrapper<T> implements Runnable, Callable<T> {
  • private Runnable runnable;
  • private Callable<T> callable;
  • /**
  • * 通过 spi 获取所有的实现类
  • */
  • private static List<CrossThreadProcessor> processorList = new ArrayList<>();
  • /**
  • * 上下文
  • */
  • private final Map<String, Object> context = new HashMap<>();
  • static {
  • processorList = SpiUtil.getClassImplList(CrossThreadProcessor.class);
  • }
  • public CrossThreadWrapper(Runnable runnable) {
  • // 任务执行之前
  • this.initContext();
  • this.runnable = runnable;
  • }
  • public CrossThreadWrapper(Callable<T> callable) {
  • this.initContext();
  • this.callable = callable;
  • }
  • @Override
  • public void run() {
  • try {
  • beforeExecute();
  • this.runnable.run();
  • } finally {
  • afterExecute();
  • }
  • }
  • @Override
  • public T call() throws Exception {
  • try {
  • beforeExecute();
  • return this.callable.call();
  • } finally {
  • afterExecute();
  • }
  • }
  • /**
  • * 初始化上下文
  • */
  • protected void initContext() {
  • for(CrossThreadProcessor processor : processorList) {
  • processor.initContext(context);
  • }
  • }
  • /**
  • * 执行前
  • */
  • protected void beforeExecute() {
  • for(CrossThreadProcessor processor : processorList) {
  • processor.beforeExecute(context);
  • }
  • }
  • /**
  • * 执行之后
  • */
  • protected void afterExecute() {
  • for(CrossThreadProcessor processor : processorList) {
  • processor.afterExecute(context);
  • }
  • }
  • }
  • 用法

    实现接口

    我们只需要实现 CrossThreadProcessor 接口。

    然后 spi 中配置,服务会自动发现。

    aop

    可以在 spring aop 中,对以前的方法执行进行增强。

  • 相关阅读:
    【scikit-learn基础】--『监督学习』之 层次聚类
    【Linux】Linux操作系统
    进入mysql命令行之后,怎么退出
    工业设备状态监测中的声发射技术应用
    阿里云邮件推送配置教程:有哪些关键步骤?
    搜索与图论-树与图的广度优先遍历
    高精度电流源的应用领域有哪些
    C++11—线程库
    Mysql 45讲学习笔记(二十七)主库出问题了
    845. 八数码
  • 原文地址:https://blog.csdn.net/leread/article/details/133828779