背景:

  你可能会很少用到批量更新,但是批量更新的需求确实存在,比如上传大量数据时,这个是需要批量更新的。如果你还是一条一条的更新,如果程序反应很慢,会影响用户的体验。

批量更新实现的四种方式:

  批量更新有四种方式,我最终选择了最后一种方式,理由会给出。
1、.replace into 批量更新:
replace into test_tbl (id,dr) values (1,‘2’),(2,‘3’),…(x,‘y’);
2、insert into …on duplicate key update批量更新
insert into test_tbl (id,dr) values (1,‘2’),(2,‘3’),…(x,‘y’) on duplicate key update dr=values(dr);
3、创建临时表,先更新临时表,然后从临时表中update:
例如:
create temporary table tmp(id int(4) primary key,dr varchar(50));
insert into tmp values (0,‘gone’), (1,‘xx’),…(m,‘yy’);
update test_tbl, tmp set test_tbl.dr=tmp.dr where test_tbl.id=tmp.id;
注意:这种方法需要用户有temporary 表的create 权限。
4、update 表名 set 字段 = case id when 1 then 字段值 when 2 then 字段值 when 3 then 字段值 end where id in (1,2,3)
对于第1、2种方式,网上大多人认为不是合适的理由是:
replace into 和 insert into on duplicate key update的不同在于:
replace into 操作本质是对重复的记录先delete 后insert,如果更新的字段不全会将缺失的字段置为缺省值
insert into 则是只update重复记录,不会改变其它字段。
 所以两者的区别只有一个,insert … on deplicate udpate保留了所有字段的旧值,再覆盖然后一起insert进去,而replace没有保留旧值,直接删除再insert新值。

从底层执行效率上来讲,replace要比insert … on deplicate update效率要高,但是在写replace的时候,字段要写全,防止老的字段数据被删除。
 
第四种方式,其实就是拼接的case when case when.
拼接的SQL语句类似于这种:

* UPDATE book 
*     SET book_id = CASE id 
*         WHEN 1 THEN 3 
*         WHEN 2 THEN 4 
*         WHEN 3 THEN 5 
*     END, 
*     name = CASE id 
*         WHEN 1 THEN 'New Title 1'
*         WHEN 2 THEN 'New Title 2'
*         WHEN 3 THEN 'New Title 3'
*     END
* WHERE id IN (1,2,3)   

case when 的方式实现批量更新:

  原理很简单,其实就是拼接成一条条的SQL语句,这条SQL语句是按照case when这种方式拼接的。在这里我做了限制,设置大于200条数据就会自动的切分SQL,防止一条SQL语句量过大。
代码实现:

