• Golang实战:利用Atomic和轮询机制实现任务排队和并发流量控制


    在一次开发大模型应用的工程化过程中,我们碰到一个问题,开源的模型核心代码是用Python写的,有自己的一套并发管理和排队机制,而模型一次只能处理一个生成任务,生成的时间也很长,在A10上,需要几秒钟到几十秒处理一个请求,就会导致在Python的锁上排队的其他请求被不断的阻塞。

    因为团队的主要开发语言是Golang,我们使用Golang开发了一个调度程序,大模型生成任务的请求先先提交到Golang服务,经过排队和流量控制,再转发到运行在本机的Python的程序处理。

    图片

    专门为每个部署的Python应用实例,配对启动一个Golang调度程序,相当于在不修改Python源代码的情况下,配置了一套Agent,可以实现对Python程序的扩展,独立的实现很多功能,例如运行统计、数据上报、状态检查、还有请求和响应的转发处理,可以为架构带来更大的灵活性,还可以配合服务端的调度程序和配置,控制客户端的行为。

    以上是为什么采用这种方案的架构思考,下面的文章,重点介绍在客户端Agent的实现过程中,通过Atomic采用等机制,实现了任务排队和无锁化可控的流量控制机制。

    实现思路

    1. 使用atomic包:通过atomic包实现对共享变量的原子操作,避免数据竞争。

    2. 轮询机制:通过轮询方式检查并处理队列中的请求,确保每次只处理一个任务。

    3. 超时机制:在轮询过程中计时,如果一个任务等待时间过长,直接返回错误,避免任务堆积。

    4. 队列管理:维护一个队列,记录当前排队的任务,如果队列满了,拒绝新请求

    示例代码

    1. package main
    2. import (
    3. "context"
    4. "fmt"
    5. "net/http"
    6. "sync"
    7. "sync/atomic"
    8. "time"
    9. "github.com/google/uuid"
    10. )
    11. const (
    12. DispatchCode_Success = iota + 1
    13. DispatchCode_TooManyRequest
    14. DispatchCode_WaitTooLong
    15. DispatchCode_Restarting
    16. )
    17. var (
    18. dispatchList []string
    19. lock sync.RWMutex
    20. counter int32
    21. lastRunTime int64
    22. runStatus int32
    23. )
    24. type dispatchFunc func(ctx context.Context, code int)
    25. func Recover(cleanups ...func()) {
    26. for _, cleanup := range cleanups {
    27. cleanup()
    28. }
    29. if p := recover(); p != nil {
    30. // log.Println(p)
    31. }
    32. }
    33. func runFunc(ctx context.Context, fc dispatchFunc, code int) {
    34. defer Recover()
    35. fc(ctx, code)
    36. }
    37. func addPendingTask(ctx context.Context, dispatchId string) {
    38. lock.Lock()
    39. defer lock.Unlock()
    40. dispatchList = append(
    41. dispatchList, dispatchId)
    42. }
    43. func checkPendingTask(ctx context.Context, dispatchId string) bool {
    44. lock.RLock()
    45. defer lock.RUnlock()
    46. if len(dispatchList) <= 0 {
    47. return true
    48. }
    49. if dispatchId == dispatchList[0] {
    50. return true
    51. }
    52. return false
    53. }
    54. func removePendingTask(ctx context.Context, dispatchId string) {
    55. lock.Lock()
    56. defer lock.Unlock()
    57. if len(dispatchList) <= 0 {
    58. return
    59. }
    60. for i, id := range dispatchList {
    61. if id == dispatchId {
    62. dispatchList = append(dispatchList[:i], dispatchList[i+1:]...)
    63. return
    64. }
    65. }
    66. }
    67. func dispatch(ctx context.Context, fc dispatchFunc) {
    68. if atomic.LoadInt32(&counter) > 3 {
    69. runFunc(ctx, fc, DispatchCode_TooManyRequest)
    70. return
    71. }
    72. requestStartAt := time.Now()
    73. dispatchId := uuid.New().String()
    74. addPendingTask(ctx, dispatchId)
    75. atomic.AddInt32(&counter, 1)
    76. defer atomic.AddInt32(&counter, -1)
    77. i := 0
    78. for {
    79. if time.Now().Sub(requestStartAt) > time.Minute*5 {
    80. removePendingTask(ctx, dispatchId)
    81. runFunc(ctx, fc, DispatchCode_WaitTooLong)
    82. return
    83. }
    84. if lastRunAt := atomic.LoadInt64(&lastRunTime); lastRunAt > 0 && time.Now().Sub(time.UnixMilli(lastRunAt)) > time.Minute*5 {
    85. removePendingTask(ctx, dispatchId)
    86. runFunc(ctx, fc, DispatchCode_WaitTooLong)
    87. return
    88. }
    89. if checkPendingTask(ctx, dispatchId) && atomic.LoadInt32(&runStatus) == 0 {
    90. break
    91. }
    92. time.Sleep(time.Millisecond * 100 * time.Duration(1+i%3))
    93. i++
    94. }
    95. removePendingTask(ctx, dispatchId)
    96. defer atomic.StoreInt32(&runStatus, 0)
    97. atomic.StoreInt32(&runStatus, 1)
    98. defer atomic.StoreInt64(&lastRunTime, 0)
    99. atomic.StoreInt64(&lastRunTime, time.Now().UnixMilli())
    100. runFunc(ctx, fc, DispatchCode_Success)
    101. }
    102. func requestHandler(w http.ResponseWriter, r *http.Request) {
    103. defer Recover()
    104. ctx := r.Context()
    105. // requestAt := time.Now()
    106. dispatch(ctx, func(ctx context.Context, code int) {
    107. // startAt := time.Now()
    108. if code == DispatchCode_TooManyRequest {
    109. w.WriteHeader(http.StatusTooManyRequests)
    110. _, _ = w.Write([]byte(`{"error": "too_many_requests", "code": 429}`))
    111. return
    112. } else if code == DispatchCode_WaitTooLong {
    113. w.WriteHeader(http.StatusRequestTimeout)
    114. _, _ = w.Write([]byte(`{"error": "request_timeout", "code": 408}`))
    115. return
    116. }
    117. result, err := processRequest(ctx)
    118. // endAt := time.Now()
    119. if err != nil {
    120. w.WriteHeader(http.StatusBadRequest)
    121. return
    122. }
    123. w.Header().Set("Content-Type", "application/json")
    124. _, _ = w.Write(result)
    125. })
    126. }
    127. func processRequest(ctx context.Context) ([]byte, error) {
    128. // 模拟处理请求的耗时操作
    129. fmt.Println("Processing request")
    130. time.Sleep(5 * time.Second)
    131. fmt.Println("Request processed")
    132. return nil, nil
    133. }
    134. func main() {
    135. http.HandleFunc("/", requestHandler)
    136. fmt.Println("Server started at :8080")
    137. if err := http.ListenAndServe(":8080", nil); err != nil {
    138. fmt.Println("Server failed:", err)
    139. }
    140. }

    回顾和总结

    我们所做的工作基于这样一个事实:processRequest的执行,同时只能由一个请求调用,否则所有请求都会被阻塞挂起,然后按照FIFO顺序,逐个处理完,如果中间需要释放被挂起的请求,唯一的方法是重启进程。

    最初的实现版本中,Agent的职能是数据采集、子进程管理和状态检查,辅助服务端的调度服务器,动态的选择执行大模型生成任务的节点。因此,我们采用了简单加锁的方法,在调用processRequest之前,要先加锁,然后执行完之后释放锁。这种方法在一段时间内运行得很平稳。

    然而,直到一次突发的请求高峰打破了这种平衡,所有可以调度的GPU服务器都堆积了大量的请求,因为锁的存在,堆积的请求都被关在一个无法编程访问到的队列中,程序层面什么也做不了,只能等待已经被阻塞的任务,全部按顺序执行完。就算新增加了机器,或者出现某个特别长的任务阻塞的情况,也无法针对后续的任务优化响应时间。

    为了解决这个问题,我们认识到问题在于锁是阻塞的,被阻塞的线程直到锁释放之前,是无法进行任何操作的;优化的思想,源于操作系统的实现中造就存在的思想。

    就是引入轮询和等待机制,将“死锁”变成“活锁”,如果请求来到的时候,队列中已经有太多的请求,那么可以直接拒绝再处理更多请求。

    程序的核心是轮询和等待,也就是for循环:

    1. 检查等待时间:首先检查是否等待时间过长,如果是,则直接报错,由调度服务将请求转发到其他节点。

    2. 任务执行条件:检查是否可以执行当前任务,如果当前没有请求在处理且任务在队列最前面,就立即执行当前任务。

    3. 轮询等待:如果无法执行,sleep一段时间再继续检查。Sleep的作用类似于“活锁”。

    实现这个程序的过程,在某种程度上,验证了我之前提出的一个观点。在Golang的并发模型中,父协程通过哪些机制可以跟子协程通信,终止子协程的运行。

    当时提出的假设是,子协程要能够在执行过程中被终止,必须要有一个前提条件,就是子协程在运行的过程中,有可编程的指令切换,在指令切换的过程中,增加一个检查指令,判断符合某种程度,就跳出后续指令的执行。

    如果子协程中有一个耗时很长,且无法通信的函数调用,那么父协程其实什么也做不了。就像被阻塞在某个锁上一样,程序就失去了对执行过程的控制。

    Golang实战:利用Atomic和轮询机制实现任务排队和并发流量控制icon-default.png?t=N7T8https://mp.weixin.qq.com/s/4muEJWyuV--XALJz8JemYw

  • 相关阅读:
    TDengine 3.0 三大创新详解
    药智网数据库介绍
    【人工智能Ⅰ】7-KNN & 决策树
    JAVA毕业设计vue校园菜鸟驿站管理系统计算机源码+lw文档+系统+调试部署+数据库
    环境配置、如何安装OpenHarmony HAR
    Java版分布式微服务云开发架构 Spring Cloud+Spring Boot+Mybatis 电子招标采购系统功能清单
    Element-ui el-table 使用 SortableJS 实现表格拖拽
    Sping高级(源码)--- 1.6Aware接口
    C++系列十:C++函数
    神经网络如何提高准确率,神经网络的求解方式
  • 原文地址:https://blog.csdn.net/liuwill/article/details/140983299