序言
我们通过一个系列文章跟大家详细展示一个 go-zero 微服务示例,整个系列分十篇文章,目录结构如下:
- 分布式事务(本文)
期望通过本系列带你在本机的 Docker 环境中利用 go-zero 快速开发一个商城系统,让你快速上手微服务。
首先,我们来看一下整体的服务拆分图:
DTM
golang
绝大多数的订单系统的事务都会跨服务,因此都有更新数据一致性的需求,都可以通过 DTM 大幅简化架构,形成一个优雅的解决方案。
而且 DTM 已经与 go-zero 深度合作,原生支持 go-zero 中的分布式事务,下面就来详细地讲解如何用 DTM 来帮助我们的订单系统解决一致性问题。
go-zeroDTM
首先回顾一下 order rpc 服务中 Create 接口的处理逻辑:方法里校验了用户和产品是否存在、产品库存是否充足,然后通过 OrderModel 创建新订单,最后调用 product rpc 服务的 Update 接口更新产品库存。
// Create is the pre-DTM order flow: it checks the user and the product over
// RPC, inserts the order row, and then asks the product service to store a
// stock value reduced by one.
//
// The insert and the stock update are two independent steps. If the second
// one fails, the order row stays in place while the stock is unchanged.
func (l *CreateLogic) Create(in *order.CreateRequest) (*order.CreateResponse, error) {
	// The user must exist; only the lookup error matters here.
	if _, err := l.svcCtx.UserRpc.UserInfo(l.ctx, &user.UserInfoRequest{Id: in.Uid}); err != nil {
		return nil, err
	}

	// The product must exist ...
	item, err := l.svcCtx.ProductRpc.Detail(l.ctx, &product.DetailRequest{Id: in.Pid})
	if err != nil {
		return nil, err
	}
	// ... and must still have stock left.
	if item.Stock <= 0 {
		return nil, status.Error(500, "产品库存不足")
	}

	// Persist the order and read back its generated id.
	record := model.Order{
		Uid:    in.Uid,
		Pid:    in.Pid,
		Amount: in.Amount,
		Status: 0,
	}
	inserted, err := l.svcCtx.OrderModel.Insert(&record)
	if err != nil {
		return nil, status.Error(500, err.Error())
	}
	if record.Id, err = inserted.LastInsertId(); err != nil {
		return nil, status.Error(500, err.Error())
	}

	// Write the product back with every field unchanged except the stock.
	stockUpdate := &product.UpdateRequest{
		Id:     item.Id,
		Name:   item.Name,
		Desc:   item.Desc,
		Stock:  item.Stock - 1,
		Amount: item.Amount,
		Status: item.Status,
	}
	if _, err = l.svcCtx.ProductRpc.Update(l.ctx, stockUpdate); err != nil {
		return nil, err
	}

	return &order.CreateResponse{Id: record.Id}, nil
}
之前我们说过,这里处理逻辑存在数据一致性问题,有可能订单创建成功了,但是在更新产品库存的时候可能会发生失败,这时候就会存在订单创建成功,产品库存没有减少的情况。
DTMSAGA
DTM
DTM
在 dtm 的 config.yml 配置文件中,修改 MicroService 下的 Target、EndPoint 配置项,将 dtm 服务注册到 etcd 中。
# ......
# Microservice registration settings
MicroService:
  Driver: 'dtm-driver-gozero' # name of the driver that handles registration/discovery
  Target: 'etcd://etcd:2379/dtmservice' # etcd address under which the dtm service is registered
  EndPoint: 'dtm:36790'
