序言

我们通过一个系列文章跟大家详细展示一个 go-zero 微服务示例,整个系列分十篇文章,目录结构如下:

  1. 分布式事务(本文)

期望通过本系列带你在本机利用 Docker 环境利用 go-zero 快速开发一个商城系统,让你快速上手微服务。

首先,我们来看一下整体的服务拆分图:

禁止双击图片
DTM
golang

绝大多数的订单系统的事务都会跨服务,因此都有更新数据一致性的需求,都可以通过 DTM 大幅简化架构,形成一个优雅的解决方案。

而且 DTM 已经深度合作,原生的支持go-zero中的分布式事务,下面就来详细的讲解如何用 DTM 来帮助我们的订单系统解决一致性问题

go-zeroDTM
order rpcCreateOrderModelproduct rpcUpdate
func (l *CreateLogic) Create(in *order.CreateRequest) (*order.CreateResponse, error) {
 // 查询用户是否存在
 _, err := l.svcCtx.UserRpc.UserInfo(l.ctx, &user.UserInfoRequest{
  Id: in.Uid,
 })
 if err != nil {
  return nil, err
 }

 // 查询产品是否存在
 productRes, err := l.svcCtx.ProductRpc.Detail(l.ctx, &product.DetailRequest{
  Id: in.Pid,
 })
 if err != nil {
  return nil, err
 }
 // 判断产品库存是否充足
 if productRes.Stock <= 0 {
  return nil, status.Error(500, "产品库存不足")
 }

 newOrder := model.Order{
  Uid:    in.Uid,
  Pid:    in.Pid,
  Amount: in.Amount,
  Status: 0,
 }

 res, err := l.svcCtx.OrderModel.Insert(&newOrder)
 if err != nil {
  return nil, status.Error(500, err.Error())
 }

 newOrder.Id, err = res.LastInsertId()
 if err != nil {
  return nil, status.Error(500, err.Error())
 }

 _, err = l.svcCtx.ProductRpc.Update(l.ctx, &product.UpdateRequest{
  Id:     productRes.Id,
  Name:   productRes.Name,
  Desc:   productRes.Desc,
  Stock:  productRes.Stock - 1,
  Amount: productRes.Amount,
  Status: productRes.Status,
 })
 if err != nil {
  return nil, err
 }

 return &order.CreateResponse{
  Id: newOrder.Id,
 }, nil
}

之前我们说过,这里处理逻辑存在数据一致性问题,有可能订单创建成功了,但是在更新产品库存的时候可能会发生失败,这时候就会存在订单创建成功,产品库存没有减少的情况。

DTMSAGA
DTM
DTM
dtm->config.ymlMicroServiceTargetEndPointdtmetcd
# ......

# 微服务
MicroService:
  Driver: 'dtm-driver-gozero'           # 要处理注册/发现的驱动程序的名称
  Target: 'etcd://etcd:2379/dtmservice' # 注册 dtm 服务的 etcd 地址
  EndPoint: 'dtm:36790'

# ......
dtm_barrier
DTM
DTM
create database if not exists dtm_barrier
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
;
drop table if exists dtm_barrier.barrier;
create table if not exists dtm_barrier.barrier(
  id bigint(22) PRIMARY KEY AUTO_INCREMENT,
  trans_type varchar(45) default '',
  gid varchar(128) default '',
  branch_id varchar(128) default '',
  op varchar(45) default '',
  barrier_id varchar(45) default '',
  reason varchar(45) default '' comment 'the branch type who insert this record',
  create_time datetime DEFAULT now(),
  update_time datetime DEFAULT now(),
  key(create_time),
  key(update_time),
  UNIQUE key(gid, branch_id, op, barrier_id)
);
dtmcli.SetBarrierTableName
OrderModelProductModel
modelDTM
$ vim mall/service/order/model/ordermodel.go

package model

......

type (
 OrderModel interface {
  TxInsert(tx *sql.Tx, data *Order) (sql.Result, error)
  TxUpdate(tx *sql.Tx, data *Order) error
    FindOneByUid(uid int64) (*Order, error)
 }
)

......

