在《在线代码级性能剖析,补全分布式追踪的最后一块“短板”》中提到:再复杂的业务逻辑,都是基于线程去执行的。那么 SkyWalking 是怎样利用方法栈快照进行代码级性能剖析的呢?出于好奇心,一起来 debug 看看其中的奥妙。



接收页面请求,通过ProfileTaskMutationService#createTask将任务存入ES中,索引名为:profile_task-*(例如 profile_task-20220807)
/**
 * Validates the request, builds a {@code ProfileTaskRecord} and hands it to
 * {@code NoneStreamProcessor} for persistence.
 *
 * @param serviceId            id of the service to profile
 * @param endpointName         endpoint to profile; it is trimmed before being stored
 * @param monitorStartTime     requested start time in epoch milliseconds; a non-positive
 *                             value means "start now"
 * @param monitorDuration      monitoring duration, treated as minutes when computing the task
 *                             end time (as in the upstream SkyWalking implementation)
 * @param minDurationThreshold minimum duration threshold stored on the task
 *                             (presumably milliseconds -- confirm)
 * @param dumpPeriod           thread dump period stored on the task
 *                             (presumably milliseconds -- confirm)
 * @param maxSamplingCount     maximum sampling count stored on the task
 * @return a result carrying the new task id, or the validation error reason when the
 *         request is rejected
 * @throws IOException declared by the signature; presumably raised by the validation
 *                     step -- confirm
 */
public ProfileTaskCreationResult createTask(final String serviceId,
                                            final String endpointName,
                                            final long monitorStartTime,
                                            final int monitorDuration,
                                            final int minDurationThreshold,
                                            final int dumpPeriod,
                                            final int maxSamplingCount) throws IOException {
    // calculate the task execution range; these two values were referenced below
    // but never defined in the excerpt
    final long taskStartTime = monitorStartTime > 0 ? monitorStartTime : System.currentTimeMillis();
    final long taskEndTime =
        taskStartTime + java.util.concurrent.TimeUnit.MINUTES.toMillis(monitorDuration);

    // check data
    final String errorMessage = checkDataSuccess(
        serviceId, endpointName, taskStartTime, taskEndTime, monitorDuration, minDurationThreshold, dumpPeriod,
        maxSamplingCount
    );
    if (errorMessage != null) {
        return ProfileTaskCreationResult.builder().errorReason(errorMessage).build();
    }

    // create task
    final long createTime = System.currentTimeMillis();
    final ProfileTaskRecord task = new ProfileTaskRecord();
    task.setServiceId(serviceId);
    task.setEndpointName(endpointName.trim());
    task.setStartTime(taskStartTime);
    task.setDuration(monitorDuration);
    task.setMinDurationThreshold(minDurationThreshold);
    task.setDumpPeriod(dumpPeriod);
    task.setCreateTime(createTime);
    task.setMaxSamplingCount(maxSamplingCount);
    task.setTimeBucket(TimeBucket.getMinuteTimeBucket(taskStartTime));

    // hand the record over to the storage pipeline
    NoneStreamProcessor.getInstance().in(task);
    return ProfileTaskCreationResult.builder().id(task.id()).build();
}
CacheUpdateTimer#updateProfileTask更新profileTask缓存:ProfileTaskCache$profileTaskDownstreamCache
/**
 * Queries the backend for profile task commands created after the last one this agent has
 * seen, and forwards the response to {@code CommandService}.
 *
 * <p>Does nothing unless the gRPC channel is connected. Any failure is logged and then
 * dropped, so the query stays best-effort and never propagates to the caller.
 */
public void run() {
    // nothing to query while the channel is not connected
    if (status != GRPCChannelStatus.CONNECTED) {
        return;
    }
    try {
        ProfileTaskCommandQuery.Builder builder = ProfileTaskCommandQuery.newBuilder();
        // sniffer info
        builder.setService(Config.Agent.SERVICE_NAME).setServiceInstance(Config.Agent.INSTANCE_NAME);
        // last command create time
        builder.setLastCommandTime(ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class)
                                                          .getLastCommandCreateTime());
        // send the ProfileTaskCommandQuery request
        Commands commands = profileTaskBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
                                                   .getProfileTaskCommands(builder.build());
        // handle the response
        ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
    } catch (Throwable t) {
        // keep the query best-effort, but make the failure visible instead of swallowing it
        LOGGER.error(t, "Query profile task from backend fail.");
    }
}
/**
 * Answers an agent's profile task query with every cached task of the service that was
 * created after the agent's last known command time.
 *
 * <p>Each returned task is also recorded in the profile task log with operation type
 * {@code NOTIFIED}. An empty {@code Commands} message is sent when the service has no
 * cached tasks.
 *
 * @param request          query carrying the service name, instance name and last command time
 * @param responseObserver observer that receives exactly one {@code Commands} message and is
 *                         then completed
 */