# ......
dtm_barrier
DTM
DTM
-- Create the dtm_barrier database if it is missing; the default character
-- set clause sits inside a MySQL version-conditional comment.
create database if not exists dtm_barrier
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
;
-- Drop any existing barrier table first: re-running this script discards
-- previously recorded barrier rows.
drop table if exists dtm_barrier.barrier;
-- Barrier bookkeeping table, one row per recorded branch operation.
create table if not exists dtm_barrier.barrier(
id bigint(22) PRIMARY KEY AUTO_INCREMENT,
trans_type varchar(45) default '',
gid varchar(128) default '',
branch_id varchar(128) default '',
op varchar(45) default '',
barrier_id varchar(45) default '',
reason varchar(45) default '' comment 'the branch type who insert this record',
create_time datetime DEFAULT now(),
update_time datetime DEFAULT now(),
key(create_time),
key(update_time),
-- At most one row may exist per (gid, branch_id, op, barrier_id).
UNIQUE key(gid, branch_id, op, barrier_id)
);
提示:如果自定义了 barrier 表名,请在服务初始化时调用 dtmcli.SetBarrierTableName 进行设置。
OrderModelProductModel
modelDTM
$ vim mall/service/order/model/ordermodel.go
package model
......
type (
// OrderModel is the order persistence interface. Only part of it is shown
// in this excerpt; the surrounding declarations are elided.
OrderModel interface {
// TxInsert inserts data as a new order row through the supplied transaction.
TxInsert(tx *sql.Tx, data *Order) (sql.Result, error)
// TxUpdate updates the order row identified by data.Id through the
// supplied transaction.
TxUpdate(tx *sql.Tx, data *Order) error
// FindOneByUid returns the order with the latest create_time for uid,
// or ErrNotFound when the user has no order.
FindOneByUid(uid int64) (*Order, error)
}
)
......
// TxInsert writes data as a new order row using the caller's transaction.
// Only Uid, Pid, Amount and Status are bound; committing or rolling back tx
// is left to the caller.
func (m *defaultOrderModel) TxInsert(tx *sql.Tx, data *Order) (sql.Result, error) {
	stmt := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?)", m.table, orderRowsExpectAutoSet)
	return tx.Exec(stmt, data.Uid, data.Pid, data.Amount, data.Status)
}
// TxUpdate rewrites the order row whose id equals data.Id using the caller's
// transaction. The statement is run through m.Exec together with the order's
// cache key; the connection handed to the callback is ignored in favour of tx.
func (m *defaultOrderModel) TxUpdate(tx *sql.Tx, data *Order) error {
	cacheKey := fmt.Sprintf("%s%v", cacheOrderIdPrefix, data.Id)
	stmt := fmt.Sprintf("update %s set %s where `id` = ?", m.table, orderRowsWithPlaceHolder)

	update := func(_ sqlx.SqlConn) (sql.Result, error) {
		return tx.Exec(stmt, data.Uid, data.Pid, data.Amount, data.Status, data.Id)
	}

	_, err := m.Exec(update, cacheKey)
	return err
}
// FindOneByUid loads the order with the latest create_time for uid via an
// uncached query. A sqlc.ErrNotFound result is reported as the package's
// ErrNotFound; any other error is returned unchanged.
func (m *defaultOrderModel) FindOneByUid(uid int64) (*Order, error) {
	stmt := fmt.Sprintf("select %s from %s where `uid` = ? order by create_time desc limit 1", orderRows, m.table)

	var latest Order
	err := m.QueryRowNoCache(&latest, stmt, uid)
	if err == nil {
		return &latest, nil
	}
	if err == sqlc.ErrNotFound {
		return nil, ErrNotFound
	}
	return nil, err
}
$ vim mall/service/product/model/productmodel.go
package model
......
type (
// ProductModel is the product persistence interface. Only part of it is
// shown in this excerpt; the surrounding declarations are elided.
ProductModel interface {
// TxAdjustStock adds delta (which may be negative) to the stock of product
// id through the supplied transaction.
TxAdjustStock(tx *sql.Tx, id int64, delta int) (sql.Result, error)
}
)
......
// TxAdjustStock adds delta to the stock column of product id using the
// caller's transaction. The "stock >= -delta" predicate means a negative
// delta only matches a row whose stock is at least |delta|, so an
// insufficient stock leaves the row untouched instead of going negative.
// The statement is run through m.Exec together with the product's cache key.
func (m *defaultProductModel) TxAdjustStock(tx *sql.Tx, id int64, delta int) (sql.Result, error) {
	cacheKey := fmt.Sprintf("%s%v", cacheProductIdPrefix, id)
	stmt := fmt.Sprintf("update %s set stock=stock+? where stock >= -? and id=?", m.table)

	adjust := func(_ sqlx.SqlConn) (sql.Result, error) {
		return tx.Exec(stmt, delta, delta, id)
	}

	return m.Exec(adjust, cacheKey)
}
product rpc
在 product rpc 服务的 proto 文件中新增 DecrStock、DecrStockRevert 两个接口方法。
$ vim mall/service/product/rpc/product.proto
syntax = "proto3";
package productclient;
option go_package = "product";
......
// Request shared by DecrStock and DecrStockRevert.
message DecrStockRequest {
// id of the product whose stock is adjusted
int64 id = 1;
// requested quantity
int64 num = 2;
}
// Empty response shared by DecrStock and DecrStockRevert.
message DecrStockResponse {
}
// Product service; pre-existing rpc methods are elided below.
service Product {
......
// DecrStock is the forward action that decrements a product's stock.
rpc DecrStock(DecrStockRequest) returns(DecrStockResponse);
// DecrStockRevert is the compensation registered for DecrStock.
rpc DecrStockRevert(DecrStockRequest) returns(DecrStockResponse);
}
提示:修改后使用 goctl 工具重新生成下代码。
DecrStock
$ vim mall/service/product/rpc/internal/logic/decrstocklogic.go
package logic
import (
"context"
"database/sql"
"mall/service/product/rpc/internal/svc"
"mall/service/product/rpc/product"
"github.com/dtm-labs/dtmcli"
"github.com/dtm-labs/dtmgrpc"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/sqlx"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// DecrStockLogic carries the request context, the service dependencies and
// a logger for the DecrStock rpc.
type DecrStockLogic struct {
	ctx    context.Context
	svcCtx *svc.ServiceContext
	logx.Logger
}

// NewDecrStockLogic builds a DecrStockLogic whose logger is derived from ctx.
func NewDecrStockLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DecrStockLogic {
	logic := new(DecrStockLogic)
	logic.ctx = ctx
	logic.svcCtx = svcCtx
	logic.Logger = logx.WithContext(ctx)
	return logic
}
// DecrStock is the forward branch of the saga on the product side. Inside a
// DTM branch barrier it decrements the stock of product in.Id by one.
//
// When the update matches no row the branch fails with codes.Aborted and
// dtmcli.ResultFailure; other errors are returned as they are.
//
// NOTE(review): in.Num is not consulted; the adjustment is fixed at -1.
func (l *DecrStockLogic) DecrStock(in *product.DecrStockRequest) (*product.DecrStockResponse, error) {
	// Raw *sql.DB handle for the barrier.
	rawDB, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()
	if err != nil {
		return nil, status.Error(500, err.Error())
	}

	// Branch barrier reconstructed from the incoming gRPC context.
	branchBarrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
	if err != nil {
		return nil, status.Error(500, err.Error())
	}

	// Business step executed on the transaction the barrier passes in.
	decrement := func(tx *sql.Tx) error {
		res, execErr := l.svcCtx.ProductModel.TxAdjustStock(tx, in.Id, -1)
		if execErr != nil {
			return execErr
		}
		rows, rowsErr := res.RowsAffected()
		if rowsErr != nil {
			return rowsErr
		}
		// No row touched: the stock predicate did not match.
		if rows == 0 {
			return dtmcli.ErrFailure
		}
		return nil
	}

	err = branchBarrier.CallWithDB(rawDB, decrement)
	switch {
	case err == dtmcli.ErrFailure:
		// Insufficient stock: report a definitive failure instead of a
		// plain error so the transaction is rolled back, not retried.
		return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)
	case err != nil:
		return nil, err
	}

	return &product.DecrStockResponse{}, nil
}
实现 DecrStockRevert 接口逻辑:它是 DecrStock 的补偿操作,把 DecrStock 中扣减的库存加回去。
$ vim mall/service/product/rpc/internal/logic/decrstockrevertlogic.go
package logic
import (
"context"
"database/sql"
"mall/service/product/rpc/internal/svc"
"mall/service/product/rpc/product"
"github.com/dtm-labs/dtmcli"
"github.com/dtm-labs/dtmgrpc"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/sqlx"
"google.golang.org/grpc/status"
)
// DecrStockRevertLogic carries the request context, the service dependencies
// and a logger for the DecrStockRevert rpc.
type DecrStockRevertLogic struct {
	ctx    context.Context
	svcCtx *svc.ServiceContext
	logx.Logger
}

