UndoDoUnDo
Undo

并且,如果操作之间没有相互依赖关系,可以通过并发执行来提高运行效率,执行流程图如下:

执行流程图

实现

TransactionCommand
// TransactionCommand is a reversible operation: a forward action paired with
// the inverse action that is used to roll it back.
type TransactionCommand interface {
	// Do performs the forward operation and reports its result.
	Do() (define.ExecuteResult, error)
	// UnDo performs the reverse operation of Do and reports its result.
	UnDo() (define.ExecuteResult, error)
	// GetContext returns the context associated with this command.
	GetContext() context.Context
}

目前实现了 DB 操作的 Do 和 UnDo,其他存储类型的正逆向操作同样实现该接口即可。

// transactionCommandDBImpl is a database-backed command: its Do method
// updates rows with NewData and its UnDo method updates them with OriginData.
type transactionCommandDBImpl struct {
	// OriginData supplies the update fields written by UnDo.
	OriginData     define.TransactionUpdateData
	// NewData supplies the update fields written by Do.
	NewData        define.TransactionUpdateData
	// WhereCondition supplies the table name and the filter applied to the query.
	WhereCondition define.FilterCondition
	// ctx is the context returned by GetContext.
	ctx            context.Context
	// maxRetryCount is stored by the constructor; it is not read by any
	// method visible in this section.
	maxRetryCount  int
}

// NewTransactionCommandUpdateDBImpl builds a database update command.
//
// originData holds the fields restored by UnDo, newData holds the fields
// written by Do, and whereCondition selects the table and rows. ctx and
// maxRetryCnt are stored on the command as-is.
func NewTransactionCommandUpdateDBImpl(
	originData define.TransactionUpdateData,
	newData define.TransactionUpdateData,
	whereCondition define.FilterCondition,
	ctx context.Context,
	maxRetryCnt int,
) *transactionCommandDBImpl {
	cmd := new(transactionCommandDBImpl)
	cmd.OriginData = originData
	cmd.NewData = newData
	cmd.WhereCondition = whereCondition
	cmd.ctx = ctx
	cmd.maxRetryCount = maxRetryCnt
	return cmd
}

// GetContext returns the context that was supplied when the command was
// constructed.
func (t *transactionCommandDBImpl) GetContext() context.Context {
	return t.ctx
}


// Do applies the forward update: rows matching WhereCondition are updated
// with the fields from NewData.
//
// It returns an update result built from the affected-row count, or the
// stack-wrapped database error.
func (t *transactionCommandDBImpl) Do() (define.ExecuteResult, error) {
	client := t.getDbClient()
	// Attach the where conditions to the query (per the original note, this
	// is done via reflection inside the helper).
	client = sql_utils.QuerySQLCommonWhere(client, t.WhereCondition)

	table := client.Table(t.WhereCondition.GetTableName())
	updated := table.Updates(t.NewData.GetUpdateFields())
	if updateErr := updated.GetError(); updateErr != nil {
		return nil, errors.WithStack(updateErr)
	}
	return define.NewUpdateExecuteResult(updated.RowsAffected()), nil
}

// UnDo applies the reverse update: rows matching WhereCondition are updated
// with the fields from OriginData.
//
// It returns an update result built from the affected-row count, or the
// stack-wrapped database error.
func (t *transactionCommandDBImpl) UnDo() (define.ExecuteResult, error) {
	client := t.getDbClient()
	// Attach the where conditions to the query (per the original note, this
	// is done via reflection inside the helper).
	client = sql_utils.QuerySQLCommonWhere(client, t.WhereCondition)

	table := client.Table(t.WhereCondition.GetTableName())
	restored := table.Updates(t.OriginData.GetUpdateFields())
	if restoreErr := restored.GetError(); restoreErr != nil {
		return nil, errors.WithStack(restoreErr)
	}
	return define.NewUpdateExecuteResult(restored.RowsAffected()), nil
}
TransactionCommandExecutor
// TransactionCommandExecutor runs a set of commands concurrently and records
// which ones succeeded and which ones failed, so that the successful ones can
// be undone when any command fails.
//
// The executor must not be copied after first use because it contains a
// mutex and a wait group.
type TransactionCommandExecutor struct {
	// Mutex guards successCommands and failCommands. It is embedded because
	// the executor's methods call e.Lock() and e.Unlock() directly.
	sync.Mutex

	// wg is used to wait for the goroutines started by Execute and
	// UndoSuccessCommand.
	wg sync.WaitGroup

	// commands is the full list of commands to execute.
	commands []TransactionCommand

	// successCommands collects the commands whose Do succeeded.
	successCommands []TransactionCommand

	// failCommands collects the commands whose Do failed.
	failCommands []TransactionCommand
}

