• 使用etcd选举sdk实践master/slave故障转移


    本次将记录[利用etcd选主sdk实践master/slave高可用], 并利用etcdctl原生脚本验证选主sdk的工作原理。

    master/slave高可用集群

    本文目标

    在异地多机房部署节点,slave作为备用实例启动,但不接受业务流量, 监测到master宕机,slave节点自动提升为master并接管业务流量。

    基本思路

    各节点向etcd注册带租约的节点信息, 并各自维持心跳保活,选主sdk根据目前存活的、最早创建的节点信息键值对 来判断leader, 并通过watch机制通知业务代码leader变更。

    讲道理,每个节点只需要知道两个信息就能各司其职

    • 谁是leader > 当前节点是什么角色=> 当前节点该做什么事情
    • 感知集群leader变更的能力 ===》当前节点现在要不要改变行为

    除了官方etcd客户端go.etcd.io/etcd/client/v3, 还依赖go.etcd.io/etcd/client/v3/concurrency package:实现了基于etcd的分布式锁、屏障、选举

    选主过程 实质 api
    竞选前先查询leader了解现场 查询当前存活的,最早创建的kv值 *concurrency.Election.Leader()
    初始化时,各节点向etcd阻塞式竞选 各节点向etcd注册带租约的键值对 *concurrency.Election.compaign
    建立master/slave集群,还能及时收到变更通知 通过chan传递最新的leader value *concurrency.Election.Observe()

    重点解读

    1.初始化etcd go客户端

    注意:etcd客户端和服务端是通过grpc来通信,目前新版本的etcd客户端默认使用非阻塞式连接, 也就是说v3.New函数仅表示从指定配置创建etcd客户端。

    为快速确定etcd选举的可用性,本实践使用阻塞式创建客户端:

    cli, err := v3.New(v3.Config{
    		Endpoints:   addr,
    		DialTimeout: time.Second * 5,
    		DialOptions: []grpc.DialOption{grpc.WithBlock()},
    	})
    	if err != nil {
    		log.WithField("instance", Id).Errorln(err)
    		return nil, err
    	}
    

    2. 竞选

    使用阻塞式命令compaign竞选之前,应先查询当前leader

    // 将id:ip:port作为竞选时写入etcd的value
    func (c *Client) Election(id string, notify chan<- bool) error {
    	//竞选前先试图去了解情况
    	ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
    	defer cancel()
    	resp, err := c.election.Leader(ctx)
    	if err != nil {
    		if err != concurrency.ErrElectionNoLeader {
    			return err
    		}
    	} else { // 已经有leader了
    		c.Leader = string(resp.Kvs[0].Value)
    		notify <- (c.Leader == id)
    	}
    
    	if err = c.election.Campaign(context.TODO(), id); err != nil {
    		log.WithError(err).WithField("id", id).Error("Campaign error")
    		return err
    	} else {
    		log.Infoln("Campaign success!!!")
    		c.Leader = id
    		notify <- true
    	}
    	c.election.Key()
    	return nil
    }
    

    竞选:election.Campaign(val) 的实质是将K:V(节点id)添加到etcd, 并给与10s的TTL保活

    过程 输出 备注
    etcdctl lease grant 10 lease 694d813d3a3ccbfb granted with TTL(10s)
    etcdctl lease keep-alive 694d813d3a3ccbfb lease 694d813d3a3ccbfb keepalived with TTL(20)...... NewSession(ttl:10s) 默认创建带keep-alive的lease
    etcdctl put --lease=694d813d3a3ccbfb /merc/694d813d3a3ccbfb id ok ---

    key: keyprefix/leaseId , leaseId 是全局生成的leaseId的16进制表示

    func (e *Election) Campaign(ctx context.Context, val string) error {
    	s := e.session
    	client := e.session.Client()
    
    	k := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease())
    	txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
    	txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))
    	txn = txn.Else(v3.OpGet(k))
    	resp, err := txn.Commit()
    	if err != nil {
    		return err
    	}
    	e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
    	if !resp.Succeeded {
    		kv := resp.Responses[0].GetResponseRange().Kvs[0]
    		e.leaderRev = kv.CreateRevision
    		if string(kv.Value) != val {
    			if err = e.Proclaim(ctx, val); err != nil {
    				e.Resign(ctx)
    				return err
    			}
    		}
    	}
    

    当选: 当前存活的、最早创建的key是leader , 也就是说master/slave故障转移并不是随机的

    3. watch leader变更

    golang使用信道完成goroutine通信,

    本例声明信道: notify = make(chan bool, 1)

    一石二鸟:标记集群leader是否发生变化;信道内传值表示当前节点是否是leader

    func (c *Client) Watchloop(id string, notify chan<- bool) error {
    	ch := c.election.Observe(context.TODO()) // 观察leader变更
    	tick := time.NewTicker(c.askTime)
    
    	defer tick.Stop()
    	for {
    		var leader string
    
    		select {
    		case _ = <-c.sessionCh:
    			log.Warning("Recv session event")
    			return fmt.Errorf("session Done") // 一次续约不稳,立马退出程序
    		case e := <-ch:
    			log.WithField("event", e).Info("watch leader event")
    			leader = string(e.Kvs[0].Value)
    			ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
    			defer cancel()
    			resp, err := c.election.Leader(ctx)
    			if err != nil {
    				if err != concurrency.ErrElectionNoLeader {
    					return err
    				} else { // 目前没leader,开始竞选了
    					if err = c.election.Campaign(context.TODO(), id); err != nil {
    						log.WithError(err).WithField("id", id).Error("Campaign error")
    						return err
    					} else { // 竞选成功
    						leader = id
    					}
    				}
    			} else {
    				leader = string(resp.Kvs[0].Value)
    			}
    		}
    		if leader != c.Leader {
    			log.WithField("before", c.Leader).WithField("after", leader == id).Info("leader changed")
    			notify <- (leader == id)
    		}
    		c.Leader = leader
    	}
    }
    

    c.election.Observe(context.TODO()) 返回最新的leader信息,配合select case控制结构能够及时拿到leader变更信息。

    如题:通过Leader字段和chan <- bool, 掌控了整个选举集群的状态, 可根据这两个信息去完成业务上的master/slave故障转移。

    使用etcdctl确定leader

    election.Leader的源码证明了[当前存活的,最早创建的kv为leader]

    // Leader returns the leader value for the current election.
    func (e *Election) Leader(ctx context.Context) (*v3.GetResponse, error) {
    	client := e.session.Client()
    	resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
    	if err != nil {
    		return nil, err
    	} else if len(resp.Kvs) == 0 {
    		// no leader currently elected
    		return nil, ErrElectionNoLeader
    	}
    	return resp, nil
    }
    

    等价于./etcdctl --endpoints=127.0.0.1:2379 get /merc --prefix --sort-by=CREATE --order=ASCEND --limit=1

    --sort-by :以x标准(创建时间)检索数据
    -- order : 以升降序对已检出的数据排序
    -- limit: 从已检出的数据中取x条数据显示

  • 相关阅读:
    Ubuntu系统之管理文件权限二
    华为数通方向HCIP-DataCom H12-831题库(多选题:81-100)
    nginx下载与安装教程
    【数据结构】栈与队列
    BurpSuit官方实验室之信息泄露
    马克·雷伯特访谈:机器人的未来及波士顿动力的创新之路
    Jmeter(七):jmeter连接数据库/中元件的执行顺序&作用域详解
    【数据结构与算法分析】0基础带你学数据结构与算法分析07--二叉树
    RCD负载箱的安全性能和认证标准有哪些?
    java毕业设计售电公司用户管理系统mybatis+源码+调试部署+系统+数据库+lw
  • 原文地址:https://www.cnblogs.com/JulianHuang/p/16166069.html