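In microservice development, the API gateway exposes RESTful APIs to the outside world. The data behind an API often comes from other services, and a complex API may depend on several of them.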
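A single dependency usually responds quickly. If several services are called one after another, though, their latencies add up and the total latency of the API grows considerably. How can this be optimized? The obvious first step is to call the dependencies concurrently, which cuts the total time spent waiting on them. The Go standard library provides `sync.WaitGroup` for this kind of concurrency control, but it falls short in real business code in three ways. If any one dependency fails, we want to return immediately instead of waiting for all of them to finish. Assigning results to shared variables from the goroutines usually requires a lock. Finally, every dependency call needs its own `Add` and `Done`, which is easy for newcomers to get wrong.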
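To address these problems, the go-zero framework provides MapReduce, a concurrency tool that works out of the box.
MapReduce is a software architecture proposed by Google for parallel computation over large data sets, and go-zero's MapReduce tool borrows ideas from it.
go-zero's MapReduce tool is mainly used to process batches of data concurrently and thereby improve service performance.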
```go
type (
	// GenerateFunc is used to let callers send elements into source.
	GenerateFunc func(source chan<- interface{})
)

type (
	// MapFunc is used to do element processing and write the output to writer.
	MapFunc func(item interface{}, writer Writer)
	// MapperFunc is used to do element processing and write the output to writer,
	// use cancel func to cancel the processing.
	MapperFunc func(item interface{}, writer Writer, cancel func(error))
)

type (
	// ReducerFunc is used to reduce all the mapping output and write to writer,
	// use cancel func to cancel the processing.
	ReducerFunc func(pipe <-chan interface{}, writer Writer, cancel func(error))
	// VoidReducerFunc is used to reduce all the mapping output, but no output.
	// Use cancel func to cancel the processing.
	VoidReducerFunc func(pipe <-chan interface{}, cancel func(error))
)
```
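The sketch below shows how these types fit together. It defines one `GenerateFunc`, one `MapperFunc` and one `ReducerFunc`, and connects them with a deliberately simple driver. The types are local copies, and `Writer` is assumed to be a single-method interface like go-zero's. `chanWriter`, `generateFrom`, `squareMapper`, `sumReducer` and `runSequential` are hypothetical helpers, not part of go-zero.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Local copies of the go-zero shapes, for illustration only; Writer is
// assumed to match go-zero's single-method Writer interface.
type Writer interface{ Write(v interface{}) }

type (
	GenerateFunc func(source chan<- interface{})
	MapperFunc   func(item interface{}, writer Writer, cancel func(error))
	ReducerFunc  func(pipe <-chan interface{}, writer Writer, cancel func(error))
)

// chanWriter is a hypothetical Writer that forwards values to a channel.
type chanWriter chan interface{}

func (w chanWriter) Write(v interface{}) { w <- v }

var errNegative = errors.New("negative input")

// generateFrom returns a GenerateFunc that sends the given values.
func generateFrom(values ...int) GenerateFunc {
	return func(source chan<- interface{}) {
		for _, v := range values {
			source <- v
		}
	}
}

// squareMapper writes item*item, or cancels the job on negative input.
func squareMapper(item interface{}, writer Writer, cancel func(error)) {
	v := item.(int)
	if v < 0 {
		cancel(errNegative)
		return
	}
	writer.Write(v * v)
}

// sumReducer folds every mapped value into one result and writes it once.
func sumReducer(pipe <-chan interface{}, writer Writer, cancel func(error)) {
	sum := 0
	for v := range pipe {
		sum += v.(int)
	}
	writer.Write(sum)
}

// runSequential is a hypothetical driver: one goroutine generates, one maps,
// and the caller reduces. Unlike go-zero there is no worker pool, and cancel
// only records the first error instead of stopping the work early.
func runSequential(gen GenerateFunc, mapper MapperFunc, reducer ReducerFunc) (interface{}, error) {
	var mu sync.Mutex
	var firstErr error
	cancel := func(err error) {
		mu.Lock()
		defer mu.Unlock()
		if firstErr == nil {
			firstErr = err
		}
	}

	source := make(chan interface{})
	go func() {
		defer close(source)
		gen(source)
	}()

	collector := make(chan interface{})
	go func() {
		defer close(collector)
		for item := range source {
			mapper(item, chanWriter(collector), cancel)
		}
	}()

	output := make(chan interface{}, 1) // room for the reducer's single write
	reducer(collector, chanWriter(output), cancel)

	mu.Lock()
	defer mu.Unlock()
	if firstErr != nil {
		return nil, firstErr
	}
	return <-output, nil // assumes the reducer wrote exactly once
}

func main() {
	fmt.Println(runSequential(generateFrom(1, 2, 3), squareMapper, sumReducer))  // 14 <nil>
	fmt.Println(runSequential(generateFrom(1, -2, 3), squareMapper, sumReducer)) // <nil> negative input
}
```

This driver only shows the data flow from generate through map to reduce. The real implementation adds a worker pool, early cancellation and panic forwarding.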
`buildSource` runs our custom `generate` function in a new goroutine to produce data and returns an unbuffered channel, from which the mappers read.
```go
func buildSource(generate GenerateFunc, panicChan *onceChan) chan interface{} {
	source := make(chan interface{})
	go func() {
		defer func() {
			if r := recover(); r != nil {
				panicChan.write(r)
			}
			close(source)
		}()

		generate(source)
	}()

	return source
}
```
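The panic forwarding in `buildSource` can be exercised on its own. This sketch copies `buildSource` as shown and pairs it with a minimal `onceChan`, a hypothetical reconstruction of go-zero's internal type that delivers only the first write. The generator panics after two items. The consumer still sees the channel close, and the panic value arrives through `panicChan`.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// onceChan is a hypothetical stand-in for go-zero's internal type of the
// same name: only the first write is delivered, later ones are dropped.
type onceChan struct {
	channel chan interface{}
	wrote   int32
}

func (oc *onceChan) write(v interface{}) {
	if atomic.CompareAndSwapInt32(&oc.wrote, 0, 1) {
		oc.channel <- v
	}
}

type GenerateFunc func(source chan<- interface{})

// buildSource as shown above: generate runs in its own goroutine, a panic
// is forwarded to panicChan, and source is always closed.
func buildSource(generate GenerateFunc, panicChan *onceChan) chan interface{} {
	source := make(chan interface{})
	go func() {
		defer func() {
			if r := recover(); r != nil {
				panicChan.write(r)
			}
			close(source)
		}()

		generate(source)
	}()

	return source
}

func main() {
	// Buffered here so write does not block in this demo; in
	// mapReduceWithPanicChan the caller receives from it in a select instead.
	pc := &onceChan{channel: make(chan interface{}, 1)}
	src := buildSource(func(source chan<- interface{}) {
		source <- 1
		source <- 2
		panic("generator failed")
	}, pc)

	for v := range src { // ends because the deferred close still runs
		fmt.Println("got", v)
	}
	fmt.Println("recovered:", <-pc.channel)
}
```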
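`mapReduceWithPanicChan` starts `executeMappers` in its own goroutine. `executeMappers` consumes the data produced by `generate`, handles each item in a separate goroutine, and writes the results to `collector`.
A buffered `pool` channel caps how many mapper goroutines run at once. The default limit is 16 and can be changed with `WithWorkers`.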
```go
func executeMappers(mCtx mapperContext) {
	var wg sync.WaitGroup
	defer func() {
		wg.Wait()
		close(mCtx.collector)
		drain(mCtx.source)
	}()

	var failed int32
	pool := make(chan lang.PlaceholderType, mCtx.workers)
	writer := newGuardedWriter(mCtx.ctx, mCtx.collector, mCtx.doneChan)
	for atomic.LoadInt32(&failed) == 0 {
		select {
		case <-mCtx.ctx.Done():
			return
		case <-mCtx.doneChan:
			return
		case pool <- lang.Placeholder:
			item, ok := <-mCtx.source
			if !ok {
				<-pool
				return
			}

			wg.Add(1)
			go func() {
				defer func() {
					if r := recover(); r != nil {
						atomic.AddInt32(&failed, 1)
						mCtx.panicChan.write(r)
					}
					wg.Done()
					<-pool
				}()

				mCtx.mapper(item, writer)
			}()
		}
	}
}
```
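At its core, `executeMappers` uses a buffered channel as a counting semaphore. The following stdlib-only sketch keeps just that part and leaves out the context, `doneChan`, panic handling and the `failed` counter. `runBounded` and `peakConcurrency` are hypothetical names, and the sleep only keeps handlers alive long enough to overlap.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runBounded mirrors the pool idea in executeMappers: a slot is acquired by
// sending into a buffered channel before reading the next item, and released
// when the goroutine handling that item finishes, so at most `workers`
// handlers run at once. (Simplified sketch: no ctx, done channel or panics.)
func runBounded(source <-chan int, workers int, handle func(int)) {
	var wg sync.WaitGroup
	pool := make(chan struct{}, workers)
	for {
		pool <- struct{}{} // blocks while all slots are taken
		item, ok := <-source
		if !ok {
			<-pool // give back the slot taken for a non-existent item
			break
		}
		wg.Add(1)
		go func() {
			defer func() {
				<-pool // free the slot
				wg.Done()
			}()
			handle(item)
		}()
	}
	wg.Wait()
}

// peakConcurrency feeds n items through runBounded and reports the highest
// number of handlers seen running at the same time, plus how many ran.
func peakConcurrency(n, workers int) (peak, handled int32) {
	source := make(chan int)
	go func() {
		defer close(source)
		for i := 0; i < n; i++ {
			source <- i
		}
	}()

	var running int32
	runBounded(source, workers, func(int) {
		cur := atomic.AddInt32(&running, 1)
		for { // record the maximum with a CAS loop
			p := atomic.LoadInt32(&peak)
			if cur <= p || atomic.CompareAndSwapInt32(&peak, p, cur) {
				break
			}
		}
		time.Sleep(time.Millisecond)
		atomic.AddInt32(&running, -1)
		atomic.AddInt32(&handled, 1)
	})
	return peak, handled
}

func main() {
	peak, handled := peakConcurrency(50, 4)
	fmt.Println(peak <= 4, handled) // true 50
}
```

`executeMappers` acquires the slot before reading from `source`, so no item is pulled from the generator until a worker is free to process it.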
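The reducer consumes the data in the `collector` channel in a single goroutine and writes the final result to the `output` channel, from which the result is returned.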
```go
// Excerpt from mapReduceWithPanicChan; closeOnce, output, collector, writer,
// cancel and panicChan are declared in the full function.
// If done is closed, all mappers and the reducer should stop processing.
done := make(chan lang.PlaceholderType)
finish := func() {
	closeOnce.Do(func() {
		close(done)
		close(output)
	})
}

go func() {
	defer func() {
		drain(collector)
		if r := recover(); r != nil {
			panicChan.write(r)
		}
		finish()
	}()

	reducer(collector, writer, cancel)
}()
```
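`finish` can be reached from both the reducer's deferred cleanup and `cancel`, so it relies on `sync.Once`; closing an already-closed channel panics in Go. The sketch below isolates that idea with a hypothetical `stopper` type and `startReducer` function. It omits `guardedWriter`, so only the caller calls `finish` a second time, and only after it has received the result.

```go
package main

import (
	"fmt"
	"sync"
)

// stopper is a hypothetical miniature of the done/output/closeOnce trio:
// finish may be called from several places, and sync.Once makes sure each
// channel is closed exactly once.
type stopper struct {
	once   sync.Once
	done   chan struct{}
	output chan interface{}
}

func newStopper() *stopper {
	return &stopper{done: make(chan struct{}), output: make(chan interface{})}
}

func (s *stopper) finish() {
	s.once.Do(func() {
		close(s.done)
		close(s.output)
	})
}

// startReducer consumes collector in a single goroutine, writes one result
// to output, and always calls finish on the way out.
func startReducer(collector <-chan int, s *stopper) {
	go func() {
		defer s.finish()
		sum := 0
		for v := range collector {
			sum += v
		}
		s.output <- sum
	}()
}

func main() {
	collector := make(chan int)
	s := newStopper()
	startReducer(collector, s)
	go func() {
		defer close(collector)
		for i := 1; i <= 4; i++ {
			collector <- i
		}
	}()

	fmt.Println(<-s.output) // prints 10
	s.finish()              // a second call is a harmless no-op
	<-s.done                // done is closed, so this does not block
}
```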
```go
// mapReduceWithPanicChan maps all elements from source, and reduces the output elements with the given reducer.
func mapReduceWithPanicChan(source <-chan interface{}, panicChan *onceChan, mapper MapperFunc,
	reducer ReducerFunc, opts ...Option) (interface{}, error) {
	options := buildOptions(opts...)
	// output is used to write the final result
	output := make(chan interface{})
	defer func() {
		// reducer can only write once, if more, panic
		for range output {
			panic("more than one element written in reducer")
		}
	}()

	// collector is used to collect data from mapper, and consume in reducer
	collector := make(chan interface{}, options.workers)
	// if done is closed, all mappers and reducer should stop processing
	done := make(chan lang.PlaceholderType)
	writer := newGuardedWriter(options.ctx, output, done)
	var closeOnce sync.Once
	// use atomic.Value to avoid data race
	var retErr errorx.AtomicError
	finish := func() {
		closeOnce.Do(func() {
			close(done)
			close(output)
		})
	}
	cancel := once(func(err error) {
		if err != nil {
			retErr.Set(err)
		} else {
			retErr.Set(ErrCancelWithNil)
		}

		drain(source)
		finish()
	})

	go func() {
		defer func() {
			drain(collector)
			if r := recover(); r != nil {
				panicChan.write(r)
			}
			finish()
		}()

		reducer(collector, writer, cancel)
	}()

	go executeMappers(mapperContext{
		ctx: options.ctx,
		mapper: func(item interface{}, w Writer) {
			mapper(item, w, cancel)
		},
		source:    source,
		panicChan: panicChan,
		collector: collector,
		doneChan:  done,
		workers:   options.workers,
	})

	select {
	case <-options.ctx.Done():
		cancel(context.DeadlineExceeded)
		return nil, context.DeadlineExceeded
	case v := <-panicChan.channel:
		panic(v)
	case v, ok := <-output:
		if err := retErr.Load(); err != nil {
			return nil, err
		} else if ok {
			return v, nil
		} else {
			return nil, ErrReduceNoOutput
		}
	}
}
```
Methods provided by mapreduce:
Flowchart: