• go实现分布式锁


    简介

    本文代码地址

    本文以扣减库存为例,分别实现进程锁;mysql的悲观锁;乐观锁以及redis的分布式锁

    CREATE TABLE `stocks` (
      `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
      `goods` varchar(20) DEFAULT NULL COMMENT '商品id',
      `stocks` int(11) DEFAULT NULL COMMENT '库存',
      `version` int(11) DEFAULT NULL COMMENT '乐观锁',
      PRIMARY KEY (`id`),
      KEY `idx_stocks_goods` (`goods`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    在这里插入图片描述

    无锁

    service

    package service
    
    import (
    	context "context"
    	"go-locks/no-lock/db"
    	"go-locks/no-lock/model"
    	"go-locks/no-lock/proto"
    	"google.golang.org/grpc/codes"
    	"google.golang.org/grpc/status"
    	"google.golang.org/protobuf/types/known/emptypb"
    )
    
    type Server struct{}
    
    func (s Server) SellStock(ctx context.Context, request *proto.StockRequest) (*emptypb.Empty, error) {
    	tx := db.DB.Begin()
    	for _, info := range request.StockInfos {
    		var stock model.Stock
    		if result := tx.Where(&model.Stock{Goods: info.GoodsId}).First(&stock); result.RowsAffected == 0 {
    			return nil, status.Error(codes.NotFound, "商品信息不存在")
    		}
    		if stock.Stocks < info.Num {
    			// 库存不足 回滚事务
    			tx.Rollback()
    			return nil, status.Error(codes.ResourceExhausted, "库存不足")
    		}
    		stock.Stocks -= info.Num
    		tx.Save(stock)
    	}
    	tx.Commit()
    	return &emptypb.Empty{}, nil
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    client

    package main
    
    import (
    	"context"
    	"go-locks/no-lock/proto"
    	"google.golang.org/grpc"
    	"google.golang.org/grpc/credentials/insecure"
    	"sync"
    )
    
    var client proto.StockClient
    
    func main() {
    	conn, err := grpc.Dial("127.0.0.1:8088", grpc.WithTransportCredentials(insecure.NewCredentials()))
    	if err != nil {
    		panic(err)
    	}
    	client = proto.NewStockClient(conn)
    	var wg sync.WaitGroup
    	wg.Add(20)
    	for i := 0; i < 20; i++ {
    		go TestSellStock(&wg)
    	}
    	wg.Wait()
    }
    
    func TestSellStock(wg *sync.WaitGroup) {
    	defer wg.Done()
    	_, err := client.SellStock(context.Background(), &proto.StockRequest{
    		StockInfos: []*proto.StockInfo{
    			{
    				GoodsId: "123456",
    				Num:     1,
    			},
    		},
    	})
    	if err != nil {
    		panic(err)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    测试

    在这里插入图片描述
    开启20gorouinte去扣减123456的库存;正常情况下123456的库存应该剩余480件,但由于我们没有进行加锁,导致库存还剩485件.这种情况在真实场景下是绝对不能接受的

    进程锁

    service

    package service
    
    import (
    	context "context"
    	"go-locks/process-lock/db"
    	"go-locks/process-lock/model"
    	"go-locks/process-lock/proto"
    	"google.golang.org/grpc/codes"
    	"google.golang.org/grpc/status"
    	"google.golang.org/protobuf/types/known/emptypb"
    	"sync"
    )
    
    type Server struct{}
    
    var mutex sync.Mutex
    
    func (s Server) SellStock(ctx context.Context, request *proto.StockRequest) (*emptypb.Empty, error) {
    	// 加锁
    	mutex.Lock()
    	tx := db.DB.Begin()
    	for _, info := range request.StockInfos {
    		var stock model.Stock
    		if result := tx.Where(&model.Stock{Goods: info.GoodsId}).First(&stock); result.RowsAffected == 0 {
    			return nil, status.Error(codes.NotFound, "商品信息不存在")
    		}
    		if stock.Stocks < info.Num {
    			// 库存不足 回滚事务
    			tx.Rollback()
    			return nil, status.Error(codes.ResourceExhausted, "库存不足")
    		}
    		stock.Stocks -= info.Num
    		tx.Save(stock)
    	}
    	tx.Commit()
    	// 解锁
    	mutex.Unlock()
    	return &emptypb.Empty{}, nil
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    client

    package main
    
    import (
    	"context"
    	"go-locks/process-lock/proto"
    	"google.golang.org/grpc"
    	"google.golang.org/grpc/credentials/insecure"
    	"sync"
    )
    
    var client proto.StockClient
    
    func main() {
    	conn, err := grpc.Dial("127.0.0.1:8088", grpc.WithTransportCredentials(insecure.NewCredentials()))
    	if err != nil {
    		panic(err)
    	}
    	client = proto.NewStockClient(conn)
    	var wg sync.WaitGroup
    	wg.Add(20)
    	for i := 0; i < 20; i++ {
    		go TestSellStock(&wg)
    	}
    	wg.Wait()
    }
    
    func TestSellStock(wg *sync.WaitGroup) {
    	defer wg.Done()
    	_, err := client.SellStock(context.Background(), &proto.StockRequest{
    		StockInfos: []*proto.StockInfo{
    			{
    				GoodsId: "123457",
    				Num:     1,
    			},
    		},
    	})
    	if err != nil {
    		panic(err)
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    测试

    在这里插入图片描述
    单从这次结果上看是没有什么问题,但其实还是有问题的:

    1. 这里使用的锁,他是锁住的整块代码,不管进来的是那个商品都会要等待释放锁才能去获取锁执行,有很严重的性能问题
    2. 这里使用到的锁,是进程级别的锁,是go语言提供的锁,但是在真实的场景下都是多实例部署的,在多实例场景下,仍然会出现无锁时的问题

    mysql悲观锁

    悲观锁总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞直到它拿到锁

    service

    package service
    
    import (
    	context "context"
    	"go-locks/pessimistic-lock/db"
    	"go-locks/pessimistic-lock/model"
    	"go-locks/pessimistic-lock/proto"
    	"google.golang.org/grpc/codes"
    	"google.golang.org/grpc/status"
    	"google.golang.org/protobuf/types/known/emptypb"
    	"gorm.io/gorm/clause"
    )
    
    type Server struct{}
    
    func (s Server) SellStock(ctx context.Context, request *proto.StockRequest) (*emptypb.Empty, error) {
    	tx := db.DB.Begin()
    	for _, info := range request.StockInfos {
    		var stock model.Stock
    		// 通过for update 语句实现mysql的悲观锁
    		if result := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where(&model.Stock{Goods: info.GoodsId}).First(&stock); result.RowsAffected == 0 {
    			return nil, status.Error(codes.NotFound, "商品信息不存在")
    		}
    		if stock.Stocks < info.Num {
    			// 库存不足 回滚事务
    			tx.Rollback()
    			return nil, status.Error(codes.ResourceExhausted, "库存不足")
    		}
    		stock.Stocks -= info.Num
    		tx.Save(stock)
    	}
    	tx.Commit()
    	return &emptypb.Empty{}, nil
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    client

    package main
    
    import (
    	"context"
    	"go-locks/pessimistic-lock/proto"
    	"google.golang.org/grpc"
    	"google.golang.org/grpc/credentials/insecure"
    	"sync"
    )
    
    var client proto.StockClient
    
    func main() {
    	conn, err := grpc.Dial("127.0.0.1:8088", grpc.WithTransportCredentials(insecure.NewCredentials()))
    	if err != nil {
    		panic(err)
    	}
    	client = proto.NewStockClient(conn)
    	var wg sync.WaitGroup
    	wg.Add(20)
    	for i := 0; i < 20; i++ {
    		go TestSellStock(&wg)
    	}
    	wg.Wait()
    }
    
    func TestSellStock(wg *sync.WaitGroup) {
    	defer wg.Done()
    	_, err := client.SellStock(context.Background(), &proto.StockRequest{
    		StockInfos: []*proto.StockInfo{
    			{
    				GoodsId: "123458",
    				Num:     1,
    			},
    		},
    	})
    	if err != nil {
    		panic(err)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    测试

    在这里插入图片描述

    mysql乐观锁

    乐观锁顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量,

    service

    package service
    
    import (
    	context "context"
    	"go-locks/optimistic-lock/db"
    	"go-locks/optimistic-lock/model"
    	"go-locks/optimistic-lock/proto"
    	"google.golang.org/grpc/codes"
    	"google.golang.org/grpc/status"
    	"google.golang.org/protobuf/types/known/emptypb"
    	"log"
    )
    
    type Server struct{}
    
    func (s Server) SellStock(ctx context.Context, request *proto.StockRequest) (*emptypb.Empty, error) {
    	tx := db.DB.Begin()
    	for _, info := range request.StockInfos {
    		var stock model.Stock
    		for {
    			if result := db.DB.Where(&model.Stock{Goods: info.GoodsId}).First(&stock); result.RowsAffected == 0 {
    				return nil, status.Error(codes.NotFound, "商品信息不存在")
    			}
    			if stock.Stocks < info.Num {
    				// 库存不足 回滚事务
    				tx.Rollback()
    				return nil, status.Error(codes.ResourceExhausted, "库存不足")
    			}
    			stock.Stocks -= info.Num
    			if result := tx.Model(&model.Stock{}).Select("Stocks", "Version").
    				Where("goods = ? AND version = ?", info.GoodsId, stock.Version).Updates(&model.Stock{Stocks: stock.Stocks, Version: stock.Version + 1}); result.RowsAffected == 0 {
    				// version 字段冲突;扣减失败
    				log.Println("库存扣减失败;重试")
    			} else {
    				// 库存扣减成功
    				log.Println("库存扣减成功")
    				break
    			}
    		}
    	}
    	tx.Commit()
    	return &emptypb.Empty{}, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    client

    package main
    
    import (
    	"context"
    	"go-locks/optimistic-lock/proto"
    	"google.golang.org/grpc"
    	"google.golang.org/grpc/credentials/insecure"
    	"sync"
    )
    
    var client proto.StockClient
    
    func main() {
    	conn, err := grpc.Dial("127.0.0.1:8088", grpc.WithTransportCredentials(insecure.NewCredentials()))
    	if err != nil {
    		panic(err)
    	}
    	client = proto.NewStockClient(conn)
    	var wg sync.WaitGroup
    	wg.Add(20)
    	for i := 0; i < 20; i++ {
    		go TestSellStock(&wg)
    	}
    	wg.Wait()
    }
    
    func TestSellStock(wg *sync.WaitGroup) {
    	defer wg.Done()
    	_, err := client.SellStock(context.Background(), &proto.StockRequest{
    		StockInfos: []*proto.StockInfo{
    			{
    				GoodsId: "123459",
    				Num:     1,
    			},
    		},
    	})
    	if err != nil {
    		panic(err)
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    测试

    在这里插入图片描述

    redis分布式锁

    service

    package service
    
    import (
    	context "context"
    	"fmt"
    	"go-locks/redis-lock/db"
    	"go-locks/redis-lock/model"
    	"go-locks/redis-lock/proto"
    	"go-locks/redis-lock/redis"
    	"google.golang.org/grpc/codes"
    	"google.golang.org/grpc/status"
    	"google.golang.org/protobuf/types/known/emptypb"
    )
    
    type Server struct{}
    
    func (s Server) SellStock(ctx context.Context, request *proto.StockRequest) (*emptypb.Empty, error) {
    	tx := db.DB.Begin()
    	for _, info := range request.StockInfos {
    		var stock model.Stock
    		// 使用redis分布式锁,仅对当前商品进行加锁,不会影响其他商品
    		mutex := redis.Redsy.NewMutex(fmt.Sprintf("goods_%s", info.GoodsId))
    		if err := mutex.Lock(); err != nil {
    			return nil, status.Error(codes.Internal, "获取redis分布式锁异常")
    		}
    		if result := tx.Where(&model.Stock{Goods: info.GoodsId}).First(&stock); result.RowsAffected == 0 {
    			return nil, status.Error(codes.NotFound, "商品信息不存在")
    		}
    		if stock.Stocks < info.Num {
    			// 库存不足 回滚事务
    			tx.Rollback()
    			return nil, status.Error(codes.ResourceExhausted, "库存不足")
    		}
    		stock.Stocks -= info.Num
    		tx.Save(stock)
    		if ok, err := mutex.Unlock(); !ok || err != nil {
    			return nil, status.Error(codes.Internal, "释放redis分布式锁异常")
    		}
    	}
    	tx.Commit()
    	return &emptypb.Empty{}, nil
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    client

    package main
    
    import (
    	"context"
    	"go-locks/redis-lock/proto"
    	"google.golang.org/grpc"
    	"google.golang.org/grpc/credentials/insecure"
    	"sync"
    )
    
    var client proto.StockClient
    
    func main() {
    	conn, err := grpc.Dial("127.0.0.1:8088", grpc.WithTransportCredentials(insecure.NewCredentials()))
    	if err != nil {
    		panic(err)
    	}
    	client = proto.NewStockClient(conn)
    	var wg sync.WaitGroup
    	wg.Add(20)
    	for i := 0; i < 20; i++ {
    		go TestSellStock(&wg)
    	}
    	wg.Wait()
    }
    
    func TestSellStock(wg *sync.WaitGroup) {
    	defer wg.Done()
    	_, err := client.SellStock(context.Background(), &proto.StockRequest{
    		StockInfos: []*proto.StockInfo{
    			{
    				GoodsId: "123460",
    				Num:     1,
    			},
    		},
    	})
    	if err != nil {
    		panic(err)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    测试

    在这里插入图片描述

    总结

    • 无锁: 无锁即使在单机情况下也会出问题,不建议使用
    • 进程锁: 进程锁仅在单机情况下安全,性能存在瓶颈
    • mysql悲观锁: 分布式环境下安全.比较适合写入操作比较频繁的场景,如果出现大量的读取操作,每次读取的时候都会进行加锁,这样会增加大量的锁的开销,降低了系统的吞吐量.在特殊情况下还会升级成表锁,分布式环境下安全,但是性能依然存在瓶颈
    • mysql乐观锁: 分布式环境下安全,比较适合读取操作比较频繁的场景,如果出现大量的写入操作,数据发生冲突的可能性就会增大,为了保证数据的一致性,应用层需要不断的重新获取数据,这样会增加大量的查询操作,降低了系统的吞吐量.
    • redis分布式锁: 分布式环境下安全,并且redis有很好的性能,而且可以对单个商品进行加锁,只会阻塞住对同一商品的请求,并不会阻塞所有请求,大大提升了吞吐量
  • 相关阅读:
    Layui快速入门之第九节 表格事件的使用
    前端(二十七)——封装指南:Axios接口、常用功能、Vue和React中的封装技术
    Sui基金会与沙迦美国大学宣布合作开设区块链学院
    docker部署springboot项目到服务器
    Davinci Developer Classic SWC新建port并连接非complete port方式
    k8s--基础--23.6--认证-授权-准入控制--通过kubeconfig登陆dashboard
    前端周刊第十八期
    如何保证Redis的HA高可用
    GB28181的主动、被动的含义
    Vidmore Screen Recorder 1.1.62 学习
  • 原文地址:https://blog.csdn.net/qq_43135259/article/details/125508990