• SkyWalking Learning Series: Source Code Analysis of Trace Profiling


    Preface

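      The article "Online code-level performance profiling: filling in the last missing piece of distributed tracing" points out that however complex the business logic is, it is always executed by threads. So how does SkyWalking use method stack snapshots to profile at the code level? Out of curiosity, let's debug through the source and see how it works.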

    Demo

    1. Open the SkyWalking UI and click to create a new Trace Profiling task
    2. Configure the Trace Profiling task
    3. View the stack information

    Source code analysis

    Creating the task from the UI

    1. The page request is received, and ProfileTaskMutationService#createTask stores the task in ES under an index named profile_task-* (for example profile_task-20220807)

      public ProfileTaskCreationResult createTask(final String serviceId,
                                                  final String endpointName,
                                                  final long monitorStartTime,
                                                  final int monitorDuration,
                                                  final int minDurationThreshold,
                                                  final int dumpPeriod,
                                                  final int maxSamplingCount) throws IOException {
          // NOTE: taskStartTime and taskEndTime are not declared in this excerpt; presumably they are
          // derived from monitorStartTime and monitorDuration in lines omitted here.

          // validate the request parameters
          final String errorMessage = checkDataSuccess(
              serviceId, endpointName, taskStartTime, taskEndTime, monitorDuration, minDurationThreshold, dumpPeriod,
              maxSamplingCount
          );
          if (errorMessage != null) {
              return ProfileTaskCreationResult.builder().errorReason(errorMessage).build();
          }

          // create the task record
          final long createTime = System.currentTimeMillis();
          final ProfileTaskRecord task = new ProfileTaskRecord();
          task.setServiceId(serviceId);
          task.setEndpointName(endpointName.trim());
          task.setStartTime(taskStartTime);
          task.setDuration(monitorDuration);
          task.setMinDurationThreshold(minDurationThreshold);
          task.setDumpPeriod(dumpPeriod);
          task.setCreateTime(createTime);
          task.setMaxSamplingCount(maxSamplingCount);
          task.setTimeBucket(TimeBucket.getMinuteTimeBucket(taskStartTime));
          // hand the record to the stream processor, which persists it
          NoneStreamProcessor.getInstance().in(task);

          return ProfileTaskCreationResult.builder().id(task.id()).build();
      }
      
    2. CacheUpdateTimer#updateProfileTask refreshes the profile task cache: ProfileTaskCache$profileTaskDownstreamCache
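
The two steps above amount to a write to storage followed by a periodic cache refresh. Below is a minimal sketch of the refresh side only. It is not SkyWalking's CacheUpdateTimer: the names `CachedTask` and `TaskCacheSketch` are hypothetical, and a `Supplier` stands in for the storage query.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for a stored profile task; only the fields the sketch needs.
final class CachedTask {
    final String serviceId;
    final long createTime;

    CachedTask(String serviceId, long createTime) {
        this.serviceId = serviceId;
        this.createTime = createTime;
    }
}

// Illustrative cache: refresh() is meant to be called from a timer, tasksOf() from request handlers.
final class TaskCacheSketch {
    // Assumption: this supplier stands in for the query against the storage backend.
    private final Supplier<List<CachedTask>> storageQuery;
    private volatile Map<String, List<CachedTask>> byService = Collections.emptyMap();

    TaskCacheSketch(Supplier<List<CachedTask>> storageQuery) {
        this.storageQuery = storageQuery;
    }

    // Rebuild the whole map, then publish it with a single volatile write,
    // so readers never observe a half-built cache.
    void refresh() {
        Map<String, List<CachedTask>> next = new HashMap<>();
        for (CachedTask task : storageQuery.get()) {
            next.computeIfAbsent(task.serviceId, k -> new ArrayList<>()).add(task);
        }
        byService = next;
    }

    // Tasks currently cached for one service; empty if none are known.
    List<CachedTask> tasksOf(String serviceId) {
        return byService.getOrDefault(serviceId, Collections.emptyList());
    }
}
```

One consequence of this design is that a newly created task only becomes visible to agents after the next refresh, so the refresh interval adds to the delay before profiling can start.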

    The agent sends a ProfileTaskCommandQuery request

    1. The agent sends the ProfileTaskCommandQuery request through ProfileTaskChannelService
      public void run() {
          if (status == GRPCChannelStatus.CONNECTED) {
              try {
                  ProfileTaskCommandQuery.Builder builder = ProfileTaskCommandQuery.newBuilder();
      
                  // sniffer info
                  builder.setService(Config.Agent.SERVICE_NAME).setServiceInstance(Config.Agent.INSTANCE_NAME);
      
                  // last command create time
                  builder.setLastCommandTime(ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class)
                                                                    .getLastCommandCreateTime());
      			
                  // send the ProfileTaskCommandQuery request
                  Commands commands = profileTaskBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
                                                             .getProfileTaskCommands(builder.build());
                  // handle the response
                  ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
              } catch (Throwable t) {
                  // error handling is omitted in this excerpt
              }
          }
      }
      
    2. The server receives the ProfileTaskCommandQuery request in ProfileTaskServiceHandler#getProfileTaskCommands
      public void getProfileTaskCommands(ProfileTaskCommandQuery request, StreamObserver<Commands> responseObserver) {
          // query profile task list by service id
          final String serviceId = IDManager.ServiceID.buildId(request.getService(), true);
          final String serviceInstanceId = IDManager.ServiceInstanceID.buildId(serviceId, request.getServiceInstance());
          // read the tasks for this service from the cache
          final List<ProfileTask> profileTaskList = profileTaskCache.getProfileTaskList(serviceId);
          if (CollectionUtils.isEmpty(profileTaskList)) {
              responseObserver.onNext(Commands.newBuilder().build());
              responseObserver.onCompleted();
              return;
          }
      
          // build command list
          final Commands.Builder commandsBuilder = Commands.newBuilder();
          final long lastCommandTime = request.getLastCommandTime();
      
          for (ProfileTask profileTask : profileTaskList) {
              // if command create time less than last command time, means sniffer already have task
              if (profileTask.getCreateTime() <= lastCommandTime) {
                  continue;
              }
      
              // record profile task log --> the index is named sw_profile_task_log-20220808
              recordProfileTaskLog(profileTask, serviceInstanceId, ProfileTaskLogOperationType.NOTIFIED);
      
              // add command --> convert the ProfileTask into a ProfileTaskCommand and return it
              commandsBuilder.addCommands(commandService.newProfileTaskCommand(profileTask).serialize().build());
          }
      
          responseObserver.onNext(commandsBuilder.build());
          responseObserver.onCompleted();
      }
      
    3. The agent handles the returned ProfileTaskCommand in CommandService#receiveCommand and puts it into the blocking queue commands
      public void receiveCommand(Commands commands) {
          for (Command command : commands.getCommandsList()) {
              try {
                  BaseCommand baseCommand = CommandDeserializer.deserialize(command);
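                  // offer() does not block; it returns false when the queue is full
                  boolean success = this.commands.offer(baseCommand);
              } catch (UnsupportedCommandException e) {
                  // handling is omitted in this excerpt
              }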
          }
      }
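
The polling exchange in this section rests on one timestamp: the agent reports the create time of the newest command it has seen, and the server skips every task at or before it. Here is a minimal sketch of that filter, using hypothetical names (`PolledTask`, `CommandPollSketch`) rather than the real SkyWalking classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical task holder; createTime is an epoch-millisecond timestamp.
final class PolledTask {
    final String id;
    final long createTime;

    PolledTask(String id, long createTime) {
        this.id = id;
        this.createTime = createTime;
    }
}

final class CommandPollSketch {
    // Server side: return only the tasks created after the agent's last seen command time.
    static List<PolledTask> newerThan(List<PolledTask> cached, long lastCommandTime) {
        List<PolledTask> result = new ArrayList<>();
        for (PolledTask task : cached) {
            if (task.createTime > lastCommandTime) {
                result.add(task);
            }
        }
        return result;
    }

    // Agent side: move the timestamp forward to the newest create time received.
    static long advance(long lastCommandTime, List<PolledTask> received) {
        long newest = lastCommandTime;
        for (PolledTask task : received) {
            newest = Math.max(newest, task.createTime);
        }
        return newest;
    }
}
```

Because the comparison is strict, a task whose create time equals the reported timestamp is treated as already delivered, which matches the `<=` check in the handler above.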
      

    The agent processes the ProfileTaskCommand asynchronously

    1. The CommandService thread loops over the commands queue and hands each command to the matching command executor
      public void run() {
         	final CommandExecutorService commandExecutorService = ServiceManager.INSTANCE.findService(CommandExecutorService.class);
      
         	while (isRunning) {
              try {
              	// take the next command from the commands queue (blocks while the queue is empty)
                  BaseCommand command = commands.take();
      
                  if (isCommandExecuted(command)) {
                      continue;
                  }
      
                  commandExecutorService.execute(command);
                  serialNumberCache.add(command.getSerialNumber());
              } catch (CommandExecutionException e) {
                  LOGGER.error(e, "Failed to execute command[{}].", e.command().getCommand());
              } catch (Throwable e) {
                  LOGGER.error(e, "There is unexpected exception");
              }
         	 }
       }
      
    2. ProfileTaskCommandExecutor#execute converts the ProfileTaskCommand into a ProfileTask
    3. ProfileTaskExecutionService#addProfileTask schedules a timed job to process the ProfileTask
      public void addProfileTask(ProfileTask task) {
          // update last command create time
          if (task.getCreateTime() > lastCommandCreateTime) {
              lastCommandCreateTime = task.getCreateTime();
          }
      
          // check profile task limit
          final CheckResult dataError = checkProfileTaskSuccess(task);
          if (!dataError.isSuccess()) {
              LOGGER.warn(
                  "check command error, cannot process this profile task. reason: {}", dataError.getErrorReason());
              return;
          }
      
          // add task to list
          profileTaskList.add(task);
      
          // schedule to start task
          long timeToProcessMills = task.getStartTime() - System.currentTimeMillis();
          PROFILE_TASK_SCHEDULE.schedule(() -> processProfileTask(task), timeToProcessMills, TimeUnit.MILLISECONDS);
      }
      
    4. ProfileTaskExecutionService#processProfileTask creates a ProfileThread, submits it to a thread pool, and keeps the returned profilingFuture so that the thread can be stopped later
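
Steps 3 and 4 combine two standard `java.util.concurrent` idioms: scheduling work with a computed delay, and keeping the returned future so the work can be cancelled. The sketch below illustrates both under assumed names (`DelayedStartSketch`, `scheduleAt`); it is not the agent's implementation.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class DelayedStartSketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // Delay until the requested start time. A start time in the past gives a negative
    // delay, which the scheduled executor treats as a request to run immediately.
    static long delayMillis(long startTimeMillis, long nowMillis) {
        return startTimeMillis - nowMillis;
    }

    // Schedule the start callback and hand the future back to the caller,
    // who can cancel it if the task is dropped before it begins.
    ScheduledFuture<?> scheduleAt(long startTimeMillis, Runnable startProfiling) {
        long delay = delayMillis(startTimeMillis, System.currentTimeMillis());
        return scheduler.schedule(startProfiling, delay, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

Holding on to the future is what makes a clean stop possible: calling `cancel(true)` on it either prevents the run or interrupts the running thread, and a loop that checks `Thread.currentThread().isInterrupted()` then exits.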

    ProfileThread starts profiling

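    1. The ProfileThread loops over the profilingSegmentSlots of the ProfileTaskExecutionContext (when are profilingSegmentSlots populated? See the section further down)
    2. It obtains the thread stack through Thread#getStackTrace and converts it into a thread snapshot, TracingThreadSnapshot
    3. It puts the TracingThreadSnapshot into the snapshot queue, snapshotQueue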
        private void profiling(ProfileTaskExecutionContext executionContext) throws InterruptedException {
    		
    		// sampling interval, e.g. 10 ms
            int maxSleepPeriod = executionContext.getTask().getThreadDumpPeriod();
    
            // run loop when current thread still running
            long currentLoopStartTime = -1;
            while (!Thread.currentThread().isInterrupted()) {
                currentLoopStartTime = System.currentTimeMillis();
    
                // iterate over all profiling slots
                AtomicReferenceArray<ThreadProfiler> profilers = executionContext.threadProfilerSlots();
                int profilerCount = profilers.length();
                for (int slot = 0; slot < profilerCount; slot++) {
                    ThreadProfiler currentProfiler = profilers.get(slot);
                    if (currentProfiler == null) {
                        continue;
                    }
    
                    switch (currentProfiler.profilingStatus().get()) {
    
                        case PENDING:
                            // check tracing context running time
                            currentProfiler.startProfilingIfNeed();
                            break;
    
                        case PROFILING:
                            // dump stack
                            TracingThreadSnapshot snapshot = currentProfiler.buildSnapshot();
                            if (snapshot != null) {
                                profileTaskChannelService.addProfilingSnapshot(snapshot);
                            } else {
                                // tell execution context current tracing thread dump failed, stop it
                                executionContext.stopTracingProfile(currentProfiler.tracingContext());
                            }
                            break;
    
                    }
                }
    
                // sleep to next period
                // if out of period, sleep one period
                long needToSleep = (currentLoopStartTime + maxSleepPeriod) - System.currentTimeMillis();
                needToSleep = needToSleep > 0 ? needToSleep : maxSleepPeriod;
                Thread.sleep(needToSleep);
            }
        }
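
The core of the loop above is small: read another thread's stack with `Thread#getStackTrace`, then sleep for whatever remains of the sampling period. The following sketch isolates those two pieces. `StackSamplerSketch` and its methods are illustrative names, and plain `StackTraceElement[]` arrays stand in for TracingThreadSnapshot.

```java
import java.util.ArrayList;
import java.util.List;

final class StackSamplerSketch {
    // Time to sleep so that loop iterations start one period apart.
    // If the iteration already overran the period, sleep one full period instead.
    static long sleepMillis(long loopStart, long periodMillis, long now) {
        long remaining = (loopStart + periodMillis) - now;
        return remaining > 0 ? remaining : periodMillis;
    }

    // Sample the target thread's stack from the calling thread, up to `samples` times.
    static List<StackTraceElement[]> sample(Thread target, int samples, long periodMillis)
            throws InterruptedException {
        List<StackTraceElement[]> snapshots = new ArrayList<>();
        for (int i = 0; i < samples && target.isAlive(); i++) {
            long loopStart = System.currentTimeMillis();
            // getStackTrace() returns an empty array for a thread that is not running.
            StackTraceElement[] stack = target.getStackTrace();
            if (stack.length > 0) {
                snapshots.add(stack);
            }
            Thread.sleep(sleepMillis(loopStart, periodMillis, System.currentTimeMillis()));
        }
        return snapshots;
    }
}
```

Sampling is statistical: a method that keeps appearing across consecutive snapshots is where the thread spends its time, while a method that finishes between two samples may never show up at all.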
    

    When are profilingSegmentSlots populated?

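    1. Before the agent's interception of an entry method (Tomcat, for example), initializing the TracingContext inserts a slot into profilingSegmentSlots (Thread.currentThread() is captured so that the thread's stack can be read later)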
       public ProfileStatusReference attemptProfiling(TracingContext tracingContext,
                                                     String traceSegmentId,
                                                     String firstSpanOPName) {
      	........
      	final ThreadProfiler threadProfiler = new ThreadProfiler(
              tracingContext, traceSegmentId, Thread.currentThread(), this);
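          int slotLength = profilingSegmentSlots.length();
          for (int slot = 0; slot < slotLength; slot++) {
              // claim the first empty slot atomically, so two request threads cannot take the same one
              if (profilingSegmentSlots.compareAndSet(slot, null, threadProfiler)) {
                  return threadProfiler.profilingStatus();
              }
          }
          // the return for the case where every slot is taken is omitted in this excerpt
      }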
      
    2. After the agent's interception of the entry method (Tomcat, for example), the previously inserted slot is reset to null
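
The slot handling described in this section is a fixed-size, lock-free table: a request thread claims the first empty slot with a compare-and-set and clears it when the request ends. Below is a minimal sketch of that pattern with `AtomicReferenceArray`. The class `SlotTableSketch` is hypothetical, and its release logic is a simplification rather than a copy of the agent's code.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

final class SlotTableSketch<T> {
    private final AtomicReferenceArray<T> slots;

    SlotTableSketch(int capacity) {
        this.slots = new AtomicReferenceArray<>(capacity);
    }

    // Claim the first empty slot. Returns the slot index, or -1 when the table is full,
    // in which case the caller simply does not profile this request.
    int claim(T item) {
        for (int i = 0; i < slots.length(); i++) {
            if (slots.compareAndSet(i, null, item)) {
                return i;
            }
        }
        return -1;
    }

    // Clear the slot that holds this exact object (compared by reference).
    // Returns false if the item was not in the table.
    boolean release(T item) {
        for (int i = 0; i < slots.length(); i++) {
            if (slots.compareAndSet(i, item, null)) {
                return true;
            }
        }
        return false;
    }
}
```

The fixed capacity doubles as a limit on how many requests are profiled at once, which keeps the sampling cost of the profiling thread bounded.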

    The agent sends thread snapshots to the server asynchronously

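    1. On boot, ProfileTaskChannelService starts a scheduled job that runs every 500 ms, drains snapshots from the snapshot queue snapshotQueue into a buffer, and sends them to the server in batches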
      public void boot() {
      	.......
      	sendSnapshotFuture = Executors.newSingleThreadScheduledExecutor(
                  new DefaultNamedThreadFactory("ProfileSendSnapshotService")
              ).scheduleWithFixedDelay(
                  new RunnableWithExceptionProtection(
                      () -> {
                          List<TracingThreadSnapshot> buffer = new ArrayList<>(Config.Profile.SNAPSHOT_TRANSPORT_BUFFER_SIZE);
                          // drain the pending snapshots from snapshotQueue
                          snapshotQueue.drainTo(buffer);
                          if (!buffer.isEmpty()) {
                              sender.send(buffer);
                          }
                      },
                      t -> LOGGER.error("Profile segment snapshot upload failure.", t)
                  ), 0, 500, TimeUnit.MILLISECONDS
              );
           ........         
      }
      
    2. ProfileSnapshotSender#send converts each TracingThreadSnapshot into a ThreadSnapshot and sends it to the server
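
The sending side is a producer/consumer hand-off: the profiling thread offers snapshots into a bounded queue, and a separate job periodically drains them and ships one batch. The sketch below shows a single flush cycle. `SnapshotBatcherSketch` is a hypothetical name, a `Consumer` stands in for the gRPC sender, and the cap on batch size is an addition of this sketch (the excerpt above calls `drainTo` without a limit).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

final class SnapshotBatcherSketch<T> {
    private final BlockingQueue<T> queue;
    private final int maxBatch;
    // Assumption: stands in for the component that sends a batch to the server.
    private final Consumer<List<T>> sender;

    SnapshotBatcherSketch(int queueCapacity, int maxBatch, Consumer<List<T>> sender) {
        this.queue = new LinkedBlockingQueue<>(queueCapacity);
        this.maxBatch = maxBatch;
        this.sender = sender;
    }

    // Producer side: never blocks; returns false (snapshot dropped) when the queue is full.
    boolean add(T snapshot) {
        return queue.offer(snapshot);
    }

    // One flush cycle, meant to be run on a fixed-delay schedule.
    // Returns the number of snapshots handed to the sender.
    int flush() {
        List<T> buffer = new ArrayList<>(maxBatch);
        queue.drainTo(buffer, maxBatch);
        if (!buffer.isEmpty()) {
            sender.accept(buffer);
        }
        return buffer.size();
    }
}
```

Dropping snapshots when the queue is full is a deliberate trade-off in this sketch: the thread being profiled is never slowed down by a slow or unavailable server.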

    Conclusion

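      This article traced the whole flow: the UI creates a task --> the agent fetches the task --> the agent reports thread snapshots. Along the way SkyWalking relies heavily on asynchronous programming techniques, which are worth exploring further in later posts.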

  • Original article: https://blog.csdn.net/weixin_40803011/article/details/126208133