// tableName表的名字,itemList你定义的数组类型的结构体,[]*Demo
func buildBatchUpdateSQLArray(tableName string, dataList interface{}) ([]string, *wmserror.WMSError) {

	fieldValue := reflect.ValueOf(dataList)
	fieldType := reflect.TypeOf(dataList).Elem().Elem()
	sliceLength := fieldValue.Len()
	fieldNum := fieldType.NumField()

	// 检验结构体标签是否为空和重复
	verifyTagDuplicate := make(map[string]string)
	for i := 0; i < fieldNum; i++ {
		fieldTag := fieldType.Field(i).Tag.Get("gorm")

		fieldName := GetFieldName(fieldTag)
		if len(strings.TrimSpace(fieldName)) == 0 {
			return nil, wmserror.NewError(constant.ErrParam, "the structure attribute should have tag")
		}

		if !strings.HasPrefix(fieldName, "id;") {
			return nil, wmserror.NewError(constant.ErrParam, "the structure attribute should have primary_key")
		}

		_, ok := verifyTagDuplicate[fieldName]
		if !ok {
			verifyTagDuplicate[fieldName] = fieldName
		} else {
			return nil, wmserror.NewError(constant.ErrParam, "the structure attribute %v tag is not allow duplication", fieldName)
		}

	}

	var IDList []string
	updateMap := make(map[string][]string)
	for i := 0; i < sliceLength; i++ {
		// 得到某一个具体的结构体的
		structValue := fieldValue.Index(i).Elem()
		for j := 0; j < fieldNum; j++ {
			elem := structValue.Field(j)

			var temp string
			switch elem.Kind() {
			case reflect.Int64:
				temp = strconv.FormatInt(elem.Int(), 10)
			case reflect.String:
				if strings.Contains(elem.String(), "'") {
					temp = fmt.Sprintf("'%v'", strings.ReplaceAll(elem.String(), "'", "\\'"))
				} else {
					temp = fmt.Sprintf("'%v'", elem.String())
				}
			case reflect.Float64:
				temp = strconv.FormatFloat(elem.Float(), 'f', -1, 64)
			case reflect.Bool:
				temp = strconv.FormatBool(elem.Bool())
			default:
				return nil, wmserror.NewError(constant.ErrConert, "type conversion error, param is %v", fieldType.Field(j).Tag.Get("json"))
			}

			gormTag := fieldType.Field(j).Tag.Get("gorm")

			fieldTag := GetFieldName(gormTag)

			if strings.HasPrefix(fieldTag, "id;") {
				id, err := strconv.ParseInt(temp, 10, 64)
				if err != nil {
					return nil, wmserror.NewError(constant.ErrConert, err.Error())
				}
				// id 的合法性校验
				if id < 1 {
					return nil, wmserror.NewError(constant.ErrParam, "this structure should have a primary key and gt 0")
				}
				IDList = append(IDList, temp)
				continue
			}

			valueList := append(updateMap[fieldTag], temp)
			updateMap[fieldTag] = valueList
		}
	}

	length := len(IDList)
	size := 200
	SQLQuantity := getSQLQuantity(length, size)
	var SQLArray []string
	k := 0

	for i := 0; i < SQLQuantity; i++ {
		count := 0

		var record bytes.Buffer
		record.WriteString("UPDATE " + tableName + " SET ")

		for fieldName, fieldValueList := range updateMap {
			record.WriteString(fieldName)
			record.WriteString(" = CASE " + "id")

			for j := k; j < len(IDList) && j < len(fieldValueList) && j < size+k; j++ {
				record.WriteString(" WHEN " + IDList[j] + " THEN " + fieldValueList[j])
			}
			count++
			if count != fieldNum-1 {
				record.WriteString(" END, ")
			}
		}

		record.WriteString(" END WHERE ")
		record.WriteString("id" + " IN (")
		min := size + k
		if len(IDList) < min {
			min = len(IDList)
		}
		record.WriteString(strings.Join(IDList[k:min], ","))
		record.WriteString(");")

		k += size
		SQLArray = append(SQLArray, record.String())
	}

	return SQLArray, nil
}

func getSQLQuantity(length, size int) int {
	SQLQuantity := int(math.Ceil(float64(length) / float64(size)))
	return SQLQuantity
}

func GetFieldName(fieldTag string) string {
	fieldTagArr := strings.Split(fieldTag, ":")
	if len(fieldTagArr) == 0 {
		return ""
	}

	fieldName := fieldTagArr[len(fieldTagArr)-1]

	return fieldName
}

type Demo struct {
	ID            int64   `gorm:"column:id;primary_key" json:"id"`
	name          string  `gorm:"column:name" json:"name"`
	Width         float64 `gorm:"column:width" json:"width"`
}

var demo []*Demo
demo1 := &Demo{1, "nihao", 12.0 }
demo2 := &Demo{2, "renzhen", 13.0}
demo3 := &Demo{3, "duidai", 12.0}
demo4 := &Demo{4, "xiexie", 13.0}
demo5 := &Demo{5, "OOP", 12.0}
demo = append(demo, demo1, demo2, demo3, demo4, demo5)
_, err := buildBatchUpdateSQLArray("demo_tab", demo)
if err != nil {
	t.Log(err)
}

批量更新的优势:

1、从网络传输方面来说,批量插入多条数据,更省空间。
2. 从连接数量来说,批量插入只使用一个连接,在使用数据库连接池的情况下,逐个插入可能会占用多个连接。
3. 从事务方面来说,逐条插入每次都会新建一个事务,批量插入只会使用一个事务。
4. 从日志方面来说,由于逐条插入每次都会插入binlog事务日志,也会影响效率。

已经出现的批量更新:

mybatis 以及 mybatis plus 还有JPA都支持批量更新的操作。

 <updateid="updateBatch"parameterType="java.util.List">   
update mydata_table 
    set  status=
    <foreach collection="list" item="item" index="index" 
        separator=" " open="case ID" close="end">
        when #{item.id} then #{item.status}
    </foreach>
    where id in
    <foreach collection="list" index="index" item="item" 
        separator="," open="(" close=")">
        #{item.id,jdbcType=BIGINT}
    </foreach>
 </update>

这是mybatis 批量更新的xml的配置,最终生成的SQL语句就是case when类似的。

最后:

  因为有这种场景的存在,所以,才会去写这个非常好用的组件的,写了一晚上,如果你用的很舒服。大家给个鼓励吧。