public void getProfileTaskCommands(ProfileTaskCommandQuery request, StreamObserver<Commands> responseObserver) {
    // query profile task list by service id
    final String serviceId = IDManager.ServiceID.buildId(request.getService(), true);
    final String serviceInstanceId = IDManager.ServiceInstanceID.buildId(serviceId, request.getServiceInstance());

    // fetch the tasks of this service from the cache
    final List<ProfileTask> profileTaskList = profileTaskCache.getProfileTaskList(serviceId);
    if (CollectionUtils.isEmpty(profileTaskList)) {
        responseObserver.onNext(Commands.newBuilder().build());
        responseObserver.onCompleted();
        return;
    }

    // build command list
    final Commands.Builder commandsBuilder = Commands.newBuilder();
    final long lastCommandTime = request.getLastCommandTime();
    for (ProfileTask profileTask : profileTaskList) {
        // if command create time less than last command time, means sniffer already have task
        if (profileTask.getCreateTime() <= lastCommandTime) {
            continue;
        }

        // record profile task log (the article notes the index name sw_profile_task_log-20220808)
        recordProfileTaskLog(profileTask, serviceInstanceId, ProfileTaskLogOperationType.NOTIFIED);

        // add command: convert the ProfileTask into a ProfileTaskCommand for the response
        commandsBuilder.addCommands(commandService.newProfileTaskCommand(profileTask).serialize().build());
    }

    responseObserver.onNext(commandsBuilder.build());
    responseObserver.onCompleted();
}
/**
 * Deserializes every command in the response and offers it to the pending command queue.
 *
 * <p>A command that cannot be queued, or whose type is not supported, is logged and skipped
 * so that the remaining commands are still processed.
 *
 * @param commands the command batch received from the backend
 */
public void receiveCommand(Commands commands) {
    for (Command command : commands.getCommandsList()) {
        try {
            BaseCommand baseCommand = CommandDeserializer.deserialize(command);
            // offer() reports failure through its return value; surface it instead of
            // silently dropping the command
            if (!this.commands.offer(baseCommand)) {
                LOGGER.warn(
                    "Command[{}, {}] cannot add to command list. because the command list is full.",
                    baseCommand.getCommand(), baseCommand.getSerialNumber());
            }
        } catch (UnsupportedCommandException e) {
            // skip the unknown command but leave a trace of it
            LOGGER.warn(e, "Received unsupported command.");
        }
    }
}
/**
 * Consumer loop: while {@code isRunning} is set, blocks on the command queue, executes
 * each command that has not been executed before, and remembers its serial number.
 *
 * <p>Failures are logged and the loop moves on to the next command. A serial number is
 * only remembered after its command executed without throwing.
 */
public void run() {
    final CommandExecutorService executor =
        ServiceManager.INSTANCE.findService(CommandExecutorService.class);

    while (isRunning) {
        try {
            // blocks until a command is available in the queue
            final BaseCommand next = commands.take();
            if (!isCommandExecuted(next)) {
                executor.execute(next);
                serialNumberCache.add(next.getSerialNumber());
            }
        } catch (CommandExecutionException e) {
            LOGGER.error(e, "Failed to execute command[{}].", e.command().getCommand());
        } catch (Throwable e) {
            LOGGER.error(e, "There is unexpected exception");
        }
    }
}
/**
 * Registers a profile task received from the backend and schedules it to start at its
 * start time.
 *
 * <p>The last command create time is advanced first, even when the task is later rejected
 * by the validity check. A rejected task is logged and not scheduled.
 *
 * @param task the profile task to register
 */
public void addProfileTask(ProfileTask task) {
    // remember the newest command create time seen so far
    final long taskCreateTime = task.getCreateTime();
    if (taskCreateTime > lastCommandCreateTime) {
        lastCommandCreateTime = taskCreateTime;
    }

    // validate the task against the profile task limits
    final CheckResult verdict = checkProfileTaskSuccess(task);
    if (verdict.isSuccess()) {
        profileTaskList.add(task);

        // delay until the task's start time, relative to now
        final long delayMillis = task.getStartTime() - System.currentTimeMillis();
        PROFILE_TASK_SCHEDULE.schedule(() -> processProfileTask(task), delayMillis, TimeUnit.MILLISECONDS);
        return;
    }

    LOGGER.warn(
        "check command error, cannot process this profile task. reason: {}", verdict.getErrorReason());
}
/**
 * Sampling loop: until the current thread is interrupted, visits every profiler slot of the
 * execution context once per round and then sleeps until the next round.
 *
 * <p>A {@code PENDING} profiler is asked whether it should start profiling. A
 * {@code PROFILING} profiler is asked for a snapshot: a non-null snapshot is handed to the
 * channel service, and a null snapshot stops profiling of that tracing context.
 *
 * @param executionContext the execution context of the running profile task
 * @throws InterruptedException if the thread is interrupted while sleeping between rounds
 */
