etcd是用go语言编写的key-value存储中间件组件,能保证多个节点数据的强一致性,适合存储重要数据,但不适合存储大量数据,比较适合做微服务注册中心及分布式锁等。
etcd做服务发现原理简单分析:监听服务注册的key,当对应的值发生变化时,通知grpc更新服务列表地址
etcd做服务注册原理简单分析:向etcd组件注册服务名称及地址,通过租约机制不断续约注册的key以保持服务的存活状态
下面是实现的代码:
目录结构:
client
--main.go
proto
--greet.pb.go
--greet.proto
server
--main.go
各文件代码:
proto/greet.proto
- syntax = "proto3";
-
- option go_package = ".;greet";
-
- service Greet {
- rpc Hello(GreetRequest)returns(GreetResponse){}
- }
-
- message GreetRequest {
- string name = 1;
- }
-
- message GreetResponse {
- string message = 1;
- string from = 2;
- }
在proto目录下执行protoc --go_out=plugins=grpc:. *.proto 生成greet.pb.go
client/main.go
- package main
-
- import (
- "flag"
- "fmt"
- proto "grpc-etcd/proto"
- "log"
- "strings"
- "time"
-
- "go.etcd.io/etcd/client/v3"
- "golang.org/x/net/context"
- "google.golang.org/grpc"
- "google.golang.org/grpc/resolver"
- )
-
-
- var (
- ServiceName = flag.String("ServiceName", "greet_service", "service name") //服务名称
- EtcdAddr = flag.String("EtcdAddr", "127.0.0.1:2379", "register etcd address") //etcd的地址
- )
-
- var cli *clientv3.Client
-
- //etcd解析器
- type etcdResolver struct {
- etcdAddr string
- clientConn resolver.ClientConn
- }
-
- //初始化一个etcd解析器
- func newResolver(etcdAddr string) resolver.Builder {
- return &etcdResolver{etcdAddr: etcdAddr}
- }
-
- func (r *etcdResolver) Scheme() string {
- return "etcd"
- }
-
- //watch有变化以后会调用
- func (r *etcdResolver) ResolveNow(rn resolver.ResolveNowOptions) {
- log.Println("ResolveNow")
- fmt.Println(rn)
- }
-
- //解析器关闭时调用
- func (r *etcdResolver) Close() {
- log.Println("Close")
- }
-
- //构建解析器 grpc.Dial()同步调用
- func (r *etcdResolver) Build(target resolver.Target, clientConn resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
- var err error
- fmt.Println("call build...")
- //构建etcd client
- if cli == nil {
- cli, err = clientv3.New(clientv3.Config{
- Endpoints: strings.Split(r.etcdAddr, ";"),
- DialTimeout: 15 * time.Second,
- })
- if err != nil {
- fmt.Printf("连接etcd失败:%s\n", err)
- return nil, err
- }
- }
-
- r.clientConn = clientConn
-
- go r.watch("/" + target.Scheme + "/" + target.Endpoint + "/")
-
- return r, nil
- }
-
- //监听etcd中某个key前缀的服务地址列表的变化
- func (r *etcdResolver) watch(keyPrefix string) {
- //初始化服务地址列表
- var addrList []resolver.Address
-
- resp, err := cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix())
- if err != nil {
- fmt.Println("获取服务地址列表失败:", err)
- } else {
- for i := range resp.Kvs {
- addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(resp.Kvs[i].Key), keyPrefix)})
- }
- }
-
- r.clientConn.NewAddress(addrList)
-
- //监听服务地址列表的变化
- rch := cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix())
- for n := range rch {
- for _, ev := range n.Events {
- addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix)
- switch ev.Type {
- case 0://mvccpb.PUT
- if !exists(addrList, addr) {
- addrList = append(addrList, resolver.Address{Addr: addr})
- r.clientConn.NewAddress(addrList)
- }
- fmt.Println("有新的服务注册:",addr)
- case 1://mvccpb.DELETE
- if s, ok := remove(addrList, addr); ok {
- addrList = s
- r.clientConn.NewAddress(addrList)
- }
- fmt.Println("服务注销:",addr)
- }
- }
- }
- }
-
- func exists(l []resolver.Address, addr string) bool {
- for i := range l {
- if l[i].Addr == addr {
- return true
- }
- }
- return false
- }
-
- func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
- for i := range s {
- if s[i].Addr == addr {
- s[i] = s[len(s)-1]
- return s[:len(s)-1], true
- }
- }
- return nil, false
- }
-
- func main() {
- flag.Parse()
-
- //注册etcd解析器
- r := newResolver(*EtcdAddr)
- resolver.Register(r)
-
- //客户端连接服务器(负载均衡:轮询) 会同步调用r.Build()
- conn, err := grpc.Dial(r.Scheme()+"://author/"+*ServiceName, grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),grpc.WithInsecure())
- if err != nil {
- fmt.Println("连接服务器失败:", err)
- }
- defer conn.Close()
-
- //获得grpc句柄
- c := proto.NewGreetClient(conn)
- ticker := time.NewTicker(2 * time.Second)
- i:=1
- for range ticker.C {
-
- resp1, err := c.Hello(
- context.Background(),
- &proto.GreetRequest{Name: fmt.Sprintf("张三%d",i)},
- )
- if err != nil {
- fmt.Println("Hello调用失败:", err)
- return
- }
- fmt.Printf("Hello 响应:%s,来自:%s\n", resp1.Message, resp1.From)
-
- i++
- }
- }
server/main.go
- /**
- * etcd demo server
- * author: JetWu
- * date: 2020.05.01
- */
- package main
-
- import (
- "flag"
- "fmt"
- proto "grpc-etcd/proto"
- "net"
- "os"
- "os/signal"
- "strings"
- "syscall"
- "time"
-
- "go.etcd.io/etcd/client/v3"
- "golang.org/x/net/context"
- "google.golang.org/grpc"
- )
-
-
- var host = "127.0.0.1" //服务器主机
- var (
- Port = flag.Int("Port", 3000, "listening port") //服务器监听端口
- ServiceName = flag.String("ServiceName", "greet_service", "service name") //服务名称
- EtcdAddr = flag.String("EtcdAddr", "127.0.0.1:2379", "register etcd address") //etcd的地址
- )
- var cli *clientv3.Client
-
- //rpc服务接口
- type greetServer struct{}
-
- func (gs *greetServer) Hello(ctx context.Context, req *proto.GreetRequest) (*proto.GreetResponse, error) {
- fmt.Printf("Hello 调用: %s\n", req.Name)
- return &proto.GreetResponse{
- Message: "Hello, " + req.Name,
- From: fmt.Sprintf("127.0.0.1:%d", *Port),
- }, nil
- }
-
-
- //将服务地址注册到etcd中
- func register(etcdAddr, serviceName, serverAddr string, ttl int64) error {
- var err error
-
- if cli == nil {
- //构建etcd client
- cli, err = clientv3.New(clientv3.Config{
- Endpoints: strings.Split(etcdAddr, ";"),
- DialTimeout: 15 * time.Second,
- })
- if err != nil {
- fmt.Printf("连接etcd失败:%s\n", err)
- return err
- }
- }
-
- //与etcd建立长连接,并保证连接不断(心跳检测)
- ticker := time.NewTicker(time.Second * time.Duration(ttl))
- go func() {
- key := getKey(serviceName,serverAddr)
- for {
- resp, err := cli.Get(context.Background(), key)
- //fmt.Printf("resp:%+v\n", resp)
- if err != nil {
- fmt.Printf("获取服务地址失败:%s", err)
- } else if resp.Count == 0 { //尚未注册
- err = keepAlive(serviceName, serverAddr, ttl)
- if err != nil {
- fmt.Printf("保持连接失败:%s", err)
- }
- }
- <-ticker.C
- }
- }()
-
- return nil
- }
-
- //组装etcd key
- func getKey(serviceName,serverAddr string) string {
- return fmt.Sprintf("/%s/%s/%s","etcd",serviceName,serverAddr)
- }
-
- //保持服务器与etcd的长连接
- func keepAlive(serviceName, serverAddr string, ttl int64) error {
- //创建租约
- leaseResp, err := cli.Grant(context.Background(), ttl)
- if err != nil {
- fmt.Printf("创建租期失败:%s\n", err)
- return err
- }
-
- //将服务地址注册到etcd中
- key := getKey(serviceName,serverAddr)
- _, err = cli.Put(context.Background(), key, serverAddr, clientv3.WithLease(leaseResp.ID))
-
- if err != nil {
- fmt.Printf("注册服务失败:%s", err)
- return err
- }
- fmt.Printf("etcd服务注册成功,key:%s,value:%s",key,serverAddr)
- //建立长连接
- ch, err := cli.KeepAlive(context.Background(), leaseResp.ID)
- if err != nil {
- fmt.Printf("建立长连接失败:%s\n", err)
- return err
- }
-
- //清空keepAlive返回的channel
- go func() {
- for {
- <-ch
- }
- }()
- return nil
- }
-
- //取消注册
- func unRegister(serviceName, serverAddr string) {
- if cli != nil {
- key := getKey(serviceName,serverAddr)
- cli.Delete(context.Background(), key)
- }
- }
-
- func main() {
- flag.Parse()
-
- //监听网络
- listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", *Port))
- if err != nil {
- fmt.Println("监听网络失败:", err)
- return
- }
- defer listener.Close()
-
- //创建grpc句柄
- srv := grpc.NewServer()
- defer srv.GracefulStop()
-
- //将greetServer结构体注册到grpc服务中
- proto.RegisterGreetServer(srv, &greetServer{})
-
- //将服务地址注册到etcd中
- serverAddr := fmt.Sprintf("%s:%d", host, *Port)
- fmt.Printf("greeting server address: %s\n", serverAddr)
- register(*EtcdAddr, *ServiceName, serverAddr, 5)
-
- //关闭信号处理
- ch := make(chan os.Signal, 1)
- signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT)
- go func() {
- s := <-ch
- unRegister(*ServiceName, serverAddr)
- if i, ok := s.(syscall.Signal); ok {
- os.Exit(int(i))
- } else {
- os.Exit(0)
- }
- }()
-
- //监听服务
- err = srv.Serve(listener)
- if err != nil {
- fmt.Println("监听异常:", err)
- return
- }
- }
运行多个服务端,然后运行客户端