func (m *defaultOrderModel) TxInsert(tx *sql.Tx, data *Order) (sql.Result, error) {
 query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?)", m.table, orderRowsExpectAutoSet)
 ret, err := tx.Exec(query, data.Uid, data.Pid, data.Amount, data.Status)

 return ret, err
}

func (m *defaultOrderModel) TxUpdate(tx *sql.Tx, data *Order) error {
 productIdKey := fmt.Sprintf("%s%v", cacheOrderIdPrefix, data.Id)
 _, err := m.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) {
  query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, orderRowsWithPlaceHolder)
  return tx.Exec(query, data.Uid, data.Pid, data.Amount, data.Status, data.Id)
 }, productIdKey)
 return err
}

func (m *defaultOrderModel) FindOneByUid(uid int64) (*Order, error) {
 var resp Order

 query := fmt.Sprintf("select %s from %s where `uid` = ? order by create_time desc limit 1", orderRows, m.table)
 err := m.QueryRowNoCache(&resp, query, uid)

 switch err {
 case nil:
  return &resp, nil
 case sqlc.ErrNotFound:
  return nil, ErrNotFound
 default:
  return nil, err
 }
}

$ vim mall/service/product/model/productmodel.go

package model

......

type (
 ProductModel interface {
  TxAdjustStock(tx *sql.Tx, id int64, delta int) (sql.Result, error)
 }
)

......

func (m *defaultProductModel) TxAdjustStock(tx *sql.Tx, id int64, delta int) (sql.Result, error) {
 productIdKey := fmt.Sprintf("%s%v", cacheProductIdPrefix, id)
 return m.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) {
  query := fmt.Sprintf("update %s set stock=stock+? where stock >= -? and id=?", m.table)
  return tx.Exec(query, delta, delta, id)
 }, productIdKey)
}
product rpc
DecrStockDecrStockRevertproduct rpcDecrStockDecrStockRevert
$ vim mall/service/product/rpc/product.proto

syntax = "proto3";

package productclient;

option go_package = "product";

......

// 减产品库存
message DecrStockRequest {
    int64 id = 1;
    int64 num = 2;
}
message DecrStockResponse {
}
// 减产品库存

service Product {
    ......
    rpc DecrStock(DecrStockRequest) returns(DecrStockResponse);
    rpc DecrStockRevert(DecrStockRequest) returns(DecrStockResponse);
}
提示:修改后使用 goctl 工具重新生成下代码。
DecrStock
$ vim mall/service/product/rpc/internal/logic/decrstocklogic.go

package logic

import (
 "context"
 "database/sql"

 "mall/service/product/rpc/internal/svc"
 "mall/service/product/rpc/product"

 "github.com/dtm-labs/dtmcli"
 "github.com/dtm-labs/dtmgrpc"
 "github.com/zeromicro/go-zero/core/logx"
 "github.com/zeromicro/go-zero/core/stores/sqlx"
 "google.golang.org/grpc/codes"
 "google.golang.org/grpc/status"
)

type DecrStockLogic struct {
 ctx    context.Context
 svcCtx *svc.ServiceContext
 logx.Logger
}

func NewDecrStockLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DecrStockLogic {
 return &DecrStockLogic{
  ctx:    ctx,
  svcCtx: svcCtx,
  Logger: logx.WithContext(ctx),
 }
}

func (l *DecrStockLogic) DecrStock(in *product.DecrStockRequest) (*product.DecrStockResponse, error) {
 // 获取 RawDB
 db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()
 if err != nil {
  return nil, status.Error(500, err.Error())
 }

 // 获取子事务屏障对象
 barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
 if err != nil {
  return nil, status.Error(500, err.Error())
 }
 // 开启子事务屏障
 err = barrier.CallWithDB(db, func(tx *sql.Tx) error {
  // 更新产品库存
  result, err := l.svcCtx.ProductModel.TxAdjustStock(tx, in.Id, -1)
  if err != nil {
   return err
  }

  affected, err := result.RowsAffected()
  // 库存不足,返回子事务失败
  if err == nil && affected == 0 {
   return dtmcli.ErrFailure
  }

  return err
 })

 // 这种情况是库存不足,不再重试,走回滚
 if err == dtmcli.ErrFailure {
  return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)
 }

 if err != nil {
  return nil, err
 }

 return &product.DecrStockResponse{}, nil
}
DecrStockRevertDecrStockDecrStock
$ vim mall/service/product/rpc/internal/logic/decrstockrevertlogic.go

