type OrderListener struct {
Code codes.Code
Detail string
ID int32
OrderAmount float32
Ctx context.Context
}
func (o *OrderListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
var orderInfo model.OrderInfo
_ = json.Unmarshal(msg.Body, &orderInfo)
var goodsIds []int32
var shopCarts []model.ShoppingCart
goodsNumsMap := make(map[int32]int32)
if result := global.DB.Where(&model.ShoppingCart{User: orderInfo.User, Checked: true}).Find(&shopCarts); result.RowsAffected == 0 {
o.Code = codes.InvalidArgument
o.Detail = "没有选中结算的商品"
return primitive.RollbackMessageState
}
for _, shopCarts := range shopCarts {
goodsIds = append(goodsIds, shopCarts.Goods)
goodsNumsMap[shopCarts.Goods] = shopCarts.Nums
}
//跨服务调用 - 商品微服务 —— 批量查询商品与价格
goods, err := global.GoodsSrvClient.BatchGetGoods(context.Background(), &proto.BatchGoodsIdInfo{Id: goodsIds})
if err != nil {
o.Code = codes.Internal
o.Detail = "批量查询商品信息失败"
return primitive.RollbackMessageState
}
var orderAmount float32
var orderGoods []*model.OrderGoods
var goodsInvInfo []*proto.GoodsInvInfo
for _, good := range goods.Data {
orderAmount += good.ShopPrice * float32(goodsNumsMap[good.Id])
orderGoods = append(orderGoods, &model.OrderGoods{
Goods: good.Id,
GoodsName: good.Name,
GoodsImage: good.GoodsFrontImage,
GoodsPrice: good.ShopPrice,
Nums: goodsNumsMap[good.Id],
})
goodsInvInfo = append(goodsInvInfo, &proto.GoodsInvInfo{
GoodsId: good.Id,
Num: goodsNumsMap[good.Id],
})
}
//跨服务调用 - 库存微服务 —— 扣减库存
if _, err = global.InventorySrvClient.Sell(context.Background(), &proto.SellInfo{OrderSn: orderInfo.OrderSn, GoodsInfo: goodsInvInfo}); err != nil {
//如果是因为网络问题, 这种如何避免误判, 大家自己改写一下sell的返回逻辑
o.Code = codes.ResourceExhausted
o.Detail = "扣减库存失败"
return primitive.RollbackMessageState
}
tx := global.DB.Begin()
//生成订单表
orderInfo.OrderMount = orderAmount
if result := tx.Save(&orderInfo); result.RowsAffected == 0 {
tx.Rollback()
o.Code = codes.Internal
o.Detail = "创建订单失败"
return primitive.CommitMessageState
}
o.OrderAmount = orderAmount
o.ID = orderInfo.ID
for _, orderGood := range orderGoods {
orderGood.Order = orderInfo.ID
}
// 批量插入orderGoods
if result := tx.CreateInBatches(orderGoods, 100); result.RowsAffected == 0 {
tx.Rollback()
o.Code = codes.Internal
o.Detail = "批量插入订单商品失败"
return primitive.CommitMessageState
}
// 删除购物车的记录
if result := tx.Where(&model.ShoppingCart{User: orderInfo.User, Checked: true}).Delete(&model.ShoppingCart{}); result.RowsAffected == 0 {
tx.Rollback()
o.Code = codes.Internal
o.Detail = "删除购物车记录失败"
return primitive.CommitMessageState
}
//提交事务
tx.Commit()
o.Code = codes.OK
return primitive.RollbackMessageState
}
func (o *OrderListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
var orderInfo model.OrderInfo
_ = json.Unmarshal(msg.Body, &orderInfo)
//怎么检查之前的逻辑是否完成
if result := global.DB.Where(model.OrderInfo{OrderSn: orderInfo.OrderSn}).First(&orderInfo); result.RowsAffected == 0 {
return primitive.CommitMessageState //你并不能说明这里就是库存已经扣减了
}
return primitive.RollbackMessageState
}
func (*OrderServer) CreateOrder(ctx context.Context, req *proto.OrderRequest) (*proto.OrderInfoResponse, error) {
/*
新建订单
1. 从购物车中获取到选中的商品
2. 商品的价格自己查询 - 访问商品服务 (跨微服务)
3. 库存的扣减 - 访问库存服务 (跨微服务)
4. 订单的基本信息表 - 订单的商品信息表
5. 从购物车中删除已购买的记录
*/
orderListener := OrderListener{Ctx: ctx}
p, err := rocketmq.NewTransactionProducer(
&orderListener,
producer.WithNameServer([]string{"192.168.124.51:9876"}),
)
if err != nil {
zap.S().Errorf("生成producer失败: %s", err.Error())
return nil, err
}
if err = p.Start(); err != nil {
zap.S().Errorf("启动producer失败: %s", err.Error())
return nil, err
}
order := model.OrderInfo{
OrderSn: GenerateOrderSn(req.UserId),
Address: req.Address,
SignerName: req.Name,
SingerMobile: req.Mobile,
Post: req.Post,
User: req.UserId,
}
jsonString, _ := json.Marshal(order)
_, err = p.SendMessageInTransaction(context.Background(),
primitive.NewMessage("order_reback", jsonString))
if err != nil {
fmt.Printf("发送失败: %s\n", err)
return nil, status.Error(codes.Internal, "发送消息失败")
}
if orderListener.Code != codes.OK {
return nil, status.Error(orderListener.Code, orderListener.Detail)
}
return &proto.OrderInfoResponse{Id: orderListener.ID, OrderSn: order.OrderSn, Total: orderListener.OrderAmount}, nil
}
package model
import (
"database/sql/driver"
"encoding/json"
)
type Inventory struct {
BaseModel
Goods int32 `gorm:"type:int;index"` // 商品id
Stocks int32 `gorm:"type:int"` // 库存
Version int32 `gorm:"type:int"` //分布式锁的乐观锁
}
type GoodsDetail struct {
Goods int32
Num int32
}
type GoodsDetailList []GoodsDetail
func (g GoodsDetailList) Value() (driver.Value, error) {
return json.Marshal(g)
}
// Scan 实现 sql.Scanner 接口,Scan 将 value 扫描至 Jsonb
func (g *GoodsDetailList) Scan(value interface{}) error {
return json.Unmarshal(value.([]byte), &g)
}
type StockSellDetail struct {
OrderSn string `gorm:"type:varchar(200);index:idx_order_sn,unique;"`
Status int32 `gorm:"type:varchar(200)"` //1 表示已扣减 2. 表示已归还
Detail GoodsDetailList `gorm:"type:varchar(200)"`
}
func (StockSellDetail) TableName() string {
return "stockselldetail"
}
//type InventoryHistory struct {
// user int32
// goods int32
// nums int32
// order int32
// status int32 //1. 表示库存是预扣减, 幂等性, 2. 表示已经支付
//}
package main
import (
"fmt"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/logger"
"gorm.io/gorm/schema"
"log"
"nd/inventory_srv/global"
"nd/inventory_srv/initialize"
"nd/inventory_srv/model"
"os"
"time"
)
func main() {
initialize.InitConfig()
dsn := fmt.Sprintf("root:jiushi@tcp(%s:3306)/mxshop_inventory_srv?charset=utf8mb4&parseTime=True&loc=Local", global.ServerConfig.MysqlInfo.Host)
newLogger := logger.New(
log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer
logger.Config{
SlowThreshold: time.Second, // 慢 SQL 阈值
LogLevel: logger.Info, // Log level
Colorful: true, // 禁用彩色打印
},
)
// 全局模式
db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{
NamingStrategy: schema.NamingStrategy{
SingularTable: true,
},
Logger: newLogger,
})
if err != nil {
panic(err)
}
_ = db.AutoMigrate(&model.Inventory{}, &model.StockSellDetail{})
//插入一条数据
orderDetail := model.StockSellDetail{
OrderSn: "imooc-bobby",
Status: 1,
Detail: []model.GoodsDetail{{1, 2}, {2, 3}},
}
db.Create(&orderDetail)
var sellDetail model.StockSellDetail
db.Where(model.StockSellDetail{OrderSn: "imooc-bobby"}).First(&sellDetail)
fmt.Println(sellDetail.Detail)
}
func (*InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*emptypb.Empty, error) {
client := goredislib.NewClient(&goredislib.Options{
//Addr: "192.168.78.131:6379",
Addr: fmt.Sprintf("%s:%d", global.ServerConfig.RedisInfo.Host, global.ServerConfig.RedisInfo.Port),
})
pool := goredis.NewPool(client) // or, pool := redigo.NewPool(...)
rs := redsync.New(pool)
tx := global.DB.Begin()
//这个时候应该先查询表,然后确定这个订单是否已经扣减过库存了,已经扣减过了就别扣减了
//并发时候会有漏洞, 同一个时刻发送了重复了多次, 使用锁,分布式锁
sellDetail := model.StockSellDetail{
OrderSn: req.OrderSn,
Status: 1,
}
var details []model.GoodsDetail
for _, goodInfo := range req.GoodsInfo {
details = append(details, model.GoodsDetail{
Goods: goodInfo.GoodsId,
Num: goodInfo.Num,
})
var inv model.Inventory
mutex := rs.NewMutex(fmt.Sprintf("goods_%d", goodInfo.GoodsId))
if err := mutex.Lock(); err != nil {
return nil, status.Errorf(codes.Internal, "获取redis分布式锁异常")
}
if result := global.DB.Where(&model.Inventory{Goods: goodInfo.GoodsId}).First(&inv); result.RowsAffected == 0 {
tx.Rollback() //回滚之前的操作
return nil, status.Errorf(codes.InvalidArgument, "没有库存信息")
}
//判断库存是否充足
if inv.Stocks < goodInfo.Num {
tx.Rollback() //回滚之前的操作
return nil, status.Errorf(codes.ResourceExhausted, "库存不足")
}
//扣减, 会出现数据不一致的问题 - 锁,分布式锁
inv.Stocks -= goodInfo.Num
tx.Save(&inv)
if ok, err := mutex.Unlock(); !ok || err != nil {
return nil, status.Errorf(codes.Internal, "释放redis分布式锁异常")
}
}
sellDetail.Detail = details
//写selldetail表
if result := tx.Create(&sellDetail); result.RowsAffected == 0 {
tx.Rollback()
return nil, status.Errorf(codes.Internal, "保存库存扣减历史失败")
}
tx.Commit() // 需要自己手动提交操作
//m.Unlock() //释放锁
return &emptypb.Empty{}, nil
}
func AutoReback(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
type OrderInfo struct {
OrderSn string
}
for i := range msgs {
//既然是归还库存,那么我应该具体的知道每件商品应该归还多少, 但是有一个问题是什么?重复归还的问题
//所以说这个接口应该确保幂等性, 你不能因为消息的重复发送导致一个订单的库存归还多次, 没有扣减的库存你别归还
//如何确保这些都没有问题, 新建一张表, 这张表记录了详细的订单扣减细节,以及归还细节
var orderInfo OrderInfo
err := json.Unmarshal(msgs[i].Body, &orderInfo)
if err != nil {
zap.S().Errorf("解析json失败: %v\n", msgs[i].Body)
return consumer.ConsumeSuccess, nil
}
//将inv的库存加回去 将selldetail的status设置为2, 要在事务中进行
tx := global.DB.Begin()
var sellDetail model.StockSellDetail
if result := tx.Model(&model.StockSellDetail{}).Where(&model.StockSellDetail{OrderSn: orderInfo.OrderSn, Status: 1}).First(&sellDetail); result.RowsAffected == 0 {
return consumer.ConsumeSuccess, nil
}
//如果查询到那么逐个归还库存
for _, orderGood := range sellDetail.Detail {
//update怎么用
//先查询一下inventory表在, update语句的 update xx set stocks=stocks+2
if result := tx.Model(&model.Inventory{}).Where(&model.Inventory{Goods: orderGood.Goods}).Update("stocks", gorm.Expr("stocks+?", orderGood.Num)); result.RowsAffected == 0 {
tx.Rollback()
return consumer.ConsumeRetryLater, nil
}
}
if result := tx.Model(&model.StockSellDetail{}).Where(&model.StockSellDetail{OrderSn: orderInfo.OrderSn}).Update("status", 2); result.RowsAffected == 0 {
tx.Rollback()
return consumer.ConsumeRetryLater, nil
}
tx.Commit()
return consumer.ConsumeSuccess, nil
}
return consumer.ConsumeSuccess, nil
}
func main() {
//省略。。。
go func() {
err = server.Serve(lis)
if err != nil {
panic("failed to start grpc:" + err.Error())
}
}()
//监听库存归还topic
c, _ := rocketmq.NewPushConsumer(
consumer.WithNameServer([]string{"192.168.124.51:9876"}),
consumer.WithGroupName("mxshop-inventory"),
)
if err := c.Subscribe("order_reback", consumer.MessageSelector{}, handler.AutoReback); err != nil {
fmt.Println("读取消息失败")
}
//省略。。。
}
func (o *OrderListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
var orderInfo model.OrderInfo
_ = json.Unmarshal(msg.Body, &orderInfo)
var goodsIds []int32
var shopCarts []model.ShoppingCart
goodsNumsMap := make(map[int32]int32)
if result := global.DB.Where(&model.ShoppingCart{User: orderInfo.User, Checked: true}).Find(&shopCarts); result.RowsAffected == 0 {
o.Code = codes.InvalidArgument
o.Detail = "没有选中结算的商品"
return primitive.RollbackMessageState
}
for _, shopCarts := range shopCarts {
goodsIds = append(goodsIds, shopCarts.Goods)
goodsNumsMap[shopCarts.Goods] = shopCarts.Nums
}
//跨服务调用 - 商品微服务 —— 批量查询商品与价格
goods, err := global.GoodsSrvClient.BatchGetGoods(context.Background(), &proto.BatchGoodsIdInfo{Id: goodsIds})
if err != nil {
o.Code = codes.Internal
o.Detail = "批量查询商品信息失败"
return primitive.RollbackMessageState
}
var orderAmount float32
var orderGoods []*model.OrderGoods
var goodsInvInfo []*proto.GoodsInvInfo
for _, good := range goods.Data {
orderAmount += good.ShopPrice * float32(goodsNumsMap[good.Id])
orderGoods = append(orderGoods, &model.OrderGoods{
Goods: good.Id,
GoodsName: good.Name,
GoodsImage: good.GoodsFrontImage,
GoodsPrice: good.ShopPrice,
Nums: goodsNumsMap[good.Id],
})
goodsInvInfo = append(goodsInvInfo, &proto.GoodsInvInfo{
GoodsId: good.Id,
Num: goodsNumsMap[good.Id],
})
}
//跨服务调用 - 库存微服务 —— 扣减库存
if _, err = global.InventorySrvClient.Sell(context.Background(), &proto.SellInfo{OrderSn: orderInfo.OrderSn, GoodsInfo: goodsInvInfo}); err != nil {
//如果是因为网络问题, 这种如何避免误判, 大家自己改写一下sell的返回逻辑
o.Code = codes.ResourceExhausted
o.Detail = "扣减库存失败"
return primitive.RollbackMessageState
}
tx := global.DB.Begin()
//生成订单表
orderInfo.OrderMount = orderAmount
if result := tx.Save(&orderInfo); result.RowsAffected == 0 {
tx.Rollback()
o.Code = codes.Internal
o.Detail = "创建订单失败"
return primitive.CommitMessageState
}
o.OrderAmount = orderAmount
o.ID = orderInfo.ID
for _, orderGood := range orderGoods {
orderGood.Order = orderInfo.ID
}
// 批量插入orderGoods
if result := tx.CreateInBatches(orderGoods, 100); result.RowsAffected == 0 {
tx.Rollback()
o.Code = codes.Internal
o.Detail = "批量插入订单商品失败"
return primitive.CommitMessageState
}
// 删除购物车的记录
if result := tx.Where(&model.ShoppingCart{User: orderInfo.User, Checked: true}).Delete(&model.ShoppingCart{}); result.RowsAffected == 0 {
tx.Rollback()
o.Code = codes.Internal
o.Detail = "删除购物车记录失败"
return primitive.CommitMessageState
}
//发送延时消息
p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"192.168.0.104:9876"}))
if err != nil {
panic("生成producer失败")
}
//不要在一个进程中使用多个producer, 但是不要随便调用shutdown因为会影响其他的producer
if err = p.Start(); err != nil {
panic("启动producer失败")
}
msg = primitive.NewMessage("order_timeout", msg.Body)
msg.WithDelayTimeLevel(3)
_, err = p.SendSync(context.Background(), msg)
if err != nil {
zap.S().Errorf("发送延时消息失败: %v\n", err)
tx.Rollback()
o.Code = codes.Internal
o.Detail = "发送延时消息失败"
return primitive.CommitMessageState
}
//提交事务
tx.Commit()
o.Code = codes.OK
return primitive.RollbackMessageState
}
package main
import (
"flag"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"nd/order_srv/handler"
"net"
"os"
"os/signal"
"syscall"
"github.com/satori/go.uuid"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"nd/order_srv/global"
"nd/order_srv/initialize"
"nd/order_srv/proto"
"nd/order_srv/utils"
"nd/order_srv/utils/register/consul"
)
func main() {
IP := flag.String("ip", "0.0.0.0", "ip地址")
Port := flag.Int("port", 50060, "端口号") // 这个修改为0,如果我们从命令行带参数启动的话就不会为0
//初始化
initialize.InitLogger()
initialize.InitConfig()
initialize.InitDB()
initialize.InitSrvConn()
zap.S().Info(global.ServerConfig)
flag.Parse()
zap.S().Info("ip: ", *IP)
if *Port == 0 {
*Port, _ = utils.GetFreePort()
}
zap.S().Info("port: ", *Port)
server := grpc.NewServer()
proto.RegisterOrderServer(server, &handler.OrderServer{})
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", *IP, *Port))
if err != nil {
panic("failed to listen:" + err.Error())
}
//注册服务健康检查
grpc_health_v1.RegisterHealthServer(server, health.NewServer())
//服务注册
register_client := consul.NewRegistryClient(global.ServerConfig.ConsulInfo.Host, global.ServerConfig.ConsulInfo.Port)
serviceId := fmt.Sprintf("%s", uuid.NewV4())
err = register_client.Register(global.ServerConfig.Host, *Port, global.ServerConfig.Name, global.ServerConfig.Tags, serviceId)
if err != nil {
zap.S().Panic("服务注册失败:", err.Error())
}
zap.S().Debugf("启动服务器, 端口: %d", *Port)
go func() {
err = server.Serve(lis)
if err != nil {
panic("failed to start grpc:" + err.Error())
}
}()
//监听订单超时topic
c, _ := rocketmq.NewPushConsumer(
consumer.WithNameServer([]string{"192.168.124.51:9876"}),
consumer.WithGroupName("mxshop-order"),
)
if err := c.Subscribe("order_timeout", consumer.MessageSelector{}, handler.OrderTimeout); err != nil {
fmt.Println("读取消息失败")
}
_ = c.Start()
//不能让主goroutine退出
//接收终止信号
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
if err = register_client.DeRegister(serviceId); err != nil {
zap.S().Info("注销失败:", err.Error())
} else {
zap.S().Info("注销成功:")
}
}
func OrderTimeout(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for i := range msgs {
var orderInfo model.OrderInfo
_ = json.Unmarshal(msgs[i].Body, &orderInfo)
fmt.Printf("获取到订单超时消息: %v\n", time.Now())
//查询订单的支付状态,如果已支付什么都不做,如果未支付,归还库存
var order model.OrderInfo
if result := global.DB.Model(model.OrderInfo{}).Where(model.OrderInfo{OrderSn: orderInfo.OrderSn}).First(&order); result.RowsAffected == 0 {
return consumer.ConsumeSuccess, nil
}
if order.Status != "TRADE_SUCCESS" {
tx := global.DB.Begin()
//归还库存,我们可以模仿order中发送一个消息到 order_reback中去
//修改订单的状态为已支付
order.Status = "TRADE_CLOSED"
tx.Save(&order)
p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"192.168.124.51:9876"}))
if err != nil {
panic("生成producer失败")
}
if err = p.Start(); err != nil {
panic("启动producer失败")
}
_, err = p.SendSync(context.Background(), primitive.NewMessage("order_reback", msgs[i].Body))
if err != nil {
tx.Rollback()
fmt.Printf("发送失败: %s\n", err)
return consumer.ConsumeRetryLater, nil
}
//if err = p.Shutdown(); err != nil {panic("关闭producer失败")}
return consumer.ConsumeSuccess, nil
}
}
return consumer.ConsumeSuccess, nil
}