• grpc使用etcd做服务注册与发现


    etcd是用go语言编写的key-value存储中间件组件,能保证多个节点数据的强一致性,适合存储重要数据,但不适合存储大量数据,比较适合做微服务注册中心及分布式锁等。

    etcd做服务发现原理简单分析:监听服务注册的key,当对应的值发生变化时,通知grpc更新服务列表地址

    etcd做服务注册原理简单分析:向etcd组件注册服务名称及地址,通过租约机制不断续约注册的key以保持服务的存活状态

    下面是实现的代码:

    目录结构:

    client

    --main.go

    proto

    --greet.pb.go

    --greet.proto

    server

    --main.go

    各文件代码:

    proto/greet.proto

    1. syntax = "proto3";
    2. option go_package = ".;greet";
    3. service Greet {
    4. rpc Hello(GreetRequest)returns(GreetResponse){}
    5. }
    6. message GreetRequest {
    7. string name = 1;
    8. }
    9. message GreetResponse {
    10. string message = 1;
    11. string from = 2;
    12. }

    在proto目录下执行protoc --go_out=plugins=grpc:. *.proto 生成greet.pb.go

    client/main.go

    1. package main
    2. import (
    3. "flag"
    4. "fmt"
    5. proto "grpc-etcd/proto"
    6. "log"
    7. "strings"
    8. "time"
    9. "go.etcd.io/etcd/client/v3"
    10. "golang.org/x/net/context"
    11. "google.golang.org/grpc"
    12. "google.golang.org/grpc/resolver"
    13. )
    14. var (
    15. ServiceName = flag.String("ServiceName", "greet_service", "service name") //服务名称
    16. EtcdAddr = flag.String("EtcdAddr", "127.0.0.1:2379", "register etcd address") //etcd的地址
    17. )
    18. var cli *clientv3.Client
    19. //etcd解析器
    20. type etcdResolver struct {
    21. etcdAddr string
    22. clientConn resolver.ClientConn
    23. }
    24. //初始化一个etcd解析器
    25. func newResolver(etcdAddr string) resolver.Builder {
    26. return &etcdResolver{etcdAddr: etcdAddr}
    27. }
    28. func (r *etcdResolver) Scheme() string {
    29. return "etcd"
    30. }
    31. //watch有变化以后会调用
    32. func (r *etcdResolver) ResolveNow(rn resolver.ResolveNowOptions) {
    33. log.Println("ResolveNow")
    34. fmt.Println(rn)
    35. }
    36. //解析器关闭时调用
    37. func (r *etcdResolver) Close() {
    38. log.Println("Close")
    39. }
    40. //构建解析器 grpc.Dial()同步调用
    41. func (r *etcdResolver) Build(target resolver.Target, clientConn resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
    42. var err error
    43. fmt.Println("call build...")
    44. //构建etcd client
    45. if cli == nil {
    46. cli, err = clientv3.New(clientv3.Config{
    47. Endpoints: strings.Split(r.etcdAddr, ";"),
    48. DialTimeout: 15 * time.Second,
    49. })
    50. if err != nil {
    51. fmt.Printf("连接etcd失败:%s\n", err)
    52. return nil, err
    53. }
    54. }
    55. r.clientConn = clientConn
    56. go r.watch("/" + target.Scheme + "/" + target.Endpoint + "/")
    57. return r, nil
    58. }
    59. //监听etcd中某个key前缀的服务地址列表的变化
    60. func (r *etcdResolver) watch(keyPrefix string) {
    61. //初始化服务地址列表
    62. var addrList []resolver.Address
    63. resp, err := cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix())
    64. if err != nil {
    65. fmt.Println("获取服务地址列表失败:", err)
    66. } else {
    67. for i := range resp.Kvs {
    68. addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(resp.Kvs[i].Key), keyPrefix)})
    69. }
    70. }
    71. r.clientConn.NewAddress(addrList)
    72. //监听服务地址列表的变化
    73. rch := cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix())
    74. for n := range rch {
    75. for _, ev := range n.Events {
    76. addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix)
    77. switch ev.Type {
    78. case 0://mvccpb.PUT
    79. if !exists(addrList, addr) {
    80. addrList = append(addrList, resolver.Address{Addr: addr})
    81. r.clientConn.NewAddress(addrList)
    82. }
    83. fmt.Println("有新的服务注册:",addr)
    84. case 1://mvccpb.DELETE
    85. if s, ok := remove(addrList, addr); ok {
    86. addrList = s
    87. r.clientConn.NewAddress(addrList)
    88. }
    89. fmt.Println("服务注销:",addr)
    90. }
    91. }
    92. }
    93. }
    94. func exists(l []resolver.Address, addr string) bool {
    95. for i := range l {
    96. if l[i].Addr == addr {
    97. return true
    98. }
    99. }
    100. return false
    101. }
    102. func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
    103. for i := range s {
    104. if s[i].Addr == addr {
    105. s[i] = s[len(s)-1]
    106. return s[:len(s)-1], true
    107. }
    108. }
    109. return nil, false
    110. }
    111. func main() {
    112. flag.Parse()
    113. //注册etcd解析器
    114. r := newResolver(*EtcdAddr)
    115. resolver.Register(r)
    116. //客户端连接服务器(负载均衡:轮询) 会同步调用r.Build()
    117. conn, err := grpc.Dial(r.Scheme()+"://author/"+*ServiceName, grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),grpc.WithInsecure())
    118. if err != nil {
    119. fmt.Println("连接服务器失败:", err)
    120. }
    121. defer conn.Close()
    122. //获得grpc句柄
    123. c := proto.NewGreetClient(conn)
    124. ticker := time.NewTicker(2 * time.Second)
    125. i:=1
    126. for range ticker.C {
    127. resp1, err := c.Hello(
    128. context.Background(),
    129. &proto.GreetRequest{Name: fmt.Sprintf("张三%d",i)},
    130. )
    131. if err != nil {
    132. fmt.Println("Hello调用失败:", err)
    133. return
    134. }
    135. fmt.Printf("Hello 响应:%s,来自:%s\n", resp1.Message, resp1.From)
    136. i++
    137. }
    138. }

    server/main.go

    1. /**
    2. * etcd demo server
    3. * author: JetWu
    4. * date: 2020.05.01
    5. */
    6. package main
    7. import (
    8. "flag"
    9. "fmt"
    10. proto "grpc-etcd/proto"
    11. "net"
    12. "os"
    13. "os/signal"
    14. "strings"
    15. "syscall"
    16. "time"
    17. "go.etcd.io/etcd/client/v3"
    18. "golang.org/x/net/context"
    19. "google.golang.org/grpc"
    20. )
    21. var host = "127.0.0.1" //服务器主机
    22. var (
    23. Port = flag.Int("Port", 3000, "listening port") //服务器监听端口
    24. ServiceName = flag.String("ServiceName", "greet_service", "service name") //服务名称
    25. EtcdAddr = flag.String("EtcdAddr", "127.0.0.1:2379", "register etcd address") //etcd的地址
    26. )
    27. var cli *clientv3.Client
    28. //rpc服务接口
    29. type greetServer struct{}
    30. func (gs *greetServer) Hello(ctx context.Context, req *proto.GreetRequest) (*proto.GreetResponse, error) {
    31. fmt.Printf("Hello 调用: %s\n", req.Name)
    32. return &proto.GreetResponse{
    33. Message: "Hello, " + req.Name,
    34. From: fmt.Sprintf("127.0.0.1:%d", *Port),
    35. }, nil
    36. }
    37. //将服务地址注册到etcd中
    38. func register(etcdAddr, serviceName, serverAddr string, ttl int64) error {
    39. var err error
    40. if cli == nil {
    41. //构建etcd client
    42. cli, err = clientv3.New(clientv3.Config{
    43. Endpoints: strings.Split(etcdAddr, ";"),
    44. DialTimeout: 15 * time.Second,
    45. })
    46. if err != nil {
    47. fmt.Printf("连接etcd失败:%s\n", err)
    48. return err
    49. }
    50. }
    51. //与etcd建立长连接,并保证连接不断(心跳检测)
    52. ticker := time.NewTicker(time.Second * time.Duration(ttl))
    53. go func() {
    54. key := getKey(serviceName,serverAddr)
    55. for {
    56. resp, err := cli.Get(context.Background(), key)
    57. //fmt.Printf("resp:%+v\n", resp)
    58. if err != nil {
    59. fmt.Printf("获取服务地址失败:%s", err)
    60. } else if resp.Count == 0 { //尚未注册
    61. err = keepAlive(serviceName, serverAddr, ttl)
    62. if err != nil {
    63. fmt.Printf("保持连接失败:%s", err)
    64. }
    65. }
    66. <-ticker.C
    67. }
    68. }()
    69. return nil
    70. }
    71. //组装etcd key
    72. func getKey(serviceName,serverAddr string) string {
    73. return fmt.Sprintf("/%s/%s/%s","etcd",serviceName,serverAddr)
    74. }
    75. //保持服务器与etcd的长连接
    76. func keepAlive(serviceName, serverAddr string, ttl int64) error {
    77. //创建租约
    78. leaseResp, err := cli.Grant(context.Background(), ttl)
    79. if err != nil {
    80. fmt.Printf("创建租期失败:%s\n", err)
    81. return err
    82. }
    83. //将服务地址注册到etcd中
    84. key := getKey(serviceName,serverAddr)
    85. _, err = cli.Put(context.Background(), key, serverAddr, clientv3.WithLease(leaseResp.ID))
    86. if err != nil {
    87. fmt.Printf("注册服务失败:%s", err)
    88. return err
    89. }
    90. fmt.Printf("etcd服务注册成功,key:%s,value:%s",key,serverAddr)
    91. //建立长连接
    92. ch, err := cli.KeepAlive(context.Background(), leaseResp.ID)
    93. if err != nil {
    94. fmt.Printf("建立长连接失败:%s\n", err)
    95. return err
    96. }
    97. //清空keepAlive返回的channel
    98. go func() {
    99. for {
    100. <-ch
    101. }
    102. }()
    103. return nil
    104. }
    105. //取消注册
    106. func unRegister(serviceName, serverAddr string) {
    107. if cli != nil {
    108. key := getKey(serviceName,serverAddr)
    109. cli.Delete(context.Background(), key)
    110. }
    111. }
    112. func main() {
    113. flag.Parse()
    114. //监听网络
    115. listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", *Port))
    116. if err != nil {
    117. fmt.Println("监听网络失败:", err)
    118. return
    119. }
    120. defer listener.Close()
    121. //创建grpc句柄
    122. srv := grpc.NewServer()
    123. defer srv.GracefulStop()
    124. //将greetServer结构体注册到grpc服务中
    125. proto.RegisterGreetServer(srv, &greetServer{})
    126. //将服务地址注册到etcd中
    127. serverAddr := fmt.Sprintf("%s:%d", host, *Port)
    128. fmt.Printf("greeting server address: %s\n", serverAddr)
    129. register(*EtcdAddr, *ServiceName, serverAddr, 5)
    130. //关闭信号处理
    131. ch := make(chan os.Signal, 1)
    132. signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT)
    133. go func() {
    134. s := <-ch
    135. unRegister(*ServiceName, serverAddr)
    136. if i, ok := s.(syscall.Signal); ok {
    137. os.Exit(int(i))
    138. } else {
    139. os.Exit(0)
    140. }
    141. }()
    142. //监听服务
    143. err = srv.Serve(listener)
    144. if err != nil {
    145. fmt.Println("监听异常:", err)
    146. return
    147. }
    148. }

    运行多个服务端,然后运行客户端

  • 相关阅读:
    如何区分一个项目是react还react native
    python中可变类型与不可变类型详细介绍
    vue3+element Plus实现弹框的拖拽、可点击底层页面功能
    结构化编程(SP,structured programming)
    【TFS-CLUB社区 第4期赠书活动】〖Flask Web全栈开发实战〗等你来拿,参与评论,即可有机获得
    【博客451】OVN中的Logical Flow 与 Open Flow
    LeetCode 0226. 翻转二叉树
    【云原生之Docker实战】使用Docker部署excalidraw白板绘图工具
    LeetCode:2. 两数之和
    基于希尔伯特变换的光反馈自混合干涉位移实时跟踪测量系统的瞬时相位计算matlab仿真
  • 原文地址:https://blog.csdn.net/kankan231/article/details/126212654