• Go-Zero 业务开发军火库


    一、介绍

    “遇山开山,遇水架桥,千头万绪,止于一端”,这是笔者对Go-Zero的一些浅薄的认知,Go-Zero是在《晓黑板》发展和语言转型过程中工程化的最佳实践,根据遇到的通用性问题,打造的开箱即用的武器库。
    笔者也因为工作过程中的需要封装了一个开源库Go-Core所以非常能够理解Go-Zero创造出来的背景以及核心,我也挑选了几个关键词来表述这些研发理念。

    核心标签:

    • 保持简单,第一原则
    • 工具大于约定和文档
    • 对业务开发友好,封装复杂度
    • 约束做一件事只有一种方式
    • 生产品质对三方库保持谨慎

    在这里插入图片描述
    Github:https://github.com/zeromicro/go-zero/

    最新文档:https://go-zero.dev/

    历史文档【推荐其中的扩展阅读内容】:https://legacy.go-zero.dev/

    《go-zero 微服务框架的架构设计》:bilibili.com/video/BV1rD4y127PD

    二、目录结构

    笔者整理了一下Go-Zero各级目录,最最最核心的就是core目录,里面存放着大量抽象封装好的方法,在不同场景不同的业务就可以拿来就用。

    相对于其他Golang中的模块化的框架或组件,使用Go-Zero有一个最核心的不同,那就是你可以不用知道具体是怎么实现的,但是你需要有一个大致的场景和工具的映射,这个时候在遇到具体的场景时就能拿来即用和指哪打哪的效果。

    笔者为了大家能够全面的了解,对于源码框架中的目录分层做了一次阅读做了一次标注。

    .
    ├── core
    │   ├── bloom // 布隆过滤器
    │   ├── breaker // 熔断器 支持 googleBreaker
    │   ├── cmdline // cmd工具 回车确认继续执行
    │   ├── codec // 算法库 gzip hmac rsa aes
    │   ├── collection
    │   │   ├── cache.go // 内存缓存:自动失效、大小限制、命中率、并发安全、缓存击穿
    │   │   ├── fifo.go // FIFO queue   先进先出队列 FIFO queue
    │   │   ├── rollingwindow.go // 滑动窗口
    │   │   └── timingwheel.go // 时间轮:基于list双向链表实现
    │   ├── conf // 配置文件 支持json、yaml、yml 支持环境变量替换
    │   ├── contextx // 上下文封装
    │   ├── discov // 服务注册发现 基于etcd
    │   │   ├── internal 
    │   │   └── kubernetes // k8s 基础yaml
    │   ├── errorx // 基于 atomic 原子性的error,遍历判断errors
    │   ├── executors // 任务池:最大任务数、最大字节数、手动执行 clickhouse批量插入实践(重启清空数据存储在内存)
    │   ├── filex // 文件操作库
    │   ├── fs 
    │   ├── fx // 流处理 线性编程到流程编程,如Kafka日志转换清洗过滤去重
    │   ├── hash // md5 Md5Hex 
    │   ├── iox // io操作封装
    │   ├── jsontype // 支持 mogo的bson
    │   ├── jsonx // json库封装
    │   ├── lang
    │   ├── limit
    │   │   ├── periodlimit.go // 滑动窗口限流:基于redis-incrby、redis-lua保障原子性
    │   │   └── toekenlimit.go // 令牌桶限流:redis-lua保障原子性
    │   ├── load
    │   ├── logx // 日志库:文件输出or制台输出、耗时输出、压缩分片和trace库绑定
    │   ├── mapping // 映射 yaml to json等
    │   ├── mathx // 数学计算包
    │   ├── metric // 基于基于 github.com/prometheus/client_golang 封装
    │   ├── mr // MapReduce并发处理工具 
    │   ├── naming
    │   ├── netx // 网络包
    │   ├── proc
    │   ├── prof
    │   ├── prometheus // 基于 github.com/prometheus/client_golang 封装
    │   ├── queue // 队列抽象定义
    │   ├── rescue // Recover:通用异常捕获打印log,支持方法注入
    	│   ├── search // 。。。。。。
    	│   ├── service // 服务定义:服务吗、环境名、Prometheus、trace
    │   ├── stat // 运行态的各类统计 资源 响应 消耗
    │   ├── stores
    │   │   ├── builder
    │   │   ├── cache // 缓存抽象
    │   │   ├── clickhouse
    │   │   ├── kv // 对redis操作的各种封装
    │   │   ├── mongo // 基于 github.com/globalsign/mgo
    │   │   ├── mongoc // 基于 github.com/ClickHouse/clickhouse-go  提供 sqlx底层引擎
    │   │   ├── postgres // 基于 github.com/lib/pq 提供 sqlx底层引擎
    │   │   ├── redis // redis:包含分布式锁redisLockLua 脚本保证原子性(基于go-redis/redis/v8)
    │   │   ├── sqlc // 数据库主键缓存、数据库唯一主键缓存、数据库共享调用
    │   │   └── sqlx // mysql:api封装非ORM框架、sql逆向生成status、支持dtm分布式事务集成(基于go-sql-driver/mysql)
    │   ├── stringx // 字符串处理工具集:关键词替换、敏感词过滤
    │   ├── syncx // 包含各类原子操作和并发操作控制
    │   │   ├── 。。。。。。
    │   │   └── singleflight.go // 共享调用:多次调用复用结果
    │   ├── sysx
    │   │   ├── automaxprocs.go // 设置process基于go.uber.org/automaxprocs/maxprocs 识别container分配的cpu数
    	│   │   └── host.go // 获取hostname 如果无法获取将随机,仅通过os函数查询一次并进行变量复制
    │   ├── threading
    │   │   ├── routinegroup.go // 并发任务:对sync.WaitGroup
    │   │   ├── routines.go // 并发任务RunSafe方法的实现 支持Recover
    │   │   ├── taskrunner.go // 异步并发任务:基于channel实现
    │   │   └── workergroup.go // 任务组:一个任务指定执行次数
    │   ├── timex // 时间处理包
    │   ├── trace // 链路追踪:支持Jaeger和Zipkin
    │   └── utils
    │   │   ├── times.go // 封装timex ElapsedTimer计时器
    │   │   ├── uuid.go // 基于 github.com/google/uuid
    │   │   └── version.go // 字符串版本对比 
    ├── rest // http 网络库 兼容 net/http 包含JWT和httpc、支持自动降载减少RT时间、支持Prometheus-api
    ├── tools
    │   └── goctl // 脚手架 提升工程初始化效率
    └── zrpc // grpc 封装、支持etcd服务发现、负载均衡、拦截器、支持自动降载减少RT时间、支持Prometheus-api
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78

    PS:分布式任务调度基于 github.com/zeromicro/go-queue 外部依赖Beanstalks or kafka、redis setnx保证了每条消息只被消费一次,分布式任务调度还有也不少选择比如借助kubernetes或者xx-job都是一种方式,只是选择的道路不一样。

    三、例子🌰

    笔者条选择在Go-Zero中几个有代表性的使用方式

    FX 流处理

    大家可以回忆一下再写脚本时如果简单还好如果遇到复杂的脚本200行以上是起步,虽然自己写起来还好但是时间一长就会出现不可读逻辑不清晰的问题(还有这个变量是干嘛的? 网上划2页恍然大悟这个是从XX表中查询的基础数据~~),通过FX组件就能通过将线性逻辑套路化,阶段式编写数据统计处理逻辑。

    package main
    
    import (
        "fmt"
        "os"
        "os/signal"
        "syscall"
        "time"
    
        "github.com/zeromicro/go-zero/core/fx"
    )
    
    func main() {
        ch := make(chan int)
    
        go inputStream(ch)
        go outputStream(ch)
    
        c := make(chan os.Signal, 1)
        signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
        <-c
    }
    
    func inputStream(ch chan int) {
        count := 0
        for {
            ch <- count
            time.Sleep(time.Millisecond * 500)
            count++
        }
    }
    
    func outputStream(ch chan int) {
        fx.From(func(source chan<- interface{}) {
            for c := range ch {
                source <- c
            }
        }).Walk(func(item interface{}, pipe chan<- interface{}) {
            count := item.(int)
            pipe <- count
        }).Filter(func(item interface{}) bool {
            itemInt := item.(int)
            if itemInt%2 == 0 {
                return true
            }
            return false
        }).ForEach(func(item interface{}) {
            fmt.Println(item)
        })
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    在这里插入图片描述

    MapReduce 并发处理

    并发处理必须考虑的功能点:

    • error处理,如果出现error报错怎么拿到对应的报错内容(errgroup也能实现同样的效果,但是需要g.Wait()能少一步是一补)
    • 异常终止,如果出现任何一个error报错,应该立马获得结果的返回
    • goroutine 池,不要过渡创建goroutine会带来内存回收压力容易导致内存溢出

    用法一:仅仅当并发处理

    func productDetail(uid, pid int64) (*ProductDetail, error) {
        var pd ProductDetail
        err := mr.Finish(func() (err error) {
            pd.User, err = userRpc.User(uid)
            return
        }, func() (err error) {
            pd.Store, err = storeRpc.Store(pid)
            return
        }, func() (err error) {
            pd.Order, err = orderRpc.Order(pid)
            return
        })
    
        if err != nil {
            log.Printf("product detail error: %v", err)
            return nil, err
        }
    
        return &pd, nil
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    用法二:分阶段对数据进行处理
    相比FX流处理,使用这种方式更适合用于接口中数据量不大的数据,FX更加适合交流或者是队列消费的后续数据处理

    func checkLegal(uids []int64) ([]int64, error) {
        r, err := mr.MapReduce(func(source chan<- interface{}) {
            for _, uid := range uids {
                source <- uid
            }
        }, func(item interface{}, writer mr.Writer, cancel func(error)) {
            uid := item.(int64)
            ok, err := check(uid)
            if err != nil {
                cancel(err)
            }
            if ok {
                writer.Write(uid)
            }
        }, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {
            var uids []int64
            for p := range pipe {
                uids = append(uids, p.(int64))
            }
            writer.Write(uids)
        })
        if err != nil {
            log.Printf("check error: %v", err)
            return nil, err
        }
    
        return r.([]int64), nil
    }
    
    func check(uid int64) (bool, error) {
        // do something check user legal
        return true, nil
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    redis 分布式锁

    • ex seconds :设置key过期时间,单位s
    • px milliseconds :设置key过期时间,单位毫秒
    • nx:key不存在时,设置key的值
    • xx:key存在时,才会去设置key的值
    redisLockKey := fmt.Sprintf("%v%v", redisTpl, headId)
    // 1. New redislock
    redisLock := redis.NewRedisLock(redisConn, redisLockKey)
    // 2. 可选操作,设置 redislock 过期时间
    redisLock.SetExpire(redisLockExpireSeconds)
    if ok, err := redisLock.Acquire(); !ok || err != nil {
      return nil, errors.New("当前有其他用户正在进行操作,请稍后重试")
    }
    defer func() {
      recover()
      // 3. 释放锁
      redisLock.Release()
    }()
    
    基于lua
    lockCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
        redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])
        return "OK"
    else
        return redis.call("SET", KEYS[1], ARGV[1], "NX", "PX", ARGV[2])
    end`
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    内存缓存

    • 缓存失效
    • 失效防止并发穿透
    • 内存超载
    // 初始化 cache,其中 WithLimit 可以指定最大缓存的数量
    c, err := collection.NewCache(time.Minute, collection.WithLimit(10000))
    if err != nil {
      panic(err)
    }
    
    // 设置缓存
    c.Set("key", user)
    
    // 获取缓存,ok:是否存在
    v, ok := c.Get("key")
    
    // 删除缓存
    c.Del("key")
    
    // 获取缓存,如果 key 不存在的,则会调用 func 去生成缓存
    // 基于 singleflight.group 控制并发
    v, err := c.Take("key", func() (interface{}, error) {
      return user, nil
    })
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
  • 相关阅读:
    SpringBoot集成swagger
    【kubernetes的三种网络】
    你掉进过新技术的“大坑”吗?
    CPP emplace_bake 和 push_back 的相同和区别
    thinkphp5 注入 反序列化写文件 phar反序列化
    数据库系统原理【练习题】——第一章:概述
    如何制作CSR(Certificate Signing Request)文件?
    就只说 3 个 Java 面试题 —— 02
    最大似然估计的介绍
    Python基础笔记持续记录
  • 原文地址:https://blog.csdn.net/u011142688/article/details/125468535