使用golang多线程批量更新数据数据。
共4个文件
main.go
package main
import (
"bufio"
"fmt"
_ "github.com/go-sql-driver/mysql"
"os"
. "pt"
)
func main() {
args := os.Args
if len(args) == 2 {
switch args[1] {
case "1":
Cu.Run()
case "2":
//平台负债表数据静态化
Fz.Run()
case "0":
os.Exit(0)
default:
}
}
if len(args) == 1 {
for {
fmt.Println("操作目录: ")
fmt.Println("1、平台有效客户更新(202:13306-platform)。 ")
fmt.Println("2、平台负债数据静态化(202:13306-platform)。")
fmt.Println("0、退出。 ")
inputReader := bufio.NewReader(os.Stdin)
command, _, _ := inputReader.ReadLine()
code := string(command)
switch code {
case "1":
Cu.Run()
case "2":
//平台负债表数据静态化
Fz.Run()
case "0":
os.Exit(0)
default:
fmt.Println("default")
}
fmt.Println("-------处理完成-------")
}
}
}
pt/lib.go
package pt
import (
"database/sql"
_ "github.com/go-sql-driver/mysql"
"log"
"os"
"time"
)
const (
DSN string = "root:mysqladmin56@tcp(192.168.0.202:13306)/platform?charset=utf8"
//DSN string = "root:@tcp(127.0.0.1:3306)/db_name?charset=utf8"
)
/**
* 数据库连接
*/
func Mydb() *sql.DB {
db, err := sql.Open("mysql", DSN)
if err != nil {
log.Fatalf("Open database error: %s\n", err)
}
//defer db.Close() //不关闭连接
err = db.Ping()
if err != nil {
log.Fatal(err)
}
return db
}
/**
* 升级日志写入 文件追加参数奇葩 多 要3个
* @param {[type]} log string [description]
* @return {[type]} [description]
*/
func writeResult(tag string, data string) {
str_time := time.Now().Format("2006_01_02")
filename := tag + "_" + str_time + ".log"
fl, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
log.Println(err)
}
defer fl.Close()
fl.WriteString(data)
fl.WriteString("\n")
}
pt/fz.go
// 负债数据静态化封装
package pt
import (
"database/sql"
"fmt"
"log"
"time"
)
//门店负债数据
type FzData struct {
mdid int
xjye float32
zsye float32
lcye float32
month string
}
type FzDataMap map[int]FzData
type FzClass struct {
list FzDataMap
}
//list 使用前需要被初始化 所以直接在new时初始化 以下2种初始化方式都可
func NewFz() *FzClass {
fz := &FzClass{list: make(FzDataMap)}
//fz := new(FzClass)
//fz.list = make(FzDataMap)
return fz
}
func (fz *FzClass) Show() *FzClass {
for i, value := range fz.list {
fmt.Println(i)
fmt.Println(value)
}
return fz
}
//暂无卵用 方便大数据扩展
func (fz *FzClass) Add(row FzData) *FzClass {
fz.list[row.mdid] = row
return fz
}
//门店数据静态化 入库
func (fz *FzClass) toDb(db *sql.DB, row FzData) {
var num int
one, err := db.Query("SELECT COUNT(*) AS num FROM `static_fz` WHERE `md_id` = ? AND `month` = ? ", row.mdid, row.month)
if err != nil {
log.Println(err)
}
defer one.Close()
for one.Next() {
err := one.Scan(&num)
if err != nil {
log.Fatal(err)
}
}
if num > 0 {
//存在更新
stmt, _ := db.Prepare("UPDATE `static_fz` SET `xjye`=?, `zsye`=?, `lcye`=? WHERE `md_id` = ? AND `month` = ? ")
defer stmt.Close()
stmt.Exec(row.xjye, row.zsye, row.lcye, row.mdid, row.month)
} else {
//不存在插入
stmt, _ := db.Prepare("INSERT INTO `static_fz` (`md_id`, `xjye`, `zsye`, `lcye`,`month`) VALUES (?,?,?,?,?)")
defer stmt.Close()
stmt.Exec(row.mdid, row.xjye, row.zsye, row.lcye, row.month)
}
}
/**
* 某一公司负债处理
* @param {[type]} c chan int [日志管道]
* @param {[type]} comp_id int [公司ID]
* @param {[type]} fz_month int [负债月份]
*/
func (fz *FzClass) oneComp(c chan string, comp_id int, fz_month string) {
db := Mydb()
defer db.Close()
sql := "SELECT ed.`id`, SUM(IF(cc.type = 1, t.balance, 0)) AS xjye, SUM(IF(cc.type = 2, t.balance, 0)) AS zsye, IFNULL(tt.`lcye`,0) AS lcye FROM `customer_capital` `t` LEFT JOIN company_capital cc ON cc.id = t.capital_id LEFT JOIN customer_info ci ON ci.id = t.cu_id LEFT JOIN employ_dept ed ON ed.id = ci.store_id LEFT JOIN (SELECT ed.`id`,SUM(TRUNCATE(osd.pay_price / num * t.re_num, 1)) AS lcye FROM `customer_re_project` `t` LEFT JOIN customer_info ci ON ci.id = t.cu_id LEFT JOIN employ_dept ed ON ed.id = ci.store_id LEFT JOIN order_sale_detail osd ON osd.id = t.detail_id WHERE (ed.comp_id = ?) GROUP BY ed.id) tt ON tt.id = ed.id WHERE (ed.comp_id = ?) GROUP BY ed.id"
rows, err := db.Query(sql, comp_id, comp_id)
if err != nil {
log.Println(err)
}
defer rows.Close()
var rowData FzData
rowData.month = fz_month
for rows.Next() {
err := rows.Scan(&rowData.mdid, &rowData.xjye, &rowData.zsye, &rowData.lcye)
if err != nil {
log.Fatal(err)
}
//fz.Add(rowData)
fz.toDb(db, rowData)
//返回管道信息写入
c <- "公司:" + fmt.Sprintf("%d", comp_id) + "->门店:" + fmt.Sprintf("%d", rowData.mdid) + "(处理完成)"
}
err = rows.Err()
if err != nil {
log.Fatal(err)
}
close(c)
}
func (fz *FzClass) Run() {
db := Mydb()
defer db.Close()
sql := "SELECT id FROM `company_info` WHERE `status` = 1"
rows, err := db.Query(sql)
if err != nil {
log.Println(err)
}
defer rows.Close()
chs := make([]chan string, 0) //开多个管道接受消息
fz_month := time.Now().Format("200601")
var i int = 0
var id int
for rows.Next() {
c := make(chan string)
chs = append(chs, c)
err := rows.Scan(&id)
if err != nil {
log.Fatal(err)
}
go fz.oneComp(c, id, fz_month)
i = i + 1
}
err = rows.Err()
if err != nil {
log.Fatal(err)
}
for _, ch := range chs { //多管道写法
for {
x, ok := <-ch
if ok == false {
break
}
writeResult("fz", x)
fmt.Println(x) //消息回收处理 可扩展写入文件日志
}
}
}
var Fz *FzClass
func init() {
Fz = NewFz()
}
pt/custatus.go
// 客户属性自动更新封装
// 需要公司开启自动更新并配置客户过期周期时间
package pt
import (
"fmt"
"log"
"strconv"
"strings"
"time"
)
type CuStatusClass struct {
}
func NewCu() *CuStatusClass {
obj := new(CuStatusClass)
return obj
}
/**
*判断客户状态
**/
func (obj *CuStatusClass) getState(orders int, practs int, state int) int {
if orders > 0 || practs > 0 {
return 1 //最近有订单OR有实操 为有效客户
}
if state == 3 {
return 3 //死档客户
}
if state == -1 {
return -1 //无效客户
}
//默认返回为久党客户
return 2
}
/*
* 获取公司关于有效客户的配置天数 默认30天
*/
func (obj *CuStatusClass) getConfig(str string) int {
var num int
n := strings.Index(str, "member_config")
if n == -1 {
num = 30
} else {
start := n + 20
end := n + 22
num2 := string([]byte(str)[start:end])
num, _ = strconv.Atoi(num2)
}
return num
}
/**
* 更新一个公司的客户状态 (PT) 考虑新建数据库连接 提高效率
* @param {[type]} db *sql.DB [description]
* @param {[type]} c chan int [description]
* @param {[type]} comp_id int [公司ID]
* @param {[type]} num int [有效期天数]
* @return {[type]} [description]
*/
func (obj *CuStatusClass) updateOneComp(c chan string, comp_id int, num int) {
db := Mydb()
defer db.Close()
end := time.Now().Unix()
start := end - 3600*24*int64(num) //前推num天
sql := "SELECT a.id,a.name,a.status,(SELECT COUNT(*) FROM `order_sale` WHERE `cu_id` = a.id AND `pay_time` > ? AND `pay_time` < ? AND `type` IN (1,2)) AS orders, (SELECT COUNT(*) FROM `practice_order` WHERE `cu_id` = a.id AND `pay_time` > ? AND `pay_time` < ?) AS practs FROM `customer_info` AS a LEFT JOIN `config_membership` AS m ON a.membership_id = m.id WHERE m.`is_member` = 1 AND a.`company_id` = ?"
rows, err := db.Query(sql, start, end, start, end, comp_id)
if err != nil {
log.Println(err)
}
defer rows.Close()
var id int
var orders int
var practs int
var name string
var status int
for rows.Next() {
err := rows.Scan(&id, &name, &status, &orders, &practs)
if err != nil {
log.Fatal(err)
}
new_status := obj.getState(orders, practs, status)
if status != new_status {
stmt, err := db.Prepare("UPDATE `customer_info` SET `status`=? WHERE `id`=?")
defer stmt.Close()
if err != nil {
log.Println(err)
return
}
stmt.Exec(new_status, id)
//返回管道信息写入
c <- fmt.Sprintf("%d", comp_id) + ":" + fmt.Sprintf("%d", id) + " " + name + " " + fmt.Sprintf("%d", status) + "->" + fmt.Sprintf("%d", new_status)
}
}
err = rows.Err()
if err != nil {
log.Fatal(err)
}
close(c)
}
/**
* 多公司并发处理 (PT)
* @param {[type]} db *sql.DB [description]
* @return {[type]} [description]
*/
func (obj *CuStatusClass) Run() {
db := Mydb()
defer db.Close()
sql := "SELECT id , auto_cu_status, config FROM `company_info` WHERE `status` = 1"
rows, err := db.Query(sql)
if err != nil {
log.Println(err)
}
defer rows.Close()
chs := make([]chan string, 0) //开多个管道接受消息
var id int
var auto int
var config string
for rows.Next() {
err := rows.Scan(&id, &auto, &config)
if err != nil {
log.Fatal(err)
}
if auto == 1 {
num := obj.getConfig(config) //客户有效期设置
c := make(chan string)
chs = append(chs, c)
go obj.updateOneComp(c, id, num)
}
}
err = rows.Err()
if err != nil {
log.Fatal(err)
}
for _, ch := range chs { //多管道写法
for {
x, ok := <-ch
if ok == false {
break
}
writeResult("cu_status", x)
fmt.Println(x) //消息回收处理 可扩展写入文件日志
}
}
}
var Cu *CuStatusClass
func init() {
Cu = NewCu()
}
有疑问加站长微信联系(非本文作者)