• 【项目实战】从零开始设计并实现一个接口异常链路分析器


    这不是马上要到1024了吗,这不得弄个什么工具给部门项目提提效😯?

    1. 背景

    在我们服务端应用当中,我们往往会要求更高的性能和更高的稳定性,但实际开发的过程中,可能会出现很多赶时间的情况(也不排除代码水平问题),那么代码就会写的比较随性🤣。

    这里我主要想探讨的是 服务的不合理调用链路 对项目造成的影响。

    首先介绍一下这里所指的 服务的不合理调用链路,它们包括但不限于以下情况:

    • 同一个接口中反复调用同一个SQL或Dubbo。
    • 接口入参没有做限制,加载大量数据,或某条SQL一次性查询出大量数据。
    • SQL查询速度慢,没有正确使用索引或出现不合理的连表。

    其实就是一些耗时操作,它们本来就比较费时间,还要被重复调用,或者是一些不合理的SQL,这很显然是可优化的。优化方法也很明显,限制参数、循环查询改成批量查询、SQL加索引等。

    所以现在想要找出那些不合理的代码,怎么找就成了问题的关键。


    PS:以下是我和我导师(wingli 的个人主页)的设计方案,与公司业务无关,所以才写成博客进行分享,各位且看且珍惜。(记得点赞关注✍️)

    2. 方案选择

    经过一上午的研究,得出以下几种方案。

    方法

    实现方式

    优点

    缺点

    总结

    静态代码分析

    可以通过idea插件的方式,写一些代码检测逻辑。找出可能存在的不合理代码,约等于一个简单的代码review。

    可以在编码期就针对性的做一些提醒,让写代码的人及时调整实现方式。

    无法做到运行时处理,无法判断一些运行期可能出现的问题,比如大对象,而且以前的老项目也顾及不到。

    应该属于是一个有用但收益相对较低的方案。

    分析APM上报的数据

    APM会默认采集sql、dubbo、http、redis等调用链路数据,然后上报到ES,我们可以去ES中获取这些数据,然后对这些数据进行分析,寻找不合理的调用链路。

    链路调用的数据比较全,而且已经普及到所有项目当中了。

    APM采样的数据方式无法兼容我们所有的需求,比如无法判断参数是否过大、是否多次重复调用相同的接口等。

    有基础内容,但不完全满足需求。

    Java agent埋点

    自己做一个采样工具,基于Java agent来实现,在需要被采集信息的方法前后插入代码,实现信息收集。

    可自定义程度高,可以完全实现需求。

    需要自己实现,有一定的工作量。

    可以实现,也能满足需求,就是时间成本会高一些。

    拦截器保留慢接口数据

    通过拦截请求,判断请求耗时,采集高耗时的接口信息。

    实现简单。

    可收集的数据少,无法满足需求。如链路中多次调用相同的SQL或者Dubbo,无法收集。

    实现简单,但不能满足需求,不如分析APM上报数据。

    人工review

    自己在写的时候多检查几遍,并且代码评审时部门成员也认真提出不合理调用。

    无需代码层面实现,没准还可以提高团队review次数。

    更加消耗时间,而且没有保障,如果没注意就错过了,也难以对以前的项目全部进行review

    需要消耗大量的人员时间,属于是减效。

    综合考虑各种方案的情况,决定通过 agent埋点 的方法来实现。

    2.1. Java agent

    这块估计很多小伙伴也不太了解,所以跟大家简单介绍一下。先来点官方的:

    Java agent 是JVM提供的一种机制,允许开发者在应用程序运行时修改或增强已加载的类和字节码。通过 Java agent,开发者可以在不修改源代码的情况下,对应用程序进行动态的修改、监控和增强。

    Java agent 的工作原理是通过在 JVM 启动时,通过命令行参数动态加载一个特殊的 jar 文件,这个 jar 文件被我们称为 Java agent。Java agent 可以用于字节码增强、性能监控、类加载和转换等方面的应用。它为开发者提供了更灵活和强大的工具,用于对 Java 应用程序进行动态修改和增强。

    乍一看,感觉有点像Spring AOP,这里讲一下它们的区别:

    Java agent 提供了更底层的字节码级别的修改和增强能力,可以在任何 Java 应用程序中使用,而 Spring AOP 是基于代理模式的框架,主要用于在 Spring 容器中对业务逻辑进行增强。

    大概就是这样了,如果你还不太清除 Java agent 到底是个什么东西的话,那你就当它是“一种在字节码上进行修改的AOP”

    更详细的Java agent教程可以去网上找找其他文章,因为我也不太懂,这里我就不做过多介绍了。

    3. 实现方案

    3.1. 项目架构

    项目架构分为:埋点层、收集层、上报层、持久层、展示层。其中前三层在agent服务当中,后两层在server服务当中,如下图所示:

    被检测的服务是使用了 Java agent 进行增强的服务,埋点层、收集层、上报层都是由agent插入到源项目代码当中的,对源代码无侵入性。

    3.2. 埋点方案

    首先我们需要对每一个请求都进行信息收集,我们把一个完整的请求称为 Transaction ,目前考虑到的请求入口包括:

    • 来自客户端的HTTP请求
    • 来自其他服务调用的Dubbo请求
    • 来自MQ发来的消息消费
    • 来自定时任务的调度事件

    其次我们需要收集的信息主要是一些可能比较耗时的操作,我们把每一个操作称为 Span ,主要包括一些涉及到IO的操作:

    • 数据库操作
    • Redis操作
    • Dubbo操作
    • MQ操作
    • 通过HTTP调用三方服务的操作
    • 复杂的计算逻辑

    我们在上述每一个入口处和每一个耗时操作前后都通过agent添加埋点,入口前后的埋点用来表示请求的开始和结束,而耗时操作前后的埋点则会获取当前堆栈中的相关数据,这样就构成了第一层【埋点层】。

    核心代码示例如下:

    1. public class BadCallDetectTransformer implements ClassFileTransformer {
    2. private static final CopyOnWriteArraySet enhancedClass = new CopyOnWriteArraySet<>();
    3. private Instrumentation inst;
    4. public BadCallDetectTransformer(Instrumentation inst) {
    5. this.inst = inst;
    6. }
    7. public byte[] transform(ClassLoader loader, String className, Class classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws IllegalClassFormatException {
    8. if (className == null) return classfileBuffer;
    9. className = className.replace("/", ".");
    10. try {
    11. //transaction http
    12. if (className.equals("org.springframework.web.method.support.InvocableHandlerMethod")) {
    13. if (!enhancedClass.contains(className)) {
    14. String beforeCode = TransformerUtils.unShadeIfNecessary("shaded.com.seewo.detect.agent.helper.TransactionHelper.httpTransactionEnter($args,$0);");
    15. String afterCode = TransformerUtils.unShadeIfNecessary("shaded.com.seewo.detect.agent.helper.TransactionHelper.httpTransactionExit($args,$0,$_);");
    16. classfileBuffer = TransformerUtils.insertMethodBefore("doInvoke", loader, classfileBuffer, beforeCode);
    17. classfileBuffer = TransformerUtils.insertMethodAsFinally("doInvoke", loader, classfileBuffer, afterCode);
    18. logger.warn("enhanced class:{}", className);
    19. enhancedClass.add(className);
    20. }
    21. }
    22. //span mybatis
    23. if (className.equals("org.apache.ibatis.binding.MapperMethod")) {
    24. if (!enhancedClass.contains(className)) {
    25. //org.apache.ibatis.binding.MapperMethod,org.apache.ibatis.session.SqlSession,java.lang.Object
    26. String beforeCode = TransformerUtils.unShadeIfNecessary("shaded.com.seewo.detect.agent.helper.SpanHelper.mybatisSpanEnter($2,$0);");
    27. String afterCode = TransformerUtils.unShadeIfNecessary("shaded.com.seewo.detect.agent.helper.SpanHelper.mybatisSpanExit($2,$0,$_);");
    28. classfileBuffer = TransformerUtils.insertMethodBefore("execute", loader, classfileBuffer, beforeCode);
    29. classfileBuffer = TransformerUtils.insertMethodAsFinally("execute", loader, classfileBuffer, afterCode);
    30. logger.warn("enhanced class:{}", className);
    31. enhancedClass.add(className);
    32. }
    33. }
    34. } catch (Throwable t) {
    35. logger.error("enhance class:{} fail.", className, t);
    36. }
    37. return classfileBuffer;
    38. }
    39. }

    其中 TransactionHelper 和 SpanHelper 的核心示例代码如下:

    1. /**
    2. * transaction
    3. */
    4. public class TransactionHelper {
    5. public static void httpTransactionEnter(Object[] args, InvocableHandlerMethod invocableHandlerMethod) {
    6. String mark = "";
    7. long argLength = -1;
    8. TransactionData transactionData = null;
    9. try {
    10. Method method = invocableHandlerMethod.getMethod();
    11. Class clazz = method.getDeclaringClass();
    12. mark = clazz.getName() + "#" + method.getName();
    13. if (!Collector.shouldBeCollect(mark, TransactionTypeEnum.HTTP, true)) {
    14. return;
    15. }
    16. TransactionHttpData transactionHttpData = new TransactionHttpData();
    17. RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
    18. if (requestAttributes instanceof ServletRequestAttributes) {
    19. String uri = ((ServletRequestAttributes) requestAttributes).getRequest().getRequestURI();
    20. transactionHttpData.setUri(uri);
    21. }
    22. argLength = Utils.toJSONStringWithCatch(args).length();
    23. transactionData = transactionHttpData;
    24. } catch (Throwable t) {
    25. logger.warn("httpTransactionEnter err.", t);
    26. } finally {
    27. Collector.transactionEnter(mark, TransactionTypeEnum.HTTP, argLength, transactionData);
    28. }
    29. }
    30. public static void httpTransactionExit(Object[] args, InvocableHandlerMethod invocableHandlerMethod, Object returnObj) {
    31. String mark = "";
    32. long resultLength = -1;
    33. TransactionData transactionData = null;
    34. try {
    35. Method method = invocableHandlerMethod.getMethod();
    36. Class clazz = method.getDeclaringClass();
    37. mark = clazz.getName() + "#" + method.getName();
    38. if (!Collector.shouldBeCollect(mark, TransactionTypeEnum.HTTP, false)) {
    39. return;
    40. }
    41. resultLength = Utils.toJSONStringWithCatch(returnObj).length();
    42. } catch (Throwable t) {
    43. logger.warn("httpTransactionExit err.", t);
    44. } finally {
    45. Collector.transactionExit(mark, resultLength, transactionData);
    46. }
    47. }
    48. }
    49. /**
    50. * span
    51. */
    52. public class SpanHelper {
    53. public static void mybatisSpanEnter(Object[] args, MapperMethod mapperMethod) {
    54. String mark = "";
    55. long length = -1;
    56. SpanData spanData = null;
    57. try {
    58. Field sqlCommandField = MapperMethod.class.getDeclaredField("command");
    59. sqlCommandField.setAccessible(true);
    60. MapperMethod.SqlCommand sqlCommand = (MapperMethod.SqlCommand) sqlCommandField.get(mapperMethod);
    61. mark = sqlCommand.getName();
    62. if (!Collector.shouldBeCollect(mark, SpanTypeEnum.MYBATIS)) {
    63. return;
    64. }
    65. String sqlCommandType = sqlCommand.getType().toString();
    66. length = Utils.toJSONStringWithCatch(args).length();
    67. } catch (Throwable t) {
    68. logger.warn("mybatisSpanEnter err.", t);
    69. } finally {
    70. Collector.spanEnter(mark, SpanTypeEnum.MYBATIS, length, spanData);
    71. }
    72. }
    73. public static void mybatisSpanExit(Object[] args, MapperMethod mapperMethod, Object returnObj) {
    74. String mark = "";
    75. long length = -1;
    76. SpanData spanData = null;
    77. try {
    78. Field sqlCommandField = MapperMethod.class.getDeclaredField("command");
    79. sqlCommandField.setAccessible(true);
    80. MapperMethod.SqlCommand sqlCommand = (MapperMethod.SqlCommand) sqlCommandField.get(mapperMethod);
    81. mark = sqlCommand.getName();
    82. if (!Collector.shouldBeCollect(mark, SpanTypeEnum.MYBATIS)) {
    83. return;
    84. }
    85. length = Utils.toJSONStringWithCatch(returnObj).length();
    86. } catch (Throwable t) {
    87. logger.warn("mybatisSpanExit err.", t);
    88. } finally {
    89. Collector.spanExit(mark, length, spanData);
    90. }
    91. }
    92. }

    3.3. 收集方案

    埋点层获取到主要信息后,就调用【收集层】的方法,把这些数据添加到一个临时的缓存中,并通过线程id把前后关联的数据连接起来,以形成一个完整的调用链路,这里可以用 ThreadLocal 来充当这一层的缓存。

    收集层接口传入的主要参数应该包括:

    • 方法的类型,DB、Redis、Dubbo、MQ等
    • 方法的唯一标识,用于判断是否多次重复调用了同样的接口
    • 方法的参数长度
    • 其他需要的数据

    一个请求开启时,记录初始时间,后续每次收到一条数据,就计算一下距离上次传入数据经过了多久,然后把他们放到当前线程的List内。

    当请求结束后,计算出总耗时,同时判断该数据是否需要被上报,并不是所有数据都有上报价值,事实上,大部分的数据都是不需要上报的。

    核心代码实现:

    1. public class Collector {
    2. private static final ThreadLocal transactionThreadLocal = new ThreadLocal<>();
    3. private static final AtomicInteger atomicInteger = new AtomicInteger(0);
    4. /**
    5. * 是否需要收集
    6. */
    7. public static boolean shouldBeCollect(String mark, TransactionTypeEnum transactionTypeEnum, boolean isEnter) {
    8. if (isEnter) {
    9. // 采样
    10. return Math.random() < 0.01 && atomicInteger.get() < 100;
    11. } else {
    12. Transaction transaction = getTransaction();
    13. return transaction != null;
    14. }
    15. }
    16. /**
    17. * 是否需要收集
    18. */
    19. public static boolean shouldBeCollect(String mark, SpanTypeEnum spanTypeEnum) {
    20. Transaction transaction = getTransaction();
    21. return transaction != null;
    22. }
    23. /**
    24. * 事务开始
    25. *
    26. * @param mark 事务标记
    27. * @param transactionTypeEnum http dubbo mq ...
    28. * @param length
    29. * @param transactionData
    30. */
    31. public static void transactionEnter(String mark, TransactionTypeEnum transactionTypeEnum, long length, TransactionData transactionData) {
    32. try {
    33. // 值判定
    34. if (StringUtils.isBlank(mark) || length < 0) {
    35. return;
    36. }
    37. Transaction transaction = new Transaction();
    38. transaction.setTransactionType(transactionTypeEnum.getType());
    39. transaction.setMark(mark);
    40. transaction.setStartTime(System.currentTimeMillis());
    41. transaction.setTransactionData(transactionData);
    42. transaction.setArgLength(length);
    43. initTransaction(transaction);
    44. } catch (Throwable t) {
    45. logger.warn("transactionEnter err.");
    46. clearTransaction();
    47. }
    48. }
    49. public static void transactionExit(String mark, long length, TransactionData transactionData) {
    50. try {
    51. // 值判定
    52. if (StringUtils.isBlank(mark) || length < 0) {
    53. return;
    54. }
    55. Transaction transaction = getTransaction();
    56. if (transaction == null) return;
    57. // 计算耗时
    58. transaction.setCostTime(System.currentTimeMillis() - transaction.getStartTime());
    59. if (transaction.getCostTime() < 20) {
    60. return;
    61. }
    62. transaction.setTransactionData(transactionData);
    63. transaction.setReturnLength(length);
    64. // 整合计算数据
    65. CallStatData linkData = mergeData(transaction);
    66. // 上报数据
    67. MessageClient.add(linkData);
    68. } catch (Throwable t) {
    69. logger.warn("transactionExit err.");
    70. } finally {
    71. clearTransaction();
    72. }
    73. }
    74. /**
    75. * @param spanData sql的长度, dubbo的方法名,mq的topic
    76. */
    77. public static void spanEnter(String mark, SpanTypeEnum spanTypeEnum, long length, SpanData spanData) {
    78. try {
    79. // 值判定
    80. if (StringUtils.isBlank(mark) || length < 0) {
    81. return;
    82. }
    83. Transaction transaction = getTransaction();
    84. if (transaction == null) return;
    85. Span span = new Span();
    86. span.setMark(mark);
    87. span.setSpanType(spanTypeEnum);
    88. span.setSpanData(spanData);
    89. span.setStartTime(System.currentTimeMillis());
    90. span.setArgLength(length);
    91. transaction.getSpanList().add(span);
    92. } catch (Throwable t) {
    93. logger.warn("transactionExit err.");
    94. clearTransaction();
    95. }
    96. }
    97. public static void spanExit(String mark, long length, SpanData spanData) {
    98. try {
    99. // 值判定
    100. if (StringUtils.isBlank(mark) || length < 0) {
    101. return;
    102. }
    103. Transaction transaction = getTransaction();
    104. if (transaction == null) return;
    105. Span span = transaction.getSpanList().get(transaction.getSpanList().size() - 1);
    106. span.setCostTime(System.currentTimeMillis() - span.getStartTime());
    107. span.setReturnLength(length);
    108. } catch (Throwable t) {
    109. logger.warn("transactionExit err.");
    110. clearTransaction();
    111. }
    112. }
    113. private static void clearTransaction() {
    114. transactionThreadLocal.remove();
    115. atomicInteger.decrementAndGet();
    116. }
    117. private static void initTransaction(Transaction transaction) {
    118. transactionThreadLocal.set(transaction);
    119. atomicInteger.incrementAndGet();
    120. }
    121. private static Transaction getTransaction() {
    122. return transactionThreadLocal.get();
    123. }
    124. }

    3.3.1. 收集规则

    判断该数据是否需要被收集,我们可以根据该请求的一些指标,来判断它可能的不合理程度,再加上随机采样的一些方法,来决定是否要保留这条数据。

    暂且制定一些简单的规则:

    • 请求的执行时长超过2s,收集(耗时过长)
    • 记录每个接口的平均执行耗时,如果单个请求耗时超过平均耗时的5倍,收集(异常情况耗时过长)
    • 请求的相关参数长度超过2000,收集(可能没做接口参数限制或分页查询限制)
    • 调用链路中,出现较多的重复调用,收集(代码层面可能有优化空间)
    • 调用链路中,收集到的span超过50个,收集(代码层面有优化空间)
    • 其他正常流通的数据中,进行低频率动态采样,具体采样规则:
      • 采样基本频率为 1/100
      • 判断当前请求数压力,压力较大时降低频率
      • 判断JVM内存情况,内存不足时,降低频率
    3.3.1.1. 保险措施

    同时,为了减少服务压力极大和内存严重不足时agent对源系统的影响,我们可以在请求开启时也做一次判断,如果当前压力值过大,我们可以直接放弃对当前链路所有基础数据的保存。

    当压力极大时,系统本身就是非常不稳定的,可能所有的接口耗时都会提高非常多,这种情况下可能会导致收集器大量收集信息,从而加速服务的崩溃。

    目前测试阶段数据量还比较少,所以基本都是全量收集,这块收集过滤的代码还没实现🤣。

    3.3.2. 数据整合

    当一条 Transaction 数据确认要被收集时,我们把它进行整合,主要是对 Span 当中的重复数据进行压缩,变成一条独立的数据,这样可以节省很多空间。

    目前的话,我们考虑保留的 Transaction 数据包含:

    • 总耗时
    • 多次重复调用的span信息列表
      • span调用次数
      • span调用平均耗时
    • 入参长度
    • 出参长度
    • 请求开始时间

    整合完之后就可以发送给下一层了,同时把 ThreadLocal 中的数据也进行清除。

    核心代码:

    1. /**
    2. * Class: Collector
    3. * 整合数据
    4. */
    5. private static CallStatData mergeData(Transaction transaction) {
    6. CallStatData callStatData = new CallStatData();
    7. callStatData.setMark(transaction.getMark());
    8. callStatData.setCostTime(transaction.getCostTime());
    9. callStatData.setStartTime(transaction.getStartTime());
    10. callStatData.setTransactionType(transaction.getTransactionType());
    11. callStatData.setArgLength(transaction.getArgLength());
    12. callStatData.setReturnLength(transaction.getReturnLength());
    13. callStatData.setTransactionData(transaction.getTransactionData());
    14. SpanCallCountStat countStat = new SpanCallCountStat();
    15. HashMap statMap = new HashMap<>();
    16. // 统计调用次数和耗时、参数长度等信息
    17. for (Span span : transaction.getSpanList()) {
    18. countStat.addCallCount(span.getSpanType());
    19. // 使用 type+mark 作为key
    20. String mapKey = span.getSpanType().getType() + span.getMark();
    21. SpanStat spanStat = statMap.computeIfAbsent(mapKey, key -> {
    22. // 初始化
    23. SpanStat value = new SpanStat();
    24. value.setMark(span.getMark());
    25. value.setSpanType(span.getSpanType().getType());
    26. return value;
    27. });
    28. // 添加调用信息
    29. spanStat.addCallCount(span.getCostTime(), span.getArgLength(), span.getReturnLength());
    30. }
    31. callStatData.setCallStat(countStat);
    32. callStatData.setSpanList(new ArrayList<>(statMap.values()));
    33. return callStatData;
    34. }
    35. /**
    36. * Class: SpanStat
    37. * 添加调用次数,参数:耗时、参数长度、返回值长度
    38. */
    39. public void addCallCount(long costTime, long argLength, long returnLength) {
    40. callCount++;
    41. // 平均执行时间的增量计算公式:(当前执行时间 - 历史平均执行时间 * 当前执行次数) / 总执行次数
    42. avgCostTime += (costTime - avgCostTime) / callCount;
    43. maxCostTime = Math.max(maxCostTime, costTime);
    44. avgArgLength += (argLength - avgArgLength) / callCount;
    45. maxArgLength = Math.max(maxArgLength, argLength);
    46. avgReturnLength += (returnLength - avgReturnLength) / callCount;
    47. maxReturnLength = Math.max(maxReturnLength, returnLength);
    48. }

    注意,到这里还只是简单了做了收集和处理,并没有持久化起来。

    如果我们在agent服务中直接进行持久化,那势必会对原服务有较大的影响,不仅要求原服务提供数据源,还要求该数据源中有一张专门的表来供我们存储。

    所以我们可以考虑把这部分功能进行分离,创建一个单独的server服务,来完成持久层和展示层的操作,然后所有的agent就统一把数据上报到这个server服务里来。如此,我们就需要一个【上报层】。

    3.4. 上报方案

    上报数据的方式可以考虑:

    上报方式

    优点

    缺点

    HTTP

    兼容性好,原项目不需要添加其他依赖,用Java原生类库就可以实现。

    性能一般,且需要自己做超时处理等操作

    RPC框架

    性能比HTTP高,且有完善的框架,可以自动重试、自动熔断

    需要依赖RPC框架

    MQ

    性能高,而且可以享受到MQ的好处,即使server挂了也不影响其他服务

    需要依赖MQ

    所以性能上的优先级肯定是 MQ > RPC > HTTP,但兼容性方面 HTTP 是最好的。最佳方式肯定是三者都支持,允许服务自定义配置,但默认使用HTTP。

    目前我们先考虑实现HTTP的方式,如果整体效果不错,对项目优化有帮助,再考虑实现其他功能。

    3.4.1. HTTP上报数据

    上报数据相对来说是比较耗时的,我们可以使用 异步+批量上报 的方式来尽可能减少对业务的影响。

    当收集层把数据传给上报层的时候,我们先存储到一个临时的容器里,每隔一段时间,再单独用一个线程把这段时间内收到的数据进行批量上报。由于收集层是多线程的,所以这个临时的容器需要用线程安全集合类。

    上报数据时所开启的HTTP请求不需要等待服务端的返回,只需要发送成功就好了,如此可以更快的完成上报。

    核心代码如下:

    1. public class MessageClient {
    2. private static final LinkedBlockingDeque linkDataCache = new LinkedBlockingDeque<>(200);
    3. private static URL reportUrl;
    4. public static void add(CallStatData linkData) {
    5. if (reportUrl == null) {
    6. return;
    7. }
    8. if (linkDataCache.size() > 200) {
    9. logger.warn("链路检测服务:缓存数据过多,丢弃数据");
    10. return;
    11. }
    12. linkDataCache.add(linkData);
    13. }
    14. static {
    15. try {
    16. // url初始化
    17. initUrl();
    18. if (reportUrl != null) {
    19. // 定时任务初始化
    20. initTimer();
    21. }
    22. logger.info("链路检测服务:MessageClient初始化完成");
    23. } catch (Throwable e) {
    24. logger.error("链路检测服务:MessageClient初始化失败");
    25. e.printStackTrace();
    26. }
    27. }
    28. /**
    29. * 上报地址初始化
    30. */
    31. private static void initUrl() {
    32. // url初始化
    33. String url = System.getProperty("callstat.url");
    34. if (url == null) {
    35. String env = System.getProperty("env");
    36. if (env == null) env = System.getProperty("ENV");
    37. if (env == null) env = "fat"; // 默认测试环境
    38. env = env.toLowerCase();
    39. switch (env) {
    40. case "dev":
    41. url = "http://127.0.0.1:8077/v1/callstat";
    42. break;
    43. case "fat":
    44. default:
    45. url = "xxxxxxx/v1/callstat";
    46. break;
    47. }
    48. }
    49. try {
    50. reportUrl = new URL(url);
    51. } catch (MalformedURLException e) {
    52. logger.error("链路检测服务:上报地址初始化失败,{}", e.getMessage());
    53. }
    54. }
    55. /**
    56. * 定时任务初始化
    57. */
    58. private static void initTimer() {
    59. // 创建定时器任务
    60. TimerTask timerTask = new TimerTask() {
    61. @Override
    62. public void run() {
    63. MessageClient.start();
    64. }
    65. };
    66. ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1);
    67. scheduledThreadPool.scheduleAtFixedRate(timerTask, 1, 2, TimeUnit.SECONDS);
    68. }
    69. private static void start() {
    70. if (linkDataCache.isEmpty()) {
    71. return;
    72. }
    73. logger.debug("链路检测服务:开始发送缓存数据,数量:{}", linkDataCache.size());
    74. // 分离缓存数据
    75. ArrayList oldRecords = new ArrayList<>(linkDataCache.size());
    76. linkDataCache.drainTo(oldRecords);
    77. // 发送数据
    78. sendHttpRequests(oldRecords);
    79. logger.debug("链路检测服务:发送数据结束");
    80. }
    81. }

    3.5. 持久化方案

    持久化是在一个独立的server服务中,用来接收所有agent发送的数据。

    3.5.1. 直接存

    最简单的做法,每一个HTTP请求进来我们都插入一次数据库,就像下面这样:

    事实上,我还有一个更好更傻逼的方案!MQ!

    3.5.2. MQ自产自销

    虽然在agent当中强依赖MQ不好,但我们可以在server服务当中依赖MQ,当server接收到数据时,就发送到MQ中,同时自己也去消费该MQ中的消息,属于是 自产自销

    这样做的好处是可以利用到mq的异步、削峰、限流,从而提高server服务的承受能力,在数据量较大的时候应该会有比较好的表现。大概长这样:

    目前我们先采用每次请求都直接落库的方案来实现。

    3.6. 展示方案

    这一步就很简单了,可以根据自己想要的样子来做,数据都已经有了,取出来处理下,然后输出出来就行,我选择使用 easypoi 来生成Excel。

    因为数据是无限,但接口是有限的,所以我们在展示的时候,也可以对每一个接口的数据进行聚合。

    最后我们统计的内容有:

    • 接口标识
    • 类型
    • 执行次数
    • 平均调用dubbo数
    • 最大调用dubbo数
    • 平均查询sql数
    • 最大查询sql数
    • 平均执行耗时
    • 最大执行耗时
    • 平均入参长度
    • 最大入参长度
    • 平均出参长度
    • 最大出参长度
    • 不合理调用情况span列表

    核心代码如下:

    1. override fun getReport(): String {
    2. val callStatPoList: List = callStatMapper.all()
    3. val callStatDtoList: MutableList = ArrayList(callStatPoList.size)
    4. for (callStatPo in callStatPoList) {
    5. callStatDtoList.add(toDto(callStatPo))
    6. }
    7. // 聚合数据
    8. val reports = mergeCallStat(callStatDtoList)
    9. // 挑出异常数据
    10. val errorReports = reports.filter {
    11. it.avgCostTime > 500 || it.maxCostTime > 2000 || it.maxArgLength > 1000 || it.maxReturnLength > 1000 || it.maxSpanCallCount > 10
    12. || it.badSpanList.any { span -> span.maxCallCount > 5 || span.avgCostTime > 200 }
    13. }.onEach {
    14. it.badSpanList = it.badSpanList.filter { span -> span.maxCallCount > 5 || span.avgCostTime > 200 }
    15. }
    16. // 生成报告
    17. val exportParams = ExportParams("服务链路报告", "服务链路报告")
    18. exportParams.type = ExcelType.XSSF
    19. exportParams.height = 20
    20. return exportExcel(exportParams, CallStatReport::class.java, errorReports)
    21. }
    22. /**
    23. * 聚合数据
    24. */
    25. private fun mergeCallStat(callStatDtoList: List<CallStatDto>): Collection {
    26. val reportMap = HashMap()
    27. for (stat in callStatDtoList) {
    28. val mapKey = stat.transactionType + stat.mark
    29. val report = reportMap.computeIfAbsent(mapKey) {
    30. CallStatReport().apply {
    31. this.mark = stat.mark
    32. this.transactionType = stat.transactionType
    33. }
    34. }
    35. report.addCallCount(stat.costTime, stat.argLength, stat.returnLength, stat.callStat.dubboCallCount, stat.callStat.sqlCallCount)
    36. report.addSpanList(stat.spanList)
    37. }
    38. return reportMap.values.onEach {
    39. it.format()
    40. }
    41. }

    3.7. 效果展示

    后续可以考虑优化下展示的样式,比如标红异常数据,更加直观的看到那些数据是异常的。

    4. 最后

    目前我们内部还在试用阶段,如果你觉得不错,可以参考我们的实现思路自己也做一个,没准也能帮你找到一些不合理的代码设计。

    虽然但是,这个项目目前还比较简陋,很多地方都不太完善,各位佬有好的建议或者意见都欢迎在评论区提出,我一定积极听取,保证不改。


    本来这是我和我导师写的项目,但是看完这篇文章,它也是你的了!

    所以

    点赞、收藏、关注!

  • 相关阅读:
    EvaluatorFilter简介说明
    【Linux-Windows】简述IPv4子网掩码网关和DNS
    Caldera安装及简单使用
    “转型做 Saas 失败后,我们归档了 5700+Star 的 GitHub 项目!”
    打印机共享设置步骤
    Java ClassNotFoundException异常解决指南
    C语言之递归
    第5章 C语言高级的库函数
    mysql8 docker部署命令
    Linux XWindow的安装和配置
  • 原文地址:https://blog.csdn.net/little_stick_i/article/details/133958638