// NewDecrStockRevertLogic builds a DecrStockRevertLogic whose logger is
// derived from ctx.
func NewDecrStockRevertLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DecrStockRevertLogic {
	logic := new(DecrStockRevertLogic)
	logic.ctx = ctx
	logic.svcCtx = svcCtx
	logic.Logger = logx.WithContext(ctx)
	return logic
}
// DecrStockRevert is the compensation branch for DecrStock. Inside a DTM
// branch barrier it adds one back to the stock of product in.Id.
//
// NOTE(review): in.Num is not consulted; the adjustment is fixed at +1.
func (l *DecrStockRevertLogic) DecrStockRevert(in *product.DecrStockRequest) (*product.DecrStockResponse, error) {
	// Raw *sql.DB handle for the barrier.
	rawDB, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()
	if err != nil {
		return nil, status.Error(500, err.Error())
	}

	// Branch barrier reconstructed from the incoming gRPC context.
	branchBarrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
	if err != nil {
		return nil, status.Error(500, err.Error())
	}

	// Business step executed on the transaction the barrier passes in.
	restore := func(tx *sql.Tx) error {
		_, execErr := l.svcCtx.ProductModel.TxAdjustStock(tx, in.Id, 1)
		return execErr
	}
	if err = branchBarrier.CallWithDB(rawDB, restore); err != nil {
		return nil, err
	}

	return &product.DecrStockResponse{}, nil
}
order rpc
在 order rpc 服务的 proto 文件中,为 Create 接口新增对应的补偿接口方法 CreateRevert。
$ vim mall/service/order/rpc/order.proto
syntax = "proto3";
package orderclient;
option go_package = "order";
......
// Order service; the remaining rpc methods are elided below.
service Order {
// Create is the forward action that creates an order.
rpc Create(CreateRequest) returns(CreateResponse);
// CreateRevert is the compensation registered for Create.
rpc CreateRevert(CreateRequest) returns(CreateResponse);
......
}
提示:修改后使用 goctl 工具重新生成下代码。
修改 Create 接口逻辑:原来 Create 中的库存判断与更新操作已经放到 product rpc 的 DecrStock 接口中实现,所以这里的 Create 只需要完成创建订单这一个操作。
$ vim mall/service/order/rpc/internal/logic/createlogic.go
package logic
import (
"context"
"database/sql"
"fmt"
"mall/service/order/model"
"mall/service/order/rpc/internal/svc"
"mall/service/order/rpc/order"
"mall/service/user/rpc/user"
"github.com/dtm-labs/dtmgrpc"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/sqlx"
"google.golang.org/grpc/status"
)
// CreateLogic carries the request context, the service dependencies and a
// logger for the order Create rpc.
type CreateLogic struct {
	ctx    context.Context
	svcCtx *svc.ServiceContext
	logx.Logger
}

