• 打造千万级流量秒杀第二十课 etcd 实战:如何使用 etcd 存储配置信息?


    欢迎来到多级缓存实战模块!

    首先请你思考这样一个业务场景:一个有 10 个节点的集群,当你修改了某一项配置后,你希望能立即同步给这 10 个节点,并更新到各节点的缓存中,该怎么做?

    一种方法是你获取到这 10 个节点的地址信息,然后逐个调用接口同步给它们。但这会带来新的问题:你如何准确获取这 10 个节点的地址信息,又如何将配置稳定可靠地同步给它们?如果集群部署到多个可用区,且是用容器部署的,你拿到的地址不一定是最新的、完全可用的地址,最后很可能因部分节点同步失败而导致节点数据不一致。

    另一种方法是让这 10 个节点自己来取。那怎么让它们知道配置变更了呢?它们又该去哪儿获取最新配置呢?

    把这个问题抽象下就是:A 系统修改了数据,需要即时同步给 B、C、D 系统的所有节点。显然,这是非常经典的数据同步问题。想要解决,我们可以引入一个分布式系统协调器,如 etcd来处理。

    在秒杀系统中,我们使用 etcd 存储集群信息和活动信息,以此解决分布式系统中的数据同步问题。除此之外,在多级缓存中我们还有一种为 Redis 缓存可用性兜底的手段。具体是怎么做的呢?

    etcd 初始化

    在使用 etcd 存储集群信息和活动信息前,我们需要先对 etcd 进行初始化。具体来说,在 infrastructure/stores/etcd 目录下的 etcd.go 中,我实现了 Init 和 GetClient 这两个函数。其中,在 Init 函数里,获取 etcd 相关的配置,并初始化 etcd 客户端,用 sync.Once 确保只初始化一次;在 GetClient 函数里,返回 Init 函数中初始化好的客户端。具体代码如下:

    var etcdCli *etcd.Client
    var etcdOnce = &sync.Once{}
    func Init() error {
       var err error
       etcdOnce.Do(
          func() {
             endpoints := viper.GetStringSlice("etcd.endpoints")
             username := viper.GetString("etcd.username")
             password := viper.GetString("etcd.password")
             cfg := etcd.Config{
                Endpoints:   endpoints,
                DialTimeout: time.Second,
                Username:    username,
                Password:    password,
             }
             etcdCli, err = etcd.New(cfg)
          })
       return err
    }
    func GetClient() *etcd.Client {
       return etcdCli
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    接下来,我在 cmd/root.go 中调用 etcd.Init ,初始化 etcd 客户端。代码如下:

    if err := etcd.Init(); err != nil {
       panic(err)
    }
    
    • 1
    • 2
    • 3
    • 1
    • 2
    • 3

    存储集群信息

    既然要用 etcd 存储集群信息,那么集群信息都有哪些呢?

    总的来说,秒杀系统的集群信息主要是秒杀服务节点信息,比如服务地址、版本号、协议类型等。在秒杀系统集群内,主要是将秒杀 API 服务的节点信息提供给 Admin 服务,用于调用 API 服务的 RPC 接口同步配置;在秒杀系统集群外使用节点信息,主要通过服务发现的方式,将节点暴露给监控系统或者其他微服务调用。

    在 etcd 中,我们可以将节点信息存放到 /seckill/nodes 这个 key 下。以 IP 为 10.10.11.12 的节点为例,我们用 addr 字段保存服务地址,用 proto 保存协议类型,用 version 字段保存版本号。由于节点信息只会在节点启动时变更,我们用 json 格式来存放,具体的数据示例如下:

    {
      "addr": "10.10.11.12:8080",
      "proto": "http",
      "version": "v1.0"
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 1
    • 2
    • 3
    • 4
    • 5

    我们可以用 Go 结构体来定义一个新类型 Node,用于处理 json 格式的节点信息,字段名跟前面提到的 json 格式保持一致。如下所示:

    type Node struct {
       Addr    string `json:"addr"`
       Version string `json:"version"`
       Proto   string `json:"proto"`
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    节点信息的管理需要使用三个函数:Register、Deregister 和 Discover,它们分别用于服务节点的注册、注销和发现。

    具体怎么实现呢?

    第一步,我定义了个 cluster 结构体,用于保存集群管理过程中需要的所有信息,包括锁、etcd 客户端、节点列表等,然后在 Init 函数中初始化它。具体代码如下所示:

    type cluster struct {
       sync.RWMutex
       cli     *etcdv3.Client
       service string
       once    *sync.Once
       deregCh map[string]chan struct{}
       nodes   map[string]*Node
       v       *viper.Viper
    }
    var defaultCluster *cluster
    var once = &sync.Once{
    func Init(service string) {
       once.Do(func() {
          defaultCluster = &cluster{
             cli:     etcd.GetClient(),
             service: service,
             once:    &sync.Once{},
             deregCh: make(map[string]chan struct{}),
             nodes:   make(map[string]*Node),
          }
       })
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    第二步,我实现了 Register、Deregister 和 Discover 这三个函数。

    具体来说,我在 Register 函数中将节点信息注册到 etcd,设置一个有效期,并定期更新,确保节点信息是最新的。在 Deregister 函数中将注册到 etcd 的节点信息删除。在 Discover 函数中,监控节点变更,并及时更新本地内存缓存中的节点信息,将最新的节点信息返回给调用者。代码如下:

    func Register(node *Node, ttl int) error {
       const minTTL = 2
       c := defaultCluster
       key := c.makeKey(node)
       if ttl < minTTL {
          ttl = minTTL
       }
       var errCh = make(chan error)
       go func() {
          kv := etcdv3.NewKV(c.cli)
          closeCh := make(chan struct{})
          lease := etcdv3.NewLease(c.cli)
          val, _ := json.Marshal(node)
          var curLeaseId etcdv3.LeaseID = 0
          ticker := time.NewTicker(time.Duration(ttl/2) * time.Second)
          register := func() error {
             if curLeaseId == 0 {
                leaseResp, err := lease.Grant(context.TODO(), int64(ttl))
                if err != nil {
                   return err
                }
                if _, err := kv.Put(context.TODO(), key, string(val), etcdv3.WithLease(leaseResp.ID)); err != nil {
                   return err
                }
                curLeaseId = leaseResp.ID
             } else {
                if _, err := lease.KeepAliveOnce(context.TODO(), curLeaseId); err == rpctypes.ErrLeaseNotFound {
                   curLeaseId = 0
                }
             }
             return nil
          }
          if err := register(); err != nil {
             logrus.Error("register node failed, error:", err)
             errCh <- err
          }
          close(errCh)
          for {
             select {
             case <-ticker.C:
                if err := register(); err != nil {
                   logrus.Error("register node failed, error:", err)
                   panic(err)
                }
             case <-closeCh:
                ticker.Stop()
                return
             }
          }
       }()
       err := <-errCh
       return err
    }
    func Deregister(node *Node) error {
       c := defaultCluster
       c.Lock()
       defer c.Unlock()
       key := c.makeKey(node)
       if ch, ok := c.deregCh[key]; ok {
          close(ch)
          delete(c.deregCh, key)
       }
       _, err := c.cli.Delete(context.Background(), key, etcdv3.WithPrefix())
       return err
    }
    func Discover() (output []*Node, err error) {
       c := defaultCluster
       key := fmt.Sprintf("/%s/nodes/", c.service)
       c.once.Do(func() {
          var resp *etcdv3.GetResponse
          resp, err = c.cli.Get(context.Background(), key, etcdv3.WithPrefix())
          if err != nil {
             return
          }
          for _, kv := range resp.Kvs {
             k := string(kv.Key)
             if len(k) > len(key) {
                var node *Node
                json.Unmarshal(kv.Value, &node)
                if node != nil {
                   c.Lock()
                   c.nodes[k] = node
                   c.Unlock()
                }
             }
          }
          watchCh := c.cli.Watch(context.Background(), key, etcdv3.WithPrefix())
          go func() {
             for {
                select {
                case resp := <-watchCh:
                   for _, evt := range resp.Events {
                      k := string(evt.Kv.Key)
                      if len(k) <= len(key) {
                         continue
                      }
                      switch evt.Type {
                      case etcdv3.EventTypePut:
                         var node *Node
                         json.Unmarshal(evt.Kv.Value, &node)
                         if node != nil {
                            c.Lock()
                            c.nodes[k] = node
                            c.Unlock()
                         }
                      case etcdv3.EventTypeDelete:
                         c.Lock()
                         if _, ok := c.nodes[k]; ok {
                            delete(c.nodes, k)
                         }
                         c.Unlock()
                      }
                   }
                }
             }
          }()
       })
       if err != nil {
          return nil, err
       }
       c.RLock()
       for _, node := range c.nodes {
          output = append(output, node)
       }
       c.RUnlock()
       return
    }
    func (c *cluster) makeKey(node *Node) string {
       id := strings.Replace(node.Addr, ".", "-", -1)
       id = strings.Replace(id, ":", "-", -1)
       return fmt.Sprintf("/%s/nodes/%s", c.service, id)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132

    第三步,修改 interfaces/rpc 目录下的 rpc.go,在 Run 函数中加上注册节点信息的代码,其中初始化一个变量 node,如下所示:

    //初始化集群
    cluster.Init("seckill")
    var addr string
    if addr, err = utils.Extract(bind); err == nil {
       //注册节点信息
       version := viper.GetString("api.version")
       if version == "" {
          version = "v0.1"
       }
       once.Do(func() {
          node = &cluster.Node{
             Addr:    addr,
             Version: version,
             Proto:   "gRPC",
          }
          err = cluster.Register(node, 6)
       })
    }
    if err != nil {
       return err
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    另外,我还在 Exit 函数中加上了注销节点的代码,主要是调用 cluster.Deregister 函数将节点信息从 etcd 中删除。

    第四步,在 Run 函数里初始化集群,并加上一段代码用于输出获取到的节点信息。代码如下:

    cluster.Init("seckill")
    if nodes, err := cluster.Discover(); err == nil {
       log, _ := json.Marshal(nodes)
       logrus.Info("discover nodes ", string(log))
    } else {
       logrus.Error("discover nodes error:", err)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    注意,这个 Run 函数是位于 interfaces/admin 目录下的 admin.go 里面哦。

    第五步,编译成功后,我们先启动 api,然后启动 admin。可以看到 admin 中输出了节点信息日志,说明 admin 服务成功发现了 api 服务的节点。此时,使用 etcdctl 也能获取到节点信息。

    为了验证节点注册功能是否正常工作,我用 pkill -9 seckill 命令将 api 和 admin 都杀掉,使用 etcdctl 前两次还能获取到节点信息,但后面就获取不到了。就是因为杀掉 api 服务后,服务停止了更新过期时间,导致 etcd 将过期的节点信息删除。这也就证明了服务节点注册功能是正常工作的。

    运行效果如下图所示:

    Drawing 0.png

    存储集群配置

    秒杀集群配置主要有日志等级、限流器速度、熔断条件等。它们存放到 etcd 中 /seckill/config 这个 key 下,其中的 logLevel 保存日志等级, rateLimit 保存中间层和底层限流器的速度, circuitBreaker 保存熔断条件中的 cpu 使用率和请求时延。具体配置示例如下:

    {
      "logLevel":"info",
      "rateLimit": {
        "middle": 100000,
        "low": 10000,
      }
      "circuitBreaker":{
        "cpu": 80,
        "latency": 1000,
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    对应到 Go 代码中,大致步骤如下:

    1. 将上面的 json 配置用一个 Config 结构体保存,字段名跟 json 中的保持一致,然后把它放在 infrastructure/cluster 目录下的 cluster.go 中;

    2. 实现一个 WatchClusterConfig 函数,主要是监控并处理 etcd 中集群配置的 Delete 和 Put 事件,并及时更新内存缓存中的配置数据;

    3. 实现一个 GetClusterConfig 函数,用于将缓存中的配置提供给其他模块使用。

    注意,在更新和读取缓存中配置数据的时候,都需要加锁,以免获取到的配置数据不是最新的。

    代码如下所示:

    type Config struct {
       LogLevel  string `json:"logLevel"`
       RateLimit struct {
          Middle int `json:"middle"`
          Low    int `json:"low"`
       } `json:"rateLimit"`
       CircuitBreaker struct {
          Cpu     int `json:"cpu"`
          Latency int `json:"latency"`
       } `json:"circuitBreaker"`
    }
    var configLock = &sync.RWMutex{}
    var config = &Config{}
    func WatchClusterConfig() error {
       cli := etcd.GetClient()
       key := "/seckill/config"
       resp, err := cli.Get(context.Background(), key)
       if err != nil {
          return err
       }
       update := func(kv *mvccpb.KeyValue) (bool, error) {
          if string(kv.Key) == key {
             var tmpConfig *Config
             err = json.Unmarshal(kv.Value, &tmpConfig)
             if err != nil {
                logrus.Error("update cluster config failed, error:", err)
                return false, err
             }
             configLock.Lock()
             *config = *tmpConfig
             logrus.Info("update cluster config ", *config)
             configLock.Unlock()
             return true, nil
          }
          return false, nil
       }
       for _, kv := range resp.Kvs {
          if ok, err := update(kv); ok {
             break
          } else if err != nil {
             return err
          }
       }
       go func() {
          watchCh := cli.Watch(context.Background(), key)
          for resp := range watchCh {
             for _, evt := range resp.Events {
                if evt.Type == etcdv3.EventTypePut {
                   if ok, err := update(evt.Kv); ok {
                      break
                   } else if err != nil {
                      break
                   }
                }
             }
          }
       }()
       return nil
    }
    func GetClusterConfig() Config {
       configLock.RLock()
       defer configLock.RUnlock()
       return *config
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64

    接下来,为了能同时启动两个 api 服务,我先将 config/seckill.toml 配置文件拷贝一份,新的配置文件为 config/seckill1.toml,将里面的端口号和 pid 文件修改成与老的配置不一样,比如端口号改成 8083 和 8084,pid 文件改成 .pid1。

    现在我们用这两份配置文件启动两个 api 服务,然后使用下面的 etcdctl 命令将配置数据写入到 etcd 。

    ETCDCTL_API=3 etcdctl put /seckill/config '{"logLevel":"info","rateLimit":{"middle":100000,"low":10000},"circuitBreaker":{"cpu":80,"latency":1000}}'
    
    • 1

    此时我们就能在两个运行 api 服务的命令行终端看到 update cluster config 这样的日志,而且它们的时间戳一样,说明它们几乎能同时接收到配置变更事件,并在日志中输出最新的配置。

    效果如下图所示:

    Drawing 1.png

    图片1.png

    小结

    这一讲我主要介绍了如何用 etcd 存储集群信息和集群配置。实际上,集群信息和配置管理有很多种方法,比如你还可以用 consul 来实现配置管理、服务注册和发现,那可能会比用 etcd 更简单些。

    我之所以在这里用 etcd ,主要是为了给你介绍集群配置管理、服务注册和发现的底层代码原理。当你真正学会了自己去实现一套类似的代码后,你在使用其他方案时将会更容易理解它们的原理。

    另外,还需要注意一点,我在这里用的 etcd 版本是 v3,在代码和命令行上与 v2 有些差别。

    思考题: 如果用 consul 来实现集群信息和配置管理,该如何做呢?

    期待你在留言区回答。

    好了,这一讲就到这里了,下一讲我将给你介绍“如何使用 Redis 缓存库存信息”。到时见!

    源码地址:https://github.com/lagoueduCol/MiaoSha-Yiletian


    精选评论

    **耀:

    老师我很喜欢你的课程,每次都能学到很多知识,谢谢老师。

    **0835:

    老师,问一下,这些配置信息,可以用redis来做存储吗?不是redis的性能更好一些吗?

        讲师回复:

        Redis性能虽然好,但无法保证数据一致和可靠。ETCD是会将数据落磁盘的,会有WAL日志,基于WAL之上用RAFT确保数据一致,不担心数据丢失。

  • 相关阅读:
    OpenGL ES EAGLContext 和 EGLContext
    Matlab之数组、包含分配给类别的值函数categorical
    RHCE---时间服务器
    java中的lambda表达式
    区块链侧链技术(0)——衍生知识补充
    怎样图片转文字?两分钟让你实现快速转文字
    【接口测试】JMeter调用JS文件实现RSA加密
    python 操作redis 消息队列
    A的闭包+B的闭包包含于A+B的闭包
    Python中的三元运算符详解
  • 原文地址:https://blog.csdn.net/fegus/article/details/126361788