UndoDoUnDoUndo并且如果操作之间没有相互依赖关系,可以通过并发操作来提高运行效率,执行流程图如下
实现
TransactionCommandtype TransactionCommand interface {
Do() (define.ExecuteResult, error)
UnDo() (define.ExecuteResult, error)
GetContext() context.Context
}
目前实现了 DB 操作的 Do 和 UnDo,其他存储类型的正向、逆向操作同样实现该接口即可。
// transactionCommandDBImpl is the database-backed command: Do writes NewData
// and UnDo writes OriginData, both against the rows selected by WhereCondition.
type transactionCommandDBImpl struct {
	// OriginData is the field set written back by UnDo; NewData is the field
	// set written by Do.
	OriginData, NewData define.TransactionUpdateData

	// WhereCondition supplies the table name and the filter used by both
	// Do and UnDo.
	WhereCondition define.FilterCondition

	// ctx is handed back unchanged by GetContext.
	ctx context.Context

	// maxRetryCount is set by the constructor.
	// NOTE(review): no code visible in this section reads it — confirm usage.
	maxRetryCount int
}
// NewTransactionCommandUpdateDBImpl builds a database update command.
//
// originData is the field set UnDo writes back, newData the field set Do
// writes, and whereCondition selects the table and rows both operate on.
// ctx is kept on the command and exposed through GetContext. maxRetryCnt is
// stored in maxRetryCount.
func NewTransactionCommandUpdateDBImpl(
	originData define.TransactionUpdateData,
	newData define.TransactionUpdateData,
	whereCondition define.FilterCondition,
	ctx context.Context,
	maxRetryCnt int,
) *transactionCommandDBImpl {
	cmd := new(transactionCommandDBImpl)
	cmd.OriginData = originData
	cmd.NewData = newData
	cmd.WhereCondition = whereCondition
	cmd.ctx = ctx
	cmd.maxRetryCount = maxRetryCnt
	return cmd
}
// GetContext returns the context that was passed to the constructor.
func (t *transactionCommandDBImpl) GetContext() context.Context {
return t.ctx
}
// Do applies the forward update: it writes the fields of NewData to the rows
// of the WhereCondition table that match WhereCondition.
//
// It returns an update result built from the number of affected rows, or a
// stack-annotated error when the update itself reports one.
func (t *transactionCommandDBImpl) Do() (define.ExecuteResult, error) {
	// Attach the WHERE clause derived from the filter condition (the helper
	// is described upstream as injecting it via reflection).
	db := sql_utils.QuerySQLCommonWhere(t.getDbClient(), t.WhereCondition)

	updated := db.Table(t.WhereCondition.GetTableName()).Updates(t.NewData.GetUpdateFields())
	err := updated.GetError()
	if err != nil {
		return nil, errors.WithStack(err)
	}
	return define.NewUpdateExecuteResult(updated.RowsAffected()), nil
}
// UnDo applies the inverse update: it writes the fields of OriginData to the
// rows of the WhereCondition table that match WhereCondition.
//
// It returns an update result built from the number of affected rows, or a
// stack-annotated error when the update itself reports one.
func (t *transactionCommandDBImpl) UnDo() (define.ExecuteResult, error) {
	// Attach the WHERE clause derived from the filter condition (the helper
	// is described upstream as injecting it via reflection).
	db := sql_utils.QuerySQLCommonWhere(t.getDbClient(), t.WhereCondition)

	reverted := db.Table(t.WhereCondition.GetTableName()).Updates(t.OriginData.GetUpdateFields())
	err := reverted.GetError()
	if err != nil {
		return nil, errors.WithStack(err)
	}
	return define.NewUpdateExecuteResult(reverted.RowsAffected()), nil
}
TransactionCommandExecutortype TransactionCommandExecutor struct {
commands []TransactionCommand
successCommands []TransactionCommand
}
// NewTransactionCommandExecutor returns an executor for the given commands.
// The success list is pre-sized to hold every command so recording results
// does not need to grow it.
func NewTransactionCommandExecutor(commands []TransactionCommand) *TransactionCommandExecutor {
	executor := new(TransactionCommandExecutor)
	executor.commands = commands
	executor.successCommands = make([]TransactionCommand, 0, len(commands))
	return executor
}
自动执行所有命令
// AutoExecute runs every command and, if any of them fails, rolls back the
// commands that had already succeeded.
//
// It returns nil only when all commands succeeded. On failure it returns the
// error reported by Execute, or a generic error when Execute returned nil but
// failed commands were recorded. An error from the rollback is logged and not
// returned.
func (e *TransactionCommandExecutor) AutoExecute(ctx context.Context) error {
	err := e.Execute(ctx)
	if err == nil && len(e.failCommands) == 0 {
		return nil
	}
	if err == nil {
		// Failures may be reported only through failCommands; the caller
		// must still see a non-nil error instead of a silent nil.
		err = errors.New("transaction command execution failed")
	}
	logger.CtxLogErrorf(ctx, "err : %+v", err)
	if unDoErr := e.UndoSuccessCommand(ctx); unDoErr != nil {
		logger.CtxLogErrorf(ctx, "undo success command err : %+v", unDoErr)
	}
	return err
}
// Execute runs all commands concurrently, one goroutine per command, and
// waits for them to finish.
//
// Each command is recorded as succeeded or failed. A command whose goroutine
// starts after a failure has already been recorded is skipped and recorded in
// neither list. Execute does not roll anything back itself; call
// UndoSuccessCommand (or use AutoExecute) for that.
//
// It returns nil when there are no commands or none failed, and a non-nil
// error when at least one command failed.
func (e *TransactionCommandExecutor) Execute(ctx context.Context) error {
	if len(e.commands) == 0 {
		return nil
	}
	e.wg.Add(len(e.commands))
	for _, c := range e.commands {
		go func(transactionCommand TransactionCommand) {
			defer e.wg.Done()
			// Bail out early if another command has already failed. The
			// check is done under the lock because other goroutines may be
			// appending to failCommands at the same time.
			e.Lock()
			alreadyFailed := len(e.failCommands) > 0
			e.Unlock()
			if alreadyFailed {
				return
			}
			executeResult, err := transactionCommand.Do()
			if err != nil {
				logger.CtxLogErrorf(transactionCommand.GetContext(), "execute : %+v", err)
				e.appendFail(transactionCommand)
				return
			}
			if !executeResult.IsSuccess() {
				e.appendFail(transactionCommand)
				return
			}
			e.appendSuccess(transactionCommand)
		}(c)
	}
	e.wg.Wait()
	// All workers are done, so failCommands can be read without the lock.
	if len(e.failCommands) > 0 {
		return errors.New("one or more transaction commands failed")
	}
	return nil
}
// appendSuccess records command as successfully executed. The executor lock
// is held while the list is modified because callers run concurrently.
func (e *TransactionCommandExecutor) appendSuccess(command TransactionCommand) {
	e.Lock()
	defer e.Unlock()
	e.successCommands = append(e.successCommands, command)
}
// appendFail records command as failed. The executor lock is held while the
// list is modified because callers run concurrently.
func (e *TransactionCommandExecutor) appendFail(command TransactionCommand) {
	e.Lock()
	defer e.Unlock()
	e.failCommands = append(e.failCommands, command)
}
// UndoSuccessCommand concurrently rolls back every command that succeeded,
// one goroutine per command, and waits for all of them to finish.
//
// It is a no-op returning nil when nothing succeeded or nothing failed.
// Otherwise it returns the first rollback failure observed (an UnDo error or
// an unsuccessful UnDo result), or nil when every rollback succeeded.
func (e *TransactionCommandExecutor) UndoSuccessCommand(ctx context.Context) error {
	if len(e.successCommands) == 0 || len(e.failCommands) == 0 {
		return nil
	}

	var firstErr error
	recordFailure := func(err error) {
		e.Lock()
		defer e.Unlock()
		if firstErr == nil {
			firstErr = err
		}
	}

	// The counter must match the number of goroutines started below. Using
	// len(e.commands) here would leave Wait blocked forever, because fewer
	// commands succeeded than were submitted.
	e.wg.Add(len(e.successCommands))
	for _, c := range e.successCommands {
		go func(transactionCommand TransactionCommand) {
			defer e.wg.Done()
			executeResult, err := transactionCommand.UnDo()
			if err != nil {
				logger.CtxLogErrorf(transactionCommand.GetContext(), "execute : %+v", err)
				recordFailure(err)
				return
			}
			if !executeResult.IsSuccess() {
				recordFailure(errors.New("undo reported an unsuccessful result"))
			}
		}(c)
	}
	e.wg.Wait()
	// All workers are done, so firstErr can be read without the lock.
	return firstErr
}