• golang Goroutine超时控制


    1.个人理解 

    1. package main
    2. import (
    3. "context"
    4. "fmt"
    5. "runtime"
    6. "time"
    7. )
    8. func main() {
    9. // 为了方便查看设置的计数器
    10. //go func() {
    11. // var o int64
    12. // for {
    13. // o++
    14. // fmt.Println(o)
    15. // time.Sleep(time.Second)
    16. // }
    17. //}()
    18. // 开启协程
    19. for i := 0; i < 100; i++ {
    20. go func(i int) {
    21. // 利用context 设置超时上下文
    22. ctx, cancel := context.WithTimeout(context.TODO(), 2*time.Second)
    23. // 主动退出信号
    24. endDone := make(chan struct{})
    25. // 再次开启子协程异步处理业务逻辑
    26. go func() {
    27. select {
    28. // 监听是否超时
    29. case <-ctx.Done():
    30. fmt.Println("Goroutine timeout")
    31. return
    32. // 处理业务逻辑
    33. default:
    34. if i == 1 {
    35. time.Sleep(10 * time.Second)
    36. }
    37. // 此处代码会继续执行
    38. //fmt.Println("代码逻辑继续执行")
    39. // 主动退出
    40. close(endDone)
    41. return
    42. }
    43. }()
    44. // 监听父协程状态
    45. select {
    46. // 超时退出父协程,这里需要注意此时如果子协程已经执行并超时,子协程会继续执行中直到关闭,这块需要关注下。比如:查看数据确定数据是否已被修改等。
    47. case <-ctx.Done():
    48. fmt.Println("超时退出", i)
    49. cancel()
    50. return
    51. // 主动关闭
    52. case <-endDone:
    53. fmt.Println("主动退出", i)
    54. cancel()
    55. return
    56. }
    57. }(i)
    58. }
    59. //time.Sleep(8 * time.Second)
    60. time.Sleep(12 * time.Second)
    61. // 查看当前还存在多少运行中的goroutine
    62. fmt.Println("number of goroutines:", runtime.NumGoroutine())
    63. }

    2.go-zero实现方式

    1. package main
    2. import (
    3. "context"
    4. "fmt"
    5. "runtime/debug"
    6. "strings"
    7. "time"
    8. )
    9. var (
    10. // ErrCanceled是取消上下文时返回的错误。
    11. ErrCanceled = context.Canceled
    12. // ErrTimeout是当上下文的截止日期过去时返回的错误。
    13. ErrTimeout = context.DeadlineExceeded
    14. )
    15. // DoOption定义了自定义DoWithTimeout调用的方法。
    16. type DoOption func() context.Context
    17. // DoWithTimeout运行带有超时控制的fn。
    18. func DoWithTimeout(fn func() error, timeout time.Duration, opts ...DoOption) error {
    19. parentCtx := context.Background()
    20. for _, opt := range opts {
    21. parentCtx = opt()
    22. }
    23. ctx, cancel := context.WithTimeout(parentCtx, timeout)
    24. defer cancel()
    25. // 创建缓冲区大小为1的通道以避免goroutine泄漏
    26. done := make(chan error, 1)
    27. panicChan := make(chan interface{}, 1)
    28. go func() {
    29. defer func() {
    30. if p := recover(); p != nil {
    31. // 附加调用堆栈以避免在不同的goroutine中丢失
    32. panicChan <- fmt.Sprintf("%+v\n\n%s", p, strings.TrimSpace(string(debug.Stack())))
    33. }
    34. }()
    35. done <- fn()
    36. }()
    37. select {
    38. case p := <-panicChan:
    39. panic(p)
    40. case err := <-done:
    41. return err
    42. case <-ctx.Done():
    43. return ctx.Err()
    44. }
    45. }
    46. // WithContext使用给定的ctx自定义DoWithTimeout调用。
    47. func WithContext(ctx context.Context) DoOption {
    48. return func() context.Context {
    49. return ctx
    50. }
    51. }
    52. func main() {
    53. ctx, cancel := context.WithCancel(context.Background())
    54. go func() {
    55. fmt.Println(1111)
    56. time.Sleep(time.Second * 5)
    57. fmt.Println(2222)
    58. cancel()
    59. }()
    60. err := DoWithTimeout(func() error {
    61. fmt.Println("aaaa")
    62. time.Sleep(10 * time.Second)
    63. fmt.Println("bbbb")
    64. return nil
    65. }, 3*time.Second, WithContext(ctx))
    66. fmt.Println(err)
    67. time.Sleep(15 * time.Second)
    68. //err := DoWithTimeout(func() error {
    69. // fmt.Println(111)
    70. // time.Sleep(time.Second * 3)
    71. // fmt.Println(222)
    72. // return nil
    73. //}, time.Second*2)
    74. //
    75. //fmt.Println(err)
    76. //time.Sleep(6 * time.Second)
    77. //
    78. //fmt.Println("number of goroutines:", runtime.NumGoroutine())
    79. }
    1. package fx
    2. import (
    3. "context"
    4. "testing"
    5. "time"
    6. "github.com/stretchr/testify/assert"
    7. )
    8. func TestWithPanic(t *testing.T) {
    9. assert.Panics(t, func() {
    10. _ = DoWithTimeout(func() error {
    11. panic("hello")
    12. }, time.Millisecond*50)
    13. })
    14. }
    15. func TestWithTimeout(t *testing.T) {
    16. assert.Equal(t, ErrTimeout, DoWithTimeout(func() error {
    17. time.Sleep(time.Millisecond * 50)
    18. return nil
    19. }, time.Millisecond))
    20. }
    21. func TestWithoutTimeout(t *testing.T) {
    22. assert.Nil(t, DoWithTimeout(func() error {
    23. return nil
    24. }, time.Millisecond*50))
    25. }
    26. func TestWithCancel(t *testing.T) {
    27. ctx, cancel := context.WithCancel(context.Background())
    28. go func() {
    29. time.Sleep(time.Millisecond * 10)
    30. cancel()
    31. }()
    32. err := DoWithTimeout(func() error {
    33. time.Sleep(time.Minute)
    34. return nil
    35. }, time.Second, WithContext(ctx))
    36. assert.Equal(t, ErrCanceled, err)
    37. }

    参考文献:

     https://github.com/zeromicro/go-zero/blob/master/core/fx/timeout.go

     一文搞懂 Go 超时控制_51CTO博客_go 超时处理

  • 相关阅读:
    智慧工地源代码 SaaS模式云平台
    [Docker Java 服务]Docker 容器中Java服务问题排查
    接口抓包分析与mock
    JD(按关键字搜索商品)API接口
    vue-vuex
    Maven的profiles多环境配置
    《Windows内核安全编程》第三章 串口的过滤
    LabVIEW在OPC中使用基金会现场总线
    查询硬盘序列号、物理地址及对应批处理命令
    React组件复用
  • 原文地址:https://blog.csdn.net/Yuuuuuubs/article/details/132733796