package logic

import (
 "context"
 "database/sql"

 "mall/service/product/rpc/internal/svc"
 "mall/service/product/rpc/product"

 "github.com/dtm-labs/dtmcli"
 "github.com/dtm-labs/dtmgrpc"
 "github.com/zeromicro/go-zero/core/logx"
 "github.com/zeromicro/go-zero/core/stores/sqlx"
 "google.golang.org/grpc/status"
)

type DecrStockRevertLogic struct {
 ctx    context.Context
 svcCtx *svc.ServiceContext
 logx.Logger
}

func NewDecrStockRevertLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DecrStockRevertLogic {
 return &DecrStockRevertLogic{
  ctx:    ctx,
  svcCtx: svcCtx,
  Logger: logx.WithContext(ctx),
 }
}

func (l *DecrStockRevertLogic) DecrStockRevert(in *product.DecrStockRequest) (*product.DecrStockResponse, error) {
 // 获取 RawDB
 db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()
 if err != nil {
  return nil, status.Error(500, err.Error())
 }

 // 获取子事务屏障对象
 barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
 if err != nil {
  return nil, status.Error(500, err.Error())
 }
 // 开启子事务屏障
 err = barrier.CallWithDB(db, func(tx *sql.Tx) error {
  // 更新产品库存
  _, err := l.svcCtx.ProductModel.TxAdjustStock(tx, in.Id, 1)
  return err
 })

 if err != nil {
  return nil, err
 }

 return &product.DecrStockResponse{}, nil
}
order rpc
CreateRevertorder rpcCreateCreateRevert
$ vim mall/service/order/rpc/order.proto

syntax = "proto3";

package orderclient;

option go_package = "order";

......

service Order {
    rpc Create(CreateRequest) returns(CreateResponse);
    rpc CreateRevert(CreateRequest) returns(CreateResponse);
    ......
}
提示:修改后使用 goctl 工具重新生成下代码。
CreateCreateproduct rpcDecrStock
$ vim mall/service/order/rpc/internal/logic/createlogic.go

package logic

import (
 "context"
 "database/sql"
 "fmt"

 "mall/service/order/model"
 "mall/service/order/rpc/internal/svc"
 "mall/service/order/rpc/order"
 "mall/service/user/rpc/user"

 "github.com/dtm-labs/dtmgrpc"
 "github.com/zeromicro/go-zero/core/logx"
 "github.com/zeromicro/go-zero/core/stores/sqlx"
 "google.golang.org/grpc/status"
)

type CreateLogic struct {
 ctx    context.Context
 svcCtx *svc.ServiceContext
 logx.Logger
}

func NewCreateLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateLogic {
 return &CreateLogic{
  ctx:    ctx,
  svcCtx: svcCtx,
  Logger: logx.WithContext(ctx),
 }
}

func (l *CreateLogic) Create(in *order.CreateRequest) (*order.CreateResponse, error) {
 // 获取 RawDB
 db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()
 if err != nil {
  return nil, status.Error(500, err.Error())
 }

 // 获取子事务屏障对象
 barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
 if err != nil {
  return nil, status.Error(500, err.Error())
 }
 // 开启子事务屏障
 if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {
  // 查询用户是否存在
  _, err := l.svcCtx.UserRpc.UserInfo(l.ctx, &user.UserInfoRequest{
   Id: in.Uid,
  })
  if err != nil {
   return fmt.Errorf("用户不存在")
  }

  newOrder := model.Order{
   Uid:    in.Uid,
   Pid:    in.Pid,
   Amount: in.Amount,
   Status: 0,
  }
  // 创建订单
  _, err = l.svcCtx.OrderModel.TxInsert(tx, &newOrder)
  if err != nil {
   return fmt.Errorf("订单创建失败")
  }

  return nil
 }); err != nil {
  return nil, status.Error(500, err.Error())
 }

 return &order.CreateResponse{}, nil
}
CreateRevert9(无效状态)
$ vim mall/service/order/rpc/internal/logic/createrevertlogic.go

