• 34、幂等性


    一、服务雪崩

    • 什么是服务雪崩:服务雪崩效应是一种因“服务提供者的不可用”(原因)导致“服务调用者不可用”(结果),并将不可用逐渐放大的现象
    • 图示服务雪崩:Service A的流量波动很大,流量经常会突然性增加!那么在这种情况下,就算Service A能扛得住请求,Service B和Service C未必能扛得住这突发的请求。此时,如果Service C因为抗不住请求,变得不可用。那么Service B的请求也会阻塞,慢慢耗尽Service B的线程资源,Service B就会变得不可用。紧接着,Service A也会不可用

    在这里插入图片描述


    二、超时、重试和幂等性

    1 - 超时、重试和幂等性概念

    • grpc的超时:timeout为了保护服务,避免consumer服务因为provider响应慢而变得响应很慢,这样consumer可以尽量保持原有的性能
    • grpc的重试
      • 如果provider只是偶尔抖动,那么超时后直接放弃,不做后续处理,就会导致当前请求错误,也会带来业务方面的损失
      • 对于这种偶尔抖动,可以在超时后重试一下,重试如果正常返回了,那么这次请求就被挽救了,能够正常给前端返回数据,只不过比原来响应慢一点
      • 重试可以考虑切换一台机器来进行调用,因为原机器可能由于临时负载高而性能下降,重试会加剧其性能问题,而换一台机器,得到更快返回的概率更大一些
    • 幂等性
      • 如果允许consumer重试,那么provider就要能够做到幂等
      • 同一个请求被consumer多次调用,对provider产生的影响是一致的
      • 而且这幂等应该是服务级别的,而不是某台机器层面的,重试调用任何一台机器,都应该做到幂等

    2 - 哪些情况下需要考虑幂等性

    • http请求的类型:同样的请求发送多次
      • get:获取商品信息,这个不会引起商品数据的变化 —— 不需要考虑幂等性
      • post:比较常见,这种借口需要考虑到幂等性
      • put:
        • 不一定要实现幂等性
          • a.put把1号商品的价格改为200,网络返回的时候抖动了,重试
          • b.第2次借口还是会把1号商品的价格改为200-这种情况下没有幂等性的问题
        • 出现幂等性问题的情况
          • a.购物车中的商品,调用一次这个商品的数量加一(用户本身只想买11件,但是却变成了13件,这就需要解决幂等性问题了)
            • 第一次调用,原本的值是10,之后的数量变为11,但是返回的时候出现了网络抖动
            • 第二次发送,原本的值是11,之后的数量变量12,但是返回的时候出现了网络抖动
            • 第三次发送,原本的值是12,之后的数量变量13,但是返回的时候出现了网络抖动
      • delete:
        • 一般不具备幂等性的要求
        • 第一次调用删除数据
        • 第二次调用还是删除当前的数据

    三、go中实现grpc的调用重试

    package main
    
    import (
    	"context"
    	"fmt"
    	"google.golang.org/grpc/credentials/insecure"
    	"time"
    
    	"google.golang.org/grpc/codes"
    
    	grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
    	"google.golang.org/grpc"
    
    	"test/proto"
    )
    
    func main() {
    	//stream
    	interceptor := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
    		start := time.Now()
    		err := invoker(ctx, method, req, reply, cc, opts...)
    		fmt.Printf("耗时:%s\n", time.Since(start))
    		return err
    	}
    	var opts []grpc.DialOption
    	opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
    	retryOpts := []grpc_retry.CallOption{
    		grpc_retry.WithMax(3),
    		grpc_retry.WithPerRetryTimeout(1 * time.Second),
    		grpc_retry.WithCodes(codes.Unknown, codes.DeadlineExceeded, codes.Unavailable),
    	}
    
    	opts = append(opts, grpc.WithUnaryInterceptor(interceptor))
    	//这个请求应该多长时间超时, 这个重试应该几次、当服务器返回什么状态码的时候重试
    	opts = append(opts, grpc.WithUnaryInterceptor(grpc_retry.UnaryClientInterceptor(retryOpts...)))
    	conn, err := grpc.Dial("127.0.0.1:50051", opts...)
    	if err != nil {
    		panic(err)
    	}
    	defer conn.Close()
    
    	c := proto.NewGreeterClient(conn)
    	r, err := c.SayHello(context.Background(), &proto.HelloRequest{Name: "bobby"})
    	if err != nil {
    		panic(err)
    	}
    	fmt.Println(r.Message)
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • server/server.go
    package main
    
    import (
    	"context"
    	"fmt"
    	"net"
    	"time"
    
    	"google.golang.org/grpc"
    
    	"test/proto"
    )
    
    type Server struct{}
    
    func (s *Server) SayHello(ctx context.Context, request *proto.HelloRequest) (*proto.HelloReply,
    	error) {
    	time.Sleep(2 * time.Second)
    	return &proto.HelloReply{
    		Message: "hello " + request.Name,
    	}, nil
    }
    
    func main() {
    	interceptor := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
    		fmt.Println("接收到了一个新的请求")
    		res, err := handler(ctx, req)
    		fmt.Println("请求已经完成")
    		return res, err
    	}
    
    	opt := grpc.UnaryInterceptor(interceptor)
    	g := grpc.NewServer(opt)
    	proto.RegisterGreeterServer(g, &Server{})
    	lis, err := net.Listen("tcp", "0.0.0.0:50051")
    	if err != nil {
    		panic("failed to listen:" + err.Error())
    	}
    	err = g.Serve(lis)
    	if err != nil {
    		panic("failed to start grpc:" + err.Error())
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 先运行server再运行client

    在这里插入图片描述


    四、常用的幂等性解决方案

    1 - 唯一索引,防止新增脏数据

    • 示例:新建用户的时候将手机号设置为唯一索引,那么即使重试,也只会新建一个用户,不会因为重试导致当前用户注册了两个用户
    • 要点:唯一索引或唯一组合索引来防止新增数据存在脏数据(当表存在唯一索引,并发时新增报错的时候,再查询一次就可以了,数据应该已经存在,返回结果即可)

    在这里插入图片描述

    2 - token机制,防止页面重复提交

    • 业务要求:页面的数据只能被点击提交一次
    • 发生原因:由于重复点击或者网络重发,或者nginx重发等情况会导致数据被重复提交
    • 解决办法:集群环境,采用token加redis(redis单线程的,处理需要排队)
    • 处理流程
      • ①.数据提交前要向服务申请token,token放到redis或内存,token有效时间
      • ②.提交后台校验token,同时删除token,生成新的token返回
    • token特点:要申请,一次有效性,可以限流
    • 注意:redis要用删除操作来判断token,删除成功代表token校验通过,如果select+delete来检验token,存在并发问题,不建议使用

    3 - 悲观锁

    • 获取数据的时候加锁select * from table_xxx where id = 'xxx' for update;
    • 注意:id字段一定是主键或者唯一索引,不然是锁表,会芭比q的;悲观锁使用时一般伴随事务一起使用,数据锁定时间可能会很长,根据实际情况选用

    4 - 乐观锁

    • 乐观锁方案:乐观锁只是在更新数据那一刻锁表,其他事件不锁表,所以相对于悲观锁,效率更高
    • 乐观锁的实现方式:实现方式多种多样,可以通过version或者其他状态条件
      • 通过版本号实现:update table_xxx set name=#name#,version=version+1 where version=#version#
      • 通过条件限制:update table_xxx set avai_amount = avai_amount-#subAmount# where avai_amount-#subAmount# >= 0
        • 要求:quality - #subQuality# >= ,这种情景适合不用版本号,只更新是做数据安全校验,适合库存模型,扣份额和回滚份额,性能更高
    • 注意:乐观锁的更新操作,最好用主键或者唯一索引来更新;这样是行锁,否则更新时会锁表,上面两个sql改成下面的两个更好
      • update table_xxx set name=#name#,version=version+1 where id=#id# and version=#version#
      • update table_xxx set avai_amount = avai_amount-#subAmount# where id=#id# and avai_amount-#subAmount# >= 0

    5 - 分布式锁

    • 示例说明:插入数据的例子,如果是分布式系统,构建全局唯一索引比较困难,例如唯一性的字段没法确定,这时候可以引入分布式锁,通过第三方的系统(redis或zookeeper),在业务系统插入数据或者更新数据,获取分布式锁,然后做操作,之后释放锁,这样其实是把多线程并发的锁的思路,引入多个系统,也就是分布式系统中的解决思路

    6 - select + insert

    • 说明:并发不高的后台系统,或者一些任务JOB,为了支持幂等,支持重复执行,简单的处理方法是,先查询一下关键数据,判断是否已经执行过,再进行业务处理就可以了
      在这里插入图片描述

    7 - 对外提供接口的api如何保证幂等

    • 说明:如银联提供的付款接口 —— 需要接入商户提交付款请求时附带 —— source来源,seq序列号;source+seq在数据库里面做唯一索引,防止多次付款(并发时,只能处理一个请求)
  • 相关阅读:
    使用vscode操作本地git
    基于SSM+Vue的随心淘网管理系统
    mybatisPlus笔记
    vue用法示例(二)
    C++面向对象程序设计 - 派生类的构造函数和析构函数
    golang的channel探索
    关于华为OD,你应该知道的
    接口与外设数据传送方式(笔记)
    并查集快速合并(Java 实例代码)
    Mysql数据库索引
  • 原文地址:https://blog.csdn.net/qq23001186/article/details/126339342