// NewCreateLogic builds a CreateLogic whose logger is derived from ctx.
func NewCreateLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateLogic {
	logic := new(CreateLogic)
	logic.ctx = ctx
	logic.svcCtx = svcCtx
	logic.Logger = logx.WithContext(ctx)
	return logic
}
// Create is the forward branch of the saga on the order side. Inside a DTM
// branch barrier it checks the user over RPC and inserts the order row on
// the barrier's transaction. Stock handling is not part of this branch.
//
// Every failure is reported as status code 500. The returned response is
// empty; the new order id is not propagated.
func (l *CreateLogic) Create(in *order.CreateRequest) (*order.CreateResponse, error) {
	// Raw *sql.DB handle for the barrier.
	rawDB, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()
	if err != nil {
		return nil, status.Error(500, err.Error())
	}

	// Branch barrier reconstructed from the incoming gRPC context.
	branchBarrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
	if err != nil {
		return nil, status.Error(500, err.Error())
	}

	// Business step executed on the transaction the barrier passes in.
	createOrder := func(tx *sql.Tx) error {
		// Any error from the user lookup is reported as "user not found".
		if _, lookupErr := l.svcCtx.UserRpc.UserInfo(l.ctx, &user.UserInfoRequest{Id: in.Uid}); lookupErr != nil {
			return fmt.Errorf("用户不存在")
		}

		record := model.Order{
			Uid:    in.Uid,
			Pid:    in.Pid,
			Amount: in.Amount,
			Status: 0,
		}
		if _, insertErr := l.svcCtx.OrderModel.TxInsert(tx, &record); insertErr != nil {
			return fmt.Errorf("订单创建失败")
		}
		return nil
	}

	if callErr := branchBarrier.CallWithDB(rawDB, createOrder); callErr != nil {
		return nil, status.Error(500, callErr.Error())
	}

	return &order.CreateResponse{}, nil
}
实现 CreateRevert 接口逻辑:查询用户最新创建的订单,并将其状态修改为 9(无效状态)。
$ vim mall/service/order/rpc/internal/logic/createrevertlogic.go
package logic
import (
"context"
"database/sql"
"fmt"
"mall/service/order/rpc/internal/svc"
"mall/service/order/rpc/order"
"mall/service/user/rpc/user"
"github.com/dtm-labs/dtmgrpc"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/sqlx"
"google.golang.org/grpc/status"
)
// CreateRevertLogic carries the request context, the service dependencies
// and a logger for the order CreateRevert rpc.
type CreateRevertLogic struct {
	ctx    context.Context
	svcCtx *svc.ServiceContext
	logx.Logger
}