package logic

import (
 "context"
 "database/sql"
 "fmt"

 "mall/service/order/rpc/internal/svc"
 "mall/service/order/rpc/order"
 "mall/service/user/rpc/user"

 "github.com/dtm-labs/dtmgrpc"
 "github.com/zeromicro/go-zero/core/logx"
 "github.com/zeromicro/go-zero/core/stores/sqlx"
 "google.golang.org/grpc/status"
)

type CreateRevertLogic struct {
 ctx    context.Context
 svcCtx *svc.ServiceContext
 logx.Logger
}

func NewCreateRevertLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateRevertLogic {
 return &CreateRevertLogic{
  ctx:    ctx,
  svcCtx: svcCtx,
  Logger: logx.WithContext(ctx),
 }
}

func (l *CreateRevertLogic) CreateRevert(in *order.CreateRequest) (*order.CreateResponse, error) {
 // 获取 RawDB
 db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()
 if err != nil {
  return nil, status.Error(500, err.Error())
 }

 // 获取子事务屏障对象
 barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
 if err != nil {
  return nil, status.Error(500, err.Error())
 }
 // 开启子事务屏障
 if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {
  // 查询用户是否存在
  _, err := l.svcCtx.UserRpc.UserInfo(l.ctx, &user.UserInfoRequest{
   Id: in.Uid,
  })
  if err != nil {
   return fmt.Errorf("用户不存在")
  }
  // 查询用户最新创建的订单
  resOrder, err := l.svcCtx.OrderModel.FindOneByUid(in.Uid)
  if err != nil {
   return fmt.Errorf("订单不存在")
  }
  // 修改订单状态9,标识订单已失效,并更新订单
  resOrder.Status = 9
  err = l.svcCtx.OrderModel.TxUpdate(tx, resOrder)
  if err != nil {
   return fmt.Errorf("订单更新失败")
  }

  return nil
 }); err != nil {
  return nil, status.Error(500, err.Error())
 }

 return &order.CreateResponse{}, nil
}
order api
order rpcCreateCreateRevertproduct rpcDecrStockDecrStockRevertorder apiSAGA事务模式
product rpc
$ vim mall/service/order/api/etc/order.yaml

Name: Order
Host: 0.0.0.0
Port: 8002

......

OrderRpc:
  Etcd:
    Hosts:
    - etcd:2379
    Key: order.rpc

ProductRpc:
  Etcd:
    Hosts:
    - etcd:2379
    Key: product.rpc
product rpc
$ vim mall/service/order/api/internal/config/config.go

package config

import (
 "github.com/zeromicro/go-zero/rest"
 "github.com/zeromicro/go-zero/zrpc"
)

type Config struct {
 rest.RestConf

 Auth struct {
  AccessSecret string
  AccessExpire int64
 }

 OrderRpc   zrpc.RpcClientConf
 ProductRpc zrpc.RpcClientConf
}
product rpc
$ vim mall/service/order/api/internal/svc/servicecontext.go

package svc

import (
 "mall/service/order/api/internal/config"
 "mall/service/order/rpc/orderclient"
 "mall/service/product/rpc/productclient"

 "github.com/zeromicro/go-zero/zrpc"
)

type ServiceContext struct {
 Config config.Config

 OrderRpc   orderclient.Order
 ProductRpc productclient.Product
}

func NewServiceContext(c config.Config) *ServiceContext {
 return &ServiceContext{
  Config:     c,
  OrderRpc:   orderclient.NewOrder(zrpc.MustNewClient(c.OrderRpc)),
  ProductRpc: productclient.NewProduct(zrpc.MustNewClient(c.ProductRpc)),
 }
}
gozerodtm
$ vim mall/service/order/api/order.go

package main

import (
 ......

 _ "github.com/dtm-labs/driver-gozero" // 添加导入 `gozero` 的 `dtm` 驱动
)

