• Golang分布式应用之ZooKeeper


    ZooKeeper是Apache下一个开源项目,提供分布式配置、同步服务以及命名注册等,是一个高可靠的分布式协调系统。

    其应用场景与etcd类似,可以使用在

    • 服务发现
    • 分布式锁
    • 选主
    • 分布式队列
    • 分布式系统协调
    • 负载均衡

    如在Hadooop、Kafka中将ZooKeeper作为核心组件。本文结合Golang来编写对应的中间件,所有代码见https://github.com/qingwave/gocorex

    服务注册

    服务注册主要细节在etcd中已提及,主要来解决分布式环境中服务注册注销与状态感知,包括:

    • 服务注册、注销
    • 服务宕机或异常时,自动注销
    • 感知服务端点变化

    借助zk实现服务发现:

    • 可以通过将端点写同一个目录(相同前缀,如/services/job/endpoint1, /services/job/endpoint2),写入临时节点,如果服务宕机,Session过期对应端点会自动删除
    • 通过Watch API可以监听端点变化

    核心代码如下:

    // 注册,1表示临时节点
    func (d *ZkDiscovery) Register(ctx context.Context) error {
    	_, err := d.conn.Create(d.myKey, []byte(d.Val), 1, d.ACL)
    	if err == zk.ErrNodeExists {
    		return nil
    	}
    	return err
    }
    
    // 注销,直接删除对应Key即可
    func (d *ZkDiscovery) UnRegister(ctx context.Context) error {
    	err := d.conn.Delete(d.myKey, -1)
    	if err == zk.ErrNoNode {
    		return nil
    	}
    	return err
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    服务监听通过zk Watch接口

    func (d *ZkDiscovery) Watch(ctx context.Context) error {
    	d.watchContext, d.watchCancel = context.WithCancel(ctx)
        // 获取最新列表
    	if err := d.refreshServices(); err != nil {
    		return err
    	}
    
    	if d.Callbacks.OnStartedDiscovering != nil {
    		d.Callbacks.OnStartedDiscovering(d.ListServices())
    	}
    
    	defer d.watchCancel()
    
    	defer func() {
    		if d.Callbacks.OnStoppedDiscovering != nil {
    			d.Callbacks.OnStoppedDiscovering()
    		}
    	}()
    
    loop:
        // 添加节点变化
    	children, _, ch, err := d.conn.ChildrenW(d.Path)
    	if err != nil {
    		return err
    	}
        d.setServices(containerx.NewSet(children...))
    	for {
    		select {
    		case <-d.watchContext.Done():
    			return nil
    		case e, ok := <-ch:
            // zk 是一个一次性触发器,收到事件后需要重新watch
    			if !ok {
    				goto loop
    			}
    			if e.Err != nil {
    				return e.Err
    			}
                // 当子节点变化时,获取最新服务列表
    			switch e.Type {
    			case zk.EventNodeCreated, zk.EventNodeChildrenChanged:
    				d.refreshServices()
    			}
    
    			switch e.State {
    			case zk.StateExpired:
    				return fmt.Errorf("node [%s] expired", d.myKey)
    			case zk.StateDisconnected:
    				return nil
    			}
    
    			if d.Callbacks.OnServiceChanged != nil {
    				d.Callbacks.OnServiceChanged(d.ListServices())
    			}
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    通过worker模拟不同的端点,测试代码如下:

    func main() {
    	ctx, cancel := context.WithCancel(context.Background())
    	worker := func(i int, run bool) {
    		id := fmt.Sprintf("10.0.0.%d", i)
    		val := fmt.Sprintf("10.0.0.%d", i)
    
    		sd, err := zkdiscovery.New(zkdiscovery.ZkDiscoveryConfig{
    			Endpoints:      []string{"127.0.0.1"},
    			Path:           "/zk/services",
    			SessionTimeout: 2 * time.Second,
    			Key:            id,
    			Val:            val,
    			Callbacks: zkdiscovery.DiscoveryCallbacks{
    				OnStartedDiscovering: func(services []zkdiscovery.Service) {
    					log.Printf("[%s] onstarted, services: %v", id, services)
    				},
    				OnStoppedDiscovering: func() {
    					log.Printf("[%s] onstoped", id)
    				},
    				OnServiceChanged: func(services []zkdiscovery.Service) {
    					log.Printf("[%s] onchanged, services: %v", id, services)
    				},
    			},
    		})
    
    		if err != nil {
    			log.Fatalf("failed to create service discovery: %v", err)
    		}
    		defer sd.Close()
    
    		if !run {
    			if sd.UnRegister(context.Background()); err != nil {
    				log.Fatalf("failed to unregister service [%s]: %v", id, err)
    			}
    			return
    		}
    
    		if err := sd.Register(context.Background()); err != nil {
    			log.Fatalf("failed to register service [%s]: %v", id, err)
    		}
    
    		if err := sd.Watch(ctx); err != nil {
    			log.Printf("[%s] failed to watch service: %v", id, err)
    		}
    	}
    
    	wg := group.NewGroup()
    	for i := 0; i < 3; i++ {
    		id := i
    		wg.Go(func() { worker(id, true) })
    	}
    
    	go func() {
    		time.Sleep(2 * time.Second)
    		worker(3, true)
    	}()
    
    	// unregister
    	go func() {
    		time.Sleep(4 * time.Second)
    		worker(1, false)
    	}()
    
    	// wg.Wait()
    
    	time.Sleep(5 * time.Second)
    	cancel()
    	time.Sleep(1 * time.Second)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69

    通过结果可以看到服务能够正常注册注销,而且可以监听到节点变化

    2022/08/09 03:01:29 connected to 127.0.0.1:2181
    2022/08/09 03:01:29 connected to 127.0.0.1:2181
    2022/08/09 03:01:29 connected to 127.0.0.1:2181
    2022/08/09 03:01:29 authenticated: id=72787622169739423, timeout=4000
    2022/08/09 03:01:29 re-submitting `0` credentials after reconnect
    2022/08/09 03:01:29 authenticated: id=72787622169739424, timeout=4000
    2022/08/09 03:01:29 authenticated: id=72787622169739425, timeout=4000
    2022/08/09 03:01:29 re-submitting `0` credentials after reconnect
    2022/08/09 03:01:29 re-submitting `0` credentials after reconnect
    2022/08/09 03:01:29 [10.0.0.2] onstarted, services: [{10.0.0.1 } {10.0.0.0 } {10.0.0.2 }]
    2022/08/09 03:01:29 [10.0.0.0] onstarted, services: [{10.0.0.0 } {10.0.0.2 } {10.0.0.1 }]
    2022/08/09 03:01:29 [10.0.0.1] onstarted, services: [{10.0.0.0 } {10.0.0.2 } {10.0.0.1 }]
    2022/08/09 03:01:31 connected to 127.0.0.1:2181
    2022/08/09 03:01:31 authenticated: id=72787622169739426, timeout=4000
    2022/08/09 03:01:31 re-submitting `0` credentials after reconnect
    2022/08/09 03:01:31 [10.0.0.0] onchanged, services: [{10.0.0.0 } {10.0.0.2 } {10.0.0.1 } {10.0.0.3 }]
    2022/08/09 03:01:31 [10.0.0.1] onchanged, services: [{10.0.0.3 } {10.0.0.0 } {10.0.0.2 } {10.0.0.1 }]
    2022/08/09 03:01:31 [10.0.0.2] onchanged, services: [{10.0.0.0 } {10.0.0.2 } {10.0.0.1 } {10.0.0.3 }]
    2022/08/09 03:01:31 [10.0.0.3] onstarted, services: [{10.0.0.1 } {10.0.0.3 } {10.0.0.0 } {10.0.0.2 }]
    2022/08/09 03:01:33 connected to 127.0.0.1:2181
    2022/08/09 03:01:33 authenticated: id=72787622169739427, timeout=4000
    2022/08/09 03:01:33 re-submitting `0` credentials after reconnect
    2022/08/09 03:01:33 [10.0.0.3] onchanged, services: [{10.0.0.0 } {10.0.0.2 } {10.0.0.3 }]
    2022/08/09 03:01:33 [10.0.0.2] onchanged, services: [{10.0.0.0 } {10.0.0.2 } {10.0.0.3 }]
    2022/08/09 03:01:33 [10.0.0.0] onchanged, services: [{10.0.0.3 } {10.0.0.0 } {10.0.0.2 }]
    2022/08/09 03:01:33 [10.0.0.1] onchanged, services: [{10.0.0.0 } {10.0.0.2 } {10.0.0.3 }]
    2022/08/09 03:01:33 recv loop terminated: EOF
    2022/08/09 03:01:33 send loop terminated: <nil>
    2022/08/09 03:01:34 [10.0.0.3] onstoped
    2022/08/09 03:01:34 [10.0.0.0] onstoped
    2022/08/09 03:01:34 [10.0.0.2] onstoped
    2022/08/09 03:01:34 [10.0.0.1] onstoped
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    分布式锁

    在包github.com/go-zookeeper/zk中已经实现了分布式锁,主要借助了ZooKeeper的临时节点的功能

    • 加锁时,创建临时节点(client与zk server会保持长链接,链接中断则创建的临时数据会被删除)
    • 解锁时,直接删除节点即可

    主要来看加锁过程

    func (l *Lock) LockWithData(data []byte) error {
    	if l.lockPath != "" {
    		return ErrDeadlock
    	}
    
    	prefix := fmt.Sprintf("%s/lock-", l.path)
    
    	path := ""
    	var err error
        // 重试3次
    	for i := 0; i < 3; i++ {
            // 创建临时顺序节点,同名节点会加序列号
    		path, err = l.c.CreateProtectedEphemeralSequential(prefix, data, l.acl)
    		if err == ErrNoNode {
    			// Create parent node.
    			parts := strings.Split(l.path, "/")
    			pth := ""
    			for _, p := range parts[1:] {
    				var exists bool
    				pth += "/" + p
                    // 父路径不存在,创建父节点
    				exists, _, err = l.c.Exists(pth)
    				if err != nil {
    					return err
    				}
    				if exists == true {
    					continue
    				}
    				_, err = l.c.Create(pth, []byte{}, 0, l.acl)
    				if err != nil && err != ErrNodeExists {
    					return err
    				}
    			}
    		} else if err == nil {
    			break
    		} else {
    			return err
    		}
    	}
    	if err != nil {
    		return err
    	}
        // 解析序列号
    	seq, err := parseSeq(path)
    	if err != nil {
    		return err
    	}
        // 获取lock下所有子节点,根据序列号判断是否获得锁
    	for {
    		children, _, err := l.c.Children(l.path)        
    		if err != nil {
    			return err
    		}
    
    		lowestSeq := seq
    		prevSeq := -1
    		prevSeqPath := ""
    
    		for _, p := range children {
    			s, err := parseSeq(p)
    			if err != nil {
    				return err
    			}
    			if s < lowestSeq {
    				lowestSeq = s
    			}
                // 获取此节点前一个序列号
    			if s < seq && s > prevSeq {
    				prevSeq = s
    				prevSeqPath = p
    			}
    		}
            // 如果当前节点序列号最低,则获取到锁
    		if seq == lowestSeq {
    			// Acquired the lock
    			break
    		}
    
    		// 否则等待节点删除
    		_, _, ch, err := l.c.GetW(l.path + "/" + prevSeqPath)
    		if err != nil && err != ErrNoNode {
    			return err
    		} else if err != nil && err == ErrNoNode {
    			// try again
    			continue
    		}
    
    		ev := <-ch
    		if ev.Err != nil {
    			return ev.Err
    		}
    	}
    
    	l.seq = seq
    	l.lockPath = path
    	return nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97

    主要逻辑如下:

    1. 创建临时顺序节点
    2. 如果父节点不存在,则创建父节点
    3. 获取lock下所有子节点序列号
    4. 如果当前节点序列号最小,则获得锁
    5. 否则,等待前一个删除,直到获取锁

    对比etcd的实现,大体思路基本一致,主要差异点在于

    • TTL实现:etcd通过Lease的实现TTL,获取锁后不断刷新Lease; zk通过Session来实现TTL,Session中止会自动清楚临时节点
    • 顺序获取锁:etcd通过Revision来实现;zk则通过临时顺序节点

    对比etcd

    ZooKeeper与etcd的使用场景高度重合,可以项目替代,主要区别有以下几点

    对比项ZooKeeperetcd
    一致性协议zabraft
    健康检查基于Session心跳,Lease刷新
    Watch一次性触发器、只能添加子节点创建、删除,事件不包含数据可以添加前缀、Range、子节点变化
    多版本控制不支持支持,所有Key含有Revision

    etcd作为后期之秀,在功能上更丰富,新项目可以优先尝试使用etcd作为其分布式协调引擎。

    总结

    本文分析了ZooKeeper在分布式锁、服务发现等场景上的实现方式,并对比了与etcd的差异点。

    本文所有代码见https://github.com/qingwave/gocorex,欢迎批评指正。

    Explore more in https://qingwave.github.io

  • 相关阅读:
    webpack源码分析——loader-runner库之runLoaders函数
    GIt 迭代需求经验
    Zabbix安装与部署
    警告:未配置spring boot 配置注解处理器
    干货 | BitSail Connector 开发详解系列一:Source
    [SPOJ FASTFLOW] Fast Maximum Flow [最大流]
    板块一 Servlet编程:第五节 Cookie对象全解 来自【汤米尼克的JAVAEE全套教程专栏】
    【论文阅读笔记】Supervised Contrastive Learning
    Java项目_家庭记账(简易版)
    opencv 暴力特征匹配+FLANN特征匹配
  • 原文地址:https://blog.csdn.net/u012986012/article/details/126262734