// NewTransactionCommandExecutor returns an executor for the given commands.
// The success list is pre-sized to hold every command.
func NewTransactionCommandExecutor(commands []TransactionCommand) *TransactionCommandExecutor {
	executor := new(TransactionCommandExecutor)
	executor.commands = commands
	executor.successCommands = make([]TransactionCommand, 0, len(commands))
	return executor
}

自动执行所有命令

// AutoExecute runs every command and, if any of them fails, undoes the
// commands that had already succeeded.
//
// It returns nil only when Execute returned no error and no command was
// recorded as failed. Otherwise it returns the Execute error, or a generic
// error when the failure was only recorded in failCommands. An error from
// the rollback itself is logged, not returned.
func (e *TransactionCommandExecutor) AutoExecute(ctx context.Context) error {
	err := e.Execute(ctx)
	// Execute has waited for all of its goroutines, so failCommands can be
	// read here without further synchronization.
	if err == nil && len(e.failCommands) == 0 {
		return nil
	}
	if err == nil {
		// Without this, a failed batch was rolled back but reported to the
		// caller as success.
		err = errors.New("transaction command execution failed")
	}
	logger.CtxLogErrorf(ctx, "err : %+v", err)
	if unDoErr := e.UndoSuccessCommand(ctx); unDoErr != nil {
		logger.CtxLogErrorf(ctx, "undo success command err : %+v", unDoErr)
	}
	return err
}

// Execute runs Do on every command concurrently and waits for all of them.
//
// Each command is recorded as a success or a failure. A goroutine that
// starts after a failure has already been recorded skips its command. This
// method always returns nil; callers detect failure through the recorded
// failed commands (see AutoExecute).
func (e *TransactionCommandExecutor) Execute(ctx context.Context) error {
	if len(e.commands) == 0 {
		return nil
	}

	e.wg.Add(len(e.commands))
	for _, c := range e.commands {
		go func(transactionCommand TransactionCommand) {
			defer e.wg.Done()

			// Skip the command if another one has already failed. The check
			// is made under the lock because appendFail writes failCommands
			// from other goroutines.
			e.Lock()
			alreadyFailed := len(e.failCommands) > 0
			e.Unlock()
			if alreadyFailed {
				return
			}

			executeResult, err := transactionCommand.Do()
			if err != nil {
				logger.CtxLogErrorf(transactionCommand.GetContext(), "execute : %+v", err)
				e.appendFail(transactionCommand)
				return
			}
			// A nil result is treated as a failure instead of panicking on
			// the IsSuccess call.
			if executeResult == nil || !executeResult.IsSuccess() {
				e.appendFail(transactionCommand)
				return
			}
			e.appendSuccess(transactionCommand)
		}(c)
	}
	e.wg.Wait()
	return nil
}

// appendSuccess records command as successfully executed. The append is done
// while holding the executor's lock.
func (e *TransactionCommandExecutor) appendSuccess(command TransactionCommand) {
	e.Lock()
	defer e.Unlock()
	e.successCommands = append(e.successCommands, command)
}

// appendFail records command as failed. The append is done while holding the
// executor's lock.
func (e *TransactionCommandExecutor) appendFail(command TransactionCommand) {
	e.Lock()
	defer e.Unlock()
	e.failCommands = append(e.failCommands, command)
}

// UndoSuccessCommand concurrently calls UnDo on every command that
// previously succeeded, and waits for all of them to finish.
//
// It does nothing and returns nil when there are no successful commands or
// no failed commands. Otherwise it returns the first undo failure observed
// (an UnDo error, or a result that is nil or not successful), or nil when
// every undo succeeded.
func (e *TransactionCommandExecutor) UndoSuccessCommand(ctx context.Context) error {
	if len(e.successCommands) == 0 || len(e.failCommands) == 0 {
		return nil
	}

	// firstErr keeps the first undo failure; it is written under the
	// executor's lock because the undo goroutines run concurrently.
	var firstErr error
	recordErr := func(err error) {
		e.Lock()
		if firstErr == nil {
			firstErr = err
		}
		e.Unlock()
	}

	// The counter must match the number of goroutines started below. Using
	// len(e.commands) here made Wait block forever, because at least one
	// command failed and therefore fewer goroutines call Done.
	e.wg.Add(len(e.successCommands))
	for _, c := range e.successCommands {
		go func(transactionCommand TransactionCommand) {
			defer e.wg.Done()
			executeResult, err := transactionCommand.UnDo()
			if err != nil {
				logger.CtxLogErrorf(transactionCommand.GetContext(), "execute : %+v", err)
				recordErr(err)
				return
			}
			if executeResult == nil || !executeResult.IsSuccess() {
				recordErr(errors.New("undo command did not succeed"))
			}
		}(c)
	}
	e.wg.Wait()
	return firstErr
}