private void profiling(ProfileTaskExecutionContext executionContext) throws InterruptedException {
    // dump period between two sampling rounds (the article notes 10ms as an example)
    final int maxSleepPeriod = executionContext.getTask().getThreadDumpPeriod();

    // run loop while the current thread is still running
    while (!Thread.currentThread().isInterrupted()) {
        final long currentLoopStartTime = System.currentTimeMillis();

        // walk through all profiler slots
        final AtomicReferenceArray<ThreadProfiler> profilers = executionContext.threadProfilerSlots();
        final int profilerCount = profilers.length();
        for (int slot = 0; slot < profilerCount; slot++) {
            final ThreadProfiler currentProfiler = profilers.get(slot);
            if (currentProfiler == null) {
                continue;
            }

            switch (currentProfiler.profilingStatus().get()) {
                case PENDING:
                    // check tracing context running time
                    currentProfiler.startProfilingIfNeed();
                    break;
                case PROFILING:
                    // dump stack
                    final TracingThreadSnapshot snapshot = currentProfiler.buildSnapshot();
                    if (snapshot != null) {
                        profileTaskChannelService.addProfilingSnapshot(snapshot);
                    } else {
                        // tell execution context current tracing thread dump failed, stop it
                        executionContext.stopTracingProfile(currentProfiler.tracingContext());
                    }
                    break;
                default:
                    // any other status needs no action in this round
                    break;
            }
        }

        // sleep to next period
        // if out of period, sleep one period
        long needToSleep = (currentLoopStartTime + maxSleepPeriod) - System.currentTimeMillis();
        needToSleep = needToSleep > 0 ? needToSleep : maxSleepPeriod;
        Thread.sleep(needToSleep);
    }
}

/**
 * Tries to start profiling the current thread for the given tracing context by placing a
 * new {@code ThreadProfiler} into the first empty slot.
 *
 * @param tracingContext  tracing context to profile
 * @param traceSegmentId  id of the trace segment being profiled
 * @param firstSpanOPName operation name of the first span (not used in the visible part of
 *                        this excerpt)
 * @return the profiling status of the new profiler once it has claimed a slot
 */
public ProfileStatusReference attemptProfiling(TracingContext tracingContext,
String traceSegmentId,
String firstSpanOPName) {
// (code elided by the article)
........
// the profiler is bound to the calling thread
final ThreadProfiler threadProfiler = new ThreadProfiler(
tracingContext, traceSegmentId, Thread.currentThread(), this);
int slotLength = profilingSegmentSlots.length();
for (int slot = 0; slot < slotLength; slot++) {
// atomically claim the slot only if it is still empty
if (profilingSegmentSlots.compareAndSet(slot, null, threadProfiler)) {
return threadProfiler.profilingStatus();
}
}
// NOTE(review): no return is visible here for the case where every slot is occupied;
// the excerpt appears to have dropped the fallback return -- confirm against the full source
}


/**
 * Starts a single-thread scheduled executor named "ProfileSendSnapshotService". It runs with
 * no initial delay and a fixed delay of 500 milliseconds between runs, draining the snapshot
 * queue and sending whatever was collected.
 */
public void boot() {
// (code elided by the article)
.......
sendSnapshotFuture = Executors.newSingleThreadScheduledExecutor(
new DefaultNamedThreadFactory("ProfileSendSnapshotService")
).scheduleWithFixedDelay(
new RunnableWithExceptionProtection(
() -> {
// NOTE(review): raw List -- the element type argument looks stripped from this excerpt; confirm
List buffer = new ArrayList<>(Config.Profile.SNAPSHOT_TRANSPORT_BUFFER_SIZE);
// take all pending snapshots out of snapshotQueue
snapshotQueue.drainTo(buffer);
// only send when something was drained
if (!buffer.isEmpty()) {
sender.send(buffer);
}
},
// presumably invoked when the task above throws -- confirm against RunnableWithExceptionProtection
t -> LOGGER.error("Profile segment snapshot upload failure.", t)
), 0, 500, TimeUnit.MILLISECONDS
);
// (code elided by the article)
........
}
通过本篇文章,可以了解 UI 创建任务 --> agent 获取任务 --> agent 上报线程快照的整个流程,也可以看到 SkyWalking 在其中使用了大量的异步编程技巧,后续继续挖掘学习。