// NewCreateRevertLogic builds a CreateRevertLogic whose logger is derived
// from ctx.
func NewCreateRevertLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateRevertLogic {
	logic := new(CreateRevertLogic)
	logic.ctx = ctx
	logic.svcCtx = svcCtx
	logic.Logger = logx.WithContext(ctx)
	return logic
}
// CreateRevert is the compensation branch for Create. Inside a DTM branch
// barrier it checks the user over RPC, loads the user's most recent order and
// sets its status to 9 (invalid).
//
// Every failure is reported as status code 500.
//
// NOTE(review): the order is chosen by "latest create_time for this uid", not
// by an id carried in the request, and FindOneByUid does not take the
// barrier's transaction. Confirm this picks the intended row when one user
// has several orders in flight.
func (l *CreateRevertLogic) CreateRevert(in *order.CreateRequest) (*order.CreateResponse, error) {
	// Raw *sql.DB handle for the barrier.
	rawDB, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()
	if err != nil {
		return nil, status.Error(500, err.Error())
	}

	// Branch barrier reconstructed from the incoming gRPC context.
	branchBarrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
	if err != nil {
		return nil, status.Error(500, err.Error())
	}

	// Business step executed on the transaction the barrier passes in.
	invalidateOrder := func(tx *sql.Tx) error {
		// Any error from the user lookup is reported as "user not found".
		if _, lookupErr := l.svcCtx.UserRpc.UserInfo(l.ctx, &user.UserInfoRequest{Id: in.Uid}); lookupErr != nil {
			return fmt.Errorf("用户不存在")
		}

		// Most recently created order of this user.
		latest, findErr := l.svcCtx.OrderModel.FindOneByUid(in.Uid)
		if findErr != nil {
			return fmt.Errorf("订单不存在")
		}

		// Status 9 marks the order as invalid.
		latest.Status = 9
		if updateErr := l.svcCtx.OrderModel.TxUpdate(tx, latest); updateErr != nil {
			return fmt.Errorf("订单更新失败")
		}
		return nil
	}

	if callErr := branchBarrier.CallWithDB(rawDB, invalidateOrder); callErr != nil {
		return nil, status.Error(500, callErr.Error())
	}

	return &order.CreateResponse{}, nil
}
order api
我们把 order rpc 服务的 Create、CreateRevert 接口方法和 product rpc 服务的 DecrStock、DecrStockRevert 接口方法,放到 order api 服务中,以 SAGA事务模式 编排成一个分布式事务来处理。
product rpc
$ vim mall/service/order/api/etc/order.yaml
Name: Order
Host: 0.0.0.0
Port: 8002
......
# rpc client settings: each service is discovered through etcd under its Key
OrderRpc:
  Etcd:
    Hosts:
    - etcd:2379
    Key: order.rpc