var configFile = flag.String("f", "etc/order.yaml", "the config file")

func main() {
 ......
}
order apiCreate
$ vim mall/service/order/api/internal/logic/createlogic.go

package logic

import (
 "context"

 "mall/service/order/api/internal/svc"
 "mall/service/order/api/internal/types"
 "mall/service/order/rpc/order"
 "mall/service/product/rpc/product"

 "github.com/dtm-labs/dtmgrpc"
 "github.com/zeromicro/go-zero/core/logx"
 "google.golang.org/grpc/status"
)

type CreateLogic struct {
 logx.Logger
 ctx    context.Context
 svcCtx *svc.ServiceContext
}

func NewCreateLogic(ctx context.Context, svcCtx *svc.ServiceContext) CreateLogic {
 return CreateLogic{
  Logger: logx.WithContext(ctx),
  ctx:    ctx,
  svcCtx: svcCtx,
 }
}

func (l *CreateLogic) Create(req types.CreateRequest) (resp *types.CreateResponse, err error) {
 // 获取 OrderRpc BuildTarget
 orderRpcBusiServer, err := l.svcCtx.Config.OrderRpc.BuildTarget()
 if err != nil {
  return nil, status.Error(100, "订单创建异常")
 }

 // 获取 ProductRpc BuildTarget
 productRpcBusiServer, err := l.svcCtx.Config.ProductRpc.BuildTarget()
 if err != nil {
  return nil, status.Error(100, "订单创建异常")
 }

 // dtm 服务的 etcd 注册地址
 var dtmServer = "etcd://etcd:2379/dtmservice"
 // 创建一个gid
 gid := dtmgrpc.MustGenGid(dtmServer)
 // 创建一个saga协议的事务
 saga := dtmgrpc.NewSagaGrpc(dtmServer, gid).
  Add(orderRpcBusiServer+"/orderclient.Order/Create", orderRpcBusiServer+"/orderclient.Order/CreateRevert", &order.CreateRequest{
   Uid:    req.Uid,
   Pid:    req.Pid,
   Amount: req.Amount,
   Status: 0,
  }).
  Add(productRpcBusiServer+"/productclient.Product/DecrStock", productRpcBusiServer+"/productclient.Product/DecrStockRevert", &product.DecrStockRequest{
   Id:  req.Pid,
   Num: 1,
  })

 // 事务提交
 err = saga.Submit()
 if err != nil {
  return nil, status.Error(500, err.Error())
 }

 return &types.CreateResponse{}, nil
}
SagaGrpc.Addactiongrpcmall/service/order/rpc/order/order.pb.gomall/service/product/rpc/product/product.pb.goInvoke
禁止双击图片
go-zeroDTM

10.3.1 测试分布式事务正常流程

postman/api/product/createstock1
禁止双击图片
禁止双击图片
postman/api/order/createpid1
禁止双击图片
禁止双击图片
10
禁止双击图片
barrier
禁止双击图片

10.3.2 测试分布式事务失败流程1

10postman/api/order/create
禁止双击图片
219
禁止双击图片
barrier(gid = fqYS8CbYbK8GkL8SCuTRUF)(branch_id = 01)(branch_id = 02)
禁止双击图片
  • 这个分布式事务的操作流程
DTMorder rpcCreateDTMproduct rpcDecrStockpidDTMorder rpcCreateRevertDTMproduct rpcDecrStockRevertproduct rpcDecrStockDecrStockRevert

10.3.3 测试分布式事务失败流程2

1product rpcDecrStock
禁止双击图片
postman/api/order/createpid1
禁止双击图片
391100
禁止双击图片
barrier(gid = ZbjYHv2jNra7RMwyWjB5Lc)(branch_id = 01)(branch_id = 02)product rpcDecrStock
禁止双击图片
DTM
子事务屏障会自动识别正向操作是否已执行,失败流程1未执行业务操作,所以补偿时,也不会执行补偿的业务操作;失败流程2执行了业务操作,所以补偿时,也会执行补偿的业务操作。

项目地址

go-zero

微信交流群

关注『微服务实践』公众号并点击 交流群 获取社区群二维码。