• go-zero map reduce的代码实现


    项目地址:GitHub - zeromicro/go-zero: A cloud-native Go microservices framework with cli tool for productivity.

    一 背景

    在微服务中开发中,api网关扮演对外提供restful api的角色,而api的数据往往会依赖其他服务,复杂的api更是会依赖多个服务。
    虽然单个被依赖服务的耗时一般都比较低,但如果多个服务串行依赖的话那么整个api的耗时将会大大增加。

    那么通过什么手段来优化呢?我们首先想到的是通过并发来的方式来处理依赖,这样就能降低整个依赖的耗时,Go基础库中为我们提供了 WaitGroup 工具用来进行并发控制,但实际业务场景中多个依赖如果有一个出错我们期望能立即返回而不是等所有依赖都执行完再返回结果,而且WaitGroup中对变量的赋值往往需要加锁,每个依赖函数都需要添加Add和Done对于新手来说比较容易出错

    基于以上的背景,go-zero框架中为我们提供了并发处理工具MapReduce,该工具开箱即用。

    MapReduce是Google提出的一个软件架构,用于大规模数据集的并行运算,go-zero中的MapReduce工具正是借鉴了这种架构思想

    go-zero框架中的MapReduce工具主要用来对批量数据进行并发的处理,以此来提升服务的性能
     

    二 核心代码

    2.1 基础概念

    • generate  生产数据
       
      1. type (
      2. // GenerateFunc is used to let callers send elements into source.
      3. GenerateFunc func(source chan<- interface{})
      4. )
    • mapper  对generate生产的数据进行处理
      1. type (
      2. // MapFunc is used to do element processing and write the output to writer.
      3. MapFunc func(item interface{}, writer Writer)
      4. // MapperFunc is used to do element processing and write the output to writer,
      5. // use cancel func to cancel the processing.
      6. MapperFunc func(item interface{}, writer Writer, cancel func(error))
      7. )
    • reducer 对mapper处理后的数据做聚合返回
       
      1. type (
      2. // ReducerFunc is used to reduce all the mapping output and write to writer,
      3. // use cancel func to cancel the processing.
      4. ReducerFunc func(pipe <-chan interface{}, writer Writer, cancel func(error))
      5. // VoidReducerFunc is used to reduce all the mapping output, but no output.
      6. // Use cancel func to cancel the processing.
      7. VoidReducerFunc func(pipe <-chan interface{}, cancel func(error))
      8. )

       
    • souece channel
      无缓冲的channel,用于 generte和mapper通信。
      generate生产的数据会写入source channel,mapper 则读取source channle的数据进行处理。
       
    • collector channel
      有缓冲区的channe,缓冲区的长度是option.worker的数量。
      mapper处理完成后的数据写入collector channel,reduce读取collector数据进行处理。
       
    • output channel
      无缓冲区channel ,用于记录reducer处理后最终数据。

    2.2 代码

     2.2.1 buildSource


    buildSource方法通过执行我们自定义generate方法产生数据,并返回无缓冲的channel。
    mapper从该channel中读取数据

    1. func buildSource(generate GenerateFunc, panicChan *onceChan) chan interface{} {
    2. source := make(chan interface{})
    3. go func() {
    4. defer func() {
    5. if r := recover(); r != nil {
    6. panicChan.write(r)
    7. }
    8. close(source)
    9. }()
    10. generate(source)
    11. }()
    12. return source
    13. }

     2.2.2 executeMappers

    主线程中调用executeMappers方法,executeMappers消费generate生产的数据,每一个item都会起一个goroutine单独处理,并将处理结果写入 collector。
    mapper默认最大并发数为16,可以通过WithWorkers进行设置。

    1. func executeMappers(mCtx mapperContext) {
    2. var wg sync.WaitGroup
    3. defer func() {
    4. wg.Wait()
    5. close(mCtx.collector)
    6. drain(mCtx.source)
    7. }()
    8. var failed int32
    9. pool := make(chan lang.PlaceholderType, mCtx.workers)
    10. writer := newGuardedWriter(mCtx.ctx, mCtx.collector, mCtx.doneChan)
    11. for atomic.LoadInt32(&failed) == 0 {
    12. select {
    13. case <-mCtx.ctx.Done():
    14. return
    15. case <-mCtx.doneChan:
    16. return
    17. case pool <- lang.Placeholder:
    18. item, ok := <-mCtx.source
    19. if !ok {
    20. <-pool
    21. return
    22. }
    23. wg.Add(1)
    24. go func() {
    25. defer func() {
    26. if r := recover(); r != nil {
    27. atomic.AddInt32(&failed, 1)
    28. mCtx.panicChan.write(r)
    29. }
    30. wg.Done()
    31. <-pool
    32. }()
    33. mCtx.mapper(item, writer)
    34. }()
    35. }
    36. }
    37. }

    2.2.3 reducer

    reducer单线程消费collctor channel中数据。并将最终结果写入到output channel 返回。

    1. finish := func() {
    2. closeOnce.Do(func() {
    3. close(done)
    4. close(output)
    5. })
    6. }
    7. // if done is closed, all mappers and reducer should stop processing
    8. done := make(chan lang.PlaceholderType)
    9. go func() {
    10. defer func() {
    11. drain(collector)
    12. if r := recover(); r != nil {
    13. panicChan.write(r)
    14. }
    15. finish()
    16. }()
    17. reducer(collector, writer, cancel)
    18. }()

    2.2.4主线程

    1. // MapReduceChan maps all elements from source, and reduce the output elements with given reducer.
    2. func mapReduceWithPanicChan(source <-chan interface{}, panicChan *onceChan, mapper MapperFunc,
    3. reducer ReducerFunc, opts ...Option) (interface{}, error) {
    4. options := buildOptions(opts...)
    5. // output is used to write the final result
    6. output := make(chan interface{})
    7. defer func() {
    8. // reducer can only write once, if more, panic
    9. for range output {
    10. panic("more than one element written in reducer")
    11. }
    12. }()
    13. // collector is used to collect data from mapper, and consume in reducer
    14. collector := make(chan interface{}, options.workers)
    15. // if done is closed, all mappers and reducer should stop processing
    16. done := make(chan lang.PlaceholderType)
    17. writer := newGuardedWriter(options.ctx, output, done)
    18. var closeOnce sync.Once
    19. // use atomic.Value to avoid data race
    20. var retErr errorx.AtomicError
    21. finish := func() {
    22. closeOnce.Do(func() {
    23. close(done)
    24. close(output)
    25. })
    26. }
    27. cancel := once(func(err error) {
    28. if err != nil {
    29. retErr.Set(err)
    30. } else {
    31. retErr.Set(ErrCancelWithNil)
    32. }
    33. drain(source)
    34. finish()
    35. })
    36. go func() {
    37. defer func() {
    38. drain(collector)
    39. if r := recover(); r != nil {
    40. panicChan.write(r)
    41. }
    42. finish()
    43. }()
    44. reducer(collector, writer, cancel)
    45. }()
    46. go executeMappers(mapperContext{
    47. ctx: options.ctx,
    48. mapper: func(item interface{}, w Writer) {
    49. mapper(item, w, cancel)
    50. },
    51. source: source,
    52. panicChan: panicChan,
    53. collector: collector,
    54. doneChan: done,
    55. workers: options.workers,
    56. })
    57. select {
    58. case <-options.ctx.Done():
    59. cancel(context.DeadlineExceeded)
    60. return nil, context.DeadlineExceeded
    61. case v := <-panicChan.channel:
    62. panic(v)
    63. case v, ok := <-output:
    64. if err := retErr.Load(); err != nil {
    65. return nil, err
    66. } else if ok {
    67. return v, nil
    68. } else {
    69. return nil, ErrReduceNoOutput
    70. }
    71. }
    72. }


    三 使用示例

    mapreduce提供的方法:

    • func Finish(fns ...func() error) error
      处理固定数量的依赖,返回error,有一个error立即返回
       
    • func FinishVoid(fns ...func())
      Finish方法功能类似,没有错误返回值
       
    • func ForEach(generate GenerateFunc, mapper ForEachFunc, opts ...Option)
       
    • func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error)

       
    • func MapReduceChan(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,opts ...Option) (interface{}, error)

       
    • func MapReduceVoid(generate GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error



       

    流程图:

  • 相关阅读:
    ubuntu 22.04 深度学习环境配置
    数字人几十秒画的画卖了17万,真人画家羡慕哭了
    矩阵分解方法(主要是非负矩阵分解)--学习笔记
    嵌入式数据库sqlite3子句和函数的使用基础(06)
    ubuntu 22.04 安装python-pcl
    排序算法--快速排序
    高级CSS属性实现的瀑布流的三种方法
    c语言:矩阵交换
    【小程序项目开发--京东商城】uni-app之自定义搜索组件(上)-- 组件UI
    Go命令大全:全面解析与实践
  • 原文地址:https://blog.csdn.net/qq_16399991/article/details/125730714