ProductRpc:
  Etcd:
    Hosts:
    - etcd:2379
    Key: product.rpc
product rpc
$ vim mall/service/order/api/internal/config/config.go
package config
import (
"github.com/zeromicro/go-zero/rest"
"github.com/zeromicro/go-zero/zrpc"
)
// Config is the configuration of the order api service.
type Config struct {
// Embedded REST server settings.
rest.RestConf
// Auth holds an access secret and an expiry value; how they are consumed
// is not shown in this excerpt.
Auth struct {
AccessSecret string
AccessExpire int64
}
// Client settings for the order rpc service.
OrderRpc zrpc.RpcClientConf
// Client settings for the product rpc service.
ProductRpc zrpc.RpcClientConf
}
product rpc
$ vim mall/service/order/api/internal/svc/servicecontext.go
package svc
import (
"mall/service/order/api/internal/config"
"mall/service/order/rpc/orderclient"
"mall/service/product/rpc/productclient"
"github.com/zeromicro/go-zero/zrpc"
)
// ServiceContext bundles the dependencies of the order api service.
type ServiceContext struct {
// Config is the loaded service configuration.
Config config.Config
// OrderRpc is the client for the order rpc service.
OrderRpc orderclient.Order
// ProductRpc is the client for the product rpc service.
ProductRpc productclient.Product
}
// NewServiceContext builds the order api's dependencies from c: it keeps the
// configuration and creates one rpc client each for the order and product
// services from their client settings.
func NewServiceContext(c config.Config) *ServiceContext {
	orderRPC := orderclient.NewOrder(zrpc.MustNewClient(c.OrderRpc))
	productRPC := productclient.NewProduct(zrpc.MustNewClient(c.ProductRpc))

	svcCtx := new(ServiceContext)
	svcCtx.Config = c
	svcCtx.OrderRpc = orderRPC
	svcCtx.ProductRpc = productRPC
	return svcCtx
}
gozerodtm
$ vim mall/service/order/api/order.go
package main
import (
......
_ "github.com/dtm-labs/driver-gozero" // 添加导入 `gozero` 的 `dtm` 驱动
)
var configFile = flag.String("f", "etc/order.yaml", "the config file")
func main() {
......
}
order apiCreate
$ vim mall/service/order/api/internal/logic/createlogic.go
package logic
import (
"context"
"mall/service/order/api/internal/svc"
"mall/service/order/api/internal/types"
"mall/service/order/rpc/order"
"mall/service/product/rpc/product"
"github.com/dtm-labs/dtmgrpc"
"github.com/zeromicro/go-zero/core/logx"
"google.golang.org/grpc/status"
)
// CreateLogic carries a logger, the request context and the service
// dependencies for the order api's create endpoint.
type CreateLogic struct {
	logx.Logger
	ctx    context.Context
	svcCtx *svc.ServiceContext
}

