• 33、订单-库存服务集成rocketmq


    一、库存服务的一致性

    • 库存服务的特殊性
      • 假设我们使用了rocketmq基于可靠消息的分布消息,订单服务发送给rocket的消息,需要保证消费者(库存服务)一定能消费
      • 对于非库存服务,如积分服务等是没有上限限制的,这个是没有问题的
      • 但是对于库存服务,这个比较特殊,库存的资源是有限的,一旦没有了库存就无法消费了;这个就必须要实现没有库存的时候通知生产者,否则rocketmq会一直产生消息而消费者库存服务又无法消费
    • 库存服务一致性解决方案:核心就是将half消息设计成归还的消息

    在这里插入图片描述


    二、订单服务优化

    • order_srv/handler/handle_order.go:新建订单逻辑优化
    type OrderListener struct {
    	Code        codes.Code
    	Detail      string
    	ID          int32
    	OrderAmount float32
    	Ctx         context.Context
    }
    
    func (o *OrderListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
    	var orderInfo model.OrderInfo
    	_ = json.Unmarshal(msg.Body, &orderInfo)
    
    	var goodsIds []int32
    	var shopCarts []model.ShoppingCart
    	goodsNumsMap := make(map[int32]int32)
    	if result := global.DB.Where(&model.ShoppingCart{User: orderInfo.User, Checked: true}).Find(&shopCarts); result.RowsAffected == 0 {
    		o.Code = codes.InvalidArgument
    		o.Detail = "没有选中结算的商品"
    		return primitive.RollbackMessageState
    	}
    
    	for _, shopCarts := range shopCarts {
    		goodsIds = append(goodsIds, shopCarts.Goods)
    		goodsNumsMap[shopCarts.Goods] = shopCarts.Nums
    	}
    
    	//跨服务调用 - 商品微服务 —— 批量查询商品与价格
    	goods, err := global.GoodsSrvClient.BatchGetGoods(context.Background(), &proto.BatchGoodsIdInfo{Id: goodsIds})
    	if err != nil {
    		o.Code = codes.Internal
    		o.Detail = "批量查询商品信息失败"
    		return primitive.RollbackMessageState
    	}
    
    	var orderAmount float32
    	var orderGoods []*model.OrderGoods
    	var goodsInvInfo []*proto.GoodsInvInfo
    	for _, good := range goods.Data {
    		orderAmount += good.ShopPrice * float32(goodsNumsMap[good.Id])
    		orderGoods = append(orderGoods, &model.OrderGoods{
    			Goods:      good.Id,
    			GoodsName:  good.Name,
    			GoodsImage: good.GoodsFrontImage,
    			GoodsPrice: good.ShopPrice,
    			Nums:       goodsNumsMap[good.Id],
    		})
    		goodsInvInfo = append(goodsInvInfo, &proto.GoodsInvInfo{
    			GoodsId: good.Id,
    			Num:     goodsNumsMap[good.Id],
    		})
    	}
    	//跨服务调用 - 库存微服务 —— 扣减库存
    	if _, err = global.InventorySrvClient.Sell(context.Background(), &proto.SellInfo{OrderSn: orderInfo.OrderSn, GoodsInfo: goodsInvInfo}); err != nil {
    		//如果是因为网络问题, 这种如何避免误判, 大家自己改写一下sell的返回逻辑
    		o.Code = codes.ResourceExhausted
    		o.Detail = "扣减库存失败"
    		return primitive.RollbackMessageState
    	}
    
    	tx := global.DB.Begin()
    	//生成订单表
    	orderInfo.OrderMount = orderAmount
    	if result := tx.Save(&orderInfo); result.RowsAffected == 0 {
    		tx.Rollback()
    		o.Code = codes.Internal
    		o.Detail = "创建订单失败"
    		return primitive.CommitMessageState
    	}
    
    	o.OrderAmount = orderAmount
    	o.ID = orderInfo.ID
    	for _, orderGood := range orderGoods {
    		orderGood.Order = orderInfo.ID
    	}
    
    	// 批量插入orderGoods
    	if result := tx.CreateInBatches(orderGoods, 100); result.RowsAffected == 0 {
    		tx.Rollback()
    		o.Code = codes.Internal
    		o.Detail = "批量插入订单商品失败"
    		return primitive.CommitMessageState
    	}
    
    	// 删除购物车的记录
    	if result := tx.Where(&model.ShoppingCart{User: orderInfo.User, Checked: true}).Delete(&model.ShoppingCart{}); result.RowsAffected == 0 {
    		tx.Rollback()
    		o.Code = codes.Internal
    		o.Detail = "删除购物车记录失败"
    		return primitive.CommitMessageState
    	}
    
    	//提交事务
    	tx.Commit()
    	o.Code = codes.OK
    	return primitive.RollbackMessageState
    }
    
    func (o *OrderListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
    	var orderInfo model.OrderInfo
    	_ = json.Unmarshal(msg.Body, &orderInfo)
    
    	//怎么检查之前的逻辑是否完成
    	if result := global.DB.Where(model.OrderInfo{OrderSn: orderInfo.OrderSn}).First(&orderInfo); result.RowsAffected == 0 {
    		return primitive.CommitMessageState //你并不能说明这里就是库存已经扣减了
    	}
    
    	return primitive.RollbackMessageState
    }
    
    func (*OrderServer) CreateOrder(ctx context.Context, req *proto.OrderRequest) (*proto.OrderInfoResponse, error) {
    	/*
    		新建订单
    			1. 从购物车中获取到选中的商品
    			2. 商品的价格自己查询 - 访问商品服务 (跨微服务)
    			3. 库存的扣减 - 访问库存服务 (跨微服务)
    			4. 订单的基本信息表 - 订单的商品信息表
    			5. 从购物车中删除已购买的记录
    	*/
    	orderListener := OrderListener{Ctx: ctx}
    	p, err := rocketmq.NewTransactionProducer(
    		&orderListener,
    		producer.WithNameServer([]string{"192.168.124.51:9876"}),
    	)
    	if err != nil {
    		zap.S().Errorf("生成producer失败: %s", err.Error())
    		return nil, err
    	}
    	if err = p.Start(); err != nil {
    		zap.S().Errorf("启动producer失败: %s", err.Error())
    		return nil, err
    	}
    
    	order := model.OrderInfo{
    		OrderSn:      GenerateOrderSn(req.UserId),
    		Address:      req.Address,
    		SignerName:   req.Name,
    		SingerMobile: req.Mobile,
    		Post:         req.Post,
    		User:         req.UserId,
    	}
    
    	jsonString, _ := json.Marshal(order)
    
    	_, err = p.SendMessageInTransaction(context.Background(),
    		primitive.NewMessage("order_reback", jsonString))
    	if err != nil {
    		fmt.Printf("发送失败: %s\n", err)
    		return nil, status.Error(codes.Internal, "发送消息失败")
    	}
    
    	if orderListener.Code != codes.OK {
    		return nil, status.Error(orderListener.Code, orderListener.Detail)
    	}
    
    	return &proto.OrderInfoResponse{Id: orderListener.ID, OrderSn: order.OrderSn, Total: orderListener.OrderAmount}, nil
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157

    三、库存服务优化

    1 - 库存服务表结构

    • inventory_srv/handler/inventory.go
    package model
    
    import (
    	"database/sql/driver"
    	"encoding/json"
    )
    
    type Inventory struct {
    	BaseModel
    	Goods   int32 `gorm:"type:int;index"` // 商品id
    	Stocks  int32 `gorm:"type:int"`       // 库存
    	Version int32 `gorm:"type:int"`       //分布式锁的乐观锁
    }
    
    type GoodsDetail struct {
    	Goods int32
    	Num   int32
    }
    
    type GoodsDetailList []GoodsDetail
    
    func (g GoodsDetailList) Value() (driver.Value, error) {
    	return json.Marshal(g)
    }
    
    // Scan 实现 sql.Scanner 接口,Scan 将 value 扫描至 Jsonb
    func (g *GoodsDetailList) Scan(value interface{}) error {
    	return json.Unmarshal(value.([]byte), &g)
    }
    
    type StockSellDetail struct {
    	OrderSn string          `gorm:"type:varchar(200);index:idx_order_sn,unique;"`
    	Status  int32           `gorm:"type:varchar(200)"` //1 表示已扣减 2. 表示已归还
    	Detail  GoodsDetailList `gorm:"type:varchar(200)"`
    }
    
    func (StockSellDetail) TableName() string {
    	return "stockselldetail"
    }
    
    //type InventoryHistory struct {
    //	user int32
    //	goods int32
    //	nums int32
    //	order int32
    //	status int32 //1. 表示库存是预扣减, 幂等性, 2. 表示已经支付
    //}
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • inventory_srv/model/main/main.go:生成表结构 StockSellDetail,插入数据并查询数据
    package main
    
    import (
    	"fmt"
    	"gorm.io/driver/mysql"
    	"gorm.io/gorm"
    	"gorm.io/gorm/logger"
    	"gorm.io/gorm/schema"
    	"log"
    	"nd/inventory_srv/global"
    	"nd/inventory_srv/initialize"
    	"nd/inventory_srv/model"
    	"os"
    	"time"
    )
    
    func main() {
    	initialize.InitConfig()
    	dsn := fmt.Sprintf("root:jiushi@tcp(%s:3306)/mxshop_inventory_srv?charset=utf8mb4&parseTime=True&loc=Local", global.ServerConfig.MysqlInfo.Host)
    
    	newLogger := logger.New(
    		log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer
    		logger.Config{
    			SlowThreshold: time.Second, // 慢 SQL 阈值
    			LogLevel:      logger.Info, // Log level
    			Colorful:      true,        // 禁用彩色打印
    		},
    	)
    
    	// 全局模式
    	db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{
    		NamingStrategy: schema.NamingStrategy{
    			SingularTable: true,
    		},
    		Logger: newLogger,
    	})
    	if err != nil {
    		panic(err)
    	}
    
    	_ = db.AutoMigrate(&model.Inventory{}, &model.StockSellDetail{})
    	//插入一条数据
    	orderDetail := model.StockSellDetail{
    		OrderSn: "imooc-bobby",
    		Status:  1,
    		Detail:  []model.GoodsDetail{{1, 2}, {2, 3}},
    	}
    	db.Create(&orderDetail)
    
    	var sellDetail model.StockSellDetail
    	db.Where(model.StockSellDetail{OrderSn: "imooc-bobby"}).First(&sellDetail)
    	fmt.Println(sellDetail.Detail)
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    在这里插入图片描述
    在这里插入图片描述

    2 - 库存服务sell接口

    • inventory_srv/handler/inventory.go:插入selldetail表数据
    func (*InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*emptypb.Empty, error) {
    	client := goredislib.NewClient(&goredislib.Options{
    		//Addr: "192.168.78.131:6379",
    		Addr: fmt.Sprintf("%s:%d", global.ServerConfig.RedisInfo.Host, global.ServerConfig.RedisInfo.Port),
    	})
    	pool := goredis.NewPool(client) // or, pool := redigo.NewPool(...)
    	rs := redsync.New(pool)
    
    	tx := global.DB.Begin()
    
    	//这个时候应该先查询表,然后确定这个订单是否已经扣减过库存了,已经扣减过了就别扣减了
    	//并发时候会有漏洞, 同一个时刻发送了重复了多次, 使用锁,分布式锁
    	sellDetail := model.StockSellDetail{
    		OrderSn: req.OrderSn,
    		Status:  1,
    	}
    	var details []model.GoodsDetail
    
    	for _, goodInfo := range req.GoodsInfo {
    		details = append(details, model.GoodsDetail{
    			Goods: goodInfo.GoodsId,
    			Num:   goodInfo.Num,
    		})
    
    		var inv model.Inventory
    		mutex := rs.NewMutex(fmt.Sprintf("goods_%d", goodInfo.GoodsId))
    		if err := mutex.Lock(); err != nil {
    			return nil, status.Errorf(codes.Internal, "获取redis分布式锁异常")
    		}
    		if result := global.DB.Where(&model.Inventory{Goods: goodInfo.GoodsId}).First(&inv); result.RowsAffected == 0 {
    			tx.Rollback() //回滚之前的操作
    			return nil, status.Errorf(codes.InvalidArgument, "没有库存信息")
    		}
    		//判断库存是否充足
    		if inv.Stocks < goodInfo.Num {
    			tx.Rollback() //回滚之前的操作
    			return nil, status.Errorf(codes.ResourceExhausted, "库存不足")
    		}
    		//扣减, 会出现数据不一致的问题 - 锁,分布式锁
    		inv.Stocks -= goodInfo.Num
    		tx.Save(&inv)
    
    		if ok, err := mutex.Unlock(); !ok || err != nil {
    			return nil, status.Errorf(codes.Internal, "释放redis分布式锁异常")
    		}
    	}
    	sellDetail.Detail = details
    	//写selldetail表
    	if result := tx.Create(&sellDetail); result.RowsAffected == 0 {
    		tx.Rollback()
    		return nil, status.Errorf(codes.Internal, "保存库存扣减历史失败")
    	}
    	tx.Commit() // 需要自己手动提交操作
    	//m.Unlock() //释放锁
    	return &emptypb.Empty{}, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    3 - 库存订阅rocketmq消息

    • inventory_srv/model/inventory.go:实现归还业务逻辑
    func AutoReback(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
    	type OrderInfo struct {
    		OrderSn string
    	}
    	for i := range msgs {
    		//既然是归还库存,那么我应该具体的知道每件商品应该归还多少, 但是有一个问题是什么?重复归还的问题
    		//所以说这个接口应该确保幂等性, 你不能因为消息的重复发送导致一个订单的库存归还多次, 没有扣减的库存你别归还
    		//如何确保这些都没有问题, 新建一张表, 这张表记录了详细的订单扣减细节,以及归还细节
    		var orderInfo OrderInfo
    		err := json.Unmarshal(msgs[i].Body, &orderInfo)
    		if err != nil {
    			zap.S().Errorf("解析json失败: %v\n", msgs[i].Body)
    			return consumer.ConsumeSuccess, nil
    		}
    
    		//将inv的库存加回去 将selldetail的status设置为2, 要在事务中进行
    		tx := global.DB.Begin()
    		var sellDetail model.StockSellDetail
    		if result := tx.Model(&model.StockSellDetail{}).Where(&model.StockSellDetail{OrderSn: orderInfo.OrderSn, Status: 1}).First(&sellDetail); result.RowsAffected == 0 {
    			return consumer.ConsumeSuccess, nil
    		}
    		//如果查询到那么逐个归还库存
    		for _, orderGood := range sellDetail.Detail {
    			//update怎么用
    			//先查询一下inventory表在, update语句的 update xx set stocks=stocks+2
    			if result := tx.Model(&model.Inventory{}).Where(&model.Inventory{Goods: orderGood.Goods}).Update("stocks", gorm.Expr("stocks+?", orderGood.Num)); result.RowsAffected == 0 {
    				tx.Rollback()
    				return consumer.ConsumeRetryLater, nil
    			}
    		}
    
    		if result := tx.Model(&model.StockSellDetail{}).Where(&model.StockSellDetail{OrderSn: orderInfo.OrderSn}).Update("status", 2); result.RowsAffected == 0 {
    			tx.Rollback()
    			return consumer.ConsumeRetryLater, nil
    		}
    		tx.Commit()
    		return consumer.ConsumeSuccess, nil
    	}
    	return consumer.ConsumeSuccess, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • inventory_srv/main.go:监听库存归还topic
    func main() {
    	//省略。。。
    
    	go func() {
    		err = server.Serve(lis)
    		if err != nil {
    			panic("failed to start grpc:" + err.Error())
    		}
    	}()
    
    	//监听库存归还topic
    	c, _ := rocketmq.NewPushConsumer(
    		consumer.WithNameServer([]string{"192.168.124.51:9876"}),
    		consumer.WithGroupName("mxshop-inventory"),
    	)
    
    	if err := c.Subscribe("order_reback", consumer.MessageSelector{}, handler.AutoReback); err != nil {
    		fmt.Println("读取消息失败")
    	}
    	//省略。。。
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    4 - 订单超时归还

    • order_srv/handler/handle_order.go:ExecuteLocalTransaction中发送延迟消息
    func (o *OrderListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
    	var orderInfo model.OrderInfo
    	_ = json.Unmarshal(msg.Body, &orderInfo)
    
    	var goodsIds []int32
    	var shopCarts []model.ShoppingCart
    	goodsNumsMap := make(map[int32]int32)
    	if result := global.DB.Where(&model.ShoppingCart{User: orderInfo.User, Checked: true}).Find(&shopCarts); result.RowsAffected == 0 {
    		o.Code = codes.InvalidArgument
    		o.Detail = "没有选中结算的商品"
    		return primitive.RollbackMessageState
    	}
    
    	for _, shopCarts := range shopCarts {
    		goodsIds = append(goodsIds, shopCarts.Goods)
    		goodsNumsMap[shopCarts.Goods] = shopCarts.Nums
    	}
    
    	//跨服务调用 - 商品微服务 —— 批量查询商品与价格
    	goods, err := global.GoodsSrvClient.BatchGetGoods(context.Background(), &proto.BatchGoodsIdInfo{Id: goodsIds})
    	if err != nil {
    		o.Code = codes.Internal
    		o.Detail = "批量查询商品信息失败"
    		return primitive.RollbackMessageState
    	}
    
    	var orderAmount float32
    	var orderGoods []*model.OrderGoods
    	var goodsInvInfo []*proto.GoodsInvInfo
    	for _, good := range goods.Data {
    		orderAmount += good.ShopPrice * float32(goodsNumsMap[good.Id])
    		orderGoods = append(orderGoods, &model.OrderGoods{
    			Goods:      good.Id,
    			GoodsName:  good.Name,
    			GoodsImage: good.GoodsFrontImage,
    			GoodsPrice: good.ShopPrice,
    			Nums:       goodsNumsMap[good.Id],
    		})
    		goodsInvInfo = append(goodsInvInfo, &proto.GoodsInvInfo{
    			GoodsId: good.Id,
    			Num:     goodsNumsMap[good.Id],
    		})
    	}
    	//跨服务调用 - 库存微服务 —— 扣减库存
    	if _, err = global.InventorySrvClient.Sell(context.Background(), &proto.SellInfo{OrderSn: orderInfo.OrderSn, GoodsInfo: goodsInvInfo}); err != nil {
    		//如果是因为网络问题, 这种如何避免误判, 大家自己改写一下sell的返回逻辑
    		o.Code = codes.ResourceExhausted
    		o.Detail = "扣减库存失败"
    		return primitive.RollbackMessageState
    	}
    
    	tx := global.DB.Begin()
    	//生成订单表
    	orderInfo.OrderMount = orderAmount
    	if result := tx.Save(&orderInfo); result.RowsAffected == 0 {
    		tx.Rollback()
    		o.Code = codes.Internal
    		o.Detail = "创建订单失败"
    		return primitive.CommitMessageState
    	}
    
    	o.OrderAmount = orderAmount
    	o.ID = orderInfo.ID
    	for _, orderGood := range orderGoods {
    		orderGood.Order = orderInfo.ID
    	}
    
    	// 批量插入orderGoods
    	if result := tx.CreateInBatches(orderGoods, 100); result.RowsAffected == 0 {
    		tx.Rollback()
    		o.Code = codes.Internal
    		o.Detail = "批量插入订单商品失败"
    		return primitive.CommitMessageState
    	}
    
    	// 删除购物车的记录
    	if result := tx.Where(&model.ShoppingCart{User: orderInfo.User, Checked: true}).Delete(&model.ShoppingCart{}); result.RowsAffected == 0 {
    		tx.Rollback()
    		o.Code = codes.Internal
    		o.Detail = "删除购物车记录失败"
    		return primitive.CommitMessageState
    	}
    
    	//发送延时消息
    	p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"192.168.0.104:9876"}))
    	if err != nil {
    		panic("生成producer失败")
    	}
    
    	//不要在一个进程中使用多个producer, 但是不要随便调用shutdown因为会影响其他的producer
    	if err = p.Start(); err != nil {
    		panic("启动producer失败")
    	}
    
    	msg = primitive.NewMessage("order_timeout", msg.Body)
    	msg.WithDelayTimeLevel(3)
    	_, err = p.SendSync(context.Background(), msg)
    	if err != nil {
    		zap.S().Errorf("发送延时消息失败: %v\n", err)
    		tx.Rollback()
    		o.Code = codes.Internal
    		o.Detail = "发送延时消息失败"
    		return primitive.CommitMessageState
    	}
    
    	//提交事务
    	tx.Commit()
    	o.Code = codes.OK
    	return primitive.RollbackMessageState
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • order_srv/main.go:监听超时topic
    package main
    
    import (
    	"flag"
    	"fmt"
    	"github.com/apache/rocketmq-client-go/v2"
    	"github.com/apache/rocketmq-client-go/v2/consumer"
    	"nd/order_srv/handler"
    	"net"
    	"os"
    	"os/signal"
    	"syscall"
    
    	"github.com/satori/go.uuid"
    	"go.uber.org/zap"
    	"google.golang.org/grpc"
    	"google.golang.org/grpc/health"
    	"google.golang.org/grpc/health/grpc_health_v1"
    
    	"nd/order_srv/global"
    	"nd/order_srv/initialize"
    	"nd/order_srv/proto"
    	"nd/order_srv/utils"
    	"nd/order_srv/utils/register/consul"
    )
    
    func main() {
    	IP := flag.String("ip", "0.0.0.0", "ip地址")
    	Port := flag.Int("port", 50060, "端口号") // 这个修改为0,如果我们从命令行带参数启动的话就不会为0
    
    	//初始化
    	initialize.InitLogger()
    	initialize.InitConfig()
    	initialize.InitDB()
    	initialize.InitSrvConn()
    	zap.S().Info(global.ServerConfig)
    
    	flag.Parse()
    	zap.S().Info("ip: ", *IP)
    	if *Port == 0 {
    		*Port, _ = utils.GetFreePort()
    	}
    	zap.S().Info("port: ", *Port)
    
    	server := grpc.NewServer()
    	proto.RegisterOrderServer(server, &handler.OrderServer{})
    	lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", *IP, *Port))
    	if err != nil {
    		panic("failed to listen:" + err.Error())
    	}
    
    	//注册服务健康检查
    	grpc_health_v1.RegisterHealthServer(server, health.NewServer())
    
    	//服务注册
    	register_client := consul.NewRegistryClient(global.ServerConfig.ConsulInfo.Host, global.ServerConfig.ConsulInfo.Port)
    	serviceId := fmt.Sprintf("%s", uuid.NewV4())
    	err = register_client.Register(global.ServerConfig.Host, *Port, global.ServerConfig.Name, global.ServerConfig.Tags, serviceId)
    	if err != nil {
    		zap.S().Panic("服务注册失败:", err.Error())
    	}
    	zap.S().Debugf("启动服务器, 端口: %d", *Port)
    
    	go func() {
    		err = server.Serve(lis)
    		if err != nil {
    			panic("failed to start grpc:" + err.Error())
    		}
    	}()
    
    	//监听订单超时topic
    	c, _ := rocketmq.NewPushConsumer(
    		consumer.WithNameServer([]string{"192.168.124.51:9876"}),
    		consumer.WithGroupName("mxshop-order"),
    	)
    
    	if err := c.Subscribe("order_timeout", consumer.MessageSelector{}, handler.OrderTimeout); err != nil {
    		fmt.Println("读取消息失败")
    	}
    	_ = c.Start()
    	//不能让主goroutine退出
    
    	//接收终止信号
    	quit := make(chan os.Signal)
    	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    	<-quit
    	if err = register_client.DeRegister(serviceId); err != nil {
    		zap.S().Info("注销失败:", err.Error())
    	} else {
    		zap.S().Info("注销成功:")
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • order_srv/handler/handle_order.go:OrderTimeout实现超时业务逻辑
    func OrderTimeout(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
    	for i := range msgs {
    		var orderInfo model.OrderInfo
    		_ = json.Unmarshal(msgs[i].Body, &orderInfo)
    
    		fmt.Printf("获取到订单超时消息: %v\n", time.Now())
    		//查询订单的支付状态,如果已支付什么都不做,如果未支付,归还库存
    		var order model.OrderInfo
    		if result := global.DB.Model(model.OrderInfo{}).Where(model.OrderInfo{OrderSn: orderInfo.OrderSn}).First(&order); result.RowsAffected == 0 {
    			return consumer.ConsumeSuccess, nil
    		}
    		if order.Status != "TRADE_SUCCESS" {
    			tx := global.DB.Begin()
    			//归还库存,我们可以模仿order中发送一个消息到 order_reback中去
    			//修改订单的状态为已支付
    			order.Status = "TRADE_CLOSED"
    			tx.Save(&order)
    
    			p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"192.168.124.51:9876"}))
    			if err != nil {
    				panic("生成producer失败")
    			}
    
    			if err = p.Start(); err != nil {
    				panic("启动producer失败")
    			}
    
    			_, err = p.SendSync(context.Background(), primitive.NewMessage("order_reback", msgs[i].Body))
    			if err != nil {
    				tx.Rollback()
    				fmt.Printf("发送失败: %s\n", err)
    				return consumer.ConsumeRetryLater, nil
    			}
    
    			//if err = p.Shutdown(); err != nil {panic("关闭producer失败")}
    			return consumer.ConsumeSuccess, nil
    		}
    	}
    	return consumer.ConsumeSuccess, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    5 - rocketmq中shutdown的坑

    • rocketmq源码分析

    在这里插入图片描述


    四、完整源码

    • 完整源码下载mxshop_srvsV12.0rar
    • 源码说明:(nacos的ip配置自行修改,全局变量DEV_CONFIG设置:1=zsz,2=comp,3=home)
      • goods_srv/model/sql/mxshop_goods.sql:包含了建表语句
      • other_import/api.json:YApi的导入文件
      • other_import/nacos_config_export_user.zip:nacos的user配置集导入文件
      • other_import/nacos_config_export_goods.zip:nacos的goods配置集导入文件
      • other_import/nacos_config_export_inventory.zip:nacos的inventory的配置导入文件
      • other_import/nacos_config_export_orders.zip:nacos的orders的配置导入文件
      • other_import/nacos_config_export_userop.zip:nacos的userop的配置导入文件
      • other_import/install.zip:rocketmq的docker-compose安装文件
  • 相关阅读:
    SQL优化策略
    PIL(Python Imaging Library)图像处理库教程
    算法设计与分析期末复习题
    谷歌最新版本下载最新驱动网址chrome driver Version: 122.0.6261.111
    算法模型总结:哈希
    JAVA计算机毕业设计食品点评及售卖系统源码+系统+mysql数据库+lw文档
    Trie树(字典树)C++详解
    Executor接口实现线程池管理
    《web课程设计》用HTML CSS做一个简洁、漂亮的个人博客网站
    22-07-05 七牛云存储图片、用户头像上传
  • 原文地址:https://blog.csdn.net/qq23001186/article/details/126339133