在分析了qs的大致源码后golang源码分析:dtm分布式事务(1),我们分析下dtm-example的源码结构,每个例子都是类似的。
先看下main.go里面的main函数
func main() {hintExit("")busi.BusiConf = dtmimp.DBConfbusi.ResetXaData()app, gsvr := busi.Startup()examples.AddRoutes(app)if cmd == "qs"go busi.RunHTTP(app)busi.QsMain()examples.IsExists(cmd)
如果没有知名具体实例参数,列出参数列表
func hintExit(msg string) {_, cmd := range examples.Commands
然后是将数据库里未提交的事务回滚掉,learn/dtm/dtm-examples/busi/utils.go
func ResetXaData() {db.Must().Raw("xa recover").Scan(&xas)for _, xa := range xas {db.Must().Exec(fmt.Sprintf("xa rollback '%s'", xa.Data))}
其中数据库连接参数定义在
busi/base_types.go
var StoreHost = "localhost"var BusiConf = dtmcli.DBConf{Driver: "mysql",Host: StoreHost,Port: 3306,User: "root",}
获取db连接dtmutil/db.go
func DbGet(conf dtmcli.DBConf, ops ...func(*gorm.DB)) *DB {dsn := dtmimp.GetDsn(conf)db, ok := dbs.Load(dsn)db1, err := gorm.Open(getGormDialetor(conf.Driver, dsn), &gorm.Config{SkipDefaultTransaction: true,})dbs.Store(dsn, db)
然后启动业务服务,对外提供服务busi/startup.go
func Startup() (*gin.Engine, *grpc.Server) {svr := GrpcStartup()app := BaseAppStartup()
其中grpc服务
func GrpcStartup() *grpc.Server {conn, err := grpc.Dial(dtmutil.DefaultGrpcServerDtmClient = dtmgpb.NewDtmClient(conn)conn1, err := grpc.Dial(BusiGrpc,BusiCli = NewBusiClient(conn1)s := grpc.NewServer(RegisterBusiServer(s, &busiServer{})
服务实现位于busi/busi_grpc.pb.go
type busiServer struct {UnimplementedBusiServer}
对外提供了所有业务接口
func (s *busiServer) QueryPreparedfunc (s *busiServer) TransIn(ctx context.Context, in *ReqGrpc)func (s *busiServer) TransOut(ctx context.Context, in *ReqGrpc)func (s *busiServer) TransInRevert(ctx context.Context, in *ReqGrpc)func (s *busiServer) TransOutRevert(ctx context.Context, in *ReqGrpc)func (s *busiServer) TransInConfirm(ctx context.Context, in *ReqGrpc)func (s *busiServer) TransOutConfirm(ctx context.Context, in *ReqGrpc)func (s *busiServer) TransInTcc(ctx context.Context, in *ReqGrpc)func (s *busiServer) TransOutTcc(ctx context.Context, in *ReqGrpc)func (s *busiServer) TransInXa(ctx context.Context, in *ReqGrpc)func (s *busiServer) TransOutXa(ctx context.Context, in *ReqGrpc)func (s *busiServer) TransInTccNested(ctx context.Context, in *ReqGrpc)func (s *busiServer) TransOutHeaderYes(ctx context.Context, in *ReqGrpc)func (s *busiServer) TransOutHeaderNo(ctx context.Context, in *ReqGrpc)
然后是注册http服务,busi/base_http.go
func BaseAppStartup() *gin.Engine {app := dtmutil.GetGinApp()app.Use(func(c *gin.Context) {v := MainSwitch.NextResult.Fetch()BaseAddRoute(app)addJrpcRoute(app)for k, v := range setupFuncs {v(app)
其中,注册http路由和json rpc路由的实现如下
func BaseAddRoute(app *gin.Engine) {app.POST(BusiAPI+"/workflow/resume", dtmutil.WrapHandler(func(ctx *gin.Context) interface{}
busi/base_jrpc.go
func addJrpcRoute(app *gin.Engine) {app.POST("/api/json-rpc", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
var setupFuncs = map[string]setupFunc{}
注册了barrier,busi/barrier.go
func init() {setupFuncs["BarrierSetup"] = func(app *gin.Engine) {app.POST(BusiAPI+"/SagaBTransIn", dtmutil.WrapHandler(func(c *gin.Context) interface{} {barrier := MustBarrierFromGin(c)ti, err := dtmcli.BarrierFromQuery(c.Request.URL.Query())return BarrierFrom(dtmimp.EscapeGet(qs, "trans_type"), dtmimp.EscapeGet(qs, "gid"), dtmimp.EscapeGet(qs, "branch_id"), dtmimp.EscapeGet(qs, "op"))return barrier.CallWithDB(pdbGet(), func(tx *sql.Tx) error {return SagaAdjustBalance(tx, TransInUID, reqFrom(c).Amount, reqFrom(c).TransInResult)
func (bb *BranchBarrier) CallWithDB(db *sql.DB, busiCall BarrierBusiFunc) error {tx, err := db.Begin()
接着是注册路由examples/startup.go
var routes = []PostRoute{}func AddRoutes(app *gin.Engine) {for _, r := range routes {app.POST(r.Route, dtmutil.WrapHandler(r.Handler))
对于例子"qs"
go busi.RunHTTP(app)time.Sleep(200 * time.Millisecond)busi.QsMain()
代码位于busi/base_http.go,仅仅启动了http服务进行监听
func RunHTTP(app *gin.Engine) {err := app.Run(fmt.Sprintf(":%d", BusiPort))
busi/quick_start.go 启动了 QsStartSvr()供dtm回调用
func QsMain() {QsStartSvr()QsFireRequest()select {}}
func QsStartSvr() {app := gin.New()qsAddRoute(app)log.Printf("quick start examples listening at %d", qsBusiPort)go func() {_ = app.Run(fmt.Sprintf(":%d", qsBusiPort))
func qsAddRoute(app *gin.Engine) {app.POST(qsBusiAPI+"/TransIn", func(c *gin.Context) {
然后就是触发我们的业务请求,将我们的参数和回调地址注册给dtm,dtmutil/consts.go
func QsFireRequest() string {req := &gin.H{"amount": 30}saga := dtmcli.NewSaga(dtmServer, shortuuid.New()).Add(qsBusi+"/TransOut", qsBusi+"/TransOutCompensate", req).Add(qsBusi+"/TransIn", qsBusi+"/TransInCompensate", req)err := saga.Submit()
DefaultHTTPServer = "http://localhost:36789/api/dtmsvr"DefaultJrpcServer = "http://localhost:36789/api/json-rpc"DefaultGrpcServer = "localhost:36790"
分析完qs例子后,其他例子是类似的,对于grpc请求,会初始化grpc workflow
workflow.InitGrpc(dtmutil.DefaultGrpcServer, busi.BusiGrpc, gsvr)busi.BusiCli = busi.NewBusiClient(conn1)
http请求,对应的是http workflow
workflow.InitHTTP(dtmutil.DefaultHTTPServer, busi.Busi+"/workflow/resume")
然后分别启动grpc回调监听,http回调监听,最后通过Call触发请求
go busi.RunGrpc(gsvr)go busi.RunHTTP(app)examples.Call(cmd)
对于每一个例子是如何注册进去的呢?examples/startup.go
func AddCommand(name string, fn func() string) {Commands = append(Commands, commandInfo{Arg: name, Action: fn})
其中的Action,就是注册的时候注册的执行方法,在Call方法里面会被调用:
func Call(name string) {c.Action()
例子的注入时机在init函数中Commands,以xa为例:examples/http_xa.go
func init() {AddCommand("http_xa", func() string {err := dtmcli.XaGlobalTransaction(dtmutil.DefaultHTTPServer, gid, func(xa *dtmcli.Xa) (*resty.Response, error) {resp, err := xa.CallBranch(&busi.ReqHTTP{Amount: 30}, busi.Busi+"/TransOutXa")return xa.CallBranch(&busi.ReqHTTP{Amount: 30}, busi.Busi+"/TransInXa")