// NewCreateLogic builds a CreateLogic value (not a pointer) whose logger is
// derived from ctx.
func NewCreateLogic(ctx context.Context, svcCtx *svc.ServiceContext) CreateLogic {
	var logic CreateLogic
	logic.Logger = logx.WithContext(ctx)
	logic.ctx = ctx
	logic.svcCtx = svcCtx
	return logic
}
// Create submits a SAGA transaction to DTM with two branches: the order rpc's
// Create/CreateRevert pair followed by the product rpc's
// DecrStock/DecrStockRevert pair, the latter with a quantity of 1.
//
// A failure to resolve either rpc target is reported as status code 100 and
// a failed submit as status code 500. The returned response is empty.
func (l *CreateLogic) Create(req types.CreateRequest) (resp *types.CreateResponse, err error) {
	// etcd address under which the dtm service is registered.
	const dtmServer = "etcd://etcd:2379/dtmservice"

	// Resolve the targets of both rpc services from their client settings.
	orderTarget, err := l.svcCtx.Config.OrderRpc.BuildTarget()
	if err != nil {
		return nil, status.Error(100, "订单创建异常")
	}
	productTarget, err := l.svcCtx.Config.ProductRpc.BuildTarget()
	if err != nil {
		return nil, status.Error(100, "订单创建异常")
	}

	// Global transaction id issued by the dtm server.
	gid := dtmgrpc.MustGenGid(dtmServer)

	// Payloads of the two branches.
	orderReq := &order.CreateRequest{
		Uid:    req.Uid,
		Pid:    req.Pid,
		Amount: req.Amount,
		Status: 0,
	}
	stockReq := &product.DecrStockRequest{
		Id:  req.Pid,
		Num: 1,
	}

	// Each branch registers an action together with its compensation.
	saga := dtmgrpc.NewSagaGrpc(dtmServer, gid)
	saga = saga.Add(orderTarget+"/orderclient.Order/Create", orderTarget+"/orderclient.Order/CreateRevert", orderReq)
	saga = saga.Add(productTarget+"/productclient.Product/DecrStock", productTarget+"/productclient.Product/DecrStockRevert", stockReq)

	// Hand the transaction over to dtm.
	if err = saga.Submit(); err != nil {
		return nil, status.Error(500, err.Error())
	}

	return &types.CreateResponse{}, nil
}
提示:SagaGrpc.Add 方法的第一个参数 action 是微服务 grpc 访问的方法路径,这个路径可以分别在 mall/service/order/rpc/order/order.pb.go 和 mall/service/product/rpc/product/product.pb.go 文件中按关键字 Invoke 搜索找到。
go-zeroDTM
10.3.1 测试分布式事务正常流程
postman/api/product/createstock1
postman/api/order/createpid1
10
barrier
10.3.2 测试分布式事务失败流程1
10postman/api/order/create
219
barrier(gid = fqYS8CbYbK8GkL8SCuTRUF)(branch_id = 01)(branch_id = 02)
- 这个分布式事务的操作流程
DTMorder rpcCreateDTMproduct rpcDecrStockpidDTMorder rpcCreateRevertDTMproduct rpcDecrStockRevertproduct rpcDecrStockDecrStockRevert
10.3.3 测试分布式事务失败流程2
1product rpcDecrStock
postman/api/order/createpid1
391100
barrier(gid = ZbjYHv2jNra7RMwyWjB5Lc)(branch_id = 01)(branch_id = 02)product rpcDecrStock
DTM
子事务屏障会自动识别正向操作是否已执行,失败流程1未执行业务操作,所以补偿时,也不会执行补偿的业务操作;失败流程2执行了业务操作,所以补偿时,也会执行补偿的业务操作。
项目地址
go-zero
微信交流群
关注『微服务实践』公众号并点击 交流群 获取社区群二维码。