2019独角兽企业重金招聘Python工程师标准>>> hot3.png

golang+数据库定时任务
   项目背景大致如下,楼主在用nodejs写项目时遇到一些需要定时去处理的事情,例如僵尸用户定时清除,一些产品定时下架,邮件定时发送等等! 期初使用nodejs setTimeOut递归嵌套实现,后来发现内存不断飙升,故而放弃,最终改用了性能不错的golang实现

数据库设计


输入图片说明

字段名称含义
id编号
name任务名称
create_at创建时间
type1. 执行一次 2.循环执行
separate_time执行间隔
status执行状态 0.未开始 1. 执行中 -1.执行失败 -2.手动暂停
remark备注信息
fn要执行的数据库存储过程或函数
start_time开始执行时间
next_exec_time下次执行时间
last_exec_time上次执行时间
fn_typeemail, sql 等等

大致实现流程


  1. 需要有一个死循环,sleep 10s启动然后sleep 10 ...
    		for {time.Sleep(10 * time.Second)go execTask(*db) //使用子进程执行,防止卡死主进程}
  1. 开始执行,查找需要执行的任务
rows, err := db.Query("SELECT id,name,status,type,fn,fn_type, separate_time FROM public.tasks where (status = 0 and start_time < now()) or (status = 1 and next_exec_time < now());")
  1. 执行任务
res, err := db.Exec(fn)
  1. 执行任务成功后,更新下次执行时间
func setTaskNextExecTime(db sql.DB, taskId string, separateTime int64) error {next_exec_time := time.Now().Unix() + separateTimenextTime := time.Unix(next_exec_time, 999)res, err := db.Exec("UPDATE tasks set status = 1, last_exec_time=now(), next_exec_time=$2 WHERE id = $1::uuid", taskId, nextTime)res = nillog.Println(res)return err;
}

优缺点


    优点:1. 所有任务执行状态都可以查询到,例如任务异常或者上次执行时间,下次执行时间2. 增加一个定时任务,只需要在数据库插入一条记录就OK缺点:1. 如果要绑定非数据库可操作任务,需要自己扩展

项目源码


// MTask project main.go
package mainimport ("database/sql"_ "github.com/lib/pq""log""time""os""io/ioutil""encoding/json"
)//配置结构体
type Conf struct {Db map[string] string
}//读取配置文件
func readConf(path string) (Conf, error) {var c Confvar err errorfi, err := os.Open(path)if err != nil {return c, err } else {defer fi.Close()//读取配置文件fd, err := ioutil.ReadAll(fi)if err != nil {return c, err} else {var c Conferr = json.Unmarshal(fd, &c)if err != nil {return c, err} else {return c, err}}}return c, err
}func main() {c, err := readConf("./conf.json")if err != nil {log.Print(err)panic(err)}db, err := sql.Open("postgres", c.Db["postgres"])if err != nil {log.Print(err)} else {defer db.Close()for {time.Sleep(10 * time.Second)go execTask(*db)}}
}func execTask(db sql.DB) {defer func() {if err := recover(); err != nil {log.Print(err)log.Printf("执行任务时发生错误:%s", err)}}();log.Println("开始执行任务.......")rows, err := db.Query("SELECT id,name,status,type,fn,fn_type, separate_time FROM public.tasks where (status = 0 and start_time < now()) or (status = 1 and next_exec_time < now());")if err != nil {log.Print(err)} else {defer rows.Close()for rows.Next() {var id stringvar name stringvar status intvar taskType intvar separateTime int64var fn stringvar fnType stringerr = rows.Scan(&id, &name, &status, &taskType, &fn, &fnType, &separateTime)if err != nil {//记录错误,同时更新任务信息为异常log.Print(err)err = setTaskExecFail(db, id)if err != nil {log.Print(err)}} else {if (fnType == "sql") {res, err := db.Exec(fn)if err != nil {log.Print(err)err = setTaskExecFail(db, id)if err != nil {log.Print(err)}log.Printf("任务:%s执行时出错", name)} else {res = nillog.Println(res)if taskType == 1 {err = setTaskExecSuccess(db, id)if err != nil {log.Print(err)}log.Printf("任务:%s执行完成", name)} else {err = setTaskNextExecTime(db, id, separateTime)if err != nil {log.Print(err)}}log.Printf("任务:%s执行成功", name)}} else if (fnType == "bash") {log.Printf("这是一个bash任务")} else if (fnType == "python") {log.Printf("这是一个python任务")} else if (fnType == "email") {//发送email任务err = ExecEmailTask(db)if err != nil {handleFail(db, id)log.Println(err)} else {handleSuccess(db, id)}log.Printf("发送邮件任务")setTaskExecSuccess(db, id)setTaskNextExecTime(db, id, separateTime)} else if (fnType == "sms") {//发送短信任务log.Printf("发送短信任务")}}}err = rows.Err()if err != nil {log.Print(err)}}log.Println("结束执行任务....")
}func setTaskExecFail(db sql.DB, taskId string) error {res, err := db.Exec("UPDATE tasks set status = -2 WHERE id = $1::uuid", taskId)err = nillog.Println(res)return err
}func setTaskExecSuccess(db sql.DB, taskId string) error {res, err := db.Exec("UPDATE tasks set status = 2 WHERE id = $1::uuid", taskId)err = nillog.Println(res)return err
}func setTaskNextExecTime(db sql.DB, taskId string, separateTime int64) error {next_exec_time := time.Now().Unix() + separateTimenextTime := time.Unix(next_exec_time, 999)res, err := db.Exec("UPDATE tasks set status = 1, last_exec_time=now(), next_exec_time=$2 WHERE id = $1::uuid", taskId, nextTime)res = nillog